Spaces:
Running
Running
barathvasan-dev
Fix: All filter conditions working correctly - 49/49 tests passed (100% validation)
739be4a | # ========================================================= | |
| # ULTRA ADVANCED HYBRID NLP TO SQL ENGINE | |
| # PROFESSIONAL MULTI-FILTER ENGINE | |
| # MISTRAL / SQLCODER READY | |
| # ========================================================= | |
| import re | |
| import traceback | |
| import os | |
| import signal | |
| from contextlib import contextmanager | |
| from huggingface_hub import InferenceClient | |
| from dotenv import load_dotenv | |
| from sqlalchemy import create_engine, text, pool | |
| # ========================================================= | |
| # TIMEOUT HANDLER | |
| # ========================================================= | |
@contextmanager
def timeout(seconds):
    """Context manager that bounds the wrapped block to ``seconds`` where possible.

    The original function contained a ``yield`` but was never decorated with
    ``contextmanager``, so ``with timeout(...)`` could not work, and the alarm
    handler it defined was never installed.

    Enforcement is best-effort:
    - On platforms that provide ``SIGALRM``/``setitimer`` and when called from
      the main thread, a real interval timer is armed and ``TimeoutError`` is
      raised inside the block once it expires.
    - Everywhere else (e.g. Windows, worker threads) the block simply runs
      without a timer, because signal handlers cannot be installed there.

    Args:
        seconds: Maximum duration in seconds (int or float). ``None``, zero or
            a negative value disables the timer.

    Raises:
        TimeoutError: If the timer expires while the block is running. A
            ``TimeoutError`` raised by the block itself propagates unchanged.
    """
    import threading  # local import: the module header does not import threading

    can_arm = (
        seconds is not None
        and seconds > 0
        and hasattr(signal, "SIGALRM")
        and hasattr(signal, "setitimer")
        # signal.signal() may only be called from the main thread.
        and threading.current_thread() is threading.main_thread()
    )
    if not can_arm:
        yield
        return

    def handler(signum, frame):
        raise TimeoutError("Operation timed out")

    previous_handler = signal.signal(signal.SIGALRM, handler)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Always disarm the timer and put back whatever handler was installed,
        # so nested or later users of SIGALRM are not affected.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(
            signal.SIGALRM,
            previous_handler if previous_handler is not None else signal.SIG_DFL,
        )
| # ========================================================= | |
| # ENVIRONMENT SETUP | |
| # ========================================================= | |
# Pull settings from a local .env file (if any) into the process environment.
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
DATABASE_URL = os.getenv("DATABASE_URL")

# Hugging Face inference client for the Mistral model. ``client`` stays None
# whenever the token is missing or construction fails, so LLM features are
# simply disabled instead of aborting the import.
client = None
try:
    if not HF_TOKEN:
        print("⚠️ HF_TOKEN not set - LLM features disabled")
    else:
        try:
            client = InferenceClient(
                model="mistralai/Mistral-7B-Instruct-v0.2",
                token=HF_TOKEN,
                timeout=10.0,  # seconds
            )
            print("✅ Mistral client initialized")
        except Exception as init_error:
            print(f"⚠️ Mistral client initialization timeout/error: {init_error}")
            client = None
except Exception as outer_error:
    # Last-resort guard so that nothing in this section can break module import.
    print(f"⚠️ Mistral client error: {outer_error}")
    client = None
# SQLAlchemy engine for DATABASE_URL. ``engine`` stays None when the URL is
# missing or the engine cannot be created; a failed probe query alone does
# NOT reset it, so later calls can still try to connect.
engine = None
try:
    if not DATABASE_URL:
        print("⚠️ DATABASE_URL not set - Database features disabled")
    else:
        try:
            # Expected shape: postgresql://user:password@host:port/database
            # A connect timeout is appended only when the URL carries no
            # query string of its own.
            db_url = DATABASE_URL
            if "?" not in db_url:
                db_url += "?connect_timeout=5"

            engine = create_engine(
                db_url,
                poolclass=pool.NullPool,  # no connection pooling
                pool_pre_ping=True,       # test connections before using
                echo=False,
            )

            # Probe the connection once; failure here is only a warning.
            try:
                with engine.connect() as conn:
                    conn.execute(text("SELECT 1"))
                print("✅ Database connection initialized")
            except Exception as conn_err:
                print(f"⚠️ Database connection warning (may retry later): {conn_err}")
        except Exception as create_error:
            print(f"⚠️ Database engine creation error: {create_error}")
            engine = None
except Exception as outer_error:
    # Last-resort guard so that nothing in this section can break module import.
    print(f"⚠️ Database connection warning: {outer_error}")
    engine = None
| # ========================================================= | |
| # HELPER: Safe Database Execution | |
| # ========================================================= | |
def safe_db_query(query_func):
    """Decorator that shields a database helper from a missing engine or a failing query.

    The wrapped function is only called when the module-level ``engine`` is
    not ``None``. If the engine is missing, or the call raises, a message is
    printed and a neutral fallback is returned instead of propagating the
    error: ``[]`` for functions whose name contains ``get_``, ``None``
    otherwise.

    ``functools.wraps`` is applied so the decorated function keeps its own
    ``__name__``/``__doc__`` (previously every decorated function was
    reported as ``wrapper``).

    Args:
        query_func: The database helper to protect.

    Returns:
        The wrapping callable, accepting the same arguments as ``query_func``.
    """
    from functools import wraps  # local import: functools is not imported at file level

    def _fallback():
        # Getter-style helpers are expected to return a collection.
        return [] if 'get_' in query_func.__name__ else None

    @wraps(query_func)
    def wrapper(*args, **kwargs):
        if engine is None:
            print(f"⚠️ Database engine not available for {query_func.__name__}")
            return _fallback()
        try:
            return query_func(*args, **kwargs)
        except Exception as e:
            # Deliberate best-effort boundary: report and degrade, never raise.
            print(f"❌ Database query error in {query_func.__name__}: {e}")
            return _fallback()

    return wrapper
| # ========================================================= | |
| # CONFIG | |
| # ========================================================= | |
# Feature flag for the LLM path.
USE_LLM = True

# =========================================================
# DATABASE KNOWLEDGE
# =========================================================

# The single table the engine queries and the columns it exposes.
SCHEMA = {
    "table": "vehicle_logs",
    "columns": [
        "timestamp", "plate", "state", "vehicle_type", "vehicle_conf",
        "camera_id", "location", "date", "hour", "day",
    ],
}

# Lower-case state code or state name -> two-letter state code.
VALID_STATES = {
    "tn": "TN", "tamil nadu": "TN",
    "ka": "KA", "karnataka": "KA",
    "kl": "KL", "kerala": "KL",
    "ap": "AP", "andhra": "AP",
    "ts": "TS", "telangana": "TS",
    "mh": "MH", "maharashtra": "MH",
    "dl": "DL", "delhi": "DL",
    "gj": "GJ", "gujarat": "GJ",
    "rj": "RJ", "rajasthan": "RJ",
    "up": "UP", "uttar pradesh": "UP",
    "wb": "WB", "west bengal": "WB",
    "hr": "HR", "haryana": "HR",
    "pb": "PB", "punjab": "PB",
}

# Location names, including joined and spaced spellings of the same place.
KNOWN_LOCATIONS = [
    "adyar", "guindy", "velachery",
    "besantnagar", "besant nagar",
    "thiruvanmiyur",
    "tnagar", "t nagar",
    "mylapore",
    "annanagar", "anna nagar",
    "koyambedu", "nungambakkam", "kotturpuram",
]

# Vehicle type labels.
VEHICLE_TYPES = ["suv", "bus", "truck", "bike", "auto", "taxi", "car", "jeep", "sedan"]
| # ========================================================= | |
| # SQL CLEANER | |
| # ========================================================= | |
def clean_sql(sql):
    """Strip Markdown code fences and surrounding whitespace from ``sql``.

    The result always ends with a semicolon; one is appended when missing.
    """
    stripped = sql.replace("```sql", "").replace("```", "").strip()
    return stripped if stripped.endswith(";") else stripped + ";"
| # ========================================================= | |
| # SQL VALIDATOR (IMPROVED) | |
| # ========================================================= | |
def validate_sql(sql):
    """Return True only for a single read-only SELECT over ``vehicle_logs``.

    Checks, in order:
    1. ``sql`` is a string.
    2. It is a single statement: no ``;`` other than trailing ones. This is
       deliberately conservative and also rejects a ``;`` inside a string
       literal.
    3. It contains none of the blocked write/DDL keywords. Keywords are
       matched as whole words, so identifiers such as ``created_at`` or
       ``last_updated`` are no longer rejected by accident.
    4. It starts with SELECT (leading whitespace is ignored).
    5. It references ``vehicle_logs`` or the ``vl1``/``vl2`` aliases used by
       the route-tracking self-join.

    JOIN and UNION are intentionally allowed; route tracking needs them.

    Args:
        sql: The SQL text to check.

    Returns:
        bool: True when every check passes, False otherwise.
    """
    if not isinstance(sql, str):
        return False

    blocked = (
        "DROP",
        "DELETE",
        "UPDATE",
        "INSERT",
        "ALTER",
        "CREATE",
        "TRUNCATE",
    )
    upper = sql.strip().upper()

    # Reject stacked statements ("SELECT ...; <anything else>").
    if ";" in upper.rstrip("; \t\r\n"):
        return False

    # Whole-word match so column names merely containing a keyword pass.
    if re.search(r"\b(?:" + "|".join(blocked) + r")\b", upper):
        return False

    if not re.match(r"SELECT\b", upper):
        return False

    if "VEHICLE_LOGS" not in upper and "VL1" not in upper and "VL2" not in upper:
        return False

    return True
