Spaces:
Paused
Paused
| """ | |
| Codebook revision provenance. | |
| A per-project monotonic ``codebook_revision`` counter, bumped on **any** | |
| codebook change (create / rename / recolor / move / delete) — all | |
| codebook edits are server-persisted and every one advances the | |
| revision. Every saved annotation is stamped with the revision in | |
| effect, so analysts can condition on the codebook state an annotation | |
| was made against, and the UI can softly flag instances labeled under an | |
| older codebook revision. | |
| `codes.created_revision` records the revision a code first appeared in, | |
| so the review worklist can show *which* codes were added since a given | |
| instance was labeled (precision: a niche new code only resurfaces | |
| instances that predate it). | |
| Tables (own migrations, universal project.sqlite): | |
| - ``codebook_revision(project PK, revision, updated_at)`` | |
| - ``annotation_provenance(project, instance_id, username, revision, | |
| updated_at)`` — PK(project, instance_id, username) | |
| - ``codes.created_revision`` (added via ALTER; default 0 = pre-feature) | |
| """ | |
| from __future__ import annotations | |
| import time | |
| from typing import Dict, List, Optional | |
| from potato.persistence import Migration, get_db, register_migration | |
# Schema for revision tracking.
#   * codebook_revision: one row per project holding its revision counter.
#   * annotation_provenance: one row per (project, instance_id, username)
#     holding the revision stamped on that user's annotation.
#   * idx_provenance_stale: covers the (project, username, revision) filter
#     used by the per-user stale-instance lookup in this module.
# Every statement uses IF NOT EXISTS.
_REVISION_MIGRATION = Migration(
    name="0002_codebook_revision",
    sql="""
    CREATE TABLE IF NOT EXISTS codebook_revision (
    project TEXT PRIMARY KEY,
    revision INTEGER NOT NULL DEFAULT 0,
    updated_at REAL NOT NULL
    );
    CREATE TABLE IF NOT EXISTS annotation_provenance (
    project TEXT NOT NULL,
    instance_id TEXT NOT NULL,
    username TEXT NOT NULL,
    revision INTEGER NOT NULL,
    updated_at REAL NOT NULL,
    PRIMARY KEY (project, instance_id, username)
    );
    CREATE INDEX IF NOT EXISTS idx_provenance_stale
    ON annotation_provenance (project, username, revision);
    """,
)
# Separate migration: add created_revision to the codes table (created
# by store.py's 0001_codebook, which registers first at import).
# Rows that already exist take the column default 0.
# NOTE(review): SQLite's ADD COLUMN has no IF NOT EXISTS form, so this
# statement presumably relies on the migration runner applying each named
# migration at most once -- confirm against potato.persistence.
_CODES_REV_MIGRATION = Migration(
    name="0002_codes_created_revision",
    sql="""
    ALTER TABLE codes ADD COLUMN created_revision INTEGER NOT NULL
    DEFAULT 0;
    """,
)
# Defensive: guarantee the codes table migration (0001_codebook) is
# registered before this module's ALTER, regardless of import path.
from potato.codebook.store import _CODEBOOK_MIGRATION as _CB_MIG

# Registration order is deliberate: the base codes table first, then the
# revision tables, then the ALTER that adds a column to codes.
# NOTE(review): presumably register_migration is idempotent and preserves
# registration order -- confirm against potato.persistence.
register_migration(_CB_MIG)
register_migration(_REVISION_MIGRATION)
register_migration(_CODES_REV_MIGRATION)
def _db(task_dir: str):
    """Return the database handle for ``task_dir`` via ``get_db``.

    Both of this module's migrations are registered again on every call
    before the handle is fetched.

    NOTE(review): the base codes-table migration is registered only at
    import time, not here -- confirm the registry cannot be reset between
    import and first use.
    """
    for migration in (_REVISION_MIGRATION, _CODES_REV_MIGRATION):
        register_migration(migration)
    return get_db(task_dir)
def current_revision(task_dir: str, project: str) -> int:
    """Return the stored codebook revision for ``project``.

    A project with no ``codebook_revision`` row yet reports revision 0.
    """
    query = "SELECT revision FROM codebook_revision WHERE project = ?"
    found = _db(task_dir).execute(query, (project,)).fetchone()
    if not found:
        return 0
    return int(found["revision"])
