3. JDK并发容器
3.1 ConcurrentHashMap:线程安全的HashMap
Collections.synchronizedMap()
1
2
3
4
5 1public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
2 return new SynchronizedMap<>(m);
3}
4
5
SynchronizedMap 内包装了一个 Map,对Map的所有操作都需要获取mutex监视器.
1
2
3
4
5
6
7 1private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable {
2 private final Map<K,V> m; // Backing Map
3 final Object mutex; // Object on which to synchronize
4 ...
5}
6
7
ConcurrentHashMap
参照锁的优化及注意事项
3.2 有关 List 的线程安全
底层由数组实现.
- Vector:线程安全
- ArrayList:线程不安全
底层由链表实现
-
LinkedList:线程不安全
1
2
3
4
5
6
7 1public static <T> List<T> synchronizedList(List<T> list) {
2 return (list instanceof RandomAccess ?
3 new SynchronizedRandomAccessList<>(list) :
4 new SynchronizedList<>(list));
5}
6
7
3.3 ConcurrentLinkedQueue:高效读写队列
ConcurrentLinkedQueue是并发环境下性能最好的队列.
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1public class ConcurrentLinkedDeque<E> extends AbstractCollection<E> implements Deque<E>, java.io.Serializable {
2 private transient volatile Node<E> head;
3 private transient volatile Node<E> tail;
4 public E poll() { return pollFirst(); }
5 public E peek() { return peekFirst(); }
6 static final class Node<E> {
7 volatile Node<E> prev;
8 volatile E item;
9 volatile Node<E> next;
10 ...
11 }
12}
13
14
保证高并发下的安全性,对Node进行操作时,使用了CAS
1
2
3
4
5
6
7
8
9
10
11 1boolean casItem(E cmp, E val) {
2 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
3}
4void lazySetNext(Node<E> val) {
5 UNSAFE,putOrderedObject(this, nextOffset, val);
6}
7boolean casNext(Node<E> cmp, Node<E> val) (
8 return UNSAFE,compareAndSwapObject(this, nextOffset, cmp, val);
9}
10
11
head
head表示链表的头部,永远不会为null,可通过head和succ()遍历链表.
1
2
3
4
5
6 1final Node<E> succ(Node<E> p) {
2 Node<E> q = p.next;
3 return (p == q) ? first() : q;
4}
5
6
succ()会返回后续节点,或者第一个节点(
哨兵)
tail
tail一般指向尾节点,但tail并不是实时更新的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 1public boolean offer(E e) {
2 checkNotNull(e);
3 final Node<E> newNode = new Node<E>(e);
4
5 for (Node<E> t = tail, p = t;;) {
6 Node<E> q = p.next;
7 if (q == null) {
8 // P 是最后一个节点
9 if (p.casNext(nullf newNode)) {
10 //每两次更新一下 tail
11 if (p != t)
12 casTail(t, newNode);
13 return true;
14 }
15 //CAS 竞争失败, 再次尝试
16 }
17 else if (p == q)
18 //遇到哨兵节点, 从都 head 开始遍历
19 //但如果 tail 被修改, 则使用 tail (因为可能被修改正确了)
20 p = (t != (t = tail)) ? t : head;
21 else
22 //取下一个节点或者最后一个节点
23 p = (p != t & & t != (t = tail)) ? t : q;
24 }
25}
26
27
代码分析
为保证并发环境的安全性,添加元素到队尾时,通过在"死循环"中使用CAS添加元素
- 队列为空时,head=tail,则q=null,p.casNext(null,newNode)将新节点链接到队尾
判断p!=t
p不是尾节点,casTail(t,newNode),更新尾节点,返回true.
p是尾节点,不需要更新尾节点
(尾节点隔次更新)
- 当前节点为哨兵
(next指向自身),p = (t != (t = tail)) ? t : head;
若t不为尾节点
(并发更新了尾节点),将p设置为尾节点
若尾节点未改变,将p设置为头结点,从头遍历
- 其他情况,p = (p != t & & t != (t = tail)) ? t : q;
若p为尾节点或t不为尾节点,p=p.next查找下一个节点
(哨兵导致从头查找)
否则,将p设置为为尾节点
- 继续循环,保证节点被加入队列
哨兵的产生
哨兵:节点的next指向节点本身.一般表示被删除的节点.
1
2
3
4
5 1ConcurrentLinkedQueue<String> q=new ConcurrentLinkedQueue<String>();
2q.add("l");
3q.poll();
4
5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1public E poll() {
2 restartFromHead:
3 for (;;) {
4 for (Node<E> h = head, p = h, q;;) {
5 E item = p.item;
6 if (item != null && p.casItem(item, null)) {
7 if (p != h)
8 updateHead(h,((q = p.next) != null) ? q : p);
9 return item;
10 }
11 else if ((q = p * next) == null) {
12 updateHead(hr p);
13 return null;
14 }
15 else if (p == q)
16 continue restartFromHead;
17 else
18 p = q
19 }
20 }
21}
22
23
代码分析
- 队列中只有一个元素时,tail=head.head的item为空,且head.next不为空,执行p=q(p=p.next)
- 第一个元素item不为null,进入第7行
p不是头结点,执行第8行,updateHead(h, p);将p设置为head,h设置为哨兵.
CAS利弊分析
- 利:保证线程安全的情况下,相比阻塞,性能飞速提升
- 弊:可能存在ABA问题,且程序设计和实现难度增大
3.4 高效读取:不变模式下的CopyOnWriteArrayList类
CopyOnWriteArrayList类
- 读读不冲突
- 读写不冲突
- 写写需要同步等待
实现原理
- 对数据修改时,并不修改原内容.而是对原数据进行一次复制
(保证数据一致性,数据要么全改,要么全不改),
- 将修改的内容写入副本,再用副本替换原数据.
实现代码
- 读取实现
1
2
3
4
5
6
7
8
9
10 1private volatile transient Object[] array;
2public E get(int index) {
3 return get(getArray(), index);
4final Object[] getArray() {
5 return array;
6private E get(Object[] a, int index) {
7 return (E) a[index];
8}
9
10
1
2 1 *代码分析*
2
- array有volatile修饰,修改后立即可见
- 读取数据没有任何的同步控制和锁操作
- 写实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1public boolean add(E e) {
2 final ReentrantLock lock = this.lock;
3 lock.lock();
4 try {
5 Object[] elements = getArray() ;
6 int len = elements.length;
7 Object[] newElements = Arrays.copyOf(elementsf len + 1);//创建副本
8 newElements[len] = e;//修改副本
9 setArray(newElements);//副本替代原数据
10 return true;
11 } finally {
12 lock.unlock();
13 }
14}
15
16
代码分析
- 写入操作使用了锁,但此锁只用于写入控制,不会影响读操作
- 若一个事务中包含2个读操作,可能会出现
不可重复读的问题
3.5 数据共享通道:BlockingQueue
若线程A与线程B需要通信,且A不需要知道B的存在.可使用BlockQueue作为"信箱"
- A将要传递的信息存在信箱中
- B从信箱中获取信息
信箱对信息的处理
- 循环监控"信箱",有信息则报告(循环周期难以确定,且容易造成资源浪费)
- 服务线程在"信箱"为空时,等待信息到来
BlockingQueue接口有2个常用的实现类
- ArrayBlockingQueue(之后重要介绍)
- LinkedBlockingQueue
ArrayBlockingQueue底层实现
1
2
3
4
5
6
7
8 1final Object[] items;//元素存储数组\
2int takeIndex;//队头
3int putIndex;//队尾
4final ReentrantLock lock;//锁
5private final Condijtion notEmpty;//非空条件
6private final Condition notFull;//未满条件
7
8
ArrayBlockingQueue采用items的循环队列,takeIndex为队头,putIndex为队尾
压入元素
1
2
3
4
5
6
7
8 1boolean offer(E e)
2 若队列已满,返回false
3 若队列未满,元素入队,返回true
4void put(E e) throws InterruptedException
5 若队列已满,则阻塞
6 若队列为满,元素入队
7
8
put()具体实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1public void put(E e) throws InterruptedException {
2 checkNotNuli(e);
3 final ReentrantLock lock = this.lock;
4 lock.locklnterruptibly();
5 try {
6 while (count == items ,length)
7 notFull.await{);//若队列已满,未满等待
8 insert(e);
9 } finally{
10 lock.unlock();
11}
12private void insert(E x) {
13 items[putIndex] = x;
14 putIndex = inc(putlndex);
15 ++count;
16 notEmpty,signal();//唤醒非空等待的线程
17}
18
19
弹出元素
1
2
3
4
5
6 1E poll(long timeout, TimeUnit unit) throws InterruptedException
2 有限时间等待
3E take() throws InterruptedException
4 永久等待,除非被中断
5
6
take()具体实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1public E take{) throws InterruptedException (
2 final ReentrantLock lock = this.lock;
3 lock.locklnterruptibly();
4 try {
5 while (count==0)
6 notEmpty.await(};//若队列为空,则非空等待
7 return extract();
8 } finally {
9 lock.unlock();
10 }
11}
12private E extract() {
13 final Object[] items = this.items;
14 E x = this.<E>cast(items[takelndex]);
15 items[takeIndex]=null;
16 takelndex = inc(takelndex);
17 --count;
18 notFull.signal();//唤醒未满等待的线程
19 return x;
20}
21
22
随机数据结构:跳表
跳表数据结构
- 跳表中元素是有序的
- 跳表中高层是底层的子集
跳表与平衡树
- 平衡树和跳表都能实现快速查找,O(logN)
- 平衡树中元素的插入和删除需要全局调整
跳表中元素的插入和删除只需局部调整
跳表与hash表
- 都用于实现Map
- hash表不会保存元素的顺序
跳表中元素是有序的
跳表元素查找过程
- 在本层找到不大于target的key
- 从下一层的key开始查找
- 重复1,2直至找到key,或失败
跳表的实现:ConcurrentSkipListMap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 1public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
2 implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
3 private transient volatile HeadIndex<K,V> head;
4
5 static final class HeadIndex<K,V> extends Index<K,V> {
6 final int level;//记录层数
7 HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
8 super(node, down, right);
9 this.level = level;
10 }
11 }
12
13 static class Index<K,V> {
14 final Node<K,V> node;
15 final Index<K,V> down;
16 volatile Index<K,V> right;
17 }
18
19 static final class Node<K,V> {
20 final K key;
21 volatile Object value;
22 volatile Node<K,V> next;
23 boolean casValue(Object cmp, Object val){...}
24 boolean casNext(Node<K,V> cmp, Node<K,V> val){...}
25 }
26}
27
28
问题记录
跳表中index数据结构中down不需要volatile修饰