RobotPai / src /services /integration_manager.py
atr0p05's picture
Upload 291 files
8a682b5 verified
"""
Integration Manager for AI Agent
Coordinates initialization and management of all integrations including
Supabase, LangChain, CrewAI, LlamaIndex, and GAIA components.
"""
import logging
from typing import Dict, Any, Optional, List
from datetime import datetime
import asyncio
try:
from .config.integrations import integration_config
except ImportError:
try:
from config.integrations import integration_config
except ImportError:
# Fallback for when running as standalone script
integration_config = None
logging.warning("Could not import integration_config - using defaults")
logger = logging.getLogger(__name__)
class IntegrationManager:
"""Manages initialization and coordination of all integrations"""
def __init__(self):
self.config = integration_config
self.components = {}
self._initialized = False
async def initialize_all(self):
"""Initialize all components in correct order"""
if self._initialized:
logger.info("Integration manager already initialized")
return self.components
# Validate config first
is_valid, issues = self.config.validate()
if not is_valid:
raise ValueError(f"Invalid configuration: {issues}")
logger.info("Starting integration initialization...")
try:
# 1. Initialize Supabase if configured
if self.config.supabase.is_configured():
logger.info("Initializing Supabase components...")
from .database_enhanced import initialize_supabase_enhanced
self.components['supabase'] = await initialize_supabase_enhanced()
logger.info("✅ Supabase components initialized")
else:
logger.info("⚠️ Supabase not configured, skipping")
# 2. Initialize LlamaIndex
if self.config.llamaindex.enable_hierarchical_indexing:
logger.info("Initializing LlamaIndex components...")
from .llamaindex_enhanced import create_gaia_knowledge_base
self.components['llamaindex'] = create_gaia_knowledge_base()
logger.info("✅ LlamaIndex components initialized")
else:
logger.info("⚠️ LlamaIndex disabled, skipping")
# 3. Initialize LangChain
if self.config.langchain.enable_memory:
logger.info("Initializing LangChain components...")
try:
from .langchain_enhanced import initialize_enhanced_agent
tools = self._get_available_tools()
self.components['langchain'] = initialize_enhanced_agent(tools)
logger.info("✅ LangChain components initialized")
except ImportError as e:
logger.warning(f"⚠️ LangChain not available: {e}")
else:
logger.info("⚠️ LangChain disabled, skipping")
# 4. Initialize CrewAI
if self.config.crewai.enable_multi_agent:
logger.info("Initializing CrewAI components...")
try:
from .crew_enhanced import initialize_crew_enhanced
tools = self._get_available_tools()
self.components['crewai'] = initialize_crew_enhanced(tools)
logger.info("✅ CrewAI components initialized")
except ImportError as e:
logger.warning(f"⚠️ CrewAI not available: {e}")
else:
logger.info("⚠️ CrewAI disabled, skipping")
self._initialized = True
logger.info("🎉 All integrations initialized successfully")
return self.components
except Exception as e:
logger.error(f"❌ Integration initialization failed: {e}")
raise
def _get_available_tools(self) -> List[Any]:
"""Get list of available tools for agents"""
tools = []
# Add basic tools
try:
from .tools import (
SemanticSearchTool, PythonInterpreterTool,
FileReaderTool, WeatherTool
)
tools.extend([
SemanticSearchTool(),
PythonInterpreterTool(),
FileReaderTool(),
WeatherTool()
])
except ImportError as e:
logger.warning(f"Could not load basic tools: {e}")
# Add GAIA tools if enabled
if self.config.gaia.enable_gaia_tools:
try:
from .gaia.tools.gaia_specialized import (
GAIAVideoAnalyzer, GAIAChessLogicTool,
GAIAImageAnalyzer, GAIAMusicAnalyzer
)
tools.extend([
GAIAVideoAnalyzer(),
GAIAChessLogicTool(),
GAIAImageAnalyzer(),
GAIAMusicAnalyzer()
])
except ImportError as e:
logger.warning(f"Could not load GAIA tools: {e}")
return tools
def get_component(self, name: str) -> Optional[Any]:
"""Get specific component by name"""
return self.components.get(name)
def get_status(self) -> Dict[str, Any]:
"""Get status of all components"""
status = {
"timestamp": datetime.now().isoformat(),
"initialized": self._initialized,
"config_valid": self.config.validate()[0],
"components": {}
}
for name, component in self.components.items():
status["components"][name] = {
"available": component is not None,
"type": type(component).__name__ if component else None
}
return status
async def shutdown(self):
"""Gracefully shutdown all components"""
logger.info("Shutting down integrations...")
# Close Supabase connections
if 'supabase' in self.components:
try:
connection_pool = self.components['supabase'].get('connection_pool')
if connection_pool:
await connection_pool.close()
logger.info("✅ Supabase connections closed")
except Exception as e:
logger.error(f"❌ Error closing Supabase connections: {e}")
# Clear components
self.components.clear()
self._initialized = False
logger.info("✅ Integration manager shutdown complete")
# Global integration manager instance
integration_manager = IntegrationManager()
async def get_integration_manager() -> IntegrationManager:
"""Get or initialize the global integration manager"""
if not integration_manager._initialized:
await integration_manager.initialize_all()
return integration_manager