llm-ready-data / app /services /chat_service.py
light-infer-chat's picture
ok
2cd9b1c
Raw
History Blame Contribute Delete
18.5 kB
from __future__ import annotations
import json
import logging
from typing import Any, Dict, List, Optional
import aiohttp
from redis.asyncio import Redis
from app.config import (
AION_LABS_BASE_URL,
AION_LABS_CHAT_PATH,
AION_LABS_DEFAULT_MODEL,
DEFAULT_MAX_TOKENS,
DEFAULT_TEMPERATURE,
DEFAULT_TOP_P,
MEGANOVA_BASE_URL,
MEGANOVA_CHAT_PATH,
MODELS,
OPENROUTER_MIMIKA_BASE_URL,
OPENROUTER_MIMIKA_CHAT_PATH,
OPENROUTER_MIMIKA_MODEL,
get_settings,
)
from app.utils.json_utils import extract_json_blocks
logger = logging.getLogger(__name__)
_settings = get_settings()

# Redis key namespaces: PREFIX holds the MegaNova key/model slot counters and
# locks, AION_PREFIX holds the AionLabs key list and round-robin counters.
PREFIX: str = "ai_lb"
AION_PREFIX: str = "ai_lb:aion"

# Instruction added to the system prompt by prepare_messages() in JSON mode.
DEFAULT_JSON_PROMPT: str = (
    "Return your response as a valid JSON object inside a JSON code block (```json)."
)

# Positional key aliases ("k0", "k1", ...) and alias -> raw API key mapping;
# both are (re)built by _refresh_keys().
KEY_IDS: List[str] = []
KEY_MAP: Dict[str, str] = {}
def _refresh_keys() -> None:
    """Rebuild KEY_IDS / KEY_MAP from the comma-separated ``ai_api_keys`` setting.

    Each non-blank entry (whitespace-stripped) gets a positional alias
    ``k0``, ``k1``, ... . The aliases are what the Redis helpers use in key
    names and script arguments; the raw keys are only looked up in-process
    through KEY_MAP. An unset/empty setting clears both globals.
    """
    global KEY_IDS, KEY_MAP
    raw = _settings.ai_api_keys or ""
    stripped = (part.strip() for part in raw.split(","))
    secret_values = [part for part in stripped if part]
    KEY_MAP = {f"k{idx}": secret for idx, secret in enumerate(secret_values)}
    KEY_IDS = list(KEY_MAP)
# Populate KEY_IDS / KEY_MAP once at import time; _acquire_slot and
# _acquire_slot_for_model call _refresh_keys() again before each use.
_refresh_keys()
def build_default_system_prompt(model_name: Optional[str]) -> str:
    """Return the identity system prompt that announces *model_name*.

    A None or empty *model_name* falls back to ``"agentdeck-1.0"``.
    """
    name = model_name if model_name else "agentdeck-1.0"
    sentences = [
        f"Role: You are LLM model {name}. ",
        "You are built by AgentDeck. ",
        "You are a helpful, respectful, and honest assistant. ",
        "Always respond in a concise and accurate manner.",
    ]
    return "".join(sentences)
def inject_system_identity(
    messages: List[Dict[str, str]],
    model_name: str,
) -> List[Dict[str, str]]:
    """Return a copy of *messages* that starts with the identity system prompt.

    If the first message already has role ``system``, the identity prompt is
    prepended to its content, separated by a blank line. Otherwise a new
    system message carrying only the identity prompt is put in front.

    The caller's list is not modified. The previous implementation replaced or
    inserted into it in place, which could leak the injected prompt into
    caller-owned data (e.g. a history list reused for a later request).

    Args:
        messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.
        model_name: Name announced by ``build_default_system_prompt``.

    Returns:
        A new list. Message dicts after the first are shared with the input,
        not copied.
    """
    default_prompt = build_default_system_prompt(model_name)
    if messages and messages[0].get("role") == "system":
        merged_system = {
            "role": "system",
            "content": f"{default_prompt}\n\n{messages[0]['content']}",
        }
        return [merged_system, *messages[1:]]
    return [{"role": "system", "content": default_prompt}, *messages]
