# backend/app/services/model_repository.py — cache HF model repository resolution
# (commit 9249920, Guilherme Silberfarb Costa)
from __future__ import annotations
from io import BytesIO
import os
import re
import unicodedata
from dataclasses import dataclass
from pathlib import Path
from threading import Event, Lock
from time import monotonic
from typing import Any
from fastapi import HTTPException
from app.runtime_paths import resolve_core_path
try:
from huggingface_hub import CommitOperationAdd, CommitOperationDelete, HfApi, snapshot_download
except Exception: # pragma: no cover - dependência opcional em tempo de import
CommitOperationAdd = None # type: ignore[assignment]
CommitOperationDelete = None # type: ignore[assignment]
HfApi = None # type: ignore[assignment]
snapshot_download = None # type: ignore[assignment]
# Local fallback directory for .dai model files (resolved inside the project tree).
DEFAULT_LOCAL_MODELOS_DIR = resolve_core_path("pesquisa", "modelos_dai")
# Defaults for the Hugging Face dataset that hosts the models.
DEFAULT_HF_REPO_ID = "gui-sparim/repositorio_mesa"
DEFAULT_HF_REVISION = "main"
DEFAULT_HF_SUBDIR = "modelos_dai"
# Runs of characters NOT allowed in sanitized filenames; each run becomes "_".
SAFE_FILENAME_RE = re.compile(r"[^A-Za-z0-9_.-]+")
# Guards _STATE and _HF_RESOLUTION_INFLIGHT.
_STATE_LOCK = Lock()
# Cached result of the last successful HF dataset resolution.
_STATE: dict[str, Any] = {
    "provider": None,
    "signature": None,
    "revision": None,
    "repo_id": None,
    "subdir": None,
    "modelos_dir": None,
    "degraded": False,
    "last_checked_at": None,  # monotonic() timestamp of the last refresh
}
# Seconds a cached HF resolution stays fresh (default 5 minutes).
# NOTE(review): an invalid (non-numeric) env value makes float() raise at import time.
_HF_RESOLUTION_TTL_SECONDS = float(os.getenv("MODELOS_REPOSITORIO_HF_CACHE_TTL_SECONDS") or 300.0)
# One Event per (repo_id, revision, subdir) download in flight, so concurrent
# callers wait on a single snapshot download instead of racing each other.
_HF_RESOLUTION_INFLIGHT: dict[tuple[str, str, str], Event] = {}
def _is_hf_runtime() -> bool:
# HF Spaces define essas variaveis; em runtime HF queremos evitar fallback local.
for key in ("SPACE_ID", "SPACE_AUTHOR_NAME", "HF_SPACE_ID"):
value = str(os.getenv(key) or "").strip()
if value:
return True
return False
@dataclass(frozen=True)
class ModelRepositoryResolution:
    """Immutable description of where the .dai model files currently live.

    ``provider`` is either ``"local"`` or ``"hf_dataset"``; ``signature``
    uniquely identifies the resolved source (path or repo@revision:subdir).
    ``degraded`` marks a stale HF snapshot served after a failed refresh.
    """

    provider: str
    modelos_dir: Path
    signature: str
    revision: str | None = None
    repo_id: str | None = None
    subdir: str | None = None
    degraded: bool = False

    def as_payload(self) -> dict[str, Any]:
        """Serialize the resolution for API responses."""
        payload: dict[str, Any] = {
            "provider": self.provider,
            "revision": self.revision,
            "repo_id": self.repo_id,
            "subdir": self.subdir,
            "degraded": self.degraded,
        }
        payload["writable"] = provider_supports_write(self.provider)
        payload["modelos_dir"] = str(self.modelos_dir)
        payload["signature"] = self.signature
        return payload
def provider_supports_write(provider: str) -> bool:
    """Return True if *provider* accepts upload/delete operations."""
    normalized = str(provider or "").strip().lower()
    return normalized in ("local", "hf_dataset")
