#APIs import os import json import requests from datetime import datetime, timezone, timedelta from helpers.hf_llm import generate_text with open("data/static_lookup/stations_lookup.json", "r", encoding="utf-8") as f: STATION_LOOKUP = json.load(f) with open("data/static_lookup/trains_lookup.json", "r", encoding="utf-8") as f: TRAIN_LOOKUP = json.load(f) RAPIDAPI_KEY = os.getenv("RAPIDAPI_KEY") if not RAPIDAPI_KEY: raise RuntimeError("RAPIDAPI_KEY is not set") RAPIDAPI_HOST = "irctc1.p.rapidapi.com" BASE_URL = "https://irctc1.p.rapidapi.com" HEADERS = { "x-rapidapi-key": RAPIDAPI_KEY, "x-rapidapi-host": RAPIDAPI_HOST } ENTITY_PROMPT = """ You extract structured railway-related information from user queries. Return ONLY valid JSON. Do NOT infer or assume missing information. Do NOT guess defaults. Extract a field ONLY if it is explicitly mentioned by the user. If a value is not present, use null or an empty list as specified. Supported intents (choose ONE): - train_live_status (live running status of a train) - train_schedule (timetable / schedule of a train) - trains_between_stations (trains running between two stations) - seat_availability (seat/berth availability enquiry) - fare_enquiry (ticket fare enquiry) - pnr_status (PNR booking status) - live_station (live arrivals/departures at a station) - trains_by_station (list of trains passing a station) - search_train (search trains by name or number) - search_station (search stations by name) - unknown JSON schema: { "intent": string, "train_numbers": [string], // 5-digit train numbers only "pnr_numbers": [string], // 10-digit PNR numbers only "stations": [string], // station names or codes exactly as mentioned "journey": { "from": string | null, // source station name/code if explicitly mentioned "to": string | null // destination station name/code if explicitly mentioned }, "date": string | null, // travel date if mentioned (keep original format) "class_type": string | null, // class if mentioned (e.g., 2A, 3A, SL, CC) "quota": string | null, // quota if mentioned (e.g., GN, Tatkal) "hours": integer | null // time window in hours if explicitly mentioned } Important rules: - Do NOT convert station names to codes. - Do NOT normalize dates or class names. - Do NOT infer intent from missing data. - Do NOT fill defaults. - If multiple values exist, extract all where applicable. - Do NOT include explanations. - Do NOT include markdown. - Output JSON only. The response must start with { and end with }. """ def extract_with_llm(query): prompt = f""" {ENTITY_PROMPT} User query: {query} """ raw = generate_text( prompt=prompt, max_new_tokens=300, temperature=0.0, top_p=1.0 ) return extract_json(raw) def call_api(path: str, params): print("\n[API CALL]") print("URL:", BASE_URL + path) print("PARAMS:", params) try: r = requests.get( BASE_URL + path, headers=HEADERS, params=params, timeout=10 ) print("STATUS:", r.status_code) print("RESPONSE:", r.text[:300]) if r.status_code != 200: return None, r.headers return r.json().get("data"), r.headers except Exception as e: print("❌ API EXCEPTION:", e) return None, None def resolve_train_number(value: str): if not value: return None clean = value.lower().strip() if clean.isdigit() and len(clean) == 5: return clean return TRAIN_LOOKUP.get(clean) def resolve_station_code_local(name: str): if not name: return None clean = name.lower().strip() if clean in STATION_LOOKUP: return STATION_LOOKUP[clean] return resolve_station_code(name) def resolve_station_code(name: str): if not name: return None clean = name.lower().replace(" station", "").strip() data, _ = call_api("/api/v1/searchStation", {"query": clean}) if not data: return None for s in data: station_name = s.get("name", "").lower() if station_name == clean: return s.get("code") for s in data: station_name = s.get("name", "").lower() if station_name.startswith(clean): return s.get("code") return data[0].get("code") # Determine freshness (for additional links) def determine_freshness(headers): if not headers: return "unknown" date_header = headers.get("Date") if not date_header: return "unknown" try: response_time = datetime.strptime( date_header, "%a, %d %b %Y %H:%M:%S GMT" ).replace(tzinfo=timezone.utc) except Exception: return "unknown" if datetime.now(timezone.utc) - response_time > timedelta(minutes=60): return "stale" return "fresh" # Date format correction def normalize_date_for_api(date_str): for fmt in ("%d-%m-%Y", "%Y-%m-%d", "%d/%m/%Y"): try: return datetime.strptime(date_str,fmt).strftime("%d-%m-%Y") except ValueError: continue return None #APIs def get_train_live_status(e): return call_api("/api/v1/liveTrainStatus", {"trainNo": e["train_number"]}) def get_train_schedule(e): return call_api("/api/v1/getTrainSchedule", {"trainNo": e["train_number"]}) def get_trains_between_stations(e): return call_api("/api/v3/trainBetweenStations", { "fromStationCode": e["from_station"], "toStationCode": e["to_station"], "dateOfJourney": e["date"] }) def get_pnr_status(e): return call_api("/api/v3/getPNRStatus", {"pnrNumber": e["pnr"]}) def get_live_station(e): return call_api("/api/v3/getLiveStation", { "fromStationCode": e["station_code"], "hours": e.get("hours", 2) }) def get_trains_by_station(e): return call_api("/api/v3/getTrainsByStation", {"stationCode": e["station_code"]}) def search_train(e): return call_api("/api/v1/searchTrain", {"query": e["query"]}) def search_station(e): return call_api("/api/v1/searchStation", {"query": e["query"]}) def get_seat_availability(e): return call_api("/api/v1/checkSeatAvailability", { "trainNo": e["train_number"], "fromStationCode": e["from_station"], "toStationCode": e["to_station"], "date": e["date"], "classType": e["class_type"], "quota": e.get("quota", "GN") }) def get_seat_availability_v2(e): return call_api("/api/v2/checkSeatAvailability", { "trainNo": e["train_number"], "fromStationCode": e["from_station"], "toStationCode": e["to_station"], "date": e["date"], "classType": e["class_type"], "quota": e.get("quota", "GN") }) def get_fare(e): return call_api("/api/v1/getFare", { "trainNo": e["train_number"], "fromStationCode": e["from_station"], "toStationCode": e["to_station"], "date": e["date"], "classType": e["class_type"], "quota": e.get("quota", "GN") }) # Calling crct API # Mapping intent to API INTENT_TO_API = { "train_live_status": { "required": ["train_number"], "handler": get_train_live_status, "fallback": "train_schedule" }, "train_schedule": { "required": ["train_number"], "handler": get_train_schedule, "fallback": None }, "trains_between_stations": { "required": ["from_station", "to_station", "date"], "handler": get_trains_between_stations, "fallback": None }, "seat_availability": { "required": ["train_number", "from_station", "to_station", "date", "class_type"], "handler": get_seat_availability, "fallback": "seat_availability_v2" }, "seat_availability_v2": { "required": ["train_number", "from_station", "to_station", "date", "class_type"], "handler": get_seat_availability_v2, "fallback": None }, "fare_enquiry": { "required": ["train_number", "from_station", "to_station", "date", "class_type"], "handler": get_fare, "fallback": None }, "pnr_status": { "required": ["pnr"], "handler": get_pnr_status, "fallback": None }, "live_station": { "required": ["station_code"], "handler": get_live_station, "fallback": "trains_by_station" }, "trains_by_station": { "required": ["station_code"], "handler": get_trains_by_station, "fallback": None }, "search_train": { "required": ["query"], "handler": search_train, "fallback": None }, "search_station": { "required": ["query"], "handler": search_station, "fallback": None } } # Main def answer_with_live_data(query): parsed = extract_with_llm(query) intent = parsed.get("intent", "unknown") if intent not in INTENT_TO_API: return {"answer": None, "has_answer": False, "meta": {"status":"nothing"}} entity = {"query": query} if parsed.get("train_numbers"): tn = resolve_train_number(parsed["train_numbers"][0]) if tn: entity["train_number"] = tn if parsed.get("pnr_numbers"): entity["pnr"] = parsed["pnr_numbers"][0] resolved = [] for s in parsed.get("stations", []): code = resolve_station_code_local(s) if code: resolved.append(code) if len(resolved) == 1: entity["station_code"] = resolved[0] journey = parsed.get("journey") if journey and journey.get("from") and journey.get("to"): f = resolve_station_code_local(journey["from"]) t = resolve_station_code_local(journey["to"]) if f and t: entity["from_station"] = f entity["to_station"] = t if parsed.get("date"): normalized = normalize_date_for_api(parsed["date"]) if normalized: entity["date"] = normalized elif intent == "trains_between_stations": entity["date"] = datetime.now(timezone.utc).strftime("%d-%m-%Y") for k in ["class_type", "quota", "hours"]: if parsed.get(k) is not None: entity[k] = parsed[k] api = INTENT_TO_API[intent] # required, missing required = api["required"] missing = [k for k in required if k not in entity] if missing: return { "answer": None, "has_answer": False, "meta": { "status": "need_input", "reason": "missing_required_fields", "intent": intent, "missing_fields": missing, "partial_entity": entity } } # Primary API call data, headers= api["handler"](entity) fallback_used = False # Fallback handling if not data and api.get("fallback"): fb = INTENT_TO_API[api["fallback"]] if all(k in entity for k in fb["required"]): data, headers= fb["handler"](entity) if data: fallback_used = True # Freshness detection (safe, signal only) freshness = determine_freshness(headers) if not data: return {"answer": None, "has_answer": False, "meta": {"status": "api_failed","intent": intent}} return { "answer":data, "has_answer": True, "meta": { "status": "ok", "intent": intent, "fallback_used": fallback_used, "freshness": freshness, "resolved": { "train_number": entity.get("train_number"), "station_code": entity.get("station_code"), "from_station": entity.get("from_station"), "to_station": entity.get("to_station"), "date": entity.get("date") } } }