import asyncio
import importlib
import logging
import time
from typing import Any
from sqlalchemy.orm import Session
from core.infrastructure.registry import kernel_registry
from core.plugin_system.models import PluginExecution, PluginRegistry
from core.plugin_system.sandboxes import SandboxedPlugin
logger = logging.getLogger(__name__)
class PluginRegistryService:
    """Loads, caches, and serves plugin instances.

    Plugins are resolved in order from: an in-memory TTL cache, a manual
    registry (never expires; used for tests/migration), or a dynamic import
    of ``plugins.<namespace>.plugin`` based on the DB registry record.
    Dynamically loaded plugins are initialized with injected core services
    and wrapped in a SandboxedPlugin before being cached and returned.
    """

    def __init__(self, cache_ttl: int = 3600):
        """
        Args:
            cache_ttl: Seconds a dynamically loaded plugin stays cached.
        """
        # Enhanced Intelligent Cache with usage tracking:
        # plugin_id -> {"instance", "expiry", "load_time", "loaded_at"}
        self._plugin_cache: dict[str, dict[str, Any]] = {}
        # Manually registered instances; these never expire.
        self._manual_registry: dict[str, Any] = {}
        # Per-plugin locks so concurrent loads of the same plugin run once.
        self._load_locks: dict[str, asyncio.Lock] = {}
        self.cache_ttl = cache_ttl
        # Performance metrics for intelligent caching
        self._access_counts: dict[str, int] = {}
        self._last_access: dict[str, float] = {}
        self._load_times: dict[str, float] = {}
        # Batch loading support
        self._batch_queue: list[str] = []
        self._batch_lock = asyncio.Lock()
        self._batch_size = 5  # Load up to 5 plugins concurrently
        # FIX: previously missing — warmup_cache() appended to this attribute
        # without it ever being created, raising AttributeError. Also keeps
        # strong references to fire-and-forget tasks (asyncio holds only weak
        # references to running tasks).
        self._background_tasks: list[asyncio.Task] = []

    def register_manual_plugin(self, plugin_id: str, plugin_instance: Any):
        """Manually register a plugin instance (useful for testing/migration)."""
        expiry = time.time() + self.cache_ttl
        self._plugin_cache[plugin_id] = {"instance": plugin_instance, "expiry": expiry}
        self._manual_registry[plugin_id] = plugin_instance

    async def get_plugin(self, plugin_id: str, db: Session = None):
        """Get a loaded plugin instance with async locking and intelligent TTL caching.

        Args:
            plugin_id: Matches either PluginRegistry.namespace or .plugin_id.
            db: SQLAlchemy session for registry lookup and service injection;
                required when the plugin is not already cached/registered.

        Returns:
            The cached instance, a manually registered instance, or a freshly
            loaded plugin wrapped in a SandboxedPlugin.

        Raises:
            ImportError: If the plugin module or class cannot be loaded.
            ValueError: If the plugin is unknown and no DB session is given.
        """
        # Track access patterns for intelligent caching
        self._access_counts[plugin_id] = self._access_counts.get(plugin_id, 0) + 1
        self._last_access[plugin_id] = time.time()
        # 1. Check in-memory cache with TTL
        if plugin_id in self._plugin_cache:
            cached = self._plugin_cache[plugin_id]
            if time.time() < cached["expiry"]:
                logger.debug(f"Cache hit for plugin {plugin_id}")
                return cached["instance"]
            else:
                logger.info(f"Cache expired for plugin {plugin_id}, reloading...")
        # 2. Check manual registry (never expires)
        if plugin_id in self._manual_registry:
            logger.debug(f"Manual registry hit for plugin {plugin_id}")
            return self._manual_registry[plugin_id]
        # 3. Concurrency Safety: Use lock for this specific plugin_id
        if plugin_id not in self._load_locks:
            self._load_locks[plugin_id] = asyncio.Lock()
        async with self._load_locks[plugin_id]:
            # Double-check cache after acquiring lock — another coroutine may
            # have finished loading while we waited.
            if plugin_id in self._plugin_cache:
                cached = self._plugin_cache[plugin_id]
                if time.time() < cached["expiry"]:
                    return cached["instance"]
            # 4. Load from DB/Filesystem
            if db:
                try:
                    start_time = time.time()
                    plugin_record = (
                        db.query(PluginRegistry)
                        .filter(
                            (PluginRegistry.namespace == plugin_id)
                            | (PluginRegistry.plugin_id == plugin_id)
                        )
                        .first()
                    )
                    if plugin_record:
                        # Dynamically load the plugin module
                        namespace = plugin_record.namespace
                        module_path = (
                            "plugins." + namespace.replace("/", ".") + ".plugin"
                        )
                        logger.info(f"Attempting to load plugin from {module_path}")
                        # run_in_executor to avoid blocking the event loop with synchronous import
                        loop = asyncio.get_running_loop()
                        module = await loop.run_in_executor(
                            None, importlib.import_module, module_path
                        )
                        plugin_class = getattr(module, "Plugin", None)
                        if not plugin_class:
                            # Fallback: pick the first exported *Plugin class
                            # (excluding the interface base itself).
                            import inspect

                            for name, obj in inspect.getmembers(module):
                                if (
                                    inspect.isclass(obj)
                                    and name.endswith("Plugin")
                                    and name != "PluginInterface"
                                ):
                                    plugin_class = obj
                                    break
                        if not plugin_class:
                            raise ImportError(f"No Plugin class found in {module_path}")
                        plugin_instance = plugin_class()
                        # Initialize the plugin with its execution context
                        from core.plugin_system.interface import PluginContext

                        # Inject Core Services via Kernel Registry
                        services = {}
                        try:
                            # 1. AI Service
                            if kernel_registry.ai_service:
                                services["ai_service"] = kernel_registry.ai_service
                            # 2. Monitoring Service
                            if kernel_registry.monitoring_service:
                                services["monitoring_service"] = (
                                    kernel_registry.monitoring_service
                                )
                            # 3. DB Service (Facade needs it)
                            # SECURE: Use Facade instead of raw service
                            from core.plugin_system.facades import PluginDBFacade
                            from core.plugin_system.permissions import PluginPermission

                            if kernel_registry.db_service:
                                # In real world, we would parse plugin_record.permissions or similar
                                permissions = [PluginPermission.READ_ONLY]
                                if "fraud_detection" in (
                                    plugin_record.capabilities or []
                                ):
                                    permissions.append(
                                        PluginPermission.READ_DATA
                                    )  # Example permission
                                services["db_service"] = PluginDBFacade(
                                    kernel_registry.db_service,
                                    plugin_id=plugin_id,
                                    permissions=permissions,
                                )
                            # Inject User Service Facade
                            try:
                                from app.modules.users.plugin import UserPluginFacade
                                from app.modules.users.service import UserService

                                # Instantiate service with the current DB session
                                # Note: This session (db) comes from the get_plugin call
                                user_service = UserService(db)
                                # Grant permissions
                                user_permissions = [
                                    PluginPermission.READ_USER
                                ]  # Minimal default
                                if "admin" in (plugin_record.capabilities or []):
                                    user_permissions.append(PluginPermission.READ_DATA)
                                services["user_service"] = UserPluginFacade(
                                    user_service,
                                    plugin_id=plugin_id,
                                    permissions=user_permissions,
                                )
                            except ImportError as e:
                                logger.warning(f"Could not inject user_service: {e}")
                        except Exception as e:
                            logger.error(
                                f"Error injecting services into plugin context: {e}"
                            )
                        context = PluginContext(config={}, services=services)
                        # Support both sync and async initialize() implementations.
                        if asyncio.iscoroutinefunction(plugin_instance.initialize):
                            await plugin_instance.initialize(context)
                        else:
                            plugin_instance.initialize(context)
                        # Wrap in Sandbox for safe execution
                        sandboxed_instance = SandboxedPlugin(
                            plugin_instance, plugin_id=plugin_id
                        )
                        # Store in cache with expiry and track load time
                        load_time = time.time() - start_time
                        self._load_times[plugin_id] = load_time
                        expiry = time.time() + self.cache_ttl
                        self._plugin_cache[plugin_id] = {
                            "instance": sandboxed_instance,
                            "expiry": expiry,
                            "load_time": load_time,
                            "loaded_at": time.time(),
                        }
                        logger.info(
                            f"Successfully loaded and cached plugin {plugin_id} in {load_time:.2f}s"
                        )
                        return sandboxed_instance
                except Exception as e:
                    logger.error(f"Failed to load plugin {plugin_id}: {e}")
                    # Chain the original cause so the traceback is preserved.
                    raise ImportError(f"Failed to load plugin {plugin_id}: {e}") from e
        raise ValueError(
            f"Plugin {plugin_id} not found and no valid DB session provided"
        )

    async def get_plugins_by_capability(
        self, capability: str, db: Session
    ) -> list[Any]:
        """Find and load all active plugins with a specific capability.

        Plugins that fail to load are logged and skipped rather than
        aborting the whole lookup.
        """
        if not db:
            return []
        try:
            # Filter in Python for maximum compatibility across SQL dialects (SQLite vs Postgres JSON)
            # Assuming strictly active plugins
            all_plugins = (
                db.query(PluginRegistry).filter(PluginRegistry.status == "active").all()
            )
            matching_plugins = []
            for p in all_plugins:
                caps = p.capabilities or []
                if capability in caps:
                    try:
                        # Use get_plugin to ensure caching and locking logic compliance
                        instance = await self.get_plugin(p.plugin_id, db)
                        matching_plugins.append(instance)
                    except Exception as e:
                        logger.error(
                            f"Failed to load capable plugin {p.plugin_id}: {e}"
                        )
            return matching_plugins
        except Exception as e:
            logger.error(f"Error finding plugins by capability '{capability}': {e}")
            return []

    async def preload_plugins(
        self, plugin_ids: list[str], db: Session = None
    ) -> dict[str, Any]:
        """Batch preload multiple plugins asynchronously for better performance.

        Returns:
            Mapping of plugin_id to its loaded instance, or None on failure.
        """
        results = {}
        # Process in batches to avoid overwhelming the system
        for i in range(0, len(plugin_ids), self._batch_size):
            batch = plugin_ids[i : i + self._batch_size]
            batch_tasks = [self.get_plugin(plugin_id, db) for plugin_id in batch]
            batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
            for plugin_id, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    logger.error(f"Failed to preload plugin {plugin_id}: {result}")
                    results[plugin_id] = None
                else:
                    results[plugin_id] = result
        return results

    async def warmup_cache(
        self, frequently_used_plugins: list[str], db: Session = None
    ):
        """Warm up the cache with frequently used plugins."""
        logger.info(f"Warming up cache with {len(frequently_used_plugins)} plugins")
        # Load plugins in background without blocking
        preload_task = asyncio.create_task(
            self.preload_plugins(frequently_used_plugins, db)
        )
        # Hold a strong reference so the task is not garbage-collected
        # mid-flight, and drop it once the task completes so the list does
        # not grow without bound.
        self._background_tasks.append(preload_task)
        preload_task.add_done_callback(self._background_tasks.remove)

    def get_cache_stats(self) -> dict[str, Any]:
        """Get cache performance statistics."""
        now = time.time()  # snapshot once so all counts use the same instant
        total_plugins = len(self._plugin_cache)
        expired_count = sum(
            1 for cached in self._plugin_cache.values() if now >= cached["expiry"]
        )
        total_accesses = sum(self._access_counts.values())
        avg_load_time = sum(self._load_times.values()) / max(len(self._load_times), 1)
        return {
            "total_cached_plugins": total_plugins,
            "expired_plugins": expired_count,
            "total_accesses": total_accesses,
            "average_load_time_ms": avg_load_time * 1000,
            "cache_hit_rate": self._calculate_cache_hit_rate(),
            "most_accessed_plugins": sorted(
                self._access_counts.items(), key=lambda x: x[1], reverse=True
            )[:5],
        }

    def _calculate_cache_hit_rate(self) -> float:
        """Calculate cache hit rate based on access patterns.

        Approximation: a plugin counts as one "hit" if it is currently
        cached and unexpired, regardless of how many accesses it received.
        """
        if not self._access_counts:
            return 0.0
        now = time.time()
        cache_hits = sum(
            1
            for plugin_id in self._access_counts
            if plugin_id in self._plugin_cache
            and now < self._plugin_cache[plugin_id]["expiry"]
        )
        total_accesses = sum(self._access_counts.values())
        return cache_hits / total_accesses if total_accesses > 0 else 0.0

    async def cleanup_expired_cache(self):
        """Clean up expired cache entries to free memory."""
        # Collect first, then delete — never mutate a dict while iterating it.
        expired_plugins = []
        current_time = time.time()
        for plugin_id, cached in self._plugin_cache.items():
            if current_time >= cached["expiry"]:
                expired_plugins.append(plugin_id)
        for plugin_id in expired_plugins:
            del self._plugin_cache[plugin_id]
            logger.debug(f"Cleaned up expired cache for plugin {plugin_id}")
        if expired_plugins:
            logger.info(
                f"Cleaned up {len(expired_plugins)} expired plugin cache entries"
            )

    async def store_shadow_result(self, result: Any, db: Session = None):
        """Store a shadow-execution result in the DB (best-effort)."""
        if not db:
            return
        try:
            execution = PluginExecution(
                plugin_id=result.plugin_id,
                execution_time_ms=int(result.execution_time_ms),
                status="success",
                # matches_production=result.matches_production # Update schema if needed
            )
            db.add(execution)
            db.commit()
        except Exception as e:
            logger.error(f"Failed to store shadow result: {e}")
            # Roll back so the session stays usable after a failed commit.
            db.rollback()
# Module-level singleton shared by the application; uses the default 1h TTL.
plugin_registry_service = PluginRegistryService()
|