服务热线

029-87595239

当前位置:首页 > 社区新闻 > 技术文章 >

线程池的实现原理分析

线程池的原理分析,西安Java培训总结了一些相关的知识,供大家学习。

成员变量ctl是个Integer的原子变量用来记载线程池状况和线程池线程个数,其中Integer类型是32位二进制标明,其中高3位用来表明线程池状况,后边 29位用来记载线程池线程个数。

线程池状况意义:

RUNNING:接受新使命而且处理堵塞行列里的使命

SHUTDOWN:回绝新使命可是处理堵塞行列里的使命

STOP:回绝新使命而且抛弃堵塞行列里的使命一起会中止正在处理的使命

TIDYING:一切使命都履行完(包括堵塞行列里边使命)当时线程池活动线程为0,即将调用terminated办法

TERMINATED:停止状况。terminated办法调用完结今后的状况

     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:  Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
     *   TERMINATED: terminated() has completed

线程池状况变换:

RUNNING -> SHUTDOWN
显式调用shutdown()办法,或许隐式调用了finalize(),它里边调用了shutdown()办法。

RUNNING or SHUTDOWN)-> STOP
显式 shutdownNow()办法

SHUTDOWN -> TIDYING
当线程池和使命行列都为空的时分

STOP -> TIDYING
当线程池为空的时分

TIDYING -> TERMINATED
 terminated() hook 办法履行完结时分

使命运转

ThreadPoolExecutor首要变量,对应界说可检查英文:

    private final BlockingQueue<Runnable> workQueue;

        private final ReentrantLock mainLock = new ReentrantLock();

        private final HashSet<Worker> workers = new HashSet<Worker>();

        private final Condition termination = mainLock.newCondition();

        private int largestPoolSize;

        private long completedTaskCount;

        private volatile ThreadFactory threadFactory;

        private volatile RejectedExecutionHandler handler;

        private volatile long keepAliveTime;

        private volatile boolean allowCoreThreadTimeOut;

        private volatile int corePoolSize;

        private volatile int maximumPoolSize;

ThreadPoolExecutor类中,最中心的使命提交办法是execute()办法,我们看下execute()办法源码:

    public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

        int c = ctl.get(); //获取当时线程池的状况+线程个数变量

     //当时线程池线程个数是否小于corePoolSize,小于则敞开新线程运转

        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

     //假如线程池处于RUNNING状况,则增加使命到堵塞行列

        if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

       //假如当时线程池状况不是RUNNING则从行列删去使命,并履行回绝战略

            if (! isRunning(recheck) && remove(command))

                reject(command);

       //否者假如当时线程池线程空,则增加一个线程

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

     //假如行列满了,则新增线程,假如线程个数>maximumPoolSize则履行回绝战略

        else if (!addWorker(command, false))

            reject(command);

    }

由代码可看出,该办法首要调用addWorker办法,其源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {

        retry:

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);

            // Check if queue empty only if necessary.

            // 等价于s >= SHUTDOWN &&(rs != SHUTDOWN ||firstTask != null || workQueue.isEmpty())

            if (rs >= SHUTDOWN &&

                ! (rs == SHUTDOWN &&

                   firstTask == null &&

                   ! workQueue.isEmpty()))

                return false;

            //循环增加线程个数

            for (;;) {

                int wc = workerCountOf(c);

                if (wc >= CAPACITY ||

                    wc >= (core ? corePoolSize : maximumPoolSize))

                    return false;           

                //cas增加线程个数

                if (compareAndIncrementWorkerCount(c))

                    break retry;

                

            
  //cas失利,则检查线程池状况是否改变,改变则跳到外层循环重试从头获取线程池状况,否者内层循环。

                c = ctl.get();  // Re-read ctl

                if (runStateOf(c) != rs)

                    continue retry;

                // else CAS failed due to workerCount change; retry inner loop            }

        }

        //ctl更新成功,新增worker

        boolean workerStarted = false;

        boolean workerAdded = false;

        Worker w = null;

        try {

            final ReentrantLock mainLock = this.mainLock;

            w = new Worker(firstTask);

            final Thread t = w.thread;

            if (t != null) {

                mainLock.lock();

                try {

                    // Recheck while holding lock.

                    // Back out on ThreadFactory failure or if

                    // shut down before lock acquired.

                    int c = ctl.get();

                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||

                        (rs == SHUTDOWN && firstTask == null)) {

                        if (t.isAlive()) // precheck that t is startable

                            throw new IllegalThreadStateException();

                        workers.add(w);

                        int s = workers.size();

                        if (s > largestPoolSize)

                            largestPoolSize = s;

                        workerAdded = true;

                    }

                } finally {

                    mainLock.unlock();

                }                

                //添加成功,启动线程

                if (workerAdded) {

                    t.start();

                    workerStarted = true;

                }

            }

        } finally {

            if (! workerStarted)

                addWorkerFailed(w);

        }

        return workerStarted;

    }

