"""Diagnostics endpoints: external-service health, watchdog log stats, telemetry SSE stream."""

import asyncio
import os
import time
from collections import deque

import httpx
from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import StreamingResponse

from core.telemetry import telemetry
from integrations.eurlex_client import EURLexClient

router = APIRouter()

# Per-request timeout (seconds) for outbound HTTP reachability probes.
HTTP_TIMEOUT_SECONDS = 3.0
# Watchdog log location, resolved relative to the process working directory.
WATCHDOG_LOG_PATH = "logs/watchdog.log"
# Maximum number of most-recent watchdog events returned by /watchdog-stats.
RECENT_EVENTS_LIMIT = 50


def _elapsed_ms(start: float) -> int:
    """Return whole milliseconds elapsed since ``start`` (a ``time.perf_counter()`` value)."""
    return int((time.perf_counter() - start) * 1000)


async def _run_blocking(func):
    """Run the synchronous callable ``func`` in the default thread pool.

    Keeps blocking client libraries from stalling the event loop while the
    other health checks are running concurrently.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func)


async def _probe_url(url: str, ok_message: str) -> dict:
    """GET ``url`` once and report whether it answered.

    Any HTTP response, including 4xx/5xx, counts as reachable: httpx does not
    raise on status codes unless asked to. Only transport-level failures
    (DNS, connect, timeout, ...) are reported as ``"error"``.

    Returns:
        ``{"status": "ok"|"error", "message": str, "latency_ms": int}``
    """
    start = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT_SECONDS) as client:
            await client.get(url)
    except Exception as e:
        return {"status": "error", "message": str(e), "latency_ms": _elapsed_ms(start)}
    return {"status": "ok", "message": ok_message, "latency_ms": _elapsed_ms(start)}


async def ping_service(url: str, env_var: str):
    """Check that ``env_var`` is set and that ``url`` is reachable.

    The key is only checked for presence. It is not sent with the request,
    so an unauthenticated 401/403 response still counts as reachable.

    Args:
        url: Endpoint to probe with a single GET.
        env_var: Name of the environment variable holding the API key.

    Returns:
        A status dict with ``status``, ``message`` and ``latency_ms`` keys.
    """
    if not os.getenv(env_var):
        return {"status": "error", "message": f"{env_var} not set", "latency_ms": 0}
    return await _probe_url(url, "API Key configured and service reachable")


async def check_gemini():
    """Probe the Google Generative Language API (requires GOOGLE_API_KEY)."""
    return await ping_service(
        "https://generativelanguage.googleapis.com/v1beta/models", "GOOGLE_API_KEY"
    )


async def check_grok():
    """Probe the x.ai API (requires GROK_API_KEY)."""
    return await ping_service("https://api.x.ai/v1/models", "GROK_API_KEY")


async def check_pinecone():
    """Probe the Pinecone API (requires PINECONE_API_KEY)."""
    return await ping_service("https://api.pinecone.io", "PINECONE_API_KEY")


async def check_clerk():
    """Probe the Clerk API (requires CLERK_SECRET_KEY)."""
    return await ping_service(
        "https://api.clerk.com/v1/users?limit=1", "CLERK_SECRET_KEY"
    )


async def check_langsmith():
    """Probe the LangSmith API (requires LANGCHAIN_API_KEY)."""
    return await ping_service(
        "https://api.smith.langchain.com/api/v1/workspaces", "LANGCHAIN_API_KEY"
    )


async def check_firecrawl():
    """Probe the Firecrawl API (requires FIRECRAWL_API_KEY)."""
    return await ping_service(
        "https://api.firecrawl.dev/v1/test", "FIRECRAWL_API_KEY"
    )


async def check_neo4j():
    """Verify connectivity to Neo4j at ``NEO4J_URI``.

    Returns ``"warning"`` when the URI is unset or still contains a ``...``
    placeholder. Returns ``"ok"`` without connecting when the ``neo4j``
    package (or its async driver) cannot be imported. Otherwise it opens a
    driver, calls ``verify_connectivity`` and always closes the driver.
    """
    start = time.perf_counter()
    neo4j_uri = os.getenv("NEO4J_URI")
    # Values containing "..." are treated as unfilled template placeholders.
    if not neo4j_uri or "..." in neo4j_uri:
        return {
            "status": "warning",
            # Polish user-facing text: "NEO4J_URI is not configured. If you
            # use the cloud, attach Neo4j AuraDB and set NEO4J_URI in the
            # environment variables."
            "message": (
                "NEO4J_URI nie jest skonfigurowane. "
                "Jeśli używasz chmury, podepnij Neo4j AuraDB i podaj NEO4J_URI "
                "w zmiennych środowiskowych."
            ),
            "latency_ms": 0,
        }

    try:
        from neo4j import AsyncGraphDatabase
    except ImportError:
        # Driver not installed: only report that the URI is configured.
        return {
            "status": "ok",
            "message": "NEO4J_URI configured",
            "latency_ms": _elapsed_ms(start),
        }

    try:
        driver = AsyncGraphDatabase.driver(
            neo4j_uri,
            auth=(
                os.getenv("NEO4J_USERNAME", os.getenv("NEO4J_USER", "neo4j")),
                os.getenv("NEO4J_PASSWORD", ""),
            ),
        )
        try:
            await driver.verify_connectivity()
        finally:
            # Close even when verification fails, so the driver is not leaked.
            await driver.close()
    except Exception as e:
        return {"status": "error", "message": str(e), "latency_ms": _elapsed_ms(start)}
    return {
        "status": "ok",
        "message": "Neo4j connection successful",
        "latency_ms": _elapsed_ms(start),
    }


async def check_database():
    """Run ``SELECT 1`` through the subscription DB session factory.

    The synchronous SQLAlchemy call runs in a worker thread, and the session
    is closed even if the query fails.
    """
    start = time.perf_counter()
    try:
        from core.subscription.db import SessionLocal
        from sqlalchemy import text

        def _select_one():
            db = SessionLocal()
            try:
                db.execute(text("SELECT 1"))
            finally:
                db.close()

        await _run_blocking(_select_one)
    except Exception as e:
        return {"status": "error", "message": str(e), "latency_ms": _elapsed_ms(start)}
    return {
        "status": "ok",
        "message": "Database connection successful",
        "latency_ms": _elapsed_ms(start),
    }


async def check_eurlex():
    """Return ``EURLexClient().check_status()`` with ``latency_ms`` added.

    ``check_status`` is called synchronously by the client, so it runs in a
    worker thread. It is assumed to return a mutable dict (an error dict is
    returned if it does not).
    """
    start = time.perf_counter()
    try:
        result = await _run_blocking(lambda: EURLexClient().check_status())
        result["latency_ms"] = _elapsed_ms(start)
        return result
    except Exception as e:
        return {"status": "error", "message": str(e), "latency_ms": _elapsed_ms(start)}


async def check_parp():
    """Check that the PARP public website answers."""
    return await _probe_url("https://www.parp.gov.pl/", "PARP website reachable")


async def check_ncbr():
    """Check that the NCBR public website answers."""
    return await _probe_url("https://www.gov.pl/web/ncbr", "NCBR website reachable")


async def check_bgk():
    """Check that the BGK public website answers."""
    return await _probe_url("https://www.bgk.pl/", "BGK website reachable")


# Response key -> check coroutine function. Insertion order defines the key
# order of the /services_status response.
_HEALTH_CHECKS = {
    "gemini": check_gemini,
    "grok": check_grok,
    "pinecone": check_pinecone,
    "clerk": check_clerk,
    "langsmith": check_langsmith,
    "neo4j": check_neo4j,
    "database": check_database,
    "eurlex": check_eurlex,
    "firecrawl": check_firecrawl,
    "parp_gov": check_parp,
    "ncbr_gov": check_ncbr,
    "bgk": check_bgk,
}


@router.get("/services_status")
async def get_system_health():
    """Returns the health status of various external dependencies.

    All checks run concurrently, so the endpoint takes about as long as the
    slowest single check instead of their sum. A check that raises
    unexpectedly is reported as an ``"error"`` entry instead of failing the
    whole response.
    """
    results = await asyncio.gather(
        *(check() for check in _HEALTH_CHECKS.values()), return_exceptions=True
    )
    report = {}
    for name, result in zip(_HEALTH_CHECKS, results):
        if isinstance(result, Exception):
            result = {"status": "error", "message": str(result), "latency_ms": 0}
        report[name] = result
    return report


# Log-line marker -> stats counter. The first matching marker wins, mirroring
# an if/elif chain.
_WATCHDOG_TAGS = (
    ("[RETRY 429]", "retries_429"),
    ("[RETRY 500]", "retries_500"),
    ("[RETRY REFUSAL]", "retries_refusal"),
    ("[ABORT]", "aborts"),
)


@router.get("/watchdog-stats")
async def get_watchdog_stats():
    """Parses watchdog logs and returns statistics of auto-recovery interventions.

    Every non-empty line of the watchdog log counts as one intervention.
    Lines are further tallied by the first matching marker in
    ``_WATCHDOG_TAGS``. ``recent_events`` holds up to ``RECENT_EVENTS_LIMIT``
    of the latest non-empty lines, newest first.

    A missing log yields all-zero stats. Any other read failure adds an
    ``"error"`` key.
    """
    stats = {
        "total_interventions": 0,
        "retries_429": 0,
        "retries_500": 0,
        "retries_refusal": 0,
        "aborts": 0,
        "recent_events": [],
    }
    # Bounded buffer: the log is streamed instead of loaded fully into memory.
    recent = deque(maxlen=RECENT_EVENTS_LIMIT)
    try:
        # errors="replace": one undecodable byte should not discard all stats.
        with open(WATCHDOG_LOG_PATH, "r", encoding="utf-8", errors="replace") as f:
            for raw in f:
                line = raw.strip()
                if not line:
                    continue
                stats["total_interventions"] += 1
                for tag, key in _WATCHDOG_TAGS:
                    if tag in line:
                        stats[key] += 1
                        break
                recent.append(line)
    except FileNotFoundError:
        return stats
    except Exception as e:
        stats["error"] = str(e)
    stats["recent_events"] = list(reversed(recent))
    return stats


@router.get("/stream")
async def diagnostics_stream(request: Request, token: str = Query(None)):
    """SSE endpoint for streaming real-time telemetry logs.

    The bearer token arrives as the ``token`` query parameter (presumably
    because browser ``EventSource`` cannot set an Authorization header). It
    is validated with ``verify_token`` before streaming starts.

    Raises:
        HTTPException: 401 when the token is missing or rejected.
    """
    if not token:
        raise HTTPException(status_code=401, detail="Missing token")

    # Imported outside the try: a server-side import failure must not be
    # reported to the client as an authentication error.
    from core.subscription.middleware import verify_token
    from fastapi.security import HTTPAuthorizationCredentials

    try:
        verify_token(HTTPAuthorizationCredentials(scheme="Bearer", credentials=token))
    except Exception as e:
        raise HTTPException(status_code=401, detail=str(e)) from e
    return StreamingResponse(telemetry.subscribe(), media_type="text/event-stream")