""" graph_merged_simple.py - Simplified 3-Tier Graph System ======================================================== Clean, maintainable 3-tier orchestration system. - Lite: Fast responses (30s) - Standard: Balanced quality (2min) - Full: Premium refinement (10min) Key Features: - No loops by design - Context-aware agents - Research mode with citations - Universal file tracking - Clear tier differentiation Author: AI Lab Team Last Updated: 2025-10-10 Version: 3.0 - Simplified Architecture """ # ============================================================================= # SECTION 1: IMPORTS # ============================================================================= import json import os import uuid import math import operator from typing import TypedDict, List, Dict, Optional, Annotated, Any from datetime import datetime # LangChain from langchain_openai import ChatOpenAI from langgraph.graph import StateGraph, END # Local modules import graph_config as cfg from context_manager import context_manager from artifact_registry import artifact_registry from memory_manager import memory_manager from logging_config import get_logger # Multi-language support from multi_language_support import ( detect_language, extract_code_blocks_multi_lang, execute_code, detect_requested_output_types_enhanced, write_script_multi_lang, LANGUAGES ) # Artifact generation import nbformat from nbformat.v4 import new_notebook, new_markdown_cell, new_code_cell import pandas as pd from docx import Document as DocxDocument # Setup log = get_logger(__name__) llm = ChatOpenAI(model="gpt-4o", temperature=0.4, max_retries=3, request_timeout=60) # Ensure directories os.makedirs(cfg.OUT_DIR, exist_ok=True) os.makedirs(cfg.USER_ARTIFACTS_DIR, exist_ok=True) log.info("=" * 60) log.info("SIMPLIFIED 3-TIER GRAPH SYSTEM INITIALIZED") log.info("=" * 60) # ============================================================================= # SECTION 2: STATE DEFINITION # ============================================================================= class AgentState(TypedDict): """Simplified state for 3-tier system.""" # Core inputs userInput: str session_id: str # Conversation context is_follow_up: bool conversation_context: Optional[Dict] # Execution mode execution_mode: str # research, coding, hybrid, simple_response preferred_tier: str # lite, standard, full # Memory and planning retrievedMemory: Optional[str] coreObjectivePrompt: str pmPlan: Optional[Dict] # Execution experimentCode: Optional[str] experimentResults: Optional[Dict] researchResults: Optional[Dict] # Output draftResponse: str qaFeedback: Optional[str] approved: bool # Workflow control execution_path: Annotated[List[str], operator.add] rework_cycles: int max_rework_cycles: int status_updates: Annotated[List[str], operator.add] # Budget tracking current_cost: float budget_limit: float # ============================================================================= # SECTION 3: HELPER FUNCTIONS # ============================================================================= def add_status(node_name: str, status: str) -> Dict[str, Any]: """Add status update.""" return { "status_updates": [f"{node_name}: {status}"], "execution_path": [node_name] } def parse_json_safe(text: str) -> Optional[Dict]: """Safe JSON parsing with fallbacks.""" if not text: return None # Try direct parse try: return json.loads(text) except: pass # Try to extract JSON block import re match = re.search(r'```json\s*({.*?})\s*```', text, re.DOTALL) if match: try: return json.loads(match.group(1)) except: pass # Try to find any JSON object match = re.search(r'{.*}', text, re.DOTALL) if match: try: return json.loads(match.group(0)) except: pass return None def determine_execution_mode(user_input: str, context: Dict) -> str: """ Determine execution mode: research, coding, hybrid, or simple_response. Priority: 1. Research mode (if research keywords and no explicit code request) 2. Coding mode (if code keywords or explicit request) 3. Hybrid mode (research + code examples) 4. Simple response (quick questions) """ text_lower = user_input.lower() # Check for research indicators has_research = any(kw in text_lower for kw in cfg.RESEARCH_KEYWORDS) has_code_request = any(kw in text_lower for kw in cfg.CODE_KEYWORDS) has_no_code = any(kw in text_lower for kw in cfg.NO_CODE_KEYWORDS) # Research mode (no code) if has_research and not has_code_request: return "research" # Hybrid mode (research + code) if has_research and has_code_request: return "hybrid" # Coding mode if has_code_request or not has_no_code: # Check if it's actually a coding task if any(word in text_lower for word in ['create', 'build', 'implement', 'write', 'develop']): return "coding" # Simple response (questions, explanations) if has_no_code: return "simple_response" # Default to coding for ambiguous cases return "coding" def detect_language_priority(user_input: str, detected: str) -> str: """ Determine language with priority: explicit > detected > default. """ text_lower = user_input.lower() # Check for explicit preferences for lang, keywords in cfg.LANGUAGE_PREFERENCE_KEYWORDS.items(): if any(kw in text_lower for kw in keywords): log.info(f"🎯 Explicit language preference: {lang}") return lang # Use detected if not default if detected and detected != cfg.DEFAULT_LANGUAGE: log.info(f"🔍 Detected language: {detected}") return detected # Default log.info(f"📝 Using default language: {cfg.DEFAULT_LANGUAGE}") return cfg.DEFAULT_LANGUAGE # ============================================================================= # SECTION 4: CORE AGENT NODES # ============================================================================= def run_memory_retrieval(state: AgentState) -> Dict[str, Any]: """Retrieve relevant context from memory.""" log.info("--- MEMORY RETRIEVAL ---") user_input = state.get('userInput', '') session_id = state.get('session_id', 'default') # Get conversation context conv_context = context_manager.get_context(session_id, user_input) # Get vector memory try: memories = memory_manager.retrieve_relevant_memories(user_input) memory_context = "\n".join([f"- {m.page_content}" for m in memories]) if memories else "" except Exception as e: log.warning(f"Memory retrieval failed: {e}") memory_context = "" return { "retrievedMemory": memory_context, "is_follow_up": conv_context["is_follow_up"], "conversation_context": conv_context, **add_status("Memory", "Context retrieved") } def run_intent_clarification(state: AgentState) -> Dict[str, Any]: """Clarify user intent with conversation context.""" log.info("--- INTENT CLARIFICATION ---") user_input = state.get('userInput', '') conv_context = state.get('conversation_context', {}) memory = state.get('retrievedMemory', '') # Build context-aware prompt if conv_context.get("is_follow_up"): prompt = f"""You are clarifying a FOLLOW-UP request. {conv_context.get('context', '')} {conv_context.get('artifacts_context', '')} CURRENT FOLLOW-UP REQUEST: {user_input} Refine into clear objective that: 1. References previous conversation 2. Builds on existing work 3. Is specific and actionable REFINED OBJECTIVE (2-3 sentences):""" else: prompt = f"""Refine user request into clear objective. MEMORY CONTEXT: {memory} REQUEST: {user_input} REFINED OBJECTIVE (2-3 sentences):""" try: response = llm.invoke(prompt) objective = getattr(response, "content", "") or user_input except Exception as e: log.error(f"Intent clarification failed: {e}") objective = user_input # Determine execution mode execution_mode = determine_execution_mode(user_input, conv_context) log.info(f"📋 Execution mode: {execution_mode}") return { "coreObjectivePrompt": objective, "execution_mode": execution_mode, **add_status("Intent", f"Mode: {execution_mode}") } def run_planning_agent(state: AgentState) -> Dict[str, Any]: """Create execution plan.""" log.info("--- PLANNING ---") objective = state.get('coreObjectivePrompt', '') user_input = state.get('userInput', '') execution_mode = state.get('execution_mode', 'coding') prompt = f"""Create execution plan. OBJECTIVE: {objective} MODE: {execution_mode} Return JSON: {{ "plan_steps": ["step 1", "step 2", ...], "experiment_needed": true/false, "experiment_type": "script|notebook|word|excel|pdf", "estimated_calls": 3 }}""" try: response = llm.invoke(prompt) plan = parse_json_safe(getattr(response, "content", "")) except Exception as e: log.error(f"Planning failed: {e}") plan = None # Fallback plan if not plan or not isinstance(plan, dict): plan = { "plan_steps": ["Analyze request", "Create deliverable"], "experiment_needed": execution_mode in ["coding", "hybrid"], "experiment_type": "word" if execution_mode == "research" else "script", "estimated_calls": 3 } # Calculate cost calls = plan.get('estimated_calls', 3) cost_per_call = cfg.calculate_cost_per_call() plan['estimated_cost'] = round(calls * cost_per_call, 2) return { "pmPlan": plan, **add_status("Planning", f"Plan created ({len(plan.get('plan_steps', []))} steps)") } def run_experimenter_agent(state: AgentState) -> Dict[str, Any]: """Generate code or artifacts.""" log.info("--- EXPERIMENTER ---") plan = state.get('pmPlan', {}) or {} objective = state.get('coreObjectivePrompt', '') user_input = state.get('userInput', '') session_id = state.get('session_id', 'default') if not plan.get('experiment_needed'): return add_status("Experimenter", "Skipped - not needed") # Detect language and type detected = detect_requested_output_types_enhanced(objective) language = detect_language_priority(user_input, detected.get('language', 'python')) exp_type = plan.get('experiment_type', 'script') log.info(f"📝 Creating: {exp_type} in {language}") # Build prompt prompt = f"""Create {exp_type} artifact. OBJECTIVE: {objective} LANGUAGE: {language} REQUIREMENTS: - Production quality - Complete implementation - No placeholders - Include documentation Generate complete {exp_type}:""" try: response = llm.invoke(prompt) content = getattr(response, "content", "") except Exception as e: log.error(f"Generation failed: {e}") return { "experimentResults": {"success": False, "error": str(e)}, **add_status("Experimenter", "Failed") } # Create artifact try: if exp_type == "notebook": filepath = write_notebook(content) elif exp_type in ["word", "document"]: filepath = write_document(content) else: # script filepath = write_script_multi_lang(content, language, cfg.OUT_DIR) # Register artifact artifact_id, user_path = artifact_registry.register_artifact( filepath, metadata={ "type": exp_type, "session_id": session_id, "user_request": user_input, "language": language } ) results = { "success": True, "artifact_id": artifact_id, "path": user_path, "filename": os.path.basename(user_path), "type": exp_type, "language": language } return { "experimentCode": content, "experimentResults": results, **add_status("Experimenter", f"Created {exp_type}") } except Exception as e: log.error(f"Artifact creation failed: {e}") return { "experimentResults": {"success": False, "error": str(e)}, **add_status("Experimenter", "Artifact failed") } def run_research_agent(state: AgentState) -> Dict[str, Any]: """Conduct research with citations (placeholder for web search integration).""" log.info("--- RESEARCH ---") objective = state.get('coreObjectivePrompt', '') user_input = state.get('userInput', '') session_id = state.get('session_id', 'default') # Build research prompt prompt = f"""Conduct comprehensive research and provide factual information with citations. TOPIC: {objective} Provide: 1. Executive summary 2. Key findings (3-5 points) 3. Detailed analysis 4. Recommendations Format with clear sections and cite sources as [1], [2], etc. Include bibliography at end. RESEARCH RESPONSE:""" try: response = llm.invoke(prompt) content = getattr(response, "content", "") except Exception as e: log.error(f"Research failed: {e}") content = f"Research error: {e}" # Create document try: filepath = write_document(content, prefix="research_") # Register artifact artifact_id, user_path = artifact_registry.register_artifact( filepath, metadata={ "type": "research_document", "session_id": session_id, "user_request": user_input } ) results = { "content": content, "artifact_id": artifact_id, "path": user_path, "filename": os.path.basename(user_path), "source_count": content.count('[') # Rough citation count } return { "researchResults": results, **add_status("Research", f"Research complete ({results['source_count']} citations)") } except Exception as e: log.error(f"Research document creation failed: {e}") return { "researchResults": {"content": content, "error": str(e)}, **add_status("Research", "Document failed") } def run_synthesis_agent(state: AgentState) -> Dict[str, Any]: """Synthesize final response with artifact information.""" log.info("--- SYNTHESIS ---") objective = state.get('coreObjectivePrompt', '') user_input = state.get('userInput', '') exp_results = state.get('experimentResults', {}) research_results = state.get('researchResults', {}) session_id = state.get('session_id', 'default') # Build context context_parts = [f"USER REQUEST: {user_input}", f"OBJECTIVE: {objective}"] # Add experiment results if exp_results and exp_results.get('success'): context_parts.append(f"CREATED: {exp_results.get('type')} - {exp_results.get('filename')}") # Add research results if research_results and research_results.get('content'): context_parts.append(f"RESEARCH: {research_results.get('filename')}") context = "\n".join(context_parts) # Generate response prompt = f"""Create final response for user. {context} Requirements: - Explain what was created - Describe how to use it - Highlight key features - Provide next steps Create clear, helpful response:""" try: response = llm.invoke(prompt) final_text = getattr(response, "content", "") except Exception as e: log.error(f"Synthesis failed: {e}") final_text = f"Task completed. {context}" # Add file information manifest = artifact_registry.get_download_manifest(session_id) if manifest['total_files'] > 0: final_text += f"\n\n---\n\n## 📁 Your Files\n\n" final_text += f"**{manifest['total_files']} file(s) created:**\n\n" for file in manifest['files']: final_text += f"- **{file['filename']}** ({file['size_kb']} KB)\n" final_text += f" 📍 `{file['path']}`\n\n" # Track in context manager artifacts = [f['filename'] for f in manifest['files']] context_manager.add_exchange( session_id=session_id, user_message=user_input, assistant_response=final_text, artifacts=artifacts, metadata={ "tier": state.get('preferred_tier'), "cost": state.get('current_cost', 0.0) } ) return { "draftResponse": final_text, **add_status("Synthesis", "Response complete") } def run_qa_agent(state: AgentState) -> Dict[str, Any]: """Quality assurance review.""" log.info("--- QA REVIEW ---") user_input = state.get('userInput', '') draft = state.get('draftResponse', '') rework_cycles = state.get('rework_cycles', 0) max_rework = state.get('max_rework_cycles', 0) # Skip QA if no reworks allowed if max_rework == 0: return { "approved": True, **add_status("QA", "Approved (Lite tier)") } # CRITICAL FIX: Check if at limit BEFORE trying to rework if rework_cycles >= max_rework: log.warning(f"Rework limit reached ({rework_cycles}/{max_rework}), auto-approving") return { "approved": True, **add_status("QA", f"Approved (limit reached: {rework_cycles}/{max_rework})") } # QA review prompt = f"""Review this response against user request. REQUEST: {user_input} RESPONSE: {draft[:1000]} Is this complete and satisfactory? Reply APPROVED or provide specific feedback. REVIEW:""" try: response = llm.invoke(prompt) content = getattr(response, "content", "") if "APPROVED" in content.upper(): return { "approved": True, **add_status("QA", "Approved") } else: # Increment rework counter new_cycles = rework_cycles + 1 log.info(f"QA requesting rework (cycle {new_cycles}/{max_rework})") return { "approved": False, "qaFeedback": content[:500], "rework_cycles": new_cycles, **add_status("QA", f"Rework needed (cycle {new_cycles}/{max_rework})") } except Exception as e: log.error(f"QA failed: {e}") return { "approved": True, # Approve on error **add_status("QA", "Approved (error)") } def run_observer_agent(state: AgentState) -> Dict[str, Any]: """Performance monitoring (Full tier only).""" log.info("--- OBSERVER ---") execution_path = state.get('execution_path', []) cost = state.get('current_cost', 0.0) rework_cycles = state.get('rework_cycles', 0) metrics = { "execution_length": len(execution_path), "total_cost": cost, "rework_cycles": rework_cycles, "efficiency": "good" if rework_cycles <= 1 else "needs improvement" } log.info(f"📊 Metrics: {metrics}") return add_status("Observer", f"Monitored - {metrics['efficiency']}") def run_archive_agent(state: AgentState) -> Dict[str, Any]: """Archive successful execution to memory.""" log.info("--- ARCHIVE ---") objective = state.get('coreObjectivePrompt', '') draft = state.get('draftResponse', '') try: memory_manager.add_to_memory( f"Task: {objective}\nResult: {draft[:500]}", {"timestamp": datetime.utcnow().isoformat()} ) except Exception as e: log.warning(f"Archive failed: {e}") return add_status("Archive", "Saved to memory") # ============================================================================= # SECTION 5: ARTIFACT CREATION HELPERS # ============================================================================= def write_notebook(content: str) -> str: """Create Jupyter notebook.""" import re # Extract code blocks code_blocks = re.findall(r'```python\s*(.*?)\s*```', content, re.DOTALL) md_parts = re.split(r'```python\s*.*?\s*```', content, flags=re.DOTALL) # Build notebook nb = new_notebook() cells = [] for i in range(max(len(md_parts), len(code_blocks))): if i < len(md_parts) and md_parts[i].strip(): cells.append(new_markdown_cell(md_parts[i].strip())) if i < len(code_blocks) and code_blocks[i].strip(): cells.append(new_code_cell(code_blocks[i].strip())) if not cells: cells = [new_markdown_cell("# Notebook\n\n" + content)] nb['cells'] = cells # Save filename = f"notebook_{uuid.uuid4().hex[:8]}.ipynb" filepath = os.path.join(cfg.OUT_DIR, filename) nbformat.write(nb, filepath) return filepath def write_document(content: str, prefix: str = "document_") -> str: """Create Word document.""" doc = DocxDocument() for para in content.split("\n\n"): if para.strip(): doc.add_paragraph(para.strip()) filename = f"{prefix}{uuid.uuid4().hex[:8]}.docx" filepath = os.path.join(cfg.OUT_DIR, filename) doc.save(filepath) return filepath # ============================================================================= # SECTION 6: ROUTING LOGIC # ============================================================================= def route_by_tier(state: AgentState) -> str: """Route to appropriate tier-specific graph.""" tier = cfg.validate_tier(state.get('preferred_tier', cfg.DEFAULT_TIER)) log.info(f"🎯 Routing to: {tier} tier") return tier def should_rework(state: AgentState) -> str: """Determine if QA approved or needs rework.""" if state.get('approved'): # Route based on tier tier = state.get('preferred_tier', cfg.DEFAULT_TIER) if cfg.should_include_monitoring(tier): return "observer" else: return "archive" else: # CRITICAL FIX: Check the rework cycles BEFORE incrementing cycles = state.get('rework_cycles', 0) max_cycles = state.get('max_rework_cycles', 0) # Add 1 because we're about to do another rework if cycles + 1 >= max_cycles: log.warning(f"Rework limit reached ({cycles + 1}/{max_cycles}), forcing approval") return "archive" # Force completion else: log.info(f"Rework cycle {cycles + 1}/{max_cycles}") return "planning" # Rework def route_execution_mode(state: AgentState) -> str: """Route based on execution mode.""" mode = state.get('execution_mode', 'coding') if mode == "research": return "research" elif mode in ["coding", "hybrid"]: return "experimenter" else: # simple_response return "synthesis" # ============================================================================= # SECTION 7: TIER-SPECIFIC GRAPHS # ============================================================================= def build_lite_graph() -> StateGraph: """Lite tier: Fast, simple responses.""" workflow = StateGraph(AgentState) # Nodes workflow.add_node("memory", run_memory_retrieval) workflow.add_node("synthesis", run_synthesis_agent) workflow.add_node("archive", run_archive_agent) # Flow workflow.set_entry_point("memory") workflow.add_edge("memory", "synthesis") workflow.add_edge("synthesis", "archive") workflow.add_edge("archive", END) return workflow def build_standard_graph() -> StateGraph: """Standard tier: Balanced quality and speed.""" workflow = StateGraph(AgentState) # Nodes workflow.add_node("memory", run_memory_retrieval) workflow.add_node("intent", run_intent_clarification) workflow.add_node("planning", run_planning_agent) workflow.add_node("experimenter", run_experimenter_agent) workflow.add_node("research", run_research_agent) workflow.add_node("synthesis", run_synthesis_agent) workflow.add_node("qa", run_qa_agent) workflow.add_node("archive", run_archive_agent) # Flow workflow.set_entry_point("memory") workflow.add_edge("memory", "intent") workflow.add_edge("intent", "planning") # Route by execution mode workflow.add_conditional_edges( "planning", route_execution_mode, { "experimenter": "experimenter", "research": "research", "synthesis": "synthesis" } ) workflow.add_edge("experimenter", "synthesis") workflow.add_edge("research", "synthesis") workflow.add_edge("synthesis", "qa") # QA routing (allows 1 rework) workflow.add_conditional_edges( "qa", should_rework, { "archive": "archive", "planning": "planning" } ) workflow.add_edge("archive", END) return workflow def build_full_graph() -> StateGraph: """Full tier: Premium quality with monitoring.""" workflow = StateGraph(AgentState) # Nodes (includes observer) workflow.add_node("memory", run_memory_retrieval) workflow.add_node("intent", run_intent_clarification) workflow.add_node("planning", run_planning_agent) workflow.add_node("experimenter", run_experimenter_agent) workflow.add_node("research", run_research_agent) workflow.add_node("synthesis", run_synthesis_agent) workflow.add_node("qa", run_qa_agent) workflow.add_node("observer", run_observer_agent) workflow.add_node("archive", run_archive_agent) # Flow workflow.set_entry_point("memory") workflow.add_edge("memory", "intent") workflow.add_edge("intent", "planning") # Route by execution mode workflow.add_conditional_edges( "planning", route_execution_mode, { "experimenter": "experimenter", "research": "research", "synthesis": "synthesis" } ) workflow.add_edge("experimenter", "synthesis") workflow.add_edge("research", "synthesis") workflow.add_edge("synthesis", "qa") # QA routing (allows 3 reworks) workflow.add_conditional_edges( "qa", should_rework, { "observer": "observer", "archive": "archive", "planning": "planning" } ) workflow.add_edge("observer", "archive") workflow.add_edge("archive", END) return workflow # ============================================================================= # SECTION 8: GRAPH COMPILATION (UPDATED) # ============================================================================= # Build tier-specific graphs lite_workflow = build_lite_graph() standard_workflow = build_standard_graph() full_workflow = build_full_graph() # Compile with explicit recursion limits to prevent infinite loops lite_app = lite_workflow.compile() standard_app = standard_workflow.compile() full_app = full_workflow.compile() # Default app (standard) main_app = standard_app # CRITICAL: Set maximum recursion limits in config CONFIG_RECURSION_LIMITS = { "lite": 10, # Memory → Synthesis → Archive (3 nodes + buffer) "standard": 25, # Full path + 1 rework: ~12-14 nodes "full": 100 # Full path + 3 reworks: ~20-24 nodes } log.info("=" * 60) log.info("✅ GRAPHS COMPILED SUCCESSFULLY") log.info(f" - Lite: Memory → Synthesis → Archive (limit: {CONFIG_RECURSION_LIMITS['lite']})") log.info(f" - Standard: Full path + 1 rework (limit: {CONFIG_RECURSION_LIMITS['standard']})") log.info(f" - Full: Full path + 3 reworks (limit: {CONFIG_RECURSION_LIMITS['full']})") log.info("=" * 60) # ============================================================================= # SECTION 9: EXECUTION ENTRY POINT # ============================================================================= def execute_request(user_input: str, session_id: str = None, preferred_tier: str = None) -> Dict[str, Any]: """ Main execution entry point. Args: user_input: User's request session_id: Session identifier preferred_tier: lite, standard, or full Returns: Final state dict """ # Initialize session_id = session_id or uuid.uuid4().hex[:8] preferred_tier = cfg.validate_tier(preferred_tier or cfg.DEFAULT_TIER) # Get tier config tier_config = cfg.get_tier_config(preferred_tier) # Build initial state initial_state = { "userInput": user_input, "session_id": session_id, "preferred_tier": preferred_tier, "is_follow_up": False, "conversation_context": None, "execution_mode": "coding", "retrievedMemory": None, "coreObjectivePrompt": "", "pmPlan": None, "experimentCode": None, "experimentResults": None, "researchResults": None, "draftResponse": "", "qaFeedback": None, "approved": False, "execution_path": [], "rework_cycles": 0, "max_rework_cycles": tier_config["qa_rework_cycles"], "status_updates": [], "current_cost": 0.0, "budget_limit": tier_config["max_cost"] or float('inf') } # Select appropriate graph and set recursion limit # CRITICAL: Recursion limits prevent infinite loops if preferred_tier == cfg.TIER_LITE: app = lite_app recursion_limit = 10 # Lite: ~3-5 nodes elif preferred_tier == cfg.TIER_FULL: app = full_app recursion_limit = 100 # Full: up to 3 reworks = ~40 nodes max else: app = standard_app recursion_limit = 30 # Standard: up to 1 rework = ~15-20 nodes log.info(f"🚀 Executing with {preferred_tier} tier (recursion_limit={recursion_limit})") # Execute graph with recursion limit config try: config = {"recursion_limit": recursion_limit} final_state = app.invoke(initial_state, config=config) return final_state except Exception as e: log.exception(f"Execution failed: {e}") return { **initial_state, "draftResponse": f"❌ Execution error: {e}", "approved": False } # ============================================================================= # SECTION 10: LEGACY COMPATIBILITY # ============================================================================= # For backward compatibility with existing app_gradio.py def apply_upgrades() -> bool: """ Legacy function for compatibility. In the simplified system, graphs are already "upgraded". """ log.info("✅ Simplified graphs already active (no upgrade needed)") return True # Triage and planner apps for initial estimation def build_triage_app(): """Simple triage for greeting detection.""" workflow = StateGraph(AgentState) def triage_node(state: AgentState) -> Dict[str, Any]: user_input = state.get('userInput', '').lower() greetings = ['hello', 'hi', 'hey', 'greetings'] if any(g in user_input for g in greetings) and len(user_input.split()) < 5: return { "draftResponse": "Hello! How can I help you today?", **add_status("Triage", "Greeting detected") } return add_status("Triage", "Task detected") workflow.add_node("triage", triage_node) workflow.set_entry_point("triage") workflow.add_edge("triage", END) return workflow.compile() def build_planner_app(): """Quick planner for cost estimation.""" workflow = StateGraph(AgentState) def planner_node(state: AgentState) -> Dict[str, Any]: user_input = state.get('userInput', '') prompt = f"""Quick estimate for: {user_input} Return JSON: {{ "plan_steps": ["step 1", "step 2", "step 3"], "estimated_llm_calls_per_loop": 3, "max_loops_initial": 3 }}""" try: response = llm.invoke(prompt) plan = parse_json_safe(getattr(response, "content", "")) except: plan = None if not plan: plan = { "plan_steps": ["Analyze request", "Create solution", "Review quality"], "estimated_llm_calls_per_loop": 3, "max_loops_initial": 3 } # Cost calculation calls = plan.get('estimated_llm_calls_per_loop', 3) cost_per_loop = cfg.calculate_cost_per_call() * calls total_loops = plan.get('max_loops_initial', 3) + 1 plan['cost_per_loop_usd'] = round(cost_per_loop, 4) plan['estimated_cost_usd'] = round(cost_per_loop * total_loops, 2) return { "pmPlan": plan, **add_status("Planner", "Estimate created") } workflow.add_node("planner", planner_node) workflow.set_entry_point("planner") workflow.add_edge("planner", END) return workflow.compile() # Create triage and planner apps triage_app = build_triage_app() planner_app = build_planner_app() # ============================================================================= # SECTION 11: EXPORTS # ============================================================================= __all__ = [ # State 'AgentState', # Main execution 'execute_request', # Tier-specific apps 'lite_app', 'standard_app', 'full_app', 'main_app', # Helper apps 'triage_app', 'planner_app', # Agent nodes (for testing) 'run_memory_retrieval', 'run_intent_clarification', 'run_planning_agent', 'run_experimenter_agent', 'run_research_agent', 'run_synthesis_agent', 'run_qa_agent', 'run_observer_agent', 'run_archive_agent', # Helpers 'parse_json_safe', 'determine_execution_mode', 'detect_language_priority', # Legacy compatibility 'apply_upgrades', # Configuration constants (re-export) 'INITIAL_MAX_REWORK_CYCLES', 'BUDGET_BUFFER_MULTIPLIER', 'MAX_COST_MULTIPLIER', ] # ============================================================================= # INITIALIZATION LOG # ============================================================================= log.info("=" * 60) log.info("SIMPLIFIED 3-TIER GRAPH SYSTEM READY") log.info("=" * 60) log.info("Available tiers:") for tier_name, tier_conf in cfg.TIER_CONFIGS.items(): log.info(f" • {tier_conf['name']}: {tier_conf['description']}") log.info("=" * 60) log.info("Features enabled:") log.info(" ✅ Conversation context tracking") log.info(" ✅ Universal file access") log.info(" ✅ Research mode with citations") log.info(" ✅ Multi-language support") log.info(" ✅ No-loop guarantee") log.info("=" * 60)