Peter Mutwiri committed on
Commit ·
ea24fcc
1
Parent(s): 79f9bb1
purge: DuckDB uses singular HOUR
Browse files
- app/mapper.py +7 -5
- app/tasks/purge.py +5 -1
app/mapper.py
CHANGED
|
@@ -1,5 +1,5 @@
|
|
| 1 |
import os, json, duckdb, pandas as pd
|
| 2 |
-
from datetime import datetime
|
| 3 |
from app.db import get_conn, ensure_raw_table
|
| 4 |
from app.utils.detect_industry import _ALIAS
|
| 5 |
|
|
@@ -113,14 +113,16 @@ def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
|
|
| 113 |
# ⏱ Safe timestamp filtering
|
| 114 |
# --------------------------
|
| 115 |
try:
|
|
|
|
|
|
|
|
|
|
| 116 |
rows = conn.execute(
|
| 117 |
-
"""
|
| 118 |
SELECT row_data
|
| 119 |
FROM raw_rows
|
| 120 |
WHERE strptime(json_extract(row_data, '$.timestamp'), '%Y-%m-%d %H:%M:%S')
|
| 121 |
-
>=
|
| 122 |
-
"""
|
| 123 |
-
(hours_window,)
|
| 124 |
).fetchall()
|
| 125 |
except Exception as e:
|
| 126 |
print(f"[canonify] ⚠️ fallback to all rows due to timestamp parse error: {e}")
|
|
|
|
| 1 |
import os, json, duckdb, pandas as pd
|
| 2 |
+
from datetime import datetime, timedelta
|
| 3 |
from app.db import get_conn, ensure_raw_table
|
| 4 |
from app.utils.detect_industry import _ALIAS
|
| 5 |
|
|
|
|
| 113 |
# ⏱ Safe timestamp filtering
|
| 114 |
# --------------------------
|
| 115 |
try:
|
| 116 |
+
# Compute cutoff in Python to avoid parameter placeholders inside INTERVAL
|
| 117 |
+
cutoff = datetime.now() - timedelta(hours=hours_window)
|
| 118 |
+
cutoff_str = cutoff.strftime("%Y-%m-%d %H:%M:%S")
|
| 119 |
rows = conn.execute(
|
| 120 |
+
f"""
|
| 121 |
SELECT row_data
|
| 122 |
FROM raw_rows
|
| 123 |
WHERE strptime(json_extract(row_data, '$.timestamp'), '%Y-%m-%d %H:%M:%S')
|
| 124 |
+
>= TIMESTAMP '{cutoff_str}'
|
| 125 |
+
"""
|
|
|
|
| 126 |
).fetchall()
|
| 127 |
except Exception as e:
|
| 128 |
print(f"[canonify] ⚠️ fallback to all rows due to timestamp parse error: {e}")
|
app/tasks/purge.py
CHANGED
|
@@ -1,5 +1,9 @@
|
|
| 1 |
from app.db import get_conn, ensure_raw_table
|
|
|
|
|
|
|
| 2 |
def purge_old_raw(org_id: str, hours=6):
|
| 3 |
conn = get_conn(org_id)
|
| 4 |
-
|
|
|
|
|
|
|
| 5 |
conn.commit(); conn.close()
|
|
|
|
| 1 |
from app.db import get_conn, ensure_raw_table
|
| 2 |
+
from datetime import datetime, timedelta
|
| 3 |
+
|
| 4 |
def purge_old_raw(org_id: str, hours=6):
    """Delete rows from ``raw_rows`` that were ingested more than *hours* ago.

    The cutoff is computed in Python (naive local time via ``datetime.now()``)
    rather than with a SQL ``INTERVAL`` expression, and is passed to the
    database as a bound parameter instead of being formatted into the SQL text.

    Args:
        org_id: Identifier handed to ``get_conn`` to obtain the connection.
        hours: Age threshold in hours; rows whose ``ingested_at`` is strictly
            earlier than ``now - hours`` are deleted.
    """
    # Truncate to whole seconds so the cutoff matches the previous
    # '%Y-%m-%d %H:%M:%S' string representation.
    cutoff = (datetime.now() - timedelta(hours=hours)).replace(microsecond=0)

    conn = get_conn(org_id)
    try:
        conn.execute("DELETE FROM raw_rows WHERE ingested_at < ?", [cutoff])
        conn.commit()
    finally:
        # Always release the connection, even if the delete or commit fails.
        conn.close()
|