| from __future__ import annotations |
|
|
| import json |
| import uuid |
| from datetime import datetime, UTC |
|
|
| import aiosqlite |
|
|
| from app.models import Job, JobRequest, JobStatus, ProductResult |
|
|
|
|
| class Database: |
| def __init__(self, db_path: str = "aperture.db") -> None: |
| self.db_path = db_path |
| self._initialized = False |
|
|
| async def _ensure_init(self) -> None: |
| if not self._initialized: |
| await self.init() |
|
|
| async def init(self) -> None: |
| self._initialized = True |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute( |
| """ |
| CREATE TABLE IF NOT EXISTS jobs ( |
| id TEXT PRIMARY KEY, |
| request_json TEXT NOT NULL, |
| status TEXT NOT NULL DEFAULT 'queued', |
| progress_json TEXT NOT NULL DEFAULT '{}', |
| results_json TEXT NOT NULL DEFAULT '[]', |
| error TEXT, |
| created_at TEXT NOT NULL, |
| updated_at TEXT NOT NULL |
| ) |
| """ |
| ) |
| await db.commit() |
| |
| cur = await db.execute("PRAGMA table_info(jobs)") |
| columns = {row[1] for row in await cur.fetchall()} |
| if "email" not in columns: |
| await db.execute( |
| "ALTER TABLE jobs ADD COLUMN email TEXT NOT NULL DEFAULT ''" |
| ) |
| await db.commit() |
|
|
| async def create_job(self, request: JobRequest) -> str: |
| await self._ensure_init() |
| job_id = uuid.uuid4().hex[:12] |
| now = datetime.now(UTC).isoformat() |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute( |
| "INSERT INTO jobs (id, request_json, status, created_at, updated_at, email) VALUES (?, ?, ?, ?, ?, ?)", |
| (job_id, request.model_dump_json(), JobStatus.QUEUED.value, now, now, request.email), |
| ) |
| await db.commit() |
| return job_id |
|
|
| async def get_job(self, job_id: str) -> Job | None: |
| await self._ensure_init() |
| async with aiosqlite.connect(self.db_path) as db: |
| db.row_factory = aiosqlite.Row |
| cursor = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)) |
| row = await cursor.fetchone() |
| if row is None: |
| return None |
| return self._row_to_job(row) |
|
|
| async def update_job_status( |
| self, job_id: str, status: JobStatus, error: str | None = None |
| ) -> None: |
| await self._ensure_init() |
| now = datetime.now(UTC).isoformat() |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute( |
| "UPDATE jobs SET status = ?, error = ?, updated_at = ? WHERE id = ?", |
| (status.value, error, now, job_id), |
| ) |
| await db.commit() |
|
|
| async def update_job_progress( |
| self, job_id: str, product_id: str, status: str |
| ) -> None: |
| await self._ensure_init() |
| now = datetime.now(UTC).isoformat() |
| async with aiosqlite.connect(self.db_path) as db: |
| cursor = await db.execute( |
| "SELECT progress_json FROM jobs WHERE id = ?", (job_id,) |
| ) |
| row = await cursor.fetchone() |
| progress = json.loads(row[0]) |
| progress[product_id] = status |
| await db.execute( |
| "UPDATE jobs SET progress_json = ?, updated_at = ? WHERE id = ?", |
| (json.dumps(progress), now, job_id), |
| ) |
| await db.commit() |
|
|
| async def save_job_result(self, job_id: str, result: ProductResult) -> None: |
| await self._ensure_init() |
| now = datetime.now(UTC).isoformat() |
| async with aiosqlite.connect(self.db_path) as db: |
| cursor = await db.execute( |
| "SELECT results_json FROM jobs WHERE id = ?", (job_id,) |
| ) |
| row = await cursor.fetchone() |
| results = json.loads(row[0]) |
| from app.models import sanitize_for_json |
| results.append(sanitize_for_json(result.model_dump())) |
| await db.execute( |
| "UPDATE jobs SET results_json = ?, updated_at = ? WHERE id = ?", |
| (json.dumps(results), now, job_id), |
| ) |
| await db.commit() |
|
|
| @staticmethod |
| def _row_to_job(row) -> Job: |
| return Job( |
| id=row["id"], |
| request=JobRequest.model_validate_json(row["request_json"]), |
| status=JobStatus(row["status"]), |
| progress=json.loads(row["progress_json"]), |
| results=[ |
| ProductResult.model_validate(r) |
| for r in json.loads(row["results_json"]) |
| ], |
| error=row["error"], |
| created_at=datetime.fromisoformat(row["created_at"]), |
| updated_at=datetime.fromisoformat(row["updated_at"]), |
| ) |
|
|
| async def get_next_queued_job(self) -> Job | None: |
| await self._ensure_init() |
| async with aiosqlite.connect(self.db_path) as db: |
| db.row_factory = aiosqlite.Row |
| cursor = await db.execute( |
| "SELECT * FROM jobs WHERE status = ? ORDER BY created_at ASC LIMIT 1", |
| (JobStatus.QUEUED.value,), |
| ) |
| row = await cursor.fetchone() |
| if row is None: |
| return None |
| return self._row_to_job(row) |
|
|
| async def get_jobs_by_email(self, email: str) -> list[Job]: |
| await self._ensure_init() |
| async with aiosqlite.connect(self.db_path) as db: |
| db.row_factory = aiosqlite.Row |
| cur = await db.execute( |
| "SELECT * FROM jobs WHERE email = ? ORDER BY created_at DESC", |
| (email,), |
| ) |
| rows = await cur.fetchall() |
| return [self._row_to_job(row) for row in rows] |
|
|