multi-span / potato /codebook /revision.py
davidjurgens's picture
Deploy Potato demo: Potato — Multi-Field Spans
dc4d6fd verified
Raw
History Blame Contribute Delete
7.86 kB
"""
Codebook revision provenance.
A per-project monotonic ``codebook_revision`` counter, bumped on **any**
codebook change (create / rename / recolor / move / delete) — all
codebook edits are server-persisted and every one advances the
revision. Every saved annotation is stamped with the revision in
effect, so analysts can condition on the codebook state an annotation
was made against, and the UI can softly flag instances labeled under an
older codebook revision.
``codes.created_revision`` records the revision a code first appeared in,
so the review worklist can show *which* codes were added since a given
instance was labeled (precision: a niche new code only resurfaces
instances that predate it).
Tables (own migrations, universal project.sqlite):
- ``codebook_revision(project PK, revision, updated_at)``
- ``annotation_provenance(project, instance_id, username, revision,
updated_at)`` — PK(project, instance_id, username)
- ``codes.created_revision`` (added via ALTER; default 0 = pre-feature)
"""
from __future__ import annotations
import time
from typing import Dict, List, Optional
from potato.persistence import Migration, get_db, register_migration
# Migration that creates this module's two provenance tables:
#   * codebook_revision     - one row per project holding its revision
#                             counter and the time it last changed.
#   * annotation_provenance - one row per (project, instance_id, username)
#                             holding the revision stamped on that
#                             annotation and when it was stamped.
# Every statement uses IF NOT EXISTS.
_REVISION_MIGRATION = Migration(
name="0002_codebook_revision",
# idx_provenance_stale matches the (project, username, revision) filter
# used by the per-user stale-instance lookup in this module.
sql="""
CREATE TABLE IF NOT EXISTS codebook_revision (
project TEXT PRIMARY KEY,
revision INTEGER NOT NULL DEFAULT 0,
updated_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS annotation_provenance (
project TEXT NOT NULL,
instance_id TEXT NOT NULL,
username TEXT NOT NULL,
revision INTEGER NOT NULL,
updated_at REAL NOT NULL,
PRIMARY KEY (project, instance_id, username)
);
CREATE INDEX IF NOT EXISTS idx_provenance_stale
ON annotation_provenance (project, username, revision);
""",
)
# Separate migration: add created_revision to the codes table (created
# by store.py's 0001_codebook, which registers first at import).
# The column defaults to 0, which the module docstring defines as
# "pre-feature": codes that existed before revisions were tracked.
# NOTE(review): the ALTER has no IF NOT EXISTS guard, so this presumably
# relies on the migration runner applying each named migration only once
# -- confirm against potato.persistence.
_CODES_REV_MIGRATION = Migration(
name="0002_codes_created_revision",
sql="""
ALTER TABLE codes ADD COLUMN created_revision INTEGER NOT NULL
DEFAULT 0;
""",
)
# Defensive: guarantee the codes table migration (0001_codebook) is
# registered before this module's ALTER, regardless of import path.
# The import sits here, next to its only use, rather than in the
# top-of-file import block.
from potato.codebook.store import _CODEBOOK_MIGRATION as _CB_MIG
# Registration order: codes table first, then the provenance tables,
# then the ALTER that adds codes.created_revision.
register_migration(_CB_MIG)
register_migration(_REVISION_MIGRATION)
register_migration(_CODES_REV_MIGRATION)
def _db(task_dir: str):
    """Return the database handle for ``task_dir``.

    Both of this module's migrations are (re-)registered, in order,
    before the handle is requested from ``get_db``.
    """
    for migration in (_REVISION_MIGRATION, _CODES_REV_MIGRATION):
        register_migration(migration)
    return get_db(task_dir)
def current_revision(task_dir: str, project: str) -> int:
    """Return the stored codebook revision for ``project``.

    A project with no row in ``codebook_revision`` reports revision 0.
    """
    cursor = _db(task_dir).execute(
        "SELECT revision FROM codebook_revision WHERE project = ?",
        (project,),
    )
    record = cursor.fetchone()
    if not record:
        return 0
    return int(record["revision"])
