实战hadoop海量数据处理系列05 : 实现点击流日志的数据清洗模块

释放双眼,带上耳机,听听看~!

实战hadoop海量数据处理系列05 : 实现点击流日志的数据清洗模块

之前已经实现结构化数据的清洗,下一步我们将实现半结构化(非结构化)数据的清洗。

本文假设读者已搭建好了eclipse环境,并且已经导入ClickStreamETL文件夹下面的子工程。
如果遇到环境相关的问题,可以在专门的帖子下面留言。

在阅读本文前,强烈建议阅读原书“实现点击流日志的数据清洗模块”章节。

本文的代码同步于github,相关地址如下:
github地址
本系列博客专栏地址

overview

经典重现,引入原书流程图和map-reduce排序图说明原理。
为了还原案例,本项目增强了日志中的cookie,独有动态图演示执行过程,并在最后尾部给出开发中的若干思考。
全文脉络如下:

  • 流程图
  • 日志格式解析
  • 排序流程图
  • 重要代码解释
  • 运行结果
  • 工程实践思考

1 流程图

从原书摘取了经典的流程图,在于说明流程框架的重要性,因为流程贯穿全章节。

2日志格式解析

先列出一条日志,样式如下:


1
2
1120.196.145.58 - - [11/Dec/2013:10:00:32 +0800] "GET /__utm.gif" 200 35 "http://easternmiles.ceair.com/flight/index.html" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36" "BIGipServermu_122.119.122.14=192575354.20480.0000;Webtrends=120.196.145.58.1386724976245806;uuid=sid1;userId=099;st=1" 1482 352 - easternmiles.ceair.com 794
2

这里展示的日志,属于apache服务器日志,保留了原书字段,本项目在原有cookie内部加入uuid,user id和sesstion time三个字段,这些字段将在map阶段起重要作用。下面把里面cookie提取在此,其中uuid ,user id, session time分别是sid1,099,1;


1
2
1"BIGipServermu_122.119.122.14=192575354.20480.0000;Webtrends=120.196.145.58.1386724976245806;uuid=sid1;userId=099;st=1"
2

完整的日志请见github中 log_3_2_1.txt文件,本文件将作为map reduce作业的输入。

2.1 日志中获取的相关字段

原书提供了非常详细字段列表信息,这里仅对影响map reduce任务的重要字段进行说明。

ipaddress
ip地址, 可从点击流日志中获取
receive Time
服务器接收时间, 可从点击流日志中获取
url
由流日志中主机地址和请求的一行合成
unique id
非重复的id,可从点击流的cookie中提取
session time
会话发生的时间,可从点击流的cookie中提取
session id
由unique id和session time合成

3 排序流程图

为了避免数据倾斜,作者对map的key进行了重构,利用unqiue id和session time合成了session id ,其中经典的流程图如下,一图胜过万千代码喔 ,已经熟悉本图的忠实读者可以略过。

4重要代码解释

代码主要分map/reduce/partion/二次排序等,
这里只贴出reducer,因为本项目对其进行了增强,其他代码请读者查看原有项目代码 。

4.1 Reducer部分

亮点在代码里,原先作者已经有很好的注释,改进的地方都用注释放在该行的尾部。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1public class ClickStreamReducer extends Reducer<Text, Text, NullWritable, Text>{
2
3    ///表示前一个sessionId
4    public static String preSessionId = "-";//66:改进版把他变为静态变量
5    static int csvp = 0;//66:改进版把csvp变为静态变量,并移到成员函数外面
6
7    protected void reduce(Text key, Iterable<Text> values, Context context) throws java.io.IOException ,InterruptedException {
8
9        String sessionId = key.toString().split("&")[0];        
10
11        //如果是第一条数据
12        if(preSessionId.equals("-")){
13            csvp = 1;
14            preSessionId = sessionId;//66:改进版,新加入这句,不然逻辑不完整
15        }else{
16            //如果与前一个sessionId相同,说明是同一个session
17            if(preSessionId.equals(sessionId)){
18                //累加csvp
19                csvp++;
20            //如果不同,说明是新的session,重置preSessionId 和 csvp
21            }else{
22                preSessionId = sessionId;
23                csvp = 1;
24            }
25        }
26
27        //按照clickstream_log的格式再末尾加上csvp
28        String reduceOutValue = values.iterator().next().toString() + "\t" + csvp;
29        context.write(NullWritable.get(), new Text(reduceOutValue));
30    };
31}
32

5 运行结果

5.1 map reduce结果


1
2
3
4
5
6
7
8
1ip          uniqueId    sessionId   SessionTimesReceiveTime  UserId csvp          URL                               ReferUrl                                    
2120.196.145.58  sid1        sid1|1      1       1386727232000   099 1       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  
3120.196.145.58  sid1        sid1|1      1       1386727292000   099 2       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  
4120.196.145.58  sid1        sid1|1      1       1386727352000   099 3       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  
5120.196.145.58  sid2        sid2|10     10      1386727412000   199 1       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  
6120.196.145.58  sid2        sid2|10     10      1386727472000   199 2       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  
7120.196.145.58  sid3        sid3|100    100     1386727472000   299 1       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  
8

从结果中可见,对于同一组seesion,按照接收时间(receive time)先后, 可见访问顺序(csvp)按照需要给予标记。

5.2 动态运行图

时长2分钟左右,请耐心观看。

6 小结

实践了作者的ClickStream作业,还原了过程例子演示的过程。
下一步计划,结合python调用jar文件完善开发过程。

7 其他 开发中的思考

7.1 处理正则表达式匹配过程的堆栈溢出

默认的堆栈大小在正则表达式匹配的时候不能满足内存需求,报以下错误


1
2
3
4
5
1...
2java.lang.Exception: java.lang.StackOverflowError at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
3Caused by: java.lang.StackOverflowError at java.util.regex.Pattern$CharProperty.match(Pattern.java:3692)
4...
5

解决方法是在启动jvm时提大stack 的大小(这里本工程是使用-Xss40m)

7.2 处理DateFormat匹配的细节

在源代码(05-16)版本中,直接运行有以下问题:


1
2
3
4
5
117/06/10 09:36:18 INFO mapred.MapTask: record buffer = 262144/327680
2java.text.ParseException: Unparseable date: "[11/Dec/2013:10:00:32 +0800]"
3    at java.text.DateFormat.parse(DateFormat.java:357)
4    at com.etl.mapreduce.ClickStreamMapper.map(ClickStreamMapper.java:84)
5

分析触发问题的代码,发现有两处,一是被处理的日期串还有额外的字符如[ ] ,第二是匹配串里面没有处理时区的问题,详细的解决方案见github相关代码ClickStreamMapper.java

7.3 调试工程经验

我的开发机是在windows, 部署机是在cent os ,因此存在远程调试的问题,该细节将会详细的总结为一章,请见后续更新

给TA打赏
共{{data.count}}人
人已打赏
安全运维

基于lucene的案例开发:数据库连接池

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索