import copy
import os
import random
import uuid
from typing import Any, Dict, List, Optional, Tuple

from .bundle import write_bundle_zip


def _env_fingerprint() -> Dict[str, Any]:
    """Return a minimal description of the runtime environment.

    Values are read from environment variables and fall back to
    ``"unknown"`` when a variable is unset or empty.
    """
    # Keep lightweight; users can expand this in real exporters
    return {
        "python": os.environ.get("PYTHON_VERSION") or "unknown",
        "space": os.environ.get("SPACE_ID") or os.environ.get("HF_SPACE_ID") or "unknown",
    }


def _mk_event(kind: str, step: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Build one event dict with the keys ``kind``, ``step`` and ``payload``."""
    return {"kind": kind, "step": step, "payload": payload}


def make_demo_bundle_zip(out_path: str, *, seed: int, chaos: float, label: str) -> str:
    """Create a synthetic agent timeline with controlled randomness.

    The event sequence is driven entirely by ``random.Random(seed)``, so the
    same ``seed`` and ``chaos`` produce the same events. The run id is not
    reproducible because it embeds a fresh ``uuid4`` fragment.

    Args:
        out_path: Destination path, forwarded to ``write_bundle_zip``.
        seed: Seed for the private random generator.
        chaos: Probability-like knob; higher values increase divergence
            (the chance of a "panic" action, tool timeouts and anomalous
            LLM samples).
        label: Free-form tag embedded in the generated run id.

    Returns:
        Whatever ``write_bundle_zip`` returns for the written bundle.
    """
    rng = random.Random(seed)
    run_id = f"demo-{label}-{uuid.uuid4().hex[:8]}"
    framework = "demo-agent"
    model_id = "demo-llm"

    events: List[Dict[str, Any]] = []
    memory: Dict[str, Any] = {"goal": "reach_target", "notes": []}

    # NOTE: the order of rng calls below defines the timeline for a given
    # seed; do not reorder them.
    for i in range(40):
        # planning
        action = rng.choice(["scan", "move", "ask_tool", "write_memory"])
        if rng.random() < chaos:
            action = rng.choice(["scan", "move", "ask_tool", "write_memory", "panic"])
        events.append(_mk_event("plan_step", f"t{i}.plan", {"action": action, "score": rng.random()}))

        if action == "ask_tool":
            q = rng.choice(["price", "status", "latency", "risk"])
            events.append(_mk_event("tool_call", f"t{i}.tool_call", {"tool": "mock_api", "query": q}))
            # tool sometimes flakes
            if rng.random() < (0.15 + chaos * 0.2):
                events.append(
                    _mk_event("tool_result", f"t{i}.tool_result", {"ok": False, "error": "timeout"})
                )
            else:
                val = rng.randint(1, 100)
                events.append(
                    _mk_event("tool_result", f"t{i}.tool_result", {"ok": True, "value": val})
                )
        elif action == "write_memory":
            note = rng.choice(["cached", "retry", "validated", "unsafe", "needs_review"])
            memory["notes"].append(note)
            events.append(
                _mk_event("memory_write", f"t{i}.mem", {"write": {"notes": list(memory["notes"])}})
            )
        elif action == "panic":
            events.append(
                _mk_event("guardrail", f"t{i}.guardrail", {"blocked": True, "reason": "anomaly"})
            )
            # Snapshot a copy so the event owns its own view of memory.
            events.append(
                _mk_event(
                    "state_snapshot",
                    f"t{i}.state",
                    {"memory": copy.deepcopy(memory), "mode": "halt"},
                )
            )
            break
        else:
            # move / scan influences a synthetic "state"
            # Deep-copy memory: storing the live dict would let later
            # write_memory steps rewrite every earlier snapshot, since the
            # events are only serialized after the loop finishes.
            events.append(
                _mk_event(
                    "state_snapshot",
                    f"t{i}.state",
                    {
                        "x": rng.randint(0, 9),
                        "y": rng.randint(0, 9),
                        "memory": copy.deepcopy(memory),
                    },
                )
            )

        # llm sample (synthetic text)
        txt = rng.choice(
            [
                "Proceed with caution.",
                "Tool looks stable.",
                "Memory updated.",
                "Need more evidence.",
                "I will retry once.",
            ]
        )
        if rng.random() < chaos:
            txt = rng.choice(
                [
                    "Unexpected output detected.",
                    "I am uncertain; escalating.",
                    "This seems inconsistent.",
                    "Plan changed due to drift.",
                ]
            )
        events.append(_mk_event("llm_sample", f"t{i}.llm", {"text": txt, "tokens": rng.randint(20, 180)}))

    return write_bundle_zip(
        out_path,
        run_id=run_id,
        framework=framework,
        model_id=model_id,
        env_fingerprint=_env_fingerprint(),
        events_payloads=events,
    )


def fork_patch_bundle(
    out_path: str,
    *,
    source_zip: str,
    fork_at_index: int,
    patch_kind: Optional[str] = None,
    patch_step: Optional[str] = None,
    patch_payload_json: Optional[Dict[str, Any]] = None,
) -> str:
    """Fork an existing bundle, patching at most one event ("what-if").

    Loads ``source_zip``, copies the ``ts``/``kind``/``step``/``payload``
    fields of every event, optionally overrides fields of the event at
    ``fork_at_index``, and re-emits everything through ``write_bundle_zip``
    under a run id suffixed with ``-fork``.

    Args:
        out_path: Destination path, forwarded to ``write_bundle_zip``.
        source_zip: Path of the bundle to fork, passed to ``load_bundle``.
        fork_at_index: Zero-based index of the event to patch. An index
            outside ``[0, len(events))`` leaves all events unpatched.
        patch_kind: Replacement ``kind``; ignored when ``None`` or empty.
        patch_step: Replacement ``step``; ignored when ``None`` or empty.
        patch_payload_json: Replacement payload; applied whenever it is not
            ``None`` (an empty dict is a valid replacement).

    Returns:
        Whatever ``write_bundle_zip`` returns for the written bundle.
    """
    # write_bundle_zip is already imported at module level; only load_bundle
    # needs to be brought in here.
    from .bundle import load_bundle

    b = load_bundle(source_zip)

    payloads: List[Dict[str, Any]] = [
        {
            "ts": ev.get("ts"),
            "kind": ev.get("kind"),
            "step": ev.get("step"),
            "payload": ev.get("payload", {}),
        }
        for ev in b.events
    ]

    if 0 <= fork_at_index < len(payloads):
        target = payloads[fork_at_index]
        if patch_kind:
            target["kind"] = patch_kind
        if patch_step:
            target["step"] = patch_step
        if patch_payload_json is not None:
            target["payload"] = patch_payload_json

    new_run = f"{b.manifest.get('run_id','run')}-fork"
    return write_bundle_zip(
        out_path,
        run_id=new_run,
        framework=b.manifest.get("framework", "unknown"),
        model_id=b.manifest.get("model_id", "unknown"),
        env_fingerprint=b.manifest.get("env", {}),
        events_payloads=payloads,
    )