Java LinkedBlockingDeque


在开始介绍 LinkedBlockingDeque 之前, 我们先看一下 LinkedBlockingDeque 的类图:

Alt 'LinkedBlockingDeque 类图'

从其中可以看出他直接实现了 BlockingDeque 接口, 而 BlockingDeque 又实现了 BlockingQueue 的接口, 所以它本身具备了队列的特性。
而实现 BlockingDeque 使其在 BlockingQueue 的基础上多了 Deque 的功能。

什么是 Deque 呢?
Deque(双端队列)是一种具有两端插入和删除操作的数据结构, 允许在队列的前端和后端执行高效的操作。
Deque 是“双端队列”(Double-Ended Queue)的缩写, 其特点在于可以在两端同时进行元素的插入和删除操作, 这使得它在许多应用中具有卓越的灵活性和性能。

我们知道队列只能是头进尾出, 而 Deque 2 边都支持进出, 这个特性更加的灵活。

实现了 BlockingDeque 的 LinkedBlockingDeque 其本身具备了以下特点

  1. 不支持 null 元素: 存入 null 元素会抛出异常
  2. 线程安全性: 在内部中通过 ReentrantLock 保证多线程环境下的安全访问
  3. 无界容量: 在不指定容量的情况下, 默认为 Integer.MAX_VALUE, 理论上的无限容量
  4. 阻塞操作: 当 LinkedBlockingDeque 的操作尝试在空队列中执行移除操作或在满队列中执行插入操作时, 它会导致操作的阻塞

1 实现的数据结构

内部的实现结构就是一个链表, 但是为了支持 2 端都可以出入队操作, 所以是一个双向链表。

2 源码分析

2.1 LinkedBlockingDeque 链表节点的定义

我们知道 LinkedBlockingDeque 的底层实现结构就是一个链表, 而链表绕不开的一个概念就是节点, 所以我们先来看一下 LinkedBlockingDeque 的节点定义。

public class LinkedBlockingDeque<E> {

    /**
     * 节点类, 用于存储数据
     */
    static class Node<E> {

        // 存储的内容
        E item;

        // 上一个节点的指针
        Node<E> prev;

        // 下一个节点的指针
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }
}

上面的 Node 就是队列中的节点了, 声明的属性

item: 存储的内容
prev: 上一个节点的指针
next: 下一个节点指针

可以看出 LinkedBlockingDeque 是一个具备双向操作的链表

2.2 LinkedBlockingQueue 持有的属性


public class LinkedBlockingDeque<E> {

    /** 队列的第一个元素的节点, 也就是头节点*/
    transient Node<E> first;

    /** 队列的最后一个元素节点, 也就是尾节点 */
    transient Node<E> last;

    /** 当前队列中的元素个数 */
    private transient int count;

    /** 队列的大小, 默认为 Integer.MAX_VALUE */
    private final int capacity;

    /** 所有操作队列元素节点时使用的锁 */
    final ReentrantLock lock = new ReentrantLock();

    // 非空, 也就是数组中重新有数据了, 可以继续取数据了
    // 当某个线程尝试从当前的队列中获取元素时, 如果数组中没有数据, 会把这个线程放到这个等待条件中
    // 在另外一个线程中添加元素到数组中, 会唤醒阻塞在这个等待条件中的线程
    private final Condition notEmpty = lock.newCondition();

    // 非满, 也就是数组中的数据又没有达到上限了, 可以继续添加数据
    // 当某个线程尝试向当前的队列添加元素, 但是当前数组已经满了, 会把这个线程放到这个等待条件中
    // 在另一个线程中从当前队列中获取一个元素时, 会唤醒等待在这个等待队列中的线程
    private final Condition notFull = lock.newCondition();

}

通过上面属性的声明, 可以发现 LinkedBlockingDeque 和 ArrayBlockingQueue 类似, 都是通过一个可重入锁控制并发, 同时通过 2 个 Condition 实现在队列空或满时挂起或唤醒线程。

再来看表示队列容量的 count, 就是简单的用一个 int 表示, 没有用 volatile 修饰, 可以保证线程的可见性吗?
在 LinkedBlockingQueue 容量的大小却是的用 AtomicInteger 来表示, 为什么不是同样简单用一个 int 呢?