def _provider() -> str:
    """Pick the active provider: ``"hf_dataset"`` or ``"local"``.

    Inside a HF Space the provider is forced to ``hf_dataset``; otherwise the
    env vars MODELOS_REPOSITORIO_PROVIDER / PESQUISA_MODELOS_PROVIDER are
    consulted in order, defaulting to ``local``.
    """
    if _is_hf_runtime():
        return "hf_dataset"
    raw = os.getenv("MODELOS_REPOSITORIO_PROVIDER")
    if raw is None:
        raw = os.getenv("PESQUISA_MODELOS_PROVIDER", "local")
    normalized = str(raw).strip().lower()
    hf_aliases = {"hf", "hf_dataset", "dataset", "huggingface"}
    return "hf_dataset" if normalized in hf_aliases else "local"
def _hf_repo_id() -> str:
return str(
os.getenv("MODELOS_REPOSITORIO_HF_REPO_ID")
or os.getenv("PESQUISA_HF_REPO_ID")
or DEFAULT_HF_REPO_ID
).strip()
def _hf_revision() -> str:
return str(
os.getenv("MODELOS_REPOSITORIO_HF_REVISION")
or os.getenv("PESQUISA_HF_REVISION")
or DEFAULT_HF_REVISION
).strip()
def _hf_subdir() -> str:
return str(
os.getenv("MODELOS_REPOSITORIO_HF_SUBDIR")
or os.getenv("PESQUISA_HF_SUBDIR")
or DEFAULT_HF_SUBDIR
).strip().strip("/")
def _hf_token() -> str | None:
for key in ("HF_TOKEN", "HUGGINGFACE_HUB_TOKEN", "HUGGINGFACE_TOKEN"):
value = os.getenv(key)
if value:
return value
return None
def _local_mode_models_dir() -> Path:
raw = os.getenv("MODELOS_REPOSITORIO_LOCAL_DIR")
if raw and str(raw).strip():
return Path(str(raw).strip()).expanduser().resolve()
return DEFAULT_LOCAL_MODELOS_DIR
def _ensure_local_resolution() -> ModelRepositoryResolution:
    """Build a resolution for the local provider, creating the directory if needed."""
    target = _local_mode_models_dir()
    target.mkdir(parents=True, exist_ok=True)
    return ModelRepositoryResolution(
        provider="local",
        modelos_dir=target,
        signature=f"local:{target}",
    )