def bump_revision(task_dir: str, project: str) -> int:
    """Increment (or initialise) the project's revision; return the new
    value.

    The first bump for a project inserts revision 1; every later bump
    adds one to the stored value and refreshes ``updated_at``.

    This function bumps unconditionally; deciding *when* to bump is the
    caller's job. Per the module docstring, any codebook change
    (create / rename / recolor / move / delete) advances the revision.

    The new value is read back on the same connection before the commit
    rather than through a second lookup afterwards. When the connection
    runs the upsert inside a transaction (the ``sqlite3`` default), the
    returned number is therefore the one this call produced, even if
    another writer bumps immediately after the commit.
    """
    conn = _db(task_dir)
    now = time.time()
    conn.execute(
        """INSERT INTO codebook_revision (project, revision, updated_at)
           VALUES (?, 1, ?)
           ON CONFLICT(project) DO UPDATE SET
               revision = revision + 1,
               updated_at = excluded.updated_at""",
        (project, now),
    )
    # Read our own uncommitted write so the result reflects this bump.
    row = conn.execute(
        "SELECT revision FROM codebook_revision WHERE project = ?",
        (project,),
    ).fetchone()
    conn.commit()
    return int(row["revision"]) if row else 0
def record_annotation(
    task_dir: str, project: str, instance_id: str, username: str
) -> int:
    """Stamp an annotation with the revision in effect when it is saved.

    The (project, instance_id, username) row is inserted, or overwritten
    with the current revision and a fresh timestamp if it already
    exists. Returns the revision that was stamped.
    """
    stamped = current_revision(task_dir, project)
    upsert_sql = """INSERT INTO annotation_provenance
        (project, instance_id, username, revision, updated_at)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(project, instance_id, username) DO UPDATE SET
        revision = excluded.revision,
        updated_at = excluded.updated_at"""
    connection = _db(task_dir)
    params = (project, instance_id, username, stamped, time.time())
    connection.execute(upsert_sql, params)
    connection.commit()
    return stamped
def instance_revision(
    task_dir: str, project: str, instance_id: str, username: str
) -> Optional[int]:
    """Return the revision stamped on one user's annotation of an
    instance, or ``None`` when no provenance row exists for it."""
    lookup_sql = """SELECT revision FROM annotation_provenance
        WHERE project = ? AND instance_id = ? AND username = ?"""
    key = (project, instance_id, username)
    record = _db(task_dir).execute(lookup_sql, key).fetchone()
    if not record:
        return None
    return int(record["revision"])
def stale_instances(
    task_dir: str, project: str, username: str
) -> List[Dict[str, object]]:
    """This user's annotated instances whose stamped revision is behind
    the current one, with the names of the codes added since.

    Returns a list ordered by stamped revision, then instance id. Each
    entry is a dict with the keys ``instance_id``,
    ``annotated_revision``, ``current_revision`` and
    ``codes_added_since`` (a list of code names). The list is empty when
    the project's revision is 0 or nothing is stale.
    """
    cur = current_revision(task_dir, project)
    if cur <= 0:
        return []
    rows = _db(task_dir).execute(
        """SELECT instance_id, revision FROM annotation_provenance
           WHERE project = ? AND username = ? AND revision < ?
           ORDER BY revision ASC, instance_id ASC""",
        (project, username, cur),
    ).fetchall()
    # Many rows share a stamped revision, so look up the added codes once
    # per distinct revision instead of once per row.
    added_by_revision: Dict[int, List[str]] = {}
    out: List[Dict[str, object]] = []
    for r in rows:
        stamped = int(r["revision"])
        if stamped not in added_by_revision:
            added_by_revision[stamped] = codes_added_since(
                task_dir, project, stamped)
        out.append({
            "instance_id": r["instance_id"],
            "annotated_revision": stamped,
            "current_revision": cur,
            # Copy so each entry owns its list, as before.
            "codes_added_since": list(added_by_revision[stamped]),
        })
    return out
