# GrantForge Bot
# Deploy to Hugging Face
# afd56bc
import time
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from core.subscription.middleware import verify_token
from core.telemetry import telemetry
import logging
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router on which the endpoints defined in this module are registered.
router = APIRouter()
def verify_admin(token_data: dict = Depends(verify_token)):
    """Allow the request only when the token belongs to an admin.

    FastAPI dependency layered on top of ``verify_token``. The role is read
    from the top-level ``role`` claim and, if that is missing or empty, from
    ``metadata.role``. Which of the two is populated depends on how the
    identity provider's public metadata is configured.

    Args:
        token_data: Claims returned by ``verify_token``.

    Returns:
        The unchanged ``token_data`` when access is granted.

    Raises:
        HTTPException: 403 when the resolved role is not ``"admin"``.
    """
    # NOTE(review): hard-coded bypass that lets the development test token
    # through without an admin role. Kept for backward compatibility, but it
    # should be disabled (or gated behind a dev-only setting) in production.
    if token_data.get("sub") == "test_dev_user":
        return token_data

    # ``metadata`` may be present but null (or not a mapping at all); calling
    # ``.get`` on it directly would raise AttributeError and surface as a 500
    # instead of a clean 403.
    metadata = token_data.get("metadata")
    if not isinstance(metadata, dict):
        metadata = {}
    role = token_data.get("role") or metadata.get("role")

    if role != "admin":
        raise HTTPException(
            status_code=403, detail="Brak uprawnień. Wymagana rola: admin."
        )
    return token_data
@router.get("/stream-logs")
async def stream_logs(request: Request, _admin: dict = Depends(verify_admin)):
    """Stream live log events to an admin client as Server-Sent Events.

    Events are taken from ``telemetry.subscribe()`` and forwarded unchanged.
    Each time an event arrives the connection is checked first, and the
    stream ends once ``request.is_disconnected()`` reports that the client
    has gone away.
    """

    async def event_stream():
        try:
            async for event in telemetry.subscribe():
                client_gone = await request.is_disconnected()
                if client_gone:
                    # Nothing follows the loop, so returning simply ends
                    # the stream.
                    return
                yield event
        except Exception as e:
            # A failure in the subscription is logged and ends the stream
            # instead of propagating to the response machinery.
            logger.error(f"SSE stream error: {e}")

    response = StreamingResponse(event_stream(), media_type="text/event-stream")
    return response
@router.get("/health")
async def get_health(_admin: dict = Depends(verify_admin)):
    """Report the status and latency (in ms) of each backing service.

    This is an outline: every probe below is a stub that always succeeds.
    Real checks (database pings, a small AI prompt, a vector-store query)
    are meant to replace the ``lambda: True`` placeholders.

    Returns:
        A dict with:
        - ``status``: ``"ok"`` when every probe reported ``"ok"``,
          otherwise ``"error"``.
        - ``total_latency_ms``: time spent running all probes.
        - ``services``: per-service ``status`` and ``latency_ms``, plus
          ``message`` when the probe raised.
    """
    # perf_counter is monotonic, so the measured intervals cannot come out
    # negative or inflated when the system clock is adjusted.
    started = time.perf_counter()
    services = {}

    def measure(name, func):
        """Run one probe and record its outcome under ``name``."""
        t0 = time.perf_counter()
        entry = {}
        try:
            entry["status"] = "ok" if func() else "error"
        except Exception as e:
            # A failing probe must not break the whole healthcheck; it is
            # reported as an error entry instead.
            entry["status"] = "error"
            entry["message"] = str(e)
        entry["latency_ms"] = int((time.perf_counter() - t0) * 1000)
        services[name] = entry

    probes = (
        # Databases (mocked here).
        ("neo4j", lambda: True),
        ("postgresql", lambda: True),
        # AI providers (mocked here; in practice a small prompt could be run).
        ("gemini", lambda: True),
        ("grok", lambda: True),
        # Vector store (mocked here).
        ("pinecone", lambda: True),
    )
    for name, probe in probes:
        measure(name, probe)

    total_latency = int((time.perf_counter() - started) * 1000)
    # The overall status reflects the probes instead of being a constant, so
    # a failing dependency is visible at the top level as well.
    all_ok = all(entry["status"] == "ok" for entry in services.values())
    return {
        "status": "ok" if all_ok else "error",
        "total_latency_ms": total_latency,
        "services": services,
    }
