CKS_Agentic_AI_V1 / app /service.py
CKSINGH's picture
Fix destination extraction and remove BKK default
1d9df4d
from __future__ import annotations
import logging
import os
import random
import re
import time
import uuid
from typing import Any, Dict, List, Literal, Optional, Tuple
from pydantic import BaseModel, Field
# Logging is configured once at import time. The threshold comes from the
# LOG_LEVEL environment variable (default "INFO") and is upper-cased so that
# values such as "debug" are accepted by ``logging.basicConfig``.
_log_level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=_log_level_name)

# Module-wide logger for the flight agent API.
LOGGER = logging.getLogger("flight-agent-api")
class TripRequest(BaseModel):
    """Search criteria for a trip; every slot has a default.

    The routing slots default to ``None`` so that a partially specified
    request can be stored while the remaining values are still unknown.
    """

    # Route: airport code (or whatever string the caller supplied).
    origin: Optional[str] = None
    destination: Optional[str] = None

    # Dates are kept as strings; the chat extractor fills them from
    # YYYY-MM-DD tokens.
    depart_date: Optional[str] = None
    return_date: Optional[str] = None

    # Party size and cabin class.
    adults: int = 1
    cabin: Literal["ECONOMY", "PREMIUM_ECONOMY", "BUSINESS", "FIRST"] = "ECONOMY"

    # Search constraints; ``None`` disables the corresponding filter in the
    # mock flight search.
    max_price_usd: Optional[int] = None
    max_stops: Optional[int] = 1

    # Baggage preference, consulted when fare rules are derived.
    baggage: Literal["NONE", "CABIN", "CHECKED"] = "CABIN"
class FlightLeg(BaseModel):
    """A single flight segment between two airports."""

    # Endpoints of the segment.
    from_airport: str
    to_airport: str

    # Timestamps as strings; the mock generator emits "YYYY-MM-DDTHH:MM".
    depart_time: str
    arrive_time: str

    # Airline designator and full flight number (the mock generator builds
    # the flight number as carrier followed by three digits).
    carrier: str
    flight_no: str
class FlightOption(BaseModel):
    """One bookable itinerary: its segments plus price and fare attributes."""

    # Identifier used to key fare rules and booking holds.
    option_id: str

    # Ordered segments making up the itinerary.
    legs: List[FlightLeg]

    # Headline attributes used for filtering, scoring and presentation.
    total_duration_min: int
    stops: int
    price_usd: int

    # Fare attributes; the fare-rule helper recognises the brands "LITE",
    # "CLASSIC" and "FLEX".
    refundable: bool
    fare_brand: str
class FareRules(BaseModel):
    """Change, cancellation and baggage terms attached to one flight option."""

    # The option these rules belong to.
    option_id: str

    # Penalties in whole US dollars; 0 means no fee.
    change_fee_usd: int
    cancellation_fee_usd: int

    # Whether the traveller's requested baggage is covered by the fare.
    baggage_included: bool

    # Free-text remarks shown alongside the rules.
    notes: str
class BookingHold(BaseModel):
    """A temporary reservation placed on a single flight option."""

    # Reference of the hold and the option it reserves.
    hold_id: str
    option_id: str

    # Lifecycle state; a new hold starts as "HELD".
    status: Literal["HELD", "EXPIRED", "CONFIRMED"] = "HELD"

    # Advertised lifetime in minutes.
    # NOTE(review): no code visible in this file moves a hold to "EXPIRED".
    expires_in_min: int = 15
class NLUResult(BaseModel):
    """Outcome of intent classification for one user message."""

    # Coarse intent family.
    parent_intent: Literal["TRAVEL_BOOKING", "TRAVEL_SUPPORT", "SMALLTALK", "OUT_OF_SCOPE"]

    # Finer-grained intent within the family (for example "FLIGHT_SEARCH").
    child_intent: str

    # Classifier confidence as reported by the intent heuristic.
    confidence: float

    # Trip request as known at the time the message was classified.
    extracted: TripRequest
class AgentCoreState(BaseModel):
    """Everything the agent remembers about one conversation thread."""

    # Trip request accumulated so far and the latest classification result.
    request: TripRequest = Field(default_factory=TripRequest)
    nlu: Optional[NLUResult] = None

    # Latest search results and the fare rules fetched for them, keyed by
    # option id.
    candidates: List[FlightOption] = Field(default_factory=list)
    rules: Dict[str, FareRules] = Field(default_factory=dict)

    # Booking progress: which option was picked and the hold placed on it.
    selected_option_id: Optional[str] = None
    booking_hold: Optional[BookingHold] = None

    # Contact details collected from the user (the service stores "email").
    user_contact: Dict[str, str] = Field(default_factory=dict)

    # Chronological trail of events recorded through ``log``.
    audit: List[Dict[str, Any]] = Field(default_factory=list)

    def log(self, kind: str, payload: Dict[str, Any]) -> None:
        """Append an audit entry stamped with the current ``time.time()``.

        Args:
            kind: Short event label, e.g. ``"user"`` or ``"tool.search_flights"``.
            payload: Event details stored as given (not copied).
        """
        entry = {"ts": time.time(), "kind": kind, "payload": payload}
        self.audit.append(entry)
