DelayQueue详解

# DelayQueue详解

# 1、DelayQueue简介

DelayQueue 也是 Java 并发包(java.util.concurrent)中的一个特殊队列,用于在指定的延迟时间之后处理元素。

DelayQueue的一些关键特性:

  • 延迟元素处理:只有当元素的延迟时间到期时,元素才能被取出。使用 take 方法会阻塞直到有元素到期。
  • 无界队列:DelayQueue 是一个无界队列,这意味着它可以包含任意数量的元素(太多可能内存溢出)。
  • 元素排序:DelayQueue 中的元素按到期时间排序,最先到期的元素最早被取出。
  • 阻塞操作:take 方法会阻塞直到有元素到期,而 poll 方法可以在指定的时间内等待。

# 2、DelayQueue适用场景

DelayQueue 通常用于需要在未来某个时间点执行任务的场景。

  • 定时任务调度:可以用于实现定时任务调度系统,任务在特定的时间点被执行。
  • 缓存过期:实现缓存系统中的过期机制,当缓存项的过期时间到达时,将其从缓存中移除。
  • 限流控制:在某些系统中用于限流,限制某一操作在指定时间段内的执行频率。

我在生产上使用过这个队列,主要是做延时执行任务。
当时场景是一个工作流审批节点创建时的回调方法中需要根据某些条件自动通过某些人的审批操作,
由于是在审批节点创建时的回调方法中,此时节点还没有初始化完成,所以就利用异步的线程池+DelayQueue来完成延时任务触发。

# 3、DelayQueue继承体系

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E>
mixureSecure

可以看到DelayQueue实现了BlockingQueue接口和Delayed接口。 实现BlockingQueue接口说明支持阻塞和线程安全。 DelayQueue<E extends Delayed> 泛型,表明DelayQueue存储的元素必须继承或者实现Delayed接口。

# 4、DelayQueue构造函数

空参构造

public DelayQueue() {}

带集合参数的构造函数

public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

# 5、DelayQueue数据结构

# DelayQueue类的属性注释:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {

    // 一个用于控制并发访问的可重入锁
    private final transient ReentrantLock lock = new ReentrantLock();

    // 用于存储队列元素的优先级队列
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    // 指向当前等待获取队列头部元素的线程
    private Thread leader = null;

    // 一个条件对象,用于管理队列元素的可用性
    private final Condition available = lock.newCondition();

}

可以看到DelayQueue内部使用PriorityQueue存储元素,所以DelayQueue的数据结构本质上也是数组存储的小顶堆。

# DelayQueue使用示例

场景:
有三只狗分别叫秀逗,四眼,二哈。
它们都被关在笼子里,现在到了吃饭时间,需要等2秒后把二哈先放出来,5秒后把四眼放出来,8秒后把秀逗放出来。

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class TestA {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个DelayQueue
        DelayQueue<DelayedDog> delayQueue = new DelayQueue<>();

        // 插入元素,设置不同的延迟时间
        delayQueue.put(new DelayedDog("秀逗", 8000));
        delayQueue.put(new DelayedDog("四眼", 5000));
        delayQueue.put(new DelayedDog("二哈", 2000));

        // 打印队列内容,仅仅是插入顺序
        System.out.println("DelayQueue: " + delayQueue);

        // 依次取出元素
        while (!delayQueue.isEmpty()) {
            DelayedDog dog = delayQueue.take();
            System.out.println("开门放狗: " + dog);
        }

    }
}


// 实现 Delayed 接口
class DelayedDog implements Delayed {
    private String name;        // 狗的名字
    private long delayTime;     // 延迟时间,以毫秒为单位
    private long expire;        // 到期时间点,以毫秒为单位

    // 构造方法,传入狗的名字和延迟时间(毫秒)
    public DelayedDog(String name, long delayTimeInMillis) {
        this.name = name;
        this.delayTime = delayTimeInMillis;
        // 计算到期时间点,当前时间加上延迟时间
        this.expire = System.currentTimeMillis() + delayTime;
    }

    // 获取剩余延迟时间,以给定时间单位表示
    @Override
    public long getDelay(TimeUnit unit) {
        // 计算剩余时间,单位为毫秒
        long remainingTime = expire - System.currentTimeMillis();
        // 将剩余时间转换为指定单位
        return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
    }

