Java BlockingQueue 简介


1 简介

在 Java 中容器主要有 2 个大类 Collection 和 Map, 其中 Collection 主用用于数据的直接存储 (Map 则是一种键值对的存储方式, 除了要存储的数据外, 还需要有一个 key 和数据建立一个映射关系)。
Collection 使用的最多的大概就是 List, Set 和 Queue。
而在日常的开发中, 会根据不同数据的特性, 比如是否可重复, 是否需要有序等条件选择不同的集合, 同时还会考虑其他的因素, 比如线程是否安全。

而本文的主要是对基于 Queue 派生的另一个线程安全的接口 – BlockingQueue 做个简单的介绍。

在经典的“生产者-消费者”问题中, 队列通常被视为线程间操作的数据容器。这样一来, 我们能够将各个模块的业务功能解耦。生产者负责将其“生产”出来的数据放置在数据容器中, 而消费者则仅需在“数据容器”中获取数据。这种设计使得生产者线程和消费者线程能够实现解耦, 各自专注于自身的业务功能

而本篇主要对基于 Queue 派生的另一个线程安全的接口 BlockingQueue 做一个简单的介绍。

阻塞队列 (BlockingQueue) 被广泛使用在 “生产者-消费者” 问题中, 作为线程间操作的数据容器, 使得各个模块的业务功能进行解耦。生产者专注生产数据, 然后投递到队列中即可, 而消费者则从队列中获取数据, 然后专注消费数据。

BlockingQueue 能够高频的应用于 “生产者-消费者” 其原因是它提供了可阻塞的插入和移除的方法。
当队列容器已满, 生产者线程会被阻塞, 直到队列未满;当队列容器为空时, 消费者线程会被阻塞, 直至队列非空时为止

2 BlockingQueue 的基本操作

几种不同的场景的新增删除和检查方法列表

Throws Excepiton Special Value Blocks Times Out
insert add(e) offer(e) put(e) offer(e, time, unit)
remove remove() poll() take() poll(time, unit)
examine element() peek() not applicable not applicable

新增数据

  1. add(E e): 往队列插入数据, 当队列满时, 插入元素时会抛出 IllegalStateException 异常
  2. offer(E e): 当往队列插入数据时, 插入成功返回 true, 否则返回 false, 当队列满时不会抛出异常
  3. put(E e): 当阻塞队列容量已经满时, 往阻塞队列插入数据的线程会被阻塞, 直至阻塞队列已经有空余的容量可供使用
  4. offer(E e, long timeout, TimeUnit unit):若阻塞队列已经满时, 同样会阻塞插入数据的线程, 直至阻塞队列已经有空余的地方, 与 put 方法不同的是, 该方法会有一个超时时间, 若超过当前给定的超时时间, 插入数据的线程会退出

删除元素

  1. remove(Object o):从队列中删除数据, 成功则返回 true, 否则为 false
  2. poll(): 删除队列中第一个的数据, 当队列为空时, 返回 null
  3. take():当阻塞队列为空时, 获取队头数据的线程会被阻塞
  4. poll(long timeout, TimeUnit unit): 当阻塞队列为空时, 获取数据的线程会被阻塞, 另外, 如果被阻塞的线程超过了给定的时长, 该线程会退出

查看元素

  1. element(): 获取队头元素, 如果队列为空时则抛出 NoSuchElementException 异常
  2. peek():获取队头元素, 如果队列为空, 返回 null

3 常用的 BlockingQueue

实现 BlockingQueue 接口的有

  1. ArrayBlockingQueue
  2. LinkedBlockingQueue
  3. PriorityBlockingQueue
  4. SynchronousQueue
  5. LinkedTransferQueue
  6. DelayQueue
  7. LinkedBlockingDeque

而这几种常见的阻塞队列也是在实际编程中会常用的, 下面对这几种常见的阻塞队列进行说明。

3.1 ArrayBlockingQueue

特点

  1. ArrayBlockingQueue 一旦创建, 容量不能改变
  2. ArrayBlockingQueue 是由数组实现的有界阻塞队列, 该队列里面的元素满足 FIFO (先进先出), 因此, 头元素在队列中存在时间最长, 而队尾数据则是当前队列最新的数据元素。
  3. ArrayBlockingQueue 可作为 “有界数据缓冲区”, 生产者插入数据到队列容器中, 并由消费者提取。
  4. 当队列容量满时, 尝试将元素放入队列将导致操作阻塞, 同理尝试从一个空队列中取一个元素也会同样阻塞。

ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性。
所谓公平性是指严格按照线程等待的绝对时间顺序, 即最先等待的线程能够最先访问到 ArrayBlockingQueue 中的数据。
而非公平性则是指访问 ArrayBlockingQueue 的顺序不是遵守严格的时间顺序。有可能存在, ArrayBlockingQueue 从无数据变为有数据, 即可被访问时, 长时间阻塞的线程依然无法访问到 ArrayBlockingQueue。
如果需要保证公平性, 可以在声明 ArrayBlockingQueue 的时候指明公平锁, 但是这通常会降低吞吐量。

ArrayBlockingQueue 的声明:

