暂无图片
暂无图片
3
暂无图片
暂无图片
2
暂无图片

Postgres扩展插件又添一个: pg_incremental增量数据处理

原创 墨竹 2024-12-24
261

Postgres扩展插件又添一个: pg_incremental增量数据处理

最近看到一个新的开源 PostgreSQL扩展pg_incremental插件,可以帮助我们在PostgreSQL中快速,可靠的进行增量批处理。这个扩展可以在PostgreSQL中存储只有追加的事件数据流(例如物联网,时间序列)时,创建处理管道。

适合pg_incremental的使用的场景包括:

  • 创建和增量维护的汇总和聚合
  • 增量数据转换
  • 使用标准SQL定期导入或导出新数据

在设置pg_incremental管道后,就有一个好处就是会一直运行,也不需要我们定期的维护。

为什么要增量处理?

时序数据的场景在当下物联网时代已经遍布我们生活的方方面面,比如电子手表、车机、物流、电网等等。在时序场景中,一种比较常见的场景是定期将写入的数据聚合到汇总表中,在这个模型中,尤其是批量写操作速速很快,因为在这个过程中不会对数据做任何的处理,特别是在时序数据库中都没有涉及事务,写入还会进一步的提升;增量聚合之所有快,是因为它只处理新数据;查询的速度之所以很快,是因为访问汇总数据的索引表。就针对这个场景crunchydata团队之前开发pg_cron,但是创建端到端管道仍然需要大量的统计和严谨的并发性考虑。

虽然也可以通过一些其他的办法解决,比如增量物化视图或基于逻辑解码的解决方案,但它们有许多限制并且缺乏灵活性。此外,还有其他增量处理场景,例如从多个源收集数据,或定期导入导出。crunchydata团队认为之前场景仍然没有完全解决,因此觉得需要有一种简单、一劳永逸的工具,无需大量样板即可完成工作,所以就有了pg_incremental扩展插件的诞生。

扩展插件安装

由于pg_incremental依赖于pg_cron,因此需要先安装pg_cron插件。

创建数据库和用户

create database testdb ;
\c testdb;
create user test password 'Postgresql2024';
grant all privileges on database testdb to test;
alter database testdb owner to test;
复制

安装pg_cron插件

编译安装pg_cron插件

cd /home/postgres
wget https://github.com/citusdata/pg_cron/archive/v1.6.5.tar.gz
tar -zxvf /home/postgres/v1.6.5.tar.gz
cd pg_cron-1.6.5/
make   PG_CONFIG=/data/pgsql/bin/pg_config
make install  PG_CONFIG=/data/pgsql/bin/pg_config
--修改PG的postgresql.conf配置文件
vim data/postgresql.conf
  shared_preload_libraries = 'pg_cron'
  cron.database_name = 'postgres'
  cron.use_backgroud_works= on 
  max_worker_processes = 16
  max_connections=16000
复制

重启数据库

pg_ctl restart -D /data/pgdata
复制

创建扩展

create extension pg_cron;
复制

授权test用户需要有incremental的使用权限

GRANT USAGE ON SCHEMA cron TO test;
复制

在线的源码可以在下面的链接进行详细查看

https://gitee.com/mirrors/pg_cron

安装pg_incremental插件

编译安装pg_incremental插件

cd /home/postgres
wget https://github.com/CrunchyData/pg_incremental/archive/refs/tags/v1.0.0.tar.gz
tar -zxvf /home/postgres/v1.0.0.tar.gz
cd pg_incremental-1.0.0/
make   PG_CONFIG=/data/pgsql/bin/pg_config
make install  PG_CONFIG=/data/pgsql/bin/pg_config
--修改PG的postgresql.conf配置文件
vim data/postgresql.conf
  shared_preload_libraries = 'pg_incremental'
复制

重启数据库

pg_ctl restart -D /data/pgdata
复制

创建扩展

create extension pg_incremental;
复制

授权test用户需要有incremental的使用权限

GRANT USAGE ON SCHEMA incremental TO test;
复制

在线的源码可以在下面的链接进行详细查看

https://github.com/CrunchyData/pg_incremental

在这里需要注意您只能在具有pg_cron的数据库中创建pg_incremental。

创建增量处理管道

