线程池分析

访客 阅读:242 2020-02-18 16:37:56 评论:0

一、简介
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

corePoolSize:核心线程池的大小,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到任务数大于等于corePoolSize。

或者是使用prestartAllCoreThreads()或者prestartCoreThread()方法初始化线程,线程池会提前创建并启动所有基本线程。

 maximumPoolSize:线程池最大线程数,它表示在线程池中最多能创建多少个线程。

 keepAliveTime:线程池维护线程所允许的空闲时间,默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

unit:参数keepAliveTime的时间单位

TimeUnit.DAYS;               // 
TimeUnit.HOURS;             //小时 
TimeUnit.MINUTES;           //分钟 
TimeUnit.SECONDS;           // 
TimeUnit.MILLISECONDS;      //毫秒 
TimeUnit.MICROSECONDS;      //微妙 
TimeUnit.NANOSECONDS;       //纳秒

workQueue:线程池所使用的阻塞队列

ArrayBlockingQueue://是一个基于数组结构的有界阻塞队列,创建时必须指定大小,此队列按 FIFO(先进先出)原则对元素进行排序 
LinkedBlockingQueue://一个基于链表结构的无界阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue,默认大小为Integer_Max_Value,静态工厂方法Executors.newFixedThreadPool()使用了这个队列 
SynchronousQueue;//一个不存储元素的阻塞队列。直接新建线程来执行任务,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列 
PriorityBlockingQueue://一个具有优先级的无界阻塞队列。

handler: 线程池对拒绝任务的处理策略

ThreadPoolExecutor.AbortPolicy() 
//丢弃任务并抛出java.util.concurrent.RejectedExecutionException异常 
ThreadPoolExecutor.CallerRunsPolicy() 
//重试添加当前的任务,在 execute 方法的调用线程中运行被拒绝的任务 
ThreadPoolExecutor.DiscardOldestPolicy() 
//抛弃队列最前面的任务,然后重新尝试执行任务(重复此过程) 
ThreadPoolExecutor.DiscardPolicy() 
//抛弃任务且不抛异常

二、线程池的工作方式

当一个任务通过execute(Runnable)方法欲添加到线程池时:

public void execute(Runnable command) { 
    if (command ==null) 
            thrownew NullPointerException(); 
            intc = ctl.get(); 
 
      //1 当前运行的线程数量小于核心线程数量,直接将任务加入worker启动运行。 
        if (workerCountOf(c) <corePoolSize) { 
            if (addWorker(command,true)) 
                return; 
            c =ctl.get(); 
        }  
         
        //2 运行线程数量大于核心线程数量时,上面的if分支针对大于corePoolSize,并且缓存队列加入任务操作成功的情况。 
          运行中并且将任务加入缓冲队列成功,正常来说这样已经完成了处理逻辑。 
          但是为了保险起见,增加了状态出现异常的确认判断,如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务; 
        if (isRunning(c) &&workQueue.offer(command)) { 
            intrecheck = ctl.get(); 
            if (!isRunning(recheck) && remove(command)) 
                reject(command); 
            elseif (workerCountOf(recheck) == 0) 
                addWorker(null,false); 
        } 
 
         //3 这里针对运行线程数量超过了corePoolSize,并且缓存队列也已经放满的情况。 
           注意第二个参数是false,可以在下面addWorker方法看到,就是针对线程池最大线程数量maximumPoolSize的判断。 
           elseif (!addWorker(command,false)) 
            reject(command); 
        }

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。

如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

从execute源码中可以知道主要的是addWorker方法,addWorker方法主要做的工作就是新建一个Woker线程,加入到woker集合中,然后启动该线程,那么接下来的重点就是Woker类的run方法了。