    // 比较两个 DelayedDog 对象的顺序
    @Override
    public int compareTo(Delayed other) {
        // 强制转换为 DelayedDog 类型
        DelayedDog otherDog = (DelayedDog) other;
        // 比较两个对象的到期时间点,返回比较结果
        return Long.compare(this.expire, otherDog.expire);
    }

    // 返回 DelayedDog 对象的字符串表示形式
    @Override
    public String toString() {
        return "DelayedDog{" +
                "name='" + name + '\'' +
                ", delayTime=" + delayTime +
                '}';
    }
}


运行结果:

DelayQueue: [DelayedDog{name='二哈', delayTime=2000}, DelayedDog{name='秀逗', delayTime=8000}, DelayedDog{name='四眼', delayTime=5000}]
开门放狗: DelayedDog{name='二哈', delayTime=2000}
开门放狗: DelayedDog{name='四眼', delayTime=5000}
开门放狗: DelayedDog{name='秀逗', delayTime=8000}

# Delayed接口的作用

Delayed 接口在 Java 中用于表示具有延迟功能的对象,这些对象在某个特定时间点之前是不可用的。
实现该接口的对象可以用于构建像 DelayQueue 这样的延迟队列,其元素在到期时间之前是不可访问的。

Delayed 接口定义

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

作用和功能

定义延迟时间: getDelay(TimeUnit unit) 方法返回当前对象的剩余延迟时间。返回值为正数表示还未到期,负数或零表示已经到期。

支持排序:
Delayed 接口继承了 Comparable 接口,因此实现了该接口的对象可以根据其延迟时间进行比较和排序。
这使得 DelayQueue 能够始终将延迟时间最短(即最早到期)的元素放在队列头部

Delayed 接口的常用实现:
主要看 getDelaycompareTo方法。

// 实现 Delayed 接口
class DelayedDog implements Delayed {
    private String name;        // 狗的名字
    private long delayTime;     // 延迟时间,以毫秒为单位
    private long expire;        // 到期时间点,以毫秒为单位

    // 构造方法,传入狗的名字和延迟时间(毫秒)
    public DelayedDog(String name, long delayTimeInMillis) {
        this.name = name;
        this.delayTime = delayTimeInMillis;
        // 计算到期时间点,当前时间加上延迟时间
        this.expire = System.currentTimeMillis() + delayTime;
    }

    // 获取剩余延迟时间,以给定时间单位表示
    @Override
    public long getDelay(TimeUnit unit) {
        // 计算剩余时间,单位为毫秒
        long remainingTime = expire - System.currentTimeMillis();
        // 将剩余时间转换为指定单位
        return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
    }

    // 比较两个 DelayedDog 对象的顺序
    @Override
    public int compareTo(Delayed other) {
        // 强制转换为 DelayedDog 类型
        DelayedDog otherDog = (DelayedDog) other;
        // 比较两个对象的到期时间点,返回比较结果
        return Long.compare(this.expire, otherDog.expire);
    }
}

# 6、DelayQueue的put方法

put 方法用于将一个元素插入队列中。
put 调用了 offer 方法,offer方法实现了实际的插入逻辑。

// put 方法:将元素插入队列
public void put(E e) {
   offer(e);
}

// offer 方法:实现实际的插入逻辑
public boolean offer(E e) {
   // 获取锁以确保线程安全
   final ReentrantLock lock = this.lock;
   lock.lock();
   try {
       // 将元素 e 插入优先级队列 q
       q.offer(e);

       // 如果新插入的元素是队列的头部元素
       if (q.peek() == e) {
           // 设置 leader 为 null,表示没有线程在等待获取队列头部元素
           leader = null;

           // 通知等待的线程有新元素可用
           available.signal();
       }

       // 返回 true 表示元素成功插入
       return true;
   } finally {
       // 释放锁
       lock.unlock();
   }
} 

总结下:

  • ①、获取锁:首先获取 lock 对象,并锁定它以确保对队列的并发访问是安全的。
  • ②、插入元素:将元素 e 插入到优先级队列 q 中。PriorityQueue 会根据元素的自然顺序或提供的比较器进行排序。
  • ③、检查并更新当前等待获取队列头部元素的线程,如果新插入的元素 e 是队列的头部元素(即延迟时间最短),将 leader 设置为 null 并调用 available.signal() 方法,通知其他等待的线程有新的元素可用。
  • ④、返回结果
  • ⑤、释放锁:在 finally 块中释放锁,以确保无论插入操作是否成功,锁都会被正确释放,避免死锁的发生。