pg_incremental中有3种类型的管道

  • 序列管道- 对一系列序列值执行管道查询,并使用一种机制确保不再有新的序列值落入该范围内。这些管道最适合增量构建汇总表
  • 时间间隔管道- 在时间间隔过后,在一个时间间隔或时间间隔范围内执行管道查询。这些管道可用于增量构建汇总表或定期导出新数据。
  • 文件列表管道 (PREVIEW) - 对从文件列表函数获取的新文件执行管道查询。这些管道可用于导入新数据。

每个管道都有一个带有 1 或 2 个参数的命令。管道使用pg_cron定期运行(默认情况下每分钟一次),并且仅在有新数据要处理时才执行命令。但是不管是否有新数据,每个管道的执行都将出现在cron.job_run_details

下面来介绍每种类型的管道:

创建序列管道

创建源表和汇总表

-- 创建一个源表
create table events (
  event_id bigint generated always as identity,
  event_time timestamptz default now(),
  client_id bigint,
  path text,
  response_time double precision
);
--创建BRIN索引,在选择新范围方面非常有效
create index on events using brin (event_id);

-- 随机插入一些数据
insert into events (client_id, path, response_time)
select s % 100, '/page-' || (s % 3), random() from generate_series(1,1000000) s;

--创建汇总表来预聚合每天的事件数
create table events_agg (
  day timestamptz,
  event_count bigint,
  primary key (day)
);
复制

创建序列管道是通过incremental.create_sequence_pipeline函数来实现的,在函数中需要指定管道名称、序列或包含序列的表的名称以及定义使用该函数的序列管道命令。这个命令将在上下文中执行,其中$1$2分别设置为可以安全聚合的序列值范围 (bigint) 的最低和最高值。创建管道时,由于execute_immediately的默认参数true,因此将从 0 开始的所有序列值立即执行该命令。当然也可以通过设置execute_immediately的参数为false来禁用立即执行,在这种情况下,第一次执行将作为定期作业调度的一部分进行。

select incremental.create_sequence_pipeline('event-aggregation', 'events', $$
  insert into events_agg
  select date_trunc('day', event_time), count(*)
  from events
  where event_id between $1 and $2
  group by 1
  on conflict (day) do update set event_count = events_agg.event_count + excluded.event_count;
$$);

 create_sequence_pipeline
--------------------------

(1 row)

Time: 281.406 ms
复制

管道执行确保已知序列值的范围是安全的,这意味着不再有可能产生在该范围内的序列值的事务。这是通过在执行命令之前等待并发写事务来确保的。范围的大小实际上是自上次执行管道到新管道开始执行的时刻的插入数量。

序列管道的优点是,它们可以以小的增量方式处理数据,并且与聚合中使用的时间戳来自何处无关,比如即使延迟的数据很好,也没有影响。缺点是总是需要使用ON CONFLICT子句来合并聚合,即使在某些情况下不符合实际(例如:完全不同的计数)。

incremental.create_sequence_pipeline函数的的参数说明:

参数名称 类型 描述 默认
pipeline_name text 自定义的管道名称 必需的
sequence_name regclass 序列或包含序列的表名 必需的
command text 带有 $1 和 $2 参数的管道命令 必需的
schedule text pg_cron 定期执行的时间表(或 NULL) * * * * *(每分钟)
execute_immediately bool 对现有数据立即执行命令 true

手动执行管道命令函数或者等待序列管道自行定时调用(每分钟)

call incremental.execute_pipeline('event-aggregation');
复制

然后再看看events的数据量和events_agg记录的信息,可以到events的事件数和统计结果是一致的。

postgres=# select count(*) from events;
  count
---------
 1000000
(1 row)

Time: 26.782 ms
postgres=# select * from events_agg;
          day           | event_count
------------------------+-------------
 2024-12-23 00:00:00+08 |     1000000
(1 row)

Time: 0.321 ms
复制

插入新的数据

insert into events (client_id, path, response_time)
select s, '/page-' || (s % 3), random() from generate_series(1,100) s;
复制

再次手动调用管道命令函数

postgres=# call incremental.execute_pipeline('event-aggregation');
NOTICE:  pipeline event-aggregation: processing sequence values from 1000001 to 1000100
CALL
Time: 1.734 ms
复制

再次查看events_agg中的数据是否更新

postgres=# select count(*) from events;
  count
---------
 1000100
(1 row)

Time: 15.794 ms
postgres=# select * from events_agg;
          day           | event_count
------------------------+-------------
 2024-12-23 00:00:00+08 |     1000100
(1 row)

Time: 0.205 ms
复制