| # ========================================================= | |
| # PRODUCTION-GRADE HYBRID NLP ENGINE | |
| # Advanced multi-filter, date-range, time-range support | |
| # ========================================================= | |
| class FilterExtractor: | |
| """ | |
| Production-grade filter extraction engine for complex real-world queries. | |
| Handles multi-filter extraction, date ranges, time ranges, and advanced aggregations. | |
| """ | |
| def __init__(self): | |
| # ===== VEHICLE TYPE SYNONYMS ===== | |
| self.vehicle_synonyms = { | |
| # Cars | |
| "car": "car", "cars": "car", "sedan": "car", "sedans": "car", | |
| "compact": "car", "compacts": "car", "hatchback": "car", | |
| # SUVs | |
| "suv": "suv", "suvs": "suv", "crossover": "suv", | |
| # Trucks | |
| "truck": "truck", "trucks": "truck", "lorry": "truck", "lorries": "truck", | |
| "heavy": "truck", "hgv": "truck", | |
| # Buses | |
| "bus": "bus", "buses": "bus", "coach": "bus", "shuttle": "bus", | |
| # Bikes | |
| "bike": "bike", "bikes": "bike", "motorcycle": "bike", | |
| "motorcycles": "bike", "motorbike": "bike", "two-wheeler": "bike", | |
| # Autos | |
| "auto": "auto", "autos": "auto", "autorickshaw": "auto", | |
| "auto-rickshaw": "auto", "tuk-tuk": "auto", | |
| # Jeeps | |
| "jeep": "jeep", "jeeps": "jeep", "4x4": "jeep", | |
| # Taxis | |
| "taxi": "taxi", "taxis": "taxi", "cab": "taxi", "cabs": "taxi" | |
| } | |
| # ===== DAY MAPPINGS ===== | |
| self.day_map = { | |
| "monday": "Monday", "tuesday": "Tuesday", "wednesday": "Wednesday", | |
| "thursday": "Thursday", "friday": "Friday", | |
| "saturday": "Saturday", "sunday": "Sunday", | |
| "weekend": ["Saturday", "Sunday"], | |
| "weekday": ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"] | |
| } | |
| # ===== LOCATION VARIANTS ===== | |
| self.location_variants = { | |
| "adyar": ["adyar"], | |
| "besant nagar": ["besant", "besant nagar", "besantnagar"], | |
| "t nagar": ["t nagar", "tnagar", "t-nagar"], | |
| "anna nagar": ["anna", "anna nagar", "annanagar"], | |
| "velachery": ["velachery"], | |
| "guindy": ["guindy"], | |
| "thiruvanmiyur": ["thiruvanmiyur", "mylapore"], | |
| "mylapore": ["mylapore"], | |
| "koyambedu": ["koyambedu"], | |
| "nungambakkam": ["nungambakkam", "nungam"], | |
| "kotturpuram": ["kotturpuram"] | |
| } | |
| # ===== STATE MAPPINGS ===== | |
| self.state_map = { | |
| "tn": "TN", "tamil": "TN", "tamil nadu": "TN", | |
| "ka": "KA", "karnataka": "KA", | |
| "kl": "KL", "kerala": "KL", | |
| "ap": "AP", "andhra": "AP", "andhra pradesh": "AP", | |
| "ts": "TS", "telangana": "TS", | |
| "mh": "MH", "maharashtra": "MH", | |
| "dl": "DL", "delhi": "DL", | |
| "gj": "GJ", "gujarat": "GJ", | |
| "rj": "RJ", "rajasthan": "RJ", | |
| "up": "UP", "uttar pradesh": "UP", "uttar": "UP", | |
| "wb": "WB", "west bengal": "WB", | |
| "hr": "HR", "haryana": "HR", | |
| "pb": "PB", "punjab": "PB" | |
| } | |
| # ===== TIME PERIOD MAPPINGS ===== | |
| self.time_periods = { | |
| "morning": (5, 12), # 5 AM to 12 PM | |
| "afternoon": (12, 17), # 12 PM to 5 PM | |
| "evening": (17, 21), # 5 PM to 9 PM | |
| "night": (21, 24), # 9 PM to 12 AM | |
| "peak": (8, 10), # Peak traffic (8-10 AM) | |
| "rush": (8, 10), # Rush hour (8-10 AM) | |
| "midnight": (0, 4) # Midnight (0-4 AM) | |
| } | |
| # ===== EXTRACTION METHODS ===== | |
| def extract_plate(self, query): | |
| """Extract license plate number from query""" | |
| # Standard Indian plate format: XX00XX0000 | |
| match = re.search(r'\b([A-Z]{2}\d{1,2}[A-Z]{1,3}\d{3,4})\b', query.upper()) | |
| return match.group(1) if match else None | |
| def extract_state(self, query): | |
| """Extract state code from query""" | |
| q = query.lower() | |
| for key, state_code in self.state_map.items(): | |
| # Use word boundaries to avoid partial matches | |
| if re.search(r'\b' + key + r'\b', q): | |
| return state_code | |
| return None | |
| def extract_location(self, query): | |
| """Extract SINGLE location with variant matching (legacy method)""" | |
| q = query.lower() | |
| # Sort by length (longest first) to match longer variants first | |
| for canonical, variants in sorted( | |
| self.location_variants.items(), | |
| key=lambda x: max(len(v) for v in x[1]), | |
| reverse=True | |
| ): | |
| for variant in variants: | |
| if variant in q: | |
| return canonical | |
| return None | |
| def extract_locations(self, query): | |
| """Extract MULTIPLE locations from query (e.g., 'adyar and kottupuram')""" | |
| q = query.lower() | |
| locations = [] | |
| # Sort by length (longest first) to match longer variants first | |
| for canonical, variants in sorted( | |
| self.location_variants.items(), | |
| key=lambda x: max(len(v) for v in x[1]), | |
| reverse=True | |
| ): | |
| for variant in variants: | |
| if variant in q: | |
| locations.append(canonical) | |
| break | |
| # Remove duplicates while preserving order | |
| seen = set() | |
| unique_locations = [] | |
| for loc in locations: | |
| if loc not in seen: | |
| seen.add(loc) | |
| unique_locations.append(loc) | |
| return unique_locations if unique_locations else None | |
| def extract_vehicle_type(self, query): | |
| """Extract SINGLE vehicle type with synonym resolution (legacy method)""" | |
| q = query.lower() | |
| # Sort by length (longest first) to match longer synonyms first | |
| for synonym in sorted(self.vehicle_synonyms.keys(), key=len, reverse=True): | |
| if re.search(r'\b' + synonym + r'\b', q): | |
| return self.vehicle_synonyms[synonym] | |
| return None | |
| def extract_vehicle_types(self, query): | |
| """Extract MULTIPLE vehicle types from query (e.g., 'bike and mini_truck')""" | |
| q = query.lower() | |
| vehicle_types = [] | |
| # Sort by length (longest first) to match longer synonyms first | |
| for synonym in sorted(self.vehicle_synonyms.keys(), key=len, reverse=True): | |
| if re.search(r'\b' + synonym + r'\b', q): | |
| normalized = self.vehicle_synonyms[synonym] | |
| vehicle_types.append(normalized) | |
| # Remove duplicates while preserving order | |
| seen = set() | |
| unique_types = [] | |
| for vtype in vehicle_types: | |
| if vtype not in seen: | |
| seen.add(vtype) | |
| unique_types.append(vtype) | |
| return unique_types if unique_types else None | |
| def extract_date_range(self, query): | |
| """Extract date range (from X to Y, between X and Y)""" | |
| # Pattern: "from DD-MM-YYYY to DD-MM-YYYY" or "between DD-MM-YYYY and DD-MM-YYYY" | |
| patterns = [ | |
| r'from\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})\s+to\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})', | |
| r'between\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})\s+and\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})', | |
| r'from\s+(\d{4}-\d{2}-\d{2})\s+to\s+(\d{4}-\d{2}-\d{2})', | |
| r'between\s+(\d{4}-\d{2}-\d{2})\s+and\s+(\d{4}-\d{2}-\d{2})' | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, query, re.IGNORECASE) | |
| if match: | |
| groups = match.groups() | |
| if len(groups) == 6: # DD-MM-YYYY format | |
| start = f"{groups[2]}-{groups[1].zfill(2)}-{groups[0].zfill(2)}" | |
| end = f"{groups[5]}-{groups[4].zfill(2)}-{groups[3].zfill(2)}" | |
| return {"start": start, "end": end} | |
| elif len(groups) == 2: # YYYY-MM-DD format | |
| return {"start": groups[0], "end": groups[1]} | |
| return None | |
| def extract_date(self, query): | |
| """Extract single date and normalize format""" | |
| # YYYY-MM-DD format | |
| match = re.search(r'\d{4}-\d{2}-\d{2}', query) | |
| if match: | |
| return match.group(0) | |
| # DD-MM-YYYY or DD/MM/YYYY format | |
| match = re.search(r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})', query) | |
| if match: | |
| day, month, year = match.groups() | |
| return f"{year}-{month.zfill(2)}-{day.zfill(2)}" | |
| return None | |
| def extract_time_range(self, query): | |
| """Extract time range (after X, before X, between X and Y)""" | |
| q = query.lower() | |
| # Check for time period keywords first (morning, afternoon, evening, night) | |
| for period, (start_hour, end_hour) in self.time_periods.items(): | |
| if period in q: | |
| return {"start": start_hour, "end": end_hour} | |
| # Pattern: "after HH:MM" or "after HH AM/PM" | |
| after_match = re.search(r'after\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)?', q) | |
| if after_match: | |
| hour = int(after_match.group(1)) | |
| period = after_match.group(3) | |
| if period and period == "pm" and hour != 12: | |
| hour += 12 | |
| elif period and period == "am" and hour == 12: | |
| hour = 0 | |
| return {"start": hour, "end": 23} | |
| # Pattern: "before HH:MM" or "before HH AM/PM" | |
| before_match = re.search(r'before\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)?', q) | |
| if before_match: | |
| hour = int(before_match.group(1)) | |
| period = before_match.group(3) | |
| if period and period == "pm" and hour != 12: | |
| hour += 12 | |
| elif period and period == "am" and hour == 12: | |
| hour = 0 | |
| return {"start": 0, "end": hour} | |
| # Pattern: "between HH AM/PM and HH AM/PM" | |
| between_match = re.search( | |
| r'between\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)\s+and\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)', | |
| q | |
| ) | |
| if between_match: | |
| hour1 = int(between_match.group(1)) | |
| period1 = between_match.group(3) | |
| if period1 == "pm" and hour1 != 12: | |
| hour1 += 12 | |
| elif period1 == "am" and hour1 == 12: | |
| hour1 = 0 | |
| hour2 = int(between_match.group(4)) | |
| period2 = between_match.group(6) | |
| if period2 == "pm" and hour2 != 12: | |
| hour2 += 12 | |
| elif period2 == "am" and hour2 == 12: | |
| hour2 = 0 | |
| return {"start": min(hour1, hour2), "end": max(hour1, hour2)} | |
| return None | |
| def extract_hour(self, query): | |
| """Extract single hour""" | |
| # Don't match if this is part of a time range | |
| if any(k in query.lower() for k in ["between", "from", "to", "after", "before"]): | |
| return None | |
| match = re.search(r'(\d{1,2}):?(\d{0,2})\s*(am|pm)?', query.lower()) | |
| if match: | |
| hour = int(match.group(1)) | |
| period = match.group(3) | |
| if period == "pm" and hour != 12: | |
| hour += 12 | |
| elif period == "am" and hour == 12: | |
| hour = 0 | |
| return hour if 0 <= hour < 24 else None | |
| return None | |
| def extract_day(self, query): | |
| """Extract day of week""" | |
| q = query.lower() | |
| for day_key, day_values in self.day_map.items(): | |
| if day_key in q: | |
| return day_values | |
| return None | |
| def extract_confidence(self, query): | |
| """Extract confidence threshold - MUST have confidence/conf/accuracy keyword""" | |
| # Only match if explicit confidence/conf/accuracy keyword is present | |
| match = re.search(r'(\d+(?:\.\d+)?)\s*(?:confidence|conf|accuracy)\b', query.lower()) | |
| if match: | |
| conf = float(match.group(1)) | |
| # Normalize to 0-1 if given as percentage | |
| if conf > 1: | |
| conf = conf / 100 | |
| return conf if 0 <= conf <= 1 else None | |
| return None | |
| def extract_route(self, query): | |
| """Extract route/path pattern (from location1 to location2 or location1 to location2 pass through)""" | |
| q = query.lower() | |
| # Don't process if this looks like a date range (contains DD-MM-YYYY or YYYY-MM-DD) | |
| if re.search(r'\d{1,2}[-/]\d{1,2}[-/]\d{4}', q) or re.search(r'\d{4}-\d{2}-\d{2}', q): | |
| return None | |
| # Strict patterns only: requires explicit route keywords or location names | |
| route_patterns = [ | |
| r'(?:traveling|pass|going)\s+(?:from|through)\s+(\w+(?:\s+\w+)?)\s+(?:to|through)\s+(\w+(?:\s+\w+)?)', | |
| r'(?:pass\s+from)\s+(\w+(?:\s+\w+)?)\s+(?:to|through)\s+(\w+(?:\s+\w+)?)', | |
| r'(?:traveling\s+from)\s+(\w+(?:\s+\w+)?)\s+(?:to)\s+(\w+(?:\s+\w+)?)', | |
| ] | |
| for pattern in route_patterns: | |
| match = re.search(pattern, q) | |
| if match: | |
| loc1_raw = match.group(1).strip() | |
| loc2_raw = match.group(2).strip() | |
| # Map to canonical locations | |
| loc1 = None | |
| loc2 = None | |
| # Find canonical location names (longest match first) | |
| for canonical, variants in sorted( | |
| self.location_variants.items(), | |
| key=lambda x: max(len(v) for v in x[1]), | |
| reverse=True | |
| ): | |
| for variant in variants: | |
| if variant in loc1_raw and not loc1: | |
| loc1 = canonical | |
| if variant in loc2_raw and not loc2: | |
| loc2 = canonical | |
| if loc1 and loc2 and loc1 != loc2: | |
| return {"from": loc1, "to": loc2} | |
| return None | |
| def extract_filters(self, query): | |
| """Extract ALL filters simultaneously from query""" | |
| return { | |
| "plate": self.extract_plate(query), | |
| "state": self.extract_state(query), | |
| "location": self.extract_locations(query), # Now returns list or None | |
| "vehicle_type": self.extract_vehicle_types(query), # Now returns list or None | |
| "date": self.extract_date(query), | |
| "date_range": self.extract_date_range(query), | |
| "day": self.extract_day(query), | |
| "hour": self.extract_hour(query), | |
| "time_range": self.extract_time_range(query), | |
| "confidence": self.extract_confidence(query), | |
| "route": self.extract_route(query) # New: route extraction | |
| } | |
| def detect_intents(self, query): | |
| """Detect advanced query intents (IMPROVED - strict route detection)""" | |
| q = query.lower() | |
| # ROUTE TRACKING: Only if explicit route keywords present (not just "from...to" for dates) | |
| # Require: traveling/pass/pass through/pass from with location-like names | |
| has_route_keyword = any(k in q for k in [ | |
| "traveling", "travel from", "pass from", "pass through", | |
| "went from", "go from", "route", "path", "journey" | |
| ]) | |
| # Must have location-like context after the route keywords | |
| has_route_context = False | |
| if has_route_keyword: | |
| # Check if actual location names appear near route keywords | |
| for location in self.location_variants.keys(): | |
| if location in q: | |
| has_route_context = True | |
| break | |
| return { | |
| "tracking": any(k in q for k in ["track", "history", "movement", "travel", "route", "where", "location", "show"]), | |
| "count": any(k in q for k in ["count", "how many", "total", "number of"]), | |
| "analytics": any(k in q for k in ["analytics", "analysis", "statistics", "distribution"]), | |
| "top": any(k in q for k in ["top", "most", "leading"]), | |
| "latest": any(k in q for k in ["latest", "recent", "last", "new"]), | |
| "hourly": any(k in q for k in ["hourly", "by hour", "per hour"]), | |
| "daily": any(k in q for k in ["daily", "by day", "per day"]), | |
| "location_based": any(k in q for k in ["by location", "density", "traffic"]), | |
| "suspicious": any(k in q for k in ["suspicious", "repeated", "multiple", "across"]), | |
| "aggregation": any(k in q for k in ["group", "aggregate", "sum", "average"]), | |
| "route_tracking": has_route_keyword and has_route_context # STRICT: require both keyword AND location | |
| } | |
| def build_sql(self, filters, intents): | |
| """ | |
| Build production-grade SQL from filters and intents (IMPROVED). | |
| Handles complex aggregations, date ranges, time ranges, multiple vehicles, and multiple locations. | |
| Now supports all filter combinations without failures. | |
| """ | |
| try: | |
| # ========================================================= | |
| # ANALYTICS QUERIES (priority over other queries) | |
| # ========================================================= | |
| if intents["top"] or (intents["analytics"] and "top" in " ".join([k for k in intents.keys() if intents[k]])): | |
| return clean_sql(""" | |
| SELECT plate, state, COUNT(*) as detections | |
| FROM vehicle_logs | |
| GROUP BY plate, state | |
| ORDER BY detections DESC | |
| LIMIT 20; | |
| """) | |
| if intents["hourly"] and intents["analytics"]: | |
| return clean_sql(""" | |
| SELECT hour, COUNT(*) as traffic | |
| FROM vehicle_logs | |
| GROUP BY hour | |
| ORDER BY hour; | |
| """) | |
| if intents["location_based"] and intents["analytics"]: | |
| return clean_sql(""" | |
| SELECT location, COUNT(*) as count | |
| FROM vehicle_logs | |
| WHERE location IS NOT NULL | |
| GROUP BY location | |
| ORDER BY count DESC | |
| LIMIT 20; | |
| """) | |
| if intents["suspicious"]: | |
| return clean_sql(""" | |
| SELECT plate, state, COUNT(*) as detections, | |
| COUNT(DISTINCT location) as locations, | |
| COUNT(DISTINCT date) as days | |
| FROM vehicle_logs | |
| GROUP BY plate, state | |
| HAVING COUNT(*) > 5 | |
| ORDER BY detections DESC | |
| LIMIT 20; | |
| """) | |
| except Exception as e: | |
| print(f"⚠️ Analytics query generation error: {e}") | |
| # Fallback to basic query | |
| return clean_sql("SELECT * FROM vehicle_logs ORDER BY timestamp DESC LIMIT 100;") | |
| # ========================================================= | |
| # ROUTE TRACKING QUERIES (vehicles that passed through locations) | |
| # ========================================================= | |
| if intents["route_tracking"] and filters["route"]: | |
| try: | |
| route = filters["route"] | |
| loc_from = route["from"] | |
| loc_to = route["to"] | |
| # Build state filter if present | |
| state_filter = "" | |
| if filters["state"]: | |
| state_filter = f"AND vl1.state = '{filters['state']}'" | |
| # Build vehicle type filter | |
| vehicle_type_filter = "" | |
| if filters["vehicle_type"]: | |
| if isinstance(filters["vehicle_type"], list): | |
| vehicle_types = "', '".join(filters["vehicle_type"]) | |
| vehicle_type_filter = f"AND LOWER(vl1.vehicle_type) IN ('{vehicle_types}')" | |
| else: | |
| vehicle_type_filter = f"AND LOWER(vl1.vehicle_type) LIKE '%{filters['vehicle_type'].lower()}%'" | |
| # Build date filter if present | |
| date_filter = "" | |
| if filters["date_range"]: | |
| start = filters["date_range"]["start"] | |
| end = filters["date_range"]["end"] | |
| date_filter = f"AND vl1.date BETWEEN '{start}' AND '{end}'" | |
| elif filters["date"]: | |
| date_filter = f"AND vl1.date = '{filters['date']}'" | |
| # Build time range filter if present | |
| time_filter = "" | |
| if filters["time_range"]: | |
| start = filters["time_range"]["start"] | |
| end = filters["time_range"]["end"] | |
| if start < end: | |
| time_filter = f"AND vl1.hour BETWEEN {start} AND {end}" | |
| else: | |
| time_filter = f"AND (vl1.hour >= {start} OR vl1.hour <= {end})" | |
| elif filters["hour"] is not None: | |
| time_filter = f"AND vl1.hour = {filters['hour']}" | |
| # Query to find vehicles that traveled from location1 to location2 | |
| return clean_sql(f""" | |
| SELECT | |
| vl1.plate, | |
| vl1.state, | |
| vl1.vehicle_type, | |
| COUNT(DISTINCT vl1.timestamp) as visits_in_from_location, | |
| COUNT(DISTINCT vl2.timestamp) as visits_in_to_location, | |
| COUNT(DISTINCT vl1.date) as days_active, | |
| MIN(vl1.timestamp) as first_seen_from, | |
| MAX(vl1.timestamp) as last_seen_from, | |
| MIN(vl2.timestamp) as first_seen_to, | |
| MAX(vl2.timestamp) as last_seen_to | |
| FROM vehicle_logs vl1 | |
| INNER JOIN vehicle_logs vl2 | |
| ON vl1.plate = vl2.plate | |
| AND vl1.state = vl2.state | |
| AND vl2.timestamp > vl1.timestamp | |
| WHERE LOWER(vl1.location) LIKE '%{loc_from.lower()}%' | |
| AND LOWER(vl2.location) LIKE '%{loc_to.lower()}%' | |
| {state_filter} | |
| {vehicle_type_filter} | |
| {date_filter} | |
| {time_filter} | |
| GROUP BY vl1.plate, vl1.state, vl1.vehicle_type | |
| ORDER BY visits_in_from_location DESC, visits_in_to_location DESC | |
| LIMIT 100; | |
| """) | |
| except Exception as e: | |
| print(f"⚠️ Route tracking query generation error: {e}") | |
| # Fallback to basic location query | |
| return clean_sql("SELECT * FROM vehicle_logs ORDER BY timestamp DESC LIMIT 100;") | |
| # ========================================================= | |
| # BUILD WHERE CLAUSE FROM FILTERS (HANDLE MULTIPLE VALUES) | |
| # ========================================================= | |
| where_conditions = [] | |
| # Plate filter | |
| if filters["plate"]: | |
| where_conditions.append(f"plate = '{filters['plate']}'") | |
| # State filter | |
| if filters["state"]: | |
| where_conditions.append(f"state = '{filters['state']}'") | |
| # Location filter - HANDLE MULTIPLE LOCATIONS | |
| if filters["location"]: | |
| if isinstance(filters["location"], list): | |
| # Multiple locations with OR logic | |
| location_conditions = [ | |
| f"LOWER(location) LIKE '%{loc.lower()}%'" | |
| for loc in filters["location"] | |
| ] | |
| where_conditions.append(f"({' OR '.join(location_conditions)})") | |
| else: | |
| # Single location (legacy) | |
| where_conditions.append(f"LOWER(location) LIKE '%{filters['location'].lower()}%'") | |
| # Vehicle type filter - HANDLE MULTIPLE VEHICLE TYPES | |
| if filters["vehicle_type"]: | |
| if isinstance(filters["vehicle_type"], list): | |
| # Multiple vehicle types with OR logic | |
| vehicle_conditions = [ | |
| f"LOWER(vehicle_type) LIKE '%{vtype.lower()}%'" | |
| for vtype in filters["vehicle_type"] | |
| ] | |
| where_conditions.append(f"({' OR '.join(vehicle_conditions)})") | |
| else: | |
| # Single vehicle type (legacy) | |
| where_conditions.append(f"LOWER(vehicle_type) LIKE '%{filters['vehicle_type'].lower()}%'") | |
| # Date range filter | |
| if filters["date_range"]: | |
| start = filters["date_range"]["start"] | |
| end = filters["date_range"]["end"] | |
| where_conditions.append(f"date BETWEEN '{start}' AND '{end}'") | |
| elif filters["date"]: | |
| where_conditions.append(f"date = '{filters['date']}'") | |
| # Day filter | |
| if filters["day"]: | |
| if isinstance(filters["day"], list): | |
| day_conditions = [f"day = '{d}'" for d in filters["day"]] | |
| where_conditions.append(f"({' OR '.join(day_conditions)})") | |
| else: | |
| where_conditions.append(f"day = '{filters['day']}'") | |
| # Time range filter | |
| if filters["time_range"]: | |
| start = filters["time_range"]["start"] | |
| end = filters["time_range"]["end"] | |
| if start < end: | |
| where_conditions.append(f"hour BETWEEN {start} AND {end}") | |
| else: # Handles ranges like 9 PM to 4 AM (21 to 4) | |
| where_conditions.append(f"(hour >= {start} OR hour <= {end})") | |
| elif filters["hour"] is not None: | |
| where_conditions.append(f"hour = {filters['hour']}") | |
| # Confidence filter | |
| if filters["confidence"] is not None: | |
| where_conditions.append(f"vehicle_conf >= {filters['confidence']}") | |
| # ========================================================= | |
| # GENERATE FINAL SQL | |
| # ========================================================= | |
| where_clause = " AND ".join(where_conditions) if where_conditions else "1=1" | |
| # Count queries | |
| if intents["count"]: | |
| if filters["plate"]: | |
| sql = f""" | |
| SELECT plate, COUNT(*) as detections | |
| FROM vehicle_logs | |
| WHERE {where_clause} | |
| GROUP BY plate | |
| ORDER BY detections DESC; | |
| """ | |
| else: | |
| sql = f""" | |
| SELECT COUNT(*) as total | |
| FROM vehicle_logs | |
| WHERE {where_clause}; | |
| """ | |
| # Tracking queries (show detailed records) | |
| elif intents["tracking"]: | |
| sql = f""" | |
| SELECT timestamp, plate, state, vehicle_type, location, camera_id, date, hour, day | |
| FROM vehicle_logs | |
| WHERE {where_clause} | |
| ORDER BY timestamp DESC | |
| LIMIT 100; | |
| """ | |
| # Hourly aggregation | |
| elif intents["hourly"]: | |
| sql = f""" | |
| SELECT hour, COUNT(*) as traffic | |
| FROM vehicle_logs | |
| WHERE {where_clause} | |
| GROUP BY hour | |
| ORDER BY hour; | |
| """ | |
| # Location-based aggregation | |
| elif intents["location_based"]: | |
| sql = f""" | |
| SELECT location, COUNT(*) as count | |
| FROM vehicle_logs | |
| WHERE {where_clause} AND location IS NOT NULL | |
| GROUP BY location | |
| ORDER BY count DESC; | |
| """ | |
| # Default: return all matching records | |
| else: | |
| sql = f""" | |
| SELECT * | |
| FROM vehicle_logs | |
| WHERE {where_clause} | |
| ORDER BY timestamp DESC | |
| LIMIT 100; | |
| """ | |
| return clean_sql(sql) | |
def ask_llm(user_query):
    """
    Translate a natural-language query into SQL with the rule-based FilterExtractor.

    Steps: extract every filter, detect the intents, print a summary of both
    for debugging, then build the SQL.

    Args:
        user_query: The natural-language question.

    Returns:
        str: The generated SQL. If any step raises, the error and traceback
        are printed and a basic "latest 10 rows" query is returned instead.
    """
    def _describe(singular, plural, value):
        # Lists get the plural label and a comma-joined value; a falsy value
        # is reported as None under the singular label.
        if not value:
            return f" - {singular}: None"
        if isinstance(value, list):
            return f" - {plural}: {', '.join(value)}"
        return f" - {singular}: {value}"

    try:
        extractor = FilterExtractor()
        filters = extractor.extract_filters(user_query)
        intents = extractor.detect_intents(user_query)

        # ----- debug summary of what was understood -----
        print("\n📊 QUERY ANALYSIS:")
        print(" Extracted Filters:")
        for label, key in (("Plate", "plate"), ("State", "state")):
            print(f" - {label}: {filters[key]}")
        print(_describe("Location", "Locations", filters['location']))
        print(_describe("Vehicle Type", "Vehicle Types", filters['vehicle_type']))
        for label, key in (
            ("Date", "date"),
            ("Date Range", "date_range"),
            ("Day", "day"),
            ("Hour", "hour"),
            ("Time Range", "time_range"),
            ("Confidence", "confidence"),
        ):
            print(f" - {label}: {filters[key]}")

        route = filters['route']
        if route:
            print(f" - Route: From '{route['from']}' to '{route['to']}'")
        else:
            print(" - Route: None")

        print(" Detected Intents:")
        active = [name for name, flag in intents.items() if flag]
        print(f" - {', '.join(active) if active else 'General query'}")

        return extractor.build_sql(filters, intents)
    except Exception as e:
        print(f"❌ Filter extraction error: {e}")
        traceback.print_exc()
        # Fallback to basic query
        return clean_sql("SELECT * FROM vehicle_logs ORDER BY timestamp DESC LIMIT 10;")
