Spaces:
Sleeping
Sleeping
File size: 7,328 Bytes
e982206 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
"""
Gerenciador de objetos não-serializáveis para LangGraph
"""
import uuid
from typing import Dict, Any, Optional
import logging
class ObjectManager:
"""
Gerencia objetos não-serializáveis que não podem ser incluídos no estado do LangGraph
"""
def __init__(self):
self._objects: Dict[str, Any] = {}
self._sql_agents: Dict[str, Any] = {}
self._processing_agents: Dict[str, Any] = {}
self._engines: Dict[str, Any] = {}
self._databases: Dict[str, Any] = {}
self._cache_managers: Dict[str, Any] = {}
# Mapeamento para relacionar agentes com seus bancos
self._agent_db_mapping: Dict[str, str] = {}
# Metadados de conexões (CSV/PostgreSQL)
self._connection_metadata: Dict[str, Dict[str, Any]] = {}
def store_sql_agent(self, agent: Any, db_id: str = None) -> str:
"""Armazena agente SQL e retorna ID"""
agent_id = str(uuid.uuid4())
self._sql_agents[agent_id] = agent
# Mapeia agente com seu banco se fornecido
if db_id:
self._agent_db_mapping[agent_id] = db_id
logging.info(f"Agente SQL armazenado com ID: {agent_id}")
return agent_id
def get_sql_agent(self, agent_id: str) -> Optional[Any]:
"""Recupera agente SQL pelo ID"""
return self._sql_agents.get(agent_id)
def store_processing_agent(self, agent: Any) -> str:
"""Armazena Processing Agent e retorna ID"""
agent_id = str(uuid.uuid4())
self._processing_agents[agent_id] = agent
logging.info(f"Processing Agent armazenado com ID: {agent_id}")
return agent_id
def get_processing_agent(self, agent_id: str) -> Optional[Any]:
"""Recupera Processing Agent pelo ID"""
return self._processing_agents.get(agent_id)
def store_engine(self, engine: Any) -> str:
"""Armazena engine e retorna ID"""
engine_id = str(uuid.uuid4())
self._engines[engine_id] = engine
logging.info(f"Engine armazenada com ID: {engine_id}")
return engine_id
def get_engine(self, engine_id: str) -> Optional[Any]:
"""Recupera engine pelo ID"""
return self._engines.get(engine_id)
def store_database(self, database: Any) -> str:
"""Armazena banco de dados e retorna ID"""
db_id = str(uuid.uuid4())
self._databases[db_id] = database
logging.info(f"Banco de dados armazenado com ID: {db_id}")
return db_id
def get_database(self, db_id: str) -> Optional[Any]:
"""Recupera banco de dados pelo ID"""
return self._databases.get(db_id)
def get_db_id_for_agent(self, agent_id: str) -> Optional[str]:
"""Recupera ID do banco associado ao agente"""
return self._agent_db_mapping.get(agent_id)
def store_cache_manager(self, cache_manager: Any) -> str:
"""Armazena cache manager e retorna ID"""
cache_id = str(uuid.uuid4())
self._cache_managers[cache_id] = cache_manager
logging.info(f"Cache manager armazenado com ID: {cache_id}")
return cache_id
def get_cache_manager(self, cache_id: str) -> Optional[Any]:
"""Recupera cache manager pelo ID"""
return self._cache_managers.get(cache_id)
def store_object(self, obj: Any, category: str = "general") -> str:
"""Armazena objeto genérico e retorna ID"""
obj_id = str(uuid.uuid4())
self._objects[obj_id] = {"object": obj, "category": category}
logging.info(f"Objeto {category} armazenado com ID: {obj_id}")
return obj_id
def get_object(self, obj_id: str) -> Optional[Any]:
"""Recupera objeto pelo ID"""
obj_data = self._objects.get(obj_id)
return obj_data["object"] if obj_data else None
def update_sql_agent(self, agent_id: str, new_agent: Any) -> bool:
"""Atualiza agente SQL existente"""
if agent_id in self._sql_agents:
self._sql_agents[agent_id] = new_agent
logging.info(f"Agente SQL atualizado: {agent_id}")
return True
return False
def update_engine(self, engine_id: str, new_engine: Any) -> bool:
"""Atualiza engine existente"""
if engine_id in self._engines:
self._engines[engine_id] = new_engine
logging.info(f"Engine atualizada: {engine_id}")
return True
return False
def update_cache_manager(self, cache_id: str, new_cache_manager: Any) -> bool:
"""Atualiza cache manager existente"""
if cache_id in self._cache_managers:
self._cache_managers[cache_id] = new_cache_manager
logging.info(f"Cache manager atualizado: {cache_id}")
return True
return False
def clear_all(self):
"""Limpa todos os objetos armazenados"""
self._objects.clear()
self._sql_agents.clear()
self._engines.clear()
self._databases.clear()
self._cache_managers.clear()
self._agent_db_mapping.clear()
self._connection_metadata.clear()
logging.info("Todos os objetos foram limpos do gerenciador")
def store_connection_metadata(self, connection_id: str, metadata: Dict[str, Any]) -> str:
"""
Armazena metadados de conexão
Args:
connection_id: ID da conexão
metadata: Metadados da conexão
Returns:
ID dos metadados armazenados
"""
self._connection_metadata[connection_id] = metadata
logging.info(f"Metadados de conexão armazenados com ID: {connection_id}")
return connection_id
def get_connection_metadata(self, connection_id: str) -> Optional[Dict[str, Any]]:
"""
Recupera metadados de conexão pelo ID
Args:
connection_id: ID da conexão
Returns:
Metadados da conexão ou None se não encontrado
"""
return self._connection_metadata.get(connection_id)
def get_all_connection_metadata(self) -> Dict[str, Dict[str, Any]]:
"""
Retorna todos os metadados de conexão
Returns:
Dicionário com todos os metadados
"""
return self._connection_metadata.copy()
def get_stats(self) -> Dict[str, int]:
"""Retorna estatísticas dos objetos armazenados"""
return {
"sql_agents": len(self._sql_agents),
"engines": len(self._engines),
"databases": len(self._databases),
"cache_managers": len(self._cache_managers),
"general_objects": len(self._objects),
"agent_db_mappings": len(self._agent_db_mapping),
"connection_metadata": len(self._connection_metadata)
}
# Instância global do gerenciador
_object_manager: Optional[ObjectManager] = None
def get_object_manager() -> ObjectManager:
"""Retorna instância singleton do gerenciador de objetos"""
global _object_manager
if _object_manager is None:
_object_manager = ObjectManager()
return _object_manager
def reset_object_manager():
"""Reseta o gerenciador de objetos"""
global _object_manager
if _object_manager:
_object_manager.clear_all()
_object_manager = ObjectManager()
|