"""
ACE System - Streamlit Web Interface
Self-improving AI agent with beautiful UI
"""

import streamlit as st
import json
import os
from datetime import datetime
from typing import List, Dict, Optional, Literal
from dataclasses import dataclass, asdict
from enum import Enum
import requests
import plotly.graph_objects as go
import plotly.express as px
from collections import defaultdict


# ============================================================================
# CONFIGURATION
# ============================================================================

class Config:
    """System configuration: Ollama endpoint, model names and sampling limits."""

    OLLAMA_BASE_URL = "http://localhost:11434"
    # Alternative model choice kept for reference:
    # GENERATOR_MODEL = "llama3.2:3b"
    # REFLECTOR_MODEL = "llama3.2:3b"
    # CURATOR_MODEL = "llama3.2:3b"
    GENERATOR_MODEL = "aya"
    REFLECTOR_MODEL = "aya"
    CURATOR_MODEL = "aya"
    PLAYBOOK_PATH = "emergency_playbook.json"
    TEMPERATURE = 0.3
    MAX_TOKENS = 4000


class Language:
    """Language settings: UI strings keyed by language code, then message key."""

    MESSAGES = {
        "en": {
            "title": "🤖 ACE - Self-Improving AI Agent",
            "subtitle": "Agentic Context Engineering System",
            "sidebar_title": "⚙️ Settings",
            "language": "Language",
            "model_settings": "Model Settings",
            "playbook_info": "📚 Playbook Information",
            "total_bullets": "Total Knowledge Items",
            "sections": "Sections",
            "avg_score": "Average Quality Score",
            "total_tags": "Total Evaluations",
            "query_input": "💬 Ask me anything about emergencies...",
            "ask_button": "🚀 Get Answer",
            "clear_button": "🗑️ Clear History",
            "export_button": "💾 Export Playbook",
            "import_button": "📥 Import Playbook",
            "chat_history": "💬 Chat History",
            "answer": "Answer",
            "quality": "Quality",
            "reasoning": "Reasoning Process",
            "bullets_used": "Knowledge Used",
            "improvements": "System Improvements",
            "playbook_viz": "📊 Knowledge Evolution",
            "section_dist": "Knowledge Distribution by Section",
            "quality_trend": "Quality Score Trends",
            "connection_error": "❌ Cannot connect to Ollama!",
            "connection_success": "✅ Connected to Ollama",
            "processing": "🔄 Processing your query...",
            "generator_phase": "Generating answer...",
            "reflector_phase": "Evaluating quality...",
            "curator_phase": "Learning improvements...",
            "complete": "✅ Complete!",
        },
        "ar": {
            "title": "🤖 ACE - نظام ذكي ذاتي التطوير",
            "subtitle": "نظام هندسة السياق الوكيل",
            "sidebar_title": "⚙️ الإعدادات",
            "language": "اللغة",
            "model_settings": "إعدادات النموذج",
            "playbook_info": "📚 معلومات دفتر المعرفة",
            "total_bullets": "إجمالي عناصر المعرفة",
            "sections": "الأقسام",
            "avg_score": "متوسط درجة الجودة",
            "total_tags": "إجمالي التقييمات",
            "query_input": "💬 اسألني أي شيء عن حالات الطوارئ...",
            "ask_button": "🚀 احصل على إجابة",
            "clear_button": "🗑️ مسح السجل",
            "export_button": "💾 تصدير دفتر المعرفة",
            "import_button": "📥 استيراد دفتر المعرفة",
            "chat_history": "💬 سجل المحادثة",
            "answer": "الإجابة",
            "quality": "الجودة",
            "reasoning": "عملية التفكير",
            "bullets_used": "المعرفة المستخدمة",
            "improvements": "تحسينات النظام",
            "playbook_viz": "📊 تطور المعرفة",
            "section_dist": "توزيع المعرفة حسب القسم",
            "quality_trend": "اتجاهات درجة الجودة",
            "connection_error": "❌ لا يمكن الاتصال بـ Ollama!",
            "connection_success": "✅ تم الاتصال بـ Ollama",
            "processing": "🔄 جاري معالجة استفسارك...",
            "generator_phase": "توليد الإجابة...",
            "reflector_phase": "تقييم الجودة...",
            "curator_phase": "تعلم التحسينات...",
            "complete": "✅ اكتمل!",
        },
    }

    @staticmethod
    def get(key, lang="en"):
        """Return the message for ``key`` in ``lang``.

        Unknown languages fall back to English; unknown keys are returned
        unchanged so a missing translation is visible rather than fatal.
        """
        return Language.MESSAGES.get(lang, Language.MESSAGES["en"]).get(key, key)