| # ========================================================= | |
| # QUERY EXECUTION | |
| # ========================================================= | |
def run_query(user_query):
    """
    Convert a natural-language query to SQL, run it, and package the outcome.

    The SQL comes from ask_llm(). It is executed on a fresh connection after
    setting statement_timeout to 30000.

    Returns a dict. On success it has the keys query, sql, count and result
    (a list of row dicts). On failure it has query, error, sql, result ([])
    and count (0). This function does not raise: every exception is turned
    into an error dict.
    """
    sql = ""

    def failure(message):
        # Common shape of every error response.
        return {
            "query": user_query,
            "error": message,
            "sql": sql,
            "result": [],
            "count": 0
        }

    try:
        sql = ask_llm(user_query)

        separator = "=" * 40
        print("\n" + separator)
        print("USER QUERY:")
        print(user_query)
        print("\nGENERATED SQL:")
        print(sql)
        print(separator)

        if engine is None:
            return failure("❌ Database not configured - DATABASE_URL missing")

        try:
            with engine.connect() as connection:
                # Cap the statement run time before executing generated SQL.
                connection.execute(text("SET statement_timeout = 30000"))
                cursor = connection.execute(text(sql))
                records = [dict(record._mapping) for record in cursor]
            return {
                "query": user_query,
                "sql": sql,
                "count": len(records),
                "result": records
            }
        except Exception as query_error:
            print(f"❌ Query Execution Error (possible timeout): {query_error}")
            return failure(f"Query timeout or error: {str(query_error)}")
    except Exception as exc:
        print(f"❌ Run Query Error: {exc}")
        traceback.print_exc()
        return {
            "query": user_query,
            "error": str(exc),
            "sql": sql or "",
            "result": [],
            "count": 0
        }
