Spaces:
Running on Zero
Running on Zero
| """FLIGHTDECK agent — an LLM as a tool-using flight-search assistant. | |
| Scope is deliberately narrow: the agent can ONLY look up live flights to/from an | |
| airport, or on an origin->destination route. It chooses a tool, the tool runs the | |
| real FlightRadar24 API call, and the LLM writes the answer from the results. | |
| Every run is persisted as an agent trace under ./traces/ (one JSON per run plus a | |
| rolling JSONL log) so the reasoning + tool calls are auditable — useful for the | |
| HuggingFace hackathon submission. | |
| """ | |
| from __future__ import annotations | |
| import datetime as dt | |
| import json | |
| import os | |
| import re | |
| import time | |
| import uuid | |
| import fr24 | |
| import liquid | |
| _LOCAL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "traces") | |
| try: | |
| # Attempt to use the local directory and prove it is writable | |
| os.makedirs(_LOCAL_DIR, exist_ok=True) | |
| _test_path = os.path.join(_LOCAL_DIR, ".write_test") | |
| with open(_test_path, "w") as f: | |
| f.write("ok") | |
| os.remove(_test_path) | |
| TRACES_DIR = _LOCAL_DIR | |
| except (OSError, PermissionError): | |
| # If Hugging Face locks the file system, fallback to /tmp | |
| TRACES_DIR = "/tmp/traces" | |
| os.makedirs(TRACES_DIR, exist_ok=True) | |
| JSONL_LOG = os.path.join(TRACES_DIR, "agent_log.jsonl") | |
| MODEL_NAME = os.environ.get("LLM_REPO", "openbmb/MiniCPM5-1B") | |
| # Best-effort city/keyword -> IATA so users can type "London to Dubai". | |
| CITY_TO_IATA = { | |
| "london": "LHR", "heathrow": "LHR", "gatwick": "LGW", "stansted": "STN", | |
| "new york": "JFK", "nyc": "JFK", "newark": "EWR", "jfk": "JFK", | |
| "dubai": "DXB", "paris": "CDG", "amsterdam": "AMS", "frankfurt": "FRA", | |
| "tokyo": "HND", "haneda": "HND", "narita": "NRT", "singapore": "SIN", | |
| "hong kong": "HKG", "los angeles": "LAX", "la": "LAX", "chicago": "ORD", | |
| "san francisco": "SFO", "sydney": "SYD", "melbourne": "MEL", "doha": "DOH", | |
| "istanbul": "IST", "madrid": "MAD", "barcelona": "BCN", "rome": "FCO", | |
| "munich": "MUC", "berlin": "BER", "dublin": "DUB", "boston": "BOS", | |
| "miami": "MIA", "atlanta": "ATL", "toronto": "YYZ", "delhi": "DEL", | |
| "mumbai": "BOM", "beijing": "PEK", "shanghai": "PVG", "seoul": "ICN", | |
| "birmingham": "BHX", "manchester": "MAN", "edinburgh": "EDI", | |
| } | |
| TOOLS_DOC = """\ | |
| TOOLS (you may call exactly one per turn): | |
| 1. search_by_route - live flights flying a specific origin->destination route. | |
| args: {"origin": "<IATA/ICAO>", "destination": "<IATA/ICAO>"} | |
| 2. search_by_airport - live flights to/from a single airport. | |
| args: {"airport": "<IATA/ICAO>", "direction": "inbound"|"outbound"|"both"} | |
| """ | |
| SYSTEM_PROMPT = f"""You are FLIGHTDECK-ONE, a focused flight-search assistant. | |
| You can ONLY help users find LIVE flights to/from airports or on a route. | |
| You cannot book, price, give weather, or answer anything off-topic. | |
| {TOOLS_DOC} | |
| Reply with ONE JSON object and nothing else. Three shapes: | |
| - Route (when the user gives BOTH an origin AND a destination): | |
| {{"tool": "search_by_route", "origin": "LHR", "destination": "JFK"}} | |
| - Airport (when the user gives ONE place, with a direction): | |
| {{"tool": "search_by_airport", "airport": "DXB", "direction": "inbound"}} | |
| - Refuse (when the request is NOT about finding live flights): | |
| {{"tool": "none", "answer": "<one sentence refusal>"}} | |
| Rules: | |
| - "X to Y", "from X to Y" => search_by_route (two places). | |
| - "arrivals/into/landing" => direction "inbound"; "departures/leaving" => "outbound". | |
| - Use IATA/ICAO codes; map city names to their main airport code. | |
| - If it is not a flight search (poem, math, chat, weather...), use tool "none". | |
| Output JSON only. | |
| Examples: | |
| User: flights from London to Dubai | |
| {{"tool": "search_by_route", "origin": "LHR", "destination": "DXB"}} | |
| User: arrivals into JFK | |
| {{"tool": "search_by_airport", "airport": "JFK", "direction": "inbound"}} | |
| User: departures from LAX | |
| {{"tool": "search_by_airport", "airport": "LAX", "direction": "outbound"}} | |
| User: write me a poem about clouds | |
| {{"tool": "none", "answer": "I can only search live flights to/from airports or on a route."}}""" | |
| # Words that signal the query is actually about flights / airports. | |
| FLIGHT_KEYWORDS = { | |
| "flight", "flights", "fly", "flying", "flown", "plane", "planes", "aircraft", | |
| "airline", "airlines", "airport", "airports", "arrival", "arrivals", "arrive", | |
| "arriving", "departure", "departures", "depart", "departing", "inbound", | |
| "outbound", "landing", "land", "takeoff", "route", "routes", "callsign", | |
| "aviation", "airborne", "jet", "jets", "airspace", "tail", "registration", | |
| } | |
| # --------------------------------------------------------------------------- # | |
| def _norm_code(value: str) -> str: | |
| if not value: | |
| return "" | |
| v = value.strip() | |
| low = v.lower() | |
| if low in CITY_TO_IATA: | |
| return CITY_TO_IATA[low] | |
| return re.sub(r"[^A-Za-z]", "", v).upper()[:4] | |
| def _in_scope(query: str) -> bool: | |
| """Deterministic guard: is this plausibly a flight-search request at all? | |
| Passes if the text has a flight keyword, a known city, or an airport-code- | |
| looking token. Guarantees off-topic prompts are refused even if the tiny | |
| model wants to answer them. | |
| """ | |
| q = query.lower() | |
| words = set(re.findall(r"[a-z]+", q)) | |
| if words & FLIGHT_KEYWORDS: | |
| return True | |
| if any(city in q for city in CITY_TO_IATA): | |
| return True | |
| # Bare airport-code token, e.g. "JFK", "EGLL", or "LHR to DXB". | |
| if re.search(r"\b[A-Za-z]{3,4}\b\s*(?:to|-|>|→)\s*\b[A-Za-z]{3,4}\b", query): | |
| return True | |
| if re.search(r"\b[A-Z]{3,4}\b", query): | |
| return True | |
| return False | |
| def _validate(action, query): | |
| """Sanity-check / repair the model's tool choice against the query. | |
| Returns (action, override_reason | None). The tiny model often under-uses | |
| the route tool and forgets args, so we correct obvious cases and record why. | |
| """ | |
| regex_action = _regex_plan(query) | |
| reason = None | |
| # 0. Model refused / gave no tool, but the query is in-scope and the rules | |
| # CAN plan it -> recover (the gate already proved it's a flight query). | |
| if (not action or action.get("tool") in (None, "none", "")): | |
| if regex_action and regex_action.get("tool") in TOOL_IMPLS: | |
| return regex_action, "override: model refused an in-scope query" | |
| return action, None | |
| # 1. Strong route signal in the text but model didn't pick route -> override. | |
| if (regex_action and regex_action.get("tool") == "search_by_route" | |
| and action.get("tool") != "search_by_route"): | |
| action = regex_action | |
| reason = "override: query has explicit origin->destination" | |
| # 2. Route chosen but missing an endpoint -> fill from regex or downgrade. | |
| if action.get("tool") == "search_by_route": | |
| if not action.get("origin") or not action.get("destination"): | |
| if regex_action and regex_action.get("tool") == "search_by_route": | |
| action, reason = regex_action, "repair: filled missing route args" | |
| else: | |
| action = {"tool": "none", | |
| "answer": "Tell me both an origin and a destination, " | |
| "e.g. 'flights from London to Dubai'."} | |
| reason = "repair: route missing args, no fallback" | |
| # 3. Airport chosen but missing the airport code -> fill from regex. | |
| if action.get("tool") == "search_by_airport" and not action.get("airport"): | |
| if regex_action and regex_action.get("airport"): | |
| action["airport"] = regex_action["airport"] | |
| action.setdefault("direction", regex_action.get("direction", "both")) | |
| reason = "repair: filled missing airport from query" | |
| else: | |
| action = {"tool": "none", | |
| "answer": "Which airport? e.g. 'arrivals into JFK'."} | |
| reason = "repair: airport missing, no fallback" | |
| return action, reason | |
| def _extract_json(text: str): | |
| """Pull the first balanced {...} object out of a model response.""" | |
| if not text: | |
| return None | |
| start = text.find("{") | |
| if start == -1: | |
| return None | |
| depth = 0 | |
| for i in range(start, len(text)): | |
| if text[i] == "{": | |
| depth += 1 | |
| elif text[i] == "}": | |
| depth -= 1 | |
| if depth == 0: | |
| blob = text[start:i + 1] | |
| try: | |
| return json.loads(blob) | |
| except Exception: | |
| try: | |
| return json.loads(blob.replace("'", '"')) | |
| except Exception: | |
| return None | |
| return None | |
| # ---- tool implementations -------------------------------------------------- # | |
| def _tool_search_by_route(args): | |
| o = _norm_code(args.get("origin", "")) | |
| d = _norm_code(args.get("destination", "")) | |
| if not o or not d: | |
| return [], {"error": "missing origin/destination"}, f"{o or '?'}-{d or '?'}" | |
| data, url = fr24.search_route(o, d) | |
| return data, {"route": f"{o}-{d}", "request_url": url}, f"{o}->{d}" | |
| def _tool_search_by_airport(args): | |
| ap = _norm_code(args.get("airport", "")) | |
| direction = (args.get("direction") or "both").lower() | |
| if direction not in {"inbound", "outbound", "both"}: | |
| direction = "both" | |
| if not ap: | |
| return [], {"error": "missing airport"}, "?" | |
| data, url = fr24.search_airport(ap, direction) | |
| return data, {"airport": ap, "direction": direction, "request_url": url}, f"{direction}:{ap}" | |
| TOOL_IMPLS = { | |
| "search_by_route": _tool_search_by_route, | |
| "search_by_airport": _tool_search_by_airport, | |
| } | |
| def _summarize_flights(flights, limit=25): | |
| if not flights: | |
| return "No live flights matched." | |
| lines = [f"{len(flights)} live flight(s) found. Sample:"] | |
| for f in flights[:limit]: | |
| lines.append( | |
| f"- {f.get('callsign') or f.get('flight') or '??'} " | |
| f"({f.get('type') or '?'}) " | |
| f"{f.get('orig_iata') or f.get('orig_icao') or '?'}->" | |
| f"{f.get('dest_iata') or f.get('dest_icao') or '?'} " | |
| f"alt={f.get('alt')}ft gs={f.get('gspeed')}kt eta={f.get('eta')}" | |
| ) | |
| if len(flights) > limit: | |
| lines.append(f"...(+{len(flights) - limit} more)") | |
| return "\n".join(lines) | |
| # --------------------------------------------------------------------------- # | |
| def _new_trace(query): | |
| return { | |
| "trace_id": dt.datetime.now().strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:6], | |
| "started_at": dt.datetime.now(dt.timezone.utc).isoformat(), | |
| "model": MODEL_NAME, | |
| "query": query, | |
| "agent_mode": None, | |
| "steps": [], | |
| "tool_calls": [], | |
| "flights_returned": 0, | |
| "answer": None, | |
| } | |
| def _save_trace(trace): | |
| trace["ended_at"] = dt.datetime.now(dt.timezone.utc).isoformat() | |
| path = os.path.join(TRACES_DIR, f"trace_{trace['trace_id']}.json") | |
| with open(path, "w", encoding="utf-8") as fh: | |
| json.dump(trace, fh, indent=2, ensure_ascii=False) | |
| with open(JSONL_LOG, "a", encoding="utf-8") as fh: | |
| fh.write(json.dumps({ | |
| "trace_id": trace["trace_id"], | |
| "ts": trace["ended_at"], | |
| "query": trace["query"], | |
| "mode": trace["agent_mode"], | |
| "tool_calls": [t["tool"] for t in trace["tool_calls"]], | |
| "flights_returned": trace["flights_returned"], | |
| }, ensure_ascii=False) + "\n") | |
| return path | |
| _FILLER = { | |
| "flights", "flight", "fly", "flying", "show", "me", "the", "all", "any", | |
| "find", "list", "get", "please", "live", "to", "from", "into", "for", "of", | |
| "a", "an", "are", "is", "there", "what", "whats", "which", "near", "around", | |
| "at", "right", "now", "currently", "today", "going", "headed", "bound", | |
| "arrivals", "arriving", "arrive", "inbound", "landing", "departures", | |
| "departing", "depart", "leaving", "outbound", "between", "and", "in", "on", | |
| } | |
| def _clean_place(text: str) -> str: | |
| """Extract a place phrase from a fragment, dropping filler words. | |
| Returns a CITY_TO_IATA key when one is recognized (incl. multi-word cities | |
| like 'new york'), else the most specific leftover token. | |
| """ | |
| toks = [t for t in re.findall(r"[a-z]+", text.lower()) if t not in _FILLER] | |
| if not toks: | |
| return "" | |
| phrase = " ".join(toks) | |
| if phrase in CITY_TO_IATA: | |
| return phrase | |
| for n in (3, 2): # multi-word city names | |
| for i in range(len(toks) - n + 1): | |
| cand = " ".join(toks[i:i + n]) | |
| if cand in CITY_TO_IATA: | |
| return cand | |
| for t in toks: | |
| if t in CITY_TO_IATA: | |
| return t | |
| return toks[-1] | |
| def _regex_plan(query): | |
| """Rule-based planner. Used as LLM-free fallback AND as a validator prior.""" | |
| q = query.lower().strip() | |
| # Route: "A to B" (two distinct places around 'to'). | |
| if " to " in q: | |
| left, right = q.split(" to ", 1) | |
| o, d = _clean_place(left), _clean_place(right) | |
| if o and d and o != d: | |
| return {"tool": "search_by_route", "origin": o, "destination": d, | |
| "thought": "regex: route"} | |
| m = re.search(r"\b(?:arrivals?|arriving|inbound|landing|into)\b(.*)", q) | |
| if m and _clean_place(m.group(1)): | |
| return {"tool": "search_by_airport", "airport": _clean_place(m.group(1)), | |
| "direction": "inbound", "thought": "regex: inbound"} | |
| m = re.search(r"\b(?:departures?|departing|leaving|outbound)\b(.*)", q) | |
| if m and _clean_place(m.group(1)): | |
| return {"tool": "search_by_airport", "airport": _clean_place(m.group(1)), | |
| "direction": "outbound", "thought": "regex: outbound"} | |
| m = re.search(r"\bfrom\s+(.*)", q) | |
| if m and _clean_place(m.group(1)): | |
| return {"tool": "search_by_airport", "airport": _clean_place(m.group(1)), | |
| "direction": "outbound", "thought": "regex: from"} | |
| m = re.search(r"\b(?:at|near|around|over)\b(.*)", q) | |
| if m and _clean_place(m.group(1)): | |
| return {"tool": "search_by_airport", "airport": _clean_place(m.group(1)), | |
| "direction": "both", "thought": "regex: airport"} | |
| # Bare city / airport code anywhere. | |
| p = _clean_place(q) | |
| if p and (p in CITY_TO_IATA or re.search(r"\b[A-Za-z]{3,4}\b", query)): | |
| return {"tool": "search_by_airport", "airport": p, | |
| "direction": "both", "thought": "regex: bare"} | |
| return None | |
| def run(query: str, max_tokens=400): | |
| """Run the agent for one user query. | |
| Returns dict: {answer, flights (raw FR24 records), trace_path, trace_id, | |
| tool_calls, mode}. | |
| """ | |
| trace = _new_trace(query) | |
| use_llm = liquid.available() | |
| trace["agent_mode"] = "llm" if use_llm else "fallback-regex" | |
| # ---- 0. SCOPE GATE: hard refuse anything that isn't a flight search ---- | |
| if not _in_scope(query): | |
| trace["steps"].append({"step": 0, "phase": "scope-gate", "in_scope": False}) | |
| answer = ("I only search live flights — try 'flights from London to " | |
| "Dubai', 'arrivals into JFK', or 'departures from LAX'.") | |
| trace["answer"] = answer | |
| trace["agent_mode"] += "+scope-refused" | |
| path = _save_trace(trace) | |
| return {"answer": answer, "flights": [], "trace_path": path, | |
| "trace_id": trace["trace_id"], "tool_calls": [], | |
| "mode": trace["agent_mode"]} | |
| # ---- 1. PLAN: decide which tool to call (or refuse) ---- | |
| action = None | |
| if use_llm: | |
| messages = [ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": query}, | |
| ] | |
| try: | |
| raw, latency = liquid.complete(messages, max_tokens=200, temperature=0.0) | |
| except Exception as e: # noqa: BLE001 | |
| raw, latency = f"(model error: {e})", 0 | |
| use_llm = False | |
| trace["agent_mode"] = "fallback-regex" | |
| action = _extract_json(raw) | |
| trace["steps"].append({ | |
| "step": 1, "phase": "plan", "model_raw": raw, | |
| "parsed_action": action, "latency_ms": latency, | |
| }) | |
| if action is None: | |
| action = _regex_plan(query) | |
| trace["steps"].append({ | |
| "step": 1, "phase": "plan-fallback", "parsed_action": action, | |
| }) | |
| # ---- 1b. VALIDATE / REPAIR the plan (tiny-model guardrail) ---- | |
| action, override_reason = _validate(action, query) | |
| if override_reason: | |
| trace["steps"].append({ | |
| "step": 1, "phase": "validate", | |
| "final_action": action, "override_reason": override_reason, | |
| }) | |
| # Refusal / no actionable tool. | |
| if not action or action.get("tool") in (None, "none", ""): | |
| answer = (action or {}).get( | |
| "answer", | |
| "I can only search live flights to/from an airport or on a route. " | |
| "Try: 'flights from London to Dubai' or 'arrivals into JFK'.") | |
| trace["answer"] = answer | |
| path = _save_trace(trace) | |
| return {"answer": answer, "flights": [], "trace_path": path, | |
| "trace_id": trace["trace_id"], "tool_calls": [], | |
| "mode": trace["agent_mode"]} | |
| # ---- 2. ACT: run the chosen tool (real FR24 call) ---- | |
| tool = action.get("tool") | |
| impl = TOOL_IMPLS.get(tool) | |
| if impl is None: | |
| answer = f"Unknown tool '{tool}'. I only do flight to/from search." | |
| trace["answer"] = answer | |
| path = _save_trace(trace) | |
| return {"answer": answer, "flights": [], "trace_path": path, | |
| "trace_id": trace["trace_id"], "tool_calls": [], "mode": trace["agent_mode"]} | |
| t0 = time.time() | |
| try: | |
| flights, meta, label = impl(action) | |
| error = None | |
| except fr24.FR24Error as e: | |
| flights, meta, label, error = [], {"error": str(e)}, tool, str(e) | |
| except Exception as e: # noqa: BLE001 | |
| flights, meta, label, error = [], {"error": repr(e)}, tool, repr(e) | |
| tool_latency = int((time.time() - t0) * 1000) | |
| call_record = { | |
| "tool": tool, "args": {k: v for k, v in action.items() | |
| if k not in ("thought", "tool")}, | |
| "meta": meta, "result_count": len(flights), | |
| "latency_ms": tool_latency, "error": error, | |
| } | |
| trace["tool_calls"].append(call_record) | |
| trace["steps"].append({"step": 2, "phase": "act", **call_record}) | |
| trace["flights_returned"] = len(flights) | |
| # ---- 3. OBSERVE + ANSWER ---- | |
| summary = _summarize_flights(flights) | |
| if error: | |
| answer = f"Search failed: {error}" | |
| elif use_llm: | |
| try: | |
| ans_msgs = [ | |
| {"role": "system", "content": | |
| "You are FLIGHTDECK-ONE. Summarize the flight search results for " | |
| "the user in 1-3 sentences. Use callsigns and routes. Be concise. " | |
| "Do not invent flights."}, | |
| {"role": "user", "content": | |
| f"User asked: {query}\nTool {tool} ({label}) returned:\n{summary}"}, | |
| ] | |
| answer, latency = liquid.complete(ans_msgs, max_tokens=max_tokens, | |
| temperature=0.3) | |
| trace["steps"].append({"step": 3, "phase": "answer", | |
| "model_raw": answer, "latency_ms": latency}) | |
| except Exception as e: # noqa: BLE001 | |
| answer = summary + f"\n\n(answer-gen error: {e})" | |
| else: | |
| answer = summary | |
| trace["answer"] = answer | |
| path = _save_trace(trace) | |
| return {"answer": answer, "flights": flights, "trace_path": path, | |
| "trace_id": trace["trace_id"], | |
| "tool_calls": [c["tool"] for c in trace["tool_calls"]], | |
| "mode": trace["agent_mode"]} | |