暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Psycopg 中的管道模式

原创 Bigbig 2022-10-17
925

Psycopg是 Python 的 PostgreSQL 数据库适配器,最近增加了对libpq 管道模式的支持,从而显着提升了性能,尤其是在网络延迟很重要的情况下。在本文中,我们将从用户的角度和幕后简要描述它的工作原理, 同时提供一些实现细节。

Psycopg 徽标

Psycopg 3.1于 8 月下旬发布,是Psycopg 3 项目的第二个主要版本,它完全重写了古老的psycopg2。支持libpq 管道模式需要对驱动程序中的查询处理逻辑进行重大更改。然而,挑战在于使其与“普通”查询模式兼容,以保持 API 几乎不变,从而在不暴露批量查询模式的复杂性的情况下为用户带来性能优势。

对于不耐烦的人,请查看 Psycopg 的管道模式文档:它是自洽的,很好地解释了客户端/服务器通信的细节,以及从用户的角度来看事情是如何工作的。

在 Psycopg 中使用管道模式

Connection对象获得了一个pipeline()方法,通过上下文管理器(with语句)启用管道模式;所以使用它很简单:

conn = psycopg.connect()
with conn.pipeline():
   # do work

流水线模式有什么用?

Postgres 文档包含有关管道模式何时有用的建议。一种特殊情况是应用程序正在执行许多写操作(INSERTUPDATEDELETE)。

例如,让我们考虑以下模式:

CREATE TABLE t (x numeric, d timestamp, p boolean)

并假设一个应用程序做了很多查询,比如:

INSERT INTO t (x, d, p) VALUES ($1, $2, $3)

具有不同的值,并且x可能是一个大整数 ( n!, n<1000)。也许应用程序可以使用批量插入,例如 executemany(),也许不能(例如,因为它需要在插入之间做一些其他操作,比如查询另一个资源):这并不重要。

让我们把它放到一个小的demo.pyPython 程序中:

import math
import sys
from datetime import datetime
import psycopg

def create_table(conn: psycopg.Connection) -> None:
    conn.execute("DROP TABLE IF EXISTS t")
    conn.execute("CREATE UNLOGGED TABLE t (x numeric, d timestamp, p boolean)")

def do_insert(conn: psycopg.Connection, *, pipeline: bool, count: int = 1000) -> None:
    query = "INSERT INTO t (x, d, p) VALUES (%s, %s, %s)"
    for n in range(count):
        params = (math.factorial(n), datetime.now(), pipeline)
        conn.execute(query, params, prepare=True)

with psycopg.connect(autocommit=True) as conn:
    create_table(conn)
    if "--pipeline" in sys.argv:
        with conn.pipeline():
            do_insert(conn, pipeline=True)
    else:
        do_insert(conn, pipeline=False)
    row_count = conn.execute("select count(*) from t").fetchone()[0]
    print(f"→ {row_count} rows")

我们将运行我们的脚本python demo.py [--pipeline],该--pipeline标志允许启用管道模式。请注意,我们传递prepare=True给 Connection.execute(), 是为了发出一条PREPARE语句,因为我们将多次发出相同的查询。

一般来说,每个INSERT查询都会在服务器端快速执行。如果不启用流水线模式,客户端通常会发出查询然后等待其结果(尽管此处未使用):因此客户端/服务器往返时间可能会比执行时间(在服务器上)长得多。使用流水线模式,我们基本上大部分时间都省去了这些往返。

插曲:追踪

在优化客户端/服务器通信时,必须能够在相当低的水平上监控这种通信。从 Psycopg 的角度来看,边界是 libpq。幸运的是,该库通过PQtrace函数和朋友提供了跟踪机制。

此函数的输出如下所示(示例取自PostgreSQL 测试套件):

