multi-span / potato /codebook /service.py
davidjurgens's picture
Deploy Potato demo: Potato — Multi-Field Spans
dc4d6fd verified
Raw
History Blame Contribute Delete
13.8 kB
"""
Codebook service — the single, audited mutation path.
All codebook writes (human *or* LLM, in standard / solo / QDA mode) go
through here so they share one audit trail (`created_by`), one set of
invariants (no duplicate siblings, no cycles, recursive delete), and one
change-notification hook (used by ICL to invalidate its prompt cache —
registered via `register_change_listener` to avoid a hard import edge).
Phase 1 ops: create / rename / recolor / move_under / delete.
Phase 2 ops: merge / split (retroactive; see `merge_codes` and
`split_code` in this module).
"""
from __future__ import annotations
import logging
from typing import Any, Callable, Dict, List, Optional
from potato.codebook import store
from potato.codebook.codebook import Codebook
# Module-level logger; `_notify` uses it to record listener failures.
logger = logging.getLogger(__name__)
class CodebookError(Exception):
    """Root of the exception hierarchy for codebook mutations.

    The more specific errors in this module derive from it, and it is
    raised directly for plain validation failures (e.g. an empty name).
    """
class CodeNotFound(CodebookError):
    """Raised when a referenced code id (or parent id) is not in the store."""
class DuplicateCodeError(CodebookError):
    """Raised when a code with the same name already exists under the
    target parent."""
class CodebookCycleError(CodebookError):
    """Raised when a move would make a code its own parent or place it
    under one of its own descendants."""
# Registry of change listeners. `_notify(task_dir, project)` calls each
# one as `fn(task_dir, project)` at the end of a successful code mutation
# (create / rename / recolor / move / delete / merge / split).
# ICL registers a listener here to invalidate its prompt cache. Keeping a
# registry means codebook has no import dependency on the ICL/AI layer.
_CHANGE_LISTENERS: List[Callable[[str, str], None]] = []
def register_change_listener(fn: Callable[[str, str], None]) -> None:
    """Add `fn` to the change-listener registry.

    Registering the same callable twice is a no-op, so a listener is
    invoked at most once per notification. `fn` is later called as
    `fn(task_dir, project)`.
    """
    if fn in _CHANGE_LISTENERS:
        return
    _CHANGE_LISTENERS.append(fn)
def clear_change_listeners() -> None:
    """Empty the listener registry in place.

    Tests only — the registry is process-global, so listeners registered
    by one test would otherwise remain for the next.
    """
    del _CHANGE_LISTENERS[:]
def _notify(task_dir: str, project: str) -> None:
    """Call every registered change listener with `(task_dir, project)`.

    The registry is snapshotted first, so a listener that registers or
    clears listeners does not disturb the iteration. An exception raised
    by a listener is logged and the remaining listeners still run.
    """
    snapshot = tuple(_CHANGE_LISTENERS)
    for listener in snapshot:
        try:
            listener(task_dir, project)
        except Exception:  # a listener must never break a mutation
            logger.exception("codebook change listener failed")
def _require(task_dir: str, code_id: str) -> Dict[str, Any]:
    """Fetch the code `code_id` from the store or fail loudly.

    Returns:
        The value returned by `store.get_code` for `code_id`.

    Raises:
        CodeNotFound: if the store returns `None` for `code_id`.
    """
    found = store.get_code(task_dir, code_id)
    if found is not None:
        return found
    raise CodeNotFound(f"Code {code_id} not found")
def create_code(
    task_dir: str,
    *,
    project: str,
    name: str,
    created_by: str,
    color: Optional[str] = None,
    parent_id: str = store.ROOT,
    code_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Create a code named `name` under `parent_id`.

    The name is stripped of surrounding whitespace before validation.
    The new code is placed after its existing siblings (its `sort_order`
    is the current sibling count). The project revision is bumped and the
    new revision is stored on the code as `created_revision`. Change
    listeners are notified once the insert has succeeded.

    Returns:
        The value returned by `store.insert_code` for the new code.

    Raises:
        CodebookError: if the stripped name is empty.
        CodeNotFound: if `parent_id` is not `store.ROOT` and no such code
            exists.
        DuplicateCodeError: if a code with that name already exists under
            `parent_id` in `project`.
    """
    cleaned = (name or "").strip()
    if not cleaned:
        raise CodebookError("Code name must not be empty")

    parent_is_root = parent_id == store.ROOT
    if not parent_is_root and store.get_code(task_dir, parent_id) is None:
        raise CodeNotFound(f"Parent code {parent_id} not found")

    existing = store.find_code(task_dir, project, parent_id, cleaned)
    if existing is not None:
        raise DuplicateCodeError(
            f"A code named {cleaned!r} already exists at this level")

    # Append position: one past the last current sibling.
    next_slot = len(store.children_of(task_dir, project, parent_id))

    # Adding a code changes the option set, so the project revision is
    # bumped and the code records the revision in which it first appeared.
    from potato.codebook import revision
    stamped_rev = revision.bump_revision(task_dir, project)

    created = store.insert_code(
        task_dir,
        project=project,
        name=cleaned,
        created_by=created_by,
        color=color,
        parent_id=parent_id,
        sort_order=next_slot,
        code_id=code_id,
        created_revision=stamped_rev,
    )
    _notify(task_dir, project)
    return created
def _restamp(task_dir: str, project: str, code_ids: List[str]) -> None:
    """Re-flag the instances whose live links touch any of `code_ids`.

    Annotation ids are gathered per code via
    `store.affected_annotation_ids`, de-duplicated in first-seen order,
    and handed to `revision.touch_instances` so the (B) review worklist
    resurfaces them (soft, dismissible).
    """
    from potato.codebook import revision

    # dict.fromkeys keeps the first occurrence of each id, in order.
    unique_ids = dict.fromkeys(
        aid
        for cid in code_ids
        for aid in store.affected_annotation_ids(task_dir, project, cid)
    )
    revision.touch_instances(task_dir, project, list(unique_ids))
def rename_code(
    task_dir: str, code_id: str, *, new_name: str, project: str,
    actor: str = "system", actor_kind: str = "human",
) -> Dict[str, Any]:
    """Rename the code `code_id` to `new_name` (whitespace-stripped).

    After the store update, the project revision is bumped, a "rename"
    changelog entry records the old and new names, instances linked to
    the code are re-flagged for review, and change listeners are notified.

    Returns:
        The value returned by `store.update_code`.

    Raises:
        CodebookError: if the stripped name is empty.
        CodeNotFound: if `code_id` does not exist.
        DuplicateCodeError: if a different code under the same parent
            already has that name.
    """
    stripped = (new_name or "").strip()
    if not stripped:
        raise CodebookError("Code name must not be empty")

    current = _require(task_dir, code_id)
    previous_name = current["name"]

    sibling = store.find_code(
        task_dir, project, current["parent_id"], stripped)
    # Finding the code itself is not a clash.
    name_taken = sibling is not None and sibling["id"] != code_id
    if name_taken:
        raise DuplicateCodeError(
            f"A code named {stripped!r} already exists at this level")

    result = store.update_code(task_dir, code_id, name=stripped)

    # Every codebook change bumps the revision (provenance: an instance
    # labeled before this change is flagged stale on revisit).
    from potato.codebook import revision, changelog
    bumped = revision.bump_revision(task_dir, project)
    changelog.log_change(
        task_dir,
        project=project,
        op="rename",
        code_id=code_id,
        old_value=previous_name,
        new_value=stripped,
        actor=actor,
        actor_kind=actor_kind,
        revision=bumped,
    )
    _restamp(task_dir, project, [code_id])
    _notify(task_dir, project)
    return result
def recolor_code(
    task_dir: str, code_id: str, *, color: str, project: str,
    actor: str = "system", actor_kind: str = "human",
) -> Dict[str, Any]:
    """Set the color of the code `code_id` to `color`.

    After the store update, the project revision is bumped, a "recolor"
    changelog entry records the previous and new colors, instances linked
    to the code are re-flagged, and change listeners are notified.

    Returns:
        The value returned by `store.update_code`.

    Raises:
        CodeNotFound: if `code_id` does not exist.
    """
    before = _require(task_dir, code_id)
    result = store.update_code(task_dir, code_id, color=color)

    from potato.codebook import revision, changelog
    bumped = revision.bump_revision(task_dir, project)
    previous_color = before.get("color")
    changelog.log_change(
        task_dir,
        project=project,
        op="recolor",
        code_id=code_id,
        old_value=previous_color,
        new_value=color,
        actor=actor,
        actor_kind=actor_kind,
        revision=bumped,
    )
    _restamp(task_dir, project, [code_id])
    _notify(task_dir, project)
    return result
def _subtree_ids(task_dir: str, project: str, root_id: str) -> List[str]:
    """Return `root_id` followed by the ids of all its descendants.

    Ids are listed in pre-order (a parent before its children, children
    in the order `Codebook.children` yields them), which matches the
    previous recursive implementation on any well-formed tree.

    The walk is iterative and tracks visited ids. A very deep hierarchy
    therefore cannot hit Python's recursion limit, and a corrupt parent
    link that forms a cycle terminates instead of recursing forever.
    """
    cb = Codebook.load(task_dir, project)
    ordered: List[str] = []
    visited = set()
    pending = [root_id]
    while pending:
        cid = pending.pop()
        if cid in visited:
            # Only reachable when the stored hierarchy is not a tree.
            continue
        visited.add(cid)
        ordered.append(cid)
        # Push children reversed so they are popped (and emitted) in
        # their original order.
        kids = list(cb.children(cid))
        pending.extend(kid["id"] for kid in reversed(kids))
    return ordered
def move_under(
    task_dir: str, code_id: str, *, new_parent_id: str, project: str,
    actor: str = "system", actor_kind: str = "human",
) -> Dict[str, Any]:
    """Re-parent the code `code_id` under `new_parent_id`.

    The code is placed after the target's current children (its
    `sort_order` is the target's child count at the time of the move).
    After the store update, the project revision is bumped, a "move"
    changelog entry records the old and new parent ids, instances linked
    to the code are re-flagged, and change listeners are notified.

    Returns:
        The value returned by `store.update_code`.

    Raises:
        CodeNotFound: if `code_id`, or a non-root `new_parent_id`, does
            not exist.
        CodebookCycleError: if the target is the code itself or one of
            its descendants.
        DuplicateCodeError: if a different code with the same name already
            exists under the target.
    """
    moving = _require(task_dir, code_id)
    if new_parent_id == code_id:
        raise CodebookCycleError("A code cannot be its own parent")

    moving_to_root = new_parent_id == store.ROOT
    if not moving_to_root:
        if store.get_code(task_dir, new_parent_id) is None:
            raise CodeNotFound(f"Parent code {new_parent_id} not found")
        own_subtree = _subtree_ids(task_dir, project, code_id)
        if new_parent_id in own_subtree:
            raise CodebookCycleError(
                "Cannot move a code under one of its own descendants")

    code_name = moving["name"]
    existing = store.find_code(task_dir, project, new_parent_id, code_name)
    # Finding the code itself (already under this parent) is not a clash.
    if existing is not None and existing["id"] != code_id:
        raise DuplicateCodeError(
            f"A code named {code_name!r} already exists at the target")

    target_children = store.children_of(task_dir, project, new_parent_id)
    previous_parent = moving["parent_id"]
    result = store.update_code(
        task_dir,
        code_id,
        parent_id=new_parent_id,
        sort_order=len(target_children),
    )

    from potato.codebook import revision, changelog
    bumped = revision.bump_revision(task_dir, project)
    changelog.log_change(
        task_dir,
        project=project,
        op="move",
        code_id=code_id,
        old_value=previous_parent,
        new_value=new_parent_id,
        actor=actor,
        actor_kind=actor_kind,
        revision=bumped,
    )
    _restamp(task_dir, project, [code_id])
    _notify(task_dir, project)
    return result
def delete_code(
    task_dir: str, code_id: str, *, project: str,
    actor: str = "system", actor_kind: str = "human",
) -> int:
    """Delete the code `code_id` together with its whole subtree.

    A single "delete" changelog entry is written for `code_id` itself
    (with the code's name as the old value). Instances that were linked
    to any code in the subtree are re-flagged for review, and change
    listeners are notified.

    Returns:
        The value returned by `store.delete_codes` (the number of code
        rows removed).

    Raises:
        CodeNotFound: if `code_id` does not exist.
    """
    doomed = _require(task_dir, code_id)
    subtree = _subtree_ids(task_dir, project, code_id)

    from potato.codebook import revision, changelog

    # Collect the affected instances BEFORE the hard delete; the worklist
    # needs them to resurface those instances afterwards.
    affected = list(dict.fromkeys(
        aid
        for cid in subtree
        for aid in store.affected_annotation_ids(task_dir, project, cid)
    ))

    removed = store.delete_codes(task_dir, subtree)

    # Removing a code changes the option set too, so bump the revision.
    bumped = revision.bump_revision(task_dir, project)
    changelog.log_change(
        task_dir,
        project=project,
        op="delete",
        code_id=code_id,
        old_value=doomed["name"],
        new_value=None,
        actor=actor,
        actor_kind=actor_kind,
        revision=bumped,
    )
    revision.touch_instances(task_dir, project, affected)
    _notify(task_dir, project)
    return removed
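# ---- annotation <-> code links ------------------------------------------
# Thin wrappers over the store. `apply_code` records `created_by`; note
# that none of these call `_notify` or bump the project revision.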
def apply_code(
    task_dir: str,
    *,
    project: str,
    annotation_id: str,
    code_id: str,
    created_by: str,
    started_at: Optional[float] = None,
    ended_at: Optional[float] = None,
) -> None:
    """Link the code `code_id` to the annotation `annotation_id`.

    The code must exist; the link itself is written by
    `store.link_annotation` with `created_by` and the optional
    `started_at` / `ended_at` values passed through unchanged.

    Raises:
        CodeNotFound: if `code_id` does not exist.
    """
    # Existence check only; the returned code is not needed here.
    _require(task_dir, code_id)

    link_fields = dict(
        project=project,
        annotation_id=annotation_id,
        code_id=code_id,
        created_by=created_by,
        started_at=started_at,
        ended_at=ended_at,
    )
    store.link_annotation(task_dir, **link_fields)
def remove_code(
    task_dir: str, *, annotation_id: str, code_id: str
) -> bool:
    """Unlink the code `code_id` from the annotation `annotation_id`.

    Returns:
        Whatever `store.unlink_annotation` returns (presumably whether a
        link was actually removed — confirm against the store).
    """
    was_unlinked = store.unlink_annotation(task_dir, annotation_id, code_id)
    return was_unlinked
def codes_on(task_dir: str, annotation_id: str) -> List[Dict[str, Any]]:
    """Return the codes attached to `annotation_id`, as reported by
    `store.codes_for_annotation`."""
    attached = store.codes_for_annotation(task_dir, annotation_id)
    return attached
# ---- Phase 2 (C): retroactive merge / split (append-only) --------------
def merge_codes(
    task_dir: str, *, project: str, src_id: str, dst_id: str,
    actor: str = "system", actor_kind: str = "human",
) -> Dict[str, Any]:
    """Fold the code `src_id` into the code `dst_id`.

    Every live annotation link to src is re-pointed at dst, keeping the
    original link's `created_by` (falling back to `actor` when the link
    has no such key) and its `started_at` / `ended_at`. src's links are
    then invalidated (not deleted) under the new changelog entry, and src
    is archived. The project revision is bumped, the affected instances
    are softly re-flagged for review, and change listeners are notified.

    NOTE(review): unlike `split_code`, src is archived here without
    checking whether it still has child codes — confirm that is intended.

    Returns:
        A dict with `merged` (number of re-pointed annotations), `src_id`,
        `dst_id` and `change_id` (the changelog entry id).

    Raises:
        CodebookError: if `src_id` equals `dst_id`.
        CodeNotFound: if either code does not exist.
    """
    if src_id == dst_id:
        raise CodebookError("Cannot merge a code into itself")
    source = _require(task_dir, src_id)
    dest = _require(task_dir, dst_id)

    from potato.codebook import revision, changelog

    affected = store.affected_annotation_ids(task_dir, project, src_id)
    bumped = revision.bump_revision(task_dir, project)
    # Logged before the link changes because invalidate_links needs the
    # changelog entry id.
    change_id = changelog.log_change(
        task_dir,
        project=project,
        op="merge",
        code_id=src_id,
        related_code_id=dst_id,
        old_value=source["name"],
        new_value=dest["name"],
        actor=actor,
        actor_kind=actor_kind,
        revision=bumped,
    )

    for annotation_id in affected:
        src_link = store.get_link(task_dir, annotation_id, src_id)
        if not src_link:
            src_link = {}
        store.set_link_live(
            task_dir,
            project=project,
            annotation_id=annotation_id,
            code_id=dst_id,
            created_by=src_link.get("created_by", actor),
            started_at=src_link.get("started_at"),
            ended_at=src_link.get("ended_at"),
        )

    store.invalidate_links(
        task_dir, project=project, code_id=src_id, change_id=change_id)
    store.archive_code(task_dir, src_id)
    revision.touch_instances(task_dir, project, affected)
    _notify(task_dir, project)

    return {
        "merged": len(affected),
        "src_id": src_id,
        "dst_id": dst_id,
        "change_id": change_id,
    }
def split_code(
    task_dir: str, *, project: str, src_id: str, annotator: str,
    new_name: Optional[str] = None, target_id: Optional[str] = None,
    actor: str = "system", actor_kind: str = "human",
) -> Dict[str, Any]:
    """Split `src` BY ANNOTATOR.

    Moves just `annotator`'s live links from src to a target code: the
    existing code `target_id` if given, otherwise a new code named
    `new_name` created next to src (same parent). src stays live for
    other annotators; it is archived only if it ends up with no live
    links and no children. The project revision is bumped, a "split"
    changelog entry is written, the moved instances are re-flagged, and
    change listeners are notified.

    If both `target_id` and `new_name` are given, `target_id` wins.

    Returns:
        A dict with `moved` (number of re-pointed annotations), `src_id`,
        `target_id` and `change_id` (the changelog entry id).

    Raises:
        CodeNotFound: if src, or the given `target_id`, does not exist.
        CodebookError: if `annotator` is empty, if `target_id` equals
            `src_id`, or if neither `target_id` nor `new_name` is given.
            Subclasses raised by `create_code` may also propagate when a
            new target is created.
    """
    src = _require(task_dir, src_id)
    if not annotator:
        raise CodebookError("An annotator must be given to split by")
    # Splitting a code into itself would re-point the annotator's links at
    # src and then invalidate those same links, losing them. Reject it up
    # front, as merge_codes does for a self-merge.
    if target_id and target_id == src_id:
        raise CodebookError("Cannot split a code into itself")
    from potato.codebook import revision, changelog
    if target_id:
        target = _require(task_dir, target_id)
    elif new_name:
        target = create_code(
            task_dir, project=project, name=new_name,
            created_by=actor, parent_id=src["parent_id"])
    else:
        raise CodebookError("Provide either target_id or new_name")
    affected = store.affected_annotation_ids(
        task_dir, project, src_id, created_by=annotator)
    new_rev = revision.bump_revision(task_dir, project)
    # Logged before the link changes because invalidate_links needs the
    # changelog entry id.
    change_id = changelog.log_change(
        task_dir, project=project, op="split", code_id=src_id,
        related_code_id=target["id"], old_value=src["name"],
        new_value=f"{target['name']} [{annotator}]", actor=actor,
        actor_kind=actor_kind, revision=new_rev)
    for aid in affected:
        # Carry the original link's timing over to the target link.
        link = store.get_link(task_dir, aid, src_id) or {}
        store.set_link_live(
            task_dir, project=project, annotation_id=aid,
            code_id=target["id"], created_by=annotator,
            started_at=link.get("started_at"),
            ended_at=link.get("ended_at"))
    # Only this annotator's links on src are invalidated.
    store.invalidate_links(
        task_dir, project=project, code_id=src_id,
        change_id=change_id, created_by=annotator)
    # Archive src only if nothing live remains and it has no children.
    remaining = store.affected_annotation_ids(task_dir, project, src_id)
    children = Codebook.load(task_dir, project).children(src_id)
    if not remaining and not children:
        store.archive_code(task_dir, src_id)
    revision.touch_instances(task_dir, project, affected)
    _notify(task_dir, project)
    return {"moved": len(affected), "src_id": src_id,
            "target_id": target["id"], "change_id": change_id}