创建worker成功,Worker类实现啦Runnable接口,我们接下来看下其run方法:

    /**

     * Creates with given first task and thread from ThreadFactory.

     * @param firstTask the first task (null if none)

     */

     Worker(Runnable firstTask) {

        setState(-1); // inhibit interrupts until runWorker

        his.firstTask = firstTask;

        this.thread = getThreadFactory().newThread(this);

     }

   /** Delegates main run loop to outer runWorker  */

     public void run() {

       runWorker(this);

     }   

    final void runWorker(Worker w) {

        Thread wt = Thread.currentThread();

        Runnable task = w.firstTask;

        w.firstTask = null;

        w.unlock(); // allow interrupts

        boolean completedAbruptly = true;

        try {

            while (task != null || (task = getTask()) != null) {

                w.lock();

                // If pool is stopping, ensure thread is interrupted;

                // if not, ensure thread is not interrupted.  This

                // requires a recheck in second case to deal with

                // shutdownNow race while clearing interrupt

                if ((runStateAtLeast(ctl.get(), STOP) ||

                     (Thread.interrupted() &&

                      runStateAtLeast(ctl.get(), STOP))) &&

                    !wt.isInterrupted())

                    wt.interrupt();

                try {

                    beforeExecute(wt, task);

                    Throwable thrown = null;

                    try {

                        task.run();

                    } catch (RuntimeException x) {

                        thrown = x; throw x;

                    } catch (Error x) {

                        thrown = x; throw x;

                    } catch (Throwable x) {

                        thrown = x; throw new Error(x);

                    } finally {

                        afterExecute(task, thrown);

                    }

                } finally {

                    task = null;

                    w.completedTasks++;

                    w.unlock();

                }

            }

            completedAbruptly = false;

        } finally {

            processWorkerExit(w, completedAbruptly);

        }

    }       

创立worker成功,循环从堵塞行列获取task,若获取task==null,循环完毕,移除该作业线程,下面我们看下getTask办法:

    private Runnable getTask() {

        boolean timedOut = false; // Did the last poll() time out?

        retry:

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);

            // Check if queue empty only if necessary.

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

                decrementWorkerCount();

                return null;

            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {

                int wc = workerCountOf(c);

                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))

                    break;

                //工作线程数量减1;runTask循环结束,执行processWorkerExit(w, completedAbruptly),移除工作线程

                if (compareAndDecrementWorkerCount(c))

                    return null;

                c = ctl.get();  // Re-read ctl

                if (runStateOf(c) != rs)

                    continue retry;

                // else CAS failed due to workerCount change; retry inner loop            }

            try {

                //timed=true,阻塞等待一段时间,若取到task==null,则移除该worker;timed=false:一直阻塞等待

                Runnable r = timed ?

                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                    workQueue.take();

                if (r != null)

                    return r;

                timedOut = true;

            } catch (InterruptedException retry) {

                timedOut = false;

            }

        }

    }