# plate-detector / database.py
# barathvasan-dev
# Fix: All filter conditions working correctly - 49/49 tests passed (100% validation)
# 739be4a
# =========================================================
# ULTRA ADVANCED HYBRID NLP TO SQL ENGINE
# PROFESSIONAL MULTI-FILTER ENGINE
# MISTRAL / SQLCODER READY
# =========================================================
import re
import traceback
import os
import signal
from contextlib import contextmanager
from huggingface_hub import InferenceClient
from dotenv import load_dotenv
from sqlalchemy import create_engine, text, pool
# =========================================================
# TIMEOUT HANDLER
# =========================================================
@contextmanager
def timeout(seconds):
    """Raise ``TimeoutError`` if the wrapped block runs longer than *seconds*.

    The limit is enforced with a ``SIGALRM`` interval timer, which only exists
    on Unix and can only be armed from the main thread.  Where that is not
    possible (Windows, worker threads) or when *seconds* is ``None`` or not
    positive, the block simply runs without a limit.

    Args:
        seconds: Maximum run time in seconds (fractions allowed).

    Raises:
        TimeoutError: When the timer fires before the block finishes.
    """
    armed = (
        seconds is not None
        and seconds > 0
        and hasattr(signal, "SIGALRM")
        and hasattr(signal, "setitimer")
    )
    previous_handler = None

    if armed:
        def handler(signum, frame):
            raise TimeoutError("Operation timed out")

        try:
            previous_handler = signal.signal(signal.SIGALRM, handler)
        except ValueError:
            # signal.signal() refuses to install handlers outside the main thread.
            armed = False
        else:
            signal.setitimer(signal.ITIMER_REAL, seconds)

    try:
        yield
    finally:
        if armed:
            # Cancel the pending alarm before restoring the old handler so a
            # late signal cannot reach a handler that no longer expects it.
            signal.setitimer(signal.ITIMER_REAL, 0)
            signal.signal(
                signal.SIGALRM,
                signal.SIG_DFL if previous_handler is None else previous_handler,
            )
# =========================================================
# ENVIRONMENT SETUP
# =========================================================
# Pull configuration from a local .env file (if any) into the environment.
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
DATABASE_URL = os.getenv("DATABASE_URL")

# Hugging Face inference client. It stays None whenever it cannot be created,
# so importing this module never fails because of the LLM backend.
client = None
try:
    if not HF_TOKEN:
        print("⚠️ HF_TOKEN not set - LLM features disabled")
    else:
        try:
            client = InferenceClient(
                model="mistralai/Mistral-7B-Instruct-v0.2",
                token=HF_TOKEN,
                timeout=10.0,  # seconds
            )
            print("✅ Mistral client initialized")
        except Exception as init_error:
            print(f"⚠️ Mistral client initialization timeout/error: {init_error}")
            client = None
except Exception as unexpected_error:
    print(f"⚠️ Mistral client error: {unexpected_error}")
    client = None
# SQLAlchemy engine for DATABASE_URL. It is None when the URL is missing or
# the engine cannot be built; a failed probe query alone does not discard it.
engine = None
try:
    if not DATABASE_URL:
        print("⚠️ DATABASE_URL not set - Database features disabled")
    else:
        try:
            # Expected shape: postgresql://user:password@host:port/database
            # A connect timeout is appended only when the URL has no query
            # string of its own.
            db_url = DATABASE_URL
            if "?" not in db_url:
                db_url = db_url + "?connect_timeout=5"

            engine = create_engine(
                db_url,
                poolclass=pool.NullPool,  # no connection pooling
                pool_pre_ping=True,       # test connections before use
                echo=False,
            )

            # Probe the connection once; failure here is only a warning.
            try:
                with engine.connect() as probe:
                    probe.execute(text("SELECT 1"))
                    print("✅ Database connection initialized")
            except Exception as conn_err:
                print(f"⚠️ Database connection warning (may retry later): {conn_err}")
        except Exception as build_error:
            print(f"⚠️ Database engine creation error: {build_error}")
            engine = None
except Exception as unexpected_error:
    print(f"⚠️ Database connection warning: {unexpected_error}")
    engine = None
# =========================================================
# HELPER: Safe Database Execution
# =========================================================
def safe_db_query(query_func):
    """Decorate a database function so it never raises to its caller.

    The wrapped function is skipped when the module-level ``engine`` is
    ``None`` and any exception it raises is printed and swallowed.  In both
    cases the fallback value is ``[]`` for functions whose name contains
    ``get_`` and ``None`` for everything else.

    Args:
        query_func: The function that talks to the database.

    Returns:
        A wrapper with the same name, docstring and signature metadata as
        ``query_func``.
    """
    from functools import wraps

    def fallback():
        # Build a fresh list each time so callers can mutate the result safely.
        return [] if 'get_' in query_func.__name__ else None

    @wraps(query_func)  # keep __name__/__doc__ so logs and introspection stay accurate
    def wrapper(*args, **kwargs):
        if engine is None:
            print(f"⚠️ Database engine not available for {query_func.__name__}")
            return fallback()
        try:
            return query_func(*args, **kwargs)
        except Exception as e:
            print(f"❌ Database query error in {query_func.__name__}: {e}")
            return fallback()

    return wrapper
# =========================================================
# CONFIG
# =========================================================
# Feature flag for the LLM-backed path.
USE_LLM = True

# =========================================================
# DATABASE KNOWLEDGE
# =========================================================
# Name and columns of the single table the generated SQL targets.
SCHEMA = {
    "table": "vehicle_logs",
    "columns": [
        "timestamp", "plate", "state", "vehicle_type", "vehicle_conf",
        "camera_id", "location", "date", "hour", "day",
    ],
}

# Lower-case state code or state name -> two-letter state code.
VALID_STATES = {
    "tn": "TN", "tamil nadu": "TN",
    "ka": "KA", "karnataka": "KA",
    "kl": "KL", "kerala": "KL",
    "ap": "AP", "andhra": "AP",
    "ts": "TS", "telangana": "TS",
    "mh": "MH", "maharashtra": "MH",
    "dl": "DL", "delhi": "DL",
    "gj": "GJ", "gujarat": "GJ",
    "rj": "RJ", "rajasthan": "RJ",
    "up": "UP", "uttar pradesh": "UP",
    "wb": "WB", "west bengal": "WB",
    "hr": "HR", "haryana": "HR",
    "pb": "PB", "punjab": "PB",
}

# Location names, including joined and spaced spellings of the same place.
KNOWN_LOCATIONS = [
    "adyar", "guindy", "velachery",
    "besantnagar", "besant nagar",
    "thiruvanmiyur",
    "tnagar", "t nagar",
    "mylapore",
    "annanagar", "anna nagar",
    "koyambedu", "nungambakkam", "kotturpuram",
]

# Vehicle type labels.
VEHICLE_TYPES = [
    "suv", "bus", "truck", "bike", "auto", "taxi", "car", "jeep", "sedan",
]
# =========================================================
# SQL CLEANER
# =========================================================
def clean_sql(sql):
    """Strip Markdown code fences from *sql* and terminate it with ``;``.

    Fences are removed regardless of the case of the language tag, so both
    "```sql" and "```SQL" disappear completely.

    Args:
        sql: Raw SQL text, possibly wrapped in a fenced code block.

    Returns:
        The trimmed statement, guaranteed to end with a semicolon.
    """
    # One pass handles opening fences (with or without the tag) and closers.
    sql = re.sub(r"```(?:sql)?", "", sql, flags=re.IGNORECASE).strip()
    if not sql.endswith(";"):
        sql += ";"
    return sql
# =========================================================
# SQL VALIDATOR (IMPROVED)
# =========================================================
def validate_sql(sql):
    """Return True when *sql* looks like a read-only query on vehicle_logs.

    A statement is accepted only if it
      * contains none of the blocked keywords as a whole word,
      * starts with SELECT (leading whitespace is ignored), and
      * references ``vehicle_logs`` or one of the ``vl1``/``vl2`` aliases.

    JOIN and UNION are deliberately allowed because route tracking needs them.

    Args:
        sql: The SQL text to check.

    Returns:
        bool: True if the statement passes every check, False otherwise
        (including for non-string input).
    """
    if not isinstance(sql, str):
        return False

    blocked = (
        "DROP",
        "DELETE",
        "UPDATE",
        "INSERT",
        "ALTER",
        "CREATE",
        "TRUNCATE",
    )
    upper = sql.strip().upper()

    # Whole-word match: identifiers such as CREATED_AT or LAST_UPDATED must
    # not be mistaken for the CREATE / UPDATE statements.
    if re.search(r"\b(?:" + "|".join(blocked) + r")\b", upper):
        return False

    if not upper.startswith("SELECT"):
        return False

    return any(name in upper for name in ("VEHICLE_LOGS", "VL1", "VL2"))
