Spaces:
Paused
Paused
"""
Codebook service — the single, audited mutation path.

All codebook writes (human *or* LLM, in standard / solo / QDA mode) go
through here so they share one audit trail (`created_by`), one set of
invariants (no duplicate siblings, no cycles, recursive delete), and one
change-notification hook (used by ICL to invalidate its prompt cache —
registered via `register_change_listener` to avoid a hard import edge).

Phase 1 ops: create / rename / recolor / move_under / delete.
Phase 2 ops (retroactive, append-only): merge / split.
"""
| from __future__ import annotations | |
| import logging | |
| from typing import Any, Callable, Dict, List, Optional | |
| from potato.codebook import store | |
| from potato.codebook.codebook import Codebook | |
| logger = logging.getLogger(__name__) | |
class CodebookError(Exception):
    """Root of the codebook mutation error hierarchy.

    Raised directly for validation failures in this module (for example an
    empty code name); more specific failures use subclasses of this type.
    """
class CodeNotFound(CodebookError):
    """A referenced code (or parent code) does not exist in the store."""
class DuplicateCodeError(CodebookError):
    """A sibling with the same name already exists under the parent."""
class CodebookCycleError(CodebookError):
    """A move would make a code its own parent or its own descendant's child."""
| # Change listeners: called (task_dir, project) after any successful | |
| # mutation. ICL registers one to invalidate its prompt cache. Kept as a | |
| # registry so codebook has no import dependency on the ICL/AI layer. | |
| _CHANGE_LISTENERS: List[Callable[[str, str], None]] = [] | |
def register_change_listener(fn: Callable[[str, str], None]) -> None:
    """Subscribe `fn` to codebook change notifications.

    Registering a callback that is already in the registry is a no-op, so
    the same listener is never stored twice.
    """
    if fn in _CHANGE_LISTENERS:
        return
    _CHANGE_LISTENERS.append(fn)
def clear_change_listeners() -> None:
    """Tests only — the registry is process-global.

    Empties the listener registry in place (the list object is kept).
    """
    del _CHANGE_LISTENERS[:]
def _notify(task_dir: str, project: str) -> None:
    """Call every registered change listener with (task_dir, project).

    A listener that raises is logged and skipped; the remaining listeners
    still run and the exception never reaches the caller.
    """
    # Iterate over a snapshot so a listener that registers or clears
    # listeners while we fan out cannot disturb the loop.
    snapshot = tuple(_CHANGE_LISTENERS)
    for listener in snapshot:
        try:
            listener(task_dir, project)
        except Exception:  # a listener must never break a mutation
            logger.exception("codebook change listener failed")
def _require(task_dir: str, code_id: str) -> Dict[str, Any]:
    """Return the stored code for `code_id`.

    Raises:
        CodeNotFound: if `store.get_code` returns None for this id.
    """
    found = store.get_code(task_dir, code_id)
    if found is not None:
        return found
    raise CodeNotFound(f"Code {code_id} not found")