F	68	Parse	 "select_one" "SELECT $1, '42', $1::numeric, interval '1 sec'" 1 NNNN
F	16	Describe	 S "select_one"
F	4	Sync
B	4	ParseComplete
B	10	ParameterDescription	 1 NNNN
B	113	RowDescription	 4 "?column?" NNNN 0 NNNN 4 -1 0 "?column?" NNNN 0 NNNN 65535 -1 0 "numeric" NNNN 0 NNNN 65535 -1 0 "interval" NNNN 0 NNNN 16 -1 0
B	5	ReadyForQuery	 I
F	10	Query	 "BEGIN"
B	10	CommandComplete	 "BEGIN"
B	5	ReadyForQuery	 T
F	43	Query	 "DECLARE cursor_one CURSOR FOR SELECT 1"
B	19	CommandComplete	 "DECLARE CURSOR"
B	5	ReadyForQuery	 T
F	16	Describe	 P "cursor_one"
F	4	Sync
B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
B	5	ReadyForQuery	 T
F	4	Terminate

每行包含“方向指示符”(F用于从客户端到服务器B的消息或从服务器到客户端的消息)、消息长度、 消息类型及其内容。此示例显示来自扩展查询协议的消息。

在 Psycopg 中,我们可以通过Connection.pgconn属性访问表示 libpq 连接的低级PGconn对象。

stderr对于我们上面的程序,以下是如何启用对 , 的跟踪demo.py

from contextlib import contextmanager
from typing import Iterator
from psycopg import pq

@contextmanager
def trace_to_stderr(conn: psycopg.Connection) -> Iterator[None]:
    """Enable tracing of the client/server communication to STDERR."""
    conn.pgconn.trace(sys.stderr.fileno())
    conn.pgconn.set_trace_flags(pq.Trace.SUPPRESS_TIMESTAMPS | pq.Trace.REGRESS_MODE)
    try:
        yield
    finally:
        conn.pgconn.untrace()

def do_insert(conn: psycopg.Connection, *, pipeline: bool, count: int = 1000) -> None:
    # ...
    with trace_to_stderr(conn):
        for _ in range(count):
            conn.execute(query, params, prepare=True)

流水线还是不流水线

如果我们运行我们的演示脚本(没有管道模式),我们通常会得到以下输出:

F	69	Parse	 "_pg3_0" "INSERT INTO t (x, d, p) VALUES ($1, $2, $3)" 3 NNNN NNNN NNNN
F	4	Sync
B	4	ParseComplete
B	5	ReadyForQuery	 I
F	49	Bind	 "" "_pg3_0" 3 1 1 1 3 2 '\x00\x01' 8 '\x00\x02\xffffff8b\xffffff8fp~WN' 1 '\x00' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	4	Sync
B	4	BindComplete
B	4	NoData
B	15	CommandComplete	 "INSERT 0 1"
B	5	ReadyForQuery	 I
F	49	Bind	 "" "_pg3_0" 3 1 1 1 3 2 '\x00\x01' 8 '\x00\x02\xffffff8b\xffffff8fp~^\xffffff80' 1 '\x00' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	4	Sync
B	4	BindComplete
B	4	NoData
B	15	CommandComplete	 "INSERT 0 1"
B	5	ReadyForQuery	 I
[ ... and so forth ~1000 more times ... ]

我们确实看到客户端/服务器以消息序列的形式进行往返F, 然后是B每个查询的消息序列。

第一个消息序列Parse+ParseComplete对应于 PREPARE语句。接下来的只有一个Bind//Describe客户端Execute消息,然后是服务器响应。

现在使用管道模式(使用 运行脚本--pipeline),我们得到以下跟踪:

