# legitey01's picture
# Update app.py
# a955569 verified
import asyncio
import hmac
import os
import re
import time

import requests
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
# ASGI application object; the @app.get / @app.post decorators in this file
# register their handlers on it.
app = FastAPI(title="List-2.0-Ultra-Coder")
# System message injected into conversations that do not already carry one.
# The prompt text is runtime data and is kept in its original language.
SYSTEM_IDENTITY = {
    "role": "system",
    "content": (
        "Voce e o List-2.0-Ultra-Coder, desenvolvido pela List Enterprise e "
        "pelo criador legitey. Responda de forma direta, clara e profissional, "
        "sem introducoes desnecessarias ou explicacoes sobre quem voce e, "
        "a menos que seja perguntado."
    ),
}

# provider id -> (chat-completions endpoint URL, model name sent to that provider)
PROVIDER_CONFIG = {
    "or": (
        "https://openrouter.ai/api/v1/chat/completions",
        "openrouter/auto",
    ),
    "groq": (
        "https://api.groq.com/openai/v1/chat/completions",
        "qwen/qwen3-32b",
    ),
    "mistral": (
        "https://api.mistral.ai/v1/chat/completions",
        "open-mistral-7b",
    ),
    "github": (
        "https://models.inference.ai.azure.com/chat/completions",
        "gpt-4o-mini",
    ),
}

# environment-variable name prefix -> provider id (a key of PROVIDER_CONFIG)
KEY_PREFIXES = {
    "OR_KEY_": "or",
    "GROQ_KEY_": "groq",
    "MISTRAL_KEY_": "mistral",
    "GITHUB_KEY_": "github",
}
class SmartBalancer:
    """Least-used rotation over API keys discovered in environment variables.

    Every key is held as a ``(provider, secret, env_name)`` tuple; ``env_name``
    doubles as the key's identifier in ``usage_count`` and ``blacklist``.
    Keys reported with HTTP 429 are skipped until ``cooldown_seconds`` elapse.
    """

    def __init__(self):
        self.keys = []  # (provider, secret, env_name) tuples, in load order
        self.usage_count = {}  # env_name -> times the key has been handed out
        self.blacklist = {}  # env_name -> time.time() when the 429 was reported
        self.cooldown_seconds = 60
        self.load_keys()

    def load_keys(self):
        """Rebuild ``self.keys`` from the environment.

        Variables whose name starts with a ``KEY_PREFIXES`` prefix and whose
        value is non-empty are collected, grouped by prefix (in
        ``KEY_PREFIXES`` order) and sorted by variable name within a group.
        Existing usage counters are preserved; new keys start at zero.
        """
        # Sort once instead of once per prefix; the resulting order is the same.
        env_items = sorted(os.environ.items())
        self.keys = []
        for prefix, provider in KEY_PREFIXES.items():
            for env_name, env_val in env_items:
                if env_name.startswith(prefix) and env_val:
                    self.keys.append((provider, env_val, env_name))
                    self.usage_count.setdefault(env_name, 0)

    def get_next_key(self):
        """Return the least-used key that is not cooling down, or ``None``.

        Returns:
            A ``(provider, secret, env_name)`` tuple, or ``None`` when no keys
            are configured even after re-reading the environment.
        """
        if not self.keys:
            self.load_keys()
        if not self.keys:
            return None

        # Drop blacklist entries whose cooldown has elapsed.
        now = time.time()
        expired = [
            key_id
            for key_id, since in self.blacklist.items()
            if now - since > self.cooldown_seconds
        ]
        for key_id in expired:
            del self.blacklist[key_id]

        available = [entry for entry in self.keys if entry[2] not in self.blacklist]
        if not available:
            # Every key is cooling down: forget the blacklist rather than
            # refuse to serve.
            self.blacklist.clear()
            available = self.keys

        # min() returns the first least-used entry, exactly what a stable sort
        # followed by [0] would pick, but without reordering ``self.keys`` in
        # place when ``available`` aliases it.
        chosen = min(available, key=lambda entry: self.usage_count.get(entry[2], 0))
        self.usage_count[chosen[2]] = self.usage_count.get(chosen[2], 0) + 1
        return chosen

    def report_failure(self, key_id, status_code):
        """Start the cooldown for ``key_id`` when ``status_code`` is 429.

        Any other status code is ignored.
        """
        if status_code == 429:
            self.blacklist[key_id] = time.time()

    def get_stats(self):
        """Return key count, blacklist size and a copy of the usage counters."""
        return {
            "total_keys": len(self.keys),
            "blacklisted": len(self.blacklist),
            "usage": dict(self.usage_count),
        }
# Module-level balancer used by the request handlers; constructing it reads the
# API keys from the environment (SmartBalancer.__init__ calls load_keys()).
BALANCER = SmartBalancer()
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", re.DOTALL)


def clean_response(data):
    """Rebrand and sanitize an upstream chat-completion payload in place.

    * Overwrites ``model`` (when present) with the public model id.
    * Strips ``<think>...</think>`` blocks and surrounding whitespace from
      every string ``choices[*].message.content``.
    * Removes the ``system_fingerprint`` and ``x_groq`` top-level fields.

    Payloads that are not dicts are returned untouched, and choices whose
    ``message`` is missing/null or whose ``content`` is not a string (for
    example ``None`` or a list) are skipped instead of raising.

    Args:
        data: Decoded JSON body of the upstream response.

    Returns:
        The same ``data`` object, modified in place.
    """
    if not isinstance(data, dict):
        return data

    if "model" in data:
        data["model"] = "list-enterprise/list-2.0-ultra-coder"

    # ``or []`` also covers an explicit ``"choices": null``.
    for choice in data.get("choices") or []:
        if not isinstance(choice, dict):
            continue
        msg = choice.get("message")
        if not isinstance(msg, dict):
            continue
        content = msg.get("content")
        if isinstance(content, str) and content:
            msg["content"] = _THINK_BLOCK_RE.sub("", content).strip()

    for field in ("system_fingerprint", "x_groq"):
        data.pop(field, None)
    return data
# ============================================================
# CORE PROXY LOGIC (shared by /v1/chat/completions and /v1/completions)
# ============================================================
def _post_upstream(url, payload, headers):
    """Send one blocking POST to an upstream provider (run in a worker thread)."""
    return requests.post(url, json=payload, headers=headers, timeout=15)


