在如何使用 Redis 进行缓存中,我们实现了一个由 Redis 支持的简单缓存。+ 这只是 Redis 的一个用例。Redis 还用作消息传递服务器,以实现对后台作业或其他类型的消息传递任务的处理。这篇文章探讨了使用 Quarkus 和新的 Redis 数据源 API 来实现这种模式。
工作队列和 Supes!
作业队列是存储执行请求的数据结构。作业调度员在该数据结构中提交他们想要执行的任务。另一方面,作业消费者轮询请求并执行它们。

该模式有很多变体,因此让我们关注以下应用程序。我们有一个管理英雄和反派的应用程序。该应用程序提供了模拟随机英雄和随机恶棍之间的战斗的可能性。战斗模拟被委派给战斗模拟器,专用于该任务的应用程序。

在这种情况下,主应用程序将战斗请求提交到作业队列。然后,战斗模拟器轮询提交的战斗请求并执行它们。
战斗结果使用 Redis 的另一个功能进行通信:发布/订阅通信。模拟器将结果发送到应用程序使用的通道。然后应用程序将这些结果广播到网页。
这篇文章只讨论了与 Redis 的交互。应用程序的其余部分很简单,只需使用 RESTEasy Reactive 和 Hibernate ORM 和 Panache。您可以在https://github.com/cescoffier/quarkus-redis-job-queue-demo上找到该应用程序的完整代码。
提交作业
第一个任务是为作业队列建模。我们使用Redis 列表来存储FightRequest。
package me.escoffier.quarkus.redis.fight;
public record FightRequest(String id, Hero hero, Villain villain) {
}
Redis 列表区分列表的左侧和列表的右侧。这种区别允许实现一个 FIFO 队列,我们在左侧写入并从右侧消费。
要操作 Redis 列表,我们需要与此数据结构关联的一组命令。在SupesService 类中,我们注入RedisDataSource并检索命令组:
public SupesService(RedisDataSource dataSource, ...) {
commands = dataSource.list(FightRequest.class);
// ...
}
现在让我们看一下submitAFight方法:
public FightRequest submitAFight() {
var hero = Hero.getRandomHero();
var villain = Villain.getRandomVillain();
var id = UUID.randomUUID().toString();
var request = new FightRequest(id, hero, villain);
commands.lpush("fight-requests", request);
return request;
}
该submitAFight方法检索随机战斗机、计算 id、构建FightRequest实例并执行LPUSH命令。该命令将给定项目写入存储在给定键 ( )LPUSH处的列表的左侧。fight-requests
接收工作请求
现在让我们看看另一面:战斗模拟器。模拟器从代表我们的作业队列的 Redis 列表中轮询FightRequests并模拟战斗。
模拟器在me.escoffier.quarkus.redis.fight.FightSimulator. 构造函数接收配置的名称(以区分多个模拟器)和 Redis 数据源。它创建对象以发出 Redis 命令以从 Redis 列表中读取:
public FightSimulator(@ConfigProperty(name = "simulator-name") String name, RedisDataSource ds) {
this.name = name;
this.queue = ds.list(FightRequest.class);
// ...
}
模拟器轮询战斗请求并为每个请求模拟战斗。该实现是一个无限循环(它仅在应用程序关闭时停止)。FightRequest在每次迭代中,它使用命令从队列的右侧读取待处理BRPOP。如果没有挂起的请求,它会从循环的开头重新开始。如果它有请求,它会模拟战斗:
@Override
public void run() {
logger.infof("Simulator %s starting", name);
while ((!stopped)) {
KeyValue<String, FightRequest> item =
queue.brpop(Duration.ofSeconds(1), "fight-requests");
if (item != null) {
var request = item.value();
var result = simulate(request);
//...
}
}
}
该BRPOP命令检索并删除列表的最后一个(右)元素。与 不同的是RPOP,如果列表中没有元素,它会等待给定的时间(上面代码中的 1 秒)。所以,如果列表包含一个元素,它就会得到它。否则,它会在放弃之前等待最多一秒钟。null在这种情况下它会返回。该BRPOP命令返回KeyValue由列表的键和FightRequest. 它使用这种结构是因为您可以传递多个键,这在您有具有优先级的列表时很方便。
如果列表为空,该BRPOP命令还可以避免无限期旋转,因为它在每次迭代期间等待 1 秒。最后,BRPOP命令是atomic。这意味着如果您有多个模拟器,它们将无法检索相同的项目。它分派每个项目一次。
发送战斗结果
池循环FightRequests从队列中检索并模拟战斗,但如何传达结果?为此,我们使用了另一个 Redis 功能:发布/订阅通信。
简单来说,我们将发送FightResult到一个频道。订阅该频道的应用程序将收到发出的FightResult.
AFightResult包含请求 id、两个战斗机和获胜者的姓名:
package me.escoffier.quarkus.redis.fight;
public record FightResult(String id, Hero hero, Villain villain, String winner) {
}
要使用 Redis发布/订阅命令,我们需要与该组关联的对象。在 中FightSimulator,我们还使用pubsub方法来获取该对象:
public FightSimulator(@ConfigProperty(name = "simulator-name") String name, Logger logger, RedisDataSource ds) {
this.name = name;
this.logger = logger;
this.queue = ds.list(FightRequest.class);
this.publisher = ds.pubsub(FightResult.class); // <--- this is it!
}
现在,我们可以使用它publisher来发送FightResults. 每次战斗后,我们调用publisher.publish将FightResult实例发送到fight-results通道:
@Override
public void run() {
logger.infof("Simulator %s starting", name);
while ((!stopped)) {
KeyValue<String, FightRequest> item = queue.brpop(Duration.ofSeconds(1), "fight-requests");
if (item != null) {
var request = item.value();
var result = simulate(request);
publisher.publish("fight-results", result); // Send the outcome
}
}
}
收到战斗结果
在那时候:
- 我们将战斗请求提交到作业队列中,
- 我们消耗那个队列并模拟战斗,
- 我们将结果发送到fight-results频道。
因此,唯一缺少的部分是该渠道的消费。让我们回到me.escoffier.quarkus.redis.supes.SupesService课堂。在构造函数中,我们还注入ReactiveRedisDataSource了 Redis 数据源的响应式变体。然后,在构造函数代码中,我们订阅fight-results.
public SupesService(RedisDataSource dataSource, ReactiveRedisDataSource reactiveRedisDataSource) {
commands = dataSource.list(FightRequest.class);
stream = reactiveRedisDataSource.pubsub(FightResult.class).subscribe("fight-results")
.broadcast().toAllSubscribers();
}
因为我们使用响应式数据源,所以这个订阅返回一个Multi
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<FightResult> fights() {
return supes.getFightResults();
}
.broadcast().toAllSubscribers()指示 Quarkus 将所有接收FightResult到的信息广播到所有连接的 SSE。因此,浏览器会过滤掉未请求的结果。
运行系统
圈子就完成了!完整的代码源可从https://github.com/cescoffier/quarkus-redis-job-queue-demo获得。要运行系统,请打开三个终端。
首先,我们启动supes-application. 在第一个终端中,导航到supes-application并运行mvn quarkus:devQuarkus 会自动启动 PostgreSQL 和 Redis 实例(如果您的机器可以运行容器)。在控制台中,点击h然后c。它显示正在运行的开发服务。查找 redis 之一,并复制quarkus.redis.hosts注入的配置:
redis-client - Up About a minute
Container: 348edec50f80/trusting_jennings docker.io/redis:7-alpine
Network: bridge - 0.0.0.0:53853->6379/tcp
Exec command: docker exec -it 348edec50f80 /bin/bash
Injected Config: quarkus.redis.hosts=redis://localhost:53853
在上一个片段中,复制:quarkus.redis.hosts=redis://localhost:53853。这是redis服务器的地址。我们需要使用该地址配置模拟器。
如果您访问http://localhost:8080,则会提供网页。您可以fights!按几次按钮。

