Spaces:
Configuration error
Configuration error
| """Database helpers for persisting job metadata.""" | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| from contextlib import contextmanager | |
| from typing import Iterator, Mapping | |
| try: # Optional dependency | |
| import psycopg2 | |
| from psycopg2.extensions import connection as PGConnection | |
| from psycopg2.extras import Json | |
| except ModuleNotFoundError: # pragma: no cover - exercised when psycopg2 missing | |
| psycopg2 = None # type: ignore[assignment] | |
| PGConnection = None # type: ignore[assignment] | |
| Json = None # type: ignore[assignment] | |
| from .config import WorkerSettings | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# DDL template for the job-metadata table plus two lookup indexes (on
# scene_id and on status).  The ``{table_name}`` placeholder is filled in via
# ``str.format`` by ``DatabaseClient.ensure_schema``.  Every statement uses
# IF NOT EXISTS.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS {table_name} (
job_id UUID PRIMARY KEY,
job_type TEXT NOT NULL,
scene_id TEXT NOT NULL,
status TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
payload JSONB,
result JSONB,
error TEXT
);
CREATE INDEX IF NOT EXISTS {table_name}_scene_id_idx ON {table_name}(scene_id);
CREATE INDEX IF NOT EXISTS {table_name}_status_idx ON {table_name}(status);
"""
class DatabaseError(RuntimeError):
    """Error type for failures in the database layer.

    Within this module it is raised by ``DatabaseClient.__init__`` when the
    optional ``psycopg2`` driver is not installed.
    """
class BaseDatabaseClient:
    """Minimal persistence interface shared by all database clients.

    ``ensure_schema`` and ``upsert_job`` are placeholders that raise
    ``NotImplementedError`` and are meant to be overridden; ``close`` is a
    harmless default that does nothing.
    """

    def ensure_schema(self) -> None:  # pragma: no cover - abstract placeholder
        """Create whatever storage the client needs (override in subclasses)."""
        raise NotImplementedError

    def upsert_job(
        self,
        *,
        job_id: str,
        job_type: str,
        scene_id: str,
        status: str,
        payload: Mapping[str, object] | None = None,
        result: Mapping[str, object] | None = None,
        error: str | None = None,
    ) -> None:
        """Insert or update one job record (override in subclasses).

        All arguments are keyword-only.
        """
        raise NotImplementedError

    def close(self) -> None:  # pragma: no cover - default does nothing
        """Release client resources; the base implementation has none."""
        return None
class NoopDatabaseClient(BaseDatabaseClient):
    """Stand-in client that persists nothing and only emits debug logs.

    ``create_database_client`` returns this when no DSN is configured.
    """

    def ensure_schema(self) -> None:  # pragma: no cover - nothing to do
        """Log that schema creation is skipped; no database is touched."""
        logger.debug("Database is disabled; skipping schema creation")

    def upsert_job(
        self,
        *,
        job_id: str,
        job_type: str,
        scene_id: str,
        status: str,
        payload: Mapping[str, object] | None = None,
        result: Mapping[str, object] | None = None,
        error: str | None = None,
    ) -> None:
        """Log the job's identifying fields at debug level and return.

        ``payload``, ``result`` and ``error`` are accepted for interface
        compatibility but are not used.
        """
        template = "Noop DB: job_id=%s job_type=%s scene_id=%s status=%s"
        logger.debug(template, job_id, job_type, scene_id, status)
class DatabaseClient(BaseDatabaseClient):
    """Postgres implementation using psycopg2.

    Each operation opens its own autocommit connection through
    ``_connect`` and closes it when the operation finishes.

    NOTE: the table name from ``settings.job_table`` is interpolated
    directly into SQL text (it cannot be a bound parameter), so it must come
    from trusted configuration.
    """

    def __init__(self, settings: WorkerSettings):
        """Store *settings* for later connections.

        Raises:
            DatabaseError: if the optional ``psycopg2`` package is missing.
        """
        if psycopg2 is None:  # pragma: no cover - optional dependency guard
            raise DatabaseError("psycopg2-binary is required for database support")
        self.settings = settings

    # The decorator is required: callers use ``with self._connect() as conn``,
    # and a bare generator does not implement the context-manager protocol.
    @contextmanager
    def _connect(self) -> Iterator[PGConnection]:
        """Yield an autocommit connection to ``settings.db_dsn``.

        The connection is always closed when the ``with`` block exits,
        including when the body raises.
        """
        conn = psycopg2.connect(self.settings.db_dsn)  # type: ignore[arg-type]
        try:
            conn.autocommit = True
            yield conn
        finally:
            conn.close()

    def ensure_schema(self) -> None:
        """Execute ``SCHEMA_SQL`` for the configured job table."""
        table_name = self.settings.job_table
        with self._connect() as conn:
            with conn.cursor() as cur:
                cur.execute(SCHEMA_SQL.format(table_name=table_name))

    def upsert_job(
        self,
        *,
        job_id: str,
        job_type: str,
        scene_id: str,
        status: str,
        payload: Mapping[str, object] | None = None,
        result: Mapping[str, object] | None = None,
        error: str | None = None,
    ) -> None:
        """Insert a job row, or update the existing row with the same ``job_id``.

        On insert, ``started_at`` is set to ``now()`` when *status* is
        ``'started'`` and ``completed_at`` when it is ``'finished'`` or
        ``'failed'``.  On conflict, ``job_type``, ``scene_id``, ``status`` and
        ``error`` are overwritten; ``payload`` and ``result`` keep their
        stored value when the new one is NULL; ``started_at`` and
        ``completed_at`` keep their stored value once it is set.
        """
        table = self.settings.job_table
        # Wrap mappings with psycopg2's Json adapter; None is passed as NULL.
        payload_json = Json(payload) if payload is not None and Json is not None else None
        result_json = Json(result) if result is not None and Json is not None else None
        with self._connect() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    INSERT INTO {table} (job_id, job_type, scene_id, status, payload, result, error, started_at, completed_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s,
                    CASE WHEN %s = 'started' THEN now() ELSE NULL END,
                    CASE WHEN %s IN ('finished', 'failed') THEN now() ELSE NULL END)
                    ON CONFLICT (job_id)
                    DO UPDATE SET
                    job_type = EXCLUDED.job_type,
                    scene_id = EXCLUDED.scene_id,
                    status = EXCLUDED.status,
                    payload = COALESCE(EXCLUDED.payload, {table}.payload),
                    result = COALESCE(EXCLUDED.result, {table}.result),
                    error = EXCLUDED.error,
                    started_at = COALESCE({table}.started_at, EXCLUDED.started_at),
                    completed_at = COALESCE({table}.completed_at, EXCLUDED.completed_at)
                    """,
                    (
                        job_id,
                        job_type,
                        scene_id,
                        status,
                        payload_json,
                        result_json,
                        error,
                        # status is bound twice more for the two CASE expressions.
                        status,
                        status,
                    ),
                )
def create_database_client(settings: WorkerSettings) -> BaseDatabaseClient:
    """Build the database client that matches *settings*.

    Returns a ``DatabaseClient`` when ``settings.db_dsn`` is truthy and a
    ``NoopDatabaseClient`` otherwise.
    """
    if settings.db_dsn:
        return DatabaseClient(settings)
    return NoopDatabaseClient()