from fastapi import APIRouter, Depends, HTTPException, Body, Request
from pydantic import BaseModel
import logging
from app.core.auth import verify_api_key
from app.services.chat.api.llm_router import llm_router, circuit_registry
import app.services.chat.api.llm_router as router_module
from app.core.redis_client import redis_client
from app.core.config import settings, BASE_DIR
from app.services.chat.api.model_registry import model_registry
import os
import re
import time
logger = logging.getLogger(__name__)

# Every endpoint in this module is mounted under /admin and protected by the
# API-key dependency, so no per-route auth is needed below.
router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(verify_api_key)])
class BulkheadUpdate(BaseModel):
    """Request body for resizing a provider's concurrency bulkhead."""
    # New concurrency limit passed to llm_router.set_bulkhead_limit().
    limit: int
class CircuitTune(BaseModel):
    """Request body for tuning a circuit breaker's trip behavior."""
    # Failures tolerated before the circuit trips (clamped to >= 1 by the endpoint).
    failure_threshold: int
    # Base recovery timeout (presumably seconds — matches the 86400 "24h" override
    # used by /circuit/{name}/trip); clamped to >= 1 by the endpoint.
    recovery_timeout: int
class ModelBanReq(BaseModel):
    """Request body for globally banning/unbanning a model."""
    # Exact model identifier string added to / removed from the global banlist.
    model_name: str
class KillSwitchReq(BaseModel):
    """Request body for the global emergency kill switch."""
    # True activates the kill switch (all LLM calls halted); False restores service.
    active: bool
class EnvUpdateReq(BaseModel):
    """Request body for updating a provider's default model list in .env."""
    # Provider id; endpoint accepts "groq", "gemini", "openai", or "openrouter".
    provider: str
    # Replacement value written to the provider's LLM_MODELS_* setting.
    models_string: str
class BudgetUpdateReq(BaseModel):
    """Request body for hot-updating the daily token budget."""
    # New daily token budget; endpoint rejects values below 1000.
    daily_token_budget: int
# ─── Permanent Configuration (.env) ──────────────────────────────────────────
def _update_env_file(key: str, new_value: str):
    """Safely persist ``key=new_value`` into the physical .env file.

    The new content is built fully in memory, written to a sibling temp file,
    and swapped in with ``os.replace`` so a crash or I/O error mid-write can
    never leave a truncated/corrupt .env behind (the original implementation
    truncated the file before rewriting it). Fails gracefully — hot RAM reload
    only — when the filesystem is read-only (e.g. a Hugging Face Space).

    Args:
        key: Environment variable name (matched as a ``key=`` line prefix).
        new_value: Value to store; existing line is replaced, else appended.
    """
    env_path = BASE_DIR / ".env"
    try:
        lines = []
        if env_path.exists():
            with open(env_path, "r", encoding="utf-8") as f:
                lines = f.readlines()
        updated = False
        out = []
        for line in lines:
            if line.startswith(f"{key}="):
                out.append(f"{key}={new_value}\n")
                updated = True
            else:
                out.append(line)
        if not updated:
            # Key wasn't found — append it, ensuring the previous last line
            # is newline-terminated first.
            if out and not out[-1].endswith("\n"):
                out.append("\n")
            out.append(f"{key}={new_value}\n")
        # Atomic swap: write everything to a temp file, then rename over .env.
        tmp_path = env_path.parent / ".env.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.writelines(out)
        os.replace(tmp_path, env_path)
    except OSError as e:
        logger.warning(f"File system is restricted (e.g. Hugging Face Space). Could not save {key} to disk. Falling back to hot RAM reload only. Error: {e}")
@router.get("/env")
async def get_env_models():
    """Retrieve current permanent models defined in the core system."""
    # Each provider maps to its LLM_MODELS_<PROVIDER> settings attribute.
    return {
        name: getattr(settings, f"LLM_MODELS_{name.upper()}")
        for name in ("groq", "gemini", "openai", "openrouter")
    }
@router.post("/env")
async def update_env_models(payload: EnvUpdateReq):
    """Securely writes default fallback models to physical .env file and hot-reloads."""
    provider = payload.provider.strip().lower()
    if provider not in ("groq", "gemini", "openai", "openrouter"):
        raise HTTPException(status_code=400, detail="Invalid provider ID")
    key = f"LLM_MODELS_{provider.upper()}"
    # Persist to the physical .env file first...
    _update_env_file(key, payload.models_string)
    # ...then mirror into live settings so the value is instantly visible.
    setattr(settings, key, payload.models_string)
    # Trigger hot-reload of the model_registry defaults.
    model_registry._load_defaults()
    return {"status": "success", "msg": f"Updated {key} to {payload.models_string} safely."}
