用Hadoop构建电影推荐系统

释放双眼,带上耳机,听听看~!

前言

Netflix电影推荐的百万美金比赛,把“推荐”变成了时下最热门的数据挖掘算法之一。也正是由于Netflix的比赛,让企业界和学科界有了更深层次的技术碰撞。引发了各种网站“推荐”热,个性时代已经到来。

目录

  1. 推荐系统概述
  2. 需求分析:推荐系统指标设计
  3. 算法模型:Hadoop并行算法
  4. 架构设计:推荐系统架构
  5. 程序开发:MapReduce程序实现
  6. 补充内容:对Step4过程优化

1. 推荐系统概述

电子商务网站是个性化推荐系统重要地应用的领域之一,亚马逊就是个性化推荐系统的积极应用者和推广者,亚马逊的推荐系统深入到网站的各类商品,为亚马逊带来了至少30%的销售额。

不光是电商类,推荐系统无处不在。QQ,人人网的好友推荐;新浪微博的你可能感觉兴趣的人;优酷,土豆的电影推荐;豆瓣的图书推荐;大从点评的餐饮推荐;世纪佳缘的相亲推荐;天际网的职业推荐等。

推荐算法分类:

按数据使用划分:

  • 协同过滤算法:UserCF, ItemCF, ModelCF
  • 基于内容的推荐: 用户内容属性和物品内容属性
  • 社会化过滤:基于用户的社会网络关系

按模型划分:

  • 最近邻模型:基于距离的协同过滤算法
  • Latent Factor Mode(SVD):基于矩阵分解的模型
  • Graph:图模型,社会网络图模型

基于用户的协同过滤算法UserCF

基于用户的协同过滤,通过不同用户对物品的评分来评测用户之间的相似性,基于用户之间的相似性做出推荐。简单来讲就是:给用户推荐和他兴趣相似的其他用户喜欢的物品。

用例说明:

算法实现及使用介绍,请参考文章:Mahout推荐算法API详解

基于物品的协同过滤算法ItemCF

基于item的协同过滤,通过用户对不同item的评分来评测item之间的相似性,基于item之间的相似性做出推荐。简单来讲就是:给用户推荐和他之前喜欢的物品相似的物品。

用例说明:

算法实现及使用介绍,请参考文章:Mahout推荐算法API详解

注:基于物品的协同过滤算法,是目前商用最广泛的推荐算法。

协同过滤算法实现,分为2个步骤

    1. 计算物品之间的相似度
    1. 根据物品的相似度和用户的历史行为给用户生成推荐列表

有关协同过滤的另一篇文章,请参考:RHadoop实践系列之三 R实现MapReduce的协同过滤算法

2. 需求分析:推荐系统指标设计

下面我们将从一个公司案例出发来全面的解释,如何进行推荐系统指标设计。

案例介绍

Netflix电影推荐百万奖金比赛,http://www.netflixprize.com/
Netflix官方网站:www.netflix.com

Netflix,2006年组织比赛是的时候,是一家以在线电影租赁为生的公司。他们根据网友对电影的打分来判断用户有可能喜欢什么电影,并结合会员看过的电影以及口味偏好设置做出判断,混搭出各种电影风格的需求。

收集会员的一些信息,为他们指定个性化的电影推荐后,有许多冷门电影竟然进入了候租榜单。从公司的电影资源成本方面考量,热门电影的成本一般较高,如果Netflix公司能够在电影租赁中增加冷门电影的比例,自然能够提升自身盈利能力。

Netflix公司曾宣称60%左右的会员根据推荐名单定制租赁顺序,如果推荐系统不能准确地猜测会员喜欢的电影类型,容易造成多次租借冷门电影而并不符合个人口味的会员流失。为了更高效地为会员推荐电影,Netflix一直致力于不断改进和完善个性化推荐服务,在2006年推出百万美元大奖,无论是谁能最好地优化Netflix推荐算法就可获奖励100万美元。到2009年,奖金被一个7人开发小组夺得,Netflix随后又立即推出第二个百万美金悬赏。这充分说明一套好的推荐算法系统是多么重要,同时又是多么困难。

上图为比赛的各支队伍的排名!

补充说明:

    1. Netflix的比赛是基于静态数据的,就是给定“训练级”,匹配“结果集”,“结果集”也是提前就做好的,所以这与我们每天运营的系统,其实是不一样的。
    1. Netflix用于比赛的数据集是小量的,整个全集才666MB,而实际的推荐系统都要基于大量历史数据的,动不动就会上GB,TB等

Netflix数据下载
部分训练集:http://graphlab.org/wp-content/uploads/2013/07/smallnetflix_mm.train_.gz
部分结果集:http://graphlab.org/wp-content/uploads/2013/07/smallnetflix_mm.validate.gz
完整数据集:http://www.lifecrunch.biz/wp-content/uploads/2011/04/nf_prize_dataset.tar.gz

所以,我们在真实的环境中设计推荐的时候,要全面考量数据量,算法性能,结果准确度等的指标。

  • 推荐算法选型:基于物品的协同过滤算法ItemCF,并行实现
  • 数据量:基于Hadoop架构,支持GB,TB,PB级数据量
  • 算法检验:可以通过 准确率,召回率,覆盖率,流行度 等指标评判。
  • 结果解读:通过ItemCF的定义,合理给出结果解释

3. 算法模型:Hadoop并行算法

这里我使用”Mahout In Action”书里,第一章第六节介绍的分步式基于物品的协同过滤算法进行实现。Chapter 6: Distributing recommendation computations

测试数据集:small.csv


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1
21,101,5.0
31,102,3.0
41,103,2.5
52,101,2.0
62,102,2.5
72,103,5.0
82,104,2.0
93,101,2.0
103,104,4.0
113,105,4.5
123,107,5.0
134,101,5.0
144,103,3.0
154,104,4.5
164,106,4.0
175,101,4.0
185,102,3.0
195,103,2.0
205,104,4.0
215,105,3.5
225,106,4.0
23
24

每行3个字段,依次是用户ID,电影ID,用户对电影的评分(0-5分,每0.5为一个评分点!)