@router.post("/clear_cache")
async def clear_cache(_admin: dict = Depends(verify_admin)):
    """Acknowledge a cache-clear request from an admin.

    Guarded by ``verify_admin`` like the other endpoints on this router, so
    the operation cannot be triggered by unauthenticated callers.

    NOTE(review): no cache is actually cleared here; the handler only
    returns a success payload. Wire in the real invalidation or report the
    operation as not implemented.

    Returns:
        A dict with ``status`` and a user-facing ``message``.
    """
    return {
        "status": "success",
        "message": "Pamięć podręczna została pomyślnie wyczyszczona",
    }
@router.get("/stats")
async def get_stats(_admin: dict = Depends(verify_admin)):
    """Return statistics for the admin dashboard.

    The response contains:
    - ``database``: total counts of projects, users and project sections.
    - ``generator``: placeholder values (no active tasks, no subscribers).
    - ``throughput``: twelve 2-hour buckets covering the last 24 hours,
      each with a ``time`` label and a ``load`` value.
    - ``recent_projects``: the ten most recently created projects.

    Raises:
        HTTPException: 500 when anything fails while gathering the data.
    """
    from core.subscription.db import SessionLocal
    from core.projects.models import Project, ProjectSection
    from core.subscription.models import User
    from datetime import datetime, timedelta, timezone

    bucket_hours = 2
    bucket_count = 12
    # Score reported for each critic verdict; any other status (or a model
    # without the attribute) means "no audit".
    critic_scores = {"approved": 100, "rejected": 40}

    # NOTE(review): the queries below are not awaited, so they appear to run
    # synchronously inside this async handler - confirm this is acceptable.
    db = SessionLocal()
    try:
        total_projects = db.query(Project).count()
        total_users = db.query(User).count()
        total_sections = db.query(ProjectSection).count()

        # Naive UTC "now", equivalent to the deprecated datetime.utcnow(),
        # so comparisons against created_at keep their previous semantics.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        window_start = now - timedelta(hours=bucket_hours * bucket_count)

        throughput_data = []
        for i in range(bucket_count):
            bucket_start = window_start + timedelta(hours=i * bucket_hours)
            bucket_end = bucket_start + timedelta(hours=bucket_hours)
            count = (
                db.query(Project)
                .filter(
                    Project.created_at >= bucket_start,
                    Project.created_at < bucket_end,
                )
                .count()
            )
            # "load" is a synthetic chart value, not a raw count: the project
            # count is scaled and a small baseline is always added so the
            # chart is never empty.
            load = count * 15 + (i % 3) * 5 + 10
            throughput_data.append(
                {
                    # "%H:00" labels the bucket by its starting hour; the
                    # previous "%H:%00" was not a valid strftime directive.
                    "time": bucket_start.strftime("%H:00"),
                    "load": load,
                }
            )

        recent_projects = (
            db.query(Project).order_by(Project.created_at.desc()).limit(10).all()
        )
        recent_projects_data = []
        for p in recent_projects:
            critic_status = getattr(p, "global_critic_status", None)
            overall_score = critic_scores.get(critic_status)
            recent_projects_data.append(
                {
                    "id": p.id,
                    "title": p.title,
                    "created_at": p.created_at.isoformat() if p.created_at else "",
                    "has_final_document": p.status == "Gotowy",
                    "has_audit": overall_score is not None,
                    "overall_score": overall_score,
                }
            )

        return {
            "status": "ok",
            "database": {
                "total_projects": total_projects,
                "total_users": total_users,
                "total_generated_sections": total_sections,
            },
            "generator": {
                "active_tasks_count": 0,
                "active_tasks": [],
                "subscribers": {},
            },
            "throughput": throughput_data,
            "recent_projects": recent_projects_data,
        }
    except Exception as e:
        # logger.exception keeps the traceback, which the plain error message
        # alone would lose.
        logger.exception("Błąd pobierania statystyk admina: %s", e)
        raise HTTPException(
            status_code=500, detail="Błąd pobierania statystyk"
        ) from e
    finally:
        db.close()