@router.post("/provider/{name}/disable")
async def disable_provider(name: str):
    """Manually force a provider offline unconditionally."""
    provider_state = llm_router.provider_states.get(name)
    if not provider_state:
        raise HTTPException(status_code=404, detail="Provider not found")
    # Permanent flag: stays off until an explicit /enable call.
    provider_state.is_permanently_disabled = True
    logger.critical(f"Admin action: Provider {name} FORCE DISABLED.")
    return {"status": "success", "provider": name, "disabled": True}
@router.post("/provider/{name}/enable")
async def enable_provider(name: str):
    """Restore a manually disabled provider."""
    provider_state = llm_router.provider_states.get(name)
    if not provider_state:
        raise HTTPException(status_code=404, detail="Provider not found")
    # Clear both the permanent flag and any timed disable window.
    provider_state.is_permanently_disabled = False
    provider_state.disable_until = 0.0
    logger.warning(f"Admin action: Provider {name} ENABLED.")
    return {"status": "success", "provider": name, "disabled": False}
@router.post("/provider/{name}/bulkhead")
async def update_bulkhead(name: str, payload: BulkheadUpdate):
    """Dynamically resize concurrency limit to throttle/boost a provider."""
    # set_bulkhead_limit returns falsy when the provider name is unknown.
    if not llm_router.set_bulkhead_limit(name, payload.limit):
        raise HTTPException(status_code=404, detail="Provider not found")
    return {"status": "success", "provider": name, "new_limit": payload.limit}
# ─── Circuit Controls ────────────────────────────────────────────────────────
@router.post("/circuit/{name}/trip")
async def trip_circuit(name: str):
    """Force circuit OPEN (throws it into cooldown natively)."""
    breaker = circuit_registry.get(name)
    if not breaker:
        raise HTTPException(status_code=404, detail="Circuit not found")
    # 24h override so an admin-tripped circuit stays open until reset.
    breaker.record_failure(recovery_override=86400)
    logger.critical(f"Admin action: Circuit for {name} TRIPPED.")
    return {"status": "success", "provider": name, "circuit_state": breaker.state.value}
@router.post("/circuit/{name}/reset")
async def reset_circuit(name: str):
    """Force circuit CLOSED (instant heal)."""
    breaker = circuit_registry.get(name)
    if not breaker:
        raise HTTPException(status_code=404, detail="Circuit not found")
    breaker.record_success()
    # Also wipe the router-level failure/cooldown bookkeeping when present.
    provider_state = llm_router.provider_states.get(name)
    if provider_state:
        provider_state.consecutive_failures = 0
        provider_state.cooldown_until = 0.0
    logger.warning(f"Admin action: Circuit for {name} RESET to CLOSED.")
    return {"status": "success", "provider": name, "circuit_state": breaker.state.value}
@router.post("/circuit/{name}/tune")
async def tune_circuit(name: str, payload: CircuitTune):
    """Dynamically adjust failure threshold and base recovery timeout."""
    breaker = circuit_registry.get(name)
    if not breaker:
        raise HTTPException(status_code=404, detail="Circuit not found")
    # Clamp both knobs to a sane floor of 1; reset the live timeout to the new base.
    breaker.failure_threshold = max(1, payload.failure_threshold)
    breaker._recovery_base = max(1, payload.recovery_timeout)
    breaker.recovery_timeout = breaker._recovery_base
    logger.warning(f"Admin action: Circuit {name} tuned (thr={breaker.failure_threshold}, timeout={breaker._recovery_base})")
    return {"status": "success", "provider": name, "tuned": payload.dict()}
# ─── Model & Global Controls ─────────────────────────────────────────────────
@router.post("/model/ban")
async def ban_model(payload: ModelBanReq):
    """Add a model string to the global banlist."""
    banlist = router_module.banned_models
    banlist.add(payload.model_name)
    logger.critical(f"Admin action: Model {payload.model_name} GLOBALLY BANNED.")
    return {"status": "success", "banned_models": list(banlist)}
@router.post("/model/unban")
async def unban_model(payload: ModelBanReq):
    """Remove a model string from the global banlist."""
    banlist = router_module.banned_models
    # discard() is a no-op when the model was never banned.
    banlist.discard(payload.model_name)
    logger.warning(f"Admin action: Model {payload.model_name} unbanned.")
    return {"status": "success", "banned_models": list(banlist)}
