Spaces:
Running
Running
feat: implement smart intelligence layer with hardened auth, telegram alerts, and dashboard v3
3226916 | from fastapi import APIRouter, Depends, HTTPException, Body, Request | |
| from pydantic import BaseModel | |
| import logging | |
| from app.core.auth import verify_api_key | |
| from app.services.chat.api.llm_router import llm_router, circuit_registry | |
| import app.services.chat.api.llm_router as router_module | |
| from app.core.redis_client import redis_client | |
| from app.core.config import settings, BASE_DIR | |
| from app.services.chat.api.model_registry import model_registry | |
| import os | |
| import re | |
| import time | |
| logger = logging.getLogger(__name__) | |
# Admin control-plane router. Every route mounted here is protected
# router-wide by the verify_api_key dependency — no per-route auth needed.
router = APIRouter(
    prefix="/admin",
    tags=["admin"],
    dependencies=[Depends(verify_api_key)]
)
class BulkheadUpdate(BaseModel):
    """Request body for resizing a provider's concurrency bulkhead."""
    limit: int  # new concurrency limit passed to set_bulkhead_limit
class CircuitTune(BaseModel):
    """Request body for tuning a provider's circuit breaker."""
    failure_threshold: int  # clamped to >= 1 by tune_circuit
    recovery_timeout: int  # base recovery timeout; clamped to >= 1
class ModelBanReq(BaseModel):
    """Request body naming a model to add to / remove from the global banlist."""
    model_name: str
class KillSwitchReq(BaseModel):
    """Request body toggling the global kill switch."""
    active: bool  # True engages the kill switch; False restores service
class EnvUpdateReq(BaseModel):
    """Request body for updating a provider's permanent default model list."""
    provider: str  # one of: groq, gemini, openai, openrouter
    models_string: str  # written verbatim to the LLM_MODELS_* .env key
class BudgetUpdateReq(BaseModel):
    """Request body for hot-updating the daily token budget."""
    daily_token_budget: int  # must be >= 1000; validated in update_budget
| # βββ Permanent Configuration (.env) βββββββββββββββββββββββββββββββββββββββββββ | |
| def _update_env_file(key: str, new_value: str): | |
| """Safely updates a key inside the physical .env file. Fails gracefully if Read-Only.""" | |
| env_path = BASE_DIR / ".env" | |
| try: | |
| if not env_path.exists(): | |
| # Fallback creation if it doesn't exist | |
| with open(env_path, "w", encoding="utf-8") as f: | |
| f.write(f"{key}={new_value}\n") | |
| return | |
| with open(env_path, "r", encoding="utf-8") as f: | |
| lines = f.readlines() | |
| updated = False | |
| with open(env_path, "w", encoding="utf-8") as f: | |
| for line in lines: | |
| if line.startswith(f"{key}="): | |
| f.write(f"{key}={new_value}\n") | |
| updated = True | |
| else: | |
| f.write(line) | |
| if not updated: | |
| # Key wasn't found, append it | |
| if lines and not lines[-1].endswith("\n"): | |
| f.write("\n") | |
| f.write(f"{key}={new_value}\n") | |
| except IOError as e: | |
| logger.warning(f"File system is restricted (e.g. Hugging Face Space). Could not save {key} to disk. Falling back to hot RAM reload only. Error: {e}") | |
| pass | |
async def get_env_models():
    """Report the permanent default model lists currently held in settings.

    Returns a provider-id -> models-string mapping for the four supported
    providers, read live from the settings object.
    """
    attr_by_provider = {
        "groq": "LLM_MODELS_GROQ",
        "gemini": "LLM_MODELS_GEMINI",
        "openai": "LLM_MODELS_OPENAI",
        "openrouter": "LLM_MODELS_OPENROUTER",
    }
    return {pid: getattr(settings, attr) for pid, attr in attr_by_provider.items()}
async def update_env_models(payload: EnvUpdateReq):
    """Persist default fallback models for one provider and hot-reload them.

    Writes the value to the physical .env file (best-effort), mirrors it on
    the in-memory settings object, then reloads the model registry defaults
    so the change takes effect immediately.

    Raises:
        HTTPException: 400 when the provider id is unknown.
    """
    env_keys = {
        "groq": "LLM_MODELS_GROQ",
        "gemini": "LLM_MODELS_GEMINI",
        "openai": "LLM_MODELS_OPENAI",
        "openrouter": "LLM_MODELS_OPENROUTER",
    }
    provider_id = payload.provider.strip().lower()
    env_key = env_keys.get(provider_id)
    if env_key is None:
        raise HTTPException(status_code=400, detail="Invalid provider ID")
    # Persist to disk first (no-op on read-only filesystems), then to RAM
    # so the new value is instantly visible everywhere.
    _update_env_file(env_key, payload.models_string)
    setattr(settings, env_key, payload.models_string)
    # Reload registry defaults to trigger the hot-reload.
    model_registry._load_defaults()
    return {"status": "success", "msg": f"Updated {env_key} to {payload.models_string} safely."}