# =========================================================
# PRODUCTION-GRADE HYBRID NLP ENGINE
# Advanced multi-filter, date-range, time-range support
# =========================================================
class FilterExtractor:
    """Rule-based extractor that turns a natural-language query into SQL.

    ``extract_filters`` pulls the individual filters (plate, state, locations,
    vehicle types, date, date range, day, hour, time range, confidence, route)
    out of the query text, ``detect_intents`` classifies what kind of answer is
    wanted, and ``build_sql`` combines both into one SELECT statement on
    ``vehicle_logs``.

    Every value interpolated into SQL by ``build_sql`` is expected to come from
    the extractors of this class, which only emit canonical names, digits and
    plate characters.  ``build_sql`` does not escape its input, so do not feed
    it filters assembled from raw user text.
    """

    # Tried in order by extract_hour; group 1 is the hour, group 2 (if any)
    # the am/pm marker.  A bare number is intentionally NOT treated as an hour.
    _HOUR_PATTERNS = (
        r'(?<![\w.:/-])(\d{1,2})(?::\d{2})?\s*(am|pm)\b',      # "5 pm", "5:30pm"
        r'(?<![\w.:/-])(\d{1,2}):\d{2}(?![\d:])',               # "17:30"
        r'\b(?:at|around|hour)\s+(\d{1,2})(?![\w.%:/-])',       # "at 14"
    )

    # Verb + preposition + two location phrases, e.g. "traveled from X to Y".
    _ROUTE_PATTERN = (
        r'\b(?:travel(?:l?ing|l?ed)?|pass(?:ing|ed)?|going|went|go)\s+'
        r'(?:from|through)\s+(\w+(?:\s+\w+)?)\s+'
        r'(?:to|through)\s+(\w+(?:\s+\w+)?)'
    )

    def __init__(self):
        # ===== VEHICLE TYPE SYNONYMS (surface word -> canonical type) =====
        self.vehicle_synonyms = {
            # Cars
            "car": "car", "cars": "car", "sedan": "car", "sedans": "car",
            "compact": "car", "compacts": "car", "hatchback": "car",
            # SUVs
            "suv": "suv", "suvs": "suv", "crossover": "suv",
            # Trucks
            "truck": "truck", "trucks": "truck", "lorry": "truck", "lorries": "truck",
            "heavy": "truck", "hgv": "truck",
            # Buses
            "bus": "bus", "buses": "bus", "coach": "bus", "shuttle": "bus",
            # Bikes
            "bike": "bike", "bikes": "bike", "motorcycle": "bike",
            "motorcycles": "bike", "motorbike": "bike", "two-wheeler": "bike",
            # Autos
            "auto": "auto", "autos": "auto", "autorickshaw": "auto",
            "auto-rickshaw": "auto", "tuk-tuk": "auto",
            # Jeeps
            "jeep": "jeep", "jeeps": "jeep", "4x4": "jeep",
            # Taxis
            "taxi": "taxi", "taxis": "taxi", "cab": "taxi", "cabs": "taxi",
        }
        # ===== DAY MAPPINGS (keyword -> day name or list of day names) =====
        self.day_map = {
            "monday": "Monday", "tuesday": "Tuesday", "wednesday": "Wednesday",
            "thursday": "Thursday", "friday": "Friday",
            "saturday": "Saturday", "sunday": "Sunday",
            "weekend": ["Saturday", "Sunday"],
            "weekday": ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"],
        }
        # ===== LOCATION VARIANTS (canonical name -> accepted spellings) =====
        # Each spelling must belong to exactly one canonical location.
        self.location_variants = {
            "adyar": ["adyar"],
            "besant nagar": ["besant", "besant nagar", "besantnagar"],
            "t nagar": ["t nagar", "tnagar", "t-nagar"],
            "anna nagar": ["anna", "anna nagar", "annanagar"],
            "velachery": ["velachery"],
            "guindy": ["guindy"],
            "thiruvanmiyur": ["thiruvanmiyur"],
            "mylapore": ["mylapore"],
            "koyambedu": ["koyambedu"],
            "nungambakkam": ["nungambakkam", "nungam"],
            "kotturpuram": ["kotturpuram", "kottupuram"],
        }
        # ===== STATE MAPPINGS (keyword -> state code) =====
        self.state_map = {
            "tn": "TN", "tamil": "TN", "tamil nadu": "TN",
            "ka": "KA", "karnataka": "KA",
            "kl": "KL", "kerala": "KL",
            "ap": "AP", "andhra": "AP", "andhra pradesh": "AP",
            "ts": "TS", "telangana": "TS",
            "mh": "MH", "maharashtra": "MH",
            "dl": "DL", "delhi": "DL",
            "gj": "GJ", "gujarat": "GJ",
            "rj": "RJ", "rajasthan": "RJ",
            "up": "UP", "uttar pradesh": "UP", "uttar": "UP",
            "wb": "WB", "west bengal": "WB",
            "hr": "HR", "haryana": "HR",
            "pb": "PB", "punjab": "PB",
        }
        # ===== TIME PERIOD MAPPINGS (keyword -> (start_hour, end_hour)) =====
        self.time_periods = {
            "morning": (5, 12),     # 5 AM to 12 PM
            "afternoon": (12, 17),  # 12 PM to 5 PM
            "evening": (17, 21),    # 5 PM to 9 PM
            "night": (21, 24),      # 9 PM to 12 AM
            "peak": (8, 10),        # Peak traffic (8-10 AM)
            "rush": (8, 10),        # Rush hour (8-10 AM)
            "midnight": (0, 4),     # Midnight (0-4 AM)
        }

    # ===== INTERNAL HELPERS =====
    @staticmethod
    def _to_24h(hour, period):
        """Convert a 12-hour clock value to 24-hour using an 'am'/'pm' marker."""
        if period == "pm" and hour != 12:
            return hour + 12
        if period == "am" and hour == 12:
            return 0
        return hour

    @staticmethod
    def _unique(items):
        """Return *items* without duplicates, keeping first-seen order."""
        return list(dict.fromkeys(items))

    @staticmethod
    def _mentions(text_lower, keywords):
        """True if any keyword starts at a word boundary in *text_lower*.

        Only the start is anchored, so stems still match inflections
        ("track" -> "tracking") but never the middle of another word
        ("top" does not match "stop").
        """
        return any(re.search(r'\b' + re.escape(k), text_lower) for k in keywords)

    def _ranked_locations(self):
        """Canonical locations ordered by their longest variant, longest first."""
        return sorted(
            self.location_variants.items(),
            key=lambda item: max(len(v) for v in item[1]),
            reverse=True,
        )

    @staticmethod
    def _date_condition(filters, column="date"):
        """SQL condition for the date range / single date filter, or None."""
        if filters["date_range"]:
            start = filters["date_range"]["start"]
            end = filters["date_range"]["end"]
            return f"{column} BETWEEN '{start}' AND '{end}'"
        if filters["date"]:
            return f"{column} = '{filters['date']}'"
        return None

    @staticmethod
    def _hour_condition(filters, column="hour"):
        """SQL condition for the time range / single hour filter, or None."""
        if filters["time_range"]:
            start = filters["time_range"]["start"]
            end = filters["time_range"]["end"]
            if start <= end:
                return f"{column} BETWEEN {start} AND {end}"
            # Range wraps past midnight, e.g. 21 -> 4.
            return f"({column} >= {start} OR {column} <= {end})"
        if filters["hour"] is not None:
            return f"{column} = {filters['hour']}"
        return None

    # ===== EXTRACTION METHODS =====
    def extract_plate(self, query):
        """Return the first Indian-format plate (XX00XX0000) in *query*, or None."""
        match = re.search(r'\b([A-Z]{2}\d{1,2}[A-Z]{1,3}\d{3,4})\b', query.upper())
        return match.group(1) if match else None

    def extract_state(self, query):
        """Return the state code for the first state keyword found, or None."""
        q = query.lower()
        for key, state_code in self.state_map.items():
            # Whole-word match so "ka" is not found inside other words.
            if re.search(r'\b' + re.escape(key) + r'\b', q):
                return state_code
        return None

    def extract_location(self, query):
        """Return a SINGLE canonical location (legacy method), or None."""
        q = query.lower()
        for canonical, variants in self._ranked_locations():
            if any(variant in q for variant in variants):
                return canonical
        return None

    def extract_locations(self, query):
        """Return every canonical location mentioned in *query*, or None."""
        q = query.lower()
        found = [
            canonical
            for canonical, variants in self._ranked_locations()
            if any(variant in q for variant in variants)
        ]
        found = self._unique(found)
        return found if found else None

    def extract_vehicle_type(self, query):
        """Return a SINGLE canonical vehicle type (legacy method), or None."""
        q = query.lower()
        # Longest synonym first so "auto-rickshaw" wins over "auto".
        for synonym in sorted(self.vehicle_synonyms, key=len, reverse=True):
            if re.search(r'\b' + re.escape(synonym) + r'\b', q):
                return self.vehicle_synonyms[synonym]
        return None

    def extract_vehicle_types(self, query):
        """Return every canonical vehicle type mentioned in *query*, or None."""
        q = query.lower()
        found = [
            self.vehicle_synonyms[synonym]
            for synonym in sorted(self.vehicle_synonyms, key=len, reverse=True)
            if re.search(r'\b' + re.escape(synonym) + r'\b', q)
        ]
        found = self._unique(found)
        return found if found else None

    def extract_date_range(self, query):
        """Return {"start", "end"} ISO dates for "from X to Y" / "between X and Y"."""
        patterns = [
            r'from\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})\s+to\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})',
            r'between\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})\s+and\s+(\d{1,2})[-/](\d{1,2})[-/](\d{4})',
            r'from\s+(\d{4}-\d{2}-\d{2})\s+to\s+(\d{4}-\d{2}-\d{2})',
            r'between\s+(\d{4}-\d{2}-\d{2})\s+and\s+(\d{4}-\d{2}-\d{2})',
        ]
        for pattern in patterns:
            match = re.search(pattern, query, re.IGNORECASE)
            if not match:
                continue
            groups = match.groups()
            if len(groups) == 6:  # DD-MM-YYYY on both sides
                start = f"{groups[2]}-{groups[1].zfill(2)}-{groups[0].zfill(2)}"
                end = f"{groups[5]}-{groups[4].zfill(2)}-{groups[3].zfill(2)}"
                return {"start": start, "end": end}
            if len(groups) == 2:  # already YYYY-MM-DD
                return {"start": groups[0], "end": groups[1]}
        return None

    def extract_date(self, query):
        """Return the first date in *query* as YYYY-MM-DD, or None."""
        match = re.search(r'\d{4}-\d{2}-\d{2}', query)
        if match:
            return match.group(0)
        # DD-MM-YYYY or DD/MM/YYYY
        match = re.search(r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})', query)
        if match:
            day, month, year = match.groups()
            return f"{year}-{month.zfill(2)}-{day.zfill(2)}"
        return None

    def extract_time_range(self, query):
        """Return {"start", "end"} hours for a period word or after/before/between.

        For "between A and B" the hours are returned in the order written, so
        "between 10 pm and 2 am" yields start=22, end=2 and is treated by
        ``build_sql`` as a range that wraps past midnight.
        """
        q = query.lower()

        # Period keywords (morning, night, ...). Anchoring the start of the
        # word keeps "night" from matching inside "midnight".
        for period, (start_hour, end_hour) in self.time_periods.items():
            if re.search(r'\b' + re.escape(period), q):
                return {"start": start_hour, "end": end_hour}

        # "after HH[:MM] [am|pm]"
        after = re.search(r'after\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)?', q)
        if after:
            hour = self._to_24h(int(after.group(1)), after.group(3))
            return {"start": hour, "end": 23}

        # "before HH[:MM] [am|pm]"
        before = re.search(r'before\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)?', q)
        if before:
            hour = self._to_24h(int(before.group(1)), before.group(3))
            return {"start": 0, "end": hour}

        # "between HH am|pm and HH am|pm"
        between = re.search(
            r'between\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)\s+and\s+(\d{1,2}):?(\d{0,2})\s*(am|pm)',
            q,
        )
        if between:
            first = self._to_24h(int(between.group(1)), between.group(3))
            second = self._to_24h(int(between.group(4)), between.group(6))
            return {"start": first, "end": second}
        return None

    def extract_hour(self, query):
        """Return a single hour (0-23) mentioned in *query*, or None.

        A number counts as an hour only when it carries time context: an am/pm
        marker, an HH:MM form, or a preceding "at" / "around" / "hour".  Dates
        and confidence figures are ignored, and nothing is returned when the
        query contains a range word, which ``extract_time_range`` and
        ``extract_date_range`` handle instead.
        """
        q = query.lower()
        # Whole words only: "to" must not fire inside "autos" or "today".
        if re.search(r'\b(?:between|from|to|after|before)\b', q):
            return None

        # Blank out digits that belong to dates or confidence thresholds.
        q = re.sub(r'\d{4}-\d{2}-\d{2}|\d{1,2}[-/]\d{1,2}[-/]\d{4}', ' ', q)
        q = re.sub(r'\d+(?:\.\d+)?\s*%?\s*(?:confidence|conf|accuracy)\b', ' ', q)

        for pattern in self._HOUR_PATTERNS:
            match = re.search(pattern, q)
            if match:
                groups = match.groups()
                period = groups[1] if len(groups) > 1 else None
                hour = self._to_24h(int(groups[0]), period)
                return hour if 0 <= hour < 24 else None
        return None

    def extract_day(self, query):
        """Return the day name (or list of names for weekend/weekday), or None."""
        q = query.lower()
        for day_key, day_values in self.day_map.items():
            if day_key in q:
                return day_values
        return None

    def extract_confidence(self, query):
        """Return a 0-1 confidence threshold, or None.

        The number must be followed by "confidence", "conf" or "accuracy",
        optionally with a percent sign in between ("90% confidence").  Values
        above 1, or written with "%", are read as percentages.
        """
        match = re.search(
            r'(\d+(?:\.\d+)?)\s*(%?)\s*(?:confidence|conf|accuracy)\b',
            query.lower(),
        )
        if not match:
            return None
        conf = float(match.group(1))
        if match.group(2) or conf > 1:
            conf = conf / 100
        return conf if 0 <= conf <= 1 else None

    def extract_route(self, query):
        """Return {"from", "to"} canonical locations for a route phrase, or None.

        A route needs a movement verb (travel/pass/go in common inflections)
        followed by "from"/"through" <place> "to"/"through" <place>, and both
        places must resolve to different known locations.  Because unknown
        words never resolve, date ranges such as "from 01-04-2026 to ..." cannot
        be mistaken for a route.
        """
        match = re.search(self._ROUTE_PATTERN, query.lower())
        if not match:
            return None

        origin_text = match.group(1).strip()
        destination_text = match.group(2).strip()
        origin = destination = None
        # Longest variants first so "besant nagar" beats shorter overlaps.
        for canonical, variants in self._ranked_locations():
            for variant in variants:
                if origin is None and variant in origin_text:
                    origin = canonical
                if destination is None and variant in destination_text:
                    destination = canonical

        if origin and destination and origin != destination:
            return {"from": origin, "to": destination}
        return None

    def extract_filters(self, query):
        """Run every extractor on *query* and return the results as one dict."""
        return {
            "plate": self.extract_plate(query),
            "state": self.extract_state(query),
            "location": self.extract_locations(query),          # list or None
            "vehicle_type": self.extract_vehicle_types(query),  # list or None
            "date": self.extract_date(query),
            "date_range": self.extract_date_range(query),
            "day": self.extract_day(query),
            "hour": self.extract_hour(query),
            "time_range": self.extract_time_range(query),
            "confidence": self.extract_confidence(query),
            "route": self.extract_route(query),
        }

    def detect_intents(self, query):
        """Return a dict of boolean intent flags for *query*.

        Keywords match at the start of a word only.  ``route_tracking`` needs
        both a route phrase and at least one known location in the query.
        """
        q = query.lower()

        has_route_keyword = self._mentions(q, [
            "traveling", "travelling", "travel from", "traveled from",
            "travelled from", "pass from", "pass through", "passing from",
            "passing through", "passed from", "passed through",
            "went from", "go from", "going from", "route", "path", "journey",
        ])
        # Use the same variant matching as extract_route so both agree.
        has_route_context = has_route_keyword and bool(self.extract_locations(q))

        return {
            "tracking": self._mentions(q, ["track", "history", "movement", "travel", "route", "where", "location", "show"]),
            "count": self._mentions(q, ["count", "how many", "total", "number of"]),
            "analytics": self._mentions(q, ["analytics", "analysis", "statistics", "distribution"]),
            "top": self._mentions(q, ["top", "most", "leading"]),
            "latest": self._mentions(q, ["latest", "recent", "last", "new"]),
            "hourly": self._mentions(q, ["hourly", "by hour", "per hour"]),
            "daily": self._mentions(q, ["daily", "by day", "per day"]),
            "location_based": self._mentions(q, ["by location", "density", "traffic"]),
            "suspicious": self._mentions(q, ["suspicious", "repeated", "multiple", "across"]),
            "aggregation": self._mentions(q, ["group", "aggregate", "sum", "average"]),
            "route_tracking": has_route_keyword and has_route_context,
        }

    # ===== SQL GENERATION =====
    def _route_sql(self, filters):
        """Build the self-join query for vehicles seen at "from" and later at "to"."""
        route = filters["route"]
        loc_from = route["from"]
        loc_to = route["to"]

        extra = []
        if filters["state"]:
            extra.append(f"vl1.state = '{filters['state']}'")
        if filters["vehicle_type"]:
            if isinstance(filters["vehicle_type"], list):
                vehicle_types = "', '".join(filters["vehicle_type"])
                extra.append(f"LOWER(vl1.vehicle_type) IN ('{vehicle_types}')")
            else:
                extra.append(
                    f"LOWER(vl1.vehicle_type) LIKE '%{filters['vehicle_type'].lower()}%'"
                )
        for condition in (
            self._date_condition(filters, "vl1.date"),
            self._hour_condition(filters, "vl1.hour"),
        ):
            if condition:
                extra.append(condition)
        extra_sql = "".join(f"\n              AND {condition}" for condition in extra)

        return f"""
            SELECT
                vl1.plate,
                vl1.state,
                vl1.vehicle_type,
                COUNT(DISTINCT vl1.timestamp) as visits_in_from_location,
                COUNT(DISTINCT vl2.timestamp) as visits_in_to_location,
                COUNT(DISTINCT vl1.date) as days_active,
                MIN(vl1.timestamp) as first_seen_from,
                MAX(vl1.timestamp) as last_seen_from,
                MIN(vl2.timestamp) as first_seen_to,
                MAX(vl2.timestamp) as last_seen_to
            FROM vehicle_logs vl1
            INNER JOIN vehicle_logs vl2
                ON vl1.plate = vl2.plate
                AND vl1.state = vl2.state
                AND vl2.timestamp > vl1.timestamp
            WHERE LOWER(vl1.location) LIKE '%{loc_from.lower()}%'
              AND LOWER(vl2.location) LIKE '%{loc_to.lower()}%'{extra_sql}
            GROUP BY vl1.plate, vl1.state, vl1.vehicle_type
            ORDER BY visits_in_from_location DESC, visits_in_to_location DESC
            LIMIT 100;
        """

    def _where_conditions(self, filters):
        """Translate the plain (non-route) filters into a list of SQL conditions."""
        conditions = []

        if filters["plate"]:
            conditions.append(f"plate = '{filters['plate']}'")
        if filters["state"]:
            conditions.append(f"state = '{filters['state']}'")

        # Locations and vehicle types may be a list (OR-ed) or a single value.
        for column in ("location", "vehicle_type"):
            value = filters[column]
            if not value:
                continue
            if isinstance(value, list):
                alternatives = [
                    f"LOWER({column}) LIKE '%{item.lower()}%'" for item in value
                ]
                conditions.append(f"({' OR '.join(alternatives)})")
            else:
                conditions.append(f"LOWER({column}) LIKE '%{value.lower()}%'")

        date_condition = self._date_condition(filters)
        if date_condition:
            conditions.append(date_condition)

        if filters["day"]:
            if isinstance(filters["day"], list):
                day_conditions = [f"day = '{d}'" for d in filters["day"]]
                conditions.append(f"({' OR '.join(day_conditions)})")
            else:
                conditions.append(f"day = '{filters['day']}'")

        hour_condition = self._hour_condition(filters)
        if hour_condition:
            conditions.append(hour_condition)

        if filters["confidence"] is not None:
            conditions.append(f"vehicle_conf >= {filters['confidence']}")
        return conditions

    def build_sql(self, filters, intents):
        """Build one SELECT statement from *filters* and *intents*.

        Precedence: fixed analytics queries (top, hourly+analytics,
        location_based+analytics, suspicious) ignore the filters; then route
        tracking; otherwise a filtered query whose shape depends on the
        count / tracking / hourly / location_based intents.

        Args:
            filters: Dict as returned by ``extract_filters``.
            intents: Dict as returned by ``detect_intents``.

        Returns:
            str: SQL text cleaned by ``clean_sql``.  If the analytics or route
            stage fails, a generic "latest 100 rows" query is returned.
        """
        fallback = "SELECT * FROM vehicle_logs ORDER BY timestamp DESC LIMIT 100;"

        # ----- Analytics queries (take priority over everything else) -----
        try:
            if intents["top"]:
                return clean_sql("""
                    SELECT plate, state, COUNT(*) as detections
                    FROM vehicle_logs
                    GROUP BY plate, state
                    ORDER BY detections DESC
                    LIMIT 20;
                """)
            if intents["hourly"] and intents["analytics"]:
                return clean_sql("""
                    SELECT hour, COUNT(*) as traffic
                    FROM vehicle_logs
                    GROUP BY hour
                    ORDER BY hour;
                """)
            if intents["location_based"] and intents["analytics"]:
                return clean_sql("""
                    SELECT location, COUNT(*) as count
                    FROM vehicle_logs
                    WHERE location IS NOT NULL
                    GROUP BY location
                    ORDER BY count DESC
                    LIMIT 20;
                """)
            if intents["suspicious"]:
                return clean_sql("""
                    SELECT plate, state, COUNT(*) as detections,
                           COUNT(DISTINCT location) as locations,
                           COUNT(DISTINCT date) as days
                    FROM vehicle_logs
                    GROUP BY plate, state
                    HAVING COUNT(*) > 5
                    ORDER BY detections DESC
                    LIMIT 20;
                """)
        except Exception as e:
            print(f"⚠️ Analytics query generation error: {e}")
            return clean_sql(fallback)

        # ----- Route tracking (seen at one location, later at another) -----
        if intents["route_tracking"] and filters["route"]:
            try:
                return clean_sql(self._route_sql(filters))
            except Exception as e:
                print(f"⚠️ Route tracking query generation error: {e}")
                return clean_sql(fallback)

        # ----- Filtered queries -----
        where_conditions = self._where_conditions(filters)
        where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"

        if intents["count"]:
            if filters["plate"]:
                sql = f"""
                    SELECT plate, COUNT(*) as detections
                    FROM vehicle_logs
                    WHERE {where_clause}
                    GROUP BY plate
                    ORDER BY detections DESC;
                """
            else:
                sql = f"""
                    SELECT COUNT(*) as total
                    FROM vehicle_logs
                    WHERE {where_clause};
                """
        elif intents["tracking"]:
            # Detailed records
            sql = f"""
                SELECT timestamp, plate, state, vehicle_type, location, camera_id, date, hour, day
                FROM vehicle_logs
                WHERE {where_clause}
                ORDER BY timestamp DESC
                LIMIT 100;
            """
        elif intents["hourly"]:
            sql = f"""
                SELECT hour, COUNT(*) as traffic
                FROM vehicle_logs
                WHERE {where_clause}
                GROUP BY hour
                ORDER BY hour;
            """
        elif intents["location_based"]:
            sql = f"""
                SELECT location, COUNT(*) as count
                FROM vehicle_logs
                WHERE {where_clause} AND location IS NOT NULL
                GROUP BY location
                ORDER BY count DESC;
            """
        else:
            # Default: every matching record
            sql = f"""
                SELECT *
                FROM vehicle_logs
                WHERE {where_clause}
                ORDER BY timestamp DESC
                LIMIT 100;
            """
        return clean_sql(sql)
