AetherMap / app.py
Madras1's picture
Upload app.py
abfbafc verified
raw
history blame
42.9 kB
# ==============================================================================
# API do AetherMap — VERSÃO 7.3 (KNOWLEDGE GRAPH EDITION)
# Backend com RAG Híbrido, CSV, Tavily, NER Entity Graph
# ==============================================================================
import numpy as np
import pandas as pd
import torch
import gc
import uuid
import os
import io
import json
import logging
import time
import nltk
from nltk.corpus import stopwords
from collections import defaultdict
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from typing import List, Dict, Any, Tuple
from functools import lru_cache
# Ferramentas de Alquimia (ML & NLP)
from sentence_transformers import SentenceTransformer, CrossEncoder
import umap
import hdbscan
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from scipy.stats import entropy
# NER & Language Detection
import spacy
from langdetect import detect, LangDetectException
# Monitoramento (O Toque da Berta)
from prometheus_fastapi_instrumentator import Instrumentator
from prometheus_client import Histogram
# A Conexão com o Oráculo
from groq import Groq
# ==============================================================================
# CONFIGURAÇÕES GERAIS E LOGGING
# ==============================================================================
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Modelos de IA
RETRIEVAL_MODEL = "all-MiniLM-L6-v2" # Rápido para varredura inicial
RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2" # Preciso para reordenação
# Parâmetros de Processamento
BATCH_SIZE = 256
UMAP_N_NEIGHBORS = 30
# Cache de Sessão (Na memória RAM)
cache: Dict[str, Any] = {}
# Definição de Métricas Customizadas do Prometheus
# Isso permite separar a latência da sua lógica vs a latência da API externa
GROQ_LATENCY = Histogram(
"groq_api_latency_seconds",
"Tempo de resposta da API externa Groq (LLM Generation)",
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0]
)
# Inicialização do Cliente Groq
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
try:
if not GROQ_API_KEY:
logging.warning("GROQ_API_KEY não encontrada. Funcionalidades de LLM estarão indisponíveis.")
groq_client = None
else:
groq_client = Groq(api_key=GROQ_API_KEY)
logging.info("Cliente Groq inicializado com sucesso.")
except Exception as e:
logging.error(f"FALHA AO INICIALIZAR GROQ: {e}")
groq_client = None
# Inicialização do Cliente Tavily (Web Search)
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY")
tavily_client = None
try:
if TAVILY_API_KEY:
from tavily import TavilyClient
tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
logging.info("Cliente Tavily inicializado com sucesso.")
else:
logging.warning("TAVILY_API_KEY não encontrada. Busca web estará indisponível.")
except Exception as e:
logging.error(f"FALHA AO INICIALIZAR TAVILY: {e}")
tavily_client = None
# ==============================================================================
# GERENCIAMENTO HÍBRIDO DE STOP WORDS (NLTK + ARQUIVO TXT)
# ==============================================================================
def carregar_stopwords():
"""
Carrega stop words do NLTK e combina com um arquivo externo 'stopwords.txt'.
"""
logging.info("Iniciando carregamento de Stop Words...")
# 1. Base Gramatical (NLTK - Inglês e Português)
try:
nltk.data.find('corpora/stopwords')
except LookupError:
logging.info("Baixando corpus de stopwords...")
nltk.download('stopwords')
# Cria um conjunto único com PT e EN
final_stops = set(stopwords.words('portuguese')) | set(stopwords.words('english'))
logging.info(f"Stopwords base (NLTK) carregadas: {len(final_stops)}")
# 2. Base Customizada
arquivo_custom = "stopwords.txt"
if os.path.exists(arquivo_custom):
logging.info(f"Arquivo '{arquivo_custom}' encontrado. Lendo palavras customizadas...")
try:
count_custom = 0
with open(arquivo_custom, "r", encoding="utf-8") as f:
for linha in f:
palavra = linha.split('#')[0].strip().lower()
if palavra and len(palavra) > 1:
final_stops.add(palavra)
count_custom += 1
logging.info(f"{count_custom} stop words customizadas importadas do arquivo.")
except Exception as e:
logging.error(f"Erro ao ler '{arquivo_custom}': {e}")
else:
logging.warning(f"Arquivo '{arquivo_custom}' não encontrado. Usando apenas NLTK.")
lista_final = list(final_stops)
logging.info(f"Total final de Stop Words ativas: {len(lista_final)}")
return lista_final
# Variável global carregada na inicialização
STOP_WORDS_MULTILINGUAL = carregar_stopwords()
# ==============================================================================
# CARREGAMENTO DE MODELOS (COM CACHE)
# ==============================================================================
@lru_cache(maxsize=1)
def load_retriever():
device = "cuda" if torch.cuda.is_available() else "cpu"
logging.info(f"Carregando Retriever '{RETRIEVAL_MODEL}' em: {device}")
return SentenceTransformer(RETRIEVAL_MODEL, device=device)
@lru_cache(maxsize=1)
def load_reranker():
device = "cuda" if torch.cuda.is_available() else "cpu"
logging.info(f"Carregando Reranker '{RERANKER_MODEL}' em: {device}")
return CrossEncoder(RERANKER_MODEL, device=device)
# Cache for spaCy models
_spacy_models = {}
_spacy_available = True
def load_spacy_model(lang: str):
"""Carrega modelo spaCy com cache."""
global _spacy_available
if not _spacy_available:
return None
if lang not in _spacy_models:
model_name = "pt_core_news_sm" if lang == "pt" else "en_core_web_sm"
try:
_spacy_models[lang] = spacy.load(model_name)
logging.info(f"Modelo spaCy '{model_name}' carregado.")
except OSError:
logging.warning(f"Modelo {model_name} não encontrado. Tentando baixar...")
try:
import subprocess
subprocess.run(["python", "-m", "spacy", "download", model_name], check=True)
_spacy_models[lang] = spacy.load(model_name)
except Exception as e:
logging.error(f"Falha ao baixar modelo spaCy: {e}")
_spacy_available = False
return None
return _spacy_models[lang]
def detect_language(texts: List[str]) -> str:
"""Detecta idioma predominante nos textos."""
sample = " ".join(texts[:10])[:1000]
try:
lang = detect(sample)
return "pt" if lang == "pt" else "en"
except LangDetectException:
return "en"
def extract_entities(textos: List[str]) -> List[List[Tuple[str, str]]]:
"""Extrai entidades nomeadas de cada texto."""
lang = detect_language(textos)
nlp = load_spacy_model(lang)
# Fallback se spaCy não estiver disponível
if nlp is None:
logging.warning("spaCy não disponível. Retornando entidades vazias.")
return [[] for _ in textos]
entities_by_doc = []
for text in textos:
# Limitar tamanho do texto para performance
doc = nlp(text[:2000])
entities = [(ent.text.lower().strip(), ent.label_) for ent in doc.ents
if len(ent.text.strip()) > 2 and ent.label_ in ("PERSON", "PER", "ORG", "GPE", "LOC")]
entities_by_doc.append(entities)
# Normalizar entidades para deduplicação
return normalize_entities(entities_by_doc)
def normalize_entities(entities_by_doc: List[List[Tuple[str, str]]]) -> List[List[Tuple[str, str]]]:
"""Normaliza entidades para agrupar variações do mesmo nome."""
# Coletar todas as entidades únicas por tipo
all_entities = defaultdict(set)
for entities in entities_by_doc:
for text, etype in entities:
all_entities[etype].add(text)
# Criar mapeamento de normalização
# Agrupa entidades onde uma contém a outra ou são muito similares
normalization_map = {}
for etype, entity_set in all_entities.items():
entities_list = sorted(entity_set, key=len, reverse=True) # Maiores primeiro
for entity in entities_list:
if entity in normalization_map:
continue
# Encontrar entidades que são parte desta ou similares
canonical = entity
for other in entities_list:
if other == entity:
continue
# Se uma contém a outra (ex: "donald trump" contém "trump")
if other in entity or entity in other:
# Usar a mais completa como canônica
if len(entity) >= len(other):
normalization_map[(other, etype)] = (entity, etype)
else:
normalization_map[(entity, etype)] = (other, etype)
canonical = other
# Mapear para si mesmo se não foi mapeado
if (entity, etype) not in normalization_map:
normalization_map[(entity, etype)] = (canonical, etype)
# Aplicar normalização
normalized_docs = []
for entities in entities_by_doc:
normalized = []
seen = set()
for text, etype in entities:
canonical = normalization_map.get((text, etype), (text, etype))
if canonical not in seen:
seen.add(canonical)
normalized.append(canonical)
normalized_docs.append(normalized)
logging.info(f"Normalização: {len(all_entities)} tipos, mapa com {len(normalization_map)} entradas")
return normalized_docs
def build_entity_graph(entities_by_doc: List[List[Tuple[str, str]]],
positions: List[Dict]) -> Dict[str, Any]:
"""Constrói grafo de conexões baseado em entidades compartilhadas."""
# Inverter: entidade -> lista de doc indices
entity_to_docs = defaultdict(set)
for doc_idx, entities in enumerate(entities_by_doc):
for entity_text, entity_type in entities:
entity_to_docs[(entity_text, entity_type)].add(doc_idx)
# Construir arestas (conexões entre docs que compartilham entidades)
edges = []
seen_pairs = set()
for (entity_text, entity_type), doc_indices in entity_to_docs.items():
if len(doc_indices) < 2:
continue
doc_list = sorted(doc_indices)
for i in range(len(doc_list)):
for j in range(i + 1, len(doc_list)):
pair = (doc_list[i], doc_list[j])
if pair not in seen_pairs:
seen_pairs.add(pair)
edges.append({
"source": doc_list[i],
"target": doc_list[j],
"entity": entity_text,
"entity_type": entity_type
})
# Contar entidades mais frequentes
entity_counts = [(k, len(v)) for k, v in entity_to_docs.items() if len(v) >= 2]
top_entities = sorted(entity_counts, key=lambda x: x[1], reverse=True)[:20]
return {
"edges": edges,
"edge_count": len(edges),
"connected_pairs": len(seen_pairs),
"top_entities": [{"entity": e[0][0], "type": e[0][1], "docs": e[1]} for e in top_entities]
}
def build_entity_to_entity_graph(entities_by_doc: List[List[Tuple[str, str]]]) -> Dict[str, Any]:
"""
Constrói grafo onde os NÓS são entidades e ARESTAS são co-ocorrências.
Entidades que aparecem no mesmo documento são conectadas.
"""
# Contar co-ocorrências
cooccurrence = defaultdict(int)
entity_doc_count = defaultdict(int)
for entities in entities_by_doc:
unique_entities = list(set(entities))
# Contar docs por entidade
for ent in unique_entities:
entity_doc_count[ent] += 1
# Criar pares de co-ocorrência
for i in range(len(unique_entities)):
for j in range(i + 1, len(unique_entities)):
pair = tuple(sorted([unique_entities[i], unique_entities[j]], key=str))
cooccurrence[pair] += 1
# Construir nós (entidades com >= 2 docs)
nodes = []
entity_to_id = {}
node_count = sum(1 for e, c in entity_doc_count.items() if c >= 2)
node_idx = 0
for entity, count in sorted(entity_doc_count.items(), key=lambda x: x[1], reverse=True):
if count >= 2:
entity_to_id[entity] = node_idx
# Posição em espiral 3D (melhor distribuição)
angle = 2 * np.pi * node_idx / max(node_count, 1) * 3 # 3 voltas na espiral
radius = 1.5 + (node_idx / max(node_count, 1)) * 2 # Raio aumenta
z_pos = (node_idx / max(node_count, 1) - 0.5) * 3 # Altura varia
nodes.append({
"id": node_idx,
"entity": entity[0],
"type": entity[1],
"docs": count,
"x": float(radius * np.cos(angle)),
"y": float(radius * np.sin(angle)),
"z": float(z_pos)
})
node_idx += 1
# Construir arestas (co-ocorrências)
entity_edges = []
for (ent1, ent2), weight in cooccurrence.items():
if ent1 in entity_to_id and ent2 in entity_to_id and weight >= 1:
entity_edges.append({
"source": entity_to_id[ent1],
"target": entity_to_id[ent2],
"weight": weight,
"source_entity": ent1[0],
"target_entity": ent2[0],
"reason": f"Aparecem juntos em {weight} documento(s)"
})
# Ordenar arestas por peso
entity_edges.sort(key=lambda x: x["weight"], reverse=True)
# Calcular métricas de grafo
# Degree centrality (número de conexões de cada nó)
degree = defaultdict(int)
for edge in entity_edges:
degree[edge["source"]] += edge["weight"]
degree[edge["target"]] += edge["weight"]
# Calcular max degree para normalização
max_degree = max(degree.values()) if degree else 1
# Atualizar nós com métricas
hubs = []
for node in nodes:
node_degree = degree.get(node["id"], 0)
node["degree"] = node_degree
node["centrality"] = round(node_degree / max_degree, 3)
# Classificar nó
if node["centrality"] > 0.7:
node["role"] = "hub" # Hub central
hubs.append(node)
elif node["centrality"] > 0.3:
node["role"] = "connector" # Conector
else:
node["role"] = "peripheral" # Periférico
# Top hubs
hubs.sort(key=lambda x: x["degree"], reverse=True)
return {
"nodes": nodes,
"edges": entity_edges[:200], # Limitar a 200 arestas mais fortes
"node_count": len(nodes),
"edge_count": len(entity_edges),
"hubs": [{"entity": h["entity"], "type": h["type"], "degree": h["degree"]} for h in hubs[:5]],
"insights": {
"total_connections": sum(degree.values()) // 2,
"avg_degree": round(sum(degree.values()) / len(degree), 1) if degree else 0,
"hub_count": len(hubs)
}
}
# ==============================================================================
# PIPELINE DE PROCESSAMENTO DE DADOS
# ==============================================================================
def preparar_textos(file_bytes: bytes, n_samples: int) -> List[str]:
"""Prepara textos de arquivo TXT (uma linha por documento)."""
linhas = file_bytes.decode("utf-8", errors="ignore").splitlines()
textos = [s for line in linhas if (s := line.strip()) and len(s.split()) > 3]
return textos[:n_samples]
def detect_csv_separator(file_bytes: bytes) -> str:
"""Detecta separador do CSV (vírgula ou ponto-e-vírgula)."""
sample = file_bytes[:4096].decode("utf-8", errors="ignore")
first_line = sample.split('\n')[0]
# Conta ocorrências de cada separador na primeira linha
commas = first_line.count(',')
semicolons = first_line.count(';')
return ';' if semicolons > commas else ','
def read_csv_smart(file_bytes: bytes, nrows=None) -> pd.DataFrame:
"""Lê CSV com detecção automática de separador e encoding."""
sep = detect_csv_separator(file_bytes)
try:
df = pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="utf-8", nrows=nrows)
except UnicodeDecodeError:
df = pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="latin-1", nrows=nrows)
return df
def preparar_textos_csv(file_bytes: bytes, text_column: str, n_samples: int) -> List[str]:
"""Prepara textos de arquivo CSV extraindo coluna especificada."""
df = read_csv_smart(file_bytes)
if text_column not in df.columns:
available = ", ".join(df.columns.tolist()[:10])
raise ValueError(f"Coluna '{text_column}' não encontrada. Colunas disponíveis: {available}")
textos = df[text_column].dropna().astype(str).tolist()
# Filtrar textos muito curtos
textos = [t.strip() for t in textos if len(t.strip().split()) > 3]
return textos[:n_samples]
def get_csv_columns(file_bytes: bytes) -> List[str]:
"""Retorna lista de colunas de um arquivo CSV."""
df = read_csv_smart(file_bytes, nrows=0)
return df.columns.tolist()
def processar_pipeline(textos: List[str], small_dataset: bool = False) -> (pd.DataFrame, np.ndarray):
"""
Processa textos através do pipeline de embeddings, UMAP e HDBSCAN.
Args:
textos: Lista de textos
small_dataset: Se True, usa parâmetros otimizados para datasets pequenos (Tavily)
"""
logging.info(f"Iniciando pipeline para {len(textos)} textos (small_dataset={small_dataset})...")
model = load_retriever()
# 1. Embeddings
embeddings = model.encode(textos, batch_size=BATCH_SIZE, show_progress_bar=False, convert_to_numpy=True)
# 2. UMAP - ajustar n_neighbors para datasets pequenos
num_textos = len(textos)
n_neighbors = min(15, max(3, num_textos - 1)) if small_dataset else UMAP_N_NEIGHBORS
reducer = umap.UMAP(n_components=3, n_neighbors=n_neighbors, min_dist=0.0, metric="cosine", random_state=42)
emb_3d = reducer.fit_transform(embeddings)
emb_3d = StandardScaler().fit_transform(emb_3d)
# 3. HDBSCAN - parâmetros adaptativos
if small_dataset:
# Para Tavily (10-50 docs): clusters menores, mais agressivo
min_size = max(2, int(num_textos * 0.1)) # mínimo 2, 10% do dataset
min_samples = 1 # permite clusters mais esparsos
else:
# Para datasets grandes: comportamento padrão
min_size = max(10, int(num_textos * 0.02))
min_samples = None # usa default do HDBSCAN
logging.info(f"HDBSCAN: min_cluster_size={min_size}, min_samples={min_samples}")
clusterer = hdbscan.HDBSCAN(min_cluster_size=min_size, min_samples=min_samples)
clusters = clusterer.fit_predict(emb_3d)
# 4. DataFrame
df = pd.DataFrame({
"x": emb_3d[:, 0], "y": emb_3d[:, 1], "z": emb_3d[:, 2],
"full_text": textos, "cluster": clusters.astype(str)
})
del reducer, clusterer, emb_3d; gc.collect()
return df, embeddings
def calcular_metricas(textos: List[str]) -> Dict[str, Any]:
logging.info("Calculando métricas globais...")
if not textos: return {}
vectorizer_count = CountVectorizer(stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000)
vectorizer_tfidf = TfidfVectorizer(stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000)
try:
counts_matrix = vectorizer_count.fit_transform(textos)
tfidf_matrix = vectorizer_tfidf.fit_transform(textos)
except ValueError:
return {"riqueza_lexical": 0, "top_tfidf_palavras": [], "entropia": 0.0}
vocab_count = vectorizer_count.get_feature_names_out()
contagens = counts_matrix.sum(axis=0).A1
vocab_tfidf = vectorizer_tfidf.get_feature_names_out()
soma_tfidf = tfidf_matrix.sum(axis=0).A1
top_idx_tfidf = np.argsort(soma_tfidf)[-10:][::-1]
top_tfidf = [{"palavra": vocab_tfidf[i], "score": round(float(soma_tfidf[i]), 4)} for i in top_idx_tfidf]
return {
"riqueza_lexical": len(vocab_count),
"top_tfidf_palavras": top_tfidf,
"entropia": float(entropy(contagens / contagens.sum(), base=2)) if contagens.sum() > 0 else 0.0
}
def encontrar_duplicados(df: pd.DataFrame, embeddings: np.ndarray) -> Dict[str, Any]:
logging.info("Detectando duplicados...")
mask = df["full_text"].duplicated(keep=False)
grupos_exatos = {t: [int(i) for i in idxs] for t, idxs in df[mask].groupby("full_text").groups.items()}
pares_semanticos = []
if 2 < len(embeddings) < 5000:
sim = cosine_similarity(embeddings)
triu_indices = np.triu_indices_from(sim, k=1)
sim_vetor = sim[triu_indices]
pares_idx = np.where(sim_vetor > 0.98)[0]
top_pares_idx = pares_idx[np.argsort(sim_vetor[pares_idx])[-5:][::-1]]
for i in top_pares_idx:
idx1, idx2 = triu_indices[0][i], triu_indices[1][i]
if df["full_text"].iloc[idx1] != df["full_text"].iloc[idx2]:
pares_semanticos.append({
"similaridade": float(sim[idx1, idx2]),
"texto1": df["full_text"].iloc[idx1],
"texto2": df["full_text"].iloc[idx2]
})
return {"grupos_exatos": grupos_exatos, "pares_semanticos": pares_semanticos}
def analisar_clusters(df: pd.DataFrame) -> Dict[str, Any]:
logging.info("Analisando clusters...")
analise = {}
ids_clusters_validos = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int)
for cid in ids_clusters_validos:
textos_cluster = df[df["cluster"] == cid]["full_text"].tolist()
if len(textos_cluster) < 2: continue
try:
vectorizer = TfidfVectorizer(stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000)
tfidf_matrix = vectorizer.fit_transform(textos_cluster)
vocab = vectorizer.get_feature_names_out()
soma = tfidf_matrix.sum(axis=0).A1
top_idx = np.argsort(soma)[-5:][::-1]
top_palavras = [{"palavra": vocab[i], "score": round(float(soma[i]), 4)} for i in top_idx]
except ValueError:
top_palavras = []
analise[cid] = {"num_documentos": len(textos_cluster), "top_palavras": top_palavras}
return analise
# ==============================================================================
# API FASTAPI & INSTRUMENTAÇÃO
# ==============================================================================
app = FastAPI(title="AetherMap API 7.2", version="7.2.0", description="Backend Semantic Search + CSV + Tavily Web Search")
# --- A MÁGICA ACONTECE AQUI ---
# Isso expõe automaticamente o endpoint /metrics para o Prometheus/Grafana
Instrumentator().instrument(app).expose(app)
# ------------------------------
@app.get("/")
async def root():
return {"status": "online", "message": "Aether Map API 7.2 (CSV + Tavily Ready)."}
@app.post("/csv_columns/")
async def get_columns_api(file: UploadFile = File(...)):
"""Retorna as colunas de um arquivo CSV para preview."""
if not file.filename.lower().endswith('.csv'):
raise HTTPException(status_code=400, detail="Arquivo deve ser CSV.")
try:
file_bytes = await file.read()
columns = get_csv_columns(file_bytes)
return {"columns": columns, "filename": file.filename}
except Exception as e:
raise HTTPException(status_code=400, detail=f"Erro ao ler CSV: {str(e)}")
@app.post("/process/")
async def process_api(
n_samples: int = Form(10000),
file: UploadFile = File(...),
text_column: str = Form(None) # Coluna de texto para CSV
):
logging.info(f"Processando arquivo: {file.filename}")
try:
file_bytes = await file.read()
# Detectar tipo de arquivo e processar
if file.filename.lower().endswith('.csv'):
if not text_column:
raise HTTPException(status_code=400, detail="Para CSV, informe 'text_column'.")
textos = preparar_textos_csv(file_bytes, text_column, n_samples)
else:
textos = preparar_textos(file_bytes, n_samples)
if not textos: raise HTTPException(status_code=400, detail="Nenhum texto válido encontrado.")
df, embeddings = processar_pipeline(textos)
job_id = str(uuid.uuid4())
cache[job_id] = {"embeddings": embeddings, "df": df}
logging.info(f"Job criado: {job_id}")
metricas_globais = calcular_metricas(df["full_text"].tolist())
analise_de_duplicados = encontrar_duplicados(df, embeddings)
analise_por_cluster_tfidf = analisar_clusters(df)
n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0)
n_ruido = int((df["cluster"] == "-1").sum())
return {
"job_id": job_id,
"metadata": {
"filename": file.filename,
"num_documents_processed": len(df),
"num_clusters_found": n_clusters,
"num_noise_points": n_ruido
},
"metrics": metricas_globais,
"duplicates": analise_de_duplicados,
"cluster_analysis": analise_por_cluster_tfidf,
"plot_data": df[["x", "y", "z", "cluster", "full_text"]].to_dict("records"),
}
except Exception as e:
logging.error(f"ERRO EM /process/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/search/")
async def search_api(query: str = Form(...), job_id: str = Form(...)):
"""
ENDPOINT DE BUSCA (RAG Híbrido) com Monitoramento de Latência
"""
logging.info(f"Busca: '{query}' [Job: {job_id}]")
if job_id not in cache:
raise HTTPException(status_code=404, detail="Job ID não encontrado.")
try:
model = load_retriever()
reranker = load_reranker()
cached_data = cache[job_id]
df = cached_data["df"]
corpus_embeddings = cached_data["embeddings"]
# FASE 1: Varredura Ampla
query_embedding = model.encode([query], convert_to_numpy=True)
similarities = cosine_similarity(query_embedding, corpus_embeddings)[0]
top_k_retrieval = 50
top_indices = np.argsort(similarities)[-top_k_retrieval:][::-1]
candidate_docs = []
candidate_indices = []
for idx in top_indices:
if similarities[idx] > 0.15:
doc_text = df.iloc[int(idx)]["full_text"]
candidate_docs.append([query, doc_text])
candidate_indices.append(int(idx))
if not candidate_docs:
return {"summary": "Não foram encontrados documentos relevantes.", "results": []}
# FASE 2: Reranking
logging.info(f"Reranking {len(candidate_docs)} documentos...")
rerank_scores = reranker.predict(candidate_docs)
rerank_results = sorted(
zip(candidate_indices, rerank_scores),
key=lambda x: x[1],
reverse=True
)
final_top_k = 5
final_results = []
context_parts = []
for rank, (idx, score) in enumerate(rerank_results[:final_top_k]):
doc_text = df.iloc[idx]["full_text"]
context_parts.append(f"[ID: {rank+1}] DOCUMENTO:\n{doc_text}\n---------------------")
final_results.append({
"index": idx,
"score": float(score),
"cosine_score": float(similarities[idx]),
"citation_id": rank + 1
})
# FASE 3: Geração (Groq) com TELEMETRIA
summary = ""
if groq_client:
context_str = "\n".join(context_parts)
rag_prompt = (
"INSTRUÇÃO DE SISTEMA:\n"
"Você é o Aetherius, um motor de busca semântica de alta precisão.\n"
"Sua missão é responder à pergunta do usuário baseando-se ESTRITAMENTE nos documentos fornecidos.\n\n"
"REGRAS OBRIGATÓRIAS:\n"
"1. CITAÇÕES: Toda afirmação deve ter fonte [ID: x]. Ex: 'O lucro subiu [ID: 1].'\n"
"2. HONESTIDADE: Se não estiver no texto, diga que não encontrou.\n"
"3. IDIOMA: Português do Brasil.\n\n"
f"CONTEXTO RECUPERADO:\n{context_str}\n\n"
f"PERGUNTA DO USUÁRIO: \"{query}\"\n\n"
"RESPOSTA:"
)
try:
# --- INÍCIO DA MEDIÇÃO DA API EXTERNA ---
start_time_groq = time.time()
chat_completion = groq_client.chat.completions.create(
messages=[{"role": "user", "content": rag_prompt}],
model="moonshotai/kimi-k2-instruct-0905",
temperature=0.1,
max_tokens=1024
)
# Registra o tempo gasto apenas na chamada da API
duration = time.time() - start_time_groq
GROQ_LATENCY.observe(duration)
# --- FIM DA MEDIÇÃO ---
summary = chat_completion.choices[0].message.content.strip()
except Exception as e:
logging.warning(f"Erro na geração do LLM: {e}")
summary = "Não foi possível gerar o resumo automático, mas os documentos estão listados abaixo."
return {"summary": summary, "results": final_results}
except Exception as e:
logging.error(f"ERRO EM /search/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/describe_clusters/")
async def describe_clusters_api(job_id: str = Form(...)):
logging.info(f"Descrevendo clusters para Job: {job_id}")
if not groq_client: raise HTTPException(status_code=503, detail="Groq indisponível.")
if job_id not in cache: raise HTTPException(status_code=404, detail="Job não encontrado.")
try:
cached_data = cache[job_id]
df = cached_data["df"]
embeddings = cached_data["embeddings"]
champion_docs_by_cluster = {}
cluster_ids = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int)
for cid in cluster_ids:
mask = df["cluster"] == cid
cluster_embeddings = embeddings[mask]
cluster_texts = df[mask]["full_text"].tolist()
if len(cluster_texts) < 3: continue
centroid = np.mean(cluster_embeddings, axis=0)
similarities = cosine_similarity([centroid], cluster_embeddings)[0]
top_indices = np.argsort(similarities)[-3:][::-1]
champion_docs_by_cluster[cid] = [cluster_texts[i] for i in top_indices]
if not champion_docs_by_cluster: return {"insights": {}}
prompt_sections = []
for cid, docs in champion_docs_by_cluster.items():
doc_list = "\n".join([f"- {doc[:300]}..." for doc in docs])
prompt_sections.append(f"Grupo {cid}:\n{doc_list}")
master_prompt = (
"Analise os grupos de texto abaixo. Para cada grupo, retorne um JSON com 'topic_name' e 'core_insight'.\n"
"Responda APENAS o JSON válido.\n\n" + "\n\n".join(prompt_sections)
)
# --- INÍCIO DA MEDIÇÃO DA API EXTERNA ---
start_time_groq = time.time()
chat_completion = groq_client.chat.completions.create(
messages=[
{"role": "system", "content": "JSON Output Only."},
{"role": "user", "content": master_prompt},
], model="meta-llama/llama-4-maverick-17b-128e-instruct", temperature=0.2,
)
duration = time.time() - start_time_groq
GROQ_LATENCY.observe(duration)
# --- FIM DA MEDIÇÃO ---
response_content = chat_completion.choices[0].message.content
insights = json.loads(response_content.strip().replace("```json", "").replace("```", ""))
return {"insights": insights}
except Exception as e:
logging.error(f"ERRO EM /describe_clusters/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
# ==============================================================================
# ENDPOINT TAVILY WEB SEARCH
# ==============================================================================
@app.post("/search_web/")
async def search_web_api(
query: str = Form(...),
max_results: int = Form(20),
search_depth: str = Form("basic") # "basic" ou "advanced"
):
"""
Busca na web via Tavily e processa resultados para visualização.
"""
if not tavily_client:
raise HTTPException(status_code=503, detail="Tavily não configurado. Defina TAVILY_API_KEY.")
logging.info(f"Tavily Search: '{query}' (max: {max_results})")
try:
# Buscar via Tavily
search_result = tavily_client.search(
query=query,
max_results=max_results,
search_depth=search_depth,
include_answer=False
)
results = search_result.get("results", [])
if not results:
return {"error": "Nenhum resultado encontrado.", "results_count": 0}
# Extrair textos dos resultados
textos = []
sources = []
for r in results:
title = r.get("title", "")
content = r.get("content", "")
url = r.get("url", "")
# Combinar título + conteúdo
full_text = f"{title}: {content}" if title else content
if len(full_text.strip().split()) > 5:
textos.append(full_text.strip())
sources.append({"title": title, "url": url})
if not textos:
return {"error": "Resultados sem conteúdo válido.", "results_count": 0}
# Processar através do pipeline com parâmetros para datasets pequenos
df, embeddings = processar_pipeline(textos, small_dataset=True)
# Adicionar URL de origem a cada ponto
df["source_url"] = [sources[i]["url"] if i < len(sources) else "" for i in range(len(df))]
df["source_title"] = [sources[i]["title"] if i < len(sources) else "" for i in range(len(df))]
# Criar job e cachear
job_id = str(uuid.uuid4())
cache[job_id] = {"embeddings": embeddings, "df": df, "sources": sources}
logging.info(f"Tavily Job criado: {job_id}")
# Calcular métricas e análises
metricas_globais = calcular_metricas(df["full_text"].tolist())
analise_por_cluster_tfidf = analisar_clusters(df)
n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0)
n_ruido = int((df["cluster"] == "-1").sum())
return {
"job_id": job_id,
"metadata": {
"query": query,
"source": "tavily_web_search",
"num_documents_processed": len(df),
"num_clusters_found": n_clusters,
"num_noise_points": n_ruido
},
"metrics": metricas_globais,
"cluster_analysis": analise_por_cluster_tfidf,
"plot_data": df[["x", "y", "z", "cluster", "full_text", "source_url", "source_title"]].to_dict("records"),
"sources": sources
}
except Exception as e:
logging.error(f"ERRO EM /search_web/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
# ==============================================================================
# ENDPOINT KNOWLEDGE GRAPH (NER)
# ==============================================================================
@app.post("/entity_graph/")
async def entity_graph_api(job_id: str = Form(...)):
"""
Extrai entidades nomeadas e constrói grafo de conexões entre documentos.
"""
if job_id not in cache:
raise HTTPException(status_code=404, detail="Job ID não encontrado.")
logging.info(f"Construindo Knowledge Graph para Job: {job_id}")
try:
cached_data = cache[job_id]
df = cached_data["df"]
textos = df["full_text"].tolist()
# Extrair entidades
logging.info(f"Extraindo entidades de {len(textos)} documentos...")
entities_by_doc = extract_entities(textos)
# Construir posições dos pontos
positions = df[["x", "y", "z"]].to_dict("records")
# Construir grafo
graph_data = build_entity_graph(entities_by_doc, positions)
# Adicionar posições ao resultado
graph_data["positions"] = positions
graph_data["num_documents"] = len(textos)
# Entidades por documento (para tooltip)
graph_data["entities_by_doc"] = [
[{"text": e[0], "type": e[1]} for e in ents]
for ents in entities_by_doc
]
# Adicionar grafo entidade-entidade
entity_network = build_entity_to_entity_graph(entities_by_doc)
graph_data["entity_network"] = entity_network
logging.info(f"Grafo construído: {graph_data['edge_count']} arestas doc-doc, {entity_network['node_count']} nós entidade")
return graph_data
except Exception as e:
logging.error(f"ERRO EM /entity_graph/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
# ==============================================================================
# ENDPOINT ANÁLISE DO GRAFO COM LLM (SÁBIO)
# ==============================================================================
@app.post("/analyze_graph/")
async def analyze_graph_api(job_id: str = Form(...)):
"""
Usa o LLM para analisar o Knowledge Graph e extrair insights semânticos.
"""
if job_id not in cache:
raise HTTPException(status_code=404, detail="Job ID não encontrado.")
logging.info(f"Analisando grafo com LLM para Job: {job_id}")
try:
cached_data = cache[job_id]
df = cached_data["df"]
textos = df["full_text"].tolist()
# Extrair entidades e construir grafo
entities_by_doc = extract_entities(textos)
entity_network = build_entity_to_entity_graph(entities_by_doc)
# Preparar resumo do grafo para o LLM
nodes = entity_network.get("nodes", [])[:15]
edges = entity_network.get("edges", [])[:20]
hubs = entity_network.get("hubs", [])[:5]
insights = entity_network.get("insights", {})
graph_summary = f"""
ANÁLISE DE KNOWLEDGE GRAPH
## Visão Geral
- {len(nodes)} entidades principais
- {insights.get('total_connections', 0)} conexões totais
- {insights.get('hub_count', 0)} hubs identificados
## Entidades Principais (por importância):
{chr(10).join([f"- {n['entity']} ({n['type']}): {n['docs']} docs, centralidade {n.get('centrality', 0)}" for n in nodes])}
## Hubs Centrais (entidades mais conectadas):
{chr(10).join([f"- {h['entity']} ({h['type']}): {h['degree']} conexões" for h in hubs]) if hubs else "Nenhum hub identificado"}
## Conexões Mais Fortes:
{chr(10).join([f"- {e['source_entity']}{e['target_entity']}: {e['weight']} co-ocorrências" for e in edges[:10]])}
"""
# Prompt para análise
prompt = f"""Você é um analista de inteligência especializado em análise de redes e grafos de conhecimento.
Analise o seguinte Knowledge Graph extraído de documentos e forneça insights acionáveis:
{graph_summary}
Por favor forneça:
1. **Narrativa Central**: Qual é a história principal que conecta estas entidades?
2. **Atores Chave**: Quem são os principais players e qual seu papel?
3. **Relações Ocultas**: Que conexões não-óbvias você identifica?
4. **Padrões**: Algum padrão interessante nos tipos de entidades ou conexões?
5. **Investigação**: O que valeria a pena investigar mais a fundo?
Responda de forma concisa e acionável, como um briefing de inteligência."""
# Chamar LLM
completion = groq_client.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=1500
)
analysis = completion.choices[0].message.content
return {
"analysis": analysis,
"graph_summary": {
"entities": len(nodes),
"connections": insights.get('total_connections', 0),
"hubs": len(hubs)
}
}
except Exception as e:
logging.error(f"ERRO EM /analyze_graph/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))