# ============================================================================
# DATA MODELS
# ============================================================================

class TagType(str, Enum):
    """Verdict the reflector can attach to a bullet."""

    HELPFUL = "helpful"
    HARMFUL = "harmful"
    NEUTRAL = "neutral"


@dataclass
class Bullet:
    """One knowledge item in the playbook together with its tag counters."""

    id: str
    section: str
    content: str
    helpful: int = 0
    harmful: int = 0
    neutral: int = 0
    created_at: str = ""
    updated_at: str = ""

    def __post_init__(self):
        # Empty timestamps mean "new bullet": stamp both with the current time.
        if not self.created_at:
            self.created_at = datetime.now().isoformat()
        if not self.updated_at:
            self.updated_at = datetime.now().isoformat()

    def add_tag(self, tag: TagType):
        """Increment the counter matching ``tag`` and refresh ``updated_at``."""
        if tag == TagType.HELPFUL:
            self.helpful += 1
        elif tag == TagType.HARMFUL:
            self.harmful += 1
        else:
            self.neutral += 1
        self.updated_at = datetime.now().isoformat()

    def score(self) -> float:
        """Return ``(helpful - harmful) / total tags``, or 0.0 when untagged."""
        total = self.helpful + self.harmful + self.neutral
        if total == 0:
            return 0.0
        return (self.helpful - self.harmful) / total


@dataclass
class BulletTag:
    """A reflector verdict for a single bullet."""

    bullet_id: str
    tag: TagType
    reason: str


@dataclass
class GeneratorOutput:
    """Result of the generator phase."""

    reasoning: List[str]
    bullet_ids: List[str]
    final_answer: str


@dataclass
class Reflection:
    """Result of the reflector phase."""

    answer_quality: str
    strengths: List[str]
    weaknesses: List[str]
    bullet_tags: List[BulletTag]


@dataclass
class DeltaOperation:
    """A single playbook edit proposed by the curator."""

    type: Literal["ADD", "UPDATE", "REMOVE"]
    section: str
    content: Optional[str] = None
    bullet_id: Optional[str] = None


@dataclass
class DeltaBatch:
    """A group of playbook edits with the curator's stated reasoning."""

    reasoning: str
    operations: List[DeltaOperation]


# ============================================================================
# PLAYBOOK MANAGEMENT
# ============================================================================

