暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Citus 分析(九)跨DN(worker)节点查询缓慢分析

原创 姚崇 2023-03-19
529

使用分片键进行关联的情况

数据库中多数部分SQL如下,表之间的关联条件使用InnerCode,效率很高

INSERT INTO JYDB.tmp_MF_NVPerformanceTransH
SELECT DISTINCT A.InnerCode, B.EndDate
FROM JYDB.MF_NVPerformanceTransH A
JOIN JYDB.MF_FundMaxDrawd B ON A.InnerCode = B.InnerCode AND A.TradingDay = B.EndDate
JOIN JYDB.MF_DerivativeIndexInfo C ON B.IndexCode = C.IndexCode AND C.IndexType = 11
WHERE ((A.JSID > 1 AND A.JSID <= 7306949) OR (B.JSID > 1 AND B.JSID <= 7306949))
  AND NOT EXISTS(
  SELECT 1 FROM JYDB.tmp_MF_NVPerformanceTransH D
  WHERE D.InnerCode = A.InnerCode AND D.EndDate = B.EndDate);

查看表分片规则

如下表均为innercode进行分片

lightdb@jydb=# select * from canopy_tables;
           table_name            | canopy_table_type | distribution_column | colocation_id | table_size | shard_count | table_owner | access_method 
---------------------------------+-------------------+---------------------+---------------+------------+-------------+-------------+---------------
 jydb.mf_abnormalreturn          | distributed       | innercode           |             4 | 18 GB      |          80 | jydb        | heap
 jydb.mf_calmarratio             | distributed       | innercode           |             4 | 28 GB      |          80 | jydb        | heap
 jydb.mf_fundmaxdrawd            | distributed       | innercode           |             4 | 58 GB      |          80 | jydb        | heap
 mf_calmarratio_tid              | reference         | <none>              |             8 |            |           1 | jydb_tid    | heap

跨节点进行关联

SQL如下,表union all后使用A表ID与B表ID进行关联,ID键为普通列,整个耗时特别长,单次执行2392.04 秒

INSERT INTO JYDB.tmp_mf_calmarratio 
SELECT DISTINCT C.InnerCode, C.EndDate, C.IndexCode, 2 AS LB 
FROM (
SELECT ID, 'MF_FundMaxDrawd' AS TableName FROM JYDB.mf_fundmaxdrawd WHERE JSID > 1 
UNION ALL 
SELECT ID, 'MF_AbnormalReturn' AS TableName FROM JYDB.mf_abnormalreturn WHERE JSID > 1 ) A 
JOIN JYDB_TID.mf_calmarratio_tid B ON A.ID = B.ID AND A.TableName = B.TableName 
JOIN JYDB.mf_calmarratio C ON B.TID = C.ID;

表结构与表数量

jydb@jydb=> \d mf_abnormalreturn
                      Table "jydb.mf_abnormalreturn"
   Column   |            Type             | Collation | Nullable | Default 
------------+-----------------------------+-----------+----------+---------
 id         | bigint                      |           | not null | 
 innercode  | integer                     |           | not null | 
 indexcode  | integer                     |           | not null | 
 indexname  | character varying(100)      |           | not null | 
 indexcycle | integer                     |           | not null | 
 enddate    | timestamp without time zone |           |          | 
 datavalue  | numeric(18,9)               |           | not null | 
 inserttime | timestamp without time zone |           |          | 
 updatetime | timestamp without time zone |           |          | 
 jsid       | bigint                      |           | not null | 
Indexes:
    "ix_mf_abnormalreturn_id" btree (id)
    "ix_mf_abnormalreturn_jsid" btree (jsid)
    "ix_mf_abnormalreturn_updatetime" btree (updatetime)
    "u_mf_abnormalreturn" UNIQUE CONSTRAINT, btree (innercode, enddate, indexcode)

jydb@jydb=> select count(*) from mf_abnormalreturn;
  count   
----------
 73900950
(1 row)

jydb@jydb=> \d mf_fundmaxdrawd
                       Table "jydb.mf_fundmaxdrawd"
   Column   |            Type             | Collation | Nullable | Default 
------------+-----------------------------+-----------+----------+---------
 id         | bigint                      |           | not null | 
 innercode  | integer                     |           | not null | 
 indexcode  | integer                     |           | not null | 
 indexname  | character varying(100)      |           | not null | 
 indexcycle | integer                     |           | not null | 
 enddate    | timestamp without time zone |           |          | 
 datavalue  | numeric(18,9)               |           | not null | 
 inserttime | timestamp without time zone |           |          | 
 updatetime | timestamp without time zone |           |          | 
 jsid       | bigint                      |           | not null | 
Indexes:
    "ix_mf_fundmaxdrawd_id" btree (id)
    "ix_mf_fundmaxdrawd_jsid" btree (jsid)
    "ix_mf_fundmaxdrawd_updatetime" btree (updatetime)
    "u_mf_fundmaxdrawd" UNIQUE CONSTRAINT, btree (innercode, enddate, indexcode)

