AetherMap / app.py
Madras1's picture
Upload app.py
7d9aea8 verified
# ==============================================================================
# API do AetherMap — VERSÃO 7.4 (FAISS EDITION)
# Backend com RAG Híbrido, CSV, Tavily, NER Entity Graph, FAISS ANN
# ==============================================================================
import numpy as np
import pandas as pd
import torch
import gc
import uuid
import os
import io
import json
import logging
import time
import nltk
from nltk.corpus import stopwords
from collections import defaultdict
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from typing import List, Dict, Any, Tuple
from functools import lru_cache
# Ferramentas de Alquimia (ML & NLP)
from sentence_transformers import SentenceTransformer, CrossEncoder
import umap
import hdbscan
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from scipy.stats import entropy
import faiss
# NER & Language Detection
import spacy
from langdetect import detect, LangDetectException
# Monitoramento (O Toque da Berta)
from prometheus_fastapi_instrumentator import Instrumentator
from prometheus_client import Histogram
# A Conexão com o Oráculo
from groq import Groq
# ==============================================================================
# CONFIGURAÇÕES GERAIS E LOGGING
# ==============================================================================
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Modelos de IA
RETRIEVAL_MODEL = "all-MiniLM-L6-v2" # Rápido para varredura inicial
RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2" # Preciso para reordenação
# Parâmetros de Processamento
BATCH_SIZE = 256
UMAP_N_NEIGHBORS = 30
# Cache de Sessão (Na memória RAM)
cache: Dict[str, Any] = {}
# Definição de Métricas Customizadas do Prometheus
# Isso permite separar a latência da sua lógica vs a latência da API externa
GROQ_LATENCY = Histogram(
"groq_api_latency_seconds",
"Tempo de resposta da API externa Groq (LLM Generation)",
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0]
)
# Inicialização do Cliente Groq
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
try:
if not GROQ_API_KEY:
logging.warning("GROQ_API_KEY não encontrada. Funcionalidades de LLM estarão indisponíveis.")
groq_client = None
else:
groq_client = Groq(api_key=GROQ_API_KEY)
logging.info("Cliente Groq inicializado com sucesso.")
except Exception as e:
logging.error(f"FALHA AO INICIALIZAR GROQ: {e}")
groq_client = None
# Inicialização do Cliente Tavily (Web Search)
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY")
tavily_client = None
try:
if TAVILY_API_KEY:
from tavily import TavilyClient
tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
logging.info("Cliente Tavily inicializado com sucesso.")
else:
logging.warning("TAVILY_API_KEY não encontrada. Busca web estará indisponível.")
except Exception as e:
logging.error(f"FALHA AO INICIALIZAR TAVILY: {e}")
tavily_client = None
# ==============================================================================
# GERENCIAMENTO HÍBRIDO DE STOP WORDS (NLTK + ARQUIVO TXT)
# ==============================================================================
def carregar_stopwords():
"""
Carrega stop words do NLTK e combina com um arquivo externo 'stopwords.txt'.
"""
logging.info("Iniciando carregamento de Stop Words...")
# 1. Base Gramatical (NLTK - Inglês e Português)
try:
nltk.data.find('corpora/stopwords')
except LookupError:
logging.info("Baixando corpus de stopwords...")
nltk.download('stopwords')
# Cria um conjunto único com PT e EN
final_stops = set(stopwords.words('portuguese')) | set(stopwords.words('english'))
logging.info(f"Stopwords base (NLTK) carregadas: {len(final_stops)}")
# 2. Base Customizada
arquivo_custom = "stopwords.txt"
if os.path.exists(arquivo_custom):
logging.info(f"Arquivo '{arquivo_custom}' encontrado. Lendo palavras customizadas...")
try:
count_custom = 0
with open(arquivo_custom, "r", encoding="utf-8") as f:
for linha in f:
palavra = linha.split('#')[0].strip().lower()
if palavra and len(palavra) > 1:
final_stops.add(palavra)
count_custom += 1
logging.info(f"{count_custom} stop words customizadas importadas do arquivo.")
except Exception as e:
logging.error(f"Erro ao ler '{arquivo_custom}': {e}")
else:
logging.warning(f"Arquivo '{arquivo_custom}' não encontrado. Usando apenas NLTK.")
lista_final = list(final_stops)
logging.info(f"Total final de Stop Words ativas: {len(lista_final)}")
return lista_final
# Variável global carregada na inicialização
STOP_WORDS_MULTILINGUAL = carregar_stopwords()
# ==============================================================================
# CARREGAMENTO DE MODELOS (COM CACHE)
# ==============================================================================
@lru_cache(maxsize=1)
def load_retriever():
device = "cuda" if torch.cuda.is_available() else "cpu"
logging.info(f"Carregando Retriever '{RETRIEVAL_MODEL}' em: {device}")
return SentenceTransformer(RETRIEVAL_MODEL, device=device)
@lru_cache(maxsize=1)
def load_reranker():
device = "cuda" if torch.cuda.is_available() else "cpu"
logging.info(f"Carregando Reranker '{RERANKER_MODEL}' em: {device}")
return CrossEncoder(RERANKER_MODEL, device=device)
# Cache for spaCy models
_spacy_models = {}
_spacy_available = True
def load_spacy_model(lang: str):
"""Carrega modelo spaCy com cache."""
global _spacy_available
if not _spacy_available:
return None
if lang not in _spacy_models:
model_name = "pt_core_news_sm" if lang == "pt" else "en_core_web_sm"
try:
_spacy_models[lang] = spacy.load(model_name)
logging.info(f"Modelo spaCy '{model_name}' carregado.")
except OSError:
logging.warning(f"Modelo {model_name} não encontrado. Tentando baixar...")
try:
import subprocess
subprocess.run(["python", "-m", "spacy", "download", model_name], check=True)
_spacy_models[lang] = spacy.load(model_name)
except Exception as e:
logging.error(f"Falha ao baixar modelo spaCy: {e}")
_spacy_available = False
return None
return _spacy_models[lang]
def detect_language(texts: List[str]) -> str:
"""Detecta idioma predominante nos textos."""
sample = " ".join(texts[:10])[:1000]
try:
lang = detect(sample)
return "pt" if lang == "pt" else "en"
except LangDetectException:
return "en"
def extract_entities(textos: List[str]) -> List[List[Tuple[str, str]]]:
"""Extrai entidades nomeadas de cada texto."""
lang = detect_language(textos)
nlp = load_spacy_model(lang)
# Fallback se spaCy não estiver disponível
if nlp is None:
logging.warning("spaCy não disponível. Retornando entidades vazias.")
return [[] for _ in textos]
entities_by_doc = []
for text in textos:
# Limitar tamanho do texto para performance
doc = nlp(text[:2000])
entities = [(ent.text.lower().strip(), ent.label_) for ent in doc.ents
if len(ent.text.strip()) > 2 and ent.label_ in ("PERSON", "PER", "ORG", "GPE", "LOC")]
entities_by_doc.append(entities)
# Normalizar entidades para deduplicação
return normalize_entities(entities_by_doc)
def normalize_entities(entities_by_doc: List[List[Tuple[str, str]]]) -> List[List[Tuple[str, str]]]:
"""Normaliza entidades para agrupar variações do mesmo nome."""
# Coletar todas as entidades únicas por tipo
all_entities = defaultdict(set)
for entities in entities_by_doc:
for text, etype in entities:
all_entities[etype].add(text)
# Criar mapeamento de normalização
# Agrupa entidades onde uma contém a outra ou são muito similares
normalization_map = {}
for etype, entity_set in all_entities.items():
entities_list = sorted(entity_set, key=len, reverse=True) # Maiores primeiro
for entity in entities_list:
if entity in normalization_map:
continue
# Encontrar entidades que são parte desta ou similares
canonical = entity
for other in entities_list:
if other == entity:
continue
# Se uma contém a outra (ex: "donald trump" contém "trump")
if other in entity or entity in other:
# Usar a mais completa como canônica
if len(entity) >= len(other):
normalization_map[(other, etype)] = (entity, etype)
else:
normalization_map[(entity, etype)] = (other, etype)
canonical = other
# Mapear para si mesmo se não foi mapeado
if (entity, etype) not in normalization_map:
normalization_map[(entity, etype)] = (canonical, etype)
# Aplicar normalização
normalized_docs = []
for entities in entities_by_doc:
normalized = []
seen = set()
for text, etype in entities:
canonical = normalization_map.get((text, etype), (text, etype))
if canonical not in seen:
seen.add(canonical)
normalized.append(canonical)
normalized_docs.append(normalized)
logging.info(f"Normalização: {len(all_entities)} tipos, mapa com {len(normalization_map)} entradas")
return normalized_docs
def build_entity_graph(entities_by_doc: List[List[Tuple[str, str]]],
positions: List[Dict]) -> Dict[str, Any]:
"""Constrói grafo de conexões baseado em entidades compartilhadas."""
# Inverter: entidade -> lista de doc indices
entity_to_docs = defaultdict(set)
for doc_idx, entities in enumerate(entities_by_doc):
for entity_text, entity_type in entities:
entity_to_docs[(entity_text, entity_type)].add(doc_idx)
# Construir arestas (conexões entre docs que compartilham entidades)
edges = []
seen_pairs = set()
for (entity_text, entity_type), doc_indices in entity_to_docs.items():
if len(doc_indices) < 2:
continue
doc_list = sorted(doc_indices)
for i in range(len(doc_list)):
for j in range(i + 1, len(doc_list)):
pair = (doc_list[i], doc_list[j])
if pair not in seen_pairs:
seen_pairs.add(pair)
edges.append({
"source": doc_list[i],
"target": doc_list[j],
"entity": entity_text,
"entity_type": entity_type
})
# Contar entidades mais frequentes
entity_counts = [(k, len(v)) for k, v in entity_to_docs.items() if len(v) >= 2]
top_entities = sorted(entity_counts, key=lambda x: x[1], reverse=True)[:20]
return {
"edges": edges,
"edge_count": len(edges),
"connected_pairs": len(seen_pairs),
"top_entities": [{"entity": e[0][0], "type": e[0][1], "docs": e[1]} for e in top_entities]
}
def build_entity_to_entity_graph(entities_by_doc: List[List[Tuple[str, str]]]) -> Dict[str, Any]:
"""
Constrói grafo onde os NÓS são entidades e ARESTAS são co-ocorrências.
Entidades que aparecem no mesmo documento são conectadas.
"""
# Contar co-ocorrências
cooccurrence = defaultdict(int)
entity_doc_count = defaultdict(int)
for entities in entities_by_doc:
unique_entities = list(set(entities))
# Contar docs por entidade
for ent in unique_entities:
entity_doc_count[ent] += 1
# Criar pares de co-ocorrência
for i in range(len(unique_entities)):
for j in range(i + 1, len(unique_entities)):
pair = tuple(sorted([unique_entities[i], unique_entities[j]], key=str))
cooccurrence[pair] += 1
# Construir nós (entidades com >= 2 docs, limitado a top 50)
nodes = []
entity_to_id = {}
# Filtrar e ordenar por frequência
valid_entities = [(e, c) for e, c in entity_doc_count.items() if c >= 2]
valid_entities.sort(key=lambda x: x[1], reverse=True)
valid_entities = valid_entities[:50] # Limitar a 50 nós principais
node_count = len(valid_entities)
node_idx = 0
for entity, count in valid_entities:
entity_to_id[entity] = node_idx
# Layout em esfera 3D (melhor para muitos nós)
# Golden angle para distribuição uniforme
golden_angle = np.pi * (3 - np.sqrt(5))
theta = node_idx * golden_angle
phi = np.arccos(1 - 2 * (node_idx + 0.5) / max(node_count, 1))
radius = 3.0 # Raio fixo maior
x = radius * np.sin(phi) * np.cos(theta)
y = radius * np.sin(phi) * np.sin(theta)
z = radius * np.cos(phi)
nodes.append({
"id": node_idx,
"entity": entity[0],
"type": entity[1],
"docs": count,
"x": float(x),
"y": float(y),
"z": float(z)
})
node_idx += 1
# Construir arestas (co-ocorrências)
entity_edges = []
for (ent1, ent2), weight in cooccurrence.items():
if ent1 in entity_to_id and ent2 in entity_to_id and weight >= 1:
entity_edges.append({
"source": entity_to_id[ent1],
"target": entity_to_id[ent2],
"weight": weight,
"source_entity": ent1[0],
"target_entity": ent2[0],
"reason": f"Aparecem juntos em {weight} documento(s)"
})
# Ordenar arestas por peso
entity_edges.sort(key=lambda x: x["weight"], reverse=True)
# Calcular métricas de grafo
# Degree centrality (número de conexões de cada nó)
degree = defaultdict(int)
for edge in entity_edges:
degree[edge["source"]] += edge["weight"]
degree[edge["target"]] += edge["weight"]
# Calcular max degree para normalização
max_degree = max(degree.values()) if degree else 1
# Atualizar nós com métricas
hubs = []
for node in nodes:
node_degree = degree.get(node["id"], 0)
node["degree"] = node_degree
node["centrality"] = round(node_degree / max_degree, 3)
# Classificar nó
if node["centrality"] > 0.7:
node["role"] = "hub" # Hub central
hubs.append(node)
elif node["centrality"] > 0.3:
node["role"] = "connector" # Conector
else:
node["role"] = "peripheral" # Periférico
# Top hubs
hubs.sort(key=lambda x: x["degree"], reverse=True)
return {
"nodes": nodes,
"edges": entity_edges[:200], # Limitar a 200 arestas mais fortes
"node_count": len(nodes),
"edge_count": len(entity_edges),
"hubs": [{"entity": h["entity"], "type": h["type"], "degree": h["degree"]} for h in hubs[:5]],
"insights": {
"total_connections": sum(degree.values()) // 2,
"avg_degree": round(sum(degree.values()) / len(degree), 1) if degree else 0,
"hub_count": len(hubs)
}
}
# ==============================================================================
# PIPELINE DE PROCESSAMENTO DE DADOS
# ==============================================================================
def preparar_textos(file_bytes: bytes, n_samples: int) -> List[str]:
"""Prepara textos de arquivo TXT (uma linha por documento)."""
linhas = file_bytes.decode("utf-8", errors="ignore").splitlines()
textos = [s for line in linhas if (s := line.strip()) and len(s.split()) > 3]
return textos[:n_samples]
def detect_csv_separator(file_bytes: bytes) -> str:
"""Detecta separador do CSV (vírgula ou ponto-e-vírgula)."""
sample = file_bytes[:4096].decode("utf-8", errors="ignore")
first_line = sample.split('\n')[0]
# Conta ocorrências de cada separador na primeira linha
commas = first_line.count(',')
semicolons = first_line.count(';')
return ';' if semicolons > commas else ','
def read_csv_smart(file_bytes: bytes, nrows=None) -> pd.DataFrame:
"""Lê CSV com detecção automática de separador e encoding."""
sep = detect_csv_separator(file_bytes)
try:
df = pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="utf-8", nrows=nrows)
except UnicodeDecodeError:
df = pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="latin-1", nrows=nrows)
return df
def preparar_textos_csv(file_bytes: bytes, text_column: str, n_samples: int) -> List[str]:
"""Prepara textos de arquivo CSV extraindo coluna especificada."""
df = read_csv_smart(file_bytes)
if text_column not in df.columns:
available = ", ".join(df.columns.tolist()[:10])
raise ValueError(f"Coluna '{text_column}' não encontrada. Colunas disponíveis: {available}")
textos = df[text_column].dropna().astype(str).tolist()
# Filtrar textos muito curtos
textos = [t.strip() for t in textos if len(t.strip().split()) > 3]
return textos[:n_samples]
def get_csv_columns(file_bytes: bytes) -> List[str]:
"""Retorna lista de colunas de um arquivo CSV."""
df = read_csv_smart(file_bytes, nrows=0)
return df.columns.tolist()
def processar_pipeline(
textos: List[str],
small_dataset: bool = False,
fast_mode: bool = False,
custom_min_cluster_size: int = 0, # 0 = Auto
custom_min_samples: int = 0 # 0 = Auto
) -> (pd.DataFrame, np.ndarray):
"""
Processa textos através do pipeline de embeddings, redução dimensional e clustering.
Args:
textos: Lista de textos
small_dataset: Se True, usa parâmetros otimizados para datasets pequenos (Tavily)
fast_mode: Se True, usa PCA ao invés de UMAP (10-100x mais rápido)
custom_min_cluster_size: Tamanho mínimo do cluster (0 = Auto)
custom_min_samples: Número mínimo de amostras (0 = Auto)
"""
num_textos = len(textos)
logging.info(f"Iniciando pipeline para {num_textos} textos (fast_mode={fast_mode})...")
model = load_retriever()
# 1. Embeddings
embeddings = model.encode(textos, batch_size=BATCH_SIZE, show_progress_bar=False, convert_to_numpy=True)
# 2. Redução Dimensional
if fast_mode:
# PCA - O(N) muito mais rápido que UMAP O(N²)
from sklearn.decomposition import PCA
logging.info("Usando PCA (modo rápido)...")
reducer = PCA(n_components=3, random_state=42)
emb_3d = reducer.fit_transform(embeddings)
else:
# UMAP - mais preciso mas lento
n_neighbors = min(15, max(3, num_textos - 1)) if small_dataset else UMAP_N_NEIGHBORS
logging.info(f"Usando UMAP (n_neighbors={n_neighbors})...")
reducer = umap.UMAP(n_components=3, n_neighbors=n_neighbors, min_dist=0.0, metric="cosine", random_state=42)
emb_3d = reducer.fit_transform(embeddings)
emb_3d = StandardScaler().fit_transform(emb_3d)
# 3. HDBSCAN - usa parâmetros customizados ou adaptativos
if custom_min_cluster_size > 0:
# Usar valores customizados do usuário
min_size = custom_min_cluster_size
min_samples_val = custom_min_samples if custom_min_samples > 0 else None
logging.info(f"Usando parâmetros customizados: min_cluster_size={min_size}, min_samples={min_samples_val}")
elif small_dataset:
# Para Tavily (10-50 docs): clusters menores, mais agressivo
min_size = max(2, int(num_textos * 0.1))
min_samples_val = 1
elif fast_mode:
# Para PCA: clusters menores já que PCA não separa tão bem quanto UMAP
min_size = max(8, int(num_textos * 0.01))
min_samples_val = 3
else:
# Para datasets grandes com UMAP: comportamento padrão
min_size = max(10, int(num_textos * 0.02))
min_samples_val = None
logging.info(f"HDBSCAN: min_cluster_size={min_size}, min_samples={min_samples_val}")
clusterer = hdbscan.HDBSCAN(min_cluster_size=min_size, min_samples=min_samples_val)
clusters = clusterer.fit_predict(emb_3d)
# 4. DataFrame
df = pd.DataFrame({
"x": emb_3d[:, 0], "y": emb_3d[:, 1], "z": emb_3d[:, 2],
"full_text": textos, "cluster": clusters.astype(str)
})
del reducer, clusterer, emb_3d; gc.collect()
return df, embeddings
def calcular_metricas(textos: List[str]) -> Dict[str, Any]:
logging.info("Calculando métricas globais...")
if not textos: return {}
# Token pattern: só palavras alfabéticas com 3+ caracteres (ignora números)
token_pattern = r'\b[a-zA-ZÀ-ÿ]{3,}\b'
vectorizer_count = CountVectorizer(
stop_words=STOP_WORDS_MULTILINGUAL,
max_features=1000,
token_pattern=token_pattern
)
vectorizer_tfidf = TfidfVectorizer(
stop_words=STOP_WORDS_MULTILINGUAL,
max_features=1000,
token_pattern=token_pattern
)
try:
counts_matrix = vectorizer_count.fit_transform(textos)
tfidf_matrix = vectorizer_tfidf.fit_transform(textos)
except ValueError:
return {"riqueza_lexical": 0, "top_tfidf_palavras": [], "entropia": 0.0}
vocab_count = vectorizer_count.get_feature_names_out()
contagens = counts_matrix.sum(axis=0).A1
vocab_tfidf = vectorizer_tfidf.get_feature_names_out()
soma_tfidf = tfidf_matrix.sum(axis=0).A1
top_idx_tfidf = np.argsort(soma_tfidf)[-10:][::-1]
top_tfidf = [{"palavra": vocab_tfidf[i], "score": round(float(soma_tfidf[i]), 4)} for i in top_idx_tfidf]
return {
"riqueza_lexical": len(vocab_count),
"top_tfidf_palavras": top_tfidf,
"entropia": float(entropy(contagens / contagens.sum(), base=2)) if contagens.sum() > 0 else 0.0
}
def encontrar_duplicados(df: pd.DataFrame, embeddings: np.ndarray) -> Dict[str, Any]:
logging.info("Detectando duplicados...")
mask = df["full_text"].duplicated(keep=False)
grupos_exatos = {t: [int(i) for i in idxs] for t, idxs in df[mask].groupby("full_text").groups.items()}
pares_semanticos = []
if 2 < len(embeddings) < 5000:
sim = cosine_similarity(embeddings)
triu_indices = np.triu_indices_from(sim, k=1)
sim_vetor = sim[triu_indices]
pares_idx = np.where(sim_vetor > 0.98)[0]
top_pares_idx = pares_idx[np.argsort(sim_vetor[pares_idx])[-5:][::-1]]
for i in top_pares_idx:
idx1, idx2 = triu_indices[0][i], triu_indices[1][i]
if df["full_text"].iloc[idx1] != df["full_text"].iloc[idx2]:
pares_semanticos.append({
"similaridade": float(sim[idx1, idx2]),
"texto1": df["full_text"].iloc[idx1],
"texto2": df["full_text"].iloc[idx2]
})
return {"grupos_exatos": grupos_exatos, "pares_semanticos": pares_semanticos}
def analisar_clusters(df: pd.DataFrame) -> Dict[str, Any]:
logging.info("Analisando clusters...")
analise = {}
ids_clusters_validos = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int)
for cid in ids_clusters_validos:
textos_cluster = df[df["cluster"] == cid]["full_text"].tolist()
if len(textos_cluster) < 2: continue
try:
vectorizer = TfidfVectorizer(stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000)
tfidf_matrix = vectorizer.fit_transform(textos_cluster)
vocab = vectorizer.get_feature_names_out()
soma = tfidf_matrix.sum(axis=0).A1
top_idx = np.argsort(soma)[-5:][::-1]
top_palavras = [{"palavra": vocab[i], "score": round(float(soma[i]), 4)} for i in top_idx]
except ValueError:
top_palavras = []
analise[cid] = {"num_documentos": len(textos_cluster), "top_palavras": top_palavras}
return analise
# ==============================================================================
# API FASTAPI & INSTRUMENTAÇÃO
# ==============================================================================
app = FastAPI(title="AetherMap API 7.2", version="7.2.0", description="Backend Semantic Search + CSV + Tavily Web Search")
# --- A MÁGICA ACONTECE AQUI ---
# Isso expõe automaticamente o endpoint /metrics para o Prometheus/Grafana
Instrumentator().instrument(app).expose(app)
# ------------------------------
@app.get("/")
async def root():
return {"status": "online", "message": "Aether Map API 7.2 (CSV + Tavily Ready)."}
@app.post("/csv_columns/")
async def get_columns_api(file: UploadFile = File(...)):
"""Retorna as colunas de um arquivo CSV para preview."""
if not file.filename.lower().endswith('.csv'):
raise HTTPException(status_code=400, detail="Arquivo deve ser CSV.")
try:
file_bytes = await file.read()
columns = get_csv_columns(file_bytes)
return {"columns": columns, "filename": file.filename}
except Exception as e:
raise HTTPException(status_code=400, detail=f"Erro ao ler CSV: {str(e)}")
@app.post("/process/")
async def process_api(
n_samples: int = Form(10000),
file: UploadFile = File(...),
text_column: str = Form(None), # Coluna de texto para CSV
fast_mode: str = Form("false"), # Modo rápido (PCA ao invés de UMAP)
min_cluster_size: str = Form("0"), # 0 = Auto
min_samples: str = Form("0") # 0 = Auto
):
# Converter strings para tipos apropriados
fast_mode_bool = fast_mode.lower() in ("true", "1", "yes")
min_cluster_size_int = int(min_cluster_size) if min_cluster_size.isdigit() else 0
min_samples_int = int(min_samples) if min_samples.isdigit() else 0
logging.info(f"Processando: {file.filename} (fast_mode={fast_mode_bool}, min_cluster={min_cluster_size_int}, min_samples={min_samples_int})")
try:
file_bytes = await file.read()
# Detectar tipo de arquivo e processar
if file.filename.lower().endswith('.csv'):
if not text_column:
raise HTTPException(status_code=400, detail="Para CSV, informe 'text_column'.")
textos = preparar_textos_csv(file_bytes, text_column, n_samples)
else:
textos = preparar_textos(file_bytes, n_samples)
if not textos: raise HTTPException(status_code=400, detail="Nenhum texto válido encontrado.")
# Auto-enable fast_mode para datasets grandes
if len(textos) > 5000 and not fast_mode_bool:
logging.warning(f"Dataset grande ({len(textos)} docs) - considere usar fast_mode=True")
df, embeddings = processar_pipeline(
textos,
fast_mode=fast_mode_bool,
custom_min_cluster_size=min_cluster_size_int,
custom_min_samples=min_samples_int
)
# Criar índice FAISS para busca rápida
embeddings_normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
faiss_index = faiss.IndexFlatIP(embeddings_normalized.shape[1]) # Inner Product = Cosine sim para vetores normalizados
faiss_index.add(embeddings_normalized.astype('float32'))
job_id = str(uuid.uuid4())
cache[job_id] = {
"embeddings": embeddings,
"embeddings_normalized": embeddings_normalized,
"faiss_index": faiss_index,
"df": df
}
logging.info(f"Job criado: {job_id} (FAISS index com {faiss_index.ntotal} vetores)")
metricas_globais = calcular_metricas(df["full_text"].tolist())
analise_de_duplicados = encontrar_duplicados(df, embeddings)
analise_por_cluster_tfidf = analisar_clusters(df)
n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0)
n_ruido = int((df["cluster"] == "-1").sum())
return {
"job_id": job_id,
"metadata": {
"filename": file.filename,
"num_documents_processed": len(df),
"num_clusters_found": n_clusters,
"num_noise_points": n_ruido
},
"metrics": metricas_globais,
"duplicates": analise_de_duplicados,
"cluster_analysis": analise_por_cluster_tfidf,
"plot_data": df[["x", "y", "z", "cluster", "full_text"]].to_dict("records"),
}
except Exception as e:
logging.error(f"ERRO EM /process/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/search/")
async def search_api(query: str = Form(...), job_id: str = Form(...)):
"""
ENDPOINT DE BUSCA (RAG Híbrido) com Monitoramento de Latência
"""
logging.info(f"Busca: '{query}' [Job: {job_id}]")
if job_id not in cache:
raise HTTPException(status_code=404, detail="Job ID não encontrado.")
try:
model = load_retriever()
reranker = load_reranker()
cached_data = cache[job_id]
df = cached_data["df"]
faiss_index = cached_data.get("faiss_index")
# FASE 1: Busca FAISS (O(log N) ao invés de O(N))
query_embedding = model.encode([query], convert_to_numpy=True)
query_normalized = query_embedding / np.linalg.norm(query_embedding, axis=1, keepdims=True)
top_k_retrieval = min(50, faiss_index.ntotal) if faiss_index else 50
if faiss_index:
# FAISS search - retorna (distances, indices)
scores, top_indices = faiss_index.search(query_normalized.astype('float32'), top_k_retrieval)
scores = scores[0] # Flatten
top_indices = top_indices[0]
logging.info(f"FAISS search: top score = {scores[0]:.3f}")
else:
# Fallback para busca bruta se não tiver FAISS
corpus_embeddings = cached_data["embeddings"]
similarities = cosine_similarity(query_embedding, corpus_embeddings)[0]
top_indices = np.argsort(similarities)[-top_k_retrieval:][::-1]
scores = similarities[top_indices]
candidate_docs = []
candidate_indices = []
for i, idx in enumerate(top_indices):
if idx < 0: # FAISS pode retornar -1 se não tiver resultados suficientes
continue
if scores[i] > 0.15:
doc_text = df.iloc[int(idx)]["full_text"]
candidate_docs.append([query, doc_text])
candidate_indices.append(int(idx))
if not candidate_docs:
return {"summary": "Não foram encontrados documentos relevantes.", "results": []}
# FASE 2: Reranking
logging.info(f"Reranking {len(candidate_docs)} documentos...")
rerank_scores = reranker.predict(candidate_docs)
rerank_results = sorted(
zip(candidate_indices, rerank_scores),
key=lambda x: x[1],
reverse=True
)
final_top_k = 5
final_results = []
context_parts = []
for rank, (idx, score) in enumerate(rerank_results[:final_top_k]):
doc_text = df.iloc[idx]["full_text"]
context_parts.append(f"[ID: {rank+1}] DOCUMENTO:\n{doc_text}\n---------------------")
final_results.append({
"index": idx,
"score": float(score),
"cosine_score": float(similarities[idx]),
"citation_id": rank + 1
})
# FASE 3: Geração (Groq) com TELEMETRIA
summary = ""
if groq_client:
context_str = "\n".join(context_parts)
rag_prompt = (
"INSTRUÇÃO DE SISTEMA:\n"
"Você é o Aetherius, um motor de busca semântica de alta precisão.\n"
"Sua missão é responder à pergunta do usuário baseando-se ESTRITAMENTE nos documentos fornecidos.\n\n"
"REGRAS OBRIGATÓRIAS:\n"
"1. CITAÇÕES: Toda afirmação deve ter fonte [ID: x]. Ex: 'O lucro subiu [ID: 1].'\n"
"2. HONESTIDADE: Se não estiver no texto, diga que não encontrou.\n"
"3. IDIOMA: Português do Brasil.\n\n"
f"CONTEXTO RECUPERADO:\n{context_str}\n\n"
f"PERGUNTA DO USUÁRIO: \"{query}\"\n\n"
"RESPOSTA:"
)
try:
# --- INÍCIO DA MEDIÇÃO DA API EXTERNA ---
start_time_groq = time.time()
chat_completion = groq_client.chat.completions.create(
messages=[{"role": "user", "content": rag_prompt}],
model="moonshotai/kimi-k2-instruct-0905",
temperature=0.1,
max_tokens=1024
)
# Registra o tempo gasto apenas na chamada da API
duration = time.time() - start_time_groq
GROQ_LATENCY.observe(duration)
# --- FIM DA MEDIÇÃO ---
summary = chat_completion.choices[0].message.content.strip()
except Exception as e:
logging.warning(f"Erro na geração do LLM: {e}")
summary = "Não foi possível gerar o resumo automático, mas os documentos estão listados abaixo."
return {"summary": summary, "results": final_results}
except Exception as e:
logging.error(f"ERRO EM /search/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/describe_clusters/")
async def describe_clusters_api(job_id: str = Form(...)):
logging.info(f"Descrevendo clusters para Job: {job_id}")
if not groq_client: raise HTTPException(status_code=503, detail="Groq indisponível.")
if job_id not in cache: raise HTTPException(status_code=404, detail="Job não encontrado.")
try:
cached_data = cache[job_id]
df = cached_data["df"]
embeddings = cached_data["embeddings"]
champion_docs_by_cluster = {}
cluster_ids = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int)
for cid in cluster_ids:
mask = df["cluster"] == cid
cluster_embeddings = embeddings[mask]
cluster_texts = df[mask]["full_text"].tolist()
if len(cluster_texts) < 3: continue
centroid = np.mean(cluster_embeddings, axis=0)
similarities = cosine_similarity([centroid], cluster_embeddings)[0]
top_indices = np.argsort(similarities)[-3:][::-1]
champion_docs_by_cluster[cid] = [cluster_texts[i] for i in top_indices]
if not champion_docs_by_cluster: return {"insights": {}}
prompt_sections = []
for cid, docs in champion_docs_by_cluster.items():
doc_list = "\n".join([f"- {doc[:300]}..." for doc in docs])
prompt_sections.append(f"Grupo {cid}:\n{doc_list}")
master_prompt = (
"Analise os grupos de texto abaixo. Para cada grupo, retorne um JSON com 'topic_name' e 'core_insight'.\n"
"Responda APENAS o JSON válido.\n\n" + "\n\n".join(prompt_sections)
)
# --- INÍCIO DA MEDIÇÃO DA API EXTERNA ---
start_time_groq = time.time()
chat_completion = groq_client.chat.completions.create(
messages=[
{"role": "system", "content": "JSON Output Only."},
{"role": "user", "content": master_prompt},
], model="meta-llama/llama-4-maverick-17b-128e-instruct", temperature=0.2,
)
duration = time.time() - start_time_groq
GROQ_LATENCY.observe(duration)
# --- FIM DA MEDIÇÃO ---
response_content = chat_completion.choices[0].message.content
insights = json.loads(response_content.strip().replace("```json", "").replace("```", ""))
return {"insights": insights}
except Exception as e:
logging.error(f"ERRO EM /describe_clusters/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
# ==============================================================================
# ENDPOINT TAVILY WEB SEARCH
# ==============================================================================
@app.post("/search_web/")
async def search_web_api(
query: str = Form(...),
max_results: int = Form(20),
search_depth: str = Form("basic") # "basic" ou "advanced"
):
"""
Busca na web via Tavily e processa resultados para visualização.
"""
if not tavily_client:
raise HTTPException(status_code=503, detail="Tavily não configurado. Defina TAVILY_API_KEY.")
logging.info(f"Tavily Search: '{query}' (max: {max_results})")
try:
# Buscar via Tavily
search_result = tavily_client.search(
query=query,
max_results=max_results,
search_depth=search_depth,
include_answer=False
)
results = search_result.get("results", [])
if not results:
return {"error": "Nenhum resultado encontrado.", "results_count": 0}
# Extrair textos dos resultados
textos = []
sources = []
for r in results:
title = r.get("title", "")
content = r.get("content", "")
url = r.get("url", "")
# Combinar título + conteúdo
full_text = f"{title}: {content}" if title else content
if len(full_text.strip().split()) > 5:
textos.append(full_text.strip())
sources.append({"title": title, "url": url})
if not textos:
return {"error": "Resultados sem conteúdo válido.", "results_count": 0}
# Processar através do pipeline com parâmetros para datasets pequenos
df, embeddings = processar_pipeline(textos, small_dataset=True)
# Criar índice FAISS para busca rápida
embeddings_normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
faiss_index = faiss.IndexFlatIP(embeddings_normalized.shape[1])
faiss_index.add(embeddings_normalized.astype('float32'))
# Adicionar URL de origem a cada ponto
df["source_url"] = [sources[i]["url"] if i < len(sources) else "" for i in range(len(df))]
df["source_title"] = [sources[i]["title"] if i < len(sources) else "" for i in range(len(df))]
# Criar job e cachear
job_id = str(uuid.uuid4())
cache[job_id] = {
"embeddings": embeddings,
"embeddings_normalized": embeddings_normalized,
"faiss_index": faiss_index,
"df": df,
"sources": sources
}
logging.info(f"Tavily Job criado: {job_id}")
# Calcular métricas e análises
metricas_globais = calcular_metricas(df["full_text"].tolist())
analise_por_cluster_tfidf = analisar_clusters(df)
n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0)
n_ruido = int((df["cluster"] == "-1").sum())
return {
"job_id": job_id,
"metadata": {
"query": query,
"source": "tavily_web_search",
"num_documents_processed": len(df),
"num_clusters_found": n_clusters,
"num_noise_points": n_ruido
},
"metrics": metricas_globais,
"cluster_analysis": analise_por_cluster_tfidf,
"plot_data": df[["x", "y", "z", "cluster", "full_text", "source_url", "source_title"]].to_dict("records"),
"sources": sources
}
except Exception as e:
logging.error(f"ERRO EM /search_web/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
# ==============================================================================
# ENDPOINT KNOWLEDGE GRAPH (NER)
# ==============================================================================
@app.post("/entity_graph/")
async def entity_graph_api(job_id: str = Form(...)):
"""
Extrai entidades nomeadas e constrói grafo de conexões entre documentos.
"""
if job_id not in cache:
raise HTTPException(status_code=404, detail="Job ID não encontrado.")
logging.info(f"Construindo Knowledge Graph para Job: {job_id}")
try:
cached_data = cache[job_id]
df = cached_data["df"]
textos = df["full_text"].tolist()
# Extrair entidades
logging.info(f"Extraindo entidades de {len(textos)} documentos...")
entities_by_doc = extract_entities(textos)
# Construir posições dos pontos
positions = df[["x", "y", "z"]].to_dict("records")
# Construir grafo
graph_data = build_entity_graph(entities_by_doc, positions)
# Adicionar posições ao resultado
graph_data["positions"] = positions
graph_data["num_documents"] = len(textos)
# Entidades por documento (para tooltip)
graph_data["entities_by_doc"] = [
[{"text": e[0], "type": e[1]} for e in ents]
for ents in entities_by_doc
]
# Adicionar grafo entidade-entidade
entity_network = build_entity_to_entity_graph(entities_by_doc)
graph_data["entity_network"] = entity_network
logging.info(f"Grafo construído: {graph_data['edge_count']} arestas doc-doc, {entity_network['node_count']} nós entidade")
return graph_data
except Exception as e:
logging.error(f"ERRO EM /entity_graph/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
# ==============================================================================
# ENDPOINT ANÁLISE DO GRAFO COM LLM (SÁBIO)
# ==============================================================================
@app.post("/analyze_graph/")
async def analyze_graph_api(job_id: str = Form(...)):
"""
Usa o LLM para analisar o Knowledge Graph e extrair insights semânticos.
"""
if job_id not in cache:
raise HTTPException(status_code=404, detail="Job ID não encontrado.")
logging.info(f"Analisando grafo com LLM para Job: {job_id}")
try:
cached_data = cache[job_id]
df = cached_data["df"]
textos = df["full_text"].tolist()
clusters = df["cluster"].tolist() if "cluster" in df.columns else ["0"] * len(textos)
# Extrair entidades e construir grafo
entities_by_doc = extract_entities(textos)
entity_network = build_entity_to_entity_graph(entities_by_doc)
# Mapear entidades por cluster
entity_clusters = defaultdict(lambda: defaultdict(int))
for doc_idx, (entities, cluster) in enumerate(zip(entities_by_doc, clusters)):
for ent_text, ent_type in entities:
entity_clusters[ent_text][str(cluster)] += 1
# Preparar resumo do grafo para o LLM
nodes = entity_network.get("nodes", [])[:15]
edges = entity_network.get("edges", [])[:20]
hubs = entity_network.get("hubs", [])[:5]
insights = entity_network.get("insights", {})
# Criar contexto de clusters
cluster_context = []
unique_clusters = sorted(set(str(c) for c in clusters if str(c) != "-1"))
for cluster_id in unique_clusters[:5]:
cluster_docs = [textos[i][:200] for i, c in enumerate(clusters) if str(c) == cluster_id][:3]
cluster_entities = [(ent, entity_clusters[ent].get(cluster_id, 0))
for ent in entity_clusters if entity_clusters[ent].get(cluster_id, 0) > 0]
cluster_entities.sort(key=lambda x: x[1], reverse=True)
cluster_context.append(f"""
### Cluster {cluster_id} ({len([c for c in clusters if str(c) == cluster_id])} docs)
Entidades principais: {', '.join([f"{e[0]}({e[1]})" for e in cluster_entities[:5]])}
Exemplo de documento: "{cluster_docs[0][:150]}..."
""")
graph_summary = f"""
ANÁLISE DE KNOWLEDGE GRAPH COM CONTEXTO
## Visão Geral
- {len(nodes)} entidades principais
- {insights.get('total_connections', 0)} conexões totais
- {insights.get('hub_count', 0)} hubs identificados
- {len(unique_clusters)} clusters de documentos
## Entidades Principais (por importância):
{chr(10).join([f"- {n['entity']} ({n['type']}): {n['docs']} docs, centralidade {n.get('centrality', 0)}" for n in nodes])}
## Hubs Centrais (entidades mais conectadas):
{chr(10).join([f"- {h['entity']} ({h['type']}): {h['degree']} conexões" for h in hubs]) if hubs else "Nenhum hub identificado"}
## Conexões Mais Fortes:
{chr(10).join([f"- {e['source_entity']}{e['target_entity']}: {e['weight']} co-ocorrências" for e in edges[:10]])}
## CONTEXTO POR CLUSTER (IMPORTANTE - USE ESTAS REFERÊNCIAS):
{chr(10).join(cluster_context)}
"""
# Prompt para análise
prompt = f"""Você é um analista de inteligência especializado em análise de redes e grafos de conhecimento.
Analise o seguinte Knowledge Graph extraído de documentos e forneça insights acionáveis:
{graph_summary}
Por favor forneça:
1. **Narrativa Central**: Qual é a história principal que conecta estas entidades?
2. **Atores Chave**: Quem são os principais players e qual seu papel?
3. **Relações Ocultas**: Que conexões não-óbvias você identifica?
4. **Padrões por Cluster**: Como as entidades se distribuem entre os clusters? Qual cluster tem foco diferente?
5. **Investigação**: O que valeria a pena investigar mais a fundo?
IMPORTANTE: Sempre referencie os clusters específicos (ex: "No Cluster 0...", "Já no Cluster 1...").
Use os dados concretos fornecidos, não generalize. Seja específico citando entidades e clusters."""
# Chamar LLM
completion = groq_client.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=1500
)
analysis = completion.choices[0].message.content
return {
"analysis": analysis,
"graph_summary": {
"entities": len(nodes),
"connections": insights.get('total_connections', 0),
"hubs": len(hubs)
}
}
except Exception as e:
logging.error(f"ERRO EM /analyze_graph/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))