def ask_llm(user_query):
    """Translate a natural-language query into a SQL string.

    Despite the name, no model is called here: the query is analysed by
    ``FilterExtractor`` (filters + intents), the analysis is printed for
    debugging, and ``FilterExtractor.build_sql`` produces the statement.

    Args:
        user_query: Free-text question about the vehicle logs.

    Returns:
        str: The generated SQL.  If anything in the analysis raises, the
        error is printed with a traceback and a generic "latest 10 rows"
        query is returned instead.
    """
    def describe(value, plural_label, singular_label):
        # List -> comma-joined under the plural label; scalar or missing value
        # -> singular label.
        if not value:
            return f" - {singular_label}: None"
        if isinstance(value, list):
            return f" - {plural_label}: {', '.join(value)}"
        return f" - {singular_label}: {value}"

    try:
        extractor = FilterExtractor()
        filters = extractor.extract_filters(user_query)
        intents = extractor.detect_intents(user_query)

        # ---- debug report of what was understood ----
        print("\n📊 QUERY ANALYSIS:")
        print(" Extracted Filters:")
        print(f" - Plate: {filters['plate']}")
        print(f" - State: {filters['state']}")
        print(describe(filters['location'], "Locations", "Location"))
        print(describe(filters['vehicle_type'], "Vehicle Types", "Vehicle Type"))
        for label, key in (
            ("Date", "date"),
            ("Date Range", "date_range"),
            ("Day", "day"),
            ("Hour", "hour"),
            ("Time Range", "time_range"),
            ("Confidence", "confidence"),
        ):
            print(f" - {label}: {filters[key]}")

        route = filters['route']
        if route:
            print(f" - Route: From '{route['from']}' to '{route['to']}'")
        else:
            print(" - Route: None")

        print(" Detected Intents:")
        active = [name for name, flag in intents.items() if flag]
        print(f" - {', '.join(active) if active else 'General query'}")

        return extractor.build_sql(filters, intents)
    except Exception as e:
        print(f"❌ Filter extraction error: {e}")
        traceback.print_exc()
        # Last-resort query so callers always receive runnable SQL.
        return clean_sql("SELECT * FROM vehicle_logs ORDER BY timestamp DESC LIMIT 10;")
