|
|
import json |
|
|
import os |
|
|
import base64 |
|
|
import re |
|
|
import warnings |
|
|
from datetime import datetime |
|
|
from typing import Dict, List, Tuple |
|
|
|
|
|
import gradio as gr |
|
|
import google.generativeai as genai |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
warnings.filterwarnings("ignore", category=FutureWarning, module="google.api_core") |
|
|
|
|
|
|
|
|
API_KEY = os.getenv("GOOGLE_API_KEY", "") |
|
|
if not API_KEY: |
|
|
print("❌ ERRO CRÍTICO: GOOGLE_API_KEY não encontrada nas variáveis de ambiente.") |
|
|
print("Defina com: export GOOGLE_API_KEY='sua_chave'") |
|
|
|
|
|
else: |
|
|
print(f"✅ API Key carregada (termina em ...{API_KEY[-4:]})") |
|
|
|
|
|
genai.configure(api_key=API_KEY) |
|
|
model = genai.GenerativeModel("gemini-1.5-flash") |
|
|
|
|
|
TITLE = "# 🛠️ Anise v10.2 DEBUG MODE\n**Console mostra SAÍDA BRUTA**" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def debug_print(titulo: str, conteudo: any): |
|
|
"""Imprime no console com formatação visível para debug""" |
|
|
print(f"\n{'='*60}") |
|
|
print(f"🐛 DEBUG: {titulo}") |
|
|
print(f"{'-'*60}") |
|
|
if isinstance(conteudo, (dict, list)): |
|
|
print(json.dumps(conteudo, indent=2, ensure_ascii=False)) |
|
|
else: |
|
|
print(str(conteudo)) |
|
|
print(f"{'='*60}\n") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def processar_anexo(arquivo) -> Tuple[str, str]: |
|
|
if arquivo is None: |
|
|
return "", "nenhum" |
|
|
|
|
|
caminho = str(arquivo) |
|
|
print(f"📂 Processando arquivo: {caminho}") |
|
|
|
|
|
if caminho.lower().endswith('.pdf'): |
|
|
import PyPDF2 |
|
|
with open(caminho, 'rb') as f: |
|
|
leitor = PyPDF2.PdfReader(f) |
|
|
texto = "".join(pagina.extract_text() + "\n" for pagina in leitor.pages[:5]) |
|
|
print(f"📄 PDF extraído: {len(texto)} caracteres") |
|
|
return texto[:5000], "pdf" |
|
|
|
|
|
elif any(caminho.lower().endswith(ext) for ext in ['.png','.jpg','.jpeg','.gif','.webp']): |
|
|
with open(caminho, 'rb') as f: |
|
|
encoded = base64.b64encode(f.read()).decode() |
|
|
print(f"🖼️ Imagem codificada: {len(encoded)} bytes") |
|
|
return encoded, "imagem" |
|
|
|
|
|
return "", "nao_suportado" |
|
|
|
|
|
def limpar_json_raw(texto: str) -> str: |
|
|
"""Limpa markdown ```json ... ``` para tentar parsear""" |
|
|
texto = re.sub(r'^```json\s*', '', texto, flags=re.MULTILINE) |
|
|
texto = re.sub(r'^```\s*', '', texto, flags=re.MULTILINE) |
|
|
texto = re.sub(r'```$', '', texto, flags=re.MULTILINE) |
|
|
return texto.strip() |
|
|
|
|
|
def chamar_gemini_json(prompt_base: str, etapa: str, temperatura=0.2, max_tokens=2000) -> Dict: |
|
|
""" |
|
|
Chama o Gemini e imprime o RAW OUTPUT. |
|
|
Se falhar o JSON, explode o erro para análise. |
|
|
""" |
|
|
full_prompt = f"""{prompt_base} |
|
|
|
|
|
--- |
|
|
**INSTRUÇÃO DE SISTEMA OBRIGATÓRIA:** |
|
|
1. Responda APENAS com um JSON válido. |
|
|
2. NÃO use blocos de código markdown (```json). |
|
|
3. NÃO escreva texto antes ou depois do JSON. |
|
|
""" |
|
|
|
|
|
|
|
|
print(f"📡 Enviando requisição para {etapa}...") |
|
|
response = model.generate_content( |
|
|
full_prompt, |
|
|
generation_config=genai.types.GenerationConfig( |
|
|
temperature=temperatura, |
|
|
max_output_tokens=max_tokens, |
|
|
|
|
|
) |
|
|
) |
|
|
|
|
|
raw_text = response.text |
|
|
|
|
|
|
|
|
print(f"\n🛑 SAÍDA BRUTA GEMINI [{etapa}]:") |
|
|
print(f">>> INICIO RAW <<<\n{raw_text}\n>>> FIM RAW <<<") |
|
|
|
|
|
|
|
|
texto_limpo = limpar_json_raw(raw_text) |
|
|
|
|
|
if not texto_limpo: |
|
|
raise ValueError(f"[{etapa}] Gemini retornou texto vazio!") |
|
|
|
|
|
try: |
|
|
dados_json = json.loads(texto_limpo) |
|
|
return dados_json |
|
|
except json.JSONDecodeError as e: |
|
|
print(f"❌ FALHA NO PARSE JSON [{etapa}]") |
|
|
print(f"Erro: {e}") |
|
|
print("Tentando corrigir manualmente strings...") |
|
|
|
|
|
raise e |
|
|
|
|
|
def historico_compacto(historico: List) -> str: |
|
|
if not historico: return "Nenhum." |
|
|
return "\n".join([f"{m['role']}: {str(m['content'])[:100]}..." for m in historico[-4:]]) |
|
|
|
|
|
def criar_dna() -> Dict: |
|
|
return {"historico": [], "meta": {"turnos": 0}} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def passo_0_aluno(pergunta: str, historico: List) -> Dict: |
|
|
prompt = f"""ETAPA: P0-INTENÇÃO |
|
|
HISTÓRICO: {historico_compacto(historico)} |
|
|
PERGUNTA: {pergunta} |
|
|
|
|
|
Analise a intenção do usuário. |
|
|
JSON: {{"relacao": "continua|nova", "intent": "resumo"}}""" |
|
|
return chamar_gemini_json(prompt, "P0") |
|
|
|
|
|
def passo_1_triagem(pergunta: str, p0: Dict) -> Dict: |
|
|
prompt = f"""ETAPA: P1-TRIAGEM |
|
|
P0: {json.dumps(p0)} |
|
|
PERGUNTA: {pergunta} |
|
|
|
|
|
Classifique a pergunta. |
|
|
JSON: {{"tipo": "factual|analitica", "complexidade": "alta|baixa"}}""" |
|
|
return chamar_gemini_json(prompt, "P1") |
|
|
|
|
|
def passo_x1_lacunas(pergunta: str, p1: Dict) -> Dict: |
|
|
prompt = f"""ETAPA: X1-LACUNAS |
|
|
P1: {json.dumps(p1)} |
|
|
PERGUNTA: {pergunta} |
|
|
|
|
|
Quais perguntas o assistente deve fazer a si mesmo para responder isso perfeitamente? |
|
|
JSON: {{"perguntas_internas": ["pergunta1", "pergunta2"]}}""" |
|
|
return chamar_gemini_json(prompt, "X1") |
|
|
|
|
|
def passo_x2_resolver(x1: Dict, historico: List) -> Dict: |
|
|
perguntas = x1.get("perguntas_internas", []) |
|
|
if not perguntas: return {"respostas": []} |
|
|
|
|
|
prompt = f"""ETAPA: X2-RESOLUÇÃO |
|
|
PERGUNTAS: {json.dumps(perguntas)} |
|
|
CONTEXTO: {historico_compacto(historico)} |
|
|
|
|
|
Responda as perguntas internas. |
|
|
JSON: {{"respostas": [{{"p": "pergunta", "r": "resposta", "confianca": "alta|baixa"}}]}}""" |
|
|
return chamar_gemini_json(prompt, "X2") |
|
|
|
|
|
def passo_2_cenarios(pergunta: str, x2: Dict) -> Dict: |
|
|
prompt = f"""ETAPA: P2-CENÁRIOS |
|
|
DADOS X2: {json.dumps(x2)} |
|
|
PERGUNTA ORIGINAL: {pergunta} |
|
|
|
|
|
Crie cenários de resposta ou decida parar se faltar informação crítica. |
|
|
JSON: {{"decisao": "continuar|parar", "cenarios": ["C1: ...", "C2: ..."], "motivo_parada": "..."}}""" |
|
|
return chamar_gemini_json(prompt, "P2") |
|
|
|
|
|
def passo_7_sintese(p2: Dict, pergunta: str) -> Dict: |
|
|
prompt = f"""ETAPA: P7-FINAL |
|
|
CENÁRIOS: {json.dumps(p2)} |
|
|
PERGUNTA: {pergunta} |
|
|
|
|
|
Escreva a resposta final para o usuário. |
|
|
JSON: {{"resposta_final": "texto aqui"}}""" |
|
|
return chamar_gemini_json(prompt, "P7", temperatura=0.7) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def processar_pipeline(pergunta: str, historico: List, arquivo_anexo=None, dna=None) -> Tuple[str, List, Dict]: |
|
|
|
|
|
debug_print("INICIO PIPELINE", f"Pergunta: {pergunta}\nAnexo: {arquivo_anexo}") |
|
|
|
|
|
if dna is None: dna = criar_dna() |
|
|
|
|
|
|
|
|
conteudo_anexo, tipo_anexo = processar_anexo(arquivo_anexo) |
|
|
|
|
|
|
|
|
if tipo_anexo == "pdf": |
|
|
prompt_final = f"CONTEXTO DO PDF:\n{conteudo_anexo}\n---\nPERGUNTA: {pergunta}" |
|
|
elif tipo_anexo == "imagem": |
|
|
prompt_final = f"[IMAGEM BASE64: {conteudo_anexo}]\nAnalise a imagem.\nPERGUNTA: {pergunta}" |
|
|
else: |
|
|
prompt_final = pergunta |
|
|
|
|
|
|
|
|
p0 = passo_0_aluno(prompt_final, historico) |
|
|
debug_print("P0 Resultado", p0) |
|
|
|
|
|
p1 = passo_1_triagem(prompt_final, p0) |
|
|
debug_print("P1 Resultado", p1) |
|
|
|
|
|
x1 = passo_x1_lacunas(prompt_final, p1) |
|
|
debug_print("X1 Resultado", x1) |
|
|
|
|
|
x2 = passo_x2_resolver(x1, historico) |
|
|
debug_print("X2 Resultado", x2) |
|
|
|
|
|
p2 = passo_2_cenarios(prompt_final, x2) |
|
|
debug_print("P2 Resultado", p2) |
|
|
|
|
|
|
|
|
if p2.get("decisao") == "parar": |
|
|
resposta = f"⚠️ Não consigo responder com certeza.\nMotivo: {p2.get('motivo_parada')}" |
|
|
else: |
|
|
p7 = passo_7_sintese(p2, prompt_final) |
|
|
debug_print("P7 Resultado", p7) |
|
|
resposta = p7.get("resposta_final", "Erro na síntese P7") |
|
|
|
|
|
|
|
|
novo_hist = historico + [ |
|
|
{"role": "user", "content": pergunta}, |
|
|
{"role": "assistant", "content": resposta} |
|
|
] |
|
|
|
|
|
dna["historico"].append({"turn": dna["meta"]["turnos"], "p": pergunta, "r": resposta[:20]}) |
|
|
dna["meta"]["turnos"] += 1 |
|
|
|
|
|
return resposta, novo_hist, dna |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def chat_interface(msg, hist, anexo, dna_json): |
|
|
|
|
|
dna = json.loads(dna_json) if dna_json else {} |
|
|
if hist is None: hist = [] |
|
|
|
|
|
|
|
|
resp, novo_hist, dna_new = processar_pipeline(msg, hist, anexo, dna) |
|
|
|
|
|
return novo_hist, "", json.dumps(dna_new, indent=2), None |
|
|
|
|
|
if __name__ == "__main__": |
|
|
with gr.Blocks(title="Anise Debug", theme=gr.themes.Base()) as demo: |
|
|
gr.Markdown(TITLE) |
|
|
|
|
|
with gr.Row(): |
|
|
chat = gr.Chatbot(height=500, type="messages", label="Chat") |
|
|
dna_box = gr.Code(label="DNA (JSON State)", language="json") |
|
|
|
|
|
with gr.Row(): |
|
|
txt_in = gr.Textbox(label="Pergunta", scale=2) |
|
|
file_in = gr.File(label="Anexo") |
|
|
btn = gr.Button("Enviar", variant="primary") |
|
|
|
|
|
btn.click(chat_interface, [txt_in, chat, file_in, dna_box], [chat, txt_in, dna_box, file_in]) |
|
|
txt_in.submit(chat_interface, [txt_in, chat, file_in, dna_box], [chat, txt_in, dna_box, file_in]) |
|
|
|
|
|
print("🚀 Servidor Iniciado! Verifique este console para logs.") |
|
|
demo.launch(server_name="0.0.0.0", server_port=7860) |