grantforge-api / backend /endpoints /admin_diagnostics.py
GrantForge Bot
Deploy to Hugging Face
afd56bc
import os
import httpx
import time
from fastapi import APIRouter, Request, Query, HTTPException
from fastapi.responses import StreamingResponse
from core.telemetry import telemetry
from integrations.eurlex_client import EURLexClient
# Router on which the diagnostics endpoints in this module are registered.
router = APIRouter()
async def ping_service(url: str, env_var: str):
    """Check that a credential is configured and that ``url`` answers a GET.

    Args:
        url: Endpoint to request. Only reachability is tested; the response
            status code and body are not inspected.
        env_var: Name of the environment variable expected to hold the
            service credential. Only its presence is checked; the value is
            not sent with the request.

    Returns:
        A dict with ``status`` ("ok" or "error"), a human-readable
        ``message`` and ``latency_ms`` (0 when the variable is missing).
    """
    began = time.perf_counter()

    def elapsed_ms():
        # Whole milliseconds since the probe began.
        return int((time.perf_counter() - began) * 1000)

    if not os.getenv(env_var):
        return {"status": "error", "message": f"{env_var} not set", "latency_ms": 0}

    try:
        async with httpx.AsyncClient(timeout=3.0) as session:
            await session.get(url)
    except Exception as exc:
        # Any failure (timeout, DNS, TLS, ...) is reported, never raised.
        return {"status": "error", "message": str(exc), "latency_ms": elapsed_ms()}
    else:
        return {
            "status": "ok",
            "message": "API Key configured and service reachable",
            "latency_ms": elapsed_ms(),
        }
async def check_gemini():
    """Probe the Google Generative Language models endpoint.

    Delegates to ``ping_service`` with the ``GOOGLE_API_KEY`` variable and
    returns its status dict unchanged.
    """
    endpoint = "https://generativelanguage.googleapis.com/v1beta/models"
    status = await ping_service(endpoint, "GOOGLE_API_KEY")
    return status
async def check_grok():
    """Probe the x.ai models endpoint.

    Delegates to ``ping_service`` with the ``GROK_API_KEY`` variable and
    returns its status dict unchanged.
    """
    endpoint = "https://api.x.ai/v1/models"
    status = await ping_service(endpoint, "GROK_API_KEY")
    return status
async def check_pinecone():
    """Probe the Pinecone API host.

    Delegates to ``ping_service`` with the ``PINECONE_API_KEY`` variable and
    returns its status dict unchanged.
    """
    endpoint = "https://api.pinecone.io"
    status = await ping_service(endpoint, "PINECONE_API_KEY")
    return status
async def check_clerk():
    """Probe the Clerk users endpoint.

    Delegates to ``ping_service`` with the ``CLERK_SECRET_KEY`` variable and
    returns its status dict unchanged.
    """
    endpoint = "https://api.clerk.com/v1/users?limit=1"
    status = await ping_service(endpoint, "CLERK_SECRET_KEY")
    return status
async def check_langsmith():
    """Probe the LangSmith workspaces endpoint.

    Delegates to ``ping_service`` with the ``LANGCHAIN_API_KEY`` variable and
    returns its status dict unchanged.
    """
    endpoint = "https://api.smith.langchain.com/api/v1/workspaces"
    status = await ping_service(endpoint, "LANGCHAIN_API_KEY")
    return status
async def check_firecrawl():
    """Probe the Firecrawl API.

    Delegates to ``ping_service`` with the ``FIRECRAWL_API_KEY`` variable and
    returns its status dict unchanged.
    """
    endpoint = "https://api.firecrawl.dev/v1/test"
    status = await ping_service(endpoint, "FIRECRAWL_API_KEY")
    return status
