ThreadPoolExecutor 的 execute 源码分析

本文由 简悦 SimpRead 转码, 原文地址 www.cnblogs.com

上一篇文章指出,ThreadPoolExecutor 执行的步骤如下:

  1. 向线程池中添加任务,当任务数量少于 corePoolSize 时,会自动创建 thead 来处理这些任务;
  2. 当添加任务数大于 corePoolSize 且少于 maximmPoolSize 时,不再创建线程,而是将这些任务放到阻塞队列中,等待被执行;
  3. 接上面 2 的条件,且当阻塞队列满了之后,继续创建 thread, 从而加速处理阻塞队列;
  4. 当添加任务大于 maximmPoolSize 时,根据饱和策略决定是否容许继续向线程池中添加任务,默认的饱和策略是 AbortPolicy(直接丢弃)。

我们直接可以通过 ThreadPoolExecutor 的 execute 方法源码来跟踪这个流程。首先,由于在 execute 方法中常常会根据线程池的状态选择判断一些逻辑,因此在介绍该方法之前首先说一下线程池的几种方法。

1. 线程池的状态:

  1. RUNNING:该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务;
  2. SHUTDOWN:该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务;
  3. STOP:该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务;
  4. TIDYING:所有任务都被终止了,workerCount 为 0,为此状态时还将调用 terminated() 方法;
  5. TERMINATED:terminated() 方法调用完成后变成此状态。

几个状态相关的方法:

runStateOf(int c)  方法:c & 高 3 位为 1,低 29 位为 0 的~ CAPACITY,用于获取高 3 位保存的线程池状态

workerCountOf(int c)  方法:c & 高 3 位为 0,低 29 位为 1 的 CAPACITY,用于获取低 29 位的线程数量

ctlOf(int rs, int wc)  方法:参数 rs 表示 runState,参数 wc 表示 workerCount,即根据 runState 和 workerCount 打包合并成 ctl

也就是说 32 位含义:(高三位表示状态)+ (低 29 位表示线程数量)。

接下来分析源码:

2. execute 代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. 如果运行的线程少于corePoolSize,
         * 尝试开启一个新线程去运行command,command作为这个线程的第一个任务,并运行
         *
         * 2. 如果任务成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程
         *(因为可能存在有些线程在我们上次检查后死了),或者进入这个方法后,pool被关闭了
         * 所以我们需要再次检查state,如果线程池停止了需要回滚入队列,
         * 如果池中没有线程了,新开启 一个线程
         *
         * 3. 如果无法将任务入队列(可能队列满了),需要新开区一个线程
         * 如果失败了,说明线程池shutdown或者饱和了,所以我们拒绝任务
         */
         
        // 1.当运行的线程少于corePoolSize,
        // 则直接执行command任务,addworker(command,true)会产生一个新线程来执行这个任务
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        
        // 2.  线程池处于RUNNING状态,并将任务放入workQueue队列,但不执行addWorker(表明不创建新的线程)
        // 双重校验可防止添加任务到workQueue队列后,线程池状态由于意外等原因处于非RUNNING状态,
        // 此时就需要从workQueue队列remove掉这个任务
        // 注:offer方法不会阻塞,如果不能插入队列直接返回false。(有可能造成数据丢失?这里不会,也就是说阻塞队列满了)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        
        // 3. 如果线程池不是running状态或者无法入队列,执行线程池的饱和策略
        else if (!addWorker(command, false))
            reject(command);
    }

从上面代码可知,java 线程池在任务比较少时(当运行的线程少于 corePoolSize),直接通过 addWorker 来执行任务,当任务比较多时,使用了阻塞队列,阻塞队列里存放的是 Worker 对象,Worker 类是 ThreadPoolExecutor 的一个内部类,它实现了 Runable 接口,具有线程的功能。同时还继承了 AbstractQueuedSynchronizer(AQS), 因此也具有锁的功能。那么 ThreadPoolExecutor 中如何去执行阻塞队列里面的 Worker 任务的呢?首先我们来分析一下 doWorker,看它是如何执行任务,以及如何触发执行阻塞队列里面的任务的。

3. doWorker 代码

doWorker 的的作用首先是创建线程,然后执行任务,源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 获取线程池运行状态,
            // 线程池的运行状态:runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            // CAS算法
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 如果添加任务成功,则跳出retry,也就是跳出整个循环体
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            // 通过线程池的ThreadFactory创建一个线程,用于执行这个firstTask任务
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // 说明:(rs == SHUTDOWN && firstTask == null)可能是workQueue中仍有未执行完成的任务,
                    // 创建没有初始任务的worker线程执行
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 提前检查t线程是否启动,如果是就抛非法线程状态异常
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // workQueue队列中添加Worker对象
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 往HashSet中添加worker成功,启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

代码看起来有点长,但只做了两件事:

1)用循环 CAS 操作来将线程数加 1;

2)新建一个线程并启执行这个任务。

代码中使用的 retry,它类似与 goto, 用于控制跳出循环体,retry 可以随意命名,只要遵循 java 的命名规则即可。

CAS 会使用循环机制,当存在多线程的情况下,通过比较与交换,其它线程通过循环可以的更新最新值。关于 CAS 可以参考《深入浅出 CAS》

在上面源码中可以看到,addWorker 会用当前 firstTask 创建一个 Worker 对象,相当于对 firstTask 的包装,然后用 Worker 对象作为 firstTask 创建一个 Thread,该 Thread 保存在 Worker 的 thread 成员变量中。在 addWorker 中通过 t.start() 启动了这个线程,线程中执行 runWorker 方法。

4. 内部类 Worker

那么 ThreadPoolExecutor 中如何去执行阻塞队列里面的 Worker 任务的呢?看到这里好像还是没有答案。那接着分析 Worker 这个内部类:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         */
        Worker(Runnable firstTask) {
            // 设置AQS的同步状态,大于0代表锁已经被获取
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            // 调用ThreadPoolExecutor的runworker方法
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

在 addWorker 中通过 t.start() 启动了这个线程,线程中执行 runWorker 方法。

5. runWorker 代码

到目前为止还是没有涉及到阻塞队列!可是到 runWorker 中就可以看到啦!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

上面代码关键点是while循环getTask()方法,通过循环不断的调用 getTask() 从阻塞队列中获取任务,通过这个方法,它与阻塞队列建立桥梁。目前我们已经知道当添加任务数量大于 coolPoolSize(且小于 maximumPoolSize)的时候,并不会创建线程,但是由于在任务数量小于 coolPoolSize 之前调用了 addWorker 并触发 t.star() 执行,从而调用了 runWorker, 通过循环不断的调用 getTask() 从阻塞队列中获取任务,如果 getTask() 返回不为 null,则上锁,执行任务,任务执行完成之后解锁。如果 getTask() 返回 null,改变 completedAbrutly 状态,然后调用 processWorkerExit() 退出 worker 线程。

6. getTask 代码

由第 5 点引出了 getTask 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

getTask 中主要看获取任务的代码如下:

  1. workQueue.poll():如果在 keepAliveTime 时间内,阻塞队列中没有任务,返回 null;
  2. workQueue.take():如果阻塞队列为空,当前线程会被阻塞;当队列中有任务加入时,线程被唤醒,并返回任务。

6. 小结

本文只是对线程池正常的工作流程进行了分析,并没有对线程池 shutdown 或者 stop 的情况进行分析,这些部分涉及到 AQS 等并发技术,这部分比较复杂,感兴趣可以更加深入研究一下。

参考:

  1. https://www.cnblogs.com/trust-freedom/p/6681948.html#top
  2. https://www.jianshu.com/p/fb6e91b013cc