Top K Problems

Top K Problems

The following sections is edited based on (credit to) @CyC2018

Source: https://github.com/CyC2018/Backend-Interview-Guide/blob/master/doc/TopK.md

1. 问题描述

TopK Elements 问题用于找出一组数中最大的 K 个的数。

此外还有一种叫 Kth Element 问题,用于找出一组数中第 K 大的数。

其实要求解 TopK Elements,可以先求解 Kth Element,因为找到 Kth Element 之后,再遍历一遍,大于等于 Kth Element 的数都是 TopK Elements。

2. 一般解法

2.1 Quick Select - 快速选择

快速排序的 partition() 方法,对于数组 nums 的 [l, h] 区间,会返回一个整数 k 使得 nums[l..k-1] 小于等于 nums[k],且 nums[k+1..h] 大于等于 nums[k],此时 nums[k] 就是数组的第 k 大元素。可以利用这个特性找出数组的 Kth Element,这种找 Kth Element 的算法称为快速选择算法。

  • 时间复杂度 O(N)、空间复杂度 O(1)

  • 只有当允许修改数组元素时才可以使用

public int findKthElement(int[] nums, int k) {
    k = nums.length - k;
    int l = 0, h = nums.length - 1;
    while (l < h) {
        int j = partition(nums, l, h);
        if (j == k) {
            break;
        } else if (j < k) {
            l = j + 1;
        } else {
            h = j - 1;
        }
    }
    return nums[k];
}

private int partition(int[] a, int l, int h) {
    int i = l, j = h + 1;
    while (true) {
        while (a[++i] < a[l] && i < h) ;
        while (a[--j] > a[l] && j > l) ;
        if (i >= j) {
            break;
        }
        swap(a, i, j);
    }
    swap(a, l, j);
    return j;
}

private void swap(int[] a, int i, int j) {
    int t = a[i];
    a[i] = a[j];
    a[j] = t;
}

2.2 堆 - Heap

维护一个大小为 K 的Min-Heap,堆顶元素就是 Kth Element。

如果使用Max-Heap,则需要变换 k = n - k + 1。

维护一个大小为 K 的Min-Heap过程如下:在添加一个元素之后,如果Heap堆的大小大于 K,那么需要将堆顶元素去除。

  • Min-Heap: 时间复杂度 O(NlogK) 、空间复杂度 O(K)

  • Max-Heap:时间复杂度 O(Nlog(N - K)) 、空间复杂度 O(N - K)

  • 特别适合处理海量数据

Min-Heap

class Solution {
    public int findKthLargest(int[] nums, int k) {
        Queue<Integer> q = new PriorityQueue<Integer>();
        for (int num : nums) {
            q.offer(num);
            if (q.size() > k) {
                q.poll();
            }
        }
        return q.peek();
    }
}

Max-Heap

public int findKthLargest(int[] nums, int k) {
    k = nums.length - k + 1;
    PriorityQueue<Integer> pq = new PriorityQueue<>(Comparator.reverseOrder()); // 大顶堆
    for (int val : nums) {
        pq.add(val);
        if (pq.size() > k)  // 维护堆的大小为 K
            pq.poll();
    }
    return pq.peek();
}

3. 海量数据

在这种场景下,单机通常不能存放下所有数据。

  • 拆分,可以按照哈希取模(注意不能简单地均匀拆分文件)方式拆分到多台机器上,并在每个机器上维护最大堆;

  • 整合,将每台机器得到的最大堆合并成最终的最大堆。

4. 频率统计

Heavy Hitters 问题要求找出一个数据流的最频繁出现的 K 个数,比如热门搜索词汇等。

4.1 HashMap

使用 HashMap 进行频率统计,然后使用快速选择或者堆的方式找出频率 TopK。在海量数据场景下,也是使用先拆分再整合的方式来解决空间问题。

4.2 Count-Min Sketch

维护 d*w 大小的二维统计数组,其中 d 是哈希函数的个数,w 根据情况而定。

  • 在一个数到来时,计算 d 个哈希值,然后分别将哈希值对 w 取模,把对应统计数组上的值加 1;

  • 要得到一个数的频率,也是要计算 d 个哈希值并取模,得到 d 个频率,取其中最小值。

