"""Maris publicēšanas integrācija — saglabā origin datus atmiņas repozitorijā.""" from __future__ import annotations import json import logging from datetime import UTC, datetime from typing import Any from maris_core.utils.env import get_env_any_or_default, get_hf_token, validate_hf_repo_id logger = logging.getLogger(__name__) MARIS_ORIGIN_NAME = "Maris AI" MARIS_FRAMEWORK_NAME = "maris-ai-core" class HFIntegration: """Pārvalda Maris publicēšanas repozitorija sinhronizāciju.""" def __init__(self) -> None: self.token = get_hf_token() self.dataset_repo = get_env_any_or_default( "MARIS_MEMORY_REPO", "MARIS_DATASET_REPO", "HF_DATASET_REPO", default="MarisUK/maris-ai-memory", ) self.dataset_repo = validate_hf_repo_id( self.dataset_repo, "MARIS_MEMORY_REPO/MARIS_DATASET_REPO/HF_DATASET_REPO", label="dataset repozitorijs", ) self._client = None self._client_unavailable = False def _get_api(self) -> Any | None: """Iegūst publicēšanas API klientu.""" if self._client_unavailable: return None if self._client is None: try: from huggingface_hub import HfApi # type: ignore self._client = HfApi(token=self.token) except Exception as exc: # noqa: BLE001 self._client_unavailable = True logger.warning("Publicēšanas klients nav pieejams: %s", exc) return None return self._client def _build_maris_metadata(self, metadata: dict[str, Any] | None = None) -> dict[str, Any]: payload = dict(metadata or {}) payload.setdefault("generated_by", MARIS_ORIGIN_NAME) payload.setdefault("maris_framework", MARIS_FRAMEWORK_NAME) payload.setdefault("maris_origin", True) return payload async def save_conversation( self, user_message: str, ai_response: str, metadata: dict[str, Any] | None = None, ) -> None: """Saglabā sarunu atmiņā.""" entry = { "timestamp": datetime.now(tz=UTC).isoformat(), "type": "conversation", "user": user_message, "assistant": ai_response, "metadata": self._build_maris_metadata(metadata), } await self._push_to_dataset(entry) async def save_generation(self, gen_type: str, prompt: str, metadata: dict[str, Any]) -> None: """Saglabā ģenerēšanas pieprasījumu.""" entry = { "timestamp": datetime.now(tz=UTC).isoformat(), "type": gen_type, "prompt": prompt, "metadata": self._build_maris_metadata(metadata), } await self._push_to_dataset(entry) async def _push_to_dataset(self, entry: dict[str, Any]) -> None: """Sūta ierakstu uz Maris atmiņas repozitoriju.""" if not self.token: logger.debug("Nav konfigurēts Maris publicēšanas tokens — izlaižam saglabāšanu.") return api = self._get_api() if api is None: return try: import io content = json.dumps(entry, ensure_ascii=False) + "\n" filename = ( f"data/{entry['type']}/{datetime.now(tz=UTC).strftime('%Y%m%d_%H%M%S_%f')}.jsonl" ) api.upload_file( path_or_fileobj=io.BytesIO(content.encode()), path_in_repo=filename, repo_id=self.dataset_repo, repo_type="dataset", commit_message=f"Maris AI auto-save: {entry['type']}", ) logger.debug("Saglabāts origin repozitorijā: %s/%s", self.dataset_repo, filename) except Exception as exc: # noqa: BLE001 logger.error("Origin saglabāšanas kļūda: %s", exc)