DataGraph / utils /object_manager.py
rwayz's picture
Deploy
6d597f0
raw
history blame
5.49 kB
"""
Gerenciador de objetos não-serializáveis para LangGraph
"""
import uuid
from typing import Dict, Any, Optional
import logging
class ObjectManager:
"""
Gerencia objetos não-serializáveis que não podem ser incluídos no estado do LangGraph
"""
def __init__(self):
self._objects: Dict[str, Any] = {}
self._sql_agents: Dict[str, Any] = {}
self._engines: Dict[str, Any] = {}
self._databases: Dict[str, Any] = {}
self._cache_managers: Dict[str, Any] = {}
# Mapeamento para relacionar agentes com seus bancos
self._agent_db_mapping: Dict[str, str] = {}
def store_sql_agent(self, agent: Any, db_id: str = None) -> str:
"""Armazena agente SQL e retorna ID"""
agent_id = str(uuid.uuid4())
self._sql_agents[agent_id] = agent
# Mapeia agente com seu banco se fornecido
if db_id:
self._agent_db_mapping[agent_id] = db_id
logging.info(f"Agente SQL armazenado com ID: {agent_id}")
return agent_id
def get_sql_agent(self, agent_id: str) -> Optional[Any]:
"""Recupera agente SQL pelo ID"""
return self._sql_agents.get(agent_id)
def store_engine(self, engine: Any) -> str:
"""Armazena engine e retorna ID"""
engine_id = str(uuid.uuid4())
self._engines[engine_id] = engine
logging.info(f"Engine armazenada com ID: {engine_id}")
return engine_id
def get_engine(self, engine_id: str) -> Optional[Any]:
"""Recupera engine pelo ID"""
return self._engines.get(engine_id)
def store_database(self, database: Any) -> str:
"""Armazena banco de dados e retorna ID"""
db_id = str(uuid.uuid4())
self._databases[db_id] = database
logging.info(f"Banco de dados armazenado com ID: {db_id}")
return db_id
def get_database(self, db_id: str) -> Optional[Any]:
"""Recupera banco de dados pelo ID"""
return self._databases.get(db_id)
def get_db_id_for_agent(self, agent_id: str) -> Optional[str]:
"""Recupera ID do banco associado ao agente"""
return self._agent_db_mapping.get(agent_id)
def store_cache_manager(self, cache_manager: Any) -> str:
"""Armazena cache manager e retorna ID"""
cache_id = str(uuid.uuid4())
self._cache_managers[cache_id] = cache_manager
logging.info(f"Cache manager armazenado com ID: {cache_id}")
return cache_id
def get_cache_manager(self, cache_id: str) -> Optional[Any]:
"""Recupera cache manager pelo ID"""
return self._cache_managers.get(cache_id)
def store_object(self, obj: Any, category: str = "general") -> str:
"""Armazena objeto genérico e retorna ID"""
obj_id = str(uuid.uuid4())
self._objects[obj_id] = {"object": obj, "category": category}
logging.info(f"Objeto {category} armazenado com ID: {obj_id}")
return obj_id
def get_object(self, obj_id: str) -> Optional[Any]:
"""Recupera objeto pelo ID"""
obj_data = self._objects.get(obj_id)
return obj_data["object"] if obj_data else None
def update_sql_agent(self, agent_id: str, new_agent: Any) -> bool:
"""Atualiza agente SQL existente"""
if agent_id in self._sql_agents:
self._sql_agents[agent_id] = new_agent
logging.info(f"Agente SQL atualizado: {agent_id}")
return True
return False
def update_engine(self, engine_id: str, new_engine: Any) -> bool:
"""Atualiza engine existente"""
if engine_id in self._engines:
self._engines[engine_id] = new_engine
logging.info(f"Engine atualizada: {engine_id}")
return True
return False
def update_cache_manager(self, cache_id: str, new_cache_manager: Any) -> bool:
"""Atualiza cache manager existente"""
if cache_id in self._cache_managers:
self._cache_managers[cache_id] = new_cache_manager
logging.info(f"Cache manager atualizado: {cache_id}")
return True
return False
def clear_all(self):
"""Limpa todos os objetos armazenados"""
self._objects.clear()
self._sql_agents.clear()
self._engines.clear()
self._databases.clear()
self._cache_managers.clear()
self._agent_db_mapping.clear()
logging.info("Todos os objetos foram limpos do gerenciador")
def get_stats(self) -> Dict[str, int]:
"""Retorna estatísticas dos objetos armazenados"""
return {
"sql_agents": len(self._sql_agents),
"engines": len(self._engines),
"databases": len(self._databases),
"cache_managers": len(self._cache_managers),
"general_objects": len(self._objects),
"agent_db_mappings": len(self._agent_db_mapping)
}
# Instância global do gerenciador
_object_manager: Optional[ObjectManager] = None
def get_object_manager() -> ObjectManager:
"""Retorna instância singleton do gerenciador de objetos"""
global _object_manager
if _object_manager is None:
_object_manager = ObjectManager()
return _object_manager
def reset_object_manager():
"""Reseta o gerenciador de objetos"""
global _object_manager
if _object_manager:
_object_manager.clear_all()
_object_manager = ObjectManager()