# Detectors for personal data. A run of 12-19 consecutive digits is treated as
# a payment-card number; the second detector matches e-mail addresses
# case-insensitively.
_CARD_NUMBER_RE = re.compile(r"\b\d{12,19}\b")
_EMAIL_ADDRESS_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)

# (detector, placeholder) pairs, applied in this order when text is redacted.
PII_PATTERNS: List[Tuple[re.Pattern, str]] = [
    (_CARD_NUMBER_RE, "[REDACTED_CARD]"),
    (_EMAIL_ADDRESS_RE, "[REDACTED_EMAIL]"),
]

# Regex fragments (searched case-insensitively by the input guardrail) that
# mark a message as a suspected prompt-injection attempt.
INJECTION_PATTERNS = [
    r"ignore previous instructions",
    r"reveal system prompt",
    r"print hidden rules",
    r"system prompt",
    r"developer message",
]
# --- Slot-extraction patterns -------------------------------------------------

# First e-mail address in a message (case-insensitive).
EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.I)

# "book 1" / "book 2" / "book 3": selects one of the displayed options.
BOOK_RE = re.compile(r"\bbook\s+([123])\b", re.I)

# ISO-style dates in this century, e.g. 2026-07-10 (digits only; the calendar
# validity of month/day is not checked).
DATE_RE = re.compile(r"\b(20\d{2}-\d{2}-\d{2})\b")

# Budget phrases such as "under $650" or "max 700"; captures 2-5 digits.
PRICE_RE = re.compile(r"(?:under|max|budget|below)\s*\$?\s*(\d{2,5})", re.I)

# Stop preference. The optional trailing "s" sits outside the capture group so
# that the plural ("2 stops") is recognised while group(1) keeps the same
# vocabulary as before ("2 stop"); previously the word boundary directly after
# "stop" made the plural form unmatchable.
STOPS_RE = re.compile(r"\b(nonstop|direct|0\s*stop|1\s*stop|2\s*stop)s?\b", re.I)

# Party size, e.g. "2 adults" or "3 passengers".
ADULTS_RE = re.compile(r"\b(\d+)\s*(adult|adults|passenger|passengers|traveler|travelers)\b", re.I)

# --- Intent keywords (matched as substrings of the lower-cased message) -------
SUPPORT_WORDS = {"refund", "cancellation", "cancel", "change fee", "baggage", "reschedule", "policy", "rules", "luggage"}
BOOKING_WORDS = {"flight", "book", "ticket", "depart", "return", "one-way", "round trip"}
# Recognised city names (upper-case) mapped to their IATA airport codes.
CITY_TO_IATA = {
    "DUBAI": "DXB",
    "ABU DHABI": "AUH",
    "SHARJAH": "SHJ",
    "DELHI": "DEL",
    "MUMBAI": "BOM",
    "DOHA": "DOH",
    "SINGAPORE": "SIN",
    "BANGKOK": "BKK",
}

# One capture group accepting any known code or city name. Codes come first,
# then cities, both in table order; spaces inside a city name become ``\s+`` so
# that any amount of whitespace between the words is tolerated.
_ENDPOINT_GROUP = (
    "("
    + "|".join(CITY_TO_IATA.values())
    + "|"
    + "|".join(name.replace(" ", r"\s+") for name in CITY_TO_IATA)
    + ")"
)

# "<origin> TO <destination>" with an optional leading "FROM"; group 1 is the
# origin and group 2 the destination, exactly as written in the text.
PAIR_RE = re.compile(
    r"\b(?:FROM\s+)?" + _ENDPOINT_GROUP + r"\s+TO\s+" + _ENDPOINT_GROUP + r"\b",
    re.I,
)
def normalize_airport_or_city(raw: str) -> str:
    """Map a city name to its IATA code, leaving unknown tokens as they are.

    The input is trimmed, upper-cased and has internal whitespace runs
    collapsed to one space before the lookup.

    Args:
        raw: City name or airport code as written by the user.

    Returns:
        The IATA code from ``CITY_TO_IATA`` when the normalised text is a
        known city, otherwise the normalised text itself.
    """
    canonical = re.sub(r"\s+", " ", raw.strip().upper())
    if canonical in CITY_TO_IATA:
        return CITY_TO_IATA[canonical]
    return canonical
def guardrail_input(user_msg: str) -> Tuple[str, Dict[str, Any]]:
    """Screen an incoming message and return its redacted form plus metadata.

    The e-mail lookup and the injection check both run on the unredacted
    text; redaction is applied afterwards, one PII pattern at a time.

    Args:
        user_msg: Raw user message; ``None`` or empty is treated as ``""``.

    Returns:
        ``(clean, meta)`` where ``clean`` has every PII match replaced by its
        placeholder and ``meta`` holds ``pii_redacted``,
        ``injection_suspected`` and ``extracted_pii`` (containing ``"email"``
        with the first address found, if any).
    """
    text = user_msg or ""

    # Capture the first e-mail address before it gets redacted below.
    first_email = EMAIL_RE.search(text)
    captured: Dict[str, Any] = {"email": first_email.group(0)} if first_email else {}

    looks_like_injection = any(re.search(fragment, text, re.I) for fragment in INJECTION_PATTERNS)

    any_redaction = False
    for detector, placeholder in PII_PATTERNS:
        text, replaced = detector.subn(placeholder, text)
        if replaced:
            any_redaction = True

    meta: Dict[str, Any] = {
        "pii_redacted": any_redaction,
        "injection_suspected": looks_like_injection,
        "extracted_pii": captured,
    }
    return text, meta