def bump_revision(task_dir: str, project: str) -> int:
    """Increment (or initialise) the project's revision and return it.

    The first bump for a project inserts its row at revision 1; every
    later bump adds 1 and refreshes ``updated_at``.

    The new value is read back on the same connection *before* the
    commit. With sqlite3's default transaction handling the write lock
    taken by the upsert is still held at that point, so the value
    returned is the one this call produced and not the result of a
    concurrent bump that landed after our commit.

    NOTE(review): the module docstring says every codebook edit advances
    the revision, while this function was previously documented as being
    called only for option-set-changing operations. The function itself
    bumps unconditionally; confirm the intended caller policy.

    Args:
        task_dir: Task directory identifying the project database.
        project: Project whose revision counter is advanced.

    Returns:
        The revision value after the increment.
    """
    conn = _db(task_dir)
    conn.execute(
        """INSERT INTO codebook_revision (project, revision, updated_at)
        VALUES (?, 1, ?)
        ON CONFLICT(project) DO UPDATE SET
        revision = revision + 1,
        updated_at = excluded.updated_at""",
        (project, time.time()),
    )
    # Read our own uncommitted write; the row always exists after the upsert.
    row = conn.execute(
        "SELECT revision FROM codebook_revision WHERE project = ?",
        (project,),
    ).fetchone()
    conn.commit()
    return int(row["revision"])
def record_annotation(
    task_dir: str, project: str, instance_id: str, username: str
) -> int:
    """Stamp one user's annotation of an instance with the current revision.

    Upserts the ``annotation_provenance`` row keyed by
    (project, instance_id, username): a new row is inserted, an existing
    one has its revision and ``updated_at`` overwritten.

    Returns:
        The revision that was stamped.
    """
    stamped = current_revision(task_dir, project)
    upsert = """INSERT INTO annotation_provenance
        (project, instance_id, username, revision, updated_at)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(project, instance_id, username) DO UPDATE SET
        revision = excluded.revision,
        updated_at = excluded.updated_at"""
    conn = _db(task_dir)
    params = (project, instance_id, username, stamped, time.time())
    conn.execute(upsert, params)
    conn.commit()
    return stamped
def instance_revision(
    task_dir: str, project: str, instance_id: str, username: str
) -> Optional[int]:
    """Look up the revision stamped on one user's annotation.

    Returns:
        The stored revision, or ``None`` when no provenance row exists
        for (project, instance_id, username).
    """
    lookup = """SELECT revision FROM annotation_provenance
        WHERE project = ? AND instance_id = ? AND username = ?"""
    key = (project, instance_id, username)
    hit = _db(task_dir).execute(lookup, key).fetchone()
    if hit:
        return int(hit["revision"])
    return None
def stale_instances(
    task_dir: str, project: str, username: str
) -> List[Dict[str, object]]:
    """List this user's annotations stamped with an older revision.

    Args:
        task_dir: Task directory identifying the project database.
        project: Project to inspect.
        username: Only this user's provenance rows are considered.

    Returns:
        One dict per stale row, ordered by stamped revision and then
        instance id, with keys ``instance_id``, ``annotated_revision``,
        ``current_revision`` and ``codes_added_since`` (the *names* of
        codes created after the stamped revision). Empty when the
        project's current revision is 0 or lower.
    """
    cur = current_revision(task_dir, project)
    if cur <= 0:
        return []
    rows = _db(task_dir).execute(
        """SELECT instance_id, revision FROM annotation_provenance
        WHERE project = ? AND username = ? AND revision < ?
        ORDER BY revision ASC, instance_id ASC""",
        (project, username, cur),
    ).fetchall()
    # The added-codes list depends only on the stamped revision, so look
    # it up once per distinct revision instead of once per row.
    added_by_revision: Dict[int, List[str]] = {}
    out: List[Dict[str, object]] = []
    for r in rows:
        stamped = int(r["revision"])
        if stamped not in added_by_revision:
            added_by_revision[stamped] = codes_added_since(
                task_dir, project, stamped)
        out.append({
            "instance_id": r["instance_id"],
            "annotated_revision": stamped,
            "current_revision": cur,
            # Copy so entries never share one mutable list.
            "codes_added_since": list(added_by_revision[stamped]),
        })
    return out
