link-JAVA多线程与高并发系列[前言,大纲,目录]
volatile
首先,大佬(马老师)说,这个volatile在工程中能不用就不用,因为这玩意不好掌控,没有什么资料.
1. 保证线程可见性(synchronize也有这效果
设一个变量a,如果没有加volatile,多线程情况下,在线程t1修改了a的值后,另一个线程t2读到的仍然是旧值;如果加了volatile修饰,t2就可以马上读到t1修改后的值.
因为如下:(本质依靠的是MESI,CPU的缓存一致性协议)
首先变量a保存在heap堆内存中,堆内存是各线程共享内存;而且每个线程都有自己的专属工作内存.
当两个线程,t1和t2去访问共享内存的变量a时,他们会各自把a复制一份到自己的专属内存.
如果变量a没有加volatile,这时候如果线程t1修改了变量的值,(t1应该会把变动马上同步回共享内存),但是线程t2什么时候去共享内存再次读取同步变量a,不好控制,如果线程2没有去共享内存再次读取同步变量a,那么就看不见线程1的修改后的结果.
看一个保证线程可见性的例子:
在一秒后,main线程修改了变量running的值,
没有加volatile时,程序会过很久才打印"m end!";
加了volatile后,会在一秒后马上打印"m end!"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 1import java.util.concurrent.TimeUnit;
2
3public class T01_HelloVolatile {
4 // 对比一下有无volatile的运行结果
5 /*volatile*/ boolean running = true;
6 void m() {
7 System.out.println("m start");
8 while(running) {
9 }
10 System.out.println("m end!");
11 }
12
13 public static void main(String[] args) {
14 T01_HelloVolatile t = new T01_HelloVolatile();
15
16 new Thread(t::m, "t1").start();
17
18 try {
19 TimeUnit.SECONDS.sleep(1);
20 } catch (InterruptedException e) {
21 e.printStackTrace();
22 }
23
24 t.running = false;
25 }
26
27}
28
29
2. 禁止指令重排序(也和CPU有关)(synchronize无此效果)
(其底层原理是加了读屏障loadfence和写屏障storefence原语指令)
以前的CPU是"串联"执行指令;现代CPU为了提高效率,当第一个指令执行到中间时,就开始执行第二个指令.(原来像是平铺的水泥板,现在像是楼梯).
为了利用CPU的这种高效架构,编译器(compiler)把源码编译时,可能会将指令重新排序,据说这样会把速度提高很多.
指令重排序可能带来问题的场景举例:DCL单例模式
DCL双重检查锁实现的单例模式(静态实例变量加volatile)
new一个对象的时候,分为三步:
- 给这个对象申请内存,给成员变量赋默认值(比如int的默认值为0);
- 给这个对象的成员变量初始化(比如我们写的int a=8)
- 把申请的内存赋值给对象的"引用"
正常情况下是1,2,3顺序执行;如果发生指令重排序的话,会1,3,2这样顺序执行.
重排序后,在特别大的并发量下,可能会在第1,3步后,还没有执行第2步前,这时该引用已经!=null了,但是成员变量还没有初始化,这个时候对象被其他线程使用就会出问题.
DCL单例模式举例:(这个属于懒汉模式,一般用不到;直接用恶汉单例就行啦,没必要用懒汉)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1public class Manager {
2 private static volatile Manager INSTANCE = null;
3
4 // 私有构造方法
5 private Manager() {
6
7 }
8
9 public static Manager getInstance() {
10 if (INSTANCE == null) {
11 synchronized (Manager.class) {
12 if (INSTANCE == null) {
13 // 初始化INSTANCE,加载所需资源等
14 INSTANCE = new Manager();
15 }
16 }
17 }
18 return INSTANCE;
19 }
20}
21
22
3. 不能保证原子性
做个测试证明一下,如果volatile可以保证原子性,那么下面这段代码应该输出100000;反之则证明volatile不能保证原子性.
如果想保证原子性,可以在方法m()上加个synchronize,或者把count++这段代码用synchronize包起来.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 1public class T04_VolatileNotSync {
2 volatile int count = 0;
3 void m() {
4 for(int i=0; i<10000; i++) {
5 count++;
6 /*
7 synchronize(this){
8 count++;
9 }
10 */
11 }
12 }
13
14 public static void main(String[] args) {
15 T04_VolatileNotSync t = new T04_VolatileNotSync();
16 List<Thread> threads = new ArrayList<Thread>();
17 for(int i=0; i<10; i++) {
18 threads.add(new Thread(t::m, "thread-"+i));
19 }
20 threads.forEach((o)->o.start());
21 threads.forEach((o)->{
22 try {
23 o.join();
24 } catch (InterruptedException e) {
25 e.printStackTrace();
26 }
27 });
28 System.out.println(t.count);
29 }
30}
31
32
4.volatile修饰引用类型变量的可见性?
有些文章会写到,volatile如果修饰引用类型变量,那么"引用"的地址的改变对其他线程是可见的,但是引用的对象的属性变化对其他线程不可见.
如果你写一个例子,经过一些尝试,发现普通对象的属性的改变,volatile能保证其变化是可见的.但是!!!
但是!!!大量的测试后,我发现,不是每次都可见,特别是对象的属性变化很多次的时候!
所以,结论是,volatile的确不能保证变量指向的对象的属性的可见性.
又但是!如果修改对象属性的线程,sleep了一下,哪怕一纳秒,那么就能保证可见了,真是奇怪,回头有时间研究下JVM没准能搞明白.(也有可能是sleep的情况下,测试次数不够多)
下面是测试的例子,可以看出,volatile修饰的引用类型变量,如果修改的线程(生产者)没有sleep,其他线程不是每次都可见.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 1/**
2 * @author liweizhi
3 * @date 2020/3/4 18:18
4 */
5public class VolatileObject {
6 volatile static Pet pet = new Pet("dahuang", 1);
7
8 public static void main(String[] args) {
9 // 多运行几次无论是ageChange还是nameChange,会发现有时候t1不会结束(如果t2不sleep)
10 nameChange();
11// ageChange();
12 }
13
14 private static void ageChange() {
15 new Thread(() -> {
16 System.out.println("t1 start "/* + Instant.now()*/);
17 while (true) {
18 if (pet.getAge() == 5) {
19 break;
20 }
21 }
22 System.out.println("t1 end "/* + Instant.now()*/);
23 }, "t1").start();
24 try {
25 TimeUnit.SECONDS.sleep(1);
26 } catch (InterruptedException e) {
27 e.printStackTrace();
28 }
29 new Thread(() -> {
30 Pet myPet = pet;
31 for (int i = 1; i <= 100; i++) {
32 int age = myPet.getAge();
33 myPet.setAge(++age);
34 /*try {
35 Thread.sleep(1);
36 } catch (InterruptedException e) {
37 e.printStackTrace();
38 }*/
39 }
40 System.out.println("t2 end "/* + Instant.now()*/);
41 }, "t2").start();
42 }
43
44 private static void nameChange() {
45 new Thread(() -> {
46 System.out.println("t1 start "/* + Instant.now()*/);
47 while (true) {
48 if ("xiaobai8".equals(pet.getName())) {
49 break;
50 }
51 }
52 System.out.println("t1 end "/* + Instant.now()*/);
53 }, "t1").start();
54 try {
55 TimeUnit.SECONDS.sleep(1);
56 } catch (InterruptedException e) {
57 e.printStackTrace();
58 }
59 new Thread(() -> {
60 Pet myPet = pet;
61 for (int i = 1; i <= 10; i++) {
62 myPet.setName("xiaobai" + i);
63 /*try {
64 TimeUnit.NANOSECONDS.sleep(1);
65 } catch (InterruptedException e) {
66 e.printStackTrace();
67 }*/
68 }
69 System.out.println("t2 end "/* + Instant.now()*/);
70 }, "t2").start();
71 }
72
73 static class Pet {
74 String name;
75
76 int age;
77
78 public Pet(String name, int age) {
79 this.name = name;
80 this.age = age;
81 }
82
83 public String getName() {
84 return name;
85 }
86
87 public void setName(String name) {
88 this.name = name;
89 }
90
91 public int getAge() {
92 return age;
93 }
94
95 public void setAge(int age) {
96 this.age = age;
97 }
98 }
99}
100
101
CAS(也有人称作无锁优化,自旋锁,乐观锁)
首先要明确一点,自旋而"无锁"并不一定就比有锁快,因为自旋是在占用CPU,如果很多个线程一起自旋很久,想想都觉得效率很低…具体情况还是要具体分析.
概念 Compare And Swap/Set
下面是一段说明CAS原理的伪代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1/**
2 * nowValue:当前值,这个值是随时可能被修改的
3 * expectedValue:期望值,在调用casMethod前获取到的nowValue
4 * newValue:想要修改的新的值
5 */
6casMethod(nowValue, expectedValue, newValue){
7 // 如果当前值和期望值相等,说明本次修改期间没有其他线程修改,则赋值
8 if(nowValue == expectedValue) {
9 // 问题:此时,已经判定为其他线程没有修改,那么在赋值前会不会被其他线程修改了?
10 // 答:不会,cas操作是CPU原语支持,是CPU指令指令级别上的支持,中间不能被打断
11 nowValue = newValue;
12 } else {
13 // 如果当前值和期望值不等,说明本次修改期间有其他线程已经修改了值,
14 // 那么就再试一次或者直接返回修改失败的结果
15 // todo try again or return fail
16}
17
18
应用:AtomicInteger等atomic类
简单使用
下面这段代码,用了AtomicInteger后,自增部分(count++)不需要加同步锁,输出结果为100000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 1public class T01_AtomicInteger {
2 /*volatile*/ //int count1 = 0;
3
4 AtomicInteger count = new AtomicInteger(0);
5
6 /*synchronized*/ void m() {
7 for (int i = 0; i < 10000; i++){
8 //if count1.get() < 1000
9 count.incrementAndGet(); //count1++
10 }
11 }
12
13 public static void main(String[] args) {
14 T01_AtomicInteger t = new T01_AtomicInteger();
15 List<Thread> threads = new ArrayList<Thread>();
16 for (int i = 0; i < 10; i++) {
17 threads.add(new Thread(t::m, "thread-" + i));
18 }
19 threads.forEach((o) -> o.start());
20 threads.forEach((o) -> {
21 try {
22 o.join();
23 } catch (InterruptedException e) {
24 e.printStackTrace();
25 }
26 });
27 System.out.println(t.count);
28 }
29}
30
31
简单的分析一波incrementAndGet()
跟踪下去,count.incrementAndGet():
1
2
3
4
5
6
7
8
9
10 1/**
2 * Atomically increments by one the current value.
3 *
4 * @return the updated value
5 */
6 public final int incrementAndGet() {
7 return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
8 }
9
10
继续跟踪,到了Unsafe.class,这里compareAndSwapInt就是用到了CAS:
1
2
3
4
5
6
7
8
9
10 1 public final int getAndAddInt(Object var1, long var2, int var4) {
2 int var5;
3 do {
4 var5 = this.getIntVolatile(var1, var2);
5 } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
6
7 return var5;
8 }
9
10
Unsafe.class比较复杂,直接操作JVM中的内存,类似C和C++的操作,比如分配一个对象不用new,而是直接写在内存中;操作对象的属性也可以根据"地址"(or指针)和偏移量定位.
ABA问题
什么是ABA?
在开始使用CAS修改一个值后,在CAS中判断nowValue == expectedValue前,假设有一个线程先把nowValue改成了其他值x,然后又把x改回了nowValue,那么这时候虽然nowValue等于expectedValue,但这个值其实已经被修改过了.
ABA会带来什么问题?
如果是AtomicInteger等数值类型,其实ABA是没有影响的,无所谓.
如果是一个对象引用,多数情况是不允许ABA情况的(比如小黄和其女朋友小白要结婚了,但是突然来了小黑抢走了小白,他们成为恋人,过了段时间又把小白还了回来,这婚多半就结不成了).
怎么避免ABA?
加上版本号version,做任何操作时都把version+1,同时比较nowValue和version
假设nowValue值为A,version为1,如果有ABA情况发生,即nowValue值变为B后又变回A,那么此时version是3,就可以根据version知道值已经被修改过了.
例如java.util.concurrent.atomic.AtomicStampedReference
Atomic的常见问题
高并发实现递增的三种方式?
- 同步static long count2 = 0L;
synchronized (lock) {
count2++;
}
- CAS原子操作AtomicLong count1 = new AtomicLong(0L);
count1.incrementAndGet();
- 分段锁LongAdder count3 = new LongAdder();
count3.increment();
下面代码是一个简单的测试(1000个线程,累加10W次),从中可以LongAdder最快,AtomicLong次之,synchronize最慢.
synchronize慢是因为会升级成重量级锁,向OS申请资源加锁.
注意:如果线程数较少,或者累加次数较少,LongAdder比AtomicLong慢.所以实际项目中,还是要看项目中的并发度如何.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 1public class T02_AtomicVsSyncVsLongAdder {
2 static long count2 = 0L;
3 static AtomicLong count1 = new AtomicLong(0L);
4 static LongAdder count3 = new LongAdder();
5
6 public static void main(String[] args) throws Exception {
7 Thread[] threads = new Thread[1000];
8 for (int i = 0; i < threads.length; i++) {
9 threads[i] = new Thread(() -> {
10 for (int k = 0; k < 100000; k++) {
11 count1.incrementAndGet();
12 }
13 });
14 }
15 long start = System.currentTimeMillis();
16 for (Thread t : threads) {
17 t.start();
18 }
19 for (Thread t : threads) {
20 t.join();
21 }
22 long end = System.currentTimeMillis();
23 //TimeUnit.SECONDS.sleep(10);
24 System.out.println("Atomic: " + count1.get() + " time " + (end - start));
25 //-----------------------------------------------------------
26 Object lock = new Object();
27 for (int i = 0; i < threads.length; i++) {
28 threads[i] =
29 new Thread(new Runnable() {
30 @Override
31 public void run() {
32 for (int k = 0; k < 100000; k++) {
33 synchronized (lock) {
34 count2++;
35 }
36 }
37 }
38 });
39 }
40
41 start = System.currentTimeMillis();
42 for (Thread t : threads) {
43 t.start();
44 }
45 for (Thread t : threads) {
46 t.join();
47 }
48 end = System.currentTimeMillis();
49
50 System.out.println("Sync: " + count2 + " time " + (end - start));
51
52 //----------------------------------
53 for (int i = 0; i < threads.length; i++) {
54 threads[i] =
55 new Thread(() -> {
56 for (int k = 0; k < 100000; k++) {
57 count3.increment();
58 }
59 });
60 }
61 start = System.currentTimeMillis();
62 for (Thread t : threads) {
63 t.start();
64 }
65 for (Thread t : threads) {
66 t.join();
67 }
68 end = System.currentTimeMillis();
69 //TimeUnit.SECONDS.sleep(10);
70 System.out.println("LongAdder: " + count1.longValue() + " time " + (end - start));
71 }
72
73}
74
75
我本地环境输出:
1
2
3
4
5 1Atomic: 100000000 time 1665
2Sync: 100000000 time 3930
3LongAdder: 100000000 time 406
4
5
为什么高并发下,LongAdder比AtomicLong快?
LongAdder内部实现类似"分段锁"(分段锁也是CAS操作),把值放在数组里,每个元素作为一个相对独立的部分,分散开线程的压力,最后再汇总起来.(有点类似于MapReduce思想)