Compost / backend /routers /analytics.py
abc1181's picture
fix: use backend. prefix for all internal imports
e8c0a73
Raw
History Blame Contribute Delete
2.85 kB
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from sqlalchemy import func
from datetime import datetime, timedelta
from backend.database import get_db, ToolExecution
from backend.auth import get_current_user, User
# Router that the analytics endpoints in this module register themselves on
# via the @router.get(...) decorators. Where it is mounted (URL prefix, tags)
# is decided by whichever module includes it and is not visible here.
router = APIRouter()
@router.get("/overview")
async def get_overview(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Summarise the authenticated user's tool executions.

    Returns a dict with:
      * ``total_executions`` - number of ToolExecution rows owned by the user
      * ``successful``       - rows whose status is ``"success"``
      * ``failed``           - every other row (total minus successful)
      * ``success_rate``     - percentage rounded to one decimal place
      * ``avg_latency_ms``   - mean ``latency_ms`` truncated to an int
    """
    owner_filter = ToolExecution.user_id == current_user.id
    own_executions = db.query(ToolExecution).filter(owner_filter)

    total = own_executions.count()
    success = own_executions.filter(ToolExecution.status == "success").count()
    failed = total - success

    # With no executions at all the rate is reported as 100 rather than 0,
    # which also avoids dividing by zero.
    if total > 0:
        success_rate = success / total * 100
    else:
        success_rate = 100

    mean_latency = (
        db.query(func.avg(ToolExecution.latency_ms))
        .filter(owner_filter)
        .scalar()
    )
    # The aggregate yields None when there are no rows; fall back to 0.
    if not mean_latency:
        mean_latency = 0

    summary = {}
    summary["total_executions"] = total
    summary["successful"] = success
    summary["failed"] = failed
    summary["success_rate"] = round(success_rate, 1)
    summary["avg_latency_ms"] = int(mean_latency)
    return summary
@router.get("/executions")
async def get_executions(
    days: int = 7,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return per-day execution counts for the authenticated user.

    Args:
        days: Size of the look-back window in days, counted back from the
            current UTC time. Defaults to 7.
        current_user: The authenticated user (injected dependency).
        db: Database session (injected dependency).

    Returns:
        ``{"executions": [{"date": "<date>", "count": <int>}, ...]}`` with
        one entry per calendar date that has at least one execution in the
        window, in ascending date order. Dates with no executions are
        omitted.
    """
    from sqlalchemy import cast, Date

    # `days` comes straight from the query string. Extreme values overflow
    # either timedelta() itself or the datetime subtraction; treat those as
    # "all history" (or an empty, future-starting window for negative
    # values) instead of letting the OverflowError escape the handler.
    try:
        start_date = datetime.utcnow() - timedelta(days=days)
    except OverflowError:
        start_date = datetime.min if days > 0 else datetime.max

    executions = (
        db.query(
            cast(ToolExecution.executed_at, Date).label("date"),
            func.count(ToolExecution.id).label("count"),
        )
        .filter(
            ToolExecution.user_id == current_user.id,
            ToolExecution.executed_at >= start_date,
        )
        .group_by("date")
        # Without an explicit ORDER BY the row order of a grouped query is
        # up to the database; sort so the series is chronological.
        .order_by("date")
        .all()
    )
    return {"executions": [{"date": str(e.date), "count": e.count} for e in executions]}
@router.get("/top-tools")
async def get_top_tools(
    limit: int = 10,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return the authenticated user's most frequently executed tools.

    Args:
        limit: Maximum number of tools to return. Defaults to 10.
        current_user: The authenticated user (injected dependency).
        db: Database session (injected dependency).

    Returns:
        ``{"tools": [{"tool_id": ..., "count": <int>}, ...]}`` ordered by
        execution count, highest first; tools with equal counts are ordered
        by ``tool_id``.
    """
    execution_count = func.count(ToolExecution.id)
    results = (
        db.query(
            ToolExecution.tool_id,
            execution_count.label("count"),
        )
        .filter(ToolExecution.user_id == current_user.id)
        .group_by(ToolExecution.tool_id)
        # Secondary key makes the ranking deterministic: with only the count
        # as sort key, which of several tied tools survive the LIMIT is
        # arbitrary and may differ between identical requests.
        .order_by(execution_count.desc(), ToolExecution.tool_id)
        .limit(limit)
        .all()
    )
    return {"tools": [{"tool_id": r.tool_id, "count": r.count} for r in results]}
@router.get("/errors")
async def get_errors(
    limit: int = 20,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """List the authenticated user's most recent failed executions.

    Selects ToolExecution rows owned by the user whose status is
    ``"error"``, newest ``executed_at`` first, capped at ``limit`` rows.

    Returns:
        ``{"errors": [...]}`` where each item carries the row's ``id``,
        ``tool_id``, its ``error_message`` (under the key ``"error"``) and
        ``executed_at`` rendered with ``isoformat()``.
    """
    error_query = (
        db.query(ToolExecution)
        .filter(ToolExecution.user_id == current_user.id)
        .filter(ToolExecution.status == "error")
        .order_by(ToolExecution.executed_at.desc())
        .limit(limit)
    )

    payload = []
    for row in error_query.all():
        payload.append({
            "id": row.id,
            "tool_id": row.tool_id,
            "error": row.error_message,
            "executed_at": row.executed_at.isoformat(),
        })
    return {"errors": payload}