一、简介
最近在我的开发工作中,除了现有的用于流式传输 WAL 数据的 walsender/walreceiver 连接之外,还需要在主节点上的 PG 后端和备用节点上的另一个 PG 后端之间维护自定义连接,以传递自定义数据。当然,我可以创建一个新的独立后端并自己维护一个套接字连接来传递自定义数据。从技术上讲,它可以工作,但也产生了一些问题。这个自定义连接的持久性、用户安全性、数据加密都需要处理。那么,为什么不直接使用 libpq 为我们处理所有这些问题呢?今天,在这篇博客中,我将分享我使用libpq COPY协议维护基于PG14的自定义数据的自定义连接的经验。
2.在libpqwalreceiver.c中新建例程
该文件位于共享库 (.so) 中src/backend/replication/libpqwalreceiver
并被编译为共享库 (.so),其中包含与 libpq 库相关的例程,允许 PG 后端使用 libpq 而无需在后端代码中编译它。当 PG 后端进程需要使用 libpq 时,需要先使用load_file()
call 加载共享库。
我的要求很简单,我需要的新例程是connect
, send
, recv
, 类似于正常的套接字交互。我没有在close
这里定义函数,因为我希望只要主服务器和备用服务器都在运行,就可以保持连接。当其中一个退出时,一旦检测到对等方已断开连接,连接将自动终止。
3. 连接例程
与libpqrcv_connect
复制例程不同,我的情况要简单得多。我只需要我的备用节点连接到主节点,所以我可以简单地重用备用节点的primary_conninfo
配置参数来连接。这将触发主节点分叉一个新的后端进程来服务这个连接。代码剪辑器可能如下所示:
我还将我的 libpq 套接字连接设置为使用阻塞套接字并设置asyncStatus
为PGASYNC_COPY_BOTH
表示我将进行双向数据通信
4.发送例程
libpqrcv_send
我的发送例程与复制例程完全相同。两者都用于PQputCopyData
将数据流发送到主节点。重命名它以保持一致性。片段如下:
5. 接收例程
此外,与复制程序非常相似libpqrcv_recv
,它共享几乎完全相同的代码。除了我的要求,连接需要是synchronous
连接。这意味着我的备用服务器将在等待主服务器响应时阻塞。为了使 recv 同步,我必须将 a 传递0
给PQgetCopyData
. 所以,如果你对asynchronous
连接没问题,这个程序看起来也完全一样libpqrcv_recv
。
6. 有待发送一些自定义数据
现在我们已经为我们自己的目的制作了 libpq 包装例程,然后我们可以让备用服务器向主服务器发送一些自定义数据并等待响应。请注意,我发送的是一个字母“N”,后跟 3 个示例自定义数据,100、200、300。Libpq COPY 使用该字母d
表示一个 COPY 命令,而我们在这里所做的是wrap
命令中我们自己的d
命令
StringInfoData buf_blk_request;
WalReceiverConn *wrconn;
int len;
load_file("libpqwalreceiver", false);
wrconn = netbuf_connect("dbname=postgres host=127.0.0.1 port=5550");
initStringInfo(&buf_blk_request);
pq_sendbyte(&buf_blk_request, 'N');
pq_sendint32(&buf_blk_request, 100);
pq_sendint32(&buf_blk_request, 200);
pq_sendint32(&buf_blk_request, 300);
pq_flush();
/* Send it */
netbuf_send(wrconn, buf_blk_request.data, buf_blk_request.len);
/* Read the data */
len = netbuf_recv(wrconn, &tmp, &fd);
if (len > 0)
{
/*
* Something was received from primary
*/
}
7. 让 Primary 接收自定义数据
当我们使用上述方法发送一些东西时,primary 的 postmaster 的主循环将接收数据并决定要做什么。因为我们使用的是 COPY 协议,所以第一个字符是d
,其中src/backend/tcop/postgres.c
已经有一个处理程序。所以我们需要在d
handler下添加额外的代码postgres.c
来接收和处理standby发送的数据,并在需要时提供响应。
case 'd': /* copy data */
elog(DEBUG2, "copy data request received");
int op;
op = pq_getmsgbyte(&input_message);
if (op == 'N')
{
StringInfoData buf_blk_reply;
int data1, data2, data3;
/* receive custom data here */
data1 = pq_getmsgint(&input_message, 4);
data2 = pq_getmsgint(&input_message, 4);
data3 = pq_getmsgint(&input_message, 4);
pq_getmsgend(&input_message);
/* send another custom data back to standby here */
pq_beginmessage(&buf_blk_reply, 'd');
pq_sendint32(&buf_blk_request, 400);
pq_sendint32(&buf_blk_request, 500);
pq_sendint32(&buf_blk_request, 600);
pq_endmessage(&buf_blk_reply);
pq_flush();
}
break;
8. 总结
基于 libpq COPY,我在主节点和备用节点之间创建了一个单独的通信通道,可用于通信自定义数据,类似于您通常处理常规套接字的方式。所有这些都基于 libpq 已经支持的 COPY 协议,并且在该协议中,我们包装了自己的数据。在上面的例子中,当standby向primary发送100、200、300时,它能够接收并响应400、500、600。这个简单的例子可以扩展来支持你在开发中可能需要的其他东西。这种为我自己的目的使用 COPY 的方式可能不是最干净的方式,但它对我有用。
黄嘉里
Cary 是 HighGo Software Canada 的高级软件开发人员,在加入 HighGo 之前,拥有 8 年在智能电网和计量领域开发 C/C++ 创新软件解决方案的行业经验。他于 2012 年在温哥华的英国列比亚大学 (UBC) 获得电气工程学士学位,并在以下技术方面拥有丰富的实践经验:高级网络、网络和数据安全、智能计量创新、Docker 部署管理、软件工程生命周期、可扩展性、身份验证、密码学、PostgreSQL 和非关系数据库、Web 服务、防火墙、嵌入式系统、RTOS、ARM、PKI、思科设备、功能和架构设计。
原文标题:Maintain a custom PG to PG Connection With libpq’s COPY protocol
原文作者:黄嘉里
原文地址:https://www.highgo.ca/2022/07/08/maintain-a-custom-pg-to-pg-connection-with-libpqs-copy-protocol/