petermutwiri commited on
Commit
f670966
Β·
verified Β·
1 Parent(s): 1703bb3

Update app/db.py

Browse files
Files changed (1) hide show
  1. app/db.py +79 -64
app/db.py CHANGED
@@ -1,24 +1,21 @@
1
- import duckdb, os, pathlib, json
2
- from datetime import datetime
 
 
 
3
  from typing import Any, Dict, List
 
4
 
5
  DB_DIR = pathlib.Path("./data/duckdb")
6
  DB_DIR.mkdir(parents=True, exist_ok=True)
7
 
8
-
9
  def get_conn(org_id: str):
10
  """Get or create a DuckDB connection for an organization."""
11
  db_file = DB_DIR / f"{org_id}.duckdb"
12
  return duckdb.connect(str(db_file), read_only=False)
13
 
14
-
15
- # ------------------------------------------------------------
16
- # πŸ”Ή Backward-compatible table for raw JSON ingestion
17
- # ------------------------------------------------------------
18
  def ensure_raw_table(conn):
19
- """
20
- Maintains legacy compatibility for ingestion from webhooks / file uploads.
21
- """
22
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
23
  conn.execute("""
24
  CREATE TABLE IF NOT EXISTS main.raw_rows(
@@ -27,94 +24,112 @@ def ensure_raw_table(conn):
27
  )
28
  """)
29
 
 
 
 
 
 
 
 
 
 
 
 
30
 
31
- # ------------------------------------------------------------
32
- # πŸ”Ή Flexible dynamic schema table creation
33
- # ------------------------------------------------------------
34
  def ensure_table(conn, table_name: str, sample_record: Dict[str, Any]):
35
  """
36
- Ensures a DuckDB table exists with columns inferred from sample_record.
37
- If new columns appear later, adds them automatically.
38
  """
 
 
 
39
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
 
 
40
  conn.execute(
41
  f"CREATE TABLE IF NOT EXISTS main.{table_name} ("
42
  "id UUID DEFAULT uuid(), "
43
  "_ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
44
  )
45
 
46
- if not sample_record:
47
- return
48
-
49
- existing_cols = {r[0] for r in conn.execute(f"PRAGMA table_info('main.{table_name}')").fetchall()}
 
 
 
50
 
 
51
  for col, val in sample_record.items():
52
- if col.lower() in existing_cols:
 
 
53
  continue
54
- dtype = infer_duckdb_type(val)
55
- print(f"[db] βž• Adding new column '{col}:{dtype}' to main.{table_name}")
56
- conn.execute(f"ALTER TABLE main.{table_name} ADD COLUMN {col} {dtype}")
57
-
58
-
59
- def infer_duckdb_type(value: Any) -> str:
60
- """Infer a DuckDB-compatible column type from a Python value."""
61
- if isinstance(value, bool):
62
- return "BOOLEAN"
63
- if isinstance(value, int):
64
- return "BIGINT"
65
- if isinstance(value, float):
66
- return "DOUBLE"
67
- if isinstance(value, datetime):
68
- return "TIMESTAMP"
69
- if isinstance(value, (dict, list)):
70
- return "JSON"
71
- return "VARCHAR"
72
-
73
 
74
- # ------------------------------------------------------------
75
- # πŸ”Ή Insert records with auto-schema
76
- # ------------------------------------------------------------
77
  def insert_records(conn, table_name: str, records: List[Dict[str, Any]]):
78
  """
79
- Insert records into the specified table.
80
- Assumes ensure_table() has already been called.
81
  """
82
  if not records:
83
  return
84
 
85
- cols = records[0].keys()
 
86
  placeholders = ", ".join(["?"] * len(cols))
87
  col_list = ", ".join(cols)
 
88
  insert_sql = f"INSERT INTO main.{table_name} ({col_list}) VALUES ({placeholders})"
89
 
90
- values = [tuple(r.get(c) for c in cols) for r in records]
91
- conn.executemany(insert_sql, values)
92
- print(f"[db] βœ… Inserted {len(records)} rows into {table_name}")
93
-
94
-
95
- # ------------------------------------------------------------
96
- # πŸ”Ή Unified bootstrap entrypoint
97
- # ------------------------------------------------------------
98
- # ------------------------------------------------------------
99
- # πŸ”Ή Unified bootstrap entrypoint – n8n-native
100
- # ------------------------------------------------------------
101
- # app/db.py - FIX bootstrap to only audit, not create tables
 
 
 
 
 
 
102
 
 
103
  def bootstrap(org_id: str, payload: Dict[str, Any]):
104
  """
105
- PURE AUDIT TRAIL - Does NOT create tables.
106
- Only stores raw JSON for lineage. Table creation is canonify_df's job.
107
  """
108
  conn = get_conn(org_id)
109
  ensure_raw_table(conn)
110
 
111
- # Store raw payload for audit (only if non-empty)
112
  try:
113
- raw_json = json.dumps(payload)
114
  if raw_json and raw_json not in ("null", "[]", "{}"):
115
  conn.execute("INSERT INTO main.raw_rows (row_data) VALUES (?)", (raw_json,))
116
- print(f"[bootstrap] βœ… Audit trail stored: {len(raw_json)} bytes")
117
  except Exception as e:
118
- print(f"[bootstrap] ⚠️ Audit storage failed: {e}")
119
 
120
- conn.close()
 
1
+ # app/db.py – BULLETPROOF VERSION
2
+ import os
3
+ import pathlib
4
+ import json
5
+ import duckdb
6
  from typing import Any, Dict, List
