| """Shared runtime helpers for the Maris Hugging Face chat Space.""" |
|
|
| from __future__ import annotations |
|
|
| import logging |
| from typing import Any, Literal |
|
|
| import httpx |
| from huggingface_hub.utils import HfHubHTTPError |
| from pydantic import BaseModel, ConfigDict, Field, field_validator |
|
|
| from maris_core.orchestrator.routing import build_system_prompt |
| from maris_core.personas import DEFAULT_PERSONA_ID, get_persona_catalog, resolve_persona |
| from maris_core.space_agent import _complete_with_client |
| from maris_core.utils.emotional_context import analyze_emotional_context |
| from maris_core.utils.env import ( |
| get_env_any, |
| get_env_any_or_default, |
| ) |
| from maris_core.utils.hf_inference import create_hf_inference_client |
| from maris_core.utils.hf_integration import HFIntegration |
|
|
# Module-level logger used by the chat helpers in this file.
logger = logging.getLogger(__name__)


# Model id used when neither MARIS_CHAT_MODEL nor HF_SPACE_CHAT_MODEL is configured.
DEFAULT_CHAT_MODEL = "MarisUK/maris-ai-text"
# Fallback candidates used when no fallback-model environment override is set.
SPACE_CHAT_FALLBACK_MODELS_DEFAULT = (
    "MarisUK/maris-assistant-runtime-fallback",
    "Qwen/Qwen3-Coder-480B-A35B-Instruct",
)
# Space repository reported in the runtime info when no override is configured.
DEFAULT_CHAT_SPACE_REPO = "MarisUK/maris.ai.chat"
# Maximum length of a single chat message (enforced by the pydantic fields below).
SPACE_CHAT_MESSAGE_MAX_CHARS = 8000
# Number of most recent history messages kept when building the prompt.
SPACE_CHAT_HISTORY_WINDOW = 16
|
|
|
|
def _validate_space_chat_model_id(value: str, source: str) -> str:
    """Normalize and validate a Hugging Face ``owner/name`` model id.

    Args:
        value: Raw model id, e.g. from an environment variable or a request.
        source: Name of the setting or field the value came from; it is only
            used to build the error message.

    Returns:
        The id with surrounding whitespace removed.

    Raises:
        RuntimeError: If the value is blank, or if it is not exactly two
            non-empty, whitespace-free segments separated by a single ``/``.
    """
    normalized = value.strip()
    if not normalized:
        raise RuntimeError(f"Trūkst modeļa konfigurācija: {source}")

    # Split on every "/" so that ids with extra separators ("a/b/c", "a//b",
    # "a/b/") are rejected instead of being folded into the second segment.
    segments = normalized.split("/")
    is_owner_name = len(segments) == 2 and all(
        segment and not any(char.isspace() for char in segment)
        for segment in segments
    )
    if not is_owner_name:
        raise RuntimeError(f"{source} modelim jābūt owner/name formātā.")
    return normalized
|
|
|
|
def _get_space_chat_model(*names: str, default: str | None = None) -> str:
    """Look up a model id through ``get_env_any`` and validate it.

    Args:
        *names: Environment variable names passed on to ``get_env_any``.
        default: Model id to validate and return when the lookup yields
            ``None``.

    Returns:
        The validated model id.

    Raises:
        RuntimeError: If the lookup yields ``None`` and no default was given,
            or if the chosen value fails model-id validation.
    """
    source_label = ", ".join(names)
    configured = get_env_any(*names)
    if configured is not None:
        return _validate_space_chat_model_id(configured, source_label)
    if default is not None:
        return _validate_space_chat_model_id(default, source_label)
    raise RuntimeError(f"Trūkst modeļa konfigurācija: {source_label}")
|
|
|
|
class SpaceChatMessage(BaseModel):
    """Single message in the public HF chat Space conversation."""

    # Strip leading/trailing whitespace from string fields during validation.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Speaker of the message; only "user" and "assistant" turns are accepted.
    role: Literal["user", "assistant"]
    # Message text; must be non-empty and at most SPACE_CHAT_MESSAGE_MAX_CHARS long.
    content: str = Field(min_length=1, max_length=SPACE_CHAT_MESSAGE_MAX_CHARS)
