暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

超时流式处理 - 没有消息流入的数据异常监控

digoal 2017-12-22
203

作者

digoal

日期

2017-12-22

标签

PostgreSQL , 流式处理 , 无流入数据超时异常


背景

流计算有个特点,数据流式写入,流式计算。

但是有一种情况,可能无法覆盖。例如电商中的 收货超时,退款处理超时 事件的流式监控。因为数据都不会再写进来了,所以也无法触发流式计算。

这些问题如何流式预警呢?

可以用超时时间+调度的方式,当然这里面有PostgreSQL的独门秘籍:

1、CTE,语法灵活。

2、partial index,不需要检索的数据不构建索引。

3、DML returning,可以返回DML语句的结果,结合CTE实现最小交互计算。

4、multi-index bitmap scan,多个索引合并扫描,在使用OR条件时,可以结合多个字段的索引进行合并扫描。

DEMO设计

1、被监控表的结构。里面记录了订单、退款等事件的超时处理时间,超时通知次数,下一次通知时间间隔,完结状态等。

create table tbl ( id int8, -- ..... 其他字段 (比如已完结状态) state int, -- 完结状态(1 表示已完结) deadts timestamp, -- 超时时间 nts interval, -- 超时间隔,用于更新下一次通知时间 (比如一天通知一次) notify_times int default 0, -- 通知次数 deadts_next timestamp -- 下一次通知时间 );

2、创建partial index,也就是说,对未完结工单才需要通知用户,这些数据是业务关心的,使用partial index可以简化索引大小。提高速度。

```
create index idx_tbl_1 on tbl (deadts) where notify_times=0 and state<>1;

create index idx_tbl_2 on tbl (deadts_next) where deadts_next is not null and state<>1;
```

3、获取需要通知的数据,并且更新通知次数以及下一次的通知时间。

with tmp1 as ( update tbl set deadts_next=now()+nts, notify_times=notify_times+1 where ctid = any (array( select ctid from tbl where ( deadts < now() and notify_times=0 and state<>1) union all select ctid from tbl where ( deadts_next < now() and deadts_next is not null and state<>1) limit 10000 -- 一次获取1万条超时数据 )) returning * ) select * from tmp1;

4、执行计划完美

CTE Scan on tmp1 (cost=18163.25..18163.45 rows=10 width=48) CTE tmp1 -> Update on tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54) InitPlan 1 (returns $0) -> Limit (cost=0.13..18151.03 rows=10000 width=6) -> Append (cost=0.13..764699.60 rows=421301 width=6) -> Index Scan using idx_tbl_1 on tbl (cost=0.13..169527.13 rows=369766 width=6) Index Cond: (deadts < now()) -> Index Scan using idx_tbl_2 on tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6) Index Cond: (deadts_next < now()) -> Tid Scan on tbl tbl_2 (cost=0.01..12.21 rows=10 width=54) TID Cond: (ctid = ANY ($0)) (12 rows)

5、调度

《PostgreSQL 定时任务方法2》

《PostgreSQL Oracle 兼容性之 - DBMS_JOBS - Daily Maintenance - Timing Tasks(pgagent)》

当然你如果使用阿里云,可以使用阿里云的调度平台,配置调度任务。

性能指标

1、写入1亿数据,假设有100万条同时超时需要处理,耗时如何?

```
-- 1亿条完结
insert into tbl select id, 1, now(), '5 min', 0, null from generate_series(1,100000000) t(id);

-- 100万条超时
insert into tbl select id, 0, now(), '5 min', 0, null from generate_series(1,1000000) t(id);
```

通知性能,比如每一批通知1万条:

(小批量获取,并更新超时时间,目的是让autovacuum介入,实时回收垃圾)

```
with tmp1 as (
update tbl set
deadts_next=now()+nts,
notify_times=notify_times+1
where ctid = any (array(
select ctid from tbl where
( deadts < now() and notify_times=0 and state<>1)
union all select ctid from tbl where ( deadts_next < now() and deadts_next is not null and state<>1)
limit 10000 -- 一次获取1万条超时数据
))
returning *
)
select * from tmp1;

-- 计划

CTE Scan on tmp1 (cost=18163.25..18163.45 rows=10 width=48) (actual time=39.092..78.707 rows=10000 loops=1) Output: tmp1.id, tmp1.state, tmp1.deadts, tmp1.nts, tmp1.notify_times, tmp1.deadts_next Buffers: shared hit=75094 read=49 dirtied=49 CTE tmp1 -> Update on public.tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54) (actual time=39.089..74.637 rows=10000 loops=1) Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, tbl_2.notify_times, tbl_2.deadts_next Buffers: shared hit=75094 read=49 dirtied=49 InitPlan 1 (returns $0) -> Limit (cost=0.13..18151.03 rows=10000 width=6) (actual time=31.265..36.899 rows=10000 loops=1) Output: tbl.ctid Buffers: shared hit=11395 -> Append (cost=0.13..764699.60 rows=421301 width=6) (actual time=31.264..35.354 rows=10000 loops=1) Buffers: shared hit=11395 -> Index Scan using idx_tbl_1 on public.tbl (cost=0.13..169527.13 rows=369766 width=6) (actual time=0.014..0.014 rows=0 loops=1) Output: tbl.ctid Index Cond: (tbl.deadts < now()) Buffers: shared hit=1 -> Index Scan using idx_tbl_2 on public.tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6) (actual time=31.249..33.870 rows=10000 loops=1) Output: tbl_1.ctid Index Cond: (tbl_1.deadts_next < now()) Buffers: shared hit=11394 -> Tid Scan on public.tbl tbl_2 (cost=0.01..12.21 rows=10 width=54) (actual time=39.017..43.529 rows=10000 loops=1) Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, (tbl_2.notify_times + 1), (now() + tbl_2.nts), tbl_2.ctid TID Cond: (tbl_2.ctid = ANY ($0)) Buffers: shared hit=21395 Planning time: 0.301 ms Execution time: 79.905 ms ```

丝般柔滑

Time: 79.905 ms

小结

使用以上方法,可以完美的解决超时数据的监控问题。性能好。

PostgreSQL 许愿链接

您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。开不开森.

9.9元购买3个月阿里云RDS PostgreSQL实例

PostgreSQL 解决方案集合

德哥 / digoal's github - 公益是一辈子的事.

digoal's wechat

文章转载自digoal,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论