这 2 个问题都可以用 Happens-Before 的监视器原则来解释。
在 LinkedBlockingDeque 中只用了一把锁, 同时 count 的修改也只有在线程持有锁的过程中变更, 所以能够保证可见性, 而监视器原则,
确保了同一把锁的解锁 Happens-Before 加锁, 也就是说在 A 线程解锁之前的操作对于 B 线程获取同一把锁后都是可见的。
而 LinkedBlockingQueue 中使用了 2 把锁, 一个用于添加元素, 一个用于移除元素, 所以 2 把锁之间的加锁和解锁不能保证可见性, 所以使用了 AtomicInteger, 利用其内部的 CAS 操作来保证可见性。

2.3 LinkedBlockingQueue 构造函数

public class LinkedBlockingDeque<E> {

    // 无参构造函数
    public LinkedBlockingDeque() {
        // 调用自身指定容量的构造函数, 默认为 int 的最大值
        this(Integer.MAX_VALUE);
    }

    // 指定容量的构造函数
    public LinkedBlockingDeque(int capacity) {
        // 指定的容量小于等于 0, 直接抛出异常
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    // 指定一个集合的构造函数
    public LinkedBlockingDeque(Collection<? extends E> c) {

        this(Integer.MAX_VALUE);
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            for (E e : c) {
                // 元素不能为空
                if (e == null)
                    throw new NullPointerException();
                // 把当前的数据封装为 Node 节点, 放到了队列的尾部, 添加失败就抛出异常
                if (!linkLast(new Node<E>(e)))
                    throw new IllegalStateException("Deque full");
            }
        } finally {
            lock.unlock();
        }
    }

    private boolean linkLast(Node<E> node) {
        // 队列中的元素个数大于等于容量上限了, 直接返回 false
        if (count >= capacity)
            return false;
        // 获取尾节点
        Node<E> l = last;
        // 新增节点的上一个节点为原本的尾节点
        node.prev = l;
        // 当前的尾节点等于新增的节点
        last = node;
        // 头节点为空, 原本链表没有数据, 新增节点就是头节点
        if (first == null)
            first = node;
        else
            // 头节点不为空, 原本链表有数据, 那么把原本的尾节点的下一个节点指向新增的节点
            l.next = node;
        // 队列中的元素个数 + 1
        ++count;
        // 队列有数据了, 唤醒阻塞在非空等待条件上的线程
        notEmpty.signal();
        return true;
    }
}

第一个构造函数无参, 那么创建的队列的容量就是 Integer.MAX_VALUE, 也就是理论上的无限容量。

第二个构造函数可以指定队列大小。

第三个构造函数传入一个集合, 这样也会创建出理论无限容量的队列。

同理里面使用了 ReentrantLock 来加锁, 然后把传入的集合元素按顺序一个个放入 items 中。
这里加锁目的不是使用它的互斥性, 而是让修改后的头节点 first 和尾节点 last 的变动, 对其他线程可见, 也就是使用了 ReentrantLock 的监视器原则

2.4 LinkedBlockingDeque 支持的方法

2.4.1 数据入队方法

LinkedBlockingDeque 提供了多种入队操作的实现来满足不同情况下的需求, 入队操作有如下几种:

  1. add(E e) / addFirst(E e) / addLast(E e)
  2. offer(E e) / offerFirst(E e) / offerLast(E e)
  3. offer(E e, long timeout, TimeUnit unit) / offerFirst(E e, long timeout, TimeUnit unit) / offerLast(E e, long timeout, TimeUnit unit)
  4. put(E e) / putFirst(E e) / putLast(E e)

add() 相关的方法


public class LinkedBlockingDeque<E> {
    
    
    public boolean add(E e) {
        // 调用到添加到尾部的方法
        addLast(e);
        return true;
    }

    public void addFirst(E e) {
        // 调用 offerFirst 的方法
        if (!offerFirst(e))
            throw new IllegalStateException("Deque full");
    }

