"""Admin control-plane router.

Exposes privileged runtime controls for the LLM routing layer: provider
enable/disable, bulkhead sizing, circuit-breaker trip/reset/tune, model
banning, a global kill switch, cache flushing, budget management and
observability feeds. All endpoints require a valid API key
(``verify_api_key`` dependency on the router).
"""

from fastapi import APIRouter, Depends, HTTPException, Body, Request
from pydantic import BaseModel
import logging
from app.core.auth import verify_api_key
from app.services.chat.api.llm_router import llm_router, circuit_registry
import app.services.chat.api.llm_router as router_module
from app.core.redis_client import redis_client
from app.core.config import settings, BASE_DIR
from app.services.chat.api.model_registry import model_registry
import os
import re
import time

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/admin",
    tags=["admin"],
    dependencies=[Depends(verify_api_key)],
)


# ─── Request payload models ──────────────────────────────────────────────────

class BulkheadUpdate(BaseModel):
    # New concurrency limit for a provider's bulkhead.
    limit: int


class CircuitTune(BaseModel):
    # Circuit-breaker tuning knobs.
    failure_threshold: int
    recovery_timeout: int


class ModelBanReq(BaseModel):
    # Model identifier to (un)ban globally.
    model_name: str


class KillSwitchReq(BaseModel):
    # True = halt all LLM traffic, False = restore.
    active: bool


class EnvUpdateReq(BaseModel):
    # Provider id ("groq", "gemini", ...) and its comma-separated model list.
    provider: str
    models_string: str


class BudgetUpdateReq(BaseModel):
    # New daily token budget (minimum enforced in the endpoint).
    daily_token_budget: int


# ─── Permanent Configuration (.env) ──────────────────────────────────────────

def _update_env_file(key: str, new_value: str) -> None:
    """Safely update ``key`` inside the physical .env file.

    Creates the file if missing, rewrites the matching line if present, or
    appends the key otherwise. Fails gracefully (warning only) on read-only
    file systems such as Hugging Face Spaces — callers are expected to have
    already applied the value to in-memory settings.
    """
    env_path = BASE_DIR / ".env"
    try:
        if not env_path.exists():
            # Fallback creation if it doesn't exist
            with open(env_path, "w", encoding="utf-8") as f:
                f.write(f"{key}={new_value}\n")
            return

        with open(env_path, "r", encoding="utf-8") as f:
            lines = f.readlines()

        updated = False
        with open(env_path, "w", encoding="utf-8") as f:
            for line in lines:
                if line.startswith(f"{key}="):
                    f.write(f"{key}={new_value}\n")
                    updated = True
                else:
                    f.write(line)
            if not updated:
                # Key wasn't found, append it (ensure a trailing newline first)
                if lines and not lines[-1].endswith("\n"):
                    f.write("\n")
                f.write(f"{key}={new_value}\n")
    except IOError as e:
        # Read-only FS (e.g. Hugging Face Space): the in-memory value still
        # applies; persistence is simply skipped.
        logger.warning(
            f"File system is restricted (e.g. Hugging Face Space). "
            f"Could not save {key} to disk. "
            f"Falling back to hot RAM reload only. Error: {e}"
        )


@router.get("/env")
async def get_env_models():
    """Retrieve current permanent models defined in the core system."""
    return {
        "groq": settings.LLM_MODELS_GROQ,
        "gemini": settings.LLM_MODELS_GEMINI,
        "openai": settings.LLM_MODELS_OPENAI,
        "openrouter": settings.LLM_MODELS_OPENROUTER,
    }


@router.post("/env")
async def update_env_models(payload: EnvUpdateReq):
    """Securely writes default fallback models to physical .env file and hot-reloads."""
    provider = payload.provider.strip().lower()
    mapping = {
        "groq": "LLM_MODELS_GROQ",
        "gemini": "LLM_MODELS_GEMINI",
        "openai": "LLM_MODELS_OPENAI",
        "openrouter": "LLM_MODELS_OPENROUTER",
    }
    if provider not in mapping:
        raise HTTPException(status_code=400, detail="Invalid provider ID")

    key = mapping[provider]

    # Write to physical file securely
    _update_env_file(key, payload.models_string)

    # Write to local settings memory so it's instantly available everywhere
    setattr(settings, key, payload.models_string)

    # Reload model_registry defaults instantly to trigger Hot-Reload
    model_registry._load_defaults()

    return {"status": "success", "msg": f"Updated {key} to {payload.models_string} safely."}