def _resolve_hf() -> ModelRepositoryResolution:
    """Resolve the models directory from the configured HF dataset snapshot.

    Caches the last successful resolution in ``_STATE`` for
    ``_HF_RESOLUTION_TTL_SECONDS`` seconds and deduplicates concurrent
    refreshes via ``_HF_RESOLUTION_INFLIGHT``: the first caller (the
    "builder") downloads the snapshot while other threads wait on an Event
    and then re-enter the loop. On download failure, serves the last good
    snapshot flagged as degraded, or raises if none exists.

    Raises:
        HTTPException: 500 when huggingface_hub is not installed; 503 when
            the snapshot cannot be synced and no previous snapshot is usable.
    """
    if HfApi is None or snapshot_download is None:
        raise HTTPException(
            status_code=500,
            detail="Provider hf_dataset indisponivel: instale huggingface_hub no backend",
        )
    repo_id = _hf_repo_id()
    revision_ref = _hf_revision()
    subdir = _hf_subdir()
    token = _hf_token()
    # Only pull the .dai files under the configured subdir, not the whole dataset.
    pattern = f"{subdir}/*.dai"
    inflight_key = (repo_id, revision_ref, subdir)
    while True:
        now = monotonic()
        with _STATE_LOCK:
            current_dir = _STATE.get("modelos_dir")
            current_signature = _STATE.get("signature")
            current_revision = _STATE.get("revision")
            current_repo = _STATE.get("repo_id")
            current_subdir = _STATE.get("subdir")
            current_degraded = bool(_STATE.get("degraded"))
            last_checked_at = _STATE.get("last_checked_at")
            # Fast path: a cached resolution for the same repo/subdir that is
            # still on disk and inside the TTL window.
            if (
                current_dir
                and current_signature
                and current_repo == repo_id
                and current_subdir == subdir
                and Path(str(current_dir)).exists()
                and isinstance(last_checked_at, (int, float))
                and (now - float(last_checked_at)) < _HF_RESOLUTION_TTL_SECONDS
            ):
                return ModelRepositoryResolution(
                    provider="hf_dataset",
                    modelos_dir=Path(str(current_dir)),
                    signature=str(current_signature),
                    revision=str(current_revision) if current_revision else revision_ref,
                    repo_id=repo_id,
                    subdir=subdir,
                    degraded=current_degraded,
                )
            # Cache miss: become the builder if nobody else is downloading yet.
            inflight = _HF_RESOLUTION_INFLIGHT.get(inflight_key)
            if inflight is None:
                inflight = Event()
                _HF_RESOLUTION_INFLIGHT[inflight_key] = inflight
                is_builder = True
            else:
                is_builder = False
        if not is_builder:
            # Another thread is downloading; wait outside the lock, then retry
            # the loop (the builder's result should then hit the fast path).
            inflight.wait()
            continue
        try:
            snapshot_root = Path(
                snapshot_download(
                    repo_id=repo_id,
                    repo_type="dataset",
                    revision=revision_ref,
                    allow_patterns=[pattern],
                    token=token,
                )
            )
            modelos_dir = snapshot_root / subdir
            if not modelos_dir.exists():
                raise RuntimeError(f"Pasta '{subdir}' nao encontrada no snapshot '{snapshot_root.name}'")
            # The snapshot directory name is used as the concrete revision
            # (presumably the resolved commit sha — TODO confirm against the
            # huggingface_hub cache layout).
            snapshot_revision = str(snapshot_root.name or revision_ref).strip() or revision_ref
            signature = f"hf_dataset:{repo_id}@{snapshot_revision}:{subdir}"
            with _STATE_LOCK:
                _STATE.update(
                    {
                        "provider": "hf_dataset",
                        "signature": signature,
                        "revision": snapshot_revision,
                        "repo_id": repo_id,
                        "subdir": subdir,
                        "modelos_dir": str(modelos_dir),
                        "degraded": False,
                        "last_checked_at": monotonic(),
                    }
                )
            return ModelRepositoryResolution(
                provider="hf_dataset",
                modelos_dir=modelos_dir,
                signature=signature,
                revision=snapshot_revision,
                repo_id=repo_id,
                subdir=subdir,
                degraded=False,
            )
        except Exception as exc:
            # Refresh failed: fall back to the previous snapshot (marked
            # degraded) if it belongs to the same repo/subdir and still exists.
            with _STATE_LOCK:
                fallback_dir = _STATE.get("modelos_dir")
                fallback_signature = _STATE.get("signature")
                fallback_rev = _STATE.get("revision")
                same_repo = (
                    _STATE.get("provider") == "hf_dataset"
                    and _STATE.get("repo_id") == repo_id
                    and _STATE.get("subdir") == subdir
                )
                if fallback_dir and fallback_signature and same_repo and Path(str(fallback_dir)).exists():
                    _STATE["degraded"] = True
                    _STATE["last_checked_at"] = monotonic()
                    return ModelRepositoryResolution(
                        provider="hf_dataset",
                        modelos_dir=Path(str(fallback_dir)),
                        signature=str(fallback_signature),
                        revision=str(fallback_rev) if fallback_rev else None,
                        repo_id=repo_id,
                        subdir=subdir,
                        degraded=True,
                    )
            raise HTTPException(
                status_code=503,
                detail=f"Nao foi possivel sincronizar modelos do dataset no Hugging Face: {exc}",
            ) from exc
        finally:
            # Always wake up waiters and clear the in-flight marker, no matter
            # how the builder exits (success, fallback return, or raise).
            with _STATE_LOCK:
                waiter = _HF_RESOLUTION_INFLIGHT.pop(inflight_key, None)
                if waiter is not None:
                    waiter.set()
