Hadoop的Python框架指南

释放双眼,带上耳机,听听看~!

最近,我加入了Cloudera,在这之前,我在计算生物学/基因组学上已经工作了差不多10年。我的分析工作主要是利用Python语言和它来进行的。但Apache Hadoop的生态系统大部分都是用Java来实现的,也是为Java准备的,这让我很恼火。所以,我的头等大事变成了寻找一些Python可以用的Hadoop框架。

在这篇文章里,我会把我个人对这些框架的一些无关科学的看法写下来,这些框架包括:

最终,在我的看来,Hadoop的数据流(streaming)是最快也是最透明的选项,而且最适合于文本处理。mrjob最适合于在Amazon EMR上快速工作,但是会有显著的性能损失。dumbo 对于大多数复杂的工作都很方便(对象作为键名(key)),但是仍然比数据流(streaming)要慢。

请继续往下阅读,以了解实现细节,性能以及功能的比较。

一个有趣的问题

为了测试不同的框架,我们不会做“统计词数”的实验,转而去转化谷歌图书N-元数据。 N-元代表一个n个词构成的元组。这个n-元数据集提供了谷歌图书文集中以年份分组的所有1-,2-,3-,4-,5-元记录的统计数目。 在这个n-元数据集中的每行记录都由三个域构成:n-元,年份,观测次数。(您能够在取得数据)。

我们希望去汇总数据以观测统计任何一对相互临近的词组合所出现的次数,并以年份分组。实验结果将使我们能够判断出是否有词组合在某一年中比正常情况出现的更为频繁。如果统计时,有两个词在四个词的距离内出现过,那么我们定义两个词是“临近”的。 或等价地,如果两个词在2-,3-或者5-元记录中出现过,那么我们也定义它们是”临近“的。 一次,实验的最终产物会包含一个2-元记录,年份和统计次数。

 

有一个微妙的地方必须强调。n-元数据集中每个数据的值都是通过整个谷歌图书语料库来计算的。从原理上来说,给定一个5-元数据集,我可以通过简单地聚合正确的n-元来计算出4-元、3-元和2-元数据集。例如,当5-元数据集包含


1
2
3
4
1(the, cat, in, the, hat)       1999     20
2(the, cat, is, on, youtube)    1999     13
3(how, are, you, doing, today)  1986   5000
4

时,我们可以将它聚合为2-元数据集以得出如下记录


1
2
1(the, cat)  1999    33      // 也就是, 20 + 13
2

然而,实际应用中,只有在整个语料库中出现了40次以上的n元组才会被统计进来。所以,如果某个5元组达不到40次的阈值,那么Google也提供组成这个5元组的2元组数据,这其中有一些或许能够达到阈值。出于这个原因,我们用相邻词的二元数据,隔一个词的三元组,隔两个词的四元组,以此类推。换句话说,与给定二元组相比,三元组多的只是最外层的词。除了对可能的稀疏n元数据更敏感,只用n元组最外层的词还有助于避免重复计算。总的来说,我们将在2元、3元、4元和5元数据集上进行计算。

 

 


1
2
3
4
5
6
7
8
9
10
1def map(record):
2    (ngram, year, count) = unpack(record)
3    // 确保word1为字典第一个字
4    (word1, word2) = sorted(ngram[first], ngram[last])
5    key = (word1, word2, year)
6    emit(key, count)
7
8def reduce(key, values):
9    emit(key, sum(values))
10

 

硬件

这些MapReduce组件在一个大约20GB的随机数据子集上执行。完整的数据集涵盖1500个文件;我们用这个脚本选取一个随机子集。文件名保持完整,这一点相当重要,因为文件名确定了数据块的n-元中n的值。

Hadoop集群包含5个使用CentOS 6.2 x64的虚拟节点,每个都有4个CPU,10GB RAM,100GB硬盘容量,并且运行CDH4。集群每次能够执行20个并行运算,每个组件能够执行10个减速器。

集群上运行的软件版本如下:

 

实现

大多数Python框架都封装了 ,还有一些封装了 ,也有些是基于自己的实现。下面我会分享一些我使用各种Python工具来写Hadoop jobs的经验,并会附上一份性能和特点的比较。我比较感兴趣的特点是易于上手和运行,我不会去优化某个单独的软件的性能。

在处理每一个数据集的时候,都会有一些损坏的记录。对于每一条记录,我们要检查是否有错并识别错误的种类,包括缺少字段以及错误的N元大小。对于后一种情况,我们必须知道记录所在的文件名以便确定该有的N元大小。

获得。

 

Hadoop Streaming

 提供了使用其他可执行程序来作为Hadoop的mapper或者reduce的方式,包括标准Unix工具和Python脚本。这个程序必须使用规定的语义从标准输入读取数据,然后将结果输出到标准输出。直接使用Streaming 的一个缺点是当reduce的输入是按key分组的时候,仍然是一行行迭代的,必须由用户来辨识key与key之间的界限。

