"""Database helpers for persisting job metadata.""" from __future__ import annotations import json import logging from contextlib import contextmanager from typing import Iterator, Mapping try: # Optional dependency import psycopg2 from psycopg2.extensions import connection as PGConnection from psycopg2.extras import Json except ModuleNotFoundError: # pragma: no cover - exercised when psycopg2 missing psycopg2 = None # type: ignore[assignment] PGConnection = None # type: ignore[assignment] Json = None # type: ignore[assignment] from .config import WorkerSettings logger = logging.getLogger(__name__) SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS {table_name} ( job_id UUID PRIMARY KEY, job_type TEXT NOT NULL, scene_id TEXT NOT NULL, status TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), started_at TIMESTAMPTZ, completed_at TIMESTAMPTZ, payload JSONB, result JSONB, error TEXT ); CREATE INDEX IF NOT EXISTS {table_name}_scene_id_idx ON {table_name}(scene_id); CREATE INDEX IF NOT EXISTS {table_name}_status_idx ON {table_name}(status); """ class DatabaseError(RuntimeError): """Raised when database operations fail.""" class BaseDatabaseClient: """Small interface for database persistence.""" def ensure_schema(self) -> None: # pragma: no cover - noop implementation raise NotImplementedError def upsert_job( self, *, job_id: str, job_type: str, scene_id: str, status: str, payload: Mapping[str, object] | None = None, result: Mapping[str, object] | None = None, error: str | None = None, ) -> None: raise NotImplementedError def close(self) -> None: # pragma: no cover - noop implementation pass class NoopDatabaseClient(BaseDatabaseClient): """Fallback when no database configuration is provided.""" def ensure_schema(self) -> None: # pragma: no cover - nothing to do logger.debug("Database is disabled; skipping schema creation") def upsert_job( self, *, job_id: str, job_type: str, scene_id: str, status: str, payload: Mapping[str, object] | None = None, result: Mapping[str, object] | None = None, error: str | None = None, ) -> None: logger.debug( "Noop DB: job_id=%s job_type=%s scene_id=%s status=%s", job_id, job_type, scene_id, status ) class DatabaseClient(BaseDatabaseClient): """Postgres implementation using psycopg2.""" def __init__(self, settings: WorkerSettings): if psycopg2 is None: # pragma: no cover - optional dependency guard raise DatabaseError("psycopg2-binary is required for database support") self.settings = settings @contextmanager def _connect(self) -> Iterator[PGConnection]: conn = psycopg2.connect(self.settings.db_dsn) # type: ignore[arg-type] try: conn.autocommit = True yield conn finally: conn.close() def ensure_schema(self) -> None: table_name = self.settings.job_table with self._connect() as conn: with conn.cursor() as cur: cur.execute(SCHEMA_SQL.format(table_name=table_name)) def upsert_job( self, *, job_id: str, job_type: str, scene_id: str, status: str, payload: Mapping[str, object] | None = None, result: Mapping[str, object] | None = None, error: str | None = None, ) -> None: table = self.settings.job_table payload_json = Json(payload) if payload is not None and Json is not None else None result_json = Json(result) if result is not None and Json is not None else None with self._connect() as conn: with conn.cursor() as cur: cur.execute( f""" INSERT INTO {table} (job_id, job_type, scene_id, status, payload, result, error, started_at, completed_at) VALUES (%s, %s, %s, %s, %s, %s, %s, CASE WHEN %s = 'started' THEN now() ELSE NULL END, CASE WHEN %s IN ('finished', 'failed') THEN now() ELSE NULL END) ON CONFLICT (job_id) DO UPDATE SET job_type = EXCLUDED.job_type, scene_id = EXCLUDED.scene_id, status = EXCLUDED.status, payload = COALESCE(EXCLUDED.payload, {table}.payload), result = COALESCE(EXCLUDED.result, {table}.result), error = EXCLUDED.error, started_at = COALESCE({table}.started_at, EXCLUDED.started_at), completed_at = COALESCE({table}.completed_at, EXCLUDED.completed_at) """, ( job_id, job_type, scene_id, status, payload_json, result_json, error, status, status, ), ) def create_database_client(settings: WorkerSettings) -> BaseDatabaseClient: """Factory that returns a database client or a noop fallback.""" if not settings.db_dsn: return NoopDatabaseClient() return DatabaseClient(settings)