def prepare_messages(
    messages: List[Dict[str, str]],
    response_format: Optional[Dict[str, str]],
) -> List[Dict[str, str]]:
    """Add the JSON-output instruction when JSON mode is requested.

    JSON mode means ``response_format == {"type": "json_object", ...}``.

    In JSON mode, DEFAULT_JSON_PROMPT is either appended (after a blank line)
    to an existing leading system message, or put in front as a new system
    message. Outside JSON mode, *messages* itself (the same list object) is
    returned. The input list is never mutated.
    """
    json_mode = bool(response_format) and response_format.get("type") == "json_object"
    if not json_mode:
        return messages
    first = messages[0] if messages else None
    if first is None or first.get("role") != "system":
        return [{"role": "system", "content": DEFAULT_JSON_PROMPT}, *messages]
    augmented_system = {
        "role": "system",
        "content": f"{first['content']}\n\n{DEFAULT_JSON_PROMPT}",
    }
    return [augmented_system, *messages[1:]]
def attach_json_content(
    response_data: Dict[str, Any],
    response_format: Optional[Dict[str, str]],
) -> None:
    """In JSON mode, store parsed JSON from the first choice under ``parsed``.

    Only acts when ``response_format`` has ``type == "json_object"``. The
    first choice's ``message.content`` is scanned with ``extract_json_blocks``:
    - one block is stored as-is;
    - several blocks are stored as the whole list;
    - nothing is stored for empty content or when no block is found.

    Any exception raised along the way is recorded as
    ``{"error": str(exc)}`` under ``parsed`` rather than propagated.
    *response_data* is modified in place.
    """
    json_mode = bool(response_format) and response_format.get("type") == "json_object"
    if not json_mode:
        return
    try:
        choices = response_data.get("choices", [])
        first_message = choices[0].get("message", {}) if choices else {}
        content = first_message.get("content", "")
        if not content:
            return
        blocks = extract_json_blocks(content)
        if not blocks:
            return
        response_data["parsed"] = blocks[0] if len(blocks) == 1 else blocks
    except Exception as exc:
        response_data["parsed"] = {"error": str(exc)}
async def call_openrouter_mimika(
    messages: List[Dict[str, str]],
    response_format: Optional[Dict[str, str]],
    max_tokens: int = DEFAULT_MAX_TOKENS,
    temperature: float = DEFAULT_TEMPERATURE,
    top_p: float = DEFAULT_TOP_P,
) -> Optional[Dict[str, Any]]:
    """Send one non-streaming chat request to OpenRouter with the Mimika model.

    The request always uses OPENROUTER_MIMIKA_MODEL, whatever model the
    caller asked for.

    Returns:
        The decoded JSON body on HTTP 200, with ``parsed`` attached in JSON
        mode. None when the API key is unset, the status is not 200, or the
        request or decoding raises.
    """
    api_key = _settings.openrouter_mimika_api_key
    if not api_key:
        logger.info("OPENROUTER_MIMIKA_API_KEY not set, skipping")
        return None
    request_body = dict(
        model=OPENROUTER_MIMIKA_MODEL,
        messages=prepare_messages(messages, response_format),
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        stream=False,
    )
    auth_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    endpoint = f"{OPENROUTER_MIMIKA_BASE_URL}{OPENROUTER_MIMIKA_CHAT_PATH}"
    logger.info("Calling OpenRouter Mimika...")
    try:
        timeout = aiohttp.ClientTimeout(total=_settings.request_timeout_ms / 1000)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(endpoint, json=request_body, headers=auth_headers) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    attach_json_content(data, response_format)
                    return data
                logger.warning("OpenRouter Mimika HTTP %d", resp.status)
                return None
    except Exception as exc:
        logger.warning("OpenRouter Mimika failed: %s", exc)
        return None