下面是mapper的代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1#! /usr/bin/env python
2
3import os
4import re
5import sys
6
7# determine value of n in the current block of ngrams by parsing the filename
8input_file = os.environ['map_input_file']
9expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0])
10
11for line in sys.stdin:
12    data = line.split('\t')
13
14    # perform some error checking
15    if len(data) < 3:
16        continue
17
18    # unpack data
19    ngram = data[0].split()
20    year = data[1]
21    count = data[2]
22
23    # more error checking
24    if len(ngram) != expected_tokens:
25        continue
26
27    # build key and emit
28    pair = sorted([ngram[0], ngram[expected_tokens - 1]])
29    print >>sys.stdout, "%s\t%s\t%s\t%s" % (pair[0], pair[1], year, count)
30

 

下面是reducer:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1#! /usr/bin/env python
2
3import sys
4
5total = 0
6prev_key = False
7for line in sys.stdin:
8    data = line.split('\t')
9    curr_key = '\t'.join(data[:3])
10    count = int(data[3])
11
12    # found a boundary; emit current sum
13    if prev_key and curr_key != prev_key:
14        print >>sys.stdout, "%s\t%i" % (prev_key, total)
15        prev_key = curr_key
16        total = count
17    # same key; accumulate sum
18    else:
19        prev_key = curr_key
20        total += count
21
22# emit last key
23if prev_key:
24    print >>sys.stdout, "%s\t%i" % (prev_key, total)
25

 

Hadoop流(Streaming)默认用一个tab字符分割健(key)和值(value)。因为我们也用tab字符分割了各个域(field),所以我们必须通过传递给Hadoop下面三个选项来告诉它我们数据的健(key)由前三个域构成。


1
2
3
1-jobconf stream.num.map.output.key.fields=3
2-jobconf stream.num.reduce.output.key.fields=3
3

 

要执行Hadoop任务命令


1
2
3
4
5
6
7
8
9
10
11
12
1hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.2.jar \
2        -input /ngrams \
3        -output /output-streaming \
4        -mapper mapper.py \
5        -combiner reducer.py \
6        -reducer reducer.py \
7        -jobconf stream.num.map.output.key.fields=3 \
8        -jobconf stream.num.reduce.output.key.fields=3 \
9        -jobconf mapred.reduce.tasks=10 \
10        -file mapper.py \
11        -file reducer.py
12

注意,mapper.py和reducer.py在命令中出现了两次,第一次是告诉Hadoop要执行着两个文件,第二次是告诉Hadoop把这两个文件分发给集群的所有节点。

Streaming缺点是必须要手工操作。用户必须自己决定如何将对象转化为为成键值对(比如JSON 对象)。对于二进制数据的支持也不好。而且如上面说过的,必须在reducer手工监控key的边界,这很容易出错。

 

mrjob

是一个开放源码的Python框架,封装Hadoop的数据流,并积极开发Yelp的。,由于Yelp的运作完全在,mrjob的整合与EMR是令人难以置信的光滑和容易(使用包)。

mrjob提供了一个Python的API与Hadoop的数据流,并允许用户使用任何对象作为键和映射器。默认情况下,这些对象被序列化为的内部,但也有支持。有没有其他的二进制I / O格式的开箱即用,但有一个机制来实现自定义序列化。

值得注意的是,mrjob似乎发展的非常快,并有很好的文档。

所有的Python框架,看起来像伪代码实现:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
1#! /usr/bin/env python
2
3import os
4import re
5
6from mrjob.job import MRJob
7from mrjob.protocol import RawProtocol, ReprProtocol
8
9class NgramNeighbors(MRJob):
10
11    # mrjob allows you to specify input/intermediate/output serialization
12    # default output protocol is JSON; here we set it to text
13    OUTPUT_PROTOCOL = RawProtocol
14
15    def mapper_init(self):
16        # determine value of n in the current block of ngrams by parsing filename
17        input_file = os.environ['map_input_file']
18        self.expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0])
19
20    def mapper(self, key, line):
21        data = line.split('\t')
22
23        # error checking
24        if len(data) < 3:
25            return
26
27        # unpack data
28        ngram = data[0].split()
29        year = data[1]
30        count = int(data[2])
31
32        # more error checking
33        if len(ngram) != self.expected_tokens:
34            return
35
36        # generate key
37        pair = sorted([ngram[0], ngram[self.expected_tokens - 1]])
38        k = pair + [year]
39
40        # note that the key is an object (a list in this case)
41        # that mrjob will serialize as JSON text
42        yield (k, count)
43
44    def combiner(self, key, counts):
45        # the combiner must be separate from the reducer because the input
46        # and output must both be JSON
47        yield (key, sum(counts))
48
49    def reducer(self, key, counts):
50        # the final output is encoded as text
51        yield "%s\t%s\t%s" % tuple(key), str(sum(counts))
52
53if __name__ == '__main__':
54    # sets up a runner, based on command line options
55    NgramNeighbors.run()
56

 

 

 mrjob只需要安装在客户机上,其中在作业的时候提交。下面是要运行的命令:

 


