原文链接:https://blog.dalibo.com/2022/06/01/psycopg-row-factories.html
作者:Toulouse
psycopg 是 Python 编程语言的 PostgreSQL 数据库适配器。因此,它的主要目标之一是在发出查询和检索结果时,自动调整 PostgreSQL 和 Python 类型。虽然这种转换适用于内置类型,但 psycopg 还公开了一种处理特定于域的数据类型的本地方法:行工厂。
这些领域模型通常由应用程序开发人员以 Python 中的数据类的形式定义,映射到 PostgreSQL 中的一个或多个关系(通常是表)。这通常是 ORM 发挥作用的地方,它提供了一个抽象层来编写来自模型类的查询,并将结果映射到模型实例。在 psycopg 3 中,行工厂提供了一种简单而强大的方法来处理特定于域的模型,而无需任何额外的依赖。此外,我们将看到如何利用这个轻量级特性来编写更安全的应用程序,特别是通过静态类型。
什么是行工厂?
假设我们正在处理天气数据,将城市的日期报告收集到weather
表中:
CREATE TABLE weather ( city varchar(80), temp_lo int, temp_hi int, prcp real, date date );
复制
当使用 psycopg 查询此表时,我们将行作为元组值获取,每一列都具有与 PostgreSQL 匹配的正确 Python 类型:
>>> with psycopg.connect() as conn: ... print(conn.execute("SELECT * FROM weather").fetchone()) ('San Francisco', 46, 50, 0.25, datetime.date(1994, 11, 27))
复制
获取其他内容的最简单方法是使用内置的行工厂,例如dict_row
,它生成将结果集列名映射到值的字典:
>>> with psycopg.connect() as conn, conn.cursor(row_factory=psycopg.rows.dict_row) as cur: ... print(cur.execute("SELECT * FROM weather").fetchone()) {'city': 'San Francisco', 'temp_lo': 46, 'temp_hi': 50, 'prcp': 0.25, 'date': datetime.date(1994, 11, 27)}
复制
行工厂在游标初始化时传递,从 fetchone() 调用获得的行确实是一个字典。
现在更有趣的是,假设我们的 Python 应用程序中有以下特定于域的模型:
from dataclasses import dataclass from datetime import date @dataclass class Weather: city: str temperature: tuple[int, int] precipitation: float date: date
复制
目标是在执行诸如SELECT * FROM weather
之类的查询时获取此Weather
类的实例。这就是自定义行工厂发挥作用的地方。
行工厂通常是一个函数,它将原始 Python 数据(由 psycopg 从 SQL 值改编)处理成某种最终类型,这里是上面 Weather
类的实例。我们可以写一个这样的函数:
def weather_from_row(city, temp_lo, temp_hi, prcp, date): return Weather( city=city, temperature=(temp_lo, temp_hi), precipitation=prcp, date=date )
复制
但是,这还不够,因为 psycopg 缺少有关用于此函数的 SQL 结果集中的列的信息。这就是为什么行工厂额外使用当前查询所使用的游标的原因。因此,必须将该函数包装到工厂中,如下所示:
def weather_row_factory(cursor): # Extract result set column names. columns = [column.name for column in cursor.description] def make_row(values): # Map column names to values row = dict(zip(columns, values)) return weather_from_row(**row) return make_row
复制
接下来,如果这些函数是上面 Weather
类的类方法,我们可以说会得到更好的设计;我们开始做吧:
@dataclass class Weather: city: str ... @classmethod def from_row(cls, *, city, temp_lo, temp_hi, prcp, date): return cls( city=city, temperature=(temp_lo, temp_hi), precipitation=prcp, date=date ) @classmethod def row_factory(cls, cursor): columns = [column.name for column in cursor.description] def make_row(values): row = dict(zip(columns, values)) return cls.from_row(**row) return make_row
复制
现在一切就绪,让我们看看它是如何工作的:
>>> with psycopg.connect() as conn: ... with conn.cursor(row_factory=Weather.row_factory) as cur: ... cur.execute("SELECT * FROM weather") ... row = cur.fetchone() >>> print(row) Weather(city='San Francisco', temperature=(46, 50), precipitation=0.25, date=datetime.date(1994, 11, 27))
复制
如您所见,上面的row
变量是一个 Weather
实例,因此我们可以这样操作它,例如访问属性:
>>> import statictics >>> statictics.mean(row.temperature) 48
复制
如何更安全?
在数据库或一般 I/O 的上下文中,涉及在 I/O 边界工作的代码(将数据从一种类型系统转换为另一种类型系统)通常非常“不安全”,需要特别小心,例如解码/编码时的验证。
如上一节所示,通过从 psycopg 发出查询时使用行工厂,我们尽早执行从“原始”Python 类型到特定领域模型的转换。因此,在此 I/O 边界上花费的时间减少了。通常,人们会非常注意编写正确的 SQL 查询及其相应的行工厂,测试它们,然后应用程序代码的其余部分会很好地工作,而不必担心来自数据库的数据。这通常是 ORM 也有帮助的地方。
将行工厂与 Python 静态类型结合使用时,还有一个很大的好处。静态类型以及诸如 mypy 之类的类型检查器提供了额外的安全保证,而不会影响运行时性能。然而,在 I/O 边界上利用这一点可能更难,因为这通常涉及“弱”类型(即 str 或 dict 类型,而不是被视为“强”类型的 Weather)。在这种情况下,psycopg 的行工厂特性有很大帮助,因为它为 I/O 边界带来了强类型化。因此,通常会使用具有强大验证能力的模型类型(例如,Python 生态系统中的 pydantic 等库)。
所以让我们回到我们之前的例子来添加类型注释:
from typing import Any, Sequence @dataclass class Weather: city: str ... @classmethod def from_row(cls, *, city: str, temp_lo: int, temp_hi: int, prcp: float, date: date) -> Weather: return cls( city=city, temperature=(temp_lo, temp_hi), precipitation=prcp, date=date ) @classmethod def row_factory(cls, cursor: Cursor[Any]) -> Callable[[Sequence[Any]], Weather]: columns = [column.name for column in cursor.description] def make_row(values: Sequence[Any]) -> Weather: row = dict(zip(columns, values)) return cls.from_row(**row) return make_row
复制
以下示例类型检查正常:
def get_weather_reports(conn: Connection[Any]) -> list[Weather]: with conn.cursor(row_factory=Weather.row_factory) as cur: cur.execute("SELECT * FROM weather") return cur.fetchall()
复制
通过将类型化的行工厂传递给用于获取查询的游标,从此游标的 fetchall()
调用中获得的值被推断为具有相应的行类型。也就是说,在这里,游标有一个行工厂返回 Weather
类型的行,因此 fetchall()
返回一个 list[Weather]
值,它与 get_weather_reports()
声明的内容相匹配(并且将由诸如 mypy
之类的类型检查器进行验证)。这更安全,因为数据以强类型(例如 Weather
类型)的形式来自数据库,而不是匿名值(int
、date
等)的元组(或 dict
)。这更安全,因为当我们传递那些强类型值时,我们避免了类型检查器无法验证的典型运行时错误(例如元组的 IndexError
或 dict
的 KeyError
);另一方面,可以检查强类型发生的典型 AttributeError
。例如,以下 IndexError
直到运行时才能捕获:
>>> with conn.cursor() as cur: ... rows = cur.execute("SELECT * FROM weather").fetchall() # type is list[tuple[Any, ...]] >>> rows[0][5] Traceback (most recent call last): ... IndexError: tuple index out of range
复制
而以下 AttributeError
被类型检查器捕获(因此可以在运行时避免):
>>> with conn.cursor(row_factory=Weather.row_factory) as cur: ... rows = cur.execute("SELECT * FROM weather").fetchall() # type is list[Weather] >>> rows[0].prcp Traceback (most recent call last): ... AttributeError: 'Weather' object has no attribute 'prcp'
复制
最后,我们通过使用行工厂(psycopg 的内置功能)变得更安全,但我们也通过在查询数据库时避免笨拙的 dict 等来获得表达能力。
在引擎盖下如何工作?
现在,上一节的最后一部分演示了行工厂中定义的类型通过 Cursor
方法传播,这似乎有点神奇。然后让我们回顾上一个示例并添加一些reveal_type()
调用(结果显示为#note
:comments):
def get_weather_reports(conn: Connection[Any]) -> list[Weather]: with conn.cursor(row_factory=Weather.row_factory) as cur: reveal_type(cur) # note: Revealed type is "psycopg.cursor.Cursor[Weather]" cur.execute("SELECT * FROM weather") rset = cur.fetchall() reveal_type(rset) # note: Revealed type is "builtins.list[Weather]" return rset
复制
我们可以看到 cur: Cursor[Weather]
值是在“行”类型 Weather
上参数化的,如 Weather.row_factory(cursor: Cursor[Any]) -> Callable[[Sequence[Any]], Weather]
中所声明的(重要的是 Weather
类型声明为行工厂返回的可调用的返回值)。同样, rset: list[Weather]
确实被推断为 Weather
对象的列表。
要了解它是如何工作的,我们先看一下 psycopg 中 Connection
的定义(简化版):
Row = TypeVar("Row", covariant=True) RowFactory = Callable[[Sequence[Any]], Row] # simplified class Connection: @overload def cursor(self) -> Cursor[Tuple[Any, ...]]: ... @overload def cursor(self, *, row_factory: RowFactory[Row]) -> Cursor[Row]: ... def cursor(self, *, row_factory: Optional[RowFactory[Any]] = None) -> Cursor[Any]: # implementation here
复制
方法 Connection.cursor()
被定义为重载,具体取决于 row_factory
参数的值,并返回具有特定类型变量 Row
的 Cursor
对象。换句话说,该 Row
类型变量从行工厂参数绑定到 Cursor
返回值。
然后使用相同的 Row
类型变量来定义一个泛型 Cursor
类,该类又允许 fetch*()
方法返回 Row
值(仍然简化):
class Cursor(Generic[Row]): def fetchone(self) -> Optional[Row]: ... def fetchall(self) -> List[Row]: ...
复制
因此,如果我们回想一下我们的最后一个示例:
>>> cur = conn.cursor(row_factory=Weather.row_factory) >>> cur.execute("SELECT * FROM weather") >>> rows = cur.fetchall()
复制
cur
是Cursor[Weather]
因为Weather.row_factory
具有RowFactory[Weather]
类型,并且,rows
是Weather
列表,因为cur
是Cursor[Weather]
。
这就是对 psycopg 中行工厂的概述。接下来查看 psycopg 文档,尤其是关于静态类型的页面(它解释了 Cursor 在 Row 上的通用性)、主行工厂页面以及提供不错的行工厂生成器帮助程序的行 API 模块。