java 线程池默认提供了几种拒绝策略:

这几个策略都实现了RejectedExecutionHandler,拿DiscardOldestPolicy来说,查看源码:

核心代码只有2行:

  • e.getQueue().poll() 从列表里弹出1个(最早的)任务,以便让队列空出1个位置
  • e.execute(r) 新任务放入队列执行

从这段代码来看,如果有任务被丢弃(即:从队列里弹出了),不会有任何报错,也没有日志可查,实际使用中不太方便监控这种情况。

 

我们可以参考这段源码,自定义策略:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class CustomDiscardPolicy implements RejectedExecutionHandler {

    //额外传入1个名称,方便打日志或埋点监控时,定位问题
    private String factoryName = "";

    public CustomDiscardPolicy(String factoryName) {
        this.factoryName = factoryName;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            Runnable poll = e.getQueue().poll();
            //这里可以加一些自己的处理(比如:埋点监控)
            System.err.println("[" + this.factoryName + "]task will be discard:" + poll);
            e.execute(r);
        }
    }
}

当然,这里出于演示目的,只打了一行错误信息,实际应用中大家可以埋点发到kafka之类(以便后续做实时监控预警)。

测试一下:

    @Test
    public void testThreadPool() throws InterruptedException {
        final ThreadFactory DEMO_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("demo-POOL-%d").build();

        final ExecutorService DEMO_POOL = new ThreadPoolExecutor(1, 2, 300L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(5), DEMO_THREAD_FACTORY, new CustomDiscardPolicy("demo-POOL"));

        for (int i = 0; i < 10; i++) {
            DEMO_POOL.submit(() -> {
                try {
                    System.out.println(Thread.currentThread().getId() + " ready!");
                    //假设线程干活,需要一段时间
                    Thread.sleep(500);
                    System.out.println("\t" + Thread.currentThread().getId() + " done!");
                } catch (Exception e) {
                }
            });
        }
        //等一会儿,让线程池都跑完,再结束main
        Thread.sleep(10000);
    }

提交了10个任务,线程池必然饱和(10>2+5),会丢弃一些早期任务,输出如下:

从输出看,丢了3个任务,符合预期。