# ─── Provider Controls ───────────────────────────────────────────────────────

@router.post("/provider/{name}/disable")
async def disable_provider(name: str):
    """Manually force a provider offline unconditionally."""
    state = llm_router.provider_states.get(name)
    if not state:
        raise HTTPException(status_code=404, detail="Provider not found")
    state.is_permanently_disabled = True
    logger.critical(f"Admin action: Provider {name} FORCE DISABLED.")
    return {"status": "success", "provider": name, "disabled": True}


@router.post("/provider/{name}/enable")
async def enable_provider(name: str):
    """Restore a manually disabled provider."""
    state = llm_router.provider_states.get(name)
    if not state:
        raise HTTPException(status_code=404, detail="Provider not found")
    state.is_permanently_disabled = False
    state.disable_until = 0.0  # also clear any timed disable window
    logger.warning(f"Admin action: Provider {name} ENABLED.")
    return {"status": "success", "provider": name, "disabled": False}


@router.post("/provider/{name}/bulkhead")
async def update_bulkhead(name: str, payload: BulkheadUpdate):
    """Dynamically resize concurrency limit to throttle/boost a provider."""
    ok = llm_router.set_bulkhead_limit(name, payload.limit)
    if not ok:
        raise HTTPException(status_code=404, detail="Provider not found")
    return {"status": "success", "provider": name, "new_limit": payload.limit}


# ─── Circuit Controls ────────────────────────────────────────────────────────

@router.post("/circuit/{name}/trip")
async def trip_circuit(name: str):
    """Force circuit OPEN (throws it into cooldown natively)."""
    cb = circuit_registry.get(name)
    if not cb:
        raise HTTPException(status_code=404, detail="Circuit not found")
    cb.record_failure(recovery_override=86400)  # Open for 24h (admin tripped)
    logger.critical(f"Admin action: Circuit for {name} TRIPPED.")
    return {"status": "success", "provider": name, "circuit_state": cb.state.value}


@router.post("/circuit/{name}/reset")
async def reset_circuit(name: str):
    """Force circuit CLOSED (instant heal)."""
    cb = circuit_registry.get(name)
    if not cb:
        raise HTTPException(status_code=404, detail="Circuit not found")
    cb.record_success()
    # Also clear the router-side failure counters so routing resumes at once.
    state = llm_router.provider_states.get(name)
    if state:
        state.consecutive_failures = 0
        state.cooldown_until = 0.0
    logger.warning(f"Admin action: Circuit for {name} RESET to CLOSED.")
    return {"status": "success", "provider": name, "circuit_state": cb.state.value}


@router.post("/circuit/{name}/tune")
async def tune_circuit(name: str, payload: CircuitTune):
    """Dynamically adjust failure threshold and base recovery timeout."""
    cb = circuit_registry.get(name)
    if not cb:
        raise HTTPException(status_code=404, detail="Circuit not found")
    # Clamp to sane minimums so the breaker can never be configured inert.
    cb.failure_threshold = max(1, payload.failure_threshold)
    cb._recovery_base = max(1, payload.recovery_timeout)
    cb.recovery_timeout = cb._recovery_base
    logger.warning(
        f"Admin action: Circuit {name} tuned (thr={cb.failure_threshold}, timeout={cb._recovery_base})"
    )
    return {"status": "success", "provider": name, "tuned": payload.dict()}


# ─── Model & Global Controls ─────────────────────────────────────────────────

@router.post("/model/ban")
async def ban_model(payload: ModelBanReq):
    """Add a model string to the global banlist."""
    router_module.banned_models.add(payload.model_name)
    logger.critical(f"Admin action: Model {payload.model_name} GLOBALLY BANNED.")
    return {"status": "success", "banned_models": list(router_module.banned_models)}


@router.post("/model/unban")
async def unban_model(payload: ModelBanReq):
    """Remove a model string from the global banlist."""
    router_module.banned_models.discard(payload.model_name)
    logger.warning(f"Admin action: Model {payload.model_name} unbanned.")
    return {"status": "success", "banned_models": list(router_module.banned_models)}


