petermutwiri commited on
Commit
3e6d56f
·
verified ·
1 Parent(s): 015dbe2

Update app/db.py

Browse files
Files changed (1) hide show
  1. app/db.py +42 -15
app/db.py CHANGED
@@ -100,27 +100,54 @@ def insert_records(conn, table_name: str, records: List[Dict[str, Any]]):
100
  # ------------------------------------------------------------
101
  def bootstrap(org_id: str, payload: Dict[str, Any]):
102
  """
103
- Accepts:
104
- { orgId: "real-org", rows: [ {...}, {...} ] }
105
- or any legacy shape.
 
 
 
106
  """
107
  conn = get_conn(org_id)
108
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
109
  ensure_raw_table(conn)
110
 
111
- # --- adapt n8n shape ---
112
- if "rows" in payload and "orgId" in payload:
113
- rows = payload["rows"] # your exact field
114
- org_id = payload["orgId"] # override param
115
- else:
116
- rows = payload # fallback
117
-
118
- # lineage
119
- conn.execute("INSERT INTO main.raw_rows (row_data) VALUES (?)", (json.dumps(payload),))
120
-
121
- # ingest
122
- if isinstance(rows, list) and rows:
 
 
 
 
 
 
 
 
 
 
 
 
 
123
  ensure_table(conn, "sales", rows[0])
124
  insert_records(conn, "sales", rows)
 
 
 
 
 
 
 
 
 
 
125
 
126
  conn.close()
 
 
100
  # ------------------------------------------------------------
101
  def bootstrap(org_id: str, payload: Dict[str, Any]):
102
  """
103
+ Unified ingestion entrypoint. Accepts:
104
+ - {"orgId": "...", "rows": [...]}
105
+ - {"orgId": "...", "data": [...]}
106
+ - {"orgId": "...", "tables": {...}}
107
+ - legacy shapes (list of rows)
108
+ Stores the full payload into main.raw_rows (for audit), then inserts rows into per-table tables.
109
  """
110
  conn = get_conn(org_id)
111
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
112
  ensure_raw_table(conn)
113
 
114
+ # Normalize shapes
115
+ rows = None
116
+ if isinstance(payload, dict) and "orgId" in payload:
117
+ # typical n8n shape
118
+ if "rows" in payload and isinstance(payload["rows"], list):
119
+ rows = payload["rows"]
120
+ elif "data" in payload and isinstance(payload["data"], list):
121
+ rows = payload["data"]
122
+ elif "tables" in payload and isinstance(payload["tables"], dict):
123
+ # multi-table: write each table separately below
124
+ # but still store entire payload for audit
125
+ pass
126
+ elif isinstance(payload, list):
127
+ rows = payload
128
+
129
+ # lineage / audit: only insert a JSON string (non-empty)
130
+ try:
131
+ raw_json = json.dumps(payload)
132
+ if raw_json and raw_json != "null" and raw_json != "[]":
133
+ conn.execute("INSERT INTO main.raw_rows (row_data) VALUES (?)", (raw_json,))
134
+ except Exception as e:
135
+ print(f"[db] ⚠️ failed to store raw payload for auditing: {e}")
136
+
137
+ # ingest single-list rows (common)
138
+ if rows:
139
  ensure_table(conn, "sales", rows[0])
140
  insert_records(conn, "sales", rows)
141
+ else:
142
+ # multi-table handler
143
+ if isinstance(payload, dict) and "tables" in payload:
144
+ for tname, recs in payload["tables"].items():
145
+ if not isinstance(recs, list) or not recs:
146
+ continue
147
+ # sanitize table name (basic)
148
+ safe_table = "".join(c for c in tname.lower() if c.isalnum() or c == "_")
149
+ ensure_table(conn, safe_table, recs[0])
150
+ insert_records(conn, safe_table, recs)
151
 
152
  conn.close()
153
+