def _get_meganova_key() -> Optional[str]:
    """Return the first non-blank, whitespace-stripped ``ai_api_keys`` entry.

    Returns None when the setting is unset or holds only blanks. The
    Redis-less MegaNova path uses it and does no key rotation.
    """
    raw_keys = _settings.ai_api_keys
    if not raw_keys:
        return None
    for candidate in raw_keys.split(","):
        trimmed = candidate.strip()
        if trimmed:
            return trimmed
    return None
async def _acquire_slot(
    redis: Redis,
    scripts: Dict[str, str],
) -> Optional[Dict[str, Any]]:
    """Ask the ``acquire_slot`` Lua script for the next key/model slot.

    The selection policy lives in the Redis script whose SHA is
    ``scripts["acquire_slot_sha"]`` (its body is not visible here). The script
    is passed the key aliases, the model list, the per-model round limit and
    the key count.

    Returns:
        ``{"keyId", "model", "modelIndex"}`` when the script's first reply
        element is ``"ok"``. None when no keys are configured or the script
        replies with anything else.
    """
    _refresh_keys()
    if not KEY_IDS:
        return None
    script_sha = scripts["acquire_slot_sha"]
    key_count = len(KEY_IDS)
    reply = await redis.evalsha(
        script_sha,
        1,  # numkeys: PREFIX is the only KEYS entry; the rest are ARGV
        PREFIX,
        json.dumps(KEY_IDS),
        json.dumps(MODELS),
        str(_settings.rounds_per_model),
        str(key_count),
    )
    granted = bool(reply) and reply[0] == "ok"
    if not granted:
        return None
    return dict(keyId=reply[1], model=reply[2], modelIndex=int(reply[3]))
async def _acquire_slot_for_model(
    redis: Redis,
    target_model: str,
) -> Optional[Dict[str, Any]]:
    """Reserve one round of *target_model* on the first usable key.

    Keys are scanned in KEY_IDS order. A key is skipped if either of these
    exists for it:
    - ``{PREFIX}:key:{id}:lock``;
    - ``{PREFIX}:key:{id}:m{idx}:failed`` with value ``"1"``.

    Otherwise one round is reserved on ``...:m{idx}:rounds`` if fewer than
    ``rounds_per_model`` are already used.

    The reservation is atomic: INCR first, then check the new value, and DECR
    it back when over the limit. The previous GET-then-INCR sequence spanned
    an ``await``, so concurrent requests could both pass the check and
    overshoot the limit. While a rolled-back reservation is in flight, other
    readers may briefly see one round more than is really used; that only
    makes them more conservative.

    Returns:
        ``{"keyId", "model", "modelIndex"}`` for the reserved slot, or None
        when there are no keys, *target_model* is not in MODELS, or every key
        is locked, failed or exhausted.
    """
    _refresh_keys()
    if not KEY_IDS or target_model not in MODELS:
        return None
    target_idx = MODELS.index(target_model)
    limit = _settings.rounds_per_model
    if limit <= 0:
        # No key can ever have a free round; avoid creating counters via INCR.
        return None
    for key_id in KEY_IDS:
        key_base = f"{PREFIX}:key:{key_id}"
        if await redis.exists(f"{key_base}:lock"):
            continue
        if await redis.get(f"{key_base}:m{target_idx}:failed") == "1":
            continue
        rounds_k = f"{key_base}:m{target_idx}:rounds"
        used = await redis.incr(rounds_k)
        if used <= limit:
            return {
                "keyId": key_id,
                "model": target_model,
                "modelIndex": target_idx,
            }
        # Over the limit: give back the round we just took.
        await redis.decr(rounds_k)
    return None
