Aoun-Ai / app /api /admin_ops.py
MuhammadMahmoud's picture
feat: implement smart intelligence layer with hardened auth, telegram alerts, and dashboard v3
3226916
from fastapi import APIRouter, Depends, HTTPException, Body, Request
from pydantic import BaseModel
import logging
from app.core.auth import verify_api_key
from app.services.chat.api.llm_router import llm_router, circuit_registry
import app.services.chat.api.llm_router as router_module
from app.core.redis_client import redis_client
from app.core.config import settings, BASE_DIR
from app.services.chat.api.model_registry import model_registry
import os
import re
import time
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Every route registered below is served under the /admin prefix and declares
# verify_api_key as a router-wide dependency.
router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(verify_api_key)])
class BulkheadUpdate(BaseModel):
    """Request body carrying the new bulkhead limit for a provider."""

    # Forwarded unchanged to llm_router.set_bulkhead_limit by the bulkhead route.
    limit: int
class CircuitTune(BaseModel):
    """Request body for tuning a circuit breaker."""

    # Both values are clamped to a minimum of 1 by the tune route before use.
    failure_threshold: int
    recovery_timeout: int
class ModelBanReq(BaseModel):
    """Request body naming the model to add to or remove from the banlist."""

    model_name: str
class KillSwitchReq(BaseModel):
    """Request body carrying the desired kill-switch state."""

    # True activates the kill switch, False deactivates it.
    active: bool
class EnvUpdateReq(BaseModel):
    """Request body for replacing one provider's model string."""

    # Provider id; the /env route accepts groq, gemini, openai or openrouter.
    provider: str
    # Stored verbatim as the value of that provider's LLM_MODELS_* setting.
    models_string: str
class BudgetUpdateReq(BaseModel):
    """Request body carrying the new daily token budget."""

    # The budget route rejects values below 1000.
    daily_token_budget: int
# ─── Permanent Configuration (.env) ───────────────────────────────────────────
def _update_env_file(key: str, new_value: str, env_path=None):
    """Set ``key=new_value`` in the .env file, creating the file if needed.

    Every existing line that starts with ``"<key>="`` is replaced; if no such
    line exists the entry is appended (after terminating an unterminated last
    line). All other lines are written back unchanged.

    Args:
        key: Variable name to set.
        new_value: Value to store. Carriage returns and line feeds are removed
            so the value cannot spill onto extra lines and smuggle additional
            ``KEY=value`` entries into the file.
        env_path: Optional path of the file to edit. Defaults to
            ``BASE_DIR / ".env"``.

    Returns:
        None. OS-level I/O errors (for example a read-only file system) are
        logged as a warning and swallowed, so callers can still rely on their
        in-memory update.
    """
    if env_path is None:
        env_path = BASE_DIR / ".env"

    # One entry must occupy exactly one line of the file.
    safe_value = str(new_value).replace("\r", "").replace("\n", "")
    entry = f"{key}={safe_value}\n"
    prefix = f"{key}="

    try:
        try:
            with open(env_path, "r", encoding="utf-8") as f:
                lines = f.readlines()
        except FileNotFoundError:
            # No file yet: start from empty content and create it below.
            lines = []

        # Build the complete new content before the file is opened for
        # writing, so the truncating open happens as late as possible.
        new_lines = []
        updated = False
        for line in lines:
            if line.startswith(prefix):
                new_lines.append(entry)
                updated = True
            else:
                new_lines.append(line)

        if not updated:
            # Key wasn't found: append it, terminating the last line first.
            if new_lines and not new_lines[-1].endswith("\n"):
                new_lines.append("\n")
            new_lines.append(entry)

        with open(env_path, "w", encoding="utf-8") as f:
            f.writelines(new_lines)
    except OSError as e:
        logger.warning(f"File system is restricted (e.g. Hugging Face Space). Could not save {key} to disk. Falling back to hot RAM reload only. Error: {e}")
@router.get("/env")
async def get_env_models():
    """Report the model string currently held in ``settings`` for each provider."""
    setting_by_provider = {
        "groq": "LLM_MODELS_GROQ",
        "gemini": "LLM_MODELS_GEMINI",
        "openai": "LLM_MODELS_OPENAI",
        "openrouter": "LLM_MODELS_OPENROUTER",
    }
    return {
        provider: getattr(settings, setting_name)
        for provider, setting_name in setting_by_provider.items()
    }
