
PostgreSQL Stream Data Processing (Aggregation, Filtering, Transformation...) Series - 5

digoal 2017-01-05

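Author

digoal

Date

2017-01-05

Tags

PostgreSQL , streaming , function , stream processing , asynchronous statistics , count , group , agg , trigger , xid , transaction isolation , asynchronous bubble , gap , function , serial processing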


Background

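I designed this solution for a friend in 2013. I wrote a series of articles on it to provide asynchronous stream data processing for a big-data BI platform at the time.

The design was optimized step by step, turning something complicated into something simple.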

At the business level, statistics, data filtering, data cleansing and event triggering on data are common requirements.

COUNT is a typical example.

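Before 9.2, a full-table count could only be obtained by scanning the whole table, even when the table had a primary key.

Version 9.2 added index-only scans, so count(*) can be answered by scanning only the primary-key index. This works provided the visibility map marks most pages all-visible; otherwise heap pages are still visited.

For a large table, however, the primary-key index is itself large, and scanning it is still a significant cost.

9.6 introduced parallel query. With parallelism, a COUNT over a 100-million-row table may take only a few hundred milliseconds, a qualitative leap. (There are still many situations where parallelism is not the best choice, though.)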

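The community has also produced a streaming database, PipelineDB. However, its community edition limits each DATABASE to 1024 stream views, because its code stores the CV (continuous view) identifier in 1 BYTE.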

So, back to PostgreSQL itself: is there a way to optimize full-table counts? If your workload really does need frequent full-table counts, you can try the method below.

Main Text

So far I have written four articles on optimizing real-time and non-real-time count statistics in PostgreSQL:

http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

http://blog.163.com/digoal@126/blog/static/16387704020133155179877/

http://blog.163.com/digoal@126/blog/static/16387704020133156636579/

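The fourth article presents a non-real-time method for a detail table that receives both inserts and deletes. That design adds two columns (xid and isdel) to the detail table.

This article presents a non-real-time count method for detail tables that receive only INSERTs.

The method from the fourth article already covers this case. This article simply separates out the insert-only (append-only) scenario and simplifies the fourth article's functions.

Insert-only detail tables are very common, so there is no need for all that complexity.

The bubble (gap) problem still has to be solved, and the xid column is used for it again. The isdel column is dropped because there are no deletes.

A bubble arises because xids are handed out in the order transactions start writing, but rows become visible in commit order. A transaction with a smaller xid can therefore commit after one with a larger xid. If the reader only remembers the largest xid it has processed, the slower transaction's rows fall below that watermark and are never counted.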
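To make the bubble concrete, here is a minimal pure-Python sketch. PL/pgSQL needs a running server, so the logic is modelled in Python instead. It assumes a hypothetical reader, `naive_reader`, that keeps only a max-xid watermark. This reader is not part of the original solution; event tuples stand in for real transactions.

```python
# Hypothetical naive reader: keeps only the largest xid it has counted.
# Events are (kind, xid) tuples in time order; "read" ignores the xid.

def naive_reader(events):
    pending = {}     # xid -> rows inserted but not yet committed
    committed = []   # one entry (the xid) per committed row
    counted = []     # xids of rows the reader has counted
    watermark = 0    # largest xid counted so far
    for kind, xid in events:
        if kind == "insert":
            pending[xid] = pending.get(xid, 0) + 1
        elif kind == "commit":
            committed.extend([xid] * pending.pop(xid, 0))
        elif kind == "read":
            batch = [x for x in committed if x > watermark]
            counted.extend(batch)
            if batch:
                watermark = max(batch)
    return counted, committed


events = [
    ("insert", 100),   # long-running transaction 100 writes first ...
    ("insert", 101),
    ("commit", 101),   # ... but 101 commits first
    ("read", None),    # reader counts 101, watermark becomes 101
    ("commit", 100),   # 100 commits after the read
    ("read", None),    # looks only for xid > 101, so 100 is never counted
]
counted, committed = naive_reader(events)
print(sorted(counted), sorted(committed))  # [101] [100, 101]
```

The row from transaction 100 is committed but never counted. Recording the in-progress xids (xip) at each read, as log_read does below, closes that gap.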

The detailed implementation follows.

Test table:

```
create table log (
  id serial primary key,
  xid int8 default txid_current() not null,
  c1 int not null,
  c2 int not null,
  c3 int not null,
  c4 text not null,
  crt_time timestamp default now()
);
create index idx_log_1 on log(xid);
```

Tables that store the count() results, assuming counts are frequently needed by log.c1 and by log.crt_time at day, week, month and year granularity:

```
create table log_c1_cnt_day (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));
create table log_c1_cnt_week (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));
create table log_c1_cnt_month (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));
create table log_c1_cnt_year (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));
```

Tables that store the count() results, assuming counts are frequently needed by log.c2 and log.c3 and by log.crt_time at day, week, month and year granularity:

```
create table log_c2_c3_cnt_day (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));
create table log_c2_c3_cnt_week (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));
create table log_c2_c3_cnt_month (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));
create table log_c2_c3_cnt_year (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));
```
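The stat_time key for each granularity could be derived as sketched below. Only the day format (`to_char(crt_time, 'yyyymmdd')`) appears in the original stat_log_c1. The week, month and year formats here are assumptions: ISO year plus ISO week, `yyyymm` and `yyyy`. `stat_time_keys` and `count_by_c1` are hypothetical helpers that model the four log_c1_cnt_* tables as in-memory counters.

```python
from collections import Counter
from datetime import datetime


def stat_time_keys(ts: datetime) -> dict:
    """Hypothetical stat_time keys for the day/week/month/year tables."""
    iso_year, iso_week, _ = ts.isocalendar()
    return {
        "day": ts.strftime("%Y%m%d"),         # same as to_char(crt_time, 'yyyymmdd')
        "week": f"{iso_year}{iso_week:02d}",  # assumption: ISO year + ISO week
        "month": ts.strftime("%Y%m"),         # assumption: 'yyyymm'
        "year": ts.strftime("%Y"),            # assumption: 'yyyy'
    }


def count_by_c1(rows):
    """Model of the log_c1_cnt_* tables: (c1, stat_time) -> cnt per granularity."""
    tables = {name: Counter() for name in ("day", "week", "month", "year")}
    for c1, crt_time in rows:
        for name, key in stat_time_keys(crt_time).items():
            tables[name][(c1, key)] += 1
    return tables


rows = [
    (1, datetime(2013, 4, 21, 20, 55, 45)),  # a Sunday, ISO week 16
    (1, datetime(2013, 4, 21, 21, 0, 0)),
    (2, datetime(2013, 4, 22, 9, 0, 0)),     # the following Monday, ISO week 17
]
tables = count_by_c1(rows)
print(tables["week"])
```

With ISO weeks, a Sunday and the following Monday land in different week buckets even though they share a month bucket. Whatever format is chosen must be applied consistently across the tables.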

Insert test data:

```
insert into log (c1,c2,c3,c4) values (1,1,1,1);
insert into log (c1,c2,c3,c4) values (2,2,2,2);
```

Verify:

```
digoal=# select * from log;
 id |    xid    | c1 | c2 | c3 | c4 |          crt_time
----+-----------+----+----+----+----+----------------------------
  1 | 480125659 |  1 |  1 |  1 | 1  | 2013-04-21 20:55:45.907713
  2 | 480125660 |  2 |  2 |  2 | 2  | 2013-04-21 20:55:46.286933
(2 rows)
```

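Create the analysis registry table. For each detail table, it records the xid and xip cutoffs of each analysis run. This could later be made finer-grained, with one row per statistics dimension: add a dime column and make tablename+dime the composite primary key.

xid records how far (up to which xid) the statistics have progressed. xip records the transactions that were still active; these are excluded from the current batch, which avoids the bubble problem.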

```
create table log_read (
  tablename name not null,
  xid int8 not null,
  xip int8[],
  xip_res int8[],  -- compared against xid; every xip entry greater than xid must be kept.
  mod_time timestamp,
  primary key (tablename)
);
insert into log_read values ('log', 0, null, null, now());
```
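The xid/xip bookkeeping relies on what a txid_snapshot means. As of the snapshot, a transaction has finished (committed or aborted) when its txid is below xmax and is not in the xip list. This is the rule `txid_visible_in_snapshot()` applies. Below is a minimal Python model of that rule. The `Snapshot` class is hypothetical and simply mirrors the xmin:xmax:xip_list triple.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical model of a txid_snapshot (xmin:xmax:xip_list)."""
    xmin: int         # lowest txid still running
    xmax: int         # first txid not yet assigned
    xip: frozenset    # txids running when the snapshot was taken

    def finished(self, xid: int) -> bool:
        # Committed or aborted before the snapshot: below xmax and not in progress.
        # This mirrors the rule txid_visible_in_snapshot() applies.
        return xid < self.xmax and xid not in self.xip


snap = Snapshot(xmin=100, xmax=105, xip=frozenset({100, 103}))
for xid in (99, 100, 101, 103, 105):
    print(xid, snap.finished(xid))
```

Only transactions that are finished in this sense can be counted safely. Anything in xip is remembered in log_read and picked up on a later run.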

Create the serial batch analysis function:

```
create or replace function analyze_log(v_limit int) returns void as $$
declare
v_advisory_xact_lock int8 := null; -- serialization lock key.

v_xid_snap txid_snapshot := null; -- snapshot of the current transaction state
v_xmin int8 := null; -- smallest still-running txid in the snapshot
v_xmax int8 := null; -- first not-yet-assigned txid in the snapshot
v_xip int8[] := null; -- txids still running in the snapshot

v_log_read_log_xid int8 := null; -- xid cutoff of the previous analysis of log
v_log_read_log_xid_update int8 := null; -- new cutoff value; must not be null

v_log_read_log_xip int8[] := null; -- previous log_read.xip (tablename=log)
v_log_read_log_xip_do int8[] := null; -- entries of log_read.xip(tablename=log) resolved in this run: where (xip !@ txid_snapshot)
v_log_read_log_xip_update int8[] := null; -- new value for xip
v_log_read_log_xip_res int8[] := null; -- retained xip values (xip_res)
v_log_read_log_xip_res_update int8[] := null; -- new xip_res; every element greater than v_log_read_log_xid_update must be kept.

v_log log[] := null; -- log rows aggregated for this run, [passed to stat_log_c1 at the end]
v_log_doxip log[] := null; -- log rows of transactions that have finished since the last run:
-- where log.xid (@ log_read.xip(tablename=log) and !@ txid_snapshot) , [passed to stat_log_c1 at the end]
begin
-- validate v_limit
if v_limit <=0 then
raise notice 'please ensure v_limit > 0 .';
return;
end if;

-- Serial processing: exit immediately if the lock cannot be obtained. v_advisory_xact_lock must be globally unique.
v_advisory_xact_lock := 1;
if not pg_try_advisory_xact_lock(v_advisory_xact_lock) then
raise notice 'Another function is calling, this call will exit.';
return;
end if;

-- take the xid snapshot.
v_xid_snap := txid_current_snapshot();
v_xmin := txid_snapshot_xmin(v_xid_snap);
v_xmax := txid_snapshot_xmax(v_xid_snap);
select array_agg(t) into v_xip from txid_snapshot_xip(v_xid_snap) g(t);

-- read the v_log_read_log_xid cutoff and the v_log_read_log_xip array.
select xid,xip,xip_res into v_log_read_log_xid,v_log_read_log_xip,v_log_read_log_xip_res from log_read where tablename='log';
if not found then
raise notice 'log_read no log entry. please add it in log_read table first.';
return;
end if;

-- fetch log1 (rows whose xid is not in any xip; kept separate from log2)
-- find the new xid cutoff
select max(xid) into v_log_read_log_xid_update from (select xid from log where xid > v_log_read_log_xid and xid < v_xmax and xid not in (select * from unnest(v_xip) union all select * from unnest(v_log_read_log_xip_res)) order by xid limit v_limit) t;
if v_log_read_log_xid_update is not null then
raise notice 'fetching log1';
-- fetch the log rows up to the cutoff
select array_agg(log) into v_log from (select log from log where xid > v_log_read_log_xid and xid<=v_log_read_log_xid_update and xid not in (select * from unnest(v_xip) union all select * from unnest(v_log_read_log_xip_res)) order by xid) t;
else
-- no new rows: keep the old cutoff
v_log_read_log_xid_update := v_log_read_log_xid;
end if;

-- fetch log2 (log_xip - v_xip): rows of transactions in the saved xip, kept separate from log1
-- build the log_read.xip(tablename=log) "do" array (transactions that have since finished)
select array_agg(i) into v_log_read_log_xip_do from (select * from unnest(v_log_read_log_xip) i except select * from unnest(v_xip))t where i is not null;
-- build the log_read.xip(tablename=log) update array (transactions still running)
select array_agg(i) into v_log_read_log_xip_update from
( select i from (select * from unnest(v_log_read_log_xip) i union all select * from unnest(v_xip)
except select * from unnest(v_log_read_log_xip_do)) t where i is not null group by i ) t;
-- build the new xip_res value
select array_agg(i) into v_log_read_log_xip_res_update from (select * from unnest(v_log_read_log_xip_res) i union select * from unnest(v_log_read_log_xip) union select * from unnest(v_xip))t where i>v_log_read_log_xid_update;
-- collect the log rows of the finished ("do") transactions
select array_agg(log) into v_log_doxip from log where xid in (select * from unnest(v_log_read_log_xip_do));

-- update log_read (tablename=log)
update log_read set
xip=v_log_read_log_xip_update,
xid=v_log_read_log_xid_update,
xip_res=v_log_read_log_xip_res_update,
mod_time=now()
where tablename='log';
-- raise notice 'log_read.oldxip(log): %.', v_log_read_log_xip;
-- raise notice 'log_read.newxip(log): %.', v_log_read_log_xip_update;
-- raise notice 'log_read.newxipres(log): %.', v_log_read_log_xip_res_update;

-- the statistics functions can be written separately and called here.
perform stat_log_c1(v_log);
perform stat_log_c1(v_log_doxip);

return;
end;
$$ language plpgsql;
```
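The data flow of analyze_log can be checked with a small pure-Python model. `analyze_step` is a hypothetical stand-in, not the function itself:

- `rows` are the rows visible after commit.
- `xmax`/`xip` play the role of the snapshot.
- `state` mirrors the log_read row.

The model reproduces log1: the new cutoff is the largest xid among the first `v_limit` rows, and then all rows up to that cutoff are taken. It also reproduces log2: rows of transactions that were in the previous xip and have since finished. By set algebra, the updated xip works out to the current snapshot's xip.

```python
def analyze_step(rows, xmax, xip, state, limit):
    """One call of a hypothetical pure-Python model of analyze_log().

    rows  -- (xid, value) pairs of rows whose transactions have committed
    xmax  -- first unassigned txid of the current snapshot
    xip   -- set of txids running in the current snapshot
    state -- dict mirroring the log_read row: xid, xip, xip_res (sets)
    """
    xip = set(xip)
    old_xid, old_xip, old_res = state["xid"], state["xip"], state["xip_res"]
    excluded = xip | old_res

    # log1: order finished rows above the old cutoff by xid, take `limit` of
    # them, and use the largest xid as the new cutoff.
    cand = sorted(x for x, _ in rows if old_xid < x < xmax and x not in excluded)
    new_xid = max(cand[:limit]) if cand else old_xid
    # Then take *all* rows up to the cutoff, so no transaction is split.
    log1 = [r for r in rows if old_xid < r[0] <= new_xid and r[0] not in excluded]

    # log2: transactions in the previous xip that are no longer running.
    xip_do = old_xip - xip
    log2 = [r for r in rows if r[0] in xip_do]

    new_state = {
        "xid": new_xid,
        # (old xip + current xip) - xip_do; this always equals the current xip.
        "xip": (old_xip | xip) - xip_do,
        # xids log1 must keep skipping: still running or already handled by log2.
        "xip_res": {x for x in old_res | old_xip | xip if x > new_xid},
    }
    return log1 + log2, new_state


state = {"xid": 0, "xip": set(), "xip_res": set()}
counted = []
# Call 1: txn 100 is still running, 101 has committed.
batch, state = analyze_step([(101, "b")], xmax=102, xip={100}, state=state, limit=10)
counted += batch
# Call 2: 100, 103 and 105 have committed, 104 is running; only 1 row per call.
rows = [(100, "a"), (101, "b"), (103, "c"), (105, "e")]
batch, state = analyze_step(rows, xmax=106, xip={104}, state=state, limit=1)
counted += batch
# Call 3: 104 has committed as well.
rows = [(100, "a"), (101, "b"), (103, "c"), (104, "d"), (105, "e")]
batch, state = analyze_step(rows, xmax=106, xip=set(), state=state, limit=1)
counted += batch
print(sorted(v for _, v in counted))  # ['a', 'b', 'c', 'd', 'e']
```

Call 1 replays the bubble from the naive-reader example, and this time row a is picked up through log2 on call 2. Row d (xid 104, above the cutoff while running) is held back by xip_res, so log1 skips it and log2 counts it exactly once.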

Statistics function stat_log_c1:

```
CREATE OR REPLACE FUNCTION public.stat_log_c1(v_log log[])
 RETURNS void
 LANGUAGE plpgsql
AS $function$
declare
  v_stat_time text;
  v_c1 int;
  v_cnt int8;
begin
  -- aggregate into log_c1_cnt_day
  for v_stat_time, v_c1, v_cnt in
    select to_char(crt_time, 'yyyymmdd'), c1, count(*)
    from (select ((unnest(v_log)::log)).*) t
    group by to_char(crt_time, 'yyyymmdd'), c1
  loop
    -- check-then-write is safe here because analyze_log serializes callers with the advisory lock
    perform 1 from log_c1_cnt_day where c1=v_c1 and stat_time=v_stat_time;
    if not found then
      insert into log_c1_cnt_day(c1, cnt, stat_time) values (v_c1, v_cnt, v_stat_time);
    else
      update log_c1_cnt_day set cnt=cnt+v_cnt where c1=v_c1 and stat_time=v_stat_time;
    end if;
  end loop;

  -- aggregate into log_c1_cnt_week, .... (omitted)
end;
$function$;
```
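The effect of stat_log_c1 on log_c1_cnt_day is a grouped upsert. The Python model below uses a hypothetical `stat_log_c1_model` and represents the day table as a dict keyed by (c1, stat_time). An empty or NULL batch is a no-op, just as `unnest(NULL)` returns no rows. On PostgreSQL 9.5 and later, the perform/insert/update pair could alternatively be written as a single `INSERT ... ON CONFLICT (c1, stat_time) DO UPDATE SET cnt = log_c1_cnt_day.cnt + excluded.cnt`.

```python
from collections import Counter
from datetime import datetime


def stat_log_c1_model(batch, day_table):
    """Hypothetical model of stat_log_c1 for log_c1_cnt_day.

    batch     -- list of (c1, crt_time) rows, or None for a NULL array
    day_table -- dict (c1, stat_time) -> cnt, updated in place
    """
    if not batch:  # unnest(NULL) or an empty array yields no rows: nothing to do
        return
    grouped = Counter((c1, ts.strftime("%Y%m%d")) for c1, ts in batch)
    for key, cnt in grouped.items():
        # insert the row if absent, otherwise cnt = cnt + v_cnt
        day_table[key] = day_table.get(key, 0) + cnt


day = {}
t = datetime(2013, 4, 21, 20, 55, 45)
stat_log_c1_model([(1, t), (1, t), (2, t)], day)  # like perform stat_log_c1(v_log)
stat_log_c1_model(None, day)                      # v_log_doxip was NULL
stat_log_c1_model([(1, t)], day)
print(day)  # {(1, '20130421'): 3, (2, '20130421'): 1}
```

Because each batch is added to the existing counts, the two calls in analyze_log (v_log and v_log_doxip) can be applied in either order.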

Test: first clear the existing data

```
truncate log;
truncate log_c1_cnt_day;
update log_read set xid=0, xip=null, xip_res=null;
```

pgbench script for the insert workload:

```
cat ins.sql
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
```

pgbench

```
pg92@digoal-PowerEdge-R610-> pgbench -M prepared -f ./ins.sql -r -n -h $PGDATA -U postgres -T 60 -c 8 -j 2
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 8
number of threads: 2
duration: 60 s
number of transactions actually processed: 2887271
tps = 48121.007692 (including connections establishing)
tps = 48131.903512 (excluding connections establishing)
statement latencies in milliseconds:
        0.164881        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
```

Run analyze_log while the stress test is running, so that analyze_log executes concurrently with pgbench.

```
pg92@digoal-PowerEdge-R610-> cat analyze.sh
#!/bin/bash

for ((i=0;i<100;i++))
do
psql -c "select * from analyze_log(1);"
psql -c "select * from analyze_log(1000000);"
done
```
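analyze.sh calls analyze_log sequentially. If several schedulers invoked it at the same time, `pg_try_advisory_xact_lock(1)` would let only one proceed and make the others return immediately instead of waiting. Below is a Python sketch of that try-lock pattern. A `threading.Lock` stands in for the advisory lock; this is an analogy, not PostgreSQL's implementation, and `analyze_once` is hypothetical.

```python
import threading

# Stand-in for advisory lock key 1; an analogy, not PostgreSQL's lock manager.
ANALYZE_LOCK = threading.Lock()


def analyze_once(results, name, started=None, release=None):
    # Like pg_try_advisory_xact_lock(1): never wait, just give up if busy.
    if not ANALYZE_LOCK.acquire(blocking=False):
        results.append((name, "skipped"))
        return
    try:
        if started is not None:
            started.set()            # signal that the lock is held
        if release is not None:
            release.wait(timeout=5)  # simulate work inside the transaction
        results.append((name, "ran"))
    finally:
        ANALYZE_LOCK.release()       # an xact-level lock is released at transaction end


results = []
started, release = threading.Event(), threading.Event()
a = threading.Thread(target=analyze_once, args=(results, "A", started, release))
a.start()
started.wait(timeout=5)     # A now holds the lock
analyze_once(results, "B")  # B cannot get it and returns immediately
release.set()
a.join()
print(results)  # [('B', 'skipped'), ('A', 'ran')]
```

Skipping rather than waiting keeps the log_read state single-writer. It also means a skipped call is harmless: the next call picks up where the running one stopped.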

Run analyze.sh.

Verify that the data is accurate:

```
digoal=# select c1,count(*),to_char(crt_time,'yyyymmdd') from log group by c1,to_char(crt_time,'yyyymmdd') order by c1;
 c1 | count  | to_char
----+--------+----------
  0 | 140071 | 20130421
  1 | 282303 | 20130421
  2 | 281499 | 20130421
  3 | 281339 | 20130421
  4 | 282008 | 20130421
  5 | 281871 | 20130421
  6 | 282954 | 20130421
  7 | 281855 | 20130421
  8 | 281560 | 20130421
  9 | 281516 | 20130421
 10 | 140456 | 20130421
(11 rows)

digoal=# select * from log_c1_cnt_day where cnt<>0 order by c1;
 c1 |  cnt   | stat_time
----+--------+-----------
  0 | 140071 | 20130421
  1 | 282303 | 20130421
  2 | 281499 | 20130421
  3 | 281339 | 20130421
  4 | 282008 | 20130421
  5 | 281871 | 20130421
  6 | 282954 | 20130421
  7 | 281855 | 20130421
  8 | 281560 | 20130421
  9 | 281516 | 20130421
 10 | 140456 | 20130421
(11 rows)
```

Test multi-statement transactions, including rollbacks.

pgbench script:

```
cat ins.sql
begin;
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
end;
begin;
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
rollback;
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
```
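Rows from a rolled-back transaction never become visible. In analyze_log, such an xid sits in xip while it runs and later lands in the "do" array with no rows, so nothing is counted for it. To cross-check the totals, it helps to know how many rows each execution of such a script should leave behind. The hypothetical helper `committed_inserts` below counts them from the script text. It assumes statements are separated by `;` with no `;` inside a statement, which holds for this script. The demo uses a shortened script.

```python
def committed_inserts(script: str) -> int:
    """Hypothetical helper: rows an ins.sql-style script leaves behind per run.

    Assumes statements are separated by ';' with no ';' inside a statement.
    """
    committed = pending = 0
    in_txn = False
    for stmt in (s.strip().lower() for s in script.split(";")):
        if stmt == "begin":
            in_txn, pending = True, 0
        elif stmt in ("end", "commit"):
            committed += pending
            in_txn, pending = False, 0
        elif stmt == "rollback":
            in_txn, pending = False, 0   # the rolled-back rows never become visible
        elif stmt.startswith("insert"):
            if in_txn:
                pending += 1
            else:
                committed += 1           # autocommit statement
    return committed


INS = "insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);"
demo = ("begin;" + INS * 3 + "end;"
        + "begin;" + INS * 2 + "rollback;"
        + INS * 2)
print(committed_inserts(demo))  # 5
```

Multiplying the per-run figure by pgbench's "number of transactions actually processed" gives the number of rows a clean run should add to log.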

pgbench

```
pg92@digoal-PowerEdge-R610-> pgbench -M prepared -f ./ins.sql -r -n -h $PGDATA -U postgres -T 60 -c 8 -j 2
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 8
number of threads: 2
duration: 60 s
number of transactions actually processed: 100862
tps = 1680.570420 (including connections establishing)
tps = 1680.946330 (excluding connections establishing)
statement latencies in milliseconds:
............ (omitted)
```

Run analyze_log while the stress test is running, so that analyze_log executes concurrently with pgbench.

```
pg92@digoal-PowerEdge-R610-> cat analyze.sh
#!/bin/bash

for ((i=0;i<100;i++))
do
psql -c "select * from analyze_log(1);"
psql -c "select * from analyze_log(1000000);"
done
```

Run analyze.sh.

Verify that the data is accurate:

```
digoal=# select c1,count(*),to_char(crt_time,'yyyymmdd') from log group by c1,to_char(crt_time,'yyyymmdd') order by c1;
 c1 | count  | to_char
----+--------+----------
  0 | 245002 | 20130426
  1 | 491034 | 20130426
  2 | 489717 | 20130426
  3 | 490628 | 20130426
  4 | 490064 | 20130426
  5 | 490393 | 20130426
  6 | 490893 | 20130426
  7 | 490081 | 20130426
  8 | 490180 | 20130426
  9 | 490659 | 20130426
 10 | 245860 | 20130426
(11 rows)

digoal=# select * from log_c1_cnt_day where cnt<>0 order by c1;
 c1 |  cnt   | stat_time
----+--------+-----------
  0 | 245002 | 20130426
  1 | 491034 | 20130426
  2 | 489717 | 20130426
  3 | 490628 | 20130426
  4 | 490064 | 20130426
  5 | 490393 | 20130426
  6 | 490893 | 20130426
  7 | 490081 | 20130426
  8 | 490180 | 20130426
  9 | 490659 | 20130426
 10 | 245860 | 20130426
(11 rows)
```

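Important notes

Because this example relies on PostgreSQL's system xid to solve the bubble problem, pay special attention to the following:

xid: if pg_resetxlog is used to change the xid and the new value is smaller, statistics collected with this method will break. It is safe to move the xid forward, but never backward.

After exporting the detail data to another cluster with pg_dump, first use pg_resetxlog to move the new cluster's xid above max(xid) of the detail table. Because txid_current() returns a 64-bit value that includes the xid epoch, the epoch may need to be raised as well, not just the 32-bit xid.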
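txid_current() packs an epoch counter and a 32-bit xid into one 64-bit value, so the target for the new cluster can be computed from max(log.xid). The sketch below assumes pg_resetxlog's `-e` (xid epoch) and `-x` (next xid) options are used to set the two parts; the tool is named pg_resetwal from PostgreSQL 10. `txid` and `reset_target` are hypothetical helpers. 3 is FirstNormalTransactionId, the smallest xid available to ordinary transactions.

```python
FIRST_NORMAL_XID = 3  # xids 0-2 are reserved (invalid, bootstrap, frozen)


def txid(epoch: int, xid32: int) -> int:
    """64-bit value as returned by txid_current(): epoch in the high 32 bits."""
    return (epoch << 32) | xid32


def reset_target(max_logged_txid: int):
    """Hypothetical helper: (epoch, xid) for pg_resetxlog -e/-x so that every
    new txid is greater than max(log.xid)."""
    nxt = max_logged_txid + 1
    epoch, xid32 = nxt >> 32, nxt & 0xFFFFFFFF
    if xid32 < FIRST_NORMAL_XID:   # skip the reserved xids
        xid32 = FIRST_NORMAL_XID
    return epoch, xid32


print(reset_target(480125660))      # (0, 480125661)
print(reset_target(txid(1, 5)))     # (1, 6)
print(reset_target((1 << 32) - 1))  # (1, 3)
```

For the sample rows shown earlier (max xid 480125660), the epoch can stay at 0. Once the source cluster has wrapped past 2^32, the epoch must be carried over too. Otherwise new rows get txids below the log_read cutoff and are silently skipped.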

For convenience, here is the complete series of case studies on real-time and non-real-time statistics in PostgreSQL:

1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/

4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/

5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/

6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/

7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/

8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/


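This article is reprinted from digoal. If you suspect infringement, please email contact@modb.pro with supporting evidence; once verified, modb.pro will remove the content immediately.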