def guardrail_output(text: str) -> str:
    """Redact PII from outgoing text.

    Args:
        text: Response text about to be returned to the user.

    Returns:
        ``text`` with every match of each ``PII_PATTERNS`` detector replaced
        by that detector's placeholder, applied in list order.
    """
    redacted = text
    for pair in PII_PATTERNS:
        detector, placeholder = pair
        redacted = detector.sub(placeholder, redacted)
    return redacted
def safe_refusal() -> str:
    """Return the fixed reply used when a prompt-injection attempt is suspected."""
    message = (
        "I can help with flight booking and flight support questions, "
        "but I cannot reveal hidden instructions or internal prompts."
    )
    return message
# Common three-letter English words that often follow "to"/"from" in ordinary
# sentences ("want to fly", "to the airport"). They are never accepted as an
# airport code by the generic fallback below. This is a deliberate trade-off:
# a few of them coincide with real but rarely used IATA codes.
_NON_AIRPORT_WORDS = frozenset(
    {"THE", "FLY", "GET", "SEE", "BUY", "PAY", "ASK", "USE", "AND", "FOR", "YOU", "OUR", "ANY", "ALL"}
)


def _extract_airport_from_text(text: str, prefix: str) -> Optional[str]:
    """Find the airport that follows ``prefix`` (e.g. "FROM" or "TO") in ``text``.

    Candidates are tried from most to least reliable so that an ordinary
    word after the prefix cannot shadow a real airport later in the message:

    1. a known IATA code from ``CITY_TO_IATA``;
    2. a known city name (any whitespace between its words);
    3. any other three-letter word that is not a common English word.

    Args:
        text: Message text; matching is case-insensitive.
        prefix: The keyword that must precede the airport.

    Returns:
        The upper-case airport code, or ``None`` when nothing suitable
        follows ``prefix``.
    """
    lead = re.escape(prefix)

    # 1) Known airport codes.
    known_codes = "|".join(re.escape(code) for code in CITY_TO_IATA.values())
    if known_codes:
        code_match = re.search(rf"\b{lead}\s+({known_codes})\b", text, re.I)
        if code_match:
            return code_match.group(1).upper()

    # 2) Known city names; multi-word names tolerate repeated whitespace.
    for city, code in CITY_TO_IATA.items():
        city_pattern = r"\s+".join(re.escape(word) for word in city.split())
        if re.search(rf"\b{lead}\s+{city_pattern}\b", text, re.I):
            return code

    # 3) Any other three-letter token, so airports outside the table still
    #    work. Every occurrence is inspected instead of only the first, and
    #    ordinary English words are skipped.
    for generic in re.finditer(rf"\b{lead}\s+([A-Z]{{3}})\b", text, re.I):
        candidate = generic.group(1).upper()
        if candidate not in _NON_AIRPORT_WORDS:
            return candidate
    return None
def slot_extract(req: TripRequest, clean_msg: str) -> TripRequest:
    """Return a copy of ``req`` updated with the slots mentioned in ``clean_msg``.

    Slots that the message does not mention keep the value they have in
    ``req``; ``req`` itself is not modified.

    Fixes relative to the previous implementation:

    * Cabin and baggage keywords are matched case-insensitively. They used to
      be lower-case literals tested against upper-cased text and therefore
      never matched.
    * An explicit "<origin> TO <destination>" pair takes precedence. The
      single-keyword fallback ("FROM x" / "TO y") only runs when no pair was
      found, so it can no longer overwrite a correct pair with a stray word
      such as the "FLY" in "want to fly from Dubai to Bangkok".
    * The stop preference is read from the leading digit of the match, so
      variants like "0stop" or "0  stop" are honoured.

    Args:
        req: The trip request accumulated so far.
        clean_msg: Redacted user message; ``None`` or empty is treated as ``""``.

    Returns:
        A deep copy of ``req`` with the extracted values applied.
    """
    text = clean_msg or ""
    up = text.upper()
    low = text.lower()
    out = req.model_copy(deep=True)

    # --- Route ---------------------------------------------------------------
    pair_match = PAIR_RE.search(up)
    if pair_match:
        raw_origin, raw_destination = pair_match.groups()
        out.origin = normalize_airport_or_city(raw_origin)
        out.destination = normalize_airport_or_city(raw_destination)
    else:
        origin = _extract_airport_from_text(up, "FROM")
        if origin:
            out.origin = origin
        destination = _extract_airport_from_text(up, "TO")
        if destination:
            out.destination = destination

    # --- Dates: first match is the departure, second the return ---------------
    dates = DATE_RE.findall(up)
    if dates:
        out.depart_date = dates[0]
    if len(dates) >= 2:
        out.return_date = dates[1]

    # --- Budget ---------------------------------------------------------------
    price_match = PRICE_RE.search(text)
    if price_match:
        out.max_price_usd = int(price_match.group(1))

    # --- Stops ----------------------------------------------------------------
    stop_match = STOPS_RE.search(text)
    if stop_match:
        token = re.sub(r"\s+", " ", stop_match.group(1).lower())
        if token.startswith("1"):
            out.max_stops = 1
        elif token.startswith("2"):
            out.max_stops = 2
        else:
            # "nonstop", "direct" and every "0 stop" spelling.
            out.max_stops = 0

    # --- Party size (never below one) -----------------------------------------
    adults_match = ADULTS_RE.search(text)
    if adults_match:
        out.adults = max(1, int(adults_match.group(1)))

    # --- Cabin: most specific phrase first; whole words only -------------------
    if "premium economy" in low:
        out.cabin = "PREMIUM_ECONOMY"
    elif re.search(r"\bbusiness\b", low):
        out.cabin = "BUSINESS"
    elif re.search(r"\bfirst\b", low):
        out.cabin = "FIRST"
    elif re.search(r"\beconomy\b", low):
        out.cabin = "ECONOMY"

    # --- Baggage ----------------------------------------------------------------
    # "cabin" on its own usually refers to the cabin class ("business cabin"),
    # so only an explicit cabin bag/luggage phrase selects CABIN baggage.
    if "checked" in low or "check-in" in low:
        out.baggage = "CHECKED"
    elif "no baggage" in low:
        out.baggage = "NONE"
    elif re.search(r"\bcabin\s+(?:bag|luggage)", low):
        out.baggage = "CABIN"

    return out
