|
|
|
|
|
|
|
|
|
|
| import numpy as np
|
| import pandas as pd
|
| import torch
|
| import gc
|
| import uuid
|
| import os
|
| import io
|
| import json
|
| import logging
|
| import time
|
| import nltk
|
| from nltk.corpus import stopwords
|
| from collections import defaultdict
|
|
|
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException
|
| from fastapi.responses import JSONResponse
|
| from typing import List, Dict, Any, Tuple
|
| from functools import lru_cache
|
|
|
|
|
| from sentence_transformers import SentenceTransformer, CrossEncoder
|
| import umap
|
| import hdbscan
|
| from sklearn.preprocessing import StandardScaler
|
| from sklearn.metrics.pairwise import cosine_similarity
|
| from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
|
| from scipy.stats import entropy
|
|
|
|
|
| import spacy
|
| from langdetect import detect, LangDetectException
|
|
|
|
|
| from prometheus_fastapi_instrumentator import Instrumentator
|
| from prometheus_client import Histogram
|
|
|
|
|
| from groq import Groq
|
|
|
|
|
|
|
|
|
# Root logger configuration for the whole service.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Bi-encoder used for fast embedding-based retrieval.
RETRIEVAL_MODEL = "all-MiniLM-L6-v2"
# Cross-encoder used to rerank retrieved candidates.
RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2"

# Batch size passed to SentenceTransformer.encode.
BATCH_SIZE = 256
# Default UMAP neighborhood size for regular (non-small) datasets.
UMAP_N_NEIGHBORS = 30

# In-memory job store: job_id -> {"df": ..., "embeddings": ..., ...}.
# NOTE(review): process-local and unbounded — entries are never evicted.
cache: Dict[str, Any] = {}

# Prometheus histogram measuring latency of calls to the external Groq LLM API.
GROQ_LATENCY = Histogram(
    "groq_api_latency_seconds",
    "Tempo de resposta da API externa Groq (LLM Generation)",
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0]
)
|
|
|
|
|
# --- Groq (LLM) client bootstrap ---------------------------------------------
# Optional dependency: when the key is absent or initialization fails,
# groq_client stays None and LLM-backed features degrade gracefully.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
try:
    if not GROQ_API_KEY:
        logging.warning("GROQ_API_KEY não encontrada. Funcionalidades de LLM estarão indisponíveis.")
        groq_client = None
    else:
        groq_client = Groq(api_key=GROQ_API_KEY)
        logging.info("Cliente Groq inicializado com sucesso.")
except Exception as e:
    logging.error(f"FALHA AO INICIALIZAR GROQ: {e}")
    groq_client = None

# --- Tavily (web search) client bootstrap ------------------------------------
# Imported lazily so the package is only required when a key is configured.
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY")
tavily_client = None
try:
    if TAVILY_API_KEY:
        from tavily import TavilyClient
        tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
        logging.info("Cliente Tavily inicializado com sucesso.")
    else:
        logging.warning("TAVILY_API_KEY não encontrada. Busca web estará indisponível.")
except Exception as e:
    logging.error(f"FALHA AO INICIALIZAR TAVILY: {e}")
    tavily_client = None
|
|
|
|
|
|
|
|
|
|
|
def carregar_stopwords():
    """Build the multilingual stop-word list.

    Combines the NLTK Portuguese and English corpora with optional custom
    words from a local 'stopwords.txt' file (one word per line, '#' starts
    a comment, single-character entries ignored). Returns a list.
    """
    logging.info("Iniciando carregamento de Stop Words...")

    # Make sure the NLTK corpus is present, downloading it on first run.
    try:
        nltk.data.find('corpora/stopwords')
    except LookupError:
        logging.info("Baixando corpus de stopwords...")
        nltk.download('stopwords')

    palavras = set(stopwords.words('portuguese'))
    palavras.update(stopwords.words('english'))
    logging.info(f"Stopwords base (NLTK) carregadas: {len(palavras)}")

    arquivo_custom = "stopwords.txt"
    if not os.path.exists(arquivo_custom):
        logging.warning(f"Arquivo '{arquivo_custom}' não encontrado. Usando apenas NLTK.")
    else:
        logging.info(f"Arquivo '{arquivo_custom}' encontrado. Lendo palavras customizadas...")
        try:
            adicionadas = 0
            with open(arquivo_custom, "r", encoding="utf-8") as fh:
                for raw in fh:
                    # Drop inline comments, normalize case.
                    termo = raw.split('#')[0].strip().lower()
                    if len(termo) > 1:
                        palavras.add(termo)
                        adicionadas += 1
            logging.info(f"{adicionadas} stop words customizadas importadas do arquivo.")
        except Exception as e:
            logging.error(f"Erro ao ler '{arquivo_custom}': {e}")

    resultado = list(palavras)
    logging.info(f"Total final de Stop Words ativas: {len(resultado)}")
    return resultado


# Computed once at import time and shared by all vectorizers below.
STOP_WORDS_MULTILINGUAL = carregar_stopwords()
|
|
|
|
|
|
|
|
|
|
|
@lru_cache(maxsize=1)
def load_retriever():
    """Load the SentenceTransformer bi-encoder once (GPU when available)."""
    target = "cuda" if torch.cuda.is_available() else "cpu"
    logging.info(f"Carregando Retriever '{RETRIEVAL_MODEL}' em: {target}")
    return SentenceTransformer(RETRIEVAL_MODEL, device=target)
