Spaces:
Sleeping
Sleeping
feat: Implement Anthropic context engineering with compaction, structured prompts, and tool result clearing
09e23a2
| # ============================================================= | |
| # File: backend/api/services/context_engineer.py | |
| # ============================================================= | |
| """ | |
| Context Engineering Service | |
| Implements write, select, compress, and isolate strategies for managing agent context. | |
| Based on LangChain's context engineering best practices. | |
| """ | |
| import time | |
| from typing import Dict, Any, List, Optional | |
| from collections import deque | |
class ContextScratchpad:
    """Bounded scratchpad for context an agent saves during execution.

    Based on Anthropic's structured note-taking strategy:
    - Agents write notes persisted outside the context window
    - Notes are pulled back into context when needed
    - Enables tracking progress across complex tasks

    Every collection is capped; once a cap is exceeded the oldest entry is
    evicted first.
    """

    # Caps for the list-backed collections (``notes`` is capped by ``max_size``).
    _MAX_FACTS = 20
    _MAX_TRACKED = 10  # objectives, architectural decisions, unresolved issues
    # Number of most recent entries of each kind rendered by ``get_summary``.
    _SUMMARY_ITEMS = 5

    def __init__(self, max_size: int = 50):
        """Create an empty scratchpad.

        Args:
            max_size: Maximum number of notes retained; the deque drops the
                oldest note automatically once this is exceeded.
        """
        self.notes: deque = deque(maxlen=max_size)
        self.plan: Optional[str] = None
        self.key_facts: List[str] = []
        self.objectives: List[Dict[str, Any]] = []  # Progress tracking per objective
        self.architectural_decisions: List[str] = []  # Design decisions
        self.unresolved_issues: List[str] = []  # Open bugs/issues

    @staticmethod
    def _append_unique(items: List[str], value: str, cap: int) -> None:
        """Append ``value`` unless already present, evicting the oldest entry past ``cap``."""
        if value in items:
            return
        items.append(value)
        if len(items) > cap:
            items.pop(0)

    def add_note(self, note: str, category: str = "general"):
        """Add a timestamped note to the scratchpad.

        Args:
            note: Note text.
            category: Free-form label usable as a filter in ``get_recent_notes``.
        """
        self.notes.append({
            "timestamp": time.time(),
            "note": note,
            "category": category,
        })

    def set_plan(self, plan: str):
        """Save the agent's plan, replacing any previous one."""
        self.plan = plan

    def add_fact(self, fact: str):
        """Add a key fact (duplicates are ignored; at most 20 are kept)."""
        self._append_unique(self.key_facts, fact, self._MAX_FACTS)

    def get_recent_notes(self, limit: int = 10, category: Optional[str] = None) -> List[str]:
        """Get the most recent notes, optionally filtered by category.

        Args:
            limit: Maximum number of notes to return. Zero or a negative
                value yields an empty list.
            category: When given (and non-empty), only notes with exactly
                this category are considered.

        Returns:
            Note texts, oldest first.
        """
        # ``notes[-0:]`` would be the whole list, so a non-positive limit
        # has to be handled before slicing.
        if limit <= 0:
            return []
        notes = list(self.notes)
        if category:
            notes = [n for n in notes if n.get("category") == category]
        return [n["note"] for n in notes[-limit:]]

    def add_objective(self, objective: str, progress: str = "", target: str = ""):
        """Add an objective, or update the progress/target of an existing one.

        Args:
            objective: Objective description.
            progress: Current progress text.
            target: Target text.
        """
        for existing in self.objectives:
            # Substring match: the first stored objective whose text contains
            # ``objective`` is updated instead of adding a new entry.
            if objective in existing.get("objective", ""):
                existing["progress"] = progress
                existing["target"] = target
                return
        self.objectives.append({
            "objective": objective,
            "progress": progress,
            "target": target,
        })
        if len(self.objectives) > self._MAX_TRACKED:
            self.objectives.pop(0)

    def add_architectural_decision(self, decision: str):
        """Add an architectural decision (duplicates ignored; at most 10 kept)."""
        self._append_unique(self.architectural_decisions, decision, self._MAX_TRACKED)

    def add_unresolved_issue(self, issue: str):
        """Add an unresolved issue (duplicates ignored; at most 10 kept)."""
        self._append_unique(self.unresolved_issues, issue, self._MAX_TRACKED)

    def get_summary(self) -> str:
        """Get a structured Markdown summary of the scratchpad contents.

        Sections appear in a fixed order (plan, objectives, architectural
        decisions, unresolved issues, key facts, recent notes); empty
        sections are omitted. Each list section shows its five most recent
        entries.

        Returns:
            The sections joined by blank lines, or "" when nothing is stored.
        """
        recent = self._SUMMARY_ITEMS
        parts = []
        if self.plan:
            parts.append(f"## Plan\n{self.plan}")
        if self.objectives:
            lines = [
                f"- {o['objective']}: {o.get('progress', '')} (target: {o.get('target', 'N/A')})"
                for o in self.objectives[-recent:]
            ]
            parts.append("## Objectives\n" + "\n".join(lines))
        if self.architectural_decisions:
            lines = [f"- {d}" for d in self.architectural_decisions[-recent:]]
            parts.append("## Architectural Decisions\n" + "\n".join(lines))
        if self.unresolved_issues:
            lines = [f"- {i}" for i in self.unresolved_issues[-recent:]]
            parts.append("## Unresolved Issues\n" + "\n".join(lines))
        if self.key_facts:
            # Newest facts, matching the other sections and the oldest-first
            # eviction in ``add_fact``.
            parts.append("## Key Facts\n" + ", ".join(self.key_facts[-recent:]))
        if self.notes:
            lines = [f"- {n}" for n in self.get_recent_notes(recent)]
            parts.append("## Recent Notes\n" + "\n".join(lines))
        return "\n\n".join(parts)
