| """Context engine plugin discovery. |
| |
| Scans ``plugins/context_engine/<name>/`` directories for context engine |
| plugins. Each subdirectory must contain ``__init__.py`` with a class |
| implementing the ContextEngine ABC. |
| |
| Context engines are separate from the general plugin system — they live |
| in the repo and are always available without user installation. Only ONE |
| can be active at a time, selected via ``context.engine`` in config.yaml. |
| The default engine is ``"compressor"`` (the built-in ContextCompressor). |
| |
| Usage: |
| from plugins.context_engine import discover_context_engines, load_context_engine |
| |
| available = discover_context_engines() # [(name, desc, available), ...] |
| engine = load_context_engine("lcm") # ContextEngine instance |
| """ |
|
|
| from __future__ import annotations |
|
|
| import importlib |
| import importlib.util |
| import logging |
| import sys |
| from pathlib import Path |
| from typing import List, Optional, Tuple |
|
|
| logger = logging.getLogger(__name__) |
|
|
| _CONTEXT_ENGINE_PLUGINS_DIR = Path(__file__).parent |
|
|
|
|
| def discover_context_engines() -> List[Tuple[str, str, bool]]: |
| """Scan plugins/context_engine/ for available engines. |
| |
| Returns list of (name, description, is_available) tuples. |
| Does NOT import the engines — just reads plugin.yaml for metadata |
| and does a lightweight availability check. |
| """ |
| results = [] |
| if not _CONTEXT_ENGINE_PLUGINS_DIR.is_dir(): |
| return results |
|
|
| for child in sorted(_CONTEXT_ENGINE_PLUGINS_DIR.iterdir()): |
| if not child.is_dir() or child.name.startswith(("_", ".")): |
| continue |
| init_file = child / "__init__.py" |
| if not init_file.exists(): |
| continue |
|
|
| |
| desc = "" |
| yaml_file = child / "plugin.yaml" |
| if yaml_file.exists(): |
| try: |
| import yaml |
| with open(yaml_file) as f: |
| meta = yaml.safe_load(f) or {} |
| desc = meta.get("description", "") |
| except Exception: |
| pass |
|
|
| |
| available = True |
| try: |
| engine = _load_engine_from_dir(child) |
| if engine is None: |
| available = False |
| elif hasattr(engine, "is_available"): |
| available = engine.is_available() |
| except Exception: |
| available = False |
|
|
| results.append((child.name, desc, available)) |
|
|
| return results |
|
|
|
|
| def load_context_engine(name: str) -> Optional["ContextEngine"]: |
| """Load and return a ContextEngine instance by name. |
| |
| Returns None if the engine is not found or fails to load. |
| """ |
| engine_dir = _CONTEXT_ENGINE_PLUGINS_DIR / name |
| if not engine_dir.is_dir(): |
| logger.debug("Context engine '%s' not found in %s", name, _CONTEXT_ENGINE_PLUGINS_DIR) |
| return None |
|
|
| try: |
| engine = _load_engine_from_dir(engine_dir) |
| if engine: |
| return engine |
| logger.warning("Context engine '%s' loaded but no engine instance found", name) |
| return None |
| except Exception as e: |
| logger.warning("Failed to load context engine '%s': %s", name, e) |
| return None |
|
|
|
|
| def _load_engine_from_dir(engine_dir: Path) -> Optional["ContextEngine"]: |
| """Import an engine module and extract the ContextEngine instance. |
| |
| The module must have either: |
| - A register(ctx) function (plugin-style) — we simulate a ctx |
| - A top-level class that extends ContextEngine — we instantiate it |
| """ |
| name = engine_dir.name |
| module_name = f"plugins.context_engine.{name}" |
| init_file = engine_dir / "__init__.py" |
|
|
| if not init_file.exists(): |
| return None |
|
|
| |
| if module_name in sys.modules: |
| mod = sys.modules[module_name] |
| else: |
| |
| |
| for parent in ("plugins", "plugins.context_engine"): |
| if parent not in sys.modules: |
| parent_path = Path(__file__).parent |
| if parent == "plugins": |
| parent_path = parent_path.parent |
| parent_init = parent_path / "__init__.py" |
| if parent_init.exists(): |
| spec = importlib.util.spec_from_file_location( |
| parent, str(parent_init), |
| submodule_search_locations=[str(parent_path)] |
| ) |
| if spec: |
| parent_mod = importlib.util.module_from_spec(spec) |
| sys.modules[parent] = parent_mod |
| try: |
| spec.loader.exec_module(parent_mod) |
| except Exception: |
| pass |
|
|
| |
| spec = importlib.util.spec_from_file_location( |
| module_name, str(init_file), |
| submodule_search_locations=[str(engine_dir)] |
| ) |
| if not spec: |
| return None |
|
|
| mod = importlib.util.module_from_spec(spec) |
| sys.modules[module_name] = mod |
|
|
| |
| for sub_file in engine_dir.glob("*.py"): |
| if sub_file.name == "__init__.py": |
| continue |
| sub_name = sub_file.stem |
| full_sub_name = f"{module_name}.{sub_name}" |
| if full_sub_name not in sys.modules: |
| sub_spec = importlib.util.spec_from_file_location( |
| full_sub_name, str(sub_file) |
| ) |
| if sub_spec: |
| sub_mod = importlib.util.module_from_spec(sub_spec) |
| sys.modules[full_sub_name] = sub_mod |
| try: |
| sub_spec.loader.exec_module(sub_mod) |
| except Exception as e: |
| logger.debug("Failed to load submodule %s: %s", full_sub_name, e) |
|
|
| try: |
| spec.loader.exec_module(mod) |
| except Exception as e: |
| logger.debug("Failed to exec_module %s: %s", module_name, e) |
| sys.modules.pop(module_name, None) |
| return None |
|
|
| |
| if hasattr(mod, "register"): |
| collector = _EngineCollector() |
| try: |
| mod.register(collector) |
| if collector.engine: |
| return collector.engine |
| except Exception as e: |
| logger.debug("register() failed for %s: %s", name, e) |
|
|
| |
| from agent.context_engine import ContextEngine |
| for attr_name in dir(mod): |
| attr = getattr(mod, attr_name, None) |
| if (isinstance(attr, type) and issubclass(attr, ContextEngine) |
| and attr is not ContextEngine): |
| try: |
| return attr() |
| except Exception: |
| pass |
|
|
| return None |
|
|
|
|
| class _EngineCollector: |
| """Fake plugin context that captures register_context_engine calls.""" |
|
|
| def __init__(self): |
| self.engine = None |
|
|
| def register_context_engine(self, engine): |
| self.engine = engine |
|
|
| |
| def register_tool(self, *args, **kwargs): |
| pass |
|
|
| def register_hook(self, *args, **kwargs): |
| pass |
|
|
| def register_cli_command(self, *args, **kwargs): |
| pass |
|
|
| def register_memory_provider(self, *args, **kwargs): |
| pass |
|
|