Goal of the Example
Type text into a Kafka console (shell) producer; Storm reads the data from Kafka in real time, counts how many times each word has occurred, and continuously stores the running counts in Redis.
Writing the Code
First, write the POM file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.toto.storm.kafkastormredis</groupId>
    <artifactId>kafkastormredis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <!--<scope>provided</scope>-->
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- The main class to run; adjust the package name to match your own project -->
                            <mainClass>cn.toto.storm.kafkastormredis.StormTopologyDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
A Storm topology needs a spout to ingest data. In simple learning examples the data usually comes from a file; typical code looks like this:
package cn.toto.storm.kafkastormredis;

import org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * A spout that simulates reading data from a file. It is not used in the
 * Storm + Kafka + Redis pipeline itself; it is shown only for reference.
 *
 * @author tuzq
 * @create 2017-06-20 23:41
 */
public class MyLocalFileSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private BufferedReader bufferedReader;

    /**
     * Initialization method.
     */
    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            this.bufferedReader = new BufferedReader(new FileReader(new File("E:/wordcount/input/1.txt")));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Storm processes data one tuple at a time; conceptually the framework runs:
     * while (true) {
     *     this.nextTuple();
     * }
     */
    @Override
    public void nextTuple() {
        // Each invocation emits (at most) one line of input.
        try {
            String line = bufferedReader.readLine();
            if (StringUtils.isNotBlank(line)) {
                List<Object> arrayList = new ArrayList<Object>();
                arrayList.add(line);
                collector.emit(arrayList);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("juzi"));
    }
}
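Before bringing Kafka in, it can be useful to smoke-test the counting logic against this file spout. The driver below is a hypothetical sketch, not part of the original pipeline: it wires MyLocalFileSpout to a small split-bolt variant (MySplitBolt itself expects the "bytes" field emitted by KafkaSpout, so a variant that reads the "juzi" String field is defined inline) and then to the count bolt, which still assumes a Redis instance at hadoop11:6379.

package cn.toto.storm.kafkastormredis;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Hypothetical local smoke test (not part of the original pipeline):
 * MyLocalFileSpout -> inline split bolt -> MyWordCountAndPrintBolt.
 */
public class LocalFileTopologyDriver {

    // Variant of MySplitBolt that reads the file spout's "juzi" String field.
    public static class LocalSplitBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String line = input.getStringByField("juzi");
            for (String word : line.split(" ")) {
                collector.emit(new Values(word, 1));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "num"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("fileSpout", new MyLocalFileSpout());
        builder.setBolt("split", new LocalSplitBolt()).shuffleGrouping("fileSpout");
        // Group by "word" so every occurrence of a word reaches the same counter task.
        builder.setBolt("count", new MyWordCountAndPrintBolt())
               .fieldsGrouping("split", new Fields("word"));
        new LocalCluster().submitTopology("fileWordCount", new Config(), builder.createTopology());
    }
}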
Once the spout is written, a bolt splits the incoming text. The bolt below takes its input as it arrives from Kafka and splits it into words:
package cn.toto.storm.kafkastormredis;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * This bolt receives data from the KafkaSpout and forwards
 * (word, count) pairs to MyWordCountAndPrintBolt.
 *
 * @author tuzq
 * @create 2017-06-21 9:14
 */
public class MySplitBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //1. Fetch the data.
        //   If StormTopologyDriver used MyLocalFileSpout, we would read the "juzi"
        //   field declared in its declareOutputFields:
        //byte[] juzi = (byte[]) input.getValueByField("juzi");
        //   The driver uses KafkaSpout, whose declareOutputFields declares a field
        //   named "bytes", so we read "bytes" to get the raw Kafka message.
        byte[] juzi = (byte[]) input.getValueByField("bytes");
        //2. Split the line into words.
        String[] strings = new String(juzi).split(" ");
        //3. Emit one tuple per word; Values builds the underlying list for us.
        for (String word : strings) {
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}
After the text has been split, the words need to be counted. A second bolt handles the counting and stores the results in Redis:
package cn.toto.storm.kafkastormredis;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.Map;

/**
 * Counts the words and stores the running totals in Redis.
 *
 * @author tuzq
 * @create 2017-06-21 9:22
 */
public class MyWordCountAndPrintBolt extends BaseBasicBolt {
    private Jedis jedis;
    private Map<String, String> wordCountMap = new HashMap<String, String>();

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Connect to Redis; prepare() is the place to set up external connections.
        jedis = new Jedis("hadoop11", 6379);
        super.prepare(stormConf, context);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValueByField("word");
        Integer num = (Integer) input.getValueByField("num");
        //1. Look up the current count for this word (0 if it has not been seen yet).
        int current = wordCountMap.get(word) == null ? 0 : Integer.parseInt(wordCountMap.get(word));
        wordCountMap.put(word, (current + num) + "");
        //2. Save the whole map to Redis under the hash key "wordcount".
        System.out.println(wordCountMap);
        jedis.hmset("wordcount", wordCountMap);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Last bolt in the topology, so no output fields are declared.
    }
}
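One caveat worth knowing: each task of this bolt keeps its own in-memory map and rewrites the "wordcount" hash with hmset, so running several tasks only gives correct totals if every occurrence of a word is routed to the same task. As an alternative, sketched below and not part of the original code, Redis can do the counting itself with HINCRBY, which increments a single hash field atomically on the server and removes the need for the local map:

// Sketch: a variant of execute() that delegates counting to Redis.
// jedis.hincrBy() increments one field of the "wordcount" hash atomically,
// so concurrent bolt tasks cannot overwrite each other's totals.
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    String word = (String) input.getValueByField("word");
    Integer num = (Integer) input.getValueByField("num");
    jedis.hincrBy("wordcount", word, num);
}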
Finally, a driver ties the spout and the bolts together into a real-time topology:
package cn.toto.storm.kafkastormredis;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * This driver wires Kafka, Storm, and Redis together.
 *
 * Before running it, create the Kafka topic:
 * [root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 3 --topic wordCount
 *
 * Then open a console producer to feed data into Kafka:
 * [root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordCount
 * and type some input.
 *
 * @author tuzq
 * @create 2017-06-21 9:39
 */
public class StormTopologyDriver {

    public static void main(String[] args) throws Exception {
        //1. Describe the topology.
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("KafkaSpout", new KafkaSpout(new SpoutConfig(new ZkHosts("hadoop11:2181"), "wordCount", "/wordCount", "wordCount")), 2);
        topologyBuilder.setBolt("bolt1", new MySplitBolt(), 4).shuffleGrouping("KafkaSpout");
        // Group by "word" so all occurrences of the same word are counted by the same bolt2 task;
        // with shuffleGrouping the two bolt2 tasks would each hold partial, conflicting counts.
        topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(), 2).fieldsGrouping("bolt1", new Fields("word"));

        //2. Submit the topology.
        Config config = new Config();
        config.setNumWorkers(2);
        StormTopology stormTopology = topologyBuilder.createTopology();

        // Local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordcount", config, stormTopology);
        // Cluster mode
        //StormSubmitter.submitTopology("wordcount1", config, stormTopology);
    }
}
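If you want the same jar to run both locally and on a cluster, a common pattern (a sketch, not part of the original code; it additionally needs an import of org.apache.storm.StormSubmitter) is to branch on the command-line arguments at the end of main():

// Hypothetical tail of main(): submit to the cluster when a topology name
// is passed as args[0], otherwise fall back to local mode for development.
if (args != null && args.length > 0) {
    StormSubmitter.submitTopology(args[0], config, stormTopology);
} else {
    new LocalCluster().submitTopology("wordcount", config, stormTopology);
}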
Now create the Kafka topic:
[root@hadoop1 ~]# cd $KAFKA_HOME
[root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 3 --topic wordCount
Next, start a console producer to send data to Kafka:
[root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordCount
Type some input at the prompt above.

Then run the program: open StormTopologyDriver in the IDE and run it (right-click → Run); the running word counts are printed to the console as tuples arrive.

Finally, if you want to inspect the wordcount data that MyWordCountAndPrintBolt writes to Redis, you can use a small test class like the following:
package cn.toto.storm.kafkastormredis;

import redis.clients.jedis.Jedis;

import java.util.Map;

/**
 * Reads back the word counts stored in Redis and prints them.
 *
 * @author tuzq
 * @create 2017-06-21 10:13
 */
public class TestRedis {

    public static void main(String[] args) {
        Jedis jedis = new Jedis("hadoop11", 6379);
        Map<String, String> wordcount = jedis.hgetAll("wordcount");
        System.out.println(wordcount);
    }
}
The output is the whole hash printed as a Java Map, for example {hello=5, storm=3} (the exact words and counts depend on what you typed into the producer).