Peter Mutwiri commited on
Commit
dac1876
·
1 Parent(s): 382cbc2

refactored ingestion line

Browse files
Files changed (2) hide show
  1. app/db.py +3 -18
  2. app/mapper.py +1 -1
app/db.py CHANGED
@@ -30,7 +30,7 @@ DB_DIR.mkdir(parents=True, exist_ok=True)
30
  MAX_DB_SIZE_GB = float(os.getenv("MAX_DB_SIZE_GB", "10.0"))
31
 
32
  # Minimum canonical columns required for analytics contracts
33
- REQUIRED_CANONICAL_COLUMNS = {"timestamp", "total", "entity_type"}
34
 
35
 
36
  # ==================== CONNECTION MANAGEMENT ==================== #
@@ -223,25 +223,10 @@ def ensure_table(
223
 
224
 
225
  def enforce_schema_contract(df: pd.DataFrame, org_id: str):
226
- """
227
- Validates that canonical DataFrame meets minimum contract requirements.
228
-
229
- Prevents silent schema drift that breaks cross-org analytics.
230
-
231
- Args:
232
- df: DataFrame after canonification
233
- org_id: Tenant ID (for error messaging)
234
-
235
- Raises:
236
- ValueError: If required canonical columns are missing
237
- """
238
  missing = REQUIRED_CANONICAL_COLUMNS - set(df.columns)
239
  if missing:
240
- raise ValueError(
241
- f"Org {org_id} missing required canonical columns: {missing}. "
242
- f"Available: {list(df.columns)}"
243
- )
244
-
245
 
246
  def insert_records(
247
  conn: duckdb.DuckDBPyConnection,
 
30
  MAX_DB_SIZE_GB = float(os.getenv("MAX_DB_SIZE_GB", "10.0"))
31
 
32
  # Minimum canonical columns required for analytics contracts
33
+ REQUIRED_CANONICAL_COLUMNS = {"timestamp"}
34
 
35
 
36
  # ==================== CONNECTION MANAGEMENT ==================== #
 
223
 
224
 
225
  def enforce_schema_contract(df: pd.DataFrame, org_id: str):
226
+ """Soft enforcement - logs warnings but doesn't crash"""
 
 
 
 
 
 
 
 
 
 
 
227
  missing = REQUIRED_CANONICAL_COLUMNS - set(df.columns)
228
  if missing:
229
+ print(f"[schema_contract] ⚠️ Org {org_id} missing recommended columns: {missing}")
 
 
 
 
230
 
231
  def insert_records(
232
  conn: duckdb.DuckDBPyConnection,
app/mapper.py CHANGED
@@ -4,7 +4,7 @@ import json
4
  import duckdb
5
  import pandas as pd
6
  from datetime import datetime, timedelta
7
- from app.db import get_conn, ensure_raw_table, enforce_schema_contract
8
  from app.utils.detect_industry import _ALIAS, detect_industry
9
  # app/mapper.py (add line 1)
10
  from app.hybrid_entity_detector import hybrid_detect_entity_type
 
4
  import duckdb
5
  import pandas as pd
6
  from datetime import datetime, timedelta
7
+ from app.db import get_conn, ensure_raw_table, enforce_schema_contract, transactional_conn
8
  from app.utils.detect_industry import _ALIAS, detect_industry
9
  # app/mapper.py (add line 1)
10
  from app.hybrid_entity_detector import hybrid_detect_entity_type