Postgres扩展插件又添一个: pg_incremental增量数据处理
最近看到一个新的开源 PostgreSQL扩展pg_incremental插件,可以帮助我们在PostgreSQL中快速,可靠的进行增量批处理。这个扩展可以在PostgreSQL中存储只有追加的事件数据流(例如物联网,时间序列)时,创建处理管道。
适合pg_incremental的使用的场景包括:
- 创建和增量维护的汇总和聚合
- 增量数据转换
- 使用标准SQL定期导入或导出新数据
在设置pg_incremental管道后,就有一个好处就是会一直运行,也不需要我们定期的维护。
为什么要增量处理?
时序数据的场景在当下物联网时代已经遍布我们生活的方方面面,比如电子手表、车机、物流、电网等等。在时序场景中,一种比较常见的场景是定期将写入的数据聚合到汇总表中,在这个模型中,尤其是批量写操作速速很快,因为在这个过程中不会对数据做任何的处理,特别是在时序数据库中都没有涉及事务,写入还会进一步的提升;增量聚合之所有快,是因为它只处理新数据;查询的速度之所以很快,是因为访问汇总数据的索引表。就针对这个场景crunchydata团队之前开发pg_cron,但是创建端到端管道仍然需要大量的统计和严谨的并发性考虑。
虽然也可以通过一些其他的办法解决,比如增量物化视图或基于逻辑解码的解决方案,但它们有许多限制并且缺乏灵活性。此外,还有其他增量处理场景,例如从多个源收集数据,或定期导入导出。crunchydata团队认为之前场景仍然没有完全解决,因此觉得需要有一种简单、一劳永逸的工具,无需大量样板即可完成工作,所以就有了pg_incremental扩展插件的诞生。
扩展插件安装
由于pg_incremental依赖于pg_cron,因此需要先安装pg_cron插件。
创建数据库和用户
create database testdb ; \c testdb; create user test password 'Postgresql2024'; grant all privileges on database testdb to test; alter database testdb owner to test;
复制
安装pg_cron插件
编译安装pg_cron插件
cd /home/postgres wget https://github.com/citusdata/pg_cron/archive/v1.6.5.tar.gz tar -zxvf /home/postgres/v1.6.5.tar.gz cd pg_cron-1.6.5/ make PG_CONFIG=/data/pgsql/bin/pg_config make install PG_CONFIG=/data/pgsql/bin/pg_config --修改PG的postgresql.conf配置文件 vim data/postgresql.conf shared_preload_libraries = 'pg_cron' cron.database_name = 'postgres' cron.use_backgroud_works= on max_worker_processes = 16 max_connections=16000
复制
重启数据库
pg_ctl restart -D /data/pgdata
复制
创建扩展
create extension pg_cron;
复制
授权test用户需要有incremental的使用权限
GRANT USAGE ON SCHEMA cron TO test;
复制
在线的源码可以在下面的链接进行详细查看
https://gitee.com/mirrors/pg_cron
安装pg_incremental插件
编译安装pg_incremental插件
cd /home/postgres wget https://github.com/CrunchyData/pg_incremental/archive/refs/tags/v1.0.0.tar.gz tar -zxvf /home/postgres/v1.0.0.tar.gz cd pg_incremental-1.0.0/ make PG_CONFIG=/data/pgsql/bin/pg_config make install PG_CONFIG=/data/pgsql/bin/pg_config --修改PG的postgresql.conf配置文件 vim data/postgresql.conf shared_preload_libraries = 'pg_incremental'
复制
重启数据库
pg_ctl restart -D /data/pgdata
复制
创建扩展
create extension pg_incremental;
复制
授权test用户需要有incremental的使用权限
GRANT USAGE ON SCHEMA incremental TO test;
复制
在线的源码可以在下面的链接进行详细查看
https://github.com/CrunchyData/pg_incremental
在这里需要注意您只能在具有pg_cron的数据库中创建pg_incremental。
创建增量处理管道
pg_incremental中有3种类型的管道
- 序列管道- 对一系列序列值执行管道查询,并使用一种机制确保不再有新的序列值落入该范围内。这些管道最适合增量构建汇总表。
- 时间间隔管道- 在时间间隔过后,在一个时间间隔或时间间隔范围内执行管道查询。这些管道可用于增量构建汇总表或定期导出新数据。
- 文件列表管道 (PREVIEW) - 对从文件列表函数获取的新文件执行管道查询。这些管道可用于导入新数据。
每个管道都有一个带有 1 或 2 个参数的命令。管道使用pg_cron定期运行(默认情况下每分钟一次),并且仅在有新数据要处理时才执行命令。但是不管是否有新数据,每个管道的执行都将出现在cron.job_run_details
。
下面来介绍每种类型的管道:
创建序列管道
创建源表和汇总表
-- 创建一个源表 create table events ( event_id bigint generated always as identity, event_time timestamptz default now(), client_id bigint, path text, response_time double precision ); --创建BRIN索引,在选择新范围方面非常有效 create index on events using brin (event_id); -- 随机插入一些数据 insert into events (client_id, path, response_time) select s % 100, '/page-' || (s % 3), random() from generate_series(1,1000000) s; --创建汇总表来预聚合每天的事件数 create table events_agg ( day timestamptz, event_count bigint, primary key (day) );
复制
创建序列管道是通过incremental.create_sequence_pipeline
函数来实现的,在函数中需要指定管道名称、序列或包含序列的表的名称以及定义使用该函数的序列管道命令。这个命令将在上下文中执行,其中$1
和$2
分别设置为可以安全聚合的序列值范围 (bigint) 的最低和最高值。创建管道时,由于execute_immediately的默认参数true,因此将从 0 开始的所有序列值立即执行该命令。当然也可以通过设置execute_immediately的参数为false来禁用立即执行,在这种情况下,第一次执行将作为定期作业调度的一部分进行。
select incremental.create_sequence_pipeline('event-aggregation', 'events', $$ insert into events_agg select date_trunc('day', event_time), count(*) from events where event_id between $1 and $2 group by 1 on conflict (day) do update set event_count = events_agg.event_count + excluded.event_count; $$); create_sequence_pipeline -------------------------- (1 row) Time: 281.406 ms
复制
管道执行确保已知序列值的范围是安全的,这意味着不再有可能产生在该范围内的序列值的事务。这是通过在执行命令之前等待并发写事务来确保的。范围的大小实际上是自上次执行管道到新管道开始执行的时刻的插入数量。
序列管道的优点是,它们可以以小的增量方式处理数据,并且与聚合中使用的时间戳来自何处无关,比如即使延迟的数据很好,也没有影响。缺点是总是需要使用ON CONFLICT子句来合并聚合,即使在某些情况下不符合实际(例如:完全不同的计数)。
incremental.create_sequence_pipeline函数的的参数说明:
参数名称 | 类型 | 描述 | 默认 |
---|---|---|---|
pipeline_name |
text | 自定义的管道名称 | 必需的 |
sequence_name |
regclass | 序列或包含序列的表名 | 必需的 |
command |
text | 带有 $1 和 $2 参数的管道命令 | 必需的 |
schedule |
text | pg_cron 定期执行的时间表(或 NULL) | * * * * * (每分钟) |
execute_immediately |
bool | 对现有数据立即执行命令 | true |
手动执行管道命令函数或者等待序列管道自行定时调用(每分钟)
call incremental.execute_pipeline('event-aggregation');
复制
然后再看看events的数据量和events_agg记录的信息,可以到events的事件数和统计结果是一致的。
postgres=# select count(*) from events; count --------- 1000000 (1 row) Time: 26.782 ms postgres=# select * from events_agg; day | event_count ------------------------+------------- 2024-12-23 00:00:00+08 | 1000000 (1 row) Time: 0.321 ms
复制
插入新的数据
insert into events (client_id, path, response_time) select s, '/page-' || (s % 3), random() from generate_series(1,100) s;
复制
再次手动调用管道命令函数
postgres=# call incremental.execute_pipeline('event-aggregation'); NOTICE: pipeline event-aggregation: processing sequence values from 1000001 to 1000100 CALL Time: 1.734 ms
复制
再次查看events_agg中的数据是否更新
postgres=# select count(*) from events; count --------- 1000100 (1 row) Time: 15.794 ms postgres=# select * from events_agg; day | event_count ------------------------+------------- 2024-12-23 00:00:00+08 | 1000100 (1 row) Time: 0.205 ms
复制
从上面的结果,可以看到在events_agg中的数据更新为了最新的汇总值和events中的统计结果一致。
删除序列管道
select incremental.drop_pipeline('event-aggregation');
复制
创建时间间隔管道
可以用incremental.create_time_interval_pipeline
通过指定管道名称、时间间隔和管道命令。该命令将在一个上下文中执行,$1
和$2
分别设置为时间间隔的开始和结束(不包括$2的值)。
-- BRIN索引在选择新范围方面非常有效 create index on events using brin (event_time); --创建一个管道,以1天的间隔聚合新插入 -- $1和$2将被设置为可聚合的时间间隔范围的开始和结束(不包括) select incremental.create_time_interval_pipeline('event-aggregation', '1 day', $$ insert into events_agg select event_time::date, count(distinct event_id) from events where event_time >= $1 and event_time < $2 group by 1 $$);
复制
在创建管道时,该命令将立即执行,从默认2000-01-01 00:00:00时间点开始,当然我们也可以通过对start_time参数进行自定义配置。另外也可以通过设置execute_immediately的参数为false来禁用立即执行,在这种情况下,第一次执行将作为定期作业调度的一部分进行。
时间间隔管道的好处是,它们更容易定义,可以进行更复杂的处理,比如精确的不同计数,而且更适合导出数据,因为该命令总是处理精确的时间范围。缺点是您需要等到一段时间间隔过去后才能看到结果,并且插入旧的时间戳可能会导致跳过数据。序列管道在这个意义上更可靠,因为值总是由数据库生成的。
incremental.create_time_range_pipeline函数的的参数说明:
参数名称 | 类型 | 描述 | 默认 |
---|---|---|---|
pipeline_name |
text | 自定义的管道名称 | 必需 |
time_interval |
interval | 管道执行的间隔时间 | 必需 |
command |
text | 有$1和$2参数的管道命令 | 必需 |
batched |
text | 是否运行多个intervals的命令 | true |
start_time |
timestamptz | intervals开始的时间 | 2000-01-01 00:00:00 |
source_table_name |
regclass | 在聚合之前等待这个表的锁 | NULL(不等待) |
schedule |
text | pg_cron定时执行计划(或NULL) | * * * * * (每分钟) |
min_delay |
interval | 需要等待多长时间来处理过去的间隔 | 30 seconds |
execute_immediately |
bool | 立即对现有数据执行命令 | true |
下面利用时间间隔管道验证数据文件的导出。
-- 定义一个导出函数 create function export_events(start_time timestamptz, end_time timestamptz) returns void language plpgsql as $function$ declare path text := format('/data/export/%s.csv', start_time::date); begin execute format($$copy (select * from events where event_time >= start_time and event_time < end_time) to %L$$, path); end; $function$; --从2024-12-24开始,每1天事件导出为CSV文件 --该命令在每个时间间隔内分别执行 select incremental.create_time_interval_pipeline('event-export', time_interval := '1 day', batched:= false , start_time := '2024-12-25', command := $$ select export_events($1, $2) $$ ); --手动调用 call incremental.execute_pipeline('event-export');
复制
根据官网提供的案例,对间隔时间按天导出数据,验证未生成文件,如果再次手动调用会报错。
postgres=# call incremental.execute_pipeline('event-export'); NOTICE: pipeline event-export: processing overall range from 2024-12-24 00:00:00+08 to 2024-12-25 00:00:00+08 NOTICE: pipeline event-export: processing time range from 2024-12-24 00:00:00+08 to 2024-12-25 00:00:00+08 ERROR: column "start_time" does not exist LINE 1: copy (select * from events where event_time >= start_time an... ^ QUERY: copy (select * from events where event_time >= start_time and event_time < end_time) to '/data/export/2024-12-24.csv' CONTEXT: PL/pgSQL function export_events(timestamp with time zone,timestamp with time zone) line 5 at EXECUTE SQL statement " SELECT public.export_events($1, $2) AS export_events"
复制
问题未解决后续继续找时间研究一下
创建文件列表管道
可以通过incremental.create_file_list_pipeline
来创建文件列表管道,需要指定管道名称、文件模式和使用该函数定义文件列表管道命令。该命令将在$1
设置为文件路径的上下文中执行。管道定期查找列表函数,然后返回的新文件并对每个新文件执行命令。
定义一个封装COPY命令的导入函数
create function import_events(path text) returns void language plpgsql as $function$ begin execute format($$copy events from %L$$, path); end; $function$;
复制
创建一个管道,将/data/import目录下的文件导入表中,这些文件是从events导出的数据。在这里利用pg_ls_dir函数来读取服务器上的文件列表
-- $1将被设置为新文件的路径 select incremental.create_file_list_pipeline('event-import', '/data/import/*.csv', $$ select import_events($1) $$,false,'pg_ls_dir');
复制
执行报错如下:
postgres=# select incremental.create_file_list_pipeline('event-import', '/data/import/*.csv', $$ postgres$# select import_events($1) postgres$# $$,false,'pg_ls_dir'); ERROR: column "path" does not exist LINE 1: select path from pg_catalog.pg_ls_dir($2) where path not in ... ^ QUERY: select path from pg_catalog.pg_ls_dir($2) where path not in (select path from incremental.processed_files where pipeline_name operator(pg_catalog.=) $1) Time: 0.541 ms
复制
从这个报错看应该是由于缺失path列,我然后就执行下面的sql
postgres=# select * from pg_catalog.pg_ls_dir('/data/import'); pg_ls_dir ------------ 122401.csv 122402.csv (2 rows)
复制
可以看到,默认的列名是函数名,和代码中的path不符合。个人判断应该还是插件代码的问题。
在官网介绍说文件列表管道的 API 仍可能发生变化。
incremental.create_file_list_pipeline函数的的参数说明:
参数名称 | 类型 | 描述 | 默认 |
---|---|---|---|
pipeline_name |
text | 自定义的管道名称 | 必需的 |
file_pattern |
text | 要传递给列表函数的文件模式 | 必需的 |
command |
text | 带有 $1 和 $2 参数的管道命令 | 必需的 |
batched |
bool | 目前未使用 | false |
list_function |
text | 列出文件的函数名称 | crunchy_lake.list_files |
schedule |
text | pg_cron 定期执行的时间表(或 NULL) | * * * * * (每分钟) |
execute_immediately |
bool | 对现有数据立即执行命令 | true |
监控管道
目前有两种监控管道的方法:
1)通过表对应每个管道类型:incremental.sequence_pipelines
, incremental.time_interval_pipelines
, and incremental.processed_files
2)通过查看底层作业pg_cron的cron.Job_run_details 中记录的信息
查看序列管道的信息:
select * from incremental.sequence_pipelines ;
┌─────────────────────┬────────────────────────────┬────────────────────────────────┐
│ pipeline_name │ sequence_name │ last_processed_sequence_number │
├─────────────────────┼────────────────────────────┼────────────────────────────────┤
│ view-count-pipeline │ public.events_event_id_seq │ 3000000 │
│ event-aggregation │ events_event_id_seq │ 1000000 │
└─────────────────────┴────────────────────────────┴────────────────────────────────┘
复制
查看时间间隔管道的信息:
postgres=# select * from incremental.time_interval_pipelines;
pipeline_name | time_interval | batched | min_delay | last_processed_time
-------------------+---------------+---------+-----------+------------------------
event-aggregation | 00:01:00 | t | 00:00:30 | 2024-12-24 17:27:00+08
event-export | 00:05:00 | f | 00:00:30 | 2024-12-24 17:28:00+08
(2 rows)
复制
查看文件列表管道的文件信息:
select * from incremental.file_list_pipelines ;
┌───────────────┬─────────────────────────────────────┬─────────┬─────────────────────────┐
│ pipeline_name │ file_pattern │ batched │ list_function │
├───────────────┼─────────────────────────────────────┼─────────┼─────────────────────────┤
│ event-import │ s3://marco-crunchy-data/inbox/*.csv │ f │ crunchy_lake.list_files │
└───────────────┴─────────────────────────────────────┴─────────┴─────────────────────────┘
select * from incremental.processed_files ;
┌───────────────┬────────────────────────────────────────────┐
│ pipeline_name │ path │
├───────────────┼────────────────────────────────────────────┤
│ event-import │ s3://marco-crunchy-data/inbox/20241215.csv │
│ event-import │ s3://marco-crunchy-data/inbox/20241215.csv │
└───────────────┴────────────────────────────────────────────┘
复制
pg_cron中记录的作业信息
对于所有管道,我们都可以通过查询底层pg_cron中记录的作业信息及错误消息。
select jobname, start_time, status, return_message
from cron.job_run_details join cron.job using (jobid)
where jobname like 'pipeline:event-import%' order by 1 desc limit 3;
┌───────────────────────┬───────────────────────────────┬───────────┬────────────────┐
│ jobname │ start_time │ status │ return_message │
├───────────────────────┼───────────────────────────────┼───────────┼────────────────┤
│ pipeline:event-import │ 2024-12-17 13:27:00.090057+01 │ succeeded │ CALL │
│ pipeline:event-import │ 2024-12-17 13:26:00.055813+01 │ succeeded │ CALL │
│ pipeline:event-import │ 2024-12-17 13:25:00.086688+01 │ succeeded │ CALL │
└───────────────────────┴───────────────────────────────┴───────────┴────────────────┘
复制
手动执行管道
也支持手动执行管道,只需要call调用incremental.execute_pipeline函数即可,不过它只会在有新数据需要处理时才运行该命令。
call incremental.execute_pipeline('event-aggregation');
复制
如果需要禁用定时调度,则需要在创建管道时指定schedule的参数为NULL。
incremental.execute_pipeline函数的的参数说明:
参数名称 | 类型 | 描述 | 默认 |
---|---|---|---|
pipeline_name |
text | 自定义的管道名称 | 必须 |
重置增量处理管道
如果增量管道处理的数据有问题,也提供重新进行初始化的函数,但是需要先删除之前的数据,比如要重新对events的聚合进行构建,需要先删除 events_agg的数据,然后执行incremental.reset_pipeline函数
begin;
delete from events_agg;
select incremental.reset_pipeline('event-aggregation');
commit;
复制
在这里还需要注意,如果函数执行失败,则不会重置管道。
incremental.reset_pipeline函数的的参数说明:
参数名称 | 类型 | 描述 | 默认 |
---|---|---|---|
pipeline_name |
text | 自定义的管道名称 | 必须 |
execute_immediately |
bool | 立即对现有数据执行命令 | true |
删除增量处理管道
当我们需要删除一个管道时,可以使用incremental.drop_pipeline函数处理,只需要指定自定义的管道名称就行。
-- Drop the pipeline
select incremental.drop_pipeline('event-aggregation');
复制
incremental.drop_pipeline
函数的参数:
参数名称 | 类型 | 描述 | 默认 |
---|---|---|---|
pipeline_name |
text | 自定义的管道名称 | 必须 |
总结
通过对3种类型增量管道进行实际操作,还是发现一些问题:
1、文件列表管道在使用pg_ls_dir函数时报错,应该是代码的bug
2、时间间隔管道在导出文件时,一直看不到文件生成(根据验证结果,明天修改结论)。
3、目前不支持对已经定义的增量管道信息进行修改,如果需要修改只能先删除管道然后再次创建管道的方式实现。
虽然目前仍然存在一些问题,毕竟第一个版本,但是瑕不掩瑜,提供的功能还是解决了一些实际问题。
1、pg_incremental的 管道的每次执行具有事务性,考虑并发场景,因此每个值都会只处理一次
2、可以支持对多个数据源的增量数据导入和导出
参考
https://www.crunchydata.com/blog/pg_incremental-incremental-data-processing-in-postgres
https://github.com/crunchydata/pg_incremental?CrunchyAnonId=twayalziarcsdbfcqeywrqevgvngsabfwnkjajzipamfjctlboy
– / END / –
可以通过下面的方式联系我
如果这篇文章为你带来了灵感或启发,就请帮忙点赞、收藏、转发;如果文章中不严谨或者错漏之处,请及时评论指正。非常感谢!
评论

