# Provenance (hosting-page residue): uploaded by alaatiger989 - "Upload 3 files", commit 8f37414 (verified).
"""
ACE (Agentic Context Engineering) System with Ollama
A self-improving AI agent system using local LLMs
"""
import json
import os
from datetime import datetime
from typing import List, Dict, Optional, Literal
from dataclasses import dataclass, asdict
from enum import Enum
import requests
# ============================================================================
# CONFIGURATION
# ============================================================================
class Config:
    """Central settings for the ACE system: Ollama endpoint, models, sampling."""

    # Base URL of the Ollama HTTP server.
    OLLAMA_BASE_URL = "http://localhost:11434"

    # Model name used by each agent; all three may name the same model.
    GENERATOR_MODEL = "aya"  # Fast for generation
    REFLECTOR_MODEL = "aya"  # Can use same or different
    CURATOR_MODEL = "aya"  # Can use same or different

    # Default location of the persisted playbook.
    PLAYBOOK_PATH = "emergency_playbook.json"

    # Default sampling options.
    TEMPERATURE = 0.7
    MAX_TOKENS = 2000
# Alternative configuration, kept for reference (llama3.1 models, lower temperature, larger token budget):
# class Config:
# """System configuration"""
# OLLAMA_BASE_URL = "http://localhost:11434"
# GENERATOR_MODEL = "llama3.1"
# REFLECTOR_MODEL = "llama3.1"
# CURATOR_MODEL = "llama3.1"
# PLAYBOOK_PATH = "emergency_playbook.json"
# TEMPERATURE = 0.3
# MAX_TOKENS = 4000 # Increased
# ============================================================================
# DATA MODELS
# ============================================================================
class TagType(str, Enum):
    """Verdict that can be attached to a playbook bullet.

    Members are also ``str`` instances, so each one compares equal to its
    plain-string value (for example ``TagType.HELPFUL == "helpful"``).
    """

    HELPFUL = "helpful"
    HARMFUL = "harmful"
    NEUTRAL = "neutral"
@dataclass
class Bullet:
    """A knowledge item in the playbook, with vote counters and timestamps.

    Attributes:
        id: Identifier of the bullet within the playbook (e.g. ``"B0001"``).
        section: Name of the playbook section the bullet belongs to.
        content: The knowledge text itself.
        helpful: Number of "helpful" votes received so far.
        harmful: Number of "harmful" votes received so far.
        neutral: Number of "neutral" votes received so far.
        created_at: ISO-8601 creation time; filled in automatically when empty.
        updated_at: ISO-8601 time of the last change; filled in automatically
            when empty.
    """

    id: str
    section: str
    content: str
    helpful: int = 0
    harmful: int = 0
    neutral: int = 0
    created_at: str = ""
    updated_at: str = ""

    def __post_init__(self):
        """Fill in any missing timestamp.

        A single clock reading is used for both fields so that a freshly
        created bullet has ``created_at == updated_at`` instead of two values
        that differ by a few microseconds.
        """
        now = datetime.now().isoformat()
        if not self.created_at:
            self.created_at = now
        if not self.updated_at:
            self.updated_at = now

    def add_tag(self, tag: "TagType"):
        """Record one vote on this bullet and refresh ``updated_at``.

        Args:
            tag: The verdict to count. Anything that is neither HELPFUL nor
                HARMFUL is counted as neutral.
        """
        if tag == TagType.HELPFUL:
            self.helpful += 1
        elif tag == TagType.HARMFUL:
            self.harmful += 1
        else:
            self.neutral += 1
        self.updated_at = datetime.now().isoformat()

    def score(self) -> float:
        """Return ``(helpful - harmful) / total_votes``, or ``0.0`` with no votes.

        The result lies in ``[-1.0, 1.0]`` for non-negative counters.
        """
        total = self.helpful + self.harmful + self.neutral
        if total == 0:
            return 0.0
        return (self.helpful - self.harmful) / total
@dataclass
class BulletTag:
    """A single verdict on a single bullet, together with its justification."""

    bullet_id: str  # id of the bullet being judged
    tag: TagType  # the verdict itself
    reason: str  # free-text explanation for the verdict
