0°

Strom+Kafka + redis实时计算单词出现频率的案例

案例要实现的目标

在Kafka的shell 客户端中输入内容,通过Storm实时去kafka中取数据并进行计算单词出现的次数,并且实时把这些数据信息存储到redis中。

代码编写

编写Pom文件,代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
1<?xml version="1.0" encoding="UTF-8"?>
2<project xmlns="http://maven.apache.org/POM/4.0.0"
3         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5    <modelVersion>4.0.0</modelVersion>
6
7    <groupId>cn.toto.storm.kafkastormredis</groupId>
8    <artifactId>kafkastormredis</artifactId>
9    <version>1.0-SNAPSHOT</version>
10
11    <dependencies>
12        <dependency>
13            <groupId>org.apache.storm</groupId>
14            <artifactId>storm-core</artifactId>
15            <!--<scope>provided</scope>-->
16            <version>1.1.0</version>
17        </dependency>
18        <dependency>
19            <groupId>org.apache.storm</groupId>
20            <artifactId>storm-kafka</artifactId>
21            <version>1.1.0</version>
22        </dependency>
23        <dependency>
24            <groupId>redis.clients</groupId>
25            <artifactId>jedis</artifactId>
26            <version>2.7.3</version>
27        </dependency>
28        <dependency>
29            <groupId>org.apache.kafka</groupId>
30            <artifactId>kafka_2.8.2</artifactId>
31            <version>0.8.1</version>
32            <exclusions>
33                <exclusion>
34                    <groupId>org.apache.zookeeper</groupId>
35                    <artifactId>zookeeper</artifactId>
36                </exclusion>
37            </exclusions>
38        </dependency>
39    </dependencies>
40    <build>
41        <plugins>
42            <plugin>
43                <artifactId>maven-assembly-plugin</artifactId>
44                <configuration>
45                    <descriptorRefs>
46                        <descriptorRef>jar-with-dependencies</descriptorRef>
47                    </descriptorRefs>
48                    <archive>
49                        <manifest>
50                            <!--告诉运行的主类是哪个,注意根据自己的情况,下面的包名做相应的修改-->
51                            <mainClass>cn.toto.strom.wordcount.StormTopologyDriver</mainClass>
52                        </manifest>
53                    </archive>
54                </configuration>
55                <executions>
56                    <execution>
57                        <id>make-assembly</id>
58                        <phase>package</phase>
59                        <goals>
60                            <goal>single</goal>
61                        </goals>
62                    </execution>
63                </executions>
64            </plugin>
65            <plugin>
66                <groupId>org.apache.maven.plugins</groupId>
67                <artifactId>maven-compiler-plugin</artifactId>
68                <configuration>
69                    <source>1.7</source>
70                    <target>1.7</target>
71                </configuration>
72            </plugin>
73        </plugins>
74    </build>
75</project>
76

在strom案例中需要有spout接收数据。在一些常规学习用的案例中通常从一个文件中获取数据。通常的代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
1package cn.toto.storm.kafkastormredis;/**
2 * Created by toto on 2017/6/20.
3 */
4
5import org.apache.commons.lang.StringUtils;
6import org.apache.storm.spout.SpoutOutputCollector;
7import org.apache.storm.task.TopologyContext;
8import org.apache.storm.topology.OutputFieldsDeclarer;
9import org.apache.storm.topology.base.BaseRichSpout;
10import org.apache.storm.tuple.Fields;
11
12import java.io.BufferedReader;
13import java.io.File;
14import java.io.FileReader;
15import java.util.ArrayList;
16import java.util.List;
17import java.util.Map;
18
19/**
20 * 这个类是模拟从文件中读取数据的代码。在本案例的strom + kafka + redis的案例中将用不到。
21 *
22 * @author tuzq
23 * @create 2017-06-20 23:41
24 */
25public class MyLocalFileSpout extends BaseRichSpout {
26    private SpoutOutputCollector collector;
27    private BufferedReader bufferedReader;
28
29    /**
30     * 初始化方法
31     * @param map
32     * @param context
33     * @param collector
34     */
35    @Override
36    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
37        this.collector = collector;
38        try {
39            this.bufferedReader = new BufferedReader(new FileReader(new File("E:/wordcount/input/1.txt")));
40        } catch (Exception e) {
41            e.printStackTrace();
42        }
43    }
44
45    /**
46     * Strom实时计算的特性就是对数据一条一条的处理
47     * while(true) {
48     *     this.nextTuple();
49     * }
50     */
51    @Override
52    public void nextTuple() {
53        //每被调用一次就会发送一条数据出去
54        try {
55            String line = bufferedReader.readLine();
56            if (StringUtils.isNotBlank(line)) {
57                List<Object> arrayList = new ArrayList<Object>();
58                arrayList.add(line);
59                collector.emit(arrayList);
60            }
61        } catch(Exception e) {
62            e.printStackTrace();
63        }
64    }
65
66    @Override
67    public void declareOutputFields(OutputFieldsDeclarer declarer) {
68        declarer.declare(new Fields("juzi"));
69    }
70
71}
72
73