1
2
3
4
1export HADOOP_HOME="/usr/lib/hadoop-0.20-mapreduce"
2./ngrams.py -r hadoop --hadoop-bin /usr/bin/hadoop --jobconf mapred.reduce.tasks=10 -o hdfs:///output-mrjob hdfs:///ngrams
3
4

编写MapReduce的工作是非常直观和简单的。然而,有一个重大的内部序列化计划所产生的成本。最有可能的二进制计划将需要实现的用户(例如,为了支持typedbytes)。也有一些内置的实用程序日志文件的解析。最后,mrjob允许用户写多步骤的MapReduce的工作流程,在那里从一个MapReduce作业的中间输出被自动用作输入到另一个MapReduce工作。

(注:其余的实现都非常相似,除了包具体的实现,他们都能被找到 。)

 

dumbo

 是另外一个使用Hadoop流包装的框架。dumbo出现的较早,本应该被许多人使用,但由于缺少文档,造成开发困难。这也是不如mcjob的一点。

dumbo通过typedbytes执行序列化,能允许更简洁的数据传输,也可以更自然的通过指定JavaInputFormat读取SequenceFiles或者其他格式的文件,比如,dumbo也可以执行Python的egg和Java的JAR文件。

在我的印象中, 我必须要手动安装dumbo中的每一个节点, 它只有在typedbytes和dumbo以eggs形式创建的时候才能运行。 就像他会因为onMemoryErrors终止一样,他也会因为使用组合器停止。

运行dumbo任务的代码是:


1
2
3
4
5
6
7
8
9
10
1dumbo start ngrams.py \
2        -hadoop /usr \
3        -hadooplib /usr/lib/hadoop-0.20-mapreduce/contrib/streaming \
4        -numreducetasks 10 \
5        -input hdfs:///ngrams \
6        -output hdfs:///output-dumbo \
7        -outputformat text \
8        -inputformat text
9
10

hadoopy

 是另外一个兼容dumbo的Streaming封装。同样,它也使用typedbytes序列化数据,并直接把 typedbytes 数据写到HDFS。

它有一个很棒的调试机制, 在这种机制下它可以直接把消息写到标准输出而不会干扰Streaming过程。它和dumbo很相似,但文档要好得多。文档中还提供了与 整合的内容。

用hadoopy的时候有两种发发来启动jobs:

必须在Python程序中启动hadoopy job,它没有内置的命令行工具。

我写了一个脚本通过launch_frozen的方式启动hadoopy


1
2
3
1python launch_hadoopy.py
2
3

用launch_frozen运行之后,我在每个节点上都安装了hadoopy然后用launch方法又运行了一遍,性能明显好得多。

 

pydoop

与其他框架相比

最重要的是,我不能成功的从PIP或者源代码构建pydoop。

 

其他

 

本地java

最后,我使用新的Hadoop Java API接口实施了MR任务,编译完成后,这样来运行它:


1
2
1hadoop jar /root/ngrams/native/target/NgramsComparison-0.0.1-SNAPSHOT.jar NgramsDriver hdfs:///ngrams hdfs:///output-native
2

关于计数器的特别说明

在我的MR jobs的最初实现里,我用计数器来跟踪监控不良记录。在Streaming里,需要把信息写到stderr。事实证明这会带来不容忽视的额外开销:Streaming job花的时间是原生java job的3.4倍。这个框架同样有此问题。

 

 

性能比较

将用Java实现的MapReduce job作为性能基准。 Python框架的值是其相对于Java的性能指标的比率。

Java明显最快,,Streaming要多花一半时间,Python框架花的时间更多。从mrjob mapper的profile数据来看,它在序列化/反序列化上花费了大量时间。dumbo和hadoopy在这方面要好一点。如果用了combiner 的话dumbo 还可以更快。

特点比较

大多来自各自软件包中的文档以及代码库。

结论

Streaming是最快的Python方案,这面面没有任何魔力。但是在用它来实现reduce逻辑的时候,以及有很多复杂对象的时候要特别小心。

所有的Python框架看起来都像是伪码,这非常棒。

mrjob更新快,成熟的易用,用它来组织多步MapReduce的工作流很容易,还可以方便地使用复杂对象。它还可以无缝使用EMR。但是它也是执行速度最慢的。

 

还有一些不是很流行的 Python 框架,他们的主要优势是内置了对于二进制格式的支持,但如果有必要话,这个完全可以由用户代码来自己实现。

如果你在实践中有自己的认识,或是发现本文有错误,请在回复里提出。

给TA打赏
共{{data.count}}人
人已打赏
安全运维

基于lucene的案例开发:数据库连接池

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索