暂无图片
暂无图片
4
暂无图片
暂无图片
3
暂无图片

PostgreSQL消息队列拓展——PGQ

在分布式系统和异步任务处理中,消息队列是解耦服务、提升可靠性的核心组件。但当你的系统已经依赖PostgreSQL数据库时,引入RabbitMQ等独立消息中间件可能意味着更高的复杂性和运维成本。能否利用PostgreSQL数据库来构建消息队列,以减少运维成本呢?

PGMQ和PGQ就是PostgreSQL的消息队列拓展。PGMQ我们之前的文章中已有介绍,这里介绍另一款消息队列扩展PGQ。PGQ是一个通用消息队列拓展,可以解决实时事务的异步处理问题,比如在事务提交后,异步执行一些任务,比如发送邮件、更新缓存等。其核心是不会阻塞当前事务。实际上就是提供了类似消息队列的机制,提供纯SQL接口供开发者调用。

PGQ解决了哪些问题?

异步批量事务处理:在事务提交后,异步执行一些任务,比如发送邮件、更新缓存等。这些任务通常不会阻塞当前事务,但需要确保这些任务在事务提交后执行。具体一点就是当你执行某些比如说INSERT/DELETE/UPDATE事务,你希望触发一些行为,但是这些行为不需要在事务COMMIT之前完成,希望在事务COMMIT之后不远的时间异步执行,而不阻塞当前的事务。

基本原理

数据库实现消息队列,并不是像RabbitMQ一样,实现了AMQP协议,而是通过数据库机制来实现类似消息队列的功能。对外提供的接口是SQL接口。 数据库实现消息队列,通常都是通过表来实现队列,PGQ也是基于表来实现消息队列。一个队列就是一张表,客户端发送消息到队列,实际上就是向表中插入了一条数据。消费者读消息,就是读这张抽象为队列的表,消费消息实际上就是从被抽象为队列的表中删除消息。为了方便用户使用,封装了一些API来操作队列,与RabbitMQ不同的是,这里提供的是纯SQL接口,用户可以直接使用SQL来操作队列,比如发送消息、消费消息等。

安装

安装扩展十分简单:

-- 创建 PGQ 扩展 CREATE EXTENSION pgq;
复制

可通过函数pgq.version()查看当前pgq的版本。

postgres=# select pgq.version(); version --------- 3.5.1 (1 row)
复制

支持PG10~PG16

PGQ的使用

创建队列

使用消息队列,首先要创建一个消息队列,然后才能向队列中发送消息。

-- 创建队列 postgres=# select pgq.create_queue('myqueue'); create_queue -------------- 1 (1 row)
复制

为了查看队列的情况,PGQ中创建了表pgq.queue,用来存储已创建成功的队列信息。

postgres=# select * from pgq.queue; -[ RECORD 1 ]------------+------------------------------ queue_id | 1 queue_name | my_queue queue_ntables | 3 queue_cur_table | 0 queue_rotation_period | 02:00:00 queue_switch_step1 | 800 queue_switch_step2 | 800 queue_switch_time | 2025-03-20 13:58:20.872887+08 queue_external_ticker | f queue_disable_insert | f queue_ticker_paused | f queue_ticker_max_count | 500 queue_ticker_max_lag | 00:00:03 queue_ticker_idle_period | 00:01:00 queue_per_tx_limit | queue_data_pfx | pgq.event_1 queue_event_seq | pgq.event_1_id_seq queue_tick_seq | pgq.event_1_tick_seq queue_extra_maint | -[ RECORD 2 ]------------+------------------------------ queue_id | 2 queue_name | myqueue queue_ntables | 3 queue_cur_table | 0 queue_rotation_period | 02:00:00 queue_switch_step1 | 807 queue_switch_step2 | 807 queue_switch_time | 2025-03-21 17:19:50.565652+08 queue_external_ticker | f queue_disable_insert | f queue_ticker_paused | f queue_ticker_max_count | 2 queue_ticker_max_lag | 00:00:03 queue_ticker_idle_period | 00:01:00 queue_per_tx_limit | queue_data_pfx | pgq.event_2 queue_event_seq | pgq.event_2_id_seq queue_tick_seq | pgq.event_2_tick_seq queue_extra_maint |
复制

也可以通过函数pgq.get_queue_info查看队列信息。

