crisis_inbox / demo.py
eptan's picture
Upload folder using huggingface_hub
19da990 verified
#!/usr/bin/env python3
"""
CrisisInbox Demo Display
Runs a full episode with color-coded urgency levels, drift event
notifications, and agent action visualization. Designed for the
3-minute hackathon demo.
Usage:
python demo.py # Run against local server
python demo.py --remote # Run against HF Spaces
python demo.py --strategy smart # Use smart triage (default)
python demo.py --strategy naive # Use naive arrival-order strategy
"""
import argparse
import json
import sys
import time
from server.crisis_inbox_environment import CrisisInboxEnvironment
from openenv.core.env_server.mcp_types import CallToolAction
# ---------------------------------------------------------------------------
# ANSI colors
# ---------------------------------------------------------------------------
class C:
RED = "\033[91m"
YELLOW = "\033[93m"
GREEN = "\033[92m"
BLUE = "\033[94m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
WHITE = "\033[97m"
GRAY = "\033[90m"
BOLD = "\033[1m"
DIM = "\033[2m"
UNDERLINE = "\033[4m"
RESET = "\033[0m"
BG_RED = "\033[41m"
BG_YELLOW = "\033[43m"
BG_BLUE = "\033[44m"
BG_MAGENTA = "\033[45m"
URGENCY_COLORS = {
"critical": C.BG_RED + C.WHITE + C.BOLD,
"high": C.RED + C.BOLD,
"medium": C.YELLOW,
"low": C.GREEN,
}
URGENCY_LABELS = {
"critical": " CRITICAL ",
"high": " HIGH ",
"medium": " MEDIUM ",
"low": " LOW ",
}
CHANNEL_ICONS = {
"sms": "SMS",
"email": "EMAIL",
"phone": "PHONE",
"government_alert": "ALERT",
"app_notification": "APP",
"social_media": "SOCIAL",
}
# ---------------------------------------------------------------------------
# Display helpers
# ---------------------------------------------------------------------------
def header(text: str):
width = 72
print(f"\n{C.BOLD}{C.CYAN}{'=' * width}{C.RESET}")
print(f"{C.BOLD}{C.CYAN} {text}{C.RESET}")
print(f"{C.BOLD}{C.CYAN}{'=' * width}{C.RESET}")
def subheader(text: str):
print(f"\n{C.BOLD}{C.WHITE}--- {text} ---{C.RESET}")
def show_time(hour: float):
h = int(hour)
m = int((hour - h) * 60)
day = "Day 1" if hour < 24 else "Day 2"
display_h = h % 24
ampm = "AM" if display_h < 12 else "PM"
display_h = display_h % 12 or 12
return f"{day} {display_h}:{m:02d} {ampm} (hour {hour:.1f}/48)"
def show_message_line(msg: dict, prefix: str = ""):
urg = msg["urgency"]
color = URGENCY_COLORS.get(urg, "")
label = URGENCY_LABELS.get(urg, urg.upper())
channel = CHANNEL_ICONS.get(msg["channel"], msg["channel"])
handled = f"{C.GRAY}[DONE]{C.RESET} " if msg.get("handled") else ""
superseded = f"{C.GRAY}[STALE]{C.RESET} " if msg.get("superseded") else ""
drift = f"{C.BG_MAGENTA}{C.WHITE} DRIFT {C.RESET} " if msg.get("drift_flag") else ""
deadline_str = ""
if msg.get("deadline_hours") is not None:
deadline_str = f" {C.DIM}(due h{msg['deadline_hours']:.0f}){C.RESET}"
print(
f" {prefix}{color}[{label}]{C.RESET} "
f"{C.DIM}{channel:>5}{C.RESET} "
f"{drift}{superseded}{handled}"
f"{C.BOLD}{msg['sender']}{C.RESET}: {msg['subject']}"
f"{deadline_str}"
)
def show_action(action_text: str):
print(f"\n {C.BG_BLUE}{C.WHITE}{C.BOLD} AGENT ACTION {C.RESET} {C.CYAN}{action_text}{C.RESET}")
def show_reward(reward: float, total: float):
color = C.GREEN if reward > 0 else C.RED
print(f" {color}+{reward:.1f} pts{C.RESET} {C.DIM}(total: {total:.1f}){C.RESET}")
def show_drift_alert(msg: dict):
print(f"\n {C.BG_MAGENTA}{C.WHITE}{C.BOLD} SCHEMA DRIFT {C.RESET} "
f"{C.MAGENTA}{C.BOLD}{msg['sender']}: {msg['subject']}{C.RESET}")
print(f" {C.MAGENTA}Rules have changed! Previous information may be outdated.{C.RESET}")
def show_expired(expired: list):
if expired:
print(f"\n {C.RED}{C.BOLD}EXPIRED DEADLINES:{C.RESET}")
for e in expired:
print(f" {C.RED}x {e['subject']}{C.RESET}")
def show_upcoming(upcoming: list):
if upcoming:
print(f"\n {C.YELLOW}UPCOMING DEADLINES:{C.RESET}")
for u in upcoming:
print(f" {C.YELLOW}! {u['subject']} ({u['hours_remaining']:.1f}h left){C.RESET}")
def pause(seconds: float = 0.5):
time.sleep(seconds)
# ---------------------------------------------------------------------------
# Agent strategies
# ---------------------------------------------------------------------------
def smart_priority(messages: list[dict]) -> list[dict]:
"""Triage: safety first, then deadlines, then drift, then urgency."""
urgency_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
unhandled = [m for m in messages if not m.get("handled") and not m.get("superseded")]
def score(m):
urg = urgency_order.get(m["urgency"], 4)
deadline = m.get("deadline_hours") or 999
drift = 0 if m.get("drift_flag") else 1
return (urg, drift, deadline)
return sorted(unhandled, key=score)
def naive_order(messages: list[dict]) -> list[dict]:
"""Respond in arrival order (bad strategy for comparison)."""
return [m for m in messages if not m.get("handled") and not m.get("superseded")]
# ---------------------------------------------------------------------------
# Main demo loop
# ---------------------------------------------------------------------------
def run_demo(strategy: str = "smart", seed: int = 42, speed: float = 0.3):
env = CrisisInboxEnvironment()
obs = env.reset(seed=seed)
def call(tool, **kwargs):
o = env.step(CallToolAction(type="call_tool", tool_name=tool, arguments=kwargs))
return json.loads(o.result.data)
prioritize = smart_priority if strategy == "smart" else naive_order
strategy_name = "SMART TRIAGE" if strategy == "smart" else "NAIVE (arrival order)"
header(f"CrisisInbox Demo | Strategy: {strategy_name}")
print(f"\n {C.DIM}Scenario: Post-hurricane evacuation in Sacramento")
print(f" You are a working parent. Your phone is about to explode.{C.RESET}")
print(f" {C.DIM}Messages: {obs.metadata['messages_total']} total | "
f"Drift events: {obs.metadata['drift_events_scheduled']} scheduled{C.RESET}")
pause(speed * 2)
last_drift_count = 0
actions_taken = 0
max_actions = 25 # Cap for demo length
# Advance through the timeline in chunks
time_chunks = [0, 2, 6, 12, 20, 25, 34, 44, 48]
for i in range(len(time_chunks) - 1):
target_hour = time_chunks[i + 1]
current = time_chunks[i]
# Advance time to target
while current < target_hour:
advance = min(4.0, target_hour - current)
result = call("advance_time", hours=advance)
current = result["current_hour"]
if result.get("new_messages", 0) > 0:
pass # Messages delivered silently
# Get current state
status = call("get_status")
inbox = call("get_inbox")
subheader(f"HOUR {status['current_hour']:.1f} | {show_time(status['current_hour'])}")
print(f" {C.DIM}Messages arrived: {status['messages_total_arrived']} | "
f"Handled: {status['messages_handled']} | "
f"Score: {status['total_score']:.1f}{C.RESET}")
# Check for new drift events
if status["drift_events_fired"] > last_drift_count:
drift_msgs = [m for m in inbox if m.get("drift_flag") and not m.get("handled")]
for dm in drift_msgs:
show_drift_alert(dm)
pause(speed)
last_drift_count = status["drift_events_fired"]
# Show deadlines
show_expired(status.get("expired_deadlines", []))
show_upcoming(status.get("upcoming_deadlines", []))
# Show current inbox
print(f"\n {C.BOLD}INBOX ({len(inbox)} messages):{C.RESET}")
# Show only recent unhandled, plus any drift
visible = [m for m in inbox if not m.get("handled")]
for msg in visible[:8]: # Show max 8 at a time
show_message_line(msg)
if len(visible) > 8:
print(f" {C.DIM} ... and {len(visible) - 8} more unread{C.RESET}")
pause(speed)
# Agent takes actions on available messages
prioritized = prioritize(inbox)
actions_this_chunk = 0
max_per_chunk = 4 # Don't monopolize one time period
for msg in prioritized:
if actions_taken >= max_actions or actions_this_chunk >= max_per_chunk:
break
msg_id = msg["id"]
# Read the message first
full_msg = call("read_message", message_id=msg_id)
if "error" in full_msg:
continue
# Generate a contextual response based on sender/urgency
response = _generate_response(full_msg)
# Try to respond
result = call("respond_to_message", message_id=msg_id, response=response)
if "error" in result:
if "dependencies" in result.get("error", ""):
show_action(f"Cannot handle {msg_id} yet - dependencies unmet")
pause(speed * 0.5)
continue
show_action(f"Respond to {msg['sender']} - \"{msg['subject']}\"")
print(f" {C.DIM}\"{response[:80]}{'...' if len(response) > 80 else ''}\"{C.RESET}")
show_reward(result["reward"], result["total_score"])
pause(speed)
actions_taken += 1
actions_this_chunk += 1
if status.get("done"):
break
# Final summary
final_status = call("get_status")
final_inbox = call("get_inbox")
header("EPISODE COMPLETE")
handled_count = final_status["messages_handled"]
total_arrived = final_status["messages_total_arrived"]
missed = len(final_status.get("expired_deadlines", []))
print(f"\n Strategy: {C.BOLD}{strategy_name}{C.RESET}")
print(f" Final Score: {C.BOLD}{C.GREEN}{final_status['total_score']:.1f} pts{C.RESET}")
print(f" Messages Handled: {handled_count}/{total_arrived}")
print(f" Deadlines Missed: {C.RED}{missed}{C.RESET}")
print(f" Drift Events Encountered: {final_status['drift_events_fired']}")
# Show what was handled vs missed by urgency
handled_ids = {m["id"] for m in final_inbox if m.get("handled")}
by_urgency = {"critical": [0, 0], "high": [0, 0], "medium": [0, 0], "low": [0, 0]}
for m in final_inbox:
urg = m["urgency"]
if urg in by_urgency:
by_urgency[urg][1] += 1
if m["id"] in handled_ids:
by_urgency[urg][0] += 1
print(f"\n {C.BOLD}Coverage by Urgency:{C.RESET}")
for urg, (handled, total) in by_urgency.items():
color = URGENCY_COLORS.get(urg, "")
pct = (handled / total * 100) if total > 0 else 0
bar = "#" * int(pct / 5) + "." * (20 - int(pct / 5))
print(f" {color}{urg:>8}{C.RESET}: {handled}/{total} [{bar}] {pct:.0f}%")
print()
return final_status["total_score"]
def _generate_response(msg: dict) -> str:
"""Generate a contextual response based on sender and content."""
sender = msg.get("sender", "")
urgency = msg.get("urgency", "")
subject = msg.get("subject", "").lower()
if "national weather" in sender.lower() or "fema" in sender.lower():
return "Acknowledged. Following evacuation orders immediately. Heading to designated shelter with essential documents and medication."
if sender == "Mom":
return "I'm safe, Mom. Don't worry. I'm following the evacuation orders and heading to the shelter. I'll call you as soon as I can. Love you."
if sender == "Sister":
if "kids" in subject or "take" in subject:
return "I'll get them, don't worry. Heading to Oakwood now. I'll text you the second I have them safe."
return "Kids are safe with me. Emma is being brave. Don't worry about anything, just stay safe yourself."
if "emma" in sender.lower():
return "Hey sweetie! Mac and cheese sounds perfect. Mommy is coming soon. Let's build a blanket fort while we wait!"
if sender == "Boss" or sender == "HR Department":
if "drift" in str(msg.get("drift_flag", "")):
return "Thanks for the update on the emergency leave policy. I've submitted my status form on the HR portal. Will be taking the emergency leave days."
return "I'm in the evacuation zone and following mandatory orders. Will work remotely when I can access wifi. Updating my status on the portal now."
if "insurance" in sender.lower() or "state farm" in sender.lower():
return "Filing claim now with policy number and damage photos. Have documented all damage with timestamps before any cleanup."
if sender == "Neighbor Dave":
return "Thanks for the heads up Dave. Stay safe at the shelter. I'll keep an eye on things here. We'll get through this."
if "delta" in sender.lower() or "airline" in sender.lower():
return "Selecting Option A for rebooking to the earliest available flight. Thank you for the flexibility during this emergency."
if "school" in sender.lower() or "oakwood" in sender.lower():
return "Acknowledged. Will arrange pickup before the deadline. Thank you for the early notification."
if "pharmacy" in sender.lower() or "cvs" in sender.lower():
return "Will pick up the prescription today. If the usual location is closed, I'll transfer to the nearest open CVS."
if "sacramento" in sender.lower():
return "Acknowledged. Following all advisories and avoiding affected areas."
if urgency == "critical":
return "Taking immediate action on this critical matter. Will follow up with details shortly."
if urgency == "high":
return "Understood, this is a priority. Handling this now and will confirm when complete."
return "Thank you for the information. I've noted this and will address it as soon as possible given the current situation."
# ---------------------------------------------------------------------------
# Comparison mode
# ---------------------------------------------------------------------------
def run_comparison(seed: int = 42, speed: float = 0.2):
"""Run both strategies side by side for the demo."""
header("CRISISINBOX: STRATEGY COMPARISON")
print(f"\n {C.DIM}Same episode, two different approaches.{C.RESET}")
print(f" {C.DIM}Which agent handles a disaster better?{C.RESET}\n")
pause(1)
print(f"{C.RED}{C.BOLD}{'=' * 72}")
print(f" ROUND 1: NAIVE AGENT (responds in arrival order)")
print(f"{'=' * 72}{C.RESET}")
pause(0.5)
naive_score = run_demo(strategy="naive", seed=seed, speed=speed)
pause(1)
print(f"\n{C.GREEN}{C.BOLD}{'=' * 72}")
print(f" ROUND 2: TRAINED AGENT (smart triage)")
print(f"{'=' * 72}{C.RESET}")
pause(0.5)
smart_score = run_demo(strategy="smart", seed=seed, speed=speed)
header("FINAL COMPARISON")
improvement = smart_score - naive_score
pct = (improvement / max(naive_score, 1)) * 100
print(f"\n Naive Agent: {C.RED}{naive_score:.1f} pts{C.RESET}")
print(f" Trained Agent: {C.GREEN}{smart_score:.1f} pts{C.RESET}")
print(f" Improvement: {C.BOLD}{C.CYAN}+{improvement:.1f} pts ({pct:.0f}%){C.RESET}\n")
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="CrisisInbox Demo")
parser.add_argument("--strategy", choices=["smart", "naive", "compare"],
default="compare", help="Agent strategy (default: compare)")
parser.add_argument("--seed", type=int, default=42, help="Episode seed")
parser.add_argument("--speed", type=float, default=0.3,
help="Pause between actions in seconds (default: 0.3)")
args = parser.parse_args()
if args.strategy == "compare":
run_comparison(seed=args.seed, speed=args.speed)
else:
run_demo(strategy=args.strategy, seed=args.seed, speed=args.speed)