
原文链接:
https://www.citusdata.com/blog/2023/10/26/making-postgres-tick-new-features-in-pg-cron/
作者:Marco Slot
2023年10月26日
pg_cron 是一个开源的PostgreSQL扩展,提供了基于 cron 的调度器,用于定期运行 SQL 命令。几乎每个管理 PostgreSQL 的服务都支持 pg_cron,并且它已经成为许多 PostgreSQL 用户的标准工具。自从我在Citus全职工作以来,pg_cron 一直是我的副业项目,因此我试图使其结构简单、可靠且易于维护。当然了,随着用户数量的增加,功能需求列表也在不断延长。在 Postgres 社区的帮助下,pg_cron 随着时间的推移越来越强大。
我们最近在 1.6 版本中增加了对 PostgreSQL 16 的支持,但可能过去一年中在 pg_cron 中添加的最令人兴奋的功能(在 1.5 版本中)是能够每隔几秒钟安排一项任务。我曾经对这个功能的想法持保留态度,因为(a)这不是常规 cron 能做的事情;并且(b)如果每隔几秒钟就发生一次,pg_cron 中的任何问题都会变得更为严重。然而,到目前为止,pg_cron 已经经过了相当充分的实战测试,而且按秒计划的任务已经成为迄今为止最受欢迎的 pg_cron 功能需求。
每隔几秒钟安排一项任务
能够执行按秒计划的任务使你能够快速响应数据库中的新事件。一些示例用例包括:
更新实时洞察的聚合
检测异常(例如:同一 IP 的多次请求)
轮询外部源(例如:频繁地从远程服务器同步)
实现更为复杂的任务调度工作流
自 pg_cron 1.5 版本起,你可以轻松安排每 1-59 秒执行的任务:
--每10秒调用我的过程
SELECT cron.schedule('call-my-agent','10 seconds','call my_agent()')
不允许间隔时间超过 59 秒的原因是,现有的 cron 计划已经允许每分钟运行一次作业,而且这种逻辑更可靠地处理时钟跳跃。不允许更低的间隔时间(例如,毫秒)的原因是,这是一种可能导致问题的不同类型的工作负载。因此,1-59 看起来是一个低维护、关键任务项目的安全范围。
提示:请注意,每个作业运行仍然默认记录在 cron.job_run_details
中,几个月后,每几秒运行一次作业,这可能会变得非常大。如果你预计会有非常高的数量,你可以选择禁用 cron.log_run
设置。建议你至少设置一个 pg_cron 作业来清理 pg_cron 后的内容:
--每天中午删除当前用户的旧 cron.job_run_details 记录
SELECT cron.schedule('delete-job-run-details','0 12 * * *', $$DELETE FROM cron.job_run_details WHERE end_time < now()- interval '3 days'$$);
在 PostgreSQL 中可扩展的并行任务队列执行器
秒级粒度的调度使你能够使用 pg_cron 作为基础调度原语,在其基础上构建更为复杂的调度器,而无需修改 pg_cron 本身。
pg_cron 用户的一个常见请求是能够调度一次性命令,这对于将大任务移到后台或一次安排多个独立操作很有帮助。例如,你可能想从另一个系统中加载数据批次,应用转换,对许多不同的表执行操作等等。然而,这也带来了许多围绕失败处理的问题,而 pg_cron 本不是为解决这些问题而设计的。相反,你可以在 pg_cron 的基础上构建这样的基础设施。
下面,我们提供了一个基本的(公有领域)实现,用于在 pg_cron 之上的 PL/pgSQL 中执行一次性工作的任务队列执行器:
--用于跟踪要立即执行的作业的表
CREATE TABLE job_queue (
jobid bigserial primary key,
command text notnull,
search_path text notnulldefault'pg_catalog',
attempts intnotnulldefault0,
max_attempts intnotnulldefault5,
last_attempt timestamptz,
last_error text
);
--用于跟踪作业失败的表
CREATE TABLE job_errors (
jobid bigint notnull,
command text notnull,
message text notnull,
start_time timestamptz notnull,
end_time timestamptz notnull
);
CREATE OR REPLACE FUNCTION schedule_once(p_command text)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
INSERT INTO job_queue (command, search_path)
VALUES (p_command, current_setting('search_path'));
END; $fn$;
CREATE OR REPLACE PROCEDURE run_jobs()
LANGUAGE plpgsql AS $fn$
DECLARE
v_ctid tid;
v_jobid bigint;
v_command text;
v_search_path text;
v_message text;
v_success bool;
v_attempts int;
v_max_attempts int;
v_start_time timestamptz;
v_end_time timestamptz;
BEGIN
LOOP
--从队列中获取作业
SELECT ctid, jobid, command, search_path, attempts +1, max_attempts
INTO v_ctid, v_jobid, v_command, v_search_path, v_attempts, v_max_attempts
FROM job_queue
WHERE last_attempt isnull OR last_attempt < now()- interval '10 seconds'
LIMIT 1 FOR UPDATE SKIP LOCKED;
IF NOT FOUND THEN
--找不到作业,退出,但很快会恢复
EXIT;
END IF;
v_start_time := now();
BEGIN
--执行命令
SET LOCAL search_path TO v_search_path;
EXECUTE v_command;
RESET search_path;
v_message :='Success';
v_success :=true;
EXCEPTION WHEN others THEN
--命令失败,记录并存储错误消息
RAISE WARNING 'scheduled job failed: %', SQLERRM;
v_message := SQLERRM;
v_success :=false;
END;
v_end_time := now();
IF v_success OR v_attempts >= v_max_attempts THEN
--如果成功或我们尝试次数超过最大尝试次数,则删除作业
DELETE FROM job_queue WHERE ctid = v_ctid;
IF NOT v_success THEN
--目前我们只在出错的情况下记录,以减少冗余插入
INSERT INTO job_errors (jobid, command, message, start_time, end_time)
VALUES (v_jobid, v_command, v_message, v_start_time, now());
END IF;
ELSE
--更新尝试次数并稍后重试
UPDATE job_queue
SET attempts = v_attempts, last_attempt = now(), last_error = v_message
WHERE ctid = v_ctid;
END IF;
COMMIT;
END LOOP;
END; $fn$;
--通过pg_cron并行运行多达4个作业
SELECT cron.schedule('job-runner-1','5 seconds','call run_jobs()');
SELECT cron.schedule('job-runner-2','5 seconds','call run_jobs()');
SELECT cron.schedule('job-runner-3','5 seconds','call run_jobs()');
SELECT cron.schedule('job-runner-4','5 seconds','call run_jobs()');
在设置了任务队列后,现在你可以安排一次性的任务,这些任务通常会在 5 秒内开始,并且即使你断开连接,它们也会完成执行:
--在后台启动长时间运行的作业:
SELECT schedule_once('create table random as select random() from generate_series(1,10000000) s');
系统可以并行运行多个任务,一旦激活,它会快速连续地运行任务,而无需产生新的进程开销,使其能够扩展到大量的任务。run_jobs
过程还会尝试每个任务最多 5 次,每次运行之间至少间隔 10 秒。永久性错误会被记录到 job_errors
表中。
提示:请记住,当使用这种模式时,你的 cron.job_run_details
表会快速填满。考虑在设置中禁用 cron.log_run
设置(以跳过 cron.job_run_details
)和/或 cron.log_statement
设置(以跳过 PostgreSQL 日志)。
使用 pg_cron 在任务队列模式中的示例
有无数种方式可以使用这种任务队列模式。对于 Citus 数据库用户来说,一个有趣的示例可能是在使用 基于模式的分片 时管理大量的模式。例如,如果你想在许多模式中添加一个新列:
--在所有分布式模式中的表中添加列:
select schedule_once(format('alter table %I.data add column extra jsonb', schema_name))from citus_schemas;
通过这种方式执行 ALTER TABLE 操作,而不是遍历 Postgres 的模式,你可以避免运行一个持有激进锁定的长时间事务,并能有效地并行化工作。
用 pg_cron 愉快地安排任务吧!
希望这篇文章能为你提供其他想法,展示如何使用 pg_cron 来自动化你的 PostgreSQL 工作流。如果你想开始使用,pg_cron 的主要文档位于 pg_cron GitHub 仓库。

由 Marco Slot 撰写
前 Microsoft Citus 数据库引擎的主要工程师。曾在 Postgres Conf EU、PostgresOpen、pgDay Paris、Hello World、SIGMOD 以及许多聚会上发表演讲。Citus Con:一个针对 Postgres 的活动的演讲选择团队成员。分布式系统博士。热爱山地徒步。






