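Whoami: DBA with 5+ years of experience in the finance, government, and healthcare sectors
Certificate: PGCM, OCP, YCP
Skill: Oracle, MySQL, PostgreSQL, Chinese domestic databases
Platform: CSDN, 墨天轮, WeChat official account (呆呆的私房菜)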
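In fields such as cloud services and ad analytics, companies often need to monitor massive volumes of data in real time: for example, analyzing the status of millions of HTTP requests per minute, or showing second-by-second trends in ad click-through rates. A traditional single-node database tends to struggle with workloads like these, whereas the distributed database Citus is designed in a way that makes them much easier to handle.
I. Building the Scenario
1. Suppose we are a cloud service provider that needs to monitor HTTP requests for every customer site. Tens of thousands of log entries may flow into the system each second. The data model is as follows: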
-- Distributed event table
CREATE TABLE http_request (
    site_id INT,
    ingest_time TIMESTAMPTZ DEFAULT now(),
    url TEXT,
    request_country TEXT,
    ip_address TEXT,
    status_code INT,
    response_time_msec INT
);
-- Shard by site_id (suggested shard count: 2x to 4x the number of CPU cores)
SELECT create_distributed_table('http_request', 'site_id');
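The shard count is fixed at the moment a table is distributed, so it has to be set before create_distributed_table is called. A minimal sketch, assuming a cluster with 16 worker CPU cores in total (the core count, and therefore the value 64, is an illustrative assumption rather than part of the original setup):

```sql
-- Assumption: 16 worker cores in total, so 16 x 4 = 64 shards.
-- Run this in the same session BEFORE calling create_distributed_table.
SET citus.shard_count = 64;

-- After the table has been distributed, count its shards in the Citus metadata.
SELECT count(*) AS shard_count
FROM pg_dist_shard
WHERE logicalrelid = 'http_request'::regclass;
```

If the count does not match the intended value, the setting was most likely applied after the table had already been distributed.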
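2. Generating simulated data
-- Runs until interrupted (Ctrl+C in psql); each iteration inserts one random request.
-- COMMIT inside DO needs PostgreSQL 11+ and must not run inside an explicit transaction block.
DO $$
BEGIN
    LOOP
        INSERT INTO http_request (
            site_id, ingest_time, url,
            request_country, ip_address,
            status_code, response_time_msec
        ) VALUES (
            trunc(random() * 32),                -- site_id in 0..31
            clock_timestamp(),
            concat('http://example.com/', md5(random()::text)),
            (ARRAY['China','India','USA','Indonesia'])[ceil(random()*4)],
            concat(
                trunc(random()*250+2), '.',
                trunc(random()*250+2), '.',
                trunc(random()*250+2), '.',
                trunc(random()*250+2)
            )::inet,
            (ARRAY[200,404])[ceil(random()*2)],
            5+trunc(random()*150)                -- 5..154 ms
        );
        COMMIT;  -- commit every row so it is visible to other sessions right away
        PERFORM pg_sleep(random() * 0.25);
    END LOOP;
END
$$;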
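The loop above never terminates on its own, which suits a live demo but is awkward for a quick test. As a hedged alternative, the sketch below loads a fixed batch in a single statement with generate_series; the row count (10,000) and the 10-minute spread are assumptions chosen only for illustration:

```sql
-- One-shot alternative: insert 10,000 synthetic rows spread over the last 10 minutes.
INSERT INTO http_request (
    site_id, ingest_time, url,
    request_country, ip_address,
    status_code, response_time_msec
)
SELECT
    floor(random() * 32)::int,                      -- site_id in 0..31
    now() - random() * INTERVAL '10 minutes',       -- random time in the last 10 minutes
    concat('http://example.com/', md5(random()::text)),
    (ARRAY['China','India','USA','Indonesia'])[1 + floor(random() * 4)::int],
    concat_ws('.',
        2 + floor(random() * 250)::int,
        2 + floor(random() * 250)::int,
        2 + floor(random() * 250)::int,
        2 + floor(random() * 250)::int
    ),
    (ARRAY[200,404])[1 + floor(random() * 2)::int],
    5 + floor(random() * 150)::int                  -- 5..154 ms
FROM generate_series(1, 10000);
```

Because ingest_time is spread over past minutes, the aggregation queries in this article return several minute buckets immediately instead of waiting for the loop to produce them.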
3. Aggregating the last 5 minutes of data
SELECT
    site_id,
    date_trunc('minute', ingest_time) AS minute,
    COUNT(1) AS request_count,
    SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 1 ELSE 0 END) AS success_count,
    SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 0 ELSE 1 END) AS error_count,
    -- integer division: the average is truncated to whole milliseconds
    SUM(response_time_msec)/COUNT(1) AS average_response_time_msec
FROM http_request
WHERE date_trunc('minute', ingest_time) > now() - INTERVAL '5 minutes'
GROUP BY site_id, minute
ORDER BY minute ASC;
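4. Limitations of this approach:
Query performance: every dashboard query scans and re-aggregates the full raw data;
Storage cost: keeping the raw data for a long time is expensive.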
II. Query Optimization
1. Rollup optimization: pre-aggregate the raw data into a minute-level table
CREATE TABLE http_request_1min (
    site_id INT,
    ingest_time TIMESTAMPTZ, -- truncated to the minute
    error_count INT,
    success_count INT,
    request_count INT,
    average_response_time_msec INT,
    CHECK (request_count = error_count + success_count),
    CHECK (ingest_time = date_trunc('minute', ingest_time))
);
SELECT create_distributed_table('http_request_1min', 'site_id');
CREATE INDEX http_request_1min_idx ON http_request_1min (site_id, ingest_time);
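Because http_request_1min is distributed by site_id just like http_request, the two tables have:
The same shard count (both are sharded by site_id)
Co-located shards (the raw rows and the aggregated rows for a given site_id are stored on the same node)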
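One way to confirm the co-location is to compare the colocationid of the two tables in the Citus metadata table pg_dist_partition. A small sketch, assuming both tables from this article already exist and have been distributed:

```sql
-- Tables that share a colocationid are co-located:
-- shards covering the same hash range are placed on the same node.
SELECT logicalrelid, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN (
    'http_request'::regclass,
    'http_request_1min'::regclass
);
```

If the two rows show different colocationid values, the tables were probably created with different shard counts, and an INSERT ... SELECT between them can no longer run entirely on the workers.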
2. Automatic rollup mechanism
-- Track the time of the last rollup
CREATE TABLE latest_rollup (
    minute timestamptz PRIMARY KEY CHECK (minute = date_trunc('minute', minute))
);
INSERT INTO latest_rollup VALUES ('1901-10-10'); -- initial value, earlier than any real data
-- Create the rollup function (meant to be run on a schedule)
CREATE OR REPLACE FUNCTION rollup_http_request() RETURNS void AS $$
DECLARE
    -- Upper bound of this run: the start of the previous minute
    curr_rollup_time timestamptz := date_trunc('minute', now() - INTERVAL '1 minute');
    last_rollup_time timestamptz := (SELECT minute FROM latest_rollup);
BEGIN
    INSERT INTO http_request_1min (
        site_id, ingest_time,
        request_count, success_count,
        error_count, average_response_time_msec
    )
    SELECT
        site_id,
        date_trunc('minute', ingest_time),
        COUNT(1),
        SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 1 ELSE 0 END),
        SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 0 ELSE 1 END),
        SUM(response_time_msec)/COUNT(1)
    FROM http_request
    -- Half-open window [last, curr): a row stamped exactly on a minute boundary
    -- is rolled up with the rest of its minute instead of producing a separate row
    WHERE ingest_time <@ tstzrange(last_rollup_time, curr_rollup_time, '[)')
    GROUP BY 1, 2;
    UPDATE latest_rollup SET minute = curr_rollup_time;
END;
$$ LANGUAGE plpgsql;
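Before wiring up a scheduler, the function can be exercised by hand. A small sketch, assuming the tables and function above exist and some test data has already been loaded into http_request:

```sql
-- Run one rollup pass manually.
SELECT rollup_http_request();

-- The watermark should now be a recent minute instead of the 1901 placeholder.
SELECT minute FROM latest_rollup;

-- Inspect the most recent minute buckets produced by the rollup.
SELECT site_id, ingest_time, request_count, success_count, error_count
FROM http_request_1min
ORDER BY ingest_time DESC, site_id
LIMIT 10;
```

Calling the function a second time within the same minute should add no rows, because the window between the stored watermark and the new upper bound is then empty.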
3. Scheduling
# System crontab entry: run the rollup once every minute
* * * * * psql -c 'SELECT rollup_http_request();'
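The crontab entry relies on psql being able to connect without prompting (for example through PGHOST, PGDATABASE and a .pgpass file). As an alternative that stays inside the database, the same schedule can be sketched with the pg_cron extension. This assumes pg_cron is installed on the coordinator, listed in shared_preload_libraries, and created in the database that pg_cron is configured to use; none of that is part of the original setup:

```sql
-- Assumption: pg_cron is available on the coordinator node.
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- Run the rollup once a minute, the same schedule as the crontab entry.
SELECT cron.schedule('* * * * *', 'SELECT rollup_http_request();');

-- List the scheduled jobs to confirm the entry was registered.
SELECT jobid, schedule, command FROM cron.job;
```

Only one of the two schedulers should be active at a time; running both simply invokes the function twice per minute.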
4. Query after optimization
SELECT
    site_id,
    ingest_time AS minute,
    request_count,
    success_count,
    error_count,
    average_response_time_msec
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - INTERVAL '5 minutes';
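The minute-level table can also serve coarser views without going back to the raw data. The sketch below re-aggregates it by hour; the 24-hour window is an illustrative assumption. Counts can simply be summed, but the average has to be weighted by each minute's request count, and the result is only approximate because the stored per-minute averages were already truncated to whole milliseconds:

```sql
-- Hourly view built from the 1-minute rollup table.
SELECT
    site_id,
    date_trunc('hour', ingest_time) AS hour,
    SUM(request_count) AS request_count,
    SUM(success_count) AS success_count,
    SUM(error_count)   AS error_count,
    -- weight each minute's average by the number of requests in that minute
    round(
        SUM(average_response_time_msec::bigint * request_count)
        / SUM(request_count)
    ) AS average_response_time_msec
FROM http_request_1min
WHERE ingest_time > now() - INTERVAL '24 hours'
GROUP BY site_id, hour
ORDER BY hour, site_id;
```

A plain AVG(average_response_time_msec) would give a quiet minute the same weight as a busy one, which is why the weighted form is used here.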
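III. Data Lifecycle Management
Storage cost is kept under control by deleting old data on a regular basis
-- Example: keep only the most recent month of raw data
DELETE FROM http_request WHERE ingest_time < now() - INTERVAL '1 month';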
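To run the cleanup regularly, the deletes can be wrapped in a function, in the same style as rollup_http_request. The function name expire_old_request_data and the one-year retention for the rollup table are hypothetical choices for illustration; only the one-month retention for raw data comes from the example above:

```sql
-- Hypothetical helper; adjust the retention periods to your own requirements.
CREATE OR REPLACE FUNCTION expire_old_request_data() RETURNS void AS $$
BEGIN
    -- Raw data: keep the most recent month (as in the example above).
    DELETE FROM http_request
    WHERE ingest_time < now() - INTERVAL '1 month';

    -- Rollup data is far smaller, so it can be kept longer (assumed: 1 year).
    DELETE FROM http_request_1min
    WHERE ingest_time < now() - INTERVAL '1 year';
END;
$$ LANGUAGE plpgsql;

-- Manual invocation; in practice this would be scheduled, for example once a day.
SELECT expire_old_request_data();
```

Keeping the rollup table longer than the raw table is what lets the dashboard show long-term history while the raw data stays small.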
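That's all for this article. I hope it has given you some practical techniques for working with Citus and that you found it helpful. See you in the next one!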
This article is reposted from 呆呆的私房菜.