最近的项目中遇到分布式幂等问题,在本文中,我将用一个简单demo,简单阐述下使用分布式锁解决幂等问题以及分布式环境下秒杀扣库存并发问题的解决基本思路。
推荐视频链接
- Java 微服务实践视频教程 – Spring Boot
- Java 微服务实践 视频教程- Spring Cloud
- redis高可用视频
- 分布式电商商城视频教程
- kubernets+docer
- jvm
- 秒杀项目实战
- Linux shell
什么是分布式系统的幂等性
现如今我们的系统大多拆分为分布式SOA,或者微服务,一套系统中包含了多个子系统服务,而一个子系统服务往往会去调用另一个服务,而服务调用服务无非就是使用RPC通信或者restful,既然是通信,那么就有可能再服务器处理完毕后返回结果的时候挂掉,这个时候用户端发现很久没有反应,那么就会多次点击按钮,这样请求有多次,那么处理数据的结果是否要统一呢?那是肯定的!尤其再支付场景。
幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品使用约支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额返发现多扣钱了,流水记录也变成了两条...
幂等就是一个操作,不论执行多少次,产生的效果和返回的结果都是一样的.
防范POST重复提交
HTTP POST 操作既不是安全的,也不是幂等的(至少在HTTP规范里没有保证)。 当我们因为反复刷新浏览器导致多次提交表单,多次发出同样的POST请求,导致远端服务器重复创建出了资源。
所以,对于电商应用来说,第一对应的后端 WebService 一定要做到幂等性,第二服务器端收到 POST 请求,在操作成功后必须302跳转到另外一个页面,这样即使用户刷新页面,也不会重复提交表单。
接口api的幂等性支持
对外提供接口为了支持幂等调用,接口有两个字段必须传,一个是来源source,一个是来源方序列号seq,这个两个字段在提供方系统里面做联合唯一索引,这样当第三方调用时,先在本方系统里面查询一下,是否已经处理过,返回相应处理结果;没有处理过,进行相应处理,返回结果。注意,为了幂等友好,一定要先查询一下,是否处理过该笔业务,不查询直接插入业务系统,会报错,但实际已经处理了。
幂等的技术方案
唯一索引,防止新增脏数据
唯一索引或唯一组合索引来防止新增数据存在脏数据
(当表存在唯一索引,并发时新增报错时,再查询一次就可以了,数据应该已经存在了,返回结果即可)
token机制,防止页面重复提交
- 数据提交前要向服务的申请token,token放到redis或jvm内存,token有效时间
- 提交后后台校验token,同时删除token,生成新的token返回
redis要用删除操作来判断token,删除成功代表token校验通过,如果用select+delete来校验token,存在并发问题,不建议使用 。
使用唯一id解决重复提交问题(类似redis的删除token判断)
使用类似乐观锁的version机制实现;
分布式锁(redis的setnx);
2.1.2 使用唯一id解决重复交易的幂等性问题(类似redis存token)
基于幂等性的解决方案中一个完整的取钱流程被分解成了两个步骤:
-
调用create_ticket()获取ticket_id;
-
调用 idempotent_withdraw(ticket_id, account_id, amount)。
虽然create_ticket不是幂等的,但在这种设计下,它对系统状态的影响可以忽略,加上idempotent_withdraw 是幂等的,所以任何一步由于网络等原因失败或超时,客户端都可以重试,直到获得结果。
悲观锁
获取数据的时候加锁获取
select * from table_xxx where id=’xxx’ for update;
注意:id字段一定是主键或者唯一索引,不然是锁表,会死人的
悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,根据实际情况选用
乐观锁
乐观锁只是在更新数据那一刻锁表,其他时间不锁表,所以相对于悲观锁,效率更高。
乐观锁的实现方式多种多样可以通过version或者其他状态条件:
- 通过版本号实现
update table_xxx set name=#name#,version=version+1 where version=#version#
- 通过条件限制
update table_xxx set avai_amount=avai_amount-#subAmount# where avai_amount-#subAmount# >= 0
要求:quality-#subQuality# >= ,这个情景适合不用版本号,只更新是做数据安全校验,适合库存模型,扣份额和回滚份额,性能更高。
注意:乐观锁的更新操作,最好用主键或者唯一索引来更新,这样是行锁,否则更新时会锁表,上面两个sql改成下面的两个更好
update table_xxx set name=#name#,version=version+1 where id=#id# and version=#version#
update table_xxx set avai_amount=avai_amount-#subAmount# where id=#id# and avai_amount-#subAmount# >= 0
分布式锁
如果是分布是系统,构建全局唯一索引比较困难,例如唯一性的字段没法确定,这时候可以引入分布式锁,通过第三方的系统(redis或zookeeper),在业务系统插入数据或者更新数据,获取分布式锁,然后做操作,之后释放锁,这样其实是把多线程并发的锁的思路,引入多多个系统,也就是分布式系统中得解决思路。
何为分布式锁
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。
我们来假设一个最简单的秒杀场景:数据库里有一张表,column分别是商品ID,和商品ID对应的库存量,秒杀成功就将此商品库存量-1。现在假设有1000个线程来秒杀两件商品,500个线程秒杀第一个商品,500个线程秒杀第二个商品。我们来根据这个简单的业务场景来解释一下分布式锁。
通常具有秒杀场景的业务系统都比较复杂,承载的业务量非常巨大,并发量也很高。这样的系统往往采用分布式的架构来均衡负载。那么这1000个并发就会是从不同的地方过来,商品库存就是共享的资源,也是这1000个并发争抢的资源,这个时候我们需要将并发互斥管理起来。这就是分布式锁的应用。
而key-value存储系统,如redis,因为其一些特性,是实现分布式锁的重要工具。
基于Redis实现接口幂等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1try {
2 //等待在一个信号量上,挂起
3 beginCount.await();
4 boolean result=RedisReentrantLock.getInstance().tryLock(commidityId1.toString(),0,TimeUnit.MILLISECONDS);
5 if(!result){
6 System.out.println("幂等验证:正在处理中");
7 return;
8 }
9 System.out.println("处理业务逻辑");
10 } catch (InterruptedException e) {
11 e.printStackTrace();
12 }finally {
13 RedisReentrantLock.getInstance().unlock(commidityId1.toString());
14 endCount.countDown();
15 }
16
17
18
tryLock实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1 public boolean tryLock(String lockId, long timeout, TimeUnit unit) throws InterruptedException{
2 Thread currentThread = Thread.currentThread();
3 LockData lockData = threadData.get(currentThread);
4 if ( lockData != null ) {
5 lockData.lockCount.incrementAndGet();
6 return true;
7 }
8 String lockVal = tryRedisLock(lockId,timeout,unit);
9 if ( lockVal != null ) {
10 LockData newLockData = new LockData(currentThread, lockVal);
11 threadData.put(currentThread, newLockData);
12 return true;
13 }
14 return false;
15 }
16
17
18
tryRedisLock实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 1 String tryRedisLock(String lockId,long time, TimeUnit unit) {
2 final long startMillis = System.currentTimeMillis();
3 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
4 String lockValue=null;
5 while (lockValue==null){
6 lockValue=createRedisKey(lockId);
7 if(lockValue!=null){
8 break;
9 }
10 if(System.currentTimeMillis()-startMillis-retryAwait>millisToWait){
11 break;
12 }
13
14 try {
15 ////短暂休眠,避免可能的活锁
16 Thread.sleep(3, RANDOM.nextInt(30));
17 } catch (InterruptedException e) {
18 }
19 LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(retryAwait));
20 }
21 return lockValue;
22 }
23
24
createRedisKey 实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 1private String createRedisKey(String lockId) {
2 Jedis jedis = null;
3 boolean broken = false;
4 try {
5 String value=lockId+randomId(1);
6 jedis = jedisPool.getResource();
7 String luaScript = ""
8 + "\nlocal r = tonumber(redis.call('SETNX', KEYS[1],ARGV[1]));"
9 + "\nredis.call('PEXPIRE',KEYS[1],ARGV[2]);"
10 + "\nreturn r";
11 List<String> keys = new ArrayList<String>();
12 keys.add(lockId);
13 List<String> args = new ArrayList<String>();
14 args.add(value);
15 args.add(lockTimeout+"");
16 Long ret = (Long) jedis.eval(luaScript, keys, args);
17 if( new Long(1).equals(ret)){
18 return value;
19 }
20 }finally {
21 if(jedis!=null) jedis.close();
22 }
23 return null;
24 }
25
26
unlock实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1public void unlock(String lockId) {
2 Thread currentThread = Thread.currentThread();
3 LockData lockData = threadData.get(currentThread);
4 if ( lockData == null ) {
5 return;
6 }
7 int newLockCount = lockData.lockCount.decrementAndGet();
8 if ( newLockCount > 0 ) {
9 return;
10 }
11 if ( newLockCount < 0 ) {
12 throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + lockId);
13 }
14 try {
15 unlockRedisLock(lockId,lockData.lockVal);
16 } finally {
17 if(lockData!=null){
18 threadData.remove(currentThread);
19 }
20 }
21 }
22
23
unlockRedisLock实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1void unlockRedisLock(String key,String value) {
2 Jedis jedis = null;
3 boolean broken = false;
4 try {
5 jedis = jedisPool.getResource();
6 String luaScript=""
7 +"\nlocal v = redis.call('GET', KEYS[1]);"
8 +"\nlocal r= 0;"
9 +"\nif v == ARGV[1] then"
10 +"\nr =redis.call('DEL',KEYS[1]);"
11 +"\nend"
12 +"\nreturn r";
13 List<String> keys = new ArrayList<String>();
14 keys.add(key);
15 List<String> args = new ArrayList<String>();
16 args.add(value);
17 Object r=jedis.eval(luaScript, keys, args);
18 } finally {
19 if(jedis!=null) jedis.close();
20 }
21 }
22
23
到此为止,分布式幂等接口已经实现了,是不是很简单!!
分布式秒杀减库存实现方案
所谓秒杀,从业务角度看,是短时间内多个用户“争抢”资源,这里的资源在大部分秒杀场景里是商品;将业务抽象,技术角度看,秒杀就是多个线程对资源进行操作,所以实现秒杀,就必须控制线程对资源的争抢,既要保证高效并发,也要保证操作的正确。
一些可能的实现
刚才提到过,实现秒杀的关键点是控制线程对资源的争抢,根据基本的线程知识,可以不加思索的想到下面的一些方法:
- 1、秒杀在技术层面的抽象应该就是一个方法,在这个方法里可能的操作是将商品库存-1,将商品加入用户的购物车等等,在不考虑缓存的情况下应该是要操作数据库的。那么最简单直接的实现就是在这个方法上加上synchronized关键字,通俗的讲就是锁住整个方法;
- 2、锁住整个方法这个策略简单方便,但是似乎有点粗暴。可以稍微优化一下,只锁住秒杀的代码块,比如写数据库的部分;
- 3、既然有并发问题,那我就让他“不并发”,将所有的线程用一个队列管理起来,使之变成串行操作,自然不会有并发问题。
上面所述的方法都是有效的,但是都不好。为什么?第一和第二种方法本质上是“加锁”,但是锁粒度依然比较高。什么意思?试想一下,如果两个线程同时执行秒杀方法,这两个线程操作的是不同的商品,从业务上讲应该是可以同时进行的,但是如果采用第一二种方法,这两个线程也会去争抢同一个锁,这其实是不必要的。第三种方法也没有解决上面说的问题。
那么如何将锁控制在更细的粒度上呢?可以考虑为每个商品设置一个互斥锁,以和商品ID相关的字符串为唯一标识,这样就可以做到只有争抢同一件商品的线程互斥,不会导致所有的线程互斥。分布式锁恰好可以帮助我们解决这个问题。
需要考虑的问题
1、用什么操作redis?幸亏redis已经提供了jedis客户端用于java应用程序,直接调用jedis API即可。
2、怎么实现加锁?“锁”其实是一个抽象的概念,将这个抽象概念变为具体的东西,就是一个存储在redis里的key-value对,key是于商品ID相关的字符串来唯一标识,value其实并不重要,因为只要这个唯一的key-value存在,就表示这个商品已经上锁。
3、如何释放锁?既然key-value对存在就表示上锁,那么释放锁就自然是在redis里删除key-value对。
4、阻塞还是非阻塞?笔者采用了阻塞式的实现,若线程发现已经上锁,会在特定时间内轮询锁。
5、如何处理异常情况?比如一个线程把一个商品上了锁,但是由于各种原因,没有完成操作(在上面的业务场景里就是没有将库存-1写入数据库),自然没有释放锁,这个情况笔者加入了锁超时机制,利用redis的expire命令为key设置超时时长,过了超时时间redis就会将这个key自动删除,即强制释放锁(可以认为超时释放锁是一个异步操作,由redis完成,应用程序只需要根据系统特点设置超时时间即可)。
具体实现
模拟秒杀场景,1000个线程来争抢两个商品:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 1@Test
2 public void testSecKill(){
3 int threadCount = 1000;
4 int splitPoint = 500;
5 CountDownLatch endCount = new CountDownLatch(threadCount);
6 CountDownLatch beginCount = new CountDownLatch(1);
7 SecKillImpl testClass = new SecKillImpl();
8
9 Thread[] threads = new Thread[threadCount];
10 //起500个线程,秒杀第一个商品
11 for(int i= 0;i < splitPoint;i++){
12 threads[i] = new Thread(new Runnable() {
13 public void run() {
14 try {
15 //等待在一个信号量上,挂起
16 beginCount.await();
17 //用动态代理的方式调用secKill方法
18 SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
19 new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
20 proxy.secKill("test", commidityId1);
21 endCount.countDown();
22 } catch (InterruptedException e) {
23 // TODO Auto-generated catch block
24 e.printStackTrace();
25 }
26 }
27 });
28 threads[i].start();
29
30 }
31
32 for(int i= splitPoint;i < threadCount;i++){
33 threads[i] = new Thread(new Runnable() {
34 public void run() {
35 try {
36 //等待在一个信号量上,挂起
37 beginCount.await();
38 //用动态代理的方式调用secKill方法
39 beginCount.await();
40 SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
41 new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
42 proxy.secKill("test", commidityId2);
43 //testClass.testFunc("test", 10000001L);
44 endCount.countDown();
45 } catch (InterruptedException e) {
46 // TODO Auto-generated catch block
47 e.printStackTrace();
48 }
49 }
50 });
51 threads[i].start();
52
53 }
54
55
56 long startTime = System.currentTimeMillis();
57 //主线程释放开始信号量,并等待结束信号量
58 beginCount.countDown();
59
60 try {
61 //主线程等待结束信号量
62 endCount.await();
63 //观察秒杀结果是否正确
64 System.out.println(SecKillImpl.inventory.get(commidityId1));
65 System.out.println(SecKillImpl.inventory.get(commidityId2));
66 System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);
67 System.out.println("total cost " + (System.currentTimeMillis() - startTime));
68 } catch (InterruptedException e) {
69 // TODO Auto-generated catch block
70 e.printStackTrace();
71 }
72 }
73
74
CacheLockInterceptor实现InvocationHandler接口,在invoke方法中获取注解的方法和参数,在执行注解的方法前加锁,执行被注解的方法后释放锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 1public Object invoke(Object proxy, Method method, Object[] args) throws InvocationTargetException, IllegalAccessException, CacheLockException, InterruptedException {
2
3 CacheLock cacheLock = method.getAnnotation(CacheLock.class);
4 //没有cacheLock注解,pass
5 if(null == cacheLock){
6 System.out.println("no cacheLock annotation");
7 return method.invoke(proxied, args);
8 }
9
10 //获得方法中参数的注解
11 Annotation[][] annotations = method.getParameterAnnotations();
12 //根据获取到的参数注解和参数列表获得加锁的参数
13 Object lockedObject = getLockedObject(annotations,args);
14 String objectValue = lockedObject.toString();
15 boolean result = RedisReentrantLock.getInstance().tryLock(objectValue,cacheLock.expireTime(), TimeUnit.MILLISECONDS);
16 if(!result){//取锁失败
17 ERROR_COUNT += 1;
18 throw new CacheLockException("get lock fail");
19 }
20 try{
21 //执行方法
22 return method.invoke(proxied, args);
23 }finally{
24 System.out.println("intecepor 释放锁");
25 RedisReentrantLock.getInstance().unlock(objectValue);//释放锁
26 }
27
28 }
29
30
31
tryLock 和unlock 与前面讲的幂等里面是一样的,这里就不重复写了。
分布式速率限制器实现
制的资源,可以是ip,用户id,订单id,手机号,等等.
-
例如限制一个手机号每分钟只能发1条短信.
-
例如限制一个手机号每10秒钟只能发起1次表单提交请求.
-
例如限制一个ip地址每秒钟只能访问10次特定的资源.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41 1public class AccessSpeedLimitTest {
2 @Test
3 public void test1() throws InterruptedException {
4 JedisPool jp=new JedisPool("127.0.0.1",6379);
5 AccessSpeedLimit accessSpeedLimit=new AccessSpeedLimit(jp);
6 SimpleDateFormat sdf=new SimpleDateFormat(" mm:ss");
7 while(true){
8 //10.0.0.1这个ip每1秒钟最多访问5次if块内代码
9 if(accessSpeedLimit.tryAccess("10.0.0.1", 1,5)){
10 System.out.println("yes"+sdf.format(new Date()));
11 }else{
12 System.out.println("no"+sdf.format(new Date()));
13 }
14 Thread.sleep(100);
15 }
16 }
17
18 @Test
19 public void test2() throws InterruptedException {
20 JedisPool jp=new JedisPool("127.0.0.1",6379);
21 final RedisDistributedLockTemplate template=new RedisDistributedLockTemplate(jp);
22 LimitRule limitRule=new LimitRule();
23 limitRule.setSeconds(1);
24 limitRule.setLimitCount(5);
25 limitRule.setLockCount(7);
26 limitRule.setLockTime(2);
27 AccessSpeedLimit accessSpeedLimit=new AccessSpeedLimit(jp);
28 SimpleDateFormat sdf=new SimpleDateFormat(" mm:ss");
29 while(true){
30 //10.0.0.1这个ip每1秒钟最多访问5次if块内代码.1秒超过10次后,锁定2秒,2秒内无法访问.
31 if(accessSpeedLimit.tryAccess("10.0.0.1",limitRule)){
32 System.out.println("yes"+sdf.format(new Date()));
33 }else{
34 System.out.println("no"+sdf.format(new Date()));
35 }
36 Thread.sleep(100);
37 }
38 }
39}
40
41