从上面的结果,可以看到在events_agg中的数据更新为了最新的汇总值和events中的统计结果一致。

删除序列管道

select incremental.drop_pipeline('event-aggregation');
复制

创建时间间隔管道

可以用incremental.create_time_interval_pipeline 通过指定管道名称、时间间隔和管道命令。该命令将在一个上下文中执行,$1$2 分别设置为时间间隔的开始和结束(不包括$2的值)。

-- BRIN索引在选择新范围方面非常有效
create index on events using brin (event_time);

--创建一个管道,以1天的间隔聚合新插入
-- $1和$2将被设置为可聚合的时间间隔范围的开始和结束(不包括)
select incremental.create_time_interval_pipeline('event-aggregation', '1 day', $$
  insert into events_agg
  select event_time::date, count(distinct event_id)
  from events
  where event_time >= $1 and event_time < $2
  group by 1
$$);
复制

在创建管道时,该命令将立即执行,从默认2000-01-01 00:00:00时间点开始,当然我们也可以通过对start_time参数进行自定义配置。另外也可以通过设置execute_immediately的参数为false来禁用立即执行,在这种情况下,第一次执行将作为定期作业调度的一部分进行。

时间间隔管道的好处是,它们更容易定义,可以进行更复杂的处理,比如精确的不同计数,而且更适合导出数据,因为该命令总是处理精确的时间范围。缺点是您需要等到一段时间间隔过去后才能看到结果,并且插入旧的时间戳可能会导致跳过数据。序列管道在这个意义上更可靠,因为值总是由数据库生成的。

incremental.create_time_range_pipeline函数的的参数说明:

参数名称 类型 描述 默认
pipeline_name text 自定义的管道名称 必需
time_interval interval 管道执行的间隔时间 必需
command text 有$1和$2参数的管道命令 必需
batched text 是否运行多个intervals的命令 true
start_time timestamptz intervals开始的时间 2000-01-01 00:00:00
source_table_name regclass 在聚合之前等待这个表的锁 NULL(不等待)
schedule text pg_cron定时执行计划(或NULL) * * * * *(每分钟)
min_delay interval 需要等待多长时间来处理过去的间隔 30 seconds
execute_immediately bool 立即对现有数据执行命令 true

下面利用时间间隔管道验证数据文件的导出。

-- 定义一个导出函数
create function export_events(start_time timestamptz, end_time timestamptz)
returns void language plpgsql as $function$
declare
  path text := format('/data/export/%s.csv', start_time::date);
begin
  execute format($$copy (select * from events where event_time >= start_time and event_time < end_time) to %L$$, path);
end;
$function$;

--从2024-12-24开始,每1天事件导出为CSV文件
--该命令在每个时间间隔内分别执行
select incremental.create_time_interval_pipeline('event-export',
  time_interval := '1 day',
  batched:= false ,
  start_time := '2024-12-25',
  command := $$ select export_events($1, $2) $$
);
--手动调用
call incremental.execute_pipeline('event-export');
复制

根据官网提供的案例,对间隔时间按天导出数据,验证未生成文件,如果再次手动调用会报错。

