| | """ |
| | app/db.py β ENTERPRISE-GRADE, MULTI-TENANT DUCKDB LAYER |
| | ======================================================= |
| | Handles per-tenant database isolation, schema versioning, quota enforcement, |
| | and bulletproof data insertion with automatic column inference. |
| | |
| | Architecture: |
| | - One DuckDB file per org_id: ./data/duckdb/{org_id}.duckdb |
| | - Three-tier table structure: |
| | 1. main.raw_rows β Immutable audit trail |
| | 2. main.{entity}_canonical β Versioned canonical schema |
| | 3. main.schema_versions β Schema evolution history |
| | """ |
| |
|
import os
import pathlib
import json
import duckdb
import pandas as pd
from typing import Any, Dict, List, Optional
from datetime import datetime
from contextlib import contextmanager
from fastapi import HTTPException
|
DB_DIR = pathlib.Path("./data/duckdb")
DB_DIR.mkdir(parents=True, exist_ok=True)

MAX_DB_SIZE_GB = float(os.getenv("MAX_DB_SIZE_GB", "10.0"))

REQUIRED_CANONICAL_COLUMNS = {"timestamp"}
|
def get_conn(org_id: str) -> duckdb.DuckDBPyConnection:
    """
    Get or create a DuckDB connection for an organization.

    Creates isolated DB file: ./data/duckdb/{org_id}.duckdb

    Args:
        org_id: Unique tenant identifier (validated upstream)

    Returns:
        DuckDB connection in read-write mode

    Raises:
        HTTPException: If tenant exceeds storage quota
    """
    db_file = DB_DIR / f"{org_id}.duckdb"

    if db_file.exists():
        size_gb = db_file.stat().st_size / (1024 ** 3)
        if size_gb > MAX_DB_SIZE_GB:
            raise HTTPException(
                status_code=413,
                detail=f"Tenant quota exceeded: {size_gb:.2f}GB > {MAX_DB_SIZE_GB}GB"
            )

    return duckdb.connect(str(db_file), read_only=False)
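# Example (illustrative sketch; "acme_co" is a hypothetical org_id, any
# upstream-validated tenant id works the same way):
#
#     conn = get_conn("acme_co")   # creates ./data/duckdb/acme_co.duckdb if missing
#     try:
#         conn.execute("SELECT 1").fetchone()
#     finally:
#         conn.close()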
|
@contextmanager
def transactional_conn(org_id: str):
    """
    Context manager for transactional operations.
    Automatically commits on success, rolls back on failure.

    Usage:
        with transactional_conn("org_123") as conn:
            conn.execute("INSERT ...")
            conn.execute("UPDATE ...")
    """
    conn = get_conn(org_id)
    conn.execute("BEGIN TRANSACTION")
    try:
        yield conn
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    finally:
        conn.close()
|
def ensure_raw_table(conn: duckdb.DuckDBPyConnection):
    """
    Creates immutable audit trail table for raw JSON payloads.
    Schema is intentionally rigid to prevent mutation.

    Table: main.raw_rows
    - ingested_at: Auto-timestamp of ingestion
    - row_data: Raw JSON payload (never modified)
    """
    conn.execute("CREATE SCHEMA IF NOT EXISTS main")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS main.raw_rows(
            ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            row_data JSON
        )
    """)
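# Example (illustrative sketch; "acme_co" is a hypothetical org_id): inspecting
# the audit trail for a tenant.
#
#     conn = get_conn("acme_co")
#     ensure_raw_table(conn)
#     latest = conn.execute(
#         "SELECT ingested_at, row_data FROM main.raw_rows ORDER BY ingested_at DESC LIMIT 5"
#     ).fetchall()
#     conn.close()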
|
def ensure_schema_versions_table(conn: duckdb.DuckDBPyConnection):
    """
    Tracks schema evolution for each entity table.
    Compatible with DuckDB 0.10.3 constraint limitations.
    """
    conn.execute("CREATE SCHEMA IF NOT EXISTS main")

    conn.execute("""
        CREATE TABLE IF NOT EXISTS main.schema_versions (
            version_id BIGINT PRIMARY KEY,
            table_name VARCHAR NOT NULL,
            schema_json JSON NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            applied_at TIMESTAMP,
            status VARCHAR DEFAULT 'pending',
            rows_at_migration BIGINT
        )
    """)

    # version_id values are drawn from this sequence at insert time
    # (see the example below this function).
    conn.execute("""
        CREATE SEQUENCE IF NOT EXISTS schema_version_seq
        START WITH 1
        INCREMENT BY 1
    """)
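# Example (illustrative sketch): recording a pending schema version. The table
# and sequence names come from this module; the org id and payload values are
# made up.
#
#     with transactional_conn("acme_co") as conn:
#         ensure_schema_versions_table(conn)
#         conn.execute(
#             "INSERT INTO main.schema_versions (version_id, table_name, schema_json) "
#             "VALUES (nextval('schema_version_seq'), ?, ?)",
#             ("sales_canonical", json.dumps({"amount": "DOUBLE"})),
#         )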
|
def infer_duckdb_type(value: Any) -> str:
    """
    Infer DuckDB column type from Python value.
    Falls back to VARCHAR for ambiguous types.

    Type mapping:
        bool      -> BOOLEAN
        int       -> BIGINT
        float     -> DOUBLE
        datetime  -> TIMESTAMP
        dict/list -> JSON (but stored as VARCHAR for flexibility)
        None/null -> VARCHAR (skip column creation)
    """
    if isinstance(value, bool):
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, datetime):
        return "TIMESTAMP"
    return "VARCHAR"
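# Example (illustrative; values chosen to hit each branch):
#
#     infer_duckdb_type(True)               # 'BOOLEAN'
#     infer_duckdb_type(42)                 # 'BIGINT'
#     infer_duckdb_type(3.14)               # 'DOUBLE'
#     infer_duckdb_type(datetime.utcnow())  # 'TIMESTAMP'
#     infer_duckdb_type({"nested": 1})      # 'VARCHAR' (serialized as JSON text on insert)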
|
def ensure_table(
    conn: duckdb.DuckDBPyConnection,
    table_name: str,
    sample_record: Dict[str, Any]
) -> List[str]:
    """
    Ensures table exists and evolves schema using sample_record.

    Creates base table with UUID + timestamp, then adds missing columns.

    Args:
        conn: DuckDB connection
        table_name: Target table name (e.g., 'sales_canonical')
        sample_record: Representative row to infer schema

    Returns:
        List of newly added column names (for logging)

    Raises:
        ValueError: If sample_record is empty
    """
    if not sample_record:
        raise ValueError("Cannot infer schema from empty sample_record")

    conn.execute("CREATE SCHEMA IF NOT EXISTS main")

    # Base table: surrogate key + ingestion timestamp; data columns are added below.
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS main.{table_name} ("
        "id UUID DEFAULT uuid(), "
        "_ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
    )

    # Collect existing column names. PRAGMA table_info returns
    # (cid, name, type, notnull, dflt_value, pk), so the name is at index 1.
    try:
        existing_cols_raw = conn.execute(f"PRAGMA table_info('main.{table_name}')").fetchall()
        existing_cols = {str(r[1]).lower() for r in existing_cols_raw}
    except Exception as e:
        print(f"[db] ⚠️ Could not get table info: {e}")
        existing_cols = set()

    # Evolve the schema: add any column present in the sample but missing from the table.
    added_cols = []
    for col, val in sample_record.items():
        col_name = str(col).lower().strip()

        if col_name in existing_cols:
            continue

        if val is None:
            print(f"[db] ⚠️ Skipping column {col_name} (None value)")
            continue

        try:
            dtype = infer_duckdb_type(val)
            conn.execute(f"ALTER TABLE main.{table_name} ADD COLUMN {col_name} {dtype}")
            added_cols.append(f"{col_name}:{dtype}")
            print(f"[db] ✅ Added column '{col_name}:{dtype}' to main.{table_name}")
        except Exception as e:
            print(f"[db] ❌ Failed to add column {col_name}: {e}")

    return added_cols
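# Example (illustrative sketch; the org id, table name and sample row are hypothetical):
#
#     with transactional_conn("acme_co") as conn:
#         added = ensure_table(
#             conn, "sales_canonical",
#             {"timestamp": datetime.utcnow(), "amount": 19.99},
#         )
#         # first run: added == ['timestamp:TIMESTAMP', 'amount:DOUBLE']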
|
def enforce_schema_contract(df: pd.DataFrame, org_id: str):
    """Soft enforcement - logs warnings but doesn't crash"""
    missing = REQUIRED_CANONICAL_COLUMNS - set(df.columns)
    if missing:
        print(f"[schema_contract] ⚠️ Org {org_id} missing recommended columns: {missing}")
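# Example (illustrative; the DataFrame contents and org id are made up):
#
#     enforce_schema_contract(pd.DataFrame([{"amount": 1.0}]), "acme_co")
#     # logs a warning because the recommended 'timestamp' column is absent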
|
def insert_records(
    conn: duckdb.DuckDBPyConnection,
    table_name: str,
    records: List[Dict[str, Any]]
):
    """
    Insert records with safe column handling and automatic type conversion.

    Handles:
    - Missing keys              -> NULL
    - Extra keys                -> Ignored (not inserted)
    - dict/list values          -> JSON string
    - Column order mismatch     -> Reordered to table schema
    - Columns absent everywhere -> Left out so DEFAULTs apply (id, _ingested_at)

    Args:
        conn: DuckDB connection
        table_name: Target table name
        records: List of dicts to insert

    Raises:
        HTTPException: On insertion failure (after logging)
    """
    if not records:
        return

    # PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk);
    # the column name is at index 1.
    table_info = conn.execute(f"PRAGMA table_info('main.{table_name}')").fetchall()
    table_cols = [str(r[1]) for r in table_info]

    if not table_cols:
        raise ValueError(f"Table main.{table_name} has no columns")

    # Only insert into columns that at least one record provides, so that
    # DEFAULT columns (id, _ingested_at) are not overwritten with NULL.
    provided_keys = {str(k) for record in records for k in record}
    insert_cols = [c for c in table_cols if c in provided_keys]
    if not insert_cols:
        print(f"[db] ⚠️ No matching columns for main.{table_name}; nothing inserted")
        return

    placeholders = ", ".join(["?"] * len(insert_cols))
    col_list = ", ".join(insert_cols)
    insert_sql = f"INSERT INTO main.{table_name} ({col_list}) VALUES ({placeholders})"

    # Build parameter tuples in table-column order; serialize nested structures as JSON.
    values = []
    for record in records:
        row = []
        for col in insert_cols:
            val = record.get(col)
            if isinstance(val, (dict, list)):
                val = json.dumps(val)
            row.append(val)
        values.append(tuple(row))

    try:
        conn.executemany(insert_sql, values)
        print(f"[db] ✅ Inserted {len(records)} rows into main.{table_name}")
    except Exception as e:
        print(f"[db] ❌ Insert failed: {e}")
        raise HTTPException(status_code=500, detail=f"Insertion failed: {str(e)}")
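# Example (illustrative sketch; the org id, table name and rows are hypothetical):
#
#     rows = [
#         {"timestamp": datetime.utcnow(), "amount": 19.99},
#         {"timestamp": datetime.utcnow(), "amount": 5.00},
#     ]
#     with transactional_conn("acme_co") as conn:
#         ensure_table(conn, "sales_canonical", rows[0])
#         insert_records(conn, "sales_canonical", rows)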
|
def bootstrap(org_id: str, payload: Any):
    """
    **ENTERPRISE-GRADE**: Stores raw JSON payload for audit and disaster recovery.

    This is the ONLY function that writes to raw_rows. It intentionally does NOT
    create any derived tables to maintain separation of concerns.

    Args:
        org_id: Tenant identifier
        payload: Raw JSON payload (dict, list, or string)

    Side Effects:
        - Creates org DB if missing
        - Writes to main.raw_rows
        - Closes connection

    Raises:
        HTTPException: On audit failure (after logging)
    """
    conn = get_conn(org_id)
    ensure_raw_table(conn)

    try:
        raw_json = json.dumps(payload) if not isinstance(payload, str) else payload

        # Skip empty payloads; everything else goes into the immutable audit trail.
        if raw_json and raw_json not in ("null", "[]", "{}"):
            conn.execute(
                "INSERT INTO main.raw_rows (row_data) VALUES (?)",
                (raw_json,)
            )
            conn.commit()
            print(f"[bootstrap] ✅ Audit stored: {len(raw_json)} bytes for org:{org_id}")
        else:
            print(f"[bootstrap] ⚠️ Empty payload for org:{org_id}")
    except Exception as e:
        print(f"[bootstrap] ❌ Audit failed for org:{org_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Audit trail failed: {str(e)}")
    finally:
        conn.close()
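# Example (illustrative; the payload shape is hypothetical, any JSON-serializable
# value is accepted):
#
#     bootstrap("acme_co", {"source": "shopify", "rows": [{"amount": 19.99}]})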
|
def get_db_stats(org_id: str) -> Dict[str, Any]:
    """
    Retrieve storage and row count statistics for a tenant.

    Returns:
        dict: {
            "db_size_gb": float,
            "total_rows": int,
            "table_counts": {"raw_rows": int, "sales_canonical": int, ...}
        }
    """
    conn = get_conn(org_id)
    stats = {}

    try:
        # On-disk size of the tenant's DuckDB file.
        db_file = DB_DIR / f"{org_id}.duckdb"
        stats["db_size_gb"] = db_file.stat().st_size / (1024 ** 3) if db_file.exists() else 0.0

        # Per-table row counts for every table in the main schema.
        tables = conn.execute("""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'main'
        """).fetchall()

        stats["table_counts"] = {}
        for (table_name,) in tables:
            count = conn.execute(f"SELECT COUNT(*) FROM main.{table_name}").fetchone()[0]
            stats["table_counts"][table_name] = count

        stats["total_rows"] = sum(stats["table_counts"].values())

    finally:
        conn.close()

    return stats
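

# Illustrative end-to-end smoke test, not part of the application's runtime path.
# Assumptions: run manually from the repo root; "demo_org" is a throwaway tenant id.
if __name__ == "__main__":
    bootstrap("demo_org", {"source": "manual_test", "rows": [{"amount": 1.0}]})
    with transactional_conn("demo_org") as conn:
        sample = {"timestamp": datetime.utcnow(), "amount": 1.0}
        ensure_table(conn, "sales_canonical", sample)
        insert_records(conn, "sales_canonical", [sample])
    print(get_db_stats("demo_org"))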