|
|
|
@lru_cache(maxsize=1)
def load_reranker():
    """Load the CrossEncoder reranker once (GPU when available)."""
    target = "cuda" if torch.cuda.is_available() else "cpu"
    logging.info(f"Carregando Reranker '{RERANKER_MODEL}' em: {target}")
    return CrossEncoder(RERANKER_MODEL, device=target)
|
|
|
|
|
# Per-language cache of loaded spaCy pipelines ("pt"/"en" -> Language object).
_spacy_models = {}
# Flipped to False after a failed model load/download so we stop retrying.
_spacy_available = True
|
|
|
def load_spacy_model(lang: str):
    """Load and cache the spaCy pipeline for ``lang`` ("pt", anything else -> English).

    Attempts to download the model on first use when it is not installed.
    Returns the loaded pipeline, or None when spaCy models are unavailable
    (subsequent calls short-circuit via the _spacy_available flag).
    """
    global _spacy_available

    if not _spacy_available:
        return None

    if lang not in _spacy_models:
        model_name = "pt_core_news_sm" if lang == "pt" else "en_core_web_sm"
        try:
            _spacy_models[lang] = spacy.load(model_name)
            logging.info(f"Modelo spaCy '{model_name}' carregado.")
        except OSError:
            logging.warning(f"Modelo {model_name} não encontrado. Tentando baixar...")
            try:
                import subprocess
                import sys
                # BUG FIX: invoke the running interpreter instead of whatever
                # "python" resolves to on PATH (may be absent, or point at a
                # different environment without spaCy installed).
                subprocess.run([sys.executable, "-m", "spacy", "download", model_name], check=True)
                _spacy_models[lang] = spacy.load(model_name)
            except Exception as e:
                logging.error(f"Falha ao baixar modelo spaCy: {e}")
                _spacy_available = False
                return None
    return _spacy_models[lang]
|
|
|
def detect_language(texts: List[str]) -> str:
    """Guess the dominant corpus language, collapsing everything to "pt" or "en"."""
    # Probe at most the first 10 documents, capped at 1000 characters.
    amostra = " ".join(texts[:10])[:1000]
    try:
        return "pt" if detect(amostra) == "pt" else "en"
    except LangDetectException:
        # Undetectable sample (e.g. empty or purely numeric): default to English.
        return "en"
|
|
|
def extract_entities(textos: List[str]) -> List[List[Tuple[str, str]]]:
    """Run NER over each text and return normalized (mention, label) tuples per doc."""
    nlp = load_spacy_model(detect_language(textos))

    if nlp is None:
        logging.warning("spaCy não disponível. Retornando entidades vazias.")
        return [[] for _ in textos]

    relevant_labels = ("PERSON", "PER", "ORG", "GPE", "LOC")
    entities_by_doc = []
    for texto in textos:
        # Cap each document at 2000 chars to bound NER cost.
        doc = nlp(texto[:2000])
        found = [
            (ent.text.lower().strip(), ent.label_)
            for ent in doc.ents
            if len(ent.text.strip()) > 2 and ent.label_ in relevant_labels
        ]
        entities_by_doc.append(found)

    # Collapse name variations ("obama" -> "barack obama") before returning.
    return normalize_entities(entities_by_doc)
|
|
|
def normalize_entities(entities_by_doc: List[List[Tuple[str, str]]]) -> List[List[Tuple[str, str]]]:
    """Normalize entity mentions so variations of the same name collapse.

    Within each entity type, a mention that is a substring of a longer mention
    (e.g. "obama" vs "barack obama") is mapped to the longer, canonical form.
    Returns the per-document entity lists rewritten to canonical mentions,
    de-duplicated while preserving first-seen order.
    """
    # Group all distinct mention strings by entity type.
    all_entities = defaultdict(set)
    for entities in entities_by_doc:
        for text, etype in entities:
            all_entities[etype].add(text)

    # (mention, type) -> (canonical mention, type)
    normalization_map: Dict[Tuple[str, str], Tuple[str, str]] = {}

    for etype, entity_set in all_entities.items():
        # Longest first, so canonical (longer) forms are established early.
        entities_list = sorted(entity_set, key=len, reverse=True)

        for entity in entities_list:
            # BUG FIX: the original tested `entity in normalization_map` — a str
            # against tuple keys, which is always False. Mentions already mapped
            # to a longer canonical form were reprocessed and could overwrite
            # their mapping with a shorter, non-transitive canonical.
            if (entity, etype) in normalization_map:
                continue

            canonical = entity
            for other in entities_list:
                if other == entity:
                    continue

                # A substring match in either direction means the same name.
                if other in entity or entity in other:
                    if len(entity) >= len(other):
                        normalization_map[(other, etype)] = (entity, etype)
                    else:
                        normalization_map[(entity, etype)] = (other, etype)
                        canonical = other

            if (entity, etype) not in normalization_map:
                normalization_map[(entity, etype)] = (canonical, etype)

    # Rewrite each document's entities to canonical form, de-duplicated
    # while preserving first occurrence order.
    normalized_docs = []
    for entities in entities_by_doc:
        normalized = []
        seen = set()
        for text, etype in entities:
            canonical = normalization_map.get((text, etype), (text, etype))
            if canonical not in seen:
                seen.add(canonical)
                normalized.append(canonical)
        normalized_docs.append(normalized)

    logging.info(f"Normalização: {len(all_entities)} tipos, mapa com {len(normalization_map)} entradas")
    return normalized_docs