async def disable_provider(name: str):
    """Force a provider offline unconditionally until an admin re-enables it.

    Raises:
        HTTPException: 404 when the provider is unknown to the router.
    """
    target = llm_router.provider_states.get(name)
    if not target:
        raise HTTPException(status_code=404, detail="Provider not found")
    target.is_permanently_disabled = True
    logger.critical(f"Admin action: Provider {name} FORCE DISABLED.")
    return {"status": "success", "provider": name, "disabled": True}
async def enable_provider(name: str):
    """Restore a manually disabled provider and clear its disable timer.

    Raises:
        HTTPException: 404 when the provider is unknown to the router.
    """
    target = llm_router.provider_states.get(name)
    if not target:
        raise HTTPException(status_code=404, detail="Provider not found")
    target.is_permanently_disabled = False
    target.disable_until = 0.0
    logger.warning(f"Admin action: Provider {name} ENABLED.")
    return {"status": "success", "provider": name, "disabled": False}
async def update_bulkhead(name: str, payload: BulkheadUpdate):
    """Resize a provider's concurrency limit at runtime (throttle or boost).

    Raises:
        HTTPException: 404 when the provider is unknown to the router.
    """
    if not llm_router.set_bulkhead_limit(name, payload.limit):
        raise HTTPException(status_code=404, detail="Provider not found")
    return {"status": "success", "provider": name, "new_limit": payload.limit}
| # βββ Circuit Controls βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def trip_circuit(name: str):
    """Force a provider's circuit OPEN, putting it into a long cooldown.

    Raises:
        HTTPException: 404 when no circuit exists for the provider.
    """
    breaker = circuit_registry.get(name)
    if not breaker:
        raise HTTPException(status_code=404, detail="Circuit not found")
    # 86400s override keeps the circuit open for 24h (admin tripped).
    breaker.record_failure(recovery_override=86400)
    logger.critical(f"Admin action: Circuit for {name} TRIPPED.")
    return {"status": "success", "provider": name, "circuit_state": breaker.state.value}
async def reset_circuit(name: str):
    """Force a circuit CLOSED and clear the provider's failure bookkeeping.

    Raises:
        HTTPException: 404 when no circuit exists for the provider.
    """
    breaker = circuit_registry.get(name)
    if not breaker:
        raise HTTPException(status_code=404, detail="Circuit not found")
    breaker.record_success()
    # Also reset the router-side failure counters so routing resumes at once.
    provider_state = llm_router.provider_states.get(name)
    if provider_state:
        provider_state.consecutive_failures = 0
        provider_state.cooldown_until = 0.0
    logger.warning(f"Admin action: Circuit for {name} RESET to CLOSED.")
    return {"status": "success", "provider": name, "circuit_state": breaker.state.value}
async def tune_circuit(name: str, payload: CircuitTune):
    """Adjust a circuit's failure threshold and base recovery timeout live.

    Both values are clamped to a minimum of 1.

    Raises:
        HTTPException: 404 when no circuit exists for the provider.
    """
    breaker = circuit_registry.get(name)
    if not breaker:
        raise HTTPException(status_code=404, detail="Circuit not found")
    breaker.failure_threshold = max(1, payload.failure_threshold)
    breaker._recovery_base = max(1, payload.recovery_timeout)
    # Current timeout starts from the new base immediately.
    breaker.recovery_timeout = breaker._recovery_base
    logger.warning(f"Admin action: Circuit {name} tuned (thr={breaker.failure_threshold}, timeout={breaker._recovery_base})")
    return {"status": "success", "provider": name, "tuned": payload.dict()}
| # βββ Model & Global Controls ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def ban_model(payload: ModelBanReq):
    """Add a model string to the global banlist shared with the LLM router."""
    banlist = router_module.banned_models
    banlist.add(payload.model_name)
    logger.critical(f"Admin action: Model {payload.model_name} GLOBALLY BANNED.")
    return {"status": "success", "banned_models": list(banlist)}
async def unban_model(payload: ModelBanReq):
    """Remove a model string from the global banlist (no-op if absent)."""
    banlist = router_module.banned_models
    banlist.discard(payload.model_name)
    logger.warning(f"Admin action: Model {payload.model_name} unbanned.")
    return {"status": "success", "banned_models": list(banlist)}
