Lua在Redis中的应用—分布式锁,限制访问次数
Lua是一个高效的轻量级脚本语言。它是开源的,非常小巧,整个源码也才五百来K,可以很方便地嵌入到程序中(无论是桌面端还是移动端)
1.分布式锁
分布式锁可以用多种方式来实现常用为以下方式:
1、基于数据库表做乐观锁,用于分布式锁。
2、memcached
3、redis
4、zookeeper
我们本次只说一下redis(r2m)的实现方式,并由简单分布式锁,以及问题分析,逐渐改进分布式锁的问题。最终达到完成一个尽可能完美的解决方案。(完美是相对的,最终并不能解决集群中锁所在服务器redis进程崩溃,而引起的锁失效问题)。
1.1首先看下简单锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 1 /**
2 * 获取锁(简单)
3 * @param lockName 锁名
4 * @param tryNum 重试次数
5 * @return
6 * @throws InterruptedException
7 */
8 private synchronized String acquire_lock(Jedis redis,String lockName,int tryNum) {
9 String uuid =System.currentTimeMillis()+"";
10 for (int i = 0; i < tryNum; i++) {
11 Long n = redis.setnx("lock:"+lockName, uuid);
12 if(n==1) {
13 return uuid;
14 }
15 try {
16 Thread.sleep(100);
17 } catch (InterruptedException e) {
18 e.printStackTrace();
19 }
20 }
21 return null;
22 }
23
24 /**
25 * 释放锁
26 * @param lockName 锁名
27 * @param lockValue 锁内容
28 */
29 private void release_lock(Jedis redis,String lockName,String lockValue) {
30
31 String lockname = "lock:"+lockName;
32 boolean chek = lockValue.equals(redis.get(lockname));
33 if(chek){
34 redis.del(lockname);
35 }
36 }
37
38 @Test
39 public void testRedisLock() throws InterruptedException {
40 int thread_Num = 15;
41 CountDownLatch countDownLatch = new CountDownLatch(thread_Num);
42 String lockName = Thread.currentThread().getName();
43 for (int i = 0; i < thread_Num; i++) {
44 executor.execute(new Runnable() {
45 Jedis redis = RedisUtil.getJedis();
46 @Override
47 public void run() {
48 try {
49 String lockValue = null;
50 try {
51 lockValue = acquire_lock(redis, lockName, 5);
52 } finally {
53 if (lockValue != null) {
54 release_lock(redis, lockName, lockValue);
55 }
56 }
57
58 } finally {
59 countDownLatch.countDown();
60 RedisUtil.returnResource(redis);
61 }
62 }
63 });
64
65 }
66 countDownLatch.await();
67 executor.shutdown();
68 }
69
这个简单的锁有什么问题呢?当持有锁线程死掉了会发生什么?这时锁就不可能再释放,释放锁的线程已死。
so我以将以上简单的锁改为可自动释放的锁
1.2自动释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 1/**
2 * 当获持有线程崩溃时,自动释放
3 * @param lockName 锁名
4 * @param tryNum 重试次数
5 * @param lock_timeout 锁自动超时时间 (毫秒)
6 * @return
7 */
8 private synchronized String acquire_loca_with_timeout(Jedis redis, String lockName, int tryNum, long lock_timeout) {
9 String uuid = (lock_timeout + System.currentTimeMillis())+"";
10 String key = "lock:" + lockName;
11 for (int i = 0; i < tryNum; i++) {
12 Long n = redis.setnx(key, uuid);
13 if (n == 1) {
14 // redis.expire("lock:"+lockName,lock_timeout);
15 return uuid;
16 } else {
17 String time = redis.get(key);
18 if(time!=null) {
19 long t = Long.valueOf(time);
20 if (System.currentTimeMillis() > t ) {
21 String oldTime = redis.getSet(key, uuid);
22 if (System.currentTimeMillis() >Long.valueOf(oldTime)) {
23 return uuid;
24 }
25 }
26 }
27 }
28 try {
29 Thread.sleep(100);
30 } catch (InterruptedException e) {
31 e.printStackTrace();
32 }
33 }
34 return null;
35 }
36 /**
37 * 释放锁
38 * @param lockName 锁名
39 * @param lockValue 锁内容
40 */
41 private void release_lock_with_timeout(Jedis redis,String lockName,String lockValue) {
42 String lockname = "lock:"+lockName;
43 boolean chek = lockValue.equals(redis.get(lockname));
44 if(chek){
45 String time = redis.get(lockname);
46 //只有在超时范围内才是自己的锁,否则可能锁已被其它线程获得
47 if(System.currentTimeMillis()<= Long.valueOf(time)) {
48 redis.del(lockname);
49 }
50 }
51 }
52 @Test
53 public void testRedisLock() throws InterruptedException {
54 int thread_Num = 15;
55 CountDownLatch countDownLatch = new CountDownLatch(thread_Num);
56 String lockName = Thread.currentThread().getName();
57 for (int i = 0; i < thread_Num; i++) {
58 executor.execute(new Runnable() {
59 Jedis redis = RedisUtil.getJedis();
60 @Override
61 public void run() {
62 try {
63 String lockValue = null;
64 try {
65 lockValue = acquire_loca_with_timeout(redis,lockName,5,1000L);
66 } finally {
67 if (lockValue != null) {
68 release_lock_with_timeout(redis,lockName,lockValue);
69 }
70 }
71
72 } finally {
73 countDownLatch.countDown();
74 RedisUtil.returnResource(redis);
75 }
76 }
77 });
78
79 }
80 countDownLatch.await();
81 executor.shutdown();
82 }
83
这一种实现基本没有太大的问题了,它比上种有很大的改善,它获取锁不再单纯依赖setnx
其中需要说明的有以下内容:
1
2
3 1value 修改为(当前时间+过期时间): lock_timeout + System.currentTimeMillis()
2
3
1
2
3
4
5
6
7
8
9
10
11 1 String time = redis.get(key);
2 if(time!=null) {
3 long t = Long.valueOf(time);
4 if (System.currentTimeMillis() > t ) {
5 String oldTime = redis.getSet(key, uuid);
6 if (System.currentTimeMillis() >Long.valueOf(oldTime)) {
7 return uuid;
8 }
9 }
10 }
11
如果setnx为0时 看当前时间 是否大于保存的value如果是说明过期了,此时正常应该是直接就获取到锁了。
但如果此时两个线程都获取过这个过期信号。
so此时 redis.getSet(key, uuid) 执行getset命令将先设置一个时间,并返回老的时间,再对比一次与当前时间的值大小,就可以避免这种情况了。但这时问题来了此时未获取到的线程也会修改这个key的时间(getset) 但这个影响不大。那有没有更好的解决办法呢:
1.3 set nx px实现锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 1 /**
2 * set nx px 进行设置锁
3 * nx当不存在时才设置 xx为存在时才设置,px为毫秒 ex为秒
4 * 这个有什么坏处呢?:
5 * 1、删除锁时如果这个锁已过期了页,而过期期间锁已被其它线程拿到,之后当前线程处理完了,del锁时已经删除的不是自己的锁了。
6 * 如下:A客户端拿到对象锁,但在因为一些原因被阻塞导致无法及时释放锁。
7 因为过期时间已到,Redis中的锁对象被删除。
8 B客户端请求获取锁成功。
9 A客户端此时阻塞操作完成,删除key释放锁。
10 C客户端请求获取锁成功。
11 这时B、C都拿到了锁,因此分布式锁失效。
12 * 2、要避免1中的情况发生,就要保证key的值是唯一的,且每一个拿到该key锁的值不一样,只有拿到锁的客户端才能进行删除。
13 * 基于这个原因,普通的del命令是不能满足要求的,我们需要一个能判断客户端传过来的value和锁对象的value是否一样的命令。Redis并没有这样的原子命令,这时可以通过Lua脚本来完成:
14 * @param redis
15 * @param lockName
16 * @param tryNum
17 * @param lock_timeout
18 * @return
19 */
20 public boolean acquire_loca_nxpx(Jedis redis, String lockName,String value, int tryNum, int lock_timeout) {
21 for (int i = 0; i < tryNum; i++) {
22 String valuel = redis.set(lockName, value, "NX", "PX", lock_timeout);
23 if ("OK".equals(valuel)) {
24 return true;
25 }
26 try {
27 Thread.sleep(1000);
28 } catch (InterruptedException e) {
29 e.printStackTrace();
30 }
31 }
32 return false;
33 }
34 /**
35 * 释放锁
36 * @param redis
37 * @param lockName
38 * @param value
39 * @return
40 */
41 private long del_lock(Jedis redis, String lockName,String value) {
42 String script = "local key =KEYS[1]; local value = ARGV[1] \n" +
43 " if redis.call(\"get\",key) == value then \n" +
44 " return redis.call(\"del\",key)\n" +
45 " else \n" +
46 " return 0 \n" +
47 " end";
48 Object result = redis.eval(script, 1, lockName,value);
49 return (long)result ;
50 }
51 @Test
52 public void testLuaRedisLock() throws InterruptedException {
53 int thread_Num = 5;
54 CountDownLatch countDownLatch = new CountDownLatch(thread_Num);
55 String lockName = Thread.currentThread().getName();
56 for (int i = 0; i < thread_Num; i++) {
57 executor.execute(new Runnable() {
58 Jedis redis = RedisUtil.getJedis();
59 @Override
60 public void run() {
61 try {
62 boolean lockValue = false;
63 String value = UUID.randomUUID().toString();
64 try {
65 lockValue = acquire_loca_lua(redis,lockName,value,5,3000);
66 } finally {
67
68 if (lockValue) {
69 del_lock(redis,lockName,value);
70 }
71 }
72
73 } finally {
74 countDownLatch.countDown();
75 RedisUtil.returnResource(redis);
76 }
77 }
78 });
79
80 }
81 countDownLatch.await();
82 executor.shutdown();
83 }
84
也可以将上同set nx px 改为lua 是一样的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 1
2/**
3 * lua的一个获取锁的方法 效果与相同当然弊端也一样;
4 * @see #acquire_loca_nxpx(Jedis, String, String, int, int)
5 * @param redis
6 * @param lockName
7 * @param value
8 * @param tryNum
9 * @param lock_timeout
10 * @return
11 */
12 public boolean acquire_loca_lua(Jedis redis, String lockName, String value, int tryNum, int lock_timeout) {
13 String script = "local key = KEYS[1] \n" +
14 " local value = ARGV[1] \n" +
15 " local outTime = ARGV[2] \n" +
16 " local num = redis.call(\"setnx\",key,value)" +
17 " if num == 1 then \n" +
18 " redis.call(\"expire\",key,outTime) \n"+
19 " end \n" +
20 " return num ";
21 for (int i = 0; i < tryNum; i++) {
22 Long num = (Long) redis.eval(script, 1, lockName,value,lock_timeout+"");
23 System.err.println("acquire_loca_lua:"+num);
24 if(num.intValue()==1) {
25 return true;
26 }
27 try {
28 Thread.sleep(1000);
29 } catch (InterruptedException e) {
30 e.printStackTrace();
31 }
32 }
33 return false;
34 }
35
至此锁就完了。。
2.IP防问次数限制
通过上面的例子我们也可以发现,只要有判断的,其实在分布式的系统中就已不再是原子操作,就算是在本地程序中加了锁,也只能保证在本JVM下的线程安全,但往往现在有服务都在多服务器部署。SOjava中的多并发大部分是用在处理数据上,而且往往不能多服务同时执行,除非从逻辑上进行分配数据如通过hash各服务器处理不同的数据 或者通过分布式锁等方法。redis的天生单线程和单进程。如果能将一些简单逻辑操作做为原子操作进行一块执行,就可以很方便的实现多服务器的原子操作。这可以通过redis的事务实现。但r2m(京东自研的分布式redis集群)并不支持事务,而且r2m的文档也说明了,所有需要事务的地方推荐使用lua脚本来实现。只要能事务实现的都可以用lua脚本实现。
如下这个业务可以简单的用lua很方便的实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1 /**
2 * @param redis
3 * @param ip 限制的IP
4 * @param limit_time 在某一个时间段(秒) 如:10 秒限制3次 ip_limit(xxx,"127.0.0.1",10,3)
5 * @param limit_count 在这个时间内限制访问多少次
6 * @return
7 */
8 private String ip_limit(Jedis redis,String ip,int limit_time,int limit_count) {
9 String script = "local times = redis.call('incr',KEYS[1]) \n"+
10 " if times == 1 then \n"
11 + "redis.call('expire',KEYS[1],ARGV[1]) \n"
12 + "end \n"
13 + "if times> tonumber(ARGV[2]) then \n"
14 + " return 0 \n"
15 + "end \n"
16 + "return 1";
17 Object result = redis.eval(script, 1, ip,limit_time+"",limit_count+"");
18 return (String) result;
19 }
20