暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

DelayQueue的使用

码酱 2021-03-13
1882

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。 

适用场景:实现订单到期,限时支付,回调重试等等。

              

一、简介

DelayQueue是JDK1.5时,随着J.U.C包一起引入的一种阻塞队列:
public class DelayQueue<E extends Delayedextends AbstractQueue<E>
    implements BlockingQueue<E>

复制
DelayQueue也是一种比较特殊的阻塞队列,从类声明也可以看出,DelayQueue中的所有元素必须实现Delayed接口:
/**
 * 一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。
 * 此接口的实现必须定义一个compareTo方法,该方法提供与此接口的getDelay方法一致的排序。
 */

public interface Delayed extends Comparable<Delayed{

    /**
     * 返回与此对象相关的剩余有效时间,以给定的时间单位表示.
     */

    long getDelay(TimeUnit unit);
}

复制
  • 可以看到,Delayed接口除了自身的getDelay方法外,还实现了Comparable接口。getDelay方法用于返回对象的剩余有效时间,实现Comparable接口则是为了能够比较两个对象,以便排序。也就是说,如果一个类实现了Delayed接口,当创建该类的对象并添加到DelayQueue中后,只有当该对象的getDalay方法返回的剩余时间≤0时才会出队。

DelayQueue的特点简要概括如下:

  • DelayQueue是无界阻塞队列;

  • 队列中的元素必须实现Delayed接口,元素过期后才会从队列中取走;

二、示例

为了便于理解DelayQueue的功能,我们先来看一个使用DelayQueue的示例。

1.队列元素

   第一节说了,队列元素必须实现Delayed接口,我们先来定义一个Item类,作为队列元素

public class Item implements Delayed{
    // 触发时间(失效时间点)
    private long time;
    //名称
    String name;

    //数据失效时间
    public Item(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis() + (time > 0? unit.toMillis(time): 0);
    }
    //返回剩余有效时间
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    /**
     *  比较两个Delayed对象的大小, 比较顺序如下:
     *  比较失效时间点, 先失效的返回-1,后失效的返回1
     */

    @Override
    public int compareTo(Delayed o) {
        Item item = (Item) o;
        return this.time - item.time <= 0 ? -1 : 1;
    }

    @Override
    public String toString() {
        return "Item{" + "time=" + time + ", name='" + name + '\'' + '}';
    }
}

复制
  1. getDelay方法返回元素的剩余有效时间,可以根据入参的TimeUnit选择时间的表示;

  2. compareTo方法用于比较两个元素的大小,以便在队列中排序。由于DelayQueue基于优先级队列实现,所以内部是“堆”的形式,我们定义的规则是先失效的元素将先出队,所以先失效元素应该在堆顶,即compareTo方法返回结果<0的元素优先出队;

2.测试DelayedQueueTest

public class DelayedQueueTest {
    public static void main(String[] args) throws InterruptedException {
        Item item1 = new Item("item1"5, TimeUnit.SECONDS);
        Item item2 = new Item("item2",10, TimeUnit.SECONDS);
        Item item3 = new Item("item3",15, TimeUnit.SECONDS);
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(item1);
        queue.put(item2);
        queue.put(item3);
        System.out.println("begin time:" + LocalDateTime.now());
        while(true){
            if(queue.size()<=0){
                break;
            }
            Item take = queue.take();
            System.out.format("name:{%s}, time:{%s}\n",take.name, LocalDateTime.now());
        }
    }
}

复制

打印日志

3.DelayedQueue案例场景(总结中提到)