该算法的思想和布隆过滤器类似,具有一定的误差,特别是当 w 很小时。但是它能够在单机环境下解决海量数据的频率统计问题。

最高频 K 项问题 - 海量数据处理*

Source: jiuzhang.com big-data-interview-questions

这一类问题在海量数据类面试题中出现频率最高。问题的形式通常如下:

找到一个大文件或者数据流中出现频率最高的 K 项

这个问题的难点在于,如果条件不同,解决的办法是完全不一样的:

  1. 是否需要精确的 Top K 结果?即,是否允许小概率出错。

  2. 数据是离线的还是在线的?即是一个大文件的形式计算一次得到一个结果,还是数据流的形式实时返回结果。

整数数组前K大

简单的一个铺垫问题

在一个整数数组中,找最大的 K 个整数

离线算法

这个问题中,我们可以使用 Quick Select 算法,在O(N)的时间内找到数组的第K大的数,然后再扫描一次整个数组就可以得到前K大的数。

缺点:上面的这个算法要求扫描数组两遍,第一遍求得第K大,第二遍求得前K大。如果数据以数据流的形式给出,即没有机会从头遍历第二次,那么上面的算法就不奏效了。

在线算法

在 Top K Largest Number II 这个问题中,你需要实现2个函数,add和topk,一个是增加一个整数到集合中,另外一个是求出当前集合里的 topk 的整数是哪些。

解决这个问题的办法是使用数据结构堆(Heap)。堆这个结构,支持 O(logN) 的插入,O(1) 的求最大值/最小值,O(logN)删除最值(pop)。其中 N 为堆中元素个数。

低效的算法

一个直观的想法是,我们把所有的整数都放到堆里,维护一个最大堆,每次 add 操作的时候,就往堆里 add 一次。每次 topk 操作的时候,就从堆里取 k 次最大值(pop K次)。这种做法的问题在于,必须维护所有的整数在堆里,假如总数据量为 N 的话,通常 K << N。而 add 的时间复杂度为 O(logN),topk 的时间复杂度为 O(klogN),都不是最优的。

高效的算法

因为这个问题中我们不需要支持删除操作,数据只增不减,那么那些已经排到第K名之后的数据,是再也没有机会进入前K名的,因此保留这些数据的意义并不大,我们可以删掉他们。这样一来,我们需要一个最小堆。或许你直观上会觉得很奇怪,为什么我们要求最大的K个整数却要用最小堆呢?事实上,当我们需要决定一个整数是否能加入 Top K 的集合的时候,决定性的因素是:当前整数是否比 Top K 中的最小的数要大。因此在 Top K 的这 K 个整数中,我们应该维护一个最小堆,推举出最小的整数去和数据流中新来的整数作比较,如果新来的数更大,就踢掉堆里最小的整数,把新数加进去,反之则扔掉新数。

最高频 K 项的离线算法

有了最大K项问题的铺垫之后,我们要求最高频K项的话,就可以用下面这个直观的做法来做了:

# 1. 用一个 Hash 表来统计所有项的出现次数
# 2. 循环每一个出现过的项,用最大K项的方法,获得最大的前K项
# 伪代码 in Python:

hash = {}
for e in elements:
    hash[e] = hash.get(e, 0) + 1

for element, frequency in hash.iteritems():
<element, frequency> 二元组用上一节中讲过的最大 K 项的最小堆的做法求得 TOP K

这个算法,我们暂且称之为 标准离线算法。主要使用了两个数据结构:哈希表最小堆

时空复杂度: 第一步统计所有单词的出现次数,需要 O(N) 的空间和 O(N) 的时间。第二步需要 O(K) 的空间和 O(NlogK) 的时间。总的时间耗费是 O(N log K),空间耗费是 O(N)。

Follow up 1: 还有办法提速么?大数据的处理?

问题描述