@router.post("/env")
async def update_env_models(payload: EnvUpdateReq):
    """Replace one provider's model string on disk and in memory.

    The provider id is trimmed and lower-cased, then mapped to its
    ``LLM_MODELS_*`` setting. The new value is written to the .env file via
    ``_update_env_file``, assigned onto ``settings``, and
    ``model_registry._load_defaults()`` is called so the change takes effect
    without a restart.

    Raises:
        HTTPException: 400 if the provider id is unknown, or if
            ``models_string`` contains a line break.
    """
    provider = payload.provider.strip().lower()
    mapping = {
        "groq": "LLM_MODELS_GROQ",
        "gemini": "LLM_MODELS_GEMINI",
        "openai": "LLM_MODELS_OPENAI",
        "openrouter": "LLM_MODELS_OPENROUTER",
    }
    key = mapping.get(provider)
    if key is None:
        raise HTTPException(status_code=400, detail="Invalid provider ID")

    # The value ends up as a single KEY=value line in .env; a line break in it
    # would let the request add arbitrary extra entries to that file.
    if "\n" in payload.models_string or "\r" in payload.models_string:
        raise HTTPException(status_code=400, detail="models_string must be a single line")

    # Persist to the physical file (best effort; I/O failures are only logged there).
    _update_env_file(key, payload.models_string)

    # Update the in-memory settings object so the value is visible immediately.
    setattr(settings, key, payload.models_string)

    # Have the model registry re-read its defaults from the updated settings.
    model_registry._load_defaults()

    return {"status": "success", "msg": f"Updated {key} to {payload.models_string} safely."}
@router.post("/provider/{name}/disable")
async def disable_provider(name: str):
    """Flag the named provider's state as permanently disabled.

    Raises:
        HTTPException: 404 if ``llm_router.provider_states`` has no usable
            entry for ``name``.
    """
    provider_state = llm_router.provider_states.get(name)
    if provider_state:
        provider_state.is_permanently_disabled = True
        logger.critical(f"Admin action: Provider {name} FORCE DISABLED.")
        return {"status": "success", "provider": name, "disabled": True}
    raise HTTPException(status_code=404, detail="Provider not found")
@router.post("/provider/{name}/enable")
async def enable_provider(name: str):
    """Clear the permanent-disable flag on the named provider's state.

    Also resets the state's ``disable_until`` attribute to ``0.0``.

    Raises:
        HTTPException: 404 if ``llm_router.provider_states`` has no usable
            entry for ``name``.
    """
    provider_state = llm_router.provider_states.get(name)
    if provider_state:
        provider_state.is_permanently_disabled = False
        provider_state.disable_until = 0.0
        logger.warning(f"Admin action: Provider {name} ENABLED.")
        return {"status": "success", "provider": name, "disabled": False}
    raise HTTPException(status_code=404, detail="Provider not found")
@router.post("/provider/{name}/bulkhead")
async def update_bulkhead(name: str, payload: BulkheadUpdate):
    """Ask the LLM router to apply a new bulkhead limit to a provider.

    Raises:
        HTTPException: 404 when ``llm_router.set_bulkhead_limit`` returns a
            falsy result.
    """
    applied = llm_router.set_bulkhead_limit(name, payload.limit)
    if applied:
        return {"status": "success", "provider": name, "new_limit": payload.limit}
    raise HTTPException(status_code=404, detail="Provider not found")
# ─── Circuit Controls ─────────────────────────────────────────────────────────
@router.post("/circuit/{name}/trip")
async def trip_circuit(name: str):
    """Record an admin-forced failure on the named circuit breaker.

    The failure is recorded with ``recovery_override=86400`` (24 hours in
    seconds); the breaker's resulting state value is returned.

    Raises:
        HTTPException: 404 if ``circuit_registry`` has no usable entry for
            ``name``.
    """
    breaker = circuit_registry.get(name)
    if breaker:
        breaker.record_failure(recovery_override=86400)
        logger.critical(f"Admin action: Circuit for {name} TRIPPED.")
        return {"status": "success", "provider": name, "circuit_state": breaker.state.value}
    raise HTTPException(status_code=404, detail="Circuit not found")
