暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

使用GO对PostgreSQL进行有意思的多线程压测

原创 Sean 2024-03-28
333

使用GO对PostgreSQL进行有意思的多线程压测

前言

针对PostgreSQL进行压缩,有很多相关的工具。有同学又要问了,为何还要再搞一个?比如,pgbench, sysbench之类的,已经很强大了。是的,它们都很强大。但有时候,在一些特殊的场景,可能自己构造一个更能接近真实的生产环境。

这里,我半写,半借助于ChatGPT,搞出一个代码片段来模拟启动一段多线程并发SQL请求,作用于PostgreSQL数据库。然后,你可以对请求执行完以后的结果进行观测,尤其是表膨胀,受影响记录条数之类的。

基于此,我们还可以进行持续改造,快速用于工作之中。

实作

需求:

实现一段代码,读取一个sql文件,然后分段分批执行,并且是以多线程(比如10个线程,go里边可能就是协程,非常高效)去执行这个SQL中的所有SQL语句。再加一个时间限制,比如持续执行120秒。

实现:

package main

import (
	"bufio"
	"context"
	"database/sql"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
	"time"
	_ "github.com/lib/pq"
)

const (
	host     = "localhost"
	port     =  5555
	user     = "postgres"
	password = "password"
	dbname   = "mydb"
)

func execute_sqls(ctx context.Context, sqls []string, wg *sync.WaitGroup, thread int) {
	defer wg.Done()

	psqlInfo := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
	db, err := sql.Open("postgres", psqlInfo)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	start := time.Now()
	
	for {
		for _, sql := range sqls {
			select {
			case <-ctx.Done():
				elapsed := time.Since(start)
				fmt.Printf("Thread %d stopped. It executed SQLs for %s \n", thread, elapsed)
				return
			default:
				_, err := db.Exec(sql)
				if err != nil {
					fmt.Println(err)
				}
			}
		}
	}
}

func read_sqls(file string) []string {
	f, err := os.Open(file)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	sqls := make([]string, 0)
	r := bufio.NewReader(f)
	
	for {
		line, err := r.ReadString(';')
		if err == io.EOF {
			break
		} else if err != nil {
			panic(err)
		}
		
		sql := strings.TrimSpace(line)
		if sql != "" {
			sqls = append(sqls, sql)
		}
	}
	
	return sqls
}

func main() {
	filepath := "file.sql" // Replace with your file path
	numThreads := 10  // Number of threads

	sqls := read_sqls(filepath)
	
	var wg sync.WaitGroup
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // 60 seconds
	
	for i := 0; i < numThreads; i++ {
		wg.Add(1)
		go execute_sqls(ctx, sqls, &wg, i)
	}
	wg.Wait()
	
	cancel()
	fmt.Println("All goroutines stopped")
}

上边的代码,关于输入文件:file.sql, 线程数:10, 运行时间:60秒,都是硬编码进去的。你可以根据实际情况,进行参数化。

体验:

在你的go环境已经安装了"github.com/lib/pq"等必备包之后(go get github.com/lib/pq),就可以直接执行了。我们准备一个pg的基本环境。database: mydb, 端口:5555, 就用postgres用户及相应密码(仅用于测试目的),不缀述。

目标表的准备:

\c mydb
create table t(id int, col2 varchar(32));

file.sql文件内容如下:

insert into t values ((10000*random())::int, md5(random()::varchar));
with updates as (select (10000*random())::int as id) update t set col2 = 'update' || updates.id from updates where t.id=updates.id returning updates.id;

这个测试的代码片段,就是插入一条随机记录,并且再随机更新一条记录,使用CTE语法,把对应的id值返回来,有可能找不到对应的记录,就返回的是空值。在并发大的情况下,update语句慢慢就起作用了。这样就可以反复执行。

来看看效果:

go run ./stress.go

hread 7 stopped. It executed SQLs for 59.999324s 
Thread 1 stopped. It executed SQLs for 1m0.000756208s 
Thread 2 stopped. It executed SQLs for 1m0.000604792s 
Thread 4 stopped. It executed SQLs for 1m0.001703583s 
Thread 0 stopped. It executed SQLs for 1m0.008518875s 
Thread 9 stopped. It executed SQLs for 1m0.008456083s 
Thread 5 stopped. It executed SQLs for 1m0.007964375s 
Thread 6 stopped. It executed SQLs for 1m0.007968292s 
Thread 3 stopped. It executed SQLs for 1m0.008145042s 
Thread 8 stopped. It executed SQLs for 1m0.008202209s 
All goroutines stopped