这里假设你有若干个小文件(如果是一个很大的文件,可以先进行 split,分割为若干个小文件),他们加起来有 1T 那么大。每个文件中用空格隔开若干单词,你需要统计出现次数最多的 K 个单词。

分布式计算 Map Reduce

Map Reduce 的基本思想很简单,就是通过 Map 这个步骤把通过多台机器并行将所有的数据都整理为 <Key, Value> 的二元组,然后Reduce 操作之前这套系统会按照 Key 的不同,将不同的 Key 分给不同的机器进行处理,比如可以简单的根据 hash(key) % 机器数 的方式来进行数据分配(这个过程叫做shuffle)。接下来,每台机器拿到数据之后,进行reduce合并统计的操作将同一个 Key 的数据进行处理。最终得到了每个 Key 的处理结果。

注意:不使用 Map Reduce 单纯的分割文件分别统计行不行?不行。如果单纯的将文件1丢给机器1,文件2丢给机器2,分别统计 Top K 之后再合并,这种方法是不行的。因为可能最高频的那一项分别出现在了文件1和文件2中,但是他在文件1和2里都没有挤进 Top K,那么统计结果就不对了。

最高频 K 项问题如何使用 Map Reduce?

  1. 通过 Map 步骤,将每一个文件中的单词一个个取出,每个单词构造一个<Word, 1>的 Key-value 二元组,作为 Map 的输出。

  2. 由于可能有多个 Reducers(跟你同时运行的机器数有关,当然一台机器也可能会运行多个Reducer),因此我们会得到多个 Top K,最后还需要从这些输出中过一遍,得到最终的 Top K。这个步骤已经在 Map Reduce 之外了,用一个单独的代码扫一遍就可以了。

  3. 通过 Reduce 的步骤,每个 Reducer(Reducer是处理reduce的机器) 会处理若干个不同的 Key,在每个 Reducer 一开始初始化的时候,构建一个最小堆(如最开始我们提到的算法),Reducer 在每次 Reduce 操作的时候,输入是 key(某个 word) 和他对应的values,其实这里我们可以假设 values 就是一堆 1(事实上 Map Reduce 会帮你做一些优化,导致有可能 value 已经被加过,所以实际处理的时候,还是老老实实的把 values 加起来,而不是看一下 values 有多少个)。那么我们把所有的 values 加起来就是当前这个 key(某个 word)的出现次数。那么当我们拿到这个单词的出现次数之后,就可以在当前的 Reducer 里去和最小堆里的第K大比大小,来决定是否淘汰当前的第K大了。Reducer 在处理完他需要处理的数据之后,就输出他得到的 Top K。

Follow up 2: 内存不够怎么办?

问题描述

上面这个算法中,我们花费了 O(N) 的空间耗费,也就是需要把所有的,比如说单词,都放到内存里。

在现实场景中,这很可能是不行的,因为 N 可能非常的大。即便我们使用上了 Follow up 1 中的 Map Reduce 的办法,那么比如说就算你用了很多台机器,但是分到每台机器上的时候,仍然无法全部加载到内存中,也是有可能的。

假设现在只有一台机器,内存为 1G,你有一个 1T 大小的文件,需要统计里面最高频的 K 个单词。

什么是Hash(哈希),Hash Code(哈希值)

Hash(哈希),是一个非常常见的算法(HashTable 哈希表才是数据结构,Hash是算法)。我们在《九章算法班》中对哈希算法做过详细的介绍,如果你不知道哈希算法做了什么事情,你可以假设已经存在这样一个哈希函数,这个哈希函数对于同一个 Key,会返回一个固定的,无规律的整数值。在工程中,你通常不需要不需要自己实现哈希函数,有很多库可以直接用。你可以在网上搜索相关的你所使用语言的库的哈希函数资料。

所谓的固定的,值得是,同样的 Key,得到的结果,肯定是同样的。同一个单词不可能一会儿算出来的哈希值(Hash Code)是1,一会儿是2。

所谓的无规律,说的是,如果你给哈希函数一大堆不同的 Key 的时候,他产生的哈希值不会扎堆,分布还是比较均匀的。