class ContextCompressor:
    """Compresses context to reduce token usage.

    Based on Anthropic's context engineering best practices:
    - Compaction: summarize conversations nearing the context limit
    - Tool result clearing: truncate raw tool outputs once processed
    - High-fidelity summarization preserving critical details
    """

    def __init__(self, llm_client):
        """Store the LLM client used for summarization.

        Args:
            llm_client: Object exposing an awaitable
                ``simple_call(prompt, temperature=...)`` whose result is used
                as the summary text.
        """
        self.llm = llm_client

    @staticmethod
    def _content_text(message: Dict[str, Any]) -> str:
        """Return a message's content as text ("" when missing or None)."""
        content = message.get("content", "")
        return "" if content is None else str(content)

    async def compact_conversation(self, messages: List[Dict[str, Any]], preserve_recent: int = 5, max_tokens: int = 1000) -> List[Dict[str, Any]]:
        """
        Compact a conversation using Anthropic's compaction strategy.

        The first message and the last ``preserve_recent`` messages are kept
        verbatim; everything in between is replaced by one LLM-written
        summary message. Only the first 20 in-between messages, each cut to
        400 characters, are sent to the LLM.

        Args:
            messages: List of message dicts with 'role' and 'content'
            preserve_recent: Number of recent messages to keep verbatim
                (negative values are treated as 0)
            max_tokens: Target token count for the summary; stated in the
                prompt only, not enforced on the result

        Returns:
            ``messages`` itself when it has at most ``preserve_recent + 2``
            entries; otherwise first message + summary + recent messages.
            If the LLM call fails, first message + recent messages only.
        """
        preserve_recent = max(preserve_recent, 0)
        if len(messages) <= preserve_recent + 2:
            return messages

        # Explicit split index: ``messages[-0:]`` would be the whole list, so
        # negative slicing breaks for preserve_recent == 0.
        split = len(messages) - preserve_recent
        first = messages[:1]
        middle = messages[1:split]
        recent = messages[split:]

        # NOTE(review): middle messages beyond the 20th are dropped without
        # being summarized, although ``_original_length`` counts them.
        transcript = "\n".join(
            f"{m.get('role', 'user')}: {self._content_text(m)[:400]}"
            for m in middle[:20]
        )
        prompt = (
            "You are compacting a conversation history. Preserve:\n"
            "1. Architectural decisions and design choices\n"
            "2. Unresolved bugs or issues\n"
            "3. Implementation details and progress\n"
            "4. Key facts and information shared\n"
            "5. User preferences and requirements\n"
            "Discard:\n"
            "- Redundant tool outputs (raw results already processed)\n"
            "- Repetitive information\n"
            "- Verbose explanations that don't add value\n"
            "- Tool call details that are no longer needed\n"
            "Conversation to compact:\n"
            f"{transcript}\n"
            f"Provide a high-fidelity summary that preserves critical context (max {max_tokens} tokens):"
        )

        try:
            summary = await self.llm.simple_call(prompt, temperature=0.0)
        except Exception:
            # Deliberate best-effort fallback: simple trimming without a summary.
            return first + recent

        summary_msg = {
            "role": "system",
            "content": f"[Compacted conversation history: {summary}]",
            "_compacted": True,
            "_original_length": len(middle),
        }
        return first + [summary_msg] + recent

    async def summarize_conversation(self, messages: List[Dict[str, Any]], max_tokens: int = 500) -> str:
        """
        Summarize a conversation while preserving key decisions and facts.

        Only the last five user messages and the last five assistant
        messages (300 characters each) are sent to the LLM.

        Args:
            messages: List of message dicts with 'role' and 'content'
            max_tokens: Target token count for summary

        Returns:
            The LLM summary cut to ``max_tokens * 4`` characters. With two or
            fewer messages, or when the LLM call fails, a plain truncated
            transcript is returned instead.
        """
        if len(messages) <= 2:
            return "\n".join(
                f"{m.get('role', 'user')}: {self._content_text(m)[:200]}"
                for m in messages
            )

        # Coerce content to text first: slicing None or a list of content
        # blocks would raise or produce a non-string.
        user_queries = [self._content_text(m) for m in messages if m.get("role") == "user"]
        assistant_responses = [self._content_text(m) for m in messages if m.get("role") == "assistant"]
        user_block = "\n".join(f"User: {q[:300]}" for q in user_queries[-5:])
        assistant_block = "\n".join(f"Assistant: {r[:300]}" for r in assistant_responses[-5:])
        prompt = (
            "Summarize this conversation using high-fidelity compaction. Preserve:\n"
            "1. Key user questions/requests\n"
            "2. Important decisions made (architectural, design, implementation)\n"
            "3. Critical facts or information shared\n"
            "4. Unresolved issues or bugs\n"
            "5. Implementation progress\n"
            "Discard redundant tool outputs and repetitive information.\n"
            "Conversation:\n"
            f"{user_block}\n"
            f"{assistant_block}\n"
            f"Provide a concise, high-fidelity summary (max {max_tokens} tokens):"
        )

        try:
            summary = await self.llm.simple_call(prompt, temperature=0.0)
            return summary[:max_tokens * 4]  # Rough token limit (~4 chars per token)
        except Exception:
            # Deliberate best-effort fallback: simple truncation.
            return "\n".join(
                f"{m.get('role', 'user')}: {self._content_text(m)[:100]}..."
                for m in messages[-5:]
            )

    def trim_messages(self, messages: List[Dict[str, Any]], keep_first: int = 2, keep_last: int = 10) -> List[Dict[str, Any]]:
        """
        Trim messages, keeping the first N and the last M.

        Args:
            messages: List of messages
            keep_first: Number of initial messages to keep (system context);
                negative values are treated as 0
            keep_last: Number of recent messages to keep; negative values
                are treated as 0

        Returns:
            ``messages`` itself when nothing needs dropping, otherwise a new
            list with the middle removed.
        """
        keep_first = max(keep_first, 0)
        keep_last = max(keep_last, 0)
        if len(messages) <= keep_first + keep_last:
            return messages
        # Explicit start index: ``messages[-0:]`` would re-append the whole
        # list when keep_last == 0.
        tail_start = len(messages) - keep_last
        return messages[:keep_first] + messages[tail_start:]

    def clear_tool_results(self, messages: List[Dict[str, Any]], max_result_chars: int = 500, keep_chars: int = 200) -> List[Dict[str, Any]]:
        """
        Truncate large tool results in messages (lightest form of compaction).

        A message counts as a tool result when its role is "tool" or its
        content mentions "tool". System messages are never truncated, since
        a system prompt that merely describes the tools would otherwise be
        cut down.

        Args:
            messages: List of messages
            max_result_chars: Tool content longer than this is truncated
            keep_chars: Number of leading characters kept when truncating

        Returns:
            New list; truncated messages are copies flagged with
            ``_tool_result_cleared``, all others are the original objects.
        """
        keep_chars = max(keep_chars, 0)
        cleared = []
        for msg in messages:
            content = self._content_text(msg)
            role = msg.get("role")
            # Length is checked first so short messages skip the lowercase scan.
            is_large_tool_result = len(content) > max_result_chars and (
                role == "tool"
                or (role != "system" and "tool" in content.lower())
            )
            if not is_large_tool_result:
                cleared.append(msg)
                continue
            msg_copy = msg.copy()
            msg_copy["content"] = content[:keep_chars] + "... [tool result truncated]"
            msg_copy["_tool_result_cleared"] = True
            cleared.append(msg_copy)
        return cleared

    async def compress_tool_output(self, tool_name: str, output: Dict[str, Any], max_length: int = 500) -> Dict[str, Any]:
        """
        Compress tool output to reduce tokens.

        For the "web" and "rag" tools at most five entries of ``results``
        are kept. For every tool, the ``text``, ``content`` and ``snippet``
        fields are cut to ``max_length`` characters.

        Args:
            tool_name: Name of the tool
            output: Tool output dict (left unmodified)
            max_length: Max characters for each long text field

        Returns:
            A shallow copy of ``output`` with the compression applied and
            marker keys (``_compressed``, ``_original_count``,
            ``<field>_compressed``) added where something was cut.
        """
        # Work on a copy so the caller's full output stays intact.
        compressed = dict(output)

        if tool_name in ("web", "rag"):
            hits = compressed.get("results")
            if isinstance(hits, (list, tuple)) and len(hits) > 5:
                compressed["results"] = hits[:5]
                compressed["_compressed"] = True
                compressed["_original_count"] = len(hits)

        for key in ("text", "content", "snippet"):
            if key not in compressed:
                continue
            text = str(compressed[key])
            if len(text) > max_length:
                compressed[key] = text[:max_length] + "..."
                compressed[f"{key}_compressed"] = True

        return compressed
