ArduinoYuri's picture
Create workflow.py
bff91d0 verified
# workflow.py
import pandas as pd
from typing import Any
from pydantic import ConfigDict
from llama_index.core import Settings
from llama_index.core.workflow import Workflow, Event, StartEvent as BaseStartEvent, StopEvent, step
from llama_index.llms.groq import Groq
# Importações de nossos módulos locais
from config import pandas_prompt_str, instruction_str, RESPONSE_SYNTHESIS_PROMPT_STR
from utils import descricao_colunas, limpar_codigo_pandas
# --- MODELOS DE EVENTOS DO WORKFLOW ---
class StartEvent(BaseStartEvent):
    """Entry event of the workflow: the user's query plus the DataFrame to query.

    The two fields are populated from the keyword arguments given to
    ``Workflow.run(query=..., df=...)``.
    """

    # pd.DataFrame is not a type pydantic knows how to validate, so arbitrary
    # types have to be allowed explicitly.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Text of the user's query.
    query: str
    # DataFrame the query is asked about.
    df: pd.DataFrame
class CodeEvent(Event):
    """Event carrying the fully formatted prompt that asks the LLM for pandas code."""

    # Needed because pd.DataFrame is not a pydantic-native type.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Prompt text to send to the LLM.
    pandas_prompt: str
    # Original user query, forwarded unchanged to later steps.
    query: str
    # DataFrame the query is asked about, forwarded unchanged to later steps.
    df: pd.DataFrame
class OutputEvent(Event):
    """Event carrying the cleaned pandas expression produced by the LLM."""

    # Needed because pd.DataFrame is not a pydantic-native type.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Pandas expression to evaluate against ``df``.
    pandas_code: str
    # Original user query, forwarded unchanged to later steps.
    query: str
    # DataFrame the expression is evaluated against.
    df: pd.DataFrame
class ExecutedEvent(Event):
    """Event carrying the outcome of evaluating the generated pandas expression."""

    # Needed because pd.DataFrame is not a pydantic-native type.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Pandas expression that was evaluated.
    pandas_code: str
    # Whatever the evaluation produced, or an error-message string on failure.
    pandas_output: Any
    # Original user query.
    query: str
    # DataFrame the expression was evaluated against.
    df: pd.DataFrame
# --- CLASSE WORKFLOW PRINCIPAL ---
# Exact prefix that executar_codigo puts in front of a failure message.
# finalizar_e_sintetizar matches on it so that a legitimate string result that
# merely contains the word "Erro" is not mistaken for a failure.
_PREFIXO_ERRO_EXECUCAO = "Erro ao executar o código: "


class PandasWorkflow(Workflow):
    """Four-step workflow that answers a query about a DataFrame.

    Steps, in order: build the code-generation prompt, ask the LLM for a
    pandas expression, evaluate that expression, and ask the LLM to turn the
    result into a final answer.
    """

    @step
    async def iniciar_processamento(self, ev: StartEvent) -> CodeEvent:
        """Build the code-generation prompt from the query and the DataFrame.

        The prompt is filled with the column description, the first five rows
        of the DataFrame rendered as text, the instruction string and the query.
        """
        colunas_info = descricao_colunas(ev.df)
        prompt_text = pandas_prompt_str.format(
            colunas_detalhes=colunas_info,
            df_str=ev.df.head(5).to_string(),
            instruction_str=instruction_str,
            query_str=ev.query,
        )
        return CodeEvent(pandas_prompt=prompt_text, query=ev.query, df=ev.df)

    @step
    async def gerar_codigo(self, ev: CodeEvent) -> OutputEvent:
        """Send the prompt to the configured LLM and clean up the returned code."""
        response = await Settings.llm.acomplete(ev.pandas_prompt)
        codigo_limpo = limpar_codigo_pandas(str(response).strip())
        print(f"✅ Código gerado: {codigo_limpo}")
        return OutputEvent(pandas_code=codigo_limpo, query=ev.query, df=ev.df)

    @step
    async def executar_codigo(self, ev: OutputEvent) -> ExecutedEvent:
        """Evaluate the generated expression with only ``df`` and ``pd`` in scope.

        Any exception raised by the evaluation is converted into an error
        string starting with ``_PREFIXO_ERRO_EXECUCAO`` instead of propagating.
        """
        # SECURITY: ev.pandas_code is LLM-generated text derived from user
        # input and is passed to eval(). Emptying __builtins__ is NOT a
        # sandbox: attribute access on `df`/`pd` can still reach arbitrary
        # Python objects. Only run this with trusted users or inside an
        # isolated process/container.
        try:
            resultado = eval(
                ev.pandas_code,
                {"__builtins__": {}},
                {"df": ev.df, "pd": pd},
            )
        except Exception as e:
            resultado = f"{_PREFIXO_ERRO_EXECUCAO}{e}"
        return ExecutedEvent(
            pandas_code=ev.pandas_code,
            pandas_output=resultado,
            query=ev.query,
            df=ev.df,
        )

    @step
    async def finalizar_e_sintetizar(self, ev: ExecutedEvent) -> StopEvent:
        """Produce the final answer, or report the execution failure.

        Returns a StopEvent whose result is ``{"resposta_final": <text>}``.
        """
        saida = ev.pandas_output
        # Match the exact failure prefix rather than any occurrence of "Erro":
        # a valid result can itself be a string containing that word.
        falhou = isinstance(saida, str) and saida.startswith(_PREFIXO_ERRO_EXECUCAO)
        if falhou:
            resposta_final = f"Não foi possível processar: {saida}"
        else:
            prompt = RESPONSE_SYNTHESIS_PROMPT_STR.format(
                query_str=ev.query,
                pandas_instructions=ev.pandas_code,
                pandas_output=str(saida),
            )
            response = await Settings.llm.acomplete(prompt)
            resposta_final = str(response).strip()
        return StopEvent(result={"resposta_final": resposta_final})
# --- FUNÇÃO DE EXECUÇÃO ---
async def executar_consulta(query: str, df_local: pd.DataFrame) -> dict:
    """Run one query against a DataFrame through ``PandasWorkflow``.

    Args:
        query: The user's query text. ``None``, non-string, empty and
            whitespace-only values are rejected without starting the workflow.
        df_local: The DataFrame to query; ``None`` is reported as "not loaded".

    Returns:
        The dict returned by the workflow, or a dict with a
        ``"resposta_final"`` error message when validation or the workflow
        fails. Workflow failures are printed as a traceback and reported in
        the returned dict rather than raised.
    """
    if df_local is None:
        return {"resposta_final": "Erro: DataFrame não carregado."}
    # The isinstance guard keeps a None (or otherwise non-string) query from
    # raising AttributeError on .strip() before the try block is reached.
    if not isinstance(query, str) or not query.strip():
        return {"resposta_final": "Erro: Consulta vazia."}

    try:
        wf = PandasWorkflow()
        resultado_final = await wf.run(query=query, df=df_local)
        if not isinstance(resultado_final, dict):
            # Raised inside the try on purpose so it is reported through the
            # same error dict as every other workflow failure.
            raise TypeError(f"Workflow retornou tipo inesperado: {type(resultado_final)}")
        return resultado_final
    except Exception as e:
        import traceback

        traceback.print_exc()
        return {"resposta_final": f"Erro crítico no workflow: {str(e)}"}