1分钟跑完之后,我们看到这样的部分记录结果:

mydb=# select * from t limit 10;
  id  |    col2
------+------------
 4792 | update4792
 3416 | update3416
 9290 | update9290
  887 | update887
 8778 | update8778
 7472 | update7472
 4602 | update4602
 3454 | update3454
 2604 | update2604
 1990 | update1990
(10 rows)

总记录条数:

mydb=# select count(*) from t;
 count
--------
 126056
(1 row)

引申:可以认为单个C+U操作,10个线程并发,1分钟入库12.6万。

表大小:

mydb=# select pg_total_relation_size('t');
 pg_total_relation_size
------------------------
                8372224
(1 row)

使用下边的SQL看看相关指标:

WITH cteTableInfo AS 
(
    SELECT 
        COUNT(1) AS ct
        ,SUM(length(t::text)) AS TextLength  
        ,'public.t'::regclass AS TableName  
    FROM public.t AS t  
)
,cteRowSize AS 
(
   SELECT ARRAY [pg_relation_size(TableName)
               , pg_relation_size(TableName, 'vm')
               , pg_relation_size(TableName, 'fsm')
               , pg_table_size(TableName)
               , pg_indexes_size(TableName)
               , pg_total_relation_size(TableName)
               , TextLength
             ] AS val
        , ARRAY ['Relation Size'
               , 'Visibility Map'
               , 'Free Space Map'
               , 'Table Included Toast Size'
               , 'Indexes Size'
               , 'Total Relation Size'
               , 'Live Row Byte Size'
             ] AS Name
   FROM cteTableInfo
)
SELECT 
    unnest(name) AS Description
    ,unnest(val) AS Bytes
    ,pg_size_pretty(unnest(val)) AS BytesPretty
    ,unnest(val) / ct AS bytes_per_row
FROM cteTableInfo, cteRowSize

UNION ALL SELECT '------------------------------', NULL, NULL, NULL
UNION ALL SELECT 'TotalRows', ct, NULL, NULL FROM cteTableInfo
UNION ALL SELECT 'LiveTuples', pg_stat_get_live_tuples(TableName), NULL, NULL FROM cteTableInfo
UNION ALL SELECT 'DeadTuples', pg_stat_get_dead_tuples(TableName), NULL, NULL FROM cteTableInfo;

结果:

          description           |  bytes  | bytespretty | bytes_per_row
--------------------------------+---------+-------------+---------------
 Relation Size                  | 8339456 | 8144 kB     |            66
 Visibility Map                 |    8192 | 8192 bytes  |             0
 Free Space Map                 |   24576 | 24 kB       |             0
 Table Included Toast Size      | 8372224 | 8176 kB     |            66
 Indexes Size                   |       0 | 0 bytes     |             0
 Total Relation Size            | 8372224 | 8176 kB     |            66
 Live Row Byte Size             | 2338451 | 2284 kB     |            18
 ------------------------------ |         |             |
 TotalRows                      |  126056 |             |
 LiveTuples                     |  126056 |             |
 DeadTuples                     |   12274 |             |
(11 rows)

里边有涉及到的死元组为12274行。

mydb=# create extension pgstattuple;
CREATE EXTENSION

mydb=# select * from pgstattuple('public.t') \gx
-[ RECORD 1 ]------+--------
table_len          | 8339456
tuple_count        | 126056
tuple_len          | 5125646
tuple_percent      | 61.46
dead_tuple_count   | 12110
dead_tuple_len     | 483172
dead_tuple_percent | 5.79
free_space         | 1451672
free_percent       | 17.41

这两种统计结果也都比较接近。

当你针对相同的表,进行随机多次测试,发现上边的值也会不断变化(update的命中率会越来越高)。

总结:

本文的目的,只是作一个抛砖引玉,可以随时使用go, python甚至rust去构建一个小的压缩环境,对各种复杂的压力环境进行模拟,并得出相关结论。当然,作为一个团队,可以开发出使用Java之类的接近业务逻辑的工具也是可以的。也有的测试团队,原意使用JMeter + jdbc来构建测试套集,都不失为一种方式。这类工具,是介于pgbench 和 真实业务场景压测之间的一种使用方式。哪个更方便,就可以用哪个。

上边的代码片段,稍加改造,就可以用到实际的实验当中。

关于表膨胀,可以看看我前边的文章:

也聊聊PostgreSQL中的空间膨胀与AutoVacuum

PG中的一例简单的update看表膨胀

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论