@router.post("/system/killswitch")
async def toggle_killswitch(payload: KillSwitchReq, request: Request):
    """Global emergency stop. All LLM calls return 503 instantly."""
    router_module.KILL_SWITCH_ACTIVE = payload.active
    # Prefer the proxy-forwarded client IP; fall back to the direct peer.
    ip = request.headers.get("X-Forwarded-For", getattr(request.client, "host", "unknown"))
    if payload.active:
        logger.critical("🚨 ADMIN ACTION: GLOBAL KILL SWITCH ACTIVATED. System halted.")
    else:
        logger.warning("🟢 ADMIN ACTION: GLOBAL KILL SWITCH DEACTIVATED. System restored.")

    # Fire Telegram alert (non-blocking); alerting must never break the toggle.
    try:
        import asyncio
        from app.core.alerts import telegram_alerter
        asyncio.create_task(telegram_alerter.send_kill_switch_alert(payload.active, by_ip=ip))
    except Exception:
        pass

    return {"status": "success", "kill_switch_active": payload.active}


@router.post("/system/flush-cache")
async def flush_cache():
    """Nuke all Redis keys (cache & telemetry). Extreme warning."""
    if redis_client.is_connected and redis_client.redis:
        try:
            await redis_client.redis.flushdb()
            logger.critical("Admin action: Redis FLUSHDB executed.")
            return {"status": "success", "msg": "Redis flushed"}
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))
    raise HTTPException(status_code=503, detail="Redis offline")


@router.get("/config-state")
async def get_config_state():
    """Retrieve current dynamic config state for Dashboard UI sync."""
    return {
        "kill_switch_active": router_module.KILL_SWITCH_ACTIVE,
        "banned_models": list(router_module.banned_models),
    }


# ─── Intelligence & Observability Endpoints ──────────────────────────────────

@router.get("/alert-feed")
async def get_alert_feed():
    """Returns last 20 alerts in reverse chronological order for Dashboard alert feed."""
    from app.core.alerts import get_alert_history
    return {"alerts": get_alert_history()[:20]}


@router.post("/test-alert")
async def send_test_alert(request: Request):
    """Sends a test Telegram alert to verify webhook configuration is working."""
    from app.core.alerts import telegram_alerter
    ok = await telegram_alerter.send_test_ping()
    if ok:
        return {"status": "delivered", "msg": "Test alert sent to Telegram successfully!"}
    else:
        return {
            "status": "not_delivered",
            "msg": "Telegram not configured or delivery failed. Check TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID.",
        }


@router.get("/budget")
async def get_budget():
    """Returns current daily token usage and budget configuration."""
    from app.core.cost_tracker import cost_tracker
    stats = await cost_tracker.get_daily_stats()
    return stats


@router.post("/budget")
async def update_budget(payload: BudgetUpdateReq):
    """Hot-updates the daily token budget limit (takes effect immediately)."""
    if payload.daily_token_budget < 1000:
        raise HTTPException(status_code=400, detail="Budget must be at least 1000 tokens")
    # Apply to live settings first so the limit is effective even if the
    # .env write below is skipped on a read-only file system.
    settings.DAILY_TOKEN_BUDGET = payload.daily_token_budget
    _update_env_file("DAILY_TOKEN_BUDGET", str(payload.daily_token_budget))
    logger.warning(f"Admin action: Daily token budget updated to {payload.daily_token_budget:,}")
    return {"status": "success", "daily_token_budget": payload.daily_token_budget}


@router.get("/routing-log")
async def get_routing_log():
    """Returns last 50 LLM routing decisions for Dashboard Intelligence Panel."""
    from app.core.redis_client import redis_client
    if redis_client.is_connected and redis_client.redis:
        try:
            raw = await redis_client.redis.lrange("awn:routing:log", 0, 49)
            import json
            entries = []
            for item in raw:
                try:
                    entries.append(json.loads(item))
                except Exception:
                    # Skip malformed log entries rather than failing the feed.
                    pass
            return {"decisions": entries}
        except Exception:
            pass
    return {"decisions": []}