def create_code(
    task_dir: str,
    *,
    project: str,
    name: str,
    created_by: str,
    color: Optional[str] = None,
    parent_id: str = store.ROOT,
    code_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Create a new code under `parent_id` and return what the store inserted.

    The name is stripped of surrounding whitespace before any check. The new
    code is appended after its existing siblings (its sort order is the
    current sibling count) and stamped with the freshly bumped project
    revision. Change listeners are notified afterwards.

    Raises:
        CodebookError: if the stripped name is empty.
        CodeNotFound: if a non-root `parent_id` does not exist.
        DuplicateCodeError: if a code with that name already exists under
            the same parent.
    """
    cleaned = name.strip() if name else ""
    if not cleaned:
        raise CodebookError("Code name must not be empty")

    if parent_id != store.ROOT:
        if store.get_code(task_dir, parent_id) is None:
            raise CodeNotFound(f"Parent code {parent_id} not found")

    existing = store.find_code(task_dir, project, parent_id, cleaned)
    if existing is not None:
        raise DuplicateCodeError(
            f"A code named {cleaned!r} already exists at this level")

    next_position = len(store.children_of(task_dir, project, parent_id))

    # Adding a code changes the option set, so the project revision is
    # bumped and the code records the revision it first appeared in.
    from potato.codebook import revision
    first_revision = revision.bump_revision(task_dir, project)

    created = store.insert_code(
        task_dir,
        project=project,
        name=cleaned,
        created_by=created_by,
        color=color,
        parent_id=parent_id,
        sort_order=next_position,
        code_id=code_id,
        created_revision=first_revision,
    )
    _notify(task_dir, project)
    return created
def _restamp(task_dir: str, project: str, code_ids: List[str]) -> None:
    """Re-flag the instances whose links touch any of `code_ids`.

    Collects the ids reported by `store.affected_annotation_ids` for each
    code, drops duplicates while keeping first-seen order, and hands the
    result to `revision.touch_instances` so the (B) review worklist
    resurfaces them (soft, dismissible).
    """
    from potato.codebook import revision

    # dict.fromkeys de-duplicates while preserving insertion order.
    unique_ids = list(dict.fromkeys(
        annotation_id
        for cid in code_ids
        for annotation_id in store.affected_annotation_ids(
            task_dir, project, cid)
    ))
    revision.touch_instances(task_dir, project, unique_ids)
def rename_code(
    task_dir: str, code_id: str, *, new_name: str, project: str,
    actor: str = "system", actor_kind: str = "human",
) -> Dict[str, Any]:
    """Rename a code and return the result of `store.update_code`.

    After the store update, the project revision is bumped, a "rename"
    changelog entry (old name -> new name) is written, instances linked to
    the code are re-flagged, and change listeners are notified.

    Raises:
        CodebookError: if the stripped new name is empty.
        CodeNotFound: if `code_id` does not exist.
        DuplicateCodeError: if a *different* code under the same parent
            already has the new name.
    """
    cleaned = new_name.strip() if new_name else ""
    if not cleaned:
        raise CodebookError("Code name must not be empty")

    current = _require(task_dir, code_id)
    previous_name = current["name"]

    sibling = store.find_code(
        task_dir, project, current["parent_id"], cleaned)
    # Finding the code itself is not a clash.
    if sibling is not None and sibling["id"] != code_id:
        raise DuplicateCodeError(
            f"A code named {cleaned!r} already exists at this level")

    row = store.update_code(task_dir, code_id, name=cleaned)

    # Any codebook change bumps the revision (provenance: an instance
    # labeled before this change is flagged stale on revisit).
    from potato.codebook import revision
    from potato.codebook import changelog
    bumped = revision.bump_revision(task_dir, project)
    changelog.log_change(
        task_dir,
        project=project,
        op="rename",
        code_id=code_id,
        old_value=previous_name,
        new_value=cleaned,
        actor=actor,
        actor_kind=actor_kind,
        revision=bumped,
    )
    _restamp(task_dir, project, [code_id])
    _notify(task_dir, project)
    return row
def recolor_code(
    task_dir: str, code_id: str, *, color: str, project: str,
    actor: str = "system", actor_kind: str = "human",
) -> Dict[str, Any]:
    """Change a code's color and return the result of `store.update_code`.

    Like the other mutations, this bumps the project revision, writes a
    "recolor" changelog entry (previous color -> new color), re-flags the
    instances linked to the code and notifies change listeners.

    Raises:
        CodeNotFound: if `code_id` does not exist.
    """
    before = _require(task_dir, code_id)
    row = store.update_code(task_dir, code_id, color=color)

    from potato.codebook import revision
    from potato.codebook import changelog
    bumped = revision.bump_revision(task_dir, project)
    changelog.log_change(
        task_dir,
        project=project,
        op="recolor",
        code_id=code_id,
        old_value=before.get("color"),
        new_value=color,
        actor=actor,
        actor_kind=actor_kind,
        revision=bumped,
    )
    _restamp(task_dir, project, [code_id])
    _notify(task_dir, project)
    return row
def _subtree_ids(task_dir: str, project: str, root_id: str) -> List[str]:
    """Return `root_id` followed by the ids of all its descendants.

    Ids come back in pre-order (a code before its children, children in the
    order `Codebook.children` yields them), which is the same order the
    previous recursive implementation produced.

    The walk is iterative and tracks visited ids, so a corrupted hierarchy
    containing a parent cycle, or a very deep one, terminates normally
    instead of raising RecursionError in the middle of a delete or move.
    """
    cb = Codebook.load(task_dir, project)
    ordered: List[str] = []
    visited = set()
    pending = [root_id]
    while pending:
        cid = pending.pop()
        if cid in visited:
            # Only reachable when stored parent links form a cycle.
            continue
        visited.add(cid)
        ordered.append(cid)
        # Push children reversed so the first child is popped first,
        # preserving pre-order.
        child_ids = [kid["id"] for kid in cb.children(cid)]
        pending.extend(reversed(child_ids))
    return ordered
def move_under(
    task_dir: str, code_id: str, *, new_parent_id: str, project: str,
    actor: str = "system", actor_kind: str = "human",
) -> Dict[str, Any]:
    """Re-parent a code under `new_parent_id` (or the root) and return the
    result of `store.update_code`.

    The code is placed after the target's current children (sort order is
    the target's child count). Afterwards the project revision is bumped, a
    "move" changelog entry (old parent -> new parent) is written, linked
    instances are re-flagged and change listeners are notified.

    Raises:
        CodeNotFound: if `code_id` or a non-root `new_parent_id` is missing.
        CodebookCycleError: if the target is the code itself or lies inside
            the code's own subtree.
        DuplicateCodeError: if a different code with the same name already
            sits under the target.
    """
    moving = _require(task_dir, code_id)
    if code_id == new_parent_id:
        raise CodebookCycleError("A code cannot be its own parent")

    # The root needs neither an existence check nor a cycle check.
    if new_parent_id != store.ROOT:
        if store.get_code(task_dir, new_parent_id) is None:
            raise CodeNotFound(f"Parent code {new_parent_id} not found")
        descendants = _subtree_ids(task_dir, project, code_id)
        if new_parent_id in descendants:
            raise CodebookCycleError(
                "Cannot move a code under one of its own descendants")

    namesake = store.find_code(
        task_dir, project, new_parent_id, moving["name"])
    if namesake is not None and namesake["id"] != code_id:
        raise DuplicateCodeError(
            f"A code named {moving['name']!r} already exists at the target")

    target_children = store.children_of(task_dir, project, new_parent_id)
    previous_parent = moving["parent_id"]
    row = store.update_code(
        task_dir,
        code_id,
        parent_id=new_parent_id,
        sort_order=len(target_children),
    )

    from potato.codebook import revision
    from potato.codebook import changelog
    bumped = revision.bump_revision(task_dir, project)
    changelog.log_change(
        task_dir,
        project=project,
        op="move",
        code_id=code_id,
        old_value=previous_parent,
        new_value=new_parent_id,
        actor=actor,
        actor_kind=actor_kind,
        revision=bumped,
    )
    _restamp(task_dir, project, [code_id])
    _notify(task_dir, project)
    return row
def delete_code(
    task_dir: str, code_id: str, *, project: str,
    actor: str = "system", actor_kind: str = "human",
) -> int:
    """Delete a code and its entire subtree (and annotation links).

    Returns the value reported by `store.delete_codes` (the number of code
    rows removed). A single "delete" changelog entry is written for the
    root of the deleted subtree.

    Raises:
        CodeNotFound: if `code_id` does not exist.
    """
    doomed = _require(task_dir, code_id)
    subtree = _subtree_ids(task_dir, project, code_id)

    # Capture affected instances BEFORE the (existing) hard delete so
    # the worklist can still resurface them.
    from potato.codebook import revision
    from potato.codebook import changelog
    # dict.fromkeys de-duplicates while keeping first-seen order.
    to_touch: List[str] = list(dict.fromkeys(
        annotation_id
        for cid in subtree
        for annotation_id in store.affected_annotation_ids(
            task_dir, project, cid)
    ))

    removed = store.delete_codes(task_dir, subtree)

    # Removing a code also changes the option set.
    bumped = revision.bump_revision(task_dir, project)
    changelog.log_change(
        task_dir,
        project=project,
        op="delete",
        code_id=code_id,
        old_value=doomed["name"],
        new_value=None,
        actor=actor,
        actor_kind=actor_kind,
        revision=bumped,
    )
    revision.touch_instances(task_dir, project, to_touch)
    _notify(task_dir, project)
    return removed
| # ---- annotation <-> code links (audited, same notify path) ------------- | |
def apply_code(
    task_dir: str,
    *,
    project: str,
    annotation_id: str,
    code_id: str,
    created_by: str,
    started_at: Optional[float] = None,
    ended_at: Optional[float] = None,
) -> None:
    """Link an annotation to an existing code via `store.link_annotation`.

    Raises:
        CodeNotFound: if `code_id` does not exist.
    """
    # Existence check only; the returned row is not needed.
    _require(task_dir, code_id)
    link_fields = dict(
        project=project,
        annotation_id=annotation_id,
        code_id=code_id,
        created_by=created_by,
        started_at=started_at,
        ended_at=ended_at,
    )
    store.link_annotation(task_dir, **link_fields)
def remove_code(
    task_dir: str, *, annotation_id: str, code_id: str
) -> bool:
    """Unlink `code_id` from an annotation.

    Returns whatever `store.unlink_annotation` reports for the pair.
    """
    outcome = store.unlink_annotation(task_dir, annotation_id, code_id)
    return outcome
def codes_on(task_dir: str, annotation_id: str) -> List[Dict[str, Any]]:
    """Return the codes attached to an annotation, as given by
    `store.codes_for_annotation`."""
    attached = store.codes_for_annotation(task_dir, annotation_id)
    return attached
| # ---- Phase 2 (C): retroactive merge / split (append-only) -------------- | |
def merge_codes(
    task_dir: str, *, project: str, src_id: str, dst_id: str,
    actor: str = "system", actor_kind: str = "human",
) -> Dict[str, Any]:
    """Fold `src` into `dst`.

    Every annotation reported as affected by src gets a live link to dst
    (carrying over the original link's creator and timestamps when the link
    can be read back), src's links are invalidated (not deleted) under the
    merge's change id, and src is archived. Affected instances are softly
    re-flagged for review and change listeners are notified.

    Returns:
        A dict with "merged" (number of affected annotations), "src_id",
        "dst_id" and "change_id".

    Raises:
        CodebookError: if `src_id` equals `dst_id`.
        CodeNotFound: if either code does not exist.
    """
    if dst_id == src_id:
        raise CodebookError("Cannot merge a code into itself")
    source = _require(task_dir, src_id)
    destination = _require(task_dir, dst_id)

    from potato.codebook import revision, changelog

    # Snapshot the affected annotations before any link is rewritten.
    annotation_ids = store.affected_annotation_ids(task_dir, project, src_id)
    bumped = revision.bump_revision(task_dir, project)
    change_id = changelog.log_change(
        task_dir,
        project=project,
        op="merge",
        code_id=src_id,
        related_code_id=dst_id,
        old_value=source["name"],
        new_value=destination["name"],
        actor=actor,
        actor_kind=actor_kind,
        revision=bumped,
    )

    for annotation_id in annotation_ids:
        # Fall back to an empty mapping when the src link cannot be read,
        # in which case the acting user becomes the link's creator.
        original = store.get_link(task_dir, annotation_id, src_id) or {}
        store.set_link_live(
            task_dir,
            project=project,
            annotation_id=annotation_id,
            code_id=dst_id,
            created_by=original.get("created_by", actor),
            started_at=original.get("started_at"),
            ended_at=original.get("ended_at"),
        )

    store.invalidate_links(
        task_dir, project=project, code_id=src_id, change_id=change_id)
    # NOTE(review): src is archived even if it still has child codes, unlike
    # split_code which keeps a parent with children live — confirm intended.
    store.archive_code(task_dir, src_id)
    revision.touch_instances(task_dir, project, annotation_ids)
    _notify(task_dir, project)

    return {
        "merged": len(annotation_ids),
        "src_id": src_id,
        "dst_id": dst_id,
        "change_id": change_id,
    }
def split_code(
    task_dir: str, *, project: str, src_id: str, annotator: str,
    new_name: Optional[str] = None, target_id: Optional[str] = None,
    actor: str = "system", actor_kind: str = "human",
) -> Dict[str, Any]:
    """Split `src` BY ANNOTATOR.

    Moves just `annotator`'s live links from src to a target code: the
    existing `target_id` when given, otherwise a new code named `new_name`
    created next to src (same parent). src stays live for other annotators;
    it is archived only if it ends up with no live links and no children.

    Returns:
        A dict with "moved" (number of annotations re-pointed), "src_id",
        "target_id" and "change_id".

    Raises:
        CodeNotFound: if src or an explicit target does not exist.
        CodebookError: if no annotator is given, if neither `target_id` nor
            `new_name` is provided, or if `target_id` is src itself.
        DuplicateCodeError: from `create_code`, when `new_name` clashes
            with an existing sibling of src.
    """
    src = _require(task_dir, src_id)
    if not annotator:
        raise CodebookError("An annotator must be given to split by")
    from potato.codebook import revision, changelog

    if target_id:
        # Splitting a code into itself would re-create the annotator's
        # links on src and then invalidate them right away, silently
        # dropping that annotator's codings. Reject it, mirroring the
        # self-merge guard in merge_codes.
        if target_id == src_id:
            raise CodebookError("Cannot split a code into itself")
        target = _require(task_dir, target_id)
    elif new_name:
        target = create_code(
            task_dir, project=project, name=new_name,
            created_by=actor, parent_id=src["parent_id"])
    else:
        raise CodebookError("Provide either target_id or new_name")

    # Only this annotator's links are moved.
    affected = store.affected_annotation_ids(
        task_dir, project, src_id, created_by=annotator)
    new_rev = revision.bump_revision(task_dir, project)
    change_id = changelog.log_change(
        task_dir, project=project, op="split", code_id=src_id,
        related_code_id=target["id"], old_value=src["name"],
        new_value=f"{target['name']} [{annotator}]", actor=actor,
        actor_kind=actor_kind, revision=new_rev)

    for aid in affected:
        # Carry the original link's timestamps over when it can be read.
        link = store.get_link(task_dir, aid, src_id) or {}
        store.set_link_live(
            task_dir, project=project, annotation_id=aid,
            code_id=target["id"], created_by=annotator,
            started_at=link.get("started_at"),
            ended_at=link.get("ended_at"))
    store.invalidate_links(
        task_dir, project=project, code_id=src_id,
        change_id=change_id, created_by=annotator)

    # Archive src only if nothing live remains and it has no children.
    remaining = store.affected_annotation_ids(task_dir, project, src_id)
    children = Codebook.load(task_dir, project).children(src_id)
    if not remaining and not children:
        store.archive_code(task_dir, src_id)

    revision.touch_instances(task_dir, project, affected)
    _notify(task_dir, project)
    return {"moved": len(affected), "src_id": src_id,
            "target_id": target["id"], "change_id": change_id}