import copy
import os
import random
import uuid
from typing import Any, Dict, List, Optional

from .bundle import write_bundle_zip

# Actions the demo agent chooses from on a normal step; the chaos variant
# additionally allows "panic", which ends the run early.
_BASE_ACTIONS = ("scan", "move", "ask_tool", "write_memory")
_CHAOS_ACTIONS = _BASE_ACTIONS + ("panic",)

_TOOL_QUERIES = ("price", "status", "latency", "risk")
_MEMORY_NOTES = ("cached", "retry", "validated", "unsafe", "needs_review")

_CALM_TEXTS = (
    "Proceed with caution.",
    "Tool looks stable.",
    "Memory updated.",
    "Need more evidence.",
    "I will retry once.",
)
_CHAOTIC_TEXTS = (
    "Unexpected output detected.",
    "I am uncertain; escalating.",
    "This seems inconsistent.",
    "Plan changed due to drift.",
)

# Upper bound on the number of simulated agent steps per demo run.
_MAX_STEPS = 40


def _env_fingerprint() -> Dict[str, Any]:
    """Describe the runtime environment using environment variables.

    Unset or empty variables are reported as ``"unknown"``.
    """
    return {
        "python": os.environ.get("PYTHON_VERSION") or "unknown",
        "space": os.environ.get("SPACE_ID") or os.environ.get("HF_SPACE_ID") or "unknown",
    }


