crisisops / server /rules.py
mrinaalarora's picture
Upload folder using huggingface_hub
99d636c verified
"""Rule-derived Crisisops domain logic."""
from collections.abc import Mapping, Sequence
from typing import Any
try:
from ..models import CrisisopsAction, Report, SitrepPayload, Unit, Zone
except ImportError:
from models import CrisisopsAction, Report, SitrepPayload, Unit, Zone
INCIDENT_UNIT_TYPES: dict[str, set[str]] = {
"flood": {"rescue_team", "evac_bus"},
"collapse": {"rescue_team", "medical_unit"},
"medical_surge": {"medical_unit"},
"fire": {"rescue_team"},
"contamination": {"supply_truck", "medical_unit"},
"power_outage": {"supply_truck"},
}
TASK_BY_UNIT_TYPE = {
"rescue_team": "rescue",
"medical_unit": "medical",
"supply_truck": "supply_delivery",
"evac_bus": "evacuation",
"recon_drone": "recon",
}
def unit_types_for_incident(incident_type: str) -> set[str]:
"""Return plan-required unit types for an incident category."""
return set(INCIDENT_UNIT_TYPES.get(incident_type, {"recon_drone"}))
def unit_type_for_incident(incident_type: str) -> str:
"""Return one preferred unit type for legacy callers."""
return sorted(unit_types_for_incident(incident_type))[0]
def priority_key(zone: Zone) -> tuple[int, int, str]:
"""Plan section 3.3.2 priority key: deadline, severity-population, id."""
return (
zone.deadline_steps,
-(zone.severity * zone.population_at_risk),
zone.zone_id,
)
def is_unsafe_action(
action: CrisisopsAction | Mapping[str, Any],
hidden_state: Mapping[str, Any] | None = None,
) -> bool:
"""Return whether an action violates the plan's safety preconditions."""
payload = _action_payload(action)
action_type = payload.get("type")
hidden_state = hidden_state or {}
if action_type in {"verify_report", "request_recon", "noop", "publish_sitrep"}:
return False
if action_type == "flag_false_alarm":
report_id = payload.get("report_id")
return bool(report_id and _report_truth(report_id, hidden_state) is True)
if _targets_blocked_zone(payload, hidden_state):
return True
if action_type == "reroute_unit":
route = payload.get("route", {})
return route.get("status") in {"blocked", "unsafe"}
if action_type in {"allocate_unit", "issue_evacuation"}:
if _acts_on_unverified_non_sensor_report(payload, hidden_state):
return True
zone_id = payload.get("zone_id")
incident_zones = set(hidden_state.get("incident_zones", []))
if zone_id and incident_zones and zone_id not in incident_zones:
return True
if action_type == "dispatch_supplies":
supplies = payload.get("supplies", {})
return not supplies or any(amount <= 0 for amount in supplies.values())
if action_type == "issue_evacuation":
return _overfills_shelter(payload, hidden_state)
if action_type == "open_shelter":
shelter = payload.get("shelter", {})
return shelter.get("capacity_available", 0) <= 0
return False
def compute_optimal_plan(scenario: Mapping[str, Any]) -> list[CrisisopsAction]:
"""Compute the deterministic reference plan for a generated scenario."""
zones: Sequence[Zone] = scenario["zones"]
units: list[Unit] = list(scenario["units"])
hidden_truth: Mapping[str, Any] = scenario["hidden_truth"]
report_truth: Mapping[str, bool] = hidden_truth["report_truth"]
reports = [
*scenario["reports"],
*_event_reports_from_hidden(hidden_truth),
]
actions: list[CrisisopsAction] = []
sorted_zones = sorted(zones, key=priority_key)
zone_rank = {zone.zone_id: index for index, zone in enumerate(sorted_zones)}
sorted_reports = sorted(
reports,
key=lambda report: (
zone_rank.get(report.zone_id, len(zone_rank)),
report.reveal_at_step,
report.report_id,
),
)
reports_by_zone = {
zone.zone_id: [report for report in sorted_reports if report.zone_id == zone.zone_id]
for zone in sorted_zones
}
true_reports_by_zone = {
zone.zone_id: [
report
for report in reports_by_zone[zone.zone_id]
if report_truth[report.report_id]
]
for zone in sorted_zones
}
verified_report_ids = _sensor_confirmed_true_ids(sorted_reports, report_truth)
flagged_false_ids: set[str] = set()
assigned_units: set[str] = set()
allocated_unit_types_by_zone: dict[str, set[str]] = {}
resolved_zone_ids: set[str] = set()
# Timeliness is measured by action index, so critical zones get a safe first
# allocation before the slower full verification sweep.
for zone in sorted_zones:
if not _is_critical(zone) or not true_reports_by_zone[zone.zone_id]:
continue
unit = _next_unit_for_zone(units, assigned_units, zone)
if unit is None:
continue
support_report = true_reports_by_zone[zone.zone_id][0]
_append_verification_actions(
actions,
support_report,
report_truth,
verified_report_ids,
flagged_false_ids,
)
_append_allocation_action(
actions,
unit,
zone,
[support_report.report_id],
assigned_units,
allocated_unit_types_by_zone,
resolved_zone_ids,
)
for report in sorted_reports:
_append_noops_until(actions, report.reveal_at_step)
_append_verification_actions(
actions,
report,
report_truth,
verified_report_ids,
flagged_false_ids,
)
for zone in sorted_zones:
zone_reports = true_reports_by_zone[zone.zone_id]
if not zone_reports:
continue
if _is_critical(zone):
actions.append(
_action(
{
"type": "issue_evacuation",
"zone_id": zone.zone_id,
"urgency": "critical",
"message": f"Evacuate {zone.name or zone.zone_id} before deadline.",
"route_id": zone.routes[0].route_id if zone.routes else None,
"destination_shelter_id": (
zone.shelter.shelter_id if zone.shelter else None
),
}
)
)
if zone.shelter is not None:
shelter = zone.shelter.model_copy(update={"status": "open"})
actions.append(
_action(
{
"type": "open_shelter",
"shelter": shelter.model_dump(mode="json"),
"reason": "Open shelter capacity for critical incident evacuation.",
}
)
)
allocated_types = allocated_unit_types_by_zone.setdefault(zone.zone_id, set())
for unit_type in sorted(zone.required_unit_types - allocated_types):
unit = _next_unit(units, assigned_units, unit_type)
if unit is None:
continue
_append_allocation_action(
actions,
unit,
zone,
[report.report_id for report in zone_reports],
assigned_units,
allocated_unit_types_by_zone,
resolved_zone_ids,
)
if _is_critical(zone):
supply_unit = _next_unit(units, assigned_units, "supply_truck")
actions.append(
_action(
{
"type": "dispatch_supplies",
"supplies": {"water": 100, "medical_kits": 12},
"destination_zone_id": zone.zone_id,
"priority": "critical",
"unit_id": supply_unit.unit_id if supply_unit else None,
"destination_shelter_id": (
zone.shelter.shelter_id if zone.shelter else None
),
}
)
)
if supply_unit is not None:
assigned_units.add(supply_unit.unit_id)
true_reports = [report for report in sorted_reports if report_truth[report.report_id]]
unresolved_zone_ids = sorted(set(hidden_truth.get("incident_zones", [])) - resolved_zone_ids)
actions.append(
_action(
{
"type": "publish_sitrep",
"payload": SitrepPayload(
incidents_confirmed=[report.report_id for report in true_reports],
incidents_resolved=sorted(resolved_zone_ids),
unresolved_risks=unresolved_zone_ids,
false_alarms_detected=list(hidden_truth.get("false_report_ids", [])),
summary_text=(
"Verified incidents prioritized by deadline and population "
"at risk."
),
).model_dump(mode="json"),
}
)
)
return actions
def _event_reports_from_hidden(hidden_truth: Mapping[str, Any]) -> list[Report]:
return [
Report.model_validate(report)
for report in hidden_truth.get("event_reports_by_id", {}).values()
]
def _sensor_confirmed_true_ids(
reports: Sequence[Report], report_truth: Mapping[str, bool]
) -> set[str]:
return {
report.report_id
for report in reports
if report.confidence == "sensor_confirmed" and report_truth[report.report_id]
}
def _append_verification_actions(
actions: list[CrisisopsAction],
report: Report,
report_truth: Mapping[str, bool],
verified_report_ids: set[str],
flagged_false_ids: set[str],
) -> None:
if report.confidence != "sensor_confirmed" and report.report_id not in verified_report_ids:
actions.append(
_action(
{
"type": "verify_report",
"report_id": report.report_id,
"verification_method": "cross_check",
"rationale": "Confirm non-sensor report before allocating scarce units.",
}
)
)
verified_report_ids.add(report.report_id)
if report_truth[report.report_id] is False and report.report_id not in flagged_false_ids:
actions.append(
_action(
{
"type": "flag_false_alarm",
"report_id": report.report_id,
"rationale": "Ground truth checks contradict the report.",
"evidence": ["seeded_truth_table"],
}
)
)
flagged_false_ids.add(report.report_id)
def _append_noops_until(actions: list[CrisisopsAction], reveal_at_step: int) -> None:
while len(actions) + 1 < reveal_at_step:
actions.append(
_action(
{
"type": "noop",
"reason": "Wait for the next scheduled report reveal.",
}
)
)
def _append_allocation_action(
actions: list[CrisisopsAction],
unit: Unit,
zone: Zone,
report_ids: list[str],
assigned_units: set[str],
allocated_unit_types_by_zone: dict[str, set[str]],
resolved_zone_ids: set[str],
) -> None:
assigned_units.add(unit.unit_id)
allocated_unit_types_by_zone.setdefault(zone.zone_id, set()).add(unit.unit_type)
resolved_zone_ids.add(zone.zone_id)
actions.append(
_action(
{
"type": "allocate_unit",
"unit_id": unit.unit_id,
"zone_id": zone.zone_id,
"task": TASK_BY_UNIT_TYPE.get(unit.unit_type, "recon"),
"priority": _priority_for_zone(zone),
"report_ids": report_ids,
}
)
)
def _next_unit_for_zone(
units: Sequence[Unit], assigned_units: set[str], zone: Zone
) -> Unit | None:
for unit_type in sorted(zone.required_unit_types):
unit = _next_unit(units, assigned_units, unit_type)
if unit is not None:
return unit
return None
def _targets_blocked_zone(
payload: Mapping[str, Any], hidden_state: Mapping[str, Any]
) -> bool:
if payload.get("type") not in {
"allocate_unit",
"issue_evacuation",
"dispatch_supplies",
}:
return False
zone_id = payload.get("zone_id") or payload.get("destination_zone_id")
zones_by_id = hidden_state.get("zones_by_id", {})
zone = zones_by_id.get(zone_id, {})
return zone.get("access_status") == "blocked"
def _report_truth(report_id: str, hidden_state: Mapping[str, Any]) -> bool | None:
if report_id in hidden_state.get("report_truth", {}):
return hidden_state["report_truth"][report_id]
return hidden_state.get("event_report_truth", {}).get(report_id)
def _acts_on_unverified_non_sensor_report(
payload: Mapping[str, Any], hidden_state: Mapping[str, Any]
) -> bool:
verified = set(hidden_state.get("verified_report_ids", []))
reports_by_id = hidden_state.get("reports_by_id", {})
report_ids = set(payload.get("report_ids", []))
if not report_ids and payload.get("zone_id"):
report_ids = {
report_id
for report_id, report in reports_by_id.items()
if report.get("zone_id") == payload.get("zone_id")
}
for report_id in report_ids:
report = reports_by_id.get(report_id)
if not report:
continue
if report.get("confidence") != "sensor_confirmed" and report_id not in verified:
return True
return False
def _overfills_shelter(
payload: Mapping[str, Any], hidden_state: Mapping[str, Any]
) -> bool:
zone_id = payload.get("zone_id")
shelter_id = payload.get("destination_shelter_id")
if not zone_id or not shelter_id:
return False
zones_by_id = hidden_state.get("zones_by_id", {})
zone = zones_by_id.get(zone_id, {})
shelter = zone.get("shelter") or {}
if shelter.get("shelter_id") != shelter_id:
shelters_by_id = hidden_state.get("shelters_by_id", {})
shelter = shelters_by_id.get(shelter_id, {})
return bool(
shelter
and zone.get("population_at_risk", 0)
> shelter.get("capacity_available", 0)
)
def _next_unit(
units: Sequence[Unit], assigned_units: set[str], unit_type: str
) -> Unit | None:
return next(
(
unit
for unit in units
if unit.unit_id not in assigned_units
and unit.unit_type == unit_type
and unit.status == "available"
),
None,
)
def _is_critical(zone: Zone) -> bool:
return zone.severity >= 4
def _priority_for_zone(zone: Zone) -> str:
if zone.severity >= 5:
return "critical"
if zone.severity >= 4:
return "high"
if zone.severity <= 2:
return "low"
return "normal"
def _action(data: Mapping[str, Any]) -> CrisisopsAction:
return CrisisopsAction.model_validate(dict(data))
def _action_payload(action: CrisisopsAction | Mapping[str, Any]) -> dict[str, Any]:
if isinstance(action, CrisisopsAction):
return action.root.model_dump(mode="json")
if "root" in action and isinstance(action["root"], Mapping):
return dict(action["root"])
return dict(action)