算法的思想:

    1. 建立物品的同现矩阵
    1. 建立用户对物品的评分矩阵
    1. 矩阵计算推荐结果

1). 建立物品的同现矩阵
按用户分组,找到每个用户所选的物品,单独出现计数及两两一组计数。


1
2
3
4
5
6
7
8
9
10
11
1
2      [101] [102] [103] [104] [105] [106] [107]
3[101]   5     3     4     4     2     2     1
4[102]   3     3     3     2     1     1     0
5[103]   4     3     4     3     1     2     0
6[104]   4     2     3     4     2     2     1
7[105]   2     1     1     2     2     1     1
8[106]   2     1     2     2     1     2     0
9[107]   1     0     0     1     1     0     1
10
11

2). 建立用户对物品的评分矩阵
按用户分组,找到每个用户所选的物品及评分


1
2
3
4
5
6
7
8
9
10
11
1
2       U3
3[101] 2.0
4[102] 0.0
5[103] 0.0
6[104] 4.0
7[105] 4.5
8[106] 0.0
9[107] 5.0
10
11

3). 矩阵计算推荐结果
同现矩阵*评分矩阵=推荐结果

图片摘自”Mahout In Action”

MapReduce任务设计

图片摘自”Mahout In Action”

解读MapRduce任务:

  • 步骤1: 按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵
  • 步骤2: 对物品组合列表进行计数,建立物品的同现矩阵
  • 步骤3: 合并同现矩阵和评分矩阵
  • 步骤4: 计算推荐结果列表

4. 架构设计:推荐系统架构

上图中,左边是Application业务系统,右边是Hadoop的HDFS, MapReduce。

  1. 业务系统记录了用户的行为和对物品的打分
  2. 设置系统定时器CRON,每xx小时,增量向HDFS导入数据(userid,itemid,value,time)。
  3. 完成导入后,设置系统定时器,启动MapReduce程序,运行推荐算法。
  4. 完成计算后,设置系统定时器,从HDFS导出推荐结果数据到数据库,方便以后的及时查询。

5. 程序开发:MapReduce程序实现

win7的开发环境 和 Hadoop的运行环境 ,请参考文章:用Maven构建Hadoop项目

新建Java类:

  • Recommend.java,主任务启动程序
  • Step1.java,按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵
  • Step2.java,对物品组合列表进行计数,建立物品的同现矩阵
  • Step3.java,合并同现矩阵和评分矩阵
  • Step4.java,计算推荐结果列表
  • HdfsDAO.java,HDFS操作工具类

1). Recommend.java,主任务启动程序
源代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
1
2package org.conan.myhadoop.recommend;
3
4import java.util.HashMap;
5import java.util.Map;
6import java.util.regex.Pattern;
7
8import org.apache.hadoop.mapred.JobConf;
9
10public class Recommend {
11
12    public static final String HDFS = "hdfs://192.168.1.210:9000";
13    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
14
15    public static void main(String[] args) throws Exception {
16        Map<String, String> path = new HashMap<String, String>();
17        path.put("data", "logfile/small.csv");
18        path.put("Step1Input", HDFS + "/user/hdfs/recommend");
19        path.put("Step1Output", path.get("Step1Input") + "/step1");
20        path.put("Step2Input", path.get("Step1Output"));
21        path.put("Step2Output", path.get("Step1Input") + "/step2");
22        path.put("Step3Input1", path.get("Step1Output"));
23        path.put("Step3Output1", path.get("Step1Input") + "/step3_1");
24        path.put("Step3Input2", path.get("Step2Output"));
25        path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
26        path.put("Step4Input1", path.get("Step3Output1"));
27        path.put("Step4Input2", path.get("Step3Output2"));
28        path.put("Step4Output", path.get("Step1Input") + "/step4");
29
30        Step1.run(path);
31        Step2.run(path);
32        Step3.run1(path);
33        Step3.run2(path);
34        Step4.run(path);
35        System.exit(0);
36    }
37
38    public static JobConf config() {
39        JobConf conf = new JobConf(Recommend.class);
40        conf.setJobName("Recommend");
41        conf.addResource("classpath:/hadoop/core-site.xml");
42        conf.addResource("classpath:/hadoop/hdfs-site.xml");
43        conf.addResource("classpath:/hadoop/mapred-site.xml");
44        return conf;
45    }
46
47}
48
49

2). Step1.java,按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵

源代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
1
2package org.conan.myhadoop.recommend;
3
4import java.io.IOException;
5import java.util.Iterator;
6import java.util.Map;
7
8import org.apache.hadoop.fs.Path;
9import org.apache.hadoop.io.IntWritable;
10import org.apache.hadoop.io.Text;
11import org.apache.hadoop.mapred.FileInputFormat;
12import org.apache.hadoop.mapred.FileOutputFormat;
13import org.apache.hadoop.mapred.JobClient;
14import org.apache.hadoop.mapred.JobConf;
15import org.apache.hadoop.mapred.MapReduceBase;
16import org.apache.hadoop.mapred.Mapper;
17import org.apache.hadoop.mapred.OutputCollector;
18import org.apache.hadoop.mapred.Reducer;
19import org.apache.hadoop.mapred.Reporter;
20import org.apache.hadoop.mapred.RunningJob;
21import org.apache.hadoop.mapred.TextInputFormat;
22import org.apache.hadoop.mapred.TextOutputFormat;
23import org.conan.myhadoop.hdfs.HdfsDAO;
24
25public class Step1 {
26
27    public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, IntWritable, Text> {
28        private final static IntWritable k = new IntWritable();
29        private final static Text v = new Text();
30
31        @Override
32        public void map(Object key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
33            String[] tokens = Recommend.DELIMITER.split(value.toString());
34            int userID = Integer.parseInt(tokens[0]);
35            String itemID = tokens[1];
36            String pref = tokens[2];
37            k.set(userID);
38            v.set(itemID + ":" + pref);
39            output.collect(k, v);
40        }
41    }
42
43    public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
44        private final static Text v = new Text();
45
46        @Override
47        public void reduce(IntWritable key, Iterator values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
48            StringBuilder sb = new StringBuilder();
49            while (values.hasNext()) {
50                sb.append("," + values.next());
51            }
52            v.set(sb.toString().replaceFirst(",", ""));
53            output.collect(key, v);
54        }
55    }
56
57    public static void run(Map<String, String> path) throws IOException {
58        JobConf conf = Recommend.config();
59
60        String input = path.get("Step1Input");
61        String output = path.get("Step1Output");
62
63        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
64        hdfs.rmr(input);
65        hdfs.mkdirs(input);
66        hdfs.copyFile(path.get("data"), input);
67
68        conf.setMapOutputKeyClass(IntWritable.class);
69        conf.setMapOutputValueClass(Text.class);
70
71        conf.setOutputKeyClass(IntWritable.class);
72        conf.setOutputValueClass(Text.class);
73
74        conf.setMapperClass(Step1_ToItemPreMapper.class);
75        conf.setCombinerClass(Step1_ToUserVectorReducer.class);
76        conf.setReducerClass(Step1_ToUserVectorReducer.class);
77
78        conf.setInputFormat(TextInputFormat.class);
79        conf.setOutputFormat(TextOutputFormat.class);
80
81        FileInputFormat.setInputPaths(conf, new Path(input));
82        FileOutputFormat.setOutputPath(conf, new Path(output));
83
84        RunningJob job = JobClient.runJob(conf);
85        while (!job.isComplete()) {
86            job.waitForCompletion();
87        }
88    }
89
90}
91
92
93