class ContextSelector:
    """Selects relevant context for agent steps using keyword overlap."""

    def __init__(self, llm_client):
        """Store the LLM client (not used by the keyword-based selection)."""
        self.llm = llm_client

    @staticmethod
    def _query_words(query: Optional[str]) -> List[str]:
        """Split a query into lowercase whitespace-separated words ([] for None/empty)."""
        return str(query or "").lower().split()

    @staticmethod
    def _rank(words: List[str], items: List[Dict[str, Any]], fields_of) -> List[tuple]:
        """Score each item by how many query words occur in any of its fields.

        Args:
            words: Lowercase query words (repeats count once each time).
            items: Candidate dicts.
            fields_of: Callable returning the lowercase text fields of an item.

        Returns:
            ``(score, item)`` pairs sorted by descending score; items with
            equal scores keep their input order (the sort is stable).
        """
        scored = []
        for item in items:
            fields = fields_of(item)
            # Substring containment, so "doc" also matches "documents".
            score = sum(1 for word in words if any(word in field for field in fields))
            scored.append((score, item))
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return scored

    async def select_relevant_memories(self, query: str, memories: List[Dict[str, Any]], limit: int = 5) -> List[Dict[str, Any]]:
        """
        Select the most relevant memories for a query.

        Args:
            query: User query
            memories: List of memory dicts (matched on their 'content')
            limit: Max memories to return; negative values are treated as 0

        Returns:
            ``memories`` unchanged when there are at most ``limit`` of them;
            otherwise up to ``limit`` memories that share at least one word
            with the query, best match first.
        """
        if not memories or len(memories) <= limit:
            return memories

        # Simple keyword-based selection (can be enhanced with embeddings).
        words = self._query_words(query)
        ranked = self._rank(
            words,
            memories,
            lambda mem: (str(mem.get("content", "")).lower(),),
        )
        # Clamp: a negative limit would otherwise slice from the wrong end.
        top = ranked[:max(limit, 0)]
        return [mem for score, mem in top if score > 0]

    def select_relevant_tools(self, query: str, available_tools: List[Dict[str, Any]], limit: int = 5) -> List[Dict[str, Any]]:
        """
        Select the most relevant tools for a query.

        Args:
            query: User query
            available_tools: List of tool dicts (matched on 'description' and 'name')
            limit: Max tools to return; negative values are treated as 0

        Returns:
            ``available_tools`` unchanged when there are at most ``limit`` of
            them; otherwise the ``limit`` best-scoring tools, including ones
            with a zero score.
        """
        if not available_tools or len(available_tools) <= limit:
            return available_tools

        # Simple keyword matching (can be enhanced with semantic search).
        words = self._query_words(query)
        ranked = self._rank(
            words,
            available_tools,
            lambda tool: (
                str(tool.get("description", "")).lower(),
                str(tool.get("name", "")).lower(),
            ),
        )
        return [tool for _score, tool in ranked[:max(limit, 0)]]
