File size: 5,961 Bytes
ae74af5 0dfd092 ae74af5 df6bf75 ae74af5 43dd6eb ae74af5 0dfd092 ae74af5 43dd6eb ae74af5 43dd6eb ae74af5 0dfd092 ae74af5 749b346 ae74af5 0dfd092 ae74af5 749b346 ae74af5 df6bf75 ae74af5 0dfd092 ae74af5 2850a7e ae74af5 43dd6eb df6bf75 43dd6eb ae74af5 43dd6eb ae74af5 43dd6eb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 | from __future__ import annotations
import json
import uuid
from datetime import datetime, UTC
import aiosqlite
from app.models import Job, JobRequest, JobStatus, ProductResult
class Database:
def __init__(self, db_path: str = "aperture.db") -> None:
self.db_path = db_path
self._initialized = False
async def _ensure_init(self) -> None:
if not self._initialized:
await self.init()
async def init(self) -> None:
self._initialized = True
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
request_json TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
progress_json TEXT NOT NULL DEFAULT '{}',
results_json TEXT NOT NULL DEFAULT '[]',
error TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
await db.commit()
# Migration: add email column if missing
cur = await db.execute("PRAGMA table_info(jobs)")
columns = {row[1] for row in await cur.fetchall()}
if "email" not in columns:
await db.execute(
"ALTER TABLE jobs ADD COLUMN email TEXT NOT NULL DEFAULT ''"
)
await db.commit()
async def create_job(self, request: JobRequest) -> str:
await self._ensure_init()
job_id = uuid.uuid4().hex[:12]
now = datetime.now(UTC).isoformat()
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"INSERT INTO jobs (id, request_json, status, created_at, updated_at, email) VALUES (?, ?, ?, ?, ?, ?)",
(job_id, request.model_dump_json(), JobStatus.QUEUED.value, now, now, request.email),
)
await db.commit()
return job_id
async def get_job(self, job_id: str) -> Job | None:
await self._ensure_init()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
row = await cursor.fetchone()
if row is None:
return None
return self._row_to_job(row)
async def update_job_status(
self, job_id: str, status: JobStatus, error: str | None = None
) -> None:
await self._ensure_init()
now = datetime.now(UTC).isoformat()
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"UPDATE jobs SET status = ?, error = ?, updated_at = ? WHERE id = ?",
(status.value, error, now, job_id),
)
await db.commit()
async def update_job_progress(
self, job_id: str, product_id: str, status: str
) -> None:
await self._ensure_init()
now = datetime.now(UTC).isoformat()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT progress_json FROM jobs WHERE id = ?", (job_id,)
)
row = await cursor.fetchone()
progress = json.loads(row[0])
progress[product_id] = status
await db.execute(
"UPDATE jobs SET progress_json = ?, updated_at = ? WHERE id = ?",
(json.dumps(progress), now, job_id),
)
await db.commit()
async def save_job_result(self, job_id: str, result: ProductResult) -> None:
await self._ensure_init()
now = datetime.now(UTC).isoformat()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT results_json FROM jobs WHERE id = ?", (job_id,)
)
row = await cursor.fetchone()
results = json.loads(row[0])
from app.models import sanitize_for_json
results.append(sanitize_for_json(result.model_dump()))
await db.execute(
"UPDATE jobs SET results_json = ?, updated_at = ? WHERE id = ?",
(json.dumps(results), now, job_id),
)
await db.commit()
@staticmethod
def _row_to_job(row) -> Job:
return Job(
id=row["id"],
request=JobRequest.model_validate_json(row["request_json"]),
status=JobStatus(row["status"]),
progress=json.loads(row["progress_json"]),
results=[
ProductResult.model_validate(r)
for r in json.loads(row["results_json"])
],
error=row["error"],
created_at=datetime.fromisoformat(row["created_at"]),
updated_at=datetime.fromisoformat(row["updated_at"]),
)
async def get_next_queued_job(self) -> Job | None:
await self._ensure_init()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM jobs WHERE status = ? ORDER BY created_at ASC LIMIT 1",
(JobStatus.QUEUED.value,),
)
row = await cursor.fetchone()
if row is None:
return None
return self._row_to_job(row)
async def get_jobs_by_email(self, email: str) -> list[Job]:
await self._ensure_init()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
cur = await db.execute(
"SELECT * FROM jobs WHERE email = ? ORDER BY created_at DESC",
(email,),
)
rows = await cur.fetchall()
return [self._row_to_job(row) for row in rows]
|