| """Usage tracking service — log requests, query stats."""
|
|
|
| from datetime import datetime, timezone, timedelta
|
| from sqlalchemy import select, func
|
| from sqlalchemy.ext.asyncio import AsyncSession
|
| from mac.models.user import UsageLog, User
|
| from mac.config import settings
|
|
|
|
|
| def _today_start() -> datetime:
|
| now = datetime.now(timezone.utc)
|
| return now.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
|
|
|
| def _hour_start() -> datetime:
|
| now = datetime.now(timezone.utc)
|
| return now.replace(minute=0, second=0, microsecond=0)
|
|
|
|
|
async def log_request(
    db: AsyncSession,
    user_id: str,
    model: str,
    endpoint: str,
    tokens_in: int,
    tokens_out: int,
    latency_ms: int,
    status_code: int,
    request_id: str,
):
    """Record a single API request as a ``UsageLog`` row.

    The new row is added to ``db`` and flushed. This function does not
    commit; committing is left to whoever owns the session.
    """
    entry_fields = {
        "user_id": user_id,
        "model": model,
        "endpoint": endpoint,
        "tokens_in": tokens_in,
        "tokens_out": tokens_out,
        "latency_ms": latency_ms,
        "status_code": status_code,
        "request_id": request_id,
    }
    db.add(UsageLog(**entry_fields))
    await db.flush()
|
|
|
|
|
async def get_tokens_used_today(db: AsyncSession, user_id: str) -> int:
    """Return the total tokens (input + output) a user has consumed today.

    "Today" starts at midnight UTC, as computed by ``_today_start()``.
    Returns 0 when the user has no matching usage rows.
    """
    total_tokens = func.coalesce(
        func.sum(UsageLog.tokens_in + UsageLog.tokens_out), 0
    )
    result = await db.execute(
        select(total_tokens).where(
            UsageLog.user_id == user_id,
            UsageLog.created_at >= _today_start(),
        )
    )
    # SUM can come back as Decimal on some backends; cast so the declared
    # ``int`` return type holds, matching the other usage queries in this
    # module, which also wrap their sums in int().
    return int(result.scalar_one())
|
|
|
|
|
async def get_requests_this_hour(db: AsyncSession, user_id: str) -> int:
    """Count the usage rows logged for a user since the top of the current UTC hour."""
    window_start = _hour_start()
    stmt = select(func.count()).where(
        UsageLog.user_id == user_id,
        UsageLog.created_at >= window_start,
    )
    outcome = await db.execute(stmt)
    return outcome.scalar_one()
|
|
|
|
|
async def _usage_totals_since(db: AsyncSession, user_id: str, since: datetime) -> dict:
    """Return total tokens and request count for ``user_id`` from ``since`` onwards."""
    result = await db.execute(
        select(
            func.coalesce(func.sum(UsageLog.tokens_in + UsageLog.tokens_out), 0),
            func.count(),
        ).where(UsageLog.user_id == user_id, UsageLog.created_at >= since)
    )
    tokens, requests = result.one()
    return {"total_tokens": int(tokens), "requests": requests}


async def get_my_usage(db: AsyncSession, user_id: str) -> dict:
    """Return a usage breakdown for one user.

    All windows are anchored to the current UTC day:

    * ``today``      - since midnight UTC, with prompt/completion split and a
      per-model breakdown,
    * ``this_week``  - since midnight UTC on the most recent Monday
      (``datetime.weekday()`` is 0 on Monday),
    * ``this_month`` - since midnight UTC on the 1st of the current month.

    Returns:
        A dict with keys ``today``, ``this_week`` and ``this_month``.
    """
    today = _today_start()
    week_start = today - timedelta(days=today.weekday())
    month_start = today.replace(day=1)

    # Today's totals, keeping input and output tokens separate.
    today_result = await db.execute(
        select(
            func.coalesce(func.sum(UsageLog.tokens_in), 0),
            func.coalesce(func.sum(UsageLog.tokens_out), 0),
            func.count(),
        ).where(UsageLog.user_id == user_id, UsageLog.created_at >= today)
    )
    t_in, t_out, t_count = today_result.one()

    # Today's totals grouped per model. The sum is coalesced like every other
    # aggregate here so a group whose token columns are all NULL yields 0
    # instead of making int(None) raise.
    model_result = await db.execute(
        select(
            UsageLog.model,
            func.coalesce(func.sum(UsageLog.tokens_in + UsageLog.tokens_out), 0),
            func.count(),
        ).where(
            UsageLog.user_id == user_id, UsageLog.created_at >= today
        ).group_by(UsageLog.model)
    )
    by_model = {
        model_name: {"tokens": int(tokens), "requests": requests}
        for model_name, tokens, requests in model_result
    }

    this_week = await _usage_totals_since(db, user_id, week_start)
    this_month = await _usage_totals_since(db, user_id, month_start)

    return {
        "today": {
            "total_tokens": int(t_in + t_out),
            "prompt_tokens": int(t_in),
            "completion_tokens": int(t_out),
            "requests": t_count,
            "by_model": by_model,
        },
        "this_week": this_week,
        "this_month": this_month,
    }
|
|
|
|
|
def _as_utc(value: datetime) -> datetime:
    """Attach UTC to a naive datetime; return aware datetimes unchanged."""
    return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)


