还剩18页未读,继续阅读
本资源只提供10页预览,全部文档请下载后查看!喜欢就下载吧,查找使用更方便
文本内容:
在J__a多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列J__a提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列注什么叫线程安全?这个首先要明确线程安全的类 ,指的是类内共享的全局变量的访问必须保证是不受多线程形式影响的如果由于多线程的访问(比如修改、遍历、查看)而使这些变量结构被破坏或者针对这些变量操作的原子性被破坏,则这个类就不是线程安全的今天就聊聊这两种Queue,本文分为以下两个部分,用分割线分开 BlockingQueue 阻塞算法ConcurrentLinkedQueue,非阻塞算法首先来看看BlockingQueue Queue是什么就不需要多说了吧,一句话队列是先进先出相对的,栈是后进先出如果不熟悉的话先找本基础的数据结构的书看看吧 BlockingQueue,顾名思义,“阻塞队列”可以提供阻塞功能的队列 首先,看看BlockingQueue提供的常用方法 可能报异常返回布尔值可能阻塞设定等待时间入队addeoffereputeofferetimeoutunit出队removepolltakepolltimeoutunit查看elementpeek无无从上表可以很明显看出每个方法的作用,这个不用多说我想说的是 adderemoveelement方法不会阻塞线程当不满足约束条件时,会抛出IllegalStateEx__ption异常例如当队列被元素填满后,再调用adde,则会抛出异常offerepollpeek方法即不会阻塞线程,也不会抛出异常例如当队列被元素填满后,再调用offere,则不会插入元素,函数返回false要想要实现阻塞功能,需要调用putetake方法当不满足约束条件时,会阻塞线程好,上点源码你就更明白了以ArrayBlockingQueue类为例 对于第一类方法,很明显如果操作不成功就抛异常而且可以看到其实调用的是第二类的方法,___?因为第二类方法返回boolean啊 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.public boolean addEe{
1.if offere
1.return true;
1.else
1.throw new IllegalStateEx__ptionQueuefull;//队列已满,抛异常
1.}
1.
1.public Eremove{
1.Ex=poll;
1.if x!= null
1.return x;
1.else
1.throw new NoSuchElementEx__ption;//队列为空,抛异常
1.}对于第二类方法,很标准的ReentrantLock使用方式(不熟悉的朋友看一下我上一篇帖子吧),另外对于insert和extract的实现没啥好说的注先不看阻塞与否,这ReentrantLock的使用方式就能说明这个类是线程安全类 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.public boolean offerEe{
1.if e== nullthrow new NullPointerEx__ption;
1.final ReentrantLocklock= this.lock;
1.lock.lock;
1.try {
1.if count==items.length//队列已满,返回false
1.return false;
1.else {
1.inserte;//insert方法中发出了notEmpty.signal;
1.return true;
1.}
1.} finally {
1.lock.unlock;
1.}
1.}
1.
1.public Epoll{
1.final ReentrantLocklock= this.lock;
1.lock.lock;
1.try {
1.if count== 0//队列为空,返回false
1.return null;
1.Ex=extract;//extract方法中发出了notFull.signal;
1.return x;
1.} finally {
1.lock.unlock;
1.}
1.}对于第三类方法,这里面涉及到Condition类,简要提一下, await方法指造成当前线程在接到__或被中断之前一直处于等待状态 signal方法指唤醒一个等待线程 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.public void putEethrows InterruptedEx__ption{
1.if e== nullthrow new NullPointerEx__ption;
1.final E[]items= this.items;
1.final ReentrantLocklock= this.lock;
1.lock.lockInterruptibly;
1.try {
1.try {
1.while count==items.length//如果队列已满,等待notFull这个条件,这时当前线程被阻塞
1.notFull.await;
1.} catch InterruptedEx__ptionie{
1.notFull.signal; //唤醒受notFull阻塞的当前线程
1.throw ie;
1.}
1.inserte;
1.} finally {
1.lock.unlock;
1.}
1.}
1.
1.public Etake throws InterruptedEx__ption{
1.final ReentrantLocklock= this.lock;
1.lock.lockInterruptibly;
1.try {
1.try {
1.while count== 0//如果队列为空,等待notEmpty这个条件,这时当前线程被阻塞
1.notEmpty.await;
1.} catch InterruptedEx__ptionie{
1.notEmpty.signal;//唤醒受notEmpty阻塞的当前线程
1.throw ie;
1.}
1.Ex=extract;
1.return x;
1.} finally {
1.lock.unlock;
1.}
1.}
1. 第四类方法就是指在有必要时等待指定时间,就不详细说了 再来看看BlockingQueue接口的具体实现类吧 ArrayBlockingQueue,其构造函数必须带一个int参数来指明其大小LinkedBlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.__X_VALUE来决定PriorityBlockingQueue,其所含对象的排序不是FIFO而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序上面是用ArrayBlockingQueue举得例子,下面看看LinkedBlockingQueue 首先,既然是链表,就应该有Node节点,它是一个内部静态类 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.static class NodeE{
1./**Theitemvolatiletoensurebarrierseparatingwriteandread*/
1.volatile Eitem;
1.NodeEnext;
1.NodeEx{item=x;}
1.}然后,对于链表来说,肯定需要两个变量来标示头和尾 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1./**头指针*/
1.private transient NodeEhead;//head.next是队列的头元素
1./**尾指针*/
1.private transient NodeElast;//last.next是null那么,对于入队和出队就很自然能理解了 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.private void enqueueEx{
1.last=last.next= new NodeEx;//入队是为last再找个下家
1.}
1.
1.private Edequeue{
1.NodeEfirst=head.next; //出队是把head.next取出来,然后将head向后移一位
1.head=first;
1.Ex=first.item;
1.first.item= null;
1.return x;
1.}另外,LinkedBlockingQueue相对于ArrayBlockingQueue还有不同是,有两个ReentrantLock,且队列现有元素的大小由一个AtomicInteger对象标示注AtomicInteger类是以原子的方式操作整型变量 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.private final AtomicIntegercount=new AtomicInteger0;
1./**用于读取的独占锁*/
1.private final ReentrantLocktakeLock=new ReentrantLock;
1./**队列是否为空的条件*/
1.private final ConditionnotEmpty=takeLock.newCondition;
1./**用于写入的独占锁*/
1.private final ReentrantLockputLock=new ReentrantLock;
1./**队列是否已满的条件*/
1.private final ConditionnotFull=putLock.newCondition;有两个Condition很好理解,在ArrayBlockingQueue也是这样做的但是___需要两个ReentrantLock呢?下面会慢慢道来 让我们来看看offer和poll方法的代码 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.public boolean offerEe{
1.if e== nullthrow new NullPointerEx__ption;
1.final AtomicIntegercount= this.count;
1.if count.get==capacity
1.return false;
1.int c=-1;
1.final ReentrantLockputLock=this.putLock;//入队当然用putLock
1.putLock.lock;
1.try {
1.if count.getcapacity{
1.enqueuee; //入队
1.c=count.getAndIncrement; //队长度+
11.if c+ 1 capacity
1.notFull.signal; //队列没满,当然可以解锁了
1.}
1.} finally {
1.putLock.unlock;
1.}
1.if c==
01.signalNotEmpty;//这个方法里发出了notEmpty.signal;
1.return c= 0;
1.}
1.
1.public Epoll{
1.final AtomicIntegercount= this.count;
1.if count.get==
01.return null;
1.Ex= null;
1.int c=-1;
1.final ReentrantLocktakeLock=this.takeLock;出队当然用takeLock
1.takeLock.lock;
1.try {
1.if count.get 0{
1.x=dequeue;//出队
1.c=count.getAndDecrement;//队长度-
11.if c
11.notEmpty.signal;//队列没空,解锁
1.}
1.} finally {
1.takeLock.unlock;
1.}
1.if c==capacity
1.signalNotFull;//这个方法里发出了notFull.signal;
1.return x;
1.}看看源代码发现和上面ArrayBlockingQueue的很类似,关键的问题在于___要用两个ReentrantLockputLock和takeLock?我们仔细想一下,入队操作其实操作的只有队尾引用last,并且没有牵涉到head而出队操作其实只针对head,和last没有关系-------------------------------我是分割线--------------------------------------下面再来说说ConcurrentLinkedQueue,它是一个无锁的并发线程安全的队列 以下部分的内容参照了这个帖子对比锁机制的实现,使用无锁机制的难点在于要充分考虑线程间的协调简单的说就是多个线程对内部数据结构进行访问时,如果其中一个线程执行的中途因为一些原因出现故障,其他的线程能够检测并帮助完成剩下的操作这就需要把对数据结构的操作过程精细的划分成多个状态或阶段,考虑每个阶段或状态多线程访问会出现的情况ConcurrentLinkedQueue有两个volatile的线程共享变量head,tail要保证这个队列的线程安全就是保证对这两个Node的引用的访问(更新,查看)的原子性和可见性,由于volatile本身能够保证可见性,所以就是对其修改的原子性要被保证下面通过offer方法的实现来看看在无锁情况下如何保证原子性 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.public boolean offerEe{
1.if e== nullthrow new NullPointerEx__ption;
1.NodeEn= new NodeEe null;
1.for ;;{
1.NodeEt=tail;
1.NodeEs=t.getNext;
1.if t==tail{ //------------------------------a
1.if s== null{//---------------------------b
1.if t.casNextsn{ //-------------------c
1.casTailtn; //------------------------d
1.return true;
1.}
1.} else {
1.casTailts; //----------------------------e
1.}
1.}
1.}
1.}此方法的循环内首先获得尾指针和其next指向的对象,由于tail和Node的next均是volatile的,所以保证了获得的分别都是最新的值 代码a t==tail是最上层的协调,如果其他线程改变了tail的引用,则说明现在获得不是最新的尾指针需要重新循环获得最新的值 代码b s==null的判断静止状态下tail的next一定是指向null的,但是多线程下的另一个状态就是中间态tail的指向没有改变,但是其next已经指向新的结点,即完成tail引用改变前的状态,这时候s!=null这里就是协调的典型应用,直接进入代码e去协调参与中间态的线程去完成最后的更新,然后重新循环获得新的tail开始自己的新一次的入队尝试另外值得注意的是ab之间,其他的线程可能会改变tail的指向,使得协调的操作失败从这个步骤可以看到无锁实现的复杂性代码c t.casNextsn是入队的第一步,因为入队需要两步更新Node的next,改变tail的指向代码c之前可能发生tail引用指向的改变或者进入更新的中间态,这两种情况均会使得t指向的元素的next属性被原子的改变,不再指向null这时代码c操作失败,重新进入循环代码d这是完成更新的最后一步了,就是更新tail的指向,最有意思的协调在这儿又有了体现从代码看casTailtn不管是否成功都会接着返回true标志着更新的成功首先如果成功则表明本线程完成了两步的更新,返回true是理所当然的;如果casTailtn不成功呢?要清楚的是完成代码c则代表着更新进入了中间态,代码d不成功则是tail的指向被其他线程改变意味着对于其他的线程而言它们得到的是中间态的更新,s!=null,进入代码e帮助本线程执行最后一步并且先于本线程成功这样本线程虽然代码d失败了,但是是由于别的线程的协助先完成了,所以返回true也就理所当然了通过分析这个入队的操作,可以清晰的看到无锁实现的每个步骤和状态下多线程之间的协调和工作 注上面这大段文字看起来很累,先能看懂多少看懂多少,现在看不懂先不急,下面还会提到这个算法,并且用示意图说明,就易懂很多了 在使用ConcurrentLinkedQueue时要注意,如果直接使用它提供的函数,比如add或者poll方法,这样我们自己不需要做任何同步 但如果是非原子操作,比如 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.if!queue.isEmpty{
1.queue.pollo__;
1.}我们很难保证,在调用了isEmpty之后,poll之前,这个queue没有被其他线程修改所以对于这种情况,我们还是需要自己同步 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.synchronizedqueue{
1.if!queue.isEmpty{
1.queue.pollo__;
1.}
1.}注这种需要进行自己同步的情况要视情况而定,不是任何情况下都需要这样做 另外还说一下,ConcurrentLinkedQueue的size是要遍历一遍__的,所以尽量要避免用size而改用isEmpty,以免性能过慢 好,最后想说点什么呢,阻塞算法其实很好理解,简单点理解就是加锁,比如在BlockingQueue中看到的那样,再往前推点,那就是synchronized相比而言,非阻塞算法的设计和实现都很困难,要通过低级的原子性来支持并发下面就简要的介绍一下非阻塞算法,以下部分的内容参照了一篇很经典的文章注我觉得可以这样理解,阻塞对应同步,非阻塞对应并发也可以说同步是阻塞模式,异步是非阻塞模式 举个例子来说明什么是非阻塞算法非阻塞的计数器 首先,使用同步的线程安全的计数器代码如下 J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.public finalclass Counter{
1.private long value=0;
1.public synchronizedlong getValue{
1.return value;
1.}
1.public synchronizedlong increment{
1.return ++value;
1.}
1.}下面的代码显示了一种最简单的非阻塞算法使用AtomicInteger的compareAndSet(CAS方法)的计数器compareAndSet方法规定“将这个变量更新为新值,但是如果从我上次看到这个变量之后其他线程修改了它的值,那么更新就失败”J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.public class Nonblockin__ounter{
1.private AtomicIntegervalue;//前面提到过,AtomicInteger类是以原子的方式操作整型变量
1.public int getValue{
1.return value.get;
1.}
1.public int increment{
1.int v;
1.do {
1.v=value.get;
1.while !value.compareAndSetvv+1;
1.return v+ 1;
1.}
1.}非阻塞版本相对于基于锁的版本有几个性能优势首先,它用硬件的原生形态代替JVM的锁定代码路径,从而在更细的粒度层次上(__的内存位置)进行同步,失败的线程也可以立即重试,而不会被挂起后重新调度更细的粒度降低了争用的机会,不用重新调度就能重试的能力也降低了争用的成本即使有少量失败的CAS操作,这种方法仍然会比由于锁争用造成的重新调度快得多Nonblockin__ounter这个示例可能简单了些,但是它演示了所有非阻塞算法的一个基本特征——有些算法步骤的执行是要冒险的,因为知道如果CAS不成功可能不得不重做非阻塞算法通常叫作乐观算法,因为它们继续操作的假设是不会有干扰如果发现干扰,就会回退并重试在计数器的示例中,冒险的步骤是递增——它检索旧值并在旧值上加一,希望在计算更新期间值不会变化如果它的希望落空,就会再次检索值,并重做递增计算再来一个例子,Michael-Scott 非阻塞队列算法的插入操作,ConcurrentLinkedQueue就是用这个算法实现的,现在来结合示意图分析一下,很明朗J__a代码 HYPERLINKhttp://hellosure.iteye.com/blog/1126541\o复制代码INCLUDEPICTUREhttp://hellosure.iteye.com/images/icon_copy.gif\*MERGEFORMAT
1.public class LinkedQueueE{
1.private staticclass NodeE{
1.final Eitem;
1.final AtomicReferen__NodeEnext;
1.NodeEitemNodeEnext{
1.this.item=item;
1.this.next= new AtomicReferen__NodeEnext;
1.}
1.}
1.private AtomicReferen__NodeEhead
1.= new AtomicReferen__NodeEnew NodeEnullnull;
1.private AtomicReferen__NodeEtail=head;
1.public boolean putEitem{
1.NodeEnewNode= new NodeEitemnull;
1.while true{
1.NodeEcurTail=tail.get;
1.
1.if curTail==tail.get{
1.if residue== null/*A*/ {
1.if {
1.tail.compareAndSetcurTailnewNode /*D*/ ;
1.return true;
1.}
1.} else {
1.tail.compareAndSetcurTailresidue /*B*/;
1.}
1.}
1.}
1.}
1.}看看这代码完全就是ConcurrentLinkedQueue源码啊 插入一个元素涉及头指针和尾指针两个指针更新,这两个更新都是通过CAS进行的从队列当前的最后节点(C)链接到新节点,并把尾指针__到新的最后一个节点(D)如果第一步失败,那么队列的状态不变,插入线程会继续重试,直到成功一旦操作成功,插入被当成生效,其他线程就可以看到修改还需要把尾指针__到新节点的位置上,但是这项工作可以看成是“清理工作”,因为任何处在这种情况下的线程都可以判断出是否需要这种清理,也知道如何进行清理队列总是处于两种状态之一正常状态(或称静止状态,图1和图3)或中间状态(图2)在插入操作之前和第二个CAS(D)成功之后,队列处在静止状态;在第一个CAS(C)成功之后,队列处在中间状态在静止状态时,尾指针指向的链接节点的next字段总为null,而在中间状态时,这个字段为非null任何线程通过比较tail.next是否为null,就可以判断出队列的状态,这是让线程可以帮助其他线程“完成”操作的关键上图显示的是有两个元素,处在静止状态的队列 插入操作在插入新元素(A)之前,先检查队列是否处在中间状态如果是在中间状态,那么肯定有其他线程已经处在元素插入的中途,在步骤(C)和(D)之间不必等候其他线程完成,当前线程就可以“帮助”它完成操作,把尾指针向前__(B)如果有必要,它还会继续检查尾指针并向前__指针,直到队列处于静止状态,这时它就可以开始自己的插入了第一个CAS(C)可能因为两个线程竞争访问队列当前的最后一个元素而失败;在这种情况下,没有发生修改,失去CAS的线程会重新装入尾指针并再次尝试如果第二个CAS(D)失败,插入线程不需要重试——因为其他线程已经在步骤(B)中替它完成了这个操作! 上图显示的是处在插入中间状态的队列,在新元素插入之后,尾指针更新之前 上图显示的是在尾指针更新后,队列重新处在静止状态。