async def check_neo4j():
    """Verify connectivity to the Neo4j instance named by ``NEO4J_URI``.

    Returns:
        A dict with ``status``, ``message`` and ``latency_ms``:

        * ``warning`` when ``NEO4J_URI`` is unset or contains ``"..."``
          (presumably an unfilled placeholder from an env template);
        * ``ok`` when the driver verifies connectivity, or when the
          ``neo4j`` package is not installed (only configuration can be
          confirmed in that case);
        * ``error`` with the exception text for any other failure.
    """
    start = time.perf_counter()
    neo4j_uri = os.getenv("NEO4J_URI")
    if not neo4j_uri or "..." in neo4j_uri:
        return {"status": "warning", "message": "NEO4J_URI nie jest skonfigurowane. Jeśli używasz chmury, podepnij Neo4j AuraDB i podaj NEO4J_URI w zmiennych środowiskowych.", "latency_ms": 0}
    try:
        from neo4j import AsyncGraphDatabase

        # NEO4J_USERNAME takes precedence; NEO4J_USER is the fallback name.
        username = os.getenv("NEO4J_USERNAME", os.getenv("NEO4J_USER", "neo4j"))
        password = os.getenv("NEO4J_PASSWORD", "")
        driver = AsyncGraphDatabase.driver(neo4j_uri, auth=(username, password))
        try:
            await driver.verify_connectivity()
        finally:
            # Close the driver even when the connectivity check fails, so a
            # failing probe does not leak the driver and its connections.
            await driver.close()
        latency = int((time.perf_counter() - start) * 1000)
        return {
            "status": "ok",
            "message": "Neo4j connection successful",
            "latency_ms": latency,
        }
    except ImportError:
        latency = int((time.perf_counter() - start) * 1000)
        return {
            "status": "ok",
            "message": "NEO4J_URI configured",
            "latency_ms": latency,
        }
    except Exception as e:
        latency = int((time.perf_counter() - start) * 1000)
        return {"status": "error", "message": str(e), "latency_ms": latency}
async def check_database():
    """Run ``SELECT 1`` through a fresh ``SessionLocal`` session.

    Returns:
        A dict with ``status`` ("ok" or "error"), ``message`` and
        ``latency_ms``. Import failures and database errors are reported as
        ``error`` with the exception text; nothing is raised.
    """
    start = time.perf_counter()
    try:
        from core.subscription.db import SessionLocal
        from sqlalchemy import text

        db = SessionLocal()
        try:
            # NOTE(review): this call is not awaited, so it appears to be a
            # synchronous session that blocks the event loop for the
            # duration of the query - confirm against SessionLocal.
            db.execute(text("SELECT 1"))
        finally:
            # Release the session even when the query fails, so a failing
            # probe does not leak it.
            db.close()
        latency = int((time.perf_counter() - start) * 1000)
        return {
            "status": "ok",
            "message": "Database connection successful",
            "latency_ms": latency,
        }
    except Exception as e:
        latency = int((time.perf_counter() - start) * 1000)
        return {"status": "error", "message": str(e), "latency_ms": latency}
async def check_eurlex():
    """Ask ``EURLexClient`` for its status and attach the elapsed time.

    Returns:
        The mapping produced by ``EURLexClient().check_status()`` with a
        ``latency_ms`` entry added, or an ``error`` dict carrying the
        exception text when anything in that path fails.
    """
    began = time.perf_counter()

    def elapsed_ms():
        # Whole milliseconds since the check began.
        return int((time.perf_counter() - began) * 1000)

    try:
        report = EURLexClient().check_status()
        report["latency_ms"] = elapsed_ms()
        return report
    except Exception as exc:
        return {"status": "error", "message": str(exc), "latency_ms": elapsed_ms()}
async def _check_website(url: str, label: str):
    """GET ``url`` with a 3-second timeout and report reachability.

    Args:
        url: Page to request. The response status code is not inspected.
        label: Short site name used in the success message.

    Returns:
        A dict with ``status`` ("ok" or "error"), ``message`` and
        ``latency_ms``. Failures are reported with the exception text and
        never raised.
    """
    start = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            await client.get(url)
        latency = int((time.perf_counter() - start) * 1000)
        return {
            "status": "ok",
            "message": f"{label} website reachable",
            "latency_ms": latency,
        }
    except Exception as e:
        latency = int((time.perf_counter() - start) * 1000)
        return {"status": "error", "message": str(e), "latency_ms": latency}


async def check_parp():
    """Check that the PARP website (parp.gov.pl) is reachable."""
    return await _check_website("https://www.parp.gov.pl/", "PARP")


async def check_ncbr():
    """Check that the NCBR page on gov.pl is reachable."""
    return await _check_website("https://www.gov.pl/web/ncbr", "NCBR")


async def check_bgk():
    """Check that the BGK website (bgk.pl) is reachable."""
    return await _check_website("https://www.bgk.pl/", "BGK")
