"""Rule-derived Crisisops domain logic.""" from collections.abc import Mapping, Sequence from typing import Any try: from ..models import CrisisopsAction, Report, SitrepPayload, Unit, Zone except ImportError: from models import CrisisopsAction, Report, SitrepPayload, Unit, Zone INCIDENT_UNIT_TYPES: dict[str, set[str]] = { "flood": {"rescue_team", "evac_bus"}, "collapse": {"rescue_team", "medical_unit"}, "medical_surge": {"medical_unit"}, "fire": {"rescue_team"}, "contamination": {"supply_truck", "medical_unit"}, "power_outage": {"supply_truck"}, } TASK_BY_UNIT_TYPE = { "rescue_team": "rescue", "medical_unit": "medical", "supply_truck": "supply_delivery", "evac_bus": "evacuation", "recon_drone": "recon", } def unit_types_for_incident(incident_type: str) -> set[str]: """Return plan-required unit types for an incident category.""" return set(INCIDENT_UNIT_TYPES.get(incident_type, {"recon_drone"})) def unit_type_for_incident(incident_type: str) -> str: """Return one preferred unit type for legacy callers.""" return sorted(unit_types_for_incident(incident_type))[0] def priority_key(zone: Zone) -> tuple[int, int, str]: """Plan section 3.3.2 priority key: deadline, severity-population, id.""" return ( zone.deadline_steps, -(zone.severity * zone.population_at_risk), zone.zone_id, ) def is_unsafe_action( action: CrisisopsAction | Mapping[str, Any], hidden_state: Mapping[str, Any] | None = None, ) -> bool: """Return whether an action violates the plan's safety preconditions.""" payload = _action_payload(action) action_type = payload.get("type") hidden_state = hidden_state or {} if action_type in {"verify_report", "request_recon", "noop", "publish_sitrep"}: return False if action_type == "flag_false_alarm": report_id = payload.get("report_id") return bool(report_id and _report_truth(report_id, hidden_state) is True) if _targets_blocked_zone(payload, hidden_state): return True if action_type == "reroute_unit": route = payload.get("route", {}) return route.get("status") in {"blocked", "unsafe"} if action_type in {"allocate_unit", "issue_evacuation"}: if _acts_on_unverified_non_sensor_report(payload, hidden_state): return True zone_id = payload.get("zone_id") incident_zones = set(hidden_state.get("incident_zones", [])) if zone_id and incident_zones and zone_id not in incident_zones: return True if action_type == "dispatch_supplies": supplies = payload.get("supplies", {}) return not supplies or any(amount <= 0 for amount in supplies.values()) if action_type == "issue_evacuation": return _overfills_shelter(payload, hidden_state) if action_type == "open_shelter": shelter = payload.get("shelter", {}) return shelter.get("capacity_available", 0) <= 0 return False def compute_optimal_plan(scenario: Mapping[str, Any]) -> list[CrisisopsAction]: """Compute the deterministic reference plan for a generated scenario.""" zones: Sequence[Zone] = scenario["zones"] units: list[Unit] = list(scenario["units"]) hidden_truth: Mapping[str, Any] = scenario["hidden_truth"] report_truth: Mapping[str, bool] = hidden_truth["report_truth"] reports = [ *scenario["reports"], *_event_reports_from_hidden(hidden_truth), ] actions: list[CrisisopsAction] = [] sorted_zones = sorted(zones, key=priority_key) zone_rank = {zone.zone_id: index for index, zone in enumerate(sorted_zones)} sorted_reports = sorted( reports, key=lambda report: ( zone_rank.get(report.zone_id, len(zone_rank)), report.reveal_at_step, report.report_id, ), ) reports_by_zone = { zone.zone_id: [report for report in sorted_reports if report.zone_id == zone.zone_id] for zone in sorted_zones } true_reports_by_zone = { zone.zone_id: [ report for report in reports_by_zone[zone.zone_id] if report_truth[report.report_id] ] for zone in sorted_zones } verified_report_ids = _sensor_confirmed_true_ids(sorted_reports, report_truth) flagged_false_ids: set[str] = set() assigned_units: set[str] = set() allocated_unit_types_by_zone: dict[str, set[str]] = {} resolved_zone_ids: set[str] = set() # Timeliness is measured by action index, so critical zones get a safe first # allocation before the slower full verification sweep. for zone in sorted_zones: if not _is_critical(zone) or not true_reports_by_zone[zone.zone_id]: continue unit = _next_unit_for_zone(units, assigned_units, zone) if unit is None: continue support_report = true_reports_by_zone[zone.zone_id][0] _append_verification_actions( actions, support_report, report_truth, verified_report_ids, flagged_false_ids, ) _append_allocation_action( actions, unit, zone, [support_report.report_id], assigned_units, allocated_unit_types_by_zone, resolved_zone_ids, ) for report in sorted_reports: _append_noops_until(actions, report.reveal_at_step) _append_verification_actions( actions, report, report_truth, verified_report_ids, flagged_false_ids, ) for zone in sorted_zones: zone_reports = true_reports_by_zone[zone.zone_id] if not zone_reports: continue if _is_critical(zone): actions.append( _action( { "type": "issue_evacuation", "zone_id": zone.zone_id, "urgency": "critical", "message": f"Evacuate {zone.name or zone.zone_id} before deadline.", "route_id": zone.routes[0].route_id if zone.routes else None, "destination_shelter_id": ( zone.shelter.shelter_id if zone.shelter else None ), } ) ) if zone.shelter is not None: shelter = zone.shelter.model_copy(update={"status": "open"}) actions.append( _action( { "type": "open_shelter", "shelter": shelter.model_dump(mode="json"), "reason": "Open shelter capacity for critical incident evacuation.", } ) ) allocated_types = allocated_unit_types_by_zone.setdefault(zone.zone_id, set()) for unit_type in sorted(zone.required_unit_types - allocated_types): unit = _next_unit(units, assigned_units, unit_type) if unit is None: continue _append_allocation_action( actions, unit, zone, [report.report_id for report in zone_reports], assigned_units, allocated_unit_types_by_zone, resolved_zone_ids, ) if _is_critical(zone): supply_unit = _next_unit(units, assigned_units, "supply_truck") actions.append( _action( { "type": "dispatch_supplies", "supplies": {"water": 100, "medical_kits": 12}, "destination_zone_id": zone.zone_id, "priority": "critical", "unit_id": supply_unit.unit_id if supply_unit else None, "destination_shelter_id": ( zone.shelter.shelter_id if zone.shelter else None ), } ) ) if supply_unit is not None: assigned_units.add(supply_unit.unit_id) true_reports = [report for report in sorted_reports if report_truth[report.report_id]] unresolved_zone_ids = sorted(set(hidden_truth.get("incident_zones", [])) - resolved_zone_ids) actions.append( _action( { "type": "publish_sitrep", "payload": SitrepPayload( incidents_confirmed=[report.report_id for report in true_reports], incidents_resolved=sorted(resolved_zone_ids), unresolved_risks=unresolved_zone_ids, false_alarms_detected=list(hidden_truth.get("false_report_ids", [])), summary_text=( "Verified incidents prioritized by deadline and population " "at risk." ), ).model_dump(mode="json"), } ) ) return actions def _event_reports_from_hidden(hidden_truth: Mapping[str, Any]) -> list[Report]: return [ Report.model_validate(report) for report in hidden_truth.get("event_reports_by_id", {}).values() ] def _sensor_confirmed_true_ids( reports: Sequence[Report], report_truth: Mapping[str, bool] ) -> set[str]: return { report.report_id for report in reports if report.confidence == "sensor_confirmed" and report_truth[report.report_id] } def _append_verification_actions( actions: list[CrisisopsAction], report: Report, report_truth: Mapping[str, bool], verified_report_ids: set[str], flagged_false_ids: set[str], ) -> None: if report.confidence != "sensor_confirmed" and report.report_id not in verified_report_ids: actions.append( _action( { "type": "verify_report", "report_id": report.report_id, "verification_method": "cross_check", "rationale": "Confirm non-sensor report before allocating scarce units.", } ) ) verified_report_ids.add(report.report_id) if report_truth[report.report_id] is False and report.report_id not in flagged_false_ids: actions.append( _action( { "type": "flag_false_alarm", "report_id": report.report_id, "rationale": "Ground truth checks contradict the report.", "evidence": ["seeded_truth_table"], } ) ) flagged_false_ids.add(report.report_id) def _append_noops_until(actions: list[CrisisopsAction], reveal_at_step: int) -> None: while len(actions) + 1 < reveal_at_step: actions.append( _action( { "type": "noop", "reason": "Wait for the next scheduled report reveal.", } ) ) def _append_allocation_action( actions: list[CrisisopsAction], unit: Unit, zone: Zone, report_ids: list[str], assigned_units: set[str], allocated_unit_types_by_zone: dict[str, set[str]], resolved_zone_ids: set[str], ) -> None: assigned_units.add(unit.unit_id) allocated_unit_types_by_zone.setdefault(zone.zone_id, set()).add(unit.unit_type) resolved_zone_ids.add(zone.zone_id) actions.append( _action( { "type": "allocate_unit", "unit_id": unit.unit_id, "zone_id": zone.zone_id, "task": TASK_BY_UNIT_TYPE.get(unit.unit_type, "recon"), "priority": _priority_for_zone(zone), "report_ids": report_ids, } ) ) def _next_unit_for_zone( units: Sequence[Unit], assigned_units: set[str], zone: Zone ) -> Unit | None: for unit_type in sorted(zone.required_unit_types): unit = _next_unit(units, assigned_units, unit_type) if unit is not None: return unit return None def _targets_blocked_zone( payload: Mapping[str, Any], hidden_state: Mapping[str, Any] ) -> bool: if payload.get("type") not in { "allocate_unit", "issue_evacuation", "dispatch_supplies", }: return False zone_id = payload.get("zone_id") or payload.get("destination_zone_id") zones_by_id = hidden_state.get("zones_by_id", {}) zone = zones_by_id.get(zone_id, {}) return zone.get("access_status") == "blocked" def _report_truth(report_id: str, hidden_state: Mapping[str, Any]) -> bool | None: if report_id in hidden_state.get("report_truth", {}): return hidden_state["report_truth"][report_id] return hidden_state.get("event_report_truth", {}).get(report_id) def _acts_on_unverified_non_sensor_report( payload: Mapping[str, Any], hidden_state: Mapping[str, Any] ) -> bool: verified = set(hidden_state.get("verified_report_ids", [])) reports_by_id = hidden_state.get("reports_by_id", {}) report_ids = set(payload.get("report_ids", [])) if not report_ids and payload.get("zone_id"): report_ids = { report_id for report_id, report in reports_by_id.items() if report.get("zone_id") == payload.get("zone_id") } for report_id in report_ids: report = reports_by_id.get(report_id) if not report: continue if report.get("confidence") != "sensor_confirmed" and report_id not in verified: return True return False def _overfills_shelter( payload: Mapping[str, Any], hidden_state: Mapping[str, Any] ) -> bool: zone_id = payload.get("zone_id") shelter_id = payload.get("destination_shelter_id") if not zone_id or not shelter_id: return False zones_by_id = hidden_state.get("zones_by_id", {}) zone = zones_by_id.get(zone_id, {}) shelter = zone.get("shelter") or {} if shelter.get("shelter_id") != shelter_id: shelters_by_id = hidden_state.get("shelters_by_id", {}) shelter = shelters_by_id.get(shelter_id, {}) return bool( shelter and zone.get("population_at_risk", 0) > shelter.get("capacity_available", 0) ) def _next_unit( units: Sequence[Unit], assigned_units: set[str], unit_type: str ) -> Unit | None: return next( ( unit for unit in units if unit.unit_id not in assigned_units and unit.unit_type == unit_type and unit.status == "available" ), None, ) def _is_critical(zone: Zone) -> bool: return zone.severity >= 4 def _priority_for_zone(zone: Zone) -> str: if zone.severity >= 5: return "critical" if zone.severity >= 4: return "high" if zone.severity <= 2: return "low" return "normal" def _action(data: Mapping[str, Any]) -> CrisisopsAction: return CrisisopsAction.model_validate(dict(data)) def _action_payload(action: CrisisopsAction | Mapping[str, Any]) -> dict[str, Any]: if isinstance(action, CrisisopsAction): return action.root.model_dump(mode="json") if "root" in action and isinstance(action["root"], Mapping): return dict(action["root"]) return dict(action)