Spaces:
Build error
Build error
File size: 7,112 Bytes
8a682b5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | """
Integration Manager for AI Agent
Coordinates initialization and management of all integrations including
Supabase, LangChain, CrewAI, LlamaIndex, and GAIA components.
"""
import logging
from typing import Dict, Any, Optional, List
from datetime import datetime
import asyncio
try:
from .config.integrations import integration_config
except ImportError:
try:
from config.integrations import integration_config
except ImportError:
# Fallback for when running as standalone script
integration_config = None
logging.warning("Could not import integration_config - using defaults")
logger = logging.getLogger(__name__)
class IntegrationManager:
"""Manages initialization and coordination of all integrations"""
def __init__(self):
self.config = integration_config
self.components = {}
self._initialized = False
async def initialize_all(self):
"""Initialize all components in correct order"""
if self._initialized:
logger.info("Integration manager already initialized")
return self.components
# Validate config first
is_valid, issues = self.config.validate()
if not is_valid:
raise ValueError(f"Invalid configuration: {issues}")
logger.info("Starting integration initialization...")
try:
# 1. Initialize Supabase if configured
if self.config.supabase.is_configured():
logger.info("Initializing Supabase components...")
from .database_enhanced import initialize_supabase_enhanced
self.components['supabase'] = await initialize_supabase_enhanced()
logger.info("✅ Supabase components initialized")
else:
logger.info("⚠️ Supabase not configured, skipping")
# 2. Initialize LlamaIndex
if self.config.llamaindex.enable_hierarchical_indexing:
logger.info("Initializing LlamaIndex components...")
from .llamaindex_enhanced import create_gaia_knowledge_base
self.components['llamaindex'] = create_gaia_knowledge_base()
logger.info("✅ LlamaIndex components initialized")
else:
logger.info("⚠️ LlamaIndex disabled, skipping")
# 3. Initialize LangChain
if self.config.langchain.enable_memory:
logger.info("Initializing LangChain components...")
try:
from .langchain_enhanced import initialize_enhanced_agent
tools = self._get_available_tools()
self.components['langchain'] = initialize_enhanced_agent(tools)
logger.info("✅ LangChain components initialized")
except ImportError as e:
logger.warning(f"⚠️ LangChain not available: {e}")
else:
logger.info("⚠️ LangChain disabled, skipping")
# 4. Initialize CrewAI
if self.config.crewai.enable_multi_agent:
logger.info("Initializing CrewAI components...")
try:
from .crew_enhanced import initialize_crew_enhanced
tools = self._get_available_tools()
self.components['crewai'] = initialize_crew_enhanced(tools)
logger.info("✅ CrewAI components initialized")
except ImportError as e:
logger.warning(f"⚠️ CrewAI not available: {e}")
else:
logger.info("⚠️ CrewAI disabled, skipping")
self._initialized = True
logger.info("🎉 All integrations initialized successfully")
return self.components
except Exception as e:
logger.error(f"❌ Integration initialization failed: {e}")
raise
def _get_available_tools(self) -> List[Any]:
"""Get list of available tools for agents"""
tools = []
# Add basic tools
try:
from .tools import (
SemanticSearchTool, PythonInterpreterTool,
FileReaderTool, WeatherTool
)
tools.extend([
SemanticSearchTool(),
PythonInterpreterTool(),
FileReaderTool(),
WeatherTool()
])
except ImportError as e:
logger.warning(f"Could not load basic tools: {e}")
# Add GAIA tools if enabled
if self.config.gaia.enable_gaia_tools:
try:
from .gaia.tools.gaia_specialized import (
GAIAVideoAnalyzer, GAIAChessLogicTool,
GAIAImageAnalyzer, GAIAMusicAnalyzer
)
tools.extend([
GAIAVideoAnalyzer(),
GAIAChessLogicTool(),
GAIAImageAnalyzer(),
GAIAMusicAnalyzer()
])
except ImportError as e:
logger.warning(f"Could not load GAIA tools: {e}")
return tools
def get_component(self, name: str) -> Optional[Any]:
"""Get specific component by name"""
return self.components.get(name)
def get_status(self) -> Dict[str, Any]:
"""Get status of all components"""
status = {
"timestamp": datetime.now().isoformat(),
"initialized": self._initialized,
"config_valid": self.config.validate()[0],
"components": {}
}
for name, component in self.components.items():
status["components"][name] = {
"available": component is not None,
"type": type(component).__name__ if component else None
}
return status
async def shutdown(self):
"""Gracefully shutdown all components"""
logger.info("Shutting down integrations...")
# Close Supabase connections
if 'supabase' in self.components:
try:
connection_pool = self.components['supabase'].get('connection_pool')
if connection_pool:
await connection_pool.close()
logger.info("✅ Supabase connections closed")
except Exception as e:
logger.error(f"❌ Error closing Supabase connections: {e}")
# Clear components
self.components.clear()
self._initialized = False
logger.info("✅ Integration manager shutdown complete")
# Global integration manager instance
integration_manager = IntegrationManager()
async def get_integration_manager() -> IntegrationManager:
"""Get or initialize the global integration manager"""
if not integration_manager._initialized:
await integration_manager.initialize_all()
return integration_manager |