jydb@jydb=> select count(*) from mf_fundmaxdrawd;
   count   
-----------
 234409479
(1 row)

jydb@jydb=> 
jydb@jydb=> \d mf_calmarratio_tid
Did not find any relation named "mf_calmarratio_tid".
jydb@jydb=> \d jydb_tid.mf_calmarratio_tid
                 Table "jydb_tid.mf_calmarratio_tid"
  Column   |          Type          | Collation | Nullable | Default 
-----------+------------------------+-----------+----------+---------
 tid       | bigint                 |           | not null | 
 tablename | character varying(100) |           | not null | 
 id        | bigint                 |           | not null | 

jydb@jydb=> select count(*) from jydb_tid.mf_calmarratio_tid;
  count  
---------
 3077864
(1 row)

jydb@jydb=> \d mf_calmarratio
                        Table "jydb.mf_calmarratio"
   Column   |            Type             | Collation | Nullable | Default 
------------+-----------------------------+-----------+----------+---------
 id         | bigint                      |           | not null | 
 innercode  | integer                     |           | not null | 
 indexcode  | integer                     |           | not null | 
 indexname  | character varying(100)      |           | not null | 
 indexcycle | integer                     |           | not null | 
 enddate    | timestamp without time zone |           |          | 
 datavalue  | numeric(18,9)               |           | not null | 
 inserttime | timestamp without time zone |           |          | 
 updatetime | timestamp without time zone |           |          | 
 jsid       | bigint                      |           | not null | 
Indexes:
    "ix_mf_calmarratio_id" btree (id)
    "ix_mf_calmarratio_jsid" btree (jsid)
    "ix_mf_calmarratio_updatetime" btree (updatetime)
    "u_mf_calmarratio" UNIQUE CONSTRAINT, btree (innercode, indexcode, enddate)

jydb@jydb=> select count(*) from mf_calmarratio;
   count   
-----------
 123924995
(1 row)

跨节点SQL的执行计划

在Citus中,跨节点执行的计划通常带有以下关键字:

  • Gather:该操作用于将分散在不同节点上的查询结果收集到一个节点上进行处理。Gather操作是一种消耗资源的操作,因为它涉及到网络通信和数据传输。
  • Broadcast:该操作用于将数据复制到所有节点上,以便在所有节点上执行查询。Broadcast操作适用于小数据集,因为它会导致网络拥塞和资源浪费。
  • Remote Scan:该操作用于在远程节点上执行查询,然后将结果传输回本地节点。这种操作通常用于跨多个节点进行过滤或聚合的查询。
  • Partial Aggregation:该操作用于在多个节点上执行部分聚合操作,并将结果传输回本地节点进行最终聚合。这种操作可以提高查询性能,因为它可以利用多个节点的计算资源。
  • Parallel Append:该操作用于将多个查询结果组合成单个结果,并在多个节点上并行执行。这种操作通常用于跨多个节点进行联合查询的情况。
    这些关键字可以在执行计划中看到,可以使用EXPLAIN命令来查看查询的执行计划。
explain SELECT DISTINCT C.InnerCode, C.EndDate, C.IndexCode, 2 AS LB 
 FROM ( SELECT ID, 'MF_FundMaxDrawd' AS TableName FROM JYDB.MF_FundMaxDrawd WHERE JSID > 1 
   UNION ALL SELECT ID, 'MF_AbnormalReturn' AS TableName FROM JYDB.MF_AbnormalReturn WHERE JSID > 1 ) A 
 JOIN JYDB_TID.MF_CalmarRatio_TID B ON A.ID = B.ID AND A.TableName = B.TableName 
 JOIN JYDB.MF_CalmarRatio C ON B.TID = C.ID;


 HashAggregate  (cost=1000.00..1002.00 rows=200 width=20)
   Group Key: remote_scan.innercode, remote_scan.enddate, remote_scan.indexcode, remote_scan.lb
   ->  Custom Scan (Canopy Adaptive)  (cost=0.00..0.00 rows=100000 width=20)
         ->  Distributed Subplan 1_1
               ->  Custom Scan (Canopy Adaptive)  (cost=0.00..0.00 rows=100000 width=40)
                     Task Count: 80
                     Tasks Shown: One of 80
                     ->  Task
                           Node: host=10.101.0.123 port=5435 dbname=jydb
                           ->  Seq Scan on mf_fundmaxdrawd_105288 mf_fundmaxdrawd  (cost=0.00..79340.26 rows=2754261 width=40)
                                 Filter: (jsid > 1)
         ->  Distributed Subplan 1_2
               ->  Custom Scan (Canopy Adaptive)  (cost=0.00..0.00 rows=100000 width=40)
                     Task Count: 80
                     Tasks Shown: One of 80
                     ->  Task
                           Node: host=10.101.0.123 port=5435 dbname=jydb
                           ->  Seq Scan on mf_abnormalreturn_105368 mf_abnormalreturn  (cost=0.00..24340.49 rows=831879 width=40)
                                 Filter: (jsid > 1)
         ->  Distributed Subplan 1_3
               ->  Custom Scan (Canopy Adaptive)  (cost=0.00..0.00 rows=0 width=0)
                     Task Count: 1
                     Tasks Shown: All
                     ->  Task
                           Node: host=localhost port=5432 dbname=jydb
                           ->  Append  (cost=0.00..30.00 rows=2000 width=40)
                                 ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=40)
                                 ->  Function Scan on read_intermediate_result intermediate_result_1  (cost=0.00..10.00 rows=1000 width=40)
         Task Count: 80
         Tasks Shown: One of 80
         ->  Task
               Node: host=10.101.0.123 port=5435 dbname=jydb
               ->  Unique  (cost=101202.02..101202.08 rows=6 width=20)
                     ->  Sort  (cost=101202.02..101202.04 rows=6 width=20)
                           Sort Key: c.innercode, c.enddate, c.indexcode
                           ->  Nested Loop  (cost=101177.03..101201.95 rows=6 width=20)
                                 ->  Hash Join  (cost=101176.60..101199.16 rows=6 width=8)
                                       Hash Cond: ((intermediate_result.id = b.id) AND (intermediate_result.tablename = (b.tablename)::text))
                                       ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=40)
                                       ->  Hash  (cost=55008.64..55008.64 rows=3077864 width=33)
                                             ->  Seq Scan on mf_calmarratio_tid_105450 b  (cost=0.00..55008.64 rows=3077864 width=33)
                                 ->  Index Scan using ix_mf_calmarratio_id_105699 on mf_calmarratio_105699 c  (cost=0.43..0.45 rows=1 width=24)
                                       Index Cond: (id = b.tid)
