skyliner / app.py
ganeshkumar383's picture
Update app.py
4281180 verified
"""
SkyGuardian AI v2.0 – Multi-Model Agentic Flight Intelligence System
India ⇄ Singapore Corridor | Primary: TRZ ⇄ SIN | Secondary: MAA ⇄ SIN
Architecture: ReAct Agent + 4-Model Ensemble (Heuristic + sklearn RF/GB + HF Sentiment + Chronos Forecast)
HuggingFace Models: ProsusAI/finbert (sentiment) · amazon/chronos-t5-tiny (time-series forecast)
"""
import os
import sqlite3
import json
import time
import pickle
import requests
import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Dict, Any, Callable
from enum import Enum
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# ============================================================================
# LAYER 1 - DATA MODELS
# ============================================================================
@dataclass
class FlightPrice:
    """One priced flight offer as reported by a provider.

    ``value_score`` is not a constructor argument; it is derived from the
    other fields right after initialisation.
    """

    route: str
    price: float
    currency: str
    airline: str
    departure_time: str
    arrival_time: str
    duration_min: int
    stops: int
    provider: str
    deep_link: str
    flights_found: int
    value_score: float = field(init=False)

    def __post_init__(self):
        # Score = price + 1500 per stop + 2 per minute flown beyond 300 minutes.
        stop_penalty = self.stops * 1500
        overtime_min = self.duration_min - 300
        if overtime_min < 0:
            overtime_min = 0
        self.value_score = self.price + stop_penalty + overtime_min * 2
class SurgeLevel(Enum):
    """Price level relative to the historical average.

    Each value is a display label made of the member name plus a coloured
    emoji marker.
    """

    # The emoji had been stored as mis-decoded UTF-8 (mojibake); restored here.
    BELOW_AVERAGE = "BELOW_AVERAGE 💚"
    NORMAL = "NORMAL 🟢"
    ELEVATED = "ELEVATED 🟡"
    EXTREME = "EXTREME 🔴"
@dataclass
class SurgeResult:
    """Outcome of comparing a current price with its historical statistics."""

    # Classified surge bucket.
    level: SurgeLevel
    # Standardised distance of the current price from the average.
    z_score: float
    # Price that was evaluated.
    current_price: float
    # Historical average it was compared against.
    avg_price: float
    # Historical standard deviation used for the z-score.
    std_dev: float
    # Signed percentage difference between current and average price.
    pct_vs_avg: float
@dataclass
class ModelPrediction:
    """Price-drop estimate produced by one individual model."""

    # Identifier of the model that produced this prediction.
    model_name: str
    # Estimated chance of a price drop, expressed in percent.
    drop_probability: float
    # Estimated size of the drop, in percent.
    drop_pct: float
    # Confidence attached to the estimate by the model.
    confidence: float
    # Short human-readable summary of the inputs or rationale.
    reasoning: str
@dataclass
class EnsemblePrediction:
    """Combined result of several ``ModelPrediction`` objects."""

    # The individual predictions that were combined.
    models: List["ModelPrediction"]
    # Combined drop probability, in percent.
    final_drop_probability: float
    # Combined expected drop size, in percent.
    final_drop_pct: float
    # Combined confidence value.
    final_confidence: float
    # Name of the model singled out as dominant.
    dominant_model: str
    # Human-readable summary of the per-model results.
    explanation: str
    # Urgency label as plain text.
    urgency: str
@dataclass
class AgentStep:
    """One Thought / Action / Observation entry of an agent run."""

    # 1-based position of the step within the run.
    step_num: int
    thought: str
    action: str
    observation: str
    # Wall-clock time (HH:MM:SS) captured when the step object is created.
    timestamp: str = field(
        default_factory=lambda: datetime.now().strftime("%H:%M:%S")
    )
@dataclass
class AgentTrace:
    """Full record of one agent run."""

    # Ordered steps taken during the run.
    steps: List["AgentStep"]
    # Final synthesised reasoning text.
    final_reasoning: str
    # Number of models that contributed.
    models_used: int
    # Elapsed time of the run in milliseconds.
    duration_ms: int
@dataclass
class ArbitrageResult:
    """Best alternative-airport option compared with the current price."""

    # Code of the alternative departure airport.
    best_airport: str
    # Flight price from that airport.
    best_price: float
    # Extra cost of reaching that airport.
    domestic_cost: float
    # best_price plus domestic_cost.
    total_cost: float
    # Amount saved versus the current price.
    savings: float
    # Whether the option is considered worthwhile.
    worth_it: bool
class ActionType(Enum):
    """Recommended next action; each value equals the member name."""

    BOOK_NOW = "BOOK_NOW"
    WAIT_24H = "WAIT_24H"
    WAIT_WEEK = "WAIT_WEEK"
    TRY_NEARBY = "TRY_NEARBY"
    EMERGENCY_BOOK = "EMERGENCY_BOOK"
class Urgency(Enum):
    """Urgency attached to a decision; each value equals the member name."""

    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
@dataclass
class Decision:
    """Final recommendation together with its supporting metadata."""

    # What to do.
    action: ActionType
    # How pressing the action is.
    urgency: Urgency
    # Human-readable justification.
    explanation: str
    # Confidence attached to the decision; defaults to 0.0.
    confidence_score: float = 0.0
    # Alternative-airport option, when one accompanies the decision.
    arbitrage: Optional[ArbitrageResult] = None
    # Status flags, both off by default.
    auto_book_triggered: bool = False
    telegram_sent: bool = False
@dataclass
class BookingResult:
    """Outcome of a booking attempt."""

    # True when the attempt succeeded.
    success: bool
    # Path of a captured screenshot, if any.
    screenshot_path: Optional[str]
    # Error description, if any.
    error: Optional[str]
    # Instructions for completing the booking by hand.
    manual_instructions: str
@dataclass
class PassengerProfile:
    """Contact details of a passenger."""

    full_name: str
    email: str
    phone: str
@dataclass
class IntelligenceReport:
    """Everything produced for one analysed flight, bundled together."""

    # The flight that was analysed.
    flight: FlightPrice
    # Historical statistics for the route.
    history_days: int
    rolling_avg: float
    std_dev: float
    trend_slope: float
    volatility: float
    # Analysis outputs.
    surge: SurgeResult
    ensemble: EnsemblePrediction
    agent_trace: AgentTrace
    arbitrage: Optional[ArbitrageResult]
    decision: Decision
    booking: Optional[BookingResult]
    # Forecast prices; a fresh empty list per instance by default.
    forecast_prices: List[float] = field(default_factory=list)
    # Sentiment score; defaults to the 0.5 midpoint.
    sentiment_score: float = 0.5
# ============================================================================
# LAYER 2 - PRICE PROVIDERS
# ============================================================================
class KiwiProvider:
    """Flight-search client for the Kiwi Tequila API (prices requested in INR)."""

    BASE_URL = "https://api.tequila.kiwi.com"

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"apikey": api_key}

    def search(self, origin: str, destination: str, departure_date: str,
               return_date: Optional[str], adults: int) -> List["FlightPrice"]:
        """Return the offers Kiwi reports for the requested trip.

        Args:
            origin: Departure airport code, sent as ``fly_from``.
            destination: Arrival airport code, sent as ``fly_to``.
            departure_date: Sent unchanged as both ``date_from`` and ``date_to``.
            return_date: When given, sent as ``return_from``/``return_to`` and
                the trip is treated as a round trip.
            adults: Number of adult passengers.

        Returns:
            One ``FlightPrice`` per usable offer. The list is empty when no
            API key is configured, the response status is not 200, or any
            error occurs (the error is printed, never raised).
        """
        if not self.api_key:
            return []
        params = {
            "fly_from": origin,
            "fly_to": destination,
            "date_from": departure_date,
            "date_to": departure_date,
            "adults": adults,
            "curr": "INR",
            "limit": 50,
            "sort": "price"
        }
        if return_date:
            params["return_from"] = return_date
            params["return_to"] = return_date
        try:
            response = requests.get(
                f"{self.BASE_URL}/v2/search",
                headers=self.headers,
                params=params,
                timeout=15
            )
            if response.status_code != 200:
                return []
            offers = response.json().get("data", [])
            flights_found = len(offers)
            # A round trip consists of two legs, so a non-stop round trip has
            # two route segments; subtract one segment per leg to count stops.
            legs = 2 if return_date else 1
            flights = []
            for item in offers:
                # A missing price used to default to 0, which made the offer
                # rank as the cheapest one. Offers without a positive price
                # are skipped instead.
                price = float(item.get("price") or 0)
                if price <= 0:
                    continue
                # The separator text is kept exactly as it was: the route
                # string is what PriceLogger stores and filters history on.
                route_str = f"{item.get('flyFrom', origin)} β†’ {item.get('flyTo', destination)}"
                if return_date:
                    route_str += f" β†’ {item.get('flyFrom', origin)}"
                airlines = item.get("airlines")
                duration = item.get("duration") or {}
                segments = item.get("route") or []
                flights.append(FlightPrice(
                    route=route_str,
                    price=price,
                    currency="INR",
                    airline=airlines[0] if airlines else "Unknown",
                    departure_time=item.get("local_departure", ""),
                    arrival_time=item.get("local_arrival", ""),
                    # "total" is divided by 60 to obtain minutes.
                    duration_min=int((duration.get("total") or 0) / 60),
                    # Never negative, even when the route list is empty.
                    stops=max(0, len(segments) - legs),
                    provider="Kiwi",
                    deep_link=item.get("deep_link", ""),
                    flights_found=flights_found
                ))
            return flights
        except Exception as e:
            # Best-effort provider: report the problem and yield no offers.
            print(f"Kiwi API error: {e}")
            return []