|
|
|
|
class SpaceChatRequest(BaseModel):
    """Request payload sent by a user of the Hugging Face chat Space."""

    # Strip leading/trailing whitespace from string fields during validation.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Current user turn.
    message: str = Field(min_length=1, max_length=SPACE_CHAT_MESSAGE_MAX_CHARS)
    # Earlier turns of the conversation, at most 24 entries.
    history: list[SpaceChatMessage] = Field(default_factory=list, max_length=24)
    # Optional model override; blank values are normalized to None by the validator.
    model: str | None = Field(default=None, max_length=160)
    persona_id: str | None = Field(default=DEFAULT_PERSONA_ID, max_length=64)
    max_tokens: int = Field(default=900, ge=128, le=4096)
    temperature: float = Field(default=0.3, ge=0.0, le=1.0)
    session_id: str | None = Field(default=None, max_length=120)

    @field_validator("model")
    @classmethod
    def validate_model(cls, value: str | None) -> str | None:
        """Return a validated model id, or ``None`` when no model was given.

        Validation failures are re-raised as ``ValueError`` so pydantic
        reports them as field errors.
        """
        candidate = value.strip() if value else ""
        if candidate == "":
            return None
        try:
            checked = _validate_space_chat_model_id(candidate, "model")
        except RuntimeError as exc:
            raise ValueError(str(exc)) from exc
        return checked
|
|
|
|
class SpaceChatResponse(BaseModel):
    """Model response returned to the Hugging Face chat UI."""

    # Reply text shown to the user.
    response: str
    # Model id reported for this reply.
    model: str
    # Identity and descriptive fields of the persona used for the reply.
    persona_id: str
    persona_title: str
    persona_summary: str
    # Results of the emotional-context analysis of the user message.
    detected_emotion: str
    emotion_confidence: float
    response_style: str
|
|
|
|
class SpaceChatRuntimeInfo(BaseModel):
    """Public runtime metadata rendered by the chat Space UI."""

    # Model used when a request does not name one.
    default_model: str
    # Models exposed for selection; the default model comes first.
    available_models: tuple[str, ...]
    default_persona_id: str
    # Persona catalog entries dumped to plain dictionaries.
    personas: list[dict[str, Any]]
    # Repository id of the chat Space.
    space_repo: str
    # True when one of the supported token environment variables is set.
    has_token: bool
|
|
|
|
def list_space_chat_models() -> tuple[str, ...]:
    """Return the chat models exposed in the public Space.

    The default model always comes first, followed by the comma-separated
    models from the environment. Duplicates are dropped, keeping the first
    occurrence.

    Raises:
        RuntimeError: If any configured id fails model-id validation.
    """
    raw_setting = get_env_any("MARIS_CHAT_MODELS", "HF_SPACE_CHAT_MODELS", default="") or ""

    extra_models: list[str] = []
    for entry in raw_setting.split(","):
        candidate = entry.strip()
        if candidate:
            extra_models.append(
                _validate_space_chat_model_id(candidate, "MARIS_CHAT_MODELS")
            )

    primary_model = _get_space_chat_model(
        "MARIS_CHAT_MODEL",
        "HF_SPACE_CHAT_MODEL",
        default=DEFAULT_CHAT_MODEL,
    )

    # A dict keeps insertion order, which gives an order-preserving de-duplication.
    ordered: dict[str, None] = {primary_model: None}
    for model_id in extra_models:
        ordered.setdefault(model_id, None)
    return tuple(ordered)
|
|
|
|
def list_space_chat_fallback_models() -> tuple[str, ...]:
    """Return hidden fallback models used when the selected model is unavailable.

    The list comes from the fallback-model environment setting, with
    ``SPACE_CHAT_FALLBACK_MODELS_DEFAULT`` as the default. Entries are
    validated and de-duplicated in order.

    Raises:
        RuntimeError: If any entry fails model-id validation.
    """
    raw_setting = get_env_any(
        "MARIS_CHAT_FALLBACK_MODELS",
        "HF_SPACE_CHAT_FALLBACK_MODELS",
        default=",".join(SPACE_CHAT_FALLBACK_MODELS_DEFAULT),
    )
    stripped_entries = (chunk.strip() for chunk in (raw_setting or "").split(","))
    validated = (
        _validate_space_chat_model_id(entry, "MARIS_CHAT_FALLBACK_MODELS")
        for entry in stripped_entries
        if entry
    )
    return tuple(dict.fromkeys(validated))