@router.get("/services_status")
async def get_system_health():
    """Return the health status of the external dependencies.

    Returns:
        A dict mapping each dependency name to the status dict produced by
        its ``check_*`` coroutine.
    """
    import asyncio

    # The database and EUR-Lex checks make calls that are not awaited, so
    # they hold the event loop while they run. Run them on their own first
    # so they cannot stall the concurrent requests below or inflate their
    # measured latency.
    db_status = await check_database()
    eurlex_status = await check_eurlex()

    # The remaining checks run concurrently, so total time is bounded by
    # the slowest one rather than the sum of all timeouts. Each check
    # reports its own failures as a status dict.
    (
        gemini_status,
        grok_status,
        pinecone_status,
        clerk_status,
        langsmith_status,
        neo4j_status,
        firecrawl_status,
        parp_status,
        ncbr_status,
        bgk_status,
    ) = await asyncio.gather(
        check_gemini(),
        check_grok(),
        check_pinecone(),
        check_clerk(),
        check_langsmith(),
        check_neo4j(),
        check_firecrawl(),
        check_parp(),
        check_ncbr(),
        check_bgk(),
    )

    return {
        "gemini": gemini_status,
        "grok": grok_status,
        "pinecone": pinecone_status,
        "clerk": clerk_status,
        "langsmith": langsmith_status,
        "neo4j": neo4j_status,
        "database": db_status,
        "eurlex": eurlex_status,
        "firecrawl": firecrawl_status,
        "parp_gov": parp_status,
        "ncbr_gov": ncbr_status,
        "bgk": bgk_status,
    }
@router.get("/watchdog-stats")
async def get_watchdog_stats():
    """Parse the watchdog log and return auto-recovery statistics.

    Every non-blank line of ``logs/watchdog.log`` (resolved against the
    process working directory) counts as one intervention. Each line is
    also classified by the first matching tag among ``[RETRY 429]``,
    ``[RETRY 500]``, ``[RETRY REFUSAL]`` and ``[ABORT]``.

    Returns:
        A dict with the counters and ``recent_events``: up to 50 of the
        latest non-blank lines, newest first. When the log file does not
        exist the zeroed stats are returned. On any other read failure the
        zeroed stats carry an additional ``error`` key with the exception
        text.
    """
    stats = {
        "total_interventions": 0,
        "retries_429": 0,
        "retries_500": 0,
        "retries_refusal": 0,
        "aborts": 0,
        "recent_events": [],
    }
    # Checked in order; the first tag found in a line wins.
    tagged_counters = (
        ("[RETRY 429]", "retries_429"),
        ("[RETRY 500]", "retries_500"),
        ("[RETRY REFUSAL]", "retries_refusal"),
        ("[ABORT]", "aborts"),
    )
    try:
        with open("logs/watchdog.log", "r", encoding="utf-8") as f:
            # Drop blank lines up front so the counters and recent_events
            # agree on what counts as an event.
            events = [line for line in (raw.strip() for raw in f) if line]
    except FileNotFoundError:
        # No log yet means no interventions were recorded.
        return stats
    except Exception as e:
        stats["error"] = str(e)
        return stats

    stats["total_interventions"] = len(events)
    for line in events:
        for tag, counter in tagged_counters:
            if tag in line:
                stats[counter] += 1
                break
    # 50 most recent events, newest first.
    stats["recent_events"] = events[-50:][::-1]
    return stats
@router.get("/stream")
async def diagnostics_stream(request: Request, token: str = Query(None)):
    """SSE endpoint for streaming real-time telemetry logs.

    Args:
        request: Incoming request (not used by the handler body).
        token: Bearer token passed as a query parameter.

    Returns:
        A ``text/event-stream`` response fed by ``telemetry.subscribe()``.

    Raises:
        HTTPException: 401 when the token is missing, or when importing or
            running ``verify_token`` raises for any reason (the exception
            text becomes the response detail).
    """
    import inspect

    if not token:
        raise HTTPException(status_code=401, detail="Missing token")
    try:
        from core.subscription.middleware import verify_token
        from fastapi.security import HTTPAuthorizationCredentials

        outcome = verify_token(
            HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
        )
        # verify_token is defined outside this file. If it is a coroutine
        # function, calling it only creates a coroutine and validates
        # nothing; await the result so a rejected token actually raises.
        if inspect.isawaitable(outcome):
            await outcome
    except Exception as e:
        raise HTTPException(status_code=401, detail=str(e)) from e
    return StreamingResponse(telemetry.subscribe(), media_type="text/event-stream")