暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何使用 Redis 实现作业队列

原创 谭磊Terry 恩墨学院 2022-10-29
824

在如何使用 Redis 进行缓存中,我们实现了一个由 Redis 支持的简单缓存。+ 这只是 Redis 的一个用例。Redis 还用作消息传递服务器,以实现对后台作业或其他类型的消息传递任务的处理。这篇文章探讨了使用 Quarkus 和新的 Redis 数据源 API 来实现这种模式。

工作队列和 Supes!

作业队列是存储执行请求的数据结构。作业调度员在该数据结构中提交他们想要执行的任务。另一方面,作业消费者轮询请求并执行它们。

image.png

该模式有很多变体,因此让我们关注以下应用程序。我们有一个管理英雄和反派的应用程序。该应用程序提供了模拟随机英雄和随机恶棍之间的战斗的可能性。战斗模拟被委派给战斗模拟器,专用于该任务的应用程序。

image.png

在这种情况下,主应用程序将战斗请求提交到作业队列。然后,战斗模拟器轮询提交的战斗请求并执行它们。

战斗结果使用 Redis 的另一个功能进行通信:发布/订阅通信。模拟器将结果发送到应用程序使用的通道。然后应用程序将这些结果广播到网页。

这篇文章只讨论了与 Redis 的交互。应用程序的其余部分很简单,只需使用 RESTEasy Reactive 和 Hibernate ORM 和 Panache。您可以在https://github.com/cescoffier/quarkus-redis-job-queue-demo上找到该应用程序的完整代码。

提交作业

第一个任务是为作业队列建模。我们使用Redis 列表来存储FightRequest。

package me.escoffier.quarkus.redis.fight;

public record FightRequest(String id, Hero hero, Villain villain) {

}

Redis 列表区分列表的左侧和列表的右侧。这种区别允许实现一个 FIFO 队列,我们​​在左侧写入并从右侧消费。

要操作 Redis 列表,我们需要与此数据结构关联的一组命令。在SupesService 类中,我们注入RedisDataSource并检索命令组:

public SupesService(RedisDataSource dataSource, ...) {
    commands = dataSource.list(FightRequest.class);
  // ...
}

现在让我们看一下submitAFight方法:

public FightRequest submitAFight() {
    var hero = Hero.getRandomHero();
    var villain = Villain.getRandomVillain();
    var id = UUID.randomUUID().toString();
    var request = new FightRequest(id, hero, villain);
    commands.lpush("fight-requests", request);
    return request;
}

该submitAFight方法检索随机战斗机、计算 id、构建FightRequest实例并执行LPUSH命令。该命令将给定项目写入存储在给定键 ( )LPUSH处的列表的左侧。fight-requests

接收工作请求

现在让我们看看另一面:战斗模拟器。模拟器从代表我们的作业队列的 Redis 列表中轮询FightRequests并模拟战斗。

模拟器在me.escoffier.quarkus.redis.fight.FightSimulator. 构造函数接收配置的名称(以区分多个模拟器)和 Redis 数据源。它创建对象以发出 Redis 命令以从 Redis 列表中读取:

public FightSimulator(@ConfigProperty(name = "simulator-name") String name, RedisDataSource ds) {
    this.name = name;
    this.queue = ds.list(FightRequest.class);
    // ...
}

模拟器轮询战斗请求并为每个请求模拟战斗。该实现是一个无限循环(它仅在应用程序关闭时停止)。FightRequest在每次迭代中,它使用命令从队列的右侧读取待处理BRPOP。如果没有挂起的请求,它会从循环的开头重新开始。如果它有请求,它会模拟战斗:

@Override
public void run() {
    logger.infof("Simulator %s starting", name);
    while ((!stopped)) {
        KeyValue<String, FightRequest> item =
            queue.brpop(Duration.ofSeconds(1), "fight-requests");
        if (item != null) {
            var request = item.value();
            var result = simulate(request);
            //...
        }
    }
}

该BRPOP命令检索并删除列表的最后一个(右)元素。与 不同的是RPOP,如果列表中没有元素,它会等待给定的时间(上面代码中的 1 秒)。所以,如果列表包含一个元素,它就会得到它。否则,它会在放弃之前等待最多一秒钟。null在这种情况下它会返回。该BRPOP命令返回KeyValue由列表的键和FightRequest. 它使用这种结构是因为您可以传递多个键,这在您有具有优先级的列表时很方便。

如果列表为空,该BRPOP命令还可以避免无限期旋转,因为它在每次迭代期间等待 1 秒。最后,BRPOP命令是atomic。这意味着如果您有多个模拟器,它们将无法检索相同的项目。它分派每个项目一次。

发送战斗结果

池循环FightRequests从队列中检索并模拟战斗,但如何传达结果?为此,我们使用了另一个 Redis 功能:发布/订阅通信。

简单来说,我们将发送FightResult到一个频道。订阅该频道的应用程序将收到发出的FightResult.

AFightResult包含请求 id、两个战斗机和获胜者的姓名:

package me.escoffier.quarkus.redis.fight;