|
|
|
|
def resolve_space_chat_models(requested_model: str | None = None) -> tuple[str, ...]:
    """Return the ordered list of inference candidates for a chat request.

    Order is: the requested model (if any), then the exposed Space models,
    then the hidden fallback models. Duplicates are dropped, keeping the
    first occurrence.
    """
    preferred = requested_model.strip() if requested_model else ""
    candidates: list[str] = [preferred] if preferred else []
    candidates.extend(list_space_chat_models())
    candidates.extend(list_space_chat_fallback_models())
    return tuple(dict.fromkeys(candidates))
|
|
|
|
def get_space_chat_runtime_info() -> SpaceChatRuntimeInfo:
    """Return the runtime metadata used by the Space UI.

    Returns:
        A ``SpaceChatRuntimeInfo`` with the exposed models, the persona
        catalog, the Space repository id and a flag telling whether a token
        environment variable is set.

    Raises:
        RuntimeError: If the configured model ids fail validation.
    """
    catalog = get_persona_catalog()
    # Resolve the model list once: this avoids re-reading and re-validating
    # the environment, and guarantees that default_model is exactly
    # available_models[0].
    available_models = list_space_chat_models()
    return SpaceChatRuntimeInfo(
        default_model=available_models[0],
        available_models=available_models,
        default_persona_id=catalog.default_persona_id,
        personas=[persona.model_dump() for persona in catalog.personas],
        space_repo=get_env_any_or_default(
            "MARIS_CHAT_SPACE_REPO",
            "MARIS_PUBLIC_CHAT_SPACE_REPO",
            default=DEFAULT_CHAT_SPACE_REPO,
        ),
        has_token=bool(get_env_any("MARIS_REPO_TOKEN", "MARIS_TOKEN", "HF_TOKEN")),
    )
|
|
|
|
def _trim_pending_user_turn(
    history: list[SpaceChatMessage], message: str
) -> list[SpaceChatMessage]:
    """Return the recent history without trailing copies of the current user turn.

    Only the last ``SPACE_CHAT_HISTORY_WINDOW`` messages are considered.
    Trailing user messages whose stripped content equals the stripped
    ``message`` are removed. The input list is not modified.
    """
    window = history[-SPACE_CHAT_HISTORY_WINDOW:]
    pending_text = message.strip()

    # Walk backwards to find how many leading entries of the window to keep.
    keep = len(window)
    while keep > 0:
        tail = window[keep - 1]
        if tail.role != "user" or tail.content.strip() != pending_text:
            break
        keep -= 1
    return list(window[:keep])
|
|
|
|
def build_space_chat_messages(request: SpaceChatRequest) -> list[dict[str, str]]:
    """Build the persona-aware message list for the public chat Space.

    The result is a system message (the routed system prompt plus the
    public-chat instructions), then the trimmed history, then the current
    user message.
    """
    persona = resolve_persona(request.persona_id)
    mood = analyze_emotional_context(request.message)

    public_chat_rules = "".join(
        [
            "Tu strādā publiskā Hugging Face čata režīmā. ",
            "Atbildi skaidri, eleganti, konkrēti un bez lieka trokšņa. ",
            "Ja lietotājs prasa plānu, strukturē to punktos. ",
            "Ja nav pārliecības, skaidri pasaki ierobežojumu, neizdomā faktus.",
        ]
    )
    system_text = (
        build_system_prompt("general", mood, persona_id=persona.id)
        + "\n\n"
        + public_chat_rules
    )

    earlier_turns = [
        {"role": turn.role, "content": turn.content}
        for turn in _trim_pending_user_turn(request.history, request.message)
    ]
    return [
        {"role": "system", "content": system_text},
        *earlier_turns,
        {"role": "user", "content": request.message},
    ]