def resolve_model_repository() -> ModelRepositoryResolution:
    """Resolve the active model repository according to the configured provider."""
    if _provider() == "hf_dataset":
        return _resolve_hf()
    return _ensure_local_resolution()
def invalidate_repository_cache() -> None:
    """Drop the cached HF resolution so the next access re-resolves from scratch."""
    cleared = {
        "provider": None,
        "signature": None,
        "revision": None,
        "repo_id": None,
        "subdir": None,
        "modelos_dir": None,
        "degraded": False,
        "last_checked_at": None,
    }
    with _STATE_LOCK:
        _STATE.update(cleared)
def list_repository_models() -> dict[str, Any]:
    """List all .dai models in the resolved repository, sorted by filename."""
    fonte = resolve_model_repository()
    arquivos = sorted(fonte.modelos_dir.glob("*.dai"), key=lambda p: p.name.lower())
    modelos = [
        {"id": arquivo.stem, "arquivo": arquivo.name, "nome_modelo": arquivo.stem}
        for arquivo in arquivos
    ]
    return {
        "modelos": modelos,
        "total_modelos": len(modelos),
        "fonte": fonte.as_payload(),
    }
def _normalize_model_key(value: str) -> str:
text = str(value or "").strip()
if not text:
return ""
normalized = unicodedata.normalize("NFKD", text)
without_marks = "".join(ch for ch in normalized if not unicodedata.combining(ch))
return without_marks.casefold().strip()
def resolve_model_file(modelo_id: str) -> Path:
    """Locate the .dai file matching *modelo_id*.

    Matching is tried in order: case-insensitive stem, case-insensitive
    filename, filename with an implied ``.dai`` extension, and finally an
    accent-insensitive comparison via ``_normalize_model_key``.

    Raises:
        HTTPException: 400 when no identifier was given; 404 when nothing matches.
    """
    fonte = resolve_model_repository()
    chave = str(modelo_id or "").strip()
    if not chave:
        raise HTTPException(status_code=400, detail="Informe o identificador do modelo")
    arquivos = sorted(fonte.modelos_dir.glob("*.dai"), key=lambda p: p.name.lower())
    por_stem: dict[str, Path] = {}
    por_nome: dict[str, Path] = {}
    for arquivo in arquivos:
        por_stem[arquivo.stem.lower()] = arquivo
        por_nome[arquivo.name.lower()] = arquivo
    chave_lower = chave.lower()
    candidato = por_stem.get(chave_lower) or por_nome.get(chave_lower)
    if candidato is None and not chave_lower.endswith(".dai"):
        candidato = por_nome.get(chave_lower + ".dai")
    if candidato is None:
        # Last resort: accent-insensitive comparison against stem and name.
        chave_norm = _normalize_model_key(chave)
        chave_sem_ext = chave_norm[:-4] if chave_norm.endswith(".dai") else chave_norm
        for arquivo in arquivos:
            variantes = {
                _normalize_model_key(arquivo.stem),
                _normalize_model_key(arquivo.name),
            }
            if chave_norm in variantes or chave_sem_ext in variantes:
                candidato = arquivo
                break
    if candidato is None:
        raise HTTPException(status_code=404, detail="Modelo nao encontrado no repositório configurado")
    return candidato
def _normalizar_nome_arquivo(nome_arquivo: str) -> str:
    """Sanitize an uploaded filename into a safe ``<name>.dai`` basename.

    Strips any directory components, requires the ``.dai`` extension, and
    replaces disallowed characters with ``_``.

    Raises:
        HTTPException: 400 when the name is empty, lacks the extension, or
            sanitization leaves nothing usable.
    """
    nome = Path(str(nome_arquivo or "").strip()).name
    if not nome:
        raise HTTPException(status_code=400, detail="Nome de arquivo invalido")
    if not nome.lower().endswith(".dai"):
        raise HTTPException(status_code=400, detail=f"Arquivo '{nome}' precisa ter extensao .dai")
    # Collapse disallowed character runs to "_" and drop any leading dots.
    safe = SAFE_FILENAME_RE.sub("_", nome).lstrip(".")
    if not safe:
        raise HTTPException(status_code=400, detail=f"Nome de arquivo invalido: '{nome}'")
    if not safe.lower().endswith(".dai"):
        safe += ".dai"
    # Defensive parity check (cannot trigger after lstrip("."), kept as-is).
    if safe == ".dai":
        raise HTTPException(status_code=400, detail=f"Nome de arquivo invalido: '{nome}'")
    return safe
