pg_cron is an open source PostgreSQL extension that provides a cron-based scheduler to periodically run SQL commands. Almost every managed PostgreSQL service supports pg_cron and it has become a standard tool for many PostgreSQL users. Since Citus has been my full-time job, pg_cron has always been a side project for me, and so I tried to architect it for simplicity, reliability, and low maintenance. Of course, with many users there is a long list of feature requests, and with the help of the Postgres community pg_cron keeps becoming more and more capable over time.
We recently added PostgreSQL 16 support (in version 1.6), but perhaps the most exciting feature added to pg_cron in the past year (in version 1.5) is the ability to schedule a job every few seconds. I shunned this feature idea for a while, because (a) it is not something regular cron can do; and (b) any issue in pg_cron would get much more severe if it were to happen every few seconds. However, by now pg_cron is reasonably battle-tested and second-granularity jobs had become the most popular pg_cron feature request by far.
Being able to run second-granularity jobs enables you to react quickly to incoming events in your database.
Since pg_cron 1.5, you can easily schedule jobs that run every 1-59 seconds:
```sql
-- Call my procedure every 10 seconds
SELECT cron.schedule('call-my-agent', '10 seconds', 'call my_agent()');
```
The reason for not allowing intervals above 59 seconds is that existing cron schedules can already run a job every minute, and that logic deals more reliably with clock jumps. The reason for not allowing lower intervals (e.g. milliseconds) is that sub-second scheduling is a different type of workload that might cause issues. Hence, 1-59 seconds seemed like a safe range for a low-maintenance, mission-critical project.
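For intervals of a minute or longer, the regular cron syntax applies. For example, the same job from above on a one-minute cadence (reusing the hypothetical `my_agent` procedure) would look like:

```sql
-- Run every minute using regular cron syntax
SELECT cron.schedule('call-my-agent-minutely', '* * * * *', 'call my_agent()');
```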
Tip: Be aware that every job run is still logged in `cron.job_run_details` by default, which can grow very large after months of running jobs every few seconds. You could decide to disable the `cron.log_run` setting if you expect a very high volume. It is recommended that you at least set up a pg_cron job to clean up after pg_cron:
```sql
-- Delete old cron.job_run_details records of the current user every day at noon
SELECT cron.schedule('delete-job-run-details', '0 12 * * *',
  $$DELETE FROM cron.job_run_details WHERE end_time < now() - interval '3 days'$$);
```
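If you prefer to skip per-run logging entirely, the `cron.log_run` setting can be turned off at the server level. A minimal sketch, assuming superuser access and that the setting takes effect after a configuration reload:

```sql
-- Stop logging job runs to cron.job_run_details
ALTER SYSTEM SET cron.log_run TO off;
SELECT pg_reload_conf();
```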
Second-granularity scheduling enables you to use pg_cron as a foundational scheduling primitive on top of which you can build more sophisticated schedulers, without having to modify pg_cron itself.
A common request from pg_cron users is the ability to schedule one-off commands, which would be helpful for moving a large task into the background or scheduling many separate operations at once. For example, you may want to load batches of data from another system, apply transformations, perform an operation on many different tables, and more. However, that also brings in many questions around failure handling that pg_cron is not meant to answer. Instead, you could build such infrastructure on top of pg_cron.
Below, we give a basic (public domain) implementation of a job queue executor for one-off jobs in PL/pgSQL on top of pg_cron:
```sql
-- table to track jobs to be executed immediately
CREATE TABLE job_queue (
    jobid bigserial primary key,
    command text not null,
    search_path text not null default 'pg_catalog',
    attempts int not null default 0,
    max_attempts int not null default 5,
    last_attempt timestamptz,
    last_error text
);

-- table to track job failures
CREATE TABLE job_errors (
    jobid bigint not null,
    command text not null,
    message text not null,
    start_time timestamptz not null,
    end_time timestamptz not null
);

CREATE OR REPLACE FUNCTION schedule_once(p_command text)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
    INSERT INTO job_queue (command, search_path)
    VALUES (p_command, current_setting('search_path'));
END;
$fn$;

CREATE OR REPLACE PROCEDURE run_jobs()
LANGUAGE plpgsql AS $fn$
DECLARE
    v_ctid tid;
    v_jobid bigint;
    v_command text;
    v_search_path text;
    v_message text;
    v_success bool;
    v_attempts int;
    v_max_attempts int;
    v_start_time timestamptz;
    v_end_time timestamptz;
BEGIN
    LOOP
        -- get a job from the queue
        SELECT ctid, jobid, command, search_path, attempts + 1, max_attempts
        INTO v_ctid, v_jobid, v_command, v_search_path, v_attempts, v_max_attempts
        FROM job_queue
        WHERE last_attempt is null OR last_attempt < now() - interval '10 seconds'
        LIMIT 1
        FOR UPDATE SKIP LOCKED;

        IF NOT FOUND THEN
            -- no jobs found, exit, but will resume soon
            EXIT;
        END IF;

        v_start_time := now();

        BEGIN
            -- Execute the command
            SET LOCAL search_path TO v_search_path;
            EXECUTE v_command;
            RESET search_path;

            v_message := 'Success';
            v_success := true;
        EXCEPTION WHEN others THEN
            -- Command failed, log and store the error message
            RAISE WARNING 'scheduled job failed: %', SQLERRM;

            v_message := SQLERRM;
            v_success := false;
        END;

        v_end_time := now();

        IF v_success OR v_attempts >= v_max_attempts THEN
            -- delete the job if it was successful or we reached max attempts
            DELETE FROM job_queue WHERE ctid = v_ctid;

            IF NOT v_success THEN
                -- we currently only log in case of error to minimize redundant inserts
                INSERT INTO job_errors (jobid, command, message, start_time, end_time)
                VALUES (v_jobid, v_command, v_message, v_start_time, v_end_time);
            END IF;
        ELSE
            -- update the attempt number and try again later
            UPDATE job_queue
            SET attempts = v_attempts, last_attempt = now(), last_error = v_message
            WHERE ctid = v_ctid;
        END IF;

        COMMIT;
    END LOOP;
END;
$fn$;

-- Run up to 4 jobs concurrently via pg_cron
SELECT cron.schedule('job-runner-1', '5 seconds', 'call run_jobs()');
SELECT cron.schedule('job-runner-2', '5 seconds', 'call run_jobs()');
SELECT cron.schedule('job-runner-3', '5 seconds', 'call run_jobs()');
SELECT cron.schedule('job-runner-4', '5 seconds', 'call run_jobs()');
```
With the job queue set up, you can now schedule one-off jobs which will typically start within 5 seconds and will finish even if you disconnect:
```sql
-- start a long-running job in the background:
select schedule_once('create table random as select random() from generate_series(1,10000000) s');
```
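Since jobs are deleted from `job_queue` once they complete (or exhaust their retries), you can check on a background job simply by querying the queue defined above:

```sql
-- list one-off jobs that are still pending or retrying
SELECT jobid, command, attempts, last_attempt, last_error
FROM job_queue;
```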
The system can run multiple jobs in parallel, and once active it will keep running jobs in quick succession without the overhead of spawning a new process, which enables it to scale to a large number of jobs. The `run_jobs` procedure will also retry each job up to 5 times, with at least 10 seconds between attempts. Permanent errors are logged in the `job_errors` table.
Tip: Keep in mind that your `cron.job_run_details` table will fill up rapidly when using this pattern. Consider disabling the `cron.log_run` setting (to skip `cron.job_run_details`) and/or the `cron.log_statement` setting (to skip the PostgreSQL log).
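When something does go permanently wrong, the `job_errors` table from the implementation above keeps the failed command and error message around for inspection:

```sql
-- inspect permanently failed one-off jobs, most recent first
SELECT jobid, command, message, end_time - start_time AS duration
FROM job_errors
ORDER BY end_time DESC;
```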
There are countless ways in which this job queue pattern can be used. An interesting example for Citus database users could be to manage a large number of schemas when using schema-based sharding. For instance, if you want to add a new column in many schemas:
```sql
-- add a column to a table in all distributed schemas:
select schedule_once(format('alter table %I.data add column extra jsonb', schema_name))
from citus_schemas;
```
By performing the ALTER TABLE operation this way instead of iterating over the Postgres schemas in a single command, you avoid a long-running transaction that holds aggressive locks, and you can effectively parallelize the work.
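If you only need the queue temporarily, the job runners scheduled earlier can be removed again by name (job-runner-1 through job-runner-4, as defined above):

```sql
-- stop the four job runners created earlier
SELECT cron.unschedule('job-runner-' || i) FROM generate_series(1, 4) AS i;
```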
Hopefully this post gives you other ideas for how you can use pg_cron to automate your PostgreSQL workflows. If you want to get started, the main documentation for pg_cron is in the pg_cron GitHub repo.