# 7、DelayQueue的take方法

take方法用于获取并移除队列的头部元素,如果没有元素达到过期时间则等待。 该方法是阻塞式的。

/**
 * 获取并移除队列的头部元素,如果没有元素达到过期时间则等待。
 *
 * @return 队列的头部元素
 * @throws InterruptedException 如果在等待期间被中断
 */
public E take() throws InterruptedException {
    // 获取用于并发控制的可中断锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) { // 无限循环,直到成功取出元素
            // 获取队列头部元素,但不移除
            E first = q.peek();
            if (first == null) {
                // 如果队列为空,等待元素变得可用
                available.await();
            } else {
                // 获取头部元素的延迟时间(纳秒)
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0) {
                    // 如果延迟时间小于等于0,立即取出头部元素并返回
                    return q.poll();
                }
                first = null; // 在等待期间不保留对头部元素的引用
                if (leader != null) {
                    // 如果当前有其他线程在等待获取队列头部元素,当前线程等待
                    available.await();
                } else {
                    // 当前线程成为“领导者”,负责等待队列头部元素的到期
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 当前线程等待指定的纳秒时间
                        available.awaitNanos(delay);
                    } finally {
                        // 如果当前线程仍是“领导者”,则重置领导者
                        if (leader == thisThread) {
                            leader = null;
                        }
                    }
                }
            }
        }
    } finally {
        // 如果没有领导者且队列不为空,通知其他等待的线程
        if (leader == null && q.peek() != null) {
            available.signal();
        }
        // 释放锁
        lock.unlock();
    }
}

总结下:

  • ①、获取锁:获取锁以确保线程安全。lockInterruptibly 方法允许当前线程在等待锁的过程中被中断。
  • ②、循环:使用无限循环,直到成功取出一个元素为止。
  • ③、获取队列头部元素:获取队列头部元素但不移除它。如果队列为空,peek 方法返回 null。
  • ④、检查头部元素是否存在:如果队列为空,等待元素变得可用。
  • ⑤、获取头部元素的延迟时间:计算头部元素的延迟时间(单位为纳秒)
  • ⑥、检查延迟时间:如果延迟时间小于等于0,立即移除并返回头部元素。
  • ⑦、处理等待:
    • 在等待期间,不保留对头部元素的引用。(这样做可以减少引用占用的内存,避免内存泄漏)
    • 如果有其他线程在等待获取队列头部元素,当前线程等待。
    • 否则,当前线程成为“领导者”,负责等待头部元素的到期,并等待指定的纳秒时间。
  • ⑧、通知等待线程:如果没有领导者且队列不为空,通知其他等待的线程。
  • ⑨、释放锁: 在 finally 块中释放锁,以确保无论操作是否成功,锁都会被正确释放,避免死锁的发生。

# 8、DelayQueue的poll方法

该方法是非阻塞的。

/**
 * 从队列中获取并移除头部元素,如果头部元素的延迟时间未到,则返回 null。
 *
 * @return 队列的头部元素,如果不存在或延迟时间未到,则返回 null
 */
