Spaces:
Paused
Paused
| """ | |
| Codebook storage (universal). | |
| SQLite-backed CRUD over `codes` and `annotation_codes` in | |
| `<task_dir>/project.sqlite` via the universal persistence layer. No | |
| business rules live here (no cycle checks, no permissions, no cache | |
| invalidation) — that is the service layer's job. This module only | |
| persists rows. | |
| A *code* is a (possibly nested) label in a project's codebook. An | |
| *annotation_code* links a stored annotation to a code, optionally with a | |
| time span (`started_at`/`ended_at`) for temporal / agentic-trace coding. | |
| Universal: usable in standard annotation, solo mode, and QDA mode. | |
| Design notes: | |
| - `parent_id` is TEXT NOT NULL DEFAULT '' where '' means "root". Using a | |
| sentinel instead of NULL lets `UNIQUE(project, parent_id, name)` | |
| actually prevent duplicate sibling names at the root too (SQLite | |
| treats NULLs as distinct in UNIQUE constraints). | |
| - No SQL foreign keys (consistent with the memos store): the service | |
| layer enforces parent existence, cycle-freedom, and recursive delete. | |
| """ | |
| from __future__ import annotations | |
| import time | |
| import uuid | |
| from typing import Any, Dict, List, Optional | |
| from potato.persistence import Migration, get_db, register_migration | |
ROOT = ""  # sentinel parent_id for top-level codes

# Initial codebook schema.
#   codes            - one row per code; UNIQUE(project, parent_id, name)
#                      blocks duplicate sibling names (including at the root,
#                      because parent_id is '' rather than NULL there).
#   annotation_codes - one row per (annotation_id, code_id) link, with an
#                      optional started_at/ended_at span.
# Every statement uses IF NOT EXISTS. Columns referenced elsewhere in this
# module but absent here (e.g. created_revision, archived_at, invalidated_at)
# are presumably added by later migrations -- confirm against those modules.
_CODEBOOK_MIGRATION = Migration(
    name="0001_codebook",
    sql="""
    CREATE TABLE IF NOT EXISTS codes (
        id TEXT PRIMARY KEY,
        project TEXT NOT NULL,
        name TEXT NOT NULL,
        color TEXT,
        parent_id TEXT NOT NULL DEFAULT '',
        sort_order INTEGER NOT NULL DEFAULT 0,
        created_by TEXT NOT NULL,
        created_at REAL NOT NULL,
        updated_at REAL NOT NULL,
        UNIQUE (project, parent_id, name)
    );
    CREATE INDEX IF NOT EXISTS idx_codes_project ON codes (project);
    CREATE INDEX IF NOT EXISTS idx_codes_parent
        ON codes (project, parent_id);
    CREATE TABLE IF NOT EXISTS annotation_codes (
        annotation_id TEXT NOT NULL,
        code_id TEXT NOT NULL,
        project TEXT NOT NULL,
        created_by TEXT NOT NULL,
        started_at REAL,
        ended_at REAL,
        PRIMARY KEY (annotation_id, code_id)
    );
    CREATE INDEX IF NOT EXISTS idx_anncodes_code
        ON annotation_codes (project, code_id);
    CREATE INDEX IF NOT EXISTS idx_anncodes_ann
        ON annotation_codes (annotation_id);
    """,
)
# Registered once at import time, as a module-level side effect.
register_migration(_CODEBOOK_MIGRATION)
def _db(task_dir: str):
    """Return the persistence connection for `task_dir`.

    The codebook migration is (re-)registered before the connection is
    requested. `register_migration` is idempotent, so this is normally a
    no-op; it only matters when the process-global registry was wiped
    (e.g. by the `clear_migrations` test helper) before this task_dir's
    first `get_db()`.
    """
    migration = _CODEBOOK_MIGRATION
    register_migration(migration)
    connection = get_db(task_dir)
    return connection
def _ensure_temporal_schema() -> None:
    """Register the later (Phase 2 / C) migrations on demand.

    Called before any read/write that relies on the append-only columns
    such as `invalidated_at`. The imports are deferred to call time to
    avoid a module-load cycle (changelog imports this module). The later
    migrations are additive (nullable columns + new tables), so calling
    this is safe even for callers that only registered 0001.
    """
    from potato.codebook.changelog import _CHANGE_MIGRATION
    from potato.codebook.revision import _REVISION_MIGRATION
    from potato.codebook.revision import _CODES_REV_MIGRATION

    # Registration order is kept: revision, codes-revision, then change log.
    for migration in (
        _REVISION_MIGRATION,
        _CODES_REV_MIGRATION,
        _CHANGE_MIGRATION,
    ):
        register_migration(migration)