class Playbook:
    """In-memory store of bullets grouped by section, persisted as JSON."""

    def __init__(self):
        self.bullets: Dict[str, Bullet] = {}
        self.sections: Dict[str, List[str]] = {}
        self._next_id = 1

    def add_bullet(self, section: str, content: str) -> str:
        """Create a bullet in ``section`` and return its generated id."""
        bullet_id = f"B{self._next_id:04d}"
        self._next_id += 1
        self.bullets[bullet_id] = Bullet(
            id=bullet_id, section=section, content=content
        )
        self.sections.setdefault(section, []).append(bullet_id)
        return bullet_id

    def update_bullet(self, bullet_id: str, content: str):
        """Replace the content of an existing bullet; unknown ids are ignored."""
        bullet = self.bullets.get(bullet_id)
        if bullet is not None:
            bullet.content = content
            bullet.updated_at = datetime.now().isoformat()

    def remove_bullet(self, bullet_id: str):
        """Delete a bullet and its section entry; unknown ids are ignored."""
        bullet = self.bullets.pop(bullet_id, None)
        if bullet is None:
            return
        section = bullet.section
        if section in self.sections:
            remaining = [
                bid for bid in self.sections[section] if bid != bullet_id
            ]
            if remaining:
                self.sections[section] = remaining
            else:
                # Drop emptied sections so they are neither counted in stats()
                # nor rendered as an empty header in as_prompt().
                del self.sections[section]

    def update_bullet_tag(self, bullet_id: str, tag: TagType):
        """Record ``tag`` on a bullet; unknown ids are ignored."""
        if bullet_id in self.bullets:
            self.bullets[bullet_id].add_tag(tag)

    def apply_delta(self, delta: DeltaBatch):
        """Apply every operation of ``delta`` in order.

        Operations lacking the fields their type requires are skipped.
        """
        for op in delta.operations:
            if op.type == "ADD" and op.content:
                self.add_bullet(op.section, op.content)
            elif op.type == "UPDATE" and op.bullet_id and op.content:
                self.update_bullet(op.bullet_id, op.content)
            elif op.type == "REMOVE" and op.bullet_id:
                self.remove_bullet(op.bullet_id)

    def as_prompt(self) -> str:
        """Render the playbook as markdown text, sections sorted by name."""
        if not self.bullets:
            return "No knowledge bullets available yet."
        lines = ["# Knowledge Playbook", ""]
        for section, bullet_ids in sorted(self.sections.items()):
            lines.append(f"## {section}")
            for bid in bullet_ids:
                bullet = self.bullets.get(bid)
                if bullet is None:
                    # A section may reference an id that has no bullet (e.g. a
                    # hand-edited file); skip it instead of raising KeyError.
                    continue
                lines.append(
                    f"- [{bid}] {bullet.content} (score: {bullet.score():.2f})"
                )
            lines.append("")
        return "\n".join(lines)

    def stats(self) -> Dict:
        """Return bullet/section/tag counts and the mean bullet score."""
        total_bullets = len(self.bullets)
        total_tags = sum(
            b.helpful + b.harmful + b.neutral for b in self.bullets.values()
        )
        if total_bullets > 0:
            avg_score = (
                sum(b.score() for b in self.bullets.values()) / total_bullets
            )
        else:
            avg_score = 0
        return {
            "total_bullets": total_bullets,
            "total_sections": len(self.sections),
            "total_tags": total_tags,
            "average_score": avg_score,
        }

    def to_dict(self) -> Dict:
        """Return the JSON-serialisable form used by ``save`` and the export."""
        return {
            "bullets": {bid: asdict(b) for bid, b in self.bullets.items()},
            "sections": self.sections,
            "next_id": self._next_id,
        }

    def save(self, filepath: str):
        """Write the playbook to ``filepath`` as UTF-8 JSON."""
        # ensure_ascii=False keeps non-ASCII content readable in the file.
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)

    @staticmethod
    def _id_number(bullet_id: str) -> int:
        """Return the numeric part of a ``B0001``-style id, or 0 if malformed."""
        digits = str(bullet_id)[1:]
        if digits.isascii() and digits.isdigit():
            return int(digits)
        return 0

    @classmethod
    def load(cls, filepath: str) -> 'Playbook':
        """Load a playbook from ``filepath``; return an empty one if absent.

        Raises:
            json.JSONDecodeError: if the file exists but is not valid JSON.
        """
        playbook = cls()
        if not os.path.exists(filepath):
            return playbook
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        playbook.bullets = {
            bid: Bullet(**bullet_data)
            for bid, bullet_data in data.get("bullets", {}).items()
        }
        playbook.sections = data.get("sections", {})
        stored_next = data.get("next_id", 1)
        if not isinstance(stored_next, int):
            stored_next = 1
        # Never hand out an id that already exists, even when "next_id" is
        # missing or stale: a reused id would silently overwrite a bullet.
        highest = max(
            (cls._id_number(bid) for bid in playbook.bullets), default=0
        )
        playbook._next_id = max(stored_next, highest + 1)
        return playbook


# ============================================================================
# OLLAMA CLIENT
# ============================================================================