    public void addLast(E e) {
        // 调用 offerLast 的方法
        if (!offerLast(e))
            throw new IllegalStateException("Deque full");
    }

}

add() 相关的方法都是直接调用 offer() 对应的方法, 比如 add 对应 offer, addFirst 对应 offerFirst, 所以我们只需要看 offer() 方法的实现就可以了。

offer() 相关的方法

public class LinkedBlockingDeque<E> {
    
    public boolean offer(E e) {
        // 调用自身的 offerLast 方法, 添加到队列的尾部
        return offerLast(e);
    }
    
    public boolean offerFirst(E e) {

        // 添加的数据不能为空
        if (e == null) 
            throw new NullPointerException();

        // 封装为 Node 节点    
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lock();
        try {
            // 添加到队列的头部
            return linkFirst(node);
        } finally {
            lock.unlock();
        }
    }

    public boolean offerLast(E e) {
        // 添加的数据不能为空
        if (e == null) 
            throw new NullPointerException();
            
        // 封装为 Node 节点  
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lock();
        try {
            // 添加到队列的尾部
            return linkLast(node);
        } finally {
            lock.unlock();
        }
    }

    private boolean linkFirst(Node<E> node) {
        // 节点个数大于等于容量上限了, 直接返回
        if (count >= capacity)
            return false;
        // 获取头节点    
        Node<E> f = first;
        // 新增的节点的下一个节点为当前的头节点
        node.next = f;
        // 当前的头节的设置为新增的节点
        first = node;

        // 当前的尾节点为空, 表示原本链表中没有数据, 把新增的节点同时设置为尾节点
        if (last == null)
            last = node;
        else
            // 为节点不为空, 表示原本链表中有数据了
            // 那么只需要把 旧的头节点的上一个节点指向新增的节点
            f.prev = node;
        // 队列中的元素个数 + 1    
        ++count;
        // 唤醒阻塞在非空等待条件中的线程
        notEmpty.signal();
        return true;
    }

    private boolean linkLast(Node<E> node) {
        if (count >= capacity)
            return false;
        // 获取尾结点    
        Node<E> l = last;
        // 新增节点的上一个节点为原本的尾节点
        node.prev = l;
        // 当前的尾节点等于新增的节点
        last = node;

        // 头节点为空, 原本链表没有数据
        if (first == null)
            // 将头节点设置为新增的节点
            first = node;
        else
            // 尾节点不为空, 原本队列有数据了, 把原本尾节点的下一个节点设置为新增的节点
            l.next = node;
        // 队列中的元素个数 + 1      
        ++count;
        // 唤醒阻塞在非空等待条件中的线程
        notEmpty.signal();
        return true;
    }
}

offer() 相关的方法的实现很简单, 最终就是调用到内部的 linkFirst() 和 linkLast() 方法, 而 2 个就是链表的节点新增操作。

而 offer() 相关的带超时时间的方法就不展开了, 原理和 LinkedBlockingQueue 一样, 利用了 Condition 的 awaitNanos 进行超时等待, 并在外面用 while 循环控制等待时的中断问题。

put() 相关的方法


public class LinkedBlockingDeque<E> {

    public void put(E e) throws InterruptedException {
        // 调用到 putLast 方法
        putLast(e);
    }

    public void putLast(E e) throws InterruptedException {
        // 添加的数据不能为空
        if (e == null) 
            throw new NullPointerException();

        // 将当前的数据封装为 Node 节点            
        Node<E> node = new Node<E>(e);

        final ReentrantLock lock = this.lock;
        lock.lock();
        
        try {
            // 阻塞等待 linkLast 成功
            while (!linkLast(node))
                // 添加到队列尾部失败的话, 将当前线程放到非满的等待条件, 等待唤醒后, 尝试添加元素到队列
                notFull.await();
        } finally {
            lock.unlock();
        }
    }

    public void putFirst(E e) throws InterruptedException {
        
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 阻塞等待linkFirst成功
            while (!linkFirst(node))
                // 添加到队列尾部失败的话, 当前线程放到非满的等待条件, 等待唤醒后, 尝试添加元素到队列
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
}

lock 加锁后一直阻塞等待, 直到元素插入到队列中。
整体的方法和 offer 方法类似, 前者在当队列已满时, 进入阻塞, 后者在队列已满时, 则是返回。

2.3.2 数据出队方法

同入队的方法一样, 出队也有多种实现, LinkedBlockingDeque 提供了好几种出队的方法, 大体如下:

  1. remove() / removeFirst() /removeLast()
  2. poll() / pollFirst() / pollLast()
  3. poll(long timeout, TimeUnit unit) / pollFirst(long timeout, TimeUnit unit) / pollLast(long timeout, TimeUnit unit)
  4. take() / takeFirst() / takeLast()

remove() 相关的方法


public class LinkedBlockingDeque<E> {

    public E remove() {
        // 调用自身的 removeFirst 方法
        return removeFirst();
    }

    public E removeFirst() {
        // 调用自身的 pollFirst 方法
        E x = pollFirst();
        if (x == null) 
            throw new NoSuchElementException();
        return x;
    }

    public E removeLast() {
        // 调用自身的 pollLast 方法
        E x = pollLast();
        if (x == null) 
            throw new NoSuchElementException();
        return x;
    }
}

remove() 相关的方法都是调用到对应的 poll() 方法, 在拿到方法的返回值后, 做一层非空的判断。

poll() 相关的方法

public class LinkedBlockingDeque<E> {

    public E poll() {
        // 调用到自身的 pollFirst 方法
        return pollFirst();
    }

    public E pollFirst() {
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            // 带锁的执行 unlinkFirst 方法
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }

    public E pollLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 带锁的执行 unlinkLast 方法
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }

    private E unlinkFirst() {
        Node<E> f = first;
        // 判断当前的头节点, 为空的话, 直接返回 null
        if (f == null)
            return null;
        // 获取头节点的下一个节点    
        Node<E> n = f.next;
        // 获取头节点的数据
        E item = f.item;
        // 置空头节点的数据
        f.item = null;
        // 把头节点的下一个节点指向自身
        f.next = f;

        // 把头节点的指针 first 指向当前头节点的下一个节点
        first = n;
        // 头节点的下一个节点为空
        if (n == null)
            // 置空尾结点
            last = null;
        else
            // 否则设置头节点的下一个节点的上一个节点为空
            n.prev = null;
        // 队列元素个数 - 1    
        --count;
        // 唤醒在 notFull 等待队列上的线程
        notFull.signal();
        return item;
    }

    private E unlinkLast() {
        Node<E> l = last;
        // 尾节点为空, 直接返回 null
        if (l == null)
            return null;

        // 获取尾节点的上一个节点
        Node<E> p = l.prev;
        // 获取尾节点的数据
        E item = l.item;
        // 置空尾节点的数据
        l.item = null;
        // 设置尾结点的上一个节点为自身
        l.prev = l;

        // 设置尾节点的指针 last 指向当前尾结点的上一个节点
        last = p;
        // 尾结点的上一个节点为空
        if (p == null)
            // 置空头节点
            first = null;
        else
            // 设置尾结点的上一个节点的下一个节点为空
            p.next = null;
        //   队列元素个数 - 1       
        --count;
        // 唤醒在 notFull 等待队列上的线程
        notFull.signal();
        return item;
    }

}

poll() 相关的方法的实现很简单, 最终就是调用到内部的 unlinkFirst() 和 unlinkLast() 方法, 而 2 个就是链表的节点删除操作。

而 poll() 相关的带超时时间的方法就不展开了, 原理和 LinkedBlockingQueue 一样, 利用了 Condition 的 awaitNanos 进行超时等待, 并在外面用 while 循环控制等待时的中断问题。

take() 相关的方法

public class LinkedBlockingDeque<E> {