计算结果:


1
2
3
4
5
6
7
8
9
10
1
2~ hadoop fs -cat /user/hdfs/recommend/step1/part-00000
3
41       102:3.0,103:2.5,101:5.0
52       101:2.0,102:2.5,103:5.0,104:2.0
63       107:5.0,101:2.0,104:4.0,105:4.5
74       101:5.0,103:3.0,104:4.5,106:4.0
85       101:4.0,102:3.0,103:2.0,104:4.0,105:3.5,106:4.0
9
10

3). Step2.java,对物品组合列表进行计数,建立物品的同现矩阵
源代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
1
2package org.conan.myhadoop.recommend;
3
4import java.io.IOException;
5import java.util.Iterator;
6import java.util.Map;
7
8import org.apache.hadoop.fs.Path;
9import org.apache.hadoop.io.IntWritable;
10import org.apache.hadoop.io.LongWritable;
11import org.apache.hadoop.io.Text;
12import org.apache.hadoop.mapred.FileInputFormat;
13import org.apache.hadoop.mapred.FileOutputFormat;
14import org.apache.hadoop.mapred.JobClient;
15import org.apache.hadoop.mapred.JobConf;
16import org.apache.hadoop.mapred.MapReduceBase;
17import org.apache.hadoop.mapred.Mapper;
18import org.apache.hadoop.mapred.OutputCollector;
19import org.apache.hadoop.mapred.Reducer;
20import org.apache.hadoop.mapred.Reporter;
21import org.apache.hadoop.mapred.RunningJob;
22import org.apache.hadoop.mapred.TextInputFormat;
23import org.apache.hadoop.mapred.TextOutputFormat;
24import org.conan.myhadoop.hdfs.HdfsDAO;
25
26public class Step2 {
27    public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
28        private final static Text k = new Text();
29        private final static IntWritable v = new IntWritable(1);
30
31        @Override
32        public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
33            String[] tokens = Recommend.DELIMITER.split(values.toString());
34            for (int i = 1; i < tokens.length; i++) {
35                String itemID = tokens[i].split(":")[0];
36                for (int j = 1; j < tokens.length; j++) {
37                    String itemID2 = tokens[j].split(":")[0];
38                    k.set(itemID + ":" + itemID2);
39                    output.collect(k, v);
40                }
41            }
42        }
43    }
44
45    public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
46        private IntWritable result = new IntWritable();
47
48        @Override
49        public void reduce(Text key, Iterator values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
50            int sum = 0;
51            while (values.hasNext()) {
52                sum += values.next().get();
53            }
54            result.set(sum);
55            output.collect(key, result);
56        }
57    }
58
59    public static void run(Map<String, String> path) throws IOException {
60        JobConf conf = Recommend.config();
61
62        String input = path.get("Step2Input");
63        String output = path.get("Step2Output");
64
65        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
66        hdfs.rmr(output);
67
68        conf.setOutputKeyClass(Text.class);
69        conf.setOutputValueClass(IntWritable.class);
70
71        conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
72        conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
73        conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);
74
75        conf.setInputFormat(TextInputFormat.class);
76        conf.setOutputFormat(TextOutputFormat.class);
77
78        FileInputFormat.setInputPaths(conf, new Path(input));
79        FileOutputFormat.setOutputPath(conf, new Path(output));
80
81        RunningJob job = JobClient.runJob(conf);
82        while (!job.isComplete()) {
83            job.waitForCompletion();
84        }
85    }
86}
87
88
89

计算结果:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
1
2~ hadoop fs -cat /user/hdfs/recommend/step2/part-00000
3
4101:101 5
5101:102 3
6101:103 4
7101:104 4
8101:105 2
9101:106 2
10101:107 1
11102:101 3
12102:102 3
13102:103 3
14102:104 2
15102:105 1
16102:106 1
17103:101 4
18103:102 3
19103:103 4
20103:104 3
21103:105 1
22103:106 2
23104:101 4
24104:102 2
25104:103 3
26104:104 4
27104:105 2
28104:106 2
29104:107 1
30105:101 2
31105:102 1
32105:103 1
33105:104 2
34105:105 2
35105:106 1
36105:107 1
37106:101 2
38106:102 1
39106:103 2
40106:104 2
41106:105 1
42106:106 2
43107:101 1
44107:104 1
45107:105 1
46107:107 1
47
48