| # ========================================================= | |
| # DATABASE OPERATIONS | |
| # ========================================================= | |
def save_detection(plate, state, vehicle_type, vehicle_conf, date, time):
    """Save one vehicle detection to the ``vehicle_logs`` table.

    The row stores ``date`` as given, the hour extracted from ``time``
    ("HH:MM:SS"), the weekday name derived from ``date`` ("YYYY-MM-DD"),
    the database's ``NOW()`` as timestamp, and the fixed values 'CAM-01'
    and 'default' for camera_id and location.

    All caller-supplied values are sent as bound parameters, so quotes or
    SQL fragments in a plate/state string cannot alter the statement.

    Args:
        plate: Plate text.
        state: State code/name.
        vehicle_type: Vehicle type label.
        vehicle_conf: Detection confidence; converted with ``float()``
            (``None`` is stored as NULL).
        date: Date string, expected as "YYYY-MM-DD".
        time: Time string, expected as "HH:MM:SS".

    Returns:
        True when the row was inserted and committed, False when the engine
        is not initialised or any error occurred (the error is printed).
    """
    from datetime import datetime

    try:
        if engine is None:
            print("⚠️ Engine not initialized - save_detection skipped")
            return False

        # Hour is the leading field of the time string; unparsable or
        # missing values fall back to 0.
        try:
            hour = int(time.split(":")[0]) if time else 0
        except (AttributeError, TypeError, ValueError):
            hour = 0

        # Weekday name for the ``day`` column; "Unknown" when the date does
        # not match the expected format.
        try:
            day = datetime.strptime(date, "%Y-%m-%d").strftime("%A")
        except (TypeError, ValueError):
            day = "Unknown"

        query = text("""
            INSERT INTO vehicle_logs
                (plate, state, vehicle_type, vehicle_conf, date, hour, day, timestamp, camera_id, location)
            VALUES
                (:plate, :state, :vehicle_type, :vehicle_conf, :date, :hour, :day, NOW(), 'CAM-01', 'default')
        """)
        params = {
            "plate": plate,
            "state": state,
            "vehicle_type": vehicle_type,
            # Plain float so driver adaptation does not depend on the
            # numeric type the detector produced.
            "vehicle_conf": float(vehicle_conf) if vehicle_conf is not None else None,
            "date": date,
            "hour": hour,
            "day": day,
        }

        with engine.connect() as conn:
            conn.execute(query, params)
            conn.commit()

        print(f"✅ Saved: {plate} from {state} at {time}")
        return True
    except Exception as e:
        print(f"❌ Save Error: {e}")
        traceback.print_exc()
        return False