F	69	Parse	 "_pg3_0" "INSERT INTO t (x, d, p) VALUES ($1, $2, $3)" 3 NNNN NNNN NNNN
F	49	Bind	 "" "_pg3_0" 3 1 1 1 3 2 '\x00\x01' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82W\xfffffffe\xffffffd0' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	49	Bind	 "" "_pg3_0" 3 1 1 1 3 2 '\x00\x01' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82X\x00\xffffffc0' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	49	Bind	 "" "_pg3_0" 3 1 1 1 3 2 '\x00\x02' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82X\x01\xffffff81' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
[ ... ~300 more of those ... ]
B	4	ParseComplete
B	4	BindComplete
B	4	NoData
B	15	CommandComplete	 "INSERT 0 1"
B	4	BindComplete
B	4	NoData
B	15	CommandComplete	 "INSERT 0 1"
B	4	BindComplete
B	4	NoData
B	15	CommandComplete	 "INSERT 0 1"
[ ... ~300 more of those ... ]
F	383	Bind	 "" "_pg3_3" 3 1 1 1 3 336 '\x00\xffffffa4\x00\xffffffa3\x00\x00\x00\x00\x00\x00\x19k\x18\xffffff8c\x0c\xffffffa3!\x13\x17\xfffffffe!*\x03\xffffff90\x1f\xffffff94\x19V\x1cN$\xffffff8d\x1c[\x1fB\x1e\xffffffbb\x10e\x0f\x05\x0e\xffffff85\x13\xffffffd0\x0dK\x011\x03h\x08_$o!u\x07\xffffffb5\x0a\xffffffa6!\xffffffde\x04\xffffffc6\x0d\xffffffd5\x1d\xffffffa8\x1a\xffffffc9\x12s\x02\xfffffff8\x15\xffffffa0\x04%$\xfffffff0\x1f\xffffffd8\x12\xfffffff5\x17\xffffffd8\x05\xffffff96"\xffffffe7\x03\xfffffff5\x1a\xfffffff3\x1a\x19\x0fR\x19w\x1d\xffffffc5\x0f\xffffffe0\x05r\x03G\x0a\x1e\x062\x06\x07\x06\xffffff9e\x17\xffffffab\x11Y\x1eg\x1c\xffffff82\x15\xffffffb0\x09\xffffffdc\x03\xffffff8b\x0e\xffffffe9\x14\xffffffca\x05E\x08n\x07\xffffffc1\x08\xffffffc2\x11\xffffffc9\x05\x1f$*\x08\xffffffc6\x0b\xffffff8a\x04\xffffffb9 ,$\xffffffd3\x0cR\x12\xffffffb7\x08x\x0d\xffffffa8$,\x1d\x03\x05\x0b\x0a\xffffffcb\x06\x00\x03\xfffffff1\x14\xfffffffa\x0az\x06\xffffff81!/\x1c\x14\x11\xffffffab\x1a\xffffffb4\x12I\x03\xffffffff\x1cn\x10\xffffffe3\x15\xffffff89\x06\xffffffe3\x08B"\x19\x02\xffffff88\x1a\xffffff87\x00\x0d\x1d,\x0b\xffffffe6"\xffffffeb%k\x1e\x18\x08@\x0a\xffffff9f\x10\x1c!G\x14\xffffffff\x05\xffffffe5&o\x0ep\x0d\x01\x06p\x08\xffffffa3#O\x06d!\xffffffaf\x03\xffffffce\x00\x02\x0e\xffffff9e\x19\xffffffd6 \x1c"y'\x07\x00\xffffffbb\x1f\xffffff99%.\x0a\xffffffa7%#\x1e\xfffffffa\x0a\xffffff84\x1cl\x19P\x18\x19\x12\xffffff99\x11\x16\x18\xffffffdb\x1c\xffffffe1\x0f\xfffffff0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82X\xffffff9f\xffffff97' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	383	Bind	 "" "_pg3_3" 3 1 1 1 3 336 '\x00\xffffffa4\x00\xffffffa3\x00\x00\x00\x00\x00\xffffffcd\x19\x0a\x16\xffffffd6\x09\xffffffdf\x16\xffffff86\x04t\x0b4 \xffffffff\x12\xffffff8c&M\x00\x1f\x1b\xffffff81\x10\x00"\xffffffb0\x17\xffffffd8\x18\xffffffe5\x14\x11\x12|\x0b+\x14\xffffffed\x19\x07\x15\xfffffff3\x1d:\x1d\xffffffb2\x19\xffffffca\x0d\xffffffe2\x06\xffffff99&\x1e\x18w#\xffffffeb$H\x1b1\x09\xffffffbc\x01N$\xffffffc1\x15\xffffffc6 \xffffffa1\x18)\x0e\xffffff9c"\xffffffcd\x08r\x0d\xffffffa4\x01F\x01'\x05'%V\x00\xfffffff4 \xffffffac\x10\xffffffac\x02\x12\x14U!*\x04\xffffffc8\x1d\xffffffd9\x15w\x12\xffffffb0\x0e\x11%\xffffffba\x18\xffffffc7\x11\xffffff9f\x1d\xffffffbc\x1aL\x18\xffffffc4\x07\x02\x18\xffffffd0\x07\xffffffc6\x1c\xffffffa2!\xffffffa7"U\x11\xffffffd8\x15\xffffffde&e\x0d\xffffffae\x09\x00\x0b9#G\x1a\xffffff9f\x0f\xffffffb8\x14N\x13\xffffffa4\x18\xfffffffa\x1b<\x1fk\x0cT\x15\x1f#5\x1b\\x1d\xffffff8c\x19\x08\x12'\x06\x0e%\x0c\x01C$\x0c\x0d\xffffffa9'\x00\x18b\x08s\x1c\x06 k \xffffffc0\x13v\x17D\x10\xfffffff7'\x00\x0b\x02\x13\xffffffa2\x1c'\x11\xffffffb2\x1d5$v\x0d}\x08}!c\x1b\xfffffff2$\x18\x1fi\x07\xffffffe0\x03E#\x01\x18\xffffffe7\x1cP\x13"\x1eh\x02\xffffffee\x0ay\x01\x1b\x1ev#7\x1b\xfffffff9$\xffffff83\x19\x18\x1e^\x07\xfffffff0\x11n\x17M\x03\xffffff85$\xffffffcc\x1e\xffffffc2%R\x12\x06\x09Q\x03\xffffffad\x18\xffffffac$@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82X\xffffffa6\x1c' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
F	385	Bind	 "" "_pg3_3" 3 1 1 1 3 338 '\x00\xffffffa5\x00\xffffffa4\x00\x00\x00\x00\x00\x06\x14D\x08k\x0c\xffffffbe\x04\xffffffd9\x1e\xffffffb2\x05\xffffffbe$\xffffffcf\x1e\xffffffa9\x152\x1f\xfffffffa\x002\x08N"\x09\x14!\x142\x01\xffffff93!\xffffff83\x00\xffffff86\x19H \xffffffb4\x04\xffffffad\x05\xffffffb4\x07\xfffffff3\x00+\x0b\xffffff82\x1a\x0f\x16Z\x0d\xffffff9c\x16\x1e\x13\xfffffff5\x11\xffffffa4\x1a;&\xfffffff6\x18 \x0b\x0d\x1c6\x1f\xffffffa1\x02\xfffffffb\x16\xffffffe6\x10}\x15X\x1b>\x0d\x17\x0d\xffffffe4 \xffffffe9&\xffffffa5\x1d\xffffffbd\x05\xfffffff3\x0b\xffffff9c\x1f\xffffffef\x00\xfffffffe\x05X :\x09C\x08\x12\x19\xfffffff2\x07\x1f\x06\xfffffffb\x03j\x00\xffffffe4\x0c\xffffff91\x10\xffffff94&l"\xffffffc3\x0e?\x04&\x0f+\x04\xffffffd1\x18q )\x13\x0d\x17\x10\x00\xffffffcf\x01\xffffffcb\x04\x03\x0b\xffffffe3\x01\xffffffe2\x16\xffffff8c\x1e\xfffffff5\x0f\xffffffee\x1b\xffffffcf\x01z&\x03\x02o\x10\xffffffd0\x1c\xffffffaf\x01\xfffffffb\x1f7\x05\xffffffcb\x0cL\x06r\x19&\x0a{\x15\x0a"\xffffffa1\x14\x05"N\x17\x0a\x11E\x04\x18\x1e\xffffffcd%\x0a\x1f\xfffffffd\x1b\xffffff87\x13\xffffff99\x0d\xffffff89\x0d\xffffff8e\x12\xffffff9a\x18g\x01\xfffffff8#\x1b\x12=#\xffffff97%\xffffff99\x1f\xffffffae$v#d#\xffffff8a\x15\xffffffed\x03G\x04P\x1e[\x0b`\x1d\x7f\x1e\xffffff9a&\xffffff9d&\xffffffe6\x08\xffffffcb\x1f.\x01M\x0c\xffffff82\x19\xfffffffe\x11F\x10\xffffffbd\x12#\x03\xffffffa5\x17\x1b\x18\xfffffff5\x18\xffffffd8"<\x0a\xffffff99\x17\xffffffba!1\x09\xffffffa2\x06\xffffffe0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 8 '\x00\x02\xffffff8b\xffffff8f\xffffff82X\xffffffa6\xffffffe6' 1 '\x01' 1 0
F	6	Describe	 P ""
F	9	Execute	 "" 0
[ ... ]

