Redis Demo Series (6): Message Queue with pub/sub


Preface

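Redis was not designed to be a message queue, yet many people use it as one. When Redis is used this way, messages are lost if Redis goes down. If the receiving side has no retry or verification mechanism, data held in Redis can be lost. Redis is therefore usually used as a message queue only in scenarios where the accuracy requirement for messages is not especially high. When the data is sensitive and accuracy matters, a dedicated message queue such as Kafka or RabbitMQ can be used, although this usually raises development and maintenance costs.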
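
To make the loss behaviour concrete, here is a minimal in-memory sketch in plain Java, with no Redis involved. The classes `FireAndForgetChannel` and `ListQueue` are hypothetical stand-ins, not Redis or Jedis APIs. They only model the delivery semantics: a published message reaches the subscribers connected at that moment and is stored nowhere, while an entry pushed to a list stays there until someone pops it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of a pub/sub channel: nothing is stored.
class FireAndForgetChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Returns how many subscribers received the message.
    int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }
}

// Hypothetical in-memory model of a list used as a queue (push left, pop right).
class ListQueue {
    private final Deque<String> items = new ArrayDeque<>();

    void lpush(String value) {
        items.addFirst(value);
    }

    // Returns null when the list is empty.
    String rpop() {
        return items.pollLast();
    }
}

class DeliverySemanticsDemo {
    public static void main(String[] args) {
        FireAndForgetChannel channel = new FireAndForgetChannel();
        List<String> received = new ArrayList<>();

        // Published before anyone subscribed: the message is simply gone.
        int receivers = channel.publish("early");
        channel.subscribe(received::add);
        channel.publish("late");
        System.out.println(receivers + " " + received); // prints "0 [late]"

        // Pushed before anyone reads: the entry waits in the list.
        ListQueue queue = new ListQueue();
        queue.lpush("early");
        System.out.println(queue.rpop()); // prints "early"
    }
}
```

Neither model survives a crash of the process that holds it, which mirrors the point above: without persistence or a retry mechanism on the receiving side, both approaches can lose data.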

The code for this article can be found in my GitHub project under https://github.com/SeanYanxml/bigdata/tree/master/redis.
PS: If you find the project useful, feel free to give it a Star.


Demo

  • RedisMQConst


public class RedisMQConst {

    public static final String MQ_KEY = "MQ_KEY";

}
  • RedisMQDispatchMessageHandler (message handler)


import java.util.UUID;

public class RedisMQDispatchMessageHandler {

    public void syncFile(){
        // Pop one message from the list stored under MQ_KEY and handle it.
//      while(true){
            try{
                String message = RedisMQService.lpop(RedisMQConst.MQ_KEY);
                if(null != message){
                    // do something about syncFile.
                    // messageHandler.synfile(message);
                    System.out.println("SYNC FILE: "+ System.currentTimeMillis() + " - " + message);
                }
                Thread.currentThread().setName(UUID.randomUUID().toString());
            }catch(Exception ex){
                // Report the failure instead of swallowing it silently.
                ex.printStackTrace();
            }
//      }
    }

}

  • RedisMQService (basic Redis service utility class: subscribe, publish, etc.)


import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class RedisMQService {

    public static JedisPoolManager manager = new JedisPoolManager();

    // Subscribe. This call blocks until the listener unsubscribes,
    // so the connection is only released after that point.
    public static boolean subscribe(JedisPubSub jedisPubSub, String channels){
        try (Jedis jedis = manager.getJedis()) {
            jedis.subscribe(jedisPubSub, channels);
        }
        return true;
    }

    // Publish a message to a channel.
    public static boolean publish(String channels,String message){
        try (Jedis jedis = manager.getJedis()) {
            jedis.publish(channels, message);
        }
        return true;
    }

    // Pop the head of the list; returns null when the list is empty.
    public static String lpop(String key){
        try (Jedis jedis = manager.getJedis()) {
            return jedis.lpop(key);
        }
    }

    // Push a value onto the head of the list.
    public static void lpush(String key,String value){
        try (Jedis jedis = manager.getJedis()) {
            jedis.lpush(key, value);
        }
    }
}

/**
 * Multi-threaded Redis access throws: B cannot be cast to java.lang.Long
 * https://blog.csdn.net/JavaMoo/article/details/77233976
 * Cause: several threads called the same Jedis object at the same time, so they competed for its in-memory data and the replies got mixed up.
 * Fix: give each thread its own Jedis object.
 * Exception in thread "main" java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.Long
    at redis.clients.jedis.Connection.getIntegerReply(Connection.java:265)
    at redis.clients.jedis.Jedis.lpush(Jedis.java:882)
    at com.yanxml.redis.demos.mq.RedisMQService.lpush(RedisMQService.java:23)
    at com.yanxml.redis.demos.mq.RedisMQDemoMain.main(RedisMQDemoMain.java:15)
Exception in thread "Thread-0" redis.clients.jedis.exceptions.JedisDataException: ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
    at redis.clients.jedis.Protocol.processError(Protocol.java:127)
    at redis.clients.jedis.Protocol.process(Protocol.java:161)
    at redis.clients.jedis.Protocol.read(Protocol.java:215)
    at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340)
    at redis.clients.jedis.Connection.getRawObjectMultiBulkReply(Connection.java:285)
    at redis.clients.jedis.JedisPubSub.process(JedisPubSub.java:121)
    at redis.clients.jedis.JedisPubSub.proceed(JedisPubSub.java:115)
    at redis.clients.jedis.Jedis.subscribe(Jedis.java:2680)
    at com.yanxml.redis.demos.mq.RedisMQService.subscribe(RedisMQService.java:14)
    at com.yanxml.redis.demos.mq.RedisMQSubscribeService$1.run(RedisMQSubscribeService.java:17)
    at java.lang.Thread.run(Thread.java:745)
 * */
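
The note above attributes the exceptions to one Jedis object being used by several threads at once. The rule "one connection per thread at a time" can be sketched with a tiny borrow-and-return pool. This is plain Java with hypothetical names (`FakeConnection`, `TinyPool`, `Lease`); it is not the article's `JedisPoolManager`, whose implementation is not shown here. In Jedis, `JedisPool.getResource()` plays the borrowing role.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a connection that is not thread-safe.
class FakeConnection {
    final int id;

    FakeConnection(int id) {
        this.id = id;
    }
}

// Hypothetical minimal pool: a connection is held by exactly one borrower at a time.
class TinyPool {
    private final BlockingQueue<FakeConnection> idle;

    TinyPool(int size) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(new FakeConnection(i));
        }
    }

    // Blocks until a connection is free; closing the lease returns it.
    Lease borrow() throws InterruptedException {
        return new Lease(idle.take());
    }

    int idleCount() {
        return idle.size();
    }

    final class Lease implements AutoCloseable {
        final FakeConnection connection;

        private Lease(FakeConnection connection) {
            this.connection = connection;
        }

        @Override
        public void close() {
            idle.add(connection);
        }
    }
}

class PerThreadConnectionDemo {
    public static void main(String[] args) throws InterruptedException {
        TinyPool pool = new TinyPool(2);
        Set<Integer> inUse = ConcurrentHashMap.newKeySet();

        Runnable worker = () -> {
            for (int i = 0; i < 1000; i++) {
                try (TinyPool.Lease lease = pool.borrow()) {
                    // add() returning false would mean two threads hold the same connection.
                    if (!inUse.add(lease.connection.id)) {
                        throw new IllegalStateException("connection shared between threads");
                    }
                    inUse.remove(lease.connection.id);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };

        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println("idle connections after run: " + pool.idleCount());
    }
}
```

The try-with-resources block is the important part of the design: the connection goes back to the pool even when the work inside throws, so a failing call cannot leak a connection or leave it in another thread's hands.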
  • RedisMQSubscribeService


public class RedisMQSubscribeService{

    private RedisMQSyncListener redisMQSyncListener; // the subscriber

    public RedisMQSubscribeService(RedisMQSyncListener redisMQSyncListener,String channel){
        this.redisMQSyncListener = redisMQSyncListener;
        subscribe(this.redisMQSyncListener,channel);
    }

    // Run the blocking subscribe call on its own thread so the caller is not blocked.
    public void subscribe(RedisMQSyncListener redisMQSyncListener, String channel) {
        new Thread(new Runnable() {

            @Override
            public void run() {
                System.out.println("Subscribe start.");

                RedisMQService.subscribe(redisMQSyncListener, channel);

                // Reached only after the subscription has ended.
                System.out.println("Subscribe OK.");

            }
        }).start();
    }
}
  • RedisMQSyncListener


import redis.clients.jedis.JedisPubSub;

public class RedisMQSyncListener extends JedisPubSub{

    private RedisMQDispatchMessageHandler dispatchMessageHandler;

    public RedisMQSyncListener(RedisMQDispatchMessageHandler dispatchMessageHandler){
        this.dispatchMessageHandler = dispatchMessageHandler;
    }

    // Called for every message published on a subscribed channel.
    @Override
    public void onMessage(String channel, String message){
        System.out.println("On Message.");
        dispatchMessageHandler.syncFile();
    }

}
  • RedisMQPublisherMain


/**
 * Redis MQ Publisher launcher class
 * */
public class RedisMQPublisherMain {
    public static void main(String[] args) {
        // Publish the messages "1" and "2" on the MQ_KEY channel.
        int i=0;
        while(i<2){
            i++;
            RedisMQService.publish(RedisMQConst.MQ_KEY,String.valueOf(i));
        }
    }

}
  • RedisMQSubscriberMain


/**
 * Redis MQ Subscriber launcher class
 * */
public class RedisMQSubscriberMain {

    public static void main(String[] args) {
        RedisMQDispatchMessageHandler redisMQDispatchMessageHandler = new RedisMQDispatchMessageHandler();

        RedisMQSyncListener redisMQSyncListener = new RedisMQSyncListener(redisMQDispatchMessageHandler);

        // The constructor starts the subscription thread.
        RedisMQSubscribeService redisMQSubscribeService = new RedisMQSubscribeService(redisMQSyncListener,RedisMQConst.MQ_KEY);

    }

}
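
In the listings above, the publisher only calls publish, while the handler reads with lpop from a list stored under the same key, so the handler finds data only if something has also pushed to that list. One way to read the design is "store the payload in a list, then use pub/sub purely as a wake-up signal". The following is a minimal in-memory sketch of that pattern in plain Java. `InMemoryList` and `DrainingHandler` are hypothetical names, not part of the project, and the sketch pops from the opposite end to the push so that messages come out in FIFO order (pairing lpush with lpop, as the utility class does, yields LIFO order).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical in-memory stand-in for the Redis list stored under MQ_KEY.
class InMemoryList {
    private final ConcurrentLinkedDeque<String> items = new ConcurrentLinkedDeque<>();

    void lpush(String value) {
        items.addFirst(value);
    }

    // Returns null when the list is empty.
    String rpop() {
        return items.pollLast();
    }
}

// Hypothetical handler: the notification carries no payload, it only triggers a drain.
class DrainingHandler {
    private final InMemoryList list;
    final List<String> handled = new ArrayList<>();

    DrainingHandler(InMemoryList list) {
        this.list = list;
    }

    void onNotification() {
        String message;
        // Keep popping until the list is empty, so one wake-up can cover many messages.
        while ((message = list.rpop()) != null) {
            handled.add(message);
        }
    }
}

class NotifyAndDrainDemo {
    public static void main(String[] args) {
        InMemoryList list = new InMemoryList();
        DrainingHandler handler = new DrainingHandler(list);

        // Producer side: store the payload first, then notify.
        list.lpush("1");
        list.lpush("2");
        handler.onNotification(); // stands in for onMessage(...)
        handler.onNotification(); // a second wake-up finds nothing left

        System.out.println(handler.handled); // prints "[1, 2]"
    }
}
```

Draining in a loop means a missed notification costs only latency: the next wake-up picks up whatever is still in the list.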