def heuristic_intent(clean_msg: str) -> Tuple[str, str, float]:
    """Classify a message with keyword heuristics.

    Rules are tried in this order and the first hit wins:

    1. "book N" (N in 1-3)                         -> TRAVEL_BOOKING / FLIGHT_BOOK_HOLD
    2. any support keyword (substring)             -> TRAVEL_SUPPORT / POLICY_EXPLAIN
    3. booking keyword, date, airport or city pair -> TRAVEL_BOOKING / FLIGHT_SEARCH
    4. a greeting word                             -> SMALLTALK / GREETING
    5. anything else                               -> OUT_OF_SCOPE / UNKNOWN

    Greetings are matched as whole words. The earlier substring test treated
    any message containing "hi" (for example "this" or "which") as a greeting.

    Args:
        clean_msg: Redacted user message; ``None`` or empty is treated as ``""``.

    Returns:
        ``(parent_intent, child_intent, confidence)``.
    """
    m = (clean_msg or "").lower()

    if BOOK_RE.search(m):
        return "TRAVEL_BOOKING", "FLIGHT_BOOK_HOLD", 0.96

    if any(k in m for k in SUPPORT_WORDS):
        return "TRAVEL_SUPPORT", "POLICY_EXPLAIN", 0.88

    # The airport helpers are called with upper-case text; convert once.
    up = m.upper()
    if (
        any(k in m for k in BOOKING_WORDS)
        or DATE_RE.search(m)
        or _extract_airport_from_text(up, "FROM")
        or _extract_airport_from_text(up, "TO")
        or PAIR_RE.search(up)
    ):
        return "TRAVEL_BOOKING", "FLIGHT_SEARCH", 0.84

    if re.search(r"\b(?:hi|hello|hey)\b", m):
        return "SMALLTALK", "GREETING", 0.75

    return "OUT_OF_SCOPE", "UNKNOWN", 0.40
def missing_slots(req: TripRequest) -> List[str]:
    """List the prompts for mandatory search slots that are still empty.

    Args:
        req: The trip request to inspect.

    Returns:
        One human-readable prompt per falsy slot among origin, destination
        and departure date, in that order; empty when all three are set.
    """
    required = (
        (req.origin, "origin (for example DXB)"),
        (req.destination, "destination (for example BKK or Bangkok)"),
        (req.depart_date, "depart date in YYYY-MM-DD"),
    )
    return [prompt for value, prompt in required if not value]
# Airline designators the mock flight generator chooses from.
CARRIERS = "EK QR EY TG SQ AI".split()

