@pgpm/jobs
Core job system for background task processing in PostgreSQL.
Overview
@pgpm/jobs provides the core abstractions and interfaces for a PostgreSQL-based background job processing system. This package defines the schema, tables, and procedures for job queue management, scheduled jobs, and worker coordination. It serves as the foundation for building reliable background task processing systems entirely within PostgreSQL.
Features
- Job Queue Schema: Core app_jobs schema with jobs, scheduled_jobs, and job_queues tables
- Job Management Procedures: Functions for adding, retrieving, completing, and failing jobs
- Scheduled Jobs: Support for cron-style and rule-based job scheduling
- Worker Coordination: Job locking and worker management with expiry
- Priority Queue: Process jobs by priority, run time, and insertion order
- Automatic Retries: Configurable retry attempts with failure tracking
- Job Keys: Upsert semantics for idempotent job creation
- Trigger Functions: Automatic job creation from table changes
Installation
If you have pgpm installed:
pgpm install @pgpm/jobs
pgpm deploy
This is a quick way to get started. The sections below provide more detailed installation options.
Prerequisites
# Install pgpm CLI
npm install -g pgpm
# Start local Postgres (via Docker) and export env vars
pgpm docker start
eval "$(pgpm env)"
Tip: Already running Postgres? Skip the Docker step and just export your PG* environment variables.
Add to an Existing Package
# 1. Install the package
pgpm install @pgpm/jobs
# 2. Deploy locally
pgpm deploy
Add to a New Project
# 1. Create a workspace
pgpm init workspace
# 2. Create your first module
cd my-workspace
pgpm init
# 3. Install a package
cd packages/my-module
pgpm install @pgpm/jobs
# 4. Deploy everything
pgpm deploy --createdb --database mydb1
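After deploying, a quick sanity check confirms the package's schema is in place (this queries the standard information_schema, so it works with any Postgres client):

```sql
-- List the tables installed under the app_jobs schema
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'app_jobs'
ORDER BY table_name;
```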
Core Schema
The app_jobs schema provides three main tables:
jobs Table
Stores active jobs awaiting processing:
- id: Unique job identifier
- database_id: Database/tenant identifier
- task_identifier: Job type/handler name
- payload: JSON data for the job
- priority: Lower numbers = higher priority
- run_at: Scheduled execution time
- attempts: Current attempt count
- max_attempts: Maximum retry attempts
- locked_by: Worker ID holding the lock
- locked_at: Lock timestamp
- key: Optional unique key for upsert
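The key column drives the idempotent upsert behavior mentioned under Features: per the add_job reference below, passing job_key lets a repeated call replace the pending job rather than enqueue a duplicate. A sketch (the sync_account task name and key format are illustrative):

```sql
-- Re-running this call replaces any pending job with the same key
-- instead of enqueuing a duplicate (upsert semantics)
SELECT app_jobs.add_job(
    db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid,
    identifier := 'sync_account',
    payload := '{"account_id": 42}'::json,
    job_key := 'sync-account-42'
);
```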
scheduled_jobs Table
Stores recurring job definitions:
- id: Unique identifier
- database_id: Database/tenant identifier
- task_identifier: Job type/handler name
- payload: JSON data template
- schedule_info: Cron or rule-based schedule
- priority: Job priority
- max_attempts: Maximum retries
job_queues Table
Tracks queue statistics and locking:
- queue_name: Queue identifier
- job_count: Number of jobs in queue
- locked_by: Worker ID holding queue lock
- locked_at: Lock timestamp
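Because job_queues is a plain table, backlog and lock state can be inspected directly with the columns listed above:

```sql
-- Inspect per-queue backlog and which workers hold queue locks
SELECT queue_name, job_count, locked_by, locked_at
FROM app_jobs.job_queues
ORDER BY job_count DESC;
```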
Usage
Adding Jobs
-- Add a simple job
SELECT app_jobs.add_job(
db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid,
identifier := 'send_email',
payload := '{"to": "user@example.com"}'::json
);
-- Add a delayed job with priority
SELECT app_jobs.add_job(
db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid,
identifier := 'generate_report',
payload := '{"report_id": 123}'::json,
run_at := now() + interval '1 hour',
priority := 10
);
Retrieving Jobs
-- Worker fetches next job
SELECT * FROM app_jobs.get_job(
worker_id := 'worker-1',
task_identifiers := ARRAY['send_email', 'generate_report']
);
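The Functions Reference below also lists a job_expiry parameter on get_job. Assuming it works as named, a worker can override the default lock expiry so that jobs held by a crashed worker become available again sooner:

```sql
-- Fetch the next job, with the lock expiring after 15 minutes
-- so a crashed worker's job is eventually re-queued
SELECT * FROM app_jobs.get_job(
    worker_id := 'worker-1',
    task_identifiers := ARRAY['send_email'],
    job_expiry := interval '15 minutes'
);
```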
Completing Jobs
-- Mark job as successfully completed
SELECT app_jobs.complete_job(
worker_id := 'worker-1',
job_id := 123
);
Failing Jobs
-- Mark job as failed (will retry if attempts remain)
SELECT app_jobs.fail_job(
worker_id := 'worker-1',
job_id := 123,
error_message := 'Connection timeout'
);
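The fetch/complete/fail cycle above can be sketched as a single PL/pgSQL block. This is only an illustration of the control flow, assuming get_job returns a row shaped like the jobs table; real workers would run this loop from application code and dispatch on task_identifier:

```sql
DO $$
DECLARE
    job app_jobs.jobs;  -- assumes get_job returns a jobs-shaped row
BEGIN
    SELECT * INTO job FROM app_jobs.get_job(
        worker_id := 'worker-1',
        task_identifiers := ARRAY['send_email']
    );
    IF job.id IS NULL THEN
        RAISE NOTICE 'no jobs available';
        RETURN;
    END IF;
    BEGIN
        -- ... run the handler for job.task_identifier here ...
        PERFORM app_jobs.complete_job(worker_id := 'worker-1', job_id := job.id);
    EXCEPTION WHEN OTHERS THEN
        -- Any error marks the job failed; it retries if attempts remain
        PERFORM app_jobs.fail_job(
            worker_id := 'worker-1',
            job_id := job.id,
            error_message := SQLERRM
        );
    END;
END $$;
```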
Scheduled Jobs
-- Create a scheduled job
INSERT INTO app_jobs.scheduled_jobs (
database_id,
task_identifier,
schedule_info
) VALUES (
'5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid,
'cleanup_task',
'{"hour": [2], "minute": [0]}'::json
);
-- Execute a scheduled job
SELECT * FROM app_jobs.run_scheduled_job(1);
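The INSERT above writes the scheduled_jobs row directly; the package also exposes app_jobs.add_scheduled_job (see Functions Reference). Assuming the parameter names listed there, the equivalent call would look like:

```sql
-- Same schedule as above, created via the helper function
SELECT app_jobs.add_scheduled_job(
    db_id := '5b720132-17d5-424d-9bcb-ee7b17c13d43'::uuid,
    identifier := 'cleanup_task',
    payload := '{}'::json,
    schedule_info := '{"hour": [2], "minute": [0]}'::json
);
```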
Trigger Functions
The package includes trigger functions for automatic job creation:
tg_add_job_with_row_id
Creates a job when a row is inserted, using the row's ID in the payload.
CREATE TRIGGER auto_process
AFTER INSERT ON my_table
FOR EACH ROW
EXECUTE FUNCTION app_jobs.tg_add_job_with_row_id(
'database-uuid',
'process_record',
'id'
);
tg_add_job_with_row
Creates a job with the entire row as JSON payload.
CREATE TRIGGER auto_process
AFTER INSERT ON my_table
FOR EACH ROW
EXECUTE FUNCTION app_jobs.tg_add_job_with_row(
'database-uuid',
'process_record'
);
tg_add_job_with_fields
Creates a job with specific fields from the row.
CREATE TRIGGER auto_process
AFTER INSERT ON my_table
FOR EACH ROW
EXECUTE FUNCTION app_jobs.tg_add_job_with_fields(
'database-uuid',
'process_record',
'field1',
'field2',
'field3'
);
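Putting one of these triggers together end to end might look like the following; the uploads table and process_upload task name are illustrative, not part of the package:

```sql
-- Hypothetical table whose inserts should enqueue background work
CREATE TABLE public.uploads (
    id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
    filename text NOT NULL
);

-- Enqueue a 'process_upload' job carrying the new row's id
CREATE TRIGGER enqueue_processing
AFTER INSERT ON public.uploads
FOR EACH ROW
EXECUTE FUNCTION app_jobs.tg_add_job_with_row_id(
    '5b720132-17d5-424d-9bcb-ee7b17c13d43',
    'process_upload',
    'id'
);

-- Each insert now creates a matching row in app_jobs.jobs
INSERT INTO public.uploads (filename) VALUES ('report.csv');
```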
Functions Reference
app_jobs.add_job(...)
Adds a new job to the queue.
Parameters:
- db_id (uuid): Database identifier
- identifier (text): Job type
- payload (json): Job data
- job_key (text): Optional unique key
- queue_name (text): Optional queue name
- run_at (timestamptz): Execution time
- max_attempts (integer): Max retries
- priority (integer): Job priority
app_jobs.get_job(...)
Retrieves and locks the next available job.
Parameters:
- worker_id (text): Worker identifier
- task_identifiers (text[]): Job types to fetch
- job_expiry (interval): Lock expiry duration
app_jobs.complete_job(...)
Marks a job as completed.
Parameters:
- worker_id (text): Worker identifier
- job_id (bigint): Job identifier
app_jobs.fail_job(...)
Marks a job as failed.
Parameters:
- worker_id (text): Worker identifier
- job_id (bigint): Job identifier
- error_message (text): Error description
app_jobs.add_scheduled_job(...)
Creates a scheduled job.
Parameters:
- db_id (uuid): Database identifier
- identifier (text): Job type
- payload (json): Job data
- schedule_info (json): Schedule configuration
- Additional optional parameters
app_jobs.run_scheduled_job(...)
Executes a scheduled job.
Parameters:
- scheduled_job_id (bigint): Scheduled job identifier
Dependencies
- @pgpm/default-roles: Role-based access control
- @pgpm/verify: Verification utilities
Testing
pnpm test
Related Tooling
- pgpm: 🖥️ PostgreSQL Package Manager for modular Postgres development. Works with database workspaces, scaffolding, migrations, seeding, and installing database packages.
- pgsql-test: 📊 Isolated testing environments with per-test transaction rollbacks—ideal for integration tests, complex migrations, and RLS simulation.
- supabase-test: 🧪 Supabase-native test harness preconfigured for the local Supabase stack—per-test rollbacks, JWT/role context helpers, and CI/GitHub Actions ready.
- graphile-test: 🔐 Authentication mocking for Graphile-focused test helpers, with support for emulating row-level security contexts.
- pgsql-parser: 🔄 SQL conversion engine that interprets and converts PostgreSQL syntax.
- libpg-query-node: 🌉 Node.js bindings for libpg_query, converting SQL into parse trees.
- pg-proto-parser: 📦 Protobuf parser for parsing PostgreSQL Protocol Buffers definitions to generate TypeScript interfaces, utility functions, and JSON mappings for enums.
Disclaimer
AS DESCRIBED IN THE LICENSES, THE SOFTWARE IS PROVIDED "AS IS", AT YOUR OWN RISK, AND WITHOUT WARRANTIES OF ANY KIND.
No developer or entity involved in creating this software will be liable for any claims or damages whatsoever associated with your use, inability to use, or your interaction with other users of the code, including any direct, indirect, incidental, special, exemplary, punitive or consequential damages, or loss of profits, cryptocurrencies, tokens, or anything else of value.