public record FightResult(String id, Hero hero, Villain villain, String winner) {

}

要使用 Redis发布/订阅命令,我们需要与该组关联的对象。在 中FightSimulator,我们还使用pubsub方法来获取该对象:

public FightSimulator(@ConfigProperty(name = "simulator-name") String name, Logger logger, RedisDataSource ds) {
    this.name = name;
    this.logger = logger;
    this.queue = ds.list(FightRequest.class);
    this.publisher = ds.pubsub(FightResult.class);  // <--- this is it!
}

现在,我们可以使用它publisher来发送FightResults. 每次战斗后,我们调用publisher.publish将FightResult实例发送到fight-results通道:

@Override
public void run() {
    logger.infof("Simulator %s starting", name);
    while ((!stopped)) {
        KeyValue<String, FightRequest> item = queue.brpop(Duration.ofSeconds(1), "fight-requests");
        if (item != null) {
            var request = item.value();
            var result = simulate(request);
            publisher.publish("fight-results", result);  // Send the outcome
           }
    }
}

收到战斗结果

在那时候:

  • 我们将战斗请求提交到作业队列中,
  • 我们消耗那个队列并模拟战斗,
  • 我们将结果发送到fight-results频道。

因此,唯一缺少的部分是该渠道的消费。让我们回到me.escoffier.quarkus.redis.supes.SupesService课堂。在构造函数中,我们还注入ReactiveRedisDataSource了 Redis 数据源的响应式变体。然后,在构造函数代码中,我们订阅fight-results.

public SupesService(RedisDataSource dataSource, ReactiveRedisDataSource reactiveRedisDataSource) {
    commands = dataSource.list(FightRequest.class);
    stream = reactiveRedisDataSource.pubsub(FightResult.class).subscribe("fight-results")
            .broadcast().toAllSubscribers();
}

因为我们使用响应式数据源,所以这个订阅返回一个Multi,准备好由 Quarkus 和一个 SSE 服务(参见SupesResource.java):

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<FightResult> fights() {
    return supes.getFightResults();
}

.broadcast().toAllSubscribers()指示 Quarkus 将所有接收FightResult到的信息广播到所有连接的 SSE。因此,浏览器会过滤掉未请求的结果。

运行系统

圈子就完成了!完整的代码源可从https://github.com/cescoffier/quarkus-redis-job-queue-demo获得。要运行系统,请打开三个终端。

首先,我们启动supes-application. 在第一个终端中,导航到supes-application并运行mvn quarkus:devQuarkus 会自动启动 PostgreSQL 和 Redis 实例(如果您的机器可以运行容器)。在控制台中,点击h然后c。它显示正在运行的开发服务。查找 redis 之一,并复制quarkus.redis.hosts注入的配置:

redis-client - Up About a minute
  Container:        348edec50f80/trusting_jennings  docker.io/redis:7-alpine
  Network:          bridge - 0.0.0.0:53853->6379/tcp
  Exec command:     docker exec -it 348edec50f80 /bin/bash
  Injected Config:  quarkus.redis.hosts=redis://localhost:53853

在上一个片段中,复制:quarkus.redis.hosts=redis://localhost:53853。这是redis服务器的地址。我们需要使用该地址配置模拟器。

如果您访问http://localhost:8080,则会提供网页。您可以fights!按几次按钮。
image.png

战斗不会发生,因为我们没有模拟器。但是,战斗请求已提交并存储在列表中。所以他们没有迷路。

现在,在第二个终端中,导航到该fight-simulator目录,然后运行:

mvn package
java -Dsimulator-name=A -Dquarkus.redis.hosts=redis://localhost:53853 -jar target/quarkus-app/quarkus-run.jar

重要提示:quarkus.redis-hosts使用上面复制的更新。

一旦您启动它,它就会处理待处理的战斗请求:

2022-09-11 15:31:58,914 INFO  [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Pakku and Tulon Voidgazer
2022-09-11 15:31:59,786 INFO  [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Comet Zuko and Arishem The Judge (Knullified)
2022-09-11 15:32:01,809 INFO  [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Ms. America and Kazumi (Devil Form)

如果你回到网页,获胜者会得到一个光环:
image.png

现在,在第三个终端中,导航到该fight-simulator目录,然后运行:

java -Dsimulator-name=B -Dquarkus.redis.hosts=redis://localhost:53853 -jar target/quarkus-app/quarkus-run.jar

重要提示quarkus.redis-hosts:与上一个命令一样,使用上面复制的命令进行更新。

返回网页并单击fight!几次按钮。检查两个模拟器的日志以查看战斗请求现在在两个模拟器之间发送。

概括

这篇文章解释了如何使用 Redis 和 Quarkus Redis 数据源 API 实现作业队列。

从Quarkus 文档了解有关 Redis 数据源 API 的更多信息。我们将发布更多关于 Redis 模式的内容,敬请期待!

原文标题:How to implement a job queue with Redis
原文作者:Clement Escoffier
原文地址:https://cn.quarkus.io/blog/redis-job-queue/

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论