# Connection airports used for mock itineraries that have at least one stop.
HUBS = "DOH SIN KUL BOM".split()
def make_mock_flight_options(req: TripRequest, k: int = 10) -> List[FlightOption]:
    """Generate mock flight options for ``req``, filtered and sorted by price.

    The random generator is seeded from the request's origin, destination,
    departure date, price cap, stop cap and cabin, so the same request always
    yields the same itineraries. Option ids come from ``uuid.uuid4`` and
    therefore differ between calls.

    Args:
        req: Search criteria; origin, destination and departure date are
            required.
        k: Number of options generated before filtering.

    Returns:
        Options that satisfy ``max_stops`` and ``max_price_usd`` (each filter
        is skipped when its value is ``None``), cheapest first. An empty list
        when a required slot is missing.
    """
    if not (req.origin and req.destination and req.depart_date):
        return []

    rng = random.Random(
        f"{req.origin}-{req.destination}-{req.depart_date}-{req.max_price_usd}-{req.max_stops}-{req.cabin}"
    )
    day = req.depart_date
    generated: List[FlightOption] = []

    # NOTE: the order of the rng calls below determines the generated data;
    # keep it unchanged when editing.
    for _ in range(k):
        stop_count = rng.choice([0, 1, 1, 2])
        airline = rng.choice(CARRIERS)
        fare = rng.randint(380, 900)
        minutes = rng.choice([360, 410, 470, 540, 600])
        is_refundable = rng.choice([False, False, True])
        brand = rng.choice(["LITE", "CLASSIC", "FLEX"])

        if stop_count == 0:
            # Direct itinerary: a single segment from origin to destination.
            departs = f"{day}T09:{rng.randint(10, 59):02d}"
            arrives = f"{day}T{rng.randint(14, 20):02d}:{rng.randint(10, 59):02d}"
            number = f"{airline}{rng.randint(100, 999)}"
            itinerary = [
                FlightLeg(
                    from_airport=req.origin,
                    to_airport=req.destination,
                    depart_time=departs,
                    arrive_time=arrives,
                    carrier=airline,
                    flight_no=number,
                )
            ]
        else:
            # Connecting itinerary: two segments via one hub, whatever the
            # stop count. Prefer a hub other than the trip's endpoints.
            eligible_hubs = [h for h in HUBS if h not in {req.origin, req.destination}]
            via = rng.choice(eligible_hubs or HUBS)

            first_departs = f"{day}T07:{rng.randint(10, 59):02d}"
            first_arrives = f"{day}T{rng.randint(9, 12):02d}:{rng.randint(10, 59):02d}"
            first_number = f"{airline}{rng.randint(100, 999)}"
            inbound = FlightLeg(
                from_airport=req.origin,
                to_airport=via,
                depart_time=first_departs,
                arrive_time=first_arrives,
                carrier=airline,
                flight_no=first_number,
            )

            second_departs = f"{day}T{rng.randint(12, 16):02d}:{rng.randint(10, 59):02d}"
            second_arrives = f"{day}T{rng.randint(16, 23):02d}:{rng.randint(10, 59):02d}"
            second_number = f"{airline}{rng.randint(100, 999)}"
            onward = FlightLeg(
                from_airport=via,
                to_airport=req.destination,
                depart_time=second_departs,
                arrive_time=second_arrives,
                carrier=airline,
                flight_no=second_number,
            )
            itinerary = [inbound, onward]

        generated.append(
            FlightOption(
                option_id=f"opt_{uuid.uuid4().hex[:8]}",
                legs=itinerary,
                total_duration_min=minutes,
                stops=stop_count,
                price_usd=fare,
                refundable=is_refundable,
                fare_brand=brand,
            )
        )

    stop_cap = req.max_stops
    price_cap = req.max_price_usd
    kept = [
        option
        for option in generated
        if (stop_cap is None or option.stops <= stop_cap)
        and (price_cap is None or option.price_usd <= price_cap)
    ]
    kept.sort(key=lambda option: option.price_usd)
    return kept
def get_fare_rules(req: TripRequest, opt: FlightOption) -> FareRules:
    """Derive mock fare rules for ``opt`` in the context of ``req``.

    * Change fee: 0 for FLEX, 75 for CLASSIC, 150 for any other brand.
    * Cancellation fee: 0 when refundable, else 100 for FLEX and 200 otherwise.
    * Baggage counts as included unless checked baggage was requested on a
      brand other than CLASSIC or FLEX.

    Args:
        req: The trip request (only ``baggage`` is read).
        opt: The flight option to describe.

    Returns:
        The ``FareRules`` for ``opt``.
    """
    brand = opt.fare_brand

    change_fee = {"FLEX": 0, "CLASSIC": 75}.get(brand, 150)

    if opt.refundable:
        cancel_fee = 0
    elif brand == "FLEX":
        cancel_fee = 100
    else:
        cancel_fee = 200

    wants_checked = req.baggage == "CHECKED"
    brand_covers_checked = brand in ["CLASSIC", "FLEX"]
    baggage_included = not wants_checked or brand_covers_checked

    return FareRules(
        option_id=opt.option_id,
        change_fee_usd=change_fee,
        cancellation_fee_usd=cancel_fee,
        baggage_included=baggage_included,
        notes="Mock fare rules for demo; replace with airline or GDS rules API in production.",
    )
def create_booking_hold(option_id: str) -> BookingHold:
    """Create a mock hold on ``option_id``.

    Args:
        option_id: Identifier of the flight option to reserve.

    Returns:
        A ``BookingHold`` in state "HELD" with a 15-minute lifetime and an id
        of the form ``HLD-`` followed by eight upper-case hex characters.
    """
    suffix = uuid.uuid4().hex[:8].upper()
    return BookingHold(
        hold_id=f"HLD-{suffix}",
        option_id=option_id,
        status="HELD",
        expires_in_min=15,
    )
def confirm_booking(hold: BookingHold, email: str) -> Dict[str, Any]:
    """Build the mock confirmation record for ``hold``.

    The hold itself is not modified here.

    Args:
        hold: The hold being confirmed (only ``hold_id`` is read).
        email: Address the confirmation is associated with.

    Returns:
        A dict with ``status``, ``hold_id``, ``email`` and ``message``.
    """
    receipt: Dict[str, Any] = dict(
        status="CONFIRMED",
        hold_id=hold.hold_id,
        email=email,
        message="Mock confirmation completed.",
    )
    return receipt
