Spaces:
Paused
Paused
| """ | |
| Deprecated Endpoint Monitoring | |
| This module provides monitoring and alerting for deprecated API endpoints. | |
| Tracks usage patterns and sends alerts when deprecated endpoints are accessed. | |
| """ | |
| import logging | |
| from collections import defaultdict | |
| from datetime import datetime, timedelta | |
| from fastapi import Request | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| logger = logging.getLogger(__name__) | |
| # Track deprecated endpoint usage | |
| _deprecated_usage_stats = defaultdict( | |
| lambda: { | |
| "count": 0, | |
| "first_seen": None, | |
| "last_seen": None, | |
| "user_agents": set(), | |
| "ip_addresses": set(), | |
| } | |
| ) | |
| # Define deprecated endpoints | |
| DEPRECATED_ENDPOINTS = { | |
| "/api/v1/semantic_search/index": { | |
| "deprecated_since": "2025-12-20", | |
| "removal_date": "2026-02-01", | |
| "replacement": "/api/v1/ai/embeddings", | |
| "method": "POST", | |
| }, | |
| "/api/v1/semantic_search/index/batch": { | |
| "deprecated_since": "2025-12-20", | |
| "removal_date": "2026-02-01", | |
| "replacement": "/api/v1/ai/embeddings", | |
| "method": "POST", | |
| }, | |
| "/api/v1/semantic_search/search": { | |
| "deprecated_since": "2025-12-20", | |
| "removal_date": "2026-02-01", | |
| "replacement": "/api/v1/ai/semantic-search", | |
| "method": "POST", | |
| }, | |
| "/api/v1/semantic_search/stats": { | |
| "deprecated_since": "2025-12-20", | |
| "removal_date": "2026-02-01", | |
| "replacement": None, | |
| "method": "GET", | |
| }, | |
| "/api/v1/semantic_search/rebuild": { | |
| "deprecated_since": "2025-12-20", | |
| "removal_date": "2026-02-01", | |
| "replacement": None, | |
| "method": "POST", | |
| }, | |
| "/api/v1/semantic_search/backends": { | |
| "deprecated_since": "2025-12-20", | |
| "removal_date": "2026-02-01", | |
| "replacement": None, | |
| "method": "GET", | |
| }, | |
| "/api/v1/semantic_search/switch-backend": { | |
| "deprecated_since": "2025-12-20", | |
| "removal_date": "2026-02-01", | |
| "replacement": None, | |
| "method": "POST", | |
| }, | |
| } | |
| # Alert thresholds | |
| ALERT_THRESHOLD_HOURLY = 10 # Alert if deprecated endpoint called 10+ times per hour | |
| ALERT_THRESHOLD_DAILY = 100 # Alert if deprecated endpoint called 100+ times per day | |
| class DeprecatedEndpointMonitor(BaseHTTPMiddleware): | |
| """Middleware to monitor and alert on deprecated endpoint usage""" | |
| async def dispatch(self, request: Request, call_next): | |
| """Track deprecated endpoint calls""" | |
| path = request.url.path | |
| method = request.method | |
| # Check if this is a deprecated endpoint | |
| if path in DEPRECATED_ENDPOINTS: | |
| endpoint_info = DEPRECATED_ENDPOINTS[path] | |
| # Only track if method matches | |
| if endpoint_info["method"] == method: | |
| await self._track_usage(request, path, endpoint_info) | |
| # Continue with request | |
| response = await call_next(request) | |
| return response | |
| async def _track_usage(self, request: Request, path: str, endpoint_info: dict): | |
| """Track usage statistics for deprecated endpoint""" | |
| now = datetime.utcnow() | |
| user_agent = request.headers.get("user-agent", "unknown") | |
| ip_address = request.client.host if request.client else "unknown" | |
| # Update stats | |
| stats = _deprecated_usage_stats[path] | |
| stats["count"] += 1 | |
| if stats["first_seen"] is None: | |
| stats["first_seen"] = now | |
| stats["last_seen"] = now | |
| stats["user_agents"].add(user_agent[:100]) # Truncate to avoid memory issues | |
| stats["ip_addresses"].add(ip_address) | |
| # Log the usage | |
| logger.warning( | |
| f"Deprecated endpoint accessed: {path}", | |
| extra={ | |
| "deprecated_endpoint": path, | |
| "replacement": endpoint_info["replacement"], | |
| "removal_date": endpoint_info["removal_date"], | |
| "ip_address": ip_address, | |
| "user_agent": user_agent, | |
| "total_calls": stats["count"], | |
| "unique_ips": len(stats["ip_addresses"]), | |
| "unique_user_agents": len(stats["user_agents"]), | |
| }, | |
| ) | |
| # Check if we need to send an alert | |
| await self._check_alert_threshold(path, stats, endpoint_info) | |
| async def _check_alert_threshold(self, path: str, stats: dict, endpoint_info: dict): | |
| """Check if usage has exceeded alert thresholds""" | |
| now = datetime.utcnow() | |
| # Check hourly threshold | |
| if stats["last_seen"] and now - stats["last_seen"] < timedelta(hours=1): | |
| if stats["count"] >= ALERT_THRESHOLD_HOURLY: | |
| logger.critical( | |
| f"ALERT: Deprecated endpoint {path} called {stats['count']} times in the last hour!", | |
| extra={ | |
| "alert_type": "deprecated_endpoint_high_usage", | |
| "endpoint": path, | |
| "count": stats["count"], | |
| "threshold": ALERT_THRESHOLD_HOURLY, | |
| "replacement": endpoint_info["replacement"], | |
| "removal_date": endpoint_info["removal_date"], | |
| "unique_ips": len(stats["ip_addresses"]), | |
| "action_required": "Contact clients to migrate", | |
| }, | |
| ) | |
| # Check daily threshold | |
| if stats["first_seen"] and now - stats["first_seen"] < timedelta(days=1): | |
| if stats["count"] >= ALERT_THRESHOLD_DAILY: | |
| logger.critical( | |
| f"ALERT: Deprecated endpoint {path} called {stats['count']} times in the last 24 hours!", | |
| extra={ | |
| "alert_type": "deprecated_endpoint_very_high_usage", | |
| "endpoint": path, | |
| "count": stats["count"], | |
| "threshold": ALERT_THRESHOLD_DAILY, | |
| "replacement": endpoint_info["replacement"], | |
| "removal_date": endpoint_info["removal_date"], | |
| "unique_ips": len(stats["ip_addresses"]), | |
| "action_required": "URGENT: Mass migration needed", | |
| }, | |
| ) | |
| def get_deprecated_usage_stats() -> dict[str, dict]: | |
| """Get statistics on deprecated endpoint usage""" | |
| # Convert sets to lists for JSON serialization | |
| stats = {} | |
| for endpoint, data in _deprecated_usage_stats.items(): | |
| stats[endpoint] = { | |
| "count": data["count"], | |
| "first_seen": ( | |
| data["first_seen"].isoformat() if data["first_seen"] else None | |
| ), | |
| "last_seen": data["last_seen"].isoformat() if data["last_seen"] else None, | |
| "unique_user_agents": len(data["user_agents"]), | |
| "unique_ip_addresses": len(data["ip_addresses"]), | |
| "endpoint_info": DEPRECATED_ENDPOINTS.get(endpoint, {}), | |
| } | |
| return stats | |
| def get_deprecation_warnings() -> list[dict]: | |
| """Get list of deprecation warnings for upcoming removals""" | |
| warnings = [] | |
| now = datetime.utcnow() | |
| for endpoint, info in DEPRECATED_ENDPOINTS.items(): | |
| removal_date = datetime.fromisoformat(info["removal_date"]) | |
| days_until_removal = (removal_date - now).days | |
| stats = _deprecated_usage_stats.get(endpoint, {}) | |
| usage_count = stats.get("count", 0) | |
| if usage_count > 0 and days_until_removal <= 30: | |
| warnings.append( | |
| { | |
| "endpoint": endpoint, | |
| "method": info["method"], | |
| "removal_date": info["removal_date"], | |
| "days_until_removal": days_until_removal, | |
| "usage_count": usage_count, | |
| "replacement": info["replacement"], | |
| "severity": "critical" if days_until_removal <= 7 else "high", | |
| } | |
| ) | |
| return sorted(warnings, key=lambda x: x["days_until_removal"]) | |
| def reset_usage_stats(): | |
| """Reset usage statistics (for testing or new reporting period)""" | |
| global _deprecated_usage_stats | |
| _deprecated_usage_stats = defaultdict( | |
| lambda: { | |
| "count": 0, | |
| "first_seen": None, | |
| "last_seen": None, | |
| "user_agents": set(), | |
| "ip_addresses": set(), | |
| } | |
| ) | |
| # Utility function for manual alerts | |
| def check_and_send_deprecation_alerts(): | |
| """Check all deprecated endpoints and send alerts if needed""" | |
| warnings = get_deprecation_warnings() | |
| for warning in warnings: | |
| if warning["severity"] == "critical": | |
| logger.critical( | |
| f"CRITICAL: Deprecated endpoint {warning['endpoint']} will be removed in " | |
| f"{warning['days_until_removal']} days! Still receiving {warning['usage_count']} calls. " | |
| f"Migration required!", | |
| extra=warning, | |
| ) | |
| elif warning["severity"] == "high": | |
| logger.error( | |
| f"WARNING: Deprecated endpoint {warning['endpoint']} will be removed in " | |
| f"{warning['days_until_removal']} days. Currently receiving {warning['usage_count']} calls.", | |
| extra=warning, | |
| ) | |