flume+kafka+storm

1. Requirements

The goal is to push incremental changes from a MySQL database into a PostgreSQL database in near real time. There are several ways to implement this; here, Flume is configured with a MySQL source, so that Flume picks up the incremental rows and acts as a Kafka producer. Kafka buffers the data briefly, and Storm, acting as the Kafka consumer, reads the incremental MySQL data, processes it, and inserts it into PostgreSQL.

The whole experiment runs on an HDP cluster; a self-built Apache Hadoop platform works just as well.

2. Kafka


cd /usr/hdp/2.6.3.0-235/kafka/bin

./kafka-topics.sh --create --zookeeper 192.168.186.48:2181 --replication-factor 1 --partitions 1 --topic xumaosheng
./kafka-topics.sh --list --zookeeper 192.168.186.48:2181
./kafka-console-producer.sh --broker-list 192.168.186.48:6667 --topic xumaosheng
./kafka-console-consumer.sh --zookeeper 192.168.186.48:2181 --topic xumaosheng --from-beginning

The commands above give a quick way to simulate a Kafka producer and consumer. In the real pipeline, however, production and consumption do not happen from the shell, so only the topic-creation command is actually needed here.
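
If you would rather smoke-test the topic from code than from the console producer, a minimal sketch using the Java producer API that ships with kafka 0.8.2 could look like the following; the broker address and topic match the ones above, and the sample payload mimics the quoted CSV lines the Flume source will later emit:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.186.48:6667"); // same broker as above
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Sample payload in the "id","name","age" shape used later in the pipeline
        producer.send(new ProducerRecord<String, String>("xumaosheng", "\"1\",\"tom\",\"20\""));
        producer.close();
    }
}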

3. Flume

On the node where Flume is installed, create a mysql-flume.conf file under Flume's conf directory with the following content:

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
########### sql source #################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://192.168.186.48:3306/xumaosheng
# Hibernate database connection properties
a1.sources.src-1.hibernate.connection.user = ambari
a1.sources.src-1.hibernate.connection.password = Gepoint
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay = 5000
a1.sources.src-1.status.file.path = /usr/hdp/2.6.3.0-235/flume
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select id, name, age from xumaosheng where id > $@$ order by id asc
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size = 1
a1.sources.src-1.hibernate.c3p0.max_size = 10
################################################################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000
################################################################
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = xumaosheng
a1.sinks.k1.brokerList = 192.168.186.48:6667
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels = ch-1

Note that the highlighted values (the JDBC URL, database credentials, status file path, Kafka topic, and broker list) must be adapted to your environment. You also need to download the MySQL JDBC driver jar and flume-ng-sql-source-1.4.3.jar and put both in Flume's lib directory.
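
A note on how the incremental query works (as implemented by the keedio flume-ng-sql-source plugin): the $@$ placeholder in custom.query is substituted with the last saved index value, which starts at start.from (0 here) and is persisted to sqlSource.status after every poll. The first run therefore executes select id, name, age from xumaosheng where id > 0 order by id asc, and each subsequent poll only returns rows whose id is greater than the largest one already shipped, which is what makes the capture incremental.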

Run:


# ./bin/flume-ng agent -n a1 -c conf -f conf/mysql-flume.conf -Dflume.root.logger=INFO,console

To run it in the background instead:


# nohup ./bin/flume-ng agent -n a1 -c conf -f conf/mysql-flume.conf -Dflume.root.logger=INFO,console &

A simple piece of code can verify that data is actually consumable from Kafka. First, the Maven dependencies (pom.xml):


<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.5</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>log4j-over-slf4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.5</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.4</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.4</version>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.5.jre7</version>
    </dependency>
</dependencies>
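
The exclusions are not accidental: storm-core ships log4j-over-slf4j while the kafka artifact transitively pulls in slf4j-log4j12, and having both on the classpath causes SLF4J to fail with a cyclic-binding error at runtime; kafka's bundled zookeeper is likewise excluded so it cannot clash with the version Storm itself depends on.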

package com.epoint.bigdata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerTest extends Thread {

    private String topic;

    public ConsumerTest(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConsumer();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // one stream (thread) for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); // the single stream we asked for
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("received: " + message);
        }
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.186.48:2181"); // ZooKeeper quorum
        properties.put("zookeeper.session.timeout.ms", "10000");
        properties.put("group.id", "test-consumer-group");
        // Use a dedicated group id: consumers sharing a group id split the
        // topic's partitions between them instead of each seeing every message.
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    public static void main(String[] args) {
        new ConsumerTest("xumaosheng").start(); // consume the topic created earlier
    }
}
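
Run ConsumerTest's main while the Flume agent is up and insert a few rows into the MySQL table; if the pipeline is healthy, each new row is printed as a quoted CSV line (e.g. "1","tom","20"), which is exactly the format the Storm bolt below parses.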


4. Storm


MyKafkaBolt takes each message off the spout, splits the quoted CSV payload into id, name, and age, and inserts the row into PostgreSQL over JDBC:

package com.epoint.bigdata.kafka_storm;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class MyKafkaBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void cleanup() {
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String kafkaMsg = input.getString(0);
        System.err.println("bolt:" + kafkaMsg);
        // Strip the quotes from the CSV payload and split into id, name, age
        String[] arr = kafkaMsg.replaceAll("\"", "").split(",");
        insert("xumaosheng", arr[0], arr[1], arr[2]);
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream.
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    private static Connection getConn() {
        String url = "jdbc:postgresql://192.168.186.14:5434/zzpxjj";
        String user = "dbuser";
        String pwd = "123456";
        Connection conn = null;
        try {
            Class.forName("org.postgresql.Driver");
            conn = DriverManager.getConnection(url, user, pwd);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    // insert one row; try-with-resources closes both the statement and the connection
    private static void insert(String tablename, String id, String name, String age) {
        String sql = "insert into " + tablename + "(id,name,age) values('" + id + "','" + name + "','" + age + "')";
        try (Connection conn = getConn(); Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(sql);
            System.out.println("Insert succeeded!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
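
The concatenated SQL string above is fine for a demo, but it is open to SQL injection and breaks on values containing quotes. A safer variant would bind parameters with a PreparedStatement; here is a minimal sketch (SafeInsert and insertSafe are hypothetical names, reusing the same connection settings as getConn() above):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SafeInsert {

    // Same connection settings as MyKafkaBolt.getConn()
    private static Connection getConn() throws Exception {
        Class.forName("org.postgresql.Driver");
        return DriverManager.getConnection("jdbc:postgresql://192.168.186.14:5434/zzpxjj", "dbuser", "123456");
    }

    // Hypothetical drop-in replacement for MyKafkaBolt.insert()
    static void insertSafe(String id, String name, String age) throws Exception {
        String sql = "insert into xumaosheng(id, name, age) values (?, ?, ?)";
        try (Connection conn = getConn(); PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, id);   // parameters are bound, not concatenated,
            ps.setString(2, name); // so quoting and escaping are handled by the driver
            ps.setString(3, age);
            ps.executeUpdate();
        }
    }
}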

MyKafkaSpout builds the topology: a KafkaSpout subscribed to the xumaosheng topic feeds the bolt above. With a command-line argument the topology is submitted to the cluster; without one it runs in a LocalCluster for testing:

package com.epoint.bigdata.kafka_storm;

import java.util.ArrayList;
import java.util.List;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class MyKafkaSpout {

    public static void main(String[] args) throws Exception {
        String topic = "xumaosheng"; // the topic the Flume Kafka sink writes to
        ZkHosts zkHosts = new ZkHosts("192.168.186.48:2181");
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "", "kafkaspout");
        List<String> zkServers = new ArrayList<String>();
        zkServers.add("192.168.186.48");
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        spoutConfig.socketTimeoutMs = 30 * 1000;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // decode messages as strings

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("bolt1", new MyKafkaBolt(), 1).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(false);

        if (args.length > 0) {
            // Submit to the cluster under the name given on the command line
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // No arguments: run locally for testing
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }
    }
}
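
To deploy, package the project into a jar with Maven and submit it through Storm's CLI, e.g. storm jar kafka-storm.jar com.epoint.bigdata.kafka_storm.MyKafkaSpout mytopology (the jar name here is illustrative); the topology name passed as args[0] triggers the StormSubmitter branch, while running the class with no arguments starts the LocalCluster for local testing.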

5. Verifying the result

(Screenshots: a row inserted into the MySQL source table flows through Flume and Kafka and appears in the PostgreSQL target table.)