4). Step3.java,合并同现矩阵和评分矩阵
源代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
1
2package org.conan.myhadoop.recommend;
3
4import java.io.IOException;
5import java.util.Map;
6
7import org.apache.hadoop.fs.Path;
8import org.apache.hadoop.io.IntWritable;
9import org.apache.hadoop.io.LongWritable;
10import org.apache.hadoop.io.Text;
11import org.apache.hadoop.mapred.FileInputFormat;
12import org.apache.hadoop.mapred.FileOutputFormat;
13import org.apache.hadoop.mapred.JobClient;
14import org.apache.hadoop.mapred.JobConf;
15import org.apache.hadoop.mapred.MapReduceBase;
16import org.apache.hadoop.mapred.Mapper;
17import org.apache.hadoop.mapred.OutputCollector;
18import org.apache.hadoop.mapred.Reporter;
19import org.apache.hadoop.mapred.RunningJob;
20import org.apache.hadoop.mapred.TextInputFormat;
21import org.apache.hadoop.mapred.TextOutputFormat;
22import org.conan.myhadoop.hdfs.HdfsDAO;
23
24public class Step3 {
25
26    public static class Step31_UserVectorSplitterMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
27        private final static IntWritable k = new IntWritable();
28        private final static Text v = new Text();
29
30        @Override
31        public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
32            String[] tokens = Recommend.DELIMITER.split(values.toString());
33            for (int i = 1; i < tokens.length; i++) {
34                String[] vector = tokens[i].split(":");
35                int itemID = Integer.parseInt(vector[0]);
36                String pref = vector[1];
37
38                k.set(itemID);
39                v.set(tokens[0] + ":" + pref);
40                output.collect(k, v);
41            }
42        }
43    }
44
45    public static void run1(Map<String, String> path) throws IOException {
46        JobConf conf = Recommend.config();
47
48        String input = path.get("Step3Input1");
49        String output = path.get("Step3Output1");
50
51        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
52        hdfs.rmr(output);
53
54        conf.setOutputKeyClass(IntWritable.class);
55        conf.setOutputValueClass(Text.class);
56
57        conf.setMapperClass(Step31_UserVectorSplitterMapper.class);
58
59        conf.setInputFormat(TextInputFormat.class);
60        conf.setOutputFormat(TextOutputFormat.class);
61
62        FileInputFormat.setInputPaths(conf, new Path(input));
63        FileOutputFormat.setOutputPath(conf, new Path(output));
64
65        RunningJob job = JobClient.runJob(conf);
66        while (!job.isComplete()) {
67            job.waitForCompletion();
68        }
69    }
70
71    public static class Step32_CooccurrenceColumnWrapperMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
72        private final static Text k = new Text();
73        private final static IntWritable v = new IntWritable();
74
75        @Override
76        public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
77            String[] tokens = Recommend.DELIMITER.split(values.toString());
78            k.set(tokens[0]);
79            v.set(Integer.parseInt(tokens[1]));
80            output.collect(k, v);
81        }
82    }
83
84    public static void run2(Map<String, String> path) throws IOException {
85        JobConf conf = Recommend.config();
86
87        String input = path.get("Step3Input2");
88        String output = path.get("Step3Output2");
89
90        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
91        hdfs.rmr(output);
92
93        conf.setOutputKeyClass(Text.class);
94        conf.setOutputValueClass(IntWritable.class);
95
96        conf.setMapperClass(Step32_CooccurrenceColumnWrapperMapper.class);
97
98        conf.setInputFormat(TextInputFormat.class);
99        conf.setOutputFormat(TextOutputFormat.class);
100
101        FileInputFormat.setInputPaths(conf, new Path(input));
102        FileOutputFormat.setOutputPath(conf, new Path(output));
103
104        RunningJob job = JobClient.runJob(conf);
105        while (!job.isComplete()) {
106            job.waitForCompletion();
107        }
108    }
109
110}
111
112
113

计算结果:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
1
2~ hadoop fs -cat /user/hdfs/recommend/step3_1/part-00000
3
4101     5:4.0
5101     1:5.0
6101     2:2.0
7101     3:2.0
8101     4:5.0
9102     1:3.0
10102     5:3.0
11102     2:2.5
12103     2:5.0
13103     5:2.0
14103     1:2.5
15103     4:3.0
16104     2:2.0
17104     5:4.0
18104     3:4.0
19104     4:4.5
20105     3:4.5
21105     5:3.5
22106     5:4.0
23106     4:4.0
24107     3:5.0
25
26~ hadoop fs -cat /user/hdfs/recommend/step3_2/part-00000
27
28101:101 5
29101:102 3
30101:103 4
31101:104 4
32101:105 2
33101:106 2
34101:107 1
35102:101 3
36102:102 3
37102:103 3
38102:104 2
39102:105 1
40102:106 1
41103:101 4
42103:102 3
43103:103 4
44103:104 3
45103:105 1
46103:106 2
47104:101 4
48104:102 2
49104:103 3
50104:104 4
51104:105 2
52104:106 2
53104:107 1
54105:101 2
55105:102 1
56105:103 1
57105:104 2
58105:105 2
59105:106 1
60105:107 1
61106:101 2
62106:102 1
63106:103 2
64106:104 2
65106:105 1
66106:106 2
67107:101 1
68107:104 1
69107:105 1
70107:107 1
71
72

