File size: 5,961 Bytes
ae74af5
 
 
 
0dfd092
ae74af5
 
 
df6bf75
ae74af5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43dd6eb
 
 
 
 
 
 
 
ae74af5
 
 
 
0dfd092
ae74af5
 
43dd6eb
 
ae74af5
 
 
 
 
 
 
 
 
 
 
 
43dd6eb
ae74af5
 
 
 
 
0dfd092
ae74af5
 
 
 
 
 
 
 
749b346
ae74af5
 
0dfd092
ae74af5
 
 
 
 
 
749b346
ae74af5
 
 
 
 
 
df6bf75
ae74af5
0dfd092
ae74af5
 
 
 
 
 
2850a7e
 
ae74af5
 
 
 
 
 
43dd6eb
 
 
 
 
 
 
 
df6bf75
43dd6eb
 
 
 
 
 
 
ae74af5
 
 
 
 
 
 
 
 
 
 
43dd6eb
 
 
 
 
 
 
 
 
ae74af5
43dd6eb
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from __future__ import annotations

import asyncio
import json
import uuid
from datetime import datetime, UTC

import aiosqlite

from app.models import Job, JobRequest, JobStatus, ProductResult


class Database:
    def __init__(self, db_path: str = "aperture.db") -> None:
        self.db_path = db_path
        self._initialized = False

    async def _ensure_init(self) -> None:
        if not self._initialized:
            await self.init()

    async def init(self) -> None:
        self._initialized = True
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """
                CREATE TABLE IF NOT EXISTS jobs (
                    id TEXT PRIMARY KEY,
                    request_json TEXT NOT NULL,
                    status TEXT NOT NULL DEFAULT 'queued',
                    progress_json TEXT NOT NULL DEFAULT '{}',
                    results_json TEXT NOT NULL DEFAULT '[]',
                    error TEXT,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
                """
            )
            await db.commit()
            # Migration: add email column if missing
            cur = await db.execute("PRAGMA table_info(jobs)")
            columns = {row[1] for row in await cur.fetchall()}
            if "email" not in columns:
                await db.execute(
                    "ALTER TABLE jobs ADD COLUMN email TEXT NOT NULL DEFAULT ''"
                )
                await db.commit()

    async def create_job(self, request: JobRequest) -> str:
        await self._ensure_init()
        job_id = uuid.uuid4().hex[:12]
        now = datetime.now(UTC).isoformat()
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "INSERT INTO jobs (id, request_json, status, created_at, updated_at, email) VALUES (?, ?, ?, ?, ?, ?)",
                (job_id, request.model_dump_json(), JobStatus.QUEUED.value, now, now, request.email),
            )
            await db.commit()
        return job_id

    async def get_job(self, job_id: str) -> Job | None:
        await self._ensure_init()
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
            row = await cursor.fetchone()
            if row is None:
                return None
            return self._row_to_job(row)

    async def update_job_status(
        self, job_id: str, status: JobStatus, error: str | None = None
    ) -> None:
        await self._ensure_init()
        now = datetime.now(UTC).isoformat()
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                "UPDATE jobs SET status = ?, error = ?, updated_at = ? WHERE id = ?",
                (status.value, error, now, job_id),
            )
            await db.commit()

    async def update_job_progress(
        self, job_id: str, product_id: str, status: str
    ) -> None:
        await self._ensure_init()
        now = datetime.now(UTC).isoformat()
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                "SELECT progress_json FROM jobs WHERE id = ?", (job_id,)
            )
            row = await cursor.fetchone()
            progress = json.loads(row[0])
            progress[product_id] = status
            await db.execute(
                "UPDATE jobs SET progress_json = ?, updated_at = ? WHERE id = ?",
                (json.dumps(progress), now, job_id),
            )
            await db.commit()

    async def save_job_result(self, job_id: str, result: ProductResult) -> None:
        await self._ensure_init()
        now = datetime.now(UTC).isoformat()
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                "SELECT results_json FROM jobs WHERE id = ?", (job_id,)
            )
            row = await cursor.fetchone()
            results = json.loads(row[0])
            from app.models import sanitize_for_json
            results.append(sanitize_for_json(result.model_dump()))
            await db.execute(
                "UPDATE jobs SET results_json = ?, updated_at = ? WHERE id = ?",
                (json.dumps(results), now, job_id),
            )
            await db.commit()

    @staticmethod
    def _row_to_job(row) -> Job:
        return Job(
            id=row["id"],
            request=JobRequest.model_validate_json(row["request_json"]),
            status=JobStatus(row["status"]),
            progress=json.loads(row["progress_json"]),
            results=[
                ProductResult.model_validate(r)
                for r in json.loads(row["results_json"])
            ],
            error=row["error"],
            created_at=datetime.fromisoformat(row["created_at"]),
            updated_at=datetime.fromisoformat(row["updated_at"]),
        )

    async def get_next_queued_job(self) -> Job | None:
        await self._ensure_init()
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute(
                "SELECT * FROM jobs WHERE status = ? ORDER BY created_at ASC LIMIT 1",
                (JobStatus.QUEUED.value,),
            )
            row = await cursor.fetchone()
            if row is None:
                return None
            return self._row_to_job(row)

    async def get_jobs_by_email(self, email: str) -> list[Job]:
        await self._ensure_init()
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            cur = await db.execute(
                "SELECT * FROM jobs WHERE email = ? ORDER BY created_at DESC",
                (email,),
            )
            rows = await cur.fetchall()
        return [self._row_to_job(row) for row in rows]