我们可以看到客户端在服务器回复之前发送了超过 900 条消息(消息数量相同)。显然,这会对性能产生巨大影响,尤其是在网络延迟很重要的情况下。事实上,即使 Postgres 服务器开启,它的运行速度也快了两倍localhost

实际发生的是客户端发送尽可能多的查询,直到服务器决定它不能管理更多(通常是因为它的输出缓冲区已满,通常是因为我们要插入的大整数),此时服务器发回所有查询的结果;冲洗并重复。管道模式不是产生小而频繁的客户端/服务器往返,而是通过产生大而稀缺的往返来优化网络通信。“缺点”(记住我们得到了 2 倍的加速)是客户端程序通常需要处理更多内存中的数据。

它是如何工作的?

如前所述,管道模式的入口点是 进入和存在管道模式的对象上的pipeline()方法。Connection但是,这是什么意思?好吧,基本上,这涉及调用底层PQ{enter,exit}PipelineMode函数。

但这并不能说明 Psycopg 中的工作原理。

要真正理解事情是如何工作的,我们需要退后一步阅读libpq 管道模式文档,其中“Interleaving Result Processing and Query Dispatch”部分指出:

客户端应用程序通常应该维护一个剩余待分派的工作队列和一个已分派但尚未处理其结果的工作队列。当套接字可写时,它应该分派更多的工作。当套接字可读时,它应该读取结果并处理它们,将它们匹配到相应结果队列中的下一个条目。

