Spaces:
Sleeping
Sleeping
| """ | |
| Admin analytics endpoints for monitoring and metrics. | |
| """ | |
| import os | |
| import logging | |
| from typing import Optional | |
| from datetime import datetime, timedelta, timezone | |
| from fastapi import APIRouter, HTTPException, Header, Depends | |
| from .mongodb_service import get_mongodb_service | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/admin", tags=["admin"]) | |
| def verify_admin_token(authorization: Optional[str] = Header(None)) -> bool: | |
| """ | |
| Verify admin token from Authorization header. | |
| Args: | |
| authorization: Authorization header value | |
| Returns: | |
| True if valid, raises HTTPException otherwise | |
| """ | |
| admin_token = os.getenv("ADMIN_TOKEN") | |
| if not admin_token: | |
| logger.error("ADMIN_TOKEN not configured") | |
| raise HTTPException(status_code=500, detail="Admin authentication not configured") | |
| if not authorization: | |
| raise HTTPException(status_code=401, detail="Authorization header required") | |
| # Expected format: "Bearer <token>" | |
| try: | |
| scheme, token = authorization.split() | |
| if scheme.lower() != "bearer": | |
| raise HTTPException(status_code=401, detail="Invalid authorization scheme") | |
| if token != admin_token: | |
| raise HTTPException(status_code=401, detail="Invalid admin token") | |
| return True | |
| except ValueError: | |
| raise HTTPException(status_code=401, detail="Invalid authorization header format") | |
| async def get_metrics_summary(authorized: bool = Depends(verify_admin_token)): | |
| """ | |
| Get summary metrics for all events. | |
| Returns: | |
| - Total events by type | |
| - Unique devices | |
| - Unique users | |
| - Rate limit hits | |
| - Time range of data | |
| """ | |
| mongodb_service = get_mongodb_service() | |
| try: | |
| # Aggregate events by type | |
| pipeline = [ | |
| { | |
| "$group": { | |
| "_id": "$event_type", | |
| "count": {"$sum": 1} | |
| } | |
| }, | |
| { | |
| "$sort": {"count": -1} | |
| } | |
| ] | |
| events_by_type = mongodb_service.aggregate_events(pipeline) | |
| # Count unique devices | |
| pipeline_devices = [ | |
| { | |
| "$group": { | |
| "_id": "$device_id" | |
| } | |
| }, | |
| { | |
| "$count": "total" | |
| } | |
| ] | |
| unique_devices = mongodb_service.aggregate_events(pipeline_devices) | |
| device_count = unique_devices[0]["total"] if unique_devices else 0 | |
| # Count unique users | |
| pipeline_users = [ | |
| { | |
| "$match": {"user_id": {"$ne": None}} | |
| }, | |
| { | |
| "$group": { | |
| "_id": "$user_id" | |
| } | |
| }, | |
| { | |
| "$count": "total" | |
| } | |
| ] | |
| unique_users = mongodb_service.aggregate_events(pipeline_users) | |
| user_count = unique_users[0]["total"] if unique_users else 0 | |
| # Get time range | |
| pipeline_timerange = [ | |
| { | |
| "$group": { | |
| "_id": None, | |
| "first_event": {"$min": "$created_at"}, | |
| "last_event": {"$max": "$created_at"} | |
| } | |
| } | |
| ] | |
| timerange = mongodb_service.aggregate_events(pipeline_timerange) | |
| # Format response | |
| summary = { | |
| "events_by_type": {item["_id"]: item["count"] for item in events_by_type}, | |
| "unique_devices": device_count, | |
| "unique_users": user_count, | |
| "time_range": timerange[0] if timerange else None | |
| } | |
| return {"status": "success", "data": summary} | |
| except Exception as e: | |
| logger.error(f"Failed to get metrics summary: {str(e)}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_events_timeline( | |
| event_type: Optional[str] = None, | |
| days: int = 7, | |
| authorized: bool = Depends(verify_admin_token) | |
| ): | |
| """ | |
| Get events timeline (events per day/hour). | |
| Args: | |
| event_type: Filter by event type | |
| days: Number of days to include (default 7) | |
| Returns: | |
| Events grouped by date | |
| """ | |
| mongodb_service = get_mongodb_service() | |
| try: | |
| # Calculate date range | |
| end_date = datetime.now(timezone.utc) | |
| start_date = end_date - timedelta(days=days) | |
| # Build match stage | |
| match_stage = { | |
| "created_at": { | |
| "$gte": start_date, | |
| "$lte": end_date | |
| } | |
| } | |
| if event_type: | |
| match_stage["event_type"] = event_type | |
| # Aggregate by day | |
| pipeline = [ | |
| {"$match": match_stage}, | |
| { | |
| "$group": { | |
| "_id": { | |
| "year": {"$year": "$created_at"}, | |
| "month": {"$month": "$created_at"}, | |
| "day": {"$dayOfMonth": "$created_at"}, | |
| "event_type": "$event_type" | |
| }, | |
| "count": {"$sum": 1} | |
| } | |
| }, | |
| { | |
| "$sort": { | |
| "_id.year": 1, | |
| "_id.month": 1, | |
| "_id.day": 1 | |
| } | |
| } | |
| ] | |
| results = mongodb_service.aggregate_events(pipeline) | |
| # Format results | |
| timeline = [] | |
| for item in results: | |
| timeline.append({ | |
| "date": f"{item['_id']['year']}-{item['_id']['month']:02d}-{item['_id']['day']:02d}", | |
| "event_type": item["_id"]["event_type"], | |
| "count": item["count"] | |
| }) | |
| return { | |
| "status": "success", | |
| "data": { | |
| "timeline": timeline, | |
| "start_date": start_date.isoformat(), | |
| "end_date": end_date.isoformat() | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to get events timeline: {str(e)}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_funnel_analysis( | |
| days: int = 7, | |
| authorized: bool = Depends(verify_admin_token) | |
| ): | |
| """ | |
| Get funnel analysis: DASHBOARD_VIEW → ANALYSIS_REQUEST → TASK_COMPLETED. | |
| Args: | |
| days: Number of days to analyze (default 7) | |
| Returns: | |
| Funnel metrics with conversion rates | |
| """ | |
| mongodb_service = get_mongodb_service() | |
| try: | |
| # Calculate date range | |
| end_date = datetime.now(timezone.utc) | |
| start_date = end_date - timedelta(days=days) | |
| # Count each stage | |
| funnel_stages = { | |
| "DASHBOARD_VIEW": 0, | |
| "ANALYSIS_REQUEST": 0, | |
| "TASK_QUEUED": 0, | |
| "TASK_COMPLETED": 0 | |
| } | |
| for event_type in funnel_stages.keys(): | |
| pipeline = [ | |
| { | |
| "$match": { | |
| "event_type": event_type, | |
| "created_at": { | |
| "$gte": start_date, | |
| "$lte": end_date | |
| } | |
| } | |
| }, | |
| { | |
| "$count": "total" | |
| } | |
| ] | |
| result = mongodb_service.aggregate_events(pipeline) | |
| funnel_stages[event_type] = result[0]["total"] if result else 0 | |
| # Calculate conversion rates | |
| dashboard_views = funnel_stages["DASHBOARD_VIEW"] | |
| analysis_requests = funnel_stages["ANALYSIS_REQUEST"] | |
| tasks_queued = funnel_stages["TASK_QUEUED"] | |
| tasks_completed = funnel_stages["TASK_COMPLETED"] | |
| conversions = { | |
| "view_to_request": ( | |
| (analysis_requests / dashboard_views * 100) | |
| if dashboard_views > 0 else 0 | |
| ), | |
| "request_to_queued": ( | |
| (tasks_queued / analysis_requests * 100) | |
| if analysis_requests > 0 else 0 | |
| ), | |
| "queued_to_completed": ( | |
| (tasks_completed / tasks_queued * 100) | |
| if tasks_queued > 0 else 0 | |
| ), | |
| "overall_completion": ( | |
| (tasks_completed / dashboard_views * 100) | |
| if dashboard_views > 0 else 0 | |
| ) | |
| } | |
| # Get unique devices at each stage | |
| device_counts = {} | |
| for event_type in funnel_stages.keys(): | |
| pipeline = [ | |
| { | |
| "$match": { | |
| "event_type": event_type, | |
| "created_at": { | |
| "$gte": start_date, | |
| "$lte": end_date | |
| } | |
| } | |
| }, | |
| { | |
| "$group": { | |
| "_id": "$device_id" | |
| } | |
| }, | |
| { | |
| "$count": "total" | |
| } | |
| ] | |
| result = mongodb_service.aggregate_events(pipeline) | |
| device_counts[event_type] = result[0]["total"] if result else 0 | |
| return { | |
| "status": "success", | |
| "data": { | |
| "funnel_stages": funnel_stages, | |
| "conversion_rates": conversions, | |
| "unique_devices_per_stage": device_counts, | |
| "date_range": { | |
| "start": start_date.isoformat(), | |
| "end": end_date.isoformat(), | |
| "days": days | |
| } | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to get funnel analysis: {str(e)}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_rate_limit_stats( | |
| days: int = 7, | |
| authorized: bool = Depends(verify_admin_token) | |
| ): | |
| """ | |
| Get rate limit hit statistics. | |
| Args: | |
| days: Number of days to analyze (default 7) | |
| Returns: | |
| Rate limit statistics | |
| """ | |
| mongodb_service = get_mongodb_service() | |
| try: | |
| end_date = datetime.now(timezone.utc) | |
| start_date = end_date - timedelta(days=days) | |
| # Count total rate limit hits | |
| pipeline_total = [ | |
| { | |
| "$match": { | |
| "event_type": "RATE_LIMIT_HIT", | |
| "created_at": { | |
| "$gte": start_date, | |
| "$lte": end_date | |
| } | |
| } | |
| }, | |
| { | |
| "$count": "total" | |
| } | |
| ] | |
| total_result = mongodb_service.aggregate_events(pipeline_total) | |
| total_hits = total_result[0]["total"] if total_result else 0 | |
| # Group by device | |
| pipeline_devices = [ | |
| { | |
| "$match": { | |
| "event_type": "RATE_LIMIT_HIT", | |
| "created_at": { | |
| "$gte": start_date, | |
| "$lte": end_date | |
| } | |
| } | |
| }, | |
| { | |
| "$group": { | |
| "_id": "$device_id", | |
| "count": {"$sum": 1} | |
| } | |
| }, | |
| { | |
| "$sort": {"count": -1} | |
| }, | |
| { | |
| "$limit": 10 | |
| } | |
| ] | |
| top_devices = mongodb_service.aggregate_events(pipeline_devices) | |
| # Timeline | |
| pipeline_timeline = [ | |
| { | |
| "$match": { | |
| "event_type": "RATE_LIMIT_HIT", | |
| "created_at": { | |
| "$gte": start_date, | |
| "$lte": end_date | |
| } | |
| } | |
| }, | |
| { | |
| "$group": { | |
| "_id": { | |
| "year": {"$year": "$created_at"}, | |
| "month": {"$month": "$created_at"}, | |
| "day": {"$dayOfMonth": "$created_at"} | |
| }, | |
| "count": {"$sum": 1} | |
| } | |
| }, | |
| { | |
| "$sort": { | |
| "_id.year": 1, | |
| "_id.month": 1, | |
| "_id.day": 1 | |
| } | |
| } | |
| ] | |
| timeline = mongodb_service.aggregate_events(pipeline_timeline) | |
| return { | |
| "status": "success", | |
| "data": { | |
| "total_hits": total_hits, | |
| "top_devices": [ | |
| {"device_id": item["_id"], "hits": item["count"]} | |
| for item in top_devices | |
| ], | |
| "timeline": [ | |
| { | |
| "date": f"{item['_id']['year']}-{item['_id']['month']:02d}-{item['_id']['day']:02d}", | |
| "count": item["count"] | |
| } | |
| for item in timeline | |
| ] | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to get rate limit stats: {str(e)}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |