""" ACE (Agentic Context Engineering) System with Ollama A self-improving AI agent system using local LLMs """ import json import os from datetime import datetime from typing import List, Dict, Optional, Literal from dataclasses import dataclass, asdict from enum import Enum import requests # ============================================================================ # CONFIGURATION # ============================================================================ class Config: """System configuration""" OLLAMA_BASE_URL = "http://localhost:11434" GENERATOR_MODEL = "aya" # Fast for generation REFLECTOR_MODEL = "aya" # Can use same or different CURATOR_MODEL = "aya" # Can use same or different PLAYBOOK_PATH = "emergency_playbook.json" TEMPERATURE = 0.7 MAX_TOKENS = 2000 # Update Config: # class Config: # """System configuration""" # OLLAMA_BASE_URL = "http://localhost:11434" # GENERATOR_MODEL = "llama3.1" # REFLECTOR_MODEL = "llama3.1" # CURATOR_MODEL = "llama3.1" # PLAYBOOK_PATH = "emergency_playbook.json" # TEMPERATURE = 0.3 # MAX_TOKENS = 4000 # Increased # ============================================================================ # DATA MODELS # ============================================================================ class TagType(str, Enum): HELPFUL = "helpful" HARMFUL = "harmful" NEUTRAL = "neutral" @dataclass class Bullet: """A knowledge item in the playbook""" id: str section: str content: str helpful: int = 0 harmful: int = 0 neutral: int = 0 created_at: str = "" updated_at: str = "" def __post_init__(self): if not self.created_at: self.created_at = datetime.now().isoformat() if not self.updated_at: self.updated_at = datetime.now().isoformat() def add_tag(self, tag: TagType): """Add a tag vote to this bullet""" if tag == TagType.HELPFUL: self.helpful += 1 elif tag == TagType.HARMFUL: self.harmful += 1 else: self.neutral += 1 self.updated_at = datetime.now().isoformat() def score(self) -> float: """Calculate bullet quality score""" total = self.helpful + self.harmful + self.neutral if total == 0: return 0.0 return (self.helpful - self.harmful) / total @dataclass class BulletTag: """Tag assignment for a bullet""" bullet_id: str tag: TagType reason: str @dataclass class GeneratorOutput: """Output from the Generator agent""" reasoning: List[str] bullet_ids: List[str] final_answer: str @dataclass class Reflection: """Output from the Reflector agent""" answer_quality: str strengths: List[str] weaknesses: List[str] bullet_tags: List[BulletTag] @dataclass class DeltaOperation: """A single playbook modification operation""" type: Literal["ADD", "UPDATE", "REMOVE"] section: str content: Optional[str] = None bullet_id: Optional[str] = None @dataclass class DeltaBatch: """Batch of playbook modifications""" reasoning: str operations: List[DeltaOperation] # ============================================================================ # PLAYBOOK MANAGEMENT # ============================================================================ class Playbook: """Manages the evolving knowledge base""" def __init__(self): self.bullets: Dict[str, Bullet] = {} self.sections: Dict[str, List[str]] = {} self._next_id = 1 def add_bullet(self, section: str, content: str) -> str: """Add a new bullet to the playbook""" bullet_id = f"B{self._next_id:04d}" self._next_id += 1 bullet = Bullet( id=bullet_id, section=section, content=content ) self.bullets[bullet_id] = bullet if section not in self.sections: self.sections[section] = [] self.sections[section].append(bullet_id) return bullet_id def update_bullet(self, bullet_id: str, content: str): """Update an existing bullet""" if bullet_id in self.bullets: self.bullets[bullet_id].content = content self.bullets[bullet_id].updated_at = datetime.now().isoformat() def remove_bullet(self, bullet_id: str): """Remove a bullet from the playbook""" if bullet_id in self.bullets: bullet = self.bullets[bullet_id] section = bullet.section del self.bullets[bullet_id] if section in self.sections: self.sections[section] = [ bid for bid in self.sections[section] if bid != bullet_id ] def update_bullet_tag(self, bullet_id: str, tag: TagType): """Add a tag to a bullet""" if bullet_id in self.bullets: self.bullets[bullet_id].add_tag(tag) def apply_delta(self, delta: DeltaBatch): """Apply a batch of modifications""" for op in delta.operations: if op.type == "ADD" and op.content: self.add_bullet(op.section, op.content) elif op.type == "UPDATE" and op.bullet_id and op.content: self.update_bullet(op.bullet_id, op.content) elif op.type == "REMOVE" and op.bullet_id: self.remove_bullet(op.bullet_id) def as_prompt(self) -> str: """Format playbook for inclusion in prompts""" if not self.bullets: return "No knowledge bullets available yet." lines = ["# Knowledge Playbook", ""] for section, bullet_ids in sorted(self.sections.items()): lines.append(f"## {section}") for bid in bullet_ids: bullet = self.bullets[bid] score = bullet.score() lines.append(f"- [{bid}] {bullet.content} (score: {score:.2f})") lines.append("") return "\n".join(lines) def stats(self) -> Dict: """Get playbook statistics""" total_bullets = len(self.bullets) total_tags = sum(b.helpful + b.harmful + b.neutral for b in self.bullets.values()) avg_score = sum(b.score() for b in self.bullets.values()) / total_bullets if total_bullets > 0 else 0 return { "total_bullets": total_bullets, "total_sections": len(self.sections), "total_tags": total_tags, "average_score": avg_score } def save(self, filepath: str): """Save playbook to disk""" data = { "bullets": {bid: asdict(b) for bid, b in self.bullets.items()}, "sections": self.sections, "next_id": self._next_id } with open(filepath, 'w') as f: json.dump(data, f, indent=2) @classmethod def load(cls, filepath: str) -> 'Playbook': """Load playbook from disk""" playbook = cls() if os.path.exists(filepath): with open(filepath, 'r') as f: data = json.load(f) playbook.bullets = { bid: Bullet(**bullet_data) for bid, bullet_data in data.get("bullets", {}).items() } playbook.sections = data.get("sections", {}) playbook._next_id = data.get("next_id", 1) return playbook # ============================================================================ # OLLAMA CLIENT # ============================================================================ class OllamaClient: """Client for interacting with Ollama""" def __init__(self, base_url: str = Config.OLLAMA_BASE_URL): self.base_url = base_url def generate( self, model: str, prompt: str, system: Optional[str] = None, temperature: float = Config.TEMPERATURE, max_tokens: int = Config.MAX_TOKENS ) -> str: """Generate completion from Ollama""" url = f"{self.base_url}/api/generate" payload = { "model": model, "prompt": prompt, "stream": False, "options": { "temperature": temperature, "num_predict": max_tokens, "num_ctx": 8192 # Added: Larger context window } } if system: payload["system"] = system try: response = requests.post(url, json=payload, timeout=180) # Increased timeout response.raise_for_status() return response.json()["response"] except Exception as e: print(f"Error calling Ollama: {e}") return "" def check_health(self) -> bool: """Check if Ollama is running""" try: response = requests.get(f"{self.base_url}/api/tags", timeout=5) return response.status_code == 200 except: return False # ============================================================================ # AGENTS # ============================================================================ class StateInitializer: """Initializes session state""" def execute(self, user_query: str, playbook: Playbook) -> Dict: """Initialize state for a new query""" return { "user_query": user_query, "playbook": playbook, "ground_truth": None, "generator_output": None, "reflector_output": None, "curator_output": None } class Generator: """Generates answers using the playbook""" def __init__(self, client: OllamaClient): self.client = client def execute(self, state: Dict) -> GeneratorOutput: """Generate an answer with reasoning""" user_query = state["user_query"] playbook = state["playbook"] # First, find relevant bullets bullet_context = [] for bid, bullet in playbook.bullets.items(): bullet_context.append(f"[{bid}] {bullet.content}") knowledge = "\n".join(bullet_context[:50]) # Use up to 50 most relevant # Simple, direct prompt prompt = f"""You are an emergency response expert. Question: {user_query} Available Knowledge: {knowledge} Provide a COMPLETE, detailed answer with ALL necessary steps. Be thorough and specific.""" response = self.client.generate( model=Config.GENERATOR_MODEL, prompt=prompt, system="Provide complete, detailed emergency instructions. Never truncate your answer.", temperature=0.3, max_tokens=4000 ) # Find relevant bullet IDs in response used_bullets = [] if response and isinstance(response, str): response_lower = response.lower() for bid, bullet in playbook.bullets.items(): bullet_preview = str(bullet.content)[:30].lower() if bid in response or bullet_preview in response_lower: used_bullets.append(bid) return GeneratorOutput( reasoning=["Analyzed emergency situation", "Found relevant protocols", "Provided complete response"], bullet_ids=used_bullets, final_answer=response if response else "Unable to generate response" ) class Reflector: """Reflects on generated output and tags bullets""" def __init__(self, client: OllamaClient): self.client = client def execute(self, state: Dict) -> Reflection: """Reflect on the generator's output""" user_query = state["user_query"] gen_output = state["generator_output"] playbook = state["playbook"] system_prompt = """You are a critical evaluator that assesses answer quality and tags knowledge bullets. INSTRUCTIONS: 1. Evaluate the quality of the generated answer 2. Identify what worked well and what didn't 3. Tag each referenced bullet as: - "helpful": Contributed positively to the answer - "harmful": Led to errors or poor quality - "neutral": Was referenced but had minimal impact Respond in JSON format: { "answer_quality": "excellent|good|fair|poor", "strengths": ["strength 1", "strength 2", ...], "weaknesses": ["weakness 1", "weakness 2", ...], "bullet_tags": [ {"bullet_id": "B0001", "tag": "helpful", "reason": "why"}, ... ] }""" # Build bullet context bullet_context = "\n".join([ f"[{bid}] {playbook.bullets[bid].content}" for bid in gen_output.bullet_ids if bid in playbook.bullets ]) prompt = f"""# User Query {user_query} # Referenced Bullets {bullet_context if bullet_context else "None"} # Generated Answer Reasoning: {gen_output.reasoning} Final Answer: {gen_output.final_answer} # Your Evaluation (JSON only):""" response = self.client.generate( model=Config.REFLECTOR_MODEL, prompt=prompt, system=system_prompt ) # Parse JSON response try: if "```json" in response: response = response.split("```json")[1].split("```")[0].strip() elif "```" in response: response = response.split("```")[1].split("```")[0].strip() data = json.loads(response) bullet_tags = [ BulletTag( bullet_id=bt["bullet_id"], tag=TagType(bt["tag"]), reason=bt.get("reason", "") ) for bt in data.get("bullet_tags", []) ] return Reflection( answer_quality=data.get("answer_quality", "unknown"), strengths=data.get("strengths", []), weaknesses=data.get("weaknesses", []), bullet_tags=bullet_tags ) except json.JSONDecodeError as e: print(f"JSON parse error: {e}") print(f"Raw response: {response}") return Reflection( answer_quality="error", strengths=[], weaknesses=["Failed to parse reflection"], bullet_tags=[] ) class Curator: """Curates the playbook based on reflections""" def __init__(self, client: OllamaClient): self.client = client def execute(self, state: Dict) -> DeltaBatch: """Generate playbook modifications""" user_query = state["user_query"] reflection = state["reflector_output"] playbook = state["playbook"] system_prompt = """You are a knowledge curator that improves the playbook. INSTRUCTIONS: 1. Review the reflection and current playbook 2. Decide what changes to make: - ADD: Create new bullets for missing knowledge - UPDATE: Improve existing bullets - REMOVE: Delete harmful or redundant bullets 3. Focus on bullets with consistent tags Respond in JSON format: { "reasoning": "Why these changes improve the playbook", "operations": [ {"type": "ADD", "section": "Section Name", "content": "New bullet content"}, {"type": "UPDATE", "section": "Section Name", "bullet_id": "B0001", "content": "Updated content"}, {"type": "REMOVE", "section": "Section Name", "bullet_id": "B0002"} ] }""" # Build tag summary tag_summary = "\n".join([ f"[{bt.bullet_id}] {bt.tag.value}: {bt.reason}" for bt in reflection.bullet_tags ]) prompt = f"""# Query Context User Query: {user_query} # Reflection Summary Quality: {reflection.answer_quality} Strengths: {reflection.strengths} Weaknesses: {reflection.weaknesses} # Bullet Tags {tag_summary if tag_summary else "No bullets were tagged"} # Current Playbook Stats {json.dumps(playbook.stats(), indent=2)} # Your Curation Plan (JSON only):""" response = self.client.generate( model=Config.CURATOR_MODEL, prompt=prompt, system=system_prompt ) # Parse JSON response try: if "```json" in response: response = response.split("```json")[1].split("```")[0].strip() elif "```" in response: response = response.split("```")[1].split("```")[0].strip() data = json.loads(response) operations = [ DeltaOperation( type=op["type"], section=op.get("section", "General"), content=op.get("content"), bullet_id=op.get("bullet_id") ) for op in data.get("operations", []) ] return DeltaBatch( reasoning=data.get("reasoning", ""), operations=operations ) except json.JSONDecodeError as e: print(f"JSON parse error: {e}") print(f"Raw response: {response}") return DeltaBatch( reasoning="Error parsing curation plan", operations=[] ) # ============================================================================ # ACE ORCHESTRATOR # ============================================================================ class ACEOrchestrator: """Orchestrates the full ACE cycle""" def __init__(self, playbook_path: str = Config.PLAYBOOK_PATH): self.client = OllamaClient() self.playbook = Playbook.load(playbook_path) self.playbook_path = playbook_path self.state_initializer = StateInitializer() self.generator = Generator(self.client) self.reflector = Reflector(self.client) self.curator = Curator(self.client) def run_cycle(self, user_query: str, verbose: bool = True) -> Dict: """Run one complete ACE cycle""" if verbose: print("\n" + "="*60) print("ACE CYCLE START") print("="*60) # 1. Initialize State if verbose: print("\n[1] Initializing state...") state = self.state_initializer.execute(user_query, self.playbook) # 2. Generate Answer if verbose: print("[2] Generating answer...") gen_output = self.generator.execute(state) state["generator_output"] = gen_output if verbose: print(f"\n--- GENERATOR OUTPUT ---") print(f"Reasoning: {gen_output.reasoning}") print(f"Bullets Used: {gen_output.bullet_ids}") print(f"Answer: {gen_output.final_answer}") # 3. Reflect if verbose: print("\n[3] Reflecting on output...") reflection = self.reflector.execute(state) state["reflector_output"] = reflection # Apply tags to playbook for bt in reflection.bullet_tags: self.playbook.update_bullet_tag(bt.bullet_id, bt.tag) if verbose: print(f"\n--- REFLECTION ---") print(f"Quality: {reflection.answer_quality}") print(f"Strengths: {reflection.strengths}") print(f"Weaknesses: {reflection.weaknesses}") print(f"Tags Applied: {len(reflection.bullet_tags)}") # 4. Curate Playbook if verbose: print("\n[4] Curating playbook...") delta = self.curator.execute(state) state["curator_output"] = delta # Apply delta to playbook self.playbook.apply_delta(delta) if verbose: print(f"\n--- CURATION ---") print(f"Reasoning: {delta.reasoning}") print(f"Operations: {len(delta.operations)}") for op in delta.operations: print(f" - {op.type}: {op.section}") # 5. Save Playbook self.playbook.save(self.playbook_path) if verbose: print(f"\n--- PLAYBOOK STATS ---") stats = self.playbook.stats() for key, value in stats.items(): print(f" {key}: {value}") print("\n" + "="*60) print("ACE CYCLE COMPLETE") print("="*60 + "\n") return { "answer": gen_output.final_answer, "quality": reflection.answer_quality, "operations": len(delta.operations), "stats": self.playbook.stats() } # ============================================================================ # MAIN ENTRY POINT # ============================================================================ def main(): """Main entry point for the ACE system""" print("ACE System with Ollama") print("=" * 60) # Check Ollama connection client = OllamaClient() if not client.check_health(): print("ERROR: Cannot connect to Ollama!") print(f"Make sure Ollama is running at {Config.OLLAMA_BASE_URL}") print("Start it with: ollama serve") return print("✓ Connected to Ollama") # Initialize orchestrator ace = ACEOrchestrator() print(f"✓ Loaded playbook: {ace.playbook.stats()}") # Interactive loop print("\nACE System Ready! (Type 'quit' to exit, 'stats' for playbook stats)") print("-" * 60) while True: try: user_input = input("\nYour query: ").strip() if not user_input: continue if user_input.lower() == 'quit': print("Goodbye!") break if user_input.lower() == 'stats': print("\nPlaybook Statistics:") print(json.dumps(ace.playbook.stats(), indent=2)) print("\nPlaybook Content:") print(ace.playbook.as_prompt()) continue # Run ACE cycle result = ace.run_cycle(user_input, verbose=True) except KeyboardInterrupt: print("\n\nGoodbye!") break except Exception as e: print(f"\nError: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()