// true 公平竞争  false 非公平竞争
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(容量大小,true); 

3.2 LinkedBlockingQueue

LinkedBlockingQueue 是用链表实现的有界阻塞队列, 同样满足 FIFO 的特性, 与 ArrayBlockingQueue 相比起来具有更高的吞吐量, 为了防止 LinkedBlockingQueue 容量迅速增, 损耗大量内存。
通常在创建 LinkedBlockingQueue 对象时, 会指定其大小, 如果未指定, 容量等于 Integer.MAX_VALUE。

3.3 PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。
默认情况下元素采用自然顺序进行排序, 也可以通过自定义类实现 compareTo() 方法来指定元素排序规则, 或者初始化时通过构造器参数 Comparator 来指定排序规则。

3.4 SynchronousQueue

SynchronousQueue 每个插入操作必须等待另一个线程进行相应的删除操作, 因此, SynchronousQueue 实际上没有存储任何数据元素, 因为只有线程在删除数据时, 其他线程才能插入数据。
同样的, 如果当前有线程在插入数据时, 线程才能删除数据。SynchronousQueue 也可以通过构造器参数来为其指定公平性。

3.5 LinkedTransferQueue

LinkedTransferQueue 是一个由链表数构成的无界阻塞队列, 由于该队列实现了 TransferQueue 接口, 与其他阻塞队列相比主要有以下不同的方法

transfer(E e): 如果当前有线程 (消费者) 正在调用 take() 方法或者可延时的 poll() 方法进行消费数据时, 生产者线程可以调用这个方法将数据传递给消费者线程。
如果当前没有消费者线程的话, 生产者线程就会将数据插入到队尾, 直到有消费者能够进行消费才能退出

tryTransfer(E e): 如果当前有线程 (消费者) 正在调用 take() 方法或者可延时的 poll() 方法进行消费数据时, 生产者线程可以调用这个方法将数据立即传送给消费者线程, 如果当前没有消费者线程消费数据的话, 就立即返回 false。

因此, 与 transfer 方法相比, transfer 方法是必须等到有消费者线程消费数据时, 生产者线程才能够返回。而 tryTransfer 方法能够立即返回结果退出。

tryTransfer(E e,long timeout,imeUnit unit): 与 transfer 基本功能一样, 只是增加了超时特性, 如果数据到了规定的超时时间内没有消费者进行消费的话, 就返回 false。

3.6 DelayQueue

DelayQueue 是一个存放实现 Delayed 接口的数据的无界阻塞队列, 只有当数据对象的延时时间达到时才能插入到队列进行存储。如果当前所有的数据都还没有达到创建时所指定的延时期,
则队列没有队头, 并且线程通过 poll() 等方法获取数据元素则返回 null。

所谓数据延时期满时, 则是通过 Delayed 接口的 long getDelay(TimeUnit unit) 进行判定,
如果该方法返回的是小于等于 0 则说明该数据元素的延时期已满。

3.7 LinkedBlockingDeque

LinkedBlockingDeque 是基于链表实现的有界阻塞双端队列, 如果在创建对象时为指定大小时, 其默认大小为 Integer.MAX_VALUE。
与 LinkedBlockingQueue 相比, 主要的不同点在于, LinkedBlockingDeque 具有双端队列的特性。 所以其提供的增删操作的方法有些不同。

头部操作

Throws Excepiton Special Value Blocks Times Out
insert addFirst(e) offerFirst(e) putFirst(e) offerFirst(e, time, unit)
remove removeFirst() pollFirst() takeFirst() pollFirst(time, unit)
examine getFirst() peekFirst() not applicable not applicable

尾部操作

Throws Excepiton Special Value Blocks Times Out
insert addLast(e) offerLast(e) putLast(e) offerLast(e, time, unit)
remove removeLast() pollLast() takeLast() pollLast(time, unit)
examine getLast() peekLast() not applicable not applicable

LinkedBlockingDeque 的基本操作可以分为四种类型:

  1. 特殊情况, 抛出异常
  2. 特殊情况, 返回特殊值如 null 或者 false
  3. 当线程不满足操作条件时, 线程会被阻塞直至条件满足
  4. 操作具有超时特性

另外, LinkedBlockingDeque 实现了 BlockingDueue 接口, 而 LinkedBlockingQueue 实现的是 BlockingQueue, 这两个接口的主要区别如下

Insert

Throws Excepiton Special Value Blocks Times Out
BlockingDeque addLast(e) offerLast(e) putLast(e) offerLast(e, time, unit)
BlockingQueue add(e) offer(e) put(e) offer(e, time, unit)

Remove

Throws Excepiton Special Value Blocks Times Out
BlockingDeque removeFist() pollFirst() takeFirst() pollFirst(time, unit)
BlockingQueue remove() poll() take(e) poll(e, time, unit)

Examine

Throws Excepiton Special Value Blocks Times Out
BlockingDeque getFirst() peeKFirst() not applicable not applicable
BlockingQueue element() peek() not applicable not applicable

从上可以看出, 两个接口的很多功能是可以等价使用的, 比如 BlockingQueue 的 add 方法和 BlockingDeque 的 addLast 方法的功能是一样的。


  目录