# ============================================================================== # API do AetherMap — VERSÃO 7.4 (FAISS EDITION) # Backend com RAG Híbrido, CSV, Tavily, NER Entity Graph, FAISS ANN # ============================================================================== import numpy as np import pandas as pd import torch import gc import uuid import os import io import json import logging import time import nltk from nltk.corpus import stopwords from collections import defaultdict from fastapi import FastAPI, UploadFile, File, Form, HTTPException from fastapi.responses import JSONResponse from typing import List, Dict, Any, Tuple from functools import lru_cache # Ferramentas de Alquimia (ML & NLP) from sentence_transformers import SentenceTransformer, CrossEncoder import umap import hdbscan from sklearn.preprocessing import StandardScaler from sklearn.metrics.pairwise import cosine_similarity from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer from scipy.stats import entropy import faiss # NER & Language Detection import spacy from langdetect import detect, LangDetectException # Monitoramento (O Toque da Berta) from prometheus_fastapi_instrumentator import Instrumentator from prometheus_client import Histogram # A Conexão com o Oráculo from groq import Groq # ============================================================================== # CONFIGURAÇÕES GERAIS E LOGGING # ============================================================================== logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") # Modelos de IA RETRIEVAL_MODEL = "all-MiniLM-L6-v2" # Rápido para varredura inicial RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2" # Preciso para reordenação # Parâmetros de Processamento BATCH_SIZE = 256 UMAP_N_NEIGHBORS = 30 # Cache de Sessão (Na memória RAM) cache: Dict[str, Any] = {} # Definição de Métricas Customizadas do Prometheus # Isso permite separar a latência da sua lógica vs a latência da API externa GROQ_LATENCY = Histogram( "groq_api_latency_seconds", "Tempo de resposta da API externa Groq (LLM Generation)", buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0] ) # Inicialização do Cliente Groq GROQ_API_KEY = os.environ.get("GROQ_API_KEY") try: if not GROQ_API_KEY: logging.warning("GROQ_API_KEY não encontrada. Funcionalidades de LLM estarão indisponíveis.") groq_client = None else: groq_client = Groq(api_key=GROQ_API_KEY) logging.info("Cliente Groq inicializado com sucesso.") except Exception as e: logging.error(f"FALHA AO INICIALIZAR GROQ: {e}") groq_client = None # Inicialização do Cliente Tavily (Web Search) TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY") tavily_client = None try: if TAVILY_API_KEY: from tavily import TavilyClient tavily_client = TavilyClient(api_key=TAVILY_API_KEY) logging.info("Cliente Tavily inicializado com sucesso.") else: logging.warning("TAVILY_API_KEY não encontrada. Busca web estará indisponível.") except Exception as e: logging.error(f"FALHA AO INICIALIZAR TAVILY: {e}") tavily_client = None # ============================================================================== # GERENCIAMENTO HÍBRIDO DE STOP WORDS (NLTK + ARQUIVO TXT) # ============================================================================== def carregar_stopwords(): """ Carrega stop words do NLTK e combina com um arquivo externo 'stopwords.txt'. """ logging.info("Iniciando carregamento de Stop Words...") # 1. Base Gramatical (NLTK - Inglês e Português) try: nltk.data.find('corpora/stopwords') except LookupError: logging.info("Baixando corpus de stopwords...") nltk.download('stopwords') # Cria um conjunto único com PT e EN final_stops = set(stopwords.words('portuguese')) | set(stopwords.words('english')) logging.info(f"Stopwords base (NLTK) carregadas: {len(final_stops)}") # 2. Base Customizada arquivo_custom = "stopwords.txt" if os.path.exists(arquivo_custom): logging.info(f"Arquivo '{arquivo_custom}' encontrado. Lendo palavras customizadas...") try: count_custom = 0 with open(arquivo_custom, "r", encoding="utf-8") as f: for linha in f: palavra = linha.split('#')[0].strip().lower() if palavra and len(palavra) > 1: final_stops.add(palavra) count_custom += 1 logging.info(f"{count_custom} stop words customizadas importadas do arquivo.") except Exception as e: logging.error(f"Erro ao ler '{arquivo_custom}': {e}") else: logging.warning(f"Arquivo '{arquivo_custom}' não encontrado. Usando apenas NLTK.") lista_final = list(final_stops) logging.info(f"Total final de Stop Words ativas: {len(lista_final)}") return lista_final # Variável global carregada na inicialização STOP_WORDS_MULTILINGUAL = carregar_stopwords() # ============================================================================== # CARREGAMENTO DE MODELOS (COM CACHE) # ============================================================================== @lru_cache(maxsize=1) def load_retriever(): device = "cuda" if torch.cuda.is_available() else "cpu" logging.info(f"Carregando Retriever '{RETRIEVAL_MODEL}' em: {device}") return SentenceTransformer(RETRIEVAL_MODEL, device=device) @lru_cache(maxsize=1) def load_reranker(): device = "cuda" if torch.cuda.is_available() else "cpu" logging.info(f"Carregando Reranker '{RERANKER_MODEL}' em: {device}") return CrossEncoder(RERANKER_MODEL, device=device) # Cache for spaCy models _spacy_models = {} _spacy_available = True def load_spacy_model(lang: str): """Carrega modelo spaCy com cache.""" global _spacy_available if not _spacy_available: return None if lang not in _spacy_models: model_name = "pt_core_news_sm" if lang == "pt" else "en_core_web_sm" try: _spacy_models[lang] = spacy.load(model_name) logging.info(f"Modelo spaCy '{model_name}' carregado.") except OSError: logging.warning(f"Modelo {model_name} não encontrado. Tentando baixar...") try: import subprocess subprocess.run(["python", "-m", "spacy", "download", model_name], check=True) _spacy_models[lang] = spacy.load(model_name) except Exception as e: logging.error(f"Falha ao baixar modelo spaCy: {e}") _spacy_available = False return None return _spacy_models[lang] def detect_language(texts: List[str]) -> str: """Detecta idioma predominante nos textos.""" sample = " ".join(texts[:10])[:1000] try: lang = detect(sample) return "pt" if lang == "pt" else "en" except LangDetectException: return "en" def extract_entities(textos: List[str]) -> List[List[Tuple[str, str]]]: """Extrai entidades nomeadas de cada texto.""" lang = detect_language(textos) nlp = load_spacy_model(lang) # Fallback se spaCy não estiver disponível if nlp is None: logging.warning("spaCy não disponível. Retornando entidades vazias.") return [[] for _ in textos] entities_by_doc = [] for text in textos: # Limitar tamanho do texto para performance doc = nlp(text[:2000]) entities = [(ent.text.lower().strip(), ent.label_) for ent in doc.ents if len(ent.text.strip()) > 2 and ent.label_ in ("PERSON", "PER", "ORG", "GPE", "LOC")] entities_by_doc.append(entities) # Normalizar entidades para deduplicação return normalize_entities(entities_by_doc) def normalize_entities(entities_by_doc: List[List[Tuple[str, str]]]) -> List[List[Tuple[str, str]]]: """Normaliza entidades para agrupar variações do mesmo nome.""" # Coletar todas as entidades únicas por tipo all_entities = defaultdict(set) for entities in entities_by_doc: for text, etype in entities: all_entities[etype].add(text) # Criar mapeamento de normalização # Agrupa entidades onde uma contém a outra ou são muito similares normalization_map = {} for etype, entity_set in all_entities.items(): entities_list = sorted(entity_set, key=len, reverse=True) # Maiores primeiro for entity in entities_list: if entity in normalization_map: continue # Encontrar entidades que são parte desta ou similares canonical = entity for other in entities_list: if other == entity: continue # Se uma contém a outra (ex: "donald trump" contém "trump") if other in entity or entity in other: # Usar a mais completa como canônica if len(entity) >= len(other): normalization_map[(other, etype)] = (entity, etype) else: normalization_map[(entity, etype)] = (other, etype) canonical = other # Mapear para si mesmo se não foi mapeado if (entity, etype) not in normalization_map: normalization_map[(entity, etype)] = (canonical, etype) # Aplicar normalização normalized_docs = [] for entities in entities_by_doc: normalized = [] seen = set() for text, etype in entities: canonical = normalization_map.get((text, etype), (text, etype)) if canonical not in seen: seen.add(canonical) normalized.append(canonical) normalized_docs.append(normalized) logging.info(f"Normalização: {len(all_entities)} tipos, mapa com {len(normalization_map)} entradas") return normalized_docs def build_entity_graph(entities_by_doc: List[List[Tuple[str, str]]], positions: List[Dict]) -> Dict[str, Any]: """Constrói grafo de conexões baseado em entidades compartilhadas.""" # Inverter: entidade -> lista de doc indices entity_to_docs = defaultdict(set) for doc_idx, entities in enumerate(entities_by_doc): for entity_text, entity_type in entities: entity_to_docs[(entity_text, entity_type)].add(doc_idx) # Construir arestas (conexões entre docs que compartilham entidades) edges = [] seen_pairs = set() for (entity_text, entity_type), doc_indices in entity_to_docs.items(): if len(doc_indices) < 2: continue doc_list = sorted(doc_indices) for i in range(len(doc_list)): for j in range(i + 1, len(doc_list)): pair = (doc_list[i], doc_list[j]) if pair not in seen_pairs: seen_pairs.add(pair) edges.append({ "source": doc_list[i], "target": doc_list[j], "entity": entity_text, "entity_type": entity_type }) # Contar entidades mais frequentes entity_counts = [(k, len(v)) for k, v in entity_to_docs.items() if len(v) >= 2] top_entities = sorted(entity_counts, key=lambda x: x[1], reverse=True)[:20] return { "edges": edges, "edge_count": len(edges), "connected_pairs": len(seen_pairs), "top_entities": [{"entity": e[0][0], "type": e[0][1], "docs": e[1]} for e in top_entities] } def build_entity_to_entity_graph(entities_by_doc: List[List[Tuple[str, str]]]) -> Dict[str, Any]: """ Constrói grafo onde os NÓS são entidades e ARESTAS são co-ocorrências. Entidades que aparecem no mesmo documento são conectadas. """ # Contar co-ocorrências cooccurrence = defaultdict(int) entity_doc_count = defaultdict(int) for entities in entities_by_doc: unique_entities = list(set(entities)) # Contar docs por entidade for ent in unique_entities: entity_doc_count[ent] += 1 # Criar pares de co-ocorrência for i in range(len(unique_entities)): for j in range(i + 1, len(unique_entities)): pair = tuple(sorted([unique_entities[i], unique_entities[j]], key=str)) cooccurrence[pair] += 1 # Construir nós (entidades com >= 2 docs, limitado a top 50) nodes = [] entity_to_id = {} # Filtrar e ordenar por frequência valid_entities = [(e, c) for e, c in entity_doc_count.items() if c >= 2] valid_entities.sort(key=lambda x: x[1], reverse=True) valid_entities = valid_entities[:50] # Limitar a 50 nós principais node_count = len(valid_entities) node_idx = 0 for entity, count in valid_entities: entity_to_id[entity] = node_idx # Layout em esfera 3D (melhor para muitos nós) # Golden angle para distribuição uniforme golden_angle = np.pi * (3 - np.sqrt(5)) theta = node_idx * golden_angle phi = np.arccos(1 - 2 * (node_idx + 0.5) / max(node_count, 1)) radius = 3.0 # Raio fixo maior x = radius * np.sin(phi) * np.cos(theta) y = radius * np.sin(phi) * np.sin(theta) z = radius * np.cos(phi) nodes.append({ "id": node_idx, "entity": entity[0], "type": entity[1], "docs": count, "x": float(x), "y": float(y), "z": float(z) }) node_idx += 1 # Construir arestas (co-ocorrências) entity_edges = [] for (ent1, ent2), weight in cooccurrence.items(): if ent1 in entity_to_id and ent2 in entity_to_id and weight >= 1: entity_edges.append({ "source": entity_to_id[ent1], "target": entity_to_id[ent2], "weight": weight, "source_entity": ent1[0], "target_entity": ent2[0], "reason": f"Aparecem juntos em {weight} documento(s)" }) # Ordenar arestas por peso entity_edges.sort(key=lambda x: x["weight"], reverse=True) # Calcular métricas de grafo # Degree centrality (número de conexões de cada nó) degree = defaultdict(int) for edge in entity_edges: degree[edge["source"]] += edge["weight"] degree[edge["target"]] += edge["weight"] # Calcular max degree para normalização max_degree = max(degree.values()) if degree else 1 # Atualizar nós com métricas hubs = [] for node in nodes: node_degree = degree.get(node["id"], 0) node["degree"] = node_degree node["centrality"] = round(node_degree / max_degree, 3) # Classificar nó if node["centrality"] > 0.7: node["role"] = "hub" # Hub central hubs.append(node) elif node["centrality"] > 0.3: node["role"] = "connector" # Conector else: node["role"] = "peripheral" # Periférico # Top hubs hubs.sort(key=lambda x: x["degree"], reverse=True) return { "nodes": nodes, "edges": entity_edges[:200], # Limitar a 200 arestas mais fortes "node_count": len(nodes), "edge_count": len(entity_edges), "hubs": [{"entity": h["entity"], "type": h["type"], "degree": h["degree"]} for h in hubs[:5]], "insights": { "total_connections": sum(degree.values()) // 2, "avg_degree": round(sum(degree.values()) / len(degree), 1) if degree else 0, "hub_count": len(hubs) } } # ============================================================================== # PIPELINE DE PROCESSAMENTO DE DADOS # ============================================================================== def preparar_textos(file_bytes: bytes, n_samples: int) -> List[str]: """Prepara textos de arquivo TXT (uma linha por documento).""" linhas = file_bytes.decode("utf-8", errors="ignore").splitlines() textos = [s for line in linhas if (s := line.strip()) and len(s.split()) > 3] return textos[:n_samples] def detect_csv_separator(file_bytes: bytes) -> str: """Detecta separador do CSV (vírgula ou ponto-e-vírgula).""" sample = file_bytes[:4096].decode("utf-8", errors="ignore") first_line = sample.split('\n')[0] # Conta ocorrências de cada separador na primeira linha commas = first_line.count(',') semicolons = first_line.count(';') return ';' if semicolons > commas else ',' def read_csv_smart(file_bytes: bytes, nrows=None) -> pd.DataFrame: """Lê CSV com detecção automática de separador e encoding.""" sep = detect_csv_separator(file_bytes) try: df = pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="utf-8", nrows=nrows) except UnicodeDecodeError: df = pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="latin-1", nrows=nrows) return df def preparar_textos_csv(file_bytes: bytes, text_column: str, n_samples: int) -> List[str]: """Prepara textos de arquivo CSV extraindo coluna especificada.""" df = read_csv_smart(file_bytes) if text_column not in df.columns: available = ", ".join(df.columns.tolist()[:10]) raise ValueError(f"Coluna '{text_column}' não encontrada. Colunas disponíveis: {available}") textos = df[text_column].dropna().astype(str).tolist() # Filtrar textos muito curtos textos = [t.strip() for t in textos if len(t.strip().split()) > 3] return textos[:n_samples] def get_csv_columns(file_bytes: bytes) -> List[str]: """Retorna lista de colunas de um arquivo CSV.""" df = read_csv_smart(file_bytes, nrows=0) return df.columns.tolist() def processar_pipeline( textos: List[str], small_dataset: bool = False, fast_mode: bool = False, custom_min_cluster_size: int = 0, # 0 = Auto custom_min_samples: int = 0 # 0 = Auto ) -> (pd.DataFrame, np.ndarray): """ Processa textos através do pipeline de embeddings, redução dimensional e clustering. Args: textos: Lista de textos small_dataset: Se True, usa parâmetros otimizados para datasets pequenos (Tavily) fast_mode: Se True, usa PCA ao invés de UMAP (10-100x mais rápido) custom_min_cluster_size: Tamanho mínimo do cluster (0 = Auto) custom_min_samples: Número mínimo de amostras (0 = Auto) """ num_textos = len(textos) logging.info(f"Iniciando pipeline para {num_textos} textos (fast_mode={fast_mode})...") model = load_retriever() # 1. Embeddings embeddings = model.encode(textos, batch_size=BATCH_SIZE, show_progress_bar=False, convert_to_numpy=True) # 2. Redução Dimensional if fast_mode: # PCA - O(N) muito mais rápido que UMAP O(N²) from sklearn.decomposition import PCA logging.info("Usando PCA (modo rápido)...") reducer = PCA(n_components=3, random_state=42) emb_3d = reducer.fit_transform(embeddings) else: # UMAP - mais preciso mas lento n_neighbors = min(15, max(3, num_textos - 1)) if small_dataset else UMAP_N_NEIGHBORS logging.info(f"Usando UMAP (n_neighbors={n_neighbors})...") reducer = umap.UMAP(n_components=3, n_neighbors=n_neighbors, min_dist=0.0, metric="cosine", random_state=42) emb_3d = reducer.fit_transform(embeddings) emb_3d = StandardScaler().fit_transform(emb_3d) # 3. HDBSCAN - usa parâmetros customizados ou adaptativos if custom_min_cluster_size > 0: # Usar valores customizados do usuário min_size = custom_min_cluster_size min_samples_val = custom_min_samples if custom_min_samples > 0 else None logging.info(f"Usando parâmetros customizados: min_cluster_size={min_size}, min_samples={min_samples_val}") elif small_dataset: # Para Tavily (10-50 docs): clusters menores, mais agressivo min_size = max(2, int(num_textos * 0.1)) min_samples_val = 1 elif fast_mode: # Para PCA: clusters menores já que PCA não separa tão bem quanto UMAP min_size = max(8, int(num_textos * 0.01)) min_samples_val = 3 else: # Para datasets grandes com UMAP: comportamento padrão min_size = max(10, int(num_textos * 0.02)) min_samples_val = None logging.info(f"HDBSCAN: min_cluster_size={min_size}, min_samples={min_samples_val}") clusterer = hdbscan.HDBSCAN(min_cluster_size=min_size, min_samples=min_samples_val) clusters = clusterer.fit_predict(emb_3d) # 4. DataFrame df = pd.DataFrame({ "x": emb_3d[:, 0], "y": emb_3d[:, 1], "z": emb_3d[:, 2], "full_text": textos, "cluster": clusters.astype(str) }) del reducer, clusterer, emb_3d; gc.collect() return df, embeddings def calcular_metricas(textos: List[str]) -> Dict[str, Any]: logging.info("Calculando métricas globais...") if not textos: return {} # Token pattern: só palavras alfabéticas com 3+ caracteres (ignora números) token_pattern = r'\b[a-zA-ZÀ-ÿ]{3,}\b' vectorizer_count = CountVectorizer( stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000, token_pattern=token_pattern ) vectorizer_tfidf = TfidfVectorizer( stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000, token_pattern=token_pattern ) try: counts_matrix = vectorizer_count.fit_transform(textos) tfidf_matrix = vectorizer_tfidf.fit_transform(textos) except ValueError: return {"riqueza_lexical": 0, "top_tfidf_palavras": [], "entropia": 0.0} vocab_count = vectorizer_count.get_feature_names_out() contagens = counts_matrix.sum(axis=0).A1 vocab_tfidf = vectorizer_tfidf.get_feature_names_out() soma_tfidf = tfidf_matrix.sum(axis=0).A1 top_idx_tfidf = np.argsort(soma_tfidf)[-10:][::-1] top_tfidf = [{"palavra": vocab_tfidf[i], "score": round(float(soma_tfidf[i]), 4)} for i in top_idx_tfidf] return { "riqueza_lexical": len(vocab_count), "top_tfidf_palavras": top_tfidf, "entropia": float(entropy(contagens / contagens.sum(), base=2)) if contagens.sum() > 0 else 0.0 } def encontrar_duplicados(df: pd.DataFrame, embeddings: np.ndarray) -> Dict[str, Any]: logging.info("Detectando duplicados...") mask = df["full_text"].duplicated(keep=False) grupos_exatos = {t: [int(i) for i in idxs] for t, idxs in df[mask].groupby("full_text").groups.items()} pares_semanticos = [] if 2 < len(embeddings) < 5000: sim = cosine_similarity(embeddings) triu_indices = np.triu_indices_from(sim, k=1) sim_vetor = sim[triu_indices] pares_idx = np.where(sim_vetor > 0.98)[0] top_pares_idx = pares_idx[np.argsort(sim_vetor[pares_idx])[-5:][::-1]] for i in top_pares_idx: idx1, idx2 = triu_indices[0][i], triu_indices[1][i] if df["full_text"].iloc[idx1] != df["full_text"].iloc[idx2]: pares_semanticos.append({ "similaridade": float(sim[idx1, idx2]), "texto1": df["full_text"].iloc[idx1], "texto2": df["full_text"].iloc[idx2] }) return {"grupos_exatos": grupos_exatos, "pares_semanticos": pares_semanticos} def analisar_clusters(df: pd.DataFrame) -> Dict[str, Any]: logging.info("Analisando clusters...") analise = {} ids_clusters_validos = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int) for cid in ids_clusters_validos: textos_cluster = df[df["cluster"] == cid]["full_text"].tolist() if len(textos_cluster) < 2: continue try: vectorizer = TfidfVectorizer(stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000) tfidf_matrix = vectorizer.fit_transform(textos_cluster) vocab = vectorizer.get_feature_names_out() soma = tfidf_matrix.sum(axis=0).A1 top_idx = np.argsort(soma)[-5:][::-1] top_palavras = [{"palavra": vocab[i], "score": round(float(soma[i]), 4)} for i in top_idx] except ValueError: top_palavras = [] analise[cid] = {"num_documentos": len(textos_cluster), "top_palavras": top_palavras} return analise # ============================================================================== # API FASTAPI & INSTRUMENTAÇÃO # ============================================================================== app = FastAPI(title="AetherMap API 7.2", version="7.2.0", description="Backend Semantic Search + CSV + Tavily Web Search") # --- A MÁGICA ACONTECE AQUI --- # Isso expõe automaticamente o endpoint /metrics para o Prometheus/Grafana Instrumentator().instrument(app).expose(app) # ------------------------------ @app.get("/") async def root(): return {"status": "online", "message": "Aether Map API 7.2 (CSV + Tavily Ready)."} @app.post("/csv_columns/") async def get_columns_api(file: UploadFile = File(...)): """Retorna as colunas de um arquivo CSV para preview.""" if not file.filename.lower().endswith('.csv'): raise HTTPException(status_code=400, detail="Arquivo deve ser CSV.") try: file_bytes = await file.read() columns = get_csv_columns(file_bytes) return {"columns": columns, "filename": file.filename} except Exception as e: raise HTTPException(status_code=400, detail=f"Erro ao ler CSV: {str(e)}") @app.post("/process/") async def process_api( n_samples: int = Form(10000), file: UploadFile = File(...), text_column: str = Form(None), # Coluna de texto para CSV fast_mode: str = Form("false"), # Modo rápido (PCA ao invés de UMAP) min_cluster_size: str = Form("0"), # 0 = Auto min_samples: str = Form("0") # 0 = Auto ): # Converter strings para tipos apropriados fast_mode_bool = fast_mode.lower() in ("true", "1", "yes") min_cluster_size_int = int(min_cluster_size) if min_cluster_size.isdigit() else 0 min_samples_int = int(min_samples) if min_samples.isdigit() else 0 logging.info(f"Processando: {file.filename} (fast_mode={fast_mode_bool}, min_cluster={min_cluster_size_int}, min_samples={min_samples_int})") try: file_bytes = await file.read() # Detectar tipo de arquivo e processar if file.filename.lower().endswith('.csv'): if not text_column: raise HTTPException(status_code=400, detail="Para CSV, informe 'text_column'.") textos = preparar_textos_csv(file_bytes, text_column, n_samples) else: textos = preparar_textos(file_bytes, n_samples) if not textos: raise HTTPException(status_code=400, detail="Nenhum texto válido encontrado.") # Auto-enable fast_mode para datasets grandes if len(textos) > 5000 and not fast_mode_bool: logging.warning(f"Dataset grande ({len(textos)} docs) - considere usar fast_mode=True") df, embeddings = processar_pipeline( textos, fast_mode=fast_mode_bool, custom_min_cluster_size=min_cluster_size_int, custom_min_samples=min_samples_int ) # Criar índice FAISS para busca rápida embeddings_normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) faiss_index = faiss.IndexFlatIP(embeddings_normalized.shape[1]) # Inner Product = Cosine sim para vetores normalizados faiss_index.add(embeddings_normalized.astype('float32')) job_id = str(uuid.uuid4()) cache[job_id] = { "embeddings": embeddings, "embeddings_normalized": embeddings_normalized, "faiss_index": faiss_index, "df": df } logging.info(f"Job criado: {job_id} (FAISS index com {faiss_index.ntotal} vetores)") metricas_globais = calcular_metricas(df["full_text"].tolist()) analise_de_duplicados = encontrar_duplicados(df, embeddings) analise_por_cluster_tfidf = analisar_clusters(df) n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0) n_ruido = int((df["cluster"] == "-1").sum()) return { "job_id": job_id, "metadata": { "filename": file.filename, "num_documents_processed": len(df), "num_clusters_found": n_clusters, "num_noise_points": n_ruido }, "metrics": metricas_globais, "duplicates": analise_de_duplicados, "cluster_analysis": analise_por_cluster_tfidf, "plot_data": df[["x", "y", "z", "cluster", "full_text"]].to_dict("records"), } except Exception as e: logging.error(f"ERRO EM /process/: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @app.post("/search/") async def search_api(query: str = Form(...), job_id: str = Form(...)): """ ENDPOINT DE BUSCA (RAG Híbrido) com Monitoramento de Latência """ logging.info(f"Busca: '{query}' [Job: {job_id}]") if job_id not in cache: raise HTTPException(status_code=404, detail="Job ID não encontrado.") try: model = load_retriever() reranker = load_reranker() cached_data = cache[job_id] df = cached_data["df"] faiss_index = cached_data.get("faiss_index") # FASE 1: Busca FAISS (O(log N) ao invés de O(N)) query_embedding = model.encode([query], convert_to_numpy=True) query_normalized = query_embedding / np.linalg.norm(query_embedding, axis=1, keepdims=True) top_k_retrieval = min(50, faiss_index.ntotal) if faiss_index else 50 if faiss_index: # FAISS search - retorna (distances, indices) scores, top_indices = faiss_index.search(query_normalized.astype('float32'), top_k_retrieval) scores = scores[0] # Flatten top_indices = top_indices[0] logging.info(f"FAISS search: top score = {scores[0]:.3f}") else: # Fallback para busca bruta se não tiver FAISS corpus_embeddings = cached_data["embeddings"] similarities = cosine_similarity(query_embedding, corpus_embeddings)[0] top_indices = np.argsort(similarities)[-top_k_retrieval:][::-1] scores = similarities[top_indices] candidate_docs = [] candidate_indices = [] for i, idx in enumerate(top_indices): if idx < 0: # FAISS pode retornar -1 se não tiver resultados suficientes continue if scores[i] > 0.15: doc_text = df.iloc[int(idx)]["full_text"] candidate_docs.append([query, doc_text]) candidate_indices.append(int(idx)) if not candidate_docs: return {"summary": "Não foram encontrados documentos relevantes.", "results": []} # FASE 2: Reranking logging.info(f"Reranking {len(candidate_docs)} documentos...") rerank_scores = reranker.predict(candidate_docs) rerank_results = sorted( zip(candidate_indices, rerank_scores), key=lambda x: x[1], reverse=True ) final_top_k = 5 final_results = [] context_parts = [] for rank, (idx, score) in enumerate(rerank_results[:final_top_k]): doc_text = df.iloc[idx]["full_text"] context_parts.append(f"[ID: {rank+1}] DOCUMENTO:\n{doc_text}\n---------------------") final_results.append({ "index": idx, "score": float(score), "cosine_score": float(similarities[idx]), "citation_id": rank + 1 }) # FASE 3: Geração (Groq) com TELEMETRIA summary = "" if groq_client: context_str = "\n".join(context_parts) rag_prompt = ( "INSTRUÇÃO DE SISTEMA:\n" "Você é o Aetherius, um motor de busca semântica de alta precisão.\n" "Sua missão é responder à pergunta do usuário baseando-se ESTRITAMENTE nos documentos fornecidos.\n\n" "REGRAS OBRIGATÓRIAS:\n" "1. CITAÇÕES: Toda afirmação deve ter fonte [ID: x]. Ex: 'O lucro subiu [ID: 1].'\n" "2. HONESTIDADE: Se não estiver no texto, diga que não encontrou.\n" "3. IDIOMA: Português do Brasil.\n\n" f"CONTEXTO RECUPERADO:\n{context_str}\n\n" f"PERGUNTA DO USUÁRIO: \"{query}\"\n\n" "RESPOSTA:" ) try: # --- INÍCIO DA MEDIÇÃO DA API EXTERNA --- start_time_groq = time.time() chat_completion = groq_client.chat.completions.create( messages=[{"role": "user", "content": rag_prompt}], model="moonshotai/kimi-k2-instruct-0905", temperature=0.1, max_tokens=1024 ) # Registra o tempo gasto apenas na chamada da API duration = time.time() - start_time_groq GROQ_LATENCY.observe(duration) # --- FIM DA MEDIÇÃO --- summary = chat_completion.choices[0].message.content.strip() except Exception as e: logging.warning(f"Erro na geração do LLM: {e}") summary = "Não foi possível gerar o resumo automático, mas os documentos estão listados abaixo." return {"summary": summary, "results": final_results} except Exception as e: logging.error(f"ERRO EM /search/: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @app.post("/describe_clusters/") async def describe_clusters_api(job_id: str = Form(...)): logging.info(f"Descrevendo clusters para Job: {job_id}") if not groq_client: raise HTTPException(status_code=503, detail="Groq indisponível.") if job_id not in cache: raise HTTPException(status_code=404, detail="Job não encontrado.") try: cached_data = cache[job_id] df = cached_data["df"] embeddings = cached_data["embeddings"] champion_docs_by_cluster = {} cluster_ids = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int) for cid in cluster_ids: mask = df["cluster"] == cid cluster_embeddings = embeddings[mask] cluster_texts = df[mask]["full_text"].tolist() if len(cluster_texts) < 3: continue centroid = np.mean(cluster_embeddings, axis=0) similarities = cosine_similarity([centroid], cluster_embeddings)[0] top_indices = np.argsort(similarities)[-3:][::-1] champion_docs_by_cluster[cid] = [cluster_texts[i] for i in top_indices] if not champion_docs_by_cluster: return {"insights": {}} prompt_sections = [] for cid, docs in champion_docs_by_cluster.items(): doc_list = "\n".join([f"- {doc[:300]}..." for doc in docs]) prompt_sections.append(f"Grupo {cid}:\n{doc_list}") master_prompt = ( "Analise os grupos de texto abaixo. Para cada grupo, retorne um JSON com 'topic_name' e 'core_insight'.\n" "Responda APENAS o JSON válido.\n\n" + "\n\n".join(prompt_sections) ) # --- INÍCIO DA MEDIÇÃO DA API EXTERNA --- start_time_groq = time.time() chat_completion = groq_client.chat.completions.create( messages=[ {"role": "system", "content": "JSON Output Only."}, {"role": "user", "content": master_prompt}, ], model="meta-llama/llama-4-maverick-17b-128e-instruct", temperature=0.2, ) duration = time.time() - start_time_groq GROQ_LATENCY.observe(duration) # --- FIM DA MEDIÇÃO --- response_content = chat_completion.choices[0].message.content insights = json.loads(response_content.strip().replace("```json", "").replace("```", "")) return {"insights": insights} except Exception as e: logging.error(f"ERRO EM /describe_clusters/: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) # ============================================================================== # ENDPOINT TAVILY WEB SEARCH # ============================================================================== @app.post("/search_web/") async def search_web_api( query: str = Form(...), max_results: int = Form(20), search_depth: str = Form("basic") # "basic" ou "advanced" ): """ Busca na web via Tavily e processa resultados para visualização. """ if not tavily_client: raise HTTPException(status_code=503, detail="Tavily não configurado. Defina TAVILY_API_KEY.") logging.info(f"Tavily Search: '{query}' (max: {max_results})") try: # Buscar via Tavily search_result = tavily_client.search( query=query, max_results=max_results, search_depth=search_depth, include_answer=False ) results = search_result.get("results", []) if not results: return {"error": "Nenhum resultado encontrado.", "results_count": 0} # Extrair textos dos resultados textos = [] sources = [] for r in results: title = r.get("title", "") content = r.get("content", "") url = r.get("url", "") # Combinar título + conteúdo full_text = f"{title}: {content}" if title else content if len(full_text.strip().split()) > 5: textos.append(full_text.strip()) sources.append({"title": title, "url": url}) if not textos: return {"error": "Resultados sem conteúdo válido.", "results_count": 0} # Processar através do pipeline com parâmetros para datasets pequenos df, embeddings = processar_pipeline(textos, small_dataset=True) # Criar índice FAISS para busca rápida embeddings_normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) faiss_index = faiss.IndexFlatIP(embeddings_normalized.shape[1]) faiss_index.add(embeddings_normalized.astype('float32')) # Adicionar URL de origem a cada ponto df["source_url"] = [sources[i]["url"] if i < len(sources) else "" for i in range(len(df))] df["source_title"] = [sources[i]["title"] if i < len(sources) else "" for i in range(len(df))] # Criar job e cachear job_id = str(uuid.uuid4()) cache[job_id] = { "embeddings": embeddings, "embeddings_normalized": embeddings_normalized, "faiss_index": faiss_index, "df": df, "sources": sources } logging.info(f"Tavily Job criado: {job_id}") # Calcular métricas e análises metricas_globais = calcular_metricas(df["full_text"].tolist()) analise_por_cluster_tfidf = analisar_clusters(df) n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0) n_ruido = int((df["cluster"] == "-1").sum()) return { "job_id": job_id, "metadata": { "query": query, "source": "tavily_web_search", "num_documents_processed": len(df), "num_clusters_found": n_clusters, "num_noise_points": n_ruido }, "metrics": metricas_globais, "cluster_analysis": analise_por_cluster_tfidf, "plot_data": df[["x", "y", "z", "cluster", "full_text", "source_url", "source_title"]].to_dict("records"), "sources": sources } except Exception as e: logging.error(f"ERRO EM /search_web/: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) # ============================================================================== # ENDPOINT KNOWLEDGE GRAPH (NER) # ============================================================================== @app.post("/entity_graph/") async def entity_graph_api(job_id: str = Form(...)): """ Extrai entidades nomeadas e constrói grafo de conexões entre documentos. """ if job_id not in cache: raise HTTPException(status_code=404, detail="Job ID não encontrado.") logging.info(f"Construindo Knowledge Graph para Job: {job_id}") try: cached_data = cache[job_id] df = cached_data["df"] textos = df["full_text"].tolist() # Extrair entidades logging.info(f"Extraindo entidades de {len(textos)} documentos...") entities_by_doc = extract_entities(textos) # Construir posições dos pontos positions = df[["x", "y", "z"]].to_dict("records") # Construir grafo graph_data = build_entity_graph(entities_by_doc, positions) # Adicionar posições ao resultado graph_data["positions"] = positions graph_data["num_documents"] = len(textos) # Entidades por documento (para tooltip) graph_data["entities_by_doc"] = [ [{"text": e[0], "type": e[1]} for e in ents] for ents in entities_by_doc ] # Adicionar grafo entidade-entidade entity_network = build_entity_to_entity_graph(entities_by_doc) graph_data["entity_network"] = entity_network logging.info(f"Grafo construído: {graph_data['edge_count']} arestas doc-doc, {entity_network['node_count']} nós entidade") return graph_data except Exception as e: logging.error(f"ERRO EM /entity_graph/: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) # ============================================================================== # ENDPOINT ANÁLISE DO GRAFO COM LLM (SÁBIO) # ============================================================================== @app.post("/analyze_graph/") async def analyze_graph_api(job_id: str = Form(...)): """ Usa o LLM para analisar o Knowledge Graph e extrair insights semânticos. """ if job_id not in cache: raise HTTPException(status_code=404, detail="Job ID não encontrado.") logging.info(f"Analisando grafo com LLM para Job: {job_id}") try: cached_data = cache[job_id] df = cached_data["df"] textos = df["full_text"].tolist() clusters = df["cluster"].tolist() if "cluster" in df.columns else ["0"] * len(textos) # Extrair entidades e construir grafo entities_by_doc = extract_entities(textos) entity_network = build_entity_to_entity_graph(entities_by_doc) # Mapear entidades por cluster entity_clusters = defaultdict(lambda: defaultdict(int)) for doc_idx, (entities, cluster) in enumerate(zip(entities_by_doc, clusters)): for ent_text, ent_type in entities: entity_clusters[ent_text][str(cluster)] += 1 # Preparar resumo do grafo para o LLM nodes = entity_network.get("nodes", [])[:15] edges = entity_network.get("edges", [])[:20] hubs = entity_network.get("hubs", [])[:5] insights = entity_network.get("insights", {}) # Criar contexto de clusters cluster_context = [] unique_clusters = sorted(set(str(c) for c in clusters if str(c) != "-1")) for cluster_id in unique_clusters[:5]: cluster_docs = [textos[i][:200] for i, c in enumerate(clusters) if str(c) == cluster_id][:3] cluster_entities = [(ent, entity_clusters[ent].get(cluster_id, 0)) for ent in entity_clusters if entity_clusters[ent].get(cluster_id, 0) > 0] cluster_entities.sort(key=lambda x: x[1], reverse=True) cluster_context.append(f""" ### Cluster {cluster_id} ({len([c for c in clusters if str(c) == cluster_id])} docs) Entidades principais: {', '.join([f"{e[0]}({e[1]})" for e in cluster_entities[:5]])} Exemplo de documento: "{cluster_docs[0][:150]}..." """) graph_summary = f""" ANÁLISE DE KNOWLEDGE GRAPH COM CONTEXTO ## Visão Geral - {len(nodes)} entidades principais - {insights.get('total_connections', 0)} conexões totais - {insights.get('hub_count', 0)} hubs identificados - {len(unique_clusters)} clusters de documentos ## Entidades Principais (por importância): {chr(10).join([f"- {n['entity']} ({n['type']}): {n['docs']} docs, centralidade {n.get('centrality', 0)}" for n in nodes])} ## Hubs Centrais (entidades mais conectadas): {chr(10).join([f"- {h['entity']} ({h['type']}): {h['degree']} conexões" for h in hubs]) if hubs else "Nenhum hub identificado"} ## Conexões Mais Fortes: {chr(10).join([f"- {e['source_entity']} ↔ {e['target_entity']}: {e['weight']} co-ocorrências" for e in edges[:10]])} ## CONTEXTO POR CLUSTER (IMPORTANTE - USE ESTAS REFERÊNCIAS): {chr(10).join(cluster_context)} """ # Prompt para análise prompt = f"""Você é um analista de inteligência especializado em análise de redes e grafos de conhecimento. Analise o seguinte Knowledge Graph extraído de documentos e forneça insights acionáveis: {graph_summary} Por favor forneça: 1. **Narrativa Central**: Qual é a história principal que conecta estas entidades? 2. **Atores Chave**: Quem são os principais players e qual seu papel? 3. **Relações Ocultas**: Que conexões não-óbvias você identifica? 4. **Padrões por Cluster**: Como as entidades se distribuem entre os clusters? Qual cluster tem foco diferente? 5. **Investigação**: O que valeria a pena investigar mais a fundo? IMPORTANTE: Sempre referencie os clusters específicos (ex: "No Cluster 0...", "Já no Cluster 1..."). Use os dados concretos fornecidos, não generalize. Seja específico citando entidades e clusters.""" # Chamar LLM completion = groq_client.chat.completions.create( model="llama-3.3-70b-versatile", messages=[{"role": "user", "content": prompt}], temperature=0.7, max_tokens=1500 ) analysis = completion.choices[0].message.content return { "analysis": analysis, "graph_summary": { "entities": len(nodes), "connections": insights.get('total_connections', 0), "hubs": len(hubs) } } except Exception as e: logging.error(f"ERRO EM /analyze_graph/: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e))