与 PostgreSQL 一样,一切都在那里。但这一段在某种程度上是神秘的。然而,事实上,它描述了 Psycopg 驱动程序算法的核心(尽管我们花了一些时间来掌握这几句话所暗示的所有细节……)。

通讯插座

当套接字可写时,它应该分派更多的工作。当套接字可读时,它应该读取结果 […]。

在 Psycopg 中,用于交换 libpq 消息的套接字通信是通过由 I/O 层(阻塞或异步)捆绑在一起的等待函数生成器实现的:这在Daniele 的博客文章中有详细解释。

管道模式(主要)重要的是生成器部分,因为它负责将查询分派到套接字或从套接字读取结果。与普通查询模式相比,这些步骤由独立的逻辑顺序处理,管道模式需要交错的结果处理和查询调度:这是由pipeline_communicate() 生成器实现的。无需过多介绍细节,我们可以注意到:

  • 该函数采用“命令”队列,例如 pgconn.send_query_params()或类似的,
  • 它不断等待套接字读取或写入就绪(或两者)(ready = yield Wait.RW),
  • 当套接字准备就绪(if ready & Ready.R:)时,获取结果(调用pgconn.get_result()),
  • 当套接字准备好(if ready & Ready.W:)时,发送命令(调用pgconn.flush()以刷新先前发送的命令队列,然后调用任何待处理的命令),
  • 直到命令队列变空。

排队工作,处理结果

围绕上述pipeline_communicate()生成器,我们需要处理命令队列以及待处理的结果队列。第一部分,填充命令队列,简单地通过堆叠命令来管理,而不是直接调用它们,同时保持对用于execute(). 第二部分意味着处理上述pipeline_communicate()生成器 的输出,即PGresult列表。每个获取的结果项:

  • 可能绑定回其各自的光标(各自的 execute()来源),
  • OK如果其状态为非(例如FATAL_ERROR),则可能会触发错误。

所有这些都在BasePipeline类的方法中处理(参见_末尾带有前缀的方法)。

与高级功能集成:事务

除了上面描述的低级逻辑,在 Psycopg 中实现流水线模式意味着处理一些 Psycopg 特定的功能,例如:事务。

由于错误处理在管道模式下的工作方式,事务需要特别注意。有一些不同的情况需要正确处理,具体取决于管道是使用隐式事务还是包含显式事务。但一般规则是,当发生错误时,管道进入中止状态,这意味着后续命令将被跳过,并且先前的语句可能会被持久化(取决于显式事务的使用与否)。

考虑以下在管道中执行的语句:

