Peter Mutwiri commited on
Commit Β·
eb5f1af
1
Parent(s): ffd2e51
refactored main.py for proper logign
Browse files- app/main.py +17 -2
- scheduler_loop.py +12 -2
app/main.py
CHANGED
|
@@ -3,12 +3,19 @@
|
|
| 3 |
MutSyncHub Analytics Engine
|
| 4 |
Enterprise-grade AI analytics platform with zero-cost inference
|
| 5 |
"""
|
|
|
|
| 6 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 7 |
# βββ Standard Library βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 8 |
import os
|
| 9 |
import time
|
| 10 |
import uuid
|
| 11 |
-
import
|
| 12 |
|
| 13 |
# βββ Third-Party ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 14 |
from fastapi import FastAPI, Depends, HTTPException, Request
|
|
@@ -70,6 +77,10 @@ async def lifespan(app: FastAPI):
|
|
| 70 |
except Exception as e:
|
| 71 |
logger.error(f"π΄ Startup health check failed: {e}")
|
| 72 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 73 |
logger.info("β
Startup sequence complete")
|
| 74 |
yield
|
| 75 |
|
|
@@ -78,6 +89,10 @@ async def lifespan(app: FastAPI):
|
|
| 78 |
logger.info("π ANALYTICS ENGINE - SHUTDOWN SEQUENCE")
|
| 79 |
logger.info("=" * 60)
|
| 80 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 81 |
# Close all database connections
|
| 82 |
from app.deps import _org_db_connections, _vector_db_conn
|
| 83 |
|
|
@@ -94,7 +109,7 @@ async def lifespan(app: FastAPI):
|
|
| 94 |
_vector_db_conn.close()
|
| 95 |
logger.info(" β Closed Vector DB")
|
| 96 |
except:
|
| 97 |
-
|
| 98 |
|
| 99 |
logger.info("β
Shutdown complete")
|
| 100 |
|
|
|
|
| 3 |
MutSyncHub Analytics Engine
|
| 4 |
Enterprise-grade AI analytics platform with zero-cost inference
|
| 5 |
"""
|
| 6 |
+
import logging
|
| 7 |
|
| 8 |
+
# Configure logging to see all info messages
|
| 9 |
+
logging.basicConfig(
|
| 10 |
+
level=logging.INFO,
|
| 11 |
+
format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
|
| 12 |
+
datefmt="%Y-%m-%d %H:%M:%S"
|
| 13 |
+
)
|
| 14 |
# βββ Standard Library βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 15 |
import os
|
| 16 |
import time
|
| 17 |
import uuid
|
| 18 |
+
import subprocess
|
| 19 |
|
| 20 |
# βββ Third-Party ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 21 |
from fastapi import FastAPI, Depends, HTTPException, Request
|
|
|
|
| 77 |
except Exception as e:
|
| 78 |
logger.error(f"π΄ Startup health check failed: {e}")
|
| 79 |
|
| 80 |
+
# Start scheduler in background
|
| 81 |
+
scheduler_process = subprocess.Popen(["python", "/app/scheduler_loop.py"])
|
| 82 |
+
logger.info(f"β
Scheduler started (PID: {scheduler_process.pid})")
|
| 83 |
+
|
| 84 |
logger.info("β
Startup sequence complete")
|
| 85 |
yield
|
| 86 |
|
|
|
|
| 89 |
logger.info("π ANALYTICS ENGINE - SHUTDOWN SEQUENCE")
|
| 90 |
logger.info("=" * 60)
|
| 91 |
|
| 92 |
+
# Close scheduler
|
| 93 |
+
scheduler_process.terminate()
|
| 94 |
+
logger.info(" β Stopped scheduler")
|
| 95 |
+
|
| 96 |
# Close all database connections
|
| 97 |
from app.deps import _org_db_connections, _vector_db_conn
|
| 98 |
|
|
|
|
| 109 |
_vector_db_conn.close()
|
| 110 |
logger.info(" β Closed Vector DB")
|
| 111 |
except:
|
| 112 |
+
pass
|
| 113 |
|
| 114 |
logger.info("β
Shutdown complete")
|
| 115 |
|
scheduler_loop.py
CHANGED
|
@@ -1,7 +1,16 @@
|
|
| 1 |
import json, time, os, requests
|
|
|
|
| 2 |
from datetime import datetime, timedelta
|
| 3 |
from pathlib import Path
|
| 4 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 5 |
SCHEDULE_FILE = "/data/.schedules.json"
|
| 6 |
RUN_URL = "http://localhost:8000/analytics/run" # inside container
|
| 7 |
|
|
@@ -19,7 +28,7 @@ def tick():
|
|
| 19 |
r = requests.post(RUN_URL,
|
| 20 |
json={"analytic": analytic},
|
| 21 |
headers={"X-Data-Path": f"/data/{s['orgId']}/sales.parquet"})
|
| 22 |
-
|
| 23 |
# bump nextRun
|
| 24 |
s["nextRun"] = (_next_run(s["frequency"])).isoformat()
|
| 25 |
|
|
@@ -34,9 +43,10 @@ def _next_run(frequency: str) -> datetime:
|
|
| 34 |
return now
|
| 35 |
|
| 36 |
if __name__ == "__main__":
|
|
|
|
| 37 |
while True:
|
| 38 |
try:
|
| 39 |
tick()
|
| 40 |
except Exception as e:
|
| 41 |
-
|
| 42 |
time.sleep(60) # 1-minute granularity
|
|
|
|
| 1 |
import json, time, os, requests
|
| 2 |
+
import logging
|
| 3 |
from datetime import datetime, timedelta
|
| 4 |
from pathlib import Path
|
| 5 |
|
| 6 |
+
# Configure logging
|
| 7 |
+
logging.basicConfig(
|
| 8 |
+
level=logging.INFO,
|
| 9 |
+
format="%(asctime)s | scheduler | %(levelname)s | %(message)s",
|
| 10 |
+
datefmt="%Y-%m-%d %H:%M:%S"
|
| 11 |
+
)
|
| 12 |
+
logger = logging.getLogger(__name__)
|
| 13 |
+
|
| 14 |
SCHEDULE_FILE = "/data/.schedules.json"
|
| 15 |
RUN_URL = "http://localhost:8000/analytics/run" # inside container
|
| 16 |
|
|
|
|
| 28 |
r = requests.post(RUN_URL,
|
| 29 |
json={"analytic": analytic},
|
| 30 |
headers={"X-Data-Path": f"/data/{s['orgId']}/sales.parquet"})
|
| 31 |
+
logger.info(f"ran {analytic} for {s['orgId']} -> status={r.status_code}")
|
| 32 |
# bump nextRun
|
| 33 |
s["nextRun"] = (_next_run(s["frequency"])).isoformat()
|
| 34 |
|
|
|
|
| 43 |
return now
|
| 44 |
|
| 45 |
if __name__ == "__main__":
|
| 46 |
+
logger.info("π Scheduler loop started (60s interval)")
|
| 47 |
while True:
|
| 48 |
try:
|
| 49 |
tick()
|
| 50 |
except Exception as e:
|
| 51 |
+
logger.error(f"error: {e}")
|
| 52 |
time.sleep(60) # 1-minute granularity
|