@router.post("/system/killswitch")
async def toggle_killswitch(payload: KillSwitchReq, request: Request):
    """Global emergency stop. All LLM calls return 503 instantly.

    Flips the module-level KILL_SWITCH_ACTIVE flag, then fires a best-effort
    Telegram alert recording which client IP made the change.
    """
    router_module.KILL_SWITCH_ACTIVE = payload.active
    # X-Forwarded-For may carry a comma-separated chain ("client, proxy1, ...");
    # the original code logged the whole header. Take the first entry — the
    # originating client — and fall back to the direct socket peer.
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        ip = forwarded.split(",")[0].strip()
    else:
        ip = getattr(request.client, "host", "unknown")
    if payload.active:
        logger.critical("π¨ ADMIN ACTION: GLOBAL KILL SWITCH ACTIVATED. System halted.")
    else:
        logger.warning("π’ ADMIN ACTION: GLOBAL KILL SWITCH DEACTIVATED. System restored.")
    # Fire Telegram alert (non-blocking, best-effort).
    try:
        import asyncio
        from app.core.alerts import telegram_alerter
        asyncio.create_task(telegram_alerter.send_kill_switch_alert(payload.active, by_ip=ip))
    except Exception as e:
        # Alerting must never block the kill switch itself; record the failure
        # instead of swallowing it silently.
        logger.debug(f"Kill-switch Telegram alert could not be scheduled: {e}")
    return {"status": "success", "kill_switch_active": payload.active}
@router.post("/system/flush-cache")
async def flush_cache():
    """Nuke all Redis keys (cache & telemetry). Extreme warning."""
    # Guard clause: no live Redis connection means nothing to flush.
    if not (redis_client.is_connected and redis_client.redis):
        raise HTTPException(status_code=503, detail="Redis offline")
    try:
        await redis_client.redis.flushdb()
        logger.critical("Admin action: Redis FLUSHDB executed.")
        return {"status": "success", "msg": "Redis flushed"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/config-state")
async def get_config_state():
    """Retrieve current dynamic config state for Dashboard UI sync."""
    state = {}
    state["kill_switch_active"] = router_module.KILL_SWITCH_ACTIVE
    state["banned_models"] = list(router_module.banned_models)
    return state
# ─── Intelligence & Observability Endpoints ──────────────────────────────────
@router.get("/alert-feed")
async def get_alert_feed():
    """Returns last 20 alerts in reverse chronological order for Dashboard alert feed."""
    from app.core.alerts import get_alert_history
    history = get_alert_history()
    # History is newest-first; cap the feed at 20 entries.
    return {"alerts": history[:20]}
@router.post("/test-alert")
async def send_test_alert(request: Request):
    """Sends a test Telegram alert to verify webhook configuration is working."""
    from app.core.alerts import telegram_alerter
    delivered = await telegram_alerter.send_test_ping()
    if delivered:
        return {"status": "delivered", "msg": "Test alert sent to Telegram successfully!"}
    return {
        "status": "not_delivered",
        "msg": "Telegram not configured or delivery failed. Check TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID.",
    }
@router.get("/budget")
async def get_budget():
    """Returns current daily token usage and budget configuration."""
    from app.core.cost_tracker import cost_tracker
    # Stats dict is returned to the client verbatim.
    return await cost_tracker.get_daily_stats()
@router.post("/budget")
async def update_budget(payload: BudgetUpdateReq):
    """Hot-updates the daily token budget limit (takes effect immediately)."""
    new_budget = payload.daily_token_budget
    if new_budget < 1000:
        raise HTTPException(status_code=400, detail="Budget must be at least 1000 tokens")
    # Apply in RAM first (instant effect), then persist to .env for restarts.
    settings.DAILY_TOKEN_BUDGET = new_budget
    _update_env_file("DAILY_TOKEN_BUDGET", str(new_budget))
    logger.warning(f"Admin action: Daily token budget updated to {new_budget:,}")
    return {"status": "success", "daily_token_budget": new_budget}
@router.get("/routing-log")
async def get_routing_log():
    """Returns last 50 LLM routing decisions for Dashboard Intelligence Panel.

    Reads the "awn:routing:log" Redis list (entries 0-49). Entries that fail
    to parse as JSON are skipped. Degrades to an empty decision list when
    Redis is offline or any Redis operation fails — this is a best-effort
    observability endpoint and must never 500 the dashboard.
    """
    # json hoisted out of the Redis try-block; the redundant local re-import of
    # redis_client (already imported at module level) has been removed.
    import json
    if redis_client.is_connected and redis_client.redis:
        try:
            raw = await redis_client.redis.lrange("awn:routing:log", 0, 49)
            entries = []
            for item in raw:
                try:
                    entries.append(json.loads(item))
                except (ValueError, TypeError):
                    # Skip malformed/corrupt log entries rather than failing the feed.
                    pass
            return {"decisions": entries}
        except Exception:
            # Redis hiccup: fall through to the empty response below.
            pass
    return {"decisions": []}