Update app/mapper.py
Browse files- app/mapper.py +10 -7
app/mapper.py
CHANGED
|
@@ -3,7 +3,7 @@ import os
|
|
| 3 |
import json
|
| 4 |
import duckdb
|
| 5 |
import pandas as pd
|
| 6 |
-
from datetime import datetime
|
| 7 |
from app.db import get_conn, ensure_raw_table
|
| 8 |
from app.utils.detect_industry import _ALIAS
|
| 9 |
|
|
@@ -90,20 +90,22 @@ def reconcile_latest_schema(duck: duckdb.DuckDBPyConnection) -> None:
|
|
| 90 |
duck.execute("CREATE OR REPLACE TABLE main.canonical_latest AS " + union_query)
|
| 91 |
print(f"[schema] ✅ reconciled {len(tables)} versions → canonical_latest")
|
| 92 |
|
| 93 |
-
# ---------------------- Canonify core logic ---------------------- #
|
| 94 |
def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
|
| 95 |
load_dynamic_aliases()
|
| 96 |
conn = get_conn(org_id)
|
| 97 |
ensure_raw_table(conn)
|
| 98 |
|
| 99 |
-
# 1️⃣
|
| 100 |
-
|
|
|
|
|
|
|
|
|
|
| 101 |
SELECT row_data
|
| 102 |
FROM raw_rows
|
| 103 |
WHERE try_strptime(NULLIF(json_extract(row_data, '$.timestamp'), ''),
|
| 104 |
'%Y-%m-%d %H:%M:%S')
|
| 105 |
-
|
| 106 |
-
"""
|
| 107 |
|
| 108 |
if not rows:
|
| 109 |
print("[canonify] no rows")
|
|
@@ -119,6 +121,7 @@ def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
|
|
| 119 |
if any(a in col for a in aliases):
|
| 120 |
mapping[col] = canon
|
| 121 |
break
|
|
|
|
| 122 |
# dynamic aliases
|
| 123 |
for col in raw.columns:
|
| 124 |
if col not in sum(CANONICAL.values(), []):
|
|
@@ -151,6 +154,6 @@ def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
|
|
| 151 |
duck.execute(f"INSERT INTO {table_name} SELECT * FROM df")
|
| 152 |
reconcile_latest_schema(duck)
|
| 153 |
duck.close()
|
| 154 |
-
|
| 155 |
print(f"[canonify] ✅ canonical snapshot updated for {org_id}")
|
| 156 |
return df
|
|
|
|
| 3 |
import json
|
| 4 |
import duckdb
|
| 5 |
import pandas as pd
|
| 6 |
+
from datetime import datetime, timedelta
|
| 7 |
from app.db import get_conn, ensure_raw_table
|
| 8 |
from app.utils.detect_industry import _ALIAS
|
| 9 |
|
|
|
|
| 90 |
duck.execute("CREATE OR REPLACE TABLE main.canonical_latest AS " + union_query)
|
| 91 |
print(f"[schema] ✅ reconciled {len(tables)} versions → canonical_latest")
|
| 92 |
|
|
|
|
| 93 |
def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
|
| 94 |
load_dynamic_aliases()
|
| 95 |
conn = get_conn(org_id)
|
| 96 |
ensure_raw_table(conn)
|
| 97 |
|
| 98 |
+
# 1️⃣ zero-parameter timestamp filter
|
| 99 |
+
cutoff = datetime.utcnow() - timedelta(hours=hours_window)
|
| 100 |
+
cutoff_str = cutoff.strftime('%Y-%m-%d %H:%M:%S')
|
| 101 |
+
|
| 102 |
+
rows = sql(conn, f"""
|
| 103 |
SELECT row_data
|
| 104 |
FROM raw_rows
|
| 105 |
WHERE try_strptime(NULLIF(json_extract(row_data, '$.timestamp'), ''),
|
| 106 |
'%Y-%m-%d %H:%M:%S')
|
| 107 |
+
>= TIMESTAMP '{cutoff_str}'
|
| 108 |
+
""")
|
| 109 |
|
| 110 |
if not rows:
|
| 111 |
print("[canonify] no rows")
|
|
|
|
| 121 |
if any(a in col for a in aliases):
|
| 122 |
mapping[col] = canon
|
| 123 |
break
|
| 124 |
+
|
| 125 |
# dynamic aliases
|
| 126 |
for col in raw.columns:
|
| 127 |
if col not in sum(CANONICAL.values(), []):
|
|
|
|
| 154 |
duck.execute(f"INSERT INTO {table_name} SELECT * FROM df")
|
| 155 |
reconcile_latest_schema(duck)
|
| 156 |
duck.close()
|
| 157 |
+
|
| 158 |
print(f"[canonify] ✅ canonical snapshot updated for {org_id}")
|
| 159 |
return df
|