@dataclass
class GeneratorOutput:
    """What the Generator agent hands to the rest of the cycle."""

    reasoning: List[str]  # reasoning steps, one string per step
    bullet_ids: List[str]  # ids of the playbook bullets considered used
    final_answer: str  # answer text
@dataclass
class Reflection:
    """What the Reflector agent concluded about a generated answer."""

    answer_quality: str  # overall quality label
    strengths: List[str]  # what worked well
    weaknesses: List[str]  # what did not
    bullet_tags: List[BulletTag]  # per-bullet verdicts
@dataclass
class DeltaOperation:
    """One edit to apply to the playbook."""

    type: Literal["ADD", "UPDATE", "REMOVE"]  # which kind of edit this is
    section: str  # section the edit refers to
    content: Optional[str] = None  # bullet text, when the edit carries one
    bullet_id: Optional[str] = None  # target bullet, when the edit names one
@dataclass
class DeltaBatch:
    """A group of playbook edits plus the rationale behind them."""

    reasoning: str  # why the edits are proposed
    operations: List[DeltaOperation]  # the edits, in the order they are applied
# ============================================================================
# PLAYBOOK MANAGEMENT
# ============================================================================
class Playbook:
    """Manages the evolving knowledge base.

    ``bullets`` maps bullet id -> bullet, ``sections`` maps section name ->
    ordered list of bullet ids, and ``_next_id`` is the numeric part of the
    next id handed out by :meth:`add_bullet`.
    """

    def __init__(self):
        self.bullets: Dict[str, "Bullet"] = {}
        self.sections: Dict[str, List[str]] = {}
        self._next_id = 1

    def add_bullet(self, section: str, content: str) -> str:
        """Create a bullet in ``section`` and return its new id (``B0001``...)."""
        bullet_id = f"B{self._next_id:04d}"
        self._next_id += 1
        self.bullets[bullet_id] = Bullet(
            id=bullet_id,
            section=section,
            content=content,
        )
        self.sections.setdefault(section, []).append(bullet_id)
        return bullet_id

    def update_bullet(self, bullet_id: str, content: str):
        """Replace the text of an existing bullet; unknown ids are ignored."""
        bullet = self.bullets.get(bullet_id)
        if bullet is None:
            return
        bullet.content = content
        bullet.updated_at = datetime.now().isoformat()

    def remove_bullet(self, bullet_id: str):
        """Delete a bullet; unknown ids are ignored.

        A section that becomes empty is dropped as well, so it no longer
        shows up as a bare header in :meth:`as_prompt` or inflates
        ``total_sections`` in :meth:`stats`.
        """
        bullet = self.bullets.pop(bullet_id, None)
        if bullet is None:
            return
        section = bullet.section
        remaining = [
            bid for bid in self.sections.get(section, []) if bid != bullet_id
        ]
        if remaining:
            self.sections[section] = remaining
        else:
            self.sections.pop(section, None)

    def update_bullet_tag(self, bullet_id: str, tag: "TagType"):
        """Record one vote on a bullet; unknown ids are ignored."""
        if bullet_id in self.bullets:
            self.bullets[bullet_id].add_tag(tag)

    def apply_delta(self, delta: "DeltaBatch"):
        """Apply every operation of ``delta`` in order.

        The operation type is matched case-insensitively because it
        originates from model output. Operations lacking the fields they
        need (content for ADD/UPDATE, bullet id for UPDATE/REMOVE) are
        skipped, and an ADD without a section goes to "General".
        """
        for op in delta.operations:
            kind = str(op.type).strip().upper()
            if kind == "ADD" and op.content:
                self.add_bullet(op.section or "General", op.content)
            elif kind == "UPDATE" and op.bullet_id and op.content:
                self.update_bullet(op.bullet_id, op.content)
            elif kind == "REMOVE" and op.bullet_id:
                self.remove_bullet(op.bullet_id)

    def as_prompt(self) -> str:
        """Render the playbook as markdown-style text for inclusion in prompts.

        Sections are listed in name order. Ids in ``sections`` that have no
        matching bullet are skipped rather than raising ``KeyError``, and a
        section with nothing left to show is omitted.
        """
        if not self.bullets:
            return "No knowledge bullets available yet."
        lines = ["# Knowledge Playbook", ""]
        # Sort on str(name) so an unexpected non-string section cannot break the sort.
        for section, bullet_ids in sorted(
            self.sections.items(), key=lambda item: str(item[0])
        ):
            live_ids = [bid for bid in bullet_ids if bid in self.bullets]
            if not live_ids:
                continue
            lines.append(f"## {section}")
            for bid in live_ids:
                bullet = self.bullets[bid]
                lines.append(
                    f"- [{bid}] {bullet.content} (score: {bullet.score():.2f})"
                )
            lines.append("")
        return "\n".join(lines)

    def stats(self) -> Dict:
        """Return bullet/section/vote counts and the mean bullet score."""
        bullets = list(self.bullets.values())
        total_bullets = len(bullets)
        total_tags = sum(b.helpful + b.harmful + b.neutral for b in bullets)
        avg_score = (
            sum(b.score() for b in bullets) / total_bullets
            if total_bullets > 0
            else 0
        )
        return {
            "total_bullets": total_bullets,
            "total_sections": len(self.sections),
            "total_tags": total_tags,
            "average_score": avg_score,
        }

    def save(self, filepath: str):
        """Write the playbook to ``filepath`` as JSON.

        The data is written to a sibling ``.tmp`` file first and then moved
        into place with ``os.replace``, so an interrupted write cannot leave
        a truncated playbook behind.
        """
        data = {
            "bullets": {bid: asdict(b) for bid, b in self.bullets.items()},
            "sections": self.sections,
            "next_id": self._next_id,
        }
        tmp_path = f"{filepath}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, filepath)
        except BaseException:
            # Do not leave a half-written temp file lying around.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise

    @classmethod
    def load(cls, filepath: str) -> 'Playbook':
        """Read a playbook from ``filepath``; a missing file yields an empty one.

        ``_next_id`` is restored from the file but never set lower than one
        past the highest ``B<number>`` id present, so new bullets cannot
        overwrite loaded ones when the stored counter is missing or stale.
        """
        playbook = cls()
        if not os.path.exists(filepath):
            return playbook
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        playbook.bullets = {
            bid: Bullet(**bullet_data)
            for bid, bullet_data in data.get("bullets", {}).items()
        }
        playbook.sections = data.get("sections", {})
        highest_used = max(
            (
                int(bid[1:])
                for bid in playbook.bullets
                if bid[:1] == "B" and bid[1:].isdecimal()
            ),
            default=0,
        )
        playbook._next_id = max(data.get("next_id", 1), highest_used + 1)
        return playbook