在spout编写完成之后,通常通过Bolt来进行文本的切割。在下面的切割代码中,模拟的是从kafka中获取数据,并进行切割。代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1package cn.toto.storm.kafkastormredis;/**
2 * Created by toto on 2017/6/21.
3 */
4
5import org.apache.storm.topology.BasicOutputCollector;
6import org.apache.storm.topology.OutputFieldsDeclarer;
7import org.apache.storm.topology.base.BaseBasicBolt;
8import org.apache.storm.tuple.Fields;
9import org.apache.storm.tuple.Tuple;
10import org.apache.storm.tuple.Values;
11
12/**
13 * 这个Bolt模拟从kafkaSpout接收数据,并把数据信息发送给MyWordCountAndPrintBolt的过程。
14 *
15 * @author tuzq
16 * @create 2017-06-21 9:14
17 */
18public class MySplitBolt extends BaseBasicBolt {
19
20    @Override
21    public void execute(Tuple input, BasicOutputCollector collector) {
22        //1、数据如何获取
23        //如果StormTopologyDriver中的spout配置的是MyLocalFileSpout,则用的是declareOutputFields中的juzi这个key
24        //byte[] juzi = (byte[]) input.getValueByField("juzi");
25        //2、这里用这个是因为StormTopologyDriver这个里面的spout用的是KafkaSpout,而KafkaSpout中的declareOutputFields返回的是bytes,所以下面用bytes,这个地方主要模拟的是从kafka中获取数据
26        byte[] juzi = (byte[]) input.getValueByField("bytes");
27        //2、进行切割
28        String[] strings = new String(juzi).split(" ");
29        //3、发送数据
30        for (String word : strings) {
31            //Values对象帮我们生成一个list
32            collector.emit(new Values(word,1));
33        }
34    }
35
36    @Override
37    public void declareOutputFields(OutputFieldsDeclarer declarer) {
38        declarer.declare(new Fields("word","num"));
39    }
40}
41
42

对文本信息进行切割之后,需要对数据进行统计,这里使用另外一个Bolt来完成,代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
1package cn.toto.storm.kafkastormredis;/**
2 * Created by toto on 2017/6/21.
3 */
4
5import org.apache.storm.task.TopologyContext;
6import org.apache.storm.topology.BasicOutputCollector;
7import org.apache.storm.topology.OutputFieldsDeclarer;
8import org.apache.storm.topology.base.BaseBasicBolt;
9import org.apache.storm.tuple.Tuple;
10import redis.clients.jedis.Jedis;
11
12import java.util.HashMap;
13import java.util.Map;
14
15/**
16 * 用于统计分析,并且把统计分析的结果存储到redis中。
17 *
18 * @author tuzq
19 * @create 2017-06-21 9:22
20 */
21public class MyWordCountAndPrintBolt extends BaseBasicBolt {
22    private Jedis jedis;
23    private Map<String,String> wordCountMap = new HashMap<String,String>();
24
25    @Override
26    public void prepare(Map stormConf, TopologyContext context) {
27        //连接redis---代表可以连接任何事物
28        jedis = new Jedis("hadoop11",6379);
29        super.prepare(stormConf,context);
30    }
31
32    @Override
33    public void execute(Tuple input, BasicOutputCollector collector) {
34        String word = (String) input.getValueByField("word");
35        Integer num = (Integer) input.getValueByField("num");
36        //1、查看单词对应的value是否存在
37        Integer integer = wordCountMap.get(word) == null ? 0 : Integer.parseInt(wordCountMap.get(word));
38        if (integer == null || integer.intValue() == 0) {
39            wordCountMap.put(word,num + "");
40        } else {
41            wordCountMap.put(word,(integer.intValue() + num) + "");
42        }
43        //2、保存到redis
44        System.out.println(wordCountMap);
45        //redis key wordcount:-->Map
46        jedis.hmset("wordcount",wordCountMap);
47    }
48
49    @Override
50    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
51        //todo 不需要定义输出的字段
52    }
53}
54
55