|
|
|
|
def _messages_to_generation_prompt(messages: list[dict[str, str]]) -> str:
    """Flatten chat messages into a single plain-text generation prompt.

    Each message becomes ``"<Label>: <content>"``. The label is ``System``
    or ``Assistant`` for those roles and ``User`` for anything else. Entries
    are joined by blank lines, and a trailing ``"Assistant:"`` cue is added.
    """
    speaker_labels = {"system": "System", "assistant": "Assistant"}

    def render(entry: dict[str, str]) -> str:
        role_key = entry.get("role", "user").strip().lower()
        speaker = speaker_labels.get(role_key, "User")
        return f"{speaker}: {entry.get('content', '').strip()}"

    rendered = [render(entry) for entry in messages]
    rendered.append("Assistant:")
    return "\n\n".join(rendered)
|
|
|
|
def _complete_with_generation_fallback(
    client: Any,
    *,
    models: tuple[str, ...],
    messages: list[dict[str, str]],
    max_tokens: int,
    temperature: float,
) -> tuple[str | None, str]:
    """Try ``client.text_generation`` on each model until one returns text.

    The chat messages are flattened into a single prompt, and the models are
    tried in the given order.

    Returns:
        ``(model, text)`` for the first model that yields non-empty text, or
        ``(None, "")`` when no model produced text and no recoverable error
        was recorded.

    Raises:
        RuntimeError: If calling ``text_generation`` raises ``AttributeError``.
        Exception: The last recorded recoverable error, when no model
            produced text.
    """
    flattened_prompt = _messages_to_generation_prompt(messages)
    most_recent_failure: Exception | None = None

    for candidate in models:
        try:
            generated = client.text_generation(
                prompt=flattened_prompt,
                model=candidate,
                max_new_tokens=max_tokens,
                temperature=temperature,
                return_full_text=False,
            )
        except AttributeError as exc:
            # Abort immediately; the remaining models are not tried.
            logger.warning("Maris chat text_generation fallback is unavailable: %s", exc)
            raise RuntimeError(
                "Maris AI inference klients neatbalsta text_generation fallback."
            ) from exc
        except StopIteration as exc:
            # Logged only; this case is not recorded as the last failure.
            logger.warning(
                "Maris chat text_generation raised StopIteration for model %s: %s",
                candidate,
                exc,
            )
        except (
            OSError,
            TypeError,
            ValueError,
            RuntimeError,
            httpx.HTTPError,
            HfHubHTTPError,
        ) as exc:
            most_recent_failure = exc
            logger.warning(
                "Maris chat text_generation failed for model %s: %s", candidate, exc
            )
        else:
            cleaned = str(generated).strip()
            if cleaned:
                return candidate, cleaned
            logger.warning(
                "Maris chat text_generation returned empty response for model %s",
                candidate,
            )

    if most_recent_failure is not None:
        raise most_recent_failure
    return None, ""
|
|
|
|
def _complete_space_chat_response(
    client: Any,
    *,
    models: tuple[str, ...],
    messages: list[dict[str, str]],
    max_tokens: int,
    temperature: float,
) -> tuple[str | None, str]:
    """Complete a chat request, falling back to plain text generation.

    The primary path is ``_complete_with_client``. If it raises one of the
    handled errors or returns an empty response, the request is retried
    through ``_complete_with_generation_fallback``.

    Returns:
        ``(model, text)`` from whichever path produced a response. The
        fallback may return ``(None, "")``.

    Raises:
        Exception: Whatever ``_complete_with_generation_fallback`` raises.
    """
    try:
        model_name, raw_response = _complete_with_client(
            client,
            models=models,
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
        )
    except (
        AttributeError,
        OSError,
        TypeError,
        ValueError,
        RuntimeError,
        httpx.HTTPError,
        HfHubHTTPError,
    ) as exc:
        # Still best-effort, but the failure is now logged so it can be
        # diagnosed instead of being swallowed silently.
        logger.warning(
            "Maris chat completion failed, trying text_generation fallback: %s", exc
        )
    else:
        if raw_response:
            return model_name, raw_response

    return _complete_with_generation_fallback(
        client,
        models=models,
        messages=messages,
        max_tokens=max_tokens,
        temperature=temperature,
    )