# ============================================================================
# OLLAMA CLIENT
# ============================================================================
class OllamaClient:
    """Client for interacting with Ollama over its HTTP API."""

    def __init__(self, base_url: str = Config.OLLAMA_BASE_URL):
        # Drop any trailing slash so the joins below never yield "//api/...".
        self.base_url = base_url.rstrip("/")

    def generate(
        self,
        model: str,
        prompt: str,
        system: Optional[str] = None,
        temperature: float = Config.TEMPERATURE,
        max_tokens: int = Config.MAX_TOKENS
    ) -> str:
        """Request a non-streaming completion from ``/api/generate``.

        Args:
            model: Name of the Ollama model to run.
            prompt: User prompt text.
            system: Optional system prompt; omitted from the request when empty.
            temperature: Sampling temperature.
            max_tokens: Sent as the ``num_predict`` option.

        Returns:
            The ``response`` field of the reply, or ``""`` when the request
            fails or the reply does not have the expected shape. The failure
            is printed, not raised, so callers can treat it as "no answer".
        """
        url = f"{self.base_url}/api/generate"
        payload = {
            "model": model,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": temperature,
                "num_predict": max_tokens,
                "num_ctx": 8192  # Added: Larger context window
            }
        }
        if system:
            payload["system"] = system
        try:
            response = requests.post(url, json=payload, timeout=180)  # Increased timeout
            response.raise_for_status()
            return response.json()["response"]
        except (requests.RequestException, ValueError, KeyError, TypeError) as e:
            # RequestException: transport/HTTP errors; ValueError: body is not
            # JSON; KeyError/TypeError: JSON without a "response" field.
            print(f"Error calling Ollama: {e}")
            return ""

    def check_health(self) -> bool:
        """Return True when ``/api/tags`` answers with HTTP 200 within 5 seconds."""
        try:
            response = requests.get(f"{self.base_url}/api/tags", timeout=5)
        except requests.RequestException:
            # Only network-level failures mean "not running"; a bare except
            # would also swallow KeyboardInterrupt and programming errors.
            return False
        return response.status_code == 200