@router.post("/circuit/{name}/reset")
async def reset_circuit(name: str):
    """Record a success on the named circuit breaker and clear provider counters.

    When the router also tracks a state for the same name, its
    ``consecutive_failures`` is zeroed and ``cooldown_until`` is reset to 0.0.

    Raises:
        HTTPException: 404 if ``circuit_registry`` has no usable entry for
            ``name``.
    """
    breaker = circuit_registry.get(name)
    if not breaker:
        raise HTTPException(status_code=404, detail="Circuit not found")

    breaker.record_success()

    provider_state = llm_router.provider_states.get(name)
    if provider_state:
        provider_state.consecutive_failures = 0
        provider_state.cooldown_until = 0.0

    logger.warning(f"Admin action: Circuit for {name} RESET to CLOSED.")
    return {"status": "success", "provider": name, "circuit_state": breaker.state.value}
@router.post("/circuit/{name}/tune")
async def tune_circuit(name: str, payload: CircuitTune):
    """Adjust a circuit breaker's failure threshold and base recovery timeout.

    Both requested values are clamped to a minimum of 1 before being applied.
    The ``tuned`` field of the response reports the values actually applied
    (after clamping) rather than echoing the raw request, so a caller that
    sent 0 or a negative number can see what is really in effect.

    Raises:
        HTTPException: 404 if ``circuit_registry`` has no usable entry for
            ``name``.
    """
    cb = circuit_registry.get(name)
    if not cb:
        raise HTTPException(status_code=404, detail="Circuit not found")

    cb.failure_threshold = max(1, payload.failure_threshold)
    cb._recovery_base = max(1, payload.recovery_timeout)
    # Keep the current timeout in step with the new base value.
    cb.recovery_timeout = cb._recovery_base

    logger.warning(f"Admin action: Circuit {name} tuned (thr={cb.failure_threshold}, timeout={cb._recovery_base})")

    # Same keys as the request model, but carrying the effective values.
    applied = {
        "failure_threshold": cb.failure_threshold,
        "recovery_timeout": cb._recovery_base,
    }
    return {"status": "success", "provider": name, "tuned": applied}
# ─── Model & Global Controls ──────────────────────────────────────────────────
@router.post("/model/ban")
async def ban_model(payload: ModelBanReq):
    """Add ``payload.model_name`` to ``router_module.banned_models``.

    Returns the full banlist as a list after the addition.
    """
    model_name = payload.model_name
    router_module.banned_models.add(model_name)
    logger.critical(f"Admin action: Model {payload.model_name} GLOBALLY BANNED.")
    current_bans = list(router_module.banned_models)
    return {"status": "success", "banned_models": current_bans}
@router.post("/model/unban")
async def unban_model(payload: ModelBanReq):
    """Discard ``payload.model_name`` from ``router_module.banned_models``.

    Discarding a name that is not present is a no-op. Returns the full
    banlist as a list after the removal.
    """
    model_name = payload.model_name
    router_module.banned_models.discard(model_name)
    logger.warning(f"Admin action: Model {payload.model_name} unbanned.")
    current_bans = list(router_module.banned_models)
    return {"status": "success", "banned_models": current_bans}
# Strong references to in-flight kill-switch alert tasks. The event loop only
# keeps weak references to tasks, so a task nobody else references may be
# garbage-collected before it completes.
_kill_switch_alert_tasks = set()


def _finish_kill_switch_alert(task):
    """Release a finished alert task and log its failure, if it had one."""
    _kill_switch_alert_tasks.discard(task)
    if not task.cancelled() and task.exception() is not None:
        logger.warning("Kill switch Telegram alert failed: %s", task.exception())


@router.post("/system/killswitch")
async def toggle_killswitch(payload: KillSwitchReq, request: Request):
    """Set the router module's global kill-switch flag and send an alert.

    Writes ``payload.active`` to ``router_module.KILL_SWITCH_ACTIVE``, logs
    the change, then schedules a Telegram alert as a background task so the
    response is not delayed by alert delivery. Failure to schedule or deliver
    the alert is logged and never fails the request.

    Returns:
        A dict with ``status`` and the new ``kill_switch_active`` value.
    """
    router_module.KILL_SWITCH_ACTIVE = payload.active

    # Prefer the X-Forwarded-For header; fall back to the connection's peer
    # host, or "unknown" when request.client has no host (e.g. it is None).
    ip = request.headers.get("X-Forwarded-For", getattr(request.client, "host", "unknown"))

    if payload.active:
        logger.critical("🚨 ADMIN ACTION: GLOBAL KILL SWITCH ACTIVATED. System halted.")
    else:
        logger.warning("🟒 ADMIN ACTION: GLOBAL KILL SWITCH DEACTIVATED. System restored.")

    # Fire Telegram alert (non-blocking)
    try:
        import asyncio
        from app.core.alerts import telegram_alerter

        task = asyncio.create_task(telegram_alerter.send_kill_switch_alert(payload.active, by_ip=ip))
        # Hold a reference until the task is done, then release it.
        _kill_switch_alert_tasks.add(task)
        task.add_done_callback(_finish_kill_switch_alert)
    except Exception:
        # Alerting is best effort: record why it was skipped, keep the toggle.
        logger.warning("Kill switch Telegram alert could not be scheduled", exc_info=True)

    return {"status": "success", "kill_switch_active": payload.active}
