Java高并发(三.三)–JDK并发容器

释放双眼,带上耳机,听听看~!

3. JDK并发容器

3.1 ConcurrentHashMap:线程安全的HashMap

Collections.synchronizedMap()


1
2
3
4
5
1public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
2   return new SynchronizedMap<>(m);
3}
4
5

SynchronizedMap 内包装了一个 Map,对Map的所有操作都需要获取mutex监视器.


1
2
3
4
5
6
7
1private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable {
2    private final Map<K,V> m;     // Backing Map
3    final Object      mutex;        // Object on which to synchronize
4   ...
5}
6
7

ConcurrentHashMap
参照锁的优化及注意事项

3.2 有关 List 的线程安全

底层由数组实现.

  • Vector:线程安全
  • ArrayList:线程不安全

底层由链表实现

  • LinkedList:线程不安全


1
2
3
4
5
6
7
1public static <T> List<T> synchronizedList(List<T> list) {
2   return (list instanceof RandomAccess ?
3           new SynchronizedRandomAccessList<>(list) :
4           new SynchronizedList<>(list));
5}
6
7

3.3 ConcurrentLinkedQueue:高效读写队列

ConcurrentLinkedQueue是并发环境下性能最好的队列.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1public class ConcurrentLinkedDeque<E> extends AbstractCollection<E> implements Deque<E>, java.io.Serializable {
2   private transient volatile Node<E> head;
3   private transient volatile Node<E> tail;
4   public E poll()           { return pollFirst(); }
5    public E peek()           { return peekFirst(); }
6   static final class Node<E> {
7        volatile Node<E> prev;
8        volatile E item;
9        volatile Node<E> next;
10        ...
11    }
12}
13
14

保证高并发下的安全性,对Node进行操作时,使用了CAS


1
2
3
4
5
6
7
8
9
10
11
1boolean casItem(E cmp, E val) {
2   return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
3}
4void lazySetNext(Node<E> val) {
5   UNSAFE,putOrderedObject(this, nextOffset, val);
6}
7boolean casNext(Node<E> cmp, Node<E> val) (
8   return UNSAFE,compareAndSwapObject(this, nextOffset, cmp, val);
9}
10
11

head
head表示链表的头部,永远不会为null,可通过head和succ()遍历链表.


1
2
3
4
5
6
1final Node<E> succ(Node<E> p) {
2    Node<E> q = p.next;
3    return (p == q) ? first() : q;
4}
5
6

succ()会返回后续节点,或者第一个节点(
哨兵)
tail
tail一般指向尾节点,但tail并不是实时更新的.
Java高并发(三.三)--JDK并发容器


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1public boolean offer(E e) {
2   checkNotNull(e);
3   final Node<E> newNode = new Node<E>(e);
4
5   for (Node<E> t = tail, p = t;;) {
6       Node<E> q = p.next;
7       if (q == null) {
8       // P 是最后一个节点
9           if (p.casNext(nullf newNode)) {
10          //每两次更新一下 tail
11              if (p != t)
12                  casTail(t, newNode);
13              return true;
14          }
15          //CAS 竞争失败, 再次尝试
16      }
17      else if (p == q)
18          //遇到哨兵节点, 从都 head 开始遍历
19          //但如果 tail 被修改, 则使用 tail (因为可能被修改正确了)
20          p = (t != (t = tail)) ? t : head;
21       else
22          //取下一个节点或者最后一个节点
23          p = (p != t & & t != (t = tail)) ? t : q;
24  }      
25}
26
27

代码分析
为保证并发环境的安全性,添加元素到队尾时,通过在"死循环"中使用CAS添加元素

  1. 队列为空时,head=tail,则q=null,p.casNext(null,newNode)将新节点链接到队尾

判断p!=t
p不是尾节点,casTail(t,newNode),更新尾节点,返回true.
p是尾节点,不需要更新尾节点
(尾节点隔次更新)

  1. 当前节点为哨兵

(next指向自身),p = (t != (t = tail)) ? t : head;
若t不为尾节点
(并发更新了尾节点),将p设置为尾节点
若尾节点未改变,将p设置为头结点,从头遍历

  1. 其他情况,p = (p != t & & t != (t = tail)) ? t : q;

若p为尾节点或t不为尾节点,p=p.next查找下一个节点
(哨兵导致从头查找)
否则,将p设置为为尾节点

  1. 继续循环,保证节点被加入队列

哨兵的产生
哨兵:节点的next指向节点本身.一般表示被删除的节点.


1
2
3
4
5
1ConcurrentLinkedQueue<String> q=new ConcurrentLinkedQueue<String>();
2q.add("l");
3q.poll();
4
5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1public E poll() {
2   restartFromHead:
3   for (;;) {
4       for (Node<E> h = head, p = h, q;;) {
5           E item = p.item;
6           if (item != null && p.casItem(item, null)) {
7               if (p != h)
8                   updateHead(h,((q = p.next) != null) ? q : p);
9               return item;
10          }
11          else if ((q = p * next) == null) {
12              updateHead(hr p);
13              return null;
14          }
15          else if (p == q)
16              continue restartFromHead;
17          else
18              p = q
19      }
20  }
21}
22
23

代码分析

  1. 队列中只有一个元素时,tail=head.head的item为空,且head.next不为空,执行p=q(p=p.next)
  2. 第一个元素item不为null,进入第7行

p不是头结点,执行第8行,updateHead(h, p);将p设置为head,h设置为哨兵.

CAS利弊分析

  • :保证线程安全的情况下,相比阻塞,性能飞速提升
  • :可能存在ABA问题,且程序设计和实现难度增大

