Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import RedirectResponse | |
| from pydantic import BaseModel | |
| import uvicorn | |
| import logging | |
| import os | |
| # Import our custom modules | |
| from anomaly_detector import AnomalyDetector | |
| from rag_agent import SentinelAgent | |
# 1. Initialize the App
# The single FastAPI application object for this service; title, description
# and version are handed to FastAPI as the application's metadata.
app = FastAPI(
    version="1.0",
    title="Sentinel MLOps Agent",
    description="Autonomous Anomaly Detection & RAG Investigation API. Powered by Gemini 2.5.",
)
# 2. Load the Engines
# Bind both names before attempting construction so they always exist at
# module level: after a failed start-up they are None rather than undefined,
# which lets later code test for availability instead of hitting NameError.
detector = None
agent = None
print("π Starting Sentinel Engines...")
try:
    detector = AnomalyDetector()
    agent = SentinelAgent()
except Exception as e:
    # Best-effort start-up: the module must still import when an engine cannot
    # be constructed, so the error is reported rather than re-raised.
    print(f"β Error loading engines: {e}")
    # Keep the full traceback; the one-line message above drops it.
    logging.exception("Sentinel engine start-up failed")
else:
    print("β Engines Active.")
# 3. Define the Data Format
# Payload accepted by monitor_system: a single CPU reading reported for one
# service.
class MetricData(BaseModel):
    # When the reading was taken; carried as a plain string.
    timestamp: str
    # Service the reading belongs to; echoed in alerts and incident reports.
    service_name: str
    # CPU reading handed to the anomaly detector.
    # NOTE(review): unit and valid range are not visible here; confirm.
    cpu_usage: float
# 4. The API Endpoint
# NOTE(review): no route decorator (e.g. ``@app.post(...)``) is visible on this
# handler, so as shown it is never registered with ``app``. Confirm the
# intended path and restore the decorator.
async def monitor_system(data: MetricData):
    """
    Receives live system data.

    - If Normal (< 2.5 sigma): Returns OK.
    - If Anomaly (> 2.5 sigma): Triggers Gemini 2.5 to investigate logs.

    The anomaly decision itself is made by ``detector.update``; the 2.5 sigma
    figure above describes the detector and is not enforced in this function.

    Args:
        data: The metric reading to evaluate.

    Returns:
        A dict with ``status`` "Healthy" plus the detector's message and
        z-score, or a dict with ``status`` "CRITICAL" plus the service name,
        the formatted deviation and the agent's investigation report.

    Raises:
        HTTPException: 503 when the engine needed for the request was not
            loaded at start-up.
    """
    # Start-up swallows engine construction errors, so these globals may be
    # unbound or None. Look them up defensively and answer with an explicit
    # 503 instead of crashing with NameError/AttributeError.
    engines = globals()

    # Step A: Check for Physics/Math Anomaly
    active_detector = engines.get("detector")
    if active_detector is None:
        raise HTTPException(
            status_code=503,
            detail="Anomaly detector is not available.",
        )
    is_anomaly, msg, z_score = active_detector.update(data.cpu_usage)
    if not is_anomaly:
        return {
            "status": "Healthy",
            "message": msg,
            "z_score": z_score,
        }

    # Step B: If Anomaly, Wake up the Agent
    print(f"π¨ ALERT: Anomaly on {data.service_name} detected! (Z={z_score:.2f})")
    # The agent is only required on the anomaly path, so healthy readings keep
    # working even if only the agent failed to load.
    active_agent = engines.get("agent")
    if active_agent is None:
        raise HTTPException(
            status_code=503,
            detail="Investigation agent is not available.",
        )
    report = active_agent.investigate(
        anomaly_value=data.cpu_usage,
        z_score=z_score,
    )

    # Return the Full Incident Report
    return {
        "status": "CRITICAL",
        "service": data.service_name,
        "deviation": f"{z_score:.2f} sigma",
        "investigation_report": report,
    }
# 5. Root Redirect (The Magic Trick)
# Registered on "/" so the redirect is actually reachable; without the
# decorator the function is defined but never served.
@app.get("/")
def root():
    """Redirect requests for the root URL to the Swagger UI at ``/docs``."""
    # Automatically redirects users to the Swagger UI
    return RedirectResponse(url="/docs")
# 6. Run the Server
if __name__ == "__main__":
    # Honour the PORT environment variable when it is set; otherwise listen on
    # 7860, the port Hugging Face requires. There is no other fallback.
    listen_port = int(os.environ.get("PORT", "7860"))
    # Bind on all interfaces.
    uvicorn.run(app, host="0.0.0.0", port=listen_port)