class ContextIsolator:
    """Isolates context to prevent token bloat.

    Tool outputs are stored in an in-memory dict and replaced in the
    conversation by a short ``[ISOLATED:<key>]`` reference.
    """

    def __init__(self):
        """Create an isolator with no stored outputs."""
        # storage key -> {"tool": name, "output": payload, "timestamp": epoch seconds}
        self.isolated_data: Dict[str, Any] = {}

    def isolate_tool_output(self, tool_name: str, output: Any, key: Optional[str] = None) -> str:
        """
        Isolate tool output, storing it separately and returning a reference.

        Args:
            tool_name: Name of the tool
            output: Tool output
            key: Optional storage key. An explicit key overwrites any entry
                already stored under it. Without one, a key of the form
                ``<tool_name>_<unix seconds>`` is generated, with a numeric
                suffix added only if that key is already taken.

        Returns:
            Reference string ``[ISOLATED:<key>]`` to use in context
        """
        now = time.time()
        storage_key = key
        if not storage_key:
            base_key = f"{tool_name}_{int(now)}"
            storage_key = base_key
            # Second-resolution keys collide when one tool is isolated twice
            # within a second; disambiguate instead of overwriting.
            suffix = 1
            while storage_key in self.isolated_data:
                storage_key = f"{base_key}_{suffix}"
                suffix += 1

        self.isolated_data[storage_key] = {
            "tool": tool_name,
            "output": output,
            "timestamp": now,
        }
        return f"[ISOLATED:{storage_key}]"

    def get_isolated(self, key: str) -> Optional[Any]:
        """Retrieve an isolated output by storage key (None when unknown)."""
        return self.isolated_data.get(key, {}).get("output")

    def clear_old_isolated(self, max_age_seconds: int = 3600):
        """Remove isolated entries stored more than ``max_age_seconds`` ago."""
        current_time = time.time()
        # Collect first: deleting while iterating the dict would raise.
        keys_to_remove = [
            key for key, data in self.isolated_data.items()
            if current_time - data.get("timestamp", 0) > max_age_seconds
        ]
        for key in keys_to_remove:
            del self.isolated_data[key]