@router.post("/system/flush-cache")
async def flush_cache():
    """Run FLUSHDB on the shared Redis connection.

    Raises:
        HTTPException: 503 when the Redis client reports it is not connected
            or has no connection object; 500 (with the error text as detail)
            when the flush itself raises.
    """
    redis_ready = redis_client.is_connected and redis_client.redis
    if not redis_ready:
        raise HTTPException(status_code=503, detail="Redis offline")

    try:
        await redis_client.redis.flushdb()
        logger.critical("Admin action: Redis FLUSHDB executed.")
        return {"status": "success", "msg": "Redis flushed"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/config-state")
async def get_config_state():
    """Report the router module's kill-switch flag and banned-model list."""
    kill_switch_on = router_module.KILL_SWITCH_ACTIVE
    banned = list(router_module.banned_models)
    return {"kill_switch_active": kill_switch_on, "banned_models": banned}
# ─── Intelligence & Observability Endpoints ────────────────────────────────────────────
@router.get("/alert-feed")
async def get_alert_feed():
    """Return the first 20 entries of ``get_alert_history()`` for the dashboard feed."""
    from app.core.alerts import get_alert_history

    history = get_alert_history()
    return {"alerts": history[:20]}
@router.post("/test-alert")
async def send_test_alert(request: Request):
    """Send a Telegram test ping and report whether it was delivered.

    The response ``status`` is ``"delivered"`` when
    ``telegram_alerter.send_test_ping()`` returns a truthy value and
    ``"not_delivered"`` otherwise.
    """
    from app.core.alerts import telegram_alerter

    delivered = await telegram_alerter.send_test_ping()
    if not delivered:
        return {
            "status": "not_delivered",
            "msg": "Telegram not configured or delivery failed. Check TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID.",
        }
    return {"status": "delivered", "msg": "Test alert sent to Telegram successfully!"}
@router.get("/budget")
async def get_budget():
    """Return whatever ``cost_tracker.get_daily_stats()`` reports."""
    from app.core.cost_tracker import cost_tracker

    return await cost_tracker.get_daily_stats()
@router.post("/budget")
async def update_budget(payload: BudgetUpdateReq):
    """Apply a new daily token budget in memory and persist it to .env.

    Raises:
        HTTPException: 400 when the requested budget is below 1000.
    """
    new_budget = payload.daily_token_budget
    if new_budget < 1000:
        raise HTTPException(status_code=400, detail="Budget must be at least 1000 tokens")

    # In-memory settings first, then the on-disk copy.
    settings.DAILY_TOKEN_BUDGET = new_budget
    _update_env_file("DAILY_TOKEN_BUDGET", str(new_budget))

    logger.warning(f"Admin action: Daily token budget updated to {payload.daily_token_budget:,}")
    return {"status": "success", "daily_token_budget": new_budget}
@router.get("/routing-log")
async def get_routing_log():
    """Return up to 50 decoded entries from the ``awn:routing:log`` Redis list.

    Reads list indices 0-49 and JSON-decodes each item. This endpoint is best
    effort: if Redis is unavailable or the read fails, an empty list is
    returned, and items that are not valid JSON are skipped. Both situations
    are logged rather than silently ignored.

    Returns:
        ``{"decisions": [...]}`` with the successfully decoded entries.
    """
    import json

    if not (redis_client.is_connected and redis_client.redis):
        return {"decisions": []}

    try:
        raw = await redis_client.redis.lrange("awn:routing:log", 0, 49)
    except Exception:
        # Keep the dashboard working, but leave a trace of the failure.
        logger.warning("Could not read routing log from Redis", exc_info=True)
        return {"decisions": []}

    entries = []
    for item in raw:
        try:
            entries.append(json.loads(item))
        except (TypeError, ValueError):
            # json.loads raises ValueError subclasses for malformed or
            # undecodable input and TypeError for unsupported item types.
            logger.debug("Skipping malformed routing log entry: %r", item)
    return {"decisions": entries}