postgres=# call incremental.execute_pipeline('event-export');
NOTICE:  pipeline event-export: processing overall range from 2024-12-24 00:00:00+08 to 2024-12-25 00:00:00+08
NOTICE:  pipeline event-export: processing time range from 2024-12-24 00:00:00+08 to 2024-12-25 00:00:00+08
ERROR:  column "start_time" does not exist
LINE 1: copy (select * from events where event_time >= start_time an...
                                                       ^
QUERY:  copy (select * from events where event_time >= start_time and event_time < end_time) to '/data/export/2024-12-24.csv'
CONTEXT:  PL/pgSQL function export_events(timestamp with time zone,timestamp with time zone) line 5 at EXECUTE
SQL statement " SELECT public.export_events($1, $2) AS export_events"
复制

问题未解决后续继续找时间研究一下

创建文件列表管道

可以通过incremental.create_file_list_pipeline来创建文件列表管道,需要指定管道名称、文件模式和使用该函数定义文件列表管道命令。该命令将在$1设置为文件路径的上下文中执行。管道定期查找列表函数,然后返回的新文件并对每个新文件执行命令。

定义一个封装COPY命令的导入函数

create function import_events(path text)
returns void language plpgsql as $function$
begin
	execute format($$copy events from %L$$, path);
end;
$function$;
复制

创建一个管道,将/data/import目录下的文件导入表中,这些文件是从events导出的数据。在这里利用pg_ls_dir函数来读取服务器上的文件列表

-- $1将被设置为新文件的路径
select incremental.create_file_list_pipeline('event-import', '/data/import/*.csv', $$
   select import_events($1)
$$,false,'pg_ls_dir');
复制

执行报错如下:

postgres=# select incremental.create_file_list_pipeline('event-import', '/data/import/*.csv', $$
postgres$#    select import_events($1)
postgres$# $$,false,'pg_ls_dir');
ERROR:  column "path" does not exist
LINE 1: select path from pg_catalog.pg_ls_dir($2) where path not in ...
               ^
QUERY:  select path from pg_catalog.pg_ls_dir($2) where path not in (select path from incremental.processed_files where pipeline_name operator(pg_catalog.=) $1)
Time: 0.541 ms
复制

从这个报错看应该是由于缺失path列,我然后就执行下面的sql

postgres=# select * from pg_catalog.pg_ls_dir('/data/import');
 pg_ls_dir
------------
 122401.csv
 122402.csv
(2 rows)
复制

可以看到,默认的列名是函数名,和代码中的path不符合。个人判断应该还是插件代码的问题。

在官网介绍说文件列表管道的 API 仍可能发生变化。

incremental.create_file_list_pipeline函数的的参数说明:

参数名称 类型 描述 默认
pipeline_name text 自定义的管道名称 必需的
file_pattern text 要传递给列表函数的文件模式 必需的
command text 带有 $1 和 $2 参数的管道命令 必需的
batched bool 目前未使用 false
list_function text 列出文件的函数名称 crunchy_lake.list_files
schedule text pg_cron 定期执行的时间表(或 NULL) * * * * *(每分钟)
execute_immediately bool 对现有数据立即执行命令 true

监控管道

目前有两种监控管道的方法:

1)通过表对应每个管道类型:incremental.sequence_pipelines, incremental.time_interval_pipelines, and incremental.processed_files

2)通过查看底层作业pg_cron的cron.Job_run_details 中记录的信息

查看序列管道的信息:

select * from incremental.sequence_pipelines ; ┌─────────────────────┬────────────────────────────┬────────────────────────────────┐ │ pipeline_name │ sequence_name │ last_processed_sequence_number │ ├─────────────────────┼────────────────────────────┼────────────────────────────────┤ │ view-count-pipeline │ public.events_event_id_seq │ 3000000 │ │ event-aggregation │ events_event_id_seq │ 1000000 │ └─────────────────────┴────────────────────────────┴────────────────────────────────┘
复制

查看时间间隔管道的信息:

postgres=# select * from incremental.time_interval_pipelines; pipeline_name | time_interval | batched | min_delay | last_processed_time -------------------+---------------+---------+-----------+------------------------ event-aggregation | 00:01:00 | t | 00:00:30 | 2024-12-24 17:27:00+08 event-export | 00:05:00 | f | 00:00:30 | 2024-12-24 17:28:00+08 (2 rows)
复制

查看文件列表管道的文件信息:

select * from incremental.file_list_pipelines ; ┌───────────────┬─────────────────────────────────────┬─────────┬─────────────────────────┐ │ pipeline_name │ file_pattern │ batched │ list_function │ ├───────────────┼─────────────────────────────────────┼─────────┼─────────────────────────┤ │ event-import │ s3://marco-crunchy-data/inbox/*.csv │ f │ crunchy_lake.list_files │ └───────────────┴─────────────────────────────────────┴─────────┴─────────────────────────┘ select * from incremental.processed_files ; ┌───────────────┬────────────────────────────────────────────┐ │ pipeline_name │ path │ ├───────────────┼────────────────────────────────────────────┤ │ event-import │ s3://marco-crunchy-data/inbox/20241215.csv │ │ event-import │ s3://marco-crunchy-data/inbox/20241215.csv │ └───────────────┴────────────────────────────────────────────┘
复制

pg_cron中记录的作业信息

对于所有管道,我们都可以通过查询底层pg_cron中记录的作业信息及错误消息。

