| |
| |
| |
| |
| |
|
|
| import numpy as np |
| import pandas as pd |
| import torch |
| import gc |
| import uuid |
| import os |
| import json |
| import logging |
|
|
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException |
| from typing import List, Dict, Any |
| from functools import lru_cache |
|
|
| |
| from sentence_transformers import SentenceTransformer, CrossEncoder |
| import umap |
| import hdbscan |
| from sklearn.preprocessing import StandardScaler |
| from sklearn.metrics.pairwise import cosine_similarity |
| from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer |
| from scipy.stats import entropy |
|
|
| |
| from groq import Groq |
|
|
| |
| |
| |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") |
|
|
| |
| RETRIEVAL_MODEL = "all-MiniLM-L6-v2" |
| |
| RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2" |
|
|
| BATCH_SIZE = 256 |
| UMAP_N_NEIGHBORS = 30 |
|
|
| |
| cache: Dict[str, Any] = {} |
|
|
| |
| GROQ_API_KEY = os.environ.get("GROQ_API_KEY") |
| try: |
| if not GROQ_API_KEY: |
| raise ValueError("GROQ_API_KEY não encontrada nos segredos do ambiente.") |
| groq_client = Groq(api_key=GROQ_API_KEY) |
| logging.info("Cliente Groq inicializado com sucesso.") |
| except Exception as e: |
| logging.error(f"FALHA CRÍTICA AO INICIALIZAR GROQ: {e}") |
| groq_client = None |
|
|
| |
| STOP_WORDS_PT = [ |
| 'de','a','o','que','e','do','da','em','um','para','é','com','não','uma','os','no', |
| 'se','na','por','mais','as','dos','como','mas','foi','ao','ele','das','tem','à', |
| 'seu','sua','ou','ser','quando','muito','há','nos','já','está','eu','também','só', |
| 'pelo','pela','até','isso','ela','entre','era','depois','sem','mesmo','aos','ter', |
| 'seus','quem','nas','me','esse','eles','estão','você','tinha','foram','essa','num', |
| 'nem','suas','meu','às','minha','numa','pelos','elas','havia','seja','qual','será', |
| 'nós','tenho','lhe','deles','essas','esses','pelas','este','fosse','dele','tu','te', |
| 'vocês','vos','lhes','meus','minhas','teu','tua','teus','tuas','nosso','nossa', |
| 'nossos','nossas','dela','delas','esta','estes','estas','aquele','aquela','aqueles', |
| 'aquelas','isto','aquilo','estou','está','estamos','estão','estive','esteve', |
| 'estivemos','estiveram','estava','estávamos','estavam','estivera','estivéramos', |
| 'esteja','estejamos','estejam','estivesse','estivéssemos','estivessem','estiver', |
| 'estivermos','estiverem','hei','há','havemos','hão','houve','houvemos','houveram', |
| 'houvera','houvéramos','haja','hajamos','hajam','houvesse','houvéssemos','houvessem', |
| 'houver','houvermos','houverem','houverei','houverá','houveremos','houverão', |
| 'houveria','houveríamos','houveriam','sou','somos','são','era','éramos','eram', |
| 'fui','foi','fomos','foram','fora','fôramos','seja','sejamos','sejam','fosse', |
| 'fôssemos','fossem','for','formos','forem','serei','será','seremos','serão','seria', |
| 'seríamos','seriam','tenho','tem','temos','tém','tinha','tínhamos','tinham','tive', |
| 'teve','tivemos','tiveram','tivera','tivéramos','tenha','tenhamos','tenham', |
| 'tivesse','tivéssemos','tivessem','tiver','tivermos','tiverem','terei','terá', |
| 'teremos','terão','teria','teríamos','teriam','dá','pergunta','resposta' |
| ] |
|
|
|
|
| |
| |
| |
| @lru_cache(maxsize=1) |
| def load_retriever(): |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| logging.info(f"Carregando Retriever '{RETRIEVAL_MODEL}' em: {device}") |
| return SentenceTransformer(RETRIEVAL_MODEL, device=device) |
|
|
| @lru_cache(maxsize=1) |
| def load_reranker(): |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| logging.info(f"Carregando Reranker '{RERANKER_MODEL}' em: {device}") |
| |
| return CrossEncoder(RERANKER_MODEL, device=device) |
|
|
| def preparar_textos(file_bytes: bytes, n_samples: int) -> List[str]: |
| linhas = file_bytes.decode("utf-8", errors="ignore").splitlines() |
| textos = [s for line in linhas if (s := line.strip()) and len(s.split()) > 3] |
| return textos[:n_samples] |
|
|
| def processar_pipeline(textos: List[str]) -> (pd.DataFrame, np.ndarray): |
| logging.info(f"Iniciando pipeline para {len(textos)} textos...") |
| model = load_retriever() |
| embeddings = model.encode(textos, batch_size=BATCH_SIZE, show_progress_bar=False, convert_to_numpy=True) |
| |
| |
| reducer = umap.UMAP(n_components=3, n_neighbors=UMAP_N_NEIGHBORS, min_dist=0.0, metric="cosine", random_state=42) |
| emb_3d = reducer.fit_transform(embeddings) |
| emb_3d = StandardScaler().fit_transform(emb_3d) |
| |
| num_textos = len(textos) |
| min_size = max(10, int(num_textos * 0.02)) |
| logging.info(f"HDBSCAN min_cluster_size dinâmico definido para: {min_size}") |
| |
| clusterer = hdbscan.HDBSCAN(min_cluster_size=min_size) |
| clusters = clusterer.fit_predict(emb_3d) |
| |
| df = pd.DataFrame({"x": emb_3d[:, 0], "y": emb_3d[:, 1], "z": emb_3d[:, 2], "full_text": textos, "cluster": clusters.astype(str)}) |
| del reducer, clusterer, emb_3d; gc.collect() |
| return df, embeddings |
|
|
| def calcular_metricas(textos: List[str]) -> Dict[str, Any]: |
| logging.info("Calculando métricas globais...") |
| if not textos: return {} |
| vectorizer_count = CountVectorizer(stop_words=STOP_WORDS_PT, max_features=1000) |
| vectorizer_tfidf = TfidfVectorizer(stop_words=STOP_WORDS_PT, max_features=1000) |
| try: |
| counts_matrix = vectorizer_count.fit_transform(textos) |
| tfidf_matrix = vectorizer_tfidf.fit_transform(textos) |
| except ValueError: |
| return {"riqueza_lexical": 0, "top_tfidf_palavras": [], "entropia": 0.0} |
| |
| vocab_count = vectorizer_count.get_feature_names_out() |
| contagens = counts_matrix.sum(axis=0).A1 |
| vocab_tfidf = vectorizer_tfidf.get_feature_names_out() |
| soma_tfidf = tfidf_matrix.sum(axis=0).A1 |
| top_idx_tfidf = np.argsort(soma_tfidf)[-10:][::-1] |
| top_tfidf = [{"palavra": vocab_tfidf[i], "score": round(float(soma_tfidf[i]), 4)} for i in top_idx_tfidf] |
| |
| return { |
| "riqueza_lexical": len(vocab_count), |
| "top_tfidf_palavras": top_tfidf, |
| "entropia": float(entropy(contagens / contagens.sum(), base=2)) if contagens.sum() > 0 else 0.0 |
| } |
|
|
| def encontrar_duplicados(df: pd.DataFrame, embeddings: np.ndarray) -> Dict[str, Any]: |
| logging.info("Detectando duplicados...") |
| mask = df["full_text"].duplicated(keep=False) |
| grupos_exatos = {t: [int(i) for i in idxs] for t, idxs in df[mask].groupby("full_text").groups.items()} |
| pares_semanticos = [] |
| |
| |
| if 2 < len(embeddings) < 5000: |
| sim = cosine_similarity(embeddings) |
| triu_indices = np.triu_indices_from(sim, k=1) |
| sim_vetor = sim[triu_indices] |
| pares_idx = np.where(sim_vetor > 0.98)[0] |
| top_pares_idx = pares_idx[np.argsort(sim_vetor[pares_idx])[-5:][::-1]] |
| for i in top_pares_idx: |
| idx1, idx2 = triu_indices[0][i], triu_indices[1][i] |
| if df["full_text"].iloc[idx1] != df["full_text"].iloc[idx2]: |
| pares_semanticos.append({"similaridade": float(sim[idx1, idx2]), "texto1": df["full_text"].iloc[idx1], "texto2": df["full_text"].iloc[idx2]}) |
| return {"grupos_exatos": grupos_exatos, "pares_semanticos": pares_semanticos} |
|
|
| def analisar_clusters(df: pd.DataFrame) -> Dict[str, Any]: |
| logging.info("Analisando clusters individualmente (TF-IDF)...") |
| analise = {} |
| ids_clusters_validos = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int) |
| for cid in ids_clusters_validos: |
| textos_cluster = df[df["cluster"] == cid]["full_text"].tolist() |
| if len(textos_cluster) < 2: continue |
| try: |
| vectorizer = TfidfVectorizer(stop_words=STOP_WORDS_PT, max_features=1000) |
| tfidf_matrix = vectorizer.fit_transform(textos_cluster) |
| vocab = vectorizer.get_feature_names_out() |
| soma = tfidf_matrix.sum(axis=0).A1 |
| top_idx = np.argsort(soma)[-5:][::-1] |
| top_palavras = [{"palavra": vocab[i], "score": round(float(soma[i]), 4)} for i in top_idx] |
| except ValueError: |
| top_palavras = [] |
| analise[cid] = {"num_documentos": len(textos_cluster), "top_palavras": top_palavras} |
| return analise |
|
|
|
|
| |
| |
| |
| app = FastAPI(title="API do AetherMap 6.5 (The Command Killer)", version="6.5.0") |
|
|
| @app.post("/process/") |
| async def process_api(n_samples: int = Form(10000), file: UploadFile = File(...)): |
| logging.info(f"Requisição recebida para {file.filename}.") |
| try: |
| file_bytes = await file.read() |
| textos = preparar_textos(file_bytes, n_samples) |
| if not textos: raise HTTPException(status_code=400, detail="Nenhum texto válido encontrado.") |
|
|
| df, embeddings = processar_pipeline(textos) |
| |
| job_id = str(uuid.uuid4()) |
| cache[job_id] = {"embeddings": embeddings, "df": df} |
| logging.info(f"Resultados salvos no cache com o ID: {job_id}") |
|
|
| metricas_globais = calcular_metricas(df["full_text"].tolist()) |
| analise_de_duplicados = encontrar_duplicados(df, embeddings) |
| analise_por_cluster_tfidf = analisar_clusters(df) |
|
|
| n_clusters = len(df["cluster"].unique()) - (1 if "-1" in df["cluster"].unique() else 0) |
| n_ruido = int((df["cluster"] == "-1").sum()) |
|
|
| return { |
| "job_id": job_id, |
| "metadata": {"filename": file.filename, "num_documents_processed": len(df), "num_clusters_found": n_clusters, "num_noise_points": n_ruido}, |
| "metrics": metricas_globais, |
| "duplicates": analise_de_duplicados, |
| "cluster_analysis": analise_por_cluster_tfidf, |
| "plot_data": df[["x", "y", "z", "cluster", "full_text"]].to_dict("records"), |
| } |
| except Exception as e: |
| logging.error(f"ERRO CRÍTICO EM /process/: {e}", exc_info=True) |
| raise HTTPException(status_code=500, detail=f"Erro interno do servidor: {str(e)}") |
|
|
|
|
| @app.post("/search/") |
| async def search_api(query: str = Form(...), job_id: str = Form(...)): |
| """ |
| RAG 2.0: Recuperação Híbrida (Bi-Encoder + Cross-Encoder Reranking) |
| Fornece citações precisas estilo Command R+. |
| """ |
| logging.info(f"Busca RAG 2.0 recebida para: '{query}' [Job: {job_id}]") |
| if job_id not in cache: |
| raise HTTPException(status_code=404, detail="Job ID não encontrado ou expirado.") |
|
|
| try: |
| |
| model = load_retriever() |
| reranker = load_reranker() |
| |
| cached_data = cache[job_id] |
| df = cached_data["df"] |
| corpus_embeddings = cached_data["embeddings"] |
| |
| |
| |
| query_embedding = model.encode([query], convert_to_numpy=True) |
| similarities = cosine_similarity(query_embedding, corpus_embeddings)[0] |
| |
| top_k_retrieval = 50 |
| top_indices = np.argsort(similarities)[-top_k_retrieval:][::-1] |
| |
| |
| candidate_docs = [] |
| candidate_indices = [] |
| |
| |
| for idx in top_indices: |
| if similarities[idx] > 0.15: |
| doc_text = df.iloc[int(idx)]["full_text"] |
| candidate_docs.append([query, doc_text]) |
| candidate_indices.append(int(idx)) |
|
|
| if not candidate_docs: |
| return {"summary": "Não foram encontrados documentos minimamente relevantes.", "results": []} |
|
|
| |
| |
| logging.info(f"Reordenando {len(candidate_docs)} documentos com Cross-Encoder...") |
| rerank_scores = reranker.predict(candidate_docs) |
| |
| |
| rerank_results = sorted( |
| zip(candidate_indices, rerank_scores), |
| key=lambda x: x[1], |
| reverse=True |
| ) |
|
|
| |
| final_top_k = 5 |
| final_results = [] |
| context_parts = [] |
|
|
| for rank, (idx, score) in enumerate(rerank_results[:final_top_k]): |
| doc_text = df.iloc[idx]["full_text"] |
| |
| context_parts.append(f"[ID: {rank+1}] DOCUMENTO:\n{doc_text}\n---------------------") |
| |
| final_results.append({ |
| "index": idx, |
| "score": float(score), |
| "cosine_score": float(similarities[idx]), |
| "citation_id": rank + 1 |
| }) |
|
|
| context_str = "\n".join(context_parts) |
|
|
| |
| summary = "" |
| if groq_client: |
| |
| rag_prompt = ( |
| "INSTRUÇÃO DE SISTEMA:\n" |
| "Você é o Aetherius, um motor de busca semântica de alta precisão.\n" |
| "Sua missão é responder à pergunta do usuário baseando-se ESTRITAMENTE nos documentos fornecidos.\n\n" |
| "REGRAS DE OURO:\n" |
| "1. CITAÇÕES OBRIGATÓRIAS: Toda afirmação factual deve ser seguida da fonte no formato [ID: x].\n" |
| " Exemplo: 'O lucro subiu 10% [ID: 1], mas a margem caiu [ID: 2].'\n" |
| "2. HONESTIDADE INTELECTUAL: Se a resposta não estiver no contexto, diga 'Não encontrei informações suficientes nos documentos'.\n" |
| "3. ESTILO: Seja direto, técnico e conciso. Fale em Português do Brasil.\n\n" |
| f"CONTEXTO RECUPERADO (Ordenado por Relevância):\n{context_str}\n\n" |
| f"PERGUNTA DO USUÁRIO: \"{query}\"\n\n" |
| "RESPOSTA:" |
| ) |
|
|
| try: |
| |
| chat_completion = groq_client.chat.completions.create( |
| messages=[{"role": "user", "content": rag_prompt}], |
| model="moonshotai/kimi-k2-instruct-0905", |
| temperature=0.1, |
| max_tokens=1024 |
| ) |
| summary = chat_completion.choices[0].message.content.strip() |
| logging.info(f"Resumo gerado com sucesso.") |
| except Exception as e: |
| logging.warning(f"Falha ao gerar resumo com a Groq: {e}") |
| summary = "O Oráculo está indisponível, mas os documentos mais relevantes estão listados abaixo." |
| |
| return {"summary": summary, "results": final_results} |
|
|
| except Exception as e: |
| logging.error(f"ERRO CRÍTICO EM /search/: {e}", exc_info=True) |
| raise HTTPException(status_code=500, detail=f"Erro interno na busca semântica: {str(e)}") |
|
|
|
|
| @app.post("/describe_clusters/") |
| async def describe_clusters_api(job_id: str = Form(...)): |
| logging.info(f"Requisição recebida para descrever clusters do job '{job_id}'.") |
| if groq_client is None: raise HTTPException(status_code=503, detail="O Oráculo (Groq) não está disponível.") |
| if job_id not in cache: raise HTTPException(status_code=404, detail="Job ID não encontrado ou expirado.") |
|
|
| try: |
| cached_data = cache[job_id] |
| df = cached_data["df"] |
| embeddings = cached_data["embeddings"] |
| |
| champion_docs_by_cluster = {} |
| cluster_ids = sorted([c for c in df["cluster"].unique() if c != "-1"], key=int) |
|
|
| for cid in cluster_ids: |
| mask = df["cluster"] == cid |
| cluster_embeddings = embeddings[mask] |
| cluster_texts = df[mask]["full_text"].tolist() |
| if len(cluster_texts) < 3: continue |
| |
| |
| centroid = np.mean(cluster_embeddings, axis=0) |
| similarities = cosine_similarity([centroid], cluster_embeddings)[0] |
| top_indices = np.argsort(similarities)[-3:][::-1] |
| champion_docs_by_cluster[cid] = [cluster_texts[i] for i in top_indices] |
|
|
| if not champion_docs_by_cluster: return {"insights": {}} |
| |
| prompt_sections = [] |
| for cid, docs in champion_docs_by_cluster.items(): |
| doc_list = "\n".join([f"- {doc[:300]}..." for doc in docs]) |
| prompt_sections.append(f"Grupo {cid}:\n{doc_list}") |
|
|
| master_prompt = ( |
| "Você é Aetherius, um analista de dados especialista. Para cada grupo de textos, responda com um objeto JSON com duas chaves: " |
| "'topic_name' (um nome temático curto, máx 5 palavras) e 'core_insight' (um resumo de uma frase da ideia central).\n\n" |
| "Analise os seguintes grupos:\n\n" + "\n\n".join(prompt_sections) + |
| "\n\nResponda APENAS com o JSON." |
| ) |
| |
| |
| chat_completion = groq_client.chat.completions.create( |
| messages=[ |
| {"role": "system", "content": "Siga as instruções e responda apenas com um objeto JSON válido."}, |
| {"role": "user", "content": master_prompt}, |
| ], model="meta-llama/llama-3.3-70b-versatile", temperature=0.2, |
| ) |
| response_content = chat_completion.choices[0].message.content |
| |
| try: |
| insights = json.loads(response_content.strip().replace("```json", "").replace("```", "")) |
| except json.JSONDecodeError: |
| logging.error(f"Falha ao decodificar JSON da Groq. Resposta: {response_content}") |
| raise HTTPException(status_code=500, detail="O Oráculo respondeu em um formato inesperado.") |
| |
| return {"insights": insights} |
| except Exception as e: |
| logging.error(f"ERRO CRÍTICO em /describe_clusters/: {e}", exc_info=True) |
| raise HTTPException(status_code=500, detail=f"Erro interno ao gerar insights: {str(e)}") |