| """Maris publicēšanas integrācija — saglabā origin datus atmiņas repozitorijā.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| from datetime import UTC, datetime |
| from typing import Any |
|
|
| from maris_core.utils.env import get_env_any_or_default, get_hf_token, validate_hf_repo_id |
|
|
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Default values for the "generated_by" and "maris_framework" metadata keys.
MARIS_ORIGIN_NAME: str = "Maris AI"
MARIS_FRAMEWORK_NAME: str = "maris-ai-core"
|
|
|
|
| class HFIntegration: |
| """Pārvalda Maris publicēšanas repozitorija sinhronizāciju.""" |
|
|
| def __init__(self) -> None: |
| self.token = get_hf_token() |
| self.dataset_repo = get_env_any_or_default( |
| "MARIS_MEMORY_REPO", |
| "MARIS_DATASET_REPO", |
| "HF_DATASET_REPO", |
| default="MarisUK/maris-ai-memory", |
| ) |
| self.dataset_repo = validate_hf_repo_id( |
| self.dataset_repo, |
| "MARIS_MEMORY_REPO/MARIS_DATASET_REPO/HF_DATASET_REPO", |
| label="dataset repozitorijs", |
| ) |
| self._client = None |
| self._client_unavailable = False |
|
|
| def _get_api(self) -> Any | None: |
| """Iegūst publicēšanas API klientu.""" |
| if self._client_unavailable: |
| return None |
|
|
| if self._client is None: |
| try: |
| from huggingface_hub import HfApi |
|
|
| self._client = HfApi(token=self.token) |
| except Exception as exc: |
| self._client_unavailable = True |
| logger.warning("Publicēšanas klients nav pieejams: %s", exc) |
| return None |
| return self._client |
|
|
| def _build_maris_metadata(self, metadata: dict[str, Any] | None = None) -> dict[str, Any]: |
| payload = dict(metadata or {}) |
| payload.setdefault("generated_by", MARIS_ORIGIN_NAME) |
| payload.setdefault("maris_framework", MARIS_FRAMEWORK_NAME) |
| payload.setdefault("maris_origin", True) |
| return payload |
|
|
| async def save_conversation( |
| self, |
| user_message: str, |
| ai_response: str, |
| metadata: dict[str, Any] | None = None, |
| ) -> None: |
| """Saglabā sarunu atmiņā.""" |
| entry = { |
| "timestamp": datetime.now(tz=UTC).isoformat(), |
| "type": "conversation", |
| "user": user_message, |
| "assistant": ai_response, |
| "metadata": self._build_maris_metadata(metadata), |
| } |
| await self._push_to_dataset(entry) |
|
|
| async def save_generation(self, gen_type: str, prompt: str, metadata: dict[str, Any]) -> None: |
| """Saglabā ģenerēšanas pieprasījumu.""" |
| entry = { |
| "timestamp": datetime.now(tz=UTC).isoformat(), |
| "type": gen_type, |
| "prompt": prompt, |
| "metadata": self._build_maris_metadata(metadata), |
| } |
| await self._push_to_dataset(entry) |
|
|
| async def _push_to_dataset(self, entry: dict[str, Any]) -> None: |
| """Sūta ierakstu uz Maris atmiņas repozitoriju.""" |
| if not self.token: |
| logger.debug("Nav konfigurēts Maris publicēšanas tokens — izlaižam saglabāšanu.") |
| return |
|
|
| api = self._get_api() |
| if api is None: |
| return |
|
|
| try: |
| import io |
|
|
| content = json.dumps(entry, ensure_ascii=False) + "\n" |
| filename = ( |
| f"data/{entry['type']}/{datetime.now(tz=UTC).strftime('%Y%m%d_%H%M%S_%f')}.jsonl" |
| ) |
|
|
| api.upload_file( |
| path_or_fileobj=io.BytesIO(content.encode()), |
| path_in_repo=filename, |
| repo_id=self.dataset_repo, |
| repo_type="dataset", |
| commit_message=f"Maris AI auto-save: {entry['type']}", |
| ) |
| logger.debug("Saglabāts origin repozitorijā: %s/%s", self.dataset_repo, filename) |
| except Exception as exc: |
| logger.error("Origin saglabāšanas kļūda: %s", exc) |
|
|