class OllamaClient:
    """Minimal HTTP client for the Ollama ``/api/generate`` and ``/api/tags`` endpoints."""

    def __init__(self, base_url: str = Config.OLLAMA_BASE_URL):
        self.base_url = base_url

    def generate(
        self,
        model: str,
        prompt: str,
        system: Optional[str] = None,
        temperature: float = Config.TEMPERATURE,
        max_tokens: int = Config.MAX_TOKENS
    ) -> str:
        """Run a non-streaming completion and return the generated text.

        Failures are not raised: on a transport error, an HTTP error status or
        a malformed response body the method returns a string of the form
        ``"Error: <details>"``.
        """
        url = f"{self.base_url}/api/generate"
        payload = {
            "model": model,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": temperature,
                "num_predict": max_tokens,
                "num_ctx": 8192,
            },
        }
        if system:
            payload["system"] = system
        try:
            response = requests.post(url, json=payload, timeout=180)
            response.raise_for_status()
            return response.json()["response"]
        except (requests.RequestException, ValueError, KeyError) as e:
            # ValueError covers a non-JSON body, KeyError a body without the
            # "response" field.
            return f"Error: {e}"

    def check_health(self) -> bool:
        """Return True when ``/api/tags`` answers with HTTP 200."""
        try:
            response = requests.get(f"{self.base_url}/api/tags", timeout=5)
        except requests.RequestException:
            return False
        return response.status_code == 200


# ============================================================================
# AGENTS
# ============================================================================

class StateInitializer:
    """Builds the state dictionary shared by the agents of one cycle."""

    def execute(self, user_query: str, playbook: Playbook) -> Dict:
        """Return a fresh state holding the query, the playbook and empty slots."""
        return {
            "user_query": user_query,
            "playbook": playbook,
            "ground_truth": None,
            "generator_output": None,
            "reflector_output": None,
            "curator_output": None,
        }


class Generator:
    """Produces the answer to the user's query from the playbook knowledge."""

    def __init__(self, client: OllamaClient):
        self.client = client

    def execute(self, state: Dict, lang: str = "en") -> GeneratorOutput:
        """Ask the generator model for an answer in ``lang``.

        Only the first 50 bullets are placed in the prompt. A bullet counts as
        "used" when its id, or the first 30 characters of its content
        (case-insensitive), appears in the response.
        """
        user_query = state["user_query"]
        playbook = state["playbook"]

        bullet_context = [
            f"[{bid}] {bullet.content}"
            for bid, bullet in playbook.bullets.items()
        ]
        knowledge = "\n".join(bullet_context[:50])

        if lang == "ar":
            system_prompt = (
                "أنت خبير في الاستجابة للطوارئ. قدم تعليمات كاملة ومفصلة. "
                "لا تختصر إجابتك أبداً."
            )
            prompt = f"""أنت خبير في الاستجابة للطوارئ.

السؤال: {user_query}

المعرفة المتاحة:
{knowledge}

قدم إجابة كاملة ومفصلة مع جميع الخطوات الضرورية. كن دقيقاً وشاملاً."""
        else:
            system_prompt = (
                "You are an emergency response expert. Provide complete, "
                "detailed emergency instructions. Never truncate your answer."
            )
            prompt = f"""You are an emergency response expert.

Question: {user_query}

Available Knowledge:
{knowledge}

Provide a COMPLETE, detailed answer with ALL necessary steps. Be thorough and specific."""

        response = self.client.generate(
            model=Config.GENERATOR_MODEL,
            prompt=prompt,
            system=system_prompt,
            temperature=Config.TEMPERATURE,
            max_tokens=Config.MAX_TOKENS,
        )

        used_bullets = []
        if response and isinstance(response, str):
            response_lower = response.lower()
            for bid, bullet in playbook.bullets.items():
                preview = str(bullet.content)[:30].lower()
                # An empty preview is a substring of every string, so it must
                # not count as a match.
                preview_hit = bool(preview.strip()) and preview in response_lower
                if bid in response or preview_hit:
                    used_bullets.append(bid)

        return GeneratorOutput(
            reasoning=[
                "Analyzed emergency situation",
                "Found relevant protocols",
                "Provided complete response",
            ],
            bullet_ids=used_bullets,
            final_answer=response if response else "Unable to generate response",
        )


