ihute / database.py
LeonceNsh's picture
Upload folder using huggingface_hub
67fb51e verified
"""
DuckDB database connection manager for the IHUTE dashboard.
Supports both file-based warehouse.duckdb and in-memory fallback
with synthetic data for demo purposes.
"""
from pathlib import Path
from typing import Optional
import duckdb
import pandas as pd
class Database:
    """Manage a lazily opened DuckDB connection and run queries against it.

    The connection is created on first access of :attr:`connection`. An
    existing database file is opened read-only; when no file is available, or
    opening it fails, an in-memory database populated with synthetic sample
    tables in the ``main_marts`` schema is used instead.
    """

    def __init__(self, db_path: Optional[str] = None):
        """Record which DuckDB file to use without opening it yet.

        Args:
            db_path: Path to DuckDB file. When omitted, the first existing
                ``warehouse.duckdb`` among the module directory, the current
                working directory and the module's parent directory is used.
                If none of them exists, ``db_path`` stays ``None``.
        """
        if db_path is None:
            # Look for database in standard locations for HF Spaces deployment
            module_dir = Path(__file__).parent
            possible_paths = [
                module_dir / "warehouse.duckdb",  # Same directory as app
                Path("warehouse.duckdb"),  # Current working directory
                module_dir.parent / "warehouse.duckdb",  # Parent directory
            ]
            db_path = next(
                (str(path) for path in possible_paths if path.exists()),
                None,
            )
        self.db_path = db_path
        self._connection: Optional[duckdb.DuckDBPyConnection] = None

    @property
    def connection(self) -> "duckdb.DuckDBPyConnection":
        """Return the open connection, creating it on first access.

        Returns:
            A read-only connection to ``db_path`` when that file exists and
            can be opened, otherwise an in-memory connection holding the
            sample tables.
        """
        if self._connection is None:
            if self.db_path and Path(self.db_path).exists():
                try:
                    self._connection = duckdb.connect(self.db_path, read_only=True)
                except Exception as e:
                    # Deliberate best-effort: any failure to open the file
                    # degrades to the demo data set instead of crashing.
                    print(f"Warning: Could not connect to {self.db_path}: {e}")
                    print("Falling back to in-memory database with sample data.")
                    self._open_sample_database()
            else:
                # Create in-memory database with sample data for demo
                print("No warehouse.duckdb found. Using in-memory database with sample data.")
                self._open_sample_database()
        return self._connection

    def query(self, sql: str) -> pd.DataFrame:
        """Execute SQL query and return the result as a pandas DataFrame."""
        return self.connection.execute(sql).fetchdf()

    def close(self) -> None:
        """Close the connection if one is open.

        Safe to call repeatedly. A later access of :attr:`connection` opens a
        fresh connection (and regenerates the sample data when falling back
        to the in-memory database).
        """
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def _open_sample_database(self) -> None:
        """Open an in-memory database and fill it with the sample tables."""
        self._connection = duckdb.connect(":memory:")
        try:
            self._create_sample_data()
        except Exception:
            # Do not keep a half-populated database around: a later access
            # would silently return it and hide the original failure.
            self._connection.close()
            self._connection = None
            raise

    def _create_sample_data(self) -> None:
        """Create sample data for demo when no database file exists.

        Builds four tables in the ``main_marts`` schema of the current
        connection: ``fct_corridor_flows``, ``fct_incentive_events``,
        ``metrics_elasticity`` and ``fct_simulation_runs``. Values come from
        ``random()``, so they differ between runs.
        """
        conn = self._connection
        # Create schema
        conn.execute("CREATE SCHEMA IF NOT EXISTS main_marts")
        # Create sample corridor flows: 168 hourly rows starting at 07:00.
        conn.execute("""
            CREATE TABLE main_marts.fct_corridor_flows AS
            SELECT
                'I-24' as corridor_id,
                'I-24 Main' as zone_name,
                timestamp '2024-01-15 07:00:00' + interval (i) hour as hour_bucket,
                CASE
                    WHEN hour_of_day BETWEEN 7 AND 9 THEN 'AM_PEAK'
                    WHEN hour_of_day BETWEEN 17 AND 19 THEN 'PM_PEAK'
                    ELSE 'OFF_PEAK'
                END as time_period,
                (random() * 500 + 200)::int as vehicle_count,
                CASE
                    WHEN hour_of_day BETWEEN 7 AND 9 THEN 25 + random() * 15
                    WHEN hour_of_day BETWEEN 17 AND 19 THEN 20 + random() * 15
                    ELSE 55 + random() * 10
                END as avg_speed_mph,
                CASE
                    WHEN hour_of_day BETWEEN 7 AND 9 THEN 'D'
                    WHEN hour_of_day BETWEEN 17 AND 19 THEN 'E'
                    ELSE 'B'
                END as level_of_service
            FROM (
                -- The series starts at 07:00, so the clock hour of row i is
                -- (i + 7) % 24; using i % 24 would shift every peak label by
                -- seven hours relative to hour_bucket.
                SELECT i, (i + 7) % 24 as hour_of_day
                FROM generate_series(0, 167) as t(i)
            ) as hours
        """)
        # Create sample incentive events
        # NOTE(review): was_accepted, was_completed, final_outcome and
        # actual_payout are drawn independently, so a single row can be
        # internally inconsistent (e.g. completed but not accepted). Left
        # as-is because tying them together would change the distributions.
        conn.execute("""
            CREATE TABLE main_marts.fct_incentive_events AS
            SELECT
                'alloc_' || i as incentive_key,
                'agent_' || (random() * 1000)::int as agent_id,
                'run_001' as simulation_run_id,
                CASE (i % 4)
                    WHEN 0 THEN 'CARPOOL'
                    WHEN 1 THEN 'PACER'
                    WHEN 2 THEN 'DEPARTURE_SHIFT'
                    ELSE 'TRANSIT'
                END as incentive_type,
                2.0 + random() * 8 as offered_amount,
                CASE WHEN random() > 0.3 THEN true ELSE false END as was_accepted,
                CASE WHEN random() > 0.5 THEN true ELSE false END as was_completed,
                CASE
                    WHEN random() > 0.5 THEN 'COMPLETED'
                    WHEN random() > 0.3 THEN 'ACCEPTED_PENDING'
                    ELSE 'REJECTED'
                END as final_outcome,
                (random() * 5)::decimal(10,2) as actual_payout
            FROM generate_series(1, 500) as t(i)
        """)
        # Create sample elasticity metrics
        conn.execute("""
            CREATE TABLE main_marts.metrics_elasticity AS
            SELECT
                bucket as incentive_bucket,
                (100 + idx * 50) as n_trips,
                0.1 + idx * 0.08 as carpool_rate,
                idx * 1.5 as avg_incentive
            FROM (
                SELECT unnest(['NONE', 'LOW', 'MEDIUM', 'HIGH']) as bucket,
                       unnest([0, 1, 2, 3]) as idx
            ) t
            ORDER BY idx
        """)
        # Create sample scenario comparison
        conn.execute("""
            CREATE TABLE main_marts.fct_simulation_runs AS
            SELECT
                'run_' || i as run_key,
                CASE (i % 3)
                    WHEN 0 THEN 'Carpool Incentive'
                    WHEN 1 THEN 'Pacer Program'
                    ELSE 'Transit Promotion'
                END as scenario_name,
                10000 as n_agents,
                treatment_avg_speed,
                42.0 as baseline_avg_speed,
                (treatment_avg_speed - 42) / 42 * 100 as speed_improvement_pct,
                5 + random() * 15 as vmt_reduction_pct,
                3 + random() * 7 as peak_reduction_pct,
                5000 + random() * 5000 as treatment_spend
            FROM (
                -- Draw the treatment speed once per run so that
                -- speed_improvement_pct is computed from the same value
                -- instead of from a second, unrelated random() call.
                SELECT i, 45 + random() * 10 as treatment_avg_speed
                FROM generate_series(1, 10) as t(i)
            ) as runs
        """)
# Global database instance: starts as None, is created by get_database() on
# its first call, and is reused by every later call in this process.
_db: Optional[Database] = None
def get_database() -> Database:
    """Return the shared module-level Database, building it on first use."""
    global _db
    if _db is not None:
        return _db
    # First call: construct with default path discovery and remember it.
    _db = Database()
    return _db
def query(sql: str) -> pd.DataFrame:
    """Run ``sql`` against the global database and return a DataFrame."""
    database = get_database()
    return database.query(sql)