|
|
|
def build_entity_graph(entities_by_doc: List[List[Tuple[str, str]]],
                       positions: List[Dict]) -> Dict[str, Any]:
    """Link documents that mention the same named entity.

    Returns the doc-to-doc edge list (the first shared entity encountered
    labels each pair), edge/pair counts, and the 20 entities shared by the
    most documents. ``positions`` is accepted for interface compatibility
    but not used here.
    """
    # Invert: entity key -> set of documents mentioning it.
    docs_per_entity = defaultdict(set)
    for doc_idx, entities in enumerate(entities_by_doc):
        for key in entities:
            docs_per_entity[key].add(doc_idx)

    edges = []
    seen_pairs = set()
    for (ent_text, ent_type), doc_indices in docs_per_entity.items():
        if len(doc_indices) < 2:
            continue  # an entity seen in a single doc creates no link

        ordered = sorted(doc_indices)
        for i, src in enumerate(ordered):
            for dst in ordered[i + 1:]:
                if (src, dst) in seen_pairs:
                    continue
                seen_pairs.add((src, dst))
                edges.append({
                    "source": src,
                    "target": dst,
                    "entity": ent_text,
                    "entity_type": ent_type
                })

    # Entities shared by at least two documents, ranked by reach.
    shared = [(key, len(docs)) for key, docs in docs_per_entity.items() if len(docs) >= 2]
    top_entities = sorted(shared, key=lambda item: item[1], reverse=True)[:20]

    return {
        "edges": edges,
        "edge_count": len(edges),
        "connected_pairs": len(seen_pairs),
        "top_entities": [{"entity": key[0], "type": key[1], "docs": n} for key, n in top_entities]
    }
|
|
|
def build_entity_to_entity_graph(entities_by_doc: List[List[Tuple[str, str]]]) -> Dict[str, Any]:
    """
    Constrói grafo onde os NÓS são entidades e ARESTAS são co-ocorrências.
    Entidades que aparecem no mesmo documento são conectadas.
    """
    # pair -> number of documents in which the two entities co-occur
    cooccurrence = defaultdict(int)
    # entity -> number of documents mentioning it
    entity_doc_count = defaultdict(int)

    for entities in entities_by_doc:
        unique_entities = list(set(entities))

        for ent in unique_entities:
            entity_doc_count[ent] += 1

        # Count every unordered in-document pair once (key=str gives a
        # deterministic ordering for the tuple pair key).
        for i in range(len(unique_entities)):
            for j in range(i + 1, len(unique_entities)):
                pair = tuple(sorted([unique_entities[i], unique_entities[j]], key=str))
                cooccurrence[pair] += 1

    # Only entities present in at least 2 documents become nodes.
    nodes = []
    entity_to_id = {}
    node_count = sum(1 for e, c in entity_doc_count.items() if c >= 2)

    # Node ids follow descending document-frequency order; the id also drives
    # the deterministic spiral layout below.
    node_idx = 0
    for entity, count in sorted(entity_doc_count.items(), key=lambda x: x[1], reverse=True):
        if count >= 2:
            entity_to_id[entity] = node_idx

            # Spiral coordinates: three turns around the axis, radius growing
            # from 1.5 to 3.5, z spread across [-1.5, 1.5].
            angle = 2 * np.pi * node_idx / max(node_count, 1) * 3
            radius = 1.5 + (node_idx / max(node_count, 1)) * 2
            z_pos = (node_idx / max(node_count, 1) - 0.5) * 3

            nodes.append({
                "id": node_idx,
                "entity": entity[0],
                "type": entity[1],
                "docs": count,
                "x": float(radius * np.cos(angle)),
                "y": float(radius * np.sin(angle)),
                "z": float(z_pos)
            })
            node_idx += 1

    # Keep only co-occurrence edges whose endpoints both became nodes.
    entity_edges = []
    for (ent1, ent2), weight in cooccurrence.items():
        if ent1 in entity_to_id and ent2 in entity_to_id and weight >= 1:
            entity_edges.append({
                "source": entity_to_id[ent1],
                "target": entity_to_id[ent2],
                "weight": weight,
                "source_entity": ent1[0],
                "target_entity": ent2[0],
                "reason": f"Aparecem juntos em {weight} documento(s)"
            })

    # Strongest connections first; this ordering also decides what the
    # [:200] cap in the return payload keeps.
    entity_edges.sort(key=lambda x: x["weight"], reverse=True)

    # Weighted degree per node id (sum of incident edge weights).
    degree = defaultdict(int)
    for edge in entity_edges:
        degree[edge["source"]] += edge["weight"]
        degree[edge["target"]] += edge["weight"]

    max_degree = max(degree.values()) if degree else 1

    # Classify nodes by normalized centrality: hub (>0.7), connector (>0.3),
    # otherwise peripheral.
    hubs = []
    for node in nodes:
        node_degree = degree.get(node["id"], 0)
        node["degree"] = node_degree
        node["centrality"] = round(node_degree / max_degree, 3)

        if node["centrality"] > 0.7:
            node["role"] = "hub"
            hubs.append(node)
        elif node["centrality"] > 0.3:
            node["role"] = "connector"
        else:
            node["role"] = "peripheral"

    hubs.sort(key=lambda x: x["degree"], reverse=True)

    return {
        "nodes": nodes,
        "edges": entity_edges[:200],  # cap payload size for the frontend
        "node_count": len(nodes),
        "edge_count": len(entity_edges),
        "hubs": [{"entity": h["entity"], "type": h["type"], "degree": h["degree"]} for h in hubs[:5]],
        "insights": {
            "total_connections": sum(degree.values()) // 2,
            "avg_degree": round(sum(degree.values()) / len(degree), 1) if degree else 0,
            "hub_count": len(hubs)
        }
    }