class AmadeusProvider:
    """Flight-search client for the Amadeus test API (prices requested in INR)."""

    BASE_URL = "https://test.api.amadeus.com"

    def __init__(self, api_key: str, api_secret: str):
        self.api_key = api_key
        self.api_secret = api_secret
        # OAuth token cache, filled by _get_token().
        self.token = None
        self.token_expiry = None

    def _get_token(self) -> bool:
        """Ensure a usable OAuth token is cached.

        Returns:
            True when a cached token is still valid or a new one was obtained;
            False when credentials are missing, the request fails, or the
            response carries no ``access_token``.
        """
        if not self.api_key or not self.api_secret:
            return False
        if self.token and self.token_expiry and datetime.now() < self.token_expiry:
            return True
        try:
            response = requests.post(
                f"{self.BASE_URL}/v1/security/oauth2/token",
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.api_key,
                    "client_secret": self.api_secret
                },
                timeout=10
            )
            if response.status_code != 200:
                return False
            data = response.json()
            token = data.get("access_token")
            if not token:
                # A 200 reply without a token must not count as authenticated;
                # otherwise search() would send "Bearer None".
                return False
            self.token = token
            # Refresh one minute before the advertised expiry.
            self.token_expiry = datetime.now() + timedelta(seconds=data.get("expires_in", 1800) - 60)
            return True
        except Exception as e:
            print(f"Amadeus auth error: {e}")
            return False

    def search(self, origin: str, destination: str, departure_date: str,
               return_date: Optional[str], adults: int) -> List["FlightPrice"]:
        """Return the offers Amadeus reports for the requested trip.

        Args:
            origin: Sent as ``originLocationCode``.
            destination: Sent as ``destinationLocationCode``.
            departure_date: Sent as ``departureDate``.
            return_date: When given, sent as ``returnDate``.
            adults: Number of adult passengers.

        Returns:
            One ``FlightPrice`` per usable offer. The list is empty when
            authentication fails, the response status is not 200, or any
            error occurs (the error is printed, never raised).
        """
        if not self._get_token():
            return []
        params = {
            "originLocationCode": origin,
            "destinationLocationCode": destination,
            "departureDate": departure_date,
            "adults": adults,
            "currencyCode": "INR",
            "max": 50
        }
        if return_date:
            params["returnDate"] = return_date
        try:
            response = requests.get(
                f"{self.BASE_URL}/v2/shopping/flight-offers",
                headers={"Authorization": f"Bearer {self.token}"},
                params=params,
                timeout=15
            )
            if response.status_code != 200:
                return []
            offers = response.json().get("data", [])
            flights_found = len(offers)
            # The separator text is kept exactly as it was: the route string
            # is what PriceLogger stores and filters history on.
            route_str = f"{origin} β†’ {destination}"
            if return_date:
                route_str += f" β†’ {origin}"
            flights = []
            for offer in offers:
                itineraries = offer.get("itineraries") or []
                if not itineraries:
                    continue
                # Offers without a positive price would otherwise rank as the
                # cheapest option, so they are skipped.
                price = float((offer.get("price") or {}).get("total") or 0)
                if price <= 0:
                    continue
                # An empty segment list used to raise IndexError and discard
                # every offer of the response; fall back to an empty segment.
                first_segments = itineraries[0].get("segments") or [{}]
                last_segments = itineraries[-1].get("segments") or [{}]
                first_segment = first_segments[0]
                last_segment = last_segments[-1]
                total_duration = sum(
                    self._parse_duration(itin.get("duration", "PT0M"))
                    for itin in itineraries
                )
                total_stops = sum(
                    max(0, len(itin.get("segments") or []) - 1)
                    for itin in itineraries
                )
                flights.append(FlightPrice(
                    route=route_str,
                    price=price,
                    currency="INR",
                    airline=first_segment.get("carrierCode", "Unknown"),
                    departure_time=first_segment.get("departure", {}).get("at", ""),
                    arrival_time=last_segment.get("arrival", {}).get("at", ""),
                    duration_min=total_duration,
                    stops=total_stops,
                    provider="Amadeus",
                    deep_link="",
                    flights_found=flights_found
                ))
            return flights
        except Exception as e:
            # Best-effort provider: report the problem and yield no offers.
            print(f"Amadeus API error: {e}")
            return []

    def _parse_duration(self, duration_str: str) -> int:
        """Convert an ISO-8601 style duration such as ``PT2H30M`` to minutes.

        Day, hour, minute and second components are understood; seconds are
        truncated. The leading ``P``/``T`` markers are optional. Anything that
        does not match (including non-string input) yields 0.
        """
        import re

        if not isinstance(duration_str, str):
            return 0
        match = re.fullmatch(
            r"P?(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)(?:\.\d+)?S)?",
            duration_str.strip(),
        )
        if match is None:
            return 0
        days, hours, minutes, _seconds = (int(part) if part else 0 for part in match.groups())
        return days * 24 * 60 + hours * 60 + minutes
class PriceAggregator:
    """Queries every provider and picks the offer with the lowest value score."""

    def __init__(self, kiwi_key: str, amadeus_key: str, amadeus_secret: str):
        self.kiwi = KiwiProvider(kiwi_key)
        self.amadeus = AmadeusProvider(amadeus_key, amadeus_secret)

    def get_best_price(self, origin: str, destination: str, departure_date: str,
                       return_date: Optional[str], adults: int) -> Optional[FlightPrice]:
        """Return the best offer across Kiwi and Amadeus, or None if there is none.

        Offers sharing airline, price rounded to the nearest hundred and stop
        count are treated as duplicates; only the lowest-scoring one of each
        group competes for the final pick.
        """
        query = (origin, destination, departure_date, return_date, adults)
        candidates = [*self.kiwi.search(*query), *self.amadeus.search(*query)]
        if not candidates:
            return None

        best_per_group = {}
        for candidate in candidates:
            group = (candidate.airline, round(candidate.price, -2), candidate.stops)
            incumbent = best_per_group.get(group)
            if incumbent is None or candidate.value_score < incumbent.value_score:
                best_per_group[group] = candidate

        ranked = sorted(best_per_group.values(), key=lambda offer: offer.value_score)
        return ranked[0] if ranked else None
