""" SkyGuardian AI v2.0 – Multi-Model Agentic Flight Intelligence System India ⇄ Singapore Corridor | Primary: TRZ ⇄ SIN | Secondary: MAA ⇄ SIN Architecture: ReAct Agent + 4-Model Ensemble (Heuristic + sklearn RF/GB + HF Sentiment + Chronos Forecast) HuggingFace Models: ProsusAI/finbert (sentiment) · amazon/chronos-t5-tiny (time-series forecast) """ import os import sqlite3 import json import time import pickle import requests import math from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import List, Optional, Tuple, Dict, Any, Callable from enum import Enum from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() # ============================================================================ # LAYER 1 - DATA MODELS # ============================================================================ @dataclass class FlightPrice: route: str price: float currency: str airline: str departure_time: str arrival_time: str duration_min: int stops: int provider: str deep_link: str flights_found: int value_score: float = field(init=False) def __post_init__(self): self.value_score = self.price + (self.stops * 1500) + max(0, self.duration_min - 300) * 2 class SurgeLevel(Enum): BELOW_AVERAGE = "BELOW_AVERAGE 💚" NORMAL = "NORMAL 🟢" ELEVATED = "ELEVATED 🟡" EXTREME = "EXTREME 🔴" @dataclass class SurgeResult: level: SurgeLevel z_score: float current_price: float avg_price: float std_dev: float pct_vs_avg: float @dataclass class ModelPrediction: model_name: str drop_probability: float drop_pct: float confidence: float reasoning: str @dataclass class EnsemblePrediction: models: List["ModelPrediction"] final_drop_probability: float final_drop_pct: float final_confidence: float dominant_model: str explanation: str urgency: str @dataclass class AgentStep: step_num: int thought: str action: str observation: str timestamp: str = field(default_factory=lambda: datetime.now().strftime("%H:%M:%S")) @dataclass class AgentTrace: steps: List["AgentStep"] final_reasoning: str models_used: int duration_ms: int @dataclass class ArbitrageResult: best_airport: str best_price: float domestic_cost: float total_cost: float savings: float worth_it: bool class ActionType(Enum): BOOK_NOW = "BOOK_NOW" WAIT_24H = "WAIT_24H" WAIT_WEEK = "WAIT_WEEK" TRY_NEARBY = "TRY_NEARBY" EMERGENCY_BOOK = "EMERGENCY_BOOK" class Urgency(Enum): LOW = "LOW" MEDIUM = "MEDIUM" HIGH = "HIGH" @dataclass class Decision: action: ActionType urgency: Urgency explanation: str confidence_score: float = 0.0 arbitrage: Optional[ArbitrageResult] = None auto_book_triggered: bool = False telegram_sent: bool = False @dataclass class BookingResult: success: bool screenshot_path: Optional[str] error: Optional[str] manual_instructions: str @dataclass class PassengerProfile: full_name: str email: str phone: str @dataclass class IntelligenceReport: flight: FlightPrice history_days: int rolling_avg: float std_dev: float trend_slope: float volatility: float surge: SurgeResult ensemble: EnsemblePrediction agent_trace: AgentTrace arbitrage: Optional[ArbitrageResult] decision: Decision booking: Optional[BookingResult] forecast_prices: List[float] = field(default_factory=list) sentiment_score: float = 0.5 # ============================================================================ # LAYER 2 - PRICE PROVIDERS # ============================================================================ class KiwiProvider: BASE_URL = "https://api.tequila.kiwi.com" def __init__(self, api_key: str): self.api_key = api_key self.headers = {"apikey": api_key} def search(self, origin: str, destination: str, departure_date: str, return_date: Optional[str], adults: int) -> List[FlightPrice]: if not self.api_key: return [] try: params = { "fly_from": origin, "fly_to": destination, "date_from": departure_date, "date_to": departure_date, "adults": adults, "curr": "INR", "limit": 50, "sort": "price" } if return_date: params["return_from"] = return_date params["return_to"] = return_date response = requests.get( f"{self.BASE_URL}/v2/search", headers=self.headers, params=params, timeout=15 ) if response.status_code != 200: return [] data = response.json() flights = [] for item in data.get("data", []): route_str = f"{item.get('flyFrom', origin)} → {item.get('flyTo', destination)}" if return_date: route_str += f" → {item.get('flyFrom', origin)}" flights.append(FlightPrice( route=route_str, price=float(item.get("price", 0)), currency="INR", airline=item.get("airlines", ["Unknown"])[0] if item.get("airlines") else "Unknown", departure_time=item.get("local_departure", ""), arrival_time=item.get("local_arrival", ""), duration_min=int(item.get("duration", {}).get("total", 0) / 60) if item.get("duration") else 0, stops=len(item.get("route", [])) - 1, provider="Kiwi", deep_link=item.get("deep_link", ""), flights_found=len(data.get("data", [])) )) return flights except Exception as e: print(f"Kiwi API error: {e}") return [] class AmadeusProvider: BASE_URL = "https://test.api.amadeus.com" def __init__(self, api_key: str, api_secret: str): self.api_key = api_key self.api_secret = api_secret self.token = None self.token_expiry = None def _get_token(self) -> bool: if not self.api_key or not self.api_secret: return False if self.token and self.token_expiry and datetime.now() < self.token_expiry: return True try: response = requests.post( f"{self.BASE_URL}/v1/security/oauth2/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": self.api_key, "client_secret": self.api_secret }, timeout=10 ) if response.status_code == 200: data = response.json() self.token = data.get("access_token") self.token_expiry = datetime.now() + timedelta(seconds=data.get("expires_in", 1800) - 60) return True return False except Exception as e: print(f"Amadeus auth error: {e}") return False def search(self, origin: str, destination: str, departure_date: str, return_date: Optional[str], adults: int) -> List[FlightPrice]: if not self._get_token(): return [] try: params = { "originLocationCode": origin, "destinationLocationCode": destination, "departureDate": departure_date, "adults": adults, "currencyCode": "INR", "max": 50 } if return_date: params["returnDate"] = return_date response = requests.get( f"{self.BASE_URL}/v2/shopping/flight-offers", headers={"Authorization": f"Bearer {self.token}"}, params=params, timeout=15 ) if response.status_code != 200: return [] data = response.json() flights = [] for offer in data.get("data", []): price = float(offer.get("price", {}).get("total", 0)) itineraries = offer.get("itineraries", []) if not itineraries: continue first_segment = itineraries[0].get("segments", [{}])[0] last_segment = itineraries[-1].get("segments", [{}])[-1] total_duration = sum( self._parse_duration(itin.get("duration", "PT0M")) for itin in itineraries ) total_stops = sum( len(itin.get("segments", [])) - 1 for itin in itineraries ) route_str = f"{origin} → {destination}" if return_date: route_str += f" → {origin}" flights.append(FlightPrice( route=route_str, price=price, currency="INR", airline=first_segment.get("carrierCode", "Unknown"), departure_time=first_segment.get("departure", {}).get("at", ""), arrival_time=last_segment.get("arrival", {}).get("at", ""), duration_min=total_duration, stops=total_stops, provider="Amadeus", deep_link="", flights_found=len(data.get("data", [])) )) return flights except Exception as e: print(f"Amadeus API error: {e}") return [] def _parse_duration(self, duration_str: str) -> int: try: duration_str = duration_str.replace("PT", "") hours = 0 minutes = 0 if "H" in duration_str: parts = duration_str.split("H") hours = int(parts[0]) duration_str = parts[1] if len(parts) > 1 else "" if "M" in duration_str: minutes = int(duration_str.replace("M", "")) return hours * 60 + minutes except: return 0 class PriceAggregator: def __init__(self, kiwi_key: str, amadeus_key: str, amadeus_secret: str): self.kiwi = KiwiProvider(kiwi_key) self.amadeus = AmadeusProvider(amadeus_key, amadeus_secret) def get_best_price(self, origin: str, destination: str, departure_date: str, return_date: Optional[str], adults: int) -> Optional[FlightPrice]: all_flights = [] kiwi_flights = self.kiwi.search(origin, destination, departure_date, return_date, adults) all_flights.extend(kiwi_flights) amadeus_flights = self.amadeus.search(origin, destination, departure_date, return_date, adults) all_flights.extend(amadeus_flights) if not all_flights: return None deduplicated = {} for flight in all_flights: key = (flight.airline, round(flight.price, -2), flight.stops) if key not in deduplicated or flight.value_score < deduplicated[key].value_score: deduplicated[key] = flight unique_flights = list(deduplicated.values()) unique_flights.sort(key=lambda f: f.value_score) return unique_flights[0] if unique_flights else None # ============================================================================ # LAYER 3 - HISTORICAL STORAGE # ============================================================================ class PriceLogger: def __init__(self, db_path: str): self.db_path = db_path self._init_db() def _init_db(self): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS price_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, route TEXT NOT NULL, price REAL NOT NULL, currency TEXT NOT NULL, airline TEXT, duration_min INTEGER, stops INTEGER, provider TEXT, departure_date TEXT ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS bookings ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, route TEXT NOT NULL, price REAL NOT NULL, action TEXT NOT NULL, success INTEGER NOT NULL, details TEXT ) """) conn.commit() conn.close() def log_price(self, flight: FlightPrice, departure_date: str): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" INSERT INTO price_logs (timestamp, route, price, currency, airline, duration_min, stops, provider, departure_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( datetime.now().isoformat(), flight.route, flight.price, flight.currency, flight.airline, flight.duration_min, flight.stops, flight.provider, departure_date )) conn.commit() conn.close() def log_booking(self, route: str, price: float, action: str, success: bool, details: str): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" INSERT INTO bookings (timestamp, route, price, action, success, details) VALUES (?, ?, ?, ?, ?, ?) """, ( datetime.now().isoformat(), route, price, action, 1 if success else 0, details )) conn.commit() conn.close() def fetch_history(self, route: str, days: int) -> List[Tuple[str, float]]: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cutoff = (datetime.now() - timedelta(days=days)).isoformat() cursor.execute(""" SELECT timestamp, price FROM price_logs WHERE route = ? AND timestamp >= ? ORDER BY timestamp DESC """, (route, cutoff)) results = cursor.fetchall() conn.close() return results def rolling_avg(self, route: str, days: int) -> float: history = self.fetch_history(route, days) if not history: return 0.0 prices = [price for _, price in history] return sum(prices) / len(prices) def std_dev(self, route: str, days: int) -> float: history = self.fetch_history(route, days) if len(history) < 2: return 0.0 prices = [price for _, price in history] avg = sum(prices) / len(prices) variance = sum((p - avg) ** 2 for p in prices) / len(prices) return math.sqrt(variance) def trend_slope(self, route: str, days: int) -> float: history = self.fetch_history(route, days) if len(history) < 2: return 0.0 prices = [price for _, price in history] n = len(prices) x_values = list(range(n)) x_mean = sum(x_values) / n y_mean = sum(prices) / n numerator = sum((x_values[i] - x_mean) * (prices[i] - y_mean) for i in range(n)) denominator = sum((x - x_mean) ** 2 for x in x_values) if denominator == 0: return 0.0 return numerator / denominator def recent_bookings(self, route: str, days: int) -> List[Dict]: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cutoff = (datetime.now() - timedelta(days=days)).isoformat() cursor.execute(""" SELECT timestamp, price, action, success, details FROM bookings WHERE route = ? AND timestamp >= ? ORDER BY timestamp DESC """, (route, cutoff)) results = [] for row in cursor.fetchall(): results.append({ "timestamp": row[0], "price": row[1], "action": row[2], "success": bool(row[3]), "details": row[4] }) conn.close() return results # ============================================================================ # LAYER 4 - INTELLIGENCE # ============================================================================ class SurgeAnalyzer: @staticmethod def analyze(current_price: float, avg_price: float, std_dev: float) -> SurgeResult: if avg_price == 0 or std_dev == 0: return SurgeResult( level=SurgeLevel.NORMAL, z_score=0.0, current_price=current_price, avg_price=avg_price, std_dev=std_dev, pct_vs_avg=0.0 ) z_score = (current_price - avg_price) / std_dev pct_vs_avg = ((current_price - avg_price) / avg_price) * 100 if z_score < -0.5: level = SurgeLevel.BELOW_AVERAGE elif z_score < 0.5: level = SurgeLevel.NORMAL elif z_score < 1.5: level = SurgeLevel.ELEVATED else: level = SurgeLevel.EXTREME return SurgeResult( level=level, z_score=z_score, current_price=current_price, avg_price=avg_price, std_dev=std_dev, pct_vs_avg=pct_vs_avg ) # ──────────────────────────────────────────────────────────────────────────── # MODEL 0 – Volatility helper (shared) # ──────────────────────────────────────────────────────────────────────────── def _volatility(std_dev: float, avg_price: float) -> float: if avg_price == 0: return 0.0 return (std_dev / avg_price) * 100 # ──────────────────────────────────────────────────────────────────────────── # MODEL UTILITY – HuggingFace Inference API client (no torch required) # ──────────────────────────────────────────────────────────────────────────── class HFInferenceClient: BASE_URL = "https://api-inference.huggingface.co/models" def __init__(self, hf_token: str = ""): self.hf_token = hf_token or os.getenv("HF_TOKEN", "") self.headers = {"Authorization": f"Bearer {self.hf_token}"} if self.hf_token else {} def query(self, model_id: str, payload: Dict, timeout: int = 20) -> Optional[Any]: try: resp = requests.post( f"{self.BASE_URL}/{model_id}", headers=self.headers, json=payload, timeout=timeout ) if resp.status_code == 200: return resp.json() return None except Exception: return None # ──────────────────────────────────────────────────────────────────────────── # MODEL 1 – Heuristic rule-based predictor (always available, baseline) # ──────────────────────────────────────────────────────────────────────────── class HeuristicPredictor: def predict(self, days_to_departure: int, surge_z: float, volatility: float, trend_slope: float) -> ModelPrediction: base = 50.0 if days_to_departure <= 3: base -= 30 elif days_to_departure <= 7: base -= 15 elif days_to_departure <= 14: base += 5 elif days_to_departure <= 30: base += 15 else: base += 10 if surge_z > 1.5: base += 25 elif surge_z > 0.5: base += 10 elif surge_z < -0.5: base -= 20 if volatility > 20: base += 15 elif volatility > 10: base += 5 if trend_slope > 50: base += 20 elif trend_slope > 0: base += 10 elif trend_slope < -50: base -= 15 prob = max(0.0, min(100.0, base)) drop_pct = 12.0 if prob >= 70 else 8.0 if prob >= 50 else 4.0 conf = 0.55 return ModelPrediction( model_name="Heuristic-V2", drop_probability=prob, drop_pct=drop_pct, confidence=conf, reasoning=( f"days={days_to_departure}, Z={surge_z:.2f}, " f"vol={volatility:.1f}%, slope={trend_slope:+.1f}" ) ) # ──────────────────────────────────────────────────────────────────────────── # MODEL 2 – sklearn RandomForest + GradientBoosting ensemble # ──────────────────────────────────────────────────────────────────────────── class SklearnEnsemblePredictor: def __init__(self, model_path: Optional[str] = None): self.rf = None self.gb = None self._init(model_path) def _init(self, model_path: Optional[str]): if model_path and os.path.exists(model_path): try: with open(model_path, "rb") as f: saved = pickle.load(f) self.rf = saved.get("rf") self.gb = saved.get("gb") return except Exception: pass self._train_synthetic() def _train_synthetic(self): try: from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier import numpy as np rng = np.random.default_rng(42) n = 600 days = rng.integers(1, 91, n).astype(float) sz = rng.uniform(-2.5, 3.0, n) vol = rng.uniform(0, 45, n) slope = rng.uniform(-120, 200, n) labels = ( ((days > 7) & (sz > 0.5) & (slope > 0)) | (sz > 1.5) | ((vol > 20) & (slope > 30)) ).astype(int) X = np.column_stack([days, sz, vol, slope]) self.rf = RandomForestClassifier(n_estimators=80, max_depth=6, random_state=42) self.rf.fit(X, labels) self.gb = GradientBoostingClassifier(n_estimators=80, max_depth=4, random_state=42) self.gb.fit(X, labels) except Exception as e: self.rf = None self.gb = None def predict(self, days_to_departure: int, surge_z: float, volatility: float, trend_slope: float) -> ModelPrediction: if self.rf is None: return ModelPrediction( model_name="sklearn-RF+GB", drop_probability=50.0, drop_pct=6.0, confidence=0.30, reasoning="sklearn unavailable – using fallback" ) try: import numpy as np X = np.array([[float(days_to_departure), surge_z, volatility, trend_slope]]) rf_p = float(self.rf.predict_proba(X)[0][1]) * 100 gb_p = float(self.gb.predict_proba(X)[0][1]) * 100 avg_p = (rf_p + gb_p) / 2.0 return ModelPrediction( model_name="sklearn-RF+GB", drop_probability=avg_p, drop_pct=10.0 if avg_p >= 60 else 6.0, confidence=0.72, reasoning=f"RF={rf_p:.1f}%, GB={gb_p:.1f}%, ensemble={avg_p:.1f}%" ) except Exception as e: return ModelPrediction( model_name="sklearn-RF+GB", drop_probability=50.0, drop_pct=6.0, confidence=0.30, reasoning=f"predict error: {e}" ) # ──────────────────────────────────────────────────────────────────────────── # MODEL 3 – HuggingFace Sentiment (ProsusAI/finbert → financial NLP) # Converts numeric signals → text → sentiment → buy/wait probability # ──────────────────────────────────────────────────────────────────────────── class SentimentAnalyzer: PRIMARY_MODEL = "ProsusAI/finbert" FALLBACK_MODEL = "distilbert-base-uncased-finetuned-sst-2-english" def __init__(self, hf_client: HFInferenceClient): self.hf = hf_client def analyze(self, flight: FlightPrice, surge: SurgeResult, trend_slope: float, days_to_departure: int) -> ModelPrediction: text = self._build_text(flight, surge, trend_slope, days_to_departure) raw = self.hf.query(self.PRIMARY_MODEL, {"inputs": text}) if not raw: raw = self.hf.query(self.FALLBACK_MODEL, {"inputs": text}) buy_signal = self._extract_buy_signal(raw) if raw else self._heuristic_buy(surge, trend_slope) drop_prob = 100.0 - (buy_signal * 100.0) return ModelPrediction( model_name="HF-Sentiment(finbert)", drop_probability=drop_prob, drop_pct=7.0 if drop_prob > 60 else 3.5, confidence=0.58 if raw else 0.40, reasoning=( f"buy_signal={buy_signal:.2f}, text='{text[:60]}...'" if raw else f"heuristic_buy={buy_signal:.2f} (HF API unavailable)" ) ) def _build_text(self, flight: FlightPrice, surge: SurgeResult, trend_slope: float, days_to_departure: int) -> str: trend_dir = "rising" if trend_slope > 0 else "declining" level = surge.level.name.replace("_", " ").lower() return ( f"Airline ticket price {flight.price:.0f} INR on {flight.route}. " f"Price is {level} at {surge.pct_vs_avg:+.1f}% deviation from average. " f"14-day trend is {trend_dir} by {abs(trend_slope):.1f} INR per day. " f"Departure in {days_to_departure} days. " f"{'Last minute urgent booking.' if days_to_departure <= 5 else ''}" ) @staticmethod def _extract_buy_signal(result: Any) -> float: try: items = result[0] if isinstance(result[0], list) else result for item in items: label = item.get("label", "").lower() score = float(item.get("score", 0.5)) if "positive" in label or label == "pos": return score if "negative" in label or label == "neg": return 1.0 - score return 0.5 except Exception: return 0.5 @staticmethod def _heuristic_buy(surge: SurgeResult, trend_slope: float) -> float: s = 0.5 if surge.level == SurgeLevel.BELOW_AVERAGE: s += 0.3 elif surge.level == SurgeLevel.EXTREME: s -= 0.3 if trend_slope < 0: s += 0.1 elif trend_slope > 0: s -= 0.1 return max(0.0, min(1.0, s)) # ──────────────────────────────────────────────────────────────────────────── # MODEL 4 – Chronos / Linear time-series forecaster # Uses amazon/chronos-t5-tiny via HF Inference API; falls back to OLS # ──────────────────────────────────────────────────────────────────────────── class ChronosForecaster: MODEL_ID = "amazon/chronos-t5-tiny" HORIZON = 7 def __init__(self, hf_client: HFInferenceClient): self.hf = hf_client def forecast(self, history: List[float]) -> List[float]: if len(history) < 4: return [] raw = self.hf.query( self.MODEL_ID, {"inputs": history[-20:], "parameters": {"prediction_length": self.HORIZON}}, timeout=25 ) if raw and isinstance(raw, list) and len(raw) >= self.HORIZON: try: return [float(v) for v in raw[:self.HORIZON]] except Exception: pass return self._ols_forecast(history, self.HORIZON) def predict_model(self, history: List[float], current_price: float) -> ModelPrediction: fc = self.forecast(history) if not fc: return ModelPrediction( model_name="Chronos-Forecast", drop_probability=50.0, drop_pct=5.0, confidence=0.35, reasoning="Insufficient history for forecast" ) fc_avg = sum(fc) / len(fc) fc_min = min(fc) proj_drop_pct = ((current_price - fc_avg) / current_price * 100) if current_price > 0 else 0.0 drop_prob = max(0.0, min(100.0, 50.0 + proj_drop_pct * 2.0)) return ModelPrediction( model_name="Chronos-Forecast", drop_probability=drop_prob, drop_pct=abs(proj_drop_pct) if proj_drop_pct > 0 else 3.0, confidence=0.65, reasoning=( f"7-day avg=₹{fc_avg:.0f}, min=₹{fc_min:.0f}, " f"now=₹{current_price:.0f}, proj_drop={proj_drop_pct:.1f}%" ) ) @staticmethod def _ols_forecast(prices: List[float], horizon: int) -> List[float]: n = len(prices) x_mean = (n - 1) / 2.0 y_mean = sum(prices) / n num = sum((i - x_mean) * (prices[i] - y_mean) for i in range(n)) den = sum((i - x_mean) ** 2 for i in range(n)) slope = num / den if den != 0 else 0.0 intercept = y_mean - slope * x_mean return [intercept + slope * (n + i) for i in range(horizon)] # ──────────────────────────────────────────────────────────────────────────── # ENSEMBLE – Weighted combination of all 4 models # ──────────────────────────────────────────────────────────────────────────── class MultiModelEnsemble: WEIGHTS: Dict[str, float] = { "Heuristic-V2": 0.15, "sklearn-RF+GB": 0.35, "HF-Sentiment(finbert)": 0.20, "Chronos-Forecast": 0.30, } def __init__(self, hf_token: str = "", model_path: Optional[str] = None): self.hf_client = HFInferenceClient(hf_token) self.heuristic = HeuristicPredictor() self.sklearn_mdl = SklearnEnsemblePredictor(model_path) self.sentiment = SentimentAnalyzer(self.hf_client) self.chronos = ChronosForecaster(self.hf_client) def predict(self, flight: FlightPrice, surge: SurgeResult, days_to_departure: int, trend_slope: float, volatility: float, history_prices: List[float]) -> EnsemblePrediction: models: List[ModelPrediction] = [] m1 = self.heuristic.predict(days_to_departure, surge.z_score, volatility, trend_slope) models.append(m1) m2 = self.sklearn_mdl.predict(days_to_departure, surge.z_score, volatility, trend_slope) models.append(m2) m3 = self.sentiment.analyze(flight, surge, trend_slope, days_to_departure) models.append(m3) m4 = self.chronos.predict_model(history_prices, flight.price) models.append(m4) total_w = sum(self.WEIGHTS.get(m.model_name, 0.25) for m in models) w_prob = sum(m.drop_probability * self.WEIGHTS.get(m.model_name, 0.25) for m in models) / total_w w_drop = sum(m.drop_pct * self.WEIGHTS.get(m.model_name, 0.25) for m in models) / total_w w_conf = sum(m.confidence * self.WEIGHTS.get(m.model_name, 0.25) for m in models) / total_w dominant = max(models, key=lambda m: m.confidence) urgency = "HIGH" if w_prob >= 70 else "MEDIUM" if w_prob >= 50 else "LOW" return EnsemblePrediction( models=models, final_drop_probability=round(w_prob, 1), final_drop_pct=round(w_drop, 1), final_confidence=round(w_conf, 3), dominant_model=dominant.model_name, explanation=( f"4-model ensemble → H={m1.drop_probability:.0f}%, " f"SK={m2.drop_probability:.0f}%, " f"Sent={m3.drop_probability:.0f}%, " f"Chron={m4.drop_probability:.0f}%" ), urgency=urgency ) # ============================================================================ # LAYER 5 - AGENTIC CORE (ReAct: Reason + Act loop) # ============================================================================ class AgentMemory: def __init__(self): self.facts: Dict[str, Any] = {} self.observations: List[str] = [] def remember(self, key: str, value: Any): self.facts[key] = value def recall(self, key: str) -> Optional[Any]: return self.facts.get(key) def observe(self, text: str): self.observations.append(text) def context(self) -> str: return " | ".join(self.observations[-5:]) class ReActAgent: """ Deterministic ReAct (Reasoning + Acting) agent. Iterates through structured tool-call steps, records Thought/Action/Observation triples, and synthesises a final natural-language recommendation. No LLM API required — fully deterministic and explainable. """ MAX_STEPS = 8 def __init__(self): self.memory: AgentMemory = AgentMemory() self.steps: List[AgentStep] = [] self._n = 0 def _step(self, thought: str, action: str, observation: str) -> AgentStep: self._n += 1 s = AgentStep(step_num=self._n, thought=thought, action=action, observation=observation) self.steps.append(s) self.memory.observe(observation) return s def analyze(self, flight: FlightPrice, surge: SurgeResult, ensemble: EnsemblePrediction, arbitrage: Optional[ArbitrageResult], days_to_departure: int, history_prices: List[float], trend_slope: float, volatility: float, sentiment_score: float, forecast: List[float]) -> AgentTrace: t0 = time.time() self.steps = [] self.memory = AgentMemory() self._n = 0 # ── Step 1: Price vs History ────────────────────────────────────────── self._step( thought=f"I need to evaluate ₹{flight.price:,.0f} for {flight.route} against historical baseline.", action="assess_price_vs_history", observation=( f"Price is {surge.pct_vs_avg:+.1f}% vs 14-day avg ₹{surge.avg_price:,.0f}. " f"Surge level: {surge.level.name} (Z={surge.z_score:.2f}). " f"Std-dev: ₹{surge.std_dev:,.0f}." ) ) self.memory.remember("surge_level", surge.level) # ── Step 2: Trend analysis ──────────────────────────────────────────── trend_label = "RISING ↑" if trend_slope > 20 else "FALLING ↓" if trend_slope < -20 else "STABLE →" self._step( thought="Trend direction tells me whether waiting could yield lower prices.", action="analyze_14day_trend", observation=( f"Slope: {trend_slope:+.1f} INR/day → {trend_label}. " f"Volatility (CoV): {volatility:.1f}%. " f"{'High volatility — market is unpredictable.' if volatility > 15 else 'Low volatility — stable pricing.'}" ) ) self.memory.remember("trend", trend_label) # ── Step 3: Ensemble model results ─────────────────────────────────── dom_reasoning = next( (m.reasoning for m in ensemble.models if m.model_name == ensemble.dominant_model), "" ) self._step( thought=f"Running {len(ensemble.models)} ML models to estimate price-drop probability.", action="run_4model_ensemble", observation=( f"Ensemble: {ensemble.final_drop_probability:.0f}% drop probability, " f"est. drop {ensemble.final_drop_pct:.1f}%, confidence {ensemble.final_confidence:.0%}. " f"Dominant: {ensemble.dominant_model} → {dom_reasoning}." ) ) self.memory.remember("drop_prob", ensemble.final_drop_probability) # ── Step 4: Sentiment signal ───────────────────────────────────────── sent_label = ( "BULLISH 🟢 (good time to buy)" if sentiment_score > 0.6 else "BEARISH 🔴 (prices may drop)" if sentiment_score < 0.4 else "NEUTRAL 🟡" ) self._step( thought="HuggingFace finbert sentiment on numeric-to-text conversion provides market feel.", action="query_hf_sentiment_model", observation=f"Sentiment score: {sentiment_score:.2f} → {sent_label}." ) self.memory.remember("sentiment", sent_label) # ── Step 5: 7-day price forecast ───────────────────────────────────── if forecast: fc_min = min(forecast) fc_trend = "declining 📉" if forecast[-1] < forecast[0] else "rising 📈" self._step( thought="Chronos OLS 7-day forecast reveals expected price direction.", action="fetch_chronos_forecast", observation=( f"7-day forecast {fc_trend}. Lowest projected: ₹{fc_min:,.0f}. " f"Current: ₹{flight.price:,.0f}. " f"{'Waiting may save money.' if fc_min < flight.price * 0.95 else 'Minimal savings expected from waiting.'}" ) ) # ── Step 6: Arbitrage check ─────────────────────────────────────────── if arbitrage and arbitrage.worth_it: self._step( thought=f"Arbitrage option found via {arbitrage.best_airport}. Evaluating true net saving.", action="evaluate_arbitrage_opportunity", observation=( f"Flight from {arbitrage.best_airport}: ₹{arbitrage.best_price:,.0f} + " f"domestic ₹{arbitrage.domestic_cost:,.0f} = ₹{arbitrage.total_cost:,.0f}. " f"Net saving: ₹{arbitrage.savings:,.0f} ({arbitrage.savings/flight.price*100:.1f}%). " f"{'Strong arbitrage — recommend alternate route.' if arbitrage.savings > 5000 else 'Moderate arbitrage — consider if flexible.'}" ) ) # ── Step 7: Time-pressure assessment ───────────────────────────────── urgency_label = ( "🚨 CRITICAL (<7 days) — prices spike sharply near departure" if days_to_departure <= 7 else "⚠️ MODERATE (1-4 weeks) — decide soon" if days_to_departure <= 28 else "✅ RELAXED (>4 weeks) — monitor for better price" ) self._step( thought=f"Time window ({days_to_departure} days) is a key factor in urgency.", action="assess_departure_window", observation=f"{days_to_departure} days to departure → {urgency_label}." ) # ── Step 8: Synthesize ──────────────────────────────────────────────── signals = [] if surge.level in (SurgeLevel.BELOW_AVERAGE, SurgeLevel.NORMAL): signals.append(f"✅ Price at/below average ({surge.level.name})") else: signals.append(f"⚠️ Price elevated ({surge.level.name})") if ensemble.final_drop_probability > 60: signals.append(f"⚠️ {ensemble.final_drop_probability:.0f}% drop probability — consider waiting") else: signals.append(f"✅ Low drop probability ({ensemble.final_drop_probability:.0f}%)") if trend_slope < -20: signals.append("⚠️ Price trend declining — waiting may save money") elif trend_slope > 20: signals.append("✅ Prices rising — book now beats higher future cost") if days_to_departure <= 7: signals.append("🚨 Departure imminent — book immediately") final_reasoning = ( f"After consulting {len(ensemble.models)} AI models (sklearn RF/GB, HF finbert, " f"Chronos forecast, heuristic), analysing {days_to_departure}-day horizon, " f"trend '{trend_label}', and market sentiment '{sent_label}': " + " | ".join(signals) ) self._step( thought="All signals gathered. Synthesising final recommendation.", action="synthesize_recommendation", observation=final_reasoning ) return AgentTrace( steps=self.steps, final_reasoning=final_reasoning, models_used=len(ensemble.models), duration_ms=int((time.time() - t0) * 1000) ) # ============================================================================ # LAYER 6 - ARBITRAGE AGENT # ============================================================================ class ArbitrageAgent: AIRPORTS = { "TRZ": {"name": "Trichy", "domestic_cost": 0}, "MAA": {"name": "Chennai", "domestic_cost": 1200}, "CJB": {"name": "Coimbatore", "domestic_cost": 800}, "BLR": {"name": "Bangalore", "domestic_cost": 2800}, "HYD": {"name": "Hyderabad", "domestic_cost": 3500} } MIN_SAVINGS = 2000 def __init__(self, aggregator: PriceAggregator): self.aggregator = aggregator def find_best_route(self, origin: str, destination: str, departure_date: str, return_date: Optional[str], adults: int, current_price: float) -> Optional[ArbitrageResult]: best_result = None for airport_code, airport_info in self.AIRPORTS.items(): if airport_code == origin: continue alt_flight = self.aggregator.get_best_price( airport_code, destination, departure_date, return_date, adults ) if not alt_flight: continue domestic_cost = airport_info["domestic_cost"] total_cost = alt_flight.price + domestic_cost savings = current_price - total_cost if savings >= self.MIN_SAVINGS: if not best_result or savings > best_result.savings: best_result = ArbitrageResult( best_airport=airport_code, best_price=alt_flight.price, domestic_cost=domestic_cost, total_cost=total_cost, savings=savings, worth_it=True ) return best_result # ============================================================================ # LAYER 7 - DECISION ENGINE # ============================================================================ class DecisionEngine: @staticmethod def decide(flight: FlightPrice, surge: SurgeResult, ensemble: EnsemblePrediction, arbitrage: Optional[ArbitrageResult], days_to_departure: int, emergency_mode: bool, rolling_avg: float) -> Decision: drop_prob = ensemble.final_drop_probability conf = ensemble.final_confidence if emergency_mode: return Decision( action=ActionType.EMERGENCY_BOOK, urgency=Urgency.HIGH, confidence_score=1.0, explanation="Emergency mode activated — booking immediately regardless of price.", arbitrage=arbitrage ) if days_to_departure <= 5: return Decision( action=ActionType.BOOK_NOW, urgency=Urgency.HIGH, confidence_score=0.95, explanation=( f"Only {days_to_departure} days to departure — last-minute prices typically spike. " f"Ensemble confidence: {conf:.0%}." ), arbitrage=arbitrage ) if arbitrage and arbitrage.worth_it and arbitrage.savings / flight.price > 0.10: return Decision( action=ActionType.TRY_NEARBY, urgency=Urgency.HIGH, confidence_score=0.88, explanation=( f"Arbitrage: save ₹{arbitrage.savings:,.0f} ({arbitrage.savings/flight.price*100:.1f}%) " f"flying from {arbitrage.best_airport} instead." ), arbitrage=arbitrage ) if rolling_avg > 0 and flight.price < rolling_avg * 0.95: return Decision( action=ActionType.BOOK_NOW, urgency=Urgency.HIGH, confidence_score=round(0.80 + conf * 0.15, 3), explanation=( f"Price is {abs(surge.pct_vs_avg):.1f}% below 14-day avg — " f"excellent deal confirmed by {ensemble.dominant_model} (drop_prob={drop_prob:.0f}%)." ), arbitrage=arbitrage ) if surge.level == SurgeLevel.EXTREME and drop_prob >= 60: return Decision( action=ActionType.WAIT_WEEK, urgency=Urgency.MEDIUM, confidence_score=round(conf * drop_prob / 100, 3), explanation=( f"Extreme surge (Z={surge.z_score:.2f}) + {drop_prob:.0f}% ensemble drop probability " f"→ wait for correction. Dominant signal: {ensemble.dominant_model}." ), arbitrage=arbitrage ) if surge.level == SurgeLevel.ELEVATED and drop_prob >= 45: return Decision( action=ActionType.WAIT_24H, urgency=Urgency.LOW, confidence_score=round(conf * 0.7, 3), explanation=( f"Elevated price + {drop_prob:.0f}% drop probability " f"→ monitor for 24 hours. {ensemble.explanation}." ), arbitrage=arbitrage ) if surge.level == SurgeLevel.BELOW_AVERAGE: return Decision( action=ActionType.BOOK_NOW, urgency=Urgency.HIGH, confidence_score=round(0.75 + conf * 0.20, 3), explanation=( f"Below-average price ({surge.pct_vs_avg:+.1f}%) — strong buy signal. " f"All models agree: drop_prob only {drop_prob:.0f}%." ), arbitrage=arbitrage ) return Decision( action=ActionType.BOOK_NOW, urgency=Urgency.MEDIUM, confidence_score=round(conf * 0.6, 3), explanation=( f"Normal market conditions. Ensemble: {ensemble.explanation}. " f"Book when ready." ), arbitrage=arbitrage ) # ============================================================================ # LAYER 8 - AUTONOMOUS BOOKING # ============================================================================ class BookingAgent: def __init__(self): self.playwright_available = self._check_playwright() def _check_playwright(self) -> bool: try: from playwright.sync_api import sync_playwright return True except ImportError: return False def book(self, flight: FlightPrice, passenger: PassengerProfile) -> BookingResult: if not self.playwright_available: return BookingResult( success=False, screenshot_path=None, error="Playwright not installed", manual_instructions=self.instructions(flight, passenger) ) try: from playwright.sync_api import sync_playwright with sync_playwright() as p: browser = p.chromium.launch(headless=True) page = browser.new_page() page.goto(flight.deep_link if flight.deep_link else "https://www.google.com/flights") page.wait_for_timeout(3000) selectors_first_name = [ "input[name='firstName']", "input[placeholder*='First']", "input[id*='first']", "input.first-name" ] selectors_last_name = [ "input[name='lastName']", "input[placeholder*='Last']", "input[id*='last']", "input.last-name" ] selectors_email = [ "input[type='email']", "input[name='email']", "input[placeholder*='email']" ] selectors_phone = [ "input[type='tel']", "input[name='phone']", "input[placeholder*='phone']" ] name_parts = passenger.full_name.split(maxsplit=1) first_name = name_parts[0] if name_parts else passenger.full_name last_name = name_parts[1] if len(name_parts) > 1 else "" for selector in selectors_first_name: try: page.fill(selector, first_name, timeout=2000) break except: continue for selector in selectors_last_name: try: page.fill(selector, last_name, timeout=2000) break except: continue for selector in selectors_email: try: page.fill(selector, passenger.email, timeout=2000) break except: continue for selector in selectors_phone: try: page.fill(selector, passenger.phone, timeout=2000) break except: continue screenshot_path = f"booking_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" page.screenshot(path=screenshot_path) browser.close() return BookingResult( success=True, screenshot_path=screenshot_path, error=None, manual_instructions=self.instructions(flight, passenger) ) except Exception as e: return BookingResult( success=False, screenshot_path=None, error=str(e), manual_instructions=self.instructions(flight, passenger) ) def instructions(self, flight: FlightPrice, passenger: PassengerProfile) -> str: instructions = f""" MANUAL BOOKING INSTRUCTIONS {'=' * 50} 1. Open booking link: {flight.deep_link if flight.deep_link else 'Search manually on flight booking site'} 2. Fill passenger details: Name: {passenger.full_name} Email: {passenger.email} Phone: {passenger.phone} 3. Review flight details: Route: {flight.route} Price: ₹{flight.price:,.0f} Airline: {flight.airline} Departure: {flight.departure_time} 4. Proceed to payment (MANUAL STEP - system stops here) 5. Complete payment using your preferred method {'=' * 50} """ return instructions # ============================================================================ # LAYER 9 - TELEGRAM NOTIFIER # ============================================================================ class TelegramNotifier: def __init__(self, token: str, chat_id: str): self.token = token self.chat_id = chat_id self.enabled = bool(token and chat_id) def should_alert(self, decision: Decision, surge: SurgeResult, arbitrage: Optional[ArbitrageResult], monitoring: bool) -> bool: if not self.enabled: return False if monitoring: return True if decision.action in [ActionType.BOOK_NOW, ActionType.EMERGENCY_BOOK, ActionType.TRY_NEARBY]: return True if surge.level == SurgeLevel.EXTREME: return True if arbitrage and arbitrage.savings >= 5000: return True if surge.level == SurgeLevel.BELOW_AVERAGE: return True return False def send(self, report: IntelligenceReport) -> bool: if not self.enabled: return False try: message = self._format_message(report) url = f"https://api.telegram.org/bot{self.token}/sendMessage" payload = { "chat_id": self.chat_id, "text": message, "parse_mode": "Markdown" } response = requests.post(url, json=payload, timeout=10) return response.status_code == 200 except Exception as e: print(f"Telegram error: {e}") return False def _format_message(self, report: IntelligenceReport) -> str: f = report.flight s = report.surge e = report.ensemble d = report.decision model_lines = "\n".join( f" • {m.model_name}: {m.drop_probability:.0f}% drop | conf {m.confidence:.0%}" for m in e.models ) message = ( f"\U0001f6eb *SkyGuardian AI v2.0 Alert*\n\n" f"*Flight Found*\n" f"{f.route}\n" f"\U0001f4b0 \u20b9{f.price:,.0f} | {f.airline}\n" f"\u23f1 {f.duration_min // 60}h {f.duration_min % 60}m | {f.stops} stop(s)\n\n" f"*Surge Analysis*\n" f"{s.level.value} (Z={s.z_score:.2f}, {s.pct_vs_avg:+.1f}% vs avg)\n\n" f"*4-Model Ensemble*\n" f"{model_lines}\n" f"\U0001f9e0 Final: {e.final_drop_probability:.0f}% drop probability " f"(conf {e.final_confidence:.0%})\n\n" f"*Decision*\n" f"\U0001f3af {d.action.value} ({d.urgency.value}) | confidence {d.confidence_score:.0%}\n" f"{d.explanation}\n" ) if d.arbitrage and d.arbitrage.worth_it: message += f"\n\U0001f4a1 Arbitrage: Save \u20b9{d.arbitrage.savings:,.0f} via {d.arbitrage.best_airport}" if d.auto_book_triggered: message += "\n\n\u2705 Auto-booking triggered" return message # ============================================================================ # LAYER 10 - MASTER ORCHESTRATOR # ============================================================================ class Orchestrator: def __init__(self, kiwi_key: str, amadeus_key: str, amadeus_secret: str, telegram_token: str, telegram_chat_id: str, db_path: str, hf_token: str = "", model_path: Optional[str] = None): self.aggregator = PriceAggregator(kiwi_key, amadeus_key, amadeus_secret) self.logger = PriceLogger(db_path) self.arbitrage_agent = ArbitrageAgent(self.aggregator) self.ensemble = MultiModelEnsemble(hf_token, model_path) self.react_agent = ReActAgent() self.booking_agent = BookingAgent() self.notifier = TelegramNotifier(telegram_token, telegram_chat_id) def run(self, origin: str, destination: str, departure_date: str, return_date: Optional[str], adults: int, emergency_mode: bool, monitoring_mode: bool, passenger: Optional[PassengerProfile], auto_book_threshold: float) -> IntelligenceReport: # ── Step 1: Validate ────────────────────────────────────────────────── if not origin or not destination or not departure_date: raise ValueError("Origin, destination, and departure date are required") # ── Step 2: Fetch live prices ───────────────────────────────────────── flight = self.aggregator.get_best_price( origin, destination, departure_date, return_date, adults ) if not flight: raise ValueError("No flights found — check API keys and route availability") # ── Step 3: Log to SQLite ───────────────────────────────────────────── self.logger.log_price(flight, departure_date) # ── Step 4: Compute 14-day history stats ────────────────────────────── history = self.logger.fetch_history(flight.route, 14) rolling_avg = self.logger.rolling_avg(flight.route, 14) std_dev_val = self.logger.std_dev(flight.route, 14) trend_slope = self.logger.trend_slope(flight.route, 14) vol = _volatility(std_dev_val, rolling_avg) history_prices = [p for _, p in history] # ── Step 5: Surge analysis ──────────────────────────────────────────── surge = SurgeAnalyzer.analyze(flight.price, rolling_avg, std_dev_val) # ── Step 6: Days to departure ───────────────────────────────────────── try: days_to_departure = (datetime.strptime(departure_date, "%Y-%m-%d") - datetime.now()).days except Exception: days_to_departure = 30 # ── Step 7: Multi-model ensemble prediction ─────────────────────────── ensemble_result = self.ensemble.predict( flight=flight, surge=surge, days_to_departure=days_to_departure, trend_slope=trend_slope, volatility=vol, history_prices=history_prices ) # ── Step 7b: Get forecast + sentiment score for agent ───────────────── forecast = self.ensemble.chronos.forecast(history_prices) sent_model = next( (m for m in ensemble_result.models if "Sentiment" in m.model_name), None ) sentiment_score = (100.0 - (sent_model.drop_probability if sent_model else 50.0)) / 100.0 # ── Step 8: Arbitrage check ─────────────────────────────────────────── arbitrage = self.arbitrage_agent.find_best_route( origin, destination, departure_date, return_date, adults, flight.price ) # ── Step 9: ReAct agent reasoning trace ────────────────────────────── agent_trace = self.react_agent.analyze( flight=flight, surge=surge, ensemble=ensemble_result, arbitrage=arbitrage, days_to_departure=days_to_departure, history_prices=history_prices, trend_slope=trend_slope, volatility=vol, sentiment_score=sentiment_score, forecast=forecast ) # ── Step 10: Decision engine ────────────────────────────────────────── decision = DecisionEngine.decide( flight, surge, ensemble_result, arbitrage, days_to_departure, emergency_mode, rolling_avg ) # ── Step 11: Auto-booking ───────────────────────────────────────────── booking_result = None if (auto_book_threshold > 0 and flight.price <= auto_book_threshold and decision.action in [ActionType.BOOK_NOW, ActionType.EMERGENCY_BOOK, ActionType.TRY_NEARBY] and passenger is not None): booking_result = self.booking_agent.book(flight, passenger) decision.auto_book_triggered = True self.logger.log_booking( route=flight.route, price=flight.price, action=decision.action.value, success=booking_result.success, details=booking_result.error or "Auto-booking attempted" ) # ── Step 12: Build report ───────────────────────────────────────────── report = IntelligenceReport( flight=flight, history_days=len(history), rolling_avg=rolling_avg, std_dev=std_dev_val, trend_slope=trend_slope, volatility=vol, surge=surge, ensemble=ensemble_result, agent_trace=agent_trace, arbitrage=arbitrage, decision=decision, booking=booking_result, forecast_prices=forecast, sentiment_score=sentiment_score ) # ── Step 13: Telegram alert ─────────────────────────────────────────── if self.notifier.should_alert(decision, surge, arbitrage, monitoring_mode): decision.telegram_sent = self.notifier.send(report) return report # ============================================================================ # OUTPUT FORMATTERS # ============================================================================ def _fmt_status(report: IntelligenceReport) -> str: e = report.ensemble d = report.decision return ( f"✈️ ₹{report.flight.price:,.0f} | {report.surge.level.value} | " f"Ensemble drop: {e.final_drop_probability:.0f}% (conf {e.final_confidence:.0%}) | " f"Decision: {d.action.value} [{d.urgency.value}] | " f"Confidence: {d.confidence_score:.0%} | " f"Alert: {'✅' if d.telegram_sent else '❌'} | " f"Auto-book: {'✅' if d.auto_book_triggered else '❌'} | " f"Agent steps: {len(report.agent_trace.steps)} " f"({report.agent_trace.duration_ms}ms)" ) def _fmt_price(flight: FlightPrice) -> str: return ( f"## ✈️ Best Flight Found\n\n" f"| Field | Value |\n|-------|-------|\n" f"| **Route** | {flight.route} |\n" f"| **Price** | ₹{flight.price:,.0f} {flight.currency} |\n" f"| **Airline** | {flight.airline} |\n" f"| **Duration** | {flight.duration_min // 60}h {flight.duration_min % 60}m |\n" f"| **Stops** | {flight.stops} |\n" f"| **Provider** | {flight.provider} |\n" f"| **Flights Found** | {flight.flights_found} |\n" f"| **Departure** | {flight.departure_time} |\n" f"| **Value Score** | {flight.value_score:,.0f} |\n" ) def _fmt_intel(report: IntelligenceReport) -> str: trend_emoji = "📈" if report.trend_slope > 0 else "📉" if report.trend_slope < 0 else "➡️" e = report.ensemble fc_line = "" if report.forecast_prices: fc_vals = ", ".join(f"₹{p:,.0f}" for p in report.forecast_prices[:5]) fc_line = f"| **7-Day Forecast** | {fc_vals}... |\n" return ( f"## 📊 Market Intelligence\n\n" f"| Metric | Value |\n|--------|-------|\n" f"| **14-Day Average** | ₹{report.rolling_avg:,.0f} |\n" f"| **Std Deviation** | ₹{report.std_dev:,.0f} |\n" f"| **Price vs Avg** | {report.surge.pct_vs_avg:+.1f}% |\n" f"| **Trend** | {trend_emoji} {report.trend_slope:+.1f} INR/day |\n" f"| **Volatility (CoV)** | {report.volatility:.1f}% |\n" f"| **History Points** | {report.history_days} |\n" f"| **Surge Level** | {report.surge.level.value} (Z={report.surge.z_score:.2f}) |\n" f"| **Sentiment Score** | {report.sentiment_score:.2f} " f"({'BULLISH 🟢' if report.sentiment_score > 0.6 else 'BEARISH 🔴' if report.sentiment_score < 0.4 else 'NEUTRAL 🟡'}) |\n" f"| **Ensemble Drop Prob** | {e.final_drop_probability:.0f}% " f"(conf {e.final_confidence:.0%}, {e.urgency}) |\n" f"| **Estimated Drop** | {e.final_drop_pct:.1f}% |\n" f"| **Dominant Model** | {e.dominant_model} |\n" f"{fc_line}" ) def _fmt_models(report: IntelligenceReport) -> str: e = report.ensemble rows = "\n".join( f"| **{m.model_name}** | {m.drop_probability:.1f}% | {m.drop_pct:.1f}% | " f"{m.confidence:.0%} | {m.reasoning[:80]}{'…' if len(m.reasoning) > 80 else ''} |" for m in e.models ) weight_map = { "Heuristic-V2": "15%", "sklearn-RF+GB": "35%", "HF-Sentiment(finbert)": "20%", "Chronos-Forecast": "30%" } wrows = "\n".join( f"| {m.model_name} | {weight_map.get(m.model_name, '—')} | {m.drop_probability:.1f}% |" for m in e.models ) return ( f"## 🧠 4-Model Ensemble Intelligence\n\n" f"| Model | Drop Prob | Drop Est | Confidence | Reasoning |\n" f"|-------|-----------|----------|------------|----------|\n" f"{rows}\n\n" f"### Weighted Ensemble Result\n\n" f"| Model | Weight | Individual Prob |\n" f"|-------|--------|-----------------|\n" f"{wrows}\n\n" f"**Final: {e.final_drop_probability:.1f}% drop probability** | " f"Drop est: {e.final_drop_pct:.1f}% | " f"Confidence: {e.final_confidence:.0%} | " f"Dominant: **{e.dominant_model}**\n\n" f"> {e.explanation}" ) def _fmt_agent_trace(report: IntelligenceReport) -> str: trace = report.agent_trace lines = [ f"## 🤖 ReAct Agent Trace\n", f"> **{len(trace.steps)} reasoning steps** | " f"**{trace.models_used} models consulted** | " f"**{trace.duration_ms}ms**\n", ] for step in trace.steps: lines.append( f"---\n" f"**Step {step.step_num}** `[{step.timestamp}]`\n\n" f"🧠 **Thought:** {step.thought}\n\n" f"⚙️ **Action:** `{step.action}`\n\n" f"👁️ **Observation:** {step.observation}\n" ) lines.append(f"\n---\n### 🎯 Final Agent Reasoning\n\n> {trace.final_reasoning}") return "\n".join(lines) def _fmt_decision(report: IntelligenceReport) -> str: d = report.decision output = ( f"## 🎯 Decision: **{d.action.value}** ({d.urgency.value})\n\n" f"**Confidence:** {d.confidence_score:.0%}\n\n" f"{d.explanation}\n" ) if d.arbitrage and d.arbitrage.worth_it: output += ( f"\n### 💡 Arbitrage Opportunity\n\n" f"Fly from **{d.arbitrage.best_airport}** instead:\n" f"- Flight price: ₹{d.arbitrage.best_price:,.0f}\n" f"- Domestic cost: ₹{d.arbitrage.domestic_cost:,.0f}\n" f"- Total cost: ₹{d.arbitrage.total_cost:,.0f}\n" f"- **Savings: ₹{d.arbitrage.savings:,.0f}**\n" ) if d.auto_book_triggered: output += "\n### ✅ Auto-Booking Triggered\nCheck booking instructions below.\n" if d.telegram_sent: output += "\n### 📱 Telegram Alert Sent\n" return output # ============================================================================ # GRADIO UI (6 tabs) # ============================================================================ def create_ui(): import gradio as gr css = """ @import url('https://fonts.googleapis.com/css2?family=Familjen+Grotesk:wght@400;600;700&family=JetBrains+Mono:wght@400;500&display=swap'); body, .gradio-container { background:#060810!important; font-family:'Familjen Grotesk',sans-serif!important; color:#e0e0e0!important; } .hero-title { background:linear-gradient(135deg,#f0a500 0%,#00c2ff 100%); -webkit-background-clip:text; -webkit-text-fill-color:transparent; font-size:2.8rem; font-weight:700; margin-bottom:.4rem; } .hero-sub { color:#00e87a; font-size:1.1rem; margin-bottom:.8rem; } .pill { display:inline-block; background:rgba(240,165,0,.18); border:1px solid #f0a500; color:#f0a500; padding:.25rem .75rem; border-radius:20px; font-size:.8rem; margin:.15rem; font-weight:600; } .pill2 { border-color:#00c2ff; color:#00c2ff; background:rgba(0,194,255,.1); } .pill3 { border-color:#00e87a; color:#00e87a; background:rgba(0,232,122,.1); } h1,h2,h3 { font-weight:700!important; } code,pre,.mono { font-family:'JetBrains Mono',monospace!important; } """ hero = """