brian4dwell's picture
initi worker working
1c5aca1
"""Database helpers for persisting job metadata."""
from __future__ import annotations
import json
import logging
from contextlib import contextmanager
from typing import Iterator, Mapping
try: # Optional dependency
import psycopg2
from psycopg2.extensions import connection as PGConnection
from psycopg2.extras import Json
except ModuleNotFoundError: # pragma: no cover - exercised when psycopg2 missing
psycopg2 = None # type: ignore[assignment]
PGConnection = None # type: ignore[assignment]
Json = None # type: ignore[assignment]
from .config import WorkerSettings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# DDL template for the job table plus one index on scene_id and one on status.
# ``{table_name}`` is substituted with ``str.format`` (see
# ``DatabaseClient.ensure_schema``), i.e. the name is spliced into the SQL text
# rather than sent as a bound parameter. Every statement uses IF NOT EXISTS.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS {table_name} (
job_id UUID PRIMARY KEY,
job_type TEXT NOT NULL,
scene_id TEXT NOT NULL,
status TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
payload JSONB,
result JSONB,
error TEXT
);
CREATE INDEX IF NOT EXISTS {table_name}_scene_id_idx ON {table_name}(scene_id);
CREATE INDEX IF NOT EXISTS {table_name}_status_idx ON {table_name}(status);
"""
class DatabaseError(RuntimeError):
    """Error type for failures in this module's database layer.

    Within this module it is raised by ``DatabaseClient.__init__`` when
    psycopg2 is not installed.
    """
class BaseDatabaseClient:
    """Minimal contract that every job-persistence backend implements.

    ``ensure_schema`` and ``upsert_job`` must be overridden by subclasses;
    ``close`` defaults to doing nothing.
    """

    def ensure_schema(self) -> None:  # pragma: no cover - noop implementation
        """Prepare whatever storage the backend needs."""
        raise NotImplementedError()

    def upsert_job(
        self,
        *,
        job_id: str,
        job_type: str,
        scene_id: str,
        status: str,
        payload: Mapping[str, object] | None = None,
        result: Mapping[str, object] | None = None,
        error: str | None = None,
    ) -> None:
        """Create or update the record for ``job_id``.

        All arguments are keyword-only; ``payload``, ``result`` and ``error``
        are optional and default to ``None``.
        """
        raise NotImplementedError()

    def close(self) -> None:  # pragma: no cover - noop implementation
        """Release backend resources; the base implementation has none."""
        return None
class NoopDatabaseClient(BaseDatabaseClient):
    """Stand-in client that persists nothing and only emits debug logs.

    ``create_database_client`` returns it when no DSN is configured.
    """

    def ensure_schema(self) -> None:  # pragma: no cover - nothing to do
        """Log that schema creation is being skipped."""
        logger.debug("Database is disabled; skipping schema creation")

    def upsert_job(
        self,
        *,
        job_id: str,
        job_type: str,
        scene_id: str,
        status: str,
        payload: Mapping[str, object] | None = None,
        result: Mapping[str, object] | None = None,
        error: str | None = None,
    ) -> None:
        """Log the job's identifying fields; payload, result and error are ignored."""
        log_args = (job_id, job_type, scene_id, status)
        logger.debug(
            "Noop DB: job_id=%s job_type=%s scene_id=%s status=%s", *log_args
        )
class DatabaseClient(BaseDatabaseClient):
    """Job-metadata store backed by Postgres through psycopg2.

    No connection is kept between calls: each operation opens its own
    autocommit connection and closes it when the operation finishes.
    """

    def __init__(self, settings: WorkerSettings):
        """Remember *settings* (``db_dsn`` and ``job_table`` are read later).

        Raises:
            DatabaseError: if psycopg2 could not be imported.
        """
        if psycopg2 is None:  # pragma: no cover - optional dependency guard
            raise DatabaseError("psycopg2-binary is required for database support")
        self.settings = settings

    @contextmanager
    def _connect(self) -> Iterator[PGConnection]:
        """Yield a new autocommit connection, closing it on exit in all cases."""
        connection = psycopg2.connect(self.settings.db_dsn)  # type: ignore[arg-type]
        try:
            connection.autocommit = True
            yield connection
        finally:
            connection.close()

    @staticmethod
    def _to_jsonb(value: object) -> object:
        """Wrap *value* in psycopg2's ``Json`` adapter; ``None`` passes through."""
        if value is None or Json is None:
            return None
        return Json(value)

    def ensure_schema(self) -> None:
        """Execute ``SCHEMA_SQL`` with the configured table name filled in."""
        ddl = SCHEMA_SQL.format(table_name=self.settings.job_table)
        with self._connect() as connection, connection.cursor() as cursor:
            cursor.execute(ddl)

    def upsert_job(
        self,
        *,
        job_id: str,
        job_type: str,
        scene_id: str,
        status: str,
        payload: Mapping[str, object] | None = None,
        result: Mapping[str, object] | None = None,
        error: str | None = None,
    ) -> None:
        """Insert the row for ``job_id`` or update it if it already exists.

        On insert, ``started_at`` is set to ``now()`` when ``status`` is
        ``'started'`` and ``completed_at`` when it is ``'finished'`` or
        ``'failed'``. On conflict, ``job_type``, ``scene_id``, ``status`` and
        ``error`` are overwritten; ``payload`` and ``result`` keep their stored
        value when the new one is NULL; a timestamp that is already set is
        never replaced.
        """
        table = self.settings.job_table
        # The table name is interpolated into the SQL text; everything else is
        # passed as bound parameters, in the order of the %s placeholders.
        bound_values = (
            job_id,
            job_type,
            scene_id,
            status,
            self._to_jsonb(payload),
            self._to_jsonb(result),
            error,
            status,  # compared against 'started' for started_at
            status,  # compared against 'finished'/'failed' for completed_at
        )
        statement = f"""
INSERT INTO {table} (job_id, job_type, scene_id, status, payload, result, error, started_at, completed_at)
VALUES (%s, %s, %s, %s, %s, %s, %s,
CASE WHEN %s = 'started' THEN now() ELSE NULL END,
CASE WHEN %s IN ('finished', 'failed') THEN now() ELSE NULL END)
ON CONFLICT (job_id)
DO UPDATE SET
job_type = EXCLUDED.job_type,
scene_id = EXCLUDED.scene_id,
status = EXCLUDED.status,
payload = COALESCE(EXCLUDED.payload, {table}.payload),
result = COALESCE(EXCLUDED.result, {table}.result),
error = EXCLUDED.error,
started_at = COALESCE({table}.started_at, EXCLUDED.started_at),
completed_at = COALESCE({table}.completed_at, EXCLUDED.completed_at)
"""
        with self._connect() as connection, connection.cursor() as cursor:
            cursor.execute(statement, bound_values)
def create_database_client(settings: WorkerSettings) -> BaseDatabaseClient:
    """Build the persistence client that matches *settings*.

    Returns a ``DatabaseClient`` when ``settings.db_dsn`` is truthy, otherwise a
    ``NoopDatabaseClient``.
    """
    if settings.db_dsn:
        return DatabaseClient(settings)
    return NoopDatabaseClient()