    public E take() throws InterruptedException {
        // 调用到自身的 takeFirst 方法
        return takeFirst();
    }
    
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            // 通过 unlinkFirst 方法删除头节点, 
            // 删除失败, 即返回结果为 null, 将当前线程阻塞在非空等待条件上
            // 等待唤醒后重新执行删除
            while ((x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }

    public E takeLast() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            // 通过 unlinkFirst 方法删除头节点, 
            // 删除失败, 即返回结果为 null, 将当前线程阻塞在非空等待条件上
            // 等待唤醒后重新执行删除
            while ((x = unlinkLast()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }

}

put() 方法一个套路, 先通过 lock 加锁, 然后 while 循环重试, 失败就 await 阻塞等待, 等待下次唤醒, 直至成功。

2.3.3 获取元素方法

LinkedBlockingDeque 提供了获取元素的方法主要有 2 个 element()peek() 方法, 他们的区别在于, element() 方法在队列为空时会抛出异常, 而 peek() 方法则是返回 null。


public class LinkedBlockingDeque<E> {

    public E element() {
        // 调用 getFirst() 方法
        return getFirst();
    }

    public E peek() {
        // 调用 peekFirst() 方法
        return peekFirst();
    }

    public E getFirst() {
        // 调用 getFirst 方法结果还是调用到 peekFirst() 方法
        E x = peekFirst();
        if (x == null) 
            throw new NoSuchElementException();
        return x;
    }

    public E getLast() {
        // 调用 getLast 方法结果还是调用到 peekLast() 方法
        E x = peekLast();
        if (x == null) throw new NoSuchElementException();
        return x;
    }

    public E peekFirst() {
        
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();

        try {
            // 头节点为空吗 ? 是的话返回 null, 否则返回头节点的数据
            return (first == null) ? null : first.item;
        } finally {
            lock.unlock();
        }
    }

    public E peekLast() {
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            // 尾节点为空吗? 是的话返回 null, 否则返回尾节点的数据
            return (last == null) ? null : last.item;
        } finally {
            lock.unlock();
        }
    }
}

获取元素前加锁, 防止并发问题导致数据不一致, 然后利用 first 和 last 2 个节点直接可以获得元素。

2.3.4 删除元素方法


public class LinkedBlockingDeque<E> {

    public boolean remove(Object o) {
        // 调用自身的 removeFirstOccurrence 方法
        return removeFirstOccurrence(o);
    }

    public boolean removeFirstOccurrence(Object o) {
        // 从头开始遍历
        // 需要移除的节点为 null, 直接抛异常
        if (o == null) 
            return false;
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();

        try {
            // 从 first 向后开始遍历比较, 找到元素后调用 unlink 移除
            for (Node<E> p = first; p != null; p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    public boolean removeLastOccurrence(Object o) {
        // 从尾开始遍历
        if (o == null) 
            return false;
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            // 从 last 向前开始遍历比较, 找到元素后调用 unlink 移除
            for (Node<E> p = last; p != null; p = p.prev) {
                if (o.equals(p.item)) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    void unlink(Node<E> x) {

        // 获取删除节点的上一个节点
        Node<E> p = x.prev;
        // 获取删除节点的下一个节点
        Node<E> n = x.next;

        // 需要删除的节点上一个节点为空, 需要删除的节点为头节点
        if (p == null) {
            unlinkFirst();
        } else if (n == null) {
            // 需要删除的节点的下一个节点为空, 需要删除对接的为尾节点
            unlinkLast();
        } else {
            // 把需要删除的节点的上一个节点的下一个节点的指针 指向 需要删除的节点的下一个节点
            p.next = n;
            // 把需要删除的节点的下一个节点的上一个节点的指针 指向 需要删除的节点的上一个节点
            n.prev = p;
            // 置空需要删除节点的数据
            x.item = null;
            // 队列中的元素个数 - 1
            --count;
            // 唤醒阻塞在 notFull 等待队列上的线程
            notFull.signal();
        }
    }
}

删除元素是从头 / 尾向两边进行遍历比较, 故时间复杂度为 O(n), 最后调用 unlink 把要移除元素的 prev 和 next 进行关联, 把要移除的元素从链中脱离, 等待下次 GC 回收。

3 总结

总体而言, LinkedBlockingDeque 的源码实现通过锁、条件等待和双向链表结构, 保证了在多线程环境中对队列的安全操作。
其本身可以作为一个双端队列使用, 也可以作为一个双端阻塞队列使用, 具有很好的灵活性。

4 转载

【细谈Java并发】谈谈LinkedBlockingDeque


  目录