前言

本章简单概括线程池相关的一些概念和类,以及如何使用线程池,线程池的原理是什么。

概述

  • 什么是线程池:线程池是指管理一组同构工作线程的资源池。
  • 为什么要使用线程池:随意创建线程会带来三大问题:开销高、系统资源消耗大、破坏稳定性,所以需要通过一个构造程序来进行统一管理。
    • 线程是一个重量级对象,频繁创建和销毁线程需要产生巨大的系统资源开销;
    • 当线程创建数超过可用处理器的线程数时,线程会闲置,闲置的线程会占用大量的内存空间,给垃圾回收器造成压力,且大量线程在竞争共享资源时也会产生其他的性能开销,严重的会导致整个应用程序崩溃;
  • 使用的好处:
    • 重用现有的线程而不是创建新线程,减少开销(线程是一个重量级对象,线程频繁创建和销毁需要产生巨大开销);
    • 大部分情况下工作线程已经存在,当请求到达时可以直接执行任务而不用等待再创建线程,提高了响应性;
    • 适量调整线程池的大小,能够保证性能最大化,同时防止创建过多线程而导致内存不足。

基础类说明

Runnable

Runnable表示无返回值的任务

@FunctionalInterface
public interface Runnable {
    //当使用对象实现接口 Runnable 创建线程时,启动线程会导致在该单独执行的线程中调用对象的方法 run 。
    public abstract void run();
}

Callable

Callable表示有返回值的任务

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Future

Future表示一个任务的生命周期(创建、提交、开始执行、完成),并提供了相应的方法来判断任务是否已经完成或取消、获取任务的结果、取消任务等

public interface Future<V> {
    //取消任务(已提交但尚未开始执行的任务可以取消;已开始执行的任务,只有当它们能响应中断时才能取消;已完成的任务取消不会有任何影响)
    boolean cancel(boolean mayInterruptIfRunning);
    //任务是否已经取消
    boolean isCancelled();
    //任务是否已经完成
    boolean isDone();
    //获取任务的结果(任务已成功执行完成:立即返回结果;任务执行中:阻塞等待执行结果;任务取消或异常:抛出异常)
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Executor

Executor表示执行已提交Runnable任务的对象。

public interface Executor {
    void execute(Runnable command);
}

ExecutorService

ExecutorService是Executor的扩展类,管理Executor的生命周期(运行、关闭、已终止),并扩展了带返回结果的提交任务方法。

public interface ExecutorService extends Executor {
    //平缓的关闭线程池,同时等待已提交的任务执行完成,对于新提交的任务提交到RejectExecutionHandler
    void shutdown();
    //暴力的关闭线程池,同时取消所有正在执行的任务,并且不再处理工作队列中的任务
    List<Runnable> shutdownNow();
    //检测是否关闭/终止
    boolean isShutdown();
    boolean isTerminated();
    //等待ExecutorService到达终止状态,
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    //提交任务并返回执行结果Future对象
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    //执行一批任务,在全部完成后返回结果
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
    //执行一批任务,返回已成功完成的任务
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

ThreadFactory

当线程池需要线程时,都是通过ThreadFactory来完成。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

线程池详细说明

线程池具体实现类为ThreadPoolExecutor,继承AbstractExecutorService类,AbstractExecutorService类实现了ExecutorService类

设置线程池的大小