async def _execute_proxy(request: Request):
    """Forward a completion request to the first upstream key that answers 200.

    The JSON body may carry OpenAI-style ``messages`` or a legacy ``prompt``
    (wrapped into a single user message). ``SYSTEM_IDENTITY`` is prepended when
    the conversation has no system message. Keys are taken from ``BALANCER``
    one at a time until one succeeds or the attempt budget is used up.

    Args:
        request: Incoming FastAPI request whose body is the JSON payload.

    Returns:
        A ``JSONResponse`` with the upstream payload passed through
        ``clean_response``.

    Raises:
        HTTPException: 400 when the body is not a JSON object, ``messages`` is
            not a list of objects, or neither messages nor a prompt is given;
            503 when no key produced a successful response.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Requisicao invalida.") from None
    # Valid JSON that is not an object (e.g. a list) has no ``.get``.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Requisicao invalida.")

    raw_messages = body.get("messages") or []
    if not isinstance(raw_messages, list) or not all(
        isinstance(m, dict) for m in raw_messages
    ):
        raise HTTPException(status_code=400, detail="Requisicao invalida.")
    messages = list(raw_messages)  # copy: never mutate the caller's payload
    if not messages:
        prompt = body.get("prompt", "")
        if prompt:
            messages = [{"role": "user", "content": prompt}]
    if not messages:
        # Nothing to complete; do not spend upstream quota on a system-only prompt.
        raise HTTPException(status_code=400, detail="Requisicao invalida.")
    if not any(m.get("role") == "system" for m in messages):
        messages.insert(0, SYSTEM_IDENTITY)

    loop = asyncio.get_running_loop()
    # One attempt per loaded key. The fallback of 7 only matters when no keys
    # were loaded yet: get_next_key() then re-reads the environment itself.
    max_attempts = len(BALANCER.keys) if BALANCER.keys else 7
    for _ in range(max_attempts):
        key_data = BALANCER.get_next_key()
        if not key_data:
            break
        provider, key, key_id = key_data
        url, model = PROVIDER_CONFIG[provider]
        headers = {
            "Authorization": f"Bearer {key}",
            "Content-Type": "application/json",
            "User-Agent": "ListCoder/2.0",
            "HTTP-Referer": "https://list-enterprise.com",
            "X-Title": "List-2.0-Ultra-Coder",
        }
        send_body = {
            "model": model,
            "messages": messages,
            "temperature": body.get("temperature", 0.7),
            "top_p": body.get("top_p", 1.0),
            # The reply is parsed as a single JSON document below, so a
            # streamed (SSE) upstream answer could never be relayed: forwarding
            # ``stream: true`` made every key fail and ended in a 503.
            "stream": False,
        }
        for param in ("stop", "response_format", "tools", "tool_choice", "seed"):
            if param in body:
                send_body[param] = body[param]
        # Groq gets the limit as ``max_completion_tokens`` (default 4096);
        # every other provider gets ``max_tokens`` (default 1024).
        if provider == "groq":
            send_body["max_completion_tokens"] = body.get("max_tokens", 4096)
        else:
            send_body["max_tokens"] = body.get("max_tokens", 1024)

        try:
            # ``requests`` is blocking; run it in the default executor so the
            # event loop keeps serving other requests meanwhile.
            resp = await loop.run_in_executor(
                None, _post_upstream, url, send_body, headers
            )
            if resp.status_code == 200:
                data = clean_response(resp.json())
                return JSONResponse(content=data)
            BALANCER.report_failure(key_id, resp.status_code)
        except Exception:
            # Deliberate best-effort failover: network errors and unparsable
            # upstream bodies simply move on to the next key.
            continue

    raise HTTPException(status_code=503, detail="O List-2.0-Ultra-Coder esta temporariamente sobrecarregado. Tente novamente em instantes.")
# ============================================================
# SECURITY HELPER (validates the Maestro token)
# ============================================================
def _check_token(token: str):
master = os.environ.get("MAESTRO_TOKEN", "")
return token is not None and token == master
# ============================================================
# PUBLIC ROUTES (open - OpenRouter can access them)
# ============================================================
# POST /v1/chat/completions: hand the request to the shared proxy routine and
# return whatever it produces.
@app.post("/v1/chat/completions")
async def chat_completions_post(request: Request):
    proxied = await _execute_proxy(request)
    return proxied
# GET /v1/chat/completions: static status payload telling callers to use POST.
@app.get("/v1/chat/completions")
def chat_completions_get():
    info = {
        "status": "online",
        "endpoint": "/v1/chat/completions",
        "method": "Use POST to send requests.",
    }
    return info
# POST /v1/completions: served by the same proxy routine as the chat endpoint.
@app.post("/v1/completions")
async def completions_post(request: Request):
    proxied = await _execute_proxy(request)
    return proxied
# GET /v1/completions: static status payload telling callers to use POST.
@app.get("/v1/completions")
def completions_get():
    info = {
        "status": "online",
        "endpoint": "/v1/completions",
        "method": "Use POST to send requests.",
    }
    return info
# GET /: minimal liveness payload naming the engine.
@app.get("/")
def home():
    banner = {
        "engine": "List-2.0-Ultra-Coder",
        "status": "online",
    }
    return banner
# GET /v1/models: advertise the single public model in an OpenAI-style list.
@app.get("/v1/models")
def list_models():
    # Prices are sent as strings.
    pricing = {
        "prompt": "0.0000001",
        "completion": "0.0000002",
        "image": "0",
        "request": "0",
    }
    model_card = {
        "id": "list-enterprise/list-2.0-ultra-coder",
        "name": "List 2.0 Ultra Coder",
        "description": "High-performance coding orchestrator by List Enterprise.",
        # Evaluated per request, so the value is the time of the call.
        "created": int(time.time()),
        "owned_by": "list-enterprise",
        "context_length": 262144,
        "input_modalities": ["text"],
        "output_modalities": ["text"],
        "pricing": pricing,
        "supported_sampling_parameters": ["temperature", "top_p", "max_tokens", "stop"],
        "supported_features": ["tools", "json_mode"],
        "top_provider": "List Enterprise",
    }
    return {"object": "list", "data": [model_card]}
# GET /privacy: static privacy-policy document returned as JSON.
@app.get("/privacy")
def privacy():
    policy_doc = {
        "policy": "List Enterprise Privacy Policy",
        "version": "1.0",
        "effective_date": "2026-04-16",
        "data_collection": "We collect only the data necessary to process API requests.",
        "data_usage": "User prompts and completions are NEVER used for model training.",
        "data_retention": "Minimal operational logs are retained for up to 7 days for security auditing, then permanently deleted.",
        "data_sharing": "We do not sell or share user data with any third parties.",
        "contact": "support@list-enterprise.com",
        "organization": "List Enterprise",
    }
    return policy_doc
# ============================================================
# PRIVATE ROUTES (protected by MAESTRO_TOKEN)
# ============================================================
# GET /stats?token=...: usage counters per key, for holders of MAESTRO_TOKEN.
# Keys are reported as engine_1, engine_2, ... so the real environment-variable
# names are never exposed.
@app.get("/stats")
def stats(token: str = None):
    if not _check_token(token):
        # Same body as before, but with a 403 so HTTP clients and monitors see
        # the denial instead of a 200 "success".
        return JSONResponse(
            status_code=403,
            content={"status": "access_denied", "message": "Token invalido ou ausente."},
        )
    # Mask the real key names; numbering follows usage_count insertion order.
    masked = {
        f"engine_{position}": count
        for position, count in enumerate(BALANCER.usage_count.values(), start=1)
    }
    return {
        "total_engines": len(BALANCER.keys),
        "blacklisted": len(BALANCER.blacklist),
        "usage": masked,
    }
# GET /debug?token=...: probe every loaded key with a tiny completion request
# and report, per masked engine label, the HTTP status (or a truncated error
# text) and whether it answered 200. Requires MAESTRO_TOKEN.
@app.get("/debug")
def debug_keys(token: str = None):
    if not _check_token(token):
        # Same body as before, but with a 403 so HTTP clients and monitors see
        # the denial instead of a 200 "success".
        return JSONResponse(
            status_code=403,
            content={"status": "access_denied", "message": "Token invalido ou ausente."},
        )
    results = []
    for position, (provider, secret, _env_name) in enumerate(BALANCER.keys, start=1):
        label = f"engine_{position}"
        url, model = PROVIDER_CONFIG[provider]
        headers = {
            "Authorization": f"Bearer {secret}",
            "Content-Type": "application/json",
            "User-Agent": "ListCoder/2.0",
        }
        # Smallest useful probe: one short user message, three tokens back.
        test_body = {
            "model": model,
            "messages": [{"role": "user", "content": "Oi"}],
            "max_tokens": 3,
        }
        try:
            r = requests.post(url, json=test_body, headers=headers, timeout=10)
        except Exception as exc:
            # Diagnostic endpoint: record the failure and keep probing.
            results.append({"engine": label, "status": str(exc)[:50], "ok": False})
        else:
            results.append({"engine": label, "status": r.status_code, "ok": r.status_code == 200})
    return {
        "diagnostics": results,
        "total_ok": sum(1 for entry in results if entry["ok"]),
        "total": len(results),
    }