DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。
适用场景:实现订单到期,限时支付,回调重试等等。
一、简介
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>复制
/**
* 一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。
* 此接口的实现必须定义一个compareTo方法,该方法提供与此接口的getDelay方法一致的排序。
*/
public interface Delayed extends Comparable<Delayed> {
/**
* 返回与此对象相关的剩余有效时间,以给定的时间单位表示.
*/
long getDelay(TimeUnit unit);
}复制
可以看到,Delayed接口除了自身的getDelay方法外,还实现了Comparable接口。getDelay方法用于返回对象的剩余有效时间,实现Comparable接口则是为了能够比较两个对象,以便排序。也就是说,如果一个类实现了Delayed接口,当创建该类的对象并添加到DelayQueue中后,只有当该对象的getDalay方法返回的剩余时间≤0时才会出队。
DelayQueue的特点简要概括如下:
DelayQueue是无界阻塞队列;
队列中的元素必须实现Delayed接口,元素过期后才会从队列中取走;
二、示例
为了便于理解DelayQueue的功能,我们先来看一个使用DelayQueue的示例。
1.队列元素
第一节说了,队列元素必须实现Delayed接口,我们先来定义一个Item类,作为队列元素
public class Item implements Delayed{
// 触发时间(失效时间点)
private long time;
//名称
String name;
//数据失效时间
public Item(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0? unit.toMillis(time): 0);
}
//返回剩余有效时间
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 比较两个Delayed对象的大小, 比较顺序如下:
* 比较失效时间点, 先失效的返回-1,后失效的返回1
*/
@Override
public int compareTo(Delayed o) {
Item item = (Item) o;
return this.time - item.time <= 0 ? -1 : 1;
}
@Override
public String toString() {
return "Item{" + "time=" + time + ", name='" + name + '\'' + '}';
}
}复制
getDelay方法返回元素的剩余有效时间,可以根据入参的TimeUnit选择时间的表示;
compareTo方法用于比较两个元素的大小,以便在队列中排序。由于DelayQueue基于优先级队列实现,所以内部是“堆”的形式,我们定义的规则是先失效的元素将先出队,所以先失效元素应该在堆顶,即compareTo方法返回结果<0的元素优先出队;
2.测试DelayedQueueTest
public class DelayedQueueTest {
public static void main(String[] args) throws InterruptedException {
Item item1 = new Item("item1", 5, TimeUnit.SECONDS);
Item item2 = new Item("item2",10, TimeUnit.SECONDS);
Item item3 = new Item("item3",15, TimeUnit.SECONDS);
DelayQueue<Item> queue = new DelayQueue<>();
queue.put(item1);
queue.put(item2);
queue.put(item3);
System.out.println("begin time:" + LocalDateTime.now());
while(true){
if(queue.size()<=0){
break;
}
Item take = queue.take();
System.out.format("name:{%s}, time:{%s}\n",take.name, LocalDateTime.now());
}
}
}复制
打印日志
3.DelayedQueue案例场景(总结中提到)
//回调的重试
if (bestWaiEntity.getNotifyUrl() != null && bestWaiEntity.getNotifyUrl().length() > 0) {
DelayEntity notifyOne = new DelayEntity(0, TimeUnit.MILLISECONDS);
DelayEntity notifyTwo = new DelayEntity(1000, TimeUnit.MILLISECONDS);
DelayEntity notifyThree = new DelayEntity(5000, TimeUnit.MILLISECONDS);
DelayQueue<DelayEntity> queue = new DelayQueue<>();
queue.put(notifyOne);
queue.put(notifyTwo);
queue.put(notifyThree);
while (true) {
if (queue.size() <= 0) {
break;
}
Map<String,String> map = new HashMap();
map.put("uniqueKey", bestBO.getUniqueKey());
map.put("status", "SUCCESS");
map.put("message", "");
//2、数据返回
String jsonString = JSON.toJSONString(map);
String jsonReturn = restClient.postWithJson(bestWaiEntity.getNotifyUrl(), jsonString);
JSONObject jsonObject = JSON.parseObject(jsonReturn);
JSONObject data = jsonObject.getJSONObject("data");
String status = data.getString("status");
if (status.equalsIgnoreCase("SUCCESS")) {
break;
}
}
}复制
三、原理
介绍完了DelayQueued的基本使用,我们看看DelayQueued的实现原理。
1.构造方法
/**
* 默认构造器.
*/
public DelayQueue() {}
/**
* 从已有集合构造队列.
*/
public DelayQueue(Collection<? extends E> c) {this.addAll(c);}复制
可以看到,内部的PriorityQueue并非在构造时创建,而是对象创建时生成:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* leader线程是首个尝试出队元素(队列不为空)但被阻塞的线程.
* 该线程会限时等待(队首元素的剩余有效时间),用于唤醒其它等待线程
*/
private Thread leader = null;
/**
* 出队线程条件队列, 当有多个线程, 会在此条件队列上等待.
*/
private final Condition available = lock.newCondition();
//...
}复制
2.入队——put
/**
* 入队一个指定元素e.
* 由于是无界队列, 所以该方法并不会阻塞线程.
*/
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); // 调用PriorityQueue的offer方法
if (q.peek() == e) { // 如果入队元素在队首, 则唤醒一个出队线程
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}复制
3.出队——take
/**
* 队首出队元素.
* 如果队首元素(堆顶)未到期或队列为空, 则阻塞线程.
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (; ; ) {
E first = q.peek(); // 读取队首元素
if (first == null) // CASE1: 队列为空, 直接阻塞
available.await();
else { // CASE2: 队列非空
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) // CASE2.0: 队首元素已过期
return q.poll();
// 执行到此处说明队列非空, 且队首元素未过期
first = null;
if (leader != null) // CASE2.1: 已存在leader线程
available.await(); // 无限期阻塞当前线程
else { // CASE2.2: 不存在leader线程
Thread thisThread = Thread.currentThread();
leader = thisThread; // 将当前线程置为leader线程
try {
available.awaitNanos(delay); // 阻塞当前线程(限时等待剩余有效时间)
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null) // 不存在leader线程, 则唤醒一个其它出队线程
available.signal();
lock.unlock();
}
}复制
四、总结
异步回调的重试,在很多系统中,当用户完成服务调用后,系统有时需要将结果异步通知到用户的某个URI。由于网络等原因,很多时候会通知失败,这个时候就需要一种重试机制。
喜欢就加个关注吧,

往期精选