底层

DelayQueue 是 JUC 包(java.util.concurrent)为我们提供的延迟队列,用于实现延时任务比如订单下单 15 分钟未支付直接取消。它是 BlockingQueue 的一种,底层是一个基于 PriorityQueue 实现的一个无界队列,是线程安全的。默认情况下, DelayQueue 会按照到期时间升序编排任务。只有当元素过期时(getDelay()方法返回值小于等于0),才能从队列中取出。

DelayQueue 常见使用场景示例

我们这里希望任务可以按照我们预期的时间执行,例如提交 3 个任务,分别要求 1s、2s、3s 后执行,即使是乱序添加,1s 后要求 1s 执行的任务会准时执行。
对此我们可以使用 DelayQueue 来实现,所以我们首先需要继承 Delayed 实现 DelayedTask,实现 getDelay 方法以及优先级比较 compareTo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 延迟任务
*/
public class DelayedTask implements Delayed {

/**
* 任务到期时间
*/
private long executeTime;
/**
* 任务
*/
private Runnable task;

public DelayedTask(long delay, Runnable task) {

this.executeTime = System.currentTimeMillis() + delay;
this.task = task;
}

/**
* 查看当前任务还有多久到期
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {

return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

/**
* 延迟队列需要到期时间升序入队,所以我们需要实现compareTo进行到期时间比较
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {

return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
}

public void execute() {

task.run();
}
}

完成任务的封装之后,使用就很简单了,设置好多久到期然后将任务提交到延迟队列中即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 创建延迟队列,并添加任务
DelayQueue < DelayedTask > delayQueue = new DelayQueue < > ();

//分别添加1s、2s、3s到期的任务
delayQueue.add(new DelayedTask(2000, () -> System.out.println("Task 2")));
delayQueue.add(new DelayedTask(1000, () -> System.out.println("Task 1")));
delayQueue.add(new DelayedTask(3000, () -> System.out.println("Task 3")));

// 取出任务并执行
while (!delayQueue.isEmpty()) {

//阻塞获取最先到期的任务
DelayedTask task = delayQueue.take();
if (task != null) {

task.execute();
}
}

结构

在这里插入图片描述

四个核心成员变量

1
2
3
4
5
6
7
8
9
//可重入锁,实现线程安全的关键
private final transient ReentrantLock lock = new ReentrantLock();
//延迟队列底层存储数据的集合,确保元素按照到期时间升序排列
private final PriorityQueue<E> q = new PriorityQueue<E>();

//指向准备执行优先级最高的线程
private Thread leader = null;
//实现多线程之间等待唤醒的交互
private final Condition available = lock.newCondition();

lock : 我们都知道 DelayQueue 存取是线程安全的,所以为了保证存取元素时线程安全,我们就需要在存取时上锁,而 DelayQueue 就是基于 ReentrantLock 独占锁确保存取操作的线程安全。
q : 延迟队列要求元素按照到期时间进行升序排列,所以元素添加时势必需要进行优先级排序,所以 DelayQueue 底层元素的存取都是通过这个优先队列 PriorityQueue 的成员变量 q 来管理的。
leader : 延迟队列的任务只有到期之后才会执行,对于没有到期的任务只有等待,为了确保优先级最高的任务到期后可以即刻被执行,设计者就用 leader 来管理延迟任务,只有 leader 所指向的线程才具备定时等待任务到期执行的权限,而其他那些优先级低的任务只能无限期等待,直到 leader 线程执行完手头的延迟任务后唤醒它。
available : 上文讲述 leader 线程时提到的等待唤醒操作的交互就是通过 available 实现的,假如线程 1 尝试在空的 DelayQueue 获取任务时,available 就会将其放入等待队列中。直到有一个线程添加一个延迟任务后通过 available 的 signal 方法将其唤醒

构造方法

相较于其他的并发容器,延迟队列的构造方法比较简单,它只有两个构造方法,因为所有成员变量在类加载时都已经初始完成了,所以默认构造方法什么也没做。

添加元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean offer(E e) {

//尝试获取lock
final ReentrantLock lock = this.lock;
lock.lock();
try {

//如果上锁成功,则调q的offer方法将元素存放到优先队列中
q.offer(e);
//调用peek方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素)
if (q.peek() == e) {

//将leader设置为空,通知调用取元素方法而阻塞的线程来争抢这个任务
leader = null;
available.signal();
}
return true;
} finally {

//上述步骤执行完成,释放lock
lock.unlock();
}
}
  • 尝试获取 lock 。
  • 如果上锁成功,则调 q 的 offer 方法将元素存放到优先队列中。
  • 调用 peek 方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素),于是将 leader 设置为空,通知因为队列为空时调用 take 等方法导致阻塞的线程来争抢元素。
  • 上述步骤执行完成,释放 lock。
  • 返回 true。

获取元素

DelayQueue 中获取元素的方式分为阻塞式和非阻塞式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public E take() throws InterruptedException {

// 尝试获取可重入锁,将底层AQS的state设置为1,并设置为独占锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {

for (;;) {

//查看队列第一个元素
E first = q.peek();
//若为空,则将当前线程放入ConditionObject的等待队列中,并将底层AQS的state设置为0,表示释放锁并进入无限期等待
if (first == null)
available.await();
else {

//若元素不为空,则查看当前元素多久到期
long delay = first.getDelay(NANOSECONDS);
//如果小于0则说明已到期直接返回出去
if (delay <= 0)
return q.poll();
//如果大于0则说明任务还没到期,首先需要释放对这个元素的引用
first = null; // don't retain ref while waiting
//判断leader是否为空,如果不为空,则说明正有线程作为leader并等待一个任务到期,则当前线程进入无限期等待
if (leader != null)
available.await();
else {

//反之将我们的线程成为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {

//并进入有限期等待
available.awaitNanos(delay);
} finally {

//等待任务到期时,释放leader引用,进入下一次循环将任务return出去
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {

//收尾逻辑:如果leader不为空且q有元素,则说明有任务没人认领,直接发起通知唤醒因为锁被当前消费者持有而导致阻塞的生产者(即调用put、add、offer的线程)
if (leader == null && q.peek() != null)
available.signal();
//释放锁
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E poll() {

//尝试获取可重入锁
final ReentrantLock lock = this.lock;
lock.lock();
try {

//查看队列第一个元素,判断元素是否为空
E first = q.peek();

//若元素为空,或者元素未到期,则直接返回空
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
//若元素不为空且到期了,直接调用poll返回出去
return q.poll();
} finally {

//释放可重入锁lock
lock.unlock();
}
}

查看元素

1
2
3
4
5
6
7
8
9
10
11
12
public E peek() {

final ReentrantLock lock = this.lock;
lock.lock();
try {

return q.peek();
} finally {

lock.unlock();
}
}

DelayQueue 的实现原理是什么?

DelayQueue 底层是使用优先队列 PriorityQueue 来存储元素,而 PriorityQueue 采用二叉小顶堆的思想确保值小的元素排在最前面,这就使得 DelayQueue 对于延迟任务优先级的管理就变得十分方便了。同时 DelayQueue 为了保证线程安全还用到了可重入锁 ReentrantLock,确保单位时间内只有一个线程可以操作延迟队列。最后,为了实现多线程之间等待和唤醒的交互效率,DelayQueue 还用到了 Condition,通过 Condition 的 await 和 signal 方法完成多线程之间的等待唤醒。

DelayQueue 的实现是否线程安全?

DelayQueue 的实现是线程安全的,它通过 ReentrantLock 实现了互斥访问和 Condition 实现了线程间的等待和唤醒操作,可以保证多线程环境下的安全性和可靠性。

DelayQueue 的使用场景有哪些?

DelayQueue 通常用于实现定时任务调度和缓存过期删除等场景。在定时任务调度中,需要将需要执行的任务封装成延迟任务对象,并将其添加到 DelayQueue 中,DelayQueue 会自动按照剩余延迟时间进行升序排序(默认情况),以保证任务能够按照时间先后顺序执行。对于缓存过期这个场景而言,在数据被缓存到内存之后,我们可以将缓存的 key 封装成一个延迟的删除任务,并将其添加到 DelayQueue 中,当数据过期时,拿到这个任务的 key,将这个 key 从内存中移除

DelayQueue 中 Delayed 接口的作用是什么?

Delayed 接口定义了元素的剩余延迟时间(getDelay)和元素之间的比较规则(该接口继承了 Comparable 接口)。若希望元素能够存放到 DelayQueue 中,就必须实现 Delayed 接口的 getDelay() 方法和 compareTo() 方法,否则 DelayQueue 无法得知当前任务剩余时长和任务优先级的比较。

DelayQueue 和 Timer/TimerTask 的区别是什么?

DelayQueue 和 Timer/TimerTask 都可以用于实现定时任务调度,但是它们的实现方式不同。DelayQueue 是基于优先级队列和堆排序算法实现的,可以实现多个任务按照时间先后顺序执行;而 Timer/TimerTask 是基于单线程实现的,只能按照任务的执行顺序依次执行,如果某个任务执行时间过长,会影响其他任务的执行。另外,DelayQueue 还支持动态添加和移除任务,而 Timer/TimerTask 只能在创建时指定任务。

作者声明

1
如有问题,欢迎指正!