暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

构建亿级实时仪表盘,毫秒级响应实战指南

呆呆的私房菜 2025-03-31
26
    Whoami5年+金融、政府、医疗领域工作经验的DBA
    Certificate:PGCM、OCP、YCP
    Skill:Oracle、Mysql、PostgreSQL、国产数据库
    Platform:CSDN、墨天轮、公众号(呆呆的私房菜)
    复制
    近期在给客户做PostgreSQL Citus架构设计和培训,故本文继续分享关于PostgreSQL Citus实时分析场景下的应用方案。

    01

    概述
    • 在云服务、广告分析等领域,企业常常需要实时监控海量数据。
    • 比如每分钟百万级HTTP请求的状态分析,或是广告点击率的秒级趋势展示。
    • 传统单机数据库面对这样的场景往往捉襟见肘,而分布式数据库Citus通过独特的设计,让这类需求变得简单。

    02

    使用案例
    • 一、场景构建
    • 1. 假设我们是云服务提供商,需要为每个客户站点监控HTTP请求。每秒可能有数万条日志涌入系统。数据模型如下:
      -- 分布式事件表设计
      CREATE TABLE http_request (
        site_id INT,
        ingest_time TIMESTAMPTZ DEFAULT now(),
        url TEXT,
        request_country TEXT,
        ip_address TEXT,
        status_code INT,
        response_time_msec INT
      );


      -- 按site_id分片(建议分片数=CPU核心数x2~4)
      SELECT create_distributed_table('http_request''site_id');
      复制
      • 2. 模拟数据生成
        DO $$
        BEGIN
          LOOP
            INSERT INTO http_request (
              site_id, ingest_time, url,
              request_country, ip_address,
              status_code, response_time_msec
            ) VALUES (
              trunc(random() * 32), 
              clock_timestamp(),
              concat('http://example.com/', md5(random()::text)),
              (ARRAY['China','India','USA','Indonesia'])[ceil(random()*4)],
              concat(
                trunc(random()*250+2), '.',
                trunc(random()*250+2), '.',
                trunc(random()*250+2), '.',
                trunc(random()*250+2)
              )::inet,
              (ARRAY[200,404])[ceil(random()*2)],
              5+trunc(random()*150)
            );
            COMMIT;
            PERFORM pg_sleep(random() * 0.25);
          END LOOP;
        END 
        $$;
        复制
        • 3、查询过去5分钟数据聚合
          SELECT 
            site_id,
            date_trunc('minute', ingest_time) as minute,
            COUNT(1AS request_count,
            SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 1 ELSE 0 ENDas success_count,
            SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 0 ELSE 1 ENDas error_count,
            SUM(response_time_msec)/COUNT(1AS average_response_time_msec 
          FROM http_request
          WHERE date_trunc('minute', ingest_time) > now() - INTERVAL '5 minutes'
          GROUP BY site_id, minute
          ORDER BY minute ASC;
          复制
          • 4、现有方案的局限性:
            查询性能问题:全量扫描原始数据;
            存储成本问题:原始数据长期存储成本高昂。
            复制

            • 二、查询优化
            • 1. 数据汇总优化:通过预聚合到分钟级表
              CREATE TABLE http_request_1min (
                site_id INT,
                ingest_time TIMESTAMPTZ, -- 精确到分钟
                error_count INT,
                success_count INT,
                request_count INT,
                average_response_time_msec INT,
                CHECK (request_count = error_count + success_count),
                CHECK (ingest_time = date_trunc('minute', ingest_time))
              );


              SELECT create_distributed_table('http_request_1min''site_id');
              CREATE INDEX http_request_1min_idx ON http_request_1min (site_id, ingest_time);
              复制
              关键特性:
                相同的分片数(site_id 分片)
                共置分片(相同 site_id 的原始数据和聚合数据存储在同一个节点)
                复制

                • 2. 自动汇总机制
                  -- 创建定时汇总函数
                  -- 记录最后一次汇总时间
                  CREATE TABLE latest_rollup (
                    minute timestamptz PRIMARY KEY CHECK (minute = date_trunc('minute'minute))
                  );
                  INSERT INTO latest_rollup VALUES ('1901-10-10'); -- 初始值


                  -- 创建汇总函数
                  CREATE OR REPLACE FUNCTION rollup_http_request() RETURNS void AS $$
                  DECLARE
                    curr_rollup_time timestamptz := date_trunc('minute', now() - INTERVAL '1 minute');
                    last_rollup_time timestamptz := (SELECT minute FROM latest_rollup);
                  BEGIN
                    INSERT INTO http_request_1min (
                      site_id, ingest_time, 
                      request_count, success_count, 
                      error_count, average_response_time_msec
                    )
                    SELECT 
                      site_id,
                      date_trunc('minute', ingest_time),
                      COUNT(1),
                      SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 1 ELSE 0 END),
                      SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 0 ELSE 1 END),
                      SUM(response_time_msec)/COUNT(1)
                    FROM http_request
                    WHERE ingest_time <@ tstzrange(last_rollup_time, curr_rollup_time, '(]')
                    GROUP BY 12;


                    UPDATE latest_rollup SET minute = curr_rollup_time;
                  END;
                  $$ LANGUAGE plpgsql;
                  复制

                  • 3. 调度执行
                    # 系统 crontab 配置
                    * * * * * psql -c 'SELECT rollup_http_request();'
                    复制

                    • 4. 优化后查询
                      SELECT 
                        site_id,
                        ingest_time AS minute,
                        request_count,
                        success_count,
                        error_count,
                        average_response_time_msec 
                      FROM http_request_1min
                      WHERE ingest_time > date_trunc('minute', now()) - INTERVAL '5 minutes';
                      复制

                      • 三、数据生命周期管理
                      • 通过定期清理旧数据控制存储成本
                        -- 示例:保留最近1个月的原始数据
                        DELETE FROM http_request WHERE ingest_time < now() - INTERVAL '1 month';
                        复制


                        本文内容就到这啦,相信读完本文可以给你带来citus实战的一些技巧~希望本篇内容能给你带来帮助。我们下篇再见!

                        文章转载自呆呆的私房菜,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论