5). Step4.java,计算推荐结果列表
源代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
1
2package org.conan.myhadoop.recommend;
3
4import java.io.IOException;
5import java.util.ArrayList;
6import java.util.HashMap;
7import java.util.Iterator;
8import java.util.List;
9import java.util.Map;
10
11import org.apache.hadoop.fs.Path;
12import org.apache.hadoop.io.IntWritable;
13import org.apache.hadoop.io.LongWritable;
14import org.apache.hadoop.io.Text;
15import org.apache.hadoop.mapred.FileInputFormat;
16import org.apache.hadoop.mapred.FileOutputFormat;
17import org.apache.hadoop.mapred.JobClient;
18import org.apache.hadoop.mapred.JobConf;
19import org.apache.hadoop.mapred.MapReduceBase;
20import org.apache.hadoop.mapred.Mapper;
21import org.apache.hadoop.mapred.OutputCollector;
22import org.apache.hadoop.mapred.Reducer;
23import org.apache.hadoop.mapred.Reporter;
24import org.apache.hadoop.mapred.RunningJob;
25import org.apache.hadoop.mapred.TextInputFormat;
26import org.apache.hadoop.mapred.TextOutputFormat;
27import org.conan.myhadoop.hdfs.HdfsDAO;
28
29public class Step4 {
30
31    public static class Step4_PartialMultiplyMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
32        private final static IntWritable k = new IntWritable();
33        private final static Text v = new Text();
34
35        private final static Map<Integer, List> cooccurrenceMatrix = new HashMap<Integer, List>();
36
37        @Override
38        public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
39            String[] tokens = Recommend.DELIMITER.split(values.toString());
40
41            String[] v1 = tokens[0].split(":");
42            String[] v2 = tokens[1].split(":");
43
44            if (v1.length > 1) {// cooccurrence
45                int itemID1 = Integer.parseInt(v1[0]);
46                int itemID2 = Integer.parseInt(v1[1]);
47                int num = Integer.parseInt(tokens[1]);
48
49                List list = null;
50                if (!cooccurrenceMatrix.containsKey(itemID1)) {
51                    list = new ArrayList();
52                } else {
53                    list = cooccurrenceMatrix.get(itemID1);
54                }
55                list.add(new Cooccurrence(itemID1, itemID2, num));
56                cooccurrenceMatrix.put(itemID1, list);
57            }
58
59            if (v2.length > 1) {// userVector
60                int itemID = Integer.parseInt(tokens[0]);
61                int userID = Integer.parseInt(v2[0]);
62                double pref = Double.parseDouble(v2[1]);
63                k.set(userID);
64                for (Cooccurrence co : cooccurrenceMatrix.get(itemID)) {
65                    v.set(co.getItemID2() + "," + pref * co.getNum());
66                    output.collect(k, v);
67                }
68
69            }
70        }
71    }
72
73    public static class Step4_AggregateAndRecommendReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
74        private final static Text v = new Text();
75
76        @Override
77        public void reduce(IntWritable key, Iterator values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
78            Map<String, Double> result = new HashMap<String, Double>();
79            while (values.hasNext()) {
80                String[] str = values.next().toString().split(",");
81                if (result.containsKey(str[0])) {
82                    result.put(str[0], result.get(str[0]) + Double.parseDouble(str[1]));
83                } else {
84                    result.put(str[0], Double.parseDouble(str[1]));
85                }
86            }
87            Iterator iter = result.keySet().iterator();
88            while (iter.hasNext()) {
89                String itemID = iter.next();
90                double score = result.get(itemID);
91                v.set(itemID + "," + score);
92                output.collect(key, v);
93            }
94        }
95    }
96
97    public static void run(Map<String, String> path) throws IOException {
98        JobConf conf = Recommend.config();
99
100        String input1 = path.get("Step4Input1");
101        String input2 = path.get("Step4Input2");
102        String output = path.get("Step4Output");
103
104        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
105        hdfs.rmr(output);
106
107        conf.setOutputKeyClass(IntWritable.class);
108        conf.setOutputValueClass(Text.class);
109
110        conf.setMapperClass(Step4_PartialMultiplyMapper.class);
111        conf.setCombinerClass(Step4_AggregateAndRecommendReducer.class);
112        conf.setReducerClass(Step4_AggregateAndRecommendReducer.class);
113
114        conf.setInputFormat(TextInputFormat.class);
115        conf.setOutputFormat(TextOutputFormat.class);
116
117        FileInputFormat.setInputPaths(conf, new Path(input1), new Path(input2));
118        FileOutputFormat.setOutputPath(conf, new Path(output));
119
120        RunningJob job = JobClient.runJob(conf);
121        while (!job.isComplete()) {
122            job.waitForCompletion();
123        }
124    }
125
126}
127
128class Cooccurrence {
129    private int itemID1;
130    private int itemID2;
131    private int num;
132
133    public Cooccurrence(int itemID1, int itemID2, int num) {
134        super();
135        this.itemID1 = itemID1;
136        this.itemID2 = itemID2;
137        this.num = num;
138    }
139
140    public int getItemID1() {
141        return itemID1;
142    }
143
144    public void setItemID1(int itemID1) {
145        this.itemID1 = itemID1;
146    }
147
148    public int getItemID2() {
149        return itemID2;
150    }
151
152    public void setItemID2(int itemID2) {
153        this.itemID2 = itemID2;
154    }
155
156    public int getNum() {
157        return num;
158    }
159
160    public void setNum(int num) {
161        this.num = num;
162    }
163
164}
165
166
167

计算结果:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1
2~ hadoop fs -cat /user/hdfs/recommend/step4/part-00000
3
41       107,5.0
51       106,18.0
61       105,15.5
71       104,33.5
81       103,39.0
91       102,31.5
101       101,44.0
112       107,4.0
122       106,20.5
132       105,15.5
142       104,36.0
152       103,41.5
162       102,32.5
172       101,45.5
183       107,15.5
193       106,16.5
203       105,26.0
213       104,38.0
223       103,24.5
233       102,18.5
243       101,40.0
254       107,9.5
264       106,33.0
274       105,26.0
284       104,55.0
294       103,53.5
304       102,37.0
314       101,63.0
325       107,11.5
335       106,34.5
345       105,32.0
355       104,59.0
365       103,56.5
375       102,42.5
385       101,68.0
39
40

对Step4过程优化,请参考本文最后的补充内容。

6). HdfsDAO.java,HDFS操作工具类
详细解释,请参考文章:Hadoop编程调用HDFS

