grantforge-api / backend /core /llm_router.py
GrantForge Bot
Deploy to Hugging Face
2754b82
"""
LLM Router — wielomodelowy router dla GrantForge AI.
FAZA 2.1 (Stabilizacja Limitów): Grok 4.3 jako PRIMARY model dla zadań kreatywnych i krytycznych.
Fallback hierarchy:
legal_audit: Bielik 11b (Ollama) → Bielik-Instruct (HuggingFace) → Gemini-3.1-Pro
pii_anonymization: Bielik 11b → Gemini-3.1-Pro
critical/creative: Grok-4.3 → Gemini-3.1-Pro
standard: Gemini-3.1-Pro
fast: Gemini-3.1-Flash-Lite
Konfiguracja dla Bielika:
Ollama lokalnie: BIELIK_MODE=ollama (domyślnie)
HuggingFace API: BIELIK_MODE=huggingface + HUGGINGFACE_API_KEY
Bez GPU: BIELIK_MODE=disabled → tylko Gemini
Dokumentacja Bielik: https://huggingface.co/speakleash/Bielik-11B-v2.3-Instruct
"""
import os
import logging
from typing import Literal, Optional, Any
from langchain_core.runnables import Runnable
from langchain_core.callbacks import BaseCallbackHandler
logger = logging.getLogger(__name__)
class TelemetryCallbackHandler(BaseCallbackHandler):
"""Callback do logowania błędów LLM i fallbacków do telemetrii."""
def on_llm_error(self, error: BaseException, **kwargs: Any) -> Any:
try:
from core.telemetry import telemetry
telemetry.log(
"ERROR", "LLMRouter", f"Błąd LLM: {str(error)}. Próba fallbacku/retry."
)
except ImportError:
pass
# Task types dla type-safety
TaskType = Literal[
"standard",
"critical",
"creative",
"fast",
"legal_audit",
"pii_anonymization",
]
# ──────────────────────────────────────────────────────────────────────────────
# Konfiguracja Bielik
# ──────────────────────────────────────────────────────────────────────────────
_BIELIK_MODE = os.environ.get(
"BIELIK_MODE", "ollama"
) # ollama | huggingface | disabled
_BIELIK_OLLAMA_URL = os.environ.get("BIELIK_OLLAMA_URL", "http://localhost:11434")
_BIELIK_MODEL_NAME = os.environ.get("BIELIK_MODEL_NAME", "llama3.2")
_BIELIK_HF_REPO = os.environ.get(
"BIELIK_HF_REPO", "speakleash/Bielik-11B-v2.3-Instruct"
)
_BIELIK_HF_KEY = os.environ.get("HUGGINGFACE_API_KEY", "")
def _get_bielik_ollama(structured_output: bool = False):
"""
Bielik przez Ollama (lokalne GPU lub CPU — wolne ale bezpłatne).
Model: SpeakLeash/bielik-11b-v2.3-instruct:Q5_K_M (GGUF, ~7.8 GB)
Instalacja:
ollama pull SpeakLeash/bielik-11b-v2.3-instruct:Q5_K_M
"""
from langchain_ollama import ChatOllama
return ChatOllama(
model=_BIELIK_MODEL_NAME,
base_url=_BIELIK_OLLAMA_URL,
temperature=0.0,
format="json" if structured_output else None,
num_predict=4096, # max tokens
repeat_penalty=1.15, # ogranicza repetycje (ważne dla Bielika)
stop=["<|end|>", "</s>"], # tokeny stopu Bielika
keep_alive="30m", # trzymaj model w pamięci 30 min
)
def _get_bielik_huggingface():
"""
Bielik przez HuggingFace Inference API (bez GPU).
Wymaga tokenu: HUGGINGFACE_API_KEY
"""
from langchain_huggingface import HuggingFaceEndpoint
return HuggingFaceEndpoint(
repo_id=_BIELIK_HF_REPO,
task="text-generation",
huggingfacehub_api_token=_BIELIK_HF_KEY,
temperature=0.01,
max_new_tokens=2048,
repetition_penalty=1.15,
)
def _build_bielik(task_type: str) -> Optional[object]:
"""
Buduje instancję Bielika z odpowiednim backendem.
Zwraca None jeśli BIELIK_MODE=disabled lub błąd importu.
"""
if _BIELIK_MODE == "disabled":
logger.info(
f"[Router] Bielik wyłączony (BIELIK_MODE=disabled) dla '{task_type}'."
)
return None
structured = task_type == "legal_audit"
if _BIELIK_MODE == "huggingface":
if not _BIELIK_HF_KEY:
logger.warning("[Router] Brak HUGGINGFACE_API_KEY — Bielik HF niedostępny.")
return None
try:
llm = _get_bielik_huggingface()
logger.info(f"[Router] ✅ Bielik (HuggingFace API) dla '{task_type}'")
return llm
except ImportError:
logger.warning("[Router] langchain_huggingface nie zainstalowany.")
return None
# Domyślnie: ollama
try:
llm = _get_bielik_ollama(structured_output=structured)
# Ping Ollama żeby sprawdzić czy jest dostępny
# (ping nie jest konieczny — with_fallbacks() obsłuży błąd)
logger.info(
f"[Router] ✅ Bielik (Ollama @ {_BIELIK_OLLAMA_URL}, "
f"model={_BIELIK_MODEL_NAME}) dla '{task_type}'"
)
return llm
except ImportError:
logger.warning(
"[Router] langchain_community nie zainstalowany — Bielik niedostępny."
)
return None
except Exception as e:
logger.warning(f"[Router] Błąd tworzenia Bielika: {e}")
return None
# ──────────────────────────────────────────────────────────────────────────────
# Modele Gemini
# ──────────────────────────────────────────────────────────────────────────────
def _get_gemini(
model: str,
temperature: float = 0.0,
streaming: bool = False,
max_tokens: int = 8192,
callbacks: Optional[list] = None,
):
"""Tworzy ChatGoogleGenerativeAI z retry."""
from langchain_google_genai import ChatGoogleGenerativeAI
api_key = os.environ.get("GOOGLE_API_KEY") or os.environ.get(
"GEMINI_API_KEY", "missing_key"
)
# Walidacja klucza pod kątem znaków non-ASCII (placeholdery) i wartości domyślnych
try:
api_key.encode("ascii")
except UnicodeEncodeError:
logger.error(
"[Router] Klucz API zawiera niedozwolone znaki (non-ASCII). Użyto domyślnego klucza-zaślepki."
)
api_key = "invalid_key_non_ascii"
if api_key in ("missing_key", "invalid_key_non_ascii", "YOUR_GOOGLE_API_KEY"):
logger.warning(
f"[Router] Brak prawidłowego GOOGLE_API_KEY! Obecna wartość: {api_key}"
)
cb = [TelemetryCallbackHandler()]
if callbacks:
cb.extend(callbacks)
return ChatGoogleGenerativeAI(
model=model,
temperature=temperature,
google_api_key=api_key,
max_retries=2,
max_tokens=max_tokens,
streaming=streaming,
callbacks=cb,
)
# ──────────────────────────────────────────────────────────────────────────────
# Model Grok (xAI) - Faza 0 (Placeholder / Mock)
# ──────────────────────────────────────────────────────────────────────────────
def _get_grok(
model: str = "grok-4.3",
temperature: float = 0.0,
streaming: bool = False,
callbacks: Optional[list] = None,
):
"""Tworzy instancję modelu Grok (ChatXAI) lub zwraca Gemini jako fallback."""
api_key = os.environ.get("GROK_API_KEY") or os.environ.get("XAI_API_KEY")
if not api_key or api_key in ("YOUR_XAI_API_KEY", "YOUR_GROK_API_KEY"):
logger.info(
f"[Router] Brak GROK_API_KEY/XAI_API_KEY. Grok niedostępny dla modelu {model}. Fallback to Gemini."
)
return None
try:
from langchain_xai import ChatXAI
cb = [TelemetryCallbackHandler()]
if callbacks:
cb.extend(callbacks)
return ChatXAI(
xai_api_key=api_key,
model=model,
temperature=temperature,
max_retries=2,
callbacks=cb,
)
except ImportError:
logger.warning("[Router] langchain_xai nie zainstalowany. Fallback to Gemini.")
return None
# ──────────────────────────────────────────────────────────────────────────────
# Mock LLM dla środowisk deweloperskich
# ──────────────────────────────────────────────────────────────────────────────
class MockStructuredLLM(Runnable):
"""Zastępczy model LLM dla środowisk bez klucza API, zapobiegający zawieszaniu się."""
def __init__(self, structured_output_schema=None):
self.schema = structured_output_schema
def bind_tools(self, tools):
return self
def with_structured_output(self, schema):
return MockStructuredLLM(schema)
def invoke(self, input, config=None, **kwargs):
# Symulacja opóźnienia sieciowego
import time
time.sleep(1)
# Jeśli schema jest pydantic modelem
if self.schema and hasattr(self.schema, "model_construct"):
# Rekursywne budowanie domyślnego mocka na podstawie struktury
def _build_mock_data(field_type):
if hasattr(field_type, "__args__"): # Optional/Union
field_type = field_type.__args__[0]
if field_type is str:
return "Mocked text content"
if field_type is int:
return 100
if field_type is float:
return 0.99
if field_type is bool:
return True
if hasattr(field_type, "__origin__") and field_type.__origin__ is list:
return [_build_mock_data(field_type.__args__[0])]
if hasattr(field_type, "model_fields"):
return {
k: _build_mock_data(f.annotation)
for k, f in field_type.model_fields.items()
}
return "Mocked"
mock_data = {
k: _build_mock_data(f.annotation)
for k, f in self.schema.model_fields.items()
}
return self.schema.model_construct(**mock_data)
from langchain_core.messages import AIMessage
return AIMessage(
content="To jest zmockowana odpowiedź LLM z powodu braku GOOGLE_API_KEY w środowisku lokalnym."
)
# ──────────────────────────────────────────────────────────────────────────────
# Główny router
# ──────────────────────────────────────────────────────────────────────────────
def get_llm(
task_type: TaskType = "standard",
streaming: bool = False,
tools: Optional[list] = None,
structured_output_schema: Optional[Any] = None,
callbacks: Optional[list] = None,
):
"""
Wielomodelowy router z fallback chain.
Routing (FAZA 2.1):
legal_audit: Bielik → Gemini-3.1-Pro (temperatura 0.0 — zero halucynacji)
pii_anonymization: Bielik → Gemini-3.1-Pro
critical: Grok-4.3 → Gemini-3.1-Pro (głęboka analiza, t=0.2)
creative: Grok-4.3 → Gemini-3.1-Pro (pisanie sekcji narracyjnych, t=0.6)
standard: Gemini-3.1-Pro (generacja sekcji, t=0.1)
fast: Gemini-3.1-Flash-Lite (szybkie odpowiedzi, t=0.0)
"""
logger.info(f"[Router] task_type='{task_type}' streaming={streaming}")
try:
from core.telemetry import telemetry
telemetry.log(
"INFO",
"LLMRouter",
f"Wybieranie modelu dla zadania: {task_type}",
{"streaming": streaming},
)
except ImportError:
pass
# ── Bielik path (legal_audit + pii_anonymization) ──────────────────────
if task_type in ("legal_audit", "pii_anonymization"):
gemini_fallback = _get_gemini(
model="gemini-3.1-pro",
temperature=0.0 if task_type == "legal_audit" else 0.0,
callbacks=callbacks,
)
if tools:
gemini_fallback = gemini_fallback.bind_tools(tools)
if structured_output_schema:
try:
gemini_fallback = gemini_fallback.with_structured_output(
structured_output_schema
)
except NotImplementedError:
pass
bielik = _build_bielik(task_type)
if bielik is not None:
if tools and hasattr(bielik, "bind_tools"):
try:
bielik = bielik.bind_tools(tools)
except NotImplementedError:
# Model nie obsługuje narzędzi, zwracamy fallback
logger.warning(
"Bielik nie obsługuje bind_tools. Używam Gemini jako głównego modelu dla tego zadania."
)
return gemini_fallback
if structured_output_schema and hasattr(bielik, "with_structured_output"):
try:
bielik = bielik.with_structured_output(structured_output_schema)
except NotImplementedError:
logger.warning(
"Bielik nie obsługuje with_structured_output. Używam Gemini jako głównego modelu dla tego zadania."
)
return gemini_fallback
# PRIMARY: Bielik | FALLBACK: Gemini-1.5-Pro
return bielik.with_fallbacks([gemini_fallback])
logger.info(
f"[Router] Bielik N/A — używam Gemini jako primary dla '{task_type}'"
)
return gemini_fallback
# ── Gemini paths ────────────────────────────────────────────────────────
api_key = os.environ.get("GOOGLE_API_KEY") or os.environ.get(
"GEMINI_API_KEY", "missing_key"
)
use_local_fallback = api_key in (
"missing_key",
"invalid_key_non_ascii",
"YOUR_GOOGLE_API_KEY",
)
if use_local_fallback:
logger.warning(
f"[Router] GOOGLE_API_KEY invalid/missing. Falling back to Mock LLM for {task_type} to prevent hangs."
)
mock_llm = MockStructuredLLM(structured_output_schema)
if tools:
mock_llm = mock_llm.bind_tools(tools)
return mock_llm
if task_type == "critical":
grok = _get_grok(temperature=0.2, streaming=streaming, callbacks=callbacks)
gemini_fallback = _get_gemini(
"gemini-3.1-pro", temperature=0.2, streaming=streaming, callbacks=callbacks
)
if grok:
if tools:
try:
grok = grok.bind_tools(tools)
except NotImplementedError:
pass
try:
gemini_fallback = gemini_fallback.bind_tools(tools)
except NotImplementedError:
pass
if structured_output_schema:
try:
grok = grok.with_structured_output(structured_output_schema)
except NotImplementedError:
pass
try:
gemini_fallback = gemini_fallback.with_structured_output(
structured_output_schema
)
except NotImplementedError:
pass
return grok.with_fallbacks([gemini_fallback])
return gemini_fallback
elif task_type == "creative":
grok = _get_grok(temperature=0.6, streaming=streaming, callbacks=callbacks)
gemini_fallback = _get_gemini(
"gemini-3.1-pro",
temperature=0.6,
streaming=streaming,
max_tokens=8192,
callbacks=callbacks,
)
if grok:
if tools:
try:
grok = grok.bind_tools(tools)
except NotImplementedError:
pass
try:
gemini_fallback = gemini_fallback.bind_tools(tools)
except NotImplementedError:
pass
if structured_output_schema:
try:
grok = grok.with_structured_output(structured_output_schema)
except NotImplementedError:
pass
try:
gemini_fallback = gemini_fallback.with_structured_output(
structured_output_schema
)
except NotImplementedError:
pass
return grok.with_fallbacks([gemini_fallback])
return gemini_fallback
elif task_type == "fast":
return _get_gemini(
"gemini-3.1-flash-lite", temperature=0.0, streaming=streaming, callbacks=callbacks
)
else: # standard
gemini = _get_gemini(
"gemini-3.1-pro", temperature=0.1, streaming=streaming, callbacks=callbacks
)
grok_fallback = _get_grok(
temperature=0.1, streaming=streaming, callbacks=callbacks
)
if grok_fallback:
if tools:
try:
gemini = gemini.bind_tools(tools)
except NotImplementedError:
pass
try:
grok_fallback = grok_fallback.bind_tools(tools)
except NotImplementedError:
pass
if structured_output_schema:
try:
gemini = gemini.with_structured_output(structured_output_schema)
except NotImplementedError:
pass
try:
grok_fallback = grok_fallback.with_structured_output(
structured_output_schema
)
except NotImplementedError:
pass
return gemini.with_fallbacks([grok_fallback])
if tools:
try:
gemini = gemini.bind_tools(tools)
except NotImplementedError:
pass
if structured_output_schema:
try:
gemini = gemini.with_structured_output(structured_output_schema)
except NotImplementedError:
pass
return gemini
def get_bielik_status() -> dict:
"""
Sprawdza dostępność Bielika.
Używane przez /api/health endpoint.
"""
if _BIELIK_MODE == "disabled":
return {
"available": False,
"mode": "disabled",
"reason": "BIELIK_MODE=disabled",
}
if _BIELIK_MODE == "huggingface":
available = bool(_BIELIK_HF_KEY)
return {
"available": available,
"mode": "huggingface",
"repo": _BIELIK_HF_REPO,
"reason": None if available else "Brak HUGGINGFACE_API_KEY",
}
# Ollama — sprawdź ping
try:
import httpx
r = httpx.get(f"{_BIELIK_OLLAMA_URL}/api/tags", timeout=2.0)
models = [m["name"] for m in r.json().get("models", [])]
bielik_loaded = any("bielik" in m.lower() for m in models)
return {
"available": bielik_loaded,
"mode": "ollama",
"url": _BIELIK_OLLAMA_URL,
"model": _BIELIK_MODEL_NAME,
"loaded_models": models,
"reason": None if bielik_loaded else "Model bielik nie załadowany w Ollama",
}
except Exception as e:
return {
"available": False,
"mode": "ollama",
"url": _BIELIK_OLLAMA_URL,
"reason": f"Ollama niedostępny: {str(e)[:60]}",
}