def send_email(email: str, subject: str, body: str) -> Dict[str, Any]:
    """Pretend to queue an e-mail and describe what would have been sent.

    Nothing is transmitted; this is a mock provider.

    Args:
        email: Recipient address.
        subject: Subject line.
        body: Full message body.

    Returns:
        A dict with ``status``, ``provider``, ``email``, ``subject`` and
        ``body_preview`` (the first 200 characters of ``body``).
    """
    preview = body[:200]
    outcome: Dict[str, Any] = {"status": "queued", "provider": "mock"}
    outcome["email"] = email
    outcome["subject"] = subject
    outcome["body_preview"] = preview
    return outcome
# (source id, snippet text) pairs backing the keyword-overlap policy lookup.
_POLICY_SNIPPETS = (
    (
        "policy_refund_1",
        "Refund eligibility depends on fare brand. FLEX fares generally have lower change and cancellation penalties than LITE fares.",
    ),
    (
        "policy_baggage_1",
        "Checked baggage inclusion depends on fare brand and route. Cabin baggage is usually included unless the fare is very restrictive.",
    ),
    (
        "policy_change_1",
        "Changes may be permitted before departure subject to fare rules, price difference, and applicable service fees.",
    ),
)

# Policy documents in the shape used elsewhere: a list of dicts with the keys
# "source_id" and "text".
POLICY_DOCS = [{"source_id": source_id, "text": text} for source_id, text in _POLICY_SNIPPETS]
def retrieve_policy_notes(question: str, topk: int = 2) -> List[Dict[str, str]]:
    """Pick the policy snippets sharing the most words with ``question``.

    Both the question and each snippet are lower-cased and split into ``\\w+``
    tokens; a snippet's score is the number of distinct tokens in common.

    Args:
        question: Free-text question; ``None`` or empty is treated as ``""``.
        topk: How many of the best-scoring snippets to consider.

    Returns:
        Those of the ``topk`` best-scoring snippets whose score is positive,
        best first (ties keep their ``POLICY_DOCS`` order). When none
        qualifies, the first policy document alone.
    """

    def words(text: str) -> set:
        return set(re.findall(r"\w+", text.lower()))

    query_terms = words(question or "")
    overlaps = [(len(query_terms & words(doc["text"])), doc) for doc in POLICY_DOCS]
    best_first = sorted(overlaps, key=lambda pair: pair[0], reverse=True)

    hits = [doc for overlap, doc in best_first[:topk] if overlap > 0]
    if hits:
        return hits
    return POLICY_DOCS[:1]
def score_option(opt: FlightOption, rules: Optional[FareRules]) -> float:
    """Score a flight option; higher is better.

    Starting from 1000, the score loses 1.2 per dollar, 120 per stop and 0.35
    per minute of travel, and gains 30 when the fare is flexible (refundable,
    or its known rules carry no change fee).

    Args:
        opt: The option to score.
        rules: Fare rules for ``opt`` when available, else ``None``.

    Returns:
        The score as a float.
    """
    flexible = bool(opt.refundable)
    if not flexible and rules:
        flexible = rules.change_fee_usd == 0
    bonus = 30 if flexible else 0

    # Same left-to-right evaluation as a single expression would give.
    total = 1000 - opt.price_usd * 1.2
    total = total - opt.stops * 120
    total = total - opt.total_duration_min * 0.35
    total = total + bonus
    return float(total)
def rank_options(candidates: List[FlightOption], rules_by_id: Dict[str, FareRules], topn: int = 5) -> List[FlightOption]:
    """Order the first ``topn`` candidates by descending score.

    Only ``candidates[:topn]`` are considered; later entries are dropped, not
    merely ranked lower.

    Args:
        candidates: Options in their current order.
        rules_by_id: Fare rules keyed by option id; missing ids score without
            rules.
        topn: Size of the leading slice that gets ranked.

    Returns:
        The considered options, best score first; equal scores keep their
        input order.
    """
    pool = candidates[:topn]
    return sorted(
        pool,
        key=lambda option: score_option(option, rules_by_id.get(option.option_id)),
        reverse=True,
    )
def present_topk(state: AgentCoreState, options: List[FlightOption], k: int = 3) -> str:
    """Render the first ``k`` options as a numbered, user-facing list.

    Args:
        state: Conversation state; ``state.rules`` supplies fare rules by
            option id.
        options: Options in display order.
        k: Maximum number of options to show.

    Returns:
        A header line, one line per shown option (with fee and baggage
        details appended when rules are known) and a closing hint on how to
        place a hold, joined by newlines.
    """
    rendered = [f"Here are the best options (top {min(k, len(options))}):"]

    for position, option in enumerate(options[:k], start=1):
        route = " | ".join(
            f"{leg.from_airport}->{leg.to_airport} {leg.carrier}{leg.flight_no[-3:]}"
            for leg in option.legs
        )
        pieces = [
            f"{position}) ${option.price_usd}, stops={option.stops}, duration={option.total_duration_min} min, ",
            f"brand={option.fare_brand}, refundable={option.refundable}, route={route}",
        ]

        fare = state.rules.get(option.option_id)
        if fare:
            pieces.append(
                f", change_fee=${fare.change_fee_usd}, cancel_fee=${fare.cancellation_fee_usd}, "
            )
            pieces.append(f"baggage_included={fare.baggage_included}")

        rendered.append("".join(pieces))

    rendered.append("To place a hold, say: book 1 or book 2 or book 3")
    return "\n".join(rendered)
