diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,1306 +1,1306 @@ -# ============================================================================== -# API do AetherMap — VERSÃO 7.4 (FAISS EDITION) -# Backend com RAG Híbrido, CSV, Tavily, NER Entity Graph, FAISS ANN -# ============================================================================== - -import numpy as np -import pandas as pd -import torch -import gc -import uuid -import os -import io -import json -import logging -import time -import nltk -from nltk.corpus import stopwords -from collections import defaultdict - -from fastapi import FastAPI, UploadFile, File, Form, HTTPException -from fastapi.responses import JSONResponse -from typing import List, Dict, Any, Tuple -from functools import lru_cache - -# Ferramentas de Alquimia (ML & NLP) -from sentence_transformers import SentenceTransformer, CrossEncoder -import umap -import hdbscan -from sklearn.preprocessing import StandardScaler -from sklearn.metrics.pairwise import cosine_similarity -from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer -from scipy.stats import entropy -from scipy.sparse import csr_matrix -import faiss - -# ============================================================================== -# BM25 IMPLEMENTAÇÃO SIMPLES (Sem dependência externa) -# ============================================================================== -class SimpleBM25: - """ - Implementação simples de BM25 usando TF-IDF com ajustes. - Evita dependência externa do rank_bm25. - """ - def __init__(self, corpus: List[str], k1: float = 1.5, b: float = 0.75): - self.k1 = k1 - self.b = b - self.corpus = corpus - self.corpus_size = len(corpus) - - # Tokenização simples - self.tokenized_corpus = [doc.lower().split() for doc in corpus] - - # Calcular IDF - self.doc_freqs = {} - for doc in self.tokenized_corpus: - for term in set(doc): - self.doc_freqs[term] = self.doc_freqs.get(term, 0) + 1 - - # Calcular average document length - self.avgdl = sum(len(doc) for doc in self.tokenized_corpus) / self.corpus_size - - # IDF pre-computado - self.idf = {} - for term, freq in self.doc_freqs.items(): - self.idf[term] = np.log((self.corpus_size - freq + 0.5) / (freq + 0.5) + 1) - - def get_scores(self, query: str) -> np.ndarray: - """Retorna scores BM25 para todos os documentos.""" - query_terms = query.lower().split() - scores = np.zeros(self.corpus_size) - - for i, doc in enumerate(self.tokenized_corpus): - doc_len = len(doc) - term_freqs = {} - for term in doc: - term_freqs[term] = term_freqs.get(term, 0) + 1 - - score = 0.0 - for term in query_terms: - if term in term_freqs: - tf = term_freqs[term] - idf = self.idf.get(term, 0) - # BM25 formula - numerator = tf * (self.k1 + 1) - denominator = tf + self.k1 * (1 - self.b + self.b * (doc_len / self.avgdl)) - score += idf * (numerator / denominator) - - scores[i] = score - - return scores - - def get_top_n(self, query: str, n: int = 50) -> Tuple[np.ndarray, np.ndarray]: - """Retorna top N documentos (índices, scores).""" - scores = self.get_scores(query) - top_indices = np.argsort(scores)[-n:][::-1] - return top_indices, scores[top_indices] - -# NER & Language Detection -import spacy -from langdetect import detect, LangDetectException - -# Monitoramento (O Toque da Berta) -from prometheus_fastapi_instrumentator import Instrumentator -from prometheus_client import Histogram - -# A Conexão com o Oráculo (OpenRouter - OpenAI Compatible) -from openai import OpenAI - -# ============================================================================== -# CONFIGURAÇÕES GERAIS E LOGGING -# ============================================================================== -logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") - -# Modelos de IA -RETRIEVAL_MODEL = "all-MiniLM-L6-v2" # Rápido para varredura inicial -RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2" # Preciso para reordenação - -# Parâmetros de Processamento -BATCH_SIZE = 256 -UMAP_N_NEIGHBORS = 30 - -# Cache de Sessão (Na memória RAM) -cache: Dict[str, Any] = {} - -# Definição de Métricas Customizadas do Prometheus -LLM_LATENCY = Histogram( - "llm_api_latency_seconds", - "Tempo de resposta da API externa LLM (OpenRouter)", - buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0] -) - -# Modelo LLM (OpenRouter) -LLM_MODEL = os.environ.get("LLM_MODEL", "google/gemini-2.0-flash-exp:free") - -# Inicialização do Cliente OpenRouter (OpenAI Compatible) -OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") -try: - if not OPENROUTER_API_KEY: - logging.warning("OPENROUTER_API_KEY não encontrada. Funcionalidades de LLM estarão indisponíveis.") - llm_client = None - else: - llm_client = OpenAI( - base_url="https://openrouter.ai/api/v1", - api_key=OPENROUTER_API_KEY - ) - logging.info(f"Cliente OpenRouter inicializado com modelo: {LLM_MODEL}") -except Exception as e: - logging.error(f"FALHA AO INICIALIZAR OPENROUTER: {e}") - llm_client = None - -# Inicialização do Cliente Tavily (Web Search) -TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY") -tavily_client = None -try: - if TAVILY_API_KEY: - from tavily import TavilyClient - tavily_client = TavilyClient(api_key=TAVILY_API_KEY) - logging.info("Cliente Tavily inicializado com sucesso.") - else: - logging.warning("TAVILY_API_KEY não encontrada. Busca web estará indisponível.") -except Exception as e: - logging.error(f"FALHA AO INICIALIZAR TAVILY: {e}") - tavily_client = None - - -# ============================================================================== -# GERENCIAMENTO HÍBRIDO DE STOP WORDS (NLTK + ARQUIVO TXT) -# ============================================================================== -def carregar_stopwords(): - """ - Carrega stop words do NLTK e combina com um arquivo externo 'stopwords.txt'. - """ - logging.info("Iniciando carregamento de Stop Words...") - - # 1. Base Gramatical (NLTK - Inglês e Português) - try: - nltk.data.find('corpora/stopwords') - except LookupError: - logging.info("Baixando corpus de stopwords...") - nltk.download('stopwords') - - # Cria um conjunto único com PT e EN - final_stops = set(stopwords.words('portuguese')) | set(stopwords.words('english')) - logging.info(f"Stopwords base (NLTK) carregadas: {len(final_stops)}") - - # 2. Base Customizada - arquivo_custom = "stopwords.txt" - - if os.path.exists(arquivo_custom): - logging.info(f"Arquivo '{arquivo_custom}' encontrado. Lendo palavras customizadas...") - try: - count_custom = 0 - with open(arquivo_custom, "r", encoding="utf-8") as f: - for linha in f: - palavra = linha.split('#')[0].strip().lower() - if palavra and len(palavra) > 1: - final_stops.add(palavra) - count_custom += 1 - logging.info(f"{count_custom} stop words customizadas importadas do arquivo.") - except Exception as e: - logging.error(f"Erro ao ler '{arquivo_custom}': {e}") - else: - logging.warning(f"Arquivo '{arquivo_custom}' não encontrado. Usando apenas NLTK.") - - lista_final = list(final_stops) - logging.info(f"Total final de Stop Words ativas: {len(lista_final)}") - return lista_final - -# Variável global carregada na inicialização -STOP_WORDS_MULTILINGUAL = carregar_stopwords() - - -# ============================================================================== -# CARREGAMENTO DE MODELOS (COM CACHE) -# ============================================================================== -@lru_cache(maxsize=1) -def load_retriever(): - device = "cuda" if torch.cuda.is_available() else "cpu" - logging.info(f"Carregando Retriever '{RETRIEVAL_MODEL}' em: {device}") - return SentenceTransformer(RETRIEVAL_MODEL, device=device) - -@lru_cache(maxsize=1) -def load_reranker(): - device = "cuda" if torch.cuda.is_available() else "cpu" - logging.info(f"Carregando Reranker '{RERANKER_MODEL}' em: {device}") - return CrossEncoder(RERANKER_MODEL, device=device) - -# Cache for spaCy models -_spacy_models = {} -_spacy_available = True - -def load_spacy_model(lang: str): - """Carrega modelo spaCy com cache.""" - global _spacy_available - - if not _spacy_available: - return None - - if lang not in _spacy_models: - model_name = "pt_core_news_sm" if lang == "pt" else "en_core_web_sm" - try: - _spacy_models[lang] = spacy.load(model_name) - logging.info(f"Modelo spaCy '{model_name}' carregado.") - except OSError: - logging.warning(f"Modelo {model_name} não encontrado. Tentando baixar...") - try: - import subprocess - subprocess.run(["python", "-m", "spacy", "download", model_name], check=True) - _spacy_models[lang] = spacy.load(model_name) - except Exception as e: - logging.error(f"Falha ao baixar modelo spaCy: {e}") - _spacy_available = False - return None - return _spacy_models[lang] - -def detect_language(texts: List[str]) -> str: - """Detecta idioma predominante nos textos.""" - sample = " ".join(texts[:10])[:1000] - try: - lang = detect(sample) - return "pt" if lang == "pt" else "en" - except LangDetectException: - return "en" - -def extract_entities(textos: List[str]) -> List[List[Tuple[str, str]]]: - """Extrai entidades nomeadas de cada texto.""" - lang = detect_language(textos) - nlp = load_spacy_model(lang) - - # Fallback se spaCy não estiver disponível - if nlp is None: - logging.warning("spaCy não disponível. Retornando entidades vazias.") - return [[] for _ in textos] - - entities_by_doc = [] - for text in textos: - # Limitar tamanho do texto para performance - doc = nlp(text[:2000]) - entities = [(ent.text.lower().strip(), ent.label_) for ent in doc.ents - if len(ent.text.strip()) > 2 and ent.label_ in ("PERSON", "PER", "ORG", "GPE", "LOC")] - entities_by_doc.append(entities) - - # Normalizar entidades para deduplicação - return normalize_entities(entities_by_doc) - -def normalize_entities(entities_by_doc: List[List[Tuple[str, str]]]) -> List[List[Tuple[str, str]]]: - """Normaliza entidades para agrupar variações do mesmo nome.""" - - # Coletar todas as entidades únicas por tipo - all_entities = defaultdict(set) - for entities in entities_by_doc: - for text, etype in entities: - all_entities[etype].add(text) - - # Criar mapeamento de normalização - # Agrupa entidades onde uma contém a outra ou são muito similares - normalization_map = {} - - for etype, entity_set in all_entities.items(): - entities_list = sorted(entity_set, key=len, reverse=True) # Maiores primeiro - - for entity in entities_list: - if entity in normalization_map: - continue - - # Encontrar entidades que são parte desta ou similares - canonical = entity - for other in entities_list: - if other == entity: - continue - - # Se uma contém a outra (ex: "donald trump" contém "trump") - if other in entity or entity in other: - # Usar a mais completa como canônica - if len(entity) >= len(other): - normalization_map[(other, etype)] = (entity, etype) - else: - normalization_map[(entity, etype)] = (other, etype) - canonical = other - - # Mapear para si mesmo se não foi mapeado - if (entity, etype) not in normalization_map: - normalization_map[(entity, etype)] = (canonical, etype) - - # Aplicar normalização - normalized_docs = [] - for entities in entities_by_doc: - normalized = [] - seen = set() - for text, etype in entities: - canonical = normalization_map.get((text, etype), (text, etype)) - if canonical not in seen: - seen.add(canonical) - normalized.append(canonical) - normalized_docs.append(normalized) - - logging.info(f"Normalização: {len(all_entities)} tipos, mapa com {len(normalization_map)} entradas") - return normalized_docs - -def build_entity_graph(entities_by_doc: List[List[Tuple[str, str]]], - positions: List[Dict]) -> Dict[str, Any]: - """Constrói grafo de conexões baseado em entidades compartilhadas.""" - # Inverter: entidade -> lista de doc indices - entity_to_docs = defaultdict(set) - - for doc_idx, entities in enumerate(entities_by_doc): - for entity_text, entity_type in entities: - entity_to_docs[(entity_text, entity_type)].add(doc_idx) - - # Construir arestas (conexões entre docs que compartilham entidades) - edges = [] - seen_pairs = set() - - for (entity_text, entity_type), doc_indices in entity_to_docs.items(): - if len(doc_indices) < 2: - continue - - doc_list = sorted(doc_indices) - for i in range(len(doc_list)): - for j in range(i + 1, len(doc_list)): - pair = (doc_list[i], doc_list[j]) - if pair not in seen_pairs: - seen_pairs.add(pair) - edges.append({ - "source": doc_list[i], - "target": doc_list[j], - "entity": entity_text, - "entity_type": entity_type - }) - - # Contar entidades mais frequentes - entity_counts = [(k, len(v)) for k, v in entity_to_docs.items() if len(v) >= 2] - top_entities = sorted(entity_counts, key=lambda x: x[1], reverse=True)[:20] - - return { - "edges": edges, - "edge_count": len(edges), - "connected_pairs": len(seen_pairs), - "top_entities": [{"entity": e[0][0], "type": e[0][1], "docs": e[1]} for e in top_entities] - } - -def build_entity_to_entity_graph(entities_by_doc: List[List[Tuple[str, str]]]) -> Dict[str, Any]: - """ - Constrói grafo onde os NÓS são entidades e ARESTAS são co-ocorrências. - Entidades que aparecem no mesmo documento são conectadas. - """ - # Contar co-ocorrências - cooccurrence = defaultdict(int) - entity_doc_count = defaultdict(int) - - for entities in entities_by_doc: - unique_entities = list(set(entities)) - - # Contar docs por entidade - for ent in unique_entities: - entity_doc_count[ent] += 1 - - # Criar pares de co-ocorrência - for i in range(len(unique_entities)): - for j in range(i + 1, len(unique_entities)): - pair = tuple(sorted([unique_entities[i], unique_entities[j]], key=str)) - cooccurrence[pair] += 1 - - # Construir nós (entidades com >= 2 docs, limitado a top 50) - nodes = [] - entity_to_id = {} - - # Filtrar e ordenar por frequência - valid_entities = [(e, c) for e, c in entity_doc_count.items() if c >= 2] - valid_entities.sort(key=lambda x: x[1], reverse=True) - valid_entities = valid_entities[:50] # Limitar a 50 nós principais - - node_count = len(valid_entities) - - node_idx = 0 - for entity, count in valid_entities: - entity_to_id[entity] = node_idx - - # Layout em esfera 3D (melhor para muitos nós) - # Golden angle para distribuição uniforme - golden_angle = np.pi * (3 - np.sqrt(5)) - theta = node_idx * golden_angle - phi = np.arccos(1 - 2 * (node_idx + 0.5) / max(node_count, 1)) - - radius = 3.0 # Raio fixo maior - x = radius * np.sin(phi) * np.cos(theta) - y = radius * np.sin(phi) * np.sin(theta) - z = radius * np.cos(phi) - - nodes.append({ - "id": node_idx, - "entity": entity[0], - "type": entity[1], - "docs": count, - "x": float(x), - "y": float(y), - "z": float(z) - }) - node_idx += 1 - - # Construir arestas (co-ocorrências) - entity_edges = [] - for (ent1, ent2), weight in cooccurrence.items(): - if ent1 in entity_to_id and ent2 in entity_to_id and weight >= 1: - entity_edges.append({ - "source": entity_to_id[ent1], - "target": entity_to_id[ent2], - "weight": weight, - "source_entity": ent1[0], - "target_entity": ent2[0], - "reason": f"Aparecem juntos em {weight} documento(s)" - }) - - # Ordenar arestas por peso - entity_edges.sort(key=lambda x: x["weight"], reverse=True) - - # Calcular métricas de grafo - # Degree centrality (número de conexões de cada nó) - degree = defaultdict(int) - for edge in entity_edges: - degree[edge["source"]] += edge["weight"] - degree[edge["target"]] += edge["weight"] - - # Calcular max degree para normalização - max_degree = max(degree.values()) if degree else 1 - - # Atualizar nós com métricas - hubs = [] - for node in nodes: - node_degree = degree.get(node["id"], 0) - node["degree"] = node_degree - node["centrality"] = round(node_degree / max_degree, 3) - - # Classificar nó - if node["centrality"] > 0.7: - node["role"] = "hub" # Hub central - hubs.append(node) - elif node["centrality"] > 0.3: - node["role"] = "connector" # Conector - else: - node["role"] = "peripheral" # Periférico - - # Top hubs - hubs.sort(key=lambda x: x["degree"], reverse=True) - - return { - "nodes": nodes, - "edges": entity_edges[:200], # Limitar a 200 arestas mais fortes - "node_count": len(nodes), - "edge_count": len(entity_edges), - "hubs": [{"entity": h["entity"], "type": h["type"], "degree": h["degree"]} for h in hubs[:5]], - "insights": { - "total_connections": sum(degree.values()) // 2, - "avg_degree": round(sum(degree.values()) / len(degree), 1) if degree else 0, - "hub_count": len(hubs) - } - } - - -# ============================================================================== -# PIPELINE DE PROCESSAMENTO DE DADOS -# ============================================================================== -def preparar_textos(file_bytes: bytes, n_samples: int) -> List[str]: - """Prepara textos de arquivo TXT (uma linha por documento).""" - linhas = file_bytes.decode("utf-8", errors="ignore").splitlines() - textos = [s for line in linhas if (s := line.strip()) and len(s.split()) > 3] - return textos[:n_samples] - -def detect_csv_separator(file_bytes: bytes) -> str: - """Detecta separador do CSV (vírgula ou ponto-e-vírgula).""" - sample = file_bytes[:4096].decode("utf-8", errors="ignore") - first_line = sample.split('\n')[0] - - # Conta ocorrências de cada separador na primeira linha - commas = first_line.count(',') - semicolons = first_line.count(';') - - return ';' if semicolons > commas else ',' - -def read_csv_smart(file_bytes: bytes, nrows=None) -> pd.DataFrame: - """Lê CSV com detecção automática de separador e encoding.""" - sep = detect_csv_separator(file_bytes) - - try: - df = pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="utf-8", nrows=nrows) - except UnicodeDecodeError: - df = pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="latin-1", nrows=nrows) - - return df - -def preparar_textos_csv(file_bytes: bytes, text_column: str, n_samples: int) -> List[str]: - """Prepara textos de arquivo CSV extraindo coluna especificada.""" - df = read_csv_smart(file_bytes) - - if text_column not in df.columns: - available = ", ".join(df.columns.tolist()[:10]) - raise ValueError(f"Coluna '{text_column}' não encontrada. Colunas disponíveis: {available}") - - textos = df[text_column].dropna().astype(str).tolist() - # Filtrar textos muito curtos - textos = [t.strip() for t in textos if len(t.strip().split()) > 3] - return textos[:n_samples] - -def get_csv_columns(file_bytes: bytes) -> List[str]: - """Retorna lista de colunas de um arquivo CSV.""" - df = read_csv_smart(file_bytes, nrows=0) - return df.columns.tolist() - - -def processar_pipeline( - textos: List[str], - small_dataset: bool = False, - fast_mode: bool = False, - custom_min_cluster_size: int = 0, # 0 = Auto - custom_min_samples: int = 0 # 0 = Auto -) -> (pd.DataFrame, np.ndarray): - """ - Processa textos através do pipeline de embeddings, redução dimensional e clustering. - - Args: - textos: Lista de textos - small_dataset: Se True, usa parâmetros otimizados para datasets pequenos (Tavily) - fast_mode: Se True, usa PCA ao invés de UMAP (10-100x mais rápido) - custom_min_cluster_size: Tamanho mínimo do cluster (0 = Auto) - custom_min_samples: Número mínimo de amostras (0 = Auto) - """ - num_textos = len(textos) - logging.info(f"Iniciando pipeline para {num_textos} textos (fast_mode={fast_mode})...") - model = load_retriever() - - # 1. Embeddings - embeddings = model.encode(textos, batch_size=BATCH_SIZE, show_progress_bar=False, convert_to_numpy=True) - - # 2. Redução Dimensional - if fast_mode: - # PCA - O(N) muito mais rápido que UMAP O(N²) - from sklearn.decomposition import PCA - logging.info("Usando PCA (modo rápido)...") - reducer = PCA(n_components=3, random_state=42) - emb_3d = reducer.fit_transform(embeddings) - else: - # UMAP - mais preciso mas lento - n_neighbors = min(15, max(3, num_textos - 1)) if small_dataset else UMAP_N_NEIGHBORS - logging.info(f"Usando UMAP (n_neighbors={n_neighbors})...") - reducer = umap.UMAP(n_components=3, n_neighbors=n_neighbors, min_dist=0.0, metric="cosine", random_state=42) - emb_3d = reducer.fit_transform(embeddings) - - emb_3d = StandardScaler().fit_transform(emb_3d) - - # 3. HDBSCAN - usa parâmetros customizados ou adaptativos - if custom_min_cluster_size > 0: - # Usar valores customizados do usuário - min_size = custom_min_cluster_size - min_samples_val = custom_min_samples if custom_min_samples > 0 else None - logging.info(f"Usando parâmetros customizados: min_cluster_size={min_size}, min_samples={min_samples_val}") - elif small_dataset: - # Para Tavily (10-50 docs): clusters menores, mais agressivo - min_size = max(2, int(num_textos * 0.1)) - min_samples_val = 1 - elif fast_mode: - # Para PCA: clusters menores já que PCA não separa tão bem quanto UMAP - min_size = max(8, int(num_textos * 0.01)) - min_samples_val = 3 - else: - # Para datasets grandes com UMAP: comportamento padrão - min_size = max(10, int(num_textos * 0.02)) - min_samples_val = None - - logging.info(f"HDBSCAN: min_cluster_size={min_size}, min_samples={min_samples_val}") - - clusterer = hdbscan.HDBSCAN(min_cluster_size=min_size, min_samples=min_samples_val) - clusters = clusterer.fit_predict(emb_3d) - - # 4. DataFrame - df = pd.DataFrame({ - "x": emb_3d[:, 0], "y": emb_3d[:, 1], "z": emb_3d[:, 2], - "full_text": textos, "cluster": clusters.astype(str) - }) - - del reducer, clusterer, emb_3d; gc.collect() - return df, embeddings - -def calcular_metricas(textos: List[str]) -> Dict[str, Any]: - logging.info("Calculando métricas globais...") - if not textos: return {} - - # Token pattern: só palavras alfabéticas com 3+ caracteres (ignora números) - token_pattern = r'\b[a-zA-ZÀ-ÿ]{3,}\b' - - vectorizer_count = CountVectorizer( - stop_words=STOP_WORDS_MULTILINGUAL, - max_features=1000, - token_pattern=token_pattern - ) - vectorizer_tfidf = TfidfVectorizer( - stop_words=STOP_WORDS_MULTILINGUAL, - max_features=1000, - token_pattern=token_pattern - ) - - try: - counts_matrix = vectorizer_count.fit_transform(textos) - tfidf_matrix = vectorizer_tfidf.fit_transform(textos) - except ValueError: - return {"riqueza_lexical": 0, "top_tfidf_palavras": [], "entropia": 0.0} - - vocab_count = vectorizer_count.get_feature_names_out() - contagens = counts_matrix.sum(axis=0).A1 - - vocab_tfidf = vectorizer_tfidf.get_feature_names_out() - soma_tfidf = tfidf_matrix.sum(axis=0).A1 - top_idx_tfidf = np.argsort(soma_tfidf)[-10:][::-1] - top_tfidf = [{"palavra": vocab_tfidf[i], "score": round(float(soma_tfidf[i]), 4)} for i in top_idx_tfidf] - - return { - "riqueza_lexical": len(vocab_count), - "top_tfidf_palavras": top_tfidf, - "entropia": float(entropy(contagens / contagens.sum(), base=2)) if contagens.sum() > 0 else 0.0 - } - -def encontrar_duplicados(df: pd.DataFrame, embeddings: np.ndarray) -> Dict[str, Any]: - logging.info("Detectando duplicados...") - mask = df["full_text"].duplicated(keep=False) - grupos_exatos = {t: [int(i) for i in idxs] for t, idxs in df[mask].groupby("full_text").groups.items()} - pares_semanticos = [] - - if 2 < len(embeddings) < 5000: - sim = cosine_similarity(embeddings) - triu_indices = np.triu_indices_from(sim, k=1) - sim_vetor = sim[triu_indices] - pares_idx = np.where(sim_vetor > 0.98)[0] - top_pares_idx = pares_idx[np.argsort(sim_vetor[pares_idx])[-5:][::-1]] - for i in top_pares_idx: - idx1, idx2 = triu_indices[0][i], triu_indices[1][i] - if df["full_text"].iloc[idx1] != df["full_text"].iloc[idx2]: - pares_semanticos.append({ - "similaridade": float(sim[idx1, idx2]), - "texto1": df["full_text"].iloc[idx1], - "texto2": df["full_text"].iloc[idx2] - }) - return {"grupos_exatos": grupos_exatos, "pares_semanticos": pares_semanticos} - -def analisar_clusters(df: pd.DataFrame) -> Dict[str, Any]: - logging.info("Analisando clusters...") - analise = {} - ids_clusters_validos = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int) - for cid in ids_clusters_validos: - textos_cluster = df[df["cluster"] == cid]["full_text"].tolist() - if len(textos_cluster) < 2: continue - try: - vectorizer = TfidfVectorizer(stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000) - tfidf_matrix = vectorizer.fit_transform(textos_cluster) - vocab = vectorizer.get_feature_names_out() - soma = tfidf_matrix.sum(axis=0).A1 - top_idx = np.argsort(soma)[-5:][::-1] - top_palavras = [{"palavra": vocab[i], "score": round(float(soma[i]), 4)} for i in top_idx] - except ValueError: - top_palavras = [] - analise[cid] = {"num_documentos": len(textos_cluster), "top_palavras": top_palavras} - return analise - - -# ============================================================================== -# API FASTAPI & INSTRUMENTAÇÃO -# ============================================================================== -app = FastAPI(title="AetherMap API 7.2", version="7.2.0", description="Backend Semantic Search + CSV + Tavily Web Search") - -# --- A MÁGICA ACONTECE AQUI --- -# Isso expõe automaticamente o endpoint /metrics para o Prometheus/Grafana -Instrumentator().instrument(app).expose(app) -# ------------------------------ - -@app.get("/") -async def root(): - return {"status": "online", "message": "Aether Map API 7.2 (CSV + Tavily Ready)."} - -@app.post("/csv_columns/") -async def get_columns_api(file: UploadFile = File(...)): - """Retorna as colunas de um arquivo CSV para preview.""" - if not file.filename.lower().endswith('.csv'): - raise HTTPException(status_code=400, detail="Arquivo deve ser CSV.") - try: - file_bytes = await file.read() - columns = get_csv_columns(file_bytes) - return {"columns": columns, "filename": file.filename} - except Exception as e: - raise HTTPException(status_code=400, detail=f"Erro ao ler CSV: {str(e)}") - -@app.post("/process/") -async def process_api( - n_samples: int = Form(10000), - file: UploadFile = File(...), - text_column: str = Form(None), # Coluna de texto para CSV - fast_mode: str = Form("false"), # Modo rápido (PCA ao invés de UMAP) - min_cluster_size: str = Form("0"), # 0 = Auto - min_samples: str = Form("0") # 0 = Auto -): - # Converter strings para tipos apropriados - fast_mode_bool = fast_mode.lower() in ("true", "1", "yes") - min_cluster_size_int = int(min_cluster_size) if min_cluster_size.isdigit() else 0 - min_samples_int = int(min_samples) if min_samples.isdigit() else 0 - - logging.info(f"Processando: {file.filename} (fast_mode={fast_mode_bool}, min_cluster={min_cluster_size_int}, min_samples={min_samples_int})") - try: - file_bytes = await file.read() - - # Detectar tipo de arquivo e processar - if file.filename.lower().endswith('.csv'): - if not text_column: - raise HTTPException(status_code=400, detail="Para CSV, informe 'text_column'.") - textos = preparar_textos_csv(file_bytes, text_column, n_samples) - else: - textos = preparar_textos(file_bytes, n_samples) - - if not textos: raise HTTPException(status_code=400, detail="Nenhum texto válido encontrado.") - - # Auto-enable fast_mode para datasets grandes - if len(textos) > 5000 and not fast_mode_bool: - logging.warning(f"Dataset grande ({len(textos)} docs) - considere usar fast_mode=True") - - df, embeddings = processar_pipeline( - textos, - fast_mode=fast_mode_bool, - custom_min_cluster_size=min_cluster_size_int, - custom_min_samples=min_samples_int - ) - - # Criar índice FAISS para busca rápida (semântica) - embeddings_normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) - faiss_index = faiss.IndexFlatIP(embeddings_normalized.shape[1]) # Inner Product = Cosine sim para vetores normalizados - faiss_index.add(embeddings_normalized.astype('float32')) - - # Criar índice BM25 para busca lexical (Hybrid Search) - corpus_texts = df["full_text"].tolist() - bm25_index = SimpleBM25(corpus_texts) - logging.info(f"BM25 index criado com {len(corpus_texts)} documentos") - - job_id = str(uuid.uuid4()) - cache[job_id] = { - "embeddings": embeddings, - "embeddings_normalized": embeddings_normalized, - "faiss_index": faiss_index, - "bm25_index": bm25_index, # Novo: índice BM25 - "df": df - } - logging.info(f"Job criado: {job_id} (FAISS + BM25 hybrid search)") - - metricas_globais = calcular_metricas(df["full_text"].tolist()) - analise_de_duplicados = encontrar_duplicados(df, embeddings) - analise_por_cluster_tfidf = analisar_clusters(df) - - n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0) - n_ruido = int((df["cluster"] == "-1").sum()) - - return { - "job_id": job_id, - "metadata": { - "filename": file.filename, - "num_documents_processed": len(df), - "num_clusters_found": n_clusters, - "num_noise_points": n_ruido - }, - "metrics": metricas_globais, - "duplicates": analise_de_duplicados, - "cluster_analysis": analise_por_cluster_tfidf, - "plot_data": df[["x", "y", "z", "cluster", "full_text"]].to_dict("records"), - } - except Exception as e: - logging.error(f"ERRO EM /process/: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=str(e)) - - -@app.post("/search/") -async def search_api(query: str = Form(...), job_id: str = Form(...)): - """ - ENDPOINT DE BUSCA (RAG Híbrido) com Monitoramento de Latência - """ - logging.info(f"Busca: '{query}' [Job: {job_id}]") - if job_id not in cache: - raise HTTPException(status_code=404, detail="Job ID não encontrado.") - - try: - model = load_retriever() - reranker = load_reranker() - - cached_data = cache[job_id] - df = cached_data["df"] - faiss_index = cached_data.get("faiss_index") - bm25_index = cached_data.get("bm25_index") - - # ================================================================== - # FASE 1: HYBRID SEARCH (FAISS Semântico + BM25 Lexical) - # ================================================================== - query_embedding = model.encode([query], convert_to_numpy=True) - query_normalized = query_embedding / np.linalg.norm(query_embedding, axis=1, keepdims=True) - - top_k_retrieval = min(100, len(df)) # Aumentado para hybrid search - - # --- 1A: Busca Semântica (FAISS) --- - semantic_scores = {} - semantic_ranks = {} - - if faiss_index: - faiss_scores, faiss_indices = faiss_index.search( - query_normalized.astype('float32'), - min(top_k_retrieval, faiss_index.ntotal) - ) - faiss_scores = faiss_scores[0] - faiss_indices = faiss_indices[0] - - for rank, (idx, score) in enumerate(zip(faiss_indices, faiss_scores)): - if idx >= 0: - semantic_scores[int(idx)] = float(score) - semantic_ranks[int(idx)] = rank + 1 # 1-indexed rank - - logging.info(f"FAISS: top score = {faiss_scores[0]:.3f}") - - # --- 1B: Busca Lexical (BM25) --- - lexical_scores = {} - lexical_ranks = {} - - if bm25_index: - bm25_indices, bm25_scores = bm25_index.get_top_n(query, n=top_k_retrieval) - - for rank, (idx, score) in enumerate(zip(bm25_indices, bm25_scores)): - if score > 0: # Só inclui se teve match - lexical_scores[int(idx)] = float(score) - lexical_ranks[int(idx)] = rank + 1 - - if bm25_scores[0] > 0: - logging.info(f"BM25: top score = {bm25_scores[0]:.3f}, docs matched = {len(lexical_scores)}") - else: - logging.info("BM25: nenhum match léxico encontrado") - - # --- 1C: Reciprocal Rank Fusion (RRF) --- - # Combina rankings de ambas as buscas - # RRF score = sum(1 / (k + rank)) onde k é constante (tipicamente 60) - k = 60 # Constante de suavização do RRF - all_indices = set(semantic_ranks.keys()) | set(lexical_ranks.keys()) - - hybrid_scores = {} - for idx in all_indices: - rrf_score = 0.0 - - # Contribuição semântica - if idx in semantic_ranks: - rrf_score += 1.0 / (k + semantic_ranks[idx]) - - # Contribuição lexical - if idx in lexical_ranks: - rrf_score += 1.0 / (k + lexical_ranks[idx]) - - hybrid_scores[idx] = rrf_score - - # Ordenar por RRF score - sorted_candidates = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True) - - # Log de debug - n_semantic_only = len(semantic_ranks.keys() - lexical_ranks.keys()) - n_lexical_only = len(lexical_ranks.keys() - semantic_ranks.keys()) - n_both = len(semantic_ranks.keys() & lexical_ranks.keys()) - logging.info(f"Hybrid: {n_both} em ambos, {n_semantic_only} só semântico, {n_lexical_only} só léxico") - - # Preparar candidatos para reranking - candidate_docs = [] - candidate_indices = [] - retrieval_scores = {} - - for idx, rrf_score in sorted_candidates[:top_k_retrieval]: - doc_text = df.iloc[idx]["full_text"] - candidate_docs.append([query, doc_text]) - candidate_indices.append(idx) - retrieval_scores[idx] = rrf_score # Guardar RRF score - - if not candidate_docs: - return {"summary": "Não foram encontrados documentos relevantes.", "results": []} - - # FASE 2: Reranking - logging.info(f"Reranking {len(candidate_docs)} documentos...") - rerank_scores = reranker.predict(candidate_docs) - - rerank_results = sorted( - zip(candidate_indices, rerank_scores), - key=lambda x: x[1], - reverse=True - ) - - final_top_k = 5 - final_results = [] - context_parts = [] - - for rank, (idx, score) in enumerate(rerank_results[:final_top_k]): - doc_text = df.iloc[idx]["full_text"] - context_parts.append(f"[ID: {rank+1}] DOCUMENTO:\n{doc_text}\n---------------------") - - final_results.append({ - "index": idx, - "score": float(score), - "cosine_score": retrieval_scores.get(idx, 0.0), - "citation_id": rank + 1 - }) - - # FASE 3: Geração (OpenRouter) com TELEMETRIA - summary = "" - if llm_client: - context_str = "\n".join(context_parts) - rag_prompt = ( - "INSTRUÇÃO DE SISTEMA:\n" - "Você é o Aetherius, um motor de busca semântica de alta precisão.\n" - "Sua missão é responder à pergunta do usuário baseando-se ESTRITAMENTE nos documentos fornecidos.\n\n" - "REGRAS OBRIGATÓRIAS:\n" - "1. CITAÇÕES: Toda afirmação deve ter fonte [ID: x]. Ex: 'O lucro subiu [ID: 1].'\n" - "2. HONESTIDADE: Se não estiver no texto, diga que não encontrou.\n" - "3. IDIOMA: Português do Brasil.\n\n" - f"CONTEXTO RECUPERADO:\n{context_str}\n\n" - f"PERGUNTA DO USUÁRIO: \"{query}\"\n\n" - "RESPOSTA:" - ) - - try: - # --- INÍCIO DA MEDIÇÃO DA API EXTERNA --- - start_time_llm = time.time() - - chat_completion = llm_client.chat.completions.create( - messages=[{"role": "user", "content": rag_prompt}], - model=LLM_MODEL, - temperature=0.1, - max_tokens=1024 - ) - - # Registra o tempo gasto apenas na chamada da API - duration = time.time() - start_time_llm - LLM_LATENCY.observe(duration) - # --- FIM DA MEDIÇÃO --- - - summary = chat_completion.choices[0].message.content.strip() - except Exception as e: - logging.warning(f"Erro na geração do LLM: {e}") - summary = "Não foi possível gerar o resumo automático, mas os documentos estão listados abaixo." - - return {"summary": summary, "results": final_results} - - except Exception as e: - logging.error(f"ERRO EM /search/: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=str(e)) - - -@app.post("/describe_clusters/") -async def describe_clusters_api(job_id: str = Form(...)): - logging.info(f"Descrevendo clusters para Job: {job_id}") - if not llm_client: raise HTTPException(status_code=503, detail="LLM indisponível.") - if job_id not in cache: raise HTTPException(status_code=404, detail="Job não encontrado.") - - try: - cached_data = cache[job_id] - df = cached_data["df"] - embeddings = cached_data["embeddings"] - - champion_docs_by_cluster = {} - cluster_ids = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int) - - for cid in cluster_ids: - mask = df["cluster"] == cid - cluster_embeddings = embeddings[mask] - cluster_texts = df[mask]["full_text"].tolist() - if len(cluster_texts) < 3: continue - - centroid = np.mean(cluster_embeddings, axis=0) - similarities = cosine_similarity([centroid], cluster_embeddings)[0] - top_indices = np.argsort(similarities)[-3:][::-1] - champion_docs_by_cluster[cid] = [cluster_texts[i] for i in top_indices] - - if not champion_docs_by_cluster: return {"insights": {}} - - prompt_sections = [] - for cid, docs in champion_docs_by_cluster.items(): - doc_list = "\n".join([f"- {doc[:300]}..." for doc in docs]) - prompt_sections.append(f"Grupo {cid}:\n{doc_list}") - - master_prompt = ( - "Analise os grupos de texto abaixo. Para cada grupo, retorne um JSON com 'topic_name' e 'core_insight'.\n" - "Responda APENAS o JSON válido.\n\n" + "\n\n".join(prompt_sections) - ) - - # --- INÍCIO DA MEDIÇÃO DA API EXTERNA --- - start_time_llm = time.time() - - chat_completion = llm_client.chat.completions.create( - messages=[ - {"role": "system", "content": "JSON Output Only."}, - {"role": "user", "content": master_prompt}, - ], model=LLM_MODEL, temperature=0.2, - ) - - duration = time.time() - start_time_llm - LLM_LATENCY.observe(duration) - # --- FIM DA MEDIÇÃO --- - - response_content = chat_completion.choices[0].message.content - insights = json.loads(response_content.strip().replace("```json", "").replace("```", "")) - return {"insights": insights} - - except Exception as e: - logging.error(f"ERRO EM /describe_clusters/: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=str(e)) - - -# ============================================================================== -# ENDPOINT TAVILY WEB SEARCH -# ============================================================================== -@app.post("/search_web/") -async def search_web_api( - query: str = Form(...), - max_results: int = Form(20), - search_depth: str = Form("basic") # "basic" ou "advanced" -): - """ - Busca na web via Tavily e processa resultados para visualização. - """ - if not tavily_client: - raise HTTPException(status_code=503, detail="Tavily não configurado. Defina TAVILY_API_KEY.") - - logging.info(f"Tavily Search: '{query}' (max: {max_results})") - - try: - # Buscar via Tavily - search_result = tavily_client.search( - query=query, - max_results=max_results, - search_depth=search_depth, - include_answer=False - ) - - results = search_result.get("results", []) - if not results: - return {"error": "Nenhum resultado encontrado.", "results_count": 0} - - # Extrair textos dos resultados - textos = [] - sources = [] - for r in results: - title = r.get("title", "") - content = r.get("content", "") - url = r.get("url", "") - - # Combinar título + conteúdo - full_text = f"{title}: {content}" if title else content - if len(full_text.strip().split()) > 5: - textos.append(full_text.strip()) - sources.append({"title": title, "url": url}) - - if not textos: - return {"error": "Resultados sem conteúdo válido.", "results_count": 0} - - # Processar através do pipeline com parâmetros para datasets pequenos - df, embeddings = processar_pipeline(textos, small_dataset=True) - - # Criar índice FAISS para busca rápida - embeddings_normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) - faiss_index = faiss.IndexFlatIP(embeddings_normalized.shape[1]) - faiss_index.add(embeddings_normalized.astype('float32')) - - # Adicionar URL de origem a cada ponto - df["source_url"] = [sources[i]["url"] if i < len(sources) else "" for i in range(len(df))] - df["source_title"] = [sources[i]["title"] if i < len(sources) else "" for i in range(len(df))] - - # Criar job e cachear - job_id = str(uuid.uuid4()) - cache[job_id] = { - "embeddings": embeddings, - "embeddings_normalized": embeddings_normalized, - "faiss_index": faiss_index, - "df": df, - "sources": sources - } - logging.info(f"Tavily Job criado: {job_id}") - - # Calcular métricas e análises - metricas_globais = calcular_metricas(df["full_text"].tolist()) - analise_por_cluster_tfidf = analisar_clusters(df) - - n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0) - n_ruido = int((df["cluster"] == "-1").sum()) - - return { - "job_id": job_id, - "metadata": { - "query": query, - "source": "tavily_web_search", - "num_documents_processed": len(df), - "num_clusters_found": n_clusters, - "num_noise_points": n_ruido - }, - "metrics": metricas_globais, - "cluster_analysis": analise_por_cluster_tfidf, - "plot_data": df[["x", "y", "z", "cluster", "full_text", "source_url", "source_title"]].to_dict("records"), - "sources": sources - } - - except Exception as e: - logging.error(f"ERRO EM /search_web/: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=str(e)) - - -# ============================================================================== -# ENDPOINT KNOWLEDGE GRAPH (NER) -# ============================================================================== -@app.post("/entity_graph/") -async def entity_graph_api(job_id: str = Form(...)): - """ - Extrai entidades nomeadas e constrói grafo de conexões entre documentos. - """ - if job_id not in cache: - raise HTTPException(status_code=404, detail="Job ID não encontrado.") - - logging.info(f"Construindo Knowledge Graph para Job: {job_id}") - - try: - cached_data = cache[job_id] - df = cached_data["df"] - textos = df["full_text"].tolist() - - # Extrair entidades - logging.info(f"Extraindo entidades de {len(textos)} documentos...") - entities_by_doc = extract_entities(textos) - - # Construir posições dos pontos - positions = df[["x", "y", "z"]].to_dict("records") - - # Construir grafo - graph_data = build_entity_graph(entities_by_doc, positions) - - # Adicionar posições ao resultado - graph_data["positions"] = positions - graph_data["num_documents"] = len(textos) - - # Entidades por documento (para tooltip) - graph_data["entities_by_doc"] = [ - [{"text": e[0], "type": e[1]} for e in ents] - for ents in entities_by_doc - ] - - # Adicionar grafo entidade-entidade - entity_network = build_entity_to_entity_graph(entities_by_doc) - graph_data["entity_network"] = entity_network - - logging.info(f"Grafo construído: {graph_data['edge_count']} arestas doc-doc, {entity_network['node_count']} nós entidade") - - return graph_data - - except Exception as e: - logging.error(f"ERRO EM /entity_graph/: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=str(e)) - - -# ============================================================================== -# ENDPOINT ANÁLISE DO GRAFO COM LLM (SÁBIO) -# ============================================================================== -@app.post("/analyze_graph/") -async def analyze_graph_api(job_id: str = Form(...)): - """ - Usa o LLM para analisar o Knowledge Graph e extrair insights semânticos. - """ - if job_id not in cache: - raise HTTPException(status_code=404, detail="Job ID não encontrado.") - - logging.info(f"Analisando grafo com LLM para Job: {job_id}") - - try: - cached_data = cache[job_id] - df = cached_data["df"] - textos = df["full_text"].tolist() - clusters = df["cluster"].tolist() if "cluster" in df.columns else ["0"] * len(textos) - - # Extrair entidades e construir grafo - entities_by_doc = extract_entities(textos) - entity_network = build_entity_to_entity_graph(entities_by_doc) - - # Mapear entidades por cluster - entity_clusters = defaultdict(lambda: defaultdict(int)) - for doc_idx, (entities, cluster) in enumerate(zip(entities_by_doc, clusters)): - for ent_text, ent_type in entities: - entity_clusters[ent_text][str(cluster)] += 1 - - # Preparar resumo do grafo para o LLM - nodes = entity_network.get("nodes", [])[:15] - edges = entity_network.get("edges", [])[:20] - hubs = entity_network.get("hubs", [])[:5] - insights = entity_network.get("insights", {}) - - # Criar contexto de clusters - cluster_context = [] - unique_clusters = sorted(set(str(c) for c in clusters if str(c) != "-1")) - for cluster_id in unique_clusters[:5]: - cluster_docs = [textos[i][:200] for i, c in enumerate(clusters) if str(c) == cluster_id][:3] - cluster_entities = [(ent, entity_clusters[ent].get(cluster_id, 0)) - for ent in entity_clusters if entity_clusters[ent].get(cluster_id, 0) > 0] - cluster_entities.sort(key=lambda x: x[1], reverse=True) - - cluster_context.append(f""" -### Cluster {cluster_id} ({len([c for c in clusters if str(c) == cluster_id])} docs) -Entidades principais: {', '.join([f"{e[0]}({e[1]})" for e in cluster_entities[:5]])} -Exemplo de documento: "{cluster_docs[0][:150]}..." -""") - - graph_summary = f""" -ANÁLISE DE KNOWLEDGE GRAPH COM CONTEXTO - -## Visão Geral -- {len(nodes)} entidades principais -- {insights.get('total_connections', 0)} conexões totais -- {insights.get('hub_count', 0)} hubs identificados -- {len(unique_clusters)} clusters de documentos - -## Entidades Principais (por importância): -{chr(10).join([f"- {n['entity']} ({n['type']}): {n['docs']} docs, centralidade {n.get('centrality', 0)}" for n in nodes])} - -## Hubs Centrais (entidades mais conectadas): -{chr(10).join([f"- {h['entity']} ({h['type']}): {h['degree']} conexões" for h in hubs]) if hubs else "Nenhum hub identificado"} - -## Conexões Mais Fortes: -{chr(10).join([f"- {e['source_entity']} ↔ {e['target_entity']}: {e['weight']} co-ocorrências" for e in edges[:10]])} - -## CONTEXTO POR CLUSTER (IMPORTANTE - USE ESTAS REFERÊNCIAS): -{chr(10).join(cluster_context)} -""" - - # Prompt para análise - prompt = f"""Você é um analista de inteligência especializado em análise de redes e grafos de conhecimento. - -Analise o seguinte Knowledge Graph extraído de documentos e forneça insights acionáveis: - -{graph_summary} - -Por favor forneça: -1. **Narrativa Central**: Qual é a história principal que conecta estas entidades? -2. **Atores Chave**: Quem são os principais players e qual seu papel? -3. **Relações Ocultas**: Que conexões não-óbvias você identifica? -4. **Padrões por Cluster**: Como as entidades se distribuem entre os clusters? Qual cluster tem foco diferente? -5. **Investigação**: O que valeria a pena investigar mais a fundo? - -IMPORTANTE: Sempre referencie os clusters específicos (ex: "No Cluster 0...", "Já no Cluster 1..."). -Use os dados concretos fornecidos, não generalize. Seja específico citando entidades e clusters.""" - - # Chamar LLM - completion = groq_client.chat.completions.create( - model="llama-3.3-70b-versatile", - messages=[{"role": "user", "content": prompt}], - temperature=0.7, - max_tokens=1500 - ) - - analysis = completion.choices[0].message.content - - return { - "analysis": analysis, - "graph_summary": { - "entities": len(nodes), - "connections": insights.get('total_connections', 0), - "hubs": len(hubs) - } - } - - except Exception as e: - logging.error(f"ERRO EM /analyze_graph/: {e}", exc_info=True) +# ============================================================================== +# API do AetherMap — VERSÃO 7.4 (FAISS EDITION) +# Backend com RAG Híbrido, CSV, Tavily, NER Entity Graph, FAISS ANN +# ============================================================================== + +import numpy as np +import pandas as pd +import torch +import gc +import uuid +import os +import io +import json +import logging +import time +import nltk +from nltk.corpus import stopwords +from collections import defaultdict + +from fastapi import FastAPI, UploadFile, File, Form, HTTPException +from fastapi.responses import JSONResponse +from typing import List, Dict, Any, Tuple +from functools import lru_cache + +# Ferramentas de Alquimia (ML & NLP) +from sentence_transformers import SentenceTransformer, CrossEncoder +import umap +import hdbscan +from sklearn.preprocessing import StandardScaler +from sklearn.metrics.pairwise import cosine_similarity +from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer +from scipy.stats import entropy +from scipy.sparse import csr_matrix +import faiss + +# ============================================================================== +# BM25 IMPLEMENTAÇÃO SIMPLES (Sem dependência externa) +# ============================================================================== +class SimpleBM25: + """ + Implementação simples de BM25 usando TF-IDF com ajustes. + Evita dependência externa do rank_bm25. + """ + def __init__(self, corpus: List[str], k1: float = 1.5, b: float = 0.75): + self.k1 = k1 + self.b = b + self.corpus = corpus + self.corpus_size = len(corpus) + + # Tokenização simples + self.tokenized_corpus = [doc.lower().split() for doc in corpus] + + # Calcular IDF + self.doc_freqs = {} + for doc in self.tokenized_corpus: + for term in set(doc): + self.doc_freqs[term] = self.doc_freqs.get(term, 0) + 1 + + # Calcular average document length + self.avgdl = sum(len(doc) for doc in self.tokenized_corpus) / self.corpus_size + + # IDF pre-computado + self.idf = {} + for term, freq in self.doc_freqs.items(): + self.idf[term] = np.log((self.corpus_size - freq + 0.5) / (freq + 0.5) + 1) + + def get_scores(self, query: str) -> np.ndarray: + """Retorna scores BM25 para todos os documentos.""" + query_terms = query.lower().split() + scores = np.zeros(self.corpus_size) + + for i, doc in enumerate(self.tokenized_corpus): + doc_len = len(doc) + term_freqs = {} + for term in doc: + term_freqs[term] = term_freqs.get(term, 0) + 1 + + score = 0.0 + for term in query_terms: + if term in term_freqs: + tf = term_freqs[term] + idf = self.idf.get(term, 0) + # BM25 formula + numerator = tf * (self.k1 + 1) + denominator = tf + self.k1 * (1 - self.b + self.b * (doc_len / self.avgdl)) + score += idf * (numerator / denominator) + + scores[i] = score + + return scores + + def get_top_n(self, query: str, n: int = 50) -> Tuple[np.ndarray, np.ndarray]: + """Retorna top N documentos (índices, scores).""" + scores = self.get_scores(query) + top_indices = np.argsort(scores)[-n:][::-1] + return top_indices, scores[top_indices] + +# NER & Language Detection +import spacy +from langdetect import detect, LangDetectException + +# Monitoramento (O Toque da Berta) +from prometheus_fastapi_instrumentator import Instrumentator +from prometheus_client import Histogram + +# A Conexão com o Oráculo (OpenRouter - OpenAI Compatible) +from openai import OpenAI + +# ============================================================================== +# CONFIGURAÇÕES GERAIS E LOGGING +# ============================================================================== +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + +# Modelos de IA +RETRIEVAL_MODEL = "all-MiniLM-L6-v2" # Rápido para varredura inicial +RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2" # Preciso para reordenação + +# Parâmetros de Processamento +BATCH_SIZE = 256 +UMAP_N_NEIGHBORS = 30 + +# Cache de Sessão (Na memória RAM) +cache: Dict[str, Any] = {} + +# Definição de Métricas Customizadas do Prometheus +LLM_LATENCY = Histogram( + "llm_api_latency_seconds", + "Tempo de resposta da API externa LLM (OpenRouter)", + buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0] +) + +# Modelo LLM (OpenRouter) +LLM_MODEL = os.environ.get("LLM_MODEL", "nex-agi/deepseek-v3.1-nex-n1:free") + +# Inicialização do Cliente OpenRouter (OpenAI Compatible) +OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") +try: + if not OPENROUTER_API_KEY: + logging.warning("OPENROUTER_API_KEY não encontrada. Funcionalidades de LLM estarão indisponíveis.") + llm_client = None + else: + llm_client = OpenAI( + base_url="https://openrouter.ai/api/v1", + api_key=OPENROUTER_API_KEY + ) + logging.info(f"Cliente OpenRouter inicializado com modelo: {LLM_MODEL}") +except Exception as e: + logging.error(f"FALHA AO INICIALIZAR OPENROUTER: {e}") + llm_client = None + +# Inicialização do Cliente Tavily (Web Search) +TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY") +tavily_client = None +try: + if TAVILY_API_KEY: + from tavily import TavilyClient + tavily_client = TavilyClient(api_key=TAVILY_API_KEY) + logging.info("Cliente Tavily inicializado com sucesso.") + else: + logging.warning("TAVILY_API_KEY não encontrada. Busca web estará indisponível.") +except Exception as e: + logging.error(f"FALHA AO INICIALIZAR TAVILY: {e}") + tavily_client = None + + +# ============================================================================== +# GERENCIAMENTO HÍBRIDO DE STOP WORDS (NLTK + ARQUIVO TXT) +# ============================================================================== +def carregar_stopwords(): + """ + Carrega stop words do NLTK e combina com um arquivo externo 'stopwords.txt'. + """ + logging.info("Iniciando carregamento de Stop Words...") + + # 1. Base Gramatical (NLTK - Inglês e Português) + try: + nltk.data.find('corpora/stopwords') + except LookupError: + logging.info("Baixando corpus de stopwords...") + nltk.download('stopwords') + + # Cria um conjunto único com PT e EN + final_stops = set(stopwords.words('portuguese')) | set(stopwords.words('english')) + logging.info(f"Stopwords base (NLTK) carregadas: {len(final_stops)}") + + # 2. Base Customizada + arquivo_custom = "stopwords.txt" + + if os.path.exists(arquivo_custom): + logging.info(f"Arquivo '{arquivo_custom}' encontrado. Lendo palavras customizadas...") + try: + count_custom = 0 + with open(arquivo_custom, "r", encoding="utf-8") as f: + for linha in f: + palavra = linha.split('#')[0].strip().lower() + if palavra and len(palavra) > 1: + final_stops.add(palavra) + count_custom += 1 + logging.info(f"{count_custom} stop words customizadas importadas do arquivo.") + except Exception as e: + logging.error(f"Erro ao ler '{arquivo_custom}': {e}") + else: + logging.warning(f"Arquivo '{arquivo_custom}' não encontrado. Usando apenas NLTK.") + + lista_final = list(final_stops) + logging.info(f"Total final de Stop Words ativas: {len(lista_final)}") + return lista_final + +# Variável global carregada na inicialização +STOP_WORDS_MULTILINGUAL = carregar_stopwords() + + +# ============================================================================== +# CARREGAMENTO DE MODELOS (COM CACHE) +# ============================================================================== +@lru_cache(maxsize=1) +def load_retriever(): + device = "cuda" if torch.cuda.is_available() else "cpu" + logging.info(f"Carregando Retriever '{RETRIEVAL_MODEL}' em: {device}") + return SentenceTransformer(RETRIEVAL_MODEL, device=device) + +@lru_cache(maxsize=1) +def load_reranker(): + device = "cuda" if torch.cuda.is_available() else "cpu" + logging.info(f"Carregando Reranker '{RERANKER_MODEL}' em: {device}") + return CrossEncoder(RERANKER_MODEL, device=device) + +# Cache for spaCy models +_spacy_models = {} +_spacy_available = True + +def load_spacy_model(lang: str): + """Carrega modelo spaCy com cache.""" + global _spacy_available + + if not _spacy_available: + return None + + if lang not in _spacy_models: + model_name = "pt_core_news_sm" if lang == "pt" else "en_core_web_sm" + try: + _spacy_models[lang] = spacy.load(model_name) + logging.info(f"Modelo spaCy '{model_name}' carregado.") + except OSError: + logging.warning(f"Modelo {model_name} não encontrado. Tentando baixar...") + try: + import subprocess + subprocess.run(["python", "-m", "spacy", "download", model_name], check=True) + _spacy_models[lang] = spacy.load(model_name) + except Exception as e: + logging.error(f"Falha ao baixar modelo spaCy: {e}") + _spacy_available = False + return None + return _spacy_models[lang] + +def detect_language(texts: List[str]) -> str: + """Detecta idioma predominante nos textos.""" + sample = " ".join(texts[:10])[:1000] + try: + lang = detect(sample) + return "pt" if lang == "pt" else "en" + except LangDetectException: + return "en" + +def extract_entities(textos: List[str]) -> List[List[Tuple[str, str]]]: + """Extrai entidades nomeadas de cada texto.""" + lang = detect_language(textos) + nlp = load_spacy_model(lang) + + # Fallback se spaCy não estiver disponível + if nlp is None: + logging.warning("spaCy não disponível. Retornando entidades vazias.") + return [[] for _ in textos] + + entities_by_doc = [] + for text in textos: + # Limitar tamanho do texto para performance + doc = nlp(text[:2000]) + entities = [(ent.text.lower().strip(), ent.label_) for ent in doc.ents + if len(ent.text.strip()) > 2 and ent.label_ in ("PERSON", "PER", "ORG", "GPE", "LOC")] + entities_by_doc.append(entities) + + # Normalizar entidades para deduplicação + return normalize_entities(entities_by_doc) + +def normalize_entities(entities_by_doc: List[List[Tuple[str, str]]]) -> List[List[Tuple[str, str]]]: + """Normaliza entidades para agrupar variações do mesmo nome.""" + + # Coletar todas as entidades únicas por tipo + all_entities = defaultdict(set) + for entities in entities_by_doc: + for text, etype in entities: + all_entities[etype].add(text) + + # Criar mapeamento de normalização + # Agrupa entidades onde uma contém a outra ou são muito similares + normalization_map = {} + + for etype, entity_set in all_entities.items(): + entities_list = sorted(entity_set, key=len, reverse=True) # Maiores primeiro + + for entity in entities_list: + if entity in normalization_map: + continue + + # Encontrar entidades que são parte desta ou similares + canonical = entity + for other in entities_list: + if other == entity: + continue + + # Se uma contém a outra (ex: "donald trump" contém "trump") + if other in entity or entity in other: + # Usar a mais completa como canônica + if len(entity) >= len(other): + normalization_map[(other, etype)] = (entity, etype) + else: + normalization_map[(entity, etype)] = (other, etype) + canonical = other + + # Mapear para si mesmo se não foi mapeado + if (entity, etype) not in normalization_map: + normalization_map[(entity, etype)] = (canonical, etype) + + # Aplicar normalização + normalized_docs = [] + for entities in entities_by_doc: + normalized = [] + seen = set() + for text, etype in entities: + canonical = normalization_map.get((text, etype), (text, etype)) + if canonical not in seen: + seen.add(canonical) + normalized.append(canonical) + normalized_docs.append(normalized) + + logging.info(f"Normalização: {len(all_entities)} tipos, mapa com {len(normalization_map)} entradas") + return normalized_docs + +def build_entity_graph(entities_by_doc: List[List[Tuple[str, str]]], + positions: List[Dict]) -> Dict[str, Any]: + """Constrói grafo de conexões baseado em entidades compartilhadas.""" + # Inverter: entidade -> lista de doc indices + entity_to_docs = defaultdict(set) + + for doc_idx, entities in enumerate(entities_by_doc): + for entity_text, entity_type in entities: + entity_to_docs[(entity_text, entity_type)].add(doc_idx) + + # Construir arestas (conexões entre docs que compartilham entidades) + edges = [] + seen_pairs = set() + + for (entity_text, entity_type), doc_indices in entity_to_docs.items(): + if len(doc_indices) < 2: + continue + + doc_list = sorted(doc_indices) + for i in range(len(doc_list)): + for j in range(i + 1, len(doc_list)): + pair = (doc_list[i], doc_list[j]) + if pair not in seen_pairs: + seen_pairs.add(pair) + edges.append({ + "source": doc_list[i], + "target": doc_list[j], + "entity": entity_text, + "entity_type": entity_type + }) + + # Contar entidades mais frequentes + entity_counts = [(k, len(v)) for k, v in entity_to_docs.items() if len(v) >= 2] + top_entities = sorted(entity_counts, key=lambda x: x[1], reverse=True)[:20] + + return { + "edges": edges, + "edge_count": len(edges), + "connected_pairs": len(seen_pairs), + "top_entities": [{"entity": e[0][0], "type": e[0][1], "docs": e[1]} for e in top_entities] + } + +def build_entity_to_entity_graph(entities_by_doc: List[List[Tuple[str, str]]]) -> Dict[str, Any]: + """ + Constrói grafo onde os NÓS são entidades e ARESTAS são co-ocorrências. + Entidades que aparecem no mesmo documento são conectadas. + """ + # Contar co-ocorrências + cooccurrence = defaultdict(int) + entity_doc_count = defaultdict(int) + + for entities in entities_by_doc: + unique_entities = list(set(entities)) + + # Contar docs por entidade + for ent in unique_entities: + entity_doc_count[ent] += 1 + + # Criar pares de co-ocorrência + for i in range(len(unique_entities)): + for j in range(i + 1, len(unique_entities)): + pair = tuple(sorted([unique_entities[i], unique_entities[j]], key=str)) + cooccurrence[pair] += 1 + + # Construir nós (entidades com >= 2 docs, limitado a top 50) + nodes = [] + entity_to_id = {} + + # Filtrar e ordenar por frequência + valid_entities = [(e, c) for e, c in entity_doc_count.items() if c >= 2] + valid_entities.sort(key=lambda x: x[1], reverse=True) + valid_entities = valid_entities[:50] # Limitar a 50 nós principais + + node_count = len(valid_entities) + + node_idx = 0 + for entity, count in valid_entities: + entity_to_id[entity] = node_idx + + # Layout em esfera 3D (melhor para muitos nós) + # Golden angle para distribuição uniforme + golden_angle = np.pi * (3 - np.sqrt(5)) + theta = node_idx * golden_angle + phi = np.arccos(1 - 2 * (node_idx + 0.5) / max(node_count, 1)) + + radius = 3.0 # Raio fixo maior + x = radius * np.sin(phi) * np.cos(theta) + y = radius * np.sin(phi) * np.sin(theta) + z = radius * np.cos(phi) + + nodes.append({ + "id": node_idx, + "entity": entity[0], + "type": entity[1], + "docs": count, + "x": float(x), + "y": float(y), + "z": float(z) + }) + node_idx += 1 + + # Construir arestas (co-ocorrências) + entity_edges = [] + for (ent1, ent2), weight in cooccurrence.items(): + if ent1 in entity_to_id and ent2 in entity_to_id and weight >= 1: + entity_edges.append({ + "source": entity_to_id[ent1], + "target": entity_to_id[ent2], + "weight": weight, + "source_entity": ent1[0], + "target_entity": ent2[0], + "reason": f"Aparecem juntos em {weight} documento(s)" + }) + + # Ordenar arestas por peso + entity_edges.sort(key=lambda x: x["weight"], reverse=True) + + # Calcular métricas de grafo + # Degree centrality (número de conexões de cada nó) + degree = defaultdict(int) + for edge in entity_edges: + degree[edge["source"]] += edge["weight"] + degree[edge["target"]] += edge["weight"] + + # Calcular max degree para normalização + max_degree = max(degree.values()) if degree else 1 + + # Atualizar nós com métricas + hubs = [] + for node in nodes: + node_degree = degree.get(node["id"], 0) + node["degree"] = node_degree + node["centrality"] = round(node_degree / max_degree, 3) + + # Classificar nó + if node["centrality"] > 0.7: + node["role"] = "hub" # Hub central + hubs.append(node) + elif node["centrality"] > 0.3: + node["role"] = "connector" # Conector + else: + node["role"] = "peripheral" # Periférico + + # Top hubs + hubs.sort(key=lambda x: x["degree"], reverse=True) + + return { + "nodes": nodes, + "edges": entity_edges[:200], # Limitar a 200 arestas mais fortes + "node_count": len(nodes), + "edge_count": len(entity_edges), + "hubs": [{"entity": h["entity"], "type": h["type"], "degree": h["degree"]} for h in hubs[:5]], + "insights": { + "total_connections": sum(degree.values()) // 2, + "avg_degree": round(sum(degree.values()) / len(degree), 1) if degree else 0, + "hub_count": len(hubs) + } + } + + +# ============================================================================== +# PIPELINE DE PROCESSAMENTO DE DADOS +# ============================================================================== +def preparar_textos(file_bytes: bytes, n_samples: int) -> List[str]: + """Prepara textos de arquivo TXT (uma linha por documento).""" + linhas = file_bytes.decode("utf-8", errors="ignore").splitlines() + textos = [s for line in linhas if (s := line.strip()) and len(s.split()) > 3] + return textos[:n_samples] + +def detect_csv_separator(file_bytes: bytes) -> str: + """Detecta separador do CSV (vírgula ou ponto-e-vírgula).""" + sample = file_bytes[:4096].decode("utf-8", errors="ignore") + first_line = sample.split('\n')[0] + + # Conta ocorrências de cada separador na primeira linha + commas = first_line.count(',') + semicolons = first_line.count(';') + + return ';' if semicolons > commas else ',' + +def read_csv_smart(file_bytes: bytes, nrows=None) -> pd.DataFrame: + """Lê CSV com detecção automática de separador e encoding.""" + sep = detect_csv_separator(file_bytes) + + try: + df = pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="utf-8", nrows=nrows) + except UnicodeDecodeError: + df = pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="latin-1", nrows=nrows) + + return df + +def preparar_textos_csv(file_bytes: bytes, text_column: str, n_samples: int) -> List[str]: + """Prepara textos de arquivo CSV extraindo coluna especificada.""" + df = read_csv_smart(file_bytes) + + if text_column not in df.columns: + available = ", ".join(df.columns.tolist()[:10]) + raise ValueError(f"Coluna '{text_column}' não encontrada. Colunas disponíveis: {available}") + + textos = df[text_column].dropna().astype(str).tolist() + # Filtrar textos muito curtos + textos = [t.strip() for t in textos if len(t.strip().split()) > 3] + return textos[:n_samples] + +def get_csv_columns(file_bytes: bytes) -> List[str]: + """Retorna lista de colunas de um arquivo CSV.""" + df = read_csv_smart(file_bytes, nrows=0) + return df.columns.tolist() + + +def processar_pipeline( + textos: List[str], + small_dataset: bool = False, + fast_mode: bool = False, + custom_min_cluster_size: int = 0, # 0 = Auto + custom_min_samples: int = 0 # 0 = Auto +) -> (pd.DataFrame, np.ndarray): + """ + Processa textos através do pipeline de embeddings, redução dimensional e clustering. + + Args: + textos: Lista de textos + small_dataset: Se True, usa parâmetros otimizados para datasets pequenos (Tavily) + fast_mode: Se True, usa PCA ao invés de UMAP (10-100x mais rápido) + custom_min_cluster_size: Tamanho mínimo do cluster (0 = Auto) + custom_min_samples: Número mínimo de amostras (0 = Auto) + """ + num_textos = len(textos) + logging.info(f"Iniciando pipeline para {num_textos} textos (fast_mode={fast_mode})...") + model = load_retriever() + + # 1. Embeddings + embeddings = model.encode(textos, batch_size=BATCH_SIZE, show_progress_bar=False, convert_to_numpy=True) + + # 2. Redução Dimensional + if fast_mode: + # PCA - O(N) muito mais rápido que UMAP O(N²) + from sklearn.decomposition import PCA + logging.info("Usando PCA (modo rápido)...") + reducer = PCA(n_components=3, random_state=42) + emb_3d = reducer.fit_transform(embeddings) + else: + # UMAP - mais preciso mas lento + n_neighbors = min(15, max(3, num_textos - 1)) if small_dataset else UMAP_N_NEIGHBORS + logging.info(f"Usando UMAP (n_neighbors={n_neighbors})...") + reducer = umap.UMAP(n_components=3, n_neighbors=n_neighbors, min_dist=0.0, metric="cosine", random_state=42) + emb_3d = reducer.fit_transform(embeddings) + + emb_3d = StandardScaler().fit_transform(emb_3d) + + # 3. HDBSCAN - usa parâmetros customizados ou adaptativos + if custom_min_cluster_size > 0: + # Usar valores customizados do usuário + min_size = custom_min_cluster_size + min_samples_val = custom_min_samples if custom_min_samples > 0 else None + logging.info(f"Usando parâmetros customizados: min_cluster_size={min_size}, min_samples={min_samples_val}") + elif small_dataset: + # Para Tavily (10-50 docs): clusters menores, mais agressivo + min_size = max(2, int(num_textos * 0.1)) + min_samples_val = 1 + elif fast_mode: + # Para PCA: clusters menores já que PCA não separa tão bem quanto UMAP + min_size = max(8, int(num_textos * 0.01)) + min_samples_val = 3 + else: + # Para datasets grandes com UMAP: comportamento padrão + min_size = max(10, int(num_textos * 0.02)) + min_samples_val = None + + logging.info(f"HDBSCAN: min_cluster_size={min_size}, min_samples={min_samples_val}") + + clusterer = hdbscan.HDBSCAN(min_cluster_size=min_size, min_samples=min_samples_val) + clusters = clusterer.fit_predict(emb_3d) + + # 4. DataFrame + df = pd.DataFrame({ + "x": emb_3d[:, 0], "y": emb_3d[:, 1], "z": emb_3d[:, 2], + "full_text": textos, "cluster": clusters.astype(str) + }) + + del reducer, clusterer, emb_3d; gc.collect() + return df, embeddings + +def calcular_metricas(textos: List[str]) -> Dict[str, Any]: + logging.info("Calculando métricas globais...") + if not textos: return {} + + # Token pattern: só palavras alfabéticas com 3+ caracteres (ignora números) + token_pattern = r'\b[a-zA-ZÀ-ÿ]{3,}\b' + + vectorizer_count = CountVectorizer( + stop_words=STOP_WORDS_MULTILINGUAL, + max_features=1000, + token_pattern=token_pattern + ) + vectorizer_tfidf = TfidfVectorizer( + stop_words=STOP_WORDS_MULTILINGUAL, + max_features=1000, + token_pattern=token_pattern + ) + + try: + counts_matrix = vectorizer_count.fit_transform(textos) + tfidf_matrix = vectorizer_tfidf.fit_transform(textos) + except ValueError: + return {"riqueza_lexical": 0, "top_tfidf_palavras": [], "entropia": 0.0} + + vocab_count = vectorizer_count.get_feature_names_out() + contagens = counts_matrix.sum(axis=0).A1 + + vocab_tfidf = vectorizer_tfidf.get_feature_names_out() + soma_tfidf = tfidf_matrix.sum(axis=0).A1 + top_idx_tfidf = np.argsort(soma_tfidf)[-10:][::-1] + top_tfidf = [{"palavra": vocab_tfidf[i], "score": round(float(soma_tfidf[i]), 4)} for i in top_idx_tfidf] + + return { + "riqueza_lexical": len(vocab_count), + "top_tfidf_palavras": top_tfidf, + "entropia": float(entropy(contagens / contagens.sum(), base=2)) if contagens.sum() > 0 else 0.0 + } + +def encontrar_duplicados(df: pd.DataFrame, embeddings: np.ndarray) -> Dict[str, Any]: + logging.info("Detectando duplicados...") + mask = df["full_text"].duplicated(keep=False) + grupos_exatos = {t: [int(i) for i in idxs] for t, idxs in df[mask].groupby("full_text").groups.items()} + pares_semanticos = [] + + if 2 < len(embeddings) < 5000: + sim = cosine_similarity(embeddings) + triu_indices = np.triu_indices_from(sim, k=1) + sim_vetor = sim[triu_indices] + pares_idx = np.where(sim_vetor > 0.98)[0] + top_pares_idx = pares_idx[np.argsort(sim_vetor[pares_idx])[-5:][::-1]] + for i in top_pares_idx: + idx1, idx2 = triu_indices[0][i], triu_indices[1][i] + if df["full_text"].iloc[idx1] != df["full_text"].iloc[idx2]: + pares_semanticos.append({ + "similaridade": float(sim[idx1, idx2]), + "texto1": df["full_text"].iloc[idx1], + "texto2": df["full_text"].iloc[idx2] + }) + return {"grupos_exatos": grupos_exatos, "pares_semanticos": pares_semanticos} + +def analisar_clusters(df: pd.DataFrame) -> Dict[str, Any]: + logging.info("Analisando clusters...") + analise = {} + ids_clusters_validos = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int) + for cid in ids_clusters_validos: + textos_cluster = df[df["cluster"] == cid]["full_text"].tolist() + if len(textos_cluster) < 2: continue + try: + vectorizer = TfidfVectorizer(stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000) + tfidf_matrix = vectorizer.fit_transform(textos_cluster) + vocab = vectorizer.get_feature_names_out() + soma = tfidf_matrix.sum(axis=0).A1 + top_idx = np.argsort(soma)[-5:][::-1] + top_palavras = [{"palavra": vocab[i], "score": round(float(soma[i]), 4)} for i in top_idx] + except ValueError: + top_palavras = [] + analise[cid] = {"num_documentos": len(textos_cluster), "top_palavras": top_palavras} + return analise + + +# ============================================================================== +# API FASTAPI & INSTRUMENTAÇÃO +# ============================================================================== +app = FastAPI(title="AetherMap API 7.2", version="7.2.0", description="Backend Semantic Search + CSV + Tavily Web Search") + +# --- A MÁGICA ACONTECE AQUI --- +# Isso expõe automaticamente o endpoint /metrics para o Prometheus/Grafana +Instrumentator().instrument(app).expose(app) +# ------------------------------ + +@app.get("/") +async def root(): + return {"status": "online", "message": "Aether Map API 7.2 (CSV + Tavily Ready)."} + +@app.post("/csv_columns/") +async def get_columns_api(file: UploadFile = File(...)): + """Retorna as colunas de um arquivo CSV para preview.""" + if not file.filename.lower().endswith('.csv'): + raise HTTPException(status_code=400, detail="Arquivo deve ser CSV.") + try: + file_bytes = await file.read() + columns = get_csv_columns(file_bytes) + return {"columns": columns, "filename": file.filename} + except Exception as e: + raise HTTPException(status_code=400, detail=f"Erro ao ler CSV: {str(e)}") + +@app.post("/process/") +async def process_api( + n_samples: int = Form(10000), + file: UploadFile = File(...), + text_column: str = Form(None), # Coluna de texto para CSV + fast_mode: str = Form("false"), # Modo rápido (PCA ao invés de UMAP) + min_cluster_size: str = Form("0"), # 0 = Auto + min_samples: str = Form("0") # 0 = Auto +): + # Converter strings para tipos apropriados + fast_mode_bool = fast_mode.lower() in ("true", "1", "yes") + min_cluster_size_int = int(min_cluster_size) if min_cluster_size.isdigit() else 0 + min_samples_int = int(min_samples) if min_samples.isdigit() else 0 + + logging.info(f"Processando: {file.filename} (fast_mode={fast_mode_bool}, min_cluster={min_cluster_size_int}, min_samples={min_samples_int})") + try: + file_bytes = await file.read() + + # Detectar tipo de arquivo e processar + if file.filename.lower().endswith('.csv'): + if not text_column: + raise HTTPException(status_code=400, detail="Para CSV, informe 'text_column'.") + textos = preparar_textos_csv(file_bytes, text_column, n_samples) + else: + textos = preparar_textos(file_bytes, n_samples) + + if not textos: raise HTTPException(status_code=400, detail="Nenhum texto válido encontrado.") + + # Auto-enable fast_mode para datasets grandes + if len(textos) > 5000 and not fast_mode_bool: + logging.warning(f"Dataset grande ({len(textos)} docs) - considere usar fast_mode=True") + + df, embeddings = processar_pipeline( + textos, + fast_mode=fast_mode_bool, + custom_min_cluster_size=min_cluster_size_int, + custom_min_samples=min_samples_int + ) + + # Criar índice FAISS para busca rápida (semântica) + embeddings_normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) + faiss_index = faiss.IndexFlatIP(embeddings_normalized.shape[1]) # Inner Product = Cosine sim para vetores normalizados + faiss_index.add(embeddings_normalized.astype('float32')) + + # Criar índice BM25 para busca lexical (Hybrid Search) + corpus_texts = df["full_text"].tolist() + bm25_index = SimpleBM25(corpus_texts) + logging.info(f"BM25 index criado com {len(corpus_texts)} documentos") + + job_id = str(uuid.uuid4()) + cache[job_id] = { + "embeddings": embeddings, + "embeddings_normalized": embeddings_normalized, + "faiss_index": faiss_index, + "bm25_index": bm25_index, # Novo: índice BM25 + "df": df + } + logging.info(f"Job criado: {job_id} (FAISS + BM25 hybrid search)") + + metricas_globais = calcular_metricas(df["full_text"].tolist()) + analise_de_duplicados = encontrar_duplicados(df, embeddings) + analise_por_cluster_tfidf = analisar_clusters(df) + + n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0) + n_ruido = int((df["cluster"] == "-1").sum()) + + return { + "job_id": job_id, + "metadata": { + "filename": file.filename, + "num_documents_processed": len(df), + "num_clusters_found": n_clusters, + "num_noise_points": n_ruido + }, + "metrics": metricas_globais, + "duplicates": analise_de_duplicados, + "cluster_analysis": analise_por_cluster_tfidf, + "plot_data": df[["x", "y", "z", "cluster", "full_text"]].to_dict("records"), + } + except Exception as e: + logging.error(f"ERRO EM /process/: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/search/") +async def search_api(query: str = Form(...), job_id: str = Form(...)): + """ + ENDPOINT DE BUSCA (RAG Híbrido) com Monitoramento de Latência + """ + logging.info(f"Busca: '{query}' [Job: {job_id}]") + if job_id not in cache: + raise HTTPException(status_code=404, detail="Job ID não encontrado.") + + try: + model = load_retriever() + reranker = load_reranker() + + cached_data = cache[job_id] + df = cached_data["df"] + faiss_index = cached_data.get("faiss_index") + bm25_index = cached_data.get("bm25_index") + + # ================================================================== + # FASE 1: HYBRID SEARCH (FAISS Semântico + BM25 Lexical) + # ================================================================== + query_embedding = model.encode([query], convert_to_numpy=True) + query_normalized = query_embedding / np.linalg.norm(query_embedding, axis=1, keepdims=True) + + top_k_retrieval = min(100, len(df)) # Aumentado para hybrid search + + # --- 1A: Busca Semântica (FAISS) --- + semantic_scores = {} + semantic_ranks = {} + + if faiss_index: + faiss_scores, faiss_indices = faiss_index.search( + query_normalized.astype('float32'), + min(top_k_retrieval, faiss_index.ntotal) + ) + faiss_scores = faiss_scores[0] + faiss_indices = faiss_indices[0] + + for rank, (idx, score) in enumerate(zip(faiss_indices, faiss_scores)): + if idx >= 0: + semantic_scores[int(idx)] = float(score) + semantic_ranks[int(idx)] = rank + 1 # 1-indexed rank + + logging.info(f"FAISS: top score = {faiss_scores[0]:.3f}") + + # --- 1B: Busca Lexical (BM25) --- + lexical_scores = {} + lexical_ranks = {} + + if bm25_index: + bm25_indices, bm25_scores = bm25_index.get_top_n(query, n=top_k_retrieval) + + for rank, (idx, score) in enumerate(zip(bm25_indices, bm25_scores)): + if score > 0: # Só inclui se teve match + lexical_scores[int(idx)] = float(score) + lexical_ranks[int(idx)] = rank + 1 + + if bm25_scores[0] > 0: + logging.info(f"BM25: top score = {bm25_scores[0]:.3f}, docs matched = {len(lexical_scores)}") + else: + logging.info("BM25: nenhum match léxico encontrado") + + # --- 1C: Reciprocal Rank Fusion (RRF) --- + # Combina rankings de ambas as buscas + # RRF score = sum(1 / (k + rank)) onde k é constante (tipicamente 60) + k = 60 # Constante de suavização do RRF + all_indices = set(semantic_ranks.keys()) | set(lexical_ranks.keys()) + + hybrid_scores = {} + for idx in all_indices: + rrf_score = 0.0 + + # Contribuição semântica + if idx in semantic_ranks: + rrf_score += 1.0 / (k + semantic_ranks[idx]) + + # Contribuição lexical + if idx in lexical_ranks: + rrf_score += 1.0 / (k + lexical_ranks[idx]) + + hybrid_scores[idx] = rrf_score + + # Ordenar por RRF score + sorted_candidates = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True) + + # Log de debug + n_semantic_only = len(semantic_ranks.keys() - lexical_ranks.keys()) + n_lexical_only = len(lexical_ranks.keys() - semantic_ranks.keys()) + n_both = len(semantic_ranks.keys() & lexical_ranks.keys()) + logging.info(f"Hybrid: {n_both} em ambos, {n_semantic_only} só semântico, {n_lexical_only} só léxico") + + # Preparar candidatos para reranking + candidate_docs = [] + candidate_indices = [] + retrieval_scores = {} + + for idx, rrf_score in sorted_candidates[:top_k_retrieval]: + doc_text = df.iloc[idx]["full_text"] + candidate_docs.append([query, doc_text]) + candidate_indices.append(idx) + retrieval_scores[idx] = rrf_score # Guardar RRF score + + if not candidate_docs: + return {"summary": "Não foram encontrados documentos relevantes.", "results": []} + + # FASE 2: Reranking + logging.info(f"Reranking {len(candidate_docs)} documentos...") + rerank_scores = reranker.predict(candidate_docs) + + rerank_results = sorted( + zip(candidate_indices, rerank_scores), + key=lambda x: x[1], + reverse=True + ) + + final_top_k = 5 + final_results = [] + context_parts = [] + + for rank, (idx, score) in enumerate(rerank_results[:final_top_k]): + doc_text = df.iloc[idx]["full_text"] + context_parts.append(f"[ID: {rank+1}] DOCUMENTO:\n{doc_text}\n---------------------") + + final_results.append({ + "index": idx, + "score": float(score), + "cosine_score": retrieval_scores.get(idx, 0.0), + "citation_id": rank + 1 + }) + + # FASE 3: Geração (OpenRouter) com TELEMETRIA + summary = "" + if llm_client: + context_str = "\n".join(context_parts) + rag_prompt = ( + "INSTRUÇÃO DE SISTEMA:\n" + "Você é o Aetherius, um motor de busca semântica de alta precisão.\n" + "Sua missão é responder à pergunta do usuário baseando-se ESTRITAMENTE nos documentos fornecidos.\n\n" + "REGRAS OBRIGATÓRIAS:\n" + "1. CITAÇÕES: Toda afirmação deve ter fonte [ID: x]. Ex: 'O lucro subiu [ID: 1].'\n" + "2. HONESTIDADE: Se não estiver no texto, diga que não encontrou.\n" + "3. IDIOMA: Português do Brasil.\n\n" + f"CONTEXTO RECUPERADO:\n{context_str}\n\n" + f"PERGUNTA DO USUÁRIO: \"{query}\"\n\n" + "RESPOSTA:" + ) + + try: + # --- INÍCIO DA MEDIÇÃO DA API EXTERNA --- + start_time_llm = time.time() + + chat_completion = llm_client.chat.completions.create( + messages=[{"role": "user", "content": rag_prompt}], + model=LLM_MODEL, + temperature=0.1, + max_tokens=1024 + ) + + # Registra o tempo gasto apenas na chamada da API + duration = time.time() - start_time_llm + LLM_LATENCY.observe(duration) + # --- FIM DA MEDIÇÃO --- + + summary = chat_completion.choices[0].message.content.strip() + except Exception as e: + logging.warning(f"Erro na geração do LLM: {e}") + summary = "Não foi possível gerar o resumo automático, mas os documentos estão listados abaixo." + + return {"summary": summary, "results": final_results} + + except Exception as e: + logging.error(f"ERRO EM /search/: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/describe_clusters/") +async def describe_clusters_api(job_id: str = Form(...)): + logging.info(f"Descrevendo clusters para Job: {job_id}") + if not llm_client: raise HTTPException(status_code=503, detail="LLM indisponível.") + if job_id not in cache: raise HTTPException(status_code=404, detail="Job não encontrado.") + + try: + cached_data = cache[job_id] + df = cached_data["df"] + embeddings = cached_data["embeddings"] + + champion_docs_by_cluster = {} + cluster_ids = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int) + + for cid in cluster_ids: + mask = df["cluster"] == cid + cluster_embeddings = embeddings[mask] + cluster_texts = df[mask]["full_text"].tolist() + if len(cluster_texts) < 3: continue + + centroid = np.mean(cluster_embeddings, axis=0) + similarities = cosine_similarity([centroid], cluster_embeddings)[0] + top_indices = np.argsort(similarities)[-3:][::-1] + champion_docs_by_cluster[cid] = [cluster_texts[i] for i in top_indices] + + if not champion_docs_by_cluster: return {"insights": {}} + + prompt_sections = [] + for cid, docs in champion_docs_by_cluster.items(): + doc_list = "\n".join([f"- {doc[:300]}..." for doc in docs]) + prompt_sections.append(f"Grupo {cid}:\n{doc_list}") + + master_prompt = ( + "Analise os grupos de texto abaixo. Para cada grupo, retorne um JSON com 'topic_name' e 'core_insight'.\n" + "Responda APENAS o JSON válido.\n\n" + "\n\n".join(prompt_sections) + ) + + # --- INÍCIO DA MEDIÇÃO DA API EXTERNA --- + start_time_llm = time.time() + + chat_completion = llm_client.chat.completions.create( + messages=[ + {"role": "system", "content": "JSON Output Only."}, + {"role": "user", "content": master_prompt}, + ], model=LLM_MODEL, temperature=0.2, + ) + + duration = time.time() - start_time_llm + LLM_LATENCY.observe(duration) + # --- FIM DA MEDIÇÃO --- + + response_content = chat_completion.choices[0].message.content + insights = json.loads(response_content.strip().replace("```json", "").replace("```", "")) + return {"insights": insights} + + except Exception as e: + logging.error(f"ERRO EM /describe_clusters/: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +# ============================================================================== +# ENDPOINT TAVILY WEB SEARCH +# ============================================================================== +@app.post("/search_web/") +async def search_web_api( + query: str = Form(...), + max_results: int = Form(20), + search_depth: str = Form("basic") # "basic" ou "advanced" +): + """ + Busca na web via Tavily e processa resultados para visualização. + """ + if not tavily_client: + raise HTTPException(status_code=503, detail="Tavily não configurado. Defina TAVILY_API_KEY.") + + logging.info(f"Tavily Search: '{query}' (max: {max_results})") + + try: + # Buscar via Tavily + search_result = tavily_client.search( + query=query, + max_results=max_results, + search_depth=search_depth, + include_answer=False + ) + + results = search_result.get("results", []) + if not results: + return {"error": "Nenhum resultado encontrado.", "results_count": 0} + + # Extrair textos dos resultados + textos = [] + sources = [] + for r in results: + title = r.get("title", "") + content = r.get("content", "") + url = r.get("url", "") + + # Combinar título + conteúdo + full_text = f"{title}: {content}" if title else content + if len(full_text.strip().split()) > 5: + textos.append(full_text.strip()) + sources.append({"title": title, "url": url}) + + if not textos: + return {"error": "Resultados sem conteúdo válido.", "results_count": 0} + + # Processar através do pipeline com parâmetros para datasets pequenos + df, embeddings = processar_pipeline(textos, small_dataset=True) + + # Criar índice FAISS para busca rápida + embeddings_normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) + faiss_index = faiss.IndexFlatIP(embeddings_normalized.shape[1]) + faiss_index.add(embeddings_normalized.astype('float32')) + + # Adicionar URL de origem a cada ponto + df["source_url"] = [sources[i]["url"] if i < len(sources) else "" for i in range(len(df))] + df["source_title"] = [sources[i]["title"] if i < len(sources) else "" for i in range(len(df))] + + # Criar job e cachear + job_id = str(uuid.uuid4()) + cache[job_id] = { + "embeddings": embeddings, + "embeddings_normalized": embeddings_normalized, + "faiss_index": faiss_index, + "df": df, + "sources": sources + } + logging.info(f"Tavily Job criado: {job_id}") + + # Calcular métricas e análises + metricas_globais = calcular_metricas(df["full_text"].tolist()) + analise_por_cluster_tfidf = analisar_clusters(df) + + n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0) + n_ruido = int((df["cluster"] == "-1").sum()) + + return { + "job_id": job_id, + "metadata": { + "query": query, + "source": "tavily_web_search", + "num_documents_processed": len(df), + "num_clusters_found": n_clusters, + "num_noise_points": n_ruido + }, + "metrics": metricas_globais, + "cluster_analysis": analise_por_cluster_tfidf, + "plot_data": df[["x", "y", "z", "cluster", "full_text", "source_url", "source_title"]].to_dict("records"), + "sources": sources + } + + except Exception as e: + logging.error(f"ERRO EM /search_web/: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +# ============================================================================== +# ENDPOINT KNOWLEDGE GRAPH (NER) +# ============================================================================== +@app.post("/entity_graph/") +async def entity_graph_api(job_id: str = Form(...)): + """ + Extrai entidades nomeadas e constrói grafo de conexões entre documentos. + """ + if job_id not in cache: + raise HTTPException(status_code=404, detail="Job ID não encontrado.") + + logging.info(f"Construindo Knowledge Graph para Job: {job_id}") + + try: + cached_data = cache[job_id] + df = cached_data["df"] + textos = df["full_text"].tolist() + + # Extrair entidades + logging.info(f"Extraindo entidades de {len(textos)} documentos...") + entities_by_doc = extract_entities(textos) + + # Construir posições dos pontos + positions = df[["x", "y", "z"]].to_dict("records") + + # Construir grafo + graph_data = build_entity_graph(entities_by_doc, positions) + + # Adicionar posições ao resultado + graph_data["positions"] = positions + graph_data["num_documents"] = len(textos) + + # Entidades por documento (para tooltip) + graph_data["entities_by_doc"] = [ + [{"text": e[0], "type": e[1]} for e in ents] + for ents in entities_by_doc + ] + + # Adicionar grafo entidade-entidade + entity_network = build_entity_to_entity_graph(entities_by_doc) + graph_data["entity_network"] = entity_network + + logging.info(f"Grafo construído: {graph_data['edge_count']} arestas doc-doc, {entity_network['node_count']} nós entidade") + + return graph_data + + except Exception as e: + logging.error(f"ERRO EM /entity_graph/: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +# ============================================================================== +# ENDPOINT ANÁLISE DO GRAFO COM LLM (SÁBIO) +# ============================================================================== +@app.post("/analyze_graph/") +async def analyze_graph_api(job_id: str = Form(...)): + """ + Usa o LLM para analisar o Knowledge Graph e extrair insights semânticos. + """ + if job_id not in cache: + raise HTTPException(status_code=404, detail="Job ID não encontrado.") + + logging.info(f"Analisando grafo com LLM para Job: {job_id}") + + try: + cached_data = cache[job_id] + df = cached_data["df"] + textos = df["full_text"].tolist() + clusters = df["cluster"].tolist() if "cluster" in df.columns else ["0"] * len(textos) + + # Extrair entidades e construir grafo + entities_by_doc = extract_entities(textos) + entity_network = build_entity_to_entity_graph(entities_by_doc) + + # Mapear entidades por cluster + entity_clusters = defaultdict(lambda: defaultdict(int)) + for doc_idx, (entities, cluster) in enumerate(zip(entities_by_doc, clusters)): + for ent_text, ent_type in entities: + entity_clusters[ent_text][str(cluster)] += 1 + + # Preparar resumo do grafo para o LLM + nodes = entity_network.get("nodes", [])[:15] + edges = entity_network.get("edges", [])[:20] + hubs = entity_network.get("hubs", [])[:5] + insights = entity_network.get("insights", {}) + + # Criar contexto de clusters + cluster_context = [] + unique_clusters = sorted(set(str(c) for c in clusters if str(c) != "-1")) + for cluster_id in unique_clusters[:5]: + cluster_docs = [textos[i][:200] for i, c in enumerate(clusters) if str(c) == cluster_id][:3] + cluster_entities = [(ent, entity_clusters[ent].get(cluster_id, 0)) + for ent in entity_clusters if entity_clusters[ent].get(cluster_id, 0) > 0] + cluster_entities.sort(key=lambda x: x[1], reverse=True) + + cluster_context.append(f""" +### Cluster {cluster_id} ({len([c for c in clusters if str(c) == cluster_id])} docs) +Entidades principais: {', '.join([f"{e[0]}({e[1]})" for e in cluster_entities[:5]])} +Exemplo de documento: "{cluster_docs[0][:150]}..." +""") + + graph_summary = f""" +ANÁLISE DE KNOWLEDGE GRAPH COM CONTEXTO + +## Visão Geral +- {len(nodes)} entidades principais +- {insights.get('total_connections', 0)} conexões totais +- {insights.get('hub_count', 0)} hubs identificados +- {len(unique_clusters)} clusters de documentos + +## Entidades Principais (por importância): +{chr(10).join([f"- {n['entity']} ({n['type']}): {n['docs']} docs, centralidade {n.get('centrality', 0)}" for n in nodes])} + +## Hubs Centrais (entidades mais conectadas): +{chr(10).join([f"- {h['entity']} ({h['type']}): {h['degree']} conexões" for h in hubs]) if hubs else "Nenhum hub identificado"} + +## Conexões Mais Fortes: +{chr(10).join([f"- {e['source_entity']} ↔ {e['target_entity']}: {e['weight']} co-ocorrências" for e in edges[:10]])} + +## CONTEXTO POR CLUSTER (IMPORTANTE - USE ESTAS REFERÊNCIAS): +{chr(10).join(cluster_context)} +""" + + # Prompt para análise + prompt = f"""Você é um analista de inteligência especializado em análise de redes e grafos de conhecimento. + +Analise o seguinte Knowledge Graph extraído de documentos e forneça insights acionáveis: + +{graph_summary} + +Por favor forneça: +1. **Narrativa Central**: Qual é a história principal que conecta estas entidades? +2. **Atores Chave**: Quem são os principais players e qual seu papel? +3. **Relações Ocultas**: Que conexões não-óbvias você identifica? +4. **Padrões por Cluster**: Como as entidades se distribuem entre os clusters? Qual cluster tem foco diferente? +5. **Investigação**: O que valeria a pena investigar mais a fundo? + +IMPORTANTE: Sempre referencie os clusters específicos (ex: "No Cluster 0...", "Já no Cluster 1..."). +Use os dados concretos fornecidos, não generalize. Seja específico citando entidades e clusters.""" + + # Chamar LLM + completion = groq_client.chat.completions.create( + model="llama-3.3-70b-versatile", + messages=[{"role": "user", "content": prompt}], + temperature=0.7, + max_tokens=1500 + ) + + analysis = completion.choices[0].message.content + + return { + "analysis": analysis, + "graph_summary": { + "entities": len(nodes), + "connections": insights.get('total_connections', 0), + "hubs": len(hubs) + } + } + + except Exception as e: + logging.error(f"ERRO EM /analyze_graph/: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file