【Flume】flume输出sink到hbase的实现

释放双眼,带上耳机,听听看~!

flume 1.5.2

hbase 0.98.9

hadoop 2.6

zk 3.4.6

以上是基础的软件及对应版本,请先确认以上软件安装成功!

1、添加jar包支持

【Flume】flume输出sink到hbase的实现

将hbase的lib下的这些jar包拷贝到flume的lib下

2、配置flume

【Flume】flume输出sink到hbase的实现

注意看以上的serializer配置,采用的是官方的RegexHbaseEventSerializer,

当然还有一个
SimpleHbaseEventSerializer

如果你使用了
SimpleHbaseEventSerializer

就会出现如下的错误


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1[2015-03-04 09:35:41,244] [SinkRunner-PollingRunner-DefaultSinkProcessor:5672] [ERROR] Failed to commit transaction.Transaction rolled back.
2java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
3        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
4        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
5        at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
6        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
7        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
8        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
9        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
10        at java.lang.Thread.run(Thread.java:745)
11[2015-03-04 09:35:41,249] [SinkRunner-PollingRunner-DefaultSinkProcessor:5677] [ERROR] Failed to commit transaction.Transaction rolled back.
12java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
13        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
14        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
15        at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
16        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
17        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
18        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
19        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
20        at java.lang.Thread.run(Thread.java:745)
21Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
22        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
23        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
24        at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
25        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
26        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
27        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
28        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
29        at java.lang.Thread.run(Thread.java:745)
30

1
2
1所以不要使用该序列化器
2

当然这个序列化器应该自己来开发的,因为列族,列修饰符什么的,应该由我们自己定义的

3、测试
这里注意,你必须提前在hbase将表创建好


1
2
3
4
5
1hbase(main):004:0> create 'flume','chiwei'
20 row(s) in 1.0770 seconds
3
4=> Hbase::Table - flume
5

1
2
1  启动flume
2

待监测的文件内容为


1
2
3
4
5
6
1Hello Flume
2Hello HBase
3Hello Hadoop
4Hello Zk
5Hello chiwei
6

1
2
1再来看一下hbase的表数据
2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1hbase(main):006:0> scan 'flume'
2ROW                                              COLUMN+CELL                                                                                                                                
3 1425437884264-rd3B0VE5Uz-0                      column=chiwei:payload, timestamp=1425437884520, value=Hello Flume                                                                          
4 1425437884275-rd3B0VE5Uz-1                      column=chiwei:payload, timestamp=1425437884520, value=Hello HBase                                                                          
5 1425437884276-rd3B0VE5Uz-2                      column=chiwei:payload, timestamp=1425437884520, value=Hello Hadoop                                                                          
6 1425437884277-rd3B0VE5Uz-3                      column=chiwei:payload, timestamp=1425437884520, value=Hello Zk                                                                              
7 1425437884278-rd3B0VE5Uz-4                      column=chiwei:payload, timestamp=1425437884520, value=Hello chiwei                                                                          
8 1425437884624-rd3B0VE5Uz-5                      column=chiwei:payload, timestamp=1425437884639, value=Hello Flume                                                                          
9 1425437884626-rd3B0VE5Uz-6                      column=chiwei:payload, timestamp=1425437884639, value=Hello HBase                                                                          
10 1425437884628-rd3B0VE5Uz-7                      column=chiwei:payload, timestamp=1425437884639, value=Hello Hadoop                                                                          
11 1425437884629-rd3B0VE5Uz-8                      column=chiwei:payload, timestamp=1425437884639, value=Hello Zk                                                                              
12 1425437884630-rd3B0VE5Uz-9                      column=chiwei:payload, timestamp=1425437884639, value=Hello chiwei                                                                          
1310 row(s) in 0.4020 seconds
14
15hbase(main):007:0>
16

1
2
1测试成功!  
2

给TA打赏
共{{data.count}}人
人已打赏
安全运维

OpenSSH-8.7p1离线升级修复安全漏洞

2021-10-23 10:13:25

安全运维

设计模式的设计原则

2021-12-12 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索