def upsert_repository_models(
    arquivos: list[tuple[str, bytes]],
    actor: str | None = None,
    confirmar_substituicao: bool = False,
) -> dict[str, Any]:
    """Add or replace .dai models in the configured repository.

    Args:
        arquivos: (filename, content) pairs; empty/None contents are skipped
            and filenames are sanitized via ``_normalizar_nome_arquivo``.
        actor: Username recorded in the HF commit message (defaults to "sistema").
        confirmar_substituicao: Must be True to overwrite existing files.

    Returns:
        Summary dict with the written, added, and replaced filenames plus the
        repository source payload.

    Raises:
        HTTPException: 400 for empty/invalid input, 409 when overwriting
            without confirmation, 500/503 for provider problems.
    """
    if not arquivos:
        raise HTTPException(status_code=400, detail="Nenhum arquivo foi informado")
    payload: dict[str, bytes] = {}
    for nome, conteudo in arquivos:
        if conteudo is None:
            continue
        raw_bytes = bytes(conteudo)
        if not raw_bytes:
            continue
        nome_safe = _normalizar_nome_arquivo(nome)
        payload[nome_safe] = raw_bytes
    if not payload:
        raise HTTPException(status_code=400, detail="Nenhum arquivo valido para upload")
    resolved = resolve_model_repository()
    # Classify incoming names against what is already in the repository.
    existentes = {item.name for item in resolved.modelos_dir.glob("*.dai")}
    substituidos = sorted([nome for nome in payload if nome in existentes])
    adicionados = sorted([nome for nome in payload if nome not in existentes])
    if substituidos and not confirmar_substituicao:
        raise HTTPException(
            status_code=409,
            detail={
                "code": "repositorio_modelo_duplicado",
                "message": "Ja existe modelo com o mesmo nome no repositorio. Confirme para substituir.",
                "substituidos": substituidos,
            },
        )
    if resolved.provider == "local":
        # Local provider: write files straight into the models directory.
        resolved.modelos_dir.mkdir(parents=True, exist_ok=True)
        for nome, conteudo in payload.items():
            destino = resolved.modelos_dir / nome
            destino.write_bytes(conteudo)
    elif resolved.provider == "hf_dataset":
        if HfApi is None or CommitOperationAdd is None:
            raise HTTPException(
                status_code=500,
                detail="Provider hf_dataset indisponivel para escrita: instale huggingface_hub no backend",
            )
        repo_id = resolved.repo_id or _hf_repo_id()
        subdir = resolved.subdir or _hf_subdir()
        revision = _hf_revision()
        token = _hf_token()
        # One commit containing every uploaded file (atomic on the Hub side).
        operations = [
            CommitOperationAdd(
                path_in_repo=f"{subdir}/{nome}",
                path_or_fileobj=BytesIO(conteudo),
            )
            for nome, conteudo in payload.items()
        ]
        usuario = str(actor or "sistema").strip() or "sistema"
        mensagem = f"[mesa] upsert de {len(operations)} modelo(s) por {usuario}"
        api = HfApi(token=token)
        try:
            api.create_commit(
                repo_id=repo_id,
                repo_type="dataset",
                revision=revision,
                operations=operations,
                commit_message=mensagem,
                token=token,
            )
        except Exception as exc:
            raise HTTPException(status_code=503, detail=f"Falha ao gravar no dataset HF: {exc}") from exc
    else:
        raise HTTPException(status_code=500, detail=f"Provider de modelos nao suportado: {resolved.provider}")
    # Force the next resolution to pick up the new files.
    invalidate_repository_cache()
    return {
        "arquivos": sorted(payload.keys()),
        "adicionados": adicionados,
        "substituidos": substituidos,
        "confirmar_substituicao": bool(confirmar_substituicao),
        "fonte": resolved.as_payload(),
    }