# ============================================================================
# AGENTS
# ============================================================================
class StateInitializer:
    """Builds the per-query state dictionary."""

    def execute(self, user_query: str, playbook: Playbook) -> Dict:
        """Return a fresh state holding the query, the playbook and empty slots.

        The slots for the ground truth and for each agent's output start out
        as ``None``.
        """
        state: Dict = {"user_query": user_query, "playbook": playbook}
        for pending_key in (
            "ground_truth",
            "generator_output",
            "reflector_output",
            "curator_output",
        ):
            state[pending_key] = None
        return state
class Generator:
    """Generates answers using the playbook"""

    # At most this many bullets are placed in the prompt.
    MAX_CONTEXT_BULLETS = 50
    # Length of the bullet-text prefix searched for in the answer.
    PREVIEW_CHARS = 30

    def __init__(self, client: OllamaClient):
        self.client = client

    def execute(self, state: Dict) -> GeneratorOutput:
        """Answer ``state["user_query"]`` using bullets from ``state["playbook"]``.

        The highest-scoring bullets (up to ``MAX_CONTEXT_BULLETS``) are put in
        the prompt. A bullet counts as "used" when its id, or the first
        ``PREVIEW_CHARS`` characters of its text, appear in the answer.

        Returns:
            A GeneratorOutput whose ``final_answer`` is the model reply, or
            "Unable to generate response" when the reply is empty.
        """
        user_query = state["user_query"]
        playbook = state["playbook"]

        # Rank by score so the cap keeps the best bullets rather than the
        # oldest ones; sorted() is stable, so ties keep insertion order.
        ranked = sorted(
            playbook.bullets.items(),
            key=lambda item: item[1].score(),
            reverse=True,
        )
        selected = ranked[: self.MAX_CONTEXT_BULLETS]
        knowledge = "\n".join(
            f"[{bid}] {bullet.content}" for bid, bullet in selected
        )

        # Simple, direct prompt
        prompt = f"""You are an emergency response expert.
Question: {user_query}
Available Knowledge:
{knowledge}
Provide a COMPLETE, detailed answer with ALL necessary steps. Be thorough and specific."""
        response = self.client.generate(
            model=Config.GENERATOR_MODEL,
            prompt=prompt,
            system="Provide complete, detailed emergency instructions. Never truncate your answer.",
            temperature=0.3,
            max_tokens=4000
        )

        # Find relevant bullet IDs in response. Only bullets that were actually
        # shown to the model can have been used.
        used_bullets = []
        if response and isinstance(response, str):
            response_lower = response.lower()
            for bid, bullet in selected:
                bullet_preview = str(bullet.content)[: self.PREVIEW_CHARS].lower()
                # A blank preview is a substring of every answer, so it must
                # not count as a match.
                preview_hit = bool(bullet_preview.strip()) and bullet_preview in response_lower
                if bid in response or preview_hit:
                    used_bullets.append(bid)

        return GeneratorOutput(
            reasoning=["Analyzed emergency situation", "Found relevant protocols", "Provided complete response"],
            bullet_ids=used_bullets,
            final_answer=response if response else "Unable to generate response"
        )