# =========================================================
# QUERY EXECUTION
# =========================================================
def run_query(user_query):
    """Generate SQL for *user_query*, execute it and return a result dict.

    Args:
        user_query: Free-text question about the vehicle logs.

    Returns:
        dict: On success ``{"query", "sql", "count", "result"}`` where
        ``result`` is a list of row dicts.  On failure (no engine, execution
        error, or any other exception) ``{"query", "error", "sql", "result",
        "count"}`` with an empty result and a count of 0.
    """
    sql = ""

    def failure(message, sql_text):
        # Uniform error payload shared by every failure path.
        return {
            "query": user_query,
            "error": message,
            "sql": sql_text,
            "result": [],
            "count": 0,
        }

    try:
        sql = ask_llm(user_query)

        separator = "=" * 40
        print("\n" + separator)
        print("USER QUERY:")
        print(user_query)
        print("\nGENERATED SQL:")
        print(sql)
        print(separator)

        if engine is None:
            return failure("❌ Database not configured - DATABASE_URL missing", sql)

        try:
            with engine.connect() as conn:
                # Cap statement run time at 30 seconds (value is in ms).
                conn.execute(text("SET statement_timeout = 30000"))
                rows = []
                for record in conn.execute(text(sql)):
                    rows.append(dict(record._mapping))
        except Exception as query_error:
            print(f"❌ Query Execution Error (possible timeout): {query_error}")
            return failure(f"Query timeout or error: {str(query_error)}", sql)

        return {
            "query": user_query,
            "sql": sql,
            "count": len(rows),
            "result": rows,
        }
    except Exception as e:
        print(f"❌ Run Query Error: {e}")
        traceback.print_exc()
        return failure(str(e), sql if sql else "")