def health_check():
    """
    Report whether the database is reachable.

    Runs ``SELECT COUNT(*) FROM vehicle_logs`` with statement_timeout set to
    10000 and returns an ``(ok, message)`` tuple: ``(True, ...)`` with the
    record count on success, ``(False, ...)`` when the engine is missing or
    the query fails.
    """
    try:
        if engine is None:
            return False, "❌ Database not configured"

        with engine.connect() as connection:
            connection.execute(text("SET statement_timeout = 10000"))
            record_count = connection.execute(
                text("SELECT COUNT(*) FROM vehicle_logs")
            ).scalar()
        return True, f"✅ Database OK - {record_count} records"
    except Exception as exc:
        print(f"❌ Health Check Error (timeout?): {exc}")
        return False, f"❌ Database Error: {str(exc)}"
def get_vehicles_by_state():
    """
    Count detections per state, largest count first.

    Returns a list of dicts with the keys ``state`` and ``count``; an empty
    list when the engine is missing or the query fails.
    """
    if engine is None:
        print("⚠️ Database not available for get_vehicles_by_state")
        return []

    statement = """
        SELECT state, COUNT(*) as count
        FROM vehicle_logs
        GROUP BY state
        ORDER BY count DESC
    """
    try:
        with engine.connect() as connection:
            # Bound the query run time (15000) before executing it.
            connection.execute(text("SET statement_timeout = 15000"))
            cursor = connection.execute(text(statement))
            return [dict(record._mapping) for record in cursor]
    except Exception as exc:
        print(f"❌ State Query Error (timeout?): {exc}")
        return []