private boolean addWorker(Runnable firstTask, boolean core) { 
        retry: 
        for (;;) { 
            int c = ctl.get(); 
            int rs = runStateOf(c); 
 
            // Check if queue empty only if necessary. 
            if (rs >= SHUTDOWN && 
                ! (rs == SHUTDOWN && 
                   firstTask == null && 
                   ! workQueue.isEmpty())) 
                return false; 
 
            for (;;) { 
                int wc = workerCountOf(c); 
                if (wc >= CAPACITY || 
                    wc >= (core ? corePoolSize : maximumPoolSize)) 
                    return false; 
                if (compareAndIncrementWorkerCount(c)) 
                    break retry; 
                c = ctl.get();  // Re-read ctl 
                if (runStateOf(c) != rs) 
                    continue retry; 
                // else CAS failed due to workerCount change; retry inner loop 
            } 
        } 
//以上是线程数量的校验与更新逻辑 
        boolean workerStarted = false; 
        boolean workerAdded = false; 
        Worker w = null; 
        try { 
            w = new Worker(firstTask); 
            final Thread t = w.thread; 
            if (t != null) { 
                final ReentrantLock mainLock = this.mainLock; 
                mainLock.lock(); 
                try { 
                    // Recheck while holding lock. 
                    // Back out on ThreadFactory failure or if 
                    // shut down before lock acquired. 
                    int rs = runStateOf(ctl.get()); 
 
                    if (rs < SHUTDOWN || 
                        (rs == SHUTDOWN && firstTask == null)) { 
                        if (t.isAlive()) // precheck that t is startable 
                            throw new IllegalThreadStateException(); 
                        workers.add(w); 
                        int s = workers.size(); 
                        if (s > largestPoolSize) 
                            largestPoolSize = s; 
                        workerAdded = true; 
                    } 
                } finally { 
                    mainLock.unlock(); 
                } 
                if (workerAdded) { 
                    t.start(); 
                    workerStarted = true; 
                } 
            } 
        } finally { 
            if (! workerStarted) 
                addWorkerFailed(w); 
        } 
        return workerStarted; 
    }

woker线程的执行流程就是首先执行初始化时分配给的任务,执行完成以后会尝试从阻塞队列中获取可执行的任务,如果指定时间内仍然没有任务可以执行,则进入销毁逻辑。(只销毁非核心线程)

public void run() { 
            runWorker(this); 
        } 
final void runWorker(Worker w) { 
        Thread wt = Thread.currentThread(); 
        //task就是Woker构造函数入参指定的任务,即用户提交的任务 
        Runnable task = w.firstTask; 
        w.firstTask = null; 
        w.unlock();  
        boolean completedAbruptly = true; 
        try { 
            while (task != null || (task = getTask()) != null) { 
                w.lock(); 
                if ((runStateAtLeast(ctl.get(), STOP) || 
                     (Thread.interrupted() && 
                      runStateAtLeast(ctl.get(), STOP))) && 
                    !wt.isInterrupted()) 
                    wt.interrupt(); 
                try { 
                    beforeExecute(wt, task); 
                    Throwable thrown = null; 
                    try { 
                        task.run(); 
                    } catch (RuntimeException x) { 
                        thrown = x; throw x; 
                    } catch (Error x) { 
                        thrown = x; throw x; 
                    } catch (Throwable x) { 
                        thrown = x; throw new Error(x); 
                    } finally { 
                        afterExecute(task, thrown); 
                    } 
                } finally { 
                    task = null; 
                    w.completedTasks++; 
                    w.unlock(); 
                } 
            } 
            completedAbruptly = false; 
        } finally { 
              processWorkerExit(w, completedAbruptly); 
        } 
    }

 

三、Executors包含的常用线程池

1.newFixedThreadPool:固定大小线程池
public static ExecutorService newFixedThreadPool(int nThreads) {   
        return new ThreadPoolExecutor(nThreads, nThreads,   
                                      0L, TimeUnit.MILLISECONDS,  
                                      new LinkedBlockingQueue<Runnable>());   
    }  

FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。

  1. 可控制线程最大并发数(同时执行的线程数)
  2. 超出的线程会在队列中等待

但是,在核心线程池空闲时,即核心线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

(2) newCachedThreadPool:无界线程池

public static ExecutorService newCachedThreadPool() {   
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,   
                                      60L, TimeUnit.SECONDS,   
                                      new SynchronousQueue<Runnable>());   
    }  
  • 工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
  • 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
  • 在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。

(3)singleThreadPoll   单线程线程池

大小为1的固定线程池,这个其实就是newFixedThreadPool(1)

它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。

单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。

(4)ScheduledThreadPoll  

它的核心线程池固定,非核心线程的数量没有限制,但是闲置时会立即会被回收。

支持定时及周期性任务执行

四、线程池的关闭:

  ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
  • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

 

 

 

 

标签:java多线程
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

发表评论
搜索
排行榜
关注我们

扫一扫关注我们,了解最新精彩内容