  • 线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。在代码中通常不固定线程池的大小,而应该通过某种配置机制来提供,或者根据Runtime.getRuntime().availableProcessors()来动态计算;
  • 线程池过大,大量的线程在相对较少的CPU和内存资源上相互竞争,这不仅会导致更高的内存使用量,还可能耗尽资源;线程池过小,将导致存在许多空闲的处理器线程,降低了吞吐率;
  • 要想正确地设置线程池的大小,必须分析计算环境、资源预算、任务的特性(计算密集型、IO密集型、两者皆是)。
  • 计算公式:N=cpuN * cpuU * (1+W/C) (其中N=线程池大小,cpuN=cpu线程数、cpuU=CPU利用率、W=等待时间、C=计算时间)
  • 经验:计算密集型N=cpuN+1;I/O密集型N=cpuN*2+1

创建线程池

使用ThreadPoolExecutor构造方法创建

构造方法

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler){
    if (corePoolSize < 0 || maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize || keepAliveTime < 0) 
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

参数说明

  • corePoolSize:核心线程数,线程被创建后不会被回收
  • maximumPoolSize:最大线程数,线程池中可同时活动的最大线程数,线程空闲后过一段时间会被回收
  • keepAliveTime:非核心线程数空闲后的存活时间,超过这个时间会被标记为可回收
  • unit:时间单位TimeUnit
  • workQueue:任务阻塞队列,若核心线程数满,则任务优先进入队列中排队等待执行(强烈建议使用有界队列)
    • 根据特定场景选择合适的阻塞队列,如:需要按优先级排序的使用PriorityBlockingQueue,需要同步执行的使用SynchronousQueue,无界使用LinkedBlockingQueue,有界使用ArrayBlockingQueue或设定初始容量的LinkedBlockingQueue
  • threadFactory:创建线程的工厂类,可选择使用默认Executors.defaultThreadFactory()或者自定义参考DefaultThreadFactory类实现
  • handler:饱和策略RejectedExecutionHandler,当工作队列和最大线程数均已满时提交的任务会被执行饱和策略。
    • AbortPolicy:终止策略;默认策略,直接抛RejectedExecutionExcetion异常
    • DiscardPolicy:丢弃策略;丢弃无法处理的工作任务
    • DiscardOldestPolicy:丢弃最旧策略;丢弃工作队列头的任务,然后在工作队列尾添加新的任务。(不建议在FIFO队列中使用此策略,因为会抛弃优先级最高的任务)**
    • CallerRunsPolicy:调用者运行策略,使用主线程去执行工作任务。(会导致主线程一段时间内不能提交任何任务,从而使得工作者线程有时间来处理完正在执行的任务)
    • 自定义RejectedExecutionHandler:扩展RejectedExecutionHandler接口(推荐)

使用Executors静态工厂方法创建

线程池的静态工厂类,用于快速创建线程。由于Executors提供的很多方法默认都是使用的无界队列,高负载情境下,无界队列很容易导致OOM(不建议使用,推荐自定义线程池类)

  • newCacheThreadPool:创建不限定线程数的线程池,使用SynchronousQueue
  • newFixedThreadPool:创建固定线程数的线程池,使用LinkedBlockingQueue
  • newScheduleThreadPool:创建延时执行的线程池,使用DelayedWorkQueue
  • newSingleThreadExecutor:创建单个线程的线程池,使用LinkedBlockingQueue
  • newSingleThreadScheduledExecutor:创建核心线程数为1,最大线程数为Integer.MAX_VALUE的延时线程的线程池,使用了DelayedWorkQueue
  • newWorkStealingPool:创建任务抢占式(work-stealing工作窃取)的ForkJoinPool,jdk1.8版本新增

使用线程池

创建全局线程池类

package com.alan.module.demo.concurrent;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 定制化全局线程池
 * 最大容量为2000
 *
 * @author jiangfengan
 */
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private final static int systemSize = Runtime.getRuntime().availableProcessors();
    private final static int corePoolSize = systemSize + 1;
    private final static int maximumPoolSize = systemSize * 2;
    private final static long keepAliveTime = 1L;
    private final static TimeUnit timeUnit = TimeUnit.MILLISECONDS;
    private final static int MAX_CAPACITY = 2000;
    private final static BlockingQueue workQueue = new LinkedBlockingQueue(MAX_CAPACITY);

    private final static MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor(corePoolSize
            , maximumPoolSize, keepAliveTime, timeUnit, workQueue, new MyThreadFactory(), new MyRejectedExecutionHandler());

    private MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit
            , BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
    * 静态工厂方法构建单例对象
    */
    public static MyThreadPoolExecutor getInstance() {
        return myThreadPoolExecutor;
    }

    /**
     * 定制化线程工厂
     * 仅设置名字可以直接使用 new ThreadFactoryBuilder().setNameFormat("my-pool-").get()
     */
    static class MyThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        MyThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "my-pool-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    /**
     * 定制化拒绝策略处理类
     */
    static class MyRejectedExecutionHandler implements RejectedExecutionHandler {

        public MyRejectedExecutionHandler() {
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            throw new RejectedExecutionException(String.format("任务%s被拒绝%s", r.toString(), executor.toString()));
        }
    }
}

提交任务

//无返回值提交
executor.execute(()-> System.out.println("Hello"));

//有返回值提交
executor.submit(()-> System.out.println("Hello"));

//有返回值提交
executor.submit(()-> "Hello");

关闭线程池

//平缓的关闭线程池,同时等待已提交的任务执行完成,对于新提交的任务提交到RejectExecutionHandler
executor1.shutdown();
//暴力的关闭线程池,同时取消所有正在执行的任务,并且不再处理工作队列中的任务    
executor1.shutdownNow();

线程池源码分析

整体流程

1. 提交任务到线程池

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    //1. Runnable转换为FutureTask
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //2. 执行FutureTask
    execute(ftask);
    //3. 返回结果
    return ftask;
}
public void execute(Runnable command) {
    //检验任务是否为空
    if (command == null)
        throw new NullPointerException();
    //获取线程池
    int c = ctl.get();
    //1. 当前工作线程数小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //创建一个核心线程,执行任务
        if (addWorker(command, true))
            return;
        //若创建核心线程失败,则重新获取线程池,到后续逻辑中处理
        c = ctl.get();
    }
    //2. 当前核心线程数已满
    //如果是运行状态,将工作任务添加到工作队列中
    if (isRunning(c) && workQueue.offer(command)) {
        //再次获取线程池
        int recheck = ctl.get();
        //若此时线程池已不是运行状态,则需要进行回滚入队操作
        if (! isRunning(recheck) && remove(command))
            //回滚成功后执行拒绝策略
            reject(command);
        //若线程池正常运行,查看工作线程数是否为0
        else if (workerCountOf(recheck) == 0)
            //当工作线程数全部为0时,创建一个非核心线程
            addWorker(null, false);
    }
    //3 工作队列已满,尝试创建非核心线程处理任务
    else if (!addWorker(command, false))
        //创建非核心线程失败,说明线程池已满,执行饱和策略
        reject(command);
}