class Reflector:
    """Asks the reflector model to grade an answer and tag the bullets it used."""

    def __init__(self, client: OllamaClient):
        self.client = client

    @staticmethod
    def _extract_json(text: str) -> str:
        """Return the part of ``text`` most likely to be the JSON object."""
        if "```json" in text:
            return text.split("```json")[1].split("```")[0].strip()
        if "```" in text:
            return text.split("```")[1].split("```")[0].strip()
        # No code fence: take everything between the outermost braces so that
        # prose around the object does not break parsing.
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            return text[start:end + 1]
        return text

    @staticmethod
    def _as_str_list(value) -> List[str]:
        """Coerce a model-supplied field to a list of strings."""
        if isinstance(value, list):
            return [str(item) for item in value]
        if isinstance(value, str) and value:
            return [value]
        return []

    @staticmethod
    def _parse_tags(raw_tags) -> List[BulletTag]:
        """Build BulletTags from the model output, skipping malformed entries."""
        tags = []
        if not isinstance(raw_tags, list):
            return tags
        for entry in raw_tags:
            if not isinstance(entry, dict):
                continue
            try:
                tags.append(BulletTag(
                    bullet_id=str(entry["bullet_id"]),
                    tag=TagType(str(entry["tag"]).strip().lower()),
                    reason=str(entry.get("reason", "")),
                ))
            except (KeyError, ValueError):
                # One bad entry must not discard the rest of the evaluation.
                continue
        return tags

    def execute(self, state: Dict) -> Reflection:
        """Evaluate the generator output stored in ``state``.

        Only the first 500 characters of the answer are sent to the model.
        When the reply cannot be parsed as a JSON object, a placeholder
        Reflection with quality ``"unknown"`` and no bullet tags is returned.
        """
        user_query = state["user_query"]
        gen_output = state["generator_output"]
        playbook = state["playbook"]

        system_prompt = """You are a critical evaluator. Respond in JSON:
{
  "answer_quality": "excellent|good|fair|poor",
  "strengths": ["strength 1", "strength 2"],
  "weaknesses": ["weakness 1"],
  "bullet_tags": [
    {"bullet_id": "B0001", "tag": "helpful", "reason": "why"}
  ]
}"""

        bullet_context = "\n".join(
            f"[{bid}] {playbook.bullets[bid].content}"
            for bid in gen_output.bullet_ids
            if bid in playbook.bullets
        )
        bullets_text = bullet_context if bullet_context else "None"

        prompt = f"""Query: {user_query}

Bullets:
{bullets_text}

Answer: {gen_output.final_answer[:500]}

Evaluate (JSON only):"""

        response = self.client.generate(
            model=Config.REFLECTOR_MODEL,
            prompt=prompt,
            system=system_prompt,
        )

        try:
            data = json.loads(self._extract_json(response))
            if not isinstance(data, dict):
                raise ValueError("reflector reply is not a JSON object")
        except (ValueError, TypeError, IndexError, AttributeError):
            # json.JSONDecodeError is a ValueError. The quality is reported as
            # "unknown" because nothing was actually evaluated.
            return Reflection(
                answer_quality="unknown",
                strengths=["Provided answer"],
                weaknesses=[],
                bullet_tags=[],
            )

        # Normalise the grade so the badge lookup and the curator's
        # "fair"/"poor" check match regardless of the model's casing.
        quality = str(data.get("answer_quality", "unknown")).strip().lower()
        return Reflection(
            answer_quality=quality,
            strengths=self._as_str_list(data.get("strengths", [])),
            weaknesses=self._as_str_list(data.get("weaknesses", [])),
            bullet_tags=self._parse_tags(data.get("bullet_tags", [])),
        )


class Curator:
    """Turns a reflection into playbook edits."""

    def __init__(self, client: OllamaClient):
        self.client = client

    def execute(self, state: Dict) -> DeltaBatch:
        """Return the edits derived from ``state["reflector_output"]``.

        Simplified curation for demo: a "fair" or "poor" answer with at least
        one weakness yields a single ADD to the "Improvements" section; no
        model call is made.
        """
        reflection = state["reflector_output"]
        operations = []
        if reflection.answer_quality in ["fair", "poor"] and reflection.weaknesses:
            operations.append(DeltaOperation(
                type="ADD",
                section="Improvements",
                content=f"Address: {reflection.weaknesses[0]}",
            ))
        return DeltaBatch(
            reasoning="Learning from feedback",
            operations=operations,
        )


# ============================================================================
# ACE ORCHESTRATOR
# ============================================================================