class Reflector:
    """Reflects on generated output and tags bullets"""

    def __init__(self, client: OllamaClient):
        self.client = client

    @staticmethod
    def _extract_json(response: str) -> str:
        """Return the part of a model reply most likely to be the JSON object.

        Takes the contents of the first ``` fence when there is one, then
        trims to the outermost ``{...}`` so a language tag or surrounding
        prose does not break parsing.
        """
        text = response
        if "```" in text:
            text = text.split("```")[1]
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            return text[start:end + 1]
        return text.strip()

    @staticmethod
    def _failed_reflection() -> Reflection:
        """Return the placeholder reflection used when the reply is unusable."""
        return Reflection(
            answer_quality="error",
            strengths=[],
            weaknesses=["Failed to parse reflection"],
            bullet_tags=[]
        )

    def execute(self, state: Dict) -> Reflection:
        """Ask the reflector model to grade the answer and tag the used bullets.

        Reads ``user_query``, ``generator_output`` and ``playbook`` from
        ``state``. A reply that is not a JSON object yields the "error"
        reflection; individual malformed tag entries are skipped so one bad
        entry does not discard the rest.
        """
        user_query = state["user_query"]
        gen_output = state["generator_output"]
        playbook = state["playbook"]
        system_prompt = """You are a critical evaluator that assesses answer quality and tags knowledge bullets.
INSTRUCTIONS:
1. Evaluate the quality of the generated answer
2. Identify what worked well and what didn't
3. Tag each referenced bullet as:
- "helpful": Contributed positively to the answer
- "harmful": Led to errors or poor quality
- "neutral": Was referenced but had minimal impact
Respond in JSON format:
{
"answer_quality": "excellent|good|fair|poor",
"strengths": ["strength 1", "strength 2", ...],
"weaknesses": ["weakness 1", "weakness 2", ...],
"bullet_tags": [
{"bullet_id": "B0001", "tag": "helpful", "reason": "why"},
...
]
}"""
        # Build bullet context
        bullet_context = "\n".join([
            f"[{bid}] {playbook.bullets[bid].content}"
            for bid in gen_output.bullet_ids
            if bid in playbook.bullets
        ])
        prompt = f"""# User Query
{user_query}
# Referenced Bullets
{bullet_context if bullet_context else "None"}
# Generated Answer
Reasoning: {gen_output.reasoning}
Final Answer: {gen_output.final_answer}
# Your Evaluation (JSON only):"""
        response = self.client.generate(
            model=Config.REFLECTOR_MODEL,
            prompt=prompt,
            system=system_prompt
        )

        # Parse JSON response
        try:
            data = json.loads(self._extract_json(response))
        except json.JSONDecodeError as e:
            print(f"JSON parse error: {e}")
            print(f"Raw response: {response}")
            return self._failed_reflection()
        if not isinstance(data, dict):
            # Valid JSON but not the object the prompt asked for.
            print(f"JSON parse error: expected an object, got {type(data).__name__}")
            print(f"Raw response: {response}")
            return self._failed_reflection()

        raw_tags = data.get("bullet_tags")
        if not isinstance(raw_tags, list):
            raw_tags = []
        bullet_tags = []
        for bt in raw_tags:
            try:
                bullet_tags.append(
                    BulletTag(
                        bullet_id=bt["bullet_id"],
                        # Model output may vary in case/whitespace ("Helpful").
                        tag=TagType(str(bt["tag"]).strip().lower()),
                        reason=bt.get("reason", "")
                    )
                )
            except (KeyError, ValueError, TypeError, AttributeError) as e:
                # Missing key, unknown tag value, or an entry that is not a dict.
                print(f"Skipping malformed bullet tag {bt!r}: {e}")

        return Reflection(
            answer_quality=data.get("answer_quality", "unknown"),
            strengths=data.get("strengths", []),
            weaknesses=data.get("weaknesses", []),
            bullet_tags=bullet_tags
        )
