|
|
|
|
|
|
|
|
import pandas as pd |
|
|
from typing import Any |
|
|
from pydantic import ConfigDict |
|
|
|
|
|
from llama_index.core import Settings |
|
|
from llama_index.core.workflow import Workflow, Event, StartEvent as BaseStartEvent, StopEvent, step |
|
|
from llama_index.llms.groq import Groq |
|
|
|
|
|
|
|
|
from config import pandas_prompt_str, instruction_str, RESPONSE_SYNTHESIS_PROMPT_STR |
|
|
from utils import descricao_colunas, limpar_codigo_pandas |
|
|
|
|
|
|
|
|
class StartEvent(BaseStartEvent): |
|
|
query: str |
|
|
df: pd.DataFrame |
|
|
model_config = ConfigDict(arbitrary_types_allowed=True) |
|
|
|
|
|
class CodeEvent(Event): |
|
|
pandas_prompt: str; query: str; df: pd.DataFrame |
|
|
model_config = ConfigDict(arbitrary_types_allowed=True) |
|
|
|
|
|
class OutputEvent(Event): |
|
|
pandas_code: str; query: str; df: pd.DataFrame |
|
|
model_config = ConfigDict(arbitrary_types_allowed=True) |
|
|
|
|
|
class ExecutedEvent(Event): |
|
|
pandas_code: str; pandas_output: Any; query: str; df: pd.DataFrame |
|
|
model_config = ConfigDict(arbitrary_types_allowed=True) |
|
|
|
|
|
|
|
|
class PandasWorkflow(Workflow): |
|
|
@step |
|
|
async def iniciar_processamento(self, ev: StartEvent) -> CodeEvent: |
|
|
colunas_info = descricao_colunas(ev.df) |
|
|
prompt_text = pandas_prompt_str.format( |
|
|
colunas_detalhes=colunas_info, df_str=ev.df.head(5).to_string(), |
|
|
instruction_str=instruction_str, query_str=ev.query |
|
|
) |
|
|
return CodeEvent(pandas_prompt=prompt_text, query=ev.query, df=ev.df) |
|
|
|
|
|
@step |
|
|
async def gerar_codigo(self, ev: CodeEvent) -> OutputEvent: |
|
|
response = await Settings.llm.acomplete(ev.pandas_prompt) |
|
|
codigo_limpo = limpar_codigo_pandas(str(response).strip()) |
|
|
print(f"✅ Código gerado: {codigo_limpo}") |
|
|
return OutputEvent(pandas_code=codigo_limpo, query=ev.query, df=ev.df) |
|
|
|
|
|
@step |
|
|
async def executar_codigo(self, ev: OutputEvent) -> ExecutedEvent: |
|
|
try: |
|
|
resultado = eval(ev.pandas_code, {"__builtins__": {}}, {"df": ev.df, "pd": pd}) |
|
|
except Exception as e: |
|
|
resultado = f"Erro ao executar o código: {str(e)}" |
|
|
return ExecutedEvent( |
|
|
pandas_code=ev.pandas_code, pandas_output=resultado, |
|
|
query=ev.query, df=ev.df |
|
|
) |
|
|
|
|
|
@step |
|
|
async def finalizar_e_sintetizar(self, ev: ExecutedEvent) -> StopEvent: |
|
|
if isinstance(ev.pandas_output, str) and "Erro" in ev.pandas_output: |
|
|
resposta_final = f"Não foi possível processar: {ev.pandas_output}" |
|
|
else: |
|
|
prompt = RESPONSE_SYNTHESIS_PROMPT_STR.format( |
|
|
query_str=ev.query, pandas_instructions=ev.pandas_code, |
|
|
pandas_output=str(ev.pandas_output) |
|
|
) |
|
|
response = await Settings.llm.acomplete(prompt) |
|
|
resposta_final = str(response).strip() |
|
|
return StopEvent(result={"resposta_final": resposta_final}) |
|
|
|
|
|
|
|
|
async def executar_consulta(query: str, df_local: pd.DataFrame): |
|
|
if df_local is None: return {"resposta_final": "Erro: DataFrame não carregado."} |
|
|
if not query.strip(): return {"resposta_final": "Erro: Consulta vazia."} |
|
|
try: |
|
|
wf = PandasWorkflow() |
|
|
resultado_final = await wf.run(query=query, df=df_local) |
|
|
if not isinstance(resultado_final, dict): |
|
|
raise TypeError(f"Workflow retornou tipo inesperado: {type(resultado_final)}") |
|
|
return resultado_final |
|
|
except Exception as e: |
|
|
import traceback; traceback.print_exc() |
|
|
return {"resposta_final": f"Erro crítico no workflow: {str(e)}"} |