public E poll() {
    // 获取用于并发控制的锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取队列头部元素,但不移除
        E first = q.peek();
        // 如果头部元素不存在,或者头部元素的延迟时间大于0,则返回 null
        if (first == null || first.getDelay(NANOSECONDS) > 0) {
            return null;
        } else {
            // 否则,移除并返回头部元素
            return q.poll();
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}

总结下:

  • ①、获取锁:获取锁以确保线程安全。由于 poll 方法是非阻塞的,所以使用普通的 lock 方法,而不是 lockInterruptibly。
  • ②、尝试获取队列头部元素:获取队列头部元素但不移除它。如果队列为空,peek 方法返回 null。
  • ③、检查头部元素的存在和延迟时间:
    • 如果头部元素不存在(即队列为空),返回 null。
    • 如果头部元素的延迟时间大于0(即还未到期),返回 null。
  • ④、移除并返回头部元素:如果头部元素的延迟时间小于等于0(即已到期),则从队列中移除并返回该元素。
  • ⑤、释放锁:在 finally 块中释放锁,以确保无论操作是否成功,锁都会被正确释放,避免死锁的发生。

# 补充:带参的poll(long timeout, TimeUnit unit)方法

/**
 * 从队列中获取并移除头部元素,等待指定的时间,如果头部元素的延迟时间未到或超时,则返回 null。
 *
 * @param timeout 等待的最大时间
 * @param unit 时间单位
 * @return 队列的头部元素,如果不存在或延迟时间未到或超时,则返回 null
 * @throws InterruptedException 如果在等待期间被中断
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 将等待时间转换为纳秒
    long nanos = unit.toNanos(timeout);
    // 获取用于并发控制的可中断锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) { // 无限循环,直到成功取出元素或超时
            // 获取队列头部元素,但不移除
            E first = q.peek();
            if (first == null) {
                // 如果队列为空
                if (nanos <= 0) {
                    return null; // 如果已超时,返回 null
                } else {
                    // 否则,等待指定的纳秒时间
                    nanos = available.awaitNanos(nanos);
                }
            } else {
                // 获取头部元素的延迟时间(纳秒)
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0) {
                    return q.poll(); // 如果延迟时间小于等于0,移除并返回头部元素
                }
                if (nanos <= 0) {
                    return null; // 如果已超时,返回 null
                }
                first = null; // 在等待期间不保留对头部元素的引用
                if (nanos < delay || leader != null) {
                    // 如果剩余等待时间小于头部元素的延迟时间或已有领导线程,继续等待
                    nanos = available.awaitNanos(nanos);
                } else {
                    // 当前线程成为“领导者”,负责等待队列头部元素的到期
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay); // 等待延迟时间到期
                        nanos -= delay - timeLeft; // 更新剩余等待时间
                    } finally {
                        if (leader == thisThread) {
                            leader = null; // 解除领导者身份
                        }
                    }
                }
            }
        }
    } finally {
        // 如果没有领导者且队列不为空,通知其他等待的线程
        if (leader == null && q.peek() != null) {
            available.signal();
        }
        // 释放锁
        lock.unlock();
    }
}

总结下poll(long timeout, TimeUnit unit)

  • 将等待时间转换为纳秒
  • 获取锁并支持中断
  • 无限循环等待元素可用或超时
  • 尝试获取队列头部元素
  • 检查队列是否为空
  • 获取头部元素的延迟时间
  • 检查延迟时间和剩余等待时间
  • 等待元素可用或超时
  • 通知其他等待线程
  • 释放锁

# 9、DelayQueue的takepoll方法区别

  • ①、阻塞行为不同:

take 方法:这是一个阻塞方法。如果队列为空或头部元素的延迟时间未到,调用线程会等待直到可以获取到一个元素。
take 方法会一直等待,直到满足条件或线程被中断。

poll 方法:这是一个非阻塞方法。如果队列为空或头部元素的延迟时间未到,它会立即返回 null,而不会等待。

  • ②、返回结果: take 方法:总是返回一个元素或在等待期间被中断。如果调用线程被中断,会抛出 InterruptedException。
    poll 方法:立即返回队列头部的元素或 null,如果没有元素可用。

  • ③、使用场景: take 方法:适用于消费者线程必须获取元素才能继续工作的情况,特别是消费者-生产者模式中的消费者。
    poll 方法:适用于不希望阻塞线程的情况,可以定期尝试获取元素,但如果没有元素可用,可以继续执行其他任务。

# 10、DelayQueue的take方法的原理

  • 线程同步:通过 ReentrantLock 实现线程同步,确保多线程环境下的安全访问。
  • 条件变量:通过 Condition 实现线程等待和通知机制,保证线程在没有可用元素时进入等待状态,并在有新元素时被唤醒。
  • 优先级队列:基于优先级队列 (PriorityQueue),确保每次获取的都是延迟时间最短且已到期的元素。

至于private final Condition available = lock.newCondition(); 本质上是获取Condition 的实例 ConditionObject,ConditionObject是如何实现线程的等待唤醒机制的这里不再讨论了,后续会在并发相关的文章中详细分析相关内容。