File size: 15,485 Bytes
4a2ab42
 
 
 
 
 
 
 
3e23238
4a2ab42
3e23238
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e23238
 
 
 
4a2ab42
 
 
 
 
 
3e23238
 
 
4a2ab42
 
 
 
 
3e23238
 
 
4a2ab42
 
 
 
 
 
3e23238
 
 
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
3e23238
4a2ab42
 
3e23238
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4a2ab42
 
 
 
 
 
 
 
 
3e23238
 
 
 
 
4a2ab42
 
 
 
 
 
3e23238
4a2ab42
 
 
 
 
3e23238
 
 
 
4a2ab42
 
 
 
3e23238
 
 
4a2ab42
3e23238
 
 
4a2ab42
 
 
 
 
 
 
3e23238
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
3e23238
 
 
4a2ab42
 
 
 
 
 
3e23238
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e23238
 
 
4a2ab42
 
 
 
3e23238
 
 
4a2ab42
 
 
 
 
3e23238
 
 
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
3e23238
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
3e23238
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e23238
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
import asyncio
import importlib
import logging
import time
from typing import Any

from sqlalchemy.orm import Session

from core.infrastructure.registry import kernel_registry
from core.plugin_system.models import PluginExecution, PluginRegistry
from core.plugin_system.sandboxes import SandboxedPlugin

# Module-level logger, named after this module; used by PluginRegistryService.
logger = logging.getLogger(__name__)