(43 rows)

 
--走分片键的执行计划
explain
select DISTINCT A.ID, 1 AS Flag 
 FROM JYDB.MF_CalmarRatio A -- 123924995
              JOIN JYDB.tmp_MF_CalmarRatio B  -- 202406241
         ON A.InnerCode = B.InnerCode AND A.EndDate = B.EndDate AND A.IndexCode = B.IndexCode
              LEFT JOIN JYDB.MF_CalmarRatio_TMP C 
         ON A.InnerCode = C.InnerCode AND A.EndDate = C.EndDate AND A.IndexCode = C.IndexCode
    WHERE C.InnerCode IS NULL;

 HashAggregate  (cost=500.00..502.00 rows=200 width=12)
   Group Key: remote_scan.id, remote_scan.flag
   ->  Custom Scan (Canopy Adaptive)  (cost=0.00..0.00 rows=100000 width=12)
         Task Count: 80
         Tasks Shown: One of 80
         ->  Task
               Node: host=10.101.0.122 port=5432 dbname=jydb
               ->  HashAggregate  (cost=134807.19..141019.36 rows=621217 width=12)
                     Group Key: a.id, 1
                     ->  Hash Anti Join  (cost=62185.66..131701.11 rows=621217 width=12)
                           Hash Cond: ((a.innercode = c.innercode) AND (a.enddate = c.enddate) AND (a.indexcode = c.indexcode))
                           ->  Hash Join  (cost=62162.18..120555.33 rows=621618 width=24)
                                 Hash Cond: ((b.innercode = a.innercode) AND (b.enddate = a.enddate) AND (b.indexcode = a.indexcode))
                                 ->  Seq Scan on tmp_mf_calmarratio_105779 b  (cost=0.00..39426.12 rows=2408512 width=16)
                                 ->  Hash  (cost=37197.52..37197.52 rows=1426552 width=24)
                                       ->  Seq Scan on mf_calmarratio_105699 a  (cost=0.00..37197.52 rows=1426552 width=24)
                           ->  Hash  (cost=14.90..14.90 rows=490 width=16)
                                 ->  Seq Scan on mf_calmarratio_tmp_105859 c  (cost=0.00..14.90 rows=490 width=16)
(18 rows)

Citus是一个分布式数据库,它将数据分布在多个节点上进行处理。由于数据分散在不同的节点上,当查询需要跨多个节点执行时,会涉及网络通信和数据传输,从而导致效率低下。

具体来说,当跨节点执行查询时,数据需要通过网络传输到不同的节点上进行处理,这会导致延迟和网络带宽的瓶颈。此外,每个节点都需要执行查询,这可能会导致节点之间的负载不平衡,从而导致一些节点的响应时间变慢。
为了提高效率,可以考虑以下几个方面:

  • 数据分布:将数据分配到不同的节点上时,可以根据查询模式和数据的特性进行优化,以最大限度地减少跨节点查询的需求。
  • 索引:对于常用的查询,可以在每个节点上创建适当的索引以加快查询速度。
  • 集群节点的硬件配置:可以提高节点的带宽、内存和处理器性能,以减少网络延迟和提高查询性能。
  • 并行查询:使用并行查询可以将查询任务分解成多个子任务,并在多个节点上并行执行,以提高整个查询的性能。
最后修改时间:2023-03-19 11:08:26
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论