setNx是一个耗时操作,因为它需要查询这个键是否存在,就算redis的百万的qps,在高并发的场景下,这种操作也是有问题的。关于redis实现分布式锁,redis官方推荐使用redlock。
一、redlock简介
在不同进程需要互斥地访问共享资源时,分布式锁是一种非常有用的技术手段。实现高效的分布式锁有三个属性需要考虑:
- 安全属性:互斥,不管什么时候,只有一个客户端持有锁
- 效率属性A:不会死锁
- 效率属性B:容错,只要大多数redis节点能够正常工作,客户端端都能获取和释放锁。
Redlock是redis官方提出的实现分布式锁管理器的算法。这个算法会比一般的普通方法更加安全可靠。关于这个算法的讨论可以看下官方文档。
二、怎么用java使用 redlock
在pom文件引入redis和redisson依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1<!-- redis-->
2 <dependency>
3 <groupId>org.springframework.boot</groupId>
4 <artifactId>spring-boot-starter-data-redis</artifactId>
5 </dependency>
6 <!-- redisson-->
7 <dependency>
8 <groupId>org.redisson</groupId>
9 <artifactId>redisson</artifactId>
10 <version>3.3.2</version>
11 </dependency>
12
13
14
AquiredLockWorker接口类,,主要是用于获取锁后需要处理的逻辑:
1
2
3
4
5
6
7
8
9
10 1/**
2 * Created by fangzhipeng on 2017/4/5.
3 * 获取锁后需要处理的逻辑
4 */
5public interface AquiredLockWorker<T> {
6 T invokeAfterLockAquire() throws Exception;
7}
8
9
10
DistributedLocker 获取锁管理类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1
2/**
3 * Created by fangzhipeng on 2017/4/5.
4 * 获取锁管理类
5 */
6public interface DistributedLocker {
7
8 /**
9 * 获取锁
10 * @param resourceName 锁的名称
11 * @param worker 获取锁后的处理类
12 * @param <T>
13 * @return 处理完具体的业务逻辑要返回的数据
14 * @throws UnableToAquireLockException
15 * @throws Exception
16 */
17 <T> T lock(String resourceName, AquiredLockWorker<T> worker) throws UnableToAquireLockException, Exception;
18
19 <T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception;
20
21}
22
23
UnableToAquireLockException ,不能获取锁的异常类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1/**
2 * Created by fangzhipeng on 2017/4/5.
3 * 异常类
4 */
5public class UnableToAquireLockException extends RuntimeException {
6
7 public UnableToAquireLockException() {
8 }
9
10 public UnableToAquireLockException(String message) {
11 super(message);
12 }
13
14 public UnableToAquireLockException(String message, Throwable cause) {
15 super(message, cause);
16 }
17}
18
19
20
RedissonConnector 连接类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1/**
2 * Created by fangzhipeng on 2017/4/5.
3 * 获取RedissonClient连接类
4 */
5@Component
6public class RedissonConnector {
7 RedissonClient redisson;
8 @PostConstruct
9 public void init(){
10 redisson = Redisson.create();
11 }
12
13 public RedissonClient getClient(){
14 return redisson;
15 }
16
17}
18
19
20
RedisLocker 类,实现了DistributedLocker:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41 1import org.redisson.api.RLock;
2import org.redisson.api.RedissonClient;
3import org.springframework.beans.factory.annotation.Autowired;
4import org.springframework.stereotype.Component;
5import java.util.concurrent.TimeUnit;
6
7/**
8 * Created by fangzhipeng on 2017/4/5.
9 */
10@Component
11public class RedisLocker implements DistributedLocker{
12
13 private final static String LOCKER_PREFIX = "lock:";
14
15 @Autowired
16 RedissonConnector redissonConnector;
17 @Override
18 public <T> T lock(String resourceName, AquiredLockWorker<T> worker) throws InterruptedException, UnableToAquireLockException, Exception {
19
20 return lock(resourceName, worker, 100);
21 }
22
23 @Override
24 public <T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception {
25 RedissonClient redisson= redissonConnector.getClient();
26 RLock lock = redisson.getLock(LOCKER_PREFIX + resourceName);
27 // Wait for 100 seconds seconds and automatically unlock it after lockTime seconds
28 boolean success = lock.tryLock(100, lockTime, TimeUnit.SECONDS);
29 if (success) {
30 try {
31 return worker.invokeAfterLockAquire();
32 } finally {
33 lock.unlock();
34 }
35 }
36 throw new UnableToAquireLockException();
37 }
38}
39
40
41
测试类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 1 @Autowired
2 RedisLocker distributedLocker;
3 @RequestMapping(value = "/redlock")
4 public String testRedlock() throws Exception{
5
6 CountDownLatch startSignal = new CountDownLatch(1);
7 CountDownLatch doneSignal = new CountDownLatch(5);
8 for (int i = 0; i < 5; ++i) { // create and start threads
9 new Thread(new Worker(startSignal, doneSignal)).start();
10 }
11 startSignal.countDown(); // let all threads proceed
12 doneSignal.await();
13 System.out.println("All processors done. Shutdown connection");
14 return "redlock";
15 }
16
17 class Worker implements Runnable {
18 private final CountDownLatch startSignal;
19 private final CountDownLatch doneSignal;
20
21 Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
22 this.startSignal = startSignal;
23 this.doneSignal = doneSignal;
24 }
25
26 public void run() {
27 try {
28 startSignal.await();
29 distributedLocker.lock("test",new AquiredLockWorker<Object>() {
30
31 @Override
32 public Object invokeAfterLockAquire() {
33 doTask();
34 return null;
35 }
36
37 });
38 }catch (Exception e){
39
40 }
41 }
42
43 void doTask() {
44 System.out.println(Thread.currentThread().getName() + " start");
45 Random random = new Random();
46 int _int = random.nextInt(200);
47 System.out.println(Thread.currentThread().getName() + " sleep " + _int + "millis");
48 try {
49 Thread.sleep(_int);
50 } catch (InterruptedException e) {
51 e.printStackTrace();
52 }
53 System.out.println(Thread.currentThread().getName() + " end");
54 doneSignal.countDown();
55 }
56 }
57
58
59
运行测试类:
Thread-48 start
Thread-48 sleep 99millis
Thread-48 end
Thread-49 start
Thread-49 sleep 118millis
Thread-49 end
Thread-52 start
Thread-52 sleep 141millis
Thread-52 end
Thread-50 start
Thread-50 sleep 28millis
Thread-50 end
Thread-51 start
Thread-51 sleep 145millis
Thread-51 end
从运行结果上看,在异步任务的情况下,确实是获取锁之后才能运行线程。不管怎么样,这是redis官方推荐的一种方案,可靠性比较高。有什么问题欢迎留言。
三、参考资料
https://github.com/redisson/redisson
《Redis官方文档》用Redis构建分布式锁
A Look at the Java Distributed In-Memory Data Model (Powered by Redis)