另外还有一个特点是,哈希值是可能重复的,并不是一对一的。也就是两个不同的 Key,可能得到同一个哈希值。这个特性并不影响我们的计算。

使用哈希算法如何解决内存问题?

  1. 我们只需要先将文件扫描一次,把每个单词作为 Key,算一下他的哈希值,然后模上大概 2000 - 10000 的这样一个数。之所以取这这么一个数是因为,内存的大小是 1G,那么如果将 1T 的文件分成若干个 1G 大小的小文件的话,那么理想需要 1000 个文件。因此反之,如果你将所有的单词,分成了 1000 组的话,理想状况下,每组大概就是 1G 个不同的单词。当然这是理想状况,所以实际上处理的时候,你可以分成 2000 组比较保险。10000 组当然更保险了,但是可能就没有合理利用上内存了。实际做的时候,你可以看一下分成 2000 行不行,不行的话,再放大分组数。

  2. 对于每个文件,分别导入内存进行处理,即使用我们最开始提到的标准离线算法 - 哈希表+最小堆。每一组文件得到一个 Top K。

  3. 类似于 Map Reduce 一样,我们得到了若干个 Top K,我们最后把这若干个 Top K 再合并一次就好了。

最高频 K 项的在线算法

问题描述

数据流中不断流过一些单词,提供一个接口,返回当前出现过的单词中,频次最高的 Top K 个单词。K 从在最开始便已经给出。

即:对于给定的 K,比如 100,实现两个接口:

add(word)
# 添加一个单词到集合中

topk()
# 返回集合中的 Top K 的高频词

数据流Data Stream)问题就是我们所谓的在线问题。数据流问题的特点是,你没有第二次从头访问数据的机会! 因此,在离线算法中,我们用到的先通过哈希表(HashMap)计数,再通过堆(Heap)来统计 Top K 的标准离线算法便不行了。

标准在线算法

对标准离线算法稍微修改一下,我们就可以得到下面这个标准在线算法,思路是,一边计数的同时,一边比较 Top K。

HashMap + HashHeap

class TopKAnalyzer:
    # 构造函数,初始化一个 top k 分析器,指定 k 的大小。
    def __init__(self, k):
        self.k = k

        # 初始化一个哈希堆(HashHeap)。这个数据结构是在堆的基础上加一个哈希表。
        # HashHeap 除了有堆的功能以外,还能够查询某个单词是否在这个堆里。
        # 正如标准离线算法一样,这是一个最小堆
        self.hashheap = HashHeap()

        # 初始化一个哈希表,用于对单词计数
        self.hash = Hash()

    def add(word):
        # 单词计数 +1
        self.hash[word] += 1

        if word in self.hashheap:
           # 调整 word 在 hash heap 中的位置,因为他的计数被更改了
           self.hashheap.adjust(word, self.hash[word])
           return

        # 如果没满,就接着往hashheap里放
        if self.hashheap.size() < self.k:
           self.hashheap.push(word, self.hash[word])
           return

       # 和 hash heap 里出现频次最低的单词相比
       # 如果当前单词更大,就踢掉 hashheap 里的频次最低的单词
       if self.hash[word] > self.hashheap.top().value:
           self.hashheap.pop()
           self.hashheap.push(word, self.hash[word])

    def topk():
        # 从 hashheap 中导出 Top K 的单词
        return self.hashheap.toList()

复杂度分析

这个算法的时间复杂度要分两个维度讨论:

  1. add 的时间复杂度是 O(logK) 的,因为最坏情况下,就是 pop 掉一个单词,push进去一个新的单词。由于 hashheap 的大小最多是 K,那么复杂度为 O(logK)

  2. topk 的时间复杂度是 O(KlogK)。

这个算法的空间复杂度,为计数所用的哈希表的空间复杂度。为数据流中到当前时刻为止的单词总个数。

http://www.lintcode.com/problem/top-k-frequent-words-ii/

Follow up: 内存不够怎么办?

难点描述: 在标准在线算法中,空间复杂度与数据流中流过的数据大小总和有关,也就是用于计数的那个哈希表的耗费。