class FlightBookingAgentService:
    """In-memory flight-booking agent; one ``AgentCoreState`` per thread id.

    The public entry points (``chat``, ``search``, ``hold``, ``confirm``) all
    return the same envelope: ``{"thread_id", "response", "state"}``, where
    ``state`` is a dump of the thread's full state including its audit trail.
    Sessions live only in this object's ``sessions`` dict.
    """

    # How many of the cheapest candidates receive fare rules and are ranked.
    _RANK_POOL_SIZE = 5
    # How many ranked options are displayed and can be chosen with "book N".
    _DISPLAY_COUNT = 3

    def __init__(self) -> None:
        """Start with no sessions."""
        self.sessions: Dict[str, AgentCoreState] = {}

    def _get_state(self, thread_id: str) -> AgentCoreState:
        """Return the state for ``thread_id``, creating an empty one on first use."""
        if thread_id not in self.sessions:
            self.sessions[thread_id] = AgentCoreState()
        return self.sessions[thread_id]

    def reset_state(self, thread_id: str) -> None:
        """Forget everything about ``thread_id``; unknown ids are ignored."""
        self.sessions.pop(thread_id, None)

    def get_state(self, thread_id: str) -> Dict[str, Any]:
        """Return a dump of the thread's state (creates the session if absent)."""
        return self._get_state(thread_id).model_dump()

    def chat(self, thread_id: str, user_msg: str) -> Dict[str, Any]:
        """Handle one free-text user message and return the response envelope.

        Processing order:

        1. Guardrails: redact PII, remember an e-mail address if present, and
           refuse outright when prompt injection is suspected.
        2. Classify the intent; booking messages also update the trip request.
        3. If a hold is waiting ("HELD") and an e-mail is known, confirm it,
           regardless of the message's intent.
        4. Otherwise dispatch on intent: support answer, hold creation,
           flight search, greeting, or the out-of-scope reply.

        Args:
            thread_id: Conversation identifier.
            user_msg: Raw message from the user.

        Returns:
            The response envelope built by ``_response``.
        """
        core = self._get_state(thread_id)
        clean_msg, meta = guardrail_input(user_msg)

        extracted = meta.get("extracted_pii") or {}
        if extracted.get("email"):
            core.user_contact["email"] = extracted["email"]

        # Only the redacted text goes into the audit trail: the trail is part
        # of the state returned with every response, so recording the raw
        # message would hand back exactly the data the guardrail removed
        # (for example card numbers). Both keys are kept for consumers that
        # read either one.
        core.log("user", {"text": clean_msg, "clean": clean_msg, "guardrail_meta": meta})

        if meta.get("injection_suspected"):
            response = safe_refusal()
            core.log("refuse", {"reason": "prompt_injection"})
            return self._response(thread_id, response, core)

        parent, child, conf = heuristic_intent(clean_msg)
        if parent == "TRAVEL_BOOKING":
            core.request = slot_extract(core.request, clean_msg)

        core.nlu = NLUResult(parent_intent=parent, child_intent=child, confidence=conf, extracted=core.request)
        core.log("nlu", core.nlu.model_dump())

        # A pending hold plus a known e-mail is all that confirmation needs.
        if core.booking_hold and core.booking_hold.status == "HELD" and core.user_contact.get("email"):
            response = self._confirm_booking(core)
            return self._response(thread_id, response, core)

        if parent == "TRAVEL_SUPPORT":
            response = self._support(clean_msg, core)
            return self._response(thread_id, response, core)

        if parent == "TRAVEL_BOOKING" and child == "FLIGHT_BOOK_HOLD":
            response = self._create_hold(clean_msg, core)
            return self._response(thread_id, response, core)

        if parent == "TRAVEL_BOOKING":
            response = self._search_flights(core)
            return self._response(thread_id, response, core)

        if parent == "SMALLTALK":
            response = "I can help book flights. Try: From DXB to BKK on 2026-07-10 under 650"
            return self._response(thread_id, response, core)

        response = "I can only help with flight booking or flight support in this demo."
        return self._response(thread_id, response, core)

    def search(self, thread_id: str, req: TripRequest) -> Dict[str, Any]:
        """Replace the thread's trip request with ``req`` and run a search."""
        core = self._get_state(thread_id)
        core.request = req
        response = self._search_flights(core)
        return self._response(thread_id, response, core)

    def hold(self, thread_id: str, choice_index: int) -> Dict[str, Any]:
        """Place a hold on displayed option number ``choice_index`` (1-based).

        The request is routed through the same "book N" parsing as chat
        messages, so only the values 1-3 are accepted.
        """
        core = self._get_state(thread_id)
        clean_msg = f"book {choice_index}"
        response = self._create_hold(clean_msg, core)
        return self._response(thread_id, response, core)

    def confirm(self, thread_id: str, email: str) -> Dict[str, Any]:
        """Store ``email`` as the contact address and confirm the pending hold."""
        core = self._get_state(thread_id)
        core.user_contact["email"] = email
        response = self._confirm_booking(core)
        return self._response(thread_id, response, core)

    def _support(self, clean_msg: str, core: AgentCoreState) -> str:
        """Answer a support question from the best-matching policy snippets."""
        docs = retrieve_policy_notes(clean_msg, topk=2)
        bullets = [f"- {d['text']} [{d['source_id']}]" for d in docs]
        answer = "Based on the available policy snippets:\n" + "\n".join(bullets)
        core.log("support.answer", {"q": clean_msg, "sources": [d["source_id"] for d in docs]})
        return guardrail_output(answer)

    def _search_flights(self, core: AgentCoreState) -> str:
        """Search with the thread's request and present the best options.

        Asks for missing mandatory slots instead of searching when any are
        absent. Otherwise the candidates and fare rules stored on ``core``
        are replaced with the new results.
        """
        miss = missing_slots(core.request)
        if miss:
            msg = (
                f"I can help book your flight. Please provide: {', '.join(miss)}.\n"
                f"Example: From DXB to BKK on 2026-07-10 under 650"
            )
            core.log("search.ask_clarify", {"missing": miss})
            return msg

        results = make_mock_flight_options(core.request, k=12)
        core.candidates = results
        core.rules = {}
        core.log("tool.search_flights", {"num_results": len(core.candidates), "request": core.request.model_dump()})

        if not core.candidates:
            return "No flights found with those constraints. Try relaxing the budget or stop limit."

        # Fare rules are fetched for exactly the candidates that get ranked.
        for opt in core.candidates[: self._RANK_POOL_SIZE]:
            core.rules[opt.option_id] = get_fare_rules(core.request, opt)

        ranked = rank_options(core.candidates, core.rules, topn=self._RANK_POOL_SIZE)
        msg = present_topk(core, ranked, self._DISPLAY_COUNT)

        docs = retrieve_policy_notes("baggage refund change")
        if docs:
            msg += "\n\nPolicy notes:\n" + "\n".join(f"- {d['text']} [{d['source_id']}]" for d in docs)

        core.log("search.present", {"shown": min(self._DISPLAY_COUNT, len(ranked))})
        return guardrail_output(msg)

    def _create_hold(self, clean_msg: str, core: AgentCoreState) -> str:
        """Place a hold on the option the user picked with "book N".

        The selection is resolved against the same ranking that
        ``_search_flights`` displayed (same pool size, same number of shown
        options). Ranking a different pool here could put a different option
        at position N than the one the user saw.
        """
        if not core.candidates:
            core.log("hold.missing_candidates", {})
            return "I do not have options yet. Search flights first, then say book 1 or book 2 or book 3."

        m = BOOK_RE.search(clean_msg)
        if not m:
            core.log("hold.gate_failed", {"clean_msg": clean_msg})
            return "To place a hold, say: book 1 or book 2 or book 3."

        idx = int(m.group(1)) - 1
        displayed = rank_options(core.candidates, core.rules, topn=self._RANK_POOL_SIZE)[: self._DISPLAY_COUNT]
        if idx < 0 or idx >= len(displayed):
            return "Invalid selection. Choose book 1, book 2, or book 3."

        chosen = displayed[idx]
        core.selected_option_id = chosen.option_id
        core.booking_hold = create_booking_hold(chosen.option_id)
        core.log("tool.create_booking_hold", core.booking_hold.model_dump())
        return (
            f"Hold created: {core.booking_hold.hold_id} for option {chosen.option_id}. "
            f"It expires in {core.booking_hold.expires_in_min} minutes. "
            f"Share your email to confirm."
        )

    def _confirm_booking(self, core: AgentCoreState) -> str:
        """Confirm the pending hold and queue the confirmation e-mail.

        Requires a hold in state "HELD" and a stored e-mail address; returns
        an explanatory message when either is missing.
        """
        email = core.user_contact.get("email")
        if not core.booking_hold or core.booking_hold.status != "HELD":
            core.log("confirm.no_hold", {})
            return "No active booking hold found. Search flights and create a hold first."
        if not email:
            core.log("confirm.no_email", {})
            return "I need your email address to confirm the booking."

        conf = confirm_booking(core.booking_hold, email)
        core.booking_hold.status = "CONFIRMED"
        core.log("tool.confirm_booking", conf)

        # The price is looked up from the current candidates; it shows as
        # "N/A" when the held option is no longer among them.
        opt = next((o for o in core.candidates if o.option_id == core.booking_hold.option_id), None)
        price_str = f"${opt.price_usd}" if opt else "N/A"
        subject = f"Booking Confirmation: {core.booking_hold.hold_id}"
        body = (
            f"Your booking is confirmed.\n\n"
            f"Reference: {core.booking_hold.hold_id}\n"
            f"Option: {core.booking_hold.option_id}\n"
            f"Price: {price_str}\n"
            f"Route: {core.request.origin} -> {core.request.destination}\n"
            f"Depart: {core.request.depart_date}\n"
        )
        email_res = send_email(email, subject, body)
        core.log("tool.send_email", email_res)
        return f"Booking confirmed for {email}. Confirmation reference: {core.booking_hold.hold_id}."

    def _response(self, thread_id: str, response: str, core: AgentCoreState) -> Dict[str, Any]:
        """Wrap ``response`` and a dump of ``core`` in the standard envelope."""
        return {
            "thread_id": thread_id,
            "response": response,
            "state": core.model_dump(),
        }