暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

支付宝大佬教我的 Redis 延时队列

一个优秀的废人 2020-11-09
797

redis 之延时队列

常用的消息队列

我们常见的 MQ 有 RabbmitMq、Kafka、RecoketMQ 等等,他们都是专业级的消息队列,各有各的特性。但他们共同的特性就是比较复杂。有没有一种非常简单,同样也非常高效的消息队列呢? 使用 redis 我们可以非常简单的实现一个高效的消息队列。

redis 的消息队列使用场景

使用 redis 实现的消息队列虽然简单,但是与专业级的相比,肯定是有其不足之处。比如消息的重发、ack 机制、消息持久化等等。当遇到以下场景时,可以考虑使用 redis。

  • 如果你的需求是快产快消的即时消费场景,并且生产的消息立即被消费者消费掉
  • 如果速度是你十分看重的,比如慢了一秒好几千万这种
  • 如果允许出现消息丢失的场景
  • 如果你不需要系统保存你发送过的消息,做到来无影去无踪
  • 需要处理的数据量并不是那么巨大

redis 消息队列的简单实现

首先一个简单的消息队列我们需要两个角色,生产者和消费者。生产者负责生产消息,往队列里面存储,消费者负责监听这个队列,一发现有消息,就立即读出来。

image

这里我们使用 reids 的 List 来实现消息队列,生产者使用 rpush 往队列里写数据,消费者使用 brpop 阻塞读去读取队列中的数据,这样消费者就可以做到类似于监听的效果,只有队列中有数据就能立即读出来。为避免一些重复的代码,我们先写一个 RedisConnection 类,用来获取 redis 连接

public class RedisConnection {
    private static String host = "localhost";
    private static int port = 6379;
    public static Jedis getConnection(){
        JedisPool jedisPool = new JedisPool(host,port);
        return jedisPool.getResource();
    }
}

先写一个生产者 Producer

class Producer implements Runnable{

    @Override
    public void run() {
        Jedis jedis = RedisConnection.getConnection();
        int i =0;
        while (true){
            String s = String.valueOf(i++);
            //生产者往队列里写入数据
            jedis.rpush("MSG_PIPELINE", s);
            System.out.println("Producer write in redis = "+ s);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                System.out.println("生产者线程被打断");
            }
        }
    }
}

再写一个消费者 Consumer

class Consumer implements Runnable{

    @Override
    public void run() {
        //消费者使用阻塞读来实现未读取到消息时线程等待
        Jedis jedis = RedisConnection.getConnection();
        while (true){
            /**
             * int timeout 单次等待超时时间。如果超过这个时间没有获取到队列中的消息,则会返回null,
             *             并且再次重试获取队列中的消息
             * String key  redis List 的key
             */

            List<String> rpop = jedis.brpop(10,"MSG_PIPELINE");
            System.out.println("Consumer read redis = "+rpop);
        }
    }
}

最后用一个 main 方法启动这两个线程,这样一个简单的基于 redis 的消息队列就实现了。

public static void main(String[] args) {

    //启动生产者
    new Thread(new Producer()).start();

    //启动消费者
    new Thread(new Consumer()).start();
}

日志打印

Producer write in redis = 0
Consumer read redis = [MSG_PIPELINE, 0]
Producer write in redis = 1
Consumer read redis = [MSG_PIPELINE, 1]
Producer write in redis = 2
Consumer read redis = [MSG_PIPELINE, 2]
Producer write in redis = 3
Consumer read redis = [MSG_PIPELINE, 3]

redis 实现延时队列

延时队列的使用场景还是很多的,比如说 12306 购买火车票的时候超过 30 分钟未付款就自动取消,某宝秒杀抢购活动,超过一定时间未付款就取消订单,商品锁定的库存就会回到库存池里,购买电影票等等。使用 redis 的 ZSet,也能简单实现一个延时队列。

我们创建一个 RedisDelayQueue 用来操作读写队列。

public class RedisDelayQueue<T>  {

    /**
     * 消息体
     */

    static class TaskItem<T{
        private T msg;
        private int delayScore;
    }
    
    /**
     * 队列名称
     */

    private String queueKey;

    public RedisDelayQueue(String queueKey) {
        this.queueKey = queueKey;
    }

    /**
     * 往队列写入消息
     * @param msg 消息
     * @param delayScore 延迟时间
     */

    public void delay(T msg, int delayScore) {
        
    }

    /**
     * 轮询获取消息队列中的消息
     */

    public void loop(){
       
    }
}

接下来我们编写一下关键的两个函数,delay 函数是往队列中写入数据。 消息被读取的时间 = **delayScore **+ 当前时间,在入参的时候,只需要写入需要等待的时间即可。

/**
 * 往队列写入消息
 * @param msg 消息
 * @param delayScore 延迟时间
 */

public void delay(T msg, int delayScore) {
    Jedis jedis = RedisConnection.getConnection();
    TaskItem < T > task = new TaskItem < T > ();
    task.msg = msg;
    task.delayScore = delayScore;
    String s = JSON.toJSONString(task);
    System.out.println("producer线程池id=" + Thread.currentThread().getId() + ",写入延迟队列,val=" + JSON.toJSONString(task));
    //写入延迟队列
    jedis.zadd(queueKey, System.currentTimeMillis() + delayScore, s);
}

loop 函数是一个阻塞读函数,当队列没有数据时,线程会休眠 500 毫秒。

/**
 * 轮询获取消息队列中的消息
 */

public void loop() {
    System.out.println("consumer线程id=" + Thread.currentThread().getId() + "启动");
    while (!Thread.interrupted()) {
        Jedis jedis = RedisConnection.getConnection();
        Set < String > values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 01);
        if (values.isEmpty()) {
            try {
                System.out.println("consumer没有读取到数据,线程休眠");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                System.out.println("线程被打断");
                //如果线程被打断,则退出当前线程
                break;
            }
            continue;
        }
        String msg = values.iterator().next();
        //根据当前获取的值,尝试删除。如果删除成功则为只有当前线程获取到了本条消息
        if (jedis.zrem(queueKey, msg) > 0) {
            TaskItem < T > taskItem = JSON.parseObject(msg, TaskItem.class);
            System.out.println("线程池id=" + Thread.currentThread().getId() + ",获取到了消息:" + taskItem.msg);
        }
    }
}

到此我们就用 redis 实现了延迟队列。当然这是一个非常简单的模型,只是为这种实现方式提供了一种思路。具体在生产环境的话,还需要考虑多种因素。

-END-

如果看到这里,喜欢这篇文章的话,请帮点个好看。微信搜索「一个优秀的废人」,关注后回复「 1024」送你一套完整的 java 教程(包括视频)。回复「 电子书」送你全编程领域电子书 (不只Java)。

教程节选

文章转载自一个优秀的废人,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论