Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| from typing import Any, Dict, List, Optional | |
| from dotenv import load_dotenv | |
| from openai import OpenAI | |
| from environment.avigilance_env import AvigilanceEnv | |
| from environment.models import ( | |
| AvigilanceAction, | |
| FTOGradeAction, | |
| IncidentPriorityAction, | |
| ResourceAllocationAction, | |
| ) | |
| from environment.scoring import format_open_score, format_open_score_compact, normalize_open_score | |
# Pull variables from a local .env file (if any) before reading configuration.
load_dotenv()

BENCHMARK = "avigilance-env"

# OpenAI-compatible endpoint and model used for optional rationale generation;
# both can be overridden through environment variables.
API_BASE_URL = os.environ.get("API_BASE_URL", "https://api.openai.com/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "gpt-4o-mini")

# Credential lookup: OPENAI_API_KEY wins; an unset or empty value falls back to HF_TOKEN.
API_KEY = os.environ.get("OPENAI_API_KEY") or os.environ.get("HF_TOKEN")
def build_client() -> OpenAI:
    """Construct the OpenAI-compatible client pointed at ``API_BASE_URL``.

    When no ``API_KEY`` is configured, the placeholder ``"missing-token"`` is
    passed instead, so a client object is always produced. Whether a request
    is actually sent is decided by the caller.
    """
    token = API_KEY if API_KEY else "missing-token"
    return OpenAI(api_key=token, base_url=API_BASE_URL)


# Shared client instance, created once at import time.
CLIENT = build_client()
def compact_json(value: Any) -> str:
    """Serialize *value* as single-line JSON with no insignificant whitespace.

    Non-ASCII characters are escaped (``ensure_ascii=True``), so the result is
    pure ASCII. Embedded newlines are escaped by the JSON encoder, so the
    output never spans multiple lines.
    """
    encoder = json.JSONEncoder(ensure_ascii=True, separators=(",", ":"))
    return encoder.encode(value)
def log_start(task: str) -> None:
    """Print the ``[START]`` banner for *task* to stdout and flush it immediately."""
    banner = " ".join(
        (
            "[START]",
            f"task={task}",
            f"env={BENCHMARK}",
            f"model={MODEL_NAME}",
        )
    )
    print(banner, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` record to stdout and flush it immediately.

    Args:
        step: Step counter as supplied by the caller.
        action: Serialized action text, printed verbatim.
        reward: Step reward, rendered with ``format_open_score(..., decimals=2)``.
        done: Episode-finished flag, printed as ``true``/``false``.
        error: Error message for a failed step. ``None``, an empty string, or
            a whitespace-only message is rendered as ``null``.

    The log is one record per line. Exception messages can contain line
    breaks, so all whitespace runs in ``error`` are collapsed to single
    spaces to keep the record on one line.
    """
    error_text = " ".join((error or "").split()) or "null"
    print(
        f"[STEP] step={step} action={action} reward={format_open_score(reward, decimals=2)} done={str(done).lower()} error={error_text}",
        flush=True,
    )
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the closing ``[END]`` record for an episode and flush it.

    Each per-step reward is rendered with ``format_open_score_compact``, and
    the results are joined by commas. An empty ``rewards`` list yields an
    empty field.
    """
    formatted_rewards = [format_open_score_compact(value) for value in rewards]
    fields = [
        "[END]",
        f"success={str(success).lower()}",
        f"steps={steps}",
        f"score={format_open_score_compact(score)}",
        "rewards=" + ",".join(formatted_rewards),
    ]
    print(" ".join(fields), flush=True)
def maybe_generate_rationale(prompt: str) -> Optional[str]:
    """Ask the configured chat model for a one-sentence rationale for *prompt*.

    Returns ``None`` in any of these cases, so callers can substitute a
    canned string:
      * ``API_KEY`` is unset or empty;
      * ``API_KEY`` is still the ``your_api_key_here`` placeholder;
      * the request (or reading its response) raises;
      * the model returns empty or whitespace-only content.
    """
    has_usable_key = bool(API_KEY) and API_KEY != "your_api_key_here"
    if not has_usable_key:
        return None

    conversation = [
        {"role": "system", "content": "Respond with one concise operational sentence."},
        {"role": "user", "content": prompt},
    ]
    try:
        response = CLIENT.chat.completions.create(
            model=MODEL_NAME,
            messages=conversation,
            temperature=0.0,
            max_tokens=120,
        )
        text = (response.choices[0].message.content or "").strip()
    except Exception:
        # Deliberate best-effort: any client/network/response-shape failure
        # simply means "no generated rationale".
        return None
    return text or None
def _fto_grade_and_action(total, incidents, pass_rate):
    """Map rubric inputs to ``(grade, recommended_action)``, checking the best tier first."""
    if total >= 90 and incidents == 0 and pass_rate >= 0.85:
        return "A+", "clear"
    if total >= 75 and incidents <= 1 and pass_rate >= 0.75:
        return "A", "clear"
    if total >= 50 and incidents <= 3 and pass_rate >= 0.60:
        return "B", "self_assessment_required"
    # Grade C: three or more recent incidents escalates straight to an audit.
    return "C", ("immediate_audit" if incidents >= 3 else "dgca_notice_issued")


def _fto_risk_flags(profile) -> List[str]:
    """Return, in a fixed order, the risk-flag labels whose thresholds *profile* trips."""
    checks = (
        (profile.recent_incidents >= 3, "high_incident_rate"),
        (profile.solo_hours_per_student < 15, "insufficient_solo_hours"),
        (profile.pass_rate < 0.55, "low_pass_rate"),
        (profile.grievances_last_6_months >= 5, "excessive_student_grievances"),
        (profile.safety_score < 10, "safety_critical"),
    )
    return [label for tripped, label in checks if tripped]


def build_task1_action(obs) -> AvigilanceAction:
    """Grade the FTO in ``obs.fto_profile`` and build the task1 action.

    The five component scores are summed and rounded to 2 decimals. The
    total, the recent-incident count and the pass rate then select a grade
    and a recommended action. Independent threshold checks add risk flags.
    The justification comes from the LLM when one is available; otherwise a
    fixed template is used.
    """
    profile = obs.fto_profile
    total = round(
        profile.performance_score
        + profile.operational_score
        + profile.safety_score
        + profile.compliance_score
        + profile.student_support_score,
        2,
    )
    grade, recommended_action = _fto_grade_and_action(
        total, profile.recent_incidents, profile.pass_rate
    )
    risk_flags = _fto_risk_flags(profile)

    prompt = (
        f"Explain a DGCA action for grade {grade}, total score {total}, "
        f"incidents {profile.recent_incidents}, and pass rate {profile.pass_rate}."
    )
    rationale = maybe_generate_rationale(prompt)
    if not rationale:
        rationale = (
            f"Assigned grade {grade} from the DGCA rubric using a total score of {total} "
            "with risk flags derived from incidents, safety, pass rate, and grievances."
        )

    grade_action = FTOGradeAction(
        grade=grade,
        total_score=total,
        risk_flags=risk_flags,
        recommended_action=recommended_action,
        justification=rationale,
    )
    return AvigilanceAction(task_id="task1", fto_grade_action=grade_action)
# Base urgency per incident type; types not listed here fall back to 0.5.
_INCIDENT_TYPE_BASE_PRIORITY = {
    "runway_incursion": 0.95,
    "atc_deviation": 0.80,
    "fdtl_violation": 0.70,
    "technical_snag": 0.60,
    "maintenance_lapse": 0.65,
    "bird_strike": 0.50,
    "fuel_irregularity": 0.55,
    "unauthorized_access": 0.45,
}

# Multiplier keyed by ``incident.severity.value``.
_SEVERITY_PRIORITY_MULTIPLIER = {"low": 1.0, "medium": 1.15, "high": 1.30, "critical": 1.50}


def compute_incident_priority(incident) -> float:
    """Score how urgently *incident* should be handled.

    score = (type base + recurrence boost + traffic boost + inspection-gap
    penalty) * severity multiplier. The recurrence boost is capped at 0.25,
    and the traffic and inspection terms at 0.10 each. The product is capped
    at 1.0 and rounded to 4 decimals.

    Raises:
        KeyError: if ``incident.severity.value`` is not one of
            ``low``/``medium``/``high``/``critical``.
    """
    base = _INCIDENT_TYPE_BASE_PRIORITY.get(incident.incident_type, 0.5)
    recurrence = min(incident.recurrence_count * 0.08, 0.25)
    # Normalized against 500 flights/day and a 180-day inspection gap.
    traffic = min(incident.flights_per_day_at_airport / 500 * 0.10, 0.10)
    inspection_gap = min(incident.days_since_last_inspection / 180 * 0.10, 0.10)
    multiplier = _SEVERITY_PRIORITY_MULTIPLIER[incident.severity.value]
    scaled = (base + recurrence + traffic + inspection_gap) * multiplier
    return round(min(scaled, 1.0), 4)
def build_task2_action(obs) -> AvigilanceAction:
    """Rank ``obs.incident_batch`` by priority and build the task2 action.

    Each incident is scored once with ``compute_incident_priority``.
    Incidents scoring >= 0.85 are escalated immediately and those below 0.60
    are deferred. A "pattern" is any ``incident_type:airline`` pair that
    occurs at least twice in the batch.
    """
    batch = list(obs.incident_batch)

    # Score once; list.sort is stable even with reverse=True, so ties keep batch order.
    scored = [(incident, compute_incident_priority(incident)) for incident in batch]
    scored.sort(key=lambda pair: pair[1], reverse=True)

    ranking = [incident.incident_id for incident, _ in scored]
    escalate = [incident.incident_id for incident, score in scored if score >= 0.85]
    defer = [incident.incident_id for incident, score in scored if score < 0.60]

    occurrences: Dict[str, int] = {}
    for incident in batch:
        pair_key = f"{incident.incident_type}:{incident.airline}"
        occurrences[pair_key] = occurrences.get(pair_key, 0) + 1
    repeated = [pair_key for pair_key, count in occurrences.items() if count >= 2]

    rationale = maybe_generate_rationale(
        f"Summarize why these incident ids are highest priority: {ranking[:3]}."
    )
    if not rationale:
        rationale = "Top incidents rank highest because their severity, recurrence, and delayed inspection windows imply the greatest operational urgency."

    if repeated:
        pattern_description = "Repeated operator-pattern pairs detected: " + ", ".join(repeated)
    else:
        pattern_description = None

    priority_action = IncidentPriorityAction(
        priority_ranking=ranking,
        top_3_rationale=rationale,
        defer_list=defer,
        escalate_immediately=escalate,
        pattern_detected=bool(repeated),
        pattern_description=pattern_description,
    )
    return AvigilanceAction(task_id="task2", incident_priority_action=priority_action)
def task_hours_for_fto(fto) -> int:
    """Estimate audit hours for an FTO from the sum of its five component scores.

    Lower totals need more work: below 50 -> 16 hours, below 70 -> 8 hours,
    otherwise 4 hours. The total is not rounded.
    """
    combined = fto.performance_score
    combined += fto.operational_score
    combined += fto.safety_score
    combined += fto.compliance_score
    combined += fto.student_support_score

    for ceiling, hours in ((50, 16), (70, 8)):
        if combined < ceiling:
            return hours
    return 4
# Inspection hours budgeted per incident, keyed by severity level.
_INCIDENT_HOURS_BY_SEVERITY = {"critical": 8, "high": 6, "medium": 4, "low": 2}


def task_hours_for_incident(incident) -> int:
    """Return the inspection hours for *incident*, based on ``incident.severity.value``.

    Raises:
        KeyError: for a severity outside ``critical``/``high``/``medium``/``low``.
    """
    severity_level = incident.severity.value
    return _INCIDENT_HOURS_BY_SEVERITY[severity_level]
def _prioritized_work_items(ftos, incidents) -> List[Dict[str, Any]]:
    """Return ``{"id", "hours"}`` work items: incidents by descending priority, then FTO audits by ascending hours."""
    items: List[Dict[str, Any]] = [
        {"id": incident.incident_id, "hours": task_hours_for_incident(incident)}
        for incident in sorted(incidents, key=compute_incident_priority, reverse=True)
    ]
    # NOTE(review): ascending hours puts the healthiest FTOs (total >= 70 -> 4 h)
    # first, which maximizes audit count rather than risk coverage -- confirm intent.
    items.extend(
        {"id": fto.fto_id, "hours": task_hours_for_fto(fto)}
        for fto in sorted(ftos, key=task_hours_for_fto)
    )
    return items


def _assign_round_robin(items, inspectors, budget, max_tasks_per_inspector=3):
    """Greedily place items on inspectors in rotation, within the hour budget and a per-inspector task cap.

    Returns ``(assignments, deferred)``:
      * ``assignments`` maps every inspector to the list of item ids it received;
      * ``deferred`` lists the ids that fit no inspector.
    """
    assignments: Dict[str, List[str]] = {inspector: [] for inspector in inspectors}
    deferred: List[str] = []
    remaining_budget = budget
    # The rotation cursor keeps advancing across items, so work is spread
    # round-robin rather than always starting at the first inspector.
    cursor = 0
    for item in items:
        placed = False
        for _ in inspectors:
            inspector = inspectors[cursor % len(inspectors)]
            cursor += 1
            if len(assignments[inspector]) >= max_tasks_per_inspector:
                continue
            if item["hours"] <= remaining_budget:
                assignments[inspector].append(item["id"])
                remaining_budget -= item["hours"]
                placed = True
                break
        if not placed:
            deferred.append(item["id"])
    return assignments, deferred


def build_task3_action(obs) -> AvigilanceAction:
    """Allocate inspectors to queued incidents and FTO audits for the week.

    Incidents are scheduled first, in descending priority, followed by FTO
    audits. Each inspector takes at most 3 items, and the total hours may
    not exceed the weekly budget. Items that cannot be placed are deferred.
    The predicted risk reduction is the fraction of items covered, passed
    through ``normalize_open_score``.
    """
    ftos = list(obs.fto_audit_queue or [])
    incidents = list(obs.incident_queue or [])
    # `or` falls back on any falsy value (None and also 0).
    # NOTE(review): confirm 0 never legitimately means "no inspectors" / "no hours".
    capacity = obs.inspector_capacity or 2
    budget = obs.week_budget_hours or 40
    inspectors = [f"inspector_{index + 1}" for index in range(capacity)]

    work_items = _prioritized_work_items(ftos, incidents)
    assignments, deferred = _assign_round_robin(work_items, inspectors, budget)

    # Describe the budget actually used for allocation (after the fallback),
    # not the raw observation field, which may be None.
    rationale = maybe_generate_rationale(
        f"Summarize an allocation strategy for {len(ftos)} FTOs and {len(incidents)} incidents under a budget of {budget} hours."
    ) or "Allocated inspectors to the highest-risk incidents first, then used remaining hours for audit coverage without breaching per-inspector task caps."

    covered = sum(len(tasks) for tasks in assignments.values())
    # max(..., 1) avoids division by zero when there is nothing to schedule.
    predicted_reduction = normalize_open_score(covered / max(len(work_items), 1))
    return AvigilanceAction(
        task_id="task3",
        resource_allocation_action=ResourceAllocationAction(
            inspector_assignments=assignments,
            deferred_items=deferred,
            priority_rationale=rationale,
            predicted_risk_reduction=predicted_reduction,
            abstain=False,
            abstain_reason=None,
        ),
    )
def run_episode(task_id: str, seed: int = 42) -> float:
    """Play one seeded episode of *task_id* with the heuristic policy.

    The episode emits one ``[START]`` line, one ``[STEP]`` line per step and
    a closing ``[END]`` line. The ``[END]`` line is printed from ``finally``,
    so it also appears when an exception escapes. A step whose
    ``env.step`` call raises is recorded with a zero reward and the error
    text, and it ends the episode. Unknown task ids use the task3 policy.

    Returns:
        The normalized mean step reward (normalized 0 when no step ran).
    """
    policies = {"task1": build_task1_action, "task2": build_task2_action}
    policy = policies.get(task_id, build_task3_action)

    env = AvigilanceEnv(task_id=task_id, seed=seed)
    rewards: List[float] = []
    steps_taken = 0
    success = False
    score = normalize_open_score(0)

    def record_step(action_text: str, value: float, finished: bool, error: Optional[str]) -> None:
        """Store one step's reward, bump the counter, and log it."""
        nonlocal steps_taken
        rewards.append(value)
        steps_taken += 1
        log_step(steps_taken, action_text, value, finished, error)

    log_start(task_id)
    try:
        obs = env.reset()
        done = False
        while not done:
            action = policy(obs)
            serialized = compact_json(action.model_dump(exclude_none=True))
            try:
                obs, reward, done, _info = env.step(action)
                record_step(serialized, reward.score, done, None)
            except Exception as exc:
                done = True
                record_step(serialized, normalize_open_score(0.0), done, str(exc))
        if rewards:
            score = normalize_open_score(sum(rewards) / len(rewards))
        success = score >= 0.1
    finally:
        log_end(success, steps_taken, score, rewards)
    return score
def main() -> None:
    """Run one episode per benchmark task, in order, each with seed 42."""
    task_ids = ["task1", "task2", "task3"]
    for current_task in task_ids:
        run_episode(current_task, seed=42)


if __name__ == "__main__":
    main()