前言
redis设计的初衷并不是为了消息队列而设计的,但是有太多的人将Redis作为消息队列而使用。Redis消息队列时,当Redis宕机后,消息会丢失。如果收消息方未有重发和验证机制,Redis内的数据会出现丢失。所以,使用Redis的作为消息队列,通常是对于消息的准确性并非特别高的场景。当需要对数据非常敏感以及准确性较高的情况可以使用Kafka、RabbitMQ等专门等消息队列。但是,通常,这会增加系统的开发成本和维护成本。
本文相关代码,可在我的Github项目https://github.com/SeanYanxml/bigdata/tree/master/redis 目录下可以找到。
PS: (如果觉得项目不错,可以给我一个Star。)
Demo
-
RedisMQConst
1
2
3
4
5
6
7 1public class RedisMQConst {
2
3 public static final String MQ_KEY = "MQ_KEY";
4
5}
6
7
-
RedisMQDispatchMessageHandler 消息处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 1public class RedisMQDispatchMessageHandler {
2
3 public void syncFile(){
4 // do something about syncFile thing.
5// while(true){
6 try{
7 String message = RedisMQService.lpop(RedisMQConst.MQ_KEY);
8 System.out.println("SYNC FILE: "+ System.currentTimeMillis() + " - " + message);
9 if(null != message){
10 // do something about syncFile.
11 // messageHandler.synfile(message);
12 System.out.println("SYNC FILE: "+ System.currentTimeMillis() + " - " + message);
13 }
14 Thread.currentThread().setName(UUID.randomUUID().toString());
15 }catch(Exception ex){
16 // do something exception.
17 }
18// }
19 }
20
21
22
23}
24
1
2
3 1* RedisMQService Redis基本服务工具类(订阅等)
2
3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 1public class RedisMQService {
2
3 public static JedisPoolManager manager = new JedisPoolManager();
4
5 // 订阅 subscribe
6 public static boolean subscribe(JedisPubSub jedisPubSub, String channels){
7 Jedis jedis = manager.getJedis();
8 jedis.subscribe(jedisPubSub, channels);
9// jedis.close();
10 return true;
11 }
12
13 public static boolean publish(String channels,String message){
14 Jedis jedis = manager.getJedis();
15 jedis.publish(channels, message);
16 jedis.close();
17 return true;
18 }
19
20 public static String lpop(String key){
21 Jedis jedis = manager.getJedis();
22 String result = jedis.lpop(key);
23 jedis.close();
24 return result;
25 }
26
27 public static void lpush(String key,String value){
28 Jedis jedis = manager.getJedis();
29 jedis.lpush(key, value);
30 jedis.close();
31 }
32}
33
34/**
35 * 多线程redis抛出异常B cannot be cast to java.lang.Long
36 * https://blog.csdn.net/JavaMoo/article/details/77233976
37 * 原因:多个线程同时调用了同一个jedis对象,导致内存数据被多个线程竞争,产生数据混乱
38 * 解决方案:每个线程都new出一个自己的jedis对象
39 * Exception in thread "main" java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.Long
40 at redis.clients.jedis.Connection.getIntegerReply(Connection.java:265)
41 at redis.clients.jedis.Jedis.lpush(Jedis.java:882)
42 at com.yanxml.redis.demos.mq.RedisMQService.lpush(RedisMQService.java:23)
43 at com.yanxml.redis.demos.mq.RedisMQDemoMain.main(RedisMQDemoMain.java:15)
44Exception in thread "Thread-0" redis.clients.jedis.exceptions.JedisDataException: ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
45 at redis.clients.jedis.Protocol.processError(Protocol.java:127)
46 at redis.clients.jedis.Protocol.process(Protocol.java:161)
47 at redis.clients.jedis.Protocol.read(Protocol.java:215)
48 at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340)
49 at redis.clients.jedis.Connection.getRawObjectMultiBulkReply(Connection.java:285)
50 at redis.clients.jedis.JedisPubSub.process(JedisPubSub.java:121)
51 at redis.clients.jedis.JedisPubSub.proceed(JedisPubSub.java:115)
52 at redis.clients.jedis.Jedis.subscribe(Jedis.java:2680)
53 at com.yanxml.redis.demos.mq.RedisMQService.subscribe(RedisMQService.java:14)
54 at com.yanxml.redis.demos.mq.RedisMQSubscribeService$1.run(RedisMQSubscribeService.java:17)
55 at java.lang.Thread.run(Thread.java:745)
56 * */
57
-
RedisMQSubscribeService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 1public class RedisMQSubscribeService{
2
3 private RedisMQSyncListener redisMQSyncListener;//订阅者
4
5 public RedisMQSubscribeService(RedisMQSyncListener redisMQSyncListener,String channel){
6 this.redisMQSyncListener = redisMQSyncListener;
7 subscribe(this.redisMQSyncListener,channel);
8 }
9
10 public void subscribe(RedisMQSyncListener redisMQSyncListener, String channel) {
11 new Thread(new Runnable() {
12
13 @Override
14 public void run() {
15 System.out.println("Subscribe start.");
16
17 RedisMQService.subscribe(redisMQSyncListener, channel);
18
19 System.out.println("Subscribe OK.");
20
21 }
22 }).start();
23 }
24}
25
-
RedisMQSyncListener
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1public class RedisMQSyncListener extends JedisPubSub{
2
3 private RedisMQDispatchMessageHandler dispatchMessageHandler;
4
5 public RedisMQSyncListener(RedisMQDispatchMessageHandler dispatchMessageHandler){
6 this.dispatchMessageHandler = dispatchMessageHandler;
7 }
8
9 @Override
10 public void onMessage(String channel, String message){
11 System.out.println("On Message.");
12 dispatchMessageHandler.syncFile();
13 }
14
15
16}
17
18
-
RedisMQPublisherMain
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1/**
2 * Redis MQ Publisher 启动类
3 * */
4public class RedisMQPublisherMain {
5 public static void main(String[] args) {
6 int i=0;
7 while(i<2){
8 i++;
9 RedisMQService.publish(RedisMQConst.MQ_KEY,String.valueOf(i));
10 }
11 }
12
13}
14
-
RedisMQSubscriberMain
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1/**
2 * Redis MQ Subscriber 启动类
3 * */
4public class RedisMQSubscriberMain {
5
6 public static void main(String[] args) {
7 RedisMQDispatchMessageHandler redisMQDispatchMessageHandler = new RedisMQDispatchMessageHandler();
8
9 RedisMQSyncListener redisMQSyncListener = new RedisMQSyncListener(redisMQDispatchMessageHandler);
10
11 RedisMQSubscribeService redisMQSubscribeService = new RedisMQSubscribeService(redisMQSyncListener,RedisMQConst.MQ_KEY);
12
13
14 }
15
16}
17
Reference
[1]. Jedis实现Publish/Subscribe功能
[2]. 用redis实现消息队列(实时消费+ack机制)
[3]. 【Redis系列】Redis频道发布与消息订阅