| """ |
| app/db.py β ENTERPRISE-GRADE, MULTI-TENANT DUCKDB LAYER |
| ======================================================= |
| Handles per-tenant database isolation, schema versioning, quota enforcement, |
| and bulletproof data insertion with automatic column inference. |
| |
| Architecture: |
| - One DuckDB file per org_id: ./data/duckdb/{org_id}.duckdb |
| - Three-tier table structure: |
| 1. main.raw_rows β Immutable audit trail |
| 2. main.{entity}_canonical β Versioned canonical schema |
| 3. main.schema_versions β Schema evolution history |
| """ |
|
|
| import os |
| import pathlib |
| import json |
| import duckdb |
| import pandas as pd |
| from typing import Any, Dict, List, Optional |
| from datetime import datetime |
| from contextlib import contextmanager |
| from fastapi import HTTPException |
|
|
| |
# Root directory holding one isolated DuckDB file per tenant (org_id).
DB_DIR = pathlib.Path("./data/duckdb")
# Created eagerly at import time so get_conn() never races on a missing dir.
DB_DIR.mkdir(parents=True, exist_ok=True)


# Per-tenant storage quota in gigabytes; overridable via the
# MAX_DB_SIZE_GB environment variable (defaults to 10 GB).
MAX_DB_SIZE_GB = float(os.getenv("MAX_DB_SIZE_GB", "10.0"))


# Columns every canonical table is *recommended* to carry;
# enforce_schema_contract() only logs a warning when these are absent.
REQUIRED_CANONICAL_COLUMNS = {"timestamp"}
|
|
|
|
| |
def get_conn(org_id: str) -> duckdb.DuckDBPyConnection:
    """
    Open (creating on first use) the isolated DuckDB database for a tenant.

    Each tenant owns its own file at ./data/duckdb/{org_id}.duckdb; the
    on-disk size of an existing file is checked against MAX_DB_SIZE_GB
    before the connection is opened.

    Args:
        org_id: Unique tenant identifier (validated upstream)

    Returns:
        DuckDB connection in read-write mode

    Raises:
        HTTPException: 413 when the tenant exceeds its storage quota
    """
    db_file = DB_DIR / f"{org_id}.duckdb"

    # Fresh tenant: nothing on disk yet, so there is no quota to enforce.
    if not db_file.exists():
        return duckdb.connect(str(db_file), read_only=False)

    size_gb = db_file.stat().st_size / (1024 ** 3)
    if size_gb > MAX_DB_SIZE_GB:
        raise HTTPException(
            status_code=413,
            detail=f"Tenant quota exceeded: {size_gb:.2f}GB > {MAX_DB_SIZE_GB}GB"
        )

    return duckdb.connect(str(db_file), read_only=False)
|
|
|
|
@contextmanager
def transactional_conn(org_id: str):
    """
    Context manager for transactional operations.
    Automatically commits on success, rolls back on failure, and always
    closes the connection.

    Fix: the original executed BEGIN TRANSACTION *before* entering the
    try/finally, so a failure in BEGIN leaked the connection. BEGIN is now
    inside the outer try so conn.close() runs on every path.

    Usage:
        with transactional_conn("org_123") as conn:
            conn.execute("INSERT ...")
            conn.execute("UPDATE ...")
    """
    conn = get_conn(org_id)
    try:
        conn.execute("BEGIN TRANSACTION")
        try:
            yield conn
            conn.execute("COMMIT")
        except Exception:
            # Roll back on any caller error, then re-raise untouched.
            conn.execute("ROLLBACK")
            raise
    finally:
        conn.close()
|
|
|
|
| |
def ensure_raw_table(conn: duckdb.DuckDBPyConnection):
    """
    Create the immutable audit-trail table if it does not already exist.

    Table: main.raw_rows
    - ingested_at: auto-stamped at insert time (CURRENT_TIMESTAMP default)
    - row_data: the raw JSON payload, never modified after insert

    The schema is deliberately minimal and rigid to keep the audit trail
    append-only.
    """
    # Both statements are idempotent (IF NOT EXISTS), so this is safe to
    # call on every request path.
    ddl_statements = (
        "CREATE SCHEMA IF NOT EXISTS main",
        """
        CREATE TABLE IF NOT EXISTS main.raw_rows(
            ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            row_data JSON
        )
        """,
    )
    for ddl in ddl_statements:
        conn.execute(ddl)