class ACEOrchestrator:
    """Runs the generate -> reflect -> curate cycle against one playbook file."""

    def __init__(self, playbook_path: str = Config.PLAYBOOK_PATH):
        self.client = OllamaClient()
        self.playbook = Playbook.load(playbook_path)
        self.playbook_path = playbook_path
        self.state_initializer = StateInitializer()
        self.generator = Generator(self.client)
        self.reflector = Reflector(self.client)
        self.curator = Curator(self.client)

    def run_cycle(self, user_query: str, lang: str = "en") -> Dict:
        """Answer ``user_query``, update the playbook and save it to disk.

        Returns a dict with the keys ``answer``, ``quality``, ``reasoning``,
        ``bullets_used``, ``strengths``, ``weaknesses``, ``operations`` and
        ``stats``.
        """
        state = self.state_initializer.execute(user_query, self.playbook)

        gen_output = self.generator.execute(state, lang)
        state["generator_output"] = gen_output

        reflection = self.reflector.execute(state)
        state["reflector_output"] = reflection
        for bt in reflection.bullet_tags:
            self.playbook.update_bullet_tag(bt.bullet_id, bt.tag)

        delta = self.curator.execute(state)
        state["curator_output"] = delta
        self.playbook.apply_delta(delta)
        self.playbook.save(self.playbook_path)

        return {
            "answer": gen_output.final_answer,
            "quality": reflection.answer_quality,
            "reasoning": gen_output.reasoning,
            "bullets_used": gen_output.bullet_ids,
            "strengths": reflection.strengths,
            "weaknesses": reflection.weaknesses,
            "operations": delta.operations,
            "stats": self.playbook.stats(),
        }


# ============================================================================
# STREAMLIT UI
# ============================================================================

def init_session_state():
    """Create the session-scoped chat history, orchestrator and language."""
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if 'ace' not in st.session_state:
        st.session_state.ace = ACEOrchestrator()
    if 'language' not in st.session_state:
        st.session_state.language = 'en'


def create_quality_badge(quality: str):
    """Return ``quality`` upper-cased and prefixed with a coloured marker."""
    colors = {
        "excellent": "🟢",
        "good": "🟡",
        "fair": "🟠",
        "poor": "🔴",
    }
    return f"{colors.get(quality, '⚪')} {quality.upper()}"


def plot_section_distribution(playbook: Playbook):
    """Return a bar chart of the number of bullets in each section."""
    section_counts = {
        section: len(bullets) for section, bullets in playbook.sections.items()
    }
    fig = go.Figure(data=[go.Bar(
        x=list(section_counts.keys()),
        y=list(section_counts.values()),
        marker_color='lightblue',
    )])
    fig.update_layout(
        title="Knowledge Items by Section",
        xaxis_title="Section",
        yaxis_title="Count",
        height=400,
    )
    return fig


def plot_quality_scores(playbook: Playbook):
    """Return a histogram of the scores of all bullets."""
    scores = [bullet.score() for bullet in playbook.bullets.values()]
    fig = go.Figure(data=[go.Histogram(
        x=scores,
        nbinsx=20,
        marker_color='green',
    )])
    fig.update_layout(
        title="Quality Score Distribution",
        xaxis_title="Score",
        yaxis_title="Frequency",
        height=400,
    )
    return fig


def _render_chat_entry(chat: Dict, lang: str):
    """Render one stored exchange: query, quality badge, answer and details."""
    result = chat["result"]
    with st.container():
        st.caption(chat["timestamp"])
        # Rendered without unsafe_allow_html so user and model text cannot
        # inject raw HTML into the page.
        st.markdown(f"**{chat['query']}**")
        badge = create_quality_badge(str(result.get("quality", "unknown")))
        st.markdown(f"**{Language.get('quality', lang)}:** {badge}")
        st.markdown(f"**{Language.get('answer', lang)}:**")
        st.markdown(result.get("answer", ""))

        with st.expander(Language.get("reasoning", lang)):
            for step in result.get("reasoning", []):
                st.markdown(f"- {step}")

        bullets_used = result.get("bullets_used", [])
        if bullets_used:
            with st.expander(Language.get("bullets_used", lang)):
                st.markdown(", ".join(f"`{bid}`" for bid in bullets_used))

        operations = result.get("operations", [])
        if operations:
            with st.expander(Language.get("improvements", lang)):
                for op in operations:
                    target = op.content if op.content else op.bullet_id
                    st.markdown(f"- `{op.type}` {op.section}: {target}")
        st.divider()


