"""Simulateur d'appels de production pour remplir logs/predictions.jsonl.""" # EXPLICATION : Imports standards uniquement (aucune dépendance nouvelle) import requests import json import time import pandas as pd import numpy as np from pathlib import Path # EXPLICATION : Chargement du dataset de référence (500 lignes échantillonnées de features_train) # Path(__file__).parent rend le chemin robuste quel que soit le répertoire courant reference = pd.read_csv(Path(__file__).parent / "reference.csv") # EXPLICATION : Gradio 5.x utilise une API SSE en 2 étapes : # 1) POST /gradio_api/call/ → retourne un event_id # 2) GET /gradio_api/call// → stream SSE avec le résultat BASE_URL = "http://127.0.0.1:7860" CALL_URL = f"{BASE_URL}/gradio_api/call/_predict" # EXPLICATION : Tirage aléatoire de 500 lignes (avec remise si dataset < 500) # random_state=42 pour reproductibilité, replace=True pour éviter l'erreur si reference < 500 sampled = reference.sample(n=500, replace=True, random_state=42).reset_index(drop=True) # EXPLICATION : Boucle de 500 appels simulés (375 normaux + 125 avec drift) for i in range(500): # EXPLICATION : Sélection de la ligne aléatoire pré-tirée row = sampled.iloc[i].to_dict() # EXPLICATION : Nettoyage — convertir "" et NaN en None pour JSON propre for k, v in row.items(): if v == "" or pd.isna(v): row[k] = None # EXPLICATION : 25% des appels avec drift simulé (AMT_INCOME_TOTAL * 1.5) if i % 4 == 0: row["AMT_INCOME_TOTAL"] = row["AMT_INCOME_TOTAL"] * 1.5 if row["AMT_INCOME_TOTAL"] else 100000 # EXPLICATION : Format payload attendu par l'interface Gradio (app.py) payload = {"data": [json.dumps(row)]} start = time.perf_counter() drift_tag = " [DRIFT]" if i % 4 == 0 else "" try: # EXPLICATION : Étape 1 — POST pour obtenir un event_id resp = requests.post(CALL_URL, json=payload, timeout=10) resp.raise_for_status() event_id = resp.json().get("event_id") # EXPLICATION : Étape 2 — GET SSE pour récupérer le résultat result_url = f"{CALL_URL}/{event_id}" sse_resp = requests.get(result_url, timeout=30, stream=True) sse_resp.raise_for_status() # EXPLICATION : Parse la réponse SSE (format "event: ...\ndata: ...\n") result_text = "" for line in sse_resp.iter_lines(decode_unicode=True): if line and line.startswith("data:"): result_text = line[len("data:"):].strip() duration = (time.perf_counter() - start) * 1000 print(f"Appel {i+1}/500 - OK - Temps: {duration:.1f}ms{drift_tag}") except Exception as e: duration = (time.perf_counter() - start) * 1000 print(f"Erreur appel {i+1}: {e} ({duration:.1f}ms){drift_tag}") # EXPLICATION : Pause entre chaque appel pour ne pas surcharger Docker time.sleep(0.3) # Sous-étape 4 terminée - 500 appels simulés (375 normal + 125 avec drift) # Lancer avec : uv run python simulate_production_calls.py (API doit tourner sur 7860)