"""Analysis job repository — SQLite CRUD for analysis_jobs table.""" from __future__ import annotations from datetime import UTC, datetime from domain.models import AnalysisJob, AnalysisStatus from persistence.database import get_connection def _parse_dt(value: str | None) -> datetime | None: """Parse an ISO-format datetime string back into a datetime object.""" if not value: return None return datetime.fromisoformat(value) def _row_to_job(row) -> AnalysisJob: keys = row.keys() return AnalysisJob( id=row["id"], document_id=row["document_id"], status=AnalysisStatus(row["status"]), content_markdown=row["content_markdown"], content_html=row["content_html"], pages_json=row["pages_json"], document_json=row["document_json"] if "document_json" in keys else None, chunks_json=row["chunks_json"] if "chunks_json" in keys else None, error_message=row["error_message"], started_at=_parse_dt(row["started_at"]), completed_at=_parse_dt(row["completed_at"]), created_at=_parse_dt(row["created_at"]) or datetime.now(UTC), document_filename=row["filename"] if "filename" in keys else None, ) _SELECT_WITH_DOC = """ SELECT aj.*, d.filename FROM analysis_jobs aj JOIN documents d ON d.id = aj.document_id """ async def insert(job: AnalysisJob) -> None: """Persist a new analysis job record.""" async with get_connection() as db: await db.execute( """INSERT INTO analysis_jobs (id, document_id, status, created_at) VALUES (?, ?, ?, ?)""", (job.id, job.document_id, job.status.value, str(job.created_at)), ) await db.commit() async def find_all(*, limit: int = 200, offset: int = 0) -> list[AnalysisJob]: """Return analysis jobs with document info, newest first.""" async with get_connection() as db: cursor = await db.execute( f"{_SELECT_WITH_DOC} ORDER BY aj.created_at DESC LIMIT ? OFFSET ?", (limit, offset), ) rows = await cursor.fetchall() return [_row_to_job(r) for r in rows] async def find_by_id(job_id: str) -> AnalysisJob | None: """Find an analysis job by ID (with document filename), or return None.""" async with get_connection() as db: cursor = await db.execute(f"{_SELECT_WITH_DOC} WHERE aj.id = ?", (job_id,)) row = await cursor.fetchone() return _row_to_job(row) if row else None async def update_status(job: AnalysisJob) -> None: """Persist all mutable fields of an analysis job (status, results, timestamps).""" async with get_connection() as db: await db.execute( """UPDATE analysis_jobs SET status = ?, content_markdown = ?, content_html = ?, pages_json = ?, document_json = ?, chunks_json = ?, error_message = ?, started_at = ?, completed_at = ? WHERE id = ?""", ( job.status.value, job.content_markdown, job.content_html, job.pages_json, job.document_json, job.chunks_json, job.error_message, str(job.started_at) if job.started_at else None, str(job.completed_at) if job.completed_at else None, job.id, ), ) await db.commit() async def update_chunks(job_id: str, chunks_json: str) -> bool: """Update only the chunks_json column for a completed analysis.""" async with get_connection() as db: cursor = await db.execute( "UPDATE analysis_jobs SET chunks_json = ? WHERE id = ?", (chunks_json, job_id), ) await db.commit() return cursor.rowcount > 0 async def delete(job_id: str) -> bool: """Delete an analysis job by ID. Returns True if a row was removed.""" async with get_connection() as db: cursor = await db.execute("DELETE FROM analysis_jobs WHERE id = ?", (job_id,)) await db.commit() return cursor.rowcount > 0 async def delete_by_document(document_id: str) -> int: """Delete all analysis jobs for a given document. Returns count deleted.""" async with get_connection() as db: cursor = await db.execute("DELETE FROM analysis_jobs WHERE document_id = ?", (document_id,)) await db.commit() return cursor.rowcount