File size: 5,460 Bytes
303465a 387797b 303465a 387797b 303465a 387797b 303465a 387797b 303465a 387797b 303465a 387797b 303465a 387797b 303465a 387797b 303465a 387797b 303465a 387797b 303465a 387797b 303465a 387797b 303465a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
import os
import random
import uuid
from typing import Any, Dict, List, Optional
from .bundle import write_bundle_zip
def _env_fingerprint() -> Dict[str, Any]:
    """Describe the runtime environment from process environment variables.

    Returns a dict with two keys:
      * "python": the value of ``PYTHON_VERSION``
      * "space":  the value of ``SPACE_ID``, falling back to ``HF_SPACE_ID``

    A variable that is unset *or empty* is treated as missing; when nothing
    usable is found the value is the string "unknown".
    """
    env = os.environ

    python_version = env.get("PYTHON_VERSION")
    if not python_version:
        python_version = "unknown"

    # Try the two space-id variables in priority order.
    space_id = env.get("SPACE_ID")
    if not space_id:
        space_id = env.get("HF_SPACE_ID")
    if not space_id:
        space_id = "unknown"

    fingerprint: Dict[str, Any] = {}
    fingerprint["python"] = python_version
    fingerprint["space"] = space_id
    return fingerprint
def _mk_event(kind: str, step: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap an event's kind, step label and payload into a single dict.

    The payload object is stored as-is (by reference), not copied.
    """
    event: Dict[str, Any] = dict(kind=kind, step=step, payload=payload)
    return event
def make_demo_bundle_zip(out_path: str, *, seed: int, chaos: float, label: str) -> str:
    """
    Creates a synthetic agent timeline with controlled randomness.

    The timeline runs for at most 40 steps. Each step emits a ``plan_step``
    event, then action-specific events (tool call/result, memory write,
    state snapshot, or a guardrail halt), and finally an ``llm_sample``.
    A "panic" action (only reachable when the chaos roll succeeds) emits a
    guardrail event plus a halting state snapshot and ends the timeline.

    'chaos' increases divergence probability: it is the chance of re-rolling
    the action from a pool that includes "panic", it raises the tool timeout
    probability (0.15 + chaos * 0.2), and it is the chance of swapping the
    LLM text for an "uncertain" message.

    State snapshots include reward_total to demonstrate reward delta, and
    each snapshot carries its own copy of the agent memory so that it
    reflects the memory at that step rather than at the end of the run.

    Args:
        out_path: Forwarded unchanged to ``write_bundle_zip``.
        seed: Seed for the private ``random.Random`` that drives every
            random decision in the timeline. The run id suffix comes from
            ``uuid.uuid4()`` and is therefore NOT reproducible from the seed.
        chaos: Compared against ``rng.random()`` (values in [0.0, 1.0));
            see above.
        label: Embedded in the generated run id (``demo-<label>-<hex8>``).

    Returns:
        Whatever ``write_bundle_zip`` returns.
    """
    rng = random.Random(seed)
    run_id = f"demo-{label}-{uuid.uuid4().hex[:8]}"
    framework = "demo-agent"
    model_id = "demo-llm"

    events: List[Dict[str, Any]] = []
    memory: Dict[str, Any] = {"goal": "reach_target", "notes": []}
    reward_total = 0.0

    def _memory_snapshot() -> Dict[str, Any]:
        # `memory` (and its "notes" list) keeps mutating as the loop runs.
        # Embedding it directly would make every snapshot alias the same
        # object and show the *final* notes once serialised, so copy it.
        return {**memory, "notes": list(memory["notes"])}

    # Optional replay link template (demo)
    replay = {
        "base_url": "https://example.com/replay",
        "pattern": "/?run_id={run_id}&i={i}",
    }

    for i in range(40):
        action = rng.choice(["scan", "move", "ask_tool", "write_memory"])
        if rng.random() < chaos:
            # Chaos re-roll: same pool plus the run-ending "panic" action.
            action = rng.choice(["scan", "move", "ask_tool", "write_memory", "panic"])

        score = rng.random()
        events.append(_mk_event("plan_step", f"t{i}.plan", {"action": action, "score": score}))

        if action == "ask_tool":
            q = rng.choice(["price", "status", "latency", "risk"])
            events.append(_mk_event("tool_call", f"t{i}.tool_call", {"tool": "mock_api", "query": q}))
            if rng.random() < (0.15 + chaos * 0.2):
                events.append(_mk_event("tool_result", f"t{i}.tool_result", {"ok": False, "error": "timeout"}))
                reward_total -= 0.5
            else:
                val = rng.randint(1, 100)
                events.append(_mk_event("tool_result", f"t{i}.tool_result", {"ok": True, "value": val}))
                reward_total += 0.2
        elif action == "write_memory":
            note = rng.choice(["cached", "retry", "validated", "unsafe", "needs_review"])
            memory["notes"].append(note)
            events.append(_mk_event("memory_write", f"t{i}.mem", {"write": {"notes": list(memory["notes"])}}))
            reward_total += 0.05
        elif action == "panic":
            events.append(_mk_event("guardrail", f"t{i}.guardrail", {"blocked": True, "reason": "anomaly"}))
            events.append(
                _mk_event(
                    "state_snapshot",
                    f"t{i}.state",
                    {"memory": _memory_snapshot(), "mode": "halt", "reward_total": reward_total},
                )
            )
            # Guardrail halt: no llm_sample for this step, timeline ends here.
            break
        else:
            # scan/move
            x = rng.randint(0, 9)
            y = rng.randint(0, 9)
            reward_total += 0.01
            events.append(
                _mk_event(
                    "state_snapshot",
                    f"t{i}.state",
                    {"x": x, "y": y, "memory": _memory_snapshot(), "reward_total": reward_total},
                )
            )

        txt = rng.choice(
            [
                "Proceed with caution.",
                "Tool looks stable.",
                "Memory updated.",
                "Need more evidence.",
                "I will retry once.",
            ]
        )
        if rng.random() < chaos:
            txt = rng.choice(
                [
                    "Unexpected output detected.",
                    "I am uncertain; escalating.",
                    "This seems inconsistent.",
                    "Plan changed due to drift.",
                ]
            )
        events.append(_mk_event("llm_sample", f"t{i}.llm", {"text": txt, "tokens": rng.randint(20, 180)}))

    return write_bundle_zip(
        out_path,
        run_id=run_id,
        framework=framework,
        model_id=model_id,
        env_fingerprint=_env_fingerprint(),
        events_payloads=events,
        replay=replay,
    )
def fork_patch_bundle(
    out_path: str,
    *,
    source_zip: str,
    fork_at_index: int,
    patch_kind: Optional[str] = None,
    patch_step: Optional[str] = None,
    patch_payload_json: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Counterfactual workflow: load a bundle, patch the event at index N and
    write the result out as a new bundle via ``write_bundle_zip``.

    Every source event is reduced to its ``ts`` / ``kind`` / ``step`` /
    ``payload`` fields. If ``fork_at_index`` is outside the event list the
    events are written unpatched. ``patch_kind`` and ``patch_step`` are
    applied only when truthy; ``patch_payload_json`` is applied whenever it
    is not None (so an empty dict replaces the payload).

    The new run id is the source manifest's ``run_id`` (default "run") with
    a "-fork" suffix; framework, model id, env, replay and run_url are
    carried over from the source manifest.

    Returns:
        Whatever ``write_bundle_zip`` returns.
    """
    from .bundle import load_bundle, write_bundle_zip

    bundle = load_bundle(source_zip)

    # Re-project each source event onto the four fields that get rewritten.
    forked: List[Dict[str, Any]] = [
        {
            "ts": event.get("ts"),
            "kind": event.get("kind"),
            "step": event.get("step"),
            "payload": event.get("payload", {}),
        }
        for event in bundle.events
    ]

    index_in_range = 0 <= fork_at_index < len(forked)
    if index_in_range:
        overrides: Dict[str, Any] = {}
        if patch_kind:
            overrides["kind"] = patch_kind
        if patch_step:
            overrides["step"] = patch_step
        if patch_payload_json is not None:
            overrides["payload"] = patch_payload_json
        forked[fork_at_index].update(overrides)

    source_run_id = bundle.manifest.get('run_id', 'run')
    forked_run_id = f"{source_run_id}-fork"

    return write_bundle_zip(
        out_path,
        run_id=forked_run_id,
        framework=bundle.manifest.get("framework", "unknown"),
        model_id=bundle.manifest.get("model_id", "unknown"),
        env_fingerprint=bundle.manifest.get("env", {}),
        events_payloads=forked,
        replay=bundle.manifest.get("replay"),
        run_url=bundle.manifest.get("run_url"),
    )