阻塞队列一般用来解决生产消费者模式的上的不平衡导致的问题。

例如:消息队列中就会存在这样的情况,根据指定不同阻塞队列实现不同的消息处理模式。

首先我们简单了解一下几种常见的阻塞队列。

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

这里先介绍一下阻塞队列实现。

阻塞队列的底层结构一共分为两种,

一种是数组实现例如:ArrayBlockingQueue

一种是链表实现例如:LinkedBlockingDeque

这两个的区别也主要是有数据结构决定他们的使用场景。

但是两种阻塞队列的阻塞方法是一致的。也是就是通过 lock 实现,底层均是 ReentrantLock。

代码部分如下:

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

由于数组的初始化是需要指定一个长度的,所以这也就是为什么ArrayBlockingQueue的初始化需要一个指定数值。

初始化代码如下:

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

而相对的LinkedBlockingDeque的初始化就简单粗暴的多,new一个就可以,原因也是初始化的时候他是如下过程。

初始化代码如下:


/**
* The capacity bound, or Integer.MAX_VALUE if none
*/

  private final int capacity;
***********************************************************************************

  /** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }

所以可以看见LinkedBlockingDeque使用的时候最好还是给定一个预计的估计值为好,否则你的队列就需要到达Integer.MAX_VALUE才会被阻塞在外了,

当然这可以更具场景使用,有些场景确实不需要。

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。

试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。

同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。

同时LinkedBlockingDeque是有四把锁的代码如下:


/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

只有队列的大小等于上限或者下限(0)时,才调用fullyLock方法。如果队列的大小既不等于上限,也不等于下限,

任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。

所以要注意根据现实场景设置好自己最大阻塞长度。

ps:以上代码取自jdk1.8