源代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
1
2package org.conan.myhadoop.hdfs;
3
4import java.io.IOException;
5import java.net.URI;
6
7import org.apache.hadoop.conf.Configuration;
8import org.apache.hadoop.fs.FSDataInputStream;
9import org.apache.hadoop.fs.FSDataOutputStream;
10import org.apache.hadoop.fs.FileStatus;
11import org.apache.hadoop.fs.FileSystem;
12import org.apache.hadoop.fs.Path;
13import org.apache.hadoop.io.IOUtils;
14import org.apache.hadoop.mapred.JobConf;
15
16public class HdfsDAO {
17
18    private static final String HDFS = "hdfs://192.168.1.210:9000/";
19
20    public HdfsDAO(Configuration conf) {
21        this(HDFS, conf);
22    }
23
24    public HdfsDAO(String hdfs, Configuration conf) {
25        this.hdfsPath = hdfs;
26        this.conf = conf;
27    }
28
29    private String hdfsPath;
30    private Configuration conf;
31
32    public static void main(String[] args) throws IOException {
33        JobConf conf = config();
34        HdfsDAO hdfs = new HdfsDAO(conf);
35        hdfs.copyFile("datafile/item.csv", "/tmp/new");
36        hdfs.ls("/tmp/new");
37    }        
38
39    public static JobConf config(){
40        JobConf conf = new JobConf(HdfsDAO.class);
41        conf.setJobName("HdfsDAO");
42        conf.addResource("classpath:/hadoop/core-site.xml");
43        conf.addResource("classpath:/hadoop/hdfs-site.xml");
44        conf.addResource("classpath:/hadoop/mapred-site.xml");
45        return conf;
46    }
47
48    public void mkdirs(String folder) throws IOException {
49        Path path = new Path(folder);
50        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
51        if (!fs.exists(path)) {
52            fs.mkdirs(path);
53            System.out.println("Create: " + folder);
54        }
55        fs.close();
56    }
57
58    public void rmr(String folder) throws IOException {
59        Path path = new Path(folder);
60        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
61        fs.deleteOnExit(path);
62        System.out.println("Delete: " + folder);
63        fs.close();
64    }
65
66    public void ls(String folder) throws IOException {
67        Path path = new Path(folder);
68        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
69        FileStatus[] list = fs.listStatus(path);
70        System.out.println("ls: " + folder);
71        System.out.println("==========================================================");
72        for (FileStatus f : list) {
73            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
74        }
75        System.out.println("==========================================================");
76        fs.close();
77    }
78
79    public void createFile(String file, String content) throws IOException {
80        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
81        byte[] buff = content.getBytes();
82        FSDataOutputStream os = null;
83        try {
84            os = fs.create(new Path(file));
85            os.write(buff, 0, buff.length);
86            System.out.println("Create: " + file);
87        } finally {
88            if (os != null)
89                os.close();
90        }
91        fs.close();
92    }
93
94    public void copyFile(String local, String remote) throws IOException {
95        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
96        fs.copyFromLocalFile(new Path(local), new Path(remote));
97        System.out.println("copy from: " + local + " to " + remote);
98        fs.close();
99    }
100
101    public void download(String remote, String local) throws IOException {
102        Path path = new Path(remote);
103        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
104        fs.copyToLocalFile(path, new Path(local));
105        System.out.println("download: from" + remote + " to " + local);
106        fs.close();
107    }
108
109    public void cat(String remoteFile) throws IOException {
110        Path path = new Path(remoteFile);
111        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
112        FSDataInputStream fsdis = null;
113        System.out.println("cat: " + remoteFile);
114        try {  
115            fsdis =fs.open(path);
116            IOUtils.copyBytes(fsdis, System.out, 4096, false);  
117          } finally {  
118            IOUtils.closeStream(fsdis);
119            fs.close();
120          }
121    }
122}
123
124

这样我们就自己编程实现了MapReduce化基于物品的协同过滤算法。

RHadoop的实现方案,请参考文章:RHadoop实践系列之三 R实现MapReduce的协同过滤算法

Mahout的实现方案,请参考文章:Mahout分步式程序开发 基于物品的协同过滤ItemCF

我已经把整个MapReduce的实现都放到了github上面:
https://github.com/bsspirit/maven_hadoop_template/releases/tag/recommend

6. 补充内容:对Step4过程优化

在Step4.java这一步运行过程中,Mapper过程在Step4_PartialMultiplyMapper类通过分别读取两个input数据,在内存中进行了计算。

这种方式有明显的限制条件:

  • a. 两个输入数据集,有严格的读入顺序。由于Hadoop不能指定读入顺序,因此在多节点的Hadoop集群环境,读入顺序有可能会发生错误,造成程序的空指针错误。
  • b. 这个计算过程,在内存中实现。如果矩阵过大,会造成单节点的内存不足。

做为优化的方案,我们需要对Step4的过程,实现MapReduce的矩阵乘法,矩阵算法原理请参考文章:用MapReduce实现矩阵乘法

对Step4优化的实现:把矩阵计算通过两个MapReduce过程实现。

  • 矩阵乘法过程类文件:Step4_Update.java
  • 矩阵加法过程类文件:Step4_Update2.java
  • 修改启动程序:Recommend.java

