quaz93's picture
Switch default LLM to openbmb/MiniCPM5-1B
88d4864
Raw
History Blame Contribute Delete
20.1 kB
"""FLIGHTDECK agent — an LLM as a tool-using flight-search assistant.
Scope is deliberately narrow: the agent can ONLY look up live flights to/from an
airport, or on an origin->destination route. It chooses a tool, the tool runs the
real FlightRadar24 API call, and the LLM writes the answer from the results.
Every run is persisted as an agent trace under ./traces/ (one JSON per run plus a
rolling JSONL log) so the reasoning + tool calls are auditable — useful for the
HuggingFace hackathon submission.
"""
from __future__ import annotations
import datetime as dt
import json
import os
import re
import time
import uuid
import fr24
import liquid
_LOCAL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "traces")
try:
# Attempt to use the local directory and prove it is writable
os.makedirs(_LOCAL_DIR, exist_ok=True)
_test_path = os.path.join(_LOCAL_DIR, ".write_test")
with open(_test_path, "w") as f:
f.write("ok")
os.remove(_test_path)
TRACES_DIR = _LOCAL_DIR
except (OSError, PermissionError):
# If Hugging Face locks the file system, fallback to /tmp
TRACES_DIR = "/tmp/traces"
os.makedirs(TRACES_DIR, exist_ok=True)
JSONL_LOG = os.path.join(TRACES_DIR, "agent_log.jsonl")
MODEL_NAME = os.environ.get("LLM_REPO", "openbmb/MiniCPM5-1B")
# Best-effort city/keyword -> IATA so users can type "London to Dubai".
CITY_TO_IATA = {
"london": "LHR", "heathrow": "LHR", "gatwick": "LGW", "stansted": "STN",
"new york": "JFK", "nyc": "JFK", "newark": "EWR", "jfk": "JFK",
"dubai": "DXB", "paris": "CDG", "amsterdam": "AMS", "frankfurt": "FRA",
"tokyo": "HND", "haneda": "HND", "narita": "NRT", "singapore": "SIN",
"hong kong": "HKG", "los angeles": "LAX", "la": "LAX", "chicago": "ORD",
"san francisco": "SFO", "sydney": "SYD", "melbourne": "MEL", "doha": "DOH",
"istanbul": "IST", "madrid": "MAD", "barcelona": "BCN", "rome": "FCO",
"munich": "MUC", "berlin": "BER", "dublin": "DUB", "boston": "BOS",
"miami": "MIA", "atlanta": "ATL", "toronto": "YYZ", "delhi": "DEL",
"mumbai": "BOM", "beijing": "PEK", "shanghai": "PVG", "seoul": "ICN",
"birmingham": "BHX", "manchester": "MAN", "edinburgh": "EDI",
}
TOOLS_DOC = """\
TOOLS (you may call exactly one per turn):
1. search_by_route - live flights flying a specific origin->destination route.
args: {"origin": "<IATA/ICAO>", "destination": "<IATA/ICAO>"}
2. search_by_airport - live flights to/from a single airport.
args: {"airport": "<IATA/ICAO>", "direction": "inbound"|"outbound"|"both"}
"""
SYSTEM_PROMPT = f"""You are FLIGHTDECK-ONE, a focused flight-search assistant.
You can ONLY help users find LIVE flights to/from airports or on a route.
You cannot book, price, give weather, or answer anything off-topic.
{TOOLS_DOC}
Reply with ONE JSON object and nothing else. Three shapes:
- Route (when the user gives BOTH an origin AND a destination):
{{"tool": "search_by_route", "origin": "LHR", "destination": "JFK"}}
- Airport (when the user gives ONE place, with a direction):
{{"tool": "search_by_airport", "airport": "DXB", "direction": "inbound"}}
- Refuse (when the request is NOT about finding live flights):
{{"tool": "none", "answer": "<one sentence refusal>"}}
Rules:
- "X to Y", "from X to Y" => search_by_route (two places).
- "arrivals/into/landing" => direction "inbound"; "departures/leaving" => "outbound".
- Use IATA/ICAO codes; map city names to their main airport code.
- If it is not a flight search (poem, math, chat, weather...), use tool "none".
Output JSON only.
Examples:
User: flights from London to Dubai
{{"tool": "search_by_route", "origin": "LHR", "destination": "DXB"}}
User: arrivals into JFK
{{"tool": "search_by_airport", "airport": "JFK", "direction": "inbound"}}
User: departures from LAX
{{"tool": "search_by_airport", "airport": "LAX", "direction": "outbound"}}
User: write me a poem about clouds
{{"tool": "none", "answer": "I can only search live flights to/from airports or on a route."}}"""
# Words that signal the query is actually about flights / airports.
FLIGHT_KEYWORDS = {
"flight", "flights", "fly", "flying", "flown", "plane", "planes", "aircraft",
"airline", "airlines", "airport", "airports", "arrival", "arrivals", "arrive",
"arriving", "departure", "departures", "depart", "departing", "inbound",
"outbound", "landing", "land", "takeoff", "route", "routes", "callsign",
"aviation", "airborne", "jet", "jets", "airspace", "tail", "registration",
}
# --------------------------------------------------------------------------- #
def _norm_code(value: str) -> str:
if not value:
return ""
v = value.strip()
low = v.lower()
if low in CITY_TO_IATA:
return CITY_TO_IATA[low]
return re.sub(r"[^A-Za-z]", "", v).upper()[:4]
def _in_scope(query: str) -> bool:
"""Deterministic guard: is this plausibly a flight-search request at all?
Passes if the text has a flight keyword, a known city, or an airport-code-
looking token. Guarantees off-topic prompts are refused even if the tiny
model wants to answer them.
"""
q = query.lower()
words = set(re.findall(r"[a-z]+", q))
if words & FLIGHT_KEYWORDS:
return True
if any(city in q for city in CITY_TO_IATA):
return True
# Bare airport-code token, e.g. "JFK", "EGLL", or "LHR to DXB".
if re.search(r"\b[A-Za-z]{3,4}\b\s*(?:to|-|>|→)\s*\b[A-Za-z]{3,4}\b", query):
return True
if re.search(r"\b[A-Z]{3,4}\b", query):
return True
return False
def _validate(action, query):
"""Sanity-check / repair the model's tool choice against the query.
Returns (action, override_reason | None). The tiny model often under-uses
the route tool and forgets args, so we correct obvious cases and record why.
"""
regex_action = _regex_plan(query)
reason = None
# 0. Model refused / gave no tool, but the query is in-scope and the rules
# CAN plan it -> recover (the gate already proved it's a flight query).
if (not action or action.get("tool") in (None, "none", "")):
if regex_action and regex_action.get("tool") in TOOL_IMPLS:
return regex_action, "override: model refused an in-scope query"
return action, None
# 1. Strong route signal in the text but model didn't pick route -> override.
if (regex_action and regex_action.get("tool") == "search_by_route"
and action.get("tool") != "search_by_route"):
action = regex_action
reason = "override: query has explicit origin->destination"
# 2. Route chosen but missing an endpoint -> fill from regex or downgrade.
if action.get("tool") == "search_by_route":
if not action.get("origin") or not action.get("destination"):
if regex_action and regex_action.get("tool") == "search_by_route":
action, reason = regex_action, "repair: filled missing route args"
else:
action = {"tool": "none",
"answer": "Tell me both an origin and a destination, "
"e.g. 'flights from London to Dubai'."}
reason = "repair: route missing args, no fallback"
# 3. Airport chosen but missing the airport code -> fill from regex.
if action.get("tool") == "search_by_airport" and not action.get("airport"):
if regex_action and regex_action.get("airport"):
action["airport"] = regex_action["airport"]
action.setdefault("direction", regex_action.get("direction", "both"))
reason = "repair: filled missing airport from query"
else:
action = {"tool": "none",
"answer": "Which airport? e.g. 'arrivals into JFK'."}
reason = "repair: airport missing, no fallback"
return action, reason
def _extract_json(text: str):
"""Pull the first balanced {...} object out of a model response."""
if not text:
return None
start = text.find("{")
if start == -1:
return None
depth = 0
for i in range(start, len(text)):
if text[i] == "{":
depth += 1
elif text[i] == "}":
depth -= 1
if depth == 0:
blob = text[start:i + 1]
try:
return json.loads(blob)
except Exception:
try:
return json.loads(blob.replace("'", '"'))
except Exception:
return None
return None
# ---- tool implementations -------------------------------------------------- #
def _tool_search_by_route(args):
o = _norm_code(args.get("origin", ""))
d = _norm_code(args.get("destination", ""))
if not o or not d:
return [], {"error": "missing origin/destination"}, f"{o or '?'}-{d or '?'}"
data, url = fr24.search_route(o, d)
return data, {"route": f"{o}-{d}", "request_url": url}, f"{o}->{d}"
def _tool_search_by_airport(args):
ap = _norm_code(args.get("airport", ""))
direction = (args.get("direction") or "both").lower()
if direction not in {"inbound", "outbound", "both"}:
direction = "both"
if not ap:
return [], {"error": "missing airport"}, "?"
data, url = fr24.search_airport(ap, direction)
return data, {"airport": ap, "direction": direction, "request_url": url}, f"{direction}:{ap}"
TOOL_IMPLS = {
"search_by_route": _tool_search_by_route,
"search_by_airport": _tool_search_by_airport,
}
def _summarize_flights(flights, limit=25):
if not flights:
return "No live flights matched."
lines = [f"{len(flights)} live flight(s) found. Sample:"]
for f in flights[:limit]:
lines.append(
f"- {f.get('callsign') or f.get('flight') or '??'} "
f"({f.get('type') or '?'}) "
f"{f.get('orig_iata') or f.get('orig_icao') or '?'}->"
f"{f.get('dest_iata') or f.get('dest_icao') or '?'} "
f"alt={f.get('alt')}ft gs={f.get('gspeed')}kt eta={f.get('eta')}"
)
if len(flights) > limit:
lines.append(f"...(+{len(flights) - limit} more)")
return "\n".join(lines)
# --------------------------------------------------------------------------- #
def _new_trace(query):
return {
"trace_id": dt.datetime.now().strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:6],
"started_at": dt.datetime.now(dt.timezone.utc).isoformat(),
"model": MODEL_NAME,
"query": query,
"agent_mode": None,
"steps": [],
"tool_calls": [],
"flights_returned": 0,
"answer": None,
}
def _save_trace(trace):
trace["ended_at"] = dt.datetime.now(dt.timezone.utc).isoformat()
path = os.path.join(TRACES_DIR, f"trace_{trace['trace_id']}.json")
with open(path, "w", encoding="utf-8") as fh:
json.dump(trace, fh, indent=2, ensure_ascii=False)
with open(JSONL_LOG, "a", encoding="utf-8") as fh:
fh.write(json.dumps({
"trace_id": trace["trace_id"],
"ts": trace["ended_at"],
"query": trace["query"],
"mode": trace["agent_mode"],
"tool_calls": [t["tool"] for t in trace["tool_calls"]],
"flights_returned": trace["flights_returned"],
}, ensure_ascii=False) + "\n")
return path
_FILLER = {
"flights", "flight", "fly", "flying", "show", "me", "the", "all", "any",
"find", "list", "get", "please", "live", "to", "from", "into", "for", "of",
"a", "an", "are", "is", "there", "what", "whats", "which", "near", "around",
"at", "right", "now", "currently", "today", "going", "headed", "bound",
"arrivals", "arriving", "arrive", "inbound", "landing", "departures",
"departing", "depart", "leaving", "outbound", "between", "and", "in", "on",
}
def _clean_place(text: str) -> str:
"""Extract a place phrase from a fragment, dropping filler words.
Returns a CITY_TO_IATA key when one is recognized (incl. multi-word cities
like 'new york'), else the most specific leftover token.
"""
toks = [t for t in re.findall(r"[a-z]+", text.lower()) if t not in _FILLER]
if not toks:
return ""
phrase = " ".join(toks)
if phrase in CITY_TO_IATA:
return phrase
for n in (3, 2): # multi-word city names
for i in range(len(toks) - n + 1):
cand = " ".join(toks[i:i + n])
if cand in CITY_TO_IATA:
return cand
for t in toks:
if t in CITY_TO_IATA:
return t
return toks[-1]
def _regex_plan(query):
"""Rule-based planner. Used as LLM-free fallback AND as a validator prior."""
q = query.lower().strip()
# Route: "A to B" (two distinct places around 'to').
if " to " in q:
left, right = q.split(" to ", 1)
o, d = _clean_place(left), _clean_place(right)
if o and d and o != d:
return {"tool": "search_by_route", "origin": o, "destination": d,
"thought": "regex: route"}
m = re.search(r"\b(?:arrivals?|arriving|inbound|landing|into)\b(.*)", q)
if m and _clean_place(m.group(1)):
return {"tool": "search_by_airport", "airport": _clean_place(m.group(1)),
"direction": "inbound", "thought": "regex: inbound"}
m = re.search(r"\b(?:departures?|departing|leaving|outbound)\b(.*)", q)
if m and _clean_place(m.group(1)):
return {"tool": "search_by_airport", "airport": _clean_place(m.group(1)),
"direction": "outbound", "thought": "regex: outbound"}
m = re.search(r"\bfrom\s+(.*)", q)
if m and _clean_place(m.group(1)):
return {"tool": "search_by_airport", "airport": _clean_place(m.group(1)),
"direction": "outbound", "thought": "regex: from"}
m = re.search(r"\b(?:at|near|around|over)\b(.*)", q)
if m and _clean_place(m.group(1)):
return {"tool": "search_by_airport", "airport": _clean_place(m.group(1)),
"direction": "both", "thought": "regex: airport"}
# Bare city / airport code anywhere.
p = _clean_place(q)
if p and (p in CITY_TO_IATA or re.search(r"\b[A-Za-z]{3,4}\b", query)):
return {"tool": "search_by_airport", "airport": p,
"direction": "both", "thought": "regex: bare"}
return None
def run(query: str, max_tokens=400):
"""Run the agent for one user query.
Returns dict: {answer, flights (raw FR24 records), trace_path, trace_id,
tool_calls, mode}.
"""
trace = _new_trace(query)
use_llm = liquid.available()
trace["agent_mode"] = "llm" if use_llm else "fallback-regex"
# ---- 0. SCOPE GATE: hard refuse anything that isn't a flight search ----
if not _in_scope(query):
trace["steps"].append({"step": 0, "phase": "scope-gate", "in_scope": False})
answer = ("I only search live flights — try 'flights from London to "
"Dubai', 'arrivals into JFK', or 'departures from LAX'.")
trace["answer"] = answer
trace["agent_mode"] += "+scope-refused"
path = _save_trace(trace)
return {"answer": answer, "flights": [], "trace_path": path,
"trace_id": trace["trace_id"], "tool_calls": [],
"mode": trace["agent_mode"]}
# ---- 1. PLAN: decide which tool to call (or refuse) ----
action = None
if use_llm:
messages = [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": query},
]
try:
raw, latency = liquid.complete(messages, max_tokens=200, temperature=0.0)
except Exception as e: # noqa: BLE001
raw, latency = f"(model error: {e})", 0
use_llm = False
trace["agent_mode"] = "fallback-regex"
action = _extract_json(raw)
trace["steps"].append({
"step": 1, "phase": "plan", "model_raw": raw,
"parsed_action": action, "latency_ms": latency,
})
if action is None:
action = _regex_plan(query)
trace["steps"].append({
"step": 1, "phase": "plan-fallback", "parsed_action": action,
})
# ---- 1b. VALIDATE / REPAIR the plan (tiny-model guardrail) ----
action, override_reason = _validate(action, query)
if override_reason:
trace["steps"].append({
"step": 1, "phase": "validate",
"final_action": action, "override_reason": override_reason,
})
# Refusal / no actionable tool.
if not action or action.get("tool") in (None, "none", ""):
answer = (action or {}).get(
"answer",
"I can only search live flights to/from an airport or on a route. "
"Try: 'flights from London to Dubai' or 'arrivals into JFK'.")
trace["answer"] = answer
path = _save_trace(trace)
return {"answer": answer, "flights": [], "trace_path": path,
"trace_id": trace["trace_id"], "tool_calls": [],
"mode": trace["agent_mode"]}
# ---- 2. ACT: run the chosen tool (real FR24 call) ----
tool = action.get("tool")
impl = TOOL_IMPLS.get(tool)
if impl is None:
answer = f"Unknown tool '{tool}'. I only do flight to/from search."
trace["answer"] = answer
path = _save_trace(trace)
return {"answer": answer, "flights": [], "trace_path": path,
"trace_id": trace["trace_id"], "tool_calls": [], "mode": trace["agent_mode"]}
t0 = time.time()
try:
flights, meta, label = impl(action)
error = None
except fr24.FR24Error as e:
flights, meta, label, error = [], {"error": str(e)}, tool, str(e)
except Exception as e: # noqa: BLE001
flights, meta, label, error = [], {"error": repr(e)}, tool, repr(e)
tool_latency = int((time.time() - t0) * 1000)
call_record = {
"tool": tool, "args": {k: v for k, v in action.items()
if k not in ("thought", "tool")},
"meta": meta, "result_count": len(flights),
"latency_ms": tool_latency, "error": error,
}
trace["tool_calls"].append(call_record)
trace["steps"].append({"step": 2, "phase": "act", **call_record})
trace["flights_returned"] = len(flights)
# ---- 3. OBSERVE + ANSWER ----
summary = _summarize_flights(flights)
if error:
answer = f"Search failed: {error}"
elif use_llm:
try:
ans_msgs = [
{"role": "system", "content":
"You are FLIGHTDECK-ONE. Summarize the flight search results for "
"the user in 1-3 sentences. Use callsigns and routes. Be concise. "
"Do not invent flights."},
{"role": "user", "content":
f"User asked: {query}\nTool {tool} ({label}) returned:\n{summary}"},
]
answer, latency = liquid.complete(ans_msgs, max_tokens=max_tokens,
temperature=0.3)
trace["steps"].append({"step": 3, "phase": "answer",
"model_raw": answer, "latency_ms": latency})
except Exception as e: # noqa: BLE001
answer = summary + f"\n\n(answer-gen error: {e})"
else:
answer = summary
trace["answer"] = answer
path = _save_trace(trace)
return {"answer": answer, "flights": flights, "trace_path": path,
"trace_id": trace["trace_id"],
"tool_calls": [c["tool"] for c in trace["tool_calls"]],
"mode": trace["agent_mode"]}