//回调的重试
if (bestWaiEntity.getNotifyUrl() != null && bestWaiEntity.getNotifyUrl().length() > 0) {
    DelayEntity notifyOne = new DelayEntity(0, TimeUnit.MILLISECONDS);
    DelayEntity notifyTwo = new DelayEntity(1000, TimeUnit.MILLISECONDS);
    DelayEntity notifyThree = new DelayEntity(5000, TimeUnit.MILLISECONDS);
    DelayQueue<DelayEntity> queue = new DelayQueue<>();
    queue.put(notifyOne);
    queue.put(notifyTwo);
    queue.put(notifyThree);
    while (true) {
        if (queue.size() <= 0) {
            break;
        }
        Map<String,String> map = new HashMap();
        map.put("uniqueKey", bestBO.getUniqueKey());
        map.put("status""SUCCESS");
        map.put("message""");
        //2、数据返回
        String jsonString = JSON.toJSONString(map);
        String jsonReturn = restClient.postWithJson(bestWaiEntity.getNotifyUrl(), jsonString);
        JSONObject jsonObject = JSON.parseObject(jsonReturn);
        JSONObject data = jsonObject.getJSONObject("data");
        String status = data.getString("status");
        if (status.equalsIgnoreCase("SUCCESS")) {
            break;
        }
    }
}

复制

三、原理

介绍完了DelayQueued的基本使用,我们看看DelayQueued的实现原理。

1.构造方法

/**
 * 默认构造器.
 */

public DelayQueue() {}
/**
 * 从已有集合构造队列.
 */

public DelayQueue(Collection<? extends E> c) {this.addAll(c);}

复制

  可以看到,内部的PriorityQueue并非在构造时创建,而是对象创建时生成:

public class DelayQueue<E extends Delayedextends AbstractQueue<E>
    implements BlockingQueue<E
{

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    /**
     * leader线程是首个尝试出队元素(队列不为空)但被阻塞的线程.
     * 该线程会限时等待(队首元素的剩余有效时间),用于唤醒其它等待线程
     */

    private Thread leader = null;
    /**
     * 出队线程条件队列, 当有多个线程, 会在此条件队列上等待.
     */

    private final Condition available = lock.newCondition();
    //...
}

复制

2.入队——put

/**
 * 入队一个指定元素e.
 * 由于是无界队列, 所以该方法并不会阻塞线程.
 */

public void put(E e) {
    offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);             // 调用PriorityQueue的offer方法
        if (q.peek() == e) {    // 如果入队元素在队首, 则唤醒一个出队线程
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

复制

3.出队——take

/**
 * 队首出队元素.
 * 如果队首元素(堆顶)未到期或队列为空, 则阻塞线程.
 */

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (; ; ) {
            E first = q.peek();     // 读取队首元素
            if (first == null)      // CASE1: 队列为空, 直接阻塞
                available.await();
            else {                  // CASE2: 队列非空
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)                             // CASE2.0: 队首元素已过期
                    return q.poll();

                // 执行到此处说明队列非空, 且队首元素未过期
                first = null;
                if (leader != null)                         // CASE2.1: 已存在leader线程
                    available.await();      // 无限期阻塞当前线程
                else {                                      // CASE2.2: 不存在leader线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;    // 将当前线程置为leader线程
                    try {
                        available.awaitNanos(delay);        // 阻塞当前线程(限时等待剩余有效时间)
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)             // 不存在leader线程, 则唤醒一个其它出队线程
            available.signal();
        lock.unlock();
    }
}

复制

四、总结

DelayQueue是阻塞队列中非常有用的一种队列,经常被用于缓存或定时任务等的设计。考虑一种使用场景:
  • 异步回调的重试,在很多系统中,当用户完成服务调用后,系统有时需要将结果异步通知到用户的某个URI。由于网络等原因,很多时候会通知失败,这个时候就需要一种重试机制。

这时可以用DelayQueue来进行异步回调,这样每次从队列中take获取的就是剩余时间最短的请求。



喜欢就加个关注吧,


往期精选

使用并行流(ParallelStream)避坑
shardingjdbc实现分库分表
SpringCloud配置中心的使用
SpringCloud-Eureka高可用搭建
SpringCloud-Zuul服务网关
SpringCloud-Ribbon负载均衡
SpringCloud-Feign远程调用
SpringCloud-Hystrix解决雪崩
SpringCloud-Bus消息总线
文章转载自码酱,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论