async def _mark_failure(
    redis: Redis,
    scripts: Dict[str, str],
    key_id: str,
    model_index: int,
) -> str:
    """Report a failed request for *key_id* / *model_index* to ``lock_key``.

    The script is run with the model count and ``key_lock_ttl``. Presumably
    it locks the key and/or flags the model for that TTL; its body is not
    visible here, so confirm before relying on this. The script's reply is
    returned unchanged.
    """
    script_sha = scripts["lock_key_sha"]
    script_args = (
        key_id,
        str(model_index),
        str(len(MODELS)),
        str(_settings.key_lock_ttl),
    )
    return await redis.evalsha(script_sha, 1, PREFIX, *script_args)
async def call_meganova(
    redis: Redis,
    scripts: Dict[str, str],
    messages: List[Dict[str, str]],
    response_format: Optional[Dict[str, str]],
    max_tokens: int = DEFAULT_MAX_TOKENS,
    temperature: float = DEFAULT_TEMPERATURE,
    top_p: float = DEFAULT_TOP_P,
    target_model: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Call MegaNova, rotating across key/model slots tracked in Redis.

    Where slots come from:
    - with *target_model*: ``_acquire_slot_for_model`` (that model, any key);
    - otherwise: ``_acquire_slot``.

    Each distinct ``keyId:modelIndex`` combination is attempted at most once;
    a combination handed out again is skipped. An HTTP status >= 400 is
    reported through ``_mark_failure``. Transport or decoding errors are
    logged and the next slot is tried.

    All attempts share one ``ClientSession``, and the prepared messages, URL
    and timeout are built once; none of them depend on the slot.

    Returns:
        The decoded JSON body of the first successful response, with
        ``parsed`` attached in JSON mode. None when *target_model* is not in
        MODELS, no slot can be acquired, or the retry budget is exhausted.
    """
    if target_model and target_model not in MODELS:
        return None
    # Sync KEY_IDS with settings before sizing the budget, so the budget
    # matches the key list the slot helpers (which also refresh) will use.
    _refresh_keys()
    total_unique_slots = len(MODELS) * len(KEY_IDS)
    max_tries = len(KEY_IDS) if target_model else total_unique_slots
    if max_tries == 0:
        return None
    # Safety valve: the slot helpers may keep returning already-tried combos.
    hard_cap = total_unique_slots * 2 + 2
    tried: set = set()
    loops = 0
    prepared = prepare_messages(messages, response_format)
    url = f"{MEGANOVA_BASE_URL}{MEGANOVA_CHAT_PATH}"
    timeout = aiohttp.ClientTimeout(total=_settings.request_timeout_ms / 1000)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        while len(tried) < max_tries and loops < hard_cap:
            loops += 1
            if target_model:
                slot = await _acquire_slot_for_model(redis, target_model)
            else:
                slot = await _acquire_slot(redis, scripts)
            if not slot:
                return None
            combo_key = f"{slot['keyId']}:{slot['modelIndex']}"
            if combo_key in tried:
                continue
            tried.add(combo_key)
            api_key = KEY_MAP.get(slot["keyId"])
            if not api_key:
                continue
            payload = {
                "messages": prepared,
                "model": slot["model"],
                "max_tokens": max_tokens,
                "temperature": temperature,
                "top_p": top_p,
                "stream": False,
            }
            try:
                async with session.post(
                    url,
                    json=payload,
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json",
                    },
                ) as resp:
                    if resp.status >= 400:
                        logger.warning(
                            "MegaNova HTTP %d for keyId=%s model=%s",
                            resp.status,
                            slot["keyId"],
                            slot["model"],
                        )
                        await _mark_failure(
                            redis, scripts, slot["keyId"], slot["modelIndex"]
                        )
                        continue
                    data = await resp.json()
                    attach_json_content(data, response_format)
                    return data
            except Exception as exc:
                logger.warning(
                    "MegaNova request failed for keyId=%s model=%s: %s",
                    slot["keyId"],
                    slot["model"],
                    exc,
                )
    return None
async def _get_next_aion_key(redis: Redis) -> Optional[str]:
    """Return the next AionLabs key in round-robin order.

    Redis keys involved:
    - ``{AION_PREFIX}:keys``: the key list, stored as a JSON array;
    - ``{AION_PREFIX}:rr_ptr``: a shared counter, INCR'd per call, that picks
      the index;
    - ``{AION_PREFIX}:key:{idx}:uses``: a per-index usage counter, also bumped.

    Returns None when no key list is stored or it is empty.
    """
    stored = await redis.get(f"{AION_PREFIX}:keys")
    keys: List[str] = json.loads(stored) if stored else []
    if not keys:
        return None
    pointer = await redis.incr(f"{AION_PREFIX}:rr_ptr")
    total = len(keys)
    position = (pointer - 1) % total
    await redis.incr(f"{AION_PREFIX}:key:{position}:uses")
    logger.info("[aion] Round-robin: ptr=%s, idx=%s, total=%s", pointer, position, total)
    return keys[position]
async def call_aion_labs(
    redis: Redis,
    messages: List[Dict[str, str]],
    response_format: Optional[Dict[str, str]],
    model: str = AION_LABS_DEFAULT_MODEL,
    max_tokens: int = DEFAULT_MAX_TOKENS,
    temperature: float = DEFAULT_TEMPERATURE,
    top_p: float = DEFAULT_TOP_P,
) -> Optional[Dict[str, Any]]:
    """Call AionLabs with keys stored in Redis, rotating them round-robin.

    Makes up to ``min(len(keys), 3)`` attempts. Each attempt takes the next
    key from ``_get_next_aion_key``; non-200 responses and exceptions are
    logged and the next attempt follows.

    The request body, URL and timeout do not depend on the key, so they are
    built once, and all attempts share one ``ClientSession``.

    Returns:
        The first HTTP 200 JSON body, with ``parsed`` attached in JSON mode.
        None when no keys are stored or every attempt fails.
    """
    keys_json = await redis.get(f"{AION_PREFIX}:keys")
    if not keys_json:
        logger.info("[aion] No keys in Redis, skipping")
        return None
    keys: List[str] = json.loads(keys_json)
    if not keys:
        return None
    total_tries = min(len(keys), 3)
    payload = {
        "model": model,
        "messages": prepare_messages(messages, response_format),
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "stream": False,
    }
    url = f"{AION_LABS_BASE_URL}{AION_LABS_CHAT_PATH}"
    timeout = aiohttp.ClientTimeout(total=_settings.request_timeout_ms / 1000)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        for attempt in range(1, total_tries + 1):
            key = await _get_next_aion_key(redis)
            if not key:
                return None
            logger.info("[aion] Attempt %s/%s", attempt, total_tries)
            try:
                async with session.post(
                    url,
                    json=payload,
                    headers={
                        "Authorization": f"Bearer {key}",
                        "Content-Type": "application/json",
                    },
                ) as resp:
                    if resp.status != 200:
                        logger.warning("[aion] Attempt %s HTTP %s", attempt, resp.status)
                        continue
                    data = await resp.json()
                    attach_json_content(data, response_format)
                    return data
            except Exception as exc:
                logger.warning("[aion] Attempt %s failed: %s", attempt, exc)
    logger.info("[aion] All attempts exhausted")
    return None
def _aion_model_for(model: Optional[str]) -> str:
    """Map the caller's requested model to the model name sent to AionLabs.

    These map to AION_LABS_DEFAULT_MODEL:
    - falsy values;
    - this service's alias ``"agentdeck-1.0"``;
    - MegaNova catalog names (``MODELS``).

    Any other name passes through unchanged.

    Previously ``model or AION_LABS_DEFAULT_MODEL`` never fell back, because
    ``model`` defaults to the non-empty alias. The alias (or a MegaNova model
    name) was then forwarded to AionLabs verbatim, while the other providers
    map the alias to their own default model.
    NOTE(review): this assumes AionLabs does not serve names from MODELS; confirm.
    """
    if not model or model == "agentdeck-1.0" or model in MODELS:
        return AION_LABS_DEFAULT_MODEL
    return model


async def _call_aion_labs_any(
    redis: Optional[Redis],
    scripts: Optional[Dict[str, str]],
    messages: List[Dict[str, str]],
    response_format: Optional[Dict[str, str]],
    model: str,
    max_tokens: int,
    temperature: float,
    top_p: float,
) -> Optional[Dict[str, Any]]:
    """Call AionLabs via the Redis round-robin when Redis is wired up, else directly."""
    if redis and scripts:
        return await call_aion_labs(
            redis, messages, response_format, model,
            max_tokens, temperature, top_p,
        )
    return await call_aion_labs_no_redis(
        messages, response_format, model,
        max_tokens, temperature, top_p,
    )


async def chat_completion(
    messages: List[Dict[str, str]],
    model: str = "agentdeck-1.0",
    response_format: Optional[Dict[str, str]] = None,
    max_tokens: int = DEFAULT_MAX_TOKENS,
    temperature: float = DEFAULT_TEMPERATURE,
    top_p: float = DEFAULT_TOP_P,
    provider: Optional[str] = None,
    redis: Optional[Redis] = None,
    scripts: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Run a chat completion, routing between providers with fallback.

    The identity system prompt is injected first. Routing by *provider*:
    - ``"aionlabs"``: AionLabs only.
    - ``"openprovider"``: OpenRouter Mimika only.
    - anything else (default ``"meganova"``): MegaNova, then AionLabs, then
      OpenRouter Mimika, stopping at the first success.

    MegaNova and AionLabs use their Redis-backed variants only when both
    *redis* and *scripts* are given. AionLabs gets a model name chosen by
    ``_aion_model_for``.

    Returns:
        The provider's response body, with ``"model"`` overwritten by the
        requested *model*.

    Raises:
        RuntimeError: when the selected provider, or the whole fallback
            chain, fails.
    """
    messages = inject_system_identity(messages, model)
    if not provider:
        provider = "meganova"
    logger.info(
        "Chat completion: model=%s provider=%s messages=%d",
        model,
        provider,
        len(messages),
    )
    aion_model = _aion_model_for(model)

    if provider == "aionlabs":
        result = await _call_aion_labs_any(
            redis, scripts, messages, response_format, aion_model,
            max_tokens, temperature, top_p,
        )
        if result:
            result["model"] = model
            return result
        raise RuntimeError("AionLabs request failed")

    if provider == "openprovider":
        result = await call_openrouter_mimika(
            messages, response_format, max_tokens, temperature, top_p,
        )
        if result:
            result["model"] = model
            return result
        raise RuntimeError("OpenRouter Mimika request failed")

    # Only MegaNova catalog names pin a MegaNova model; the alias means "any".
    target = model if model in MODELS else None
    if redis and scripts:
        result = await call_meganova(
            redis, scripts, messages, response_format,
            max_tokens, temperature, top_p, target,
        )
    else:
        result = await call_meganova_no_redis(
            messages, response_format, max_tokens, temperature, top_p, target,
        )
    if result:
        result["model"] = model
        return result

    logger.info("MegaNova failed, falling back to AionLabs")
    result = await _call_aion_labs_any(
        redis, scripts, messages, response_format, aion_model,
        max_tokens, temperature, top_p,
    )
    if result:
        result["model"] = model
        return result

    logger.info("AionLabs failed, falling back to OpenRouter Mimika")
    result = await call_openrouter_mimika(
        messages, response_format, max_tokens, temperature, top_p,
    )
    if result:
        result["model"] = model
        return result
    raise RuntimeError("All AI providers exhausted")
async def call_meganova_no_redis(
    messages: List[Dict[str, str]],
    response_format: Optional[Dict[str, str]],
    max_tokens: int = DEFAULT_MAX_TOKENS,
    temperature: float = DEFAULT_TEMPERATURE,
    top_p: float = DEFAULT_TOP_P,
    target_model: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Make a single MegaNova request, used when Redis is unavailable.

    Always uses the first configured key, with no rotation or failure
    tracking. The model is *target_model*, or ``MODELS[1]`` when none is
    given.
    NOTE(review): why index 1 is the default is not visible here; it also
    raises IndexError (outside the try) if MODELS has fewer than two entries.

    Returns:
        The decoded JSON body on HTTP 200, with ``parsed`` attached in JSON
        mode. None when no key is configured, the status is not 200, or the
        request or decoding raises.
    """
    api_key = _get_meganova_key()
    if not api_key:
        logger.info("No MegaNova keys configured, skipping")
        return None
    prepared = prepare_messages(messages, response_format)
    request_body = dict(
        model=target_model or MODELS[1],
        messages=prepared,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        stream=False,
    )
    endpoint = f"{MEGANOVA_BASE_URL}{MEGANOVA_CHAT_PATH}"
    auth_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    logger.info("Calling MegaNova (no Redis)...")
    try:
        timeout = aiohttp.ClientTimeout(total=_settings.request_timeout_ms / 1000)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(endpoint, json=request_body, headers=auth_headers) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    attach_json_content(data, response_format)
                    return data
                logger.warning("MegaNova HTTP %d", resp.status)
                return None
    except Exception as exc:
        logger.warning("MegaNova failed: %s", exc)
        return None
async def call_aion_labs_no_redis(
    messages: List[Dict[str, str]],
    response_format: Optional[Dict[str, str]],
    model: str = AION_LABS_DEFAULT_MODEL,
    max_tokens: int = DEFAULT_MAX_TOKENS,
    temperature: float = DEFAULT_TEMPERATURE,
    top_p: float = DEFAULT_TOP_P,
) -> Optional[Dict[str, Any]]:
    """Call AionLabs with keys from the ``aion_lab_keys`` setting (no Redis).

    Tries the first three non-blank, comma-separated keys in order.

    Each failed attempt (non-200 status or exception) is now logged, matching
    ``call_aion_labs``. Previously such attempts were discarded silently, which
    left an eventual "all providers exhausted" error with no trace of why
    AionLabs failed.

    The request body, URL and timeout are built once, and the attempts share
    one ``ClientSession``.

    Returns:
        The first HTTP 200 JSON body, with ``parsed`` attached in JSON mode.
        None when no keys are configured or every attempt fails.
    """
    keys_str = _settings.aion_lab_keys
    if not keys_str:
        logger.info("No AION keys configured, skipping")
        return None
    keys = [k.strip() for k in keys_str.split(",") if k.strip()]
    if not keys:
        return None
    candidates = keys[:3]
    total = len(candidates)
    payload = {
        "model": model,
        "messages": prepare_messages(messages, response_format),
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "stream": False,
    }
    url = f"{AION_LABS_BASE_URL}{AION_LABS_CHAT_PATH}"
    timeout = aiohttp.ClientTimeout(total=_settings.request_timeout_ms / 1000)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        for attempt, key in enumerate(candidates, start=1):
            logger.info("[aion] (no redis) Attempt %s/%s", attempt, total)
            try:
                async with session.post(
                    url,
                    json=payload,
                    headers={
                        "Authorization": f"Bearer {key}",
                        "Content-Type": "application/json",
                    },
                ) as resp:
                    if resp.status != 200:
                        logger.warning(
                            "[aion] (no redis) Attempt %s HTTP %s", attempt, resp.status
                        )
                        continue
                    data = await resp.json()
                    attach_json_content(data, response_format)
                    return data
            except Exception as exc:
                logger.warning("[aion] (no redis) Attempt %s failed: %s", attempt, exc)
    logger.info("[aion] (no redis) All attempts exhausted")
    return None