# =========================================================
# DATABASE OPERATIONS
# =========================================================
def save_detection(plate, state, vehicle_type, vehicle_conf, date, time):
    """Insert one vehicle detection into ``vehicle_logs``.

    The row gets ``timestamp = NOW()``, ``camera_id = 'CAM-01'`` and
    ``location = 'default'``.  ``hour`` is taken from the part of *time* before
    the first ":" (0 if it cannot be parsed) and ``day`` is the weekday name
    of *date* ("Unknown" if *date* is not ``YYYY-MM-DD``).

    All detection values are sent as bound parameters, so quotes or SQL
    fragments in OCR output cannot alter the statement.

    Args:
        plate: Plate text.
        state: State code.
        vehicle_type: Vehicle type label.
        vehicle_conf: Detection confidence.
        date: Detection date as ``YYYY-MM-DD``.
        time: Detection time, expected as ``HH:MM:SS``.

    Returns:
        bool: True if the row was committed, False if the engine is missing
        or the insert failed (the error is printed with a traceback).
    """
    try:
        if engine is None:
            print("⚠️ Engine not initialized - save_detection skipped")
            return False

        # Hour column: leading "HH" of the time string.
        try:
            hour = int(time.split(":")[0]) if time else 0
        except (AttributeError, TypeError, ValueError):
            hour = 0

        # Day column: weekday name derived from the date.
        from datetime import datetime
        try:
            day = datetime.strptime(date, "%Y-%m-%d").strftime("%A")
        except (TypeError, ValueError):
            day = "Unknown"

        statement = text("""
            INSERT INTO vehicle_logs
                (plate, state, vehicle_type, vehicle_conf, date, hour, day,
                 timestamp, camera_id, location)
            VALUES
                (:plate, :state, :vehicle_type, :vehicle_conf, :date, :hour, :day,
                 NOW(), 'CAM-01', 'default')
        """)
        params = {
            "plate": plate,
            "state": state,
            "vehicle_type": vehicle_type,
            "vehicle_conf": vehicle_conf,
            "date": date,
            "hour": hour,
            "day": day,
        }
        with engine.connect() as conn:
            conn.execute(statement, params)
            conn.commit()

        print(f"✅ Saved: {plate} from {state} at {time}")
        return True
    except Exception as e:
        print(f"❌ Save Error: {e}")
        traceback.print_exc()
        return False
def health_check():
    """Report whether the database answers a row-count query.

    Returns:
        tuple[bool, str]: ``(True, message with the record count)`` on
        success, otherwise ``(False, message)`` when the engine is missing or
        the query raises.
    """
    try:
        if engine is None:
            return False, "❌ Database not configured"

        with engine.connect() as conn:
            # 10 second statement limit (value is in ms).
            conn.execute(text("SET statement_timeout = 10000"))
            count = conn.execute(text("SELECT COUNT(*) FROM vehicle_logs")).scalar()
        return True, f"✅ Database OK - {count} records"
    except Exception as e:
        print(f"❌ Health Check Error (timeout?): {e}")
        return False, f"❌ Database Error: {str(e)}"
def get_vehicles_by_state():
    """Return detection counts per state, largest first.

    Returns:
        list[dict]: One ``{"state", "count"}`` dict per state; an empty list
        when the engine is missing or the query fails.
    """
    if engine is None:
        print("⚠️ Database not available for get_vehicles_by_state")
        return []

    sql = (
        "\n"
        "SELECT state, COUNT(*) as count\n"
        "FROM vehicle_logs\n"
        "GROUP BY state\n"
        "ORDER BY count DESC\n"
    )
    try:
        with engine.connect() as conn:
            # 15 second statement limit (value is in ms).
            conn.execute(text("SET statement_timeout = 15000"))
            rows = []
            for record in conn.execute(text(sql)):
                rows.append(dict(record._mapping))
        return rows
    except Exception as e:
        print(f"❌ State Query Error (timeout?): {e}")
        return []
def get_hourly_traffic():
    """Count logged detections for each value of the ``hour`` column.

    Rows come back in ascending hour order; the query runs under a
    15 second statement timeout.

    Returns:
        list[dict]: One mapping per hour with the keys ``hour`` and
        ``traffic``. Empty when the engine is not initialised or the query
        raises.
    """
    if engine is None:
        print("⚠️ Database not available for get_hourly_traffic")
        return []

    hourly_sql = """
        SELECT hour, COUNT(*) as traffic
        FROM vehicle_logs
        GROUP BY hour
        ORDER BY hour
    """
    try:
        with engine.connect() as db:
            db.execute(text("SET statement_timeout = 15000"))  # 15 second timeout
            hourly_rows = []
            for record in db.execute(text(hourly_sql)):
                hourly_rows.append(dict(record._mapping))
    except Exception as exc:
        print(f"❌ Hourly Traffic Error (timeout?): {exc}")
        return []
    return hourly_rows
def get_top_plates():
    """Return the 20 plates with the highest number of logged detections.

    The query runs under a 15 second statement timeout.

    Returns:
        list[dict]: Up to 20 mappings with the keys ``plate`` and
        ``detections``, highest count first. Empty when the engine is not
        initialised or the query raises.
    """
    if engine is None:
        print("⚠️ Database not available for get_top_plates")
        return []

    top_plates_sql = """
        SELECT plate, COUNT(*) as detections
        FROM vehicle_logs
        GROUP BY plate
        ORDER BY detections DESC
        LIMIT 20
    """
    try:
        with engine.connect() as connection:
            connection.execute(text("SET statement_timeout = 15000"))  # 15 second timeout
            cursor = connection.execute(text(top_plates_sql))
            top_plates = [dict(record._mapping) for record in cursor]
    except Exception as exc:
        print(f"❌ Top Plates Error (timeout?): {exc}")
        return []
    else:
        return top_plates