接下来通过一个Driver串联起Spout、Bolt实现实时计算,代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
1package cn.toto.storm.kafkastormredis;/**
2 * Created by toto on 2017/6/21.
3 */
4
5import org.apache.storm.Config;
6import org.apache.storm.LocalCluster;
7import org.apache.storm.generated.StormTopology;
8import org.apache.storm.kafka.KafkaSpout;
9import org.apache.storm.kafka.SpoutConfig;
10import org.apache.storm.kafka.ZkHosts;
11import org.apache.storm.topology.TopologyBuilder;
12
13/**
14 * 这个Driver使Kafka、strom、redis进行串联起来。
15 *
16 * 这个代码执行前需要创建kafka的topic,创建代码如下:
17 * [root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 -partitions 3 --topic wordCount
18 *
19 * 接着还要向kafka中传递数据,打开一个shell的producer来模拟生产数据
20 * [root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordCount
21 * 接着输入数据
22 *
23 * @author tuzq
24 * @create 2017-06-21 9:39
25 */
26public class StormTopologyDriver {
27
28    public static void main(String[] args) throws Exception {
29        //1、准备任务信息
30        TopologyBuilder topologyBuilder = new TopologyBuilder();
31        topologyBuilder.setSpout("KafkaSpout",new KafkaSpout(new SpoutConfig(new ZkHosts("hadoop11:2181"),"wordCount","/wordCount","wordCount")),2);
32        topologyBuilder.setBolt("bolt1",new MySplitBolt(),4).shuffleGrouping("KafkaSpout");
33        topologyBuilder.setBolt("bolt2",new MyWordCountAndPrintBolt(),2).shuffleGrouping("bolt1");
34
35        //2、任务提交
36        //提交给谁?提交内容
37        Config config = new Config();
38        config.setNumWorkers(2);
39        StormTopology stormTopology = topologyBuilder.createTopology();
40
41        //本地模式
42        LocalCluster localCluster = new LocalCluster();
43        localCluster.submitTopology("wordcount",config,stormTopology);
44        //集群模式
45        //StormSubmitter.submitTopology("wordcount1",config,stormTopology);
46    }
47}
48

这里我们使用:

//创建kafka的topic


1
2
3
1[root@hadoop1 ~]# cd $KAFKA_HOME
2[root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 -partitions 3 --topic wordCount
3

接下来创建producer,来发送数据到kafka:


1
2
1[root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordCount
2

在上面输入数据。

4、运行程序,进入StormTopologyDriver,右键run.最后的效果如下:

5、最后如果想看MyWordCountAndPrintBolt中记录到redis的wordcount内容,可以编写如下代码案例:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1package cn.toto.storm.kafkastormredis;/**
2 * Created by toto on 2017/6/21.
3 */
4
5import redis.clients.jedis.Jedis;
6
7import java.util.Map;
8
9/**
10 * 代码说明
11 *
12 * @author tuzq
13 * @create 2017-06-21 10:13
14 */
15public class TestRedis {
16
17    public static void main(String[] args) {
18        Jedis jedis = new Jedis("hadoop11",6379);
19        Map<String,String> wordcount = jedis.hgetAll("wordcount");
20        System.out.println(wordcount);
21    }
22}
23
24

运行后的结果如下:

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!