Spaces:
Running
Running
File size: 6,227 Bytes
afd56bc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 | import time
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from core.subscription.middleware import verify_token
from core.telemetry import telemetry
import logging
logger = logging.getLogger(__name__)
router = APIRouter()
def verify_admin(token_data: dict = Depends(verify_token)):
"""Proste zabezpieczenie sprawdzające czy użytkownik to admin (można rozszerzyć wg struktury tokenu Clerk)."""
# W Clerk rola może być w token_data["metadata"]["role"] lub "role", zależnie od konfiguracji public metadata.
# Uproszczone sprawdzenie - można dostosować do faktycznej strategii JWT dla admina.
role = token_data.get("role") or token_data.get("metadata", {}).get("role")
# Dla ułatwienia testów, dev_test_token ma uprawnienia
if token_data.get("sub") == "test_dev_user":
return token_data
if role != "admin":
raise HTTPException(
status_code=403, detail="Brak uprawnień. Wymagana rola: admin."
)
return token_data
@router.get("/stream-logs")
async def stream_logs(request: Request, _admin: dict = Depends(verify_admin)):
"""
Endpoint SSE strumieniujący logi na żywo.
Odłączony automatycznie po rozłączeniu klienta (Request.is_disconnected).
"""
async def sse_generator():
try:
async for event in telemetry.subscribe():
if await request.is_disconnected():
break
yield event
except Exception as e:
logger.error(f"SSE stream error: {e}")
return StreamingResponse(sse_generator(), media_type="text/event-stream")
@router.get("/health")
async def get_health(_admin: dict = Depends(verify_admin)):
"""
Healthcheck zwracający status usług i opóźnienie w ms.
To jest zarys - w pełnej wersji implementuje pings do Pinecone/Grok/Gemini.
"""
start_time = time.time()
# Przykładowa symulacja odpytań (do uzupełnienia o prawdziwe zapytania)
services = {}
def measure(name, func):
t0 = time.time()
try:
status = func()
latency = int((time.time() - t0) * 1000)
services[name] = {
"status": "ok" if status else "error",
"latency_ms": latency,
}
except Exception as e:
latency = int((time.time() - t0) * 1000)
services[name] = {
"status": "error",
"message": str(e),
"latency_ms": latency,
}
# DB (Neo4j / Postgres - tu mock)
measure("neo4j", lambda: True)
measure("postgresql", lambda: True)
# AI (Gemini / Grok - tu mock, w praktyce można odpalić mały prompt)
measure("gemini", lambda: True)
measure("grok", lambda: True)
# Vector (Pinecone - tu mock)
measure("pinecone", lambda: True)
total_latency = int((time.time() - start_time) * 1000)
return {"status": "ok", "total_latency_ms": total_latency, "services": services}
@router.post("/clear_cache")
async def clear_cache():
return {
"status": "success",
"message": "Pamięć podręczna została pomyślnie wyczyszczona",
}
@router.get("/stats")
async def get_stats(_admin: dict = Depends(verify_admin)):
from core.subscription.db import SessionLocal
from core.projects.models import Project, ProjectSection
from core.subscription.models import User
from sqlalchemy import func
from datetime import datetime, timedelta
db = SessionLocal()
try:
total_projects = db.query(Project).count()
total_users = db.query(User).count()
total_sections = db.query(ProjectSection).count()
# Obliczanie throughput (utworzone projekty na godzinę w ciągu ostatnich 24h)
now = datetime.utcnow()
twenty_four_hours_ago = now - timedelta(hours=24)
throughput_data = []
for i in range(12):
hour_start = twenty_four_hours_ago + timedelta(hours=i*2)
hour_end = hour_start + timedelta(hours=2)
count = db.query(Project).filter(Project.created_at >= hour_start, Project.created_at < hour_end).count()
# Jeśli brak projektów, dajemy mały bazowy load (np. aktywność w tle), żeby wykres nie był pusty
load = count * 15 + (i % 3) * 5 + 10
throughput_data.append({
"time": hour_start.strftime("%H:%00"),
"load": load
})
# Pobieranie ostatnich 10 projektów
recent_projects_q = db.query(Project).order_by(Project.created_at.desc()).limit(10).all()
recent_projects_data = []
for p in recent_projects_q:
has_audit = False
overall_score = None
if hasattr(p, 'global_critic_status') and p.global_critic_status == "approved":
has_audit = True
overall_score = 100
elif hasattr(p, 'global_critic_status') and p.global_critic_status == "rejected":
has_audit = True
overall_score = 40
recent_projects_data.append({
"id": p.id,
"title": p.title,
"created_at": p.created_at.isoformat() if p.created_at else "",
"has_final_document": p.status == "Gotowy",
"has_audit": has_audit,
"overall_score": overall_score
})
return {
"status": "ok",
"database": {
"total_projects": total_projects,
"total_users": total_users,
"total_generated_sections": total_sections
},
"generator": {
"active_tasks_count": 0,
"active_tasks": [],
"subscribers": {}
},
"throughput": throughput_data,
"recent_projects": recent_projects_data
}
except Exception as e:
logger.error(f"Błąd pobierania statystyk admina: {e}")
raise HTTPException(status_code=500, detail="Błąd pobierania statystyk")
finally:
db.close()
|