|
|
|
|
|
|
|
|
|
|
|
def preparar_textos(file_bytes: bytes, n_samples: int) -> List[str]:
    """Extract documents from a TXT upload: one non-trivial line (>3 words) each."""
    decoded = file_bytes.decode("utf-8", errors="ignore")
    selecionados = []
    for raw in decoded.splitlines():
        texto = raw.strip()
        if texto and len(texto.split()) > 3:
            selecionados.append(texto)
    return selecionados[:n_samples]
|
|
|
def detect_csv_separator(file_bytes: bytes) -> str:
    """Heuristically pick the CSV delimiter (',' vs ';') from the header line."""
    header = file_bytes[:4096].decode("utf-8", errors="ignore").split('\n')[0]
    # Whichever delimiter dominates the first line wins; ties go to comma.
    return ';' if header.count(';') > header.count(',') else ','
|
|
|
def read_csv_smart(file_bytes: bytes, nrows=None) -> pd.DataFrame:
    """Read an uploaded CSV with separator auto-detection.

    Tries UTF-8 first and retries as latin-1 on decode errors (latin-1
    accepts any byte sequence, so the retry cannot fail on decoding).
    """
    sep = detect_csv_separator(file_bytes)
    try:
        return pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="utf-8", nrows=nrows)
    except UnicodeDecodeError:
        return pd.read_csv(io.BytesIO(file_bytes), sep=sep, encoding="latin-1", nrows=nrows)
|
|
|
def preparar_textos_csv(file_bytes: bytes, text_column: str, n_samples: int) -> List[str]:
    """Extract documents (>3 words) from one column of an uploaded CSV.

    Raises ValueError when the requested column does not exist.
    """
    df = read_csv_smart(file_bytes)

    if text_column not in df.columns:
        available = ", ".join(df.columns.tolist()[:10])
        raise ValueError(f"Coluna '{text_column}' não encontrada. Colunas disponíveis: {available}")

    candidatos = (valor.strip() for valor in df[text_column].dropna().astype(str))
    textos = [texto for texto in candidatos if len(texto.split()) > 3]
    return textos[:n_samples]
|
|
|
def get_csv_columns(file_bytes: bytes) -> List[str]:
    """Return the header column names of a CSV upload without loading any rows."""
    return read_csv_smart(file_bytes, nrows=0).columns.tolist()
|
|
|
|
|
def processar_pipeline(textos: List[str], small_dataset: bool = False) -> Tuple[pd.DataFrame, np.ndarray]:
    """
    Processa textos através do pipeline de embeddings, UMAP e HDBSCAN.

    Args:
        textos: Lista de textos
        small_dataset: Se True, usa parâmetros otimizados para datasets pequenos (Tavily)

    Returns:
        (df, embeddings): ``df`` has columns x/y/z (standardized 3-D UMAP
        coordinates), full_text and cluster (HDBSCAN label as str, "-1" =
        noise); ``embeddings`` is the raw (n_docs, dim) embedding matrix.

    Note:
        BUG FIX: the return annotation was ``(pd.DataFrame, np.ndarray)`` — a
        tuple of types, which is not a valid PEP 484 annotation; it is now
        ``Tuple[pd.DataFrame, np.ndarray]``.
    """
    logging.info(f"Iniciando pipeline para {len(textos)} textos (small_dataset={small_dataset})...")
    model = load_retriever()

    embeddings = model.encode(textos, batch_size=BATCH_SIZE, show_progress_bar=False, convert_to_numpy=True)

    # UMAP neighborhood must stay below the sample count for tiny corpora.
    num_textos = len(textos)
    n_neighbors = min(15, max(3, num_textos - 1)) if small_dataset else UMAP_N_NEIGHBORS

    reducer = umap.UMAP(n_components=3, n_neighbors=n_neighbors, min_dist=0.0, metric="cosine", random_state=42)
    emb_3d = reducer.fit_transform(embeddings)
    emb_3d = StandardScaler().fit_transform(emb_3d)

    # Cluster-size floor scales with corpus size; small datasets get more
    # permissive settings so a handful of web hits can still form clusters.
    if small_dataset:
        min_size = max(2, int(num_textos * 0.1))
        min_samples = 1
    else:
        min_size = max(10, int(num_textos * 0.02))
        min_samples = None

    logging.info(f"HDBSCAN: min_cluster_size={min_size}, min_samples={min_samples}")

    clusterer = hdbscan.HDBSCAN(min_cluster_size=min_size, min_samples=min_samples)
    clusters = clusterer.fit_predict(emb_3d)

    df = pd.DataFrame({
        "x": emb_3d[:, 0], "y": emb_3d[:, 1], "z": emb_3d[:, 2],
        "full_text": textos, "cluster": clusters.astype(str)
    })

    # Free the heavyweight intermediates eagerly.
    del reducer, clusterer, emb_3d; gc.collect()
    return df, embeddings