2. 创建线程执行任务

private boolean addWorker(Runnable firstTask, boolean core) {
    //1. 工作线程数加1
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //2. 开始创建线程执行任务
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                //添加任务到实例
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) 
                        throw new IllegalThreadStateExceptio();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //添加工作成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //3. 启动线程执行任务
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

3. 线程执行任务

public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //获取任务成功执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //中断校验
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            //执行任务
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

4. 获取任务

private Runnable getTask() {

    boolean timedOut = false; // Did the last poll() time out?

    //线程回收条件:1.若线程池已停止;2.线程池已关闭,工作队列为空;3.工作线程已超出最大线程数;4.工作线程在等待任务时超时,不是最后一个线程
    for (;;) {
        //获取线程池和线程池状态
        int c = ctl.get();
        int rs = runStateOf(c);

        //若线程池已停止 或 线程池已关闭,工作队列为空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //回收线程,获取任务为null
            decrementWorkerCount();
            return null;
        }

        //获取工作线程数
        int wc = workerCountOf(c);
        //获取线程是否需要回收标识(设置为需要回收的、超过核心线程数的;true表示需要回收)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        //( 工作线程已超出最大线程数 或者 空闲线程)并且(不是最后一个线程 或者 是最后一个线程且工作队列已空)
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            //尝试回收线程,获取任务为null
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        //获取任务:需要回收的线程执行poll方法获取任务,不需要回收的线程执行take方法获取任务
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;//获取任务超时,说明当前没有任务需要处理,线程空闲
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}