AetherMap / app.py
Madras1's picture
Update app.py
f40c66d verified
raw
history blame
19.7 kB
# ==============================================================================
# API do AetherMap — VERSÃO 6.5 (THE COMMAND KILLER)
# Backend com RAG em Dois Estágios (Retrieval + Reranking) e Citações Nativas.
# Arquitetura otimizada por Berta & Gabriel.
# ==============================================================================
import numpy as np
import pandas as pd
import torch
import gc
import uuid
import os
import json
import logging
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from typing import List, Dict, Any
from functools import lru_cache
# Ferramentas de Alquimia
from sentence_transformers import SentenceTransformer, CrossEncoder # <--- A ARMA SECRETA
import umap
import hdbscan
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from scipy.stats import entropy
# A Conexão com o Oráculo
from groq import Groq
# ==============================================================================
# CONFIGURAÇÕES GERAIS E CACHE
# ==============================================================================
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Bi-Encoder para varredura rápida
RETRIEVAL_MODEL = "all-MiniLM-L6-v2"
# Cross-Encoder para precisão cirúrgica (Reranking)
RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2"
BATCH_SIZE = 256
UMAP_N_NEIGHBORS = 30
# A Câmara do Tesouro (Cache de Sessão)
cache: Dict[str, Any] = {}
# Inicialização segura do Oráculo Groq
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
try:
if not GROQ_API_KEY:
raise ValueError("GROQ_API_KEY não encontrada nos segredos do ambiente.")
groq_client = Groq(api_key=GROQ_API_KEY)
logging.info("Cliente Groq inicializado com sucesso.")
except Exception as e:
logging.error(f"FALHA CRÍTICA AO INICIALIZAR GROQ: {e}")
groq_client = None
# Palavras de Parada (Mantidas da versão anterior)
STOP_WORDS_PT = [
'de','a','o','que','e','do','da','em','um','para','é','com','não','uma','os','no',
'se','na','por','mais','as','dos','como','mas','foi','ao','ele','das','tem','à',
'seu','sua','ou','ser','quando','muito','há','nos','já','está','eu','também','só',
'pelo','pela','até','isso','ela','entre','era','depois','sem','mesmo','aos','ter',
'seus','quem','nas','me','esse','eles','estão','você','tinha','foram','essa','num',
'nem','suas','meu','às','minha','numa','pelos','elas','havia','seja','qual','será',
'nós','tenho','lhe','deles','essas','esses','pelas','este','fosse','dele','tu','te',
'vocês','vos','lhes','meus','minhas','teu','tua','teus','tuas','nosso','nossa',
'nossos','nossas','dela','delas','esta','estes','estas','aquele','aquela','aqueles',
'aquelas','isto','aquilo','estou','está','estamos','estão','estive','esteve',
'estivemos','estiveram','estava','estávamos','estavam','estivera','estivéramos',
'esteja','estejamos','estejam','estivesse','estivéssemos','estivessem','estiver',
'estivermos','estiverem','hei','há','havemos','hão','houve','houvemos','houveram',
'houvera','houvéramos','haja','hajamos','hajam','houvesse','houvéssemos','houvessem',
'houver','houvermos','houverem','houverei','houverá','houveremos','houverão',
'houveria','houveríamos','houveriam','sou','somos','são','era','éramos','eram',
'fui','foi','fomos','foram','fora','fôramos','seja','sejamos','sejam','fosse',
'fôssemos','fossem','for','formos','forem','serei','será','seremos','serão','seria',
'seríamos','seriam','tenho','tem','temos','tém','tinha','tínhamos','tinham','tive',
'teve','tivemos','tiveram','tivera','tivéramos','tenha','tenhamos','tenham',
'tivesse','tivéssemos','tivessem','tiver','tivermos','tiverem','terei','terá',
'teremos','terão','teria','teríamos','teriam','dá','pergunta','resposta'
]
# ==============================================================================
# FUNÇÕES DE ANÁLISE E CARREGAMENTO DE MODELOS
# ==============================================================================
@lru_cache(maxsize=1)
def load_retriever():
device = "cuda" if torch.cuda.is_available() else "cpu"
logging.info(f"Carregando Retriever '{RETRIEVAL_MODEL}' em: {device}")
return SentenceTransformer(RETRIEVAL_MODEL, device=device)
@lru_cache(maxsize=1)
def load_reranker():
device = "cuda" if torch.cuda.is_available() else "cpu"
logging.info(f"Carregando Reranker '{RERANKER_MODEL}' em: {device}")
# O CrossEncoder processa par (query, doc) juntos. É mais lento, mas MUITO mais preciso.
return CrossEncoder(RERANKER_MODEL, device=device)
def preparar_textos(file_bytes: bytes, n_samples: int) -> List[str]:
linhas = file_bytes.decode("utf-8", errors="ignore").splitlines()
textos = [s for line in linhas if (s := line.strip()) and len(s.split()) > 3]
return textos[:n_samples]
def processar_pipeline(textos: List[str]) -> (pd.DataFrame, np.ndarray):
logging.info(f"Iniciando pipeline para {len(textos)} textos...")
model = load_retriever()
embeddings = model.encode(textos, batch_size=BATCH_SIZE, show_progress_bar=False, convert_to_numpy=True)
# Redução dimensional e Clustering
reducer = umap.UMAP(n_components=3, n_neighbors=UMAP_N_NEIGHBORS, min_dist=0.0, metric="cosine", random_state=42)
emb_3d = reducer.fit_transform(embeddings)
emb_3d = StandardScaler().fit_transform(emb_3d)
num_textos = len(textos)
min_size = max(10, int(num_textos * 0.02))
logging.info(f"HDBSCAN min_cluster_size dinâmico definido para: {min_size}")
clusterer = hdbscan.HDBSCAN(min_cluster_size=min_size)
clusters = clusterer.fit_predict(emb_3d)
df = pd.DataFrame({"x": emb_3d[:, 0], "y": emb_3d[:, 1], "z": emb_3d[:, 2], "full_text": textos, "cluster": clusters.astype(str)})
del reducer, clusterer, emb_3d; gc.collect()
return df, embeddings
def calcular_metricas(textos: List[str]) -> Dict[str, Any]:
logging.info("Calculando métricas globais...")
if not textos: return {}
vectorizer_count = CountVectorizer(stop_words=STOP_WORDS_PT, max_features=1000)
vectorizer_tfidf = TfidfVectorizer(stop_words=STOP_WORDS_PT, max_features=1000)
try:
counts_matrix = vectorizer_count.fit_transform(textos)
tfidf_matrix = vectorizer_tfidf.fit_transform(textos)
except ValueError:
return {"riqueza_lexical": 0, "top_tfidf_palavras": [], "entropia": 0.0}
vocab_count = vectorizer_count.get_feature_names_out()
contagens = counts_matrix.sum(axis=0).A1
vocab_tfidf = vectorizer_tfidf.get_feature_names_out()
soma_tfidf = tfidf_matrix.sum(axis=0).A1
top_idx_tfidf = np.argsort(soma_tfidf)[-10:][::-1]
top_tfidf = [{"palavra": vocab_tfidf[i], "score": round(float(soma_tfidf[i]), 4)} for i in top_idx_tfidf]
return {
"riqueza_lexical": len(vocab_count),
"top_tfidf_palavras": top_tfidf,
"entropia": float(entropy(contagens / contagens.sum(), base=2)) if contagens.sum() > 0 else 0.0
}
def encontrar_duplicados(df: pd.DataFrame, embeddings: np.ndarray) -> Dict[str, Any]:
logging.info("Detectando duplicados...")
mask = df["full_text"].duplicated(keep=False)
grupos_exatos = {t: [int(i) for i in idxs] for t, idxs in df[mask].groupby("full_text").groups.items()}
pares_semanticos = []
# Só roda similaridade pesada se não for gigante
if 2 < len(embeddings) < 5000:
sim = cosine_similarity(embeddings)
triu_indices = np.triu_indices_from(sim, k=1)
sim_vetor = sim[triu_indices]
pares_idx = np.where(sim_vetor > 0.98)[0]
top_pares_idx = pares_idx[np.argsort(sim_vetor[pares_idx])[-5:][::-1]]
for i in top_pares_idx:
idx1, idx2 = triu_indices[0][i], triu_indices[1][i]
if df["full_text"].iloc[idx1] != df["full_text"].iloc[idx2]:
pares_semanticos.append({"similaridade": float(sim[idx1, idx2]), "texto1": df["full_text"].iloc[idx1], "texto2": df["full_text"].iloc[idx2]})
return {"grupos_exatos": grupos_exatos, "pares_semanticos": pares_semanticos}
def analisar_clusters(df: pd.DataFrame) -> Dict[str, Any]:
logging.info("Analisando clusters individualmente (TF-IDF)...")
analise = {}
ids_clusters_validos = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int)
for cid in ids_clusters_validos:
textos_cluster = df[df["cluster"] == cid]["full_text"].tolist()
if len(textos_cluster) < 2: continue
try:
vectorizer = TfidfVectorizer(stop_words=STOP_WORDS_PT, max_features=1000)
tfidf_matrix = vectorizer.fit_transform(textos_cluster)
vocab = vectorizer.get_feature_names_out()
soma = tfidf_matrix.sum(axis=0).A1
top_idx = np.argsort(soma)[-5:][::-1]
top_palavras = [{"palavra": vocab[i], "score": round(float(soma[i]), 4)} for i in top_idx]
except ValueError:
top_palavras = []
analise[cid] = {"num_documentos": len(textos_cluster), "top_palavras": top_palavras}
return analise
# ==============================================================================
# FASTAPI — DEFINIÇÃO DA API
# ==============================================================================
app = FastAPI(title="API do AetherMap 6.5 (The Command Killer)", version="6.5.0")
@app.post("/process/")
async def process_api(n_samples: int = Form(10000), file: UploadFile = File(...)):
logging.info(f"Requisição recebida para {file.filename}.")
try:
file_bytes = await file.read()
textos = preparar_textos(file_bytes, n_samples)
if not textos: raise HTTPException(status_code=400, detail="Nenhum texto válido encontrado.")
df, embeddings = processar_pipeline(textos)
job_id = str(uuid.uuid4())
cache[job_id] = {"embeddings": embeddings, "df": df}
logging.info(f"Resultados salvos no cache com o ID: {job_id}")
metricas_globais = calcular_metricas(df["full_text"].tolist())
analise_de_duplicados = encontrar_duplicados(df, embeddings)
analise_por_cluster_tfidf = analisar_clusters(df)
n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0)
n_ruido = int((df["cluster"] == "-1").sum())
return {
"job_id": job_id,
"metadata": {"filename": file.filename, "num_documents_processed": len(df), "num_clusters_found": n_clusters, "num_noise_points": n_ruido},
"metrics": metricas_globais,
"duplicates": analise_de_duplicados,
"cluster_analysis": analise_por_cluster_tfidf,
"plot_data": df[["x", "y", "z", "cluster", "full_text"]].to_dict("records"),
}
except Exception as e:
logging.error(f"ERRO CRÍTICO EM /process/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Erro interno do servidor: {str(e)}")
@app.post("/search/")
async def search_api(query: str = Form(...), job_id: str = Form(...)):
"""
RAG 2.0: Recuperação Híbrida (Bi-Encoder + Cross-Encoder Reranking)
Fornece citações precisas estilo Command R+.
"""
logging.info(f"Busca RAG 2.0 recebida para: '{query}' [Job: {job_id}]")
if job_id not in cache:
raise HTTPException(status_code=404, detail="Job ID não encontrado ou expirado.")
try:
# --- FASE 1: RECUPERAÇÃO (RETRIEVAL - BI-ENCODER) ---
model = load_retriever()
reranker = load_reranker() # Carrega o Juiz
cached_data = cache[job_id]
df = cached_data["df"]
corpus_embeddings = cached_data["embeddings"]
# Passo 1: Busca inicial ampla com Cosseno (Pega 50 candidatos)
# Isso garante que não perdemos nada relevante que tenha palavras-chave parecidas
query_embedding = model.encode([query], convert_to_numpy=True)
similarities = cosine_similarity(query_embedding, corpus_embeddings)[0]
top_k_retrieval = 50
top_indices = np.argsort(similarities)[-top_k_retrieval:][::-1]
# Prepara dados para o Reranker: Lista de pares [Query, Doc]
candidate_docs = []
candidate_indices = []
# Filtro de corte mínimo para não passar lixo total pro reranker
for idx in top_indices:
if similarities[idx] > 0.15: # Threshold leve
doc_text = df.iloc[int(idx)]["full_text"]
candidate_docs.append([query, doc_text])
candidate_indices.append(int(idx))
if not candidate_docs:
return {"summary": "Não foram encontrados documentos minimamente relevantes.", "results": []}
# --- FASE 2: REORDENAÇÃO (RERANKING - CROSS-ENCODER) ---
# O modelo Cross-Encoder lê a pergunta e o documento juntos e dá um score de verdade.
logging.info(f"Reordenando {len(candidate_docs)} documentos com Cross-Encoder...")
rerank_scores = reranker.predict(candidate_docs)
# Ordena pelos scores do reranker (do maior para o menor)
rerank_results = sorted(
zip(candidate_indices, rerank_scores),
key=lambda x: x[1],
reverse=True
)
# Agora pegamos o Top 5 da "Nata da Nata" para enviar ao Kimi
final_top_k = 5
final_results = []
context_parts = []
for rank, (idx, score) in enumerate(rerank_results[:final_top_k]):
doc_text = df.iloc[idx]["full_text"]
# Montamos o contexto COM O ID EXPLÍCITO para forçar a citação
context_parts.append(f"[ID: {rank+1}] DOCUMENTO:\n{doc_text}\n---------------------")
final_results.append({
"index": idx,
"score": float(score), # Score do Reranker (Confiança semântica)
"cosine_score": float(similarities[idx]), # Score original (para debug)
"citation_id": rank + 1
})
context_str = "\n".join(context_parts)
# --- FASE 3: GERAÇÃO CITADA (READER - KIMI K2) ---
summary = ""
if groq_client:
# Prompt de Sistema projetado para emular o comportamento do Command R+
rag_prompt = (
"INSTRUÇÃO DE SISTEMA:\n"
"Você é o Aetherius, um motor de busca semântica de alta precisão.\n"
"Sua missão é responder à pergunta do usuário baseando-se ESTRITAMENTE nos documentos fornecidos.\n\n"
"REGRAS DE OURO:\n"
"1. CITAÇÕES OBRIGATÓRIAS: Toda afirmação factual deve ser seguida da fonte no formato [ID: x].\n"
" Exemplo: 'O lucro subiu 10% [ID: 1], mas a margem caiu [ID: 2].'\n"
"2. HONESTIDADE INTELECTUAL: Se a resposta não estiver no contexto, diga 'Não encontrei informações suficientes nos documentos'.\n"
"3. ESTILO: Seja direto, técnico e conciso. Fale em Português do Brasil.\n\n"
f"CONTEXTO RECUPERADO (Ordenado por Relevância):\n{context_str}\n\n"
f"PERGUNTA DO USUÁRIO: \"{query}\"\n\n"
"RESPOSTA:"
)
try:
# Usando Kimi K2 pois ele tem ótimo raciocínio lógico para síntese
chat_completion = groq_client.chat.completions.create(
messages=[{"role": "user", "content": rag_prompt}],
model="moonshotai/kimi-k2-instruct-0905",
temperature=0.1, # Temperatura baixa é CRUCIAL para não inventar citações
max_tokens=1024
)
summary = chat_completion.choices[0].message.content.strip()
logging.info(f"Resumo gerado com sucesso.")
except Exception as e:
logging.warning(f"Falha ao gerar resumo com a Groq: {e}")
summary = "O Oráculo está indisponível, mas os documentos mais relevantes estão listados abaixo."
return {"summary": summary, "results": final_results}
except Exception as e:
logging.error(f"ERRO CRÍTICO EM /search/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Erro interno na busca semântica: {str(e)}")
@app.post("/describe_clusters/")
async def describe_clusters_api(job_id: str = Form(...)):
logging.info(f"Requisição recebida para descrever clusters do job '{job_id}'.")
if groq_client is None: raise HTTPException(status_code=503, detail="O Oráculo (Groq) não está disponível.")
if job_id not in cache: raise HTTPException(status_code=404, detail="Job ID não encontrado ou expirado.")
try:
cached_data = cache[job_id]
df = cached_data["df"]
embeddings = cached_data["embeddings"]
champion_docs_by_cluster = {}
cluster_ids = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int)
for cid in cluster_ids:
mask = df["cluster"] == cid
cluster_embeddings = embeddings[mask]
cluster_texts = df[mask]["full_text"].tolist()
if len(cluster_texts) < 3: continue
# Pega os documentos mais próximos do centróide do cluster
centroid = np.mean(cluster_embeddings, axis=0)
similarities = cosine_similarity([centroid], cluster_embeddings)[0]
top_indices = np.argsort(similarities)[-3:][::-1]
champion_docs_by_cluster[cid] = [cluster_texts[i] for i in top_indices]
if not champion_docs_by_cluster: return {"insights": {}}
prompt_sections = []
for cid, docs in champion_docs_by_cluster.items():
doc_list = "\n".join([f"- {doc[:300]}..." for doc in docs])
prompt_sections.append(f"Grupo {cid}:\n{doc_list}")
master_prompt = (
"Você é Aetherius, um analista de dados especialista. Para cada grupo de textos, responda com um objeto JSON com duas chaves: "
"'topic_name' (um nome temático curto, máx 5 palavras) e 'core_insight' (um resumo de uma frase da ideia central).\n\n"
"Analise os seguintes grupos:\n\n" + "\n\n".join(prompt_sections) +
"\n\nResponda APENAS com o JSON."
)
# Mantendo Llama para tarefa de JSON simples, pois é rápido e segue bem formato
chat_completion = groq_client.chat.completions.create(
messages=[
{"role": "system", "content": "Siga as instruções e responda apenas com um objeto JSON válido."},
{"role": "user", "content": master_prompt},
], model="meta-llama/llama-3.3-70b-versatile", temperature=0.2, # Atualizei para o Llama 3.3 que é melhor
)
response_content = chat_completion.choices[0].message.content
try:
insights = json.loads(response_content.strip().replace("```json", "").replace("```", ""))
except json.JSONDecodeError:
logging.error(f"Falha ao decodificar JSON da Groq. Resposta: {response_content}")
raise HTTPException(status_code=500, detail="O Oráculo respondeu em um formato inesperado.")
return {"insights": insights}
except Exception as e:
logging.error(f"ERRO CRÍTICO em /describe_clusters/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Erro interno ao gerar insights: {str(e)}")