|
|
|
def calcular_metricas(textos: List[str]) -> Dict[str, Any]:
    """Corpus-level lexical metrics: vocabulary size, top-10 TF-IDF terms and
    Shannon entropy (bits) of the word-frequency distribution."""
    logging.info("Calculando métricas globais...")
    if not textos:
        return {}

    vectorizer_count = CountVectorizer(stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000)
    vectorizer_tfidf = TfidfVectorizer(stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000)

    # An empty/all-stopword vocabulary makes fit_transform raise ValueError.
    try:
        counts_matrix = vectorizer_count.fit_transform(textos)
        tfidf_matrix = vectorizer_tfidf.fit_transform(textos)
    except ValueError:
        return {"riqueza_lexical": 0, "top_tfidf_palavras": [], "entropia": 0.0}

    contagens = counts_matrix.sum(axis=0).A1

    vocab_tfidf = vectorizer_tfidf.get_feature_names_out()
    soma_tfidf = tfidf_matrix.sum(axis=0).A1
    melhores = np.argsort(soma_tfidf)[-10:][::-1]
    top_tfidf = [
        {"palavra": vocab_tfidf[i], "score": round(float(soma_tfidf[i]), 4)}
        for i in melhores
    ]

    total = contagens.sum()
    return {
        "riqueza_lexical": len(vectorizer_count.get_feature_names_out()),
        "top_tfidf_palavras": top_tfidf,
        "entropia": float(entropy(contagens / total, base=2)) if total > 0 else 0.0
    }
|
|
|
def encontrar_duplicados(df: pd.DataFrame, embeddings: np.ndarray) -> Dict[str, Any]:
    """Find exact duplicate texts and near-duplicate (cosine > 0.98) pairs.

    Returns {"grupos_exatos": {text: [row indices]}, "pares_semanticos":
    up to 5 highest-similarity pairs whose texts differ}.
    """
    logging.info("Detectando duplicados...")
    # Exact duplicates: group row indices of rows sharing identical full_text.
    mask = df["full_text"].duplicated(keep=False)
    grupos_exatos = {t: [int(i) for i in idxs] for t, idxs in df[mask].groupby("full_text").groups.items()}
    pares_semanticos = []

    # Semantic near-duplicates only for mid-sized corpora: the full similarity
    # matrix is O(n^2) memory, so skip above 5000 docs (and below 3 docs).
    if 2 < len(embeddings) < 5000:
        sim = cosine_similarity(embeddings)
        # Upper triangle (k=1) enumerates each unordered pair exactly once.
        triu_indices = np.triu_indices_from(sim, k=1)
        sim_vetor = sim[triu_indices]
        pares_idx = np.where(sim_vetor > 0.98)[0]
        # Keep the 5 most similar qualifying pairs, most similar first.
        top_pares_idx = pares_idx[np.argsort(sim_vetor[pares_idx])[-5:][::-1]]
        for i in top_pares_idx:
            idx1, idx2 = triu_indices[0][i], triu_indices[1][i]
            # Exact duplicates are reported above; keep distinct texts only.
            if df["full_text"].iloc[idx1] != df["full_text"].iloc[idx2]:
                pares_semanticos.append({
                    "similaridade": float(sim[idx1, idx2]),
                    "texto1": df["full_text"].iloc[idx1],
                    "texto2": df["full_text"].iloc[idx2]
                })
    return {"grupos_exatos": grupos_exatos, "pares_semanticos": pares_semanticos}
|
|
|
def analisar_clusters(df: pd.DataFrame) -> Dict[str, Any]:
    """Per-cluster summary: document count plus top-5 TF-IDF keywords.

    Noise points (cluster "-1") are ignored and clusters with fewer than two
    documents are skipped.
    """
    logging.info("Analisando clusters...")
    analise = {}
    clusters_validos = sorted((c for c in df["cluster"].unique() if c != "-1"), key=int)

    for cid in clusters_validos:
        docs = df.loc[df["cluster"] == cid, "full_text"].tolist()
        if len(docs) < 2:
            continue

        try:
            vectorizer = TfidfVectorizer(stop_words=STOP_WORDS_MULTILINGUAL, max_features=1000)
            pesos = vectorizer.fit_transform(docs).sum(axis=0).A1
            vocab = vectorizer.get_feature_names_out()
            destaque = np.argsort(pesos)[-5:][::-1]
            top_palavras = [{"palavra": vocab[i], "score": round(float(pesos[i]), 4)} for i in destaque]
        except ValueError:
            # Vocabulary was empty (all stop words): report no keywords.
            top_palavras = []

        analise[cid] = {"num_documentos": len(docs), "top_palavras": top_palavras}
    return analise
|
|
|
|
|
|
|
|
|
|
|
# FastAPI application instance; served by the ASGI runner.
app = FastAPI(title="AetherMap API 7.2", version="7.2.0", description="Backend Semantic Search + CSV + Tavily Web Search")

# Instrument all routes and expose Prometheus metrics (default /metrics).
Instrumentator().instrument(app).expose(app)
|
|
|
|
|
@app.get("/")
async def root():
    """Health-check / landing route confirming the service is online."""
    return {"status": "online", "message": "Aether Map API 7.2 (CSV + Tavily Ready)."}
|
|
|
@app.post("/csv_columns/")
async def get_columns_api(file: UploadFile = File(...)):
    """Preview helper: return the column names of an uploaded CSV file."""
    if not file.filename.lower().endswith('.csv'):
        raise HTTPException(status_code=400, detail="Arquivo deve ser CSV.")
    try:
        file_bytes = await file.read()
        columns = get_csv_columns(file_bytes)
    except Exception as e:
        # Any parse failure is reported to the client as a 400.
        raise HTTPException(status_code=400, detail=f"Erro ao ler CSV: {str(e)}")
    return {"columns": columns, "filename": file.filename}
