# workflow.py import pandas as pd from typing import Any from pydantic import ConfigDict from llama_index.core import Settings from llama_index.core.workflow import Workflow, Event, StartEvent as BaseStartEvent, StopEvent, step from llama_index.llms.groq import Groq # Importações de nossos módulos locais from config import pandas_prompt_str, instruction_str, RESPONSE_SYNTHESIS_PROMPT_STR from utils import descricao_colunas, limpar_codigo_pandas # --- MODELOS DE EVENTOS DO WORKFLOW --- class StartEvent(BaseStartEvent): query: str df: pd.DataFrame model_config = ConfigDict(arbitrary_types_allowed=True) class CodeEvent(Event): pandas_prompt: str; query: str; df: pd.DataFrame model_config = ConfigDict(arbitrary_types_allowed=True) class OutputEvent(Event): pandas_code: str; query: str; df: pd.DataFrame model_config = ConfigDict(arbitrary_types_allowed=True) class ExecutedEvent(Event): pandas_code: str; pandas_output: Any; query: str; df: pd.DataFrame model_config = ConfigDict(arbitrary_types_allowed=True) # --- CLASSE WORKFLOW PRINCIPAL --- class PandasWorkflow(Workflow): @step async def iniciar_processamento(self, ev: StartEvent) -> CodeEvent: colunas_info = descricao_colunas(ev.df) prompt_text = pandas_prompt_str.format( colunas_detalhes=colunas_info, df_str=ev.df.head(5).to_string(), instruction_str=instruction_str, query_str=ev.query ) return CodeEvent(pandas_prompt=prompt_text, query=ev.query, df=ev.df) @step async def gerar_codigo(self, ev: CodeEvent) -> OutputEvent: response = await Settings.llm.acomplete(ev.pandas_prompt) codigo_limpo = limpar_codigo_pandas(str(response).strip()) print(f"✅ Código gerado: {codigo_limpo}") return OutputEvent(pandas_code=codigo_limpo, query=ev.query, df=ev.df) @step async def executar_codigo(self, ev: OutputEvent) -> ExecutedEvent: try: resultado = eval(ev.pandas_code, {"__builtins__": {}}, {"df": ev.df, "pd": pd}) except Exception as e: resultado = f"Erro ao executar o código: {str(e)}" return ExecutedEvent( pandas_code=ev.pandas_code, pandas_output=resultado, query=ev.query, df=ev.df ) @step async def finalizar_e_sintetizar(self, ev: ExecutedEvent) -> StopEvent: if isinstance(ev.pandas_output, str) and "Erro" in ev.pandas_output: resposta_final = f"Não foi possível processar: {ev.pandas_output}" else: prompt = RESPONSE_SYNTHESIS_PROMPT_STR.format( query_str=ev.query, pandas_instructions=ev.pandas_code, pandas_output=str(ev.pandas_output) ) response = await Settings.llm.acomplete(prompt) resposta_final = str(response).strip() return StopEvent(result={"resposta_final": resposta_final}) # --- FUNÇÃO DE EXECUÇÃO --- async def executar_consulta(query: str, df_local: pd.DataFrame): if df_local is None: return {"resposta_final": "Erro: DataFrame não carregado."} if not query.strip(): return {"resposta_final": "Erro: Consulta vazia."} try: wf = PandasWorkflow() resultado_final = await wf.run(query=query, df=df_local) if not isinstance(resultado_final, dict): raise TypeError(f"Workflow retornou tipo inesperado: {type(resultado_final)}") return resultado_final except Exception as e: import traceback; traceback.print_exc() return {"resposta_final": f"Erro crítico no workflow: {str(e)}"}