增加文件:Step4_Update.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
1
2package org.conan.myhadoop.recommend;
3
4import java.io.IOException;
5import java.util.HashMap;
6import java.util.Iterator;
7import java.util.Map;
8
9import org.apache.hadoop.fs.Path;
10import org.apache.hadoop.io.LongWritable;
11import org.apache.hadoop.io.Text;
12import org.apache.hadoop.mapred.JobConf;
13import org.apache.hadoop.mapreduce.Job;
14import org.apache.hadoop.mapreduce.Mapper;
15import org.apache.hadoop.mapreduce.Reducer;
16import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
17import org.apache.hadoop.mapreduce.lib.input.FileSplit;
18import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
19import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
20import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
21import org.conan.myhadoop.hdfs.HdfsDAO;
22
23public class Step4_Update {
24
25    public static class Step4_PartialMultiplyMapper extends Mapper {
26
27        private String flag;// A同现矩阵 or B评分矩阵
28
29        @Override
30        protected void setup(Context context) throws IOException, InterruptedException {
31            FileSplit split = (FileSplit) context.getInputSplit();
32            flag = split.getPath().getParent().getName();// 判断读的数据集
33
34            // System.out.println(flag);
35        }
36
37        @Override
38        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
39            String[] tokens = Recommend.DELIMITER.split(values.toString());
40
41            if (flag.equals("step3_2")) {// 同现矩阵
42                String[] v1 = tokens[0].split(":");
43                String itemID1 = v1[0];
44                String itemID2 = v1[1];
45                String num = tokens[1];
46
47                Text k = new Text(itemID1);
48                Text v = new Text("A:" + itemID2 + "," + num);
49
50                context.write(k, v);
51                // System.out.println(k.toString() + "  " + v.toString());
52
53            } else if (flag.equals("step3_1")) {// 评分矩阵
54                String[] v2 = tokens[1].split(":");
55                String itemID = tokens[0];
56                String userID = v2[0];
57                String pref = v2[1];
58
59                Text k = new Text(itemID);
60                Text v = new Text("B:" + userID + "," + pref);
61
62                context.write(k, v);
63                // System.out.println(k.toString() + "  " + v.toString());
64            }
65        }
66
67    }
68
69    public static class Step4_AggregateReducer extends Reducer {
70
71        @Override
72        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
73            System.out.println(key.toString() + ":");
74
75            Map mapA = new HashMap();
76            Map mapB = new HashMap();
77
78            for (Text line : values) {
79                String val = line.toString();
80                System.out.println(val);
81
82                if (val.startsWith("A:")) {
83                    String[] kv = Recommend.DELIMITER.split(val.substring(2));
84                    mapA.put(kv[0], kv[1]);
85
86                } else if (val.startsWith("B:")) {
87                    String[] kv = Recommend.DELIMITER.split(val.substring(2));
88                    mapB.put(kv[0], kv[1]);
89
90                }
91            }
92
93            double result = 0;
94            Iterator iter = mapA.keySet().iterator();
95            while (iter.hasNext()) {
96                String mapk = iter.next();// itemID
97
98                int num = Integer.parseInt(mapA.get(mapk));
99                Iterator iterb = mapB.keySet().iterator();
100                while (iterb.hasNext()) {
101                    String mapkb = iterb.next();// userID
102                    double pref = Double.parseDouble(mapB.get(mapkb));
103                    result = num * pref;// 矩阵乘法相乘计算
104
105                    Text k = new Text(mapkb);
106                    Text v = new Text(mapk + "," + result);
107                    context.write(k, v);
108                    System.out.println(k.toString() + "  " + v.toString());
109                }
110            }
111        }
112    }
113
114    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
115        JobConf conf = Recommend.config();
116
117        String input1 = path.get("Step5Input1");
118        String input2 = path.get("Step5Input2");
119        String output = path.get("Step5Output");
120
121        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
122        hdfs.rmr(output);
123
124        Job job = new Job(conf);
125        job.setJarByClass(Step4_Update.class);
126
127        job.setOutputKeyClass(Text.class);
128        job.setOutputValueClass(Text.class);
129
130        job.setMapperClass(Step4_Update.Step4_PartialMultiplyMapper.class);
131        job.setReducerClass(Step4_Update.Step4_AggregateReducer.class);
132
133        job.setInputFormatClass(TextInputFormat.class);
134        job.setOutputFormatClass(TextOutputFormat.class);
135
136        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));
137        FileOutputFormat.setOutputPath(job, new Path(output));
138
139        job.waitForCompletion(true);
140    }
141
142}
143
144

增加文件:Step4_Update2.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
1
2package org.conan.myhadoop.recommend;
3
4import java.io.IOException;
5import java.util.HashMap;
6import java.util.Iterator;
7import java.util.Map;
8
9import org.apache.hadoop.fs.Path;
10import org.apache.hadoop.io.LongWritable;
11import org.apache.hadoop.io.Text;
12import org.apache.hadoop.mapred.JobConf;
13import org.apache.hadoop.mapreduce.Job;
14import org.apache.hadoop.mapreduce.Mapper;
15import org.apache.hadoop.mapreduce.Reducer;
16import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
17import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
18import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
19import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
20import org.conan.myhadoop.hdfs.HdfsDAO;
21
22public class Step4_Update2 {
23
24    public static class Step4_RecommendMapper extends Mapper {
25
26        @Override
27        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
28            String[] tokens = Recommend.DELIMITER.split(values.toString());
29            Text k = new Text(tokens[0]);
30            Text v = new Text(tokens[1]+","+tokens[2]);
31            context.write(k, v);
32        }
33    }
34
35    public static class Step4_RecommendReducer extends Reducer {
36        
37        @Override
38        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
39            System.out.println(key.toString() + ":");
40            Map map = new HashMap();// 结果
41            
42            for (Text line : values) {
43                System.out.println(line.toString());
44                String[] tokens = Recommend.DELIMITER.split(line.toString());
45                String itemID = tokens[0];
46                Double score = Double.parseDouble(tokens[1]);
47                
48                 if (map.containsKey(itemID)) {
49                     map.put(itemID, map.get(itemID) + score);// 矩阵乘法求和计算
50                 } else {
51                     map.put(itemID, score);
52                 }
53            }
54            
55            Iterator iter = map.keySet().iterator();
56            while (iter.hasNext()) {
57                String itemID = iter.next();
58                double score = map.get(itemID);
59                Text v = new Text(itemID + "," + score);
60                context.write(key, v);
61            }
62        }
63    }
64
65    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
66        JobConf conf = Recommend.config();
67
68        String input = path.get("Step6Input");
69        String output = path.get("Step6Output");
70
71        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
72        hdfs.rmr(output);
73
74        Job job = new Job(conf);
75        job.setJarByClass(Step4_Update2.class);
76
77        job.setOutputKeyClass(Text.class);
78        job.setOutputValueClass(Text.class);
79
80        job.setMapperClass(Step4_Update2.Step4_RecommendMapper.class);
81        job.setReducerClass(Step4_Update2.Step4_RecommendReducer.class);
82
83        job.setInputFormatClass(TextInputFormat.class);
84        job.setOutputFormatClass(TextOutputFormat.class);
85
86        FileInputFormat.setInputPaths(job, new Path(input));
87        FileOutputFormat.setOutputPath(job, new Path(output));
88
89        job.waitForCompletion(true);
90    }
91
92}
93
94

