| """ |
| Codebook service — the single, audited mutation path. |
| |
| All codebook writes (human *or* LLM, in standard / solo / QDA mode) go |
| through here so they share one audit trail (`created_by`), one set of |
| invariants (no duplicate siblings, no cycles, recursive delete), and one |
| change-notification hook (used by ICL to invalidate its prompt cache — |
| registered via `register_change_listener` to avoid a hard import edge). |
| |
| Phase 1 ops: create / rename / recolor / move_under / delete. |
| merge / split are Phase 2. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| from typing import Any, Callable, Dict, List, Optional |
|
|
| from potato.codebook import store |
| from potato.codebook.codebook import Codebook |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class CodebookError(Exception): |
| """Base for codebook mutation errors.""" |
|
|
|
|
| class CodeNotFound(CodebookError): |
| pass |
|
|
|
|
| class DuplicateCodeError(CodebookError): |
| pass |
|
|
|
|
| class CodebookCycleError(CodebookError): |
| pass |
|
|
|
|
| |
| |
| |
| _CHANGE_LISTENERS: List[Callable[[str, str], None]] = [] |
|
|
|
|
| def register_change_listener(fn: Callable[[str, str], None]) -> None: |
| if fn not in _CHANGE_LISTENERS: |
| _CHANGE_LISTENERS.append(fn) |
|
|
|
|
| def clear_change_listeners() -> None: |
| """Tests only — the registry is process-global.""" |
| _CHANGE_LISTENERS.clear() |
|
|
|
|
| def _notify(task_dir: str, project: str) -> None: |
| for fn in list(_CHANGE_LISTENERS): |
| try: |
| fn(task_dir, project) |
| except Exception: |
| logger.exception("codebook change listener failed") |
|
|
|
|
| def _require(task_dir: str, code_id: str) -> Dict[str, Any]: |
| code = store.get_code(task_dir, code_id) |
| if code is None: |
| raise CodeNotFound(f"Code {code_id} not found") |
| return code |
|
|
|
|
| def create_code( |
| task_dir: str, |
| *, |
| project: str, |
| name: str, |
| created_by: str, |
| color: Optional[str] = None, |
| parent_id: str = store.ROOT, |
| code_id: Optional[str] = None, |
| ) -> Dict[str, Any]: |
| name = (name or "").strip() |
| if not name: |
| raise CodebookError("Code name must not be empty") |
| if parent_id != store.ROOT and store.get_code(task_dir, parent_id) is None: |
| raise CodeNotFound(f"Parent code {parent_id} not found") |
| if store.find_code(task_dir, project, parent_id, name) is not None: |
| raise DuplicateCodeError( |
| f"A code named {name!r} already exists at this level") |
| siblings = store.children_of(task_dir, project, parent_id) |
| |
| |
| from potato.codebook import revision |
| new_rev = revision.bump_revision(task_dir, project) |
| code = store.insert_code( |
| task_dir, project=project, name=name, created_by=created_by, |
| color=color, parent_id=parent_id, sort_order=len(siblings), |
| code_id=code_id, created_revision=new_rev, |
| ) |
| _notify(task_dir, project) |
| return code |
|
|
|
|
| def _restamp(task_dir: str, project: str, code_ids: List[str]) -> None: |
| """Re-flag exactly the instances whose live links touch `code_ids` |
| so the (B) review worklist resurfaces them (soft, dismissible).""" |
| from potato.codebook import revision |
| affected: List[str] = [] |
| seen = set() |
| for cid in code_ids: |
| for aid in store.affected_annotation_ids(task_dir, project, cid): |
| if aid not in seen: |
| seen.add(aid) |
| affected.append(aid) |
| revision.touch_instances(task_dir, project, affected) |
|
|
|
|
| def rename_code( |
| task_dir: str, code_id: str, *, new_name: str, project: str, |
| actor: str = "system", actor_kind: str = "human", |
| ) -> Dict[str, Any]: |
| new_name = (new_name or "").strip() |
| if not new_name: |
| raise CodebookError("Code name must not be empty") |
| code = _require(task_dir, code_id) |
| old_name = code["name"] |
| clash = store.find_code( |
| task_dir, project, code["parent_id"], new_name) |
| if clash is not None and clash["id"] != code_id: |
| raise DuplicateCodeError( |
| f"A code named {new_name!r} already exists at this level") |
| updated = store.update_code(task_dir, code_id, name=new_name) |
| |
| |
| from potato.codebook import revision |
| from potato.codebook import changelog |
| new_rev = revision.bump_revision(task_dir, project) |
| changelog.log_change( |
| task_dir, project=project, op="rename", code_id=code_id, |
| old_value=old_name, new_value=new_name, actor=actor, |
| actor_kind=actor_kind, revision=new_rev) |
| _restamp(task_dir, project, [code_id]) |
| _notify(task_dir, project) |
| return updated |
|
|
|
|
| def recolor_code( |
| task_dir: str, code_id: str, *, color: str, project: str, |
| actor: str = "system", actor_kind: str = "human", |
| ) -> Dict[str, Any]: |
| code = _require(task_dir, code_id) |
| updated = store.update_code(task_dir, code_id, color=color) |
| from potato.codebook import revision |
| from potato.codebook import changelog |
| new_rev = revision.bump_revision(task_dir, project) |
| changelog.log_change( |
| task_dir, project=project, op="recolor", code_id=code_id, |
| old_value=code.get("color"), new_value=color, actor=actor, |
| actor_kind=actor_kind, revision=new_rev) |
| _restamp(task_dir, project, [code_id]) |
| _notify(task_dir, project) |
| return updated |
|
|
|
|
| def _subtree_ids(task_dir: str, project: str, root_id: str) -> List[str]: |
| cb = Codebook.load(task_dir, project) |
| out: List[str] = [] |
|
|
| def walk(cid: str) -> None: |
| out.append(cid) |
| for kid in cb.children(cid): |
| walk(kid["id"]) |
|
|
| walk(root_id) |
| return out |
|
|
|
|
| def move_under( |
| task_dir: str, code_id: str, *, new_parent_id: str, project: str, |
| actor: str = "system", actor_kind: str = "human", |
| ) -> Dict[str, Any]: |
| code = _require(task_dir, code_id) |
| if new_parent_id == code_id: |
| raise CodebookCycleError("A code cannot be its own parent") |
| if new_parent_id != store.ROOT: |
| if store.get_code(task_dir, new_parent_id) is None: |
| raise CodeNotFound(f"Parent code {new_parent_id} not found") |
| if new_parent_id in _subtree_ids(task_dir, project, code_id): |
| raise CodebookCycleError( |
| "Cannot move a code under one of its own descendants") |
| clash = store.find_code( |
| task_dir, project, new_parent_id, code["name"]) |
| if clash is not None and clash["id"] != code_id: |
| raise DuplicateCodeError( |
| f"A code named {code['name']!r} already exists at the target") |
| siblings = store.children_of(task_dir, project, new_parent_id) |
| old_parent = code["parent_id"] |
| updated = store.update_code( |
| task_dir, code_id, |
| parent_id=new_parent_id, sort_order=len(siblings)) |
| from potato.codebook import revision |
| from potato.codebook import changelog |
| new_rev = revision.bump_revision(task_dir, project) |
| changelog.log_change( |
| task_dir, project=project, op="move", code_id=code_id, |
| old_value=old_parent, new_value=new_parent_id, actor=actor, |
| actor_kind=actor_kind, revision=new_rev) |
| _restamp(task_dir, project, [code_id]) |
| _notify(task_dir, project) |
| return updated |
|
|
|
|
| def delete_code( |
| task_dir: str, code_id: str, *, project: str, |
| actor: str = "system", actor_kind: str = "human", |
| ) -> int: |
| """Delete a code and its entire subtree (and annotation links). |
| Returns the number of code rows removed.""" |
| code = _require(task_dir, code_id) |
| ids = _subtree_ids(task_dir, project, code_id) |
| |
| |
| from potato.codebook import revision |
| from potato.codebook import changelog |
| affected: List[str] = [] |
| seen = set() |
| for cid in ids: |
| for aid in store.affected_annotation_ids(task_dir, project, cid): |
| if aid not in seen: |
| seen.add(aid) |
| affected.append(aid) |
| n = store.delete_codes(task_dir, ids) |
| |
| new_rev = revision.bump_revision(task_dir, project) |
| changelog.log_change( |
| task_dir, project=project, op="delete", code_id=code_id, |
| old_value=code["name"], new_value=None, actor=actor, |
| actor_kind=actor_kind, revision=new_rev) |
| revision.touch_instances(task_dir, project, affected) |
| _notify(task_dir, project) |
| return n |
|
|
|
|
| |
|
|
| def apply_code( |
| task_dir: str, |
| *, |
| project: str, |
| annotation_id: str, |
| code_id: str, |
| created_by: str, |
| started_at: Optional[float] = None, |
| ended_at: Optional[float] = None, |
| ) -> None: |
| _require(task_dir, code_id) |
| store.link_annotation( |
| task_dir, project=project, annotation_id=annotation_id, |
| code_id=code_id, created_by=created_by, |
| started_at=started_at, ended_at=ended_at) |
|
|
|
|
| def remove_code( |
| task_dir: str, *, annotation_id: str, code_id: str |
| ) -> bool: |
| return store.unlink_annotation(task_dir, annotation_id, code_id) |
|
|
|
|
| def codes_on(task_dir: str, annotation_id: str) -> List[Dict[str, Any]]: |
| return store.codes_for_annotation(task_dir, annotation_id) |
|
|
|
|
| |
|
|
| def merge_codes( |
| task_dir: str, *, project: str, src_id: str, dst_id: str, |
| actor: str = "system", actor_kind: str = "human", |
| ) -> Dict[str, Any]: |
| """Fold `src` into `dst`: every live annotation link to src is |
| re-pointed at dst (idempotent if the annotation already had dst), |
| src's links are invalidated (not deleted), and src is archived (it |
| leaves the palette/ICL prompt but its row + history survive). |
| Affected instances are softly re-flagged for review.""" |
| if src_id == dst_id: |
| raise CodebookError("Cannot merge a code into itself") |
| src = _require(task_dir, src_id) |
| dst = _require(task_dir, dst_id) |
| from potato.codebook import revision, changelog |
|
|
| affected = store.affected_annotation_ids(task_dir, project, src_id) |
| new_rev = revision.bump_revision(task_dir, project) |
| change_id = changelog.log_change( |
| task_dir, project=project, op="merge", code_id=src_id, |
| related_code_id=dst_id, old_value=src["name"], |
| new_value=dst["name"], actor=actor, actor_kind=actor_kind, |
| revision=new_rev) |
| for aid in affected: |
| link = store.get_link(task_dir, aid, src_id) or {} |
| store.set_link_live( |
| task_dir, project=project, annotation_id=aid, |
| code_id=dst_id, created_by=link.get("created_by", actor), |
| started_at=link.get("started_at"), |
| ended_at=link.get("ended_at")) |
| store.invalidate_links( |
| task_dir, project=project, code_id=src_id, change_id=change_id) |
| store.archive_code(task_dir, src_id) |
| revision.touch_instances(task_dir, project, affected) |
| _notify(task_dir, project) |
| return {"merged": len(affected), "src_id": src_id, |
| "dst_id": dst_id, "change_id": change_id} |
|
|
|
|
| def split_code( |
| task_dir: str, *, project: str, src_id: str, annotator: str, |
| new_name: Optional[str] = None, target_id: Optional[str] = None, |
| actor: str = "system", actor_kind: str = "human", |
| ) -> Dict[str, Any]: |
| """Split `src` BY ANNOTATOR: move just `annotator`'s live links from |
| src to a target code (existing `target_id`, or a new code named |
| `new_name`). src stays live for other annotators; it is archived |
| only if it ends up with no live links and no children.""" |
| src = _require(task_dir, src_id) |
| if not annotator: |
| raise CodebookError("An annotator must be given to split by") |
| from potato.codebook import revision, changelog |
|
|
| if target_id: |
| target = _require(task_dir, target_id) |
| elif new_name: |
| target = create_code( |
| task_dir, project=project, name=new_name, |
| created_by=actor, parent_id=src["parent_id"]) |
| else: |
| raise CodebookError("Provide either target_id or new_name") |
|
|
| affected = store.affected_annotation_ids( |
| task_dir, project, src_id, created_by=annotator) |
| new_rev = revision.bump_revision(task_dir, project) |
| change_id = changelog.log_change( |
| task_dir, project=project, op="split", code_id=src_id, |
| related_code_id=target["id"], old_value=src["name"], |
| new_value=f"{target['name']} [{annotator}]", actor=actor, |
| actor_kind=actor_kind, revision=new_rev) |
| for aid in affected: |
| link = store.get_link(task_dir, aid, src_id) or {} |
| store.set_link_live( |
| task_dir, project=project, annotation_id=aid, |
| code_id=target["id"], created_by=annotator, |
| started_at=link.get("started_at"), |
| ended_at=link.get("ended_at")) |
| store.invalidate_links( |
| task_dir, project=project, code_id=src_id, |
| change_id=change_id, created_by=annotator) |
| |
| remaining = store.affected_annotation_ids(task_dir, project, src_id) |
| children = Codebook.load(task_dir, project).children(src_id) |
| if not remaining and not children: |
| store.archive_code(task_dir, src_id) |
| revision.touch_instances(task_dir, project, affected) |
| _notify(task_dir, project) |
| return {"moved": len(affected), "src_id": src_id, |
| "target_id": target["id"], "change_id": change_id} |
|
|