3.4 高效读取:不变模式下的CopyOnWriteArrayList类

CopyOnWriteArrayList类

  • 读读不冲突
  • 读写不冲突
  • 写写需要同步等待

实现原理

  • 对数据修改时,并不修改原内容.而是对原数据进行一次复制

(保证数据一致性,数据要么全改,要么全不改),

  • 将修改的内容写入副本,再用副本替换原数据.

实现代码

  • 读取实现

1
2
3
4
5
6
7
8
9
10
1private volatile transient Object[] array;
2public E get(int index) {
3   return get(getArray(), index);
4final Object[] getArray() {
5   return array;
6private E get(Object[] a, int index) {
7   return (E) a[index];
8}
9
10

1
2
1 *代码分析*
2
  • array有volatile修饰,修改后立即可见
    • 读取数据没有任何的同步控制和锁操作
  1. 写实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1public boolean add(E e) {
2   final ReentrantLock lock = this.lock;
3   lock.lock();
4   try {
5       Object[] elements = getArray() ;
6       int len = elements.length;
7       Object[] newElements = Arrays.copyOf(elementsf len + 1);//创建副本
8       newElements[len] = e;//修改副本
9       setArray(newElements);//副本替代原数据
10      return true;
11  } finally {
12      lock.unlock();
13  }
14}
15
16

代码分析

  • 写入操作使用了锁,但此锁只用于写入控制,不会影响读操作
  • 若一个事务中包含2个读操作,可能会出现

不可重复读的问题

3.5 数据共享通道:BlockingQueue

若线程A与线程B需要通信,且A不需要知道B的存在.可使用BlockQueue作为"信箱"

  • A将要传递的信息存在信箱中
  • B从信箱中获取信息

信箱对信息的处理

  1. 循环监控"信箱",有信息则报告(循环周期难以确定,且容易造成资源浪费)
  2. 服务线程在"信箱"为空时,等待信息到来

BlockingQueue接口有2个常用的实现类

  1. ArrayBlockingQueue(之后重要介绍)
  2. LinkedBlockingQueue

ArrayBlockingQueue底层实现


1
2
3
4
5
6
7
8
1final Object[] items;//元素存储数组\
2int takeIndex;//队头
3int putIndex;//队尾
4final ReentrantLock lock;//锁
5private final Condijtion notEmpty;//非空条件
6private final Condition notFull;//未满条件
7
8

ArrayBlockingQueue采用items的循环队列,takeIndex为队头,putIndex为队尾

压入元素


1
2
3
4
5
6
7
8
1boolean offer(E e)
2   若队列已满,返回false
3   若队列未满,元素入队,返回true
4void put(E e) throws InterruptedException
5   若队列已满,则阻塞
6   若队列为满,元素入队
7
8

put()具体实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1public void put(E e) throws InterruptedException {
2   checkNotNuli(e);
3   final ReentrantLock lock = this.lock;
4   lock.locklnterruptibly();
5   try {
6       while (count == items ,length)
7           notFull.await{);//若队列已满,未满等待
8       insert(e);
9   } finally{
10      lock.unlock();
11}
12private void insert(E x) {
13  items[putIndex] = x;
14  putIndex = inc(putlndex);
15  ++count;
16  notEmpty,signal();//唤醒非空等待的线程
17}
18
19

弹出元素


1
2
3
4
5
6
1E poll(long timeout, TimeUnit unit) throws InterruptedException
2   有限时间等待
3E take() throws InterruptedException
4   永久等待,除非被中断
5
6

take()具体实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1public E take{) throws InterruptedException (
2   final ReentrantLock lock = this.lock;
3   lock.locklnterruptibly();
4   try {
5       while (count==0)
6           notEmpty.await(};//若队列为空,则非空等待
7       return extract();
8   } finally {
9       lock.unlock();
10  }
11}
12private E extract() {
13  final Object[] items = this.items;
14  E x = this.<E>cast(items[takelndex]);
15  items[takeIndex]=null;
16  takelndex = inc(takelndex);
17  --count;
18  notFull.signal();//唤醒未满等待的线程
19  return x;
20}
21
22

随机数据结构:跳表

跳表数据结构

  • 跳表中元素是有序的
  • 跳表中高层是底层的子集

跳表与平衡树

  • 平衡树和跳表都能实现快速查找,O(logN)
  • 平衡树中元素的插入和删除需要全局调整

跳表中元素的插入和删除只需局部调整

跳表与hash表

  • 都用于实现Map
  • hash表不会保存元素的顺序

跳表中元素是有序的

跳表元素查找过程

  1. 在本层找到不大于target的key
  2. 从下一层的key开始查找
  3. 重复1,2直至找到key,或失败

跳表的实现:ConcurrentSkipListMap


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
2    implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
3   private transient volatile HeadIndex<K,V> head;
4  
5   static final class HeadIndex<K,V> extends Index<K,V> {
6        final int level;//记录层数
7        HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
8            super(node, down, right);
9            this.level = level;
10        }
11    }
12    
13    static class Index<K,V> {
14        final Node<K,V> node;
15        final Index<K,V> down;
16        volatile Index<K,V> right;
17    }
18    
19  static final class Node<K,V> {
20      final K key;
21        volatile Object value;
22        volatile Node<K,V> next;
23        boolean casValue(Object cmp, Object val){...}
24        boolean casNext(Node<K,V> cmp, Node<K,V> val){...}
25       }
26}
27
28

问题记录

跳表中index数据结构中down不需要volatile修饰

给TA打赏
共{{data.count}}人
人已打赏
安全经验

海外中文博客与Adsense

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索