LeonardoMdSA committed on
Commit
94337ad
·
1 Parent(s): 61ee9c4

broken stuff

Browse files
app/api/background_drift.py CHANGED
@@ -30,7 +30,7 @@ async def drift_loop(interval_seconds: int = 10):
30
  continue
31
 
32
  prod_df = pd.read_csv(PROD_LOG_PATH)
33
-
34
  # ---- Retention window (prevents infinite growth) ----
35
  if len(prod_df) > MAX_ROWS:
36
  prod_df = prod_df.tail(MAX_ROWS)
@@ -50,10 +50,9 @@ async def drift_loop(interval_seconds: int = 10):
50
 
51
  reference_df = pd.read_csv(REFERENCE_PATH)
52
 
 
53
  _, drift_dict = run_drift_check(
54
- prod_df[predictor.features],
55
- reference_df[predictor.features],
56
- model_version="v1",
57
  )
58
 
59
  dashboard_payload = {
 
30
  continue
31
 
32
  prod_df = pd.read_csv(PROD_LOG_PATH)
33
+
34
  # ---- Retention window (prevents infinite growth) ----
35
  if len(prod_df) > MAX_ROWS:
36
  prod_df = prod_df.tail(MAX_ROWS)
 
50
 
51
  reference_df = pd.read_csv(REFERENCE_PATH)
52
 
53
+ # ---- FIX: pass reference_df to run_drift_check ----
54
  _, drift_dict = run_drift_check(
55
+ prod_df[predictor.features], reference_df[predictor.features], model_version="v1"
 
 
56
  )
57
 
58
  dashboard_payload = {
app/api/routes.py CHANGED
@@ -20,6 +20,25 @@ predictor = Predictor()
20
  # Production log file
21
  PROD_LOG = "data/production/predictions_log.csv"
22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
 
24
  @router.post("/predict")
25
  async def predict_file(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
@@ -28,7 +47,10 @@ async def predict_file(background_tasks: BackgroundTasks, file: UploadFile = Fil
28
  # ---- STRICT MODE: schema enforcement ----
29
  missing = set(predictor.features) - set(df.columns)
30
  if missing:
31
- raise HTTPException(status_code=400, detail=f"Invalid schema. Missing required columns: {sorted(missing)}")
 
 
 
32
 
33
  # ---- Model inference ----
34
  preds, probas = predictor.predict(df)
@@ -43,9 +65,12 @@ async def predict_file(background_tasks: BackgroundTasks, file: UploadFile = Fil
43
 
44
  # ---- Drift: immediate for frontend ----
45
  reference_df = pd.read_csv("models/v1/reference_data.csv")
46
- _, drift_dict = run_drift_check(df[predictor.features], reference_df[predictor.features], "v1")
 
 
 
 
47
 
48
- # Safe numeric drift values for chart
49
  drift_for_chart = []
50
  for col, score in drift_dict.items():
51
  try:
@@ -58,40 +83,43 @@ async def predict_file(background_tasks: BackgroundTasks, file: UploadFile = Fil
58
 
59
  # ---- Append predictions to production log ----
60
  df_log = df.copy()
 
 
 
 
 
 
61
  df_log["prediction"] = preds
62
  df_log["probability"] = probas
63
- df_log["risk_level"] = ["High" if p >= 0.75 else "Medium" if p >= 0.5 else "Low" for p in probas]
 
 
 
64
  df_log["model_version"] = predictor.model_version
65
  df_log["timestamp"] = pd.Timestamp.utcnow()
66
 
67
- os.makedirs(os.path.dirname(PROD_LOG), exist_ok=True)
68
- if not os.path.exists(PROD_LOG):
69
- df_log.to_csv(PROD_LOG, index=False)
70
- else:
71
- df_log.to_csv(PROD_LOG, mode="a", header=False, index=False)
72
 
73
- # ---- Background full drift check ----
74
- background_tasks.add_task(run_drift_check, df[predictor.features], reference_df[predictor.features], "v1")
75
  DASHBOARD_JSON = "reports/evidently/drift_report.json"
76
 
77
- # After computing drift_for_chart
78
  dashboard_payload = {
79
  "n_rows": len(results),
80
  "results": results,
81
- "drift": drift_for_chart
82
  }
83
 
84
- # Write JSON for dashboard frontend
85
  os.makedirs(os.path.dirname(DASHBOARD_JSON), exist_ok=True)
86
- # atomic write to avoid read/write collision
87
- import tempfile
88
  tmp_path = DASHBOARD_JSON + ".tmp"
89
  with open(tmp_path, "w") as f:
90
  json.dump(dashboard_payload, f, indent=2)
91
  os.replace(tmp_path, DASHBOARD_JSON)
92
 
93
-
94
- return JSONResponse({"n_rows": len(results), "results": results, "drift": drift_for_chart})
 
 
 
95
 
96
 
97
  @router.get("/health")
@@ -106,17 +134,6 @@ def run_drift():
106
  return {"status": "drift_check_completed", "report_path": report_path}
107
 
108
 
109
- @router.get("/monitoring/run")
110
- def monitoring_run(background_tasks: BackgroundTasks, model_version: str = "v1"):
111
- current_data = pd.read_csv("data/processed/current_data.csv")
112
- reference_data = pd.read_csv("data/processed/credit_default_clean.csv")
113
-
114
- background_tasks.add_task(run_drift_check, current_data[predictor.features], reference_data[predictor.features], model_version)
115
- background_tasks.add_task(run_governance_checks, current_data, model_version=model_version)
116
-
117
- return {"status": "monitoring triggered", "model_version": model_version}
118
-
119
-
120
  @router.get("/dashboard")
121
  def dashboard(request: Request):
122
  return templates.TemplateResponse("dashboard.html", {"request": request})
 
20
  # Production log file
21
  PROD_LOG = "data/production/predictions_log.csv"
22
 
23
# ------------------------------------------------------------------
# ENSURE production log exists at server startup (CRITICAL FIX)
# ------------------------------------------------------------------
# Later /predict calls append with header=False, so the file must exist
# up-front with the full column header.
os.makedirs(os.path.dirname(PROD_LOG), exist_ok=True)

if not os.path.exists(PROD_LOG):
    # Empty log seeded with the model features plus the logging columns.
    log_schema = list(predictor.features) + [
        "prediction",
        "probability",
        "risk_level",
        "model_version",
        "timestamp",
    ]
    pd.DataFrame(columns=log_schema).to_csv(PROD_LOG, index=False)
# ------------------------------------------------------------------
42
 
43
  @router.post("/predict")
44
  async def predict_file(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
 
47
  # ---- STRICT MODE: schema enforcement ----
48
  missing = set(predictor.features) - set(df.columns)
49
  if missing:
50
+ raise HTTPException(
51
+ status_code=400,
52
+ detail=f"Invalid schema. Missing required columns: {sorted(missing)}",
53
+ )
54
 
55
  # ---- Model inference ----
56
  preds, probas = predictor.predict(df)
 
65
 
66
  # ---- Drift: immediate for frontend ----
67
  reference_df = pd.read_csv("models/v1/reference_data.csv")
68
+ _, drift_dict = run_drift_check(
69
+ df[predictor.features],
70
+ reference_df[predictor.features],
71
+ "v1",
72
+ )
73
 
 
74
  drift_for_chart = []
75
  for col, score in drift_dict.items():
76
  try:
 
83
 
84
  # ---- Append predictions to production log ----
85
  df_log = df.copy()
86
+
87
+ # ---- FIX: Remove existing prediction/risk/probability/etc columns to avoid extra column issue ----
88
+ for col in ["prediction", "probability", "risk_level", "model_version", "timestamp"]:
89
+ if col in df_log.columns:
90
+ df_log = df_log.drop(columns=[col])
91
+
92
  df_log["prediction"] = preds
93
  df_log["probability"] = probas
94
+ df_log["risk_level"] = [
95
+ "High" if p >= 0.75 else "Medium" if p >= 0.5 else "Low"
96
+ for p in probas
97
+ ]
98
  df_log["model_version"] = predictor.model_version
99
  df_log["timestamp"] = pd.Timestamp.utcnow()
100
 
101
+ df_log.to_csv(PROD_LOG, mode="a", header=False, index=False)
 
 
 
 
102
 
103
+ # ---- Dashboard JSON ----
 
104
  DASHBOARD_JSON = "reports/evidently/drift_report.json"
105
 
 
106
  dashboard_payload = {
107
  "n_rows": len(results),
108
  "results": results,
109
+ "drift": drift_for_chart,
110
  }
111
 
 
112
  os.makedirs(os.path.dirname(DASHBOARD_JSON), exist_ok=True)
 
 
113
  tmp_path = DASHBOARD_JSON + ".tmp"
114
  with open(tmp_path, "w") as f:
115
  json.dump(dashboard_payload, f, indent=2)
116
  os.replace(tmp_path, DASHBOARD_JSON)
117
 
118
+ return JSONResponse({
119
+ "n_rows": len(results),
120
+ "results": results,
121
+ "drift": drift_for_chart,
122
+ })
123
 
124
 
125
  @router.get("/health")
 
134
  return {"status": "drift_check_completed", "report_path": report_path}
135
 
136
 
 
 
 
 
 
 
 
 
 
 
 
137
  @router.get("/dashboard")
138
  def dashboard(request: Request):
139
  return templates.TemplateResponse("dashboard.html", {"request": request})
app/api/traffic_daemon.py ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# app/api/traffic_daemon.py
import asyncio
import os
import random

import pandas as pd
import requests

API_URL = "http://localhost:8000/predict"
SOURCE_DATA = "data/processed/current_data.csv"

MIN_SLEEP = 2
MAX_SLEEP = 8
MIN_BATCH = 1
MAX_BATCH = 5
STARTUP_DELAY = 10  # seconds – allow FastAPI to fully start


async def traffic_loop():
    """
    Continuously generate inference traffic against /predict.

    Runs until cancelled: after an initial STARTUP_DELAY, each iteration
    samples MIN_BATCH..MAX_BATCH rows from SOURCE_DATA, POSTs them to
    API_URL as a CSV file upload, then sleeps a random MIN_SLEEP..MAX_SLEEP
    seconds. Disabled (returns immediately) when SOURCE_DATA is missing.
    """
    await asyncio.sleep(STARTUP_DELAY)

    if not os.path.exists(SOURCE_DATA):
        print("Traffic daemon: source data not found, disabled.")
        return

    # FIX: pd.read_csv and requests.post are blocking calls. This coroutine
    # runs on the same event loop as the FastAPI server, so calling them
    # directly would freeze request handling for the duration of every
    # read/HTTP round-trip. Off-load them to a worker thread instead.
    df = await asyncio.to_thread(pd.read_csv, SOURCE_DATA)

    print("Traffic daemon started.")

    while True:
        try:
            batch_size = random.randint(MIN_BATCH, MAX_BATCH)
            sample = df.sample(batch_size)

            csv_bytes = sample.to_csv(index=False).encode("utf-8")

            response = await asyncio.to_thread(
                requests.post,
                API_URL,
                files={"file": ("sample.csv", csv_bytes, "text/csv")},
                timeout=10,
            )

            if response.status_code != 200:
                print("Traffic daemon warning:", response.status_code)

        except Exception as e:
            # Best-effort daemon: log and keep generating traffic.
            print("Traffic daemon error:", e)

        await asyncio.sleep(random.uniform(MIN_SLEEP, MAX_SLEEP))
app/main.py CHANGED
@@ -1,4 +1,4 @@
1
- # app/main.py (no other changes)
2
  from fastapi import FastAPI
3
  from fastapi.staticfiles import StaticFiles
4
  import asyncio
@@ -8,19 +8,35 @@ from app.api.routes import router
8
  from app.api.dashboard_data import router as dashboard_data_router
9
  from app.core.logging import init_db
10
  from app.api.background_drift import drift_loop
 
 
11
 
12
  @asynccontextmanager
13
  async def lifespan(app: FastAPI):
 
14
  init_db()
15
- task = asyncio.create_task(drift_loop(interval_seconds=10))
 
 
 
 
 
 
16
  yield
17
- task.cancel()
18
- try:
19
- await task
20
- except asyncio.CancelledError:
21
- pass
22
 
23
- app = FastAPI(title="ML Inference Service", lifespan=lifespan)
 
 
 
 
 
 
 
 
 
 
 
 
24
 
25
  app.mount("/static", StaticFiles(directory="app/static"), name="static")
26
  app.mount("/reports", StaticFiles(directory="reports"), name="reports")
 
1
+ # app/main.py
2
  from fastapi import FastAPI
3
  from fastapi.staticfiles import StaticFiles
4
  import asyncio
 
8
  from app.api.dashboard_data import router as dashboard_data_router
9
  from app.core.logging import init_db
10
  from app.api.background_drift import drift_loop
11
+ from app.api.traffic_daemon import traffic_loop
12
+
13
 
14
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    FastAPI lifespan context.

    Startup: initialise the DB and launch the two background loops
    (drift detection + synthetic traffic). Shutdown: cancel both loops
    and wait for them to terminate.
    """
    # ---- Startup ----
    init_db()

    # Start drift detection loop
    drift_task = asyncio.create_task(drift_loop(interval_seconds=10))

    # Start traffic daemon (delayed internally, HF-safe)
    traffic_task = asyncio.create_task(traffic_loop())

    yield

    # ---- Shutdown ----
    # FIX: cancel every task *before* awaiting any, and await them with
    # gather(return_exceptions=True). The sequential cancel/await loop it
    # replaces would stop at the first task that finished with a
    # non-CancelledError exception, leaving the other loop running.
    for task in (drift_task, traffic_task):
        task.cancel()
    await asyncio.gather(drift_task, traffic_task, return_exceptions=True)
34
+
35
+
36
+ app = FastAPI(
37
+ title="ML Inference Service",
38
+ lifespan=lifespan,
39
+ )
40
 
41
  app.mount("/static", StaticFiles(directory="app/static"), name="static")
42
  app.mount("/reports", StaticFiles(directory="reports"), name="reports")
data/production/predictions_log.csv CHANGED
The diff for this file is too large to render. See raw diff
 
reports/evidently/drift_report.html CHANGED
The diff for this file is too large to render. See raw diff
 
reports/evidently/drift_report.json CHANGED
The diff for this file is too large to render. See raw diff
 
scripts/simulate_inference.py CHANGED
@@ -3,23 +3,66 @@ import pandas as pd
3
  import requests
4
  import random
5
  import time
 
6
 
7
- df = pd.read_csv("data/processed/current_data.csv")
 
8
 
9
- # Sample 1-5 rows randomly
10
- sample = df.sample(random.randint(1,5))
11
- csv_bytes = sample.to_csv(index=False).encode("utf-8")
 
 
12
 
13
- # POST to FastAPI predict endpoint
14
- response = requests.post(
15
- "http://localhost:8000/predict",
16
- files={"file": ("sample.csv", csv_bytes, "text/csv")}
17
- )
18
 
19
- print("Status:", response.status_code)
20
  try:
21
- print("Response:", response.json())
22
- except Exception:
23
- print("Server returned non-JSON response:")
24
- print(response.text)
25
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
import requests
import random
import time
import sys

API_URL = "http://localhost:8000/predict"
SOURCE_DATA = "data/processed/current_data.csv"

# Traffic behavior (tune freely)
MIN_SLEEP = 2  # seconds
MAX_SLEEP = 8  # seconds
MIN_BATCH = 1
MAX_BATCH = 5

print("Starting inference traffic daemon...")
print(f"Target API: {API_URL}")
print(f"Source data: {SOURCE_DATA}")
print("Press Ctrl+C to stop.\n")

# Load once (realistic: upstream feature store snapshot)
try:
    df = pd.read_csv(SOURCE_DATA)
except Exception as e:
    print("Failed to load source data:", e)
    sys.exit(1)

while True:
    try:
        # ---- Random batch size ----
        batch_size = random.randint(MIN_BATCH, MAX_BATCH)
        sample = df.sample(batch_size)

        # ---- Serialize to CSV ----
        csv_bytes = sample.to_csv(index=False).encode("utf-8")

        # ---- Send request ----
        response = requests.post(
            API_URL,
            files={"file": ("sample.csv", csv_bytes, "text/csv")},
            timeout=10,
        )

        if response.status_code == 200:
            payload = response.json()
            print(
                f"[OK] rows={payload['n_rows']} "
                f"predictions_logged=True"
            )
        else:
            print(
                f"[WARN] HTTP {response.status_code} "
                f"{response.text}"
            )

        # ---- Sleep (non-uniform traffic) ----
        # FIX: sleep *inside* the try. Previously the sleep sat after the
        # except blocks, so Ctrl+C pressed during the (2-8 s) sleep escaped
        # the KeyboardInterrupt handler and crashed with a traceback
        # instead of the clean shutdown message.
        time.sleep(random.uniform(MIN_SLEEP, MAX_SLEEP))

    except KeyboardInterrupt:
        print("\nTraffic daemon stopped by user.")
        break

    except Exception as e:
        print("[ERROR] Inference request failed:", e)
        # Preserve pacing after a failed request as well.
        time.sleep(random.uniform(MIN_SLEEP, MAX_SLEEP))