def get_suspicious_vehicles():
    """Return plates that were logged more than five times.

    Groups ``vehicle_logs`` by plate and state, keeps groups with more than
    5 rows, and reports how many distinct locations and dates each group
    spans. At most 20 rows are returned, highest detection count first.
    The query runs under a 15 second statement timeout.

    Returns:
        list[dict]: Mappings with the keys ``plate``, ``state``,
        ``detections``, ``locations`` and ``days``. Empty when the engine is
        not initialised or the query raises.
    """
    # Same guard as the sibling query helpers: without it an unconfigured
    # engine surfaced as a misleading "timeout?" AttributeError message.
    if engine is None:
        print("⚠️ Database not available for get_suspicious_vehicles")
        return []
    try:
        sql = """
            SELECT plate, state, COUNT(*) as detections,
                   COUNT(DISTINCT location) as locations,
                   COUNT(DISTINCT date) as days
            FROM vehicle_logs
            GROUP BY plate, state
            HAVING COUNT(*) > 5
            ORDER BY detections DESC
            LIMIT 20
        """
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))  # 15 second timeout
            result = conn.execute(text(sql))
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Suspicious Vehicles Error (timeout?): {e}")
        return []
# =========================================================
# ADVANCED ANALYTICAL FUNCTIONS
# =========================================================
def get_route_history(plate, limit=50):
    """Return the most recent detections of one plate, newest first.

    Args:
        plate: Plate to look up; matched exactly against the ``plate``
            column after conversion to ``str``.
        limit: Maximum number of rows to return (converted with ``int``).

    Returns:
        list[dict]: Mappings with the keys ``timestamp``, ``plate``,
        ``state``, ``location``, ``camera_id``, ``date``, ``hour`` and
        ``day``. Empty when the engine is not initialised or the query
        raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available for get_route_history")
            return []
        # Values are sent as bound parameters rather than formatted into the
        # SQL text, so a quote inside ``plate`` cannot alter the statement.
        sql = text("""
            SELECT timestamp, plate, state, location, camera_id, date, hour, day
            FROM vehicle_logs
            WHERE plate = :plate
            ORDER BY timestamp DESC
            LIMIT :limit
        """)
        params = {"plate": str(plate), "limit": int(limit)}
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Route History Error: {e}")
        return []
def get_vehicles_by_location(location):
    """Return plates seen at locations whose name contains ``location``.

    The match is a case-insensitive substring test (``LOWER(location) LIKE
    '%...%'``). Results are grouped by plate and state, ordered by detection
    count, and capped at 50 rows.

    Args:
        location: Substring of the location name. ``%`` and ``_`` inside it
            still act as LIKE wildcards, as before.

    Returns:
        list[dict]: Mappings with the keys ``plate``, ``state`` and
        ``detections``. Empty when the engine is not initialised or the
        query raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available for get_vehicles_by_location")
            return []
        # The pattern travels as a bound parameter so quotes in ``location``
        # cannot break out of the string literal. DISTINCT was dropped
        # because GROUP BY already yields one row per (plate, state).
        sql = text("""
            SELECT plate, state, COUNT(*) as detections
            FROM vehicle_logs
            WHERE LOWER(location) LIKE :pattern
            GROUP BY plate, state
            ORDER BY detections DESC
            LIMIT 50
        """)
        params = {"pattern": f"%{location.lower()}%"}
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Vehicles by Location Error: {e}")
        return []
def get_multi_location_detections(min_locations=2):
    """Return plates logged at ``min_locations`` or more distinct locations.

    Groups ``vehicle_logs`` by plate and state and keeps groups whose
    distinct location count reaches the threshold. At most 20 rows are
    returned, ordered by location count and then detection count.

    Args:
        min_locations: Minimum number of distinct locations (converted with
            ``int``).

    Returns:
        list[dict]: Mappings with the keys ``plate``, ``state``,
        ``detections``, ``locations`` and ``days``. Empty when the engine is
        not initialised or the query raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available for get_multi_location_detections")
            return []
        # The threshold is bound rather than formatted into the SQL text.
        sql = text("""
            SELECT plate, state, COUNT(*) as detections,
                   COUNT(DISTINCT location) as locations,
                   COUNT(DISTINCT date) as days
            FROM vehicle_logs
            GROUP BY plate, state
            HAVING COUNT(DISTINCT location) >= :min_locations
            ORDER BY locations DESC, detections DESC
            LIMIT 20
        """)
        params = {"min_locations": int(min_locations)}
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Multi-Location Detection Error: {e}")
        return []
def get_peak_traffic_hours():
    """Return the ten busiest hours and their share of all detections.

    Groups ``vehicle_logs`` by ``hour``; ``percentage`` is each hour's count
    divided by the total over all hours, rounded to two decimals. The query
    runs under a 15 second statement timeout.

    Returns:
        list[dict]: Up to 10 mappings with the keys ``hour``,
        ``traffic_count`` and ``percentage``, busiest hour first. Empty when
        the engine is not initialised or the query raises.
    """
    # Same guard as the sibling helpers, so an unconfigured engine is
    # reported as such instead of as an AttributeError.
    if engine is None:
        print("⚠️ Database not available for get_peak_traffic_hours")
        return []
    try:
        sql = """
            SELECT hour, COUNT(*) as traffic_count,
                   ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage
            FROM vehicle_logs
            GROUP BY hour
            ORDER BY traffic_count DESC
            LIMIT 10
        """
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(text(sql))
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Peak Traffic Hours Error: {e}")
        return []
def get_vehicle_density_by_location():
    """Summarise detection volume for each non-null location.

    For every location the query reports the number of logged rows, the
    number of distinct plates, and the location's share of all counted rows
    (rounded to two decimals). At most 20 locations are returned, busiest
    first, under a 15 second statement timeout.

    Returns:
        list[dict]: Mappings with the keys ``location``, ``vehicle_count``,
        ``unique_vehicles`` and ``percentage``. Empty when the engine is not
        initialised or the query raises.
    """
    if engine is None:
        print("⚠️ Database not available for get_vehicle_density_by_location")
        return []

    density_sql = """
        SELECT location, COUNT(*) as vehicle_count,
               COUNT(DISTINCT plate) as unique_vehicles,
               ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage
        FROM vehicle_logs
        WHERE location IS NOT NULL
        GROUP BY location
        ORDER BY vehicle_count DESC
        LIMIT 20
    """
    try:
        with engine.connect() as connection:
            connection.execute(text("SET statement_timeout = 15000"))
            density_rows = [
                dict(record._mapping)
                for record in connection.execute(text(density_sql))
            ]
        return density_rows
    except Exception as exc:
        print(f"❌ Vehicle Density Error: {exc}")
        return []
def get_high_confidence_detections(confidence_threshold=0.9):
    """Return vehicles whose detections meet a confidence threshold.

    Keeps rows with ``vehicle_conf >= confidence_threshold``, groups them by
    plate, state and vehicle type, and returns at most 30 groups ordered by
    average confidence and then detection count.

    Args:
        confidence_threshold: Minimum ``vehicle_conf`` value (converted with
            ``float``).

    Returns:
        list[dict]: Mappings with the keys ``plate``, ``state``,
        ``vehicle_type``, ``detections`` and ``avg_confidence``. Empty when
        the engine is not initialised or the query raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available for get_high_confidence_detections")
            return []
        # The threshold is bound rather than formatted into the SQL text.
        # NOTE(review): ROUND(x, 3) needs a NUMERIC argument in PostgreSQL;
        # confirm vehicle_conf is not stored as double precision.
        sql = text("""
            SELECT plate, state, vehicle_type, COUNT(*) as detections,
                   ROUND(AVG(vehicle_conf), 3) as avg_confidence
            FROM vehicle_logs
            WHERE vehicle_conf >= :threshold
            GROUP BY plate, state, vehicle_type
            ORDER BY avg_confidence DESC, detections DESC
            LIMIT 30
        """)
        params = {"threshold": float(confidence_threshold)}
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ High Confidence Detections Error: {e}")
        return []
