Java多线程 -- JUC包源码分析13 -- SynchronousQueue与CachedThreadPool

架构师 来源:chunlongyu 143℃ 0评论

在前面分析工具类Executors的时候,提到了CachedThreadPool:其线程数会无限增大,每来一个新请求,就会new一个Thread,其maxPoolSize = Integer.MAX。
其构造函数如下:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }

//ThreadPoolExecutor的execute函数:
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }    

至所以会达到这个效果:就是因为SynchronousQueue本身没有容量,上面的wokerQueue.offer(command)函数,永远返回空,所以就会一直走到下面的addIfUnderMaximuxPoolSize里面。

下面就来详细分析SynchronousQueue

SynchronousQueue使用方式

SynchronousQueue最大的特点就是put/take是成对调用的:
先调put,线程会阻塞在那;直到另外一个线程调用了take,2个线程才同时解锁。反之亦然。
对于多个线程,比如3个,调用3次put,3个都会阻塞在那;直到等另外的线程,调用3次take,大家才同时解锁。反之亦然。

这里就会有1个问题:先调用了3次put,那调用take的时候,是首先唤醒哪一个put线程呢?第1个,还是最后一个呢?

这就涉及到2种不同的模式:公平模式(队列模式) 和 非公平模式(栈模式)。

队列模式:最先调用put的线程,最先被take唤醒

栈模式:最后调用put的线程,最先被take唤醒。

SynchronousQueue实现

public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {

   private transient volatile Transferer transferer;

    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, false, 0) == null) {  //第一个参数为put进去的obejct
        Thread.interrupted();
            throw new InterruptedException();
    }
    }

    public E take() throws InterruptedException {
        Object e = transferer.transfer(null, false, 0);  //第1个参数为空,返回值是上面put进去的object
        if (e != null)
            return (E)e;
    Thread.interrupted();
        throw new InterruptedException();
    }

    public E poll() {
        return (E)transferer.transfer(null, true, 0);
    }

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }
。。。
}

从代码可以看出,无论是put/take/offer/poll,都是调用的transfer.transfer函数,只是传进去的参数不一样而已。

那这个Transfer是什么呢?看代码知道,Transfer是个接口,有TransferQueue/TransferStack 2种实现,这2个实现,都是SynchronousQueue的内部类,其结果如下:

static final class TransferStack extends Transferer {
 static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
            ...
        }

 volatile SNode head;

 ...
}

    static final class TransferQueue extends Transferer {
            static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;
            ...
            }

        transient volatile QNode head;
        transient volatile QNode tail;
   }

从上面代码可以看出,2者结构都是一个单向链表。对于栈,只需要维护head结点;对于队列,维护head + tail结点。

实现思路

不管是栈,还是队列,其基本实现思路类似:
put的时候,new一个item != null的结点;take的时候,new一个item = null的结点。从head开始遍历,遇到的结点和自己同类型的,说明没有匹配者,自己也加入链表;遇到和自己不同类型的,尝试匹配。
加上链表的时候,如果是栈模式,加在头部;如果是队列模式,加在尾部。如下图所示:

这里写图片描述

transfer代码

//TransferQueue.transfer
        Object transfer(Object e, boolean timed, long nanos) {
            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { //队列为空,或者head结点和自己同类型
                    QNode tn = t.next;
                    if (t != tail)            // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null)? x : e;

                } else {                            //队列不为空,head和自己不同类型
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null)? x : e;
                }
            }
        }

//TransferStack.transfer
        Object transfer(Object e, boolean timed, long nanos) {
            SNode s = null; 
            int mode = (e == null)? REQUEST : DATA;

            for (;;) {  //for循环里面3大分支
                SNode h = head;
                if (h == null || h.mode == mode) {  //case1:栈为空,或者栈顶元素和自己同类型
                    if (timed && nanos <= 0) {      // 关键点:offer函数就走的这个逻辑,offer(e, ture, 0) 一直会返回null
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return mode == REQUEST? m.item : s.item;
                    }
                } else if (!isFulfilling(h.mode)) { //case 2: 和自己不同类型,并且没有其他线程fulfilling这个结点,进入
                    if (h.isCancelled())            
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (mode == REQUEST)? m.item : s.item;
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {    //case 3: 和自己不同类型,但现在有其他线程正在fulfilling此结点 
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

这里面有一个关键点:

无论是栈的实现,还是队列的实现,链表本身是没有加锁的。因此在多线程访问下,就会有弱一致性问题,inconsistent read问题。

但这个不会出问题,因为匹配上,那是最好;匹配不上,出现inconsistent read,for循环回来再重新读,直到匹配上了,线程才会解锁。

也正因为如此,上面的代码中,有诸多的double check逻辑。

关闭

IT问道推荐

银行贷款频频被拒?
“Dr信用牛牛”让你远离信用污点 国内首家信用健康管理平台免费为你提供信用修复方案