"""Shared runtime helpers for the Maris Hugging Face chat Space.""" from __future__ import annotations import logging from typing import Any, Literal import httpx from huggingface_hub.utils import HfHubHTTPError from pydantic import BaseModel, ConfigDict, Field, field_validator from maris_core.orchestrator.routing import build_system_prompt from maris_core.personas import DEFAULT_PERSONA_ID, get_persona_catalog, resolve_persona from maris_core.space_agent import _complete_with_client from maris_core.utils.emotional_context import analyze_emotional_context from maris_core.utils.env import ( get_env_any, get_env_any_or_default, ) from maris_core.utils.hf_inference import create_hf_inference_client from maris_core.utils.hf_integration import HFIntegration logger = logging.getLogger(__name__) DEFAULT_CHAT_MODEL = "MarisUK/maris-ai-text" SPACE_CHAT_FALLBACK_MODELS_DEFAULT = ( "MarisUK/maris-assistant-runtime-fallback", "Qwen/Qwen3-Coder-480B-A35B-Instruct", ) DEFAULT_CHAT_SPACE_REPO = "MarisUK/maris.ai.chat" SPACE_CHAT_MESSAGE_MAX_CHARS = 8000 SPACE_CHAT_HISTORY_WINDOW = 16 def _validate_space_chat_model_id(value: str, source: str) -> str: normalized = value.strip() if not normalized: raise RuntimeError(f"Trūkst modeļa konfigurācija: {source}") if "/" not in normalized or not all(part.strip() for part in normalized.split("/", 1)): raise RuntimeError(f"{source} modelim jābūt owner/name formātā.") return normalized def _get_space_chat_model(*names: str, default: str | None = None) -> str: source = ", ".join(names) value = get_env_any(*names) if value is None: if default is None: raise RuntimeError(f"Trūkst modeļa konfigurācija: {source}") value = default return _validate_space_chat_model_id(value, source) class SpaceChatMessage(BaseModel): """Single message in the public HF chat Space conversation.""" model_config = ConfigDict(str_strip_whitespace=True) role: Literal["user", "assistant"] content: str = Field(min_length=1, max_length=SPACE_CHAT_MESSAGE_MAX_CHARS) class SpaceChatRequest(BaseModel): """User request payload for the Hugging Face chat Space.""" model_config = ConfigDict(str_strip_whitespace=True) message: str = Field(min_length=1, max_length=SPACE_CHAT_MESSAGE_MAX_CHARS) history: list[SpaceChatMessage] = Field(default_factory=list, max_length=24) model: str | None = Field(default=None, max_length=160) persona_id: str | None = Field(default=DEFAULT_PERSONA_ID, max_length=64) max_tokens: int = Field(default=900, ge=128, le=4096) temperature: float = Field(default=0.3, ge=0.0, le=1.0) session_id: str | None = Field(default=None, max_length=120) @field_validator("model") @classmethod def validate_model(cls, value: str | None) -> str | None: normalized = (value or "").strip() if not normalized: return None try: return _validate_space_chat_model_id(normalized, "model") except RuntimeError as exc: raise ValueError(str(exc)) from exc class SpaceChatResponse(BaseModel): """Model response returned to the Hugging Face chat UI.""" response: str model: str persona_id: str persona_title: str persona_summary: str detected_emotion: str emotion_confidence: float response_style: str class SpaceChatRuntimeInfo(BaseModel): """Public runtime metadata rendered by the chat Space UI.""" default_model: str available_models: tuple[str, ...] default_persona_id: str personas: list[dict[str, Any]] space_repo: str has_token: bool def list_space_chat_models() -> tuple[str, ...]: """Return the chat models exposed in the public Space.""" configured = get_env_any("MARIS_CHAT_MODELS", "HF_SPACE_CHAT_MODELS", default="") or "" configured_models = [ _validate_space_chat_model_id(item.strip(), "MARIS_CHAT_MODELS") for item in configured.split(",") if item.strip() ] # Chat Space vairs nepārņem aģenta modeļa mainīgos, lai publiskais čats # konsekventi lietotu tikai savu tekstam paredzēto konfigurāciju. default_model = _get_space_chat_model( "MARIS_CHAT_MODEL", "HF_SPACE_CHAT_MODEL", default=DEFAULT_CHAT_MODEL, ) return tuple(dict.fromkeys([default_model, *configured_models])) def list_space_chat_fallback_models() -> tuple[str, ...]: """Return hidden fallback models used when the selected model is unavailable.""" configured = ( get_env_any( "MARIS_CHAT_FALLBACK_MODELS", "HF_SPACE_CHAT_FALLBACK_MODELS", default=",".join(SPACE_CHAT_FALLBACK_MODELS_DEFAULT), ) or "" ) return tuple( dict.fromkeys( [ _validate_space_chat_model_id(item.strip(), "MARIS_CHAT_FALLBACK_MODELS") for item in configured.split(",") if item.strip() ] ) ) def resolve_space_chat_models(requested_model: str | None = None) -> tuple[str, ...]: """Return the ordered list of inference candidates for a chat request.""" selected = (requested_model or "").strip() runtime_models = list_space_chat_models() return tuple( dict.fromkeys( [ *([selected] if selected else []), *runtime_models, *list_space_chat_fallback_models(), ] ) ) def get_space_chat_runtime_info() -> SpaceChatRuntimeInfo: """Return the runtime metadata used by the Space UI.""" catalog = get_persona_catalog() return SpaceChatRuntimeInfo( default_model=list_space_chat_models()[0], available_models=list_space_chat_models(), default_persona_id=catalog.default_persona_id, personas=[persona.model_dump() for persona in catalog.personas], space_repo=get_env_any_or_default( "MARIS_CHAT_SPACE_REPO", "MARIS_PUBLIC_CHAT_SPACE_REPO", default=DEFAULT_CHAT_SPACE_REPO, ), has_token=bool(get_env_any("MARIS_REPO_TOKEN", "MARIS_TOKEN", "HF_TOKEN")), ) def _trim_pending_user_turn( history: list[SpaceChatMessage], message: str ) -> list[SpaceChatMessage]: """Drop trailing copies of the current user turn from request history.""" trimmed_history = list(history[-SPACE_CHAT_HISTORY_WINDOW:]) normalized_message = message.strip() while ( trimmed_history and trimmed_history[-1].role == "user" and trimmed_history[-1].content.strip() == normalized_message ): trimmed_history.pop() return trimmed_history def build_space_chat_messages(request: SpaceChatRequest) -> list[dict[str, str]]: """Build a persona-aware conversation prompt for the public chat Space.""" persona = resolve_persona(request.persona_id) emotional_context = analyze_emotional_context(request.message) messages = [ { "role": "system", "content": ( build_system_prompt("general", emotional_context, persona_id=persona.id) + "\n\n" + "Tu strādā publiskā Hugging Face čata režīmā. " + "Atbildi skaidri, eleganti, konkrēti un bez lieka trokšņa. " + "Ja lietotājs prasa plānu, strukturē to punktos. " + "Ja nav pārliecības, skaidri pasaki ierobežojumu, neizdomā faktus." ), } ] for item in _trim_pending_user_turn(request.history, request.message): messages.append({"role": item.role, "content": item.content}) messages.append({"role": "user", "content": request.message}) return messages def _messages_to_generation_prompt(messages: list[dict[str, str]]) -> str: parts: list[str] = [] for item in messages: role = item.get("role", "user").strip().lower() if role == "system": label = "System" elif role == "assistant": label = "Assistant" else: label = "User" parts.append(f"{label}: {item.get('content', '').strip()}") parts.append("Assistant:") return "\n\n".join(parts) def _complete_with_generation_fallback( client: Any, *, models: tuple[str, ...], messages: list[dict[str, str]], max_tokens: int, temperature: float, ) -> tuple[str | None, str]: prompt = _messages_to_generation_prompt(messages) last_error: Exception | None = None for model in models: try: raw_response = client.text_generation( prompt=prompt, model=model, max_new_tokens=max_tokens, temperature=temperature, return_full_text=False, ) except AttributeError as exc: logger.warning("Maris chat text_generation fallback is unavailable: %s", exc) raise RuntimeError( "Maris AI inference klients neatbalsta text_generation fallback." ) from exc except StopIteration as exc: logger.warning( "Maris chat text_generation raised StopIteration for model %s: %s", model, exc, ) continue except ( OSError, TypeError, ValueError, RuntimeError, httpx.HTTPError, HfHubHTTPError, ) as exc: last_error = exc logger.warning("Maris chat text_generation failed for model %s: %s", model, exc) continue text = str(raw_response).strip() if text: return model, text logger.warning("Maris chat text_generation returned empty response for model %s", model) if last_error is not None: raise last_error return None, "" def _complete_space_chat_response( client: Any, *, models: tuple[str, ...], messages: list[dict[str, str]], max_tokens: int, temperature: float, ) -> tuple[str | None, str]: try: model_name, raw_response = _complete_with_client( client, models=models, messages=messages, max_tokens=max_tokens, temperature=temperature, ) except AttributeError: model_name, raw_response = None, "" except (OSError, TypeError, ValueError, RuntimeError, httpx.HTTPError, HfHubHTTPError): model_name, raw_response = None, "" else: if raw_response: return model_name, raw_response return _complete_with_generation_fallback( client, models=models, messages=messages, max_tokens=max_tokens, temperature=temperature, ) def _build_space_chat_emergency_response( request: SpaceChatRequest, *, runtime: SpaceChatRuntimeInfo, resolved_model: str, persona_title: str, ) -> str: return ( "Izvēlētais Hugging Face modelis šobrīd neatbildēja, bet čats palika darbībā ar drošo fallback režīmu.\n\n" f"- Pieprasītais modelis: `{request.model or runtime.default_model}`\n" f"- Rezerves modelis: `{resolved_model}`\n" f"- Persona: `{persona_title}`\n" f"- Space: `{runtime.space_repo}`\n\n" "Vari turpināt lietot jebkuru Hugging Face `owner/name` modeli. Ja konkrētais modelis neatbild, " "Space automātiski mēģina citus kandidātus un neatgriež tukšu 503 kļūdu." ) async def generate_space_chat_reply( request: SpaceChatRequest, *, client_factory: Any | None = None, token: str | None = None, ) -> SpaceChatResponse: """Generate a public chat reply using the Hugging Face inference client.""" runtime = get_space_chat_runtime_info() requested_model = request.model or runtime.default_model persona = resolve_persona(request.persona_id) emotional_context = analyze_emotional_context(request.message) messages = build_space_chat_messages(request) candidate_models = resolve_space_chat_models(requested_model) if client_factory is None: try: from huggingface_hub import InferenceClient # type: ignore except ImportError as exc: # pragma: no cover - import failure is environment-specific raise RuntimeError("Maris AI inference klients nav pieejams.") from exc client_factory = InferenceClient try: client = create_hf_inference_client(client_factory, token=token) model_name, raw_response = _complete_space_chat_response( client, models=candidate_models, messages=messages, max_tokens=request.max_tokens, temperature=request.temperature, ) response_text = raw_response.strip() if not response_text: raise RuntimeError("Maris AI neatgrieza derīgu atbildi.") except (OSError, TypeError, ValueError, RuntimeError, httpx.HTTPError, HfHubHTTPError) as exc: logger.warning("Maris chat inference failed: %s", exc) fallback_candidates = ( candidate_models[1:] if len(candidate_models) > 1 else candidate_models ) model_name = next(iter(fallback_candidates), requested_model) response_text = _build_space_chat_emergency_response( request, runtime=runtime, resolved_model=model_name, persona_title=persona.title, ) await HFIntegration().save_conversation( request.message, response_text, metadata={ "session_id": (request.session_id or "").strip() or None, "persona_id": persona.id, "requested_model": requested_model, "resolved_model": model_name or requested_model, "history_messages": len(request.history), "detected_emotion": emotional_context.emotion, "emotion_confidence": emotional_context.confidence, "response_style": emotional_context.response_style, "space_repo": runtime.space_repo, "public_space_chat": True, }, ) return SpaceChatResponse( response=response_text, model=model_name or requested_model, persona_id=persona.id, persona_title=persona.title, persona_summary=persona.summary, detected_emotion=emotional_context.emotion, emotion_confidence=emotional_context.confidence, response_style=emotional_context.response_style, )