def get_hourly_traffic():
    """
    Count detections per hour value, ordered by hour ascending.

    Returns a list of dicts with the keys ``hour`` and ``traffic``; an empty
    list when the engine is missing or the query fails.
    """
    if engine is None:
        print("⚠️ Database not available for get_hourly_traffic")
        return []

    statement = """
        SELECT hour, COUNT(*) as traffic
        FROM vehicle_logs
        GROUP BY hour
        ORDER BY hour
    """
    try:
        with engine.connect() as connection:
            # Bound the query run time (15000) before executing it.
            connection.execute(text("SET statement_timeout = 15000"))
            cursor = connection.execute(text(statement))
            return [dict(record._mapping) for record in cursor]
    except Exception as exc:
        print(f"❌ Hourly Traffic Error (timeout?): {exc}")
        return []
def get_top_plates():
    """
    Return the 20 plates with the most detections, most detected first.

    Each item is a dict with the keys ``plate`` and ``detections``. An empty
    list is returned when the engine is missing or the query fails.
    """
    if engine is None:
        print("⚠️ Database not available for get_top_plates")
        return []

    statement = """
        SELECT plate, COUNT(*) as detections
        FROM vehicle_logs
        GROUP BY plate
        ORDER BY detections DESC
        LIMIT 20
    """
    try:
        with engine.connect() as connection:
            # Bound the query run time (15000) before executing it.
            connection.execute(text("SET statement_timeout = 15000"))
            cursor = connection.execute(text(statement))
            return [dict(record._mapping) for record in cursor]
    except Exception as exc:
        print(f"❌ Top Plates Error (timeout?): {exc}")
        return []
def get_suspicious_vehicles():
    """Get vehicles detected more than 5 times (potentially suspicious).

    Groups ``vehicle_logs`` by plate and state and keeps groups with more
    than 5 rows, returning at most 20, most detected first.

    Returns:
        A list of dicts with the keys ``plate``, ``state``, ``detections``,
        ``locations`` (distinct locations) and ``days`` (distinct dates).
        An empty list when the database is not configured or the query
        fails (the error is printed).
    """
    # Report a missing engine explicitly rather than letting it surface as
    # an AttributeError labelled as a possible timeout.
    if engine is None:
        print("⚠️ Database not available for get_suspicious_vehicles")
        return []
    try:
        sql = """
            SELECT plate, state, COUNT(*) as detections,
                   COUNT(DISTINCT location) as locations,
                   COUNT(DISTINCT date) as days
            FROM vehicle_logs
            GROUP BY plate, state
            HAVING COUNT(*) > 5
            ORDER BY detections DESC
            LIMIT 20
        """
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))  # 15 second timeout
            result = conn.execute(text(sql))
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Suspicious Vehicles Error (timeout?): {e}")
        return []
| # ========================================================= | |
| # ADVANCED ANALYTICAL FUNCTIONS | |
| # ========================================================= | |
def get_route_history(plate, limit=50):
    """Get the most recent detections of one vehicle.

    Rows are ordered by timestamp descending (newest first) and capped at
    ``limit``. ``plate`` and ``limit`` are passed as bound parameters, so
    the plate text cannot alter the SQL.

    Args:
        plate: Exact plate value to match.
        limit: Maximum number of rows; converted with ``int()``.

    Returns:
        A list of dicts with the keys timestamp, plate, state, location,
        camera_id, date, hour and day. An empty list when the database is
        not configured or the query fails (the error is printed).
    """
    if engine is None:
        print("⚠️ Database not available for get_route_history")
        return []
    try:
        sql = text("""
            SELECT timestamp, plate, state, location, camera_id, date, hour, day
            FROM vehicle_logs
            WHERE plate = :plate
            ORDER BY timestamp DESC
            LIMIT :limit
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, {"plate": plate, "limit": int(limit)})
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Route History Error: {e}")
        return []
def get_vehicles_by_location(location):
    """Get vehicles detected at locations whose name contains ``location``.

    Matching is a case-insensitive substring match. The search pattern is
    sent as a bound parameter, so quotes in ``location`` cannot break or
    alter the SQL.

    Args:
        location: Substring of the location name to search for.

    Returns:
        Up to 50 dicts with the keys ``plate``, ``state`` and
        ``detections``, most detected first. An empty list when the
        database is not configured or the query fails (the error is
        printed).
    """
    if engine is None:
        print("⚠️ Database not available for get_vehicles_by_location")
        return []
    try:
        sql = text("""
            SELECT DISTINCT plate, state, COUNT(*) as detections
            FROM vehicle_logs
            WHERE LOWER(location) LIKE :pattern
            GROUP BY plate, state
            ORDER BY detections DESC
            LIMIT 50
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, {"pattern": f"%{location.lower()}%"})
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Vehicles by Location Error: {e}")
        return []
def get_multi_location_detections(min_locations=2):
    """Get vehicles detected at several distinct locations.

    Groups ``vehicle_logs`` by plate and state and keeps groups seen at
    ``min_locations`` or more distinct locations. The threshold is passed
    as a bound integer parameter instead of being spliced into the SQL.

    Args:
        min_locations: Minimum number of distinct locations; converted with
            ``int()``.

    Returns:
        Up to 20 dicts with the keys ``plate``, ``state``, ``detections``,
        ``locations`` and ``days``, ordered by locations then detections,
        both descending. An empty list when the database is not configured
        or the query fails (the error is printed).
    """
    if engine is None:
        print("⚠️ Database not available for get_multi_location_detections")
        return []
    try:
        sql = text("""
            SELECT plate, state, COUNT(*) as detections,
                   COUNT(DISTINCT location) as locations,
                   COUNT(DISTINCT date) as days
            FROM vehicle_logs
            GROUP BY plate, state
            HAVING COUNT(DISTINCT location) >= :min_locations
            ORDER BY locations DESC, detections DESC
            LIMIT 20
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, {"min_locations": int(min_locations)})
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Multi-Location Detection Error: {e}")
        return []
def get_peak_traffic_hours():
    """Identify the 10 busiest hours by detection count.

    ``percentage`` is each hour's share of the summed per-hour counts; the
    window sum is taken over all hour groups, before LIMIT keeps the top 10.

    Returns:
        Up to 10 dicts with the keys ``hour``, ``traffic_count`` and
        ``percentage``, busiest first. An empty list when the database is
        not configured or the query fails (the error is printed).
    """
    # Report a missing engine explicitly, as the sibling query helpers do.
    if engine is None:
        print("⚠️ Database not available for get_peak_traffic_hours")
        return []
    try:
        sql = """
            SELECT hour, COUNT(*) as traffic_count,
                   ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage
            FROM vehicle_logs
            GROUP BY hour
            ORDER BY traffic_count DESC
            LIMIT 10
        """
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(text(sql))
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Peak Traffic Hours Error: {e}")
        return []
def get_vehicle_density_by_location():
    """
    Summarise detection volume per location (rows with a NULL location are
    excluded), returning the 20 busiest locations.

    Each item is a dict with the keys ``location``, ``vehicle_count``,
    ``unique_vehicles`` (distinct plates) and ``percentage``. An empty list
    is returned when the engine is missing or the query fails.
    """
    if engine is None:
        print("⚠️ Database not available for get_vehicle_density_by_location")
        return []

    statement = """
        SELECT location, COUNT(*) as vehicle_count,
               COUNT(DISTINCT plate) as unique_vehicles,
               ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage
        FROM vehicle_logs
        WHERE location IS NOT NULL
        GROUP BY location
        ORDER BY vehicle_count DESC
        LIMIT 20
    """
    try:
        with engine.connect() as connection:
            # Bound the query run time (15000) before executing it.
            connection.execute(text("SET statement_timeout = 15000"))
            cursor = connection.execute(text(statement))
            return [dict(record._mapping) for record in cursor]
    except Exception as exc:
        print(f"❌ Vehicle Density Error: {exc}")
        return []
def get_high_confidence_detections(confidence_threshold=0.9):
    """Get vehicles whose detections meet a confidence threshold.

    Only rows with ``vehicle_conf >= confidence_threshold`` are considered;
    they are grouped by plate, state and vehicle type. The threshold is
    passed as a bound float parameter instead of being spliced into the SQL.

    Args:
        confidence_threshold: Minimum ``vehicle_conf``; converted with
            ``float()``.

    Returns:
        Up to 30 dicts with the keys ``plate``, ``state``, ``vehicle_type``,
        ``detections`` and ``avg_confidence``, ordered by average confidence
        then detections, both descending. An empty list when the database is
        not configured or the query fails (the error is printed).
    """
    if engine is None:
        print("⚠️ Database not available for get_high_confidence_detections")
        return []
    try:
        sql = text("""
            SELECT plate, state, vehicle_type, COUNT(*) as detections,
                   ROUND(AVG(vehicle_conf), 3) as avg_confidence
            FROM vehicle_logs
            WHERE vehicle_conf >= :threshold
            GROUP BY plate, state, vehicle_type
            ORDER BY avg_confidence DESC, detections DESC
            LIMIT 30
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, {"threshold": float(confidence_threshold)})
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ High Confidence Detections Error: {e}")
        return []