class ContextEngineer:
    """Facade that bundles the write, compress, select and isolate strategies."""

    def __init__(self, llm_client):
        """Build the four strategy helpers around one shared LLM client."""
        self.scratchpad = ContextScratchpad()
        self.compressor = ContextCompressor(llm_client)
        self.selector = ContextSelector(llm_client)
        self.isolator = ContextIsolator()
        self.llm = llm_client

    # ----- write -----------------------------------------------------------

    def write_to_scratchpad(self, note: str, category: str = "general"):
        """Record a note on the scratchpad under the given category."""
        self.scratchpad.add_note(note, category)

    def save_plan(self, plan: str):
        """Store the agent's current plan on the scratchpad."""
        self.scratchpad.set_plan(plan)

    def save_fact(self, fact: str):
        """Store a key fact on the scratchpad."""
        self.scratchpad.add_fact(fact)

    def get_scratchpad_context(self, limit: int = 10) -> str:
        """Return the scratchpad summary text.

        NOTE(review): ``limit`` is accepted but currently has no effect; the
        summary is produced without it.
        """
        return self.scratchpad.get_summary()

    # ----- compress --------------------------------------------------------

    @staticmethod
    def _estimate_tokens(messages: List[Dict[str, Any]]) -> int:
        """Roughly estimate tokens as total content characters // 4."""
        char_count = 0
        for message in messages:
            char_count += len(str(message.get("content", "")))
        return char_count // 4

    async def compress_if_needed(self, messages: List[Dict[str, Any]], max_tokens: int = 8000,
                                 use_compaction: bool = True) -> List[Dict[str, Any]]:
        """
        Shrink the message list when its estimated size exceeds the limit.

        Escalation order: return the input untouched if it fits; otherwise
        try tool-result clearing; if that is not enough, either run LLM
        compaction (more than 10 messages and ``use_compaction``) or fall
        back to plain trimming.

        Args:
            messages: List of messages
            max_tokens: Token limit (compared against a chars // 4 estimate)
            use_compaction: Allow full compaction instead of simple trimming

        Returns:
            The original list, or a compressed replacement
        """
        if self._estimate_tokens(messages) <= max_tokens:
            return messages

        # Cheapest step first: truncate bulky tool results.
        without_tool_bulk = self.compressor.clear_tool_results(messages)
        if self._estimate_tokens(without_tool_bulk) <= max_tokens:
            return without_tool_bulk

        # NOTE(review): the heavier steps below start again from the
        # original ``messages``, not from the cleared copy.
        can_compact = use_compaction and len(messages) > 10
        if not can_compact:
            return self.compressor.trim_messages(messages, keep_first=2, keep_last=5)
        return await self.compressor.compact_conversation(messages, preserve_recent=5, max_tokens=1000)

    # ----- select ----------------------------------------------------------

    async def select_context(self, query: str, available_context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Pick the parts of the available context that are relevant to a query.

        Args:
            query: User query
            available_context: Dict that may contain 'memories' and/or 'tools'

        Returns:
            Dict holding a filtered entry for each of those keys that was present
        """
        chosen: Dict[str, Any] = {}

        if "memories" in available_context:
            candidate_memories = available_context["memories"]
            chosen["memories"] = await self.selector.select_relevant_memories(query, candidate_memories)

        if "tools" in available_context:
            candidate_tools = available_context["tools"]
            chosen["tools"] = self.selector.select_relevant_tools(query, candidate_tools)

        return chosen

    # ----- isolate ---------------------------------------------------------

    def isolate_large_output(self, tool_name: str, output: Any) -> str:
        """Move a tool output into the isolator and return its reference string."""
        return self.isolator.isolate_tool_output(tool_name, output)

    def get_isolated_context(self, key: str) -> Optional[Any]:
        """Fetch a previously isolated output by its storage key."""
        return self.isolator.get_isolated(key)