async def toggle_killswitch(payload: KillSwitchReq, request: Request):
    """Global emergency stop. All LLM calls return 503 instantly.

    Flips the module-level KILL_SWITCH_ACTIVE flag and fires a best-effort
    Telegram alert carrying the caller's IP.
    """
    router_module.KILL_SWITCH_ACTIVE = payload.active
    # X-Forwarded-For may carry a comma-separated proxy chain; the first
    # entry is the originating client, so record that rather than the
    # whole header value.
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        ip = forwarded.split(",")[0].strip()
    else:
        ip = getattr(request.client, "host", "unknown")
    if payload.active:
        logger.critical("π¨ ADMIN ACTION: GLOBAL KILL SWITCH ACTIVATED. System halted.")
    else:
        logger.warning("π’ ADMIN ACTION: GLOBAL KILL SWITCH DEACTIVATED. System restored.")
    # Fire Telegram alert (non-blocking). Best-effort: the toggle must never
    # fail because alerting is unavailable, but don't swallow silently.
    try:
        import asyncio
        from app.core.alerts import telegram_alerter
        asyncio.create_task(telegram_alerter.send_kill_switch_alert(payload.active, by_ip=ip))
    except Exception:
        logger.debug("Kill-switch Telegram alert could not be scheduled.", exc_info=True)
    return {"status": "success", "kill_switch_active": payload.active}
async def flush_cache():
    """Nuke all Redis keys (cache & telemetry). Extreme warning.

    Raises:
        HTTPException: 503 when Redis is offline, 500 when FLUSHDB fails.
    """
    if not (redis_client.is_connected and redis_client.redis):
        raise HTTPException(status_code=503, detail="Redis offline")
    try:
        await redis_client.redis.flushdb()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    logger.critical("Admin action: Redis FLUSHDB executed.")
    return {"status": "success", "msg": "Redis flushed"}
async def get_config_state():
    """Snapshot of dynamic runtime config for Dashboard UI sync."""
    kill_active = router_module.KILL_SWITCH_ACTIVE
    current_bans = list(router_module.banned_models)
    return {"kill_switch_active": kill_active, "banned_models": current_bans}
| # βββ Intelligence & Observability Endpoints ββββββββββββββββββββββββββββββββββββββββββββ | |
async def get_alert_feed():
    """Return the 20 most recent alerts for the Dashboard alert feed."""
    from app.core.alerts import get_alert_history
    recent = get_alert_history()[:20]
    return {"alerts": recent}
async def send_test_alert(request: Request):
    """Send a test Telegram ping to verify the alert configuration works."""
    from app.core.alerts import telegram_alerter
    delivered = await telegram_alerter.send_test_ping()
    if not delivered:
        return {
            "status": "not_delivered",
            "msg": "Telegram not configured or delivery failed. Check TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID.",
        }
    return {"status": "delivered", "msg": "Test alert sent to Telegram successfully!"}
async def get_budget():
    """Return today's token usage and the configured budget."""
    from app.core.cost_tracker import cost_tracker
    return await cost_tracker.get_daily_stats()
async def update_budget(payload: BudgetUpdateReq):
    """Hot-update the daily token budget (takes effect immediately).

    Persists the value to .env (best-effort) after updating settings in RAM.

    Raises:
        HTTPException: 400 when the requested budget is below 1000 tokens.
    """
    new_budget = payload.daily_token_budget
    if new_budget < 1000:
        raise HTTPException(status_code=400, detail="Budget must be at least 1000 tokens")
    settings.DAILY_TOKEN_BUDGET = new_budget
    _update_env_file("DAILY_TOKEN_BUDGET", str(new_budget))
    logger.warning(f"Admin action: Daily token budget updated to {new_budget:,}")
    return {"status": "success", "daily_token_budget": new_budget}
async def get_routing_log():
    """Return the last 50 LLM routing decisions for the Intelligence Panel.

    Reads the "awn:routing:log" Redis list; entries that fail to parse as
    JSON are skipped. Returns an empty decision list when Redis is offline
    or unreadable — the dashboard degrades gracefully — but failures are
    logged at debug level instead of being swallowed silently.
    """
    import json
    from app.core.redis_client import redis_client
    if not (redis_client.is_connected and redis_client.redis):
        return {"decisions": []}
    try:
        raw = await redis_client.redis.lrange("awn:routing:log", 0, 49)
    except Exception:
        logger.debug("Could not read routing log from Redis.", exc_info=True)
        return {"decisions": []}
    entries = []
    for item in raw:
        try:
            entries.append(json.loads(item))
        except (ValueError, TypeError):
            # Corrupt entry — skip it rather than fail the whole feed.
            logger.debug("Skipping unparseable routing-log entry.")
    return {"decisions": entries}