|
|
|
@app.post("/process/")
async def process_api(
    n_samples: int = Form(10000),
    file: UploadFile = File(...),
    text_column: str = Form(None)
):
    """Full ingestion endpoint.

    Parses the upload (TXT or CSV with ``text_column``), runs the
    embedding/UMAP/HDBSCAN pipeline, caches the job artifacts and returns
    metrics, duplicate analysis, per-cluster keywords and 3-D plot data.
    """
    logging.info(f"Processando arquivo: {file.filename}")
    try:
        file_bytes = await file.read()

        if file.filename.lower().endswith('.csv'):
            if not text_column:
                raise HTTPException(status_code=400, detail="Para CSV, informe 'text_column'.")
            textos = preparar_textos_csv(file_bytes, text_column, n_samples)
        else:
            textos = preparar_textos(file_bytes, n_samples)

        if not textos:
            raise HTTPException(status_code=400, detail="Nenhum texto válido encontrado.")

        df, embeddings = processar_pipeline(textos)

        # Cache artifacts so /search/, /entity_graph/ etc. can reuse them.
        job_id = str(uuid.uuid4())
        cache[job_id] = {"embeddings": embeddings, "df": df}
        logging.info(f"Job criado: {job_id}")

        metricas_globais = calcular_metricas(df["full_text"].tolist())
        analise_de_duplicados = encontrar_duplicados(df, embeddings)
        analise_por_cluster_tfidf = analisar_clusters(df)

        # "-1" is the HDBSCAN noise label and is not counted as a cluster.
        n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0)
        n_ruido = int((df["cluster"] == "-1").sum())

        return {
            "job_id": job_id,
            "metadata": {
                "filename": file.filename,
                "num_documents_processed": len(df),
                "num_clusters_found": n_clusters,
                "num_noise_points": n_ruido
            },
            "metrics": metricas_globais,
            "duplicates": analise_de_duplicados,
            "cluster_analysis": analise_por_cluster_tfidf,
            "plot_data": df[["x", "y", "z", "cluster", "full_text"]].to_dict("records"),
        }
    except HTTPException:
        # BUG FIX: the deliberate 400s raised above were being swallowed by
        # the generic handler below and re-reported to clients as 500s.
        raise
    except Exception as e:
        logging.error(f"ERRO EM /process/: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
@app.post("/search/")
async def search_api(query: str = Form(...), job_id: str = Form(...)):
    """
    ENDPOINT DE BUSCA (RAG Híbrido) com Monitoramento de Latência
    """
    logging.info(f"Busca: '{query}' [Job: {job_id}]")
    if job_id not in cache:
        raise HTTPException(status_code=404, detail="Job ID não encontrado.")

    try:
        model = load_retriever()
        reranker = load_reranker()

        cached_data = cache[job_id]
        df = cached_data["df"]
        corpus_embeddings = cached_data["embeddings"]

        # Stage 1 — dense retrieval: cosine similarity of the query embedding
        # against the cached corpus embeddings.
        query_embedding = model.encode([query], convert_to_numpy=True)
        similarities = cosine_similarity(query_embedding, corpus_embeddings)[0]

        top_k_retrieval = 50
        top_indices = np.argsort(similarities)[-top_k_retrieval:][::-1]

        # Keep only candidates above a minimal relevance floor (0.15).
        candidate_docs = []
        candidate_indices = []
        for idx in top_indices:
            if similarities[idx] > 0.15:
                doc_text = df.iloc[int(idx)]["full_text"]
                candidate_docs.append([query, doc_text])
                candidate_indices.append(int(idx))

        if not candidate_docs:
            return {"summary": "Não foram encontrados documentos relevantes.", "results": []}

        # Stage 2 — rerank the (query, doc) pairs with the cross-encoder.
        logging.info(f"Reranking {len(candidate_docs)} documentos...")
        rerank_scores = reranker.predict(candidate_docs)

        rerank_results = sorted(
            zip(candidate_indices, rerank_scores),
            key=lambda x: x[1],
            reverse=True
        )

        # Stage 3 — assemble the top-5 documents and the LLM context window.
        final_top_k = 5
        final_results = []
        context_parts = []
        for rank, (idx, score) in enumerate(rerank_results[:final_top_k]):
            doc_text = df.iloc[idx]["full_text"]
            context_parts.append(f"[ID: {rank+1}] DOCUMENTO:\n{doc_text}\n---------------------")

            final_results.append({
                "index": idx,
                "score": float(score),
                "cosine_score": float(similarities[idx]),
                "citation_id": rank + 1
            })

        # Stage 4 — optional grounded summary via Groq; skipped when the
        # client is unavailable (raw results are still returned).
        summary = ""
        if groq_client:
            context_str = "\n".join(context_parts)
            rag_prompt = (
                "INSTRUÇÃO DE SISTEMA:\n"
                "Você é o Aetherius, um motor de busca semântica de alta precisão.\n"
                "Sua missão é responder à pergunta do usuário baseando-se ESTRITAMENTE nos documentos fornecidos.\n\n"
                "REGRAS OBRIGATÓRIAS:\n"
                "1. CITAÇÕES: Toda afirmação deve ter fonte [ID: x]. Ex: 'O lucro subiu [ID: 1].'\n"
                "2. HONESTIDADE: Se não estiver no texto, diga que não encontrou.\n"
                "3. IDIOMA: Português do Brasil.\n\n"
                f"CONTEXTO RECUPERADO:\n{context_str}\n\n"
                f"PERGUNTA DO USUÁRIO: \"{query}\"\n\n"
                "RESPOSTA:"
            )

            try:
                # Time the external LLM call for the Prometheus histogram.
                start_time_groq = time.time()

                chat_completion = groq_client.chat.completions.create(
                    messages=[{"role": "user", "content": rag_prompt}],
                    model="moonshotai/kimi-k2-instruct-0905",
                    temperature=0.1,
                    max_tokens=1024
                )

                duration = time.time() - start_time_groq
                GROQ_LATENCY.observe(duration)

                summary = chat_completion.choices[0].message.content.strip()
            except Exception as e:
                # LLM failure is non-fatal: fall back to results without summary.
                logging.warning(f"Erro na geração do LLM: {e}")
                summary = "Não foi possível gerar o resumo automático, mas os documentos estão listados abaixo."

        return {"summary": summary, "results": final_results}

    except Exception as e:
        logging.error(f"ERRO EM /search/: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
@app.post("/describe_clusters/")
async def describe_clusters_api(job_id: str = Form(...)):
    """Ask the LLM for a topic name and core insight per cluster, using the
    3 documents closest to each cluster centroid as representatives."""
    logging.info(f"Descrevendo clusters para Job: {job_id}")
    if not groq_client:
        raise HTTPException(status_code=503, detail="Groq indisponível.")
    if job_id not in cache:
        raise HTTPException(status_code=404, detail="Job não encontrado.")

    try:
        cached_data = cache[job_id]
        df = cached_data["df"]
        embeddings = cached_data["embeddings"]

        # For each valid cluster, pick the 3 "champion" docs nearest the centroid.
        champion_docs_by_cluster = {}
        cluster_ids = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int)

        for cid in cluster_ids:
            mask = df["cluster"] == cid
            cluster_embeddings = embeddings[mask]
            cluster_texts = df[mask]["full_text"].tolist()
            if len(cluster_texts) < 3:
                continue  # too small to summarize reliably

            centroid = np.mean(cluster_embeddings, axis=0)
            similarities = cosine_similarity([centroid], cluster_embeddings)[0]
            top_indices = np.argsort(similarities)[-3:][::-1]
            champion_docs_by_cluster[cid] = [cluster_texts[i] for i in top_indices]

        if not champion_docs_by_cluster:
            return {"insights": {}}

        # One prompt section per cluster; docs truncated to 300 chars each.
        prompt_sections = []
        for cid, docs in champion_docs_by_cluster.items():
            doc_list = "\n".join([f"- {doc[:300]}..." for doc in docs])
            prompt_sections.append(f"Grupo {cid}:\n{doc_list}")

        master_prompt = (
            "Analise os grupos de texto abaixo. Para cada grupo, retorne um JSON com 'topic_name' e 'core_insight'.\n"
            "Responda APENAS o JSON válido.\n\n" + "\n\n".join(prompt_sections)
        )

        # Time the external LLM call for the Prometheus histogram.
        start_time_groq = time.time()

        chat_completion = groq_client.chat.completions.create(
            messages=[
                {"role": "system", "content": "JSON Output Only."},
                {"role": "user", "content": master_prompt},
            ], model="meta-llama/llama-4-maverick-17b-128e-instruct", temperature=0.2,
        )

        duration = time.time() - start_time_groq
        GROQ_LATENCY.observe(duration)

        # Strip Markdown code fences before parsing the JSON payload.
        response_content = chat_completion.choices[0].message.content
        insights = json.loads(response_content.strip().replace("```json", "").replace("```", ""))
        return {"insights": insights}

    except Exception as e:
        logging.error(f"ERRO EM /describe_clusters/: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
|
|
@app.post("/search_web/")
async def search_web_api(
    query: str = Form(...),
    max_results: int = Form(20),
    search_depth: str = Form("basic")
):
    """
    Busca na web via Tavily e processa resultados para visualização.
    """
    if not tavily_client:
        raise HTTPException(status_code=503, detail="Tavily não configurado. Defina TAVILY_API_KEY.")

    logging.info(f"Tavily Search: '{query}' (max: {max_results})")

    try:
        search_result = tavily_client.search(
            query=query,
            max_results=max_results,
            search_depth=search_depth,
            include_answer=False
        )

        results = search_result.get("results", [])
        if not results:
            # NOTE(review): returns 200 with an "error" payload rather than an
            # HTTP error status — presumably the frontend expects this shape.
            return {"error": "Nenhum resultado encontrado.", "results_count": 0}

        # Fold each hit into a "title: content" document; drop trivial ones
        # (five words or fewer).
        textos = []
        sources = []
        for r in results:
            title = r.get("title", "")
            content = r.get("content", "")
            url = r.get("url", "")

            full_text = f"{title}: {content}" if title else content
            if len(full_text.strip().split()) > 5:
                textos.append(full_text.strip())
                sources.append({"title": title, "url": url})

        if not textos:
            return {"error": "Resultados sem conteúdo válido.", "results_count": 0}

        # Small-dataset mode relaxes UMAP/HDBSCAN parameters for few documents.
        df, embeddings = processar_pipeline(textos, small_dataset=True)

        # Attach provenance columns; textos and sources are index-aligned.
        df["source_url"] = [sources[i]["url"] if i < len(sources) else "" for i in range(len(df))]
        df["source_title"] = [sources[i]["title"] if i < len(sources) else "" for i in range(len(df))]

        # Cache under a fresh job id so the other endpoints can reuse the data.
        job_id = str(uuid.uuid4())
        cache[job_id] = {"embeddings": embeddings, "df": df, "sources": sources}
        logging.info(f"Tavily Job criado: {job_id}")

        metricas_globais = calcular_metricas(df["full_text"].tolist())
        analise_por_cluster_tfidf = analisar_clusters(df)

        # "-1" is the HDBSCAN noise label and is not counted as a cluster.
        n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0)
        n_ruido = int((df["cluster"] == "-1").sum())

        return {
            "job_id": job_id,
            "metadata": {
                "query": query,
                "source": "tavily_web_search",
                "num_documents_processed": len(df),
                "num_clusters_found": n_clusters,
                "num_noise_points": n_ruido
            },
            "metrics": metricas_globais,
            "cluster_analysis": analise_por_cluster_tfidf,
            "plot_data": df[["x", "y", "z", "cluster", "full_text", "source_url", "source_title"]].to_dict("records"),
            "sources": sources
        }

    except Exception as e:
        logging.error(f"ERRO EM /search_web/: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
|
|
@app.post("/entity_graph/")
async def entity_graph_api(job_id: str = Form(...)):
    """
    Extrai entidades nomeadas e constrói grafo de conexões entre documentos.
    """
    if job_id not in cache:
        raise HTTPException(status_code=404, detail="Job ID não encontrado.")

    logging.info(f"Construindo Knowledge Graph para Job: {job_id}")

    try:
        cached_data = cache[job_id]
        df = cached_data["df"]
        textos = df["full_text"].tolist()

        # NER over every cached document (returns normalized entities).
        logging.info(f"Extraindo entidades de {len(textos)} documentos...")
        entities_by_doc = extract_entities(textos)

        # Reuse the 3-D UMAP coordinates so the frontend can overlay the graph.
        positions = df[["x", "y", "z"]].to_dict("records")

        # Document-to-document graph (edges = shared entity).
        graph_data = build_entity_graph(entities_by_doc, positions)

        graph_data["positions"] = positions
        graph_data["num_documents"] = len(textos)

        # Serialize per-document entities for the client.
        graph_data["entities_by_doc"] = [
            [{"text": e[0], "type": e[1]} for e in ents]
            for ents in entities_by_doc
        ]

        # Entity-to-entity co-occurrence network with layout and centrality.
        entity_network = build_entity_to_entity_graph(entities_by_doc)
        graph_data["entity_network"] = entity_network

        logging.info(f"Grafo construído: {graph_data['edge_count']} arestas doc-doc, {entity_network['node_count']} nós entidade")

        return graph_data

    except Exception as e:
        logging.error(f"ERRO EM /entity_graph/: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
|
|
@app.post("/analyze_graph/")
async def analyze_graph_api(job_id: str = Form(...)):
    """
    Usa o LLM para analisar o Knowledge Graph e extrair insights semânticos.
    """
    if job_id not in cache:
        raise HTTPException(status_code=404, detail="Job ID não encontrado.")
    # BUG FIX: guard the Groq dependency up front (consistent with
    # /describe_clusters/); previously a missing client crashed with an
    # AttributeError and surfaced as a 500 instead of a 503.
    if not groq_client:
        raise HTTPException(status_code=503, detail="Groq indisponível.")

    logging.info(f"Analisando grafo com LLM para Job: {job_id}")

    try:
        cached_data = cache[job_id]
        df = cached_data["df"]
        textos = df["full_text"].tolist()

        # Rebuild the entity co-occurrence network from the cached documents.
        entities_by_doc = extract_entities(textos)
        entity_network = build_entity_to_entity_graph(entities_by_doc)

        # Trim the graph to keep the prompt within a reasonable size.
        nodes = entity_network.get("nodes", [])[:15]
        edges = entity_network.get("edges", [])[:20]
        hubs = entity_network.get("hubs", [])[:5]
        insights = entity_network.get("insights", {})

        # Plain-text briefing of the graph that the LLM will analyze.
        graph_summary = f"""
ANÁLISE DE KNOWLEDGE GRAPH

## Visão Geral
- {len(nodes)} entidades principais
- {insights.get('total_connections', 0)} conexões totais
- {insights.get('hub_count', 0)} hubs identificados

## Entidades Principais (por importância):
{chr(10).join([f"- {n['entity']} ({n['type']}): {n['docs']} docs, centralidade {n.get('centrality', 0)}" for n in nodes])}

## Hubs Centrais (entidades mais conectadas):
{chr(10).join([f"- {h['entity']} ({h['type']}): {h['degree']} conexões" for h in hubs]) if hubs else "Nenhum hub identificado"}

## Conexões Mais Fortes:
{chr(10).join([f"- {e['source_entity']} ↔ {e['target_entity']}: {e['weight']} co-ocorrências" for e in edges[:10]])}
"""

        prompt = f"""Você é um analista de inteligência especializado em análise de redes e grafos de conhecimento.

Analise o seguinte Knowledge Graph extraído de documentos e forneça insights acionáveis:

{graph_summary}

Por favor forneça:
1. **Narrativa Central**: Qual é a história principal que conecta estas entidades?
2. **Atores Chave**: Quem são os principais players e qual seu papel?
3. **Relações Ocultas**: Que conexões não-óbvias você identifica?
4. **Padrões**: Algum padrão interessante nos tipos de entidades ou conexões?
5. **Investigação**: O que valeria a pena investigar mais a fundo?

Responda de forma concisa e acionável, como um briefing de inteligência."""

        completion = groq_client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=1500
        )

        analysis = completion.choices[0].message.content

        return {
            "analysis": analysis,
            "graph_summary": {
                "entities": len(nodes),
                "connections": insights.get('total_connections', 0),
                "hubs": len(hubs)
            }
        }

    except Exception as e:
        logging.error(f"ERRO EM /analyze_graph/: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))