postgres=# select * from pgq.get_queue_info(); -[ RECORD 1 ]------------+------------------------------ queue_name | myqueue queue_ntables | 3 queue_cur_table | 0 queue_rotation_period | 02:00:00 queue_switch_time | 2025-03-21 17:19:50.565652+08 queue_external_ticker | f queue_ticker_paused | f queue_ticker_max_count | 2 queue_ticker_max_lag | 00:00:03 queue_ticker_idle_period | 00:01:00 ticker_lag | 2 days 22:09:55.306126 ev_per_sec | ev_new | 3 last_tick_id | 1 -[ RECORD 2 ]------------+------------------------------ queue_name | testqueue queue_ntables | 3 queue_cur_table | 0 queue_rotation_period | 02:00:00 queue_switch_time | 2025-03-24 14:43:01.80398+08 queue_external_ticker | f queue_ticker_paused | f queue_ticker_max_count | 500 queue_ticker_max_lag | 00:00:03 queue_ticker_idle_period | 00:01:00 ticker_lag | 00:46:44.067798 ev_per_sec | ev_new | 0 last_tick_id | 1
复制

注册消费者

可通过函数pgq.register_consumer注册消费者,消费者可以消费队列中的消息。

-- 注册消费者 postgres=# select pgq.register_consumer('myqueue','myconsumer'); register_consumer ------------------- 1 (1 row)
复制

PGQ中创建了表pgq.consumer,用来存储已注册成功的消费者信息。

-- 查看消费者, postgres=# select * from pgq.consumer; co_id | co_name -------+-------------- 1 | myconsumer 2 | testconsumer (2 rows)
复制

也可通过函数pgq.get_consumer_info查看消费者信息。

postgres=# select * from pgq.get_consumer_info(); queue_name | consumer_name | lag | last_seen | last_tick | current_batch | next_tick | pending_events ------------+---------------+------------------------+------------------------+-----------+---------------+-----------+---------------- myqueue | myconsumer | 2 days 22:11:53.125704 | 2 days 22:10:33.319906 | 1 | | | 0 (1 row)
复制

发送消息

创建完队列后就可以发送消息了,PGQ可通过pgq.insert_event函数向队列中发送消息。

postgres=# select pgq.insert_event('myqueue','hangzhou','hangzhou print log'); insert_event -------------- 4 (1 row)
复制

我们看一下其声明参数,需要传递三个参数,分别是队列名称、事件类型和事件数据。后面的参数是可选的。

create or replace function pgq.insert_event( queue_name text, ev_type text, ev_data text, ev_extra1 text, ev_extra2 text, ev_extra3 text, ev_extra4 text) returns bigint as $$ -- ---------------------------------------------------------------------- -- Function: pgq.insert_event(7) -- -- Insert a event into queue with all the extra fields. -- -- Parameters: -- queue_name - Name of the queue -- ev_type - User-specified type for the event -- ev_data - User data for the event -- ev_extra1 - Extra data field for the event -- ev_extra2 - Extra data field for the event -- ev_extra3 - Extra data field for the event -- ev_extra4 - Extra data field for the event -- -- Returns: -- Event ID -- Calls: -- pgq.insert_event_raw(11) -- Tables directly manipulated: -- insert - pgq.insert_event_raw(11), a C function, inserts into current event_N_M table -- ---------------------------------------------------------------------- begin return pgq.insert_event_raw(queue_name, null, now(), null, null, ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4); end; $$ language plpgsql security definer; -- 简化版 create or replace function pgq.insert_event(queue_name text, ev_type text, ev_data text) returns bigint as $$ -- ---------------------------------------------------------------------- -- Function: pgq.insert_event(3) -- -- Insert a event into queue. -- -- Parameters: -- queue_name - Name of the queue -- ev_type - User-specified type for the event -- ev_data - User data for the event -- -- Returns: -- Event ID -- Calls: -- pgq.insert_event(7) -- ---------------------------------------------------------------------- begin return pgq.insert_event(queue_name, ev_type, ev_data, null, null, null, null); end; $$ language plpgsql;
复制

我们可以通过函数pgq.current_event_table来查看当前队列的表名。

