JAVA多线程与高并发(二)[volatile,CAS(无锁优化,自旋锁,乐观锁-AtomicLong,LongAdder-分段锁)]

释放双眼,带上耳机,听听看~!

link-JAVA多线程与高并发系列[前言,大纲,目录]

volatile

首先,大佬(马老师)说,这个volatile在工程中能不用就不用,因为这玩意不好掌控,没有什么资料.

1. 保证线程可见性(synchronize也有这效果

设一个变量a,如果没有加volatile,多线程情况下,在线程t1修改了a的值后,另一个线程t2读到的仍然是旧值;如果加了volatile修饰,t2就可以马上读到t1修改后的值.
因为如下:(本质依靠的是MESI,CPU的缓存一致性协议)
首先变量a保存在heap堆内存中,堆内存是各线程共享内存;而且每个线程都有自己的专属工作内存.
当两个线程,t1和t2去访问共享内存的变量a时,他们会各自把a复制一份到自己的专属内存.
如果变量a没有加volatile,这时候如果线程t1修改了变量的值,(t1应该会把变动马上同步回共享内存),但是线程t2什么时候去共享内存再次读取同步变量a,不好控制,如果线程2没有去共享内存再次读取同步变量a,那么就看不见线程1的修改后的结果.
看一个保证线程可见性的例子:
在一秒后,main线程修改了变量running的值,
没有加volatile时,程序会过很久才打印"m end!";
加了volatile后,会在一秒后马上打印"m end!"


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1import java.util.concurrent.TimeUnit;
2
3public class T01_HelloVolatile {
4    // 对比一下有无volatile的运行结果
5   /*volatile*/ boolean running = true;
6   void m() {
7       System.out.println("m start");
8       while(running) {
9       }
10      System.out.println("m end!");
11  }
12 
13  public static void main(String[] args) {
14      T01_HelloVolatile t = new T01_HelloVolatile();
15     
16      new Thread(t::m, "t1").start();
17
18      try {
19          TimeUnit.SECONDS.sleep(1);
20      } catch (InterruptedException e) {
21          e.printStackTrace();
22      }
23     
24      t.running = false;
25  }
26 
27}
28
29

2. 禁止指令重排序(也和CPU有关)(synchronize无此效果)

(其底层原理是加了读屏障loadfence和写屏障storefence原语指令)
以前的CPU是"串联"执行指令;现代CPU为了提高效率,当第一个指令执行到中间时,就开始执行第二个指令.(原来像是平铺的水泥板,现在像是楼梯).
为了利用CPU的这种高效架构,编译器(compiler)把源码编译时,可能会将指令重新排序,据说这样会把速度提高很多.
指令重排序可能带来问题的场景举例:DCL单例模式

DCL双重检查锁实现的单例模式(静态实例变量加volatile)

new一个对象的时候,分为三步:

  1. 给这个对象申请内存,给成员变量赋默认值(比如int的默认值为0);
  2. 给这个对象的成员变量初始化(比如我们写的int a=8)
  3. 把申请的内存赋值给对象的"引用"

正常情况下是1,2,3顺序执行;如果发生指令重排序的话,会1,3,2这样顺序执行.
重排序后,在特别大的并发量下,可能会在第1,3步后,还没有执行第2步前,这时该引用已经!=null了,但是成员变量还没有初始化,这个时候对象被其他线程使用就会出问题.

DCL单例模式举例:(这个属于懒汉模式,一般用不到;直接用恶汉单例就行啦,没必要用懒汉)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1public class Manager {
2    private static volatile Manager INSTANCE = null;
3
4   // 私有构造方法
5    private Manager() {
6
7    }
8
9    public static Manager getInstance() {
10        if (INSTANCE == null) {
11            synchronized (Manager.class) {
12                if (INSTANCE == null) {
13                    // 初始化INSTANCE,加载所需资源等
14                    INSTANCE = new Manager();
15                }
16            }
17        }
18        return INSTANCE;
19    }
20}
21
22

3. 不能保证原子性

做个测试证明一下,如果volatile可以保证原子性,那么下面这段代码应该输出100000;反之则证明volatile不能保证原子性.
如果想保证原子性,可以在方法m()上加个synchronize,或者把count++这段代码用synchronize包起来.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1public class T04_VolatileNotSync {
2   volatile int count = 0;
3   void m() {
4       for(int i=0; i<10000; i++) {
5         count++;
6         /*
7         synchronize(this){
8           count++;
9         }
10        */
11      }
12  }
13 
14  public static void main(String[] args) {
15      T04_VolatileNotSync t = new T04_VolatileNotSync();
16      List<Thread> threads = new ArrayList<Thread>();
17      for(int i=0; i<10; i++) {
18          threads.add(new Thread(t::m, "thread-"+i));
19      }
20      threads.forEach((o)->o.start());
21      threads.forEach((o)->{
22          try {
23              o.join();
24          } catch (InterruptedException e) {
25              e.printStackTrace();
26          }
27      });
28      System.out.println(t.count);
29  }
30}
31
32

4.volatile修饰引用类型变量的可见性?

有些文章会写到,volatile如果修饰引用类型变量,那么"引用"的地址的改变对其他线程是可见的,但是引用的对象的属性变化对其他线程不可见.

如果你写一个例子,经过一些尝试,发现普通对象的属性的改变,volatile能保证其变化是可见的.但是!!!
但是!!!大量的测试后,我发现,不是每次都可见,特别是对象的属性变化很多次的时候!
所以,结论是,volatile的确不能保证变量指向的对象的属性的可见性.
又但是!如果修改对象属性的线程,sleep了一下,哪怕一纳秒,那么就能保证可见了,真是奇怪,回头有时间研究下JVM没准能搞明白.(也有可能是sleep的情况下,测试次数不够多)

下面是测试的例子,可以看出,volatile修饰的引用类型变量,如果修改的线程(生产者)没有sleep,其他线程不是每次都可见.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
1/**
2 * @author liweizhi
3 * @date 2020/3/4 18:18
4 */
5public class VolatileObject {
6    volatile static Pet pet = new Pet("dahuang", 1);
7
8    public static void main(String[] args) {
9        // 多运行几次无论是ageChange还是nameChange,会发现有时候t1不会结束(如果t2不sleep)
10        nameChange();
11//        ageChange();
12    }
13
14    private static void ageChange() {
15        new Thread(() -> {
16            System.out.println("t1 start "/* + Instant.now()*/);
17            while (true) {
18                if (pet.getAge() == 5) {
19                    break;
20                }
21            }
22            System.out.println("t1 end "/* + Instant.now()*/);
23        }, "t1").start();
24        try {
25            TimeUnit.SECONDS.sleep(1);
26        } catch (InterruptedException e) {
27            e.printStackTrace();
28        }
29        new Thread(() -> {
30            Pet myPet = pet;
31            for (int i = 1; i <= 100; i++) {
32                int age = myPet.getAge();
33                myPet.setAge(++age);
34                /*try {
35                    Thread.sleep(1);
36                } catch (InterruptedException e) {
37                    e.printStackTrace();
38                }*/
39            }
40            System.out.println("t2 end "/* + Instant.now()*/);
41        }, "t2").start();
42    }
43
44    private static void nameChange() {
45        new Thread(() -> {
46            System.out.println("t1 start "/* + Instant.now()*/);
47            while (true) {
48                if ("xiaobai8".equals(pet.getName())) {
49                    break;
50                }
51            }
52            System.out.println("t1 end "/* + Instant.now()*/);
53        }, "t1").start();
54        try {
55            TimeUnit.SECONDS.sleep(1);
56        } catch (InterruptedException e) {
57            e.printStackTrace();
58        }
59        new Thread(() -> {
60            Pet myPet = pet;
61            for (int i = 1; i <= 10; i++) {
62                myPet.setName("xiaobai" + i);
63                /*try {
64                    TimeUnit.NANOSECONDS.sleep(1);
65                } catch (InterruptedException e) {
66                    e.printStackTrace();
67                }*/
68            }
69            System.out.println("t2 end "/* + Instant.now()*/);
70        }, "t2").start();
71    }
72
73    static class Pet {
74        String name;
75
76        int age;
77
78        public Pet(String name, int age) {
79            this.name = name;
80            this.age = age;
81        }
82
83        public String getName() {
84            return name;
85        }
86
87        public void setName(String name) {
88            this.name = name;
89        }
90
91        public int getAge() {
92            return age;
93        }
94
95        public void setAge(int age) {
96            this.age = age;
97        }
98    }
99}
100
101

CAS(也有人称作无锁优化,自旋锁,乐观锁)

首先要明确一点,自旋而"无锁"并不一定就比有锁快,因为自旋是在占用CPU,如果很多个线程一起自旋很久,想想都觉得效率很低…具体情况还是要具体分析.

概念 Compare And Swap/Set

下面是一段说明CAS原理的伪代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1/**
2 * nowValue:当前值,这个值是随时可能被修改的
3 * expectedValue:期望值,在调用casMethod前获取到的nowValue
4 * newValue:想要修改的新的值
5 */
6casMethod(nowValue, expectedValue, newValue){
7  // 如果当前值和期望值相等,说明本次修改期间没有其他线程修改,则赋值
8  if(nowValue == expectedValue) {
9    // 问题:此时,已经判定为其他线程没有修改,那么在赋值前会不会被其他线程修改了?
10    // 答:不会,cas操作是CPU原语支持,是CPU指令指令级别上的支持,中间不能被打断
11    nowValue = newValue;
12  } else {
13    // 如果当前值和期望值不等,说明本次修改期间有其他线程已经修改了值,
14    // 那么就再试一次或者直接返回修改失败的结果
15    // todo try again or return fail
16}
17
18

应用:AtomicInteger等atomic类

简单使用

下面这段代码,用了AtomicInteger后,自增部分(count++)不需要加同步锁,输出结果为100000


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1public class T01_AtomicInteger {
2   /*volatile*/ //int count1 = 0;
3  
4   AtomicInteger count = new AtomicInteger(0);
5
6   /*synchronized*/ void m() {
7       for (int i = 0; i < 10000; i++){
8           //if count1.get() < 1000
9           count.incrementAndGet(); //count1++
10      }
11  }
12
13  public static void main(String[] args) {
14      T01_AtomicInteger t = new T01_AtomicInteger();
15      List<Thread> threads = new ArrayList<Thread>();
16      for (int i = 0; i < 10; i++) {
17          threads.add(new Thread(t::m, "thread-" + i));
18      }
19      threads.forEach((o) -> o.start());
20      threads.forEach((o) -> {
21          try {
22              o.join();
23          } catch (InterruptedException e) {
24              e.printStackTrace();
25          }
26      });
27      System.out.println(t.count);
28  }
29}
30
31

简单的分析一波incrementAndGet()

跟踪下去,count.incrementAndGet():


1
2
3
4
5
6
7
8
9
10
1/**
2     * Atomically increments by one the current value.
3     *
4     * @return the updated value
5     */
6    public final int incrementAndGet() {
7        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
8    }
9
10

继续跟踪,到了Unsafe.class,这里compareAndSwapInt就是用到了CAS:


1
2
3
4
5
6
7
8
9
10
1   public final int getAndAddInt(Object var1, long var2, int var4) {
2        int var5;
3        do {
4            var5 = this.getIntVolatile(var1, var2);
5        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
6
7        return var5;
8    }
9
10

Unsafe.class比较复杂,直接操作JVM中的内存,类似C和C++的操作,比如分配一个对象不用new,而是直接写在内存中;操作对象的属性也可以根据"地址"(or指针)和偏移量定位.

ABA问题

什么是ABA?

在开始使用CAS修改一个值后,在CAS中判断nowValue == expectedValue前,假设有一个线程先把nowValue改成了其他值x,然后又把x改回了nowValue,那么这时候虽然nowValue等于expectedValue,但这个值其实已经被修改过了.

ABA会带来什么问题?

如果是AtomicInteger等数值类型,其实ABA是没有影响的,无所谓.
如果是一个对象引用,多数情况是不允许ABA情况的(比如小黄和其女朋友小白要结婚了,但是突然来了小黑抢走了小白,他们成为恋人,过了段时间又把小白还了回来,这婚多半就结不成了).

怎么避免ABA?

加上版本号version,做任何操作时都把version+1,同时比较nowValue和version
假设nowValue值为A,version为1,如果有ABA情况发生,即nowValue值变为B后又变回A,那么此时version是3,就可以根据version知道值已经被修改过了.
例如java.util.concurrent.atomic.AtomicStampedReference

Atomic的常见问题

高并发实现递增的三种方式?

  1. 同步static long count2 = 0L;

synchronized (lock) {
count2++;
}

  1. CAS原子操作AtomicLong count1 = new AtomicLong(0L);

count1.incrementAndGet();

  1. 分段锁LongAdder count3 = new LongAdder();

count3.increment();

下面代码是一个简单的测试(1000个线程,累加10W次),从中可以LongAdder最快,AtomicLong次之,synchronize最慢.
synchronize慢是因为会升级成重量级锁,向OS申请资源加锁.
注意:如果线程数较少,或者累加次数较少,LongAdder比AtomicLong慢.所以实际项目中,还是要看项目中的并发度如何.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
1public class T02_AtomicVsSyncVsLongAdder {
2    static long count2 = 0L;
3    static AtomicLong count1 = new AtomicLong(0L);
4    static LongAdder count3 = new LongAdder();
5
6    public static void main(String[] args) throws Exception {
7        Thread[] threads = new Thread[1000];
8        for (int i = 0; i < threads.length; i++) {
9            threads[i] = new Thread(() -> {
10                for (int k = 0; k < 100000; k++) {
11                    count1.incrementAndGet();
12                }
13            });
14        }
15        long start = System.currentTimeMillis();
16        for (Thread t : threads) {
17            t.start();
18        }
19        for (Thread t : threads) {
20            t.join();
21        }
22        long end = System.currentTimeMillis();
23        //TimeUnit.SECONDS.sleep(10);
24        System.out.println("Atomic: " + count1.get() + " time " + (end - start));
25        //-----------------------------------------------------------
26        Object lock = new Object();
27        for (int i = 0; i < threads.length; i++) {
28            threads[i] =
29                    new Thread(new Runnable() {
30                        @Override
31                        public void run() {
32                            for (int k = 0; k < 100000; k++) {
33                                synchronized (lock) {
34                                    count2++;
35                                }
36                            }
37                        }
38                    });
39        }
40
41        start = System.currentTimeMillis();
42        for (Thread t : threads) {
43            t.start();
44        }
45        for (Thread t : threads) {
46            t.join();
47        }
48        end = System.currentTimeMillis();
49
50        System.out.println("Sync: " + count2 + " time " + (end - start));
51
52        //----------------------------------
53        for (int i = 0; i < threads.length; i++) {
54            threads[i] =
55                    new Thread(() -> {
56                        for (int k = 0; k < 100000; k++) {
57                            count3.increment();
58                        }
59                    });
60        }
61        start = System.currentTimeMillis();
62        for (Thread t : threads) {
63            t.start();
64        }
65        for (Thread t : threads) {
66            t.join();
67        }
68        end = System.currentTimeMillis();
69        //TimeUnit.SECONDS.sleep(10);
70        System.out.println("LongAdder: " + count1.longValue() + " time " + (end - start));
71    }
72    
73}
74
75

我本地环境输出:


1
2
3
4
5
1Atomic: 100000000 time 1665
2Sync: 100000000 time 3930
3LongAdder: 100000000 time 406
4
5

为什么高并发下,LongAdder比AtomicLong快?

LongAdder内部实现类似"分段锁"(分段锁也是CAS操作),把值放在数组里,每个元素作为一个相对独立的部分,分散开线程的压力,最后再汇总起来.(有点类似于MapReduce思想)

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google AdSense 全面解析(申请+操作+作弊+忠告)

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索