| | """ |
| | Grafo principal do LangGraph para o AgentGraph |
| | """ |
| | import logging |
| | from typing import Dict, Any, Optional |
| | from langgraph.graph import StateGraph, END |
| | from langgraph.checkpoint.memory import MemorySaver |
| |
|
| | from nodes.agent_node import AgentState, should_refine_response, should_generate_graph |
| | from nodes.csv_processing_node import csv_processing_node |
| | from nodes.database_node import ( |
| | create_database_from_dataframe_node, |
| | load_existing_database_node, |
| | get_database_sample_node |
| | ) |
| | from nodes.query_node import ( |
| | validate_query_input_node, |
| | prepare_query_context_node, |
| | process_user_query_node |
| | ) |
| | from nodes.refinement_node import ( |
| | refine_response_node, |
| | format_final_response_node |
| | ) |
| | from nodes.cache_node import ( |
| | check_cache_node, |
| | cache_response_node, |
| | update_history_node |
| | ) |
| | from nodes.graph_selection_node import graph_selection_node |
| | from nodes.graph_generation_node import graph_generation_node |
| | from nodes.custom_nodes import CustomNodeManager |
| | from agents.sql_agent import SQLAgentManager |
| | from agents.tools import CacheManager |
| | from utils.database import create_sql_database |
| | from utils.config import get_active_csv_path, SQL_DB_PATH |
| | from utils.object_manager import get_object_manager |
| |
|
| | class AgentGraphManager: |
| | """ |
| | Gerenciador principal do grafo LangGraph |
| | """ |
| | |
| | def __init__(self): |
| | self.graph = None |
| | self.app = None |
| | self.cache_manager = CacheManager() |
| | self.custom_node_manager = CustomNodeManager() |
| | self.object_manager = get_object_manager() |
| | self.engine = None |
| | self.sql_agent = None |
| | self.db = None |
| | |
| | self.agent_id = None |
| | self.engine_id = None |
| | self.db_id = None |
| | self.cache_id = None |
| | self._initialize_system() |
| | self._build_graph() |
| | |
| | def _initialize_system(self): |
| | """Inicializa o sistema com banco e agente padrão""" |
| | try: |
| | |
| | |
| | import os |
| | from sqlalchemy import create_engine |
| |
|
| | |
| | if os.path.exists(SQL_DB_PATH): |
| | |
| | self.engine = create_engine(f"sqlite:///{SQL_DB_PATH}") |
| | db = create_sql_database(self.engine) |
| | logging.info("Banco existente carregado") |
| | else: |
| | |
| | csv_path = get_active_csv_path() |
| | self.engine = self._create_engine_sync(csv_path) |
| | db = create_sql_database(self.engine) |
| | logging.info("Novo banco criado") |
| |
|
| | |
| | self.db = db |
| | self.db_id = self.object_manager.store_database(db) |
| |
|
| | |
| | self.sql_agent = SQLAgentManager(db) |
| |
|
| | |
| | self.agent_id = self.object_manager.store_sql_agent(self.sql_agent, self.db_id) |
| | self.engine_id = self.object_manager.store_engine(self.engine) |
| | self.cache_id = self.object_manager.store_cache_manager(self.cache_manager) |
| |
|
| | logging.info("Sistema inicializado com sucesso") |
| |
|
| | except Exception as e: |
| | logging.error(f"Erro ao inicializar sistema: {e}") |
| | raise |
| |
|
| | def _create_engine_sync(self, csv_path: str): |
| | """Cria engine de forma síncrona para inicialização""" |
| | import pandas as pd |
| | from sqlalchemy import create_engine |
| | from sqlalchemy.types import DateTime, Integer, Float |
| |
|
| | |
| | df = pd.read_csv(csv_path, sep=';') |
| |
|
| | |
| | sql_types = {} |
| | for col in df.columns: |
| | if df[col].dtype == 'object': |
| | |
| | try: |
| | pd.to_datetime(df[col], errors='raise') |
| | df[col] = pd.to_datetime(df[col]) |
| | sql_types[col] = DateTime |
| | except: |
| | |
| | pass |
| | elif df[col].dtype in ['int64', 'int32']: |
| | sql_types[col] = Integer |
| | elif df[col].dtype in ['float64', 'float32']: |
| | sql_types[col] = Float |
| |
|
| | |
| | engine = create_engine(f"sqlite:///{SQL_DB_PATH}") |
| | df.to_sql("tabela", engine, index=False, if_exists="replace", dtype=sql_types) |
| |
|
| | logging.info(f"Banco criado com {len(df)} registros") |
| | return engine |
| | |
| | def _build_graph(self): |
| | """Constrói o grafo LangGraph com nova arquitetura""" |
| | try: |
| | |
| | workflow = StateGraph(AgentState) |
| |
|
| | |
| | workflow.add_node("validate_input", validate_query_input_node) |
| | workflow.add_node("check_cache", check_cache_node) |
| | workflow.add_node("prepare_context", prepare_query_context_node) |
| | workflow.add_node("get_db_sample", get_database_sample_node) |
| |
|
| | |
| | workflow.add_node("process_query", process_user_query_node) |
| |
|
| | |
| | workflow.add_node("graph_selection", graph_selection_node) |
| | workflow.add_node("graph_generation", graph_generation_node) |
| |
|
| | |
| | workflow.add_node("refine_response", refine_response_node) |
| | workflow.add_node("format_response", format_final_response_node) |
| |
|
| | |
| | workflow.add_node("cache_response", cache_response_node) |
| | workflow.add_node("update_history", update_history_node) |
| |
|
| | |
| | workflow.set_entry_point("validate_input") |
| |
|
| | |
| | workflow.add_edge("validate_input", "check_cache") |
| |
|
| | |
| | workflow.add_conditional_edges( |
| | "check_cache", |
| | lambda state: "update_history" if state.get("cache_hit") else "prepare_context" |
| | ) |
| |
|
| | workflow.add_edge("prepare_context", "get_db_sample") |
| | workflow.add_edge("get_db_sample", "process_query") |
| |
|
| | |
| | workflow.add_conditional_edges( |
| | "process_query", |
| | should_generate_graph, |
| | { |
| | "graph_selection": "graph_selection", |
| | "refine_response": "refine_response", |
| | "cache_response": "cache_response" |
| | } |
| | ) |
| |
|
| | |
| | workflow.add_edge("graph_selection", "graph_generation") |
| |
|
| | |
| | workflow.add_conditional_edges( |
| | "graph_generation", |
| | should_refine_response, |
| | { |
| | "refine_response": "refine_response", |
| | "cache_response": "cache_response" |
| | } |
| | ) |
| |
|
| | workflow.add_edge("refine_response", "format_response") |
| | workflow.add_edge("format_response", "cache_response") |
| | workflow.add_edge("cache_response", "update_history") |
| | workflow.add_edge("update_history", END) |
| |
|
| | |
| | memory = MemorySaver() |
| | self.app = workflow.compile(checkpointer=memory) |
| |
|
| | logging.info("Grafo LangGraph construído com sucesso") |
| |
|
| | except Exception as e: |
| | logging.error(f"Erro ao construir grafo: {e}") |
| | raise |
| | |
| | async def process_query( |
| | self, |
| | user_input: str, |
| | selected_model: str = "GPT-4o-mini", |
| | advanced_mode: bool = False, |
| | thread_id: str = "default" |
| | ) -> Dict[str, Any]: |
| | """ |
| | Processa uma query do usuário através do grafo |
| | |
| | Args: |
| | user_input: Entrada do usuário |
| | selected_model: Modelo LLM selecionado |
| | advanced_mode: Se deve usar refinamento avançado |
| | thread_id: ID da thread para checkpoint |
| | |
| | Returns: |
| | Resultado do processamento |
| | """ |
| | try: |
| | |
| | current_sql_agent = self.object_manager.get_sql_agent(self.agent_id) |
| | if current_sql_agent and current_sql_agent.model_name != selected_model: |
| | logging.info(f"Recriando agente SQL com modelo {selected_model}") |
| |
|
| | |
| | db_id = self.object_manager.get_db_id_for_agent(self.agent_id) |
| | if db_id: |
| | db = self.object_manager.get_database(db_id) |
| | if db: |
| | new_sql_agent = SQLAgentManager(db, selected_model) |
| | self.agent_id = self.object_manager.store_sql_agent(new_sql_agent, db_id) |
| | logging.info(f"Agente SQL recriado com sucesso para modelo {selected_model}") |
| | else: |
| | logging.error("Banco de dados não encontrado para recriar agente") |
| | else: |
| | logging.error("ID do banco de dados não encontrado para o agente") |
| |
|
| | |
| | initial_state = { |
| | "user_input": user_input, |
| | "selected_model": selected_model, |
| | "response": "", |
| | "advanced_mode": advanced_mode, |
| | "execution_time": 0.0, |
| | "error": None, |
| | "intermediate_steps": [], |
| | "db_sample_dict": {}, |
| | |
| | "agent_id": self.agent_id, |
| | "engine_id": self.engine_id, |
| | "db_id": self.db_id, |
| | "cache_id": self.cache_id, |
| | |
| | "query_type": "sql_query", |
| | "sql_query_extracted": None, |
| | "graph_type": None, |
| | "graph_data": None, |
| | "graph_image_id": None, |
| | "graph_generated": False, |
| | "graph_error": None |
| | } |
| | |
| | |
| | config = {"configurable": {"thread_id": thread_id}} |
| | result = await self.app.ainvoke(initial_state, config=config) |
| | |
| | logging.info(f"Query processada com sucesso: {user_input[:50]}...") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"Erro ao processar query: {e}" |
| | logging.error(error_msg) |
| | return { |
| | "user_input": user_input, |
| | "response": error_msg, |
| | "error": error_msg, |
| | "execution_time": 0.0 |
| | } |
| | |
| | async def handle_csv_upload(self, file_path: str) -> Dict[str, Any]: |
| | """ |
| | Processa upload de CSV usando nova arquitetura de nós |
| | |
| | Args: |
| | file_path: Caminho do arquivo CSV |
| | |
| | Returns: |
| | Resultado do upload |
| | """ |
| | try: |
| | |
| | csv_state = { |
| | "file_path": file_path, |
| | "success": False, |
| | "message": "", |
| | "csv_data_sample": {}, |
| | "column_info": {}, |
| | "processing_stats": {} |
| | } |
| |
|
| | csv_result = await csv_processing_node(csv_state) |
| |
|
| | if not csv_result["success"]: |
| | return csv_result |
| |
|
| | |
| | db_state = csv_result.copy() |
| | db_result = await create_database_from_dataframe_node(db_state) |
| |
|
| | if not db_result["success"]: |
| | return db_result |
| |
|
| | |
| | if db_result["success"]: |
| | |
| | self.engine_id = db_result["engine_id"] |
| | self.db_id = db_result["db_id"] |
| |
|
| | |
| | new_engine = self.object_manager.get_engine(self.engine_id) |
| | new_db = self.object_manager.get_database(self.db_id) |
| | new_sql_agent = SQLAgentManager(new_db) |
| |
|
| | |
| | self.agent_id = self.object_manager.store_sql_agent(new_sql_agent, self.db_id) |
| |
|
| | |
| | cache_manager = self.object_manager.get_cache_manager(self.cache_id) |
| | if cache_manager: |
| | cache_manager.clear_cache() |
| |
|
| | logging.info("[UPLOAD] Sistema atualizado com novo CSV") |
| |
|
| | return db_result |
| |
|
| | except Exception as e: |
| | error_msg = f"❌ Erro no upload de CSV: {e}" |
| | logging.error(error_msg) |
| | return { |
| | "success": False, |
| | "message": error_msg |
| | } |
| | |
| | async def reset_system(self) -> Dict[str, Any]: |
| | """ |
| | Reseta o sistema ao estado inicial |
| | |
| | Returns: |
| | Resultado do reset |
| | """ |
| | try: |
| | |
| | state = { |
| | "success": False, |
| | "message": "", |
| | "engine_id": self.engine_id, |
| | "agent_id": self.agent_id, |
| | "cache_id": self.cache_id |
| | } |
| |
|
| | result = await self.custom_node_manager.execute_node("system_reset", state) |
| |
|
| | |
| | if result.get("success"): |
| | self.engine_id = result.get("engine_id", self.engine_id) |
| | self.agent_id = result.get("agent_id", self.agent_id) |
| | |
| |
|
| | logging.info("[RESET] Sistema resetado com sucesso") |
| |
|
| | return result |
| |
|
| | except Exception as e: |
| | error_msg = f"❌ Erro ao resetar sistema: {e}" |
| | logging.error(error_msg) |
| | return { |
| | "success": False, |
| | "message": error_msg |
| | } |
| | |
| | def toggle_advanced_mode(self, enabled: bool) -> str: |
| | """ |
| | Alterna modo avançado |
| | |
| | Args: |
| | enabled: Se deve habilitar modo avançado |
| | |
| | Returns: |
| | Mensagem de status |
| | """ |
| | message = "Modo avançado ativado." if enabled else "Modo avançado desativado." |
| | logging.info(f"[MODO AVANÇADO] {'Ativado' if enabled else 'Desativado'}") |
| | return message |
| | |
| | def get_history(self) -> list: |
| | """ |
| | Retorna histórico de conversas |
| | |
| | Returns: |
| | Lista com histórico |
| | """ |
| | return self.cache_manager.get_history() |
| | |
| | def clear_cache(self): |
| | """Limpa cache do sistema""" |
| | self.cache_manager.clear_cache() |
| | logging.info("Cache limpo") |
| | |
| | async def get_system_info(self) -> Dict[str, Any]: |
| | """ |
| | Obtém informações do sistema |
| | |
| | Returns: |
| | Informações do sistema |
| | """ |
| | state = { |
| | "engine": self.engine, |
| | "sql_agent": self.sql_agent, |
| | "cache_manager": self.cache_manager |
| | } |
| | |
| | result = await self.custom_node_manager.execute_node("system_info", state) |
| | return result.get("system_info", {}) |
| | |
| | async def validate_system(self) -> Dict[str, Any]: |
| | """ |
| | Valida o estado do sistema |
| | |
| | Returns: |
| | Resultado da validação |
| | """ |
| | state = { |
| | "engine": self.engine, |
| | "sql_agent": self.sql_agent, |
| | "cache_manager": self.cache_manager |
| | } |
| | |
| | result = await self.custom_node_manager.execute_node("system_validation", state) |
| | return result.get("validation", {}) |
| |
|
| | |
| | _graph_manager: Optional[AgentGraphManager] = None |
| |
|
| | def get_graph_manager() -> AgentGraphManager: |
| | """ |
| | Retorna instância singleton do gerenciador de grafo |
| | |
| | Returns: |
| | AgentGraphManager |
| | """ |
| | global _graph_manager |
| | if _graph_manager is None: |
| | _graph_manager = AgentGraphManager() |
| | return _graph_manager |
| |
|
| | async def initialize_graph() -> AgentGraphManager: |
| | """ |
| | Inicializa o grafo principal |
| | |
| | Returns: |
| | AgentGraphManager inicializado |
| | """ |
| | try: |
| | manager = get_graph_manager() |
| | |
| | |
| | validation = await manager.validate_system() |
| | if not validation.get("overall_valid", False): |
| | logging.warning("Sistema não passou na validação completa") |
| | |
| | logging.info("Grafo principal inicializado e validado") |
| | return manager |
| | |
| | except Exception as e: |
| | logging.error(f"Erro ao inicializar grafo: {e}") |
| | raise |
| |
|