davidjurgens's picture
Deploy Potato demo: Potato — Coreference
80f50ca verified
Raw
History Blame Contribute Delete
12.2 kB
"""
Codebook storage (universal).
SQLite-backed CRUD over `codes` and `annotation_codes` in
`<task_dir>/project.sqlite` via the universal persistence layer. No
business rules live here (no cycle checks, no permissions, no cache
invalidation) — that is the service layer's job. This module only
persists rows.
A *code* is a (possibly nested) label in a project's codebook. An
*annotation_code* links a stored annotation to a code, optionally with a
time span (`started_at`/`ended_at`) for temporal / agentic-trace coding.
Universal: usable in standard annotation, solo mode, and QDA mode.
Design notes:
- `parent_id` is TEXT NOT NULL DEFAULT '' where '' means "root". Using a
sentinel instead of NULL lets `UNIQUE(project, parent_id, name)`
actually prevent duplicate sibling names at the root too (SQLite
treats NULLs as distinct in UNIQUE constraints).
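- No SQL foreign keys (consistent with the memos store): the service
layer enforces parent existence, cycle-freedom, and recursive delete.
- Columns used by this module but not created by the 0001 schema
(`codes.created_revision`, `codes.archived_at`,
`annotation_codes.invalidated_at`, `annotation_codes.invalidated_by_change`)
come from later, additive migrations registered lazily by
`_ensure_temporal_schema`. The Phase 2 helpers at the end of this module
mark rows superseded/archived with UPDATEs instead of deleting them.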
"""
from __future__ import annotations
import time
import uuid
from typing import Any, Dict, List, Optional
from potato.persistence import Migration, get_db, register_migration
# Sentinel parent_id for top-level codes. An empty string (rather than
# NULL) lets UNIQUE(project, parent_id, name) reject duplicate root names,
# because SQLite treats NULLs as distinct in UNIQUE constraints.
ROOT = "" # sentinel parent_id for top-level codes
# Base schema, migration "0001_codebook": the `codes` tree and the
# `annotation_codes` link table (PK = annotation_id + code_id). Columns used
# elsewhere in this module but not created here (e.g. `created_revision`,
# `invalidated_at`, `archived_at`) come from later, additive migrations.
_CODEBOOK_MIGRATION = Migration(
name="0001_codebook",
sql="""
CREATE TABLE IF NOT EXISTS codes (
id TEXT PRIMARY KEY,
project TEXT NOT NULL,
name TEXT NOT NULL,
color TEXT,
parent_id TEXT NOT NULL DEFAULT '',
sort_order INTEGER NOT NULL DEFAULT 0,
created_by TEXT NOT NULL,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
UNIQUE (project, parent_id, name)
);
CREATE INDEX IF NOT EXISTS idx_codes_project ON codes (project);
CREATE INDEX IF NOT EXISTS idx_codes_parent
ON codes (project, parent_id);
CREATE TABLE IF NOT EXISTS annotation_codes (
annotation_id TEXT NOT NULL,
code_id TEXT NOT NULL,
project TEXT NOT NULL,
created_by TEXT NOT NULL,
started_at REAL,
ended_at REAL,
PRIMARY KEY (annotation_id, code_id)
);
CREATE INDEX IF NOT EXISTS idx_anncodes_code
ON annotation_codes (project, code_id);
CREATE INDEX IF NOT EXISTS idx_anncodes_ann
ON annotation_codes (annotation_id);
""",
)
# Registered at import time; `_db` re-registers it defensively in case the
# process-global migration registry was cleared (e.g. by a test helper).
register_migration(_CODEBOOK_MIGRATION)
def _db(task_dir: str):
    """Return the database connection for ``task_dir``.

    The 0001 codebook migration is re-registered before the connection is
    obtained. ``register_migration`` is idempotent, so normally this has no
    effect. It keeps the store working when a test helper
    (``clear_migrations``) emptied the process-global registry before this
    task_dir's first ``get_db()`` call.
    """
    register_migration(_CODEBOOK_MIGRATION)
    connection = get_db(task_dir)
    return connection
def _ensure_temporal_schema() -> None:
    """Register the Phase 2 (C) migrations that add append-only state.

    Call this before any link read/write that relies on columns such as
    ``invalidated_at``. The imports are deferred to call time because
    ``potato.codebook.changelog`` imports this module, so importing it at
    module load would create a cycle. The 0003 migration is purely
    additive (nullable columns plus new tables), so registering it is safe
    even for callers that only registered 0001.
    """
    from potato.codebook.changelog import _CHANGE_MIGRATION
    from potato.codebook.revision import (
        _CODES_REV_MIGRATION,
        _REVISION_MIGRATION,
    )

    # Registration order is kept as: revision, codes-revision, changelog.
    pending = (_REVISION_MIGRATION, _CODES_REV_MIGRATION, _CHANGE_MIGRATION)
    for migration in pending:
        register_migration(migration)
# ---- codes ---------------------------------------------------------------
def insert_code(
    task_dir: str,
    *,
    project: str,
    name: str,
    created_by: str,
    color: Optional[str] = None,
    parent_id: str = ROOT,
    sort_order: int = 0,
    code_id: Optional[str] = None,
    created_revision: int = 0,
) -> Dict[str, Any]:
    """Insert one code row and return it as a dict.

    Args:
        task_dir: Task directory whose project database is written.
        project: Project the code belongs to.
        name: Code label. Must be unique among siblings
            (UNIQUE(project, parent_id, name)).
        created_by: User recorded as the creator.
        color: Optional display color (stored as-is).
        parent_id: Parent code id, or ``ROOT`` ('') for a top-level code.
        sort_order: Ordering key among siblings.
        code_id: Lets the init CLI supply a deterministic id; otherwise a
            random uuid4 hex is generated.
        created_revision: Codebook revision the code first appeared in
            (for provenance / the review worklist).

    Returns:
        The freshly inserted row, as returned by ``get_code``.

    Raises:
        Presumably ``sqlite3.IntegrityError`` on a duplicate id or a
        duplicate sibling name (UNIQUE constraints).
    """
    # `created_revision` is not a column of the 0001 schema, so make sure the
    # later additive migrations are registered before writing it. Every other
    # function here that touches post-0001 columns already does this.
    _ensure_temporal_schema()
    cid = code_id or uuid.uuid4().hex
    # One timestamp so created_at == updated_at for a fresh row.
    now = time.time()
    conn = _db(task_dir)
    conn.execute(
        """INSERT INTO codes
           (id, project, name, color, parent_id, sort_order,
            created_by, created_at, updated_at, created_revision)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (cid, project, name, color, parent_id, sort_order,
         created_by, now, now, created_revision),
    )
    conn.commit()
    return get_code(task_dir, cid)
def get_code(task_dir: str, code_id: str) -> Optional[Dict[str, Any]]:
    """Return the code row with id ``code_id`` as a dict, or ``None``."""
    cursor = _db(task_dir).execute(
        "SELECT * FROM codes WHERE id = ?", (code_id,)
    )
    found = cursor.fetchone()
    if not found:
        return None
    return dict(found)
def find_code(
    task_dir: str, project: str, parent_id: str, name: str
) -> Optional[Dict[str, Any]]:
    """Look up a code by its natural key (project, parent_id, name).

    Returns the row as a dict, or ``None`` when no sibling under
    ``parent_id`` (``ROOT`` for top level) has that name.
    """
    key = (project, parent_id, name)
    cursor = _db(task_dir).execute(
        """SELECT * FROM codes
           WHERE project = ? AND parent_id = ? AND name = ?""",
        key,
    )
    found = cursor.fetchone()
    if not found:
        return None
    return dict(found)
def list_codes(task_dir: str, project: str) -> List[Dict[str, Any]]:
    """Return every code of ``project`` as a list of dicts.

    Rows are ordered by parent_id, then sort_order, then name, so root codes
    (parent_id '') come first. No archived_at filter is applied here.
    """
    cursor = _db(task_dir).execute(
        """SELECT * FROM codes WHERE project = ?
           ORDER BY parent_id ASC, sort_order ASC, name ASC""",
        (project,),
    )
    return list(map(dict, cursor.fetchall()))
def children_of(
    task_dir: str, project: str, parent_id: str
) -> List[Dict[str, Any]]:
    """Return the direct children of ``parent_id`` within ``project``.

    Pass ``ROOT`` ('') to get the top-level codes. Children are ordered by
    sort_order, then name.
    """
    cursor = _db(task_dir).execute(
        """SELECT * FROM codes WHERE project = ? AND parent_id = ?
           ORDER BY sort_order ASC, name ASC""",
        (project, parent_id),
    )
    return list(map(dict, cursor.fetchall()))
def update_code(
    task_dir: str,
    code_id: str,
    *,
    name: Optional[str] = None,
    color: Optional[str] = None,
    parent_id: Optional[str] = None,
    sort_order: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
    """Partially update a code and return the resulting row.

    Only arguments that are not ``None`` are written, so ``None`` means
    "leave unchanged". In particular, ``color`` cannot be cleared back to
    NULL through this function. ``parent_id=ROOT`` ('') is a real value and
    moves the code to the top level. When something is written,
    ``updated_at`` is bumped. With nothing to write, the current row is
    returned untouched. Returns ``None`` if ``code_id`` does not exist.
    """
    # Column names are fixed literals here (never caller-supplied), so
    # interpolating them into the SQL text below is safe.
    candidates = (
        ("name", name),
        ("color", color),
        ("parent_id", parent_id),
        ("sort_order", sort_order),
    )
    changes = [(column, value) for column, value in candidates
               if value is not None]
    if not changes:
        return get_code(task_dir, code_id)
    changes.append(("updated_at", time.time()))
    assignments = ", ".join(f"{column} = ?" for column, _ in changes)
    values = [value for _, value in changes]
    values.append(code_id)
    conn = _db(task_dir)
    conn.execute(f"UPDATE codes SET {assignments} WHERE id = ?", values)
    conn.commit()
    return get_code(task_dir, code_id)
def delete_codes(
    task_dir: str, code_ids: List[str], *, batch_size: int = 500
) -> int:
    """Hard-delete the given codes and their annotation_codes links.

    The service computes the full subtree; this just executes the delete.
    Ids are bound in batches of at most ``batch_size`` so a large subtree
    does not exceed SQLite's host-parameter limit
    (SQLITE_MAX_VARIABLE_NUMBER, 999 on SQLite < 3.32). All batches are
    committed together with a single commit at the end.

    Args:
        task_dir: Task directory whose project database is modified.
        code_ids: Ids of the codes to remove. Any iterable is accepted.
        batch_size: Maximum number of ids bound per statement.

    Returns:
        Number of rows removed from ``codes`` (links are not counted).

    Raises:
        ValueError: if ``batch_size`` is less than 1.
    """
    ids = list(code_ids)
    if not ids:
        return 0
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    conn = _db(task_dir)
    deleted = 0
    for start in range(0, len(ids), batch_size):
        batch = ids[start:start + batch_size]
        placeholders = ",".join(["?"] * len(batch))
        # Links first, so no annotation_codes row points at a removed code.
        conn.execute(
            f"DELETE FROM annotation_codes WHERE code_id IN ({placeholders})",
            batch,
        )
        cur = conn.execute(
            f"DELETE FROM codes WHERE id IN ({placeholders})", batch)
        deleted += cur.rowcount
    conn.commit()
    return deleted
# ---- annotation_codes ----------------------------------------------------
def link_annotation(
    task_dir: str,
    *,
    project: str,
    annotation_id: str,
    code_id: str,
    created_by: str,
    started_at: Optional[float] = None,
    ended_at: Optional[float] = None,
) -> None:
    """Attach ``code_id`` to ``annotation_id``, optionally with a time span.

    Uses ``INSERT OR REPLACE`` on the (annotation_id, code_id) primary key.
    An existing link is replaced wholesale: project, created_by and the time
    span are overwritten, and any column not listed here is reset to its
    column default.
    """
    row = (annotation_id, code_id, project, created_by, started_at, ended_at)
    conn = _db(task_dir)
    conn.execute(
        """INSERT OR REPLACE INTO annotation_codes
           (annotation_id, code_id, project, created_by,
            started_at, ended_at)
           VALUES (?, ?, ?, ?, ?, ?)""",
        row,
    )
    conn.commit()
def unlink_annotation(
    task_dir: str, annotation_id: str, code_id: str
) -> bool:
    """Hard-delete the (annotation_id, code_id) link.

    Returns True if a row was removed, False if no such link existed.
    """
    conn = _db(task_dir)
    removed = conn.execute(
        """DELETE FROM annotation_codes
           WHERE annotation_id = ? AND code_id = ?""",
        (annotation_id, code_id),
    ).rowcount
    conn.commit()
    return removed > 0
def codes_for_annotation(
    task_dir: str, annotation_id: str
) -> List[Dict[str, Any]]:
    """Return the live codes applied to ``annotation_id``, ordered by name.

    This is the single load-bearing temporal reader. A link counts only if
    it has not been invalidated and its code has not been archived (e.g.
    merged away). Each dict carries the link's code_id, time span and
    creator plus the code's name, color and parent_id.
    """
    _ensure_temporal_schema()
    cursor = _db(task_dir).execute(
        """SELECT ac.code_id, ac.started_at, ac.ended_at,
                  ac.created_by, c.name, c.color, c.parent_id
           FROM annotation_codes ac
           JOIN codes c ON c.id = ac.code_id
           WHERE ac.annotation_id = ?
             AND ac.invalidated_at IS NULL
             AND c.archived_at IS NULL
           ORDER BY c.name ASC""",
        (annotation_id,),
    )
    return list(map(dict, cursor.fetchall()))
# ---- Phase 2 (C): append-only retroactive primitives --------------------
def affected_annotation_ids(
    task_dir: str, project: str, code_id: str,
    created_by: Optional[str] = None,
) -> List[str]:
    """Return the distinct annotation_ids holding a LIVE link to ``code_id``.

    When ``created_by`` is given, only links created by that user are
    considered. This is the split-by-annotator selector.
    """
    _ensure_temporal_schema()
    conditions = ["project = ?", "code_id = ?", "invalidated_at IS NULL"]
    args: List[Any] = [project, code_id]
    if created_by is not None:
        conditions.append("created_by = ?")
        args.append(created_by)
    sql = ("SELECT DISTINCT annotation_id FROM annotation_codes "
           "WHERE " + " AND ".join(conditions))
    cursor = _db(task_dir).execute(sql, args)
    return [record["annotation_id"] for record in cursor.fetchall()]
def get_link(
    task_dir: str, annotation_id: str, code_id: str
) -> Optional[Dict[str, Any]]:
    """Return the raw (annotation_id, code_id) link row, or ``None``.

    Unlike ``codes_for_annotation``, invalidated links are returned too.
    The caller can inspect ``invalidated_at`` itself.
    """
    _ensure_temporal_schema()
    cursor = _db(task_dir).execute(
        """SELECT * FROM annotation_codes
           WHERE annotation_id = ? AND code_id = ?""",
        (annotation_id, code_id),
    )
    found = cursor.fetchone()
    if not found:
        return None
    return dict(found)
def invalidate_links(
    task_dir: str, *, project: str, code_id: str, change_id: str,
    created_by: Optional[str] = None,
) -> int:
    """Mark the live links to ``code_id`` as superseded by ``change_id``.

    This is append-only: rows are stamped with invalidated_at /
    invalidated_by_change and are never deleted. Already-invalidated links
    are left untouched. An optional ``created_by`` limits the update to one
    annotator (used by split). Returns the number of links invalidated.
    """
    _ensure_temporal_schema()
    conditions = ["project = ?", "code_id = ?", "invalidated_at IS NULL"]
    values: List[Any] = [time.time(), change_id, project, code_id]
    if created_by is not None:
        conditions.append("created_by = ?")
        values.append(created_by)
    sql = ("UPDATE annotation_codes SET invalidated_at = ?, "
           "invalidated_by_change = ? "
           "WHERE " + " AND ".join(conditions))
    conn = _db(task_dir)
    affected = conn.execute(sql, values).rowcount
    conn.commit()
    return affected
def set_link_live(
    task_dir: str, *, project: str, annotation_id: str, code_id: str,
    created_by: str, started_at: Optional[float] = None,
    ended_at: Optional[float] = None,
) -> None:
    """Ensure (annotation_id, code_id) exists as a LIVE link.

    If a row already exists under the primary key (live or invalidated), its
    invalidation stamps are cleared. Its other fields (project, created_by,
    time span) are kept, so nothing is duplicated or clobbered. A new row is
    inserted only when none exists. This keeps a merge correct when the
    annotation already carries the target code.
    """
    _ensure_temporal_schema()
    conn = _db(task_dir)
    reactivated = conn.execute(
        """UPDATE annotation_codes
           SET invalidated_at = NULL, invalidated_by_change = NULL
           WHERE annotation_id = ? AND code_id = ?""",
        (annotation_id, code_id),
    ).rowcount
    if not reactivated:
        fresh_row = (annotation_id, code_id, project, created_by,
                     started_at, ended_at)
        conn.execute(
            """INSERT INTO annotation_codes
               (annotation_id, code_id, project, created_by,
                started_at, ended_at)
               VALUES (?, ?, ?, ?, ?, ?)""",
            fresh_row,
        )
    conn.commit()
def archive_code(task_dir: str, code_id: str) -> bool:
    """Soft-archive a code (e.g. one that was merged away).

    Sets ``archived_at`` (and bumps ``updated_at``) instead of deleting.
    The row and its history therefore survive (append-only). Readers that
    filter on ``archived_at IS NULL``, such as ``codes_for_annotation``,
    stop returning it. Per the original design note, this also takes the
    code out of the live palette and ICL prompt.

    Returns:
        True if a row with ``code_id`` existed and was updated.
    """
    _ensure_temporal_schema()
    # One timestamp for the single archive event, so archived_at and
    # updated_at agree exactly. Previously two time.time() calls could differ.
    now = time.time()
    conn = _db(task_dir)
    cur = conn.execute(
        "UPDATE codes SET archived_at = ?, updated_at = ? WHERE id = ?",
        (now, now, code_id),
    )
    conn.commit()
    return cur.rowcount > 0