def get_daily_traffic_summary(date=None):
    """Get a traffic summary for one date, or for today when none is given.

    "Today" is the database's ``CURRENT_DATE``. This is standard SQL and is
    used because the connection is configured with the PostgreSQL-only
    ``SET statement_timeout``, and PostgreSQL has no MySQL-style
    ``CURDATE()`` function. An explicit ``date`` is passed as a bound
    parameter.

    Args:
        date: Date value to match against the ``date`` column, or ``None``
            for the current date.
            NOTE(review): assumes the ``date`` column is comparable with
            CURRENT_DATE (i.e. DATE-typed) — confirm against the schema.

    Returns:
        A dict with the keys ``total_vehicles``, ``unique_vehicles``,
        ``locations_covered`` and ``peak_hours`` (the number of distinct
        hour values with detections). An empty dict when the database is
        not configured or the query fails (the error is printed).
    """
    if engine is None:
        print("⚠️ Database not available for get_daily_traffic_summary")
        return {}
    try:
        params = {}
        if date:
            where_clause = "WHERE date = :date"
            params["date"] = date
        else:
            where_clause = "WHERE date = CURRENT_DATE"

        # Only the fixed clause text above is interpolated; values are bound.
        sql = text(f"""
            SELECT
                COUNT(*) as total_vehicles,
                COUNT(DISTINCT plate) as unique_vehicles,
                COUNT(DISTINCT location) as locations_covered,
                COUNT(DISTINCT hour) as peak_hours
            FROM vehicle_logs
            {where_clause}
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            row = conn.execute(sql, params).fetchone()
            return dict(row._mapping) if row is not None else {}
    except Exception as e:
        print(f"❌ Daily Summary Error: {e}")
        return {}
def get_state_wise_distribution():
    """Get the distribution of detections across states.

    Returns:
        A list of dicts with the keys ``state``, ``detections``,
        ``unique_vehicles`` (distinct plates) and ``percentage`` (share of
        all detections), most detections first. An empty list when the
        database is not configured or the query fails (the error is
        printed).
    """
    # Report a missing engine explicitly, as the sibling query helpers do.
    if engine is None:
        print("⚠️ Database not available for get_state_wise_distribution")
        return []
    try:
        sql = """
            SELECT state, COUNT(*) as detections,
                   COUNT(DISTINCT plate) as unique_vehicles,
                   ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage
            FROM vehicle_logs
            GROUP BY state
            ORDER BY detections DESC
        """
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(text(sql))
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ State Distribution Error: {e}")
        return []
def query_by_date_range(start_date, end_date, state=None, location=None):
    """Query detections whose date lies within an inclusive range.

    All values are sent as bound parameters; only fixed clause text is
    assembled dynamically, so argument contents cannot alter the SQL.

    Args:
        start_date: First date of the range (inclusive, via BETWEEN).
        end_date: Last date of the range (inclusive).
        state: Optional exact state value to match.
        location: Optional case-insensitive substring of the location name.

    Returns:
        Up to 500 row dicts (all columns), newest timestamp first. An empty
        list when the database is not configured or the query fails (the
        error is printed).
    """
    if engine is None:
        print("⚠️ Database not available for query_by_date_range")
        return []
    try:
        where_conditions = ["date BETWEEN :start_date AND :end_date"]
        params = {"start_date": start_date, "end_date": end_date}
        if state:
            where_conditions.append("state = :state")
            params["state"] = state
        if location:
            where_conditions.append("LOWER(location) LIKE :location")
            params["location"] = f"%{location.lower()}%"
        where_clause = " AND ".join(where_conditions)

        sql = text(f"""
            SELECT *
            FROM vehicle_logs
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT 500
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))  # Longer timeout for large ranges
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Date Range Query Error: {e}")
        return []