class Curator:
    """Curates the playbook based on reflections"""

    # Operation types the playbook knows how to apply.
    _VALID_TYPES = ("ADD", "UPDATE", "REMOVE")

    def __init__(self, client: OllamaClient):
        self.client = client

    @staticmethod
    def _extract_json(response: str) -> str:
        """Return the part of a model reply most likely to be the JSON object.

        Takes the contents of the first ``` fence when there is one, then
        trims to the outermost ``{...}`` so a language tag or surrounding
        prose does not break parsing.
        """
        text = response
        if "```" in text:
            text = text.split("```")[1]
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            return text[start:end + 1]
        return text.strip()

    @staticmethod
    def _failed_batch() -> DeltaBatch:
        """Return the empty batch used when the reply is unusable."""
        return DeltaBatch(
            reasoning="Error parsing curation plan",
            operations=[]
        )

    def execute(self, state: Dict) -> DeltaBatch:
        """Ask the curator model for playbook edits based on the reflection.

        Reads ``user_query``, ``reflector_output`` and ``playbook`` from
        ``state``. A reply that is not a JSON object yields an empty batch;
        operations with a missing or unknown ``type`` are skipped, the type
        is upper-cased, and a missing/null section becomes "General".
        """
        user_query = state["user_query"]
        reflection = state["reflector_output"]
        playbook = state["playbook"]
        system_prompt = """You are a knowledge curator that improves the playbook.
INSTRUCTIONS:
1. Review the reflection and current playbook
2. Decide what changes to make:
- ADD: Create new bullets for missing knowledge
- UPDATE: Improve existing bullets
- REMOVE: Delete harmful or redundant bullets
3. Focus on bullets with consistent tags
Respond in JSON format:
{
"reasoning": "Why these changes improve the playbook",
"operations": [
{"type": "ADD", "section": "Section Name", "content": "New bullet content"},
{"type": "UPDATE", "section": "Section Name", "bullet_id": "B0001", "content": "Updated content"},
{"type": "REMOVE", "section": "Section Name", "bullet_id": "B0002"}
]
}"""
        # Build tag summary
        tag_summary = "\n".join([
            f"[{bt.bullet_id}] {bt.tag.value}: {bt.reason}"
            for bt in reflection.bullet_tags
        ])
        prompt = f"""# Query Context
User Query: {user_query}
# Reflection Summary
Quality: {reflection.answer_quality}
Strengths: {reflection.strengths}
Weaknesses: {reflection.weaknesses}
# Bullet Tags
{tag_summary if tag_summary else "No bullets were tagged"}
# Current Playbook Stats
{json.dumps(playbook.stats(), indent=2)}
# Your Curation Plan (JSON only):"""
        response = self.client.generate(
            model=Config.CURATOR_MODEL,
            prompt=prompt,
            system=system_prompt
        )

        # Parse JSON response
        try:
            data = json.loads(self._extract_json(response))
        except json.JSONDecodeError as e:
            print(f"JSON parse error: {e}")
            print(f"Raw response: {response}")
            return self._failed_batch()
        if not isinstance(data, dict):
            # Valid JSON but not the object the prompt asked for.
            print(f"JSON parse error: expected an object, got {type(data).__name__}")
            print(f"Raw response: {response}")
            return self._failed_batch()

        raw_ops = data.get("operations")
        if not isinstance(raw_ops, list):
            raw_ops = []
        operations = []
        for op in raw_ops:
            if not isinstance(op, dict):
                print(f"Skipping malformed operation {op!r}")
                continue
            op_type = str(op.get("type", "")).strip().upper()
            if op_type not in self._VALID_TYPES:
                print(f"Skipping operation with unknown type {op!r}")
                continue
            operations.append(
                DeltaOperation(
                    type=op_type,
                    # `or` also covers an explicit JSON null.
                    section=op.get("section") or "General",
                    content=op.get("content"),
                    bullet_id=op.get("bullet_id")
                )
            )

        return DeltaBatch(
            reasoning=data.get("reasoning", ""),
            operations=operations
        )