select jobname, start_time, status, return_message from cron.job_run_details join cron.job using (jobid) where jobname like 'pipeline:event-import%' order by 1 desc limit 3; ┌───────────────────────┬───────────────────────────────┬───────────┬────────────────┐ │ jobname │ start_time │ status │ return_message │ ├───────────────────────┼───────────────────────────────┼───────────┼────────────────┤ │ pipeline:event-import │ 2024-12-17 13:27:00.090057+01 │ succeeded │ CALL │ │ pipeline:event-import2024-12-17 13:26:00.055813+01 │ succeeded │ CALL │ │ pipeline:event-import2024-12-17 13:25:00.086688+01 │ succeeded │ CALL │ └───────────────────────┴───────────────────────────────┴───────────┴────────────────┘
复制

手动执行管道

也支持手动执行管道,只需要call调用incremental.execute_pipeline函数即可,不过它只会在有新数据需要处理时才运行该命令。

call incremental.execute_pipeline('event-aggregation');
复制

如果需要禁用定时调度,则需要在创建管道时指定schedule的参数为NULL。

incremental.execute_pipeline函数的的参数说明:

参数名称 类型 描述 默认
pipeline_name text 自定义的管道名称 必须

重置增量处理管道

如果增量管道处理的数据有问题,也提供重新进行初始化的函数,但是需要先删除之前的数据,比如要重新对events的聚合进行构建,需要先删除 events_agg的数据,然后执行incremental.reset_pipeline函数

begin; delete from events_agg; select incremental.reset_pipeline('event-aggregation'); commit;
复制

在这里还需要注意,如果函数执行失败,则不会重置管道。

incremental.reset_pipeline函数的的参数说明:

参数名称 类型 描述 默认
pipeline_name text 自定义的管道名称 必须
execute_immediately bool 立即对现有数据执行命令 true

删除增量处理管道

当我们需要删除一个管道时,可以使用incremental.drop_pipeline函数处理,只需要指定自定义的管道名称就行。

-- Drop the pipeline select incremental.drop_pipeline('event-aggregation');
复制

incremental.drop_pipeline 函数的参数:

参数名称 类型 描述 默认
pipeline_name text 自定义的管道名称 必须

总结

通过对3种类型增量管道进行实际操作,还是发现一些问题:

1、文件列表管道在使用pg_ls_dir函数时报错,应该是代码的bug

2、时间间隔管道在导出文件时,一直看不到文件生成(根据验证结果,明天修改结论)。

3、目前不支持对已经定义的增量管道信息进行修改,如果需要修改只能先删除管道然后再次创建管道的方式实现。

虽然目前仍然存在一些问题,毕竟第一个版本,但是瑕不掩瑜,提供的功能还是解决了一些实际问题。

1、pg_incremental的 管道的每次执行具有事务性,考虑并发场景,因此每个值都会只处理一次

2、可以支持对多个数据源的增量数据导入和导出

参考

https://www.crunchydata.com/blog/pg_incremental-incremental-data-processing-in-postgres

https://github.com/crunchydata/pg_incremental?CrunchyAnonId=twayalziarcsdbfcqeywrqevgvngsabfwnkjajzipamfjctlboy
– / END / –

可以通过下面的方式联系我

  • 微信公众号:@墨竹札记
  • 墨天轮:@墨竹
  • 微信:wshf395062788
  • PGFans:@墨竹

如果这篇文章为你带来了灵感或启发,就请帮忙点赞收藏转发;如果文章中不严谨或者错漏之处,请及时评论指正。非常感谢!

最后修改时间:2025-02-06 10:45:23
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论

星星之火
暂无图片
2月前
评论
暂无图片 0
因此觉得需要有一种简单、一劳永逸的工具,无需大量样板即可完成工作,所以就有了pg_incremental扩展插件的诞生
2月前
暂无图片 点赞
评论
懂那个董
暂无图片
3月前
评论
暂无图片 0
z z xi
3月前
暂无图片 点赞
评论
暂无图片
获得了64次点赞
暂无图片
内容获得87次评论
暂无图片
获得了25次收藏
TA的专栏
PostgreSQL17新功能
收录17篇内容
目录
  • Postgres扩展插件又添一个: pg_incremental增量数据处理
    • 为什么要增量处理?
    • 扩展插件安装
      • 创建数据库和用户
      • 安装pg_cron插件
      • 安装pg_incremental插件
    • 创建增量处理管道
      • 创建序列管道
      • 创建时间间隔管道
      • 创建文件列表管道
    • 监控管道
      • 查看序列管道的信息:
      • 查看时间间隔管道的信息:
      • 查看文件列表管道的文件信息:
      • pg_cron中记录的作业信息
    • 手动执行管道
    • 重置增量处理管道
    • 删除增量处理管道
    • 总结
    • 参考