def get_daily_traffic_summary(date=None):
    """Return aggregate traffic figures for one date.

    Args:
        date: Date to summarise, compared against the ``date`` column after
            conversion to ``str``. When falsy, today's local date in
            ``YYYY-MM-DD`` form is used.

    Returns:
        dict: Keys ``total_vehicles``, ``unique_vehicles``,
        ``locations_covered`` and ``peak_hours`` (the number of distinct
        hours with at least one row). Empty when the engine is not
        initialised or the query raises.
    """
    from datetime import date as _date

    try:
        if engine is None:
            print("⚠️ Database not available for get_daily_traffic_summary")
            return {}
        # The previous default used CURDATE(), which is MySQL-only and fails
        # on the PostgreSQL engine this module configures. Today's date is
        # now computed here and bound like an explicit date, in the same
        # YYYY-MM-DD form that save_detection parses and stores.
        target_date = str(date) if date else _date.today().isoformat()
        sql = text("""
            SELECT
                COUNT(*) as total_vehicles,
                COUNT(DISTINCT plate) as unique_vehicles,
                COUNT(DISTINCT location) as locations_covered,
                COUNT(DISTINCT hour) as peak_hours
            FROM vehicle_logs
            WHERE date = :target_date
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, {"target_date": target_date})
            # An aggregate without GROUP BY always yields exactly one row.
            return dict(result.fetchone()._mapping)
    except Exception as e:
        print(f"❌ Daily Summary Error: {e}")
        return {}
def get_state_wise_distribution():
    """Return detection counts, unique plates and share per state.

    ``percentage`` is each state's row count divided by the total over all
    states, rounded to two decimals. Rows are ordered by detection count,
    and the query runs under a 15 second statement timeout.

    Returns:
        list[dict]: Mappings with the keys ``state``, ``detections``,
        ``unique_vehicles`` and ``percentage``. Empty when the engine is not
        initialised or the query raises.
    """
    # Same guard as the sibling helpers, so an unconfigured engine is
    # reported as such instead of as an AttributeError.
    if engine is None:
        print("⚠️ Database not available for get_state_wise_distribution")
        return []
    try:
        sql = """
            SELECT state, COUNT(*) as detections,
                   COUNT(DISTINCT plate) as unique_vehicles,
                   ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage
            FROM vehicle_logs
            GROUP BY state
            ORDER BY detections DESC
        """
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(text(sql))
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ State Distribution Error: {e}")
        return []
def query_by_date_range(start_date, end_date, state=None, location=None):
    """Return up to 500 log rows whose date lies in an inclusive range.

    Args:
        start_date: Lower bound for the ``date`` column (converted to
            ``str``).
        end_date: Upper bound for the ``date`` column (converted to ``str``).
        state: Optional exact match on the ``state`` column.
        location: Optional case-insensitive substring of the location name.

    Returns:
        list[dict]: Full ``vehicle_logs`` rows, newest timestamp first.
        Empty when the engine is not initialised or the query raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available for query_by_date_range")
            return []
        # Only fixed SQL fragments are concatenated; every caller-supplied
        # value is passed as a bound parameter to rule out SQL injection.
        where_conditions = ["date BETWEEN :start_date AND :end_date"]
        params = {"start_date": str(start_date), "end_date": str(end_date)}
        if state:
            where_conditions.append("state = :state")
            params["state"] = str(state)
        if location:
            where_conditions.append("LOWER(location) LIKE :location")
            params["location"] = f"%{location.lower()}%"
        where_clause = " AND ".join(where_conditions)
        sql = text(f"""
            SELECT *
            FROM vehicle_logs
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT 500
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))  # Longer timeout for large ranges
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Date Range Query Error: {e}")
        return []
def query_by_time_range(start_hour, end_hour, location=None, vehicle_type=None):
    """Return up to 200 log rows whose hour falls in a range.

    When ``start_hour <= end_hour`` the range is the inclusive interval
    between them. Otherwise it wraps past midnight (for example 21 to 4
    matches hours 21-23 and 0-4).

    Args:
        start_hour: First hour of the range (converted with ``int``).
        end_hour: Last hour of the range (converted with ``int``).
        location: Optional case-insensitive substring of the location name.
        vehicle_type: Optional case-insensitive substring of the vehicle
            type.

    Returns:
        list[dict]: Full ``vehicle_logs`` rows, newest timestamp first.
        Empty when the engine is not initialised or the query raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available for query_by_time_range")
            return []
        params = {"start_hour": int(start_hour), "end_hour": int(end_hour)}
        # "<=" rather than "<": equal bounds mean a single hour. They used to
        # fall into the wrap-around branch, whose OR condition is true for
        # every hour.
        if params["start_hour"] <= params["end_hour"]:
            where_conditions = ["hour BETWEEN :start_hour AND :end_hour"]
        else:  # Handles ranges like 9 PM to 4 AM (21 to 4)
            where_conditions = ["(hour >= :start_hour OR hour <= :end_hour)"]
        # Only fixed SQL fragments are concatenated; every caller-supplied
        # value is passed as a bound parameter to rule out SQL injection.
        if location:
            where_conditions.append("LOWER(location) LIKE :location")
            params["location"] = f"%{location.lower()}%"
        if vehicle_type:
            where_conditions.append("LOWER(vehicle_type) LIKE :vehicle_type")
            params["vehicle_type"] = f"%{vehicle_type.lower()}%"
        where_clause = " AND ".join(where_conditions)
        sql = text(f"""
            SELECT *
            FROM vehicle_logs
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT 200
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Time Range Query Error: {e}")
        return []
# =========================================================
# ADVANCED ROUTE TRACKING FUNCTIONS
# Track vehicles passing through multiple locations
# =========================================================
def get_vehicles_route_between_locations(location_from, location_to, vehicle_type=None, limit=100):
    """Return vehicles seen at one location and later at another.

    Self-joins ``vehicle_logs`` on plate and state, requiring the
    ``location_to`` detection to have a later timestamp than the
    ``location_from`` detection. Locations and vehicle type are matched as
    case-insensitive substrings.

    Example: get_vehicles_route_between_locations('adyar', 'kottupuram', 'bike')

    Args:
        location_from: Substring of the origin location name.
        location_to: Substring of the destination location name.
        vehicle_type: Optional substring of the vehicle type (applied to the
            origin detection).
        limit: Maximum number of rows to return (converted with ``int``).

    Returns:
        list[dict]: One mapping per (plate, state, vehicle_type) with
        detection counts and first/last timestamps at each end. Empty when
        the engine is not initialised or the query raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return []
        # Every caller-supplied value is a bound parameter; only the fixed
        # optional fragment below is spliced into the SQL text.
        params = {
            "location_from": f"%{location_from.lower()}%",
            "location_to": f"%{location_to.lower()}%",
            "limit": int(limit),
        }
        vehicle_filter = ""
        if vehicle_type:
            vehicle_filter = "AND LOWER(vl1.vehicle_type) LIKE :vehicle_type"
            params["vehicle_type"] = f"%{vehicle_type.lower()}%"
        sql = text(f"""
            SELECT
                vl1.plate,
                vl1.state,
                vl1.vehicle_type,
                COUNT(DISTINCT vl1.timestamp) as detections_in_from_location,
                COUNT(DISTINCT vl2.timestamp) as detections_in_to_location,
                COUNT(DISTINCT vl1.date) as days_active,
                MIN(vl1.timestamp) as first_detected_from,
                MAX(vl1.timestamp) as last_detected_from,
                MIN(vl2.timestamp) as first_detected_to,
                MAX(vl2.timestamp) as last_detected_to
            FROM vehicle_logs vl1
            INNER JOIN vehicle_logs vl2
                ON vl1.plate = vl2.plate
                AND vl1.state = vl2.state
                AND vl2.timestamp > vl1.timestamp
            WHERE LOWER(vl1.location) LIKE :location_from
                AND LOWER(vl2.location) LIKE :location_to
                {vehicle_filter}
            GROUP BY vl1.plate, vl1.state, vl1.vehicle_type
            ORDER BY detections_in_from_location DESC, detections_in_to_location DESC
            LIMIT :limit
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Route Tracking Error: {e}")
        return []
def get_vehicle_complete_route_history(plate, state=None):
    """Return every logged detection of one plate, oldest first.

    Args:
        plate: Plate to look up; matched exactly after conversion to
            ``str``.
        state: Optional exact match on the ``state`` column.

    Returns:
        list[dict]: Mappings with the keys ``timestamp``, ``plate``,
        ``state``, ``vehicle_type``, ``location``, ``camera_id``, ``date``,
        ``hour`` and ``day``. Empty when the engine is not initialised or
        the query raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return []
        # Plate and state are bound parameters; only the fixed optional
        # fragment below is spliced into the SQL text.
        params = {"plate": str(plate)}
        state_filter = ""
        if state:
            state_filter = "AND state = :state"
            params["state"] = str(state)
        sql = text(f"""
            SELECT
                timestamp,
                plate,
                state,
                vehicle_type,
                location,
                camera_id,
                date,
                hour,
                day
            FROM vehicle_logs
            WHERE plate = :plate
                {state_filter}
            ORDER BY timestamp ASC
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 15000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Route History Error: {e}")
        return []
def get_vehicles_passing_through_multiple_locations(locations, vehicle_type=None, min_locations=2):
    """Return vehicles seen at several of the given locations.

    A row qualifies when its location contains any of the given names
    (case-insensitive substring match). Vehicles are grouped by plate, state
    and vehicle type and kept when they cover at least ``min_locations``
    distinct location values. At most 100 rows are returned.

    Example: get_vehicles_passing_through_multiple_locations(['adyar', 'kottupuram', 'mylapore'])

    Args:
        locations: Iterable of location name substrings. A single string is
            treated as a one-element list.
        vehicle_type: Optional substring of the vehicle type.
        min_locations: Minimum number of distinct locations (converted with
            ``int``).

    Returns:
        list[dict]: Mappings with the keys ``plate``, ``state``,
        ``vehicle_type``, ``total_detections``, ``unique_locations_visited``,
        ``days_active``, ``peak_hours``, ``first_seen`` and ``last_seen``.
        Empty when no locations are given, the engine is not initialised, or
        the query raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return []
        # A bare string would otherwise be iterated character by character.
        if isinstance(locations, str):
            locations = [locations]
        location_names = list(locations or [])
        if not location_names:
            # No locations would build "WHERE ()", which is invalid SQL.
            return []
        # Each location gets its own bound parameter (:loc_0, :loc_1, ...);
        # only the generated placeholder names are spliced into the SQL.
        params = {"min_locations": int(min_locations)}
        location_clauses = []
        for index, loc in enumerate(location_names):
            key = f"loc_{index}"
            params[key] = f"%{loc.lower()}%"
            location_clauses.append(f"LOWER(location) LIKE :{key}")
        location_conditions = " OR ".join(location_clauses)
        vehicle_filter = ""
        if vehicle_type:
            vehicle_filter = "AND LOWER(vehicle_type) LIKE :vehicle_type"
            params["vehicle_type"] = f"%{vehicle_type.lower()}%"
        sql = text(f"""
            SELECT
                plate,
                state,
                vehicle_type,
                COUNT(*) as total_detections,
                COUNT(DISTINCT location) as unique_locations_visited,
                COUNT(DISTINCT date) as days_active,
                COUNT(DISTINCT hour) as peak_hours,
                MIN(timestamp) as first_seen,
                MAX(timestamp) as last_seen
            FROM vehicle_logs
            WHERE ({location_conditions})
                {vehicle_filter}
            GROUP BY plate, state, vehicle_type
            HAVING COUNT(DISTINCT location) >= :min_locations
            ORDER BY COUNT(DISTINCT location) DESC, COUNT(*) DESC
            LIMIT 100
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Multi-Location Route Error: {e}")
        return []
def get_location_to_location_traffic_flow(location_from, location_to, date_range=None):
    """Return aggregate statistics for traffic moving between two locations.

    Self-joins ``vehicle_logs`` on plate and state, pairing each detection
    at ``location_from`` with every later detection of the same vehicle at
    ``location_to`` (case-insensitive substring matches).

    Args:
        location_from: Substring of the origin location name.
        location_to: Substring of the destination location name.
        date_range: Optional ``(start_date, end_date)`` pair applied
            inclusively to the date of the origin detection.

    Returns:
        dict: Keys ``unique_vehicles``, ``vehicle_types``, ``states``,
        ``total_from_location_detections``, ``days_active``,
        ``avg_confidence``, ``earliest_traffic`` and ``latest_traffic``.
        Empty when the engine is not initialised, no row comes back, or the
        query raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return {}
        # Every caller-supplied value is a bound parameter; only the fixed
        # optional fragment below is spliced into the SQL text.
        params = {
            "location_from": f"%{location_from.lower()}%",
            "location_to": f"%{location_to.lower()}%",
        }
        date_filter = ""
        if date_range:
            start_date, end_date = date_range
            date_filter = "AND vl1.date BETWEEN :start_date AND :end_date"
            params["start_date"] = str(start_date)
            params["end_date"] = str(end_date)
        # NOTE(review): COUNT(*) and AVG run over joined (from, to) row
        # pairs, so an origin detection with several later destination
        # detections is counted once per pair.
        sql = text(f"""
            SELECT
                COUNT(DISTINCT vl1.plate) as unique_vehicles,
                COUNT(DISTINCT vl1.vehicle_type) as vehicle_types,
                COUNT(DISTINCT vl1.state) as states,
                COUNT(*) as total_from_location_detections,
                COUNT(DISTINCT vl1.date) as days_active,
                ROUND(AVG(vl1.vehicle_conf), 3) as avg_confidence,
                MIN(vl1.timestamp) as earliest_traffic,
                MAX(vl1.timestamp) as latest_traffic
            FROM vehicle_logs vl1
            INNER JOIN vehicle_logs vl2
                ON vl1.plate = vl2.plate
                AND vl1.state = vl2.state
                AND vl2.timestamp > vl1.timestamp
            WHERE LOWER(vl1.location) LIKE :location_from
                AND LOWER(vl2.location) LIKE :location_to
                {date_filter}
        """)
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))
            row = conn.execute(sql, params).fetchone()
            return dict(row._mapping) if row else {}
    except Exception as e:
        print(f"❌ Traffic Flow Error: {e}")
        return {}
def get_vehicle_type_route_analysis(location_from, location_to):
    """Break down traffic between two locations by vehicle type.

    Self-joins ``vehicle_logs`` on plate and state, pairing each detection
    at ``location_from`` with every later detection of the same vehicle at
    ``location_to`` (case-insensitive substring matches), then groups the
    pairs by the vehicle type of the origin detection.

    Args:
        location_from: Substring of the origin location name.
        location_to: Substring of the destination location name.

    Returns:
        list[dict]: Mappings with the keys ``vehicle_type``,
        ``unique_vehicles``, ``total_detections``, ``days_active``,
        ``avg_confidence`` and ``percentage``, most frequent type first.
        Empty when the engine is not initialised or the query raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return []
        # Both patterns are bound parameters, so quotes in the location
        # names cannot alter the statement.
        sql = text("""
            SELECT
                vl1.vehicle_type,
                COUNT(DISTINCT vl1.plate) as unique_vehicles,
                COUNT(*) as total_detections,
                COUNT(DISTINCT vl1.date) as days_active,
                ROUND(AVG(vl1.vehicle_conf), 3) as avg_confidence,
                ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage
            FROM vehicle_logs vl1
            INNER JOIN vehicle_logs vl2
                ON vl1.plate = vl2.plate
                AND vl1.state = vl2.state
                AND vl2.timestamp > vl1.timestamp
            WHERE LOWER(vl1.location) LIKE :location_from
                AND LOWER(vl2.location) LIKE :location_to
            GROUP BY vl1.vehicle_type
            ORDER BY COUNT(*) DESC
        """)
        params = {
            "location_from": f"%{location_from.lower()}%",
            "location_to": f"%{location_to.lower()}%",
        }
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Vehicle Type Route Analysis Error: {e}")
        return []
def get_suspicious_route_patterns(min_visits=5, min_locations=3):
    """Return vehicles that are logged often and at many locations.

    Groups ``vehicle_logs`` by plate, state and vehicle type and keeps
    groups with at least ``min_visits`` rows and at least ``min_locations``
    distinct locations. At most 50 rows are returned, ordered by location
    count and then detection count.

    Args:
        min_visits: Minimum number of logged detections (converted with
            ``int``).
        min_locations: Minimum number of distinct locations (converted with
            ``int``).

    Returns:
        list[dict]: Mappings with the keys ``plate``, ``state``,
        ``vehicle_type``, ``total_detections``, ``locations_visited``,
        ``days_active``, ``peak_hours``, ``first_detected`` and
        ``last_detected``. Empty when the engine is not initialised or the
        query raises.
    """
    try:
        if engine is None:
            print("⚠️ Database not available")
            return []
        # Both thresholds are bound rather than formatted into the SQL text.
        sql = text("""
            SELECT
                plate,
                state,
                vehicle_type,
                COUNT(*) as total_detections,
                COUNT(DISTINCT location) as locations_visited,
                COUNT(DISTINCT date) as days_active,
                COUNT(DISTINCT hour) as peak_hours,
                MIN(timestamp) as first_detected,
                MAX(timestamp) as last_detected
            FROM vehicle_logs
            GROUP BY plate, state, vehicle_type
            HAVING COUNT(*) >= :min_visits AND COUNT(DISTINCT location) >= :min_locations
            ORDER BY COUNT(DISTINCT location) DESC, COUNT(*) DESC
            LIMIT 50
        """)
        params = {
            "min_visits": int(min_visits),
            "min_locations": int(min_locations),
        }
        with engine.connect() as conn:
            conn.execute(text("SET statement_timeout = 30000"))
            result = conn.execute(sql, params)
            return [dict(r._mapping) for r in result]
    except Exception as e:
        print(f"❌ Suspicious Route Pattern Error: {e}")
        return []