""" Realtime LLM-based route decision script for mock sensor data streams. Designed to run outside the main project. It reads sensor payloads (JSON per line), formats a prompt for a Llama-family model (e.g., via Ollama), and logs the AI's route recommendation plus warnings/actions for each update. """ import argparse import json import sys import textwrap import time from pathlib import Path from typing import Dict, Any, List, Optional import requests PROMPT_TEMPLATE = """You are an emergency evacuation strategist using live sensor and route telemetry. Evaluate the candidate routes and choose the safest *passable* option. Decision rules: 1. Reject any route marked passable=false or containing simultaneous fire and oxygen-cylinder hazards. 2. Avoid rooms with biohazard alerts unless respirators are available; penalize heavily if no PPE. 3. Prefer routes with lower average danger and lower max danger; break ties by shorter path_length. 4. Flag any oxygen cylinders near heat (>40°C) or fire for operator intervention. 5. Provide backup route only if it is passable and within 15 danger points of the best route. Data: {data} Respond in JSON with this schema: {{ "recommended_route": "", "confidence": "high|medium|low", "warnings": ["..."], "actions": ["..."], "backup_route": "" }} If no safe route exists, set recommended_route=null and explain why in warnings. """ def load_event_stream(path: Path): """Yield parsed JSON objects from a file (one JSON object per line).""" with path.open("r", encoding="utf-8") as f: for line_num, line in enumerate(f, start=1): line = line.strip() if not line: continue try: yield json.loads(line) except json.JSONDecodeError as exc: raise ValueError(f"Invalid JSON on line {line_num}: {exc}") from exc def format_payload(event: Dict[str, Any]) -> str: """Pretty-print the incoming event for the LLM prompt.""" return json.dumps(event, indent=2, ensure_ascii=False) def call_llm(model: str, prompt: str, server_url: str) -> str: """ Call a Llama-family model via Ollama (or any compatible endpoint). server_url defaults to http://localhost:11434/api/generate. """ payload = { "model": model, "prompt": prompt, "stream": False, } response = requests.post(server_url, json=payload, timeout=120) response.raise_for_status() data = response.json() # Ollama returns {"response": "...", "done": true, ...} if "response" in data: return data["response"] # Fallback for other APIs return data.get("text") or json.dumps(data) def parse_llm_output(text_output: str) -> Dict[str, Any]: """Attempt to parse the LLM JSON; fall back to error record.""" text_output = text_output.strip() try: return json.loads(text_output) except json.JSONDecodeError: return { "recommended_route": None, "confidence": "low", "warnings": ["LLM returned non-JSON output", text_output], "actions": [], "backup_route": None, } def process_event(event: Dict[str, Any], args) -> Dict[str, Any]: """Send single event to LLM and return structured decision.""" payload_str = format_payload(event) prompt = PROMPT_TEMPLATE.format(data=payload_str) raw_output = call_llm(args.model, prompt, args.server_url) decision = parse_llm_output(raw_output) return { "timestamp": event.get("timestamp_sec"), "decision": decision, "raw_prompt": prompt if args.debug else None, "raw_output": raw_output if args.debug else None, } def log_decision(result: Dict[str, Any]): """Print a concise summary to stdout.""" decision = result["decision"] ts = result["timestamp"] header = f"[t={ts}s]" if ts is not None else "[t=?]" print(f"{header} recommended_route={decision.get('recommended_route')} " f"confidence={decision.get('confidence')}") if decision.get("warnings"): print(" warnings:") for warning in decision["warnings"]: wrapped = textwrap.fill(warning, width=78, subsequent_indent=" ") print(f" - {wrapped}") if decision.get("actions"): print(" actions:") for action in decision["actions"]: wrapped = textwrap.fill(action, width=78, subsequent_indent=" ") print(f" - {wrapped}") if decision.get("backup_route"): print(f" backup_route: {decision['backup_route']}") print() def build_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Stream sensor events to a Llama model for route decisions.") parser.add_argument("--input", required=True, help="Path to JSONL file containing mock sensor events.") parser.add_argument("--model", default="llama3", help="Model name served by Ollama (default: llama3).") parser.add_argument("--server-url", default="http://localhost:11434/api/generate", help="Generation endpoint URL.") parser.add_argument("--delay", type=float, default=0.0, help="Seconds to wait between events (simulate realtime).") parser.add_argument("--debug", action="store_true", help="Print raw prompt/output for troubleshooting.") return parser def main(argv: Optional[List[str]] = None): parser = build_arg_parser() args = parser.parse_args(argv) input_path = Path(args.input) if not input_path.exists(): parser.error(f"Input file not found: {input_path}") try: for event in load_event_stream(input_path): result = process_event(event, args) log_decision(result) if args.debug: print("----- RAW PROMPT -----") print(result["raw_prompt"]) print("----- RAW OUTPUT -----") print(result["raw_output"]) print("----------------------\n") if args.delay > 0: time.sleep(args.delay) except KeyboardInterrupt: print("\nInterrupted by user.", file=sys.stderr) except Exception as exc: print(f"ERROR: {exc}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()