# Page header captured together with this file (not part of the module):
#   RFTSystems's picture
#   Update drp/simulate.py
#   387797b verified
import os
import random
import uuid
from typing import Any, Dict, List, Optional
from .bundle import write_bundle_zip
def _env_fingerprint() -> Dict[str, Any]:
    """Describe the runtime environment recorded with a demo bundle.

    Returns:
        A dict with two string entries:

        * ``"python"`` -- the ``PYTHON_VERSION`` environment variable when it
          is set and non-empty, otherwise the version of the running
          interpreter, with ``"unknown"`` as a last resort.
        * ``"space"`` -- ``SPACE_ID``, else ``HF_SPACE_ID``, else
          ``"unknown"``.
    """
    # Local import so this block does not depend on the file's import header.
    import platform

    env = os.environ
    # PYTHON_VERSION is not set by the interpreter itself, so without the
    # platform fallback this field would almost always read "unknown".
    python_version = env.get("PYTHON_VERSION") or platform.python_version() or "unknown"
    space_id = env.get("SPACE_ID") or env.get("HF_SPACE_ID") or "unknown"
    return {"python": python_version, "space": space_id}
def _mk_event(kind: str, step: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Bundle ``kind``, ``step`` and ``payload`` into a single event dict.

    The payload is stored by reference; it is not copied.
    """
    event: Dict[str, Any] = dict(kind=kind, step=step, payload=payload)
    return event
def make_demo_bundle_zip(out_path: str, *, seed: int, chaos: float, label: str) -> str:
    """Write a synthetic agent timeline with controlled randomness to a bundle.

    The event stream is drawn from ``random.Random(seed)``, so equal ``seed``
    and ``chaos`` values yield the same events.  The run id additionally
    carries a random ``uuid4`` suffix and therefore differs on every call.

    Up to 40 steps are generated.  A "panic" action (only reachable through
    the chaos re-roll) emits a guardrail event plus a "halt" state snapshot
    and ends the timeline early.  State snapshots include ``reward_total`` so
    that reward deltas between runs can be demonstrated.

    Args:
        out_path: Destination handed through to ``write_bundle_zip``.
        seed: Seed for the private random generator.
        chaos: Threshold compared against ``rng.random()``.  Larger values
            make the action re-roll (and thus "panic") and the "drift" LLM
            texts more likely, and raise the tool timeout rate
            (``0.15 + chaos * 0.2``).
        label: Embedded in the run id as ``demo-{label}-<8 hex chars>``.

    Returns:
        The value returned by ``write_bundle_zip`` (presumably the path of
        the written zip -- confirm against that helper).
    """
    rng = random.Random(seed)
    run_id = f"demo-{label}-{uuid.uuid4().hex[:8]}"
    framework = "demo-agent"
    model_id = "demo-llm"

    events: List[Dict[str, Any]] = []
    memory: Dict[str, Any] = {"goal": "reach_target", "notes": []}
    reward_total = 0.0

    def memory_snapshot() -> Dict[str, Any]:
        """Copy ``memory`` so later note appends cannot alter a stored event."""
        return {**memory, "notes": list(memory["notes"])}

    # Optional replay link template (demo)
    replay = {
        "base_url": "https://example.com/replay",
        "pattern": "/?run_id={run_id}&i={i}",
    }

    # NOTE: the order of rng calls below defines the timeline for a given
    # seed; keep it stable when editing.
    for i in range(40):
        action = rng.choice(["scan", "move", "ask_tool", "write_memory"])
        if rng.random() < chaos:
            # The chaos re-roll is the only way to draw "panic".
            action = rng.choice(["scan", "move", "ask_tool", "write_memory", "panic"])

        score = rng.random()
        events.append(_mk_event("plan_step", f"t{i}.plan", {"action": action, "score": score}))

        if action == "ask_tool":
            q = rng.choice(["price", "status", "latency", "risk"])
            events.append(_mk_event("tool_call", f"t{i}.tool_call", {"tool": "mock_api", "query": q}))
            if rng.random() < (0.15 + chaos * 0.2):
                events.append(_mk_event("tool_result", f"t{i}.tool_result", {"ok": False, "error": "timeout"}))
                reward_total -= 0.5
            else:
                val = rng.randint(1, 100)
                events.append(_mk_event("tool_result", f"t{i}.tool_result", {"ok": True, "value": val}))
                reward_total += 0.2
        elif action == "write_memory":
            note = rng.choice(["cached", "retry", "validated", "unsafe", "needs_review"])
            memory["notes"].append(note)
            events.append(_mk_event("memory_write", f"t{i}.mem", {"write": {"notes": list(memory["notes"])}}))
            reward_total += 0.05
        elif action == "panic":
            events.append(_mk_event("guardrail", f"t{i}.guardrail", {"blocked": True, "reason": "anomaly"}))
            events.append(
                _mk_event(
                    "state_snapshot",
                    f"t{i}.state",
                    {"memory": memory_snapshot(), "mode": "halt", "reward_total": reward_total},
                )
            )
            break
        else:
            # scan/move
            x = rng.randint(0, 9)
            y = rng.randint(0, 9)
            reward_total += 0.01
            # Store a copy of memory: sharing the live dict would make every
            # earlier snapshot show the notes written in later steps.
            events.append(
                _mk_event(
                    "state_snapshot",
                    f"t{i}.state",
                    {"x": x, "y": y, "memory": memory_snapshot(), "reward_total": reward_total},
                )
            )

        txt = rng.choice(
            [
                "Proceed with caution.",
                "Tool looks stable.",
                "Memory updated.",
                "Need more evidence.",
                "I will retry once.",
            ]
        )
        if rng.random() < chaos:
            txt = rng.choice(
                [
                    "Unexpected output detected.",
                    "I am uncertain; escalating.",
                    "This seems inconsistent.",
                    "Plan changed due to drift.",
                ]
            )
        events.append(_mk_event("llm_sample", f"t{i}.llm", {"text": txt, "tokens": rng.randint(20, 180)}))

    return write_bundle_zip(
        out_path,
        run_id=run_id,
        framework=framework,
        model_id=model_id,
        env_fingerprint=_env_fingerprint(),
        events_payloads=events,
        replay=replay,
    )
def fork_patch_bundle(
    out_path: str,
    *,
    source_zip: str,
    fork_at_index: int,
    patch_kind: Optional[str] = None,
    patch_step: Optional[str] = None,
    patch_payload_json: Optional[Dict[str, Any]] = None,
) -> str:
    """Write a counterfactual copy of a bundle with one event patched.

    The events of ``source_zip`` are reduced to their ``ts``/``kind``/``step``/
    ``payload`` fields, the event at ``fork_at_index`` is optionally modified,
    and the list is handed to ``write_bundle_zip`` under the run id
    ``"<source run_id>-fork"`` (the original author describes this as
    re-hash-chaining into a new bundle).

    Args:
        out_path: Destination handed through to ``write_bundle_zip``.
        source_zip: Bundle to load with ``load_bundle``.
        fork_at_index: Position of the event to patch.  When it is negative
            or past the last event, nothing is patched and the bundle is
            still written.
        patch_kind: New ``kind``; ignored when falsy (``None`` or ``""``).
        patch_step: New ``step``; ignored when falsy (``None`` or ``""``).
        patch_payload_json: New ``payload``; applied whenever it is not
            ``None``, so an empty dict does replace the payload.

    Returns:
        The value returned by ``write_bundle_zip``.
    """
    from .bundle import load_bundle, write_bundle_zip

    source = load_bundle(source_zip)

    wanted_fields = ("ts", "kind", "step")
    forked: List[Dict[str, Any]] = []
    for event in source.events:
        entry: Dict[str, Any] = {name: event.get(name) for name in wanted_fields}
        entry["payload"] = event.get("payload", {})
        forked.append(entry)

    index_in_range = 0 <= fork_at_index < len(forked)
    if index_in_range:
        target = forked[fork_at_index]
        if patch_kind:
            target["kind"] = patch_kind
        if patch_step:
            target["step"] = patch_step
        if patch_payload_json is not None:
            target["payload"] = patch_payload_json

    manifest = source.manifest
    fork_run_id = f"{manifest.get('run_id','run')}-fork"

    return write_bundle_zip(
        out_path,
        run_id=fork_run_id,
        framework=manifest.get("framework", "unknown"),
        model_id=manifest.get("model_id", "unknown"),
        env_fingerprint=manifest.get("env", {}),
        events_payloads=forked,
        replay=manifest.get("replay"),
        run_url=manifest.get("run_url"),
    )