def all_stale_instances(
    task_dir: str, project: str
) -> List[Dict[str, object]]:
    """List every (instance, user) annotation stamped with an older revision.

    Project-wide variant intended for admin oversight.

    Args:
        task_dir: Task directory identifying the project database.
        project: Project to inspect.

    Returns:
        One dict per stale row, ordered by stamped revision, instance id
        and username, with keys ``instance_id``, ``username``,
        ``annotated_revision``, ``current_revision`` and
        ``codes_added_since`` (the names of codes created after the
        stamped revision). Empty when the project's current revision is
        0 or lower.
    """
    cur = current_revision(task_dir, project)
    if cur <= 0:
        return []
    rows = _db(task_dir).execute(
        """SELECT instance_id, username, revision
        FROM annotation_provenance
        WHERE project = ? AND revision < ?
        ORDER BY revision ASC, instance_id ASC, username ASC""",
        (project, cur),
    ).fetchall()
    # The added-codes list depends only on the stamped revision, so look
    # it up once per distinct revision instead of once per row.
    added_by_revision: Dict[int, List[str]] = {}
    out: List[Dict[str, object]] = []
    for r in rows:
        stamped = int(r["revision"])
        if stamped not in added_by_revision:
            added_by_revision[stamped] = codes_added_since(
                task_dir, project, stamped)
        out.append({
            "instance_id": r["instance_id"],
            "username": r["username"],
            "annotated_revision": stamped,
            "current_revision": cur,
            # Copy so entries never share one mutable list.
            "codes_added_since": list(added_by_revision[stamped]),
        })
    return out
def touch_instances(
    task_dir: str, project: str, instance_ids: List[str]
) -> int:
    """Re-flag specific instances as stale after a retroactive codebook edit.

    Used after an edit (merge/split/rename) that affected exactly these
    instances. Their stamped revision is set to ``current - 1`` so that
    ``stale_instances`` resurfaces them -- soft and dismissible, never a
    hard re-label gate (Phase 2 (B) policy). Rows are matched by project
    and instance id only, so every user's row for a listed instance is
    affected. Only rows stamped above ``current - 1`` are changed, so a
    revision is only ever lowered and an already-older row keeps its
    value.

    The ids are applied in batches so that a long list cannot exceed
    SQLite's limit on bound parameters per statement; all batches are
    committed together.

    Args:
        task_dir: Task directory identifying the project database.
        project: Project whose provenance rows are updated.
        instance_ids: Instance ids to re-flag.

    Returns:
        Number of provenance rows updated; 0 when ``instance_ids`` is
        empty or the project's current revision is 0 or lower.
    """
    if not instance_ids:
        return 0
    current = current_revision(task_dir, project)
    if current <= 0:
        return 0
    target = current - 1
    # 500 ids plus 4 fixed parameters stays under the 999-variable limit
    # of older SQLite builds.
    batch_size = 500
    now = time.time()
    conn = _db(task_dir)
    touched = 0
    for start in range(0, len(instance_ids), batch_size):
        batch = instance_ids[start:start + batch_size]
        qs = ",".join("?" * len(batch))
        result = conn.execute(
            f"""UPDATE annotation_provenance
            SET revision = ?, updated_at = ?
            WHERE project = ? AND instance_id IN ({qs})
            AND revision > ?""",
            [target, now, project, *batch, target],
        )
        touched += result.rowcount
    conn.commit()
    return touched
def codes_added_since(
    task_dir: str, project: str, revision: int
) -> List[str]:
    """Return the names of the project's codes created after ``revision``.

    Names are ordered by ``created_revision`` and then alphabetically.
    These are the codes that could change a label made at that revision.
    """
    query = """SELECT name FROM codes
        WHERE project = ? AND created_revision > ?
        ORDER BY created_revision ASC, name ASC"""
    cursor = _db(task_dir).execute(query, (project, revision))
    names: List[str] = []
    for record in cursor.fetchall():
        names.append(record["name"])
    return names