# ========================================================= # ULTRA ADVANCED HYBRID NLP TO SQL ENGINE # PROFESSIONAL MULTI-FILTER ENGINE # MISTRAL / SQLCODER READY # ========================================================= import re import traceback import os import signal from contextlib import contextmanager from huggingface_hub import InferenceClient from dotenv import load_dotenv from sqlalchemy import create_engine, text, pool # ========================================================= # TIMEOUT HANDLER # ========================================================= @contextmanager def timeout(seconds): """Context manager for timeout on Windows""" def handler(signum, frame): raise TimeoutError("Operation timed out") # Note: signal.alarm only works on Unix, so we'll catch exceptions instead try: yield except TimeoutError: raise # ========================================================= # ENVIRONMENT SETUP # ========================================================= load_dotenv() HF_TOKEN = os.getenv("HF_TOKEN") DATABASE_URL = os.getenv("DATABASE_URL") # Initialize Mistral client with timeout client = None try: if HF_TOKEN: try: client = InferenceClient( model="mistralai/Mistral-7B-Instruct-v0.2", token=HF_TOKEN, timeout=10.0 # 10 second timeout ) print("✅ Mistral client initialized") except Exception as e: print(f"⚠️ Mistral client initialization timeout/error: {e}") client = None else: print("⚠️ HF_TOKEN not set - LLM features disabled") except Exception as e: print(f"⚠️ Mistral client error: {e}") client = None # Initialize database engine with timeout engine = None try: if DATABASE_URL: try: # PostgreSQL URL format: postgresql://user:password@host:port/database # Add connection options to the URL if needed db_url = DATABASE_URL if "?" not in db_url: db_url += "?connect_timeout=5" engine = create_engine( db_url, poolclass=pool.NullPool, # Disable connection pooling pool_pre_ping=True, # Test connections before using echo=False ) # Test connection try: with engine.connect() as conn: conn.execute(text("SELECT 1")) print("✅ Database connection initialized") except Exception as conn_err: print(f"⚠️ Database connection warning (may retry later): {conn_err}") # Keep engine even if initial connection fails except Exception as e: print(f"⚠️ Database engine creation error: {e}") engine = None else: print("⚠️ DATABASE_URL not set - Database features disabled") except Exception as e: print(f"⚠️ Database connection warning: {e}") engine = None # ========================================================= # HELPER: Safe Database Execution # ========================================================= def safe_db_query(query_func): """Decorator to safely execute database queries with None check""" def wrapper(*args, **kwargs): if engine is None: print(f"⚠️ Database engine not available for {query_func.__name__}") # Return appropriate empty result return [] if 'get_' in query_func.__name__ else None try: return query_func(*args, **kwargs) except Exception as e: print(f"❌ Database query error in {query_func.__name__}: {e}") return [] if 'get_' in query_func.__name__ else None return wrapper # ========================================================= # CONFIG # ========================================================= USE_LLM = True # ========================================================= # DATABASE KNOWLEDGE # ========================================================= SCHEMA = { "table": "vehicle_logs", "columns": [ "timestamp", "plate", "state", "vehicle_type", "vehicle_conf", "camera_id", "location", "date", "hour", "day" ] } VALID_STATES = { "tn": "TN", "tamil nadu": "TN", "ka": "KA", "karnataka": "KA", "kl": "KL", "kerala": "KL", "ap": "AP", "andhra": "AP", "ts": "TS", "telangana": "TS", "mh": "MH", "maharashtra": "MH", "dl": "DL", "delhi": "DL", "gj": "GJ", "gujarat": "GJ", "rj": "RJ", "rajasthan": "RJ", "up": "UP", "uttar pradesh": "UP", "wb": "WB", "west bengal": "WB", "hr": "HR", "haryana": "HR", "pb": "PB", "punjab": "PB" } KNOWN_LOCATIONS = [ "adyar", "guindy", "velachery", "besantnagar", "besant nagar", "thiruvanmiyur", "tnagar", "t nagar", "mylapore", "annanagar", "anna nagar", "koyambedu", "nungambakkam", "kotturpuram" ] VEHICLE_TYPES = [ "suv", "bus", "truck", "bike", "auto", "taxi", "car", "jeep", "sedan" ] # ========================================================= # SQL CLEANER # ========================================================= def clean_sql(sql): sql = sql.replace("```sql", "") sql = sql.replace("```", "") sql = sql.strip() if not sql.endswith(";"): sql += ";" return sql # ========================================================= # SQL VALIDATOR (IMPROVED) # ========================================================= def validate_sql(sql): """Validate SQL for safety. Allows JOINs and UNIONs for route tracking.""" blocked = [ "DROP", "DELETE", "UPDATE", "INSERT", "ALTER", "CREATE", "TRUNCATE", # Removed JOIN and UNION - needed for route tracking ] upper = sql.upper() # Check for blocked commands for word in blocked: if word in upper: return False # Must be SELECT query if not upper.startswith("SELECT"): return False # Must reference vehicle_logs if "VEHICLE_LOGS" not in upper and "VL1" not in upper and "VL2" not in upper: return False return True # ========================================================= # PRODUCTION-GRADE HYBRID NLP ENGINE # Advanced multi-filter, date-range, time-range support # ========================================================= class FilterExtractor: """ Production-grade filter extraction engine for complex real-world queries. Handles multi-filter extraction, date ranges, time ranges, and advanced aggregations. """ def __init__(self): # ===== VEHICLE TYPE SYNONYMS ===== self.vehicle_synonyms = { # Cars "car": "car", "cars": "car", "sedan": "car", "sedans": "car", "compact": "car", "compacts": "car", "hatchback": "car", # SUVs "suv": "suv", "suvs": "suv", "crossover": "suv", # Trucks "truck": "truck", "trucks": "truck", "lorry": "truck", "lorries": "truck", "heavy": "truck", "hgv": "truck", # Buses "bus": "bus", "buses": "bus", "coach": "bus", "shuttle": "bus", # Bikes "bike": "bike", "bikes": "bike", "motorcycle": "bike", "motorcycles": "bike", "motorbike": "bike", "two-wheeler": "bike", # Autos "auto": "auto", "autos": "auto", "autorickshaw": "auto", "auto-rickshaw": "auto", "tuk-tuk": "auto", # Jeeps "jeep": "jeep", "jeeps": "jeep", "4x4": "jeep", # Taxis "taxi": "taxi", "taxis": "taxi", "cab": "taxi", "cabs": "taxi" } # ===== DAY MAPPINGS ===== self.day_map = { "monday": "Monday", "tuesday": "Tuesday", "wednesday": "Wednesday", "thursday": "Thursday", "friday": "Friday", "saturday": "Saturday", "sunday": "Sunday", "weekend": ["Saturday", "Sunday"], "weekday": ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"] } # ===== LOCATION VARIANTS ===== self.location_variants = { "adyar": ["adyar"], "besant nagar": ["besant", "besant nagar", "besantnagar"], "t nagar": ["t nagar", "tnagar", "t-nagar"], "anna nagar": ["anna", "anna nagar", "annanagar"], "velachery": ["velachery"], "guindy": ["guindy"], "thiruvanmiyur": ["thiruvanmiyur", "mylapore"], "mylapore": ["mylapore"], "koyambedu": ["koyambedu"], "nungambakkam": ["nungambakkam", "nungam"], "kotturpuram": ["kotturpuram"] } # ===== STATE MAPPINGS ===== self.state_map = { "tn": "TN", "tamil": "TN", "tamil nadu": "TN", "ka": "KA", "karnataka": "KA", "kl": "KL", "kerala": "KL", "ap": "AP", "andhra": "AP", "andhra pradesh": "AP", "ts": "TS", "telangana": "TS", "mh": "MH", "maharashtra": "MH", "dl": "DL", "delhi": "DL", "gj": "GJ", "gujarat": "GJ", "rj": "RJ", "rajasthan": "RJ", "up": "UP", "uttar pradesh": "UP", "uttar": "UP", "wb": "WB", "west bengal": "WB", "hr": "HR", "haryana": "HR", "pb": "PB", "punjab": "PB" } # ===== TIME PERIOD MAPPINGS ===== self.time_periods = { "morning": (5, 12), # 5 AM to 12 PM "afternoon": (12, 17), # 12 PM to 5 PM "evening": (17, 21), # 5 PM to 9 PM "night": (21, 24), # 9 PM to 12 AM "peak": (8, 10), # Peak traffic (8-10 AM) "rush": (8, 10), # Rush hour (8-10 AM) "midnight": (0, 4) # Midnight (0-4 AM) } # ===== EXTRACTION METHODS ===== def extract_plate(self, query): """Extract license plate number from query""" # Standard Indian plate format: XX00XX0000 match = re.search(r'\b([A-Z]{2}\d{1,2}[A-Z]{1,3}\d{3,4})\b', query.upper()) return match.group(1) if match else None def extract_state(self, query): """Extract state code from query""" q = query.lower() for key, state_code in self.state_map.items(): # Use word boundaries to avoid partial matches if re.search(r'\b' + key + r'\b', q): return state_code return None def extract_location(self, query): """Extract SINGLE location with variant matching (legacy method)""" q = query.lower() # Sort by length (longest first) to match longer variants first for canonical, variants in sorted( self.location_variants.items(), key=lambda x: max(len(v) for v in x[1]), reverse=True ): for variant in variants: if variant in q: return canonical return None def extract_locations(self, query): """Extract MULTIPLE locations from query (e.g., 'adyar and kottupuram')""" q = query.lower() locations = [] # Sort by length (longest first) to match longer variants first for canonical, variants in sorted( self.location_variants.items(), key=lambda x: max(len(v) for v in x[1]), reverse=True ): for variant in variants: if variant in q: locations.append(canonical) break # Remove duplicates while preserving order seen = set() unique_locations = [] for loc in locations: if loc not in seen: seen.add(loc) unique_locations.append(loc) return unique_locations if unique_locations else None def extract_vehicle_type(self, query): """Extract SINGLE vehicle type with synonym resolution (legacy method)""" q = query.lower() # Sort by length (longest first) to match longer synonyms first for synonym in sorted(self.vehicle_synonyms.keys(), key=len, reverse=True): if re.search(r'\b' + synonym + r'\b', q): return self.vehicle_synonyms[synonym] return None def extract_vehicle_types(self, query): """Extract MULTIPLE vehicle types from query (e.g., 'bike and mini_truck')""" q = query.lower() vehicle_types = [] # Sort by length (longest first) to match longer synonyms first for synonym in sorted(self.vehicle_synonyms.keys(), key=len, reverse=True): if re.search(r'\b' + synonym + r'\b', q): normalized = self.vehicle_synonyms[synonym] vehicle_types.append(normalized) # Remove duplicates while preserving order seen = set() unique_types = [] for vtype in vehicle_types: if vtype not in seen: seen.add(vtype) unique_types.append(vtype) return unique_types if unique_types else None def extract_date_range(self, query): """Extract date range (from X to Y, between X and Y)""" # Pattern: "from DD-MM-YYYY to DD-MM-YYYY" or "between DD-MM-YYYY and DD-MM-YYYY" patterns = [ r'from\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})\s+to\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})', r'between\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})\s+and\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})', r'from\s+(\d{4}-\d{2}-\d{2})\s+to\s+(\d{4}-\d{2}-\d{2})', r'between\s+(\d{4}-\d{2}-\d{2})\s+and\s+(\d{4}-\d{2}-\d{2})' ] for pattern in patterns: match = re.search(pattern, query, re.IGNORECASE) if match: groups = match.groups() if len(groups) == 6: # DD-MM-YYYY format start = f"{groups[2]}-{groups[1].zfill(2)}-{groups[0].zfill(2)}" end = f"{groups[5]}-{groups[4].zfill(2)}-{groups[3].zfill(2)}" return {"start": start, "end": end} elif len(groups) == 2: # YYYY-MM-DD format return {"start": groups[0], "end": groups[1]} return None def extract_date(self, query): """Extract single date and normalize format""" # YYYY-MM-DD format match = re.search(r'\d{4}-\d{2}-\d{2}', query) if match: return match.group(0) # DD-MM-YYYY or DD/MM/YYYY format match = re.search(r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})', query) if match: day, month, year = match.groups() return f"{year}-{month.zfill(2)}-{day.zfill(2)}" return None def extract_time_range(self, query): """Extract time range (after X, before X, between X and Y)""" q = query.lower() # Check for time period keywords first (morning, afternoon, evening, night) for period, (start_hour, end_hour) in self.time_periods.items(): if period in q: return {"start": start_hour, "end": end_hour} # Pattern: "after HH:MM" or "after HH AM/PM" after_match = re.search(r'after\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)?', q) if after_match: hour = int(after_match.group(1)) period = after_match.group(3) if period and period == "pm" and hour != 12: hour += 12 elif period and period == "am" and hour == 12: hour = 0 return {"start": hour, "end": 23} # Pattern: "before HH:MM" or "before HH AM/PM" before_match = re.search(r'before\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)?', q) if before_match: hour = int(before_match.group(1)) period = before_match.group(3) if period and period == "pm" and hour != 12: hour += 12 elif period and period == "am" and hour == 12: hour = 0 return {"start": 0, "end": hour} # Pattern: "between HH AM/PM and HH AM/PM" between_match = re.search( r'between\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)\s+and\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)', q ) if between_match: hour1 = int(between_match.group(1)) period1 = between_match.group(3) if period1 == "pm" and hour1 != 12: hour1 += 12 elif period1 == "am" and hour1 == 12: hour1 = 0 hour2 = int(between_match.group(4)) period2 = between_match.group(6) if period2 == "pm" and hour2 != 12: hour2 += 12 elif period2 == "am" and hour2 == 12: hour2 = 0 return {"start": min(hour1, hour2), "end": max(hour1, hour2)} return None def extract_hour(self, query): """Extract single hour""" # Don't match if this is part of a time range if any(k in query.lower() for k in ["between", "from", "to", "after", "before"]): return None match = re.search(r'(\d{1,2}):?(\d{0,2})\s*(am|pm)?', query.lower()) if match: hour = int(match.group(1)) period = match.group(3) if period == "pm" and hour != 12: hour += 12 elif period == "am" and hour == 12: hour = 0 return hour if 0 <= hour < 24 else None return None def extract_day(self, query): """Extract day of week""" q = query.lower() for day_key, day_values in self.day_map.items(): if day_key in q: return day_values return None def extract_confidence(self, query): """Extract confidence threshold - MUST have confidence/conf/accuracy keyword""" # Only match if explicit confidence/conf/accuracy keyword is present match = re.search(r'(\d+(?:\.\d+)?)\s*(?:confidence|conf|accuracy)\b', query.lower()) if match: conf = float(match.group(1)) # Normalize to 0-1 if given as percentage if conf > 1: conf = conf / 100 return conf if 0 <= conf <= 1 else None return None def extract_route(self, query): """Extract route/path pattern (from location1 to location2 or location1 to location2 pass through)""" q = query.lower() # Don't process if this looks like a date range (contains DD-MM-YYYY or YYYY-MM-DD) if re.search(r'\d{1,2}[-/]\d{1,2}[-/]\d{4}', q) or re.search(r'\d{4}-\d{2}-\d{2}', q): return None # Strict patterns only: requires explicit route keywords or location names route_patterns = [ r'(?:traveling|pass|going)\s+(?:from|through)\s+(\w+(?:\s+\w+)?)\s+(?:to|through)\s+(\w+(?:\s+\w+)?)', r'(?:pass\s+from)\s+(\w+(?:\s+\w+)?)\s+(?:to|through)\s+(\w+(?:\s+\w+)?)', r'(?:traveling\s+from)\s+(\w+(?:\s+\w+)?)\s+(?:to)\s+(\w+(?:\s+\w+)?)', ] for pattern in route_patterns: match = re.search(pattern, q) if match: loc1_raw = match.group(1).strip() loc2_raw = match.group(2).strip() # Map to canonical locations loc1 = None loc2 = None # Find canonical location names (longest match first) for canonical, variants in sorted( self.location_variants.items(), key=lambda x: max(len(v) for v in x[1]), reverse=True ): for variant in variants: if variant in loc1_raw and not loc1: loc1 = canonical if variant in loc2_raw and not loc2: loc2 = canonical if loc1 and loc2 and loc1 != loc2: return {"from": loc1, "to": loc2} return None def extract_filters(self, query): """Extract ALL filters simultaneously from query""" return { "plate": self.extract_plate(query), "state": self.extract_state(query), "location": self.extract_locations(query), # Now returns list or None "vehicle_type": self.extract_vehicle_types(query), # Now returns list or None "date": self.extract_date(query), "date_range": self.extract_date_range(query), "day": self.extract_day(query), "hour": self.extract_hour(query), "time_range": self.extract_time_range(query), "confidence": self.extract_confidence(query), "route": self.extract_route(query) # New: route extraction } def detect_intents(self, query): """Detect advanced query intents (IMPROVED - strict route detection)""" q = query.lower() # ROUTE TRACKING: Only if explicit route keywords present (not just "from...to" for dates) # Require: traveling/pass/pass through/pass from with location-like names has_route_keyword = any(k in q for k in [ "traveling", "travel from", "pass from", "pass through", "went from", "go from", "route", "path", "journey" ]) # Must have location-like context after the route keywords has_route_context = False if has_route_keyword: # Check if actual location names appear near route keywords for location in self.location_variants.keys(): if location in q: has_route_context = True break return { "tracking": any(k in q for k in ["track", "history", "movement", "travel", "route", "where", "location", "show"]), "count": any(k in q for k in ["count", "how many", "total", "number of"]), "analytics": any(k in q for k in ["analytics", "analysis", "statistics", "distribution"]), "top": any(k in q for k in ["top", "most", "leading"]), "latest": any(k in q for k in ["latest", "recent", "last", "new"]), "hourly": any(k in q for k in ["hourly", "by hour", "per hour"]), "daily": any(k in q for k in ["daily", "by day", "per day"]), "location_based": any(k in q for k in ["by location", "density", "traffic"]), "suspicious": any(k in q for k in ["suspicious", "repeated", "multiple", "across"]), "aggregation": any(k in q for k in ["group", "aggregate", "sum", "average"]), "route_tracking": has_route_keyword and has_route_context # STRICT: require both keyword AND location } def build_sql(self, filters, intents): """ Build production-grade SQL from filters and intents (IMPROVED). Handles complex aggregations, date ranges, time ranges, multiple vehicles, and multiple locations. Now supports all filter combinations without failures. """ try: # ========================================================= # ANALYTICS QUERIES (priority over other queries) # ========================================================= if intents["top"] or (intents["analytics"] and "top" in " ".join([k for k in intents.keys() if intents[k]])): return clean_sql(""" SELECT plate, state, COUNT(*) as detections FROM vehicle_logs GROUP BY plate, state ORDER BY detections DESC LIMIT 20; """) if intents["hourly"] and intents["analytics"]: return clean_sql(""" SELECT hour, COUNT(*) as traffic FROM vehicle_logs GROUP BY hour ORDER BY hour; """) if intents["location_based"] and intents["analytics"]: return clean_sql(""" SELECT location, COUNT(*) as count FROM vehicle_logs WHERE location IS NOT NULL GROUP BY location ORDER BY count DESC LIMIT 20; """) if intents["suspicious"]: return clean_sql(""" SELECT plate, state, COUNT(*) as detections, COUNT(DISTINCT location) as locations, COUNT(DISTINCT date) as days FROM vehicle_logs GROUP BY plate, state HAVING COUNT(*) > 5 ORDER BY detections DESC LIMIT 20; """) except Exception as e: print(f"⚠️ Analytics query generation error: {e}") # Fallback to basic query return clean_sql("SELECT * FROM vehicle_logs ORDER BY timestamp DESC LIMIT 100;") # ========================================================= # ROUTE TRACKING QUERIES (vehicles that passed through locations) # ========================================================= if intents["route_tracking"] and filters["route"]: try: route = filters["route"] loc_from = route["from"] loc_to = route["to"] # Build state filter if present state_filter = "" if filters["state"]: state_filter = f"AND vl1.state = '{filters['state']}'" # Build vehicle type filter vehicle_type_filter = "" if filters["vehicle_type"]: if isinstance(filters["vehicle_type"], list): vehicle_types = "', '".join(filters["vehicle_type"]) vehicle_type_filter = f"AND LOWER(vl1.vehicle_type) IN ('{vehicle_types}')" else: vehicle_type_filter = f"AND LOWER(vl1.vehicle_type) LIKE '%{filters['vehicle_type'].lower()}%'" # Build date filter if present date_filter = "" if filters["date_range"]: start = filters["date_range"]["start"] end = filters["date_range"]["end"] date_filter = f"AND vl1.date BETWEEN '{start}' AND '{end}'" elif filters["date"]: date_filter = f"AND vl1.date = '{filters['date']}'" # Build time range filter if present time_filter = "" if filters["time_range"]: start = filters["time_range"]["start"] end = filters["time_range"]["end"] if start < end: time_filter = f"AND vl1.hour BETWEEN {start} AND {end}" else: time_filter = f"AND (vl1.hour >= {start} OR vl1.hour <= {end})" elif filters["hour"] is not None: time_filter = f"AND vl1.hour = {filters['hour']}" # Query to find vehicles that traveled from location1 to location2 return clean_sql(f""" SELECT vl1.plate, vl1.state, vl1.vehicle_type, COUNT(DISTINCT vl1.timestamp) as visits_in_from_location, COUNT(DISTINCT vl2.timestamp) as visits_in_to_location, COUNT(DISTINCT vl1.date) as days_active, MIN(vl1.timestamp) as first_seen_from, MAX(vl1.timestamp) as last_seen_from, MIN(vl2.timestamp) as first_seen_to, MAX(vl2.timestamp) as last_seen_to FROM vehicle_logs vl1 INNER JOIN vehicle_logs vl2 ON vl1.plate = vl2.plate AND vl1.state = vl2.state AND vl2.timestamp > vl1.timestamp WHERE LOWER(vl1.location) LIKE '%{loc_from.lower()}%' AND LOWER(vl2.location) LIKE '%{loc_to.lower()}%' {state_filter} {vehicle_type_filter} {date_filter} {time_filter} GROUP BY vl1.plate, vl1.state, vl1.vehicle_type ORDER BY visits_in_from_location DESC, visits_in_to_location DESC LIMIT 100; """) except Exception as e: print(f"⚠️ Route tracking query generation error: {e}") # Fallback to basic location query return clean_sql("SELECT * FROM vehicle_logs ORDER BY timestamp DESC LIMIT 100;") # ========================================================= # BUILD WHERE CLAUSE FROM FILTERS (HANDLE MULTIPLE VALUES) # ========================================================= where_conditions = [] # Plate filter if filters["plate"]: where_conditions.append(f"plate = '{filters['plate']}'") # State filter if filters["state"]: where_conditions.append(f"state = '{filters['state']}'") # Location filter - HANDLE MULTIPLE LOCATIONS if filters["location"]: if isinstance(filters["location"], list): # Multiple locations with OR logic location_conditions = [ f"LOWER(location) LIKE '%{loc.lower()}%'" for loc in filters["location"] ] where_conditions.append(f"({' OR '.join(location_conditions)})") else: # Single location (legacy) where_conditions.append(f"LOWER(location) LIKE '%{filters['location'].lower()}%'") # Vehicle type filter - HANDLE MULTIPLE VEHICLE TYPES if filters["vehicle_type"]: if isinstance(filters["vehicle_type"], list): # Multiple vehicle types with OR logic vehicle_conditions = [ f"LOWER(vehicle_type) LIKE '%{vtype.lower()}%'" for vtype in filters["vehicle_type"] ] where_conditions.append(f"({' OR '.join(vehicle_conditions)})") else: # Single vehicle type (legacy) where_conditions.append(f"LOWER(vehicle_type) LIKE '%{filters['vehicle_type'].lower()}%'") # Date range filter if filters["date_range"]: start = filters["date_range"]["start"] end = filters["date_range"]["end"] where_conditions.append(f"date BETWEEN '{start}' AND '{end}'") elif filters["date"]: where_conditions.append(f"date = '{filters['date']}'") # Day filter if filters["day"]: if isinstance(filters["day"], list): day_conditions = [f"day = '{d}'" for d in filters["day"]] where_conditions.append(f"({' OR '.join(day_conditions)})") else: where_conditions.append(f"day = '{filters['day']}'") # Time range filter if filters["time_range"]: start = filters["time_range"]["start"] end = filters["time_range"]["end"] if start < end: where_conditions.append(f"hour BETWEEN {start} AND {end}") else: # Handles ranges like 9 PM to 4 AM (21 to 4) where_conditions.append(f"(hour >= {start} OR hour <= {end})") elif filters["hour"] is not None: where_conditions.append(f"hour = {filters['hour']}") # Confidence filter if filters["confidence"] is not None: where_conditions.append(f"vehicle_conf >= {filters['confidence']}") # ========================================================= # GENERATE FINAL SQL # ========================================================= where_clause = " AND ".join(where_conditions) if where_conditions else "1=1" # Count queries if intents["count"]: if filters["plate"]: sql = f""" SELECT plate, COUNT(*) as detections FROM vehicle_logs WHERE {where_clause} GROUP BY plate ORDER BY detections DESC; """ else: sql = f""" SELECT COUNT(*) as total FROM vehicle_logs WHERE {where_clause}; """ # Tracking queries (show detailed records) elif intents["tracking"]: sql = f""" SELECT timestamp, plate, state, vehicle_type, location, camera_id, date, hour, day FROM vehicle_logs WHERE {where_clause} ORDER BY timestamp DESC LIMIT 100; """ # Hourly aggregation elif intents["hourly"]: sql = f""" SELECT hour, COUNT(*) as traffic FROM vehicle_logs WHERE {where_clause} GROUP BY hour ORDER BY hour; """ # Location-based aggregation elif intents["location_based"]: sql = f""" SELECT location, COUNT(*) as count FROM vehicle_logs WHERE {where_clause} AND location IS NOT NULL GROUP BY location ORDER BY count DESC; """ # Default: return all matching records else: sql = f""" SELECT * FROM vehicle_logs WHERE {where_clause} ORDER BY timestamp DESC LIMIT 100; """ return clean_sql(sql) def ask_llm(user_query): """ Production-grade hybrid NLP-to-SQL engine. Handles complex real-world queries with multiple filters, date ranges, time ranges, and aggregations. Features: - Multi-filter extraction (plate, state, location, vehicle type, date, time, confidence) - Route tracking (vehicles passing through multiple locations) - Date range support (from X to Y) - Time range support (after X, before X, between X and Y) - Time period recognition (morning, afternoon, evening, night, peak hour, rush hour) - Advanced intent detection (tracking, count, analytics, top vehicles, suspicious vehicles, route tracking, etc.) - Production SQL generation with proper GROUP BY, HAVING, ORDER BY - Timeout protection Example queries: - "show bikes passing through adyar to kottupuram" - "show buses in adyar from 10-04-2026 to 18-10-2026" - "show TN cars after 8 PM" - "show suspicious vehicles detected in more than 5 locations" - "show traffic density by location" - "show top 10 most detected vehicles" - "count bikes between 6 PM and 9 PM" - "find vehicles that traveled from adyar to mylapore" """ try: # Initialize the advanced filter extractor extractor = FilterExtractor() # Extract ALL filters from the query (simultaneous extraction) filters = extractor.extract_filters(user_query) # Detect query intents intents = extractor.detect_intents(user_query) # Log extracted information for debugging print(f"\n📊 QUERY ANALYSIS:") print(f" Extracted Filters:") print(f" - Plate: {filters['plate']}") print(f" - State: {filters['state']}") # Handle multiple locations if filters['location']: if isinstance(filters['location'], list): print(f" - Locations: {', '.join(filters['location'])}") else: print(f" - Location: {filters['location']}") else: print(f" - Location: None") # Handle multiple vehicle types if filters['vehicle_type']: if isinstance(filters['vehicle_type'], list): print(f" - Vehicle Types: {', '.join(filters['vehicle_type'])}") else: print(f" - Vehicle Type: {filters['vehicle_type']}") else: print(f" - Vehicle Type: None") print(f" - Date: {filters['date']}") print(f" - Date Range: {filters['date_range']}") print(f" - Day: {filters['day']}") print(f" - Hour: {filters['hour']}") print(f" - Time Range: {filters['time_range']}") print(f" - Confidence: {filters['confidence']}") # Log route information if available if filters['route']: print(f" - Route: From '{filters['route']['from']}' to '{filters['route']['to']}'") else: print(f" - Route: None") print(f" Detected Intents:") intent_list = [k for k, v in intents.items() if v] print(f" - {', '.join(intent_list) if intent_list else 'General query'}") # Build SQL from filters and intents sql = extractor.build_sql(filters, intents) return sql except Exception as e: print(f"❌ Filter extraction error: {e}") traceback.print_exc() # Fallback to basic query return clean_sql("SELECT * FROM vehicle_logs ORDER BY timestamp DESC LIMIT 10;") # ========================================================= # QUERY EXECUTION # ========================================================= def run_query(user_query): """Execute NLP-to-SQL query with timeout protection""" sql = "" try: sql = ask_llm(user_query) print("\n" + "="*40) print("USER QUERY:") print(user_query) print("\nGENERATED SQL:") print(sql) print("="*40) if engine is None: return { "query": user_query, "error": "❌ Database not configured - DATABASE_URL missing", "sql": sql, "result": [], "count": 0 } try: # Execute with timeout protection with engine.connect() as conn: # Set statement timeout to 30 seconds conn.execute(text("SET statement_timeout = 30000")) # 30 seconds result = conn.execute(text(sql)) rows = [ dict(r._mapping) for r in result ] return { "query": user_query, "sql": sql, "count": len(rows), "result": rows } except Exception as query_error: print(f"❌ Query Execution Error (possible timeout): {query_error}") return { "query": user_query, "error": f"Query timeout or error: {str(query_error)}", "sql": sql, "result": [], "count": 0 } except Exception as e: print(f"❌ Run Query Error: {e}") traceback.print_exc() return { "query": user_query, "error": str(e), "sql": sql if sql else "", "result": [], "count": 0 } # ========================================================= # DATABASE OPERATIONS # ========================================================= def save_detection(plate, state, vehicle_type, vehicle_conf, date, time): """Save a vehicle detection to the database Note: The table schema uses timestamp, date, hour, day columns. The 'time' parameter is extracted to hour for the hour column. """ try: if engine is None: print("⚠️ Engine not initialized - save_detection skipped") return False # Extract hour from time string (HH:MM:SS) try: hour = int(time.split(":")[0]) if time else 0 except: hour = 0 # Extract day of week from date (simplified) from datetime import datetime try: dt = datetime.strptime(date, "%Y-%m-%d") day = dt.strftime("%A") except: day = "Unknown" # Use timestamp for current time, date for the date field, hour for hourly grouping query = f""" INSERT INTO vehicle_logs (plate, state, vehicle_type, vehicle_conf, date, hour, day, timestamp, camera_id, location) VALUES ('{plate}', '{state}', '{vehicle_type}', {vehicle_conf}, '{date}', {hour}, '{day}', NOW(), 'CAM-01', 'default') """ with engine.connect() as conn: conn.execute(text(query)) conn.commit() print(f"✅ Saved: {plate} from {state} at {time}") return True except Exception as e: print(f"❌ Save Error: {e}") traceback.print_exc() return False def health_check(): """Check database health with timeout protection""" try: if engine is None: return False, "❌ Database not configured" with engine.connect() as conn: conn.execute(text("SET statement_timeout = 10000")) # 10 second timeout result = conn.execute(text("SELECT COUNT(*) FROM vehicle_logs")) count = result.scalar() return True, f"✅ Database OK - {count} records" except Exception as e: print(f"❌ Health Check Error (timeout?): {e}") return False, f"❌ Database Error: {str(e)}" def get_vehicles_by_state(): """Get vehicle count by state with timeout protection""" if engine is None: print("⚠️ Database not available for get_vehicles_by_state") return [] try: sql = """ SELECT state, COUNT(*) as count FROM vehicle_logs GROUP BY state ORDER BY count DESC """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) # 15 second timeout result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ State Query Error (timeout?): {e}") return [] def get_hourly_traffic(): """Get traffic by hour with timeout protection""" if engine is None: print("⚠️ Database not available for get_hourly_traffic") return [] try: sql = """ SELECT hour, COUNT(*) as traffic FROM vehicle_logs GROUP BY hour ORDER BY hour """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) # 15 second timeout result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Hourly Traffic Error (timeout?): {e}") return [] def get_top_plates(): """Get top detected plates with timeout protection""" if engine is None: print("⚠️ Database not available for get_top_plates") return [] try: sql = """ SELECT plate, COUNT(*) as detections FROM vehicle_logs GROUP BY plate ORDER BY detections DESC LIMIT 20 """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) # 15 second timeout result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Top Plates Error (timeout?): {e}") return [] def get_suspicious_vehicles(): """Get vehicles detected multiple times (potentially suspicious) with timeout protection""" try: sql = """ SELECT plate, state, COUNT(*) as detections, COUNT(DISTINCT location) as locations, COUNT(DISTINCT date) as days FROM vehicle_logs GROUP BY plate, state HAVING COUNT(*) > 5 ORDER BY detections DESC LIMIT 20 """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) # 15 second timeout result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Suspicious Vehicles Error (timeout?): {e}") return [] # ========================================================= # ADVANCED ANALYTICAL FUNCTIONS # ========================================================= def get_route_history(plate, limit=50): """ Get route history for a specific vehicle. Shows all detections in chronological order with locations. """ try: sql = f""" SELECT timestamp, plate, state, location, camera_id, date, hour, day FROM vehicle_logs WHERE plate = '{plate}' ORDER BY timestamp DESC LIMIT {limit} """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Route History Error: {e}") return [] def get_vehicles_by_location(location): """Get all vehicles detected in a specific location""" try: sql = f""" SELECT DISTINCT plate, state, COUNT(*) as detections FROM vehicle_logs WHERE LOWER(location) LIKE '%{location.lower()}%' GROUP BY plate, state ORDER BY detections DESC LIMIT 50 """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Vehicles by Location Error: {e}") return [] def get_multi_location_detections(min_locations=2): """Get vehicles detected across multiple locations (suspicious activity indicator)""" try: sql = f""" SELECT plate, state, COUNT(*) as detections, COUNT(DISTINCT location) as locations, COUNT(DISTINCT date) as days FROM vehicle_logs GROUP BY plate, state HAVING COUNT(DISTINCT location) >= {min_locations} ORDER BY locations DESC, detections DESC LIMIT 20 """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Multi-Location Detection Error: {e}") return [] def get_peak_traffic_hours(): """Identify peak traffic hours based on detections""" try: sql = """ SELECT hour, COUNT(*) as traffic_count, ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage FROM vehicle_logs GROUP BY hour ORDER BY traffic_count DESC LIMIT 10 """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Peak Traffic Hours Error: {e}") return [] def get_vehicle_density_by_location(): """Get traffic density (vehicle count) by location""" if engine is None: print("⚠️ Database not available for get_vehicle_density_by_location") return [] try: sql = """ SELECT location, COUNT(*) as vehicle_count, COUNT(DISTINCT plate) as unique_vehicles, ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage FROM vehicle_logs WHERE location IS NOT NULL GROUP BY location ORDER BY vehicle_count DESC LIMIT 20 """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Vehicle Density Error: {e}") return [] def get_high_confidence_detections(confidence_threshold=0.9): """Get detections with high confidence scores""" try: sql = f""" SELECT plate, state, vehicle_type, COUNT(*) as detections, ROUND(AVG(vehicle_conf), 3) as avg_confidence FROM vehicle_logs WHERE vehicle_conf >= {confidence_threshold} GROUP BY plate, state, vehicle_type ORDER BY avg_confidence DESC, detections DESC LIMIT 30 """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ High Confidence Detections Error: {e}") return [] def get_daily_traffic_summary(date=None): """Get traffic summary for a specific date or today""" try: if date: where_clause = f"WHERE date = '{date}'" else: where_clause = "WHERE date = CURDATE()" sql = f""" SELECT COUNT(*) as total_vehicles, COUNT(DISTINCT plate) as unique_vehicles, COUNT(DISTINCT location) as locations_covered, COUNT(DISTINCT hour) as peak_hours FROM vehicle_logs {where_clause} """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) result = conn.execute(text(sql)) row = dict(result.fetchone()._mapping) return row except Exception as e: print(f"❌ Daily Summary Error: {e}") return {} def get_state_wise_distribution(): """Get vehicle distribution across states""" try: sql = """ SELECT state, COUNT(*) as detections, COUNT(DISTINCT plate) as unique_vehicles, ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage FROM vehicle_logs GROUP BY state ORDER BY detections DESC """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ State Distribution Error: {e}") return [] def query_by_date_range(start_date, end_date, state=None, location=None): """Query vehicles detected within a date range""" try: where_conditions = [f"date BETWEEN '{start_date}' AND '{end_date}'"] if state: where_conditions.append(f"state = '{state}'") if location: where_conditions.append(f"LOWER(location) LIKE '%{location.lower()}%'") where_clause = " AND ".join(where_conditions) sql = f""" SELECT * FROM vehicle_logs WHERE {where_clause} ORDER BY timestamp DESC LIMIT 500 """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 30000")) # Longer timeout for large ranges result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Date Range Query Error: {e}") return [] def query_by_time_range(start_hour, end_hour, location=None, vehicle_type=None): """Query vehicles detected within a time range (hour of day)""" try: where_conditions = [] if start_hour < end_hour: where_conditions.append(f"hour BETWEEN {start_hour} AND {end_hour}") else: # Handles ranges like 9 PM to 4 AM (21 to 4) where_conditions.append(f"(hour >= {start_hour} OR hour <= {end_hour})") if location: where_conditions.append(f"LOWER(location) LIKE '%{location.lower()}%'") if vehicle_type: where_conditions.append(f"LOWER(vehicle_type) LIKE '%{vehicle_type.lower()}%'") where_clause = " AND ".join(where_conditions) sql = f""" SELECT * FROM vehicle_logs WHERE {where_clause} ORDER BY timestamp DESC LIMIT 200 """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Time Range Query Error: {e}") return [] # ========================================================= # ADVANCED ROUTE TRACKING FUNCTIONS # Track vehicles passing through multiple locations # ========================================================= def get_vehicles_route_between_locations(location_from, location_to, vehicle_type=None, limit=100): """ Get vehicles that traveled/passed from one location to another. Shows vehicles detected in location_from and then later in location_to. Example: get_vehicles_route_between_locations('adyar', 'kottupuram', 'bike') """ try: if engine is None: print("⚠️ Database not available") return [] vehicle_filter = "" if vehicle_type: vehicle_filter = f"AND LOWER(vl1.vehicle_type) LIKE '%{vehicle_type.lower()}%'" sql = f""" SELECT vl1.plate, vl1.state, vl1.vehicle_type, COUNT(DISTINCT vl1.timestamp) as detections_in_from_location, COUNT(DISTINCT vl2.timestamp) as detections_in_to_location, COUNT(DISTINCT vl1.date) as days_active, MIN(vl1.timestamp) as first_detected_from, MAX(vl1.timestamp) as last_detected_from, MIN(vl2.timestamp) as first_detected_to, MAX(vl2.timestamp) as last_detected_to FROM vehicle_logs vl1 INNER JOIN vehicle_logs vl2 ON vl1.plate = vl2.plate AND vl1.state = vl2.state AND vl2.timestamp > vl1.timestamp WHERE LOWER(vl1.location) LIKE '%{location_from.lower()}%' AND LOWER(vl2.location) LIKE '%{location_to.lower()}%' {vehicle_filter} GROUP BY vl1.plate, vl1.state, vl1.vehicle_type ORDER BY detections_in_from_location DESC, detections_in_to_location DESC LIMIT {limit}; """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 30000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Route Tracking Error: {e}") return [] def get_vehicle_complete_route_history(plate, state=None): """ Get complete route history for a specific vehicle. Shows all locations visited in chronological order. """ try: if engine is None: print("⚠️ Database not available") return [] state_filter = "" if state: state_filter = f"AND state = '{state}'" sql = f""" SELECT timestamp, plate, state, vehicle_type, location, camera_id, date, hour, day FROM vehicle_logs WHERE plate = '{plate}' {state_filter} ORDER BY timestamp ASC; """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 15000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Route History Error: {e}") return [] def get_vehicles_passing_through_multiple_locations(locations, vehicle_type=None, min_locations=2): """ Get vehicles that passed through multiple specified locations. Example: get_vehicles_passing_through_multiple_locations(['adyar', 'kottupuram', 'mylapore']) """ try: if engine is None: print("⚠️ Database not available") return [] vehicle_filter = "" if vehicle_type: vehicle_filter = f"AND LOWER(vehicle_type) LIKE '%{vehicle_type.lower()}%'" location_conditions = " OR ".join([ f"LOWER(location) LIKE '%{loc.lower()}%'" for loc in locations ]) sql = f""" SELECT plate, state, vehicle_type, COUNT(*) as total_detections, COUNT(DISTINCT location) as unique_locations_visited, COUNT(DISTINCT date) as days_active, COUNT(DISTINCT hour) as peak_hours, MIN(timestamp) as first_seen, MAX(timestamp) as last_seen FROM vehicle_logs WHERE ({location_conditions}) {vehicle_filter} GROUP BY plate, state, vehicle_type HAVING COUNT(DISTINCT location) >= {min_locations} ORDER BY COUNT(DISTINCT location) DESC, COUNT(*) DESC LIMIT 100; """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 30000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Multi-Location Route Error: {e}") return [] def get_location_to_location_traffic_flow(location_from, location_to, date_range=None): """ Get traffic flow statistics between two locations. Shows how many unique vehicles traveled between these locations. """ try: if engine is None: print("⚠️ Database not available") return {} date_filter = "" if date_range: start_date, end_date = date_range date_filter = f"AND vl1.date BETWEEN '{start_date}' AND '{end_date}'" sql = f""" SELECT COUNT(DISTINCT vl1.plate) as unique_vehicles, COUNT(DISTINCT vl1.vehicle_type) as vehicle_types, COUNT(DISTINCT vl1.state) as states, COUNT(*) as total_from_location_detections, COUNT(DISTINCT vl1.date) as days_active, ROUND(AVG(vl1.vehicle_conf), 3) as avg_confidence, MIN(vl1.timestamp) as earliest_traffic, MAX(vl1.timestamp) as latest_traffic FROM vehicle_logs vl1 INNER JOIN vehicle_logs vl2 ON vl1.plate = vl2.plate AND vl1.state = vl2.state AND vl2.timestamp > vl1.timestamp WHERE LOWER(vl1.location) LIKE '%{location_from.lower()}%' AND LOWER(vl2.location) LIKE '%{location_to.lower()}%' {date_filter} """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 30000")) result = conn.execute(text(sql)) row = result.fetchone() if row: return dict(row._mapping) return {} except Exception as e: print(f"❌ Traffic Flow Error: {e}") return {} def get_vehicle_type_route_analysis(location_from, location_to): """ Analyze which vehicle types travel between two locations most frequently. """ try: if engine is None: print("⚠️ Database not available") return [] sql = f""" SELECT vl1.vehicle_type, COUNT(DISTINCT vl1.plate) as unique_vehicles, COUNT(*) as total_detections, COUNT(DISTINCT vl1.date) as days_active, ROUND(AVG(vl1.vehicle_conf), 3) as avg_confidence, ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage FROM vehicle_logs vl1 INNER JOIN vehicle_logs vl2 ON vl1.plate = vl2.plate AND vl1.state = vl2.state AND vl2.timestamp > vl1.timestamp WHERE LOWER(vl1.location) LIKE '%{location_from.lower()}%' AND LOWER(vl2.location) LIKE '%{location_to.lower()}%' GROUP BY vl1.vehicle_type ORDER BY COUNT(*) DESC; """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 30000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Vehicle Type Route Analysis Error: {e}") return [] def get_suspicious_route_patterns(min_visits=5, min_locations=3): """ Identify potentially suspicious vehicles based on route patterns. Shows vehicles that visit multiple locations frequently. """ try: if engine is None: print("⚠️ Database not available") return [] sql = f""" SELECT plate, state, vehicle_type, COUNT(*) as total_detections, COUNT(DISTINCT location) as locations_visited, COUNT(DISTINCT date) as days_active, COUNT(DISTINCT hour) as peak_hours, MIN(timestamp) as first_detected, MAX(timestamp) as last_detected FROM vehicle_logs GROUP BY plate, state, vehicle_type HAVING COUNT(*) >= {min_visits} AND COUNT(DISTINCT location) >= {min_locations} ORDER BY COUNT(DISTINCT location) DESC, COUNT(*) DESC LIMIT 50; """ with engine.connect() as conn: conn.execute(text("SET statement_timeout = 30000")) result = conn.execute(text(sql)) rows = [dict(r._mapping) for r in result] return rows except Exception as e: print(f"❌ Suspicious Route Pattern Error: {e}") return []