para.AI_ASSUNTOS_CNJ / app /builders.py
caarleexx's picture
Update app/builders.py
9edf0d5 verified
"""
builders.py — Converte respostas raw do ES nos schemas Pydantic
"""
from __future__ import annotations
from .schemas import (
Assunto, AssuntoHit, BuscaResponse, Facets, FacetBucket,
HierarquiaNo, HierarquiaResponse,
)
def _buckets_to_facet(buckets: list) -> list[FacetBucket]:
"""
Converte buckets de agregação do Elasticsearch para lista de FacetBucket.
Garante que o valor seja string, pois pode vir como número (ex: profundidade).
"""
result = []
for b in buckets:
try:
# Converte explicitamente para string, tratando qualquer tipo
valor_str = str(b["key"])
result.append(FacetBucket(valor=valor_str, total=b["doc_count"]))
except (KeyError, TypeError, ValueError) as e:
# Log do erro mas continua processando
print(f"Erro ao processar bucket: {b}, erro: {e}")
continue
return result
def build_busca_response(raw: dict, took_ms: int, page: int, size: int) -> BuscaResponse:
"""
Constrói resposta de busca a partir do resultado raw do Elasticsearch.
"""
hits_raw = raw["hits"]
total = hits_raw["total"]["value"]
aggs = raw.get("aggregations", {})
# Processa resultados
resultados = []
for hit in hits_raw["hits"]:
src = hit["_source"].copy() # Cria cópia para não modificar original
# Injeta highlights no breve_sintese se disponível
if "highlight" in hit and "breve_sintese" in hit["highlight"]:
src["breve_sintese"] = " … ".join(hit["highlight"]["breve_sintese"])
# Garante que campos obrigatórios existam
if "id" not in src:
src["id"] = hit["_id"]
resultados.append(AssuntoHit(
score=round(hit["_score"] or 0.0, 4),
assunto=Assunto(**src),
))
# Processa facets/agregações
facets = None
if aggs:
facets = Facets(
por_ramo = _buckets_to_facet(aggs.get("por_ramo", {}).get("buckets", [])),
por_nivel2 = _buckets_to_facet(aggs.get("por_nivel2", {}).get("buckets", [])),
por_nivel3 = _buckets_to_facet(aggs.get("por_nivel3", {}).get("buckets", [])),
por_lei = _buckets_to_facet(aggs.get("por_lei", {}).get("buckets", [])),
profundidades = _buckets_to_facet(aggs.get("profundidades", {}).get("buckets", [])),
)
return BuscaResponse(
total=total,
pagina=page,
tamanho=size,
took_ms=took_ms,
resultados=resultados,
facets=facets,
)
def build_hierarquia_response(raw: dict) -> HierarquiaResponse:
"""
Constrói árvore hierárquica a partir da agregação do Elasticsearch.
"""
ramos_buckets = raw.get("aggregations", {}).get("ramos", {}).get("buckets", [])
ramos = []
for rb in ramos_buckets:
# Nível 1: Ramos
nivel2_buckets = rb.get("nivel2", {}).get("buckets", [])
filhos_n2 = []
for n2b in nivel2_buckets:
# Nível 2: Categorias
nivel3_buckets = n2b.get("nivel3", {}).get("buckets", [])
filhos_n3 = [
HierarquiaNo(
nome=n3b["key"],
caminho=f"{rb['key']} > {n2b['key']} > {n3b['key']}",
total=n3b["doc_count"],
filhos=[],
)
for n3b in nivel3_buckets
]
filhos_n2.append(HierarquiaNo(
nome=n2b["key"],
caminho=f"{rb['key']} > {n2b['key']}",
total=n2b["doc_count"],
filhos=filhos_n3,
))
ramos.append(HierarquiaNo(
nome=rb["key"],
caminho=rb["key"],
total=rb["doc_count"],
filhos=filhos_n2,
))
return HierarquiaResponse(ramos=ramos)
def build_detalhe_response(source: dict) -> Assunto:
"""
Constrói resposta para detalhe de um assunto.
"""
return Assunto(**source)
def _src_to_ficha(
src: dict,
hit: dict,
incluir_texto: bool,
retornar: list[str] | None = None,
) -> "FichaAssunto":
"""
Converte _source + highlight → FichaAssunto.
retornar: lista de campos da ficha solicitados pelo caller.
Vazio/None = todos os campos.
id sempre retornado.
"""
from .schemas import FichaAssunto
# Conjunto de campos solicitados (None ou [] = todos)
want: set[str] | None = (set(retornar) | {"id"}) if retornar else None
def _want(campo: str) -> bool:
return want is None or campo in want
# Highlight helper
hl = hit.get("highlight", {})
def _hl(campo_hl: str, es_key: str) -> str | None:
if not _want(campo_hl):
return None
if es_key in hl:
return " … ".join(hl[es_key])
return src.get(es_key)
return FichaAssunto(
id = src.get("id", hit.get("_id", "")),
cod_assunto = src.get("cod_assunto") if _want("cod_assunto") else None,
titulo = _hl("titulo", "nome_assunto"),
titulo_curto = _hl("titulo_curto", "titulo_curto"),
# caminho e definicao são sempre retornados — necessários para o DNA
caminho = src.get("classes_path"),
ramo = src.get("ramo") if _want("ramo") else None,
nivel1 = src.get("classes_nivel1") if _want("nivel1") else None,
nivel2 = src.get("classes_nivel2") if _want("nivel2") else None,
nivel3 = src.get("classes_nivel3") if _want("nivel3") else None,
profundidade = src.get("classes_profundidade") if _want("profundidade") else None,
# introducao sempre presente como fallback quando glossario é null
introducao = _hl("introducao", "breve_sintese") if _want("introducao") else src.get("breve_sintese"),
# definicao sempre retornada — texto técnico completo do assunto
definicao = _hl("definicao", "glossario"),
normas = src.get("dispositivos_legais", []) if _want("normas") else [],
artigos = src.get("artigos", []) if _want("artigos") else [],
texto = src.get("texto_completo") if (incluir_texto and _want("texto")) else None,
)
def build_busca_q_response(
raw: dict,
took_ms: int,
topk: int,
modo: str,
operador: str,
matched_campos: list[str],
incluir_texto: bool,
retornar: list[str] | None = None,
) -> "BuscaQResponse":
from .schemas import BuscaQResponse, FichaHit, Facets, FacetBucket
hits_raw = raw["hits"]
total = hits_raw["total"]["value"]
aggs = raw.get("aggregations", {})
resultados = []
for hit in hits_raw["hits"]:
src = hit["_source"].copy()
ficha = _src_to_ficha(src, hit, incluir_texto, retornar)
resultados.append(FichaHit(
score = round(hit["_score"] or 0.0, 4),
campos_matched = matched_campos,
ficha = ficha,
))
facets = None
if aggs:
facets = Facets(
por_ramo = _buckets_to_facet(aggs.get("por_ramo", {}).get("buckets", [])),
por_nivel2 = _buckets_to_facet(aggs.get("por_nivel2", {}).get("buckets", [])),
por_nivel3 = _buckets_to_facet(aggs.get("por_nivel3", {}).get("buckets", [])),
por_lei = _buckets_to_facet(aggs.get("por_lei", {}).get("buckets", [])),
profundidades = _buckets_to_facet(aggs.get("profundidades", {}).get("buckets", [])),
)
return BuscaQResponse(
total = total,
retornados = len(resultados),
took_ms = took_ms,
modo = modo,
operador = operador,
resultados = resultados,
facets = facets,
)