主要参数

corePoolSize:核心线程数

核心线程会一直存活,及时没有任务需要执行。当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理。

maxPoolSize:最大线程数

当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务。
当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常。

keepAliveTime:非核心线程闲置时的超时时长

超过这个时长,非核心线程就会被回收。当ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true 时,keepAliveTime 同样会作用于核心线程。

unit:用于指定 keepAliveTime 参数的时间单位

常用的有 TimeUnit .MILLISECONDS 和 TimeUnit .SECONDS。

workQueue:线程池中的任务队列

通过线程池的 execute 方法提交的 Runnable 对象会存储在这个参数中。

threadFactory:线程工厂

为线程池提供创建新的线程的功能。threadFactory 是一个接口,它只有一个方法: public abstract Thread newThread (Runnable r);

RejectedExecutionHandler:通常叫做拒绝策略

在线程池已经关闭的情况下或者当线程数已经达到maxPoolSize且队列已满。只要满足其中一种时,在使用execute() 来提交新的任务时将会拒绝,而默认的拒绝策略是抛一个 RejectedExecutionException 异常。

allowCoreThreadTimeout: 是否允许主线程超时回收

看看代码

//执行线程
executor.execute(workers[i]);
//执行代码如下

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
    //1. 当运行的线程数小于corePoolSize的时候 ,创建新的线程即Worker执行提交的任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    /**
    2. 如果线程数大于等于corePoolSize的时候,将任务提交到workQueue队列中 ,如果成功添加 ,即在上面的runWorker就会执行调用了,当然这里会重新的核查此时的线程数,看下是否有线程减少,如果减少,则创建新的线程来使线程数维持在corePoolSize的数目
    */

        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(nullfalse);
        }
        /*
         * 3. 如果队列满了后,则创建新的线程来执行,当然这里有一种极端情况,当线程数等于maximumPoolSize时,并且workQueue也满了后,则会使用
         */

        else if (!addWorker(command, false))
            reject(command);
    }

那么addWorker 方法是如何唤醒runWorker的呢?

我们看下addWorker

private boolean addWorker(Runnable firstTask, boolean core) {

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //这里将t线程启动起来了,那么t是什么,是w.thread w是woker,thread是worker的一个属性,看下面这段代码
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

//这里表明thread里面的target(runnable)就是Worker,所以唤醒的时候是唤醒的Worker的run方法,再看run方法
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

public void run() {
    runWorker(this);
}

这段核心代码

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)

当阻塞队列中没有任务时,等待时间达到keepAliveTime毫秒值时就会被自动唤醒,而不会永远地沉睡下去。也就是说设置为60s还是没有取到task,说明没有任务了,就把这个worker给移除了。哇,好巧妙

线程池的实际应用

  • 1、一般需要根据任务的类型来配置线程池大小:如果是CPU密集型任务,就要减少CPU的线程切换频率,参考值可以设为 N*CPU+1 , 如果是IO密集型任务,就需要尽量压榨CPU,让CPU可以处理更多任务,参考值可以设置为2*NCPU

  • 2、一般情况下,队列的大小遵循下面的公事:

queSize <= ClientTimeOut(秒) * TPS , 队列大小 小于等于 客户端超时 * 每秒处理的交易数 , 比如客户端超时60s,TPS为1 , 那么队列长度就可以设置为60,因为第60个任务加到队列里面的时候,第一个任务还没处理完。那么第61个来的时候,等TPS为1 ,肯定处理时间会大于60s,所以第61条及以后的直接超时了。所以队列长度过长没有意义。

  • 3、比如你是全程异步的系统你的队列设置可以设置为0,触发拒绝策略,启用补偿机制,慢慢补偿异步任务,如果无界队列,会不断的在queue里面添加任务,corePoolSize设置为cpu核数。

  • 4、当请求速度远大于处理速度,队列就会无限加入也会造成 资源耗尽,服务宕掉

/**
     * 默认的核心线程数是CPU*2 最大线程数是500
     */

    private final int DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
    private final int DEFAULT_MAX_POOL_SIZE = 500;

    private Integer corePoolSize;
    private Integer maxPoolSize;

    @PostConstruct
    public void init() {
        corePoolSize = //from db config
        if (corePoolSize == null) {
            corePoolSize = DEFAULT_CORE_POOL_SIZE;
        }
        maxPoolSize = //from db config
        if (maxPoolSize == null) {
            maxPoolSize = DEFAULT_MAX_POOL_SIZE;
        }
    }


/**
     * 扣款异步线程池
     *
     * @return
     */

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        //这里是用的Spring的线程池,一样
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        //空闲线程存活时间60s
        executor.setKeepAliveSeconds(60);
        //队列大于0就是LinkBlockQueue无界队列,小于等于0则是队列采用SynchronizedQueue无缓存队列
        executor.setQueueCapacity(0);
        executor.setThreadNamePrefix(“AS”+Thread.currentThread().getName());
        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                log.info("[异步线程池] 发生拒绝异常,触发补偿扣款自动任务 coreSize:{} maxSize:{}", e.getCorePoolSize(), e.getMaximumPoolSize());
                throw new RejectedExecutionException("Task " + r.toString() +
                        " rejected from " +
                        e.toString());
            }
        });
        return executor;
    }
public void startAsynTask() {
        try {
            asynTaskService.asynProcess();
        }
        //正常请求异步线程池满,抛出异常,启动补偿机制
        catch (RejectedExecutionException e) {
            log.warn("[异步线程池满] 进入消息队列重新投递");
            //重新投递  可以重新弄个task扫描,减小压力
        }
    }

@Async("threadPoolTaskExecutor")
public void asynProcess(){
 // 你的业务代码
}