Skip to content

非阻塞队列

并发编程中,需要用到安全的队列,实现安全队列可以使用 2 种方式:

  • 加锁,这种实现方式是阻塞队列
  • 使用循环 CAS 算法实现,这种方式是非阻塞队列

ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,采用先进先出的规则对节点进行排序,当添加一个元素时,会添加到队列的尾部,当获取一个元素时,会返回队列头部的元素

补充:ConcurrentLinkedDeque 是双向链表结构的无界并发队列

ConcurrentLinkedQueue 使用约定:

  1. 不允许 null 入列
  2. 队列中所有未删除的节点的 item 都不能为 null 且都能从 head 节点遍历到
  3. 删除节点是将 item 设置为 null,队列迭代时跳过 item 为 null 节点
  4. head 节点跟 tail 不一定指向头节点或尾节点,可能存在滞后性

ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点由节点元素和指向下一个节点的引用组成,组成一张链表结构的队列

java
private transient volatile Node<E> head;
private transient volatile Node<E> tail;

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
    //.....
}

构造方法

  • 无参构造方法:

    java
    public ConcurrentLinkedQueue() {
        // 默认情况下 head 节点存储的元素为空,dummy 节点,tail 节点等于 head 节点
        head = tail = new Node<E>(null);
    }
  • 有参构造方法

    java
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        // 遍历节点
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (h == null)
                h = t = newNode;
            else {
                // 单向链表
                t.lazySetNext(newNode);
                t = newNode;
            }
        }
        if (h == null)
            h = t = new Node<E>(null);
        head = h;
        tail = t;
    }

入队方法

与传统的链表不同,单线程入队的工作流程:

  • 将入队节点设置成当前队列尾节点的下一个节点
  • 更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则将入队节点设置成 tail 的 next 节点,所以 tail 节点不总是尾节点,存在滞后性
java
public boolean offer(E e) {
    checkNotNull(e);
    // 创建入队节点
    final Node<E> newNode = new Node<E>(e);
    
    // 循环 CAS 直到入队成功
    for (Node<E> t = tail, p = t;;) {
        // p 用来表示队列的尾节点,初始情况下等于 tail 节点,q 是 p 的 next 节点
        Node<E> q = p.next;
        // 条件成立说明 p 是尾节点
        if (q == null) {
            // p 是尾节点,设置 p 节点的下一个节点为新节点
            // 设置成功则 casNext 返回 true,否则返回 false,说明有其他线程更新过尾节点,继续寻找尾节点,继续 CAS
            if (p.casNext(null, newNode)) {
                // 首次添加时,p 等于 t,不进行尾节点更新,所以尾节点存在滞后性
                if (p != t)
                    // 将 tail 设置成新入队的节点,设置失败表示其他线程更新了 tail 节点
                    casTail(t, newNode); 
                return true;
            }
        }
        else if (p == q)
            // 当 tail 不指向最后节点时,如果执行出列操作,可能将 tail 也移除,tail 不在链表中 
            // 此时需要对 tail 节点进行复位,复位到 head 节点
            p = (t != (t = tail)) ? t : head;
        else
            // 推动 tail 尾节点往队尾移动
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

图解入队:

An image


An image


An image

当 tail 节点和尾节点的距离大于等于 1 时(每入队两次)更新 tail,可以减少 CAS 更新 tail 节点的次数,提高入队效率

线程安全问题:

  • 线程 1 线程 2 同时入队,无论从哪个位置开始并发入队,都可以循环 CAS,直到入队成功,线程安全
  • 线程 1 遍历,线程 2 入队,所以造成 ConcurrentLinkedQueue 的 size 是变化,需要加锁保证安全
  • 线程 1 线程 2 同时出列,线程也是安全的

出队方法

出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用,并不是每次出队都更新 head 节点

  • 当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点
  • 当 head 节点里没有元素时,出队操作才会更新 head 节点

批处理方式可以减少使用 CAS 更新 head 节点的消耗,从而提高出队效率

java
public E poll() {
    restartFromHead:
    for (;;) {
        // p 节点表示首节点,即需要出队的节点,FIFO
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            // 如果 p 节点的元素不为 null,则通过 CAS 来设置 p 节点引用元素为 null,成功返回 item
            if (item != null && p.casItem(item, null)) {
                if (p != h) 
                    // 对 head 进行移动
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // 逻辑到这说明头节点的元素为空或头节点发生了变化,头节点被另外一个线程修改了
            // 那么获取 p 节点的下一个节点,如果 p 节点的下一节点也为 null,则表明队列已经空了
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // 第一轮操作失败,下一轮继续,调回到循环前
            else if (p == q)
                continue restartFromHead;
            // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
            else
                p = q;
        }
    }
}
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        // 将旧结点 h 的 next 域指向为 h,help gc
        h.lazySetNext(h);
}

在更新完 head 之后,会将旧的头结点 h 的 next 域指向为 h,图中所示的虚线也就表示这个节点的自引用,被移动的节点(item 为 null 的节点)会被 GC 回收

An image


An image


An image

如果这时,有一个线程来添加元素,通过 tail 获取的 next 节点则仍然是它本身,这就出现了p == q 的情况,出现该种情况之后,则会触发执行 head 的更新,将 p 节点重新指向为 head

成员方法

  • peek():会改变 head 指向,执行 peek() 方法后 head 会指向第一个具有非空元素的节点

    java
    // 获取链表的首部元素,只读取而不移除
    public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    // 更改h的位置为非空元素节点
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
  • size():用来获取当前队列的元素个数,因为整个过程都没有加锁,在并发环境中从调用 size 方法到返回结果期间有可能增删元素,导致统计的元素个数不精确

    java
    public int size() {
        int count = 0;
        // first() 获取第一个具有非空元素的节点,若不存在,返回 null
        // succ(p) 方法获取 p 的后继节点,若 p == p.next,则返回 head
        // 类似遍历链表
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // 最大返回Integer.MAX_VALUE
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }
  • remove():移除元素

    java
    public boolean remove(Object o) {
        // 删除的元素不能为null
        if (o != null) {
            Node<E> next, pred = null;
            for (Node<E> p = first(); p != null; pred = p, p = next) {
                boolean removed = false;
                E item = p.item;
                // 节点元素不为null
                if (item != null) {
                    // 若不匹配,则获取next节点继续匹配
                    if (!o.equals(item)) {
                        next = succ(p);
                        continue;
                    }
                    // 若匹配,则通过 CAS 操作将对应节点元素置为 null
                    removed = p.casItem(item, null);
                }
                // 获取删除节点的后继节点
                next = succ(p);
                // 将被删除的节点移除队列
                if (pred != null && next != null) // unlink
                    pred.casNext(p, next);
                if (removed)
                    return true;
            }
        }
        return false;
    }