|
|
|
|
def _build_space_chat_emergency_response(
    request: SpaceChatRequest,
    *,
    runtime: SpaceChatRuntimeInfo,
    resolved_model: str,
    persona_title: str,
) -> str:
    """Build the canned reply returned when no model produced a response.

    The text names the requested model (or the runtime default when the
    request has none), the reserve model, the persona title and the Space
    repository.
    """
    requested = request.model or runtime.default_model
    pieces = [
        "Izvēlētais Hugging Face modelis šobrīd neatbildēja, bet čats palika darbībā ar drošo fallback režīmu.\n\n",
        f"- Pieprasītais modelis: `{requested}`\n",
        f"- Rezerves modelis: `{resolved_model}`\n",
        f"- Persona: `{persona_title}`\n",
        f"- Space: `{runtime.space_repo}`\n\n",
        "Vari turpināt lietot jebkuru Hugging Face `owner/name` modeli. Ja konkrētais modelis neatbild, ",
        "Space automātiski mēģina citus kandidātus un neatgriež tukšu 503 kļūdu.",
    ]
    return "".join(pieces)
|
|
|
|
async def generate_space_chat_reply(
    request: SpaceChatRequest,
    *,
    client_factory: Any | None = None,
    token: str | None = None,
) -> SpaceChatResponse:
    """Generate a public chat reply using the Hugging Face inference client.

    Args:
        request: Validated chat request.
        client_factory: Callable passed to ``create_hf_inference_client``.
            Defaults to ``huggingface_hub.InferenceClient``.
        token: Optional token forwarded to ``create_hf_inference_client``.

    Returns:
        The reply, together with persona and emotional-context metadata. When
        inference fails with one of the handled errors, or yields no text,
        the reply is the canned emergency response instead.

    Raises:
        RuntimeError: If no ``client_factory`` is given and
            ``huggingface_hub`` cannot be imported.
    """
    # Imported locally so this function does not depend on a module-level import.
    import asyncio

    runtime = get_space_chat_runtime_info()
    requested_model = request.model or runtime.default_model
    persona = resolve_persona(request.persona_id)
    emotional_context = analyze_emotional_context(request.message)
    messages = build_space_chat_messages(request)
    candidate_models = resolve_space_chat_models(requested_model)

    if client_factory is None:
        try:
            from huggingface_hub import InferenceClient
        except ImportError as exc:
            raise RuntimeError("Maris AI inference klients nav pieejams.") from exc
        client_factory = InferenceClient

    try:
        client = create_hf_inference_client(client_factory, token=token)
        # The inference helpers are synchronous. Running them in a worker
        # thread keeps the event loop free for other requests while waiting.
        model_name, raw_response = await asyncio.to_thread(
            _complete_space_chat_response,
            client,
            models=candidate_models,
            messages=messages,
            max_tokens=request.max_tokens,
            temperature=request.temperature,
        )

        response_text = raw_response.strip()
        if not response_text:
            raise RuntimeError("Maris AI neatgrieza derīgu atbildi.")
    except (OSError, TypeError, ValueError, RuntimeError, httpx.HTTPError, HfHubHTTPError) as exc:
        logger.warning("Maris chat inference failed: %s", exc)
        # Report the first candidate after the requested one when there is
        # one; otherwise fall back to the only candidate.
        fallback_candidates = (
            candidate_models[1:] if len(candidate_models) > 1 else candidate_models
        )
        model_name = next(iter(fallback_candidates), requested_model)
        response_text = _build_space_chat_emergency_response(
            request,
            runtime=runtime,
            resolved_model=model_name,
            persona_title=persona.title,
        )

    await HFIntegration().save_conversation(
        request.message,
        response_text,
        metadata={
            "session_id": (request.session_id or "").strip() or None,
            "persona_id": persona.id,
            "requested_model": requested_model,
            "resolved_model": model_name or requested_model,
            "history_messages": len(request.history),
            "detected_emotion": emotional_context.emotion,
            "emotion_confidence": emotional_context.confidence,
            "response_style": emotional_context.response_style,
            "space_repo": runtime.space_repo,
            "public_space_chat": True,
        },
    )

    return SpaceChatResponse(
        response=response_text,
        model=model_name or requested_model,
        persona_id=persona.id,
        persona_title=persona.title,
        persona_summary=persona.summary,
        detected_emotion=emotional_context.emotion,
        emotion_confidence=emotional_context.confidence,
        response_style=emotional_context.response_style,
    )
|
|