修改Recommend.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
1
2package org.conan.myhadoop.recommend;
3
4import java.util.HashMap;
5import java.util.Map;
6import java.util.regex.Pattern;
7
8import org.apache.hadoop.mapred.JobConf;
9import org.conan.myhadoop.hdfs.HdfsDAO;
10
11public class Recommend {
12
13    public static final String HDFS = "hdfs://192.168.1.210:9000";
14    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
15
16    public static void main(String[] args) throws Exception {
17        Map path = new HashMap();
18        path.put("data", "logfile/small.csv");
19        path.put("Step1Input", HDFS + "/user/hdfs/recommend");
20        path.put("Step1Output", path.get("Step1Input") + "/step1");
21        path.put("Step2Input", path.get("Step1Output"));
22        path.put("Step2Output", path.get("Step1Input") + "/step2");
23        path.put("Step3Input1", path.get("Step1Output"));
24        path.put("Step3Output1", path.get("Step1Input") + "/step3_1");
25        path.put("Step3Input2", path.get("Step2Output"));
26        path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
27        
28        path.put("Step4Input1", path.get("Step3Output1"));
29        path.put("Step4Input2", path.get("Step3Output2"));
30        path.put("Step4Output", path.get("Step1Input") + "/step4");
31        
32        path.put("Step5Input1", path.get("Step3Output1"));
33        path.put("Step5Input2", path.get("Step3Output2"));
34        path.put("Step5Output", path.get("Step1Input") + "/step5");
35        
36        path.put("Step6Input", path.get("Step5Output"));
37        path.put("Step6Output", path.get("Step1Input") + "/step6");      
38
39        Step1.run(path);
40        Step2.run(path);
41        Step3.run1(path);
42        Step3.run2(path);
43        //Step4.run(path);
44        
45        Step4_Update.run(path);
46        Step4_Update2.run(path);
47              
48        System.exit(0);
49    }
50
51    public static JobConf config() {
52        JobConf conf = new JobConf(Recommend.class);
53        conf.setJobName("Recommand");
54        conf.addResource("classpath:/hadoop/core-site.xml");
55        conf.addResource("classpath:/hadoop/hdfs-site.xml");
56        conf.addResource("classpath:/hadoop/mapred-site.xml");
57        conf.set("io.sort.mb", "1024");
58        return conf;
59    }
60
61}
62
63

运行Step4_Update.java,查看输出结果


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
1
2~ hadoop fs -cat /user/hdfs/recommend/step5/part-r-00000
3
43       107,2.0
52       107,2.0
61       107,5.0
75       107,4.0
84       107,5.0
93       106,4.0
102       106,4.0
111       106,10.0
125       106,8.0
134       106,10.0
143       105,4.0
152       105,4.0
161       105,10.0
175       105,8.0
184       105,10.0
193       104,8.0
202       104,8.0
211       104,20.0
225       104,16.0
234       104,20.0
243       103,8.0
252       103,8.0
261       103,20.0
275       103,16.0
284       103,20.0
293       102,6.0
302       102,6.0
311       102,15.0
325       102,12.0
334       102,15.0
343       101,10.0
352       101,10.0
361       101,25.0
375       101,20.0
384       101,25.0
392       106,2.5
401       106,3.0
415       106,3.0
422       105,2.5
431       105,3.0
445       105,3.0
452       104,5.0
461       104,6.0
475       104,6.0
482       103,7.5
491       103,9.0
505       103,9.0
512       102,7.5
521       102,9.0
535       102,9.0
542       101,7.5
551       101,9.0
565       101,9.0
572       106,10.0
581       106,5.0
595       106,4.0
604       106,6.0
612       105,5.0
621       105,2.5
635       105,2.0
644       105,3.0
652       104,15.0
661       104,7.5
675       104,6.0
684       104,9.0
692       103,20.0
701       103,10.0
715       103,8.0
724       103,12.0
732       102,15.0
741       102,7.5
755       102,6.0
764       102,9.0
772       101,20.0
781       101,10.0
795       101,8.0
804       101,12.0
813       107,4.0
822       107,2.0
835       107,4.0
844       107,4.5
853       106,8.0
862       106,4.0
875       106,8.0
884       106,9.0
893       105,8.0
902       105,4.0
915       105,8.0
924       105,9.0
933       104,16.0
942       104,8.0
955       104,16.0
964       104,18.0
973       103,12.0
982       103,6.0
995       103,12.0
1004       103,13.5
1013       102,8.0
1022       102,4.0
1035       102,8.0
1044       102,9.0
1053       101,16.0
1062       101,8.0
1075       101,16.0
1084       101,18.0
1093       107,4.5
1105       107,3.5
1113       106,4.5
1125       106,3.5
1133       105,9.0
1145       105,7.0
1153       104,9.0
1165       104,7.0
1173       103,4.5
1185       103,3.5
1193       102,4.5
1205       102,3.5
1213       101,9.0
1225       101,7.0
1235       106,8.0
1244       106,8.0
1255       105,4.0
1264       105,4.0
1275       104,8.0
1284       104,8.0
1295       103,8.0
1304       103,8.0
1315       102,4.0
1324       102,4.0
1335       101,8.0
1344       101,8.0
1353       107,5.0
1363       105,5.0
1373       104,5.0
1383       101,5.0
139
140

运行Step4_Update2.java,查看输出结果


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1
2~ hadoop fs -cat /user/hdfs/recommend/step6/part-r-00000
3
41       107,5.0
51       106,18.0
61       105,15.5
71       104,33.5
81       103,39.0
91       102,31.5
101       101,44.0
112       107,4.0
122       106,20.5
132       105,15.5
142       104,36.0
152       103,41.5
162       102,32.5
172       101,45.5
183       107,15.5
193       106,16.5
203       105,26.0
213       104,38.0
223       103,24.5
233       102,18.5
243       101,40.0
254       107,9.5
264       106,33.0
274       105,26.0
284       104,55.0
294       103,53.5
304       102,37.0
314       101,63.0
325       107,11.5
335       106,34.5
345       105,32.0
355       104,59.0
365       103,56.5
375       102,42.5
385       101,68.0
39
40

这样我们就把原来内存中计算的部分,通过MapReduce实现了,结果与之间Step4的结果一致。

代码已经更新到github,请需要的同学更新查看。
https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myhadoop/recommend

给TA打赏
共{{data.count}}人
人已打赏
安全运维

基于spring boot和mongodb打造一套完整的权限架构(二)【MAVEN依赖以及相应配置】

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索