BEGIN;  # transaction 1
INSERT INTO s VALUES ('abc');
COMMIT;
BEGIN;  # transaction 2
INSERT INTO no_such_table VALUES ('x');
ROLLBACK;
BEGIN;  # transaction 3
INSERT INTO s VALUES ('xyz');
COMMIT;

SELECT * from s;
-> abc

INSERT INTO no_such_table语句将产生错误,使管道中止;因此,以下显式ROLLBACK将不会被执行。接下来的语句(“事务 3”)也将被跳过。

另一个例子:

BEGIN;  # main transaction
INSERT INTO s VALUES ('abc');
BEGIN;  # sub-transaction
INSERT INTO no_such_table VALUES ('x');
ROLLBACK;
INSERT INTO s VALUES ('xyz');
COMMIT;

SELECT * from s;
-> []

在这里,仍然由于相同的错误INSERT INTO no_such_table,最终 COMMIT语句没有执行,主(外部)事务没有提交(尽管内部子事务显式回滚)。

这通常是高级驱动程序的用户不想要的。

在 Psycopg 中,事务是通过对象上的transaction() 上下文管理器方法显式管理的Connection。所以为了保持一致的行为,它的逻辑需要适应流水线模式。这是通过PQpipelineSync和嵌套管道利用同步点来实现的。

嵌套管道

在 libpq 中,没有嵌套管道这样的东西,因为连接只能进入管道模式一次。Psycopg 中所谓的“嵌套管道”是通过同步点“隔离”管道会话中的一系列命令的操作。通过这样做,我们解决了上述令人惊讶的行为(提交的事务被回滚)。这是发生的事情:

with conn.pipeline():  # emits PQenterPipelineMode
  conn.execute(...)
  with conn.pipeline():  # emits PQpipelineSync
    conn.execute(...)
  # exiting the inner 'with' block emits PQpipelineSync
# exiting the outermost 'with' block emits PQexitPipelineMode

PQpipelineSync操作重置管道状态,从而允许后续命令独立运行,而与之前的命令是否成功无关。(它还会触发从服务器发回结果,但这是另一回事。)

流水线交易

通过对 Psycopg 事务使用嵌套管道,我们通常遵循libpq 管道模式 文档中提到的“逻辑工作单元”模式:

管道的范围应限定为逻辑工作单元,通常(但不一定)每个管道一个事务。

(除了我们没有严格地为每个事务使用一个管道,而是一个嵌套的管道。)

在实践中,这意味着with transaction():在管道会话中使用块是安全的,因为事务和管道的语义都被保留:事务要么成功要么总体失败,只有在管道会话中的先前命令成功时才会执行:

with conn.pipeline():
  conn.execute(...)
  try:
      with conn.transaction():  # implicit nested pipeline (with conn.pipeline())
          conn.execute(...)
  finally:
      # This will be executed independently of whether the previous
      # transaction succeeded or not.
      conn.execute(...)

回到上面的(第二个)示例,如果使用 Psycopg 编写:

>>> with psycopg.connect(autocommit=True):
...     with conn.pipeline():
...         with conn.transaction():
...             conn.execute("INSERT INTO s VALUES (%s)", ("abc",))
...             try:
...                 with conn.transaction():
...                     conn.execute("INSERT INTO no_such_table VALUES (%s)", ("x",))
...             except errors.UndefinedTable:
...                 pass
...             conn.execute("INSERT INTO s VALUES (%s)", ("xyz",))
...     conn.execute("SELECT * FROM s ).fetchall()
[('abc',), ('xyz',)]

我们确实让内部事务回滚,而外部事务被提交,就像没有管道模式一样。

这是一个实现细节,用户不需要知道这一点,因为整体行为希望是自然的。


在 Psycopg 中支持 libpq 管道模式是一个重要的里程碑。它需要数月的工作和大量的思考和测试。关于它可能还有更多要说的,比如它如何透明地管理自动准备语句或executemany()如何优化以隐式使用管道模式(尝试调整上面的演示脚本以使用它 - 提示:不需要with pipeline:块)。并且一定要尽快阅读 Psycopg 的管道模式文档!

原文标题:Pipeline mode in Psycopg

原文作者:丹尼斯· 拉克萨尔德

原文链接:https://blog.dalibo.com/2022/09/19/psycopg-pipeline-mode.html

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论