def _mk_event(kind: str, step: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Build one event dict with the ``kind``/``step``/``payload`` keys."""
    return {"kind": kind, "step": step, "payload": payload}


def make_demo_bundle_zip(out_path: str, *, seed: int, chaos: float, label: str) -> str:
    """
    Creates a synthetic agent timeline with controlled randomness.
    'chaos' increases divergence probability.
    Also includes reward_total in state snapshots to demonstrate reward delta.

    Args:
        out_path: Destination passed through to ``write_bundle_zip``.
        seed: Seed for the private ``random.Random`` driving the timeline.
        chaos: Probability-like knob; it is compared against ``rng.random()``
            to switch to the chaotic action/text sets and it raises the
            simulated tool-failure rate.
        label: Embedded in the generated run id (``demo-<label>-<hex>``).

    Returns:
        Whatever ``write_bundle_zip`` returns (annotated as ``str``).

    Note:
        The event sequence is reproducible for a given ``seed``/``chaos``;
        the run id is not, because it contains a random UUID fragment.
    """
    rng = random.Random(seed)
    run_id = f"demo-{label}-{uuid.uuid4().hex[:8]}"
    framework = "demo-agent"
    model_id = "demo-llm"

    events: List[Dict[str, Any]] = []
    memory: Dict[str, Any] = {"goal": "reach_target", "notes": []}
    reward_total = 0.0

    # Optional replay link template (demo)
    replay = {
        "base_url": "https://example.com/replay",
        "pattern": "/?run_id={run_id}&i={i}",
    }

    for i in range(_MAX_STEPS):
        # NOTE: the order of rng calls below determines the timeline for a
        # given seed; do not reorder them.
        action = rng.choice(_BASE_ACTIONS)
        if rng.random() < chaos:
            action = rng.choice(_CHAOS_ACTIONS)

        score = rng.random()
        events.append(_mk_event("plan_step", f"t{i}.plan", {"action": action, "score": score}))

        if action == "ask_tool":
            q = rng.choice(_TOOL_QUERIES)
            events.append(_mk_event("tool_call", f"t{i}.tool_call", {"tool": "mock_api", "query": q}))
            # Tool failures become more likely as chaos grows.
            if rng.random() < (0.15 + chaos * 0.2):
                events.append(
                    _mk_event("tool_result", f"t{i}.tool_result", {"ok": False, "error": "timeout"})
                )
                reward_total -= 0.5
            else:
                val = rng.randint(1, 100)
                events.append(
                    _mk_event("tool_result", f"t{i}.tool_result", {"ok": True, "value": val})
                )
                reward_total += 0.2
        elif action == "write_memory":
            note = rng.choice(_MEMORY_NOTES)
            memory["notes"].append(note)
            events.append(
                _mk_event("memory_write", f"t{i}.mem", {"write": {"notes": list(memory["notes"])}})
            )
            reward_total += 0.05
        elif action == "panic":
            events.append(
                _mk_event("guardrail", f"t{i}.guardrail", {"blocked": True, "reason": "anomaly"})
            )
            events.append(
                _mk_event(
                    "state_snapshot",
                    f"t{i}.state",
                    # Copy so the snapshot is independent of the live dict.
                    {"memory": copy.deepcopy(memory), "mode": "halt", "reward_total": reward_total},
                )
            )
            break
        else:
            # scan/move
            x = rng.randint(0, 9)
            y = rng.randint(0, 9)
            reward_total += 0.01
            events.append(
                _mk_event(
                    "state_snapshot",
                    f"t{i}.state",
                    {
                        "x": x,
                        "y": y,
                        # Deep-copy: `memory` keeps being mutated by later
                        # write_memory steps, and events are only handed to
                        # write_bundle_zip after the loop. Without the copy
                        # every snapshot would show the final notes list.
                        "memory": copy.deepcopy(memory),
                        "reward_total": reward_total,
                    },
                )
            )

        txt = rng.choice(_CALM_TEXTS)
        if rng.random() < chaos:
            txt = rng.choice(_CHAOTIC_TEXTS)
        events.append(
            _mk_event("llm_sample", f"t{i}.llm", {"text": txt, "tokens": rng.randint(20, 180)})
        )

    return write_bundle_zip(
        out_path,
        run_id=run_id,
        framework=framework,
        model_id=model_id,
        env_fingerprint=_env_fingerprint(),
        events_payloads=events,
        replay=replay,
    )


def fork_patch_bundle(
    out_path: str,
    *,
    source_zip: str,
    fork_at_index: int,
    patch_kind: Optional[str] = None,
    patch_step: Optional[str] = None,
    patch_payload_json: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Counterfactual workflow: patch an event at index N, re-hash-chain into a new bundle.

    Args:
        out_path: Destination passed through to ``write_bundle_zip``.
        source_zip: Bundle to load with ``load_bundle`` and fork from.
        fork_at_index: Zero-based index of the event to patch. An index
            outside ``[0, len(events))`` is ignored, so the fork is then an
            unpatched copy of the source events.
        patch_kind: Replacement ``kind``; skipped when ``None`` or empty.
        patch_step: Replacement ``step``; skipped when ``None`` or empty.
        patch_payload_json: Replacement ``payload``; applied whenever it is
            not ``None`` (an empty dict is a valid replacement).

    Returns:
        Whatever ``write_bundle_zip`` returns (annotated as ``str``).
    """
    from .bundle import load_bundle, write_bundle_zip

    b = load_bundle(source_zip)
    src_events = b.events

    # Keep only the fields that are re-submitted to write_bundle_zip.
    # NOTE(review): any hash-chain fields on the source events are dropped
    # here; presumably write_bundle_zip recomputes them - confirm.
    payloads: List[Dict[str, Any]] = [
        {
            "ts": ev.get("ts"),
            "kind": ev.get("kind"),
            "step": ev.get("step"),
            "payload": ev.get("payload", {}),
        }
        for ev in src_events
    ]

    if 0 <= fork_at_index < len(payloads):
        target = payloads[fork_at_index]
        if patch_kind:
            target["kind"] = patch_kind
        if patch_step:
            target["step"] = patch_step
        if patch_payload_json is not None:
            target["payload"] = patch_payload_json

    new_run = f"{b.manifest.get('run_id','run')}-fork"
    return write_bundle_zip(
        out_path,
        run_id=new_run,
        framework=b.manifest.get("framework", "unknown"),
        model_id=b.manifest.get("model_id", "unknown"),
        env_fingerprint=b.manifest.get("env", {}),
        events_payloads=payloads,
        replay=b.manifest.get("replay"),
        run_url=b.manifest.get("run_url"),
    )