战斗不会发生,因为我们没有模拟器。但是,战斗请求已提交并存储在列表中。所以他们没有迷路。
现在,在第二个终端中,导航到该fight-simulator目录,然后运行:
mvn package
java -Dsimulator-name=A -Dquarkus.redis.hosts=redis://localhost:53853 -jar target/quarkus-app/quarkus-run.jar
重要提示:quarkus.redis-hosts使用上面复制的更新。
一旦您启动它,它就会处理待处理的战斗请求:
2022-09-11 15:31:58,914 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Pakku and Tulon Voidgazer
2022-09-11 15:31:59,786 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Comet Zuko and Arishem The Judge (Knullified)
2022-09-11 15:32:01,809 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Ms. America and Kazumi (Devil Form)
如果你回到网页,获胜者会得到一个光环:

现在,在第三个终端中,导航到该fight-simulator目录,然后运行:
java -Dsimulator-name=B -Dquarkus.redis.hosts=redis://localhost:53853 -jar target/quarkus-app/quarkus-run.jar
重要提示quarkus.redis-hosts:与上一个命令一样,使用上面复制的命令进行更新。
返回网页并单击fight!几次按钮。检查两个模拟器的日志以查看战斗请求现在在两个模拟器之间发送。
概括
这篇文章解释了如何使用 Redis 和 Quarkus Redis 数据源 API 实现作业队列。
从Quarkus 文档了解有关 Redis 数据源 API 的更多信息。我们将发布更多关于 Redis 模式的内容,敬请期待!
原文标题:How to implement a job queue with Redis
原文作者:Clement Escoffier
原文地址:https://cn.quarkus.io/blog/redis-job-queue/




