"""SQLite-backed job persistence, accessed asynchronously through aiosqlite."""

from __future__ import annotations

import asyncio
import json
import uuid
from datetime import datetime, UTC

import aiosqlite

from app.models import Job, JobRequest, JobStatus, ProductResult


class Database:
    """Async data-access layer for the ``jobs`` table of a SQLite database.

    Every public method opens its own short-lived connection to
    ``db_path`` and makes sure the schema exists before touching the table,
    so callers never have to call :meth:`init` explicitly.
    """

    def __init__(self, db_path: str = "aperture.db") -> None:
        """Remember the database location; no I/O happens here.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self._initialized = False
        # Serializes schema creation so two coroutines racing on first use
        # cannot run the ALTER TABLE migration at the same time.
        self._init_lock = asyncio.Lock()

    async def _ensure_init(self) -> None:
        """Run :meth:`init` unless the schema is already known to exist."""
        if not self._initialized:
            await self.init()

    async def init(self) -> None:
        """Create the ``jobs`` table and apply the ``email`` column migration.

        The statements are idempotent, so calling this more than once is
        harmless. The instance is marked initialized only after the schema
        work has succeeded, so a failed attempt is retried on the next call
        instead of being silently skipped.
        """
        async with self._init_lock:
            async with aiosqlite.connect(self.db_path) as db:
                await db.execute(
                    """
                    CREATE TABLE IF NOT EXISTS jobs (
                        id TEXT PRIMARY KEY,
                        request_json TEXT NOT NULL,
                        status TEXT NOT NULL DEFAULT 'queued',
                        progress_json TEXT NOT NULL DEFAULT '{}',
                        results_json TEXT NOT NULL DEFAULT '[]',
                        error TEXT,
                        created_at TEXT NOT NULL,
                        updated_at TEXT NOT NULL
                    )
                    """
                )
                await db.commit()

                # Migration: add email column if missing
                cur = await db.execute("PRAGMA table_info(jobs)")
                # Column 1 of each table_info row is the column name.
                columns = {row[1] for row in await cur.fetchall()}
                if "email" not in columns:
                    await db.execute(
                        "ALTER TABLE jobs ADD COLUMN email TEXT NOT NULL DEFAULT ''"
                    )
                    await db.commit()
            self._initialized = True

    async def create_job(self, request: JobRequest) -> str:
        """Insert a new queued job and return its id.

        Args:
            request: The job request; stored as JSON, with ``request.email``
                also written to the ``email`` column.

        Returns:
            The new job id: the first 12 hex characters of a random UUID4.
        """
        await self._ensure_init()
        job_id = uuid.uuid4().hex[:12]
        now = datetime.now(UTC).isoformat()
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "INSERT INTO jobs (id, request_json, status, created_at, updated_at, email) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (
                    job_id,
                    request.model_dump_json(),
                    JobStatus.QUEUED.value,
                    now,
                    now,
                    request.email,
                ),
            )
            await db.commit()
        return job_id

    async def get_job(self, job_id: str) -> Job | None:
        """Return the job with the given id, or ``None`` if there is none."""
        await self._ensure_init()
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
            row = await cursor.fetchone()
            if row is None:
                return None
            return self._row_to_job(row)

    async def update_job_status(
        self, job_id: str, status: JobStatus, error: str | None = None
    ) -> None:
        """Set a job's status and error text and refresh ``updated_at``.

        The ``error`` column is always overwritten, so omitting ``error``
        clears any previously stored message. An unknown ``job_id`` matches
        no rows and is a no-op.
        """
        await self._ensure_init()
        now = datetime.now(UTC).isoformat()
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "UPDATE jobs SET status = ?, error = ?, updated_at = ? WHERE id = ?",
                (status.value, error, now, job_id),
            )
            await db.commit()

    async def update_job_progress(
        self, job_id: str, product_id: str, status: str
    ) -> None:
        """Record ``status`` for ``product_id`` in the job's progress mapping.

        Raises:
            LookupError: If no job with ``job_id`` exists.
        """
        await self._ensure_init()
        now = datetime.now(UTC).isoformat()
        async with aiosqlite.connect(self.db_path) as db:
            # Take the write lock before reading so a concurrent update of the
            # same job cannot slip in between the SELECT and the UPDATE and
            # have its change overwritten.
            await db.execute("BEGIN IMMEDIATE")
            cursor = await db.execute(
                "SELECT progress_json FROM jobs WHERE id = ?", (job_id,)
            )
            row = await cursor.fetchone()
            if row is None:
                # Closing the connection discards the open transaction.
                raise LookupError(f"Job {job_id!r} does not exist")
            progress = json.loads(row[0])
            progress[product_id] = status
            await db.execute(
                "UPDATE jobs SET progress_json = ?, updated_at = ? WHERE id = ?",
                (json.dumps(progress), now, job_id),
            )
            await db.commit()

    async def save_job_result(self, job_id: str, result: ProductResult) -> None:
        """Append ``result`` to the job's stored result list.

        Raises:
            LookupError: If no job with ``job_id`` exists.
        """
        await self._ensure_init()
        now = datetime.now(UTC).isoformat()
        async with aiosqlite.connect(self.db_path) as db:
            # Same read-modify-write pattern as update_job_progress: hold the
            # write lock across both statements so appends are not lost.
            await db.execute("BEGIN IMMEDIATE")
            cursor = await db.execute(
                "SELECT results_json FROM jobs WHERE id = ?", (job_id,)
            )
            row = await cursor.fetchone()
            if row is None:
                raise LookupError(f"Job {job_id!r} does not exist")
            results = json.loads(row[0])

            # NOTE(review): kept as a function-scope import as in the original;
            # the reason it is deferred is not visible from this file.
            from app.models import sanitize_for_json

            results.append(sanitize_for_json(result.model_dump()))
            await db.execute(
                "UPDATE jobs SET results_json = ?, updated_at = ? WHERE id = ?",
                (json.dumps(results), now, job_id),
            )
            await db.commit()

    @staticmethod
    def _row_to_job(row) -> Job:
        """Build a :class:`Job` from a ``jobs`` row addressable by column name."""
        return Job(
            id=row["id"],
            request=JobRequest.model_validate_json(row["request_json"]),
            status=JobStatus(row["status"]),
            progress=json.loads(row["progress_json"]),
            results=[
                ProductResult.model_validate(r)
                for r in json.loads(row["results_json"])
            ],
            error=row["error"],
            created_at=datetime.fromisoformat(row["created_at"]),
            updated_at=datetime.fromisoformat(row["updated_at"]),
        )

    async def get_next_queued_job(self) -> Job | None:
        """Return the oldest job still in the queued state, or ``None``.

        The job is only read, not claimed: its status is left unchanged.
        """
        await self._ensure_init()
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute(
                "SELECT * FROM jobs WHERE status = ? ORDER BY created_at ASC LIMIT 1",
                (JobStatus.QUEUED.value,),
            )
            row = await cursor.fetchone()
            if row is None:
                return None
            return self._row_to_job(row)

    async def get_jobs_by_email(self, email: str) -> list[Job]:
        """Return every job whose ``email`` column equals ``email``, newest first."""
        await self._ensure_init()
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            cur = await db.execute(
                "SELECT * FROM jobs WHERE email = ? ORDER BY created_at DESC",
                (email,),
            )
            rows = await cur.fetchall()
            return [self._row_to_job(row) for row in rows]