Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import base64 | |
| import re | |
| import warnings | |
| from datetime import datetime | |
| from typing import Dict, List, Tuple | |
| import gradio as gr | |
| import google.generativeai as genai | |
| # ============================================================================ | |
| # CONFIGURAÇÃO | |
| # ============================================================================ | |
| # Filtra avisos do gRPC para limpar o console | |
| warnings.filterwarnings("ignore", category=FutureWarning, module="google.api_core") | |
| # API KEY | |
| API_KEY = os.getenv("GOOGLE_API_KEY", "") | |
| if not API_KEY: | |
| print("❌ ERRO CRÍTICO: GOOGLE_API_KEY não encontrada nas variáveis de ambiente.") | |
| print("Defina com: export GOOGLE_API_KEY='sua_chave'") | |
| # Não paramos o script, mas vai dar erro na chamada | |
| else: | |
| print(f"✅ API Key carregada (termina em ...{API_KEY[-4:]})") | |
| genai.configure(api_key=API_KEY) | |
| model = genai.GenerativeModel("gemini-1.5-flash") | |
| TITLE = "# 🛠️ Anise v10.2 DEBUG MODE\n**Console mostra SAÍDA BRUTA**" | |
| # ============================================================================ | |
| # SISTEMA DE LOGS & DEBUG | |
| # ============================================================================ | |
| def debug_print(titulo: str, conteudo: any): | |
| """Imprime no console com formatação visível para debug""" | |
| print(f"\n{'='*60}") | |
| print(f"🐛 DEBUG: {titulo}") | |
| print(f"{'-'*60}") | |
| if isinstance(conteudo, (dict, list)): | |
| print(json.dumps(conteudo, indent=2, ensure_ascii=False)) | |
| else: | |
| print(str(conteudo)) | |
| print(f"{'='*60}\n") | |
| # ============================================================================ | |
| # HELPERS (SEM TRY/EXCEPT PARA DEBUG) | |
| # ============================================================================ | |
| def processar_anexo(arquivo) -> Tuple[str, str]: | |
| if arquivo is None: | |
| return "", "nenhum" | |
| caminho = str(arquivo) | |
| print(f"📂 Processando arquivo: {caminho}") | |
| if caminho.lower().endswith('.pdf'): | |
| import PyPDF2 # Se falhar aqui, queremos ver o erro de importação | |
| with open(caminho, 'rb') as f: | |
| leitor = PyPDF2.PdfReader(f) | |
| texto = "".join(pagina.extract_text() + "\n" for pagina in leitor.pages[:5]) | |
| print(f"📄 PDF extraído: {len(texto)} caracteres") | |
| return texto[:5000], "pdf" | |
| elif any(caminho.lower().endswith(ext) for ext in ['.png','.jpg','.jpeg','.gif','.webp']): | |
| with open(caminho, 'rb') as f: | |
| encoded = base64.b64encode(f.read()).decode() | |
| print(f"🖼️ Imagem codificada: {len(encoded)} bytes") | |
| return encoded, "imagem" | |
| return "", "nao_suportado" | |
| def limpar_json_raw(texto: str) -> str: | |
| """Limpa markdown ```json ... ``` para tentar parsear""" | |
| texto = re.sub(r'^```json\s*', '', texto, flags=re.MULTILINE) | |
| texto = re.sub(r'^```\s*', '', texto, flags=re.MULTILINE) | |
| texto = re.sub(r'```$', '', texto, flags=re.MULTILINE) | |
| return texto.strip() | |
| def chamar_gemini_json(prompt_base: str, etapa: str, temperatura=0.2, max_tokens=2000) -> Dict: | |
| """ | |
| Chama o Gemini e imprime o RAW OUTPUT. | |
| Se falhar o JSON, explode o erro para análise. | |
| """ | |
| full_prompt = f"""{prompt_base} | |
| --- | |
| **INSTRUÇÃO DE SISTEMA OBRIGATÓRIA:** | |
| 1. Responda APENAS com um JSON válido. | |
| 2. NÃO use blocos de código markdown (```json). | |
| 3. NÃO escreva texto antes ou depois do JSON. | |
| """ | |
| # 1. Chamada API | |
| print(f"📡 Enviando requisição para {etapa}...") | |
| response = model.generate_content( | |
| full_prompt, | |
| generation_config=genai.types.GenerationConfig( | |
| temperature=temperatura, | |
| max_output_tokens=max_tokens, | |
| # response_mime_type="application/json" # Opcional: força modo JSON estrito do 1.5 | |
| ) | |
| ) | |
| raw_text = response.text | |
| # 2. PRINT DA SAÍDA BRUTA (O que você pediu) | |
| print(f"\n🛑 SAÍDA BRUTA GEMINI [{etapa}]:") | |
| print(f">>> INICIO RAW <<<\n{raw_text}\n>>> FIM RAW <<<") | |
| # 3. Tentativa de Parse (sem try/except silencioso) | |
| texto_limpo = limpar_json_raw(raw_text) | |
| if not texto_limpo: | |
| raise ValueError(f"[{etapa}] Gemini retornou texto vazio!") | |
| try: | |
| dados_json = json.loads(texto_limpo) | |
| return dados_json | |
| except json.JSONDecodeError as e: | |
| print(f"❌ FALHA NO PARSE JSON [{etapa}]") | |
| print(f"Erro: {e}") | |
| print("Tentando corrigir manualmente strings...") | |
| # Fallback simples se for apenas texto solto (opcional) | |
| raise e # Relança o erro para parar o script e ver o traceback | |
| def historico_compacto(historico: List) -> str: | |
| if not historico: return "Nenhum." | |
| return "\n".join([f"{m['role']}: {str(m['content'])[:100]}..." for m in historico[-4:]]) | |
| def criar_dna() -> Dict: | |
| return {"historico": [], "meta": {"turnos": 0}} | |
| # ============================================================================ | |
| # PIPELINE - PASSOS (COM LOGS EXPLÍCITOS) | |
| # ============================================================================ | |
| def passo_0_aluno(pergunta: str, historico: List) -> Dict: | |
| prompt = f"""ETAPA: P0-INTENÇÃO | |
| HISTÓRICO: {historico_compacto(historico)} | |
| PERGUNTA: {pergunta} | |
| Analise a intenção do usuário. | |
| JSON: {{"relacao": "continua|nova", "intent": "resumo"}}""" | |
| return chamar_gemini_json(prompt, "P0") | |
| def passo_1_triagem(pergunta: str, p0: Dict) -> Dict: | |
| prompt = f"""ETAPA: P1-TRIAGEM | |
| P0: {json.dumps(p0)} | |
| PERGUNTA: {pergunta} | |
| Classifique a pergunta. | |
| JSON: {{"tipo": "factual|analitica", "complexidade": "alta|baixa"}}""" | |
| return chamar_gemini_json(prompt, "P1") | |
| def passo_x1_lacunas(pergunta: str, p1: Dict) -> Dict: | |
| prompt = f"""ETAPA: X1-LACUNAS | |
| P1: {json.dumps(p1)} | |
| PERGUNTA: {pergunta} | |
| Quais perguntas o assistente deve fazer a si mesmo para responder isso perfeitamente? | |
| JSON: {{"perguntas_internas": ["pergunta1", "pergunta2"]}}""" | |
| return chamar_gemini_json(prompt, "X1") | |
| def passo_x2_resolver(x1: Dict, historico: List) -> Dict: | |
| perguntas = x1.get("perguntas_internas", []) | |
| if not perguntas: return {"respostas": []} | |
| prompt = f"""ETAPA: X2-RESOLUÇÃO | |
| PERGUNTAS: {json.dumps(perguntas)} | |
| CONTEXTO: {historico_compacto(historico)} | |
| Responda as perguntas internas. | |
| JSON: {{"respostas": [{{"p": "pergunta", "r": "resposta", "confianca": "alta|baixa"}}]}}""" | |
| return chamar_gemini_json(prompt, "X2") | |
| def passo_2_cenarios(pergunta: str, x2: Dict) -> Dict: | |
| prompt = f"""ETAPA: P2-CENÁRIOS | |
| DADOS X2: {json.dumps(x2)} | |
| PERGUNTA ORIGINAL: {pergunta} | |
| Crie cenários de resposta ou decida parar se faltar informação crítica. | |
| JSON: {{"decisao": "continuar|parar", "cenarios": ["C1: ...", "C2: ..."], "motivo_parada": "..."}}""" | |
| return chamar_gemini_json(prompt, "P2") | |
| def passo_7_sintese(p2: Dict, pergunta: str) -> Dict: | |
| prompt = f"""ETAPA: P7-FINAL | |
| CENÁRIOS: {json.dumps(p2)} | |
| PERGUNTA: {pergunta} | |
| Escreva a resposta final para o usuário. | |
| JSON: {{"resposta_final": "texto aqui"}}""" | |
| return chamar_gemini_json(prompt, "P7", temperatura=0.7) | |
| # ============================================================================ | |
| # ORQUESTADOR (SEM REDE DE SEGURANÇA) | |
| # ============================================================================ | |
| def processar_pipeline(pergunta: str, historico: List, arquivo_anexo=None, dna=None) -> Tuple[str, List, Dict]: | |
| # Debug inicial | |
| debug_print("INICIO PIPELINE", f"Pergunta: {pergunta}\nAnexo: {arquivo_anexo}") | |
| if dna is None: dna = criar_dna() | |
| # 1. Processar Anexo | |
| conteudo_anexo, tipo_anexo = processar_anexo(arquivo_anexo) | |
| # 2. Montar Prompt com Contexto Visual/Documental | |
| if tipo_anexo == "pdf": | |
| prompt_final = f"CONTEXTO DO PDF:\n{conteudo_anexo}\n---\nPERGUNTA: {pergunta}" | |
| elif tipo_anexo == "imagem": | |
| prompt_final = f"[IMAGEM BASE64: {conteudo_anexo}]\nAnalise a imagem.\nPERGUNTA: {pergunta}" | |
| else: | |
| prompt_final = pergunta | |
| # 3. Execução Linear (Se falhar, o erro aparece no console) | |
| p0 = passo_0_aluno(prompt_final, historico) | |
| debug_print("P0 Resultado", p0) | |
| p1 = passo_1_triagem(prompt_final, p0) | |
| debug_print("P1 Resultado", p1) | |
| x1 = passo_x1_lacunas(prompt_final, p1) | |
| debug_print("X1 Resultado", x1) | |
| x2 = passo_x2_resolver(x1, historico) | |
| debug_print("X2 Resultado", x2) | |
| p2 = passo_2_cenarios(prompt_final, x2) | |
| debug_print("P2 Resultado", p2) | |
| # Lógica de Parada | |
| if p2.get("decisao") == "parar": | |
| resposta = f"⚠️ Não consigo responder com certeza.\nMotivo: {p2.get('motivo_parada')}" | |
| else: | |
| p7 = passo_7_sintese(p2, prompt_final) | |
| debug_print("P7 Resultado", p7) | |
| resposta = p7.get("resposta_final", "Erro na síntese P7") | |
| # Atualiza Histórico | |
| novo_hist = historico + [ | |
| {"role": "user", "content": pergunta}, | |
| {"role": "assistant", "content": resposta} | |
| ] | |
| dna["historico"].append({"turn": dna["meta"]["turnos"], "p": pergunta, "r": resposta[:20]}) | |
| dna["meta"]["turnos"] += 1 | |
| return resposta, novo_hist, dna | |
| # ============================================================================ | |
| # INTERFACE | |
| # ============================================================================ | |
| def chat_interface(msg, hist, anexo, dna_json): | |
| # Converte string JSON do DNA de volta para dict | |
| dna = json.loads(dna_json) if dna_json else {} | |
| if hist is None: hist = [] | |
| # Chama o pipeline (Erros vão aparecer no console do servidor) | |
| resp, novo_hist, dna_new = processar_pipeline(msg, hist, anexo, dna) | |
| return novo_hist, "", json.dumps(dna_new, indent=2), None | |
| if __name__ == "__main__": | |
| with gr.Blocks(title="Anise Debug", theme=gr.themes.Base()) as demo: | |
| gr.Markdown(TITLE) | |
| with gr.Row(): | |
| chat = gr.Chatbot(height=500, type="messages", label="Chat") | |
| dna_box = gr.Code(label="DNA (JSON State)", language="json") | |
| with gr.Row(): | |
| txt_in = gr.Textbox(label="Pergunta", scale=2) | |
| file_in = gr.File(label="Anexo") | |
| btn = gr.Button("Enviar", variant="primary") | |
| btn.click(chat_interface, [txt_in, chat, file_in, dna_box], [chat, txt_in, dna_box, file_in]) | |
| txt_in.submit(chat_interface, [txt_in, chat, file_in, dna_box], [chat, txt_in, dna_box, file_in]) | |
| print("🚀 Servidor Iniciado! Verifique este console para logs.") | |
| demo.launch(server_name="0.0.0.0", server_port=7860) |