class PluginRegistryService:
    """Load plugin instances on demand and keep them in a TTL cache.

    Lookup order used by :meth:`get_plugin`:

    1. the in-memory cache (entries expire after ``cache_ttl`` seconds),
    2. the manual registry filled by :meth:`register_manual_plugin`
       (entries are never expired),
    3. a ``PluginRegistry`` database record, whose ``namespace`` is turned
       into the module path ``plugins.<namespace>.plugin`` and imported.

    The service also records per-plugin access counts, last-access times and
    load durations, which :meth:`get_cache_stats` reports.
    """

    def __init__(self, cache_ttl: int = 3600):
        """Create an empty registry.

        Args:
            cache_ttl: Lifetime, in seconds, of a cache entry.
        """
        # Enhanced Intelligent Cache with usage tracking.
        # plugin_id -> {"instance", "expiry", and for DB loads "load_time",
        # "loaded_at"}.
        self._plugin_cache: dict[str, dict[str, Any]] = {}
        # plugin_id -> instance registered by hand; consulted after the cache.
        self._manual_registry: dict[str, Any] = {}
        # One lock per plugin_id so a plugin is only loaded once at a time.
        self._load_locks: dict[str, asyncio.Lock] = {}
        self.cache_ttl = cache_ttl

        # Performance metrics for intelligent caching
        self._access_counts: dict[str, int] = {}
        self._last_access: dict[str, float] = {}
        self._load_times: dict[str, float] = {}
        # Number of get_plugin() calls answered without loading anything.
        self._cache_hits: int = 0

        # Batch loading support
        self._batch_queue: list[str] = []
        self._batch_lock = asyncio.Lock()
        self._batch_size = 5  # Load up to 5 plugins concurrently

        # Strong references to fire-and-forget tasks started by warmup_cache();
        # without a reference a pending task may be garbage-collected.
        self._background_tasks: set[asyncio.Task] = set()

    def register_manual_plugin(self, plugin_id: str, plugin_instance: Any):
        """Manually register a plugin instance (useful for testing/migration).

        The instance is stored both in the TTL cache and in the manual
        registry, so it stays resolvable after the cache entry expires.

        Args:
            plugin_id: Identifier under which the instance is looked up.
            plugin_instance: The object returned by later lookups.
        """
        self._plugin_cache[plugin_id] = {
            "instance": plugin_instance,
            "expiry": time.time() + self.cache_ttl,
        }
        self._manual_registry[plugin_id] = plugin_instance

    async def get_plugin(self, plugin_id: str, db: "Session | None" = None):
        """Get a loaded plugin instance with async locking and TTL caching.

        Args:
            plugin_id: Plugin identifier; when loading from the database it is
                matched against both ``PluginRegistry.namespace`` and
                ``PluginRegistry.plugin_id``.
            db: Session used to look the plugin up when it is neither cached
                nor manually registered.

        Returns:
            The cached or manually registered instance, or a freshly loaded
            plugin wrapped in ``SandboxedPlugin``.

        Raises:
            ImportError: If loading from the database fails for any reason
                (the original exception is chained as ``__cause__``).
            ValueError: If the plugin is unknown and either no session was
                given or no matching database record exists.
        """
        # Track access patterns for intelligent caching
        self._access_counts[plugin_id] = self._access_counts.get(plugin_id, 0) + 1
        self._last_access[plugin_id] = time.time()

        # 1. Check in-memory cache with TTL
        cached = self._plugin_cache.get(plugin_id)
        if cached is not None:
            if time.time() < cached["expiry"]:
                logger.debug(f"Cache hit for plugin {plugin_id}")
                self._cache_hits += 1
                return cached["instance"]
            logger.info(f"Cache expired for plugin {plugin_id}, reloading...")

        # 2. Check manual registry (never expires)
        if plugin_id in self._manual_registry:
            logger.debug(f"Manual registry hit for plugin {plugin_id}")
            self._cache_hits += 1
            return self._manual_registry[plugin_id]

        # 3. Concurrency Safety: Use lock for this specific plugin_id
        lock = self._load_locks.get(plugin_id)
        if lock is None:
            lock = self._load_locks[plugin_id] = asyncio.Lock()

        async with lock:
            # Double-check cache after acquiring lock: another caller may have
            # finished loading this plugin while we were waiting.
            cached = self._plugin_cache.get(plugin_id)
            if cached is not None and time.time() < cached["expiry"]:
                self._cache_hits += 1
                return cached["instance"]

            # 4. Load from DB/Filesystem
            if db:
                try:
                    loaded = await self._load_from_db(plugin_id, db)
                except Exception as e:
                    logger.error(f"Failed to load plugin {plugin_id}: {e}")
                    raise ImportError(f"Failed to load plugin {plugin_id}: {e}") from e
                if loaded is not None:
                    return loaded

        raise ValueError(
            f"Plugin {plugin_id} not found and no valid DB session provided"
        )

    async def _load_from_db(self, plugin_id: str, db: "Session"):
        """Import, initialize, sandbox and cache the plugin described in the DB.

        Returns ``None`` when no ``PluginRegistry`` row matches ``plugin_id``;
        any failure while importing or initializing propagates to the caller.
        """
        start_time = time.time()

        plugin_record = (
            db.query(PluginRegistry)
            .filter(
                (PluginRegistry.namespace == plugin_id)
                | (PluginRegistry.plugin_id == plugin_id)
            )
            .first()
        )
        if not plugin_record:
            return None

        # Dynamically load the plugin module
        module_path = "plugins." + plugin_record.namespace.replace("/", ".") + ".plugin"
        logger.info(f"Attempting to load plugin from {module_path}")

        # run_in_executor to avoid blocking the event loop with synchronous import
        loop = asyncio.get_running_loop()
        module = await loop.run_in_executor(None, importlib.import_module, module_path)

        plugin_class = self._find_plugin_class(module)
        if not plugin_class:
            raise ImportError(f"No Plugin class found in {module_path}")

        plugin_instance = plugin_class()

        # Initialize the plugin
        from core.plugin_system.interface import PluginContext

        services = self._build_plugin_services(plugin_record, plugin_id, db)
        context = PluginContext(config={}, services=services)

        # initialize() may be either a coroutine function or a plain method.
        if asyncio.iscoroutinefunction(plugin_instance.initialize):
            await plugin_instance.initialize(context)
        else:
            plugin_instance.initialize(context)

        # Wrap in Sandbox for safe execution
        sandboxed_instance = SandboxedPlugin(plugin_instance, plugin_id=plugin_id)

        # Store in cache with expiry and track load time
        load_time = time.time() - start_time
        self._load_times[plugin_id] = load_time
        self._plugin_cache[plugin_id] = {
            "instance": sandboxed_instance,
            "expiry": time.time() + self.cache_ttl,
            "load_time": load_time,
            "loaded_at": time.time(),
        }

        logger.info(
            f"Successfully loaded and cached plugin {plugin_id} in {load_time:.2f}s"
        )
        return sandboxed_instance

    @staticmethod
    def _find_plugin_class(module: Any):
        """Return the module's ``Plugin`` attribute, else the first ``*Plugin`` class.

        The fallback scans members in ``inspect.getmembers`` order and skips a
        class named ``PluginInterface``. Returns ``None`` when nothing matches.
        """
        plugin_class = getattr(module, "Plugin", None)
        if plugin_class:
            return plugin_class

        import inspect

        for name, obj in inspect.getmembers(module):
            if (
                inspect.isclass(obj)
                and name.endswith("Plugin")
                and name != "PluginInterface"
            ):
                return obj
        return None

    def _build_plugin_services(
        self, plugin_record: Any, plugin_id: str, db: "Session"
    ) -> dict[str, Any]:
        """Collect the services handed to a plugin through its context.

        Injection is best-effort: any error is logged and the services
        gathered up to that point are still returned.
        """
        # Inject Core Services via Kernel Registry
        services: dict[str, Any] = {}
        try:
            # 1. AI Service
            if kernel_registry.ai_service:
                services["ai_service"] = kernel_registry.ai_service

            # 2. Monitoring Service
            if kernel_registry.monitoring_service:
                services["monitoring_service"] = kernel_registry.monitoring_service

            # 3. DB Service
            # SECURE: Use Facade instead of raw service
            from core.plugin_system.facades import PluginDBFacade
            from core.plugin_system.permissions import PluginPermission

            capabilities = plugin_record.capabilities or []

            if kernel_registry.db_service:
                # In real world, we would parse plugin_record.permissions or similar
                permissions = [PluginPermission.READ_ONLY]
                if "fraud_detection" in capabilities:
                    permissions.append(PluginPermission.READ_DATA)  # Example permission

                services["db_service"] = PluginDBFacade(
                    kernel_registry.db_service,
                    plugin_id=plugin_id,
                    permissions=permissions,
                )

            # Inject User Service Facade; a missing users module only
            # produces a warning.
            try:
                from app.modules.users.plugin import UserPluginFacade
                from app.modules.users.service import UserService

                # Instantiate service with the current DB session
                # Note: This session (db) comes from the get_plugin call
                user_service = UserService(db)

                # Grant permissions
                user_permissions = [PluginPermission.READ_USER]  # Minimal default
                if "admin" in capabilities:
                    user_permissions.append(PluginPermission.READ_DATA)

                services["user_service"] = UserPluginFacade(
                    user_service,
                    plugin_id=plugin_id,
                    permissions=user_permissions,
                )
            except ImportError as e:
                logger.warning(f"Could not inject user_service: {e}")

        except Exception as e:
            logger.error(f"Error injecting services into plugin context: {e}")

        return services

    async def get_plugins_by_capability(
        self, capability: str, db: "Session | None"
    ) -> list[Any]:
        """Find and load all active plugins with a specific capability.

        Args:
            capability: Value that must appear in a record's ``capabilities``.
            db: Session used to list ``PluginRegistry`` rows.

        Returns:
            The loaded instances. Plugins that fail to load are logged and
            skipped; an empty list is returned when ``db`` is falsy or the
            query itself fails.
        """
        if not db:
            return []

        try:
            # Filter in Python for maximum compatibility across SQL dialects (SQLite vs Postgres JSON)
            # Assuming strictly active plugins
            active_records = (
                db.query(PluginRegistry).filter(PluginRegistry.status == "active").all()
            )

            matching_plugins = []
            for record in active_records:
                if capability not in (record.capabilities or []):
                    continue
                try:
                    # Use get_plugin to ensure caching and locking logic compliance
                    matching_plugins.append(await self.get_plugin(record.plugin_id, db))
                except Exception as e:
                    logger.error(
                        f"Failed to load capable plugin {record.plugin_id}: {e}"
                    )

            return matching_plugins
        except Exception as e:
            logger.error(f"Error finding plugins by capability '{capability}': {e}")
            return []

    async def preload_plugins(
        self, plugin_ids: list[str], db: "Session | None" = None
    ) -> dict[str, Any]:
        """Batch preload multiple plugins asynchronously.

        Plugins are loaded ``self._batch_size`` at a time via
        :meth:`get_plugin`.

        Returns:
            A mapping of plugin id to the loaded instance, or to ``None`` when
            loading that plugin raised (including cancellation).
        """
        results: dict[str, Any] = {}

        # Process in batches to avoid overwhelming the system
        for start in range(0, len(plugin_ids), self._batch_size):
            batch = plugin_ids[start : start + self._batch_size]
            outcomes = await asyncio.gather(
                *(self.get_plugin(plugin_id, db) for plugin_id in batch),
                return_exceptions=True,
            )

            for plugin_id, outcome in zip(batch, outcomes):
                # BaseException so a CancelledError returned by gather() is
                # reported as a failure instead of being stored as a plugin.
                if isinstance(outcome, BaseException):
                    logger.error(f"Failed to preload plugin {plugin_id}: {outcome}")
                    results[plugin_id] = None
                else:
                    results[plugin_id] = outcome

        return results

    async def warmup_cache(
        self, frequently_used_plugins: list[str], db: "Session | None" = None
    ):
        """Warm up the cache with frequently used plugins.

        Schedules :meth:`preload_plugins` as a background task and returns
        without waiting for it.
        """
        logger.info(f"Warming up cache with {len(frequently_used_plugins)} plugins")

        # Load plugins in background without blocking
        preload_task = asyncio.create_task(
            self.preload_plugins(frequently_used_plugins, db)
        )
        # Keep a reference until the task finishes, then drop it so the set
        # does not grow without bound.
        self._background_tasks.add(preload_task)
        preload_task.add_done_callback(self._background_tasks.discard)

    def get_cache_stats(self) -> dict[str, Any]:
        """Get cache performance statistics.

        Returns:
            A dict with the keys ``total_cached_plugins``, ``expired_plugins``,
            ``total_accesses``, ``average_load_time_ms``, ``cache_hit_rate``
            and ``most_accessed_plugins`` (up to five ``(plugin_id, count)``
            pairs, most accessed first).
        """
        now = time.time()
        expired_count = sum(
            1 for cached in self._plugin_cache.values() if now >= cached["expiry"]
        )
        # max(..., 1) avoids dividing by zero before anything has been loaded.
        avg_load_time = sum(self._load_times.values()) / max(len(self._load_times), 1)
        most_accessed = sorted(
            self._access_counts.items(), key=lambda item: item[1], reverse=True
        )[:5]

        return {
            "total_cached_plugins": len(self._plugin_cache),
            "expired_plugins": expired_count,
            "total_accesses": sum(self._access_counts.values()),
            "average_load_time_ms": avg_load_time * 1000,
            "cache_hit_rate": self._calculate_cache_hit_rate(),
            "most_accessed_plugins": most_accessed,
        }

    def _calculate_cache_hit_rate(self) -> float:
        """Return the fraction of ``get_plugin`` calls served without loading.

        A call counts as a hit when it was answered from the cache or the
        manual registry. Returns ``0.0`` when nothing has been accessed yet.
        """
        total_accesses = sum(self._access_counts.values())
        if total_accesses <= 0:
            return 0.0
        return self._cache_hits / total_accesses

    async def cleanup_expired_cache(self):
        """Clean up expired cache entries to free memory."""
        current_time = time.time()
        # Collect first: deleting while iterating the dict would raise.
        expired_plugins = [
            plugin_id
            for plugin_id, cached in self._plugin_cache.items()
            if current_time >= cached["expiry"]
        ]

        for plugin_id in expired_plugins:
            del self._plugin_cache[plugin_id]
            logger.debug(f"Cleaned up expired cache for plugin {plugin_id}")

        if expired_plugins:
            logger.info(
                f"Cleaned up {len(expired_plugins)} expired plugin cache entries"
            )

    async def store_shadow_result(self, result: Any, db: "Session | None" = None):
        """Store result in DB.

        Persists a ``PluginExecution`` row built from ``result.plugin_id`` and
        ``result.execution_time_ms``. Failures are logged, never raised; the
        call is a no-op when ``db`` is falsy.
        """
        if not db:
            return

        try:
            execution = PluginExecution(
                plugin_id=result.plugin_id,
                execution_time_ms=int(result.execution_time_ms),
                status="success",
                # matches_production=result.matches_production # Update schema if needed
            )
        except Exception as e:
            # Nothing was handed to the session yet, so leave it untouched.
            logger.error(f"Failed to store shadow result: {e}")
            return

        try:
            db.add(execution)
            db.commit()
        except Exception as e:
            logger.error(f"Failed to store shadow result: {e}")
            # A session whose flush/commit failed must be rolled back before
            # it can be used again.
            try:
                db.rollback()
            except Exception as rollback_error:
                logger.error(
                    f"Failed to roll back after shadow result error: {rollback_error}"
                )


# Shared module-level instance, created at import time with the default
# cache_ttl of 3600 seconds.
plugin_registry_service = PluginRegistryService()