def delete_repository_models(modelos_ids: list[str], actor: str | None = None) -> dict[str, Any]:
    """Delete the given models from the configured repository.

    Args:
        modelos_ids: Model identifiers (stem or filename, case-insensitive);
            blanks and duplicates are ignored.
        actor: Username recorded in the HF commit message (defaults to "sistema").

    Returns:
        Summary dict with the removed filenames, identifiers that were not
        found, and the repository source payload.

    Raises:
        HTTPException: 400 when no ids are given, 404 when none match,
            500/503 for provider problems.
    """
    # Deduplicate while preserving the caller's order.
    ids: list[str] = []
    seen: set[str] = set()
    for item in modelos_ids or []:
        chave = str(item or "").strip()
        if not chave or chave in seen:
            continue
        seen.add(chave)
        ids.append(chave)
    if not ids:
        raise HTTPException(status_code=400, detail="Nenhum modelo informado para exclusao")
    resolved = resolve_model_repository()
    modelos = sorted(resolved.modelos_dir.glob("*.dai"), key=lambda item: item.name.lower())
    by_stem = {caminho.stem.lower(): caminho for caminho in modelos}
    by_name = {caminho.name.lower(): caminho for caminho in modelos}
    # Map each id to a concrete file: stem match, name match, then name + ".dai".
    alvo_por_nome: dict[str, Path] = {}
    nao_encontrados: list[str] = []
    for chave in ids:
        key_lower = chave.lower()
        candidato = by_stem.get(key_lower) or by_name.get(key_lower)
        if candidato is None and not key_lower.endswith(".dai"):
            candidato = by_name.get(f"{key_lower}.dai")
        if candidato is None:
            nao_encontrados.append(chave)
            continue
        alvo_por_nome[candidato.name] = candidato
    if not alvo_por_nome:
        raise HTTPException(status_code=404, detail="Nenhum dos modelos informados foi encontrado")
    removidos = sorted(alvo_por_nome.keys())
    if resolved.provider == "local":
        # Local provider: remove the files directly from disk.
        for caminho in alvo_por_nome.values():
            if caminho.exists():
                caminho.unlink()
    elif resolved.provider == "hf_dataset":
        if HfApi is None or CommitOperationDelete is None:
            raise HTTPException(
                status_code=500,
                detail="Provider hf_dataset indisponivel para escrita: instale huggingface_hub no backend",
            )
        repo_id = resolved.repo_id or _hf_repo_id()
        subdir = resolved.subdir or _hf_subdir()
        revision = _hf_revision()
        token = _hf_token()
        # One commit deleting every matched file (atomic on the Hub side).
        operations = [
            CommitOperationDelete(path_in_repo=f"{subdir}/{nome}")
            for nome in removidos
        ]
        usuario = str(actor or "sistema").strip() or "sistema"
        mensagem = f"[mesa] exclusao de {len(operations)} modelo(s) por {usuario}"
        api = HfApi(token=token)
        try:
            api.create_commit(
                repo_id=repo_id,
                repo_type="dataset",
                revision=revision,
                operations=operations,
                commit_message=mensagem,
                token=token,
            )
        except Exception as exc:
            raise HTTPException(status_code=503, detail=f"Falha ao excluir no dataset HF: {exc}") from exc
    else:
        raise HTTPException(status_code=500, detail=f"Provider de modelos nao suportado: {resolved.provider}")
    # Force the next resolution to pick up the removals.
    invalidate_repository_cache()
    return {
        "removidos": removidos,
        "nao_encontrados": nao_encontrados,
        "fonte": resolved.as_payload(),
    }