import time from fastapi import APIRouter, Depends, HTTPException, Request from fastapi.responses import StreamingResponse from core.subscription.middleware import verify_token from core.telemetry import telemetry import logging logger = logging.getLogger(__name__) router = APIRouter() def verify_admin(token_data: dict = Depends(verify_token)): """Proste zabezpieczenie sprawdzające czy użytkownik to admin (można rozszerzyć wg struktury tokenu Clerk).""" # W Clerk rola może być w token_data["metadata"]["role"] lub "role", zależnie od konfiguracji public metadata. # Uproszczone sprawdzenie - można dostosować do faktycznej strategii JWT dla admina. role = token_data.get("role") or token_data.get("metadata", {}).get("role") # Dla ułatwienia testów, dev_test_token ma uprawnienia if token_data.get("sub") == "test_dev_user": return token_data if role != "admin": raise HTTPException( status_code=403, detail="Brak uprawnień. Wymagana rola: admin." ) return token_data @router.get("/stream-logs") async def stream_logs(request: Request, _admin: dict = Depends(verify_admin)): """ Endpoint SSE strumieniujący logi na żywo. Odłączony automatycznie po rozłączeniu klienta (Request.is_disconnected). """ async def sse_generator(): try: async for event in telemetry.subscribe(): if await request.is_disconnected(): break yield event except Exception as e: logger.error(f"SSE stream error: {e}") return StreamingResponse(sse_generator(), media_type="text/event-stream") @router.get("/health") async def get_health(_admin: dict = Depends(verify_admin)): """ Healthcheck zwracający status usług i opóźnienie w ms. To jest zarys - w pełnej wersji implementuje pings do Pinecone/Grok/Gemini. """ start_time = time.time() # Przykładowa symulacja odpytań (do uzupełnienia o prawdziwe zapytania) services = {} def measure(name, func): t0 = time.time() try: status = func() latency = int((time.time() - t0) * 1000) services[name] = { "status": "ok" if status else "error", "latency_ms": latency, } except Exception as e: latency = int((time.time() - t0) * 1000) services[name] = { "status": "error", "message": str(e), "latency_ms": latency, } # DB (Neo4j / Postgres - tu mock) measure("neo4j", lambda: True) measure("postgresql", lambda: True) # AI (Gemini / Grok - tu mock, w praktyce można odpalić mały prompt) measure("gemini", lambda: True) measure("grok", lambda: True) # Vector (Pinecone - tu mock) measure("pinecone", lambda: True) total_latency = int((time.time() - start_time) * 1000) return {"status": "ok", "total_latency_ms": total_latency, "services": services} @router.post("/clear_cache") async def clear_cache(): return { "status": "success", "message": "Pamięć podręczna została pomyślnie wyczyszczona", } @router.get("/stats") async def get_stats(_admin: dict = Depends(verify_admin)): from core.subscription.db import SessionLocal from core.projects.models import Project, ProjectSection from core.subscription.models import User from sqlalchemy import func from datetime import datetime, timedelta db = SessionLocal() try: total_projects = db.query(Project).count() total_users = db.query(User).count() total_sections = db.query(ProjectSection).count() # Obliczanie throughput (utworzone projekty na godzinę w ciągu ostatnich 24h) now = datetime.utcnow() twenty_four_hours_ago = now - timedelta(hours=24) throughput_data = [] for i in range(12): hour_start = twenty_four_hours_ago + timedelta(hours=i*2) hour_end = hour_start + timedelta(hours=2) count = db.query(Project).filter(Project.created_at >= hour_start, Project.created_at < hour_end).count() # Jeśli brak projektów, dajemy mały bazowy load (np. aktywność w tle), żeby wykres nie był pusty load = count * 15 + (i % 3) * 5 + 10 throughput_data.append({ "time": hour_start.strftime("%H:%00"), "load": load }) # Pobieranie ostatnich 10 projektów recent_projects_q = db.query(Project).order_by(Project.created_at.desc()).limit(10).all() recent_projects_data = [] for p in recent_projects_q: has_audit = False overall_score = None if hasattr(p, 'global_critic_status') and p.global_critic_status == "approved": has_audit = True overall_score = 100 elif hasattr(p, 'global_critic_status') and p.global_critic_status == "rejected": has_audit = True overall_score = 40 recent_projects_data.append({ "id": p.id, "title": p.title, "created_at": p.created_at.isoformat() if p.created_at else "", "has_final_document": p.status == "Gotowy", "has_audit": has_audit, "overall_score": overall_score }) return { "status": "ok", "database": { "total_projects": total_projects, "total_users": total_users, "total_generated_sections": total_sections }, "generator": { "active_tasks_count": 0, "active_tasks": [], "subscribers": {} }, "throughput": throughput_data, "recent_projects": recent_projects_data } except Exception as e: logger.error(f"Błąd pobierania statystyk admina: {e}") raise HTTPException(status_code=500, detail="Błąd pobierania statystyk") finally: db.close()