|
|
|
|
def ensure_schema_versions_table(conn: duckdb.DuckDBPyConnection):
    """
    Create the schema-evolution bookkeeping objects if they are missing.

    main.schema_versions stores one row per schema change for each entity
    table. Version ids are drawn from the schema_version_seq sequence,
    working around DuckDB 0.10.x's lack of auto-increment columns.
    """
    # All DDL is idempotent; run it as a batch.
    for ddl in (
        "CREATE SCHEMA IF NOT EXISTS main",
        """
        CREATE TABLE IF NOT EXISTS main.schema_versions (
            version_id BIGINT PRIMARY KEY,
            table_name VARCHAR NOT NULL,
            schema_json JSON NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            applied_at TIMESTAMP,
            status VARCHAR DEFAULT 'pending',
            rows_at_migration BIGINT
        )
        """,
        """
        CREATE SEQUENCE IF NOT EXISTS schema_version_seq
        START WITH 1
        INCREMENT BY 1
        """,
    ):
        conn.execute(ddl)
|
|
def infer_duckdb_type(value: Any) -> str:
    """
    Map a Python value to the DuckDB column type used for auto-added columns.

    Type mapping:
        bool     -> BOOLEAN
        int      -> BIGINT
        float    -> DOUBLE
        datetime -> TIMESTAMP
        anything else (str, dict, list, None, ...) -> VARCHAR

    The check order matters: ``bool`` is a subclass of ``int`` in Python,
    so it must be matched before the integer case.
    """
    # Ordered (python_type, duckdb_type) pairs; first match wins.
    dispatch = (
        (bool, "BOOLEAN"),
        (int, "BIGINT"),
        (float, "DOUBLE"),
        (datetime, "TIMESTAMP"),
    )
    for py_type, duck_type in dispatch:
        if isinstance(value, py_type):
            return duck_type
    return "VARCHAR"
|
|
|
|
def ensure_table(
    conn: duckdb.DuckDBPyConnection,
    table_name: str,
    sample_record: Dict[str, Any]
) -> List[str]:
    """
    Ensure main.{table_name} exists and evolve its schema from sample_record.

    Creates the base table with a UUID `id` and `_ingested_at` timestamp,
    then ALTERs in any columns present in sample_record but missing from
    the table, with types inferred by infer_duckdb_type().

    Fix: PRAGMA table_info rows are (cid, name, type, notnull, dflt_value,
    pk) — the column NAME is index 1, not 0. The original read r[0] (the
    numeric cid), so the existence check never matched and every ALTER for
    an already-present column failed.

    Args:
        conn: DuckDB connection
        table_name: Target table name (e.g., 'sales_canonical').
            NOTE(review): interpolated into SQL unquoted — assumed to be
            validated/sanitized upstream; confirm before exposing to users.
        sample_record: Representative row to infer schema

    Returns:
        List of "column:type" strings that were newly added (for logging)

    Raises:
        ValueError: If sample_record is empty
    """
    if not sample_record:
        raise ValueError("Cannot infer schema from empty sample_record")

    conn.execute("CREATE SCHEMA IF NOT EXISTS main")

    # Base table; IF NOT EXISTS makes this idempotent.
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS main.{table_name} ("
        "id UUID DEFAULT uuid(), "
        "_ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
    )

    # Snapshot existing column names (lower-cased) — r[1] is the name field.
    try:
        existing_cols_raw = conn.execute(f"PRAGMA table_info('main.{table_name}')").fetchall()
        existing_cols = {str(r[1]).lower() for r in existing_cols_raw}
    except Exception as e:
        print(f"[db] WARNING: Could not get table info: {e}")
        existing_cols = set()

    added_cols = []
    for col, val in sample_record.items():
        col_name = str(col).lower().strip()

        if col_name in existing_cols:
            continue

        if val is None:
            # No type can be inferred from None; the column will be added on
            # a later batch once a concrete value is seen.
            print(f"[db] WARNING: Skipping column {col_name} (None value)")
            continue

        try:
            dtype = infer_duckdb_type(val)
            conn.execute(f"ALTER TABLE main.{table_name} ADD COLUMN {col_name} {dtype}")
            added_cols.append(f"{col_name}:{dtype}")
            print(f"[db] Added column '{col_name}:{dtype}' to main.{table_name}")
        except Exception as e:
            # Best-effort: a failed column add must not abort ingestion.
            print(f"[db] ERROR: Failed to add column {col_name}: {e}")

    return added_cols
|
|
|
|
def enforce_schema_contract(df: pd.DataFrame, org_id: str):
    """
    Soft schema enforcement: log a warning for missing recommended columns.

    Never raises — canonical tables are merely *recommended* to carry
    REQUIRED_CANONICAL_COLUMNS (e.g. 'timestamp').

    Fix: repaired the mis-encoded warning-marker characters in the log line.

    Args:
        df: Incoming canonical DataFrame whose columns are checked.
        org_id: Tenant identifier, included in the log line for triage.
    """
    missing = REQUIRED_CANONICAL_COLUMNS - set(df.columns)
    if missing:
        print(f"[schema_contract] WARNING: Org {org_id} missing recommended columns: {missing}")
