|
|
""" |
|
|
Compacting Supervisor - Agent wrapper with automatic context management. |
|
|
|
|
|
Wraps an agent to enforce Context Window limits via 'Compaction'. |
|
|
Implements the Interceptor Pattern to transparently manage token usage. |
|
|
""" |
|
|
|
|
|
from typing import Dict, Any, Generator |
|
|
|
|
|
from src.backend.agents.supervisor.supervisor_v2 import supervisor_agent, memory |
|
|
|
|
|
from .token_counter import count_tokens_for_messages |
|
|
from .history_manager import HistoryManager |
|
|
|
|
|
|
|
|
class CompactingSupervisor:
    """
    Wraps an agent to enforce Context Window limits via 'Compaction'.

    Technique (Interceptor Pattern):
    1. Intercepts the agent's execution flow.
    2. Runs the agent normally.
    3. Post-execution: Checks if the total context (tokens) exceeds the limit.
    4. If exceeded, triggers `HistoryManager` to compact old history and rewrite memory.

    This ensures the agent remains "forever young" regarding token usage,
    without losing long-term context.
    """

    def __init__(self, agent, history_manager: "HistoryManager", token_limit: int = 3000, compaction_ratio: float = 0.5):
        """
        Args:
            agent: Wrapped agent; must expose ``invoke``, ``stream`` and
                ``get_state`` (duck-typed — presumably a LangGraph-compiled
                graph; confirm against the caller).
            history_manager: Performs message compaction and rewrites the
                persisted thread history.
            token_limit: Token count above which compaction is triggered.
            compaction_ratio: Fraction of the history handed to the
                compactor (passed straight through to ``compact_messages``).
        """
        self.agent = agent
        self.history_manager = history_manager
        self.token_limit = token_limit
        self.compaction_ratio = compaction_ratio

    def _compact(self, thread_id: str, messages: list) -> list:
        """Compact *messages* and persist the rewritten history for *thread_id*.

        Shared by :meth:`invoke` and :meth:`stream` so both entry points run
        the exact same compaction steps. Exceptions from the history manager
        propagate to the caller, which decides how to report them.
        """
        compacted = self.history_manager.compact_messages(
            messages,
            compaction_ratio=self.compaction_ratio,
        )
        self.history_manager.replace_thread_history(thread_id, compacted)
        return compacted

    def invoke(self, input_data: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute the agent and perform context maintenance if needed.

        Returns the agent's response dict; if compaction ran, its
        ``"messages"`` entry is replaced with the compacted list so the
        caller sees the same history that was persisted.
        """
        thread_id = config.get("configurable", {}).get("thread_id")

        response = self.agent.invoke(input_data, config)

        # Compaction needs a thread to rewrite and messages to measure.
        if thread_id and "messages" in response:
            all_messages = response["messages"]
            total_tokens = count_tokens_for_messages(all_messages)

            if total_tokens > self.token_limit:
                print(f"Tokens ({total_tokens}) exceeded limit ({self.token_limit}). Compacting...", flush=True)
                try:
                    compacted_messages = self._compact(thread_id, all_messages)
                    # Keep the returned view in sync with persisted storage.
                    response["messages"] = compacted_messages
                    new_tokens = count_tokens_for_messages(compacted_messages)
                    print(f"Compaction complete. {total_tokens} -> {new_tokens}", flush=True)
                except Exception as e:
                    # Best-effort: a failed compaction must not break the reply.
                    print(f"Compaction failed: {e}", flush=True)

        return response

    def stream(self, input_data: Dict[str, Any], config: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]:
        """
        Stream the agent response token by token, then perform compaction if needed.

        Yields:
            dict: Streaming chunks with 'type' and 'content' keys.
                - type='token': A content token from the AI response
                - type='done': Final message with token count
                - type='error': Error occurred
        """
        thread_id = config.get("configurable", {}).get("thread_id")
        final_messages = []

        try:
            for chunk in self.agent.stream(input_data, config, stream_mode="messages"):
                # Each chunk is a (message, metadata) pair; metadata is unused.
                message, _metadata = chunk

                if hasattr(message, 'content') and message.content:
                    # Surface only AI output; tool/human messages are filtered
                    # by class name (matches e.g. AIMessage / AIMessageChunk).
                    msg_type = message.__class__.__name__
                    if 'AIMessage' in msg_type:
                        yield {"type": "token", "content": message.content}

            # Read the authoritative post-run history from the agent's state.
            final_state = self.agent.get_state(config)
            if final_state and hasattr(final_state, 'values'):
                final_messages = final_state.values.get("messages", [])

            token_count = 0
            if thread_id and final_messages:
                token_count = count_tokens_for_messages(final_messages)

                if token_count > self.token_limit:
                    print(f"Tokens ({token_count}) exceeded limit ({self.token_limit}). Compacting...", flush=True)
                    try:
                        compacted_messages = self._compact(thread_id, final_messages)
                        token_count = count_tokens_for_messages(compacted_messages)
                        print(f"Compaction complete. New token count: {token_count}", flush=True)
                    except Exception as e:
                        # Best-effort: report failure but still finish the stream.
                        print(f"Compaction failed: {e}", flush=True)

            yield {"type": "done", "token_count": token_count}

        except Exception as e:
            # Boundary handler: surface any streaming failure to the consumer
            # as a structured chunk instead of raising through the generator.
            yield {"type": "error", "content": str(e)}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level singletons: wire the project's supervisor agent and its
# checkpointer (`memory`) into a compacting wrapper for import elsewhere.
history_manager = HistoryManager(memory_saver=memory)

# NOTE(review): token_limit=500 is far below the class default of 3000 —
# presumably tightened deliberately (e.g. for testing compaction); confirm
# before relying on it in production.
compacting_supervisor = CompactingSupervisor(
    agent=supervisor_agent,
    history_manager=history_manager,
    token_limit=500,
    compaction_ratio=0.5
)
|
|
|
|
|
|