corp-env / server /swd.py
Navigam's picture
refactor: update E1 and M1 tasks for agent roles and milestone definitions
fbb8abf
Raw
History Blame Contribute Delete
3.36 kB
"""Shared Workspace Document (SWD) schema, JSON Patch apply, and validation helpers."""
from __future__ import annotations
import copy
import uuid
from typing import Any, Dict, List, Optional, Tuple
import jsonpatch
REQUIRED_TOP_LEVEL = (
"episode_id",
"scenario",
"phase",
"milestones",
"agent_reports",
"decisions",
"conflicts_identified",
"conflict_resolutions",
"reasoning_log",
"final_recommendation",
"swd_version",
)
VALID_PHASES = frozenset({"discovery", "analysis", "decision", "execution"})
MILESTONE_KEYS = frozenset({"id", "label", "due_by_turn", "status", "owner", "output"})
def new_episode_id() -> str:
return str(uuid.uuid4())
def deep_clone_swd(swd: Dict[str, Any]) -> Dict[str, Any]:
return copy.deepcopy(swd)
def ensure_agent_report_keys(swd: Dict[str, Any]) -> None:
ar = swd.setdefault("agent_reports", {})
for k in ("qa", "dev", "hr", "finance"):
ar.setdefault(k, None)
def validate_milestone_shapes(swd: Dict[str, Any]) -> bool:
for m in swd.get("milestones", []) or []:
if not isinstance(m, dict) or not MILESTONE_KEYS.issubset(m.keys()):
return False
return True
def validate_swd_structure(swd: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
for k in REQUIRED_TOP_LEVEL:
if k not in swd:
return False, f"missing key: {k}"
if swd.get("phase") not in VALID_PHASES:
return False, "invalid phase"
if not isinstance(swd.get("swd_version"), int):
return False, "swd_version must be int"
if not validate_milestone_shapes(swd):
return False, "invalid milestone shape"
ensure_agent_report_keys(swd)
return True, None
def apply_json_patch(swd: Dict[str, Any], patch_ops: List[Any]) -> None:
"""Apply RFC 6902 JSON Patch in-place; increments swd_version on success."""
patch = jsonpatch.JsonPatch(patch_ops)
patch.apply(swd, in_place=True)
swd["swd_version"] = int(swd.get("swd_version", 0)) + 1
def try_apply_json_patch(
swd: Dict[str, Any], patch_ops: List[Any]
) -> Tuple[Dict[str, Any], Optional[str]]:
"""
Apply patch to a deep copy; on success returns mutated document and None error.
On failure returns original snapshot and error message (caller restores if needed).
"""
snapshot = deep_clone_swd(swd)
try:
apply_json_patch(snapshot, patch_ops)
ok, err = validate_swd_structure(snapshot)
if not ok:
return swd, err or "structure invalid after patch"
if not resolutions_reference_valid_conflicts(snapshot):
return swd, "conflict_resolutions reference unknown conflict id"
return snapshot, None
except jsonpatch.JsonPatchException as e:
return swd, str(e)
def conflict_ids_from_swd(swd: Dict[str, Any]) -> set:
out = set()
for c in swd.get("conflicts_identified", []) or []:
if isinstance(c, dict) and c.get("id") is not None:
out.add(c["id"])
return out
def resolutions_reference_valid_conflicts(swd: Dict[str, Any]) -> bool:
ids = conflict_ids_from_swd(swd)
for r in swd.get("conflict_resolutions", []) or []:
if not isinstance(r, dict):
return False
cid = r.get("conflict_id")
if cid is not None and cid not in ids:
return False
return True