损失精度换空间

在线的精确的省空间的 Top K 高频项算法,不存在。

因此我们必须损失掉一个因子。这个因子就是 “精确性”。精确性是说,比如我求得了 Top 10 的查询,那么这10个查询中,可能会存在一个查询,他的实际排名在 Top 10 之后。又或者 Top 10 的相对排名并不正确,原本第一名的跑去了第二名。

有哪些算法可以用精度换空间?一些比较成熟的算法有:

  1. Lossy Counting

  2. Sticky Sample

  3. Space Saving

  4. Efficient Count

  5. Hash Count

  6. ...

介绍一个比较容易掌握的,和标准在线算法相比只需要很少的改动的一个算法:Hash Count

class TopKAnalyzer:
    def __init__(self, k):
        self.k = k
        self.hashheap = HashHeap()
        self.hashcount = 开一个整数数组,内存能让你开多大就开多大

    def add(self, word):
        index = hashfunc(word) % self.hashcount.size
        self.hashcount[index] += 1
        wordCount = self.hashcount[index]

        # 其他部分和标准在线算法一样

    def topk(self):
        # 和标准在线算法一样

上面这个算法中,和标准在线算法相比,唯一的区别在于,我们将原本记录所有单词的出现次数的哈希表,换成了一个根据内存大小能开多大开多大的数组。这个数组的每个下标存储了“某些”单词的出现次数。我们使用了 哈希函数hashfunc),对每个单词计算他的哈希值hashcode),将这个值模整个hashcount数组的大小得到一个下标(index),然后用这个下标对应的计数来代表这个单词的出现次数。有了这个单词的出现次数之后,再拿去 hash heap 里进行比较就可以了。

你马上会发现问题,如果有两个单词,他们的 hashcode % hashcount.size 的结果相同,那么他们的计数会被叠加到一起。从而导致计算结果的不精确。

上述问题对精度的影响到底有多大呢?事实上,根据“长尾效应”(Google 一下),在实际数据的统计中,由于 Top K 的 K 相对于整个数据流集合中的不同数据项个数 N 的关系是 K 远远小于 N,而 Top K 的这些数据项的计数又远远大于其他的数据项。因此,Top K 的 hashcode % hashcount.size 扎堆的可能性是非常非常小的。因此这个算法的精确度也就并不会太差。

使用布隆过滤器进行优化

事实上,Hash Count 是一个只有 1 个哈希函数的布隆过滤器(Bloom Filter),一个完整的 BloomFilter 一般至少包含 4 个完全独立的哈希函数。

Reference

https://github.com/CyC2018/Backend-Interview-Guide/blob/master/doc/TopK.md

九章Big Data Interview Questions

Top K 频繁项

This section is by @soulmachine: Heavy Hitters Source: https://soulmachine.gitbooks.io/system-design/cn/bigdata/heavy-hitters.html

寻找数据流中出现最频繁的k个元素(find top k frequent items in a data stream)。这个问题也称为 Heavy Hitters.

这题也是从实践中提炼而来的,例如搜索引擎的热搜榜,找出访问网站次数最多的前10个IP地址,等等。

方案1: HashMap + Heap

用一个HashMap<String, Long>,存放所有元素出现的次数,用一个小根堆,容量为k,存放目前出现过的最频繁的k个元素,

  1. 每次从数据流来一个元素,如果在HashMap里已存在,则把对应的计数器增1,如果不存在,则插入,计数器初始化为1

  2. 在堆里查找该元素,如果找到,把堆里的计数器也增1,并调整堆;如果没有找到,把这个元素的次数跟堆顶元素比较,如果大于堆丁元素的出现次数,则把堆丁元素替换为该元素,并调整堆

  3. 空间复杂度O(n)。HashMap需要存放下所有元素,需要O(n)的空间,堆需要存放k个元素,需要O(k)的空间,跟O(n)相比可以忽略不急,总的时间复杂度是O(n)

  4. 时间复杂度 O(n) 。每次来一个新元素,需要在HashMap里查找一下,需要 O(1) 的时间;然后要在堆里查找一下, O(k) 的时间,有可能需要调堆,又需要 O(logk) 的时间,总的时间复杂度是 O(n(k+logk)) ,k是常量,所以可以看做是O(n)。