-- 查看当前队列存储消息事件的表名 postgres=# select pgq.current_event_table('myqueue'); current_event_table --------------------- pgq.event_2_0 (1 row) -- 查看表结构 postgres=# \d pgq.event_2_0; Table "pgq.event_2_0" Column | Type | Collation | Nullable | Default -----------+--------------------------+-----------+----------+----------------------------------------- ev_id | bigint | | not null | nextval('pgq.event_2_id_seq'::regclass) ev_time | timestamp with time zone | | not null | ev_txid | bigint | | not null | txid_current() ev_owner | integer | | | ev_retry | integer | | | ev_type | text | | | ev_data | text | | | ev_extra1 | text | | | ev_extra2 | text | | | ev_extra3 | text | | | ev_extra4 | text | | | Indexes: "event_2_0_txid_idx" btree (ev_txid) Inherits: pgq.event_2 -- 查看具体的表,可以看到我们插入的消息已经存储到了该表中。 postgres=# select * from pgq.event_2_0; ev_id | ev_time | ev_txid | ev_owner | ev_retry | ev_type | ev_data | ev_extra1 | ev_extra2 | ev_extra3 | ev_extra4 -------+-------------------------------+---------+----------+----------+----------+--------------------+-----------+-----------+-----------+----------- 1 | 2025-03-21 17:20:15.532007+08 | 808 | | | insert | insert a tuple | | | | 2 | 2025-03-21 17:20:28.98632+08 | 809 | | | update | update a tuple | | | | 3 | 2025-03-21 17:20:39.131856+08 | 810 | | | delete | delete a tuple | | | | 4 | 2025-03-24 15:00:31.981381+08 | 835 | | | hangzhou | hangzhou print log | | | | (4 rows)
复制

也就是说,创建一个队列,其实就是创建了一些列的表,用来存储消息,以及存储一些元数据。而发送消息,实际上就是向这些表插入数据。

Ticker

ticker是PGQ消息处理的核心调度组件,一个轻量级的PostgreSQL任务队列工具,专门用于处理周期性任务和延时任务,负责周期性地将队列中的消息事件分批次推送给消费者。

深入可参考pgq ticker

如果没有配置外部ticker,可通过SQL函数pgq.ticker直接调用ticker。

postgres=# select pgq.ticker(); ticker -------- 0 (1 row)
复制

Notice: 如果没有配置外部ticker,或者手动执行pgq.ticker(),消费者读取不到消息。

消费消息

消费消息,可先通过函数pgq.next_batch获取队列下一批次消息ID,再通过函数pgq.get_batch_events获取批次消息。最后通过函数pgq.finish_batch结束批次消息。

-- 获取下一批次消息ID postgres=# select pgq.next_batch('myqueue','myconsumer'); next_batch ------------ 1 (1 row) -- 获取批次消息 postgres=# select * from pgq.get_batch_events(1); ev_id | ev_time | ev_txid | ev_retry | ev_type | ev_data | ev_extra1 | ev_extra2 | ev_extra3 | ev_extra4 -------+-------------------------------+---------+----------+----------+--------------------+-----------+-----------+-----------+----------- 1 | 2025-03-21 17:20:15.532007+08 | 808 | | insert | insert a tuple | | | | 2 | 2025-03-21 17:20:28.98632+08 | 809 | | update | update a tuple | | | | 3 | 2025-03-21 17:20:39.131856+08 | 810 | | delete | delete a tuple | | | | 4 | 2025-03-24 15:00:31.981381+08 | 835 | | hangzhou | hangzhou print log | | | | (4 rows) -- 结束批次消息 postgres=# select pgq.finish_batch(1); finish_batch -------------- 1 (1 row)
复制

注销消费者

可通过函数pgq.unregister_consumer注销消费者,注销消费者后,消费者将不再消费队列中的消息。

postgres=# select pgq.unregister_consumer('myqueue','myconsumer'); unregister_consumer --------------------- 1 (1 row) -- 查看,myconsumer消费者已被删除 postgres=# select * from pgq.consumer; co_id | co_name -------+------------ 1 | testconsumer (1 row)
复制

删除队列

最后,删除队列,可通过函数pgq.drop_queue删除队列。

-- 删除队列 postgres=# select pgq.drop_queue('myqueue'); drop_queue ------------ 1 (1 row)
复制

参考资料

PGQ Tutorial
pgq SQL接口说明
The PgQueue module
PGQ: Queuing for Long-Running Jobs in Go Written Atop Postgres
The ticker daemon

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论

chirpyli
暂无图片
9天前
评论
暂无图片 0
能否利用PostgreSQL数据库来构建消息队列,以减少运维成本呢?
9天前
暂无图片 点赞
评论
...
暂无图片
9天前
评论
暂无图片 0
在分布式系统和异步任务处理中,消息队列是解耦服务、提升可靠性的核心组件。但当你的系统已经依赖PostgreSQL数据库时,引入RabbitMQ等独立消息中间件可能意味着更高的复杂性和运维成本
9天前
暂无图片 点赞
评论
怀
怀念和想念
暂无图片
12天前
评论
暂无图片 0
PostgreSQL消息队列拓展——PGQ
12天前
暂无图片 点赞
评论