# ============================================================================
# ACE ORCHESTRATOR
# ============================================================================
class ACEOrchestrator:
    """Wires the agents together and drives one ACE cycle at a time."""

    def __init__(self, playbook_path: str = Config.PLAYBOOK_PATH):
        self.client = OllamaClient()
        self.playbook_path = playbook_path
        self.playbook = Playbook.load(playbook_path)
        # One instance of each stage; the three agents share the same client.
        self.state_initializer = StateInitializer()
        self.generator = Generator(self.client)
        self.reflector = Reflector(self.client)
        self.curator = Curator(self.client)

    def run_cycle(self, user_query: str, verbose: bool = True) -> Dict:
        """Run generate -> reflect -> curate for one query and save the playbook.

        Args:
            user_query: The question to answer.
            verbose: When true, progress and intermediate results are printed.

        Returns:
            A dict with the answer text, the reflection's quality label, the
            number of curation operations and the playbook statistics.
        """
        rule = "=" * 60

        def say(*lines):
            # Progress output, emitted only in verbose mode.
            if verbose:
                for line in lines:
                    print(line)

        say("\n" + rule, "ACE CYCLE START", rule)

        # 1. Initialize State
        say("\n[1] Initializing state...")
        state = self.state_initializer.execute(user_query, self.playbook)

        # 2. Generate Answer
        say("[2] Generating answer...")
        answer = self.generator.execute(state)
        state["generator_output"] = answer
        say(
            "\n--- GENERATOR OUTPUT ---",
            f"Reasoning: {answer.reasoning}",
            f"Bullets Used: {answer.bullet_ids}",
            f"Answer: {answer.final_answer}",
        )

        # 3. Reflect
        say("\n[3] Reflecting on output...")
        verdict = self.reflector.execute(state)
        state["reflector_output"] = verdict
        # Apply tags to playbook
        for bullet_tag in verdict.bullet_tags:
            self.playbook.update_bullet_tag(bullet_tag.bullet_id, bullet_tag.tag)
        say(
            "\n--- REFLECTION ---",
            f"Quality: {verdict.answer_quality}",
            f"Strengths: {verdict.strengths}",
            f"Weaknesses: {verdict.weaknesses}",
            f"Tags Applied: {len(verdict.bullet_tags)}",
        )

        # 4. Curate Playbook
        say("\n[4] Curating playbook...")
        changes = self.curator.execute(state)
        state["curator_output"] = changes
        # Apply delta to playbook
        self.playbook.apply_delta(changes)
        say(
            "\n--- CURATION ---",
            f"Reasoning: {changes.reasoning}",
            f"Operations: {len(changes.operations)}",
            *(f" - {op.type}: {op.section}" for op in changes.operations),
        )

        # 5. Save Playbook
        self.playbook.save(self.playbook_path)
        if verbose:
            print("\n--- PLAYBOOK STATS ---")
            for key, value in self.playbook.stats().items():
                print(f" {key}: {value}")
            print("\n" + rule)
            print("ACE CYCLE COMPLETE")
            print(rule + "\n")

        summary = {
            "answer": answer.final_answer,
            "quality": verdict.answer_quality,
            "operations": len(changes.operations),
            "stats": self.playbook.stats(),
        }
        return summary
# ============================================================================
# MAIN ENTRY POINT
# ============================================================================
def main():
    """Run the interactive ACE prompt loop.

    Checks that Ollama is reachable, loads the playbook, then reads queries
    until the user types 'quit', presses Ctrl-C, or stdin is closed. Typing
    'stats' prints the playbook statistics and content instead of running a
    cycle. Errors raised while handling a query are printed with a traceback
    and the loop continues.
    """
    import traceback  # only needed on the error path

    print("ACE System with Ollama")
    print("=" * 60)

    # Check Ollama connection
    client = OllamaClient()
    if not client.check_health():
        print("ERROR: Cannot connect to Ollama!")
        print(f"Make sure Ollama is running at {Config.OLLAMA_BASE_URL}")
        print("Start it with: ollama serve")
        return
    print("✓ Connected to Ollama")

    # Initialize orchestrator
    ace = ACEOrchestrator()
    print(f"✓ Loaded playbook: {ace.playbook.stats()}")

    # Interactive loop
    print("\nACE System Ready! (Type 'quit' to exit, 'stats' for playbook stats)")
    print("-" * 60)
    while True:
        try:
            user_input = input("\nYour query: ").strip()
            if not user_input:
                continue
            if user_input.lower() == 'quit':
                print("Goodbye!")
                break
            if user_input.lower() == 'stats':
                print("\nPlaybook Statistics:")
                print(json.dumps(ace.playbook.stats(), indent=2))
                print("\nPlaybook Content:")
                print(ace.playbook.as_prompt())
                continue
            # Run ACE cycle
            ace.run_cycle(user_input, verbose=True)
        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin was closed (Ctrl-D or piped input exhausted).
            # Without this, input() would raise on every iteration and the
            # generic handler below would loop forever printing tracebacks.
            print("\n\nGoodbye!")
            break
        except Exception as e:
            print(f"\nError: {e}")
            traceback.print_exc()


if __name__ == "__main__":
    main()