7
+ from datetime import datetime
8
 
9
  DB_DIR = pathlib.Path("./data/duckdb")
10
  DB_DIR.mkdir(parents=True, exist_ok=True)
11
 
 
12
  def get_conn(org_id: str):
13
  """Get or create a DuckDB connection for an organization."""
14
  db_file = DB_DIR / f"{org_id}.duckdb"
15
  return duckdb.connect(str(db_file), read_only=False)
16
 
 
 
 
 
17
  def ensure_raw_table(conn):
18
+ """Creates audit trail table for raw JSON data."""
 
 
19
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
20
  conn.execute("""
21
  CREATE TABLE IF NOT EXISTS main.raw_rows(
 
24
  )
25
  """)
26
 
27
+ def infer_duckdb_type(value: Any) -> str:
28
+ """Infer DuckDB type from Python value."""
29
+ if isinstance(value, bool):
30
+ return "BOOLEAN"
31
+ if isinstance(value, int):
32
+ return "BIGINT"
33
+ if isinstance(value, float):
34
+ return "DOUBLE"
35
+ if isinstance(value, datetime):
36
+ return "TIMESTAMP"
37
+ return "VARCHAR"
38
 
39
+ # βœ… BULLETPROOF: Handles integer column names, None values, etc.
 
 
40
  def ensure_table(conn, table_name: str, sample_record: Dict[str, Any]):
41
  """
42
+ Ensures table exists with columns from sample_record.
43
+ SAFE: Converts int column names to strings, handles missing data.
44
  """
45
+ if not sample_record:
46
+ return
47
+
48
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
49
+
50
+ # Create base table with UUID and timestamp
51
  conn.execute(
52
  f"CREATE TABLE IF NOT EXISTS main.{table_name} ("
53
  "id UUID DEFAULT uuid(), "
54
  "_ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
55
  )
56
 
57
+ # βœ… SAFE: Get existing columns (handle int names from DB)
58
+ try:
59
+ existing_cols_raw = conn.execute(f"PRAGMA table_info('main.{table_name}')").fetchall()
60
+ existing_cols = {str(r[0]).lower() for r in existing_cols_raw}
61
+ except Exception as e:
62
+ print(f"[db] ⚠️ Could not get table info: {e}")
63
+ existing_cols = set()
64
 
65
+ # βœ… BULLETPROOF: Add missing columns with safe name handling
66
  for col, val in sample_record.items():
67
+ col_name = str(col).lower().strip() # βœ… FORCE STRING
68
+
69
+ if col_name in existing_cols:
70
  continue
71
+
72
+ # Skip None/empty values which can't infer type
73
+ if val is None:
74
+ print(f"[db] ⚠️ Skipping column {col_name} (None value)")
75
+ continue
76
+
77
+ try:
78
+ dtype = infer_duckdb_type(val)
79
+ print(f"[db] βž• Adding column '{col_name}:{dtype}' to main.{table_name}")
80
+ conn.execute(f"ALTER TABLE main.{table_name} ADD COLUMN {col_name} {dtype}")
81
+ except Exception as e:
82
+ print(f"[db] ❌ Failed to add column {col_name}: {e}")
83
+ # Continue with next column - don't crash
 
 
 
 
 
 
84
 
 
 
 
85
  def insert_records(conn, table_name: str, records: List[Dict[str, Any]]):
86
  """
87
+ Insert records into table with safe column handling.
 
88
  """
89
  if not records:
90
  return
91
 
92
+ # βœ… Get columns from first record (force string names)
93
+ cols = [str(k) for k in records[0].keys()]
94
  placeholders = ", ".join(["?"] * len(cols))
95
  col_list = ", ".join(cols)
96
+
97
  insert_sql = f"INSERT INTO main.{table_name} ({col_list}) VALUES ({placeholders})"
98
 
99
+ # Prepare values (handle missing keys)
100
+ values = []
101
+ for record in records:
102
+ row = []
103
+ for col in cols:
104
+ val = record.get(col)
105
+ # Convert dict/list to JSON string for DuckDB
106
+ if isinstance(val, (dict, list)):
107
+ val = json.dumps(val)
108
+ row.append(val)
109
+ values.append(tuple(row))
110
+
111
+ try:
112
+ conn.executemany(insert_sql, values)
113
+ print(f"[db] βœ… Inserted {len(records)} rows into {table_name}")
114
+ except Exception as e:
115
+ print(f"[db] ❌ Insert failed: {e}")
116
+ raise
117
 
118
+ # βœ… PURE AUDIT FUNCTION - Does NOT create tables
119
  def bootstrap(org_id: str, payload: Dict[str, Any]):
120
  """
121
+ **ENTERPRISE-GRADE**: Only stores raw JSON for audit.
122
+ Does NOT create any tables. Schema evolution is canonify_df's job.
123
  """
124
  conn = get_conn(org_id)
125
  ensure_raw_table(conn)
126
 
 
127
  try:
128
+ raw_json = json.dumps(payload) if not isinstance(payload, str) else payload
129
  if raw_json and raw_json not in ("null", "[]", "{}"):
130
  conn.execute("INSERT INTO main.raw_rows (row_data) VALUES (?)", (raw_json,))
131
+ print(f"[bootstrap] βœ… Audit stored: {len(raw_json)} bytes")
132
  except Exception as e:
133
+ print(f"[bootstrap] ⚠️ Audit failed: {e}")
134
 
135
+ conn.close()