| """Abstract base class for pluggable context engines. |
| |
| A context engine controls how conversation context is managed when |
| approaching the model's token limit. The built-in ContextCompressor |
| is the default implementation. Third-party engines (e.g. LCM) can |
| replace it via the plugin system or by being placed in the |
| ``plugins/context_engine/<name>/`` directory. |
| |
| Selection is config-driven: ``context.engine`` in config.yaml. |
| Default is ``"compressor"`` (the built-in). Only one engine is active. |
| |
| The engine is responsible for: |
| - Deciding when compaction should fire |
| - Performing compaction (summarization, DAG construction, etc.) |
| - Optionally exposing tools the agent can call (e.g. lcm_grep) |
| - Tracking token usage from API responses |
| |
| Lifecycle: |
| 1. Engine is instantiated and registered (plugin register() or default) |
| 2. on_session_start() called when a conversation begins |
| 3. update_from_response() called after each API response with usage data |
| 4. should_compress() checked after each turn |
| 5. compress() called when should_compress() returns True |
| 6. on_session_end() called at real session boundaries (CLI exit, /reset, |
| gateway session expiry) — NOT per-turn |
| """ |
|
|
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
|
|
|
|
class ContextEngine(ABC):
    """Base class all context engines must implement.

    Subclasses must provide ``name``, ``update_from_response()``,
    ``should_compress()`` and ``compress()``. Every other method ships
    with a default implementation that may be overridden.
    """

    # -- Identity ----------------------------------------------------------

    @property
    @abstractmethod
    def name(self) -> str:
        """Short identifier (e.g. 'compressor', 'lcm')."""

    # -- Token state -------------------------------------------------------
    # Class-level defaults so the default get_status(), update_model() and
    # on_session_reset() work even when a subclass never assigns them.

    last_prompt_tokens: int = 0
    last_completion_tokens: int = 0
    last_total_tokens: int = 0
    threshold_tokens: int = 0
    context_length: int = 0
    compression_count: int = 0

    # -- Compaction parameters ---------------------------------------------
    # threshold_percent is the fraction of context_length that update_model()
    # turns into threshold_tokens.
    # NOTE(review): protect_first_n / protect_last_n are not read by this base
    # class; presumably they are the number of leading / trailing messages an
    # engine keeps out of compaction — confirm against the implementations.

    threshold_percent: float = 0.75
    protect_first_n: int = 3
    protect_last_n: int = 6

    # -- Core interface (must be implemented) ------------------------------

    @abstractmethod
    def update_from_response(self, usage: Dict[str, Any]) -> None:
        """Update tracked token usage from an API response.

        Called after every LLM call with the usage dict from the response.
        """

    @abstractmethod
    def should_compress(self, prompt_tokens: Optional[int] = None) -> bool:
        """Return True if compaction should fire this turn.

        Args:
            prompt_tokens: Optional prompt token count to evaluate; callers
                may omit it (``None``).
        """

    @abstractmethod
    def compress(
        self,
        messages: List[Dict[str, Any]],
        current_tokens: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Compact the message list and return the new message list.

        This is the main entry point. The engine receives the full message
        list and returns a (possibly shorter) list that fits within the
        context budget. The implementation is free to summarize, build a
        DAG, or do anything else — as long as the returned list is a valid
        OpenAI-format message sequence.

        Args:
            messages: The full conversation message list.
            current_tokens: Optional current token count; callers may omit
                it (``None``).
        """

    # -- Optional: pre-flight check ----------------------------------------

    def should_compress_preflight(self, messages: List[Dict[str, Any]]) -> bool:
        """Quick rough check before the API call (no real token count yet).

        Default returns False (skip pre-flight). Override if your engine
        can do a cheap estimate.
        """
        return False

    # -- Optional: session lifecycle ---------------------------------------

    def on_session_start(self, session_id: str, **kwargs) -> None:
        """Called when a new conversation session begins.

        Use this to load persisted state (DAG, store) for the session.
        kwargs may include hermes_home, platform, model, etc.

        The default implementation does nothing.
        """

    def on_session_end(self, session_id: str, messages: List[Dict[str, Any]]) -> None:
        """Called at real session boundaries (CLI exit, /reset, gateway expiry).

        Use this to flush state, close DB connections, etc.
        NOT called per-turn — only when the session truly ends.

        The default implementation does nothing.
        """

    def on_session_reset(self) -> None:
        """Called on /new or /reset. Reset per-session state.

        Default zeroes the three ``last_*_tokens`` counters and
        ``compression_count``. ``context_length`` and ``threshold_tokens``
        are left untouched.
        """
        self.last_prompt_tokens = 0
        self.last_completion_tokens = 0
        self.last_total_tokens = 0
        self.compression_count = 0

    # -- Optional: tools ---------------------------------------------------

    def get_tool_schemas(self) -> List[Dict[str, Any]]:
        """Return tool schemas this engine provides to the agent.

        Default returns empty list (no tools). LCM would return schemas
        for lcm_grep, lcm_describe, lcm_expand here.
        """
        return []

    def handle_tool_call(self, name: str, args: Dict[str, Any], **kwargs) -> str:
        """Handle a tool call from the agent.

        Only called for tool names returned by get_tool_schemas().
        Must return a JSON string.

        kwargs may include:
            messages: the current in-memory message list (for live ingestion)

        The default implementation provides no tools, so it always returns
        a JSON object with an ``"error"`` key naming the unknown tool.
        """
        # Imported lazily: only needed on this fallback path.
        import json

        return json.dumps({"error": f"Unknown context engine tool: {name}"})

    # -- Optional: status --------------------------------------------------

    def get_status(self) -> Dict[str, Any]:
        """Return status dict for display/logging.

        Default returns the standard fields run_agent.py expects.
        ``usage_percent`` is the prompt-token share of ``context_length``,
        capped at 100, or 0 when ``context_length`` is unset (zero).
        """
        return {
            "last_prompt_tokens": self.last_prompt_tokens,
            "threshold_tokens": self.threshold_tokens,
            "context_length": self.context_length,
            "usage_percent": (
                min(100, self.last_prompt_tokens / self.context_length * 100)
                # Guard against division by zero before a model is configured.
                if self.context_length
                else 0
            ),
            "compression_count": self.compression_count,
        }

    # -- Optional: model switching -----------------------------------------

    def update_model(
        self,
        model: str,
        context_length: int,
        base_url: str = "",
        api_key: str = "",
        provider: str = "",
    ) -> None:
        """Called when the user switches models or on fallback activation.

        Default updates context_length and recalculates threshold_tokens
        from threshold_percent. Override if your engine needs more
        (e.g. recalculate DAG budgets, switch summary models).

        The default implementation ignores ``model``, ``base_url``,
        ``api_key`` and ``provider``.
        """
        self.context_length = context_length
        # int() truncates toward zero, so the threshold never rounds up.
        self.threshold_tokens = int(context_length * self.threshold_percent)
|
|