async def get_request_history(
    db: AsyncSession, user_id: str, page: int = 1, per_page: int = 50,
    model: str | None = None, date_from: str | None = None, date_to: str | None = None,
) -> tuple[list, int]:
    """Return one page of a user's request history, newest first.

    Args:
        db: Session used to run the queries.
        user_id: Only rows with this ``UsageLog.user_id`` are returned.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1.
        model: If given, keep only rows for this model.
        date_from: ISO-8601 date or datetime; inclusive lower bound on
            ``created_at``. Values without a UTC offset are read as UTC.
        date_to: ISO-8601 date or datetime; upper bound on ``created_at``.
            A date-only value covers that whole UTC day. Values without a
            UTC offset are read as UTC.

    Returns:
        ``(rows, total)`` where ``rows`` is the list of ``UsageLog`` objects
        on the requested page and ``total`` is the count of all matching rows.

    Raises:
        ValueError: If ``date_from`` or ``date_to`` is not valid ISO-8601.
    """
    # A page or size below 1 would yield a negative OFFSET/LIMIT.
    page = max(page, 1)
    per_page = max(per_page, 1)

    query = select(UsageLog).where(UsageLog.user_id == user_id)

    if model:
        query = query.where(UsageLog.model == model)
    if date_from:
        # Make the lower bound timezone-aware, like the upper bound and the
        # other created_at comparisons in this module.
        lower = _as_utc(datetime.fromisoformat(date_from))
        query = query.where(UsageLog.created_at >= lower)
    if date_to:
        upper = _as_utc(datetime.fromisoformat(date_to))
        if len(date_to) <= 10:
            # Date-only input (e.g. "2024-05-31"): include the whole day by
            # using an exclusive bound at the next midnight, so rows stamped
            # after 23:59:59.000000 are not dropped.
            query = query.where(UsageLog.created_at < upper + timedelta(days=1))
        else:
            query = query.where(UsageLog.created_at <= upper)

    # Count before ordering/pagination is applied.
    count_query = select(func.count()).select_from(query.subquery())
    total = (await db.execute(count_query)).scalar_one()

    query = query.order_by(UsageLog.created_at.desc()).offset((page - 1) * per_page).limit(per_page)
    result = await db.execute(query)

    return list(result.scalars()), total
|
|
|
|
|
async def get_all_users_usage(db: AsyncSession, page: int = 1, per_page: int = 50, department: str | None = None) -> tuple[list, int]:
    """Admin: return one page of per-user usage summaries.

    Args:
        db: Session used to run the queries.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1.
        department: If given, only users in this department are listed.

    Returns:
        ``(rows, total)`` where ``total`` is the number of matching users and
        each row is a dict with ``roll_number``, ``name``, ``department``,
        ``tokens_today``, ``requests_today``, ``quota_used_pct`` (percentage
        of ``settings.rate_limit_tokens_per_day``, or 0 when that setting is
        falsy) and ``last_active`` (ISO-8601 string, or ``None`` if the user
        has no usage rows).
    """
    today = _today_start()
    # A page or size below 1 would yield a negative OFFSET/LIMIT.
    page = max(page, 1)
    per_page = max(per_page, 1)

    query = select(User)
    if department:
        query = query.where(User.department == department)

    count_query = select(func.count()).select_from(query.subquery())
    total = (await db.execute(count_query)).scalar_one()

    # OFFSET/LIMIT without ORDER BY gives no stable page contents, so order
    # by the user id to make pagination deterministic.
    query = query.order_by(User.id).offset((page - 1) * per_page).limit(per_page)
    users = (await db.execute(query)).scalars().all()
    if not users:
        return [], total

    user_ids = [user.id for user in users]

    # One grouped query for the whole page instead of one query per user.
    today_rows = await db.execute(
        select(
            UsageLog.user_id,
            func.coalesce(func.sum(UsageLog.tokens_in + UsageLog.tokens_out), 0),
            func.count(),
        )
        .where(UsageLog.user_id.in_(user_ids), UsageLog.created_at >= today)
        .group_by(UsageLog.user_id)
    )
    # Keys are stringified on both sides in case UsageLog.user_id and User.id
    # come back as different Python types for the same value.
    today_by_user = {str(row[0]): (int(row[1]), row[2]) for row in today_rows}

    # Most recent usage timestamp per user, again in a single query.
    last_rows = await db.execute(
        select(UsageLog.user_id, func.max(UsageLog.created_at))
        .where(UsageLog.user_id.in_(user_ids))
        .group_by(UsageLog.user_id)
    )
    last_active_by_user = {str(row[0]): row[1] for row in last_rows}

    daily_limit = settings.rate_limit_tokens_per_day

    result = []
    for user in users:
        key = str(user.id)
        # Users with no rows today are absent from the grouped result.
        tokens, reqs = today_by_user.get(key, (0, 0))
        last_active = last_active_by_user.get(key)

        result.append({
            "roll_number": user.roll_number,
            "name": user.name,
            "department": user.department,
            "tokens_today": tokens,
            "requests_today": reqs,
            "quota_used_pct": round(tokens / daily_limit * 100, 1) if daily_limit else 0,
            "last_active": last_active.isoformat() if last_active else None,
        })

    return result, total
|
|
|