def query_by_time_range(start_hour, end_hour, location=None, vehicle_type=None):
    """Query detections whose hour of day lies within a range.

    When ``start_hour <= end_hour`` the range is the inclusive interval
    ``[start_hour, end_hour]``; equal bounds select exactly that one hour.
    When ``start_hour > end_hour`` the range wraps past midnight (e.g.
    21 to 4 matches hour >= 21 or hour <= 4).

    All values are sent as bound parameters.

    Args:
        start_hour: First hour of the range; converted with ``int()``.
        end_hour: Last hour of the range; converted with ``int()``.
        location: Optional case-insensitive substring of the location name.
        vehicle_type: Optional case-insensitive substring of the vehicle
            type.

    Returns:
        Up to 200 row dicts (all columns), newest timestamp first. An empty
        list when the database is not configured or the query fails (the
        error is printed).
    """
    if engine is None:
        print("⚠️ Database not available for query_by_time_range")
        return []
    try:
        # Compare as integers so numeric strings are not ordered
        # lexicographically.
        start_hour = int(start_hour)
        end_hour = int(end_hour)
        params = {"start_hour": start_hour, "end_hour": end_hour}

        if start_hour <= end_hour:
            where_conditions = ["hour BETWEEN :start_hour AND :end_hour"]
        else:  # Handles ranges like 9 PM to 4 AM (21 to 4)
            where_conditions = ["(hour >= :start_hour OR hour <= :end_hour)"]

        if location:
            where_conditions.append("LOWER(location) LIKE :location")
            params["location"] = f"%{location.lower()}%"
        if vehicle_type:
            where_conditions.append("LOWER(vehicle_type) LIKE :vehicle_type")
            params["vehicle_type"] = f"%{vehicle_type.lower()}%"
        where_clause = " AND ".join(where_conditions)

        sql = text(f"""
            SELECT *
            FROM vehicle_logs
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT 200
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Time Range Query Error: {e}")
        return []
| # ========================================================= | |
| # ADVANCED ROUTE TRACKING FUNCTIONS | |
| # Track vehicles passing through multiple locations | |
| # ========================================================= | |
def get_vehicles_route_between_locations(location_from, location_to, vehicle_type=None, limit=100):
    """Get vehicles seen at one location and later at another.

    Self-joins ``vehicle_logs`` on plate and state, pairing each detection
    at ``location_from`` with strictly later detections at ``location_to``.
    Location and vehicle-type matching is a case-insensitive substring
    match. All values are sent as bound parameters.

    Example: get_vehicles_route_between_locations('adyar', 'kottupuram', 'bike')

    Args:
        location_from: Substring of the origin location name.
        location_to: Substring of the destination location name.
        vehicle_type: Optional substring of the vehicle type (applied to
            the origin-side detection).
        limit: Maximum number of rows; converted with ``int()``.

    Returns:
        A list of dicts, one per (plate, state, vehicle_type), with
        detection counts and first/last timestamps for both locations.
        An empty list when the database is not configured or the query
        fails (the error is printed).
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return []

        params = {
            "location_from": f"%{location_from.lower()}%",
            "location_to": f"%{location_to.lower()}%",
            "limit": int(limit),
        }
        vehicle_filter = ""
        if vehicle_type:
            vehicle_filter = "AND LOWER(vl1.vehicle_type) LIKE :vehicle_type"
            params["vehicle_type"] = f"%{vehicle_type.lower()}%"

        # Only the fixed optional clause is interpolated; values are bound.
        sql = text(f"""
            SELECT
                vl1.plate,
                vl1.state,
                vl1.vehicle_type,
                COUNT(DISTINCT vl1.timestamp) as detections_in_from_location,
                COUNT(DISTINCT vl2.timestamp) as detections_in_to_location,
                COUNT(DISTINCT vl1.date) as days_active,
                MIN(vl1.timestamp) as first_detected_from,
                MAX(vl1.timestamp) as last_detected_from,
                MIN(vl2.timestamp) as first_detected_to,
                MAX(vl2.timestamp) as last_detected_to
            FROM vehicle_logs vl1
            INNER JOIN vehicle_logs vl2
                ON vl1.plate = vl2.plate
                AND vl1.state = vl2.state
                AND vl2.timestamp > vl1.timestamp
            WHERE LOWER(vl1.location) LIKE :location_from
                AND LOWER(vl2.location) LIKE :location_to
                {vehicle_filter}
            GROUP BY vl1.plate, vl1.state, vl1.vehicle_type
            ORDER BY detections_in_from_location DESC, detections_in_to_location DESC
            LIMIT :limit;
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Route Tracking Error: {e}")
        return []
def get_vehicle_complete_route_history(plate, state=None):
    """Get every detection of one vehicle in chronological order.

    Rows are ordered by timestamp ascending and are not limited. ``plate``
    and ``state`` are sent as bound parameters, so their contents cannot
    alter the SQL.

    Args:
        plate: Exact plate value to match.
        state: Optional exact state value to match.

    Returns:
        A list of dicts with the keys timestamp, plate, state, vehicle_type,
        location, camera_id, date, hour and day. An empty list when the
        database is not configured or the query fails (the error is
        printed).
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return []

        params = {"plate": plate}
        state_filter = ""
        if state:
            state_filter = "AND state = :state"
            params["state"] = state

        # Only the fixed optional clause is interpolated; values are bound.
        sql = text(f"""
            SELECT
                timestamp,
                plate,
                state,
                vehicle_type,
                location,
                camera_id,
                date,
                hour,
                day
            FROM vehicle_logs
            WHERE plate = :plate
                {state_filter}
            ORDER BY timestamp ASC;
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Route History Error: {e}")
        return []
def get_vehicles_passing_through_multiple_locations(locations, vehicle_type=None, min_locations=2):
    """Get vehicles detected at several of the given locations.

    A row qualifies when its location contains any of the ``locations``
    substrings (case-insensitive). Vehicles are grouped by plate, state and
    vehicle type, and kept when they appear at ``min_locations`` or more
    distinct locations. All values are sent as bound parameters.

    Example: get_vehicles_passing_through_multiple_locations(['adyar', 'kottupuram', 'mylapore'])

    Args:
        locations: Iterable of location-name substrings.
        vehicle_type: Optional substring of the vehicle type.
        min_locations: Minimum number of distinct matching locations;
            converted with ``int()``.

    Returns:
        Up to 100 dicts with the keys plate, state, vehicle_type,
        total_detections, unique_locations_visited, days_active, peak_hours
        (distinct hour values), first_seen and last_seen. An empty list when
        no locations are given, the database is not configured, or the
        query fails (the error is printed).
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return []

        locations = list(locations)
        if not locations:
            # Nothing to match; an empty OR-list would be invalid SQL.
            return []

        params = {"min_locations": int(min_locations)}
        location_clauses = []
        for index, loc in enumerate(locations):
            key = f"loc_{index}"
            params[key] = f"%{loc.lower()}%"
            location_clauses.append(f"LOWER(location) LIKE :{key}")
        location_conditions = " OR ".join(location_clauses)

        vehicle_filter = ""
        if vehicle_type:
            vehicle_filter = "AND LOWER(vehicle_type) LIKE :vehicle_type"
            params["vehicle_type"] = f"%{vehicle_type.lower()}%"

        # Only generated placeholder names and fixed clause text are
        # interpolated; values are bound.
        sql = text(f"""
            SELECT
                plate,
                state,
                vehicle_type,
                COUNT(*) as total_detections,
                COUNT(DISTINCT location) as unique_locations_visited,
                COUNT(DISTINCT date) as days_active,
                COUNT(DISTINCT hour) as peak_hours,
                MIN(timestamp) as first_seen,
                MAX(timestamp) as last_seen
            FROM vehicle_logs
            WHERE ({location_conditions})
                {vehicle_filter}
            GROUP BY plate, state, vehicle_type
            HAVING COUNT(DISTINCT location) >= :min_locations
            ORDER BY COUNT(DISTINCT location) DESC, COUNT(*) DESC
            LIMIT 100;
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Multi-Location Route Error: {e}")
        return []
def get_location_to_location_traffic_flow(location_from, location_to, date_range=None):
    """Get aggregate traffic-flow statistics between two locations.

    Self-joins ``vehicle_logs`` on plate and state, pairing each detection
    at ``location_from`` with strictly later detections at ``location_to``,
    and aggregates over the origin-side rows. Location matching is a
    case-insensitive substring match. All values are sent as bound
    parameters.

    Note that ``total_from_location_detections`` is ``COUNT(*)`` over the
    joined pairs, so an origin detection followed by several destination
    detections is counted once per pair.

    Args:
        location_from: Substring of the origin location name.
        location_to: Substring of the destination location name.
        date_range: Optional ``(start_date, end_date)`` pair restricting
            the origin-side detection date (inclusive).

    Returns:
        A dict with the keys unique_vehicles, vehicle_types, states,
        total_from_location_detections, days_active, avg_confidence,
        earliest_traffic and latest_traffic. An empty dict when the
        database is not configured or the query fails (the error is
        printed).
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return {}

        params = {
            "location_from": f"%{location_from.lower()}%",
            "location_to": f"%{location_to.lower()}%",
        }
        date_filter = ""
        if date_range:
            start_date, end_date = date_range
            date_filter = "AND vl1.date BETWEEN :start_date AND :end_date"
            params["start_date"] = start_date
            params["end_date"] = end_date

        # Only the fixed optional clause is interpolated; values are bound.
        sql = text(f"""
            SELECT
                COUNT(DISTINCT vl1.plate) as unique_vehicles,
                COUNT(DISTINCT vl1.vehicle_type) as vehicle_types,
                COUNT(DISTINCT vl1.state) as states,
                COUNT(*) as total_from_location_detections,
                COUNT(DISTINCT vl1.date) as days_active,
                ROUND(AVG(vl1.vehicle_conf), 3) as avg_confidence,
                MIN(vl1.timestamp) as earliest_traffic,
                MAX(vl1.timestamp) as latest_traffic
            FROM vehicle_logs vl1
            INNER JOIN vehicle_logs vl2
                ON vl1.plate = vl2.plate
                AND vl1.state = vl2.state
                AND vl2.timestamp > vl1.timestamp
            WHERE LOWER(vl1.location) LIKE :location_from
                AND LOWER(vl2.location) LIKE :location_to
                {date_filter}
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))
            row = conn.execute(sql, params).fetchone()
            if row:
                return dict(row._mapping)
            return {}
    except Exception as e:
        print(f"❌ Traffic Flow Error: {e}")
        return {}
def get_vehicle_type_route_analysis(location_from, location_to):
    """Analyze which vehicle types travel between two locations most often.

    Self-joins ``vehicle_logs`` on plate and state, pairing each detection
    at ``location_from`` with strictly later detections at ``location_to``,
    then groups by the origin-side vehicle type. Location matching is a
    case-insensitive substring match; both patterns are sent as bound
    parameters.

    Args:
        location_from: Substring of the origin location name.
        location_to: Substring of the destination location name.

    Returns:
        A list of dicts with the keys vehicle_type, unique_vehicles,
        total_detections (joined pairs), days_active, avg_confidence and
        percentage, ordered by total_detections descending. An empty list
        when the database is not configured or the query fails (the error
        is printed).
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return []

        sql = text("""
            SELECT
                vl1.vehicle_type,
                COUNT(DISTINCT vl1.plate) as unique_vehicles,
                COUNT(*) as total_detections,
                COUNT(DISTINCT vl1.date) as days_active,
                ROUND(AVG(vl1.vehicle_conf), 3) as avg_confidence,
                ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage
            FROM vehicle_logs vl1
            INNER JOIN vehicle_logs vl2
                ON vl1.plate = vl2.plate
                AND vl1.state = vl2.state
                AND vl2.timestamp > vl1.timestamp
            WHERE LOWER(vl1.location) LIKE :location_from
                AND LOWER(vl2.location) LIKE :location_to
            GROUP BY vl1.vehicle_type
            ORDER BY COUNT(*) DESC;
        """)
        params = {
            "location_from": f"%{location_from.lower()}%",
            "location_to": f"%{location_to.lower()}%",
        }
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Vehicle Type Route Analysis Error: {e}")
        return []
def get_suspicious_route_patterns(min_visits=5, min_locations=3):
    """Identify vehicles that are detected often and at many locations.

    Groups ``vehicle_logs`` by plate, state and vehicle type and keeps
    groups with at least ``min_visits`` rows and at least ``min_locations``
    distinct locations. Both thresholds are passed as bound integer
    parameters instead of being spliced into the SQL.

    Args:
        min_visits: Minimum number of detections; converted with ``int()``.
        min_locations: Minimum number of distinct locations; converted with
            ``int()``.

    Returns:
        Up to 50 dicts with the keys plate, state, vehicle_type,
        total_detections, locations_visited, days_active, peak_hours
        (distinct hour values), first_detected and last_detected, ordered
        by distinct locations then detections, both descending. An empty
        list when the database is not configured or the query fails (the
        error is printed).
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return []

        sql = text("""
            SELECT
                plate,
                state,
                vehicle_type,
                COUNT(*) as total_detections,
                COUNT(DISTINCT location) as locations_visited,
                COUNT(DISTINCT date) as days_active,
                COUNT(DISTINCT hour) as peak_hours,
                MIN(timestamp) as first_detected,
                MAX(timestamp) as last_detected
            FROM vehicle_logs
            GROUP BY plate, state, vehicle_type
            HAVING COUNT(*) >= :min_visits AND COUNT(DISTINCT location) >= :min_locations
            ORDER BY COUNT(DISTINCT location) DESC, COUNT(*) DESC
            LIMIT 50;
        """)
        params = {
            "min_visits": int(min_visits),
            "min_locations": int(min_locations),
        }
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Suspicious Route Pattern Error: {e}")
        return []