如果元素数量巨大,单机内存存不下,怎么办? 有两个办法,见方案2和3。

方案2: 多机HashMap + Heap

  • 可以把数据进行分片。假设有8台机器,第1台机器只处理

    hash(elem)%8==0

    的元素,第2台机器只处理

    hash(elem)%8==1

    的元素,以此类推。

  • 每台机器都有一个HashMap和一个 Heap, 各自独立计算出 top k 的元素

  • 把每台机器的Heap,通过网络汇总到一台机器上,将多个Heap合并成一个Heap,就可以计算出总的 top k 个元素了

方案3: Count-Min Sketch + Heap

既然方案1中的HashMap太大,内存装不小,那么可以用Count-Min Sketch算法代替HashMap,

  • 在数据流不断流入的过程中,维护一个标准的Count-Min Sketch 二维数组

  • 维护一个小根堆,容量为k

  • 每次来一个新元素,

    • 将相应的sketch增1

    • 在堆中查找该元素,如果找到,把堆里的计数器也增1,并调整堆;如果没有找到,把这个元素的sketch作为钙元素的频率的近似值,跟堆顶元素比较,如果大于堆丁元素的频率,则把堆丁元素替换为该元素,并调整堆

这个方法的时间复杂度和空间复杂度如下:

  • 空间复杂度 O(dm)

    。m是二维数组的列数,d是二维数组的行数,堆需要O(k)

    的空间,不过k通常很小,堆的空间可以忽略不计

  • 时间复杂度 O(nlogk)

    。每次来一个新元素,需要在二维数组里查找一下,需要

    O(1)

    的时间;然后要在堆里查找一下,

    O(logk)

    的时间,有可能需要调堆,又需要

    O(logk)

    的时间,总的时间复杂度是

    O(nlogk)

方案4: Lossy Counting

Lossy Couting 算法流程:

  1. 建立一个HashMap

    ,用于存放每个元素的出现次数

  2. 建立一个窗口(窗口的大小由错误率决定,后面具体讨论)

  3. 等待数据流不断流进这个窗口,直到窗口满了,开始统计每个元素出现的频率,统计结束后,每个元素的频率减1,然后将出现次数为0的元素从HashMap中删除

  4. 返回第2步,不断循环

Lossy Counting 背后朴素的思想是,出现频率高的元素,不太可能减一后变成0,如果某个元素在某个窗口内降到了0,说明它不太可能是高频元素,可以不再跟踪它的计数器了。随着处理的窗口越来越多,HashMap也会不断增长,同时HashMap里的低频元素会被清理出去,这样内存占用会保持在一个很低的水平。

很显然,Lossy Counting 算法是个近似算法,但它的错误率是可以在数学上证明它的边界的。假设要求错误率不大于ε,那么窗口大小为1/ε,对于长度为N的流,有N/(1/ε)=εN 个窗口,由于每个窗口结束时减一了,那么频率最多被少计数了窗口个数εN。

该算法只需要一遍扫描,所以时间复杂度是O(n)

该算法的内存占用,主要在于那个HashMap, Gurmeet Singh Manku 在他的论文里,证明了HashMap里最多有1/ε log (εN)个元素,所以空间复杂度是O(1/ε log (εN))

方案5: SpaceSaving

TODO, 原始论文 "Efficient Computation of Frequent and Top-k Elements in Data Streams"

参考资料

  1. A.Metwally, D.Agrawal, A.El Abbadi. Efficient Computation of Frequent and Top-k Elements in Data Streams. In Proceeding of the 10th International Conference on Database Theory(ICDT), pp 398-412,2005.

  2. Massimo Cafaro, et al. “A parallel space saving algorithm for frequent items and the Hurwitz zeta distribution”. Proceeding arXiv: 1401.0702v12 [cs.DS] 19 Setp 2015.

Last updated