def all_stale_instances(
    task_dir: str, project: str
) -> List[Dict[str, object]]:
    """Every (instance, user) annotated under an older revision —
    project-wide, for admin oversight.

    Returns a list ordered by stamped revision, then instance id, then
    username. Each entry is a dict with the keys ``instance_id``,
    ``username``, ``annotated_revision``, ``current_revision`` and
    ``codes_added_since`` (a list of code names). The list is empty when
    the project's revision is 0 or nothing is stale.
    """
    cur = current_revision(task_dir, project)
    if cur <= 0:
        return []
    rows = _db(task_dir).execute(
        """SELECT instance_id, username, revision
           FROM annotation_provenance
           WHERE project = ? AND revision < ?
           ORDER BY revision ASC, instance_id ASC, username ASC""",
        (project, cur),
    ).fetchall()
    # Many rows share a stamped revision, so look up the added codes once
    # per distinct revision instead of once per row.
    added_by_revision: Dict[int, List[str]] = {}
    out: List[Dict[str, object]] = []
    for r in rows:
        stamped = int(r["revision"])
        if stamped not in added_by_revision:
            added_by_revision[stamped] = codes_added_since(
                task_dir, project, stamped)
        out.append({
            "instance_id": r["instance_id"],
            "username": r["username"],
            "annotated_revision": stamped,
            "current_revision": cur,
            # Copy so each entry owns its list, as before.
            "codes_added_since": list(added_by_revision[stamped]),
        })
    return out
def touch_instances(
    task_dir: str, project: str, instance_ids: List[str]
) -> int:
    """Re-flag specific instances as stale after a retroactive codebook
    edit (merge/split/rename) that affected exactly them.

    Sets their stamped revision to current-1 so ``stale_instances``
    resurfaces them — soft and dismissible, never a hard re-label gate
    (Phase 2 (B) policy). Only lowers a revision (never un-stales an
    already-older row), and never raises one above current.

    The update touches every user's provenance row for the given
    instances. Ids are de-duplicated and sent in batches so a large list
    cannot exceed SQLite's limit on bound parameters per statement; all
    batches are committed together.

    Returns the number of provenance rows that were lowered; 0 when
    ``instance_ids`` is empty or the project's revision is 0.
    """
    if not instance_ids:
        return 0
    cur = current_revision(task_dir, project)
    if cur <= 0:
        return 0
    target = cur - 1
    # Order-preserving de-duplication keeps the parameter lists short.
    unique_ids = list(dict.fromkeys(instance_ids))
    # Each statement binds len(batch) + 4 parameters; 500 stays well
    # under the 999 default limit of older SQLite builds.
    batch_size = 500
    conn = _db(task_dir)
    now = time.time()
    touched = 0
    for start in range(0, len(unique_ids), batch_size):
        batch = unique_ids[start:start + batch_size]
        # Only "?" placeholders are interpolated; values stay bound.
        qs = ",".join("?" * len(batch))
        result = conn.execute(
            f"""UPDATE annotation_provenance
                SET revision = ?, updated_at = ?
                WHERE project = ? AND instance_id IN ({qs})
                AND revision > ?""",
            [target, now, project, *batch, target],
        )
        touched += result.rowcount
    conn.commit()
    return touched
def codes_added_since(
    task_dir: str, project: str, revision: int
) -> List[str]:
    """Return the names of the project's codes whose ``created_revision``
    is greater than ``revision``, ordered by creation revision and then
    by name — the precise set that could change a label made at that
    revision."""
    query = """SELECT name FROM codes
        WHERE project = ? AND created_revision > ?
        ORDER BY created_revision ASC, name ASC"""
    cursor = _db(task_dir).execute(query, (project, revision))
    names: List[str] = []
    for record in cursor.fetchall():
        names.append(record["name"])
    return names