def main():
    """Build the page: settings sidebar, chat tab and playbook charts tab."""
    st.set_page_config(
        page_title="ACE System",
        page_icon="🤖",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    init_session_state()

    # Sidebar
    with st.sidebar:
        lang = st.session_state.language
        st.title(Language.get("sidebar_title", lang))

        # Language selector
        language_option = st.selectbox(
            Language.get("language", lang),
            options=["English", "العربية"],
            index=0 if lang == "en" else 1,
        )
        selected_lang = "en" if language_option == "English" else "ar"
        if selected_lang != lang:
            # Widgets above were already drawn in the old language; rerun so
            # the whole page uses the new one.
            st.session_state.language = selected_lang
            st.rerun()

        st.divider()

        # Connection status
        if st.session_state.ace.client.check_health():
            st.success(Language.get("connection_success", lang))
        else:
            st.error(Language.get("connection_error", lang))

        st.divider()

        # Playbook stats
        st.subheader(Language.get("playbook_info", lang))
        stats = st.session_state.ace.playbook.stats()
        col1, col2 = st.columns(2)
        with col1:
            st.metric(Language.get("total_bullets", lang), stats["total_bullets"])
            st.metric(Language.get("sections", lang), stats["total_sections"])
        with col2:
            st.metric(Language.get("avg_score", lang), f"{stats['average_score']:.2f}")
            st.metric(Language.get("total_tags", lang), stats["total_tags"])

        st.divider()

        # Export: a download button nested inside `if st.button(...)` vanishes
        # on the next rerun, so the download button is shown directly.
        playbook_data = json.dumps(
            st.session_state.ace.playbook.to_dict(),
            indent=2,
            ensure_ascii=False,
        )
        st.download_button(
            Language.get("export_button", lang),
            playbook_data,
            "playbook_export.json",
            "application/json",
            use_container_width=True,
        )

        if st.button(Language.get("clear_button", lang), use_container_width=True):
            st.session_state.chat_history = []
            st.rerun()

    # Main content
    lang = st.session_state.language
    st.title(Language.get("title", lang))
    st.caption(Language.get("subtitle", lang))

    # Tabs
    tab1, tab2 = st.tabs([
        Language.get("chat_history", lang),
        Language.get("playbook_viz", lang),
    ])

    with tab1:
        # Chat interface
        query = st.text_input(
            Language.get("query_input", lang),
            key="query_input",
            placeholder="e.g., What should I do if someone is choking?",
        )

        if st.button(Language.get("ask_button", lang), type="primary", use_container_width=True):
            if query:
                with st.spinner(Language.get("processing", lang)):
                    # Progress. run_cycle performs all three phases in one
                    # call, so the later steps are shown after it returns.
                    progress_bar = st.progress(0)
                    status = st.empty()

                    status.text(Language.get("generator_phase", lang))
                    progress_bar.progress(33)

                    result = st.session_state.ace.run_cycle(query, lang)

                    status.text(Language.get("reflector_phase", lang))
                    progress_bar.progress(66)

                    status.text(Language.get("curator_phase", lang))
                    progress_bar.progress(100)

                    status.text(Language.get("complete", lang))

                    st.session_state.chat_history.append({
                        "query": query,
                        "result": result,
                        "timestamp": datetime.now().isoformat(),
                    })

                    progress_bar.empty()
                    status.empty()

        # Display chat history, newest first
        for chat in reversed(st.session_state.chat_history):
            _render_chat_entry(chat, lang)

    with tab2:
        # Visualizations
        st.subheader(Language.get("playbook_viz", lang))
        col1, col2 = st.columns(2)
        with col1:
            st.plotly_chart(
                plot_section_distribution(st.session_state.ace.playbook),
                use_container_width=True,
            )
        with col2:
            st.plotly_chart(
                plot_quality_scores(st.session_state.ace.playbook),
                use_container_width=True,
            )


if __name__ == "__main__":
    main()