| # ---- codes --------------------------------------------------------------- | |
def insert_code(
    task_dir: str,
    *,
    project: str,
    name: str,
    created_by: str,
    color: Optional[str] = None,
    parent_id: str = ROOT,
    sort_order: int = 0,
    code_id: Optional[str] = None,
    created_revision: int = 0,
) -> Dict[str, Any]:
    """Insert one code row and return the stored row as a dict.

    `code_id` lets a caller (e.g. the init CLI) supply a deterministic
    id; when it is missing or empty a random uuid4 hex is generated.
    `parent_id` defaults to ROOT ('' = top-level). `created_revision`
    records the codebook revision the code first appeared in (for
    provenance / the review worklist). `created_at` and `updated_at`
    are both set to the same current timestamp.

    No business rules are checked here. The table's
    UNIQUE(project, parent_id, name) constraint still applies, so a
    duplicate sibling name is rejected by the database (the exact
    exception type depends on the persistence layer's connection).
    """
    # `created_revision` is not part of the 0001 `codes` schema, so the
    # later additive migrations must be registered before this INSERT --
    # same guard the other post-0001 readers/writers in this module use.
    # NOTE(review): presumably _CODES_REV_MIGRATION adds the column; confirm.
    _ensure_temporal_schema()
    cid = code_id or uuid.uuid4().hex
    now = time.time()
    conn = _db(task_dir)
    conn.execute(
        """INSERT INTO codes
        (id, project, name, color, parent_id, sort_order,
        created_by, created_at, updated_at, created_revision)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (cid, project, name, color, parent_id, sort_order,
         created_by, now, now, created_revision),
    )
    conn.commit()
    # Re-read so the caller gets the row exactly as stored.
    return get_code(task_dir, cid)
def get_code(task_dir: str, code_id: str) -> Optional[Dict[str, Any]]:
    """Look up one code by id; returns its row as a dict, or None when
    no row has that id."""
    cursor = _db(task_dir).execute(
        "SELECT * FROM codes WHERE id = ?", (code_id,)
    )
    record = cursor.fetchone()
    if not record:
        return None
    return dict(record)
def find_code(
    task_dir: str, project: str, parent_id: str, name: str
) -> Optional[Dict[str, Any]]:
    """Look up a code by its (project, parent_id, name) triple -- the
    columns covered by the table's UNIQUE constraint. Returns the row as
    a dict, or None when there is no match."""
    lookup_key = (project, parent_id, name)
    cursor = _db(task_dir).execute(
        """SELECT * FROM codes
        WHERE project = ? AND parent_id = ? AND name = ?""",
        lookup_key,
    )
    record = cursor.fetchone()
    if not record:
        return None
    return dict(record)
def list_codes(task_dir: str, project: str) -> List[Dict[str, Any]]:
    """Return every code row of `project` as dicts, ordered by
    parent_id, then sort_order, then name (all ascending)."""
    cursor = _db(task_dir).execute(
        """SELECT * FROM codes WHERE project = ?
        ORDER BY parent_id ASC, sort_order ASC, name ASC""",
        (project,),
    )
    result: List[Dict[str, Any]] = []
    for record in cursor.fetchall():
        result.append(dict(record))
    return result
def children_of(
    task_dir: str, project: str, parent_id: str
) -> List[Dict[str, Any]]:
    """Return the direct children of `parent_id` within `project` as
    dicts, ordered by sort_order then name. Pass ROOT ('') to get the
    top-level codes."""
    cursor = _db(task_dir).execute(
        """SELECT * FROM codes WHERE project = ? AND parent_id = ?
        ORDER BY sort_order ASC, name ASC""",
        (project, parent_id),
    )
    result: List[Dict[str, Any]] = []
    for record in cursor.fetchall():
        result.append(dict(record))
    return result
def update_code(
    task_dir: str,
    code_id: str,
    *,
    name: Optional[str] = None,
    color: Optional[str] = None,
    parent_id: Optional[str] = None,
    sort_order: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
    """Patch the given fields of one code and return the resulting row.

    Only arguments that are not None are written, so a field cannot be
    set to NULL through this function (an empty-string `parent_id`, i.e.
    ROOT, is written). When at least one field is written, `updated_at`
    is refreshed as well. With nothing to write, the current row is
    returned untouched. Returns None when `code_id` does not exist.
    """
    # Column order here fixes the order of the SET clause and its params.
    candidates = (
        ("name", name),
        ("color", color),
        ("parent_id", parent_id),
        ("sort_order", sort_order),
    )
    changed = [(col, val) for col, val in candidates if val is not None]
    if changed:
        assignments = [f"{col} = ?" for col, _ in changed]
        values: List[Any] = [val for _, val in changed]
        assignments.append("updated_at = ?")
        values.append(time.time())
        values.append(code_id)  # binds the trailing WHERE id = ?
        conn = _db(task_dir)
        conn.execute(
            f"UPDATE codes SET {', '.join(assignments)} WHERE id = ?", values)
        conn.commit()
    return get_code(task_dir, code_id)
def delete_codes(task_dir: str, code_ids: List[str]) -> int:
    """Hard-delete the listed codes together with every annotation_codes
    row pointing at them, and return how many `codes` rows were removed.

    The caller (service layer) is expected to pass the complete subtree;
    no descendants are looked up here. An empty list is a no-op that
    returns 0.
    """
    if not code_ids:
        return 0
    placeholders = ",".join(["?"] * len(code_ids))
    conn = _db(task_dir)
    # Links first, then the codes themselves; a single commit covers both.
    conn.execute(
        f"DELETE FROM annotation_codes WHERE code_id IN ({placeholders})",
        code_ids)
    removed = conn.execute(
        f"DELETE FROM codes WHERE id IN ({placeholders})", code_ids)
    conn.commit()
    return removed.rowcount
| # ---- annotation_codes ---------------------------------------------------- | |
def link_annotation(
    task_dir: str,
    *,
    project: str,
    annotation_id: str,
    code_id: str,
    created_by: str,
    started_at: Optional[float] = None,
    ended_at: Optional[float] = None,
) -> None:
    """Attach `code_id` to `annotation_id`, optionally with a time span.

    Uses INSERT OR REPLACE on the (annotation_id, code_id) primary key,
    so linking the same pair again replaces the existing row with the
    values given here.
    """
    link_row = (
        annotation_id,
        code_id,
        project,
        created_by,
        started_at,
        ended_at,
    )
    conn = _db(task_dir)
    conn.execute(
        """INSERT OR REPLACE INTO annotation_codes
        (annotation_id, code_id, project, created_by,
        started_at, ended_at)
        VALUES (?, ?, ?, ?, ?, ?)""",
        link_row,
    )
    conn.commit()
def unlink_annotation(
    task_dir: str, annotation_id: str, code_id: str
) -> bool:
    """Delete the (annotation_id, code_id) link row. Returns True when a
    row was actually removed, False when no such link existed."""
    conn = _db(task_dir)
    removed = conn.execute(
        """DELETE FROM annotation_codes
        WHERE annotation_id = ? AND code_id = ?""",
        (annotation_id, code_id),
    )
    conn.commit()
    hit_count = removed.rowcount
    return hit_count > 0
def codes_for_annotation(
    task_dir: str, annotation_id: str
) -> List[Dict[str, Any]]:
    """Return the codes currently attached to `annotation_id`, sorted by
    code name.

    This is THE single load-bearing temporal reader: a link is reported
    only if it is LIVE (`invalidated_at IS NULL`) and its code is not
    archived (`archived_at IS NULL`, e.g. not merged away). Each dict
    carries code_id, started_at, ended_at, created_by (from the link)
    and name, color, parent_id (from the code).
    """
    _ensure_temporal_schema()
    cursor = _db(task_dir).execute(
        """SELECT ac.code_id, ac.started_at, ac.ended_at,
        ac.created_by, c.name, c.color, c.parent_id
        FROM annotation_codes ac
        JOIN codes c ON c.id = ac.code_id
        WHERE ac.annotation_id = ?
        AND ac.invalidated_at IS NULL
        AND c.archived_at IS NULL
        ORDER BY c.name ASC""",
        (annotation_id,),
    )
    live_links: List[Dict[str, Any]] = []
    for record in cursor.fetchall():
        live_links.append(dict(record))
    return live_links
| # ---- Phase 2 (C): append-only retroactive primitives -------------------- | |
def affected_annotation_ids(
    task_dir: str, project: str, code_id: str,
    created_by: Optional[str] = None,
) -> List[str]:
    """List the distinct annotation_ids in `project` that hold a LIVE
    (not invalidated) link to `code_id`.

    When `created_by` is given, only links created by that user are
    considered -- the split-by-annotator selector.
    """
    _ensure_temporal_schema()
    sql = ("SELECT DISTINCT annotation_id FROM annotation_codes "
           "WHERE project = ? AND code_id = ? AND invalidated_at IS NULL")
    args: List[Any] = [project, code_id]
    if created_by is not None:
        # Narrow to a single annotator's links.
        sql = sql + " AND created_by = ?"
        args = args + [created_by]
    found = _db(task_dir).execute(sql, args).fetchall()
    ids: List[str] = []
    for record in found:
        ids.append(record["annotation_id"])
    return ids
def get_link(
    task_dir: str, annotation_id: str, code_id: str
) -> Optional[Dict[str, Any]]:
    """Fetch the raw annotation_codes row for (annotation_id, code_id),
    whether live or invalidated -- no liveness filter is applied.
    Returns None when the pair has no row."""
    _ensure_temporal_schema()
    cursor = _db(task_dir).execute(
        """SELECT * FROM annotation_codes
        WHERE annotation_id = ? AND code_id = ?""",
        (annotation_id, code_id),
    )
    record = cursor.fetchone()
    if not record:
        return None
    return dict(record)
def invalidate_links(
    task_dir: str, *, project: str, code_id: str, change_id: str,
    created_by: Optional[str] = None,
) -> int:
    """Mark every LIVE link to `code_id` in `project` as superseded and
    return how many rows were touched.

    Append-only: rows are stamped with `invalidated_at` (current time)
    and `invalidated_by_change` (`change_id`), never deleted. When
    `created_by` is given, only that annotator's links are affected
    (used for splits).
    """
    _ensure_temporal_schema()
    sql = ("UPDATE annotation_codes SET invalidated_at = ?, "
           "invalidated_by_change = ? "
           "WHERE project = ? AND code_id = ? AND invalidated_at IS NULL")
    args: List[Any] = [time.time(), change_id, project, code_id]
    if created_by is not None:
        # Scope the invalidation to one annotator.
        sql = sql + " AND created_by = ?"
        args = args + [created_by]
    conn = _db(task_dir)
    stamped = conn.execute(sql, args)
    conn.commit()
    return stamped.rowcount
def set_link_live(
    task_dir: str, *, project: str, annotation_id: str, code_id: str,
    created_by: str, started_at: Optional[float] = None,
    ended_at: Optional[float] = None,
) -> None:
    """Ensure (annotation_id, code_id) is a LIVE link.

    Idempotent with respect to the PK(annotation_id, code_id): an
    existing row (live or invalidated) is reactivated by clearing its
    invalidation markers, and its other columns are left as they are.
    Only when no row exists is a new one inserted from the arguments.
    This is what keeps a merge correct when the annotation is already
    on the target code.
    """
    _ensure_temporal_schema()
    conn = _db(task_dir)
    pair = (annotation_id, code_id)
    revived = conn.execute(
        """UPDATE annotation_codes
        SET invalidated_at = NULL, invalidated_by_change = NULL
        WHERE annotation_id = ? AND code_id = ?""",
        pair,
    ).rowcount
    if revived == 0:
        # No row for this pair yet: create it.
        new_row = (annotation_id, code_id, project, created_by,
                   started_at, ended_at)
        conn.execute(
            """INSERT INTO annotation_codes
            (annotation_id, code_id, project, created_by,
            started_at, ended_at)
            VALUES (?, ?, ?, ?, ?, ?)""",
            new_row,
        )
    conn.commit()
def archive_code(task_dir: str, code_id: str) -> bool:
    """Soft-archive a code (e.g. one that was merged away).

    Stamps `archived_at` and refreshes `updated_at` on the row; nothing
    is deleted, so the row and its history survive (append-only). Per
    the original design note, an archived code leaves the live palette
    and ICL prompt.

    Returns True when a row with `code_id` was updated, False when no
    such code exists.
    """
    _ensure_temporal_schema()
    # Read the clock once so archived_at and updated_at are exactly equal
    # (two separate time.time() calls would differ by a few microseconds).
    now = time.time()
    conn = _db(task_dir)
    cur = conn.execute(
        "UPDATE codes SET archived_at = ?, updated_at = ? WHERE id = ?",
        (now, now, code_id),
    )
    conn.commit()
    return cur.rowcount > 0