堵塞队列

简介

  • def:在多线程中实现高效、安全的数据传输,主要是通过一个共享的队列,使得数据能够从一端输入,从另一端输出
  • 当队列是空的,取数据的线程就会被堵塞,直到其他线程往空的队列中添加数据
  • 当队列是满的,放数据的线程就会被堵塞,直到其他线程移除数据
  • 使用堵塞队列能够实现简化线程之间的协作,使用生产者消费者模型而不必实现线程间的同步和协作

堵塞队列的分类

1 ArrayBlockingQueue:由数组结构组成的有界堵塞队列
2 LinkedBlockingQueue:由链表结构组成的有界堵塞队列
  • 大小默认值为Integer.MAX_VALUE
3 DelayQueue:使用优先级队列实现的延迟无界的堵塞队列
4 PriorityBlockingQueue:支持优先级排序的无界堵塞队列
5 SynchronousQueue:不存储元素的堵塞队列,也即是单个元素的队列
6 LinkedTransferQueue:由链表结构组成的无界堵塞队列
7 LinkedTransferDeque:由链表结构组成的双向堵塞队列

堵塞队列的常用方法

image-20230308162343199

Thread Pool线程池

简介

  • 线程池是一种线程使用模式,线程池维护者多个线程,可以避免在执行短时间任务创建和销毁线程的代价。
  • 线程池能够保证内核的充分利用,防止过分调度
  • 线程池的主要工作是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建之后启动这些任务,当任务的数量超过线程数量时,就会将任务放入队列排队等候,等其他线程执行完毕,再从队列中取出任务执行

线程池的特点

  • 降低资源消耗:重复利用自己创建的线程减少创建、销毁的资源消耗
  • 提高响应速度:任务到达的时候,不需要等待线程创建就能直接执行
  • 提高可管理型

线程池的使用

image-20230308165250684

  • Executors.newFilexedThreadPool(int nThreads),创建指定线程数量的线程池

    ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
    try {
        for (int i = 0; i < 10; i++) {
            threadPool1.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is working...");
            });
        }
    } catch (Exception e) {
    
    } finally {
        threadPool1.shutdown();
    }
    

    pool-1-thread-3 is working...
    pool-1-thread-3 is working...
    pool-1-thread-3 is working...
    pool-1-thread-3 is working...
    pool-1-thread-3 is working...
    pool-1-thread-3 is working...
    pool-1-thread-5 is working...
    pool-1-thread-4 is working...
    pool-1-thread-1 is working...
    pool-1-thread-2 is working...

    Process finished with exit code 0

    只会使用线程池中创建的五个线程去执行任务

  • Executors.newWorkStealingPool(),创建只含有一个线程的线程池

    public static void main(String[] args) {
        ExecutorService threadPool1 = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < 10; i++) {
                threadPool1.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " is working...");
                });
            }
        } catch (Exception e) {
    
        } finally {
            threadPool1.shutdown();
        }
    
    }
    

    pool-1-thread-1 is working...
    pool-1-thread-1 is working...
    pool-1-thread-1 is working...
    pool-1-thread-1 is working...
    pool-1-thread-1 is working...
    pool-1-thread-1 is working...
    pool-1-thread-1 is working...
    pool-1-thread-1 is working...
    pool-1-thread-1 is working...
    pool-1-thread-1 is working...

    Process finished with exit code 0

  • Executors.newCachedThreadPool(),创建可扩容的线程池

    public static void main(String[] args) {
        ExecutorService threadPool1 = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < 10; i++) {
                threadPool1.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " is working...");
                });
            }
        } catch (Exception e) {
    
        } finally {
            threadPool1.shutdown();
        }
    }
    

    pool-1-thread-3 is working...
    pool-1-thread-9 is working...
    pool-1-thread-4 is working...
    pool-1-thread-1 is working...
    pool-1-thread-6 is working...
    pool-1-thread-7 is working...
    pool-1-thread-5 is working...
    pool-1-thread-8 is working...
    pool-1-thread-2 is working...
    pool-1-thread-10 is working...

三种线程池本质都是使用的:

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                              60L, TimeUnit.SECONDS,
                              new SynchronousQueue<Runnable>())

线程池的七个参数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        ...
    }
  1. corePoolSize:核心常驻线程的数量
  2. maximumPoolSize:线程池最大线程数量
  3. keepAliveTime:非核心线程的存活时间
  4. unit:存活时间的单位
  5. workQueue:任务数量超过常驻线程的堵塞队列
  6. threadFactory:线程工厂
  7. handler:线程池拒接策略,即任务数量超过线程池最大线程数量的处理策略

线程池的工作流程和拒绝策略

工作流程

image-20230309141547008

  • 主线程首先通过线程池的execute()方法创建线程
  • 线程数量达到常驻线程的数量,任务就会被放入堵塞队列中
  • 如果堵塞队列也满了,主线程又通过execute()方法创建线程的时候,线程池就会为此任务创建线程并分配(而不是堵塞队列?这里很疑惑)
  • 堵塞队列满了,线程池也满了的时候,主线程再次执行execute()方法,线程池就会去执行拒绝策略
拒绝策略
  1. AbortPolicy(默认):直接抛出RejectExecutionException异常
  2. CallerRunsPolicy:将任务回退给调用者,以降低新任务的流量
  3. DisgardOldestPolicy:抛弃队列中等待最久的任务,而将当前任务添加到队列,尝试再次提交
  4. DiscardPolicy:装死

自定义线程池

为什么要自定义线程池

  1. FixedThreadPool和SingleThreadPool允许的请求队列的大小为Integer.MAX_VALUE,容易造成请求堆积,从而OOM
  2. CachedThreadPool则是允许创建的最大线程数为Integer.MAX_VALUE,会创建大量的线程,从而OOM
public static void main(String[] args) {
    var threadPool = new ThreadPoolExecutor(2,
            5,
            5,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    for (int i = 0; i < 10; i++) {
        threadPool.execute(() -> {
            System.out.println(Thread.currentThread().getName() + " is working.");
        });
    }
}
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hikaru.juc.threadPool.ThreadPoolDemo$$Lambda$14/0x00000008010031f0@7e6cbb7a rejected from java.util.concurrent.ThreadPoolExecutor@4769b07b[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
	at com.hikaru.juc.threadPool.ThreadPoolDemo.main(ThreadPoolDemo.java:15)
pool-1-thread-4 is working.
pool-1-thread-4 is working.
pool-1-thread-4 is working.
pool-1-thread-4 is working.
pool-1-thread-1 is working.
pool-1-thread-5 is working.
pool-1-thread-3 is working.
pool-1-thread-2 is working.

由于超过了最大线程数5 + 堵塞队列 3,触发了拒绝策略抛出了异常