|
def insert_records(
    conn: duckdb.DuckDBPyConnection,
    table_name: str,
    records: List[Dict[str, Any]]
):
    """
    Insert records with safe column handling and automatic type conversion.

    Handles:
    - Missing keys -> NULL
    - Extra keys -> Ignored (not inserted)
    - dict/list values -> JSON string
    - Column order mismatch -> Reordered to table schema

    Fixes:
    - PRAGMA table_info rows are (cid, name, type, ...); the name is index
      1, not 0. The original read r[0] and built column lists like
      "0", "1", ... which could never match the real schema.
    - Only columns actually provided by at least one record are inserted.
      Explicitly inserting NULL into absent columns (notably `id` and
      `_ingested_at`) would override their DEFAULT expressions.

    Args:
        conn: DuckDB connection
        table_name: Target table name
        records: List of dicts to insert

    Raises:
        ValueError: If the table has no columns or no record key matches it
        HTTPException: On insertion failure (after logging)
    """
    if not records:
        return

    table_info = conn.execute(f"PRAGMA table_info('main.{table_name}')").fetchall()
    table_cols = [str(r[1]) for r in table_info]  # r[1] = column name

    if not table_cols:
        raise ValueError(f"Table main.{table_name} has no columns")

    # Restrict the INSERT to columns provided by at least one record so
    # DEFAULT expressions fire for the rest (id, _ingested_at, ...).
    provided_keys = set()
    for record in records:
        provided_keys.update(record.keys())
    target_cols = [c for c in table_cols if c in provided_keys]

    if not target_cols:
        raise ValueError(f"No record keys match columns of main.{table_name}")

    placeholders = ", ".join(["?"] * len(target_cols))
    col_list = ", ".join(target_cols)
    insert_sql = f"INSERT INTO main.{table_name} ({col_list}) VALUES ({placeholders})"

    values = []
    for record in records:
        row = []
        for col in target_cols:
            val = record.get(col)
            if isinstance(val, (dict, list)):
                # Nested structures are persisted as JSON text.
                val = json.dumps(val)
            row.append(val)
        values.append(tuple(row))

    try:
        conn.executemany(insert_sql, values)
        print(f"[db] Inserted {len(records)} rows into main.{table_name}")
    except Exception as e:
        print(f"[db] ERROR: Insert failed: {e}")
        raise HTTPException(status_code=500, detail=f"Insertion failed: {str(e)}")
|
|
|
|
def bootstrap(org_id: str, payload: Dict[str, Any]):
    """
    Store a raw JSON payload for audit and disaster recovery.

    This is the ONLY function that writes to main.raw_rows. It intentionally
    does NOT create any derived tables, to maintain separation of concerns.
    Empty payloads ("null", "[]", "{}") are skipped with a warning.

    Fix: ensure_raw_table() is now called inside the try/finally so the
    connection is closed (and the failure surfaced as an HTTPException)
    even if table creation fails; the original leaked the connection on
    that path. Also repaired the garbled log literal.

    Args:
        org_id: Tenant identifier
        payload: Raw JSON payload (dict, list, or string)

    Side Effects:
        - Creates org DB if missing
        - Writes to main.raw_rows
        - Closes connection

    Raises:
        HTTPException: On audit failure (after logging)
    """
    conn = get_conn(org_id)
    try:
        ensure_raw_table(conn)

        # Strings are assumed to already be JSON; everything else is dumped.
        raw_json = json.dumps(payload) if not isinstance(payload, str) else payload

        if raw_json and raw_json not in ("null", "[]", "{}"):
            conn.execute(
                "INSERT INTO main.raw_rows (row_data) VALUES (?)",
                (raw_json,)
            )
            conn.commit()
            print(f"[bootstrap] Audit stored: {len(raw_json)} bytes for org:{org_id}")
        else:
            print(f"[bootstrap] WARNING: Empty payload for org:{org_id}")
    except Exception as e:
        print(f"[bootstrap] ERROR: Audit failed for org:{org_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Audit trail failed: {str(e)}")
    finally:
        conn.close()
|
|
|
|
def get_db_stats(org_id: str) -> Dict[str, Any]:
    """
    Collect storage and row-count statistics for one tenant.

    Returns:
        dict with keys:
            "db_size_gb":   on-disk size of the tenant DB file (0 if absent)
            "table_counts": {table_name: row_count} for every main.* table
            "total_rows":   sum of all table row counts
    """
    conn = get_conn(org_id)
    stats = {}

    try:
        # On-disk footprint of this tenant's database file.
        db_file = DB_DIR / f"{org_id}.duckdb"
        stats["db_size_gb"] = db_file.stat().st_size / (1024 ** 3) if db_file.exists() else 0

        # Enumerate every table in the main schema and count its rows.
        tables = conn.execute("""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'main'
        """).fetchall()

        counts = {}
        for (tbl,) in tables:
            counts[tbl] = conn.execute(f"SELECT COUNT(*) FROM main.{tbl}").fetchone()[0]

        stats["table_counts"] = counts
        stats["total_rows"] = sum(counts.values())
    finally:
        conn.close()

    return stats