# ============================================================================
# LAYER 3 - HISTORICAL STORAGE
# ============================================================================
class PriceLogger:
    """SQLite-backed store for observed prices and booking attempts.

    Every operation opens its own connection and always closes it, even when
    a statement fails. Timestamps are written as ``datetime.now().isoformat()``.
    """

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _run(self, sql: str, params: tuple = (), fetch: bool = False) -> list:
        """Execute one statement; return rows when ``fetch`` else commit."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            if fetch:
                return cursor.fetchall()
            conn.commit()
            return []
        finally:
            # Close on success and on error so connections never leak.
            conn.close()

    def _init_db(self):
        """Create the ``price_logs`` and ``bookings`` tables if missing."""
        self._run("""
            CREATE TABLE IF NOT EXISTS price_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                route TEXT NOT NULL,
                price REAL NOT NULL,
                currency TEXT NOT NULL,
                airline TEXT,
                duration_min INTEGER,
                stops INTEGER,
                provider TEXT,
                departure_date TEXT
            )
        """)
        self._run("""
            CREATE TABLE IF NOT EXISTS bookings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                route TEXT NOT NULL,
                price REAL NOT NULL,
                action TEXT NOT NULL,
                success INTEGER NOT NULL,
                details TEXT
            )
        """)

    def log_price(self, flight: "FlightPrice", departure_date: str):
        """Append one price observation for ``flight`` stamped with the current time."""
        self._run("""
            INSERT INTO price_logs (timestamp, route, price, currency, airline,
                                    duration_min, stops, provider, departure_date)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            datetime.now().isoformat(),
            flight.route,
            flight.price,
            flight.currency,
            flight.airline,
            flight.duration_min,
            flight.stops,
            flight.provider,
            departure_date
        ))

    def log_booking(self, route: str, price: float, action: str, success: bool, details: str):
        """Append one booking record; ``success`` is stored as 1 or 0."""
        self._run("""
            INSERT INTO bookings (timestamp, route, price, action, success, details)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            datetime.now().isoformat(),
            route,
            price,
            action,
            1 if success else 0,
            details
        ))

    def fetch_history(self, route: str, days: int) -> List[Tuple[str, float]]:
        """Return ``(timestamp, price)`` rows of the last ``days`` days, newest first."""
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        return self._run("""
            SELECT timestamp, price FROM price_logs
            WHERE route = ? AND timestamp >= ?
            ORDER BY timestamp DESC
        """, (route, cutoff), fetch=True)

    def _prices(self, route: str, days: int) -> List[float]:
        """Prices of the window in the same newest-first order as fetch_history."""
        return [price for _, price in self.fetch_history(route, days)]

    def rolling_avg(self, route: str, days: int) -> float:
        """Mean price over the window, or 0.0 when there is no history."""
        prices = self._prices(route, days)
        if not prices:
            return 0.0
        return sum(prices) / len(prices)

    def std_dev(self, route: str, days: int) -> float:
        """Population standard deviation over the window; 0.0 with fewer than 2 rows."""
        prices = self._prices(route, days)
        if len(prices) < 2:
            return 0.0
        avg = sum(prices) / len(prices)
        variance = sum((p - avg) ** 2 for p in prices) / len(prices)
        return math.sqrt(variance)

    def trend_slope(self, route: str, days: int) -> float:
        """Least-squares slope of price over successive observations.

        The slope is expressed per logged observation (the x axis is the
        observation index). A positive value means prices are rising over
        time. Returns 0.0 with fewer than 2 rows.
        """
        # fetch_history is newest-first. Regressing over that order flipped
        # the sign, reporting rising prices as a negative slope. Reverse to
        # chronological order so the index grows with time.
        prices = self._prices(route, days)[::-1]
        n = len(prices)
        if n < 2:
            return 0.0
        x_mean = sum(range(n)) / n
        y_mean = sum(prices) / n
        numerator = sum((i - x_mean) * (p - y_mean) for i, p in enumerate(prices))
        denominator = sum((i - x_mean) ** 2 for i in range(n))
        if denominator == 0:
            return 0.0
        return numerator / denominator

    def recent_bookings(self, route: str, days: int) -> List[Dict]:
        """Return booking records of the last ``days`` days as dicts, newest first."""
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        rows = self._run("""
            SELECT timestamp, price, action, success, details FROM bookings
            WHERE route = ? AND timestamp >= ?
            ORDER BY timestamp DESC
        """, (route, cutoff), fetch=True)
        return [
            {
                "timestamp": row[0],
                "price": row[1],
                "action": row[2],
                "success": bool(row[3]),
                "details": row[4]
            }
            for row in rows
        ]
# ============================================================================
# LAYER 4 - INTELLIGENCE
# ============================================================================
class SurgeAnalyzer:
    """Classifies a price against its historical mean and spread."""

    @staticmethod
    def analyze(current_price: float, avg_price: float, std_dev: float) -> SurgeResult:
        """Bucket ``current_price`` by its z-score.

        With a zero average or zero standard deviation no statistics can be
        computed, so the result is NORMAL with a z-score and percentage of 0.
        Otherwise the buckets are: z < -0.5 BELOW_AVERAGE, z < 0.5 NORMAL,
        z < 1.5 ELEVATED, anything else EXTREME.
        """
        z_score = 0.0
        pct_vs_avg = 0.0
        level = SurgeLevel.NORMAL

        has_statistics = not (avg_price == 0 or std_dev == 0)
        if has_statistics:
            deviation = current_price - avg_price
            z_score = deviation / std_dev
            pct_vs_avg = ((current_price - avg_price) / avg_price) * 100
            buckets = (
                (-0.5, SurgeLevel.BELOW_AVERAGE),
                (0.5, SurgeLevel.NORMAL),
                (1.5, SurgeLevel.ELEVATED),
            )
            for upper_bound, bucket in buckets:
                if z_score < upper_bound:
                    level = bucket
                    break
            else:
                level = SurgeLevel.EXTREME

        return SurgeResult(
            level=level,
            z_score=z_score,
            current_price=current_price,
            avg_price=avg_price,
            std_dev=std_dev,
            pct_vs_avg=pct_vs_avg
        )
# ────────────────────────────────────────────────────────────────────────────
# MODEL 0 – Volatility helper (shared)
# ────────────────────────────────────────────────────────────────────────────
def _volatility(std_dev: float, avg_price: float) -> float:
    """Return the standard deviation as a percentage of the average price.

    A zero average yields 0.0 instead of dividing by zero.
    """
    return (std_dev / avg_price) * 100 if avg_price != 0 else 0.0
# ────────────────────────────────────────────────────────────────────────────
# MODEL UTILITY – HuggingFace Inference API client (no torch required)
# ────────────────────────────────────────────────────────────────────────────
class HFInferenceClient:
    """Thin HTTP client for the HuggingFace Inference API."""

    BASE_URL = "https://api-inference.huggingface.co/models"

    def __init__(self, hf_token: str = ""):
        # An explicit token wins; otherwise fall back to the HF_TOKEN env var.
        token = hf_token if hf_token else os.getenv("HF_TOKEN", "")
        self.hf_token = token
        # Without a token the requests are sent with no auth header at all.
        self.headers = {}
        if token:
            self.headers = {"Authorization": f"Bearer {token}"}

    def query(self, model_id: str, payload: Dict, timeout: int = 20) -> Optional[Any]:
        """POST ``payload`` as JSON to ``model_id`` and return the decoded reply.

        Returns None for any non-200 status and for any exception raised
        along the way; this method never raises.
        """
        try:
            endpoint = f"{self.BASE_URL}/{model_id}"
            reply = requests.post(
                endpoint,
                headers=self.headers,
                json=payload,
                timeout=timeout
            )
            if reply.status_code != 200:
                return None
            return reply.json()
        except Exception:
            return None
# ────────────────────────────────────────────────────────────────────────────
# MODEL 1 – Heuristic rule-based predictor (always available, baseline)
# ────────────────────────────────────────────────────────────────────────────
class HeuristicPredictor:
    """Rule-based baseline that needs no external service or library."""

    def predict(self, days_to_departure: int, surge_z: float,
                volatility: float, trend_slope: float) -> ModelPrediction:
        """Score drop probability from four signals.

        Starts at 50 and adds one adjustment each for departure window, surge
        z-score, volatility and trend slope; the total is clamped to 0..100.
        """
        # Departure window.
        if days_to_departure <= 3:
            window_adj = -30
        elif days_to_departure <= 7:
            window_adj = -15
        elif days_to_departure <= 14:
            window_adj = 5
        elif days_to_departure <= 30:
            window_adj = 15
        else:
            window_adj = 10

        # Surge z-score.
        if surge_z > 1.5:
            surge_adj = 25
        elif surge_z > 0.5:
            surge_adj = 10
        elif surge_z < -0.5:
            surge_adj = -20
        else:
            surge_adj = 0

        # Volatility.
        if volatility > 20:
            volatility_adj = 15
        elif volatility > 10:
            volatility_adj = 5
        else:
            volatility_adj = 0

        # Trend slope.
        if trend_slope > 50:
            trend_adj = 20
        elif trend_slope > 0:
            trend_adj = 10
        elif trend_slope < -50:
            trend_adj = -15
        else:
            trend_adj = 0

        raw_score = 50.0 + window_adj + surge_adj + volatility_adj + trend_adj
        probability = max(0.0, min(100.0, raw_score))

        if probability >= 70:
            expected_drop = 12.0
        elif probability >= 50:
            expected_drop = 8.0
        else:
            expected_drop = 4.0

        summary = (
            f"days={days_to_departure}, Z={surge_z:.2f}, "
            f"vol={volatility:.1f}%, slope={trend_slope:+.1f}"
        )
        return ModelPrediction(
            model_name="Heuristic-V2",
            drop_probability=probability,
            drop_pct=expected_drop,
            confidence=0.55,
            reasoning=summary
        )
# ────────────────────────────────────────────────────────────────────────────
# MODEL 2 – sklearn RandomForest + GradientBoosting ensemble
# ────────────────────────────────────────────────────────────────────────────
class SklearnEnsemblePredictor:
    """RandomForest + GradientBoosting pair averaged into one prediction.

    Models are loaded from ``model_path`` when possible; otherwise both are
    trained on a deterministic synthetic data set. If scikit-learn or numpy is
    unavailable the predictor stays usable and returns a neutral fallback.
    """

    def __init__(self, model_path: Optional[str] = None):
        self.rf = None
        self.gb = None
        self._init(model_path)

    def _init(self, model_path: Optional[str]):
        """Load both models from disk, or train them when loading is not possible."""
        if model_path and os.path.exists(model_path):
            try:
                # NOTE(security): pickle.load executes arbitrary code from the
                # file, so model_path must only ever point at trusted files.
                with open(model_path, "rb") as f:
                    saved = pickle.load(f)
                rf = saved.get("rf")
                gb = saved.get("gb")
            except Exception as e:
                # Loading is best-effort; report and fall through to training.
                print(f"sklearn model load error: {e}")
            else:
                # A file missing either model used to be accepted, leaving the
                # predictor permanently on its fallback. Require both models.
                if rf is not None and gb is not None:
                    self.rf = rf
                    self.gb = gb
                    return
        self._train_synthetic()

    def _train_synthetic(self):
        """Fit both classifiers on 600 seeded synthetic samples.

        Features are [days, surge z-score, volatility, slope]; a sample is
        labelled as a drop when one of three hand-written rules holds.
        """
        try:
            from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
            import numpy as np
            rng = np.random.default_rng(42)
            n = 600
            days = rng.integers(1, 91, n).astype(float)
            sz = rng.uniform(-2.5, 3.0, n)
            vol = rng.uniform(0, 45, n)
            slope = rng.uniform(-120, 200, n)
            labels = (
                ((days > 7) & (sz > 0.5) & (slope > 0)) |
                (sz > 1.5) |
                ((vol > 20) & (slope > 30))
            ).astype(int)
            X = np.column_stack([days, sz, vol, slope])
            self.rf = RandomForestClassifier(n_estimators=80, max_depth=6, random_state=42)
            self.rf.fit(X, labels)
            self.gb = GradientBoostingClassifier(n_estimators=80, max_depth=4, random_state=42)
            self.gb.fit(X, labels)
        except Exception:
            # Missing libraries or a failed fit: predict() uses its fallback.
            self.rf = None
            self.gb = None

    def predict(self, days_to_departure: int, surge_z: float,
                volatility: float, trend_slope: float) -> ModelPrediction:
        """Average the two classifiers' drop probabilities (in percent).

        Returns a neutral 50% prediction with low confidence when a model is
        missing or prediction fails.
        """
        if self.rf is None or self.gb is None:
            return ModelPrediction(
                model_name="sklearn-RF+GB",
                drop_probability=50.0, drop_pct=6.0,
                confidence=0.30, reasoning="sklearn unavailable – using fallback"
            )
        try:
            import numpy as np
            X = np.array([[float(days_to_departure), surge_z, volatility, trend_slope]])
            rf_p = float(self.rf.predict_proba(X)[0][1]) * 100
            gb_p = float(self.gb.predict_proba(X)[0][1]) * 100
            avg_p = (rf_p + gb_p) / 2.0
            return ModelPrediction(
                model_name="sklearn-RF+GB",
                drop_probability=avg_p,
                drop_pct=10.0 if avg_p >= 60 else 6.0,
                confidence=0.72,
                reasoning=f"RF={rf_p:.1f}%, GB={gb_p:.1f}%, ensemble={avg_p:.1f}%"
            )
        except Exception as e:
            return ModelPrediction(
                model_name="sklearn-RF+GB",
                drop_probability=50.0, drop_pct=6.0,
                confidence=0.30, reasoning=f"predict error: {e}"
            )
# ────────────────────────────────────────────────────────────────────────────
# MODEL 3 – HuggingFace Sentiment (ProsusAI/finbert → financial NLP)
# Converts numeric signals → text → sentiment → buy/wait probability
# ────────────────────────────────────────────────────────────────────────────
class SentimentAnalyzer:
    """Turns numeric price signals into text and scores it with a sentiment model.

    The primary model is queried first, then the fallback model; if neither
    answers, a local heuristic supplies the buy signal.
    """

    PRIMARY_MODEL = "ProsusAI/finbert"
    FALLBACK_MODEL = "distilbert-base-uncased-finetuned-sst-2-english"

    def __init__(self, hf_client: "HFInferenceClient"):
        self.hf = hf_client

    def analyze(self, flight: "FlightPrice", surge: "SurgeResult",
                trend_slope: float, days_to_departure: int) -> "ModelPrediction":
        """Return a drop prediction derived from the sentiment buy signal.

        The buy signal lies in 0..1 and the drop probability is its
        complement expressed in percent.
        """
        text = self._build_text(flight, surge, trend_slope, days_to_departure)
        raw = self.hf.query(self.PRIMARY_MODEL, {"inputs": text})
        if not raw:
            raw = self.hf.query(self.FALLBACK_MODEL, {"inputs": text})
        buy_signal = self._extract_buy_signal(raw) if raw else self._heuristic_buy(surge, trend_slope)
        drop_prob = 100.0 - (buy_signal * 100.0)
        return ModelPrediction(
            model_name="HF-Sentiment(finbert)",
            drop_probability=drop_prob,
            drop_pct=7.0 if drop_prob > 60 else 3.5,
            # Lower confidence when the value came from the local heuristic.
            confidence=0.58 if raw else 0.40,
            reasoning=(
                f"buy_signal={buy_signal:.2f}, text='{text[:60]}...'"
                if raw else f"heuristic_buy={buy_signal:.2f} (HF API unavailable)"
            )
        )

    def _build_text(self, flight: "FlightPrice", surge: "SurgeResult",
                    trend_slope: float, days_to_departure: int) -> str:
        """Describe the price situation as plain English sentences for the model."""
        trend_dir = "rising" if trend_slope > 0 else "declining"
        level = surge.level.name.replace("_", " ").lower()
        return (
            f"Airline ticket price {flight.price:.0f} INR on {flight.route}. "
            f"Price is {level} at {surge.pct_vs_avg:+.1f}% deviation from average. "
            f"14-day trend is {trend_dir} by {abs(trend_slope):.1f} INR per day. "
            f"Departure in {days_to_departure} days. "
            f"{'Last minute urgent booking.' if days_to_departure <= 5 else ''}"
        )

    @staticmethod
    def _extract_buy_signal(result: Any) -> float:
        """Map a text-classification reply to a buy signal in 0..1.

        ``result`` is either a list of ``{"label", "score"}`` dicts or such a
        list wrapped in an outer list. The signal is
        ``0.5 + (P(positive) - P(negative)) / 2``: a two-class reply keeps the
        positive score, and a dominant "neutral" class lands near 0.5.
        Unparseable replies yield the neutral 0.5.
        """
        positive = None
        negative = None
        try:
            items = result[0] if isinstance(result[0], list) else result
            for item in items:
                label = str(item.get("label", "")).lower()
                score = float(item.get("score", 0.5))
                if positive is None and ("positive" in label or label == "pos"):
                    positive = score
                elif negative is None and ("negative" in label or label == "neg"):
                    negative = score
        except (AttributeError, IndexError, KeyError, TypeError, ValueError):
            return 0.5
        if positive is None and negative is None:
            return 0.5
        # Returning on the first positive/negative entry used to turn a weak
        # minority score (e.g. negative=0.15 next to a dominant neutral) into
        # a strong 0.85 buy signal. Weigh both classes against each other.
        signal = 0.5 + ((positive or 0.0) - (negative or 0.0)) / 2.0
        return max(0.0, min(1.0, signal))

    @staticmethod
    def _heuristic_buy(surge: "SurgeResult", trend_slope: float) -> float:
        """Local stand-in for the model: buy signal from surge level and trend sign."""
        s = 0.5
        if surge.level == SurgeLevel.BELOW_AVERAGE:
            s += 0.3
        elif surge.level == SurgeLevel.EXTREME:
            s -= 0.3
        if trend_slope < 0:
            s += 0.1
        elif trend_slope > 0:
            s -= 0.1
        return max(0.0, min(1.0, s))
# ────────────────────────────────────────────────────────────────────────────
# MODEL 4 – Chronos / Linear time-series forecaster
# Uses amazon/chronos-t5-tiny via HF Inference API; falls back to OLS
# ────────────────────────────────────────────────────────────────────────────
class ChronosForecaster:
    """Short-horizon price forecaster.

    Asks the HuggingFace-hosted model first and falls back to a straight-line
    least-squares extrapolation when no usable reply comes back.
    """

    MODEL_ID = "amazon/chronos-t5-tiny"
    HORIZON = 7

    def __init__(self, hf_client: "HFInferenceClient"):
        self.hf = hf_client

    def forecast(self, history: List[float]) -> List[float]:
        """Return ``HORIZON`` forecast prices, or [] with fewer than 4 history points.

        Only the last 20 history points are sent to the remote model; the
        local fallback uses the whole history.
        """
        if len(history) < 4:
            return []
        raw = self.hf.query(
            self.MODEL_ID,
            {"inputs": history[-20:], "parameters": {"prediction_length": self.HORIZON}},
            timeout=25
        )
        if raw and isinstance(raw, list) and len(raw) >= self.HORIZON:
            try:
                return [float(v) for v in raw[:self.HORIZON]]
            except (TypeError, ValueError):
                # Reply was not a flat list of numbers; use the local fallback.
                pass
        return self._ols_forecast(history, self.HORIZON)

    def predict_model(self, history: List[float], current_price: float) -> "ModelPrediction":
        """Convert the forecast into a drop prediction relative to ``current_price``.

        The projected drop is the percentage by which the forecast average
        lies below the current price; the probability is 50 plus twice that
        figure, clamped to 0..100.
        """
        fc = self.forecast(history)
        if not fc:
            return ModelPrediction(
                model_name="Chronos-Forecast",
                drop_probability=50.0, drop_pct=5.0,
                confidence=0.35, reasoning="Insufficient history for forecast"
            )
        fc_avg = sum(fc) / len(fc)
        fc_min = min(fc)
        proj_drop_pct = ((current_price - fc_avg) / current_price * 100) if current_price > 0 else 0.0
        drop_prob = max(0.0, min(100.0, 50.0 + proj_drop_pct * 2.0))
        return ModelPrediction(
            model_name="Chronos-Forecast",
            drop_probability=drop_prob,
            drop_pct=abs(proj_drop_pct) if proj_drop_pct > 0 else 3.0,
            confidence=0.65,
            # The rupee sign had been stored as mis-decoded UTF-8; restored.
            reasoning=(
                f"7-day avg=₹{fc_avg:.0f}, min=₹{fc_min:.0f}, "
                f"now=₹{current_price:.0f}, proj_drop={proj_drop_pct:.1f}%"
            )
        )

    @staticmethod
    def _ols_forecast(prices: List[float], horizon: int) -> List[float]:
        """Extrapolate a least-squares line ``horizon`` steps past the data.

        Projected values are floored at 0: a steep downward line would
        otherwise yield negative prices and a projected drop above 100%.
        """
        n = len(prices)
        x_mean = (n - 1) / 2.0
        y_mean = sum(prices) / n
        num = sum((i - x_mean) * (price - y_mean) for i, price in enumerate(prices))
        den = sum((i - x_mean) ** 2 for i in range(n))
        slope = num / den if den != 0 else 0.0
        intercept = y_mean - slope * x_mean
        return [max(0.0, intercept + slope * (n + step)) for step in range(horizon)]
# ────────────────────────────────────────────────────────────────────────────
# ENSEMBLE – Weighted combination of all 4 models
# ────────────────────────────────────────────────────────────────────────────
class MultiModelEnsemble:
    """Weighted combination of the heuristic, sklearn, sentiment and forecast models."""

    # Weight per model name; a name not listed here is weighted 0.25.
    WEIGHTS: Dict[str, float] = {
        "Heuristic-V2": 0.15,
        "sklearn-RF+GB": 0.35,
        "HF-Sentiment(finbert)": 0.20,
        "Chronos-Forecast": 0.30,
    }

    def __init__(self, hf_token: str = "", model_path: Optional[str] = None):
        self.hf_client = HFInferenceClient(hf_token)
        self.heuristic = HeuristicPredictor()
        self.sklearn_mdl = SklearnEnsemblePredictor(model_path)
        self.sentiment = SentimentAnalyzer(self.hf_client)
        self.chronos = ChronosForecaster(self.hf_client)

    def predict(self, flight: FlightPrice, surge: SurgeResult,
                days_to_departure: int, trend_slope: float,
                volatility: float, history_prices: List[float]) -> EnsemblePrediction:
        """Run all four models and return their weighted average.

        Probability, drop size and confidence are each the weight-normalised
        mean of the per-model values. The dominant model is the one reporting
        the highest confidence. Urgency is HIGH from 70% drop probability,
        MEDIUM from 50%, otherwise LOW.
        """
        m1 = self.heuristic.predict(days_to_departure, surge.z_score, volatility, trend_slope)
        m2 = self.sklearn_mdl.predict(days_to_departure, surge.z_score, volatility, trend_slope)
        m3 = self.sentiment.analyze(flight, surge, trend_slope, days_to_departure)
        m4 = self.chronos.predict_model(history_prices, flight.price)
        models: List[ModelPrediction] = [m1, m2, m3, m4]

        # Look each weight up once instead of once per aggregated metric.
        weights = [self.WEIGHTS.get(m.model_name, 0.25) for m in models]
        total_w = sum(weights)
        w_prob = sum(m.drop_probability * w for m, w in zip(models, weights)) / total_w
        w_drop = sum(m.drop_pct * w for m, w in zip(models, weights)) / total_w
        w_conf = sum(m.confidence * w for m, w in zip(models, weights)) / total_w

        dominant = max(models, key=lambda m: m.confidence)
        urgency = "HIGH" if w_prob >= 70 else "MEDIUM" if w_prob >= 50 else "LOW"
        return EnsemblePrediction(
            models=models,
            final_drop_probability=round(w_prob, 1),
            final_drop_pct=round(w_drop, 1),
            final_confidence=round(w_conf, 3),
            dominant_model=dominant.model_name,
            # The arrow had been stored as mis-decoded UTF-8; restored.
            explanation=(
                f"4-model ensemble → H={m1.drop_probability:.0f}%, "
                f"SK={m2.drop_probability:.0f}%, "
                f"Sent={m3.drop_probability:.0f}%, "
                f"Chron={m4.drop_probability:.0f}%"
            ),
            urgency=urgency
        )
# ============================================================================
# LAYER 5 - AGENTIC CORE (ReAct: Reason + Act loop)
# ============================================================================
class AgentMemory:
    """Scratch memory for one agent run: keyed facts plus an observation log."""

    def __init__(self):
        self.facts: Dict[str, Any] = {}
        self.observations: List[str] = []

    def remember(self, key: str, value: Any):
        """Store ``value`` under ``key``, replacing any earlier value."""
        self.facts[key] = value

    def recall(self, key: str) -> Optional[Any]:
        """Return the fact stored under ``key``, or None when absent."""
        if key in self.facts:
            return self.facts[key]
        return None

    def observe(self, text: str):
        """Append ``text`` to the observation log."""
        self.observations.append(text)

    def context(self) -> str:
        """Join the five most recent observations with ' | '."""
        recent = self.observations[-5:]
        return " | ".join(recent)
class ReActAgent:
    """
    Deterministic ReAct (Reasoning + Acting) agent.
    Iterates through structured tool-call steps, records Thought/Action/Observation
    triples, and synthesises a final natural-language recommendation.
    No LLM API required — fully deterministic and explainable.
    """

    MAX_STEPS = 8

    def __init__(self):
        self.memory: AgentMemory = AgentMemory()
        self.steps: List[AgentStep] = []
        # Counter used to number steps within the current run.
        self._n = 0

    def _step(self, thought: str, action: str, observation: str) -> AgentStep:
        """Record one numbered step and mirror its observation into memory."""
        self._n += 1
        s = AgentStep(step_num=self._n, thought=thought, action=action, observation=observation)
        self.steps.append(s)
        self.memory.observe(observation)
        return s

    def analyze(self,
                flight: FlightPrice,
                surge: SurgeResult,
                ensemble: EnsemblePrediction,
                arbitrage: Optional[ArbitrageResult],
                days_to_departure: int,
                history_prices: List[float],
                trend_slope: float,
                volatility: float,
                sentiment_score: float,
                forecast: List[float]) -> AgentTrace:
        """Narrate the already-computed signals as a step-by-step trace.

        State from any previous run is discarded first. The forecast step is
        recorded only when ``forecast`` is non-empty and the arbitrage step
        only when ``arbitrage`` is present and marked worth it. The final step
        repeats the synthesised reasoning, which is also returned on the
        trace. ``history_prices`` is accepted but not read here.
        """
        t0 = time.time()
        self.steps = []
        self.memory = AgentMemory()
        self._n = 0

        # ── Step 1: Price vs History ──────────────────────────────────────────
        self._step(
            thought=f"I need to evaluate ₹{flight.price:,.0f} for {flight.route} against historical baseline.",
            action="assess_price_vs_history",
            observation=(
                f"Price is {surge.pct_vs_avg:+.1f}% vs 14-day avg ₹{surge.avg_price:,.0f}. "
                f"Surge level: {surge.level.name} (Z={surge.z_score:.2f}). "
                f"Std-dev: ₹{surge.std_dev:,.0f}."
            )
        )
        self.memory.remember("surge_level", surge.level)

        # ── Step 2: Trend analysis ────────────────────────────────────────────
        trend_label = "RISING ↑" if trend_slope > 20 else "FALLING ↓" if trend_slope < -20 else "STABLE →"
        volatility_note = (
            "High volatility — market is unpredictable." if volatility > 15
            else "Low volatility — stable pricing."
        )
        self._step(
            thought="Trend direction tells me whether waiting could yield lower prices.",
            action="analyze_14day_trend",
            observation=(
                f"Slope: {trend_slope:+.1f} INR/day → {trend_label}. "
                f"Volatility (CoV): {volatility:.1f}%. "
                f"{volatility_note}"
            )
        )
        self.memory.remember("trend", trend_label)

        # ── Step 3: Ensemble model results ───────────────────────────────────
        dom_reasoning = next(
            (m.reasoning for m in ensemble.models if m.model_name == ensemble.dominant_model), ""
        )
        self._step(
            thought=f"Running {len(ensemble.models)} ML models to estimate price-drop probability.",
            action="run_4model_ensemble",
            observation=(
                f"Ensemble: {ensemble.final_drop_probability:.0f}% drop probability, "
                f"est. drop {ensemble.final_drop_pct:.1f}%, confidence {ensemble.final_confidence:.0%}. "
                f"Dominant: {ensemble.dominant_model} → {dom_reasoning}."
            )
        )
        self.memory.remember("drop_prob", ensemble.final_drop_probability)

        # ── Step 4: Sentiment signal ─────────────────────────────────────────
        sent_label = (
            "BULLISH 🟢 (good time to buy)" if sentiment_score > 0.6
            else "BEARISH 🔴 (prices may drop)" if sentiment_score < 0.4
            else "NEUTRAL 🟡"
        )
        self._step(
            thought="HuggingFace finbert sentiment on numeric-to-text conversion provides market feel.",
            action="query_hf_sentiment_model",
            observation=f"Sentiment score: {sentiment_score:.2f} → {sent_label}."
        )
        self.memory.remember("sentiment", sent_label)

        # ── Step 5: 7-day price forecast ─────────────────────────────────────
        if forecast:
            fc_min = min(forecast)
            fc_trend = "declining 📉" if forecast[-1] < forecast[0] else "rising 📈"
            waiting_note = (
                "Waiting may save money." if fc_min < flight.price * 0.95
                else "Minimal savings expected from waiting."
            )
            self._step(
                thought="Chronos OLS 7-day forecast reveals expected price direction.",
                action="fetch_chronos_forecast",
                observation=(
                    f"7-day forecast {fc_trend}. Lowest projected: ₹{fc_min:,.0f}. "
                    f"Current: ₹{flight.price:,.0f}. "
                    f"{waiting_note}"
                )
            )

        # ── Step 6: Arbitrage check ───────────────────────────────────────────
        if arbitrage and arbitrage.worth_it:
            # A zero price used to raise ZeroDivisionError here; report 0%.
            savings_pct = (arbitrage.savings / flight.price * 100) if flight.price > 0 else 0.0
            arbitrage_note = (
                "Strong arbitrage — recommend alternate route." if arbitrage.savings > 5000
                else "Moderate arbitrage — consider if flexible."
            )
            self._step(
                thought=f"Arbitrage option found via {arbitrage.best_airport}. Evaluating true net saving.",
                action="evaluate_arbitrage_opportunity",
                observation=(
                    f"Flight from {arbitrage.best_airport}: ₹{arbitrage.best_price:,.0f} + "
                    f"domestic ₹{arbitrage.domestic_cost:,.0f} = ₹{arbitrage.total_cost:,.0f}. "
                    f"Net saving: ₹{arbitrage.savings:,.0f} ({savings_pct:.1f}%). "
                    f"{arbitrage_note}"
                )
            )

        # ── Step 7: Time-pressure assessment ─────────────────────────────────
        urgency_label = (
            "🚨 CRITICAL (<7 days) — prices spike sharply near departure" if days_to_departure <= 7
            else "⚠️ MODERATE (1-4 weeks) — decide soon" if days_to_departure <= 28
            else "✅ RELAXED (>4 weeks) — monitor for better price"
        )
        self._step(
            thought=f"Time window ({days_to_departure} days) is a key factor in urgency.",
            action="assess_departure_window",
            observation=f"{days_to_departure} days to departure → {urgency_label}."
        )

        # ── Step 8: Synthesize ────────────────────────────────────────────────
        signals = []
        if surge.level in (SurgeLevel.BELOW_AVERAGE, SurgeLevel.NORMAL):
            signals.append(f"✅ Price at/below average ({surge.level.name})")
        else:
            signals.append(f"⚠️ Price elevated ({surge.level.name})")
        if ensemble.final_drop_probability > 60:
            signals.append(f"⚠️ {ensemble.final_drop_probability:.0f}% drop probability — consider waiting")
        else:
            signals.append(f"✅ Low drop probability ({ensemble.final_drop_probability:.0f}%)")
        if trend_slope < -20:
            signals.append("⚠️ Price trend declining — waiting may save money")
        elif trend_slope > 20:
            signals.append("✅ Prices rising — book now beats higher future cost")
        if days_to_departure <= 7:
            signals.append("🚨 Departure imminent — book immediately")

        final_reasoning = (
            f"After consulting {len(ensemble.models)} AI models (sklearn RF/GB, HF finbert, "
            f"Chronos forecast, heuristic), analysing {days_to_departure}-day horizon, "
            f"trend '{trend_label}', and market sentiment '{sent_label}': "
            + " | ".join(signals)
        )
        self._step(
            thought="All signals gathered. Synthesising final recommendation.",
            action="synthesize_recommendation",
            observation=final_reasoning
        )
        return AgentTrace(
            steps=self.steps,
            final_reasoning=final_reasoning,
            models_used=len(ensemble.models),
            duration_ms=int((time.time() - t0) * 1000)
        )
# ============================================================================
# LAYER 6 - ARBITRAGE AGENT
# ============================================================================
class ArbitrageAgent:
    """Checks whether departing from another listed airport is cheaper overall."""

    # Candidate airports with the extra cost charged for reaching each one.
    AIRPORTS = {
        "TRZ": {"name": "Trichy", "domestic_cost": 0},
        "MAA": {"name": "Chennai", "domestic_cost": 1200},
        "CJB": {"name": "Coimbatore", "domestic_cost": 800},
        "BLR": {"name": "Bangalore", "domestic_cost": 2800},
        "HYD": {"name": "Hyderabad", "domestic_cost": 3500}
    }
    # Smallest saving that qualifies as an arbitrage result.
    MIN_SAVINGS = 2000

    def __init__(self, aggregator: PriceAggregator):
        self.aggregator = aggregator

    def find_best_route(self, origin: str, destination: str, departure_date: str,
                        return_date: Optional[str], adults: int,
                        current_price: float) -> Optional[ArbitrageResult]:
        """Return the alternative airport with the largest qualifying saving.

        Every listed airport other than ``origin`` is priced through the
        aggregator. Saving is ``current_price`` minus flight price plus
        domestic cost. None is returned when no airport saves at least
        ``MIN_SAVINGS``; on equal savings the earlier-listed airport wins.
        """
        winner = None
        for code, details in self.AIRPORTS.items():
            if code == origin:
                continue
            quote = self.aggregator.get_best_price(
                code, destination, departure_date, return_date, adults
            )
            if not quote:
                continue
            transfer_cost = details["domestic_cost"]
            combined_cost = quote.price + transfer_cost
            saved = current_price - combined_cost
            if not saved >= self.MIN_SAVINGS:
                continue
            if winner and not saved > winner.savings:
                continue
            winner = ArbitrageResult(
                best_airport=code,
                best_price=quote.price,
                domestic_cost=transfer_cost,
                total_cost=combined_cost,
                savings=saved,
                worth_it=True
            )
        return winner
# ============================================================================
# LAYER 7 - DECISION ENGINE
# ============================================================================
class DecisionEngine:
    """Priority-ordered rule set that maps the gathered signals to a Decision."""

    @staticmethod
    def decide(flight: FlightPrice, surge: SurgeResult, ensemble: EnsemblePrediction,
               arbitrage: Optional[ArbitrageResult], days_to_departure: int,
               emergency_mode: bool, rolling_avg: float) -> Decision:
        """Return the first matching rule's Decision.

        Rules are checked in this order: emergency mode, departure within
        five days, arbitrage saving above 10% of the fare, price under 95% of
        the rolling average, EXTREME surge with drop probability >= 60,
        ELEVATED surge with drop probability >= 45, BELOW_AVERAGE surge, and
        finally a default "book" verdict. ``arbitrage`` is attached to every
        Decision regardless of which rule fired.
        """

        def verdict(action, urgency, score, why):
            # Every outcome carries the arbitrage result along unchanged.
            return Decision(
                action=action, urgency=urgency,
                confidence_score=score,
                explanation=why,
                arbitrage=arbitrage
            )

        p_drop = ensemble.final_drop_probability
        trust = ensemble.final_confidence

        # Rule 1: explicit emergency override.
        if emergency_mode:
            return verdict(
                ActionType.EMERGENCY_BOOK, Urgency.HIGH, 1.0,
                "Emergency mode activated β€” booking immediately regardless of price."
            )

        # Rule 2: departure is very close.
        if days_to_departure <= 5:
            return verdict(
                ActionType.BOOK_NOW, Urgency.HIGH, 0.95,
                f"Only {days_to_departure} days to departure β€” last-minute prices typically spike. "
                f"Ensemble confidence: {trust:.0%}."
            )

        # Rule 3: a nearby airport saves more than 10% of the current fare.
        if arbitrage and arbitrage.worth_it and arbitrage.savings / flight.price > 0.10:
            saved_pct = arbitrage.savings / flight.price * 100
            return verdict(
                ActionType.TRY_NEARBY, Urgency.HIGH, 0.88,
                f"Arbitrage: save β‚Ή{arbitrage.savings:,.0f} ({saved_pct:.1f}%) "
                f"flying from {arbitrage.best_airport} instead."
            )

        # Rule 4: fare is more than 5% under the rolling average.
        if rolling_avg > 0 and flight.price < rolling_avg * 0.95:
            return verdict(
                ActionType.BOOK_NOW, Urgency.HIGH, round(0.80 + trust * 0.15, 3),
                f"Price is {abs(surge.pct_vs_avg):.1f}% below 14-day avg β€” "
                f"excellent deal confirmed by {ensemble.dominant_model} (drop_prob={p_drop:.0f}%)."
            )

        # Rule 5: extreme surge that the ensemble expects to correct.
        if surge.level == SurgeLevel.EXTREME and p_drop >= 60:
            return verdict(
                ActionType.WAIT_WEEK, Urgency.MEDIUM, round(trust * p_drop / 100, 3),
                f"Extreme surge (Z={surge.z_score:.2f}) + {p_drop:.0f}% ensemble drop probability "
                f"β†’ wait for correction. Dominant signal: {ensemble.dominant_model}."
            )

        # Rule 6: elevated fare with a moderate chance of dropping.
        if surge.level == SurgeLevel.ELEVATED and p_drop >= 45:
            return verdict(
                ActionType.WAIT_24H, Urgency.LOW, round(trust * 0.7, 3),
                f"Elevated price + {p_drop:.0f}% drop probability "
                f"β†’ monitor for 24 hours. {ensemble.explanation}."
            )

        # Rule 7: surge analysis already classifies the fare as cheap.
        if surge.level == SurgeLevel.BELOW_AVERAGE:
            return verdict(
                ActionType.BOOK_NOW, Urgency.HIGH, round(0.75 + trust * 0.20, 3),
                f"Below-average price ({surge.pct_vs_avg:+.1f}%) β€” strong buy signal. "
                f"All models agree: drop_prob only {p_drop:.0f}%."
            )

        # Default: nothing remarkable either way.
        return verdict(
            ActionType.BOOK_NOW, Urgency.MEDIUM, round(trust * 0.6, 3),
            f"Normal market conditions. Ensemble: {ensemble.explanation}. "
            f"Book when ready."
        )
# ============================================================================
# LAYER 8 - AUTONOMOUS BOOKING
# ============================================================================
class BookingAgent:
    """Best-effort form pre-fill for a booking page using Playwright.

    The agent opens the flight's deep link, tries to fill name, email and
    phone fields, and saves a screenshot. It never proceeds to payment.
    Manual instructions are returned with every result.
    """

    # Candidate CSS selectors for each field, tried in order until one works.
    _FIRST_NAME_SELECTORS = (
        "input[name='firstName']",
        "input[placeholder*='First']",
        "input[id*='first']",
        "input.first-name",
    )
    _LAST_NAME_SELECTORS = (
        "input[name='lastName']",
        "input[placeholder*='Last']",
        "input[id*='last']",
        "input.last-name",
    )
    _EMAIL_SELECTORS = (
        "input[type='email']",
        "input[name='email']",
        "input[placeholder*='email']",
    )
    _PHONE_SELECTORS = (
        "input[type='tel']",
        "input[name='phone']",
        "input[placeholder*='phone']",
    )

    def __init__(self):
        self.playwright_available = self._check_playwright()

    def _check_playwright(self) -> bool:
        """Return True when the Playwright sync API can be imported."""
        try:
            from playwright.sync_api import sync_playwright  # noqa: F401
        except ImportError:
            return False
        return True

    @staticmethod
    def _fill_first(page, selectors, value, timeout: int = 2000) -> bool:
        """Fill ``value`` into the first selector that accepts it.

        Returns True if any selector succeeded, False if all failed. Only
        ``Exception`` subclasses are treated as "selector did not match";
        ``KeyboardInterrupt`` and ``SystemExit`` propagate.
        """
        for selector in selectors:
            try:
                page.fill(selector, value, timeout=timeout)
            except Exception:
                continue
            return True
        return False

    def book(self, flight: FlightPrice, passenger: PassengerProfile) -> BookingResult:
        """Open the booking page, pre-fill passenger fields, and screenshot it.

        Returns a failed BookingResult when Playwright is not installed or
        when any browser step raises. ``success=True`` means the page was
        opened and a screenshot was saved.
        NOTE(review): success does not depend on any field actually being
        filled - confirm whether callers expect that.
        """
        if not self.playwright_available:
            return BookingResult(
                success=False,
                screenshot_path=None,
                error="Playwright not installed",
                manual_instructions=self.instructions(flight, passenger)
            )
        try:
            from playwright.sync_api import sync_playwright

            name_parts = passenger.full_name.split(maxsplit=1)
            first_name = name_parts[0] if name_parts else passenger.full_name
            last_name = name_parts[1] if len(name_parts) > 1 else ""

            with sync_playwright() as p:
                browser = p.chromium.launch(headless=True)
                try:
                    page = browser.new_page()
                    page.goto(flight.deep_link if flight.deep_link else "https://www.google.com/flights")
                    # Fixed pause after navigation before touching the form.
                    page.wait_for_timeout(3000)

                    self._fill_first(page, self._FIRST_NAME_SELECTORS, first_name)
                    self._fill_first(page, self._LAST_NAME_SELECTORS, last_name)
                    self._fill_first(page, self._EMAIL_SELECTORS, passenger.email)
                    self._fill_first(page, self._PHONE_SELECTORS, passenger.phone)

                    screenshot_path = f"booking_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
                    page.screenshot(path=screenshot_path)
                finally:
                    # Close the browser even when navigation or the screenshot fails.
                    browser.close()

            return BookingResult(
                success=True,
                screenshot_path=screenshot_path,
                error=None,
                manual_instructions=self.instructions(flight, passenger)
            )
        except Exception as e:
            return BookingResult(
                success=False,
                screenshot_path=None,
                error=str(e),
                manual_instructions=self.instructions(flight, passenger)
            )

    def instructions(self, flight: FlightPrice, passenger: PassengerProfile) -> str:
        """Return plain-text steps for completing the booking by hand."""
        instructions = f"""
MANUAL BOOKING INSTRUCTIONS
{'=' * 50}
1. Open booking link:
{flight.deep_link if flight.deep_link else 'Search manually on flight booking site'}
2. Fill passenger details:
Name: {passenger.full_name}
Email: {passenger.email}
Phone: {passenger.phone}
3. Review flight details:
Route: {flight.route}
Price: β‚Ή{flight.price:,.0f}
Airline: {flight.airline}
Departure: {flight.departure_time}
4. Proceed to payment (MANUAL STEP - system stops here)
5. Complete payment using your preferred method
{'=' * 50}
"""
        return instructions
# ============================================================================
# LAYER 9 - TELEGRAM NOTIFIER
# ============================================================================
class TelegramNotifier:
    """Formats an intelligence report and posts it to a Telegram chat.

    The notifier is disabled (every method returns False) unless both a bot
    token and a chat id are supplied.
    """

    # Characters that start an entity in Telegram's legacy "Markdown" parse
    # mode. They must be backslash-escaped in dynamic text, otherwise an
    # underscore in e.g. "BELOW_AVERAGE" or "drop_prob" opens an italic run
    # and the message is garbled or rejected.
    _MD_ESCAPES = str.maketrans({ch: "\\" + ch for ch in "_*`["})

    def __init__(self, token: str, chat_id: str):
        self.token = token
        self.chat_id = chat_id
        self.enabled = bool(token and chat_id)

    @classmethod
    def _escape_markdown(cls, text) -> str:
        """Return ``str(text)`` with legacy-Markdown special characters escaped."""
        return str(text).translate(cls._MD_ESCAPES)

    def should_alert(self, decision: Decision, surge: SurgeResult,
                     arbitrage: Optional[ArbitrageResult], monitoring: bool) -> bool:
        """Decide whether this run warrants a Telegram message.

        Always False when disabled and always True in monitoring mode.
        Otherwise True for booking-type actions, an EXTREME or BELOW_AVERAGE
        surge level, or arbitrage savings of at least 5000.
        """
        if not self.enabled:
            return False
        if monitoring:
            return True
        if decision.action in [ActionType.BOOK_NOW, ActionType.EMERGENCY_BOOK, ActionType.TRY_NEARBY]:
            return True
        if surge.level == SurgeLevel.EXTREME:
            return True
        if arbitrage and arbitrage.savings >= 5000:
            return True
        if surge.level == SurgeLevel.BELOW_AVERAGE:
            return True
        return False

    def send(self, report: IntelligenceReport) -> bool:
        """Post the formatted report; return True only on HTTP 200.

        Any exception is logged and reported as False, so a notification
        failure never aborts the pipeline.
        """
        if not self.enabled:
            return False
        try:
            message = self._format_message(report)
            url = f"https://api.telegram.org/bot{self.token}/sendMessage"
            payload = {
                "chat_id": self.chat_id,
                "text": message,
                "parse_mode": "Markdown"
            }
            response = requests.post(url, json=payload, timeout=10)
            return response.status_code == 200
        except Exception as e:
            # Exception text can include the request URL, which embeds the
            # bot token - redact it before logging.
            detail = str(e).replace(self.token, "***")
            print(f"Telegram error: {detail}")
            return False

    def _format_message(self, report: IntelligenceReport) -> str:
        """Build the Markdown alert text for ``report``.

        Fixed headings use ``*bold*`` markup; all dynamic text (route,
        airline, model names, enum values, explanation, airport code) is
        escaped so it cannot open Markdown entities.
        """
        esc = self._escape_markdown
        f = report.flight
        s = report.surge
        e = report.ensemble
        d = report.decision
        model_lines = "\n".join(
            f" β€’ {esc(m.model_name)}: {m.drop_probability:.0f}% drop | conf {m.confidence:.0%}"
            for m in e.models
        )
        message = (
            f"\U0001f6eb *SkyGuardian AI v2.0 Alert*\n\n"
            f"*Flight Found*\n"
            f"{esc(f.route)}\n"
            f"\U0001f4b0 \u20b9{f.price:,.0f} | {esc(f.airline)}\n"
            f"\u23f1 {f.duration_min // 60}h {f.duration_min % 60}m | {f.stops} stop(s)\n\n"
            f"*Surge Analysis*\n"
            f"{esc(s.level.value)} (Z={s.z_score:.2f}, {s.pct_vs_avg:+.1f}% vs avg)\n\n"
            f"*4-Model Ensemble*\n"
            f"{model_lines}\n"
            f"\U0001f9e0 Final: {e.final_drop_probability:.0f}% drop probability "
            f"(conf {e.final_confidence:.0%})\n\n"
            f"*Decision*\n"
            f"\U0001f3af {esc(d.action.value)} ({esc(d.urgency.value)}) | confidence {d.confidence_score:.0%}\n"
            f"{esc(d.explanation)}\n"
        )
        if d.arbitrage and d.arbitrage.worth_it:
            message += (
                f"\n\U0001f4a1 Arbitrage: Save \u20b9{d.arbitrage.savings:,.0f} "
                f"via {esc(d.arbitrage.best_airport)}"
            )
        if d.auto_book_triggered:
            message += "\n\n\u2705 Auto-booking triggered"
        return message
# ============================================================================
# LAYER 10 - MASTER ORCHESTRATOR
# ============================================================================
class Orchestrator:
    """Wires all layers together and runs the end-to-end analysis pipeline."""

    def __init__(self, kiwi_key: str, amadeus_key: str, amadeus_secret: str,
                 telegram_token: str, telegram_chat_id: str, db_path: str,
                 hf_token: str = "", model_path: Optional[str] = None):
        self.aggregator = PriceAggregator(kiwi_key, amadeus_key, amadeus_secret)
        self.logger = PriceLogger(db_path)
        self.arbitrage_agent = ArbitrageAgent(self.aggregator)
        self.ensemble = MultiModelEnsemble(hf_token, model_path)
        self.react_agent = ReActAgent()
        self.booking_agent = BookingAgent()
        self.notifier = TelegramNotifier(telegram_token, telegram_chat_id)

    @staticmethod
    def _days_until(departure_date: str, default: int = 30) -> int:
        """Return whole calendar days from today to ``departure_date``.

        ``departure_date`` must be ``YYYY-MM-DD``; an unparsable value yields
        ``default``. Dates are compared without the time of day: subtracting
        the current timestamp from midnight of the departure date would
        undercount by one (tomorrow would be 0 days away).
        """
        try:
            departure = datetime.strptime(departure_date, "%Y-%m-%d").date()
        except (TypeError, ValueError):
            return default
        return (departure - datetime.now().date()).days

    def run(self, origin: str, destination: str, departure_date: str,
            return_date: Optional[str], adults: int, emergency_mode: bool,
            monitoring_mode: bool, passenger: Optional[PassengerProfile],
            auto_book_threshold: float) -> IntelligenceReport:
        """Run the full pipeline for one search and return its report.

        Raises:
            ValueError: if origin, destination or departure date is empty,
                or if the aggregator returns no flight.
        """
        # Step 1: validate required inputs.
        if not origin or not destination or not departure_date:
            raise ValueError("Origin, destination, and departure date are required")

        # Step 2: fetch the best live price.
        flight = self.aggregator.get_best_price(
            origin, destination, departure_date, return_date, adults
        )
        if not flight:
            raise ValueError("No flights found β€” check API keys and route availability")

        # Step 3: persist the observation.
        self.logger.log_price(flight, departure_date)

        # Step 4: 14-day history statistics for this route.
        history = self.logger.fetch_history(flight.route, 14)
        rolling_avg = self.logger.rolling_avg(flight.route, 14)
        std_dev_val = self.logger.std_dev(flight.route, 14)
        trend_slope = self.logger.trend_slope(flight.route, 14)
        vol = _volatility(std_dev_val, rolling_avg)
        history_prices = [p for _, p in history]

        # Step 5: surge analysis.
        surge = SurgeAnalyzer.analyze(flight.price, rolling_avg, std_dev_val)

        # Step 6: days to departure (falls back to 30 on a malformed date).
        days_to_departure = self._days_until(departure_date)

        # Step 7: multi-model ensemble prediction.
        ensemble_result = self.ensemble.predict(
            flight=flight,
            surge=surge,
            days_to_departure=days_to_departure,
            trend_slope=trend_slope,
            volatility=vol,
            history_prices=history_prices
        )

        # Step 7b: forecast and sentiment score for the agent and report.
        forecast = self.ensemble.chronos.forecast(history_prices)
        sent_model = next(
            (m for m in ensemble_result.models if "Sentiment" in m.model_name), None
        )
        # Sentiment score is the complement of the sentiment model's drop
        # probability, scaled to 0..1; 0.5 when that model is absent.
        sentiment_score = (100.0 - (sent_model.drop_probability if sent_model else 50.0)) / 100.0

        # Step 8: arbitrage check against nearby airports.
        arbitrage = self.arbitrage_agent.find_best_route(
            origin, destination, departure_date, return_date, adults, flight.price
        )

        # Step 9: agent reasoning trace.
        agent_trace = self.react_agent.analyze(
            flight=flight,
            surge=surge,
            ensemble=ensemble_result,
            arbitrage=arbitrage,
            days_to_departure=days_to_departure,
            history_prices=history_prices,
            trend_slope=trend_slope,
            volatility=vol,
            sentiment_score=sentiment_score,
            forecast=forecast
        )

        # Step 10: decision engine.
        decision = DecisionEngine.decide(
            flight, surge, ensemble_result, arbitrage,
            days_to_departure, emergency_mode, rolling_avg
        )

        # Step 11: auto-booking, only with a positive threshold, a fare at or
        # under it, a booking-type decision, and passenger details.
        booking_result = None
        if (auto_book_threshold > 0
                and flight.price <= auto_book_threshold
                and decision.action in [ActionType.BOOK_NOW, ActionType.EMERGENCY_BOOK, ActionType.TRY_NEARBY]
                and passenger is not None):
            booking_result = self.booking_agent.book(flight, passenger)
            decision.auto_book_triggered = True
            self.logger.log_booking(
                route=flight.route, price=flight.price,
                action=decision.action.value, success=booking_result.success,
                details=booking_result.error or "Auto-booking attempted"
            )

        # Step 12: build the report.
        report = IntelligenceReport(
            flight=flight,
            history_days=len(history),
            rolling_avg=rolling_avg,
            std_dev=std_dev_val,
            trend_slope=trend_slope,
            volatility=vol,
            surge=surge,
            ensemble=ensemble_result,
            agent_trace=agent_trace,
            arbitrage=arbitrage,
            decision=decision,
            booking=booking_result,
            forecast_prices=forecast,
            sentiment_score=sentiment_score
        )

        # Step 13: Telegram alert; the send outcome is recorded on the decision.
        if self.notifier.should_alert(decision, surge, arbitrage, monitoring_mode):
            decision.telegram_sent = self.notifier.send(report)
        return report
# ============================================================================
# OUTPUT FORMATTERS
# ============================================================================
def _fmt_status(report: IntelligenceReport) -> str:
    """Render the one-line pipeline status summary for ``report``."""
    ens = report.ensemble
    verdict = report.decision
    trace = report.agent_trace

    alert_mark = "βœ…" if verdict.telegram_sent else "❌"
    book_mark = "βœ…" if verdict.auto_book_triggered else "❌"

    # Each entry is one " | "-separated field of the status line.
    segments = [
        f"✈️ β‚Ή{report.flight.price:,.0f}",
        f"{report.surge.level.value}",
        f"Ensemble drop: {ens.final_drop_probability:.0f}% (conf {ens.final_confidence:.0%})",
        f"Decision: {verdict.action.value} [{verdict.urgency.value}]",
        f"Confidence: {verdict.confidence_score:.0%}",
        f"Alert: {alert_mark}",
        f"Auto-book: {book_mark}",
        f"Agent steps: {len(trace.steps)} ({trace.duration_ms}ms)",
    ]
    return " | ".join(segments)
def _fmt_price(flight: FlightPrice) -> str:
    """Render the best flight as a two-column Markdown table."""
    hours, minutes = flight.duration_min // 60, flight.duration_min % 60
    # (label, rendered value) pairs in display order.
    fields = [
        ("Route", f"{flight.route}"),
        ("Price", f"β‚Ή{flight.price:,.0f} {flight.currency}"),
        ("Airline", f"{flight.airline}"),
        ("Duration", f"{hours}h {minutes}m"),
        ("Stops", f"{flight.stops}"),
        ("Provider", f"{flight.provider}"),
        ("Flights Found", f"{flight.flights_found}"),
        ("Departure", f"{flight.departure_time}"),
        ("Value Score", f"{flight.value_score:,.0f}"),
    ]
    header = "## ✈️ Best Flight Found\n\n| Field | Value |\n|-------|-------|\n"
    body = "".join(f"| **{label}** | {value} |\n" for label, value in fields)
    return header + body
def _fmt_intel(report: IntelligenceReport) -> str:
    """Render the market-intelligence metrics as a Markdown table.

    A forecast row listing at most the first five forecast prices is
    appended only when ``report.forecast_prices`` is non-empty.
    """
    ens = report.ensemble
    surge = report.surge

    # Trend arrow: up for a positive slope, down for negative, flat otherwise.
    if report.trend_slope > 0:
        arrow = "πŸ“ˆ"
    elif report.trend_slope < 0:
        arrow = "πŸ“‰"
    else:
        arrow = "➑️"

    # Sentiment bands: above 0.6 bullish, below 0.4 bearish, else neutral.
    if report.sentiment_score > 0.6:
        mood = "BULLISH 🟒"
    elif report.sentiment_score < 0.4:
        mood = "BEARISH πŸ”΄"
    else:
        mood = "NEUTRAL 🟑"

    metrics = [
        ("14-Day Average", f"β‚Ή{report.rolling_avg:,.0f}"),
        ("Std Deviation", f"β‚Ή{report.std_dev:,.0f}"),
        ("Price vs Avg", f"{surge.pct_vs_avg:+.1f}%"),
        ("Trend", f"{arrow} {report.trend_slope:+.1f} INR/day"),
        ("Volatility (CoV)", f"{report.volatility:.1f}%"),
        ("History Points", f"{report.history_days}"),
        ("Surge Level", f"{surge.level.value} (Z={surge.z_score:.2f})"),
        ("Sentiment Score", f"{report.sentiment_score:.2f} ({mood})"),
        ("Ensemble Drop Prob",
         f"{ens.final_drop_probability:.0f}% (conf {ens.final_confidence:.0%}, {ens.urgency})"),
        ("Estimated Drop", f"{ens.final_drop_pct:.1f}%"),
        ("Dominant Model", f"{ens.dominant_model}"),
    ]
    if report.forecast_prices:
        preview = ", ".join(f"β‚Ή{price:,.0f}" for price in report.forecast_prices[:5])
        metrics.append(("7-Day Forecast", f"{preview}..."))

    heading = "## πŸ“Š Market Intelligence\n\n| Metric | Value |\n|--------|-------|\n"
    return heading + "".join(f"| **{name}** | {shown} |\n" for name, shown in metrics)
def _fmt_models(report: IntelligenceReport) -> str:
    """Render per-model predictions and the weighted ensemble summary.

    Produces two Markdown tables (per-model detail, then displayed weights)
    followed by the final ensemble figures and the explanation as a quote.
    Reasoning text longer than 80 characters is truncated with an ellipsis.
    """
    ens = report.ensemble
    # Display weights keyed by model name; unknown names get a dash.
    shares = {
        "Heuristic-V2": "15%", "sklearn-RF+GB": "35%",
        "HF-Sentiment(finbert)": "20%", "Chronos-Forecast": "30%"
    }

    detail_rows = []
    weight_rows = []
    for model in ens.models:
        why = model.reasoning[:80]
        if len(model.reasoning) > 80:
            why += "…"
        detail_rows.append(
            f"| **{model.model_name}** | {model.drop_probability:.1f}% | {model.drop_pct:.1f}% | "
            f"{model.confidence:.0%} | {why} |"
        )
        share = shares.get(model.model_name, "β€”")
        weight_rows.append(
            f"| {model.model_name} | {share} | {model.drop_probability:.1f}% |"
        )

    summary = (
        f"**Final: {ens.final_drop_probability:.1f}% drop probability** | "
        f"Drop est: {ens.final_drop_pct:.1f}% | "
        f"Confidence: {ens.final_confidence:.0%} | "
        f"Dominant: **{ens.dominant_model}**"
    )
    # Each table body is joined first so an empty model list still yields
    # the same blank lines as a populated one.
    pieces = [
        "## 🧠 4-Model Ensemble Intelligence",
        "",
        "| Model | Drop Prob | Drop Est | Confidence | Reasoning |",
        "|-------|-----------|----------|------------|----------|",
        "\n".join(detail_rows),
        "",
        "### Weighted Ensemble Result",
        "",
        "| Model | Weight | Individual Prob |",
        "|-------|--------|-----------------|",
        "\n".join(weight_rows),
        "",
        summary,
        "",
        f"> {ens.explanation}",
    ]
    return "\n".join(pieces)
def _fmt_agent_trace(report: IntelligenceReport) -> str:
    """Render the agent's reasoning steps and final reasoning as Markdown."""
    trace = report.agent_trace
    overview = (
        f"> **{len(trace.steps)} reasoning steps** | "
        f"**{trace.models_used} models consulted** | "
        f"**{trace.duration_ms}ms**\n"
    )
    sections = ["## πŸ€– ReAct Agent Trace\n", overview]
    # One rule-separated section per recorded step.
    sections.extend(
        f"---\n"
        f"**Step {entry.step_num}** `[{entry.timestamp}]`\n\n"
        f"🧠 **Thought:** {entry.thought}\n\n"
        f"βš™οΈ **Action:** `{entry.action}`\n\n"
        f"πŸ‘οΈ **Observation:** {entry.observation}\n"
        for entry in trace.steps
    )
    sections.append(f"\n---\n### 🎯 Final Agent Reasoning\n\n> {trace.final_reasoning}")
    return "\n".join(sections)
def _fmt_decision(report: IntelligenceReport) -> str:
    """Render the decision, plus optional arbitrage/booking/alert sections."""
    verdict = report.decision
    alt = verdict.arbitrage

    chunks = [
        f"## 🎯 Decision: **{verdict.action.value}** ({verdict.urgency.value})\n\n",
        f"**Confidence:** {verdict.confidence_score:.0%}\n\n",
        f"{verdict.explanation}\n",
    ]
    # The arbitrage section appears only for a result flagged worth_it.
    if alt and alt.worth_it:
        chunks.extend([
            f"\n### πŸ’‘ Arbitrage Opportunity\n\n",
            f"Fly from **{alt.best_airport}** instead:\n",
            f"- Flight price: β‚Ή{alt.best_price:,.0f}\n",
            f"- Domestic cost: β‚Ή{alt.domestic_cost:,.0f}\n",
            f"- Total cost: β‚Ή{alt.total_cost:,.0f}\n",
            f"- **Savings: β‚Ή{alt.savings:,.0f}**\n",
        ])
    if verdict.auto_book_triggered:
        chunks.append("\n### βœ… Auto-Booking Triggered\nCheck booking instructions below.\n")
    if verdict.telegram_sent:
        chunks.append("\n### πŸ“± Telegram Alert Sent\n")
    return "".join(chunks)
# ============================================================================
# GRADIO UI (6 tabs)
# ============================================================================
def create_ui():
    """Build the six-tab Gradio Blocks app and wire the analyze button.

    Returns the ``gr.Blocks`` instance; the caller is responsible for
    launching it.

    Credential textboxes are deliberately left empty instead of being
    pre-filled from environment variables: a component's initial value is
    sent to every browser that loads the page, which would hand the server's
    secrets to any visitor. A blank field falls back to the matching
    environment variable on the server at analysis time.
    """
    import gradio as gr

    css = """
@import url('https://fonts.googleapis.com/css2?family=Familjen+Grotesk:wght@400;600;700&family=JetBrains+Mono:wght@400;500&display=swap');
body, .gradio-container { background:#060810!important; font-family:'Familjen Grotesk',sans-serif!important; color:#e0e0e0!important; }
.hero-title { background:linear-gradient(135deg,#f0a500 0%,#00c2ff 100%); -webkit-background-clip:text; -webkit-text-fill-color:transparent; font-size:2.8rem; font-weight:700; margin-bottom:.4rem; }
.hero-sub { color:#00e87a; font-size:1.1rem; margin-bottom:.8rem; }
.pill { display:inline-block; background:rgba(240,165,0,.18); border:1px solid #f0a500; color:#f0a500; padding:.25rem .75rem; border-radius:20px; font-size:.8rem; margin:.15rem; font-weight:600; }
.pill2 { border-color:#00c2ff; color:#00c2ff; background:rgba(0,194,255,.1); }
.pill3 { border-color:#00e87a; color:#00e87a; background:rgba(0,232,122,.1); }
h1,h2,h3 { font-weight:700!important; }
code,pre,.mono { font-family:'JetBrains Mono',monospace!important; }
"""
    hero = """
<div style="text-align:center;padding:1.5rem 0 1rem">
<div class="hero-title">SkyGuardian AI v2.0</div>
<div class="hero-sub">Trichy ⇄ Singapore Β· Multi-Model Agentic Flight Intelligence</div>
<div>
<span class="pill">πŸ€– ReAct Agent</span>
<span class="pill">πŸ“Š 4-Model Ensemble</span>
<span class="pill2">🧬 HF finbert</span>
<span class="pill2">⏳ Chronos Forecast</span>
<span class="pill3">🌲 sklearn RF+GB</span>
<span class="pill3">πŸ’° Arbitrage</span>
<span class="pill">🎯 Auto-Booking</span>
<span class="pill">πŸ“± Telegram</span>
</div>
</div>"""

    def _secret_box(label, env_name):
        # Empty by default so server-side secrets never reach the browser.
        return gr.Textbox(
            label=label, type="password", value="",
            placeholder=f"Leave blank to use the server's {env_name}"
        )

    with gr.Blocks(css=css, title="SkyGuardian AI v2.0", theme=gr.themes.Base()) as app:
        gr.HTML(hero)
        with gr.Tabs():
            # Tab 1 - Search & Analyze
            with gr.Tab("πŸ” Search & Analyze"):
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### Search Parameters")
                        origin = gr.Dropdown(
                            choices=["TRZ","MAA","CJB","BLR","HYD"], value="TRZ",
                            label="Origin Airport"
                        )
                        destination = gr.Dropdown(
                            choices=["SIN"], value="SIN", label="Destination Airport"
                        )
                        departure_date = gr.Textbox(
                            label="Departure Date (YYYY-MM-DD)",
                            value=(datetime.now()+timedelta(days=30)).strftime("%Y-%m-%d")
                        )
                        return_date = gr.Textbox(label="Return Date (Optional)", value="")
                        adults = gr.Slider(1, 9, value=1, step=1, label="Adults")
                        emergency = gr.Checkbox(label="🚨 Emergency Mode (Book Immediately)", value=False)
                        monitoring = gr.Checkbox(label="πŸ“‘ Monitoring Mode (Always Alert)", value=False)
                        analyze_btn = gr.Button("πŸš€ Run Full Intelligence Pipeline", variant="primary")
                    with gr.Column(scale=2):
                        gr.Markdown("### Intelligence Summary")
                        status_out = gr.Textbox(label="Pipeline Status", lines=2, elem_classes=["mono"])
                        flight_out = gr.Markdown(label="Best Flight")
                        intel_out = gr.Markdown(label="Market Intelligence")
                        decision_out = gr.Markdown(label="Decision")
                        booking_out = gr.Textbox(label="Booking Instructions", lines=8)
                        warnings_out = gr.Textbox(label="Warnings", lines=2)
            # Tab 2 - Agent Reasoning Trace
            with gr.Tab("πŸ€– Agent Trace"):
                gr.Markdown("### ReAct Agent – Step-by-Step Reasoning")
                gr.Markdown(
                    "_Each step shows the agent's **Thought β†’ Action β†’ Observation** loop. "
                    "Powered by deterministic multi-signal reasoning (no LLM API cost)._"
                )
                agent_trace_out = gr.Markdown(label="Agent Trace")
            # Tab 3 - Model Intelligence
            with gr.Tab("πŸ“Š Model Intelligence"):
                gr.Markdown("### 4-Model Ensemble Breakdown")
                gr.Markdown(
                    "| Model | Type | Source | Weight |\n"
                    "|-------|------|--------|--------|\n"
                    "| **Heuristic-V2** | Rule-based | Local | 15% |\n"
                    "| **sklearn-RF+GB** | RandomForest + GradientBoosting | Local | 35% |\n"
                    "| **HF-Sentiment(finbert)** | Financial NLP (ProsusAI/finbert) | HF Inference API | 20% |\n"
                    "| **Chronos-Forecast** | Time-series (amazon/chronos-t5-tiny) | HF Inference API + OLS fallback | 30% |\n"
                )
                models_out = gr.Markdown(label="Model Comparison")
            # Tab 4 - API Keys
            with gr.Tab("πŸ”‘ API Keys"):
                gr.Markdown("### Configure API Credentials")
                gr.Markdown("πŸ’‘ Set as environment variables for production β€” never commit keys to code.")
                kiwi_key = _secret_box("Kiwi/Tequila API Key", "KIWI_API_KEY")
                amadeus_key = _secret_box("Amadeus API Key", "AMADEUS_API_KEY")
                amadeus_secret = _secret_box("Amadeus API Secret", "AMADEUS_API_SECRET")
                hf_token = _secret_box(
                    "HuggingFace Token (for finbert + Chronos API β€” optional but improves rate limits)",
                    "HF_TOKEN"
                )
                telegram_token = _secret_box("Telegram Bot Token", "TELEGRAM_TOKEN")
                telegram_chat = _secret_box("Telegram Chat ID", "TELEGRAM_CHAT_ID")
            # Tab 5 - Auto-Booking
            with gr.Tab("πŸ€– Auto-Booking"):
                gr.Markdown("### Autonomous Booking Configuration")
                gr.Markdown("⚠️ **Payment is MANUAL** β€” system fills forms and takes screenshot, stops before payment.")
                passenger_name = gr.Textbox(label="Passenger Full Name", value="")
                passenger_email = gr.Textbox(label="Email", value="")
                passenger_phone = gr.Textbox(label="Phone", value="")
                auto_threshold = gr.Number(
                    label="Auto-Book Price Threshold (INR, 0 = disabled)", value=0, minimum=0
                )
                gr.Markdown("""
**Trigger condition:**
`price ≀ threshold AND action ∈ {BOOK_NOW, EMERGENCY_BOOK, TRY_NEARBY} AND passenger filled`
**What happens:**
1. Playwright opens booking deep-link
2. Fills name / email / phone with multiple CSS-selector fallbacks
3. Takes screenshot β†’ **stops before payment page**
4. You complete payment manually
""")
            # Tab 6 - How It Works (static text; no interpolation needed)
            with gr.Tab("πŸ“– How It Works"):
                gr.Markdown("""
# SkyGuardian AI v2.0 – Full Pipeline
```
STEP 1 Validate inputs
STEP 2 Fetch prices β†’ Kiwi/Tequila + Amadeus (dedup by value_score)
STEP 3 Log to SQLite β†’ price_logs table
STEP 4 14-day history β†’ rolling_avg, std_dev, trend_slope, volatility
STEP 5 Surge analysis β†’ Z-score (BELOW / NORMAL / ELEVATED / EXTREME)
STEP 6 Days to departure
STEP 7 4-Model Ensemble
Model 1: Heuristic-V2 (rule-based, 15% weight)
Model 2: sklearn RF+GB (80 trees each, 35% weight)
Model 3: HF finbert (financial NLP via API, 20%)
Model 4: Chronos-Forecast (time-series OLS/API, 30%)
β†’ Weighted ensemble β†’ EnsemblePrediction
STEP 8 Arbitrage check β†’ 5 airports, min β‚Ή2,000 savings
STEP 9 ReAct Agent β†’ 8-step Thought/Action/Observation loop
STEP 10 Decision Engine β†’ 7-rule priority tree
STEP 11 Auto-booking β†’ Playwright (if threshold met)
STEP 12 Build IntelligenceReport
STEP 13 Telegram alert
```
## Route & Arbitrage Table
| Airport | IATA | Domestic Cost to TRZ |
|---------|------|---------------------|
| Trichy | TRZ | β‚Ή0 (home base) |
| Chennai | MAA | β‚Ή1,200 |
| Coimbatore | CJB | β‚Ή800 |
| Bangalore | BLR | β‚Ή2,800 |
| Hyderabad | HYD | β‚Ή3,500 |
## Value Score Formula
```
value_score = price + (stops Γ— 1500) + max(0, duration_min βˆ’ 300) Γ— 2
```
## Decision Rules (Priority Order)
| Priority | Condition | Action | Urgency |
|----------|-----------|--------|---------|
| 1 | Emergency mode | EMERGENCY_BOOK | HIGH |
| 2 | ≀ 5 days to departure | BOOK_NOW | HIGH |
| 3 | Arbitrage savings > 10% | TRY_NEARBY | HIGH |
| 4 | Price < 95% of avg | BOOK_NOW | HIGH |
| 5 | EXTREME surge + 60% drop prob | WAIT_WEEK | MEDIUM |
| 6 | ELEVATED surge + 45% drop prob | WAIT_24H | LOW |
| 7 | BELOW_AVERAGE surge | BOOK_NOW | HIGH |
| 8 | Default | BOOK_NOW | MEDIUM |
## Environment Variables
```bash
KIWI_API_KEY = your_kiwi_key
AMADEUS_API_KEY = your_amadeus_key
AMADEUS_API_SECRET = your_amadeus_secret
HF_TOKEN = your_huggingface_token # for finbert + Chronos
TELEGRAM_TOKEN = your_telegram_bot_token
TELEGRAM_CHAT_ID = your_chat_id
SKYGUARDIAN_DB = skyguardian.db # optional
SKYGUARDIAN_MODEL = drop_model.pkl # optional sklearn model
```
## HuggingFace Models Used
| Model | Task | API Endpoint |
|-------|------|-------------|
| `ProsusAI/finbert` | Financial sentiment β†’ buy/wait signal | HF Inference API |
| `distilbert-base-uncased-finetuned-sst-2-english` | SST-2 sentiment fallback | HF Inference API |
| `amazon/chronos-t5-tiny` | Time-series price forecasting | HF Inference API + OLS fallback |
## Deployment (HuggingFace Spaces – CPU Free Tier)
```bash
# Local
pip install -r requirements.txt
python app.py
# Access: http://localhost:7860
```
""")

        # Wiring
        def analyze_flight(origin, dest, dep_date, ret_date, adults, emergency, monitoring,
                           kiwi, amadeus_k, amadeus_s, hf_tok, telegram_t, telegram_c,
                           pass_name, pass_email, pass_phone, threshold):
            """Run the pipeline for one click and return the eight output values.

            Any exception is caught and shown in the status and warning
            fields so the UI never surfaces a raw traceback.
            """
            try:
                # A blank credential field falls back to the server environment.
                kiwi = kiwi or os.getenv("KIWI_API_KEY", "")
                amadeus_k = amadeus_k or os.getenv("AMADEUS_API_KEY", "")
                amadeus_s = amadeus_s or os.getenv("AMADEUS_API_SECRET", "")
                hf_tok = hf_tok or os.getenv("HF_TOKEN", "")
                telegram_t = telegram_t or os.getenv("TELEGRAM_TOKEN", "")
                telegram_c = telegram_c or os.getenv("TELEGRAM_CHAT_ID", "")
                # A cleared Number field arrives as None; treat it as disabled.
                limit = float(threshold or 0)

                db_path = os.getenv("SKYGUARDIAN_DB", "skyguardian.db")
                model_path = os.getenv("SKYGUARDIAN_MODEL", "drop_model.pkl")
                orch = Orchestrator(
                    kiwi_key=kiwi, amadeus_key=amadeus_k, amadeus_secret=amadeus_s,
                    telegram_token=telegram_t, telegram_chat_id=telegram_c,
                    db_path=db_path, hf_token=hf_tok,
                    model_path=model_path if os.path.exists(model_path) else None
                )
                passenger = None
                if pass_name and pass_email and pass_phone:
                    passenger = PassengerProfile(
                        full_name=pass_name, email=pass_email, phone=pass_phone
                    )
                report = orch.run(
                    origin=origin, destination=dest,
                    departure_date=dep_date, return_date=ret_date if ret_date else None,
                    adults=int(adults), emergency_mode=emergency,
                    monitoring_mode=monitoring, passenger=passenger,
                    auto_book_threshold=limit
                )
                status = _fmt_status(report)
                flight = _fmt_price(report.flight)
                intel = _fmt_intel(report)
                decision = _fmt_decision(report)
                models = _fmt_models(report)
                trace = _fmt_agent_trace(report)

                # Named booking_text so it does not shadow the booking_out component.
                if report.booking:
                    if report.booking.success:
                        booking_text = (
                            f"βœ… Auto-booking completed!\n"
                            f"Screenshot: {report.booking.screenshot_path}\n\n"
                            f"{report.booking.manual_instructions}"
                        )
                    else:
                        booking_text = (
                            f"⚠️ Auto-booking failed: {report.booking.error}\n\n"
                            f"{report.booking.manual_instructions}"
                        )
                elif passenger and limit > 0:
                    booking_text = "Auto-booking not triggered (price above threshold or non-booking decision)."
                else:
                    booking_text = "Auto-booking disabled β€” fill passenger details and set a threshold."

                warning_text = ""
                if not kiwi and not amadeus_k:
                    warning_text = "⚠️ No flight API keys configured β€” no live data fetched."
                elif report.flight.flights_found == 0:
                    warning_text = "⚠️ Limited flight data β€” check API key validity."
                return status, flight, intel, decision, booking_text, warning_text, models, trace
            except Exception as exc:
                err = str(exc)
                return (
                    f"❌ ERROR: {err}", "", "", "", "", f"⚠️ {err}", "", ""
                )

        analyze_btn.click(
            fn=analyze_flight,
            inputs=[
                origin, destination, departure_date, return_date, adults, emergency, monitoring,
                kiwi_key, amadeus_key, amadeus_secret, hf_token, telegram_token, telegram_chat,
                passenger_name, passenger_email, passenger_phone, auto_threshold
            ],
            outputs=[
                status_out, flight_out, intel_out, decision_out,
                booking_out, warnings_out, models_out, agent_trace_out
            ]
        )
    return app
# Script entry point: build the UI and serve it when run directly.
if __name__ == "__main__":
    app = create_ui()
    # Bind to all interfaces on port 7860 rather than loopback only.
    app.launch(server_name="0.0.0.0", server_port=7860)