Spaces:
Paused
Paused
| """ | |
| graph_merged_simple.py - Simplified 3-Tier Graph System | |
| ======================================================== | |
| Clean, maintainable 3-tier orchestration system. | |
| - Lite: Fast responses (30s) | |
| - Standard: Balanced quality (2min) | |
| - Full: Premium refinement (10min) | |
| Key Features: | |
| - No loops by design | |
| - Context-aware agents | |
| - Research mode with citations | |
| - Universal file tracking | |
| - Clear tier differentiation | |
| Author: AI Lab Team | |
| Last Updated: 2025-10-10 | |
| Version: 3.0 - Simplified Architecture | |
| """ | |
| # ============================================================================= | |
| # SECTION 1: IMPORTS | |
| # ============================================================================= | |
| import json | |
| import os | |
| import uuid | |
| import math | |
| import operator | |
| from typing import TypedDict, List, Dict, Optional, Annotated, Any | |
| from datetime import datetime | |
| # LangChain | |
| from langchain_openai import ChatOpenAI | |
| from langgraph.graph import StateGraph, END | |
| # Local modules | |
| import graph_config as cfg | |
| from context_manager import context_manager | |
| from artifact_registry import artifact_registry | |
| from memory_manager import memory_manager | |
| from logging_config import get_logger | |
| # Multi-language support | |
| from multi_language_support import ( | |
| detect_language, | |
| extract_code_blocks_multi_lang, | |
| execute_code, | |
| detect_requested_output_types_enhanced, | |
| write_script_multi_lang, | |
| LANGUAGES | |
| ) | |
| # Artifact generation | |
| import nbformat | |
| from nbformat.v4 import new_notebook, new_markdown_cell, new_code_cell | |
| import pandas as pd | |
| from docx import Document as DocxDocument | |
# Setup: module-level logger, shared LLM client, and output directories.
log = get_logger(__name__)
# Single shared chat model for every agent node (temperature 0.4 balances
# determinism and creativity; 60s timeout + 3 retries for transient API errors).
llm = ChatOpenAI(model="gpt-4o", temperature=0.4, max_retries=3, request_timeout=60)
# Ensure directories exist before any artifact is written.
os.makedirs(cfg.OUT_DIR, exist_ok=True)
os.makedirs(cfg.USER_ARTIFACTS_DIR, exist_ok=True)
log.info("=" * 60)
log.info("SIMPLIFIED 3-TIER GRAPH SYSTEM INITIALIZED")
log.info("=" * 60)
| # ============================================================================= | |
| # SECTION 2: STATE DEFINITION | |
| # ============================================================================= | |
class AgentState(TypedDict):
    """Shared state threaded through every node of the 3-tier graphs.

    Nodes return partial updates that LangGraph merges into this state.
    Fields annotated with ``operator.add`` accumulate across nodes
    (each node appends) instead of being overwritten.
    """
    # Core inputs
    userInput: str      # raw user request text
    session_id: str     # conversation/session identifier
    # Conversation context
    is_follow_up: bool                    # True when continuing a prior exchange
    conversation_context: Optional[Dict]  # payload from context_manager.get_context()
    # Execution mode
    execution_mode: str  # research, coding, hybrid, simple_response
    preferred_tier: str  # lite, standard, full
    # Memory and planning
    retrievedMemory: Optional[str]  # joined vector-memory snippets ("" / None if unavailable)
    coreObjectivePrompt: str        # refined objective from intent clarification
    pmPlan: Optional[Dict]          # plan dict from run_planning_agent
    # Execution
    experimentCode: Optional[str]      # raw LLM-generated artifact content
    experimentResults: Optional[Dict]  # artifact registration summary (success/path/...)
    researchResults: Optional[Dict]    # research content + document info
    # Output
    draftResponse: str          # final user-facing text from synthesis
    qaFeedback: Optional[str]   # reviewer feedback when QA rejects
    approved: bool              # QA verdict
    # Workflow control (accumulating lists)
    execution_path: Annotated[List[str], operator.add]  # visited node names, in order
    rework_cycles: int       # QA rework iterations performed so far
    max_rework_cycles: int   # tier-dependent rework budget (0 disables QA loop)
    status_updates: Annotated[List[str], operator.add]  # human-readable progress lines
    # Budget tracking
    current_cost: float  # accumulated spend (USD)
    budget_limit: float  # tier cap; float('inf') when uncapped
| # ============================================================================= | |
| # SECTION 3: HELPER FUNCTIONS | |
| # ============================================================================= | |
def add_status(node_name: str, status: str) -> Dict[str, Any]:
    """Build the partial-state update that records one node's progress.

    Returns single-element lists so LangGraph's ``operator.add`` reducers
    append them to ``status_updates`` and ``execution_path``.
    """
    message = f"{node_name}: {status}"
    return {"status_updates": [message], "execution_path": [node_name]}
def parse_json_safe(text: str) -> Optional[Dict]:
    """Parse JSON out of raw LLM output, tolerating surrounding prose.

    Tries, in order:
      1. the whole string,
      2. the contents of a ```json fenced block,
      3. the first ``{...}`` span (greedy, so nested objects survive).

    Returns the parsed object, or None when nothing parses.

    Fix: the original used bare ``except:`` clauses, which also swallow
    SystemExit/KeyboardInterrupt; only JSON errors (ValueError, of which
    json.JSONDecodeError is a subclass) are caught now.
    """
    if not text:
        return None
    import re

    # Collect candidate substrings in decreasing order of trustworthiness.
    candidates = [text]
    fenced = re.search(r'```json\s*({.*?})\s*```', text, re.DOTALL)
    if fenced:
        candidates.append(fenced.group(1))
    braced = re.search(r'{.*}', text, re.DOTALL)
    if braced:
        candidates.append(braced.group(0))
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            continue
    return None
def determine_execution_mode(user_input: str, context: Dict) -> str:
    """Classify the request as research, coding, hybrid, or simple_response.

    Priority:
      1. research (research keywords, no code keywords)
      2. hybrid   (research + code keywords)
      3. coding   (code keywords / action verbs)
      4. simple_response (explicit no-code request)
    Ambiguous requests default to coding.
    """
    lowered = user_input.lower()
    wants_research = any(kw in lowered for kw in cfg.RESEARCH_KEYWORDS)
    wants_code = any(kw in lowered for kw in cfg.CODE_KEYWORDS)
    wants_no_code = any(kw in lowered for kw in cfg.NO_CODE_KEYWORDS)

    # Research-flavoured requests: hybrid when code is also asked for.
    if wants_research:
        return "hybrid" if wants_code else "research"

    # Likely a build task when action verbs are present.
    if wants_code or not wants_no_code:
        action_words = ('create', 'build', 'implement', 'write', 'develop')
        if any(word in lowered for word in action_words):
            return "coding"

    # Explicit "no code" requests get a plain answer.
    if wants_no_code:
        return "simple_response"

    # Default for anything ambiguous.
    return "coding"
def detect_language_priority(user_input: str, detected: str) -> str:
    """Resolve the target programming language.

    Priority: explicit user preference > auto-detected (when it differs
    from the default) > configured default.
    """
    lowered = user_input.lower()

    # An explicit mention in the request always wins.
    explicit = next(
        (lang for lang, keywords in cfg.LANGUAGE_PREFERENCE_KEYWORDS.items()
         if any(kw in lowered for kw in keywords)),
        None,
    )
    if explicit is not None:
        log.info(f"🎯 Explicit language preference: {explicit}")
        return explicit

    # Trust detection only when it disagrees with the default.
    if detected and detected != cfg.DEFAULT_LANGUAGE:
        log.info(f"🔍 Detected language: {detected}")
        return detected

    log.info(f"📝 Using default language: {cfg.DEFAULT_LANGUAGE}")
    return cfg.DEFAULT_LANGUAGE
| # ============================================================================= | |
| # SECTION 4: CORE AGENT NODES | |
| # ============================================================================= | |
def run_memory_retrieval(state: AgentState) -> Dict[str, Any]:
    """Load conversation context and long-term memory for this request.

    Vector-memory lookup is best-effort: on failure the memory string is
    left empty and the run proceeds.
    """
    log.info("--- MEMORY RETRIEVAL ---")
    user_input = state.get('userInput', '')
    session_id = state.get('session_id', 'default')

    # Short-term conversation context (also flags follow-up requests).
    conv_context = context_manager.get_context(session_id, user_input)

    # Long-term vector memory, formatted as a bullet list.
    try:
        memories = memory_manager.retrieve_relevant_memories(user_input)
        memory_context = "\n".join(f"- {m.page_content}" for m in memories) if memories else ""
    except Exception as exc:
        log.warning(f"Memory retrieval failed: {exc}")
        memory_context = ""

    update = {
        "retrievedMemory": memory_context,
        "is_follow_up": conv_context["is_follow_up"],
        "conversation_context": conv_context,
    }
    update.update(add_status("Memory", "Context retrieved"))
    return update
def run_intent_clarification(state: AgentState) -> Dict[str, Any]:
    """Refine the raw user request into an objective and pick a mode.

    Follow-up requests get a context-aware prompt; fresh requests use the
    retrieved memory instead. On LLM failure the raw input is used as the
    objective so the pipeline keeps moving.
    """
    log.info("--- INTENT CLARIFICATION ---")
    user_input = state.get('userInput', '')
    conv_context = state.get('conversation_context', {})
    memory = state.get('retrievedMemory', '')

    if conv_context.get("is_follow_up"):
        prompt = f"""You are clarifying a FOLLOW-UP request.
{conv_context.get('context', '')}
{conv_context.get('artifacts_context', '')}
CURRENT FOLLOW-UP REQUEST: {user_input}
Refine into clear objective that:
1. References previous conversation
2. Builds on existing work
3. Is specific and actionable
REFINED OBJECTIVE (2-3 sentences):"""
    else:
        prompt = f"""Refine user request into clear objective.
MEMORY CONTEXT:
{memory}
REQUEST: {user_input}
REFINED OBJECTIVE (2-3 sentences):"""

    # Fall back to the raw input whenever the LLM cannot improve on it.
    objective = user_input
    try:
        reply = llm.invoke(prompt)
        objective = getattr(reply, "content", "") or user_input
    except Exception as exc:
        log.error(f"Intent clarification failed: {exc}")

    execution_mode = determine_execution_mode(user_input, conv_context)
    log.info(f"📋 Execution mode: {execution_mode}")

    result = {"coreObjectivePrompt": objective, "execution_mode": execution_mode}
    result.update(add_status("Intent", f"Mode: {execution_mode}"))
    return result
def run_planning_agent(state: AgentState) -> Dict[str, Any]:
    """Produce an execution plan (with a cost estimate) for the objective.

    Asks the LLM for a JSON plan; any failure or unparseable reply falls
    back to a minimal canned plan so downstream nodes always have one.
    """
    log.info("--- PLANNING ---")
    objective = state.get('coreObjectivePrompt', '')
    execution_mode = state.get('execution_mode', 'coding')

    prompt = f"""Create execution plan.
OBJECTIVE: {objective}
MODE: {execution_mode}
Return JSON:
{{
"plan_steps": ["step 1", "step 2", ...],
"experiment_needed": true/false,
"experiment_type": "script|notebook|word|excel|pdf",
"estimated_calls": 3
}}"""

    plan = None
    try:
        reply = llm.invoke(prompt)
        plan = parse_json_safe(getattr(reply, "content", ""))
    except Exception as exc:
        log.error(f"Planning failed: {exc}")

    # Canned fallback keeps the pipeline moving when planning fails.
    if not isinstance(plan, dict) or not plan:
        plan = {
            "plan_steps": ["Analyze request", "Create deliverable"],
            "experiment_needed": execution_mode in ["coding", "hybrid"],
            "experiment_type": "word" if execution_mode == "research" else "script",
            "estimated_calls": 3,
        }

    # Rough cost estimate: calls x per-call price.
    calls = plan.get('estimated_calls', 3)
    plan['estimated_cost'] = round(calls * cfg.calculate_cost_per_call(), 2)

    step_count = len(plan.get('plan_steps', []))
    result = {"pmPlan": plan}
    result.update(add_status("Planning", f"Plan created ({step_count} steps)"))
    return result
def run_experimenter_agent(state: AgentState) -> Dict[str, Any]:
    """Generate a code/document artifact when the plan calls for one.

    Skips entirely when the plan's ``experiment_needed`` flag is falsy.
    On success the artifact is written to disk, registered with the
    artifact registry, and summarized in ``experimentResults``; failures
    return ``{"success": False, "error": ...}`` instead of raising.
    """
    log.info("--- EXPERIMENTER ---")
    plan = state.get('pmPlan', {}) or {}
    objective = state.get('coreObjectivePrompt', '')
    user_input = state.get('userInput', '')
    session_id = state.get('session_id', 'default')
    if not plan.get('experiment_needed'):
        return add_status("Experimenter", "Skipped - not needed")
    # Detect output language/type; explicit user preference wins over detection.
    detected = detect_requested_output_types_enhanced(objective)
    language = detect_language_priority(user_input, detected.get('language', 'python'))
    exp_type = plan.get('experiment_type', 'script')
    log.info(f"📝 Creating: {exp_type} in {language}")
    # Build generation prompt
    prompt = f"""Create {exp_type} artifact.
OBJECTIVE: {objective}
LANGUAGE: {language}
REQUIREMENTS:
- Production quality
- Complete implementation
- No placeholders
- Include documentation
Generate complete {exp_type}:"""
    try:
        response = llm.invoke(prompt)
        content = getattr(response, "content", "")
    except Exception as e:
        log.error(f"Generation failed: {e}")
        # LLM failure: report it in state rather than raising.
        return {
            "experimentResults": {"success": False, "error": str(e)},
            **add_status("Experimenter", "Failed")
        }
    # Write the artifact to disk; the writer depends on the experiment type.
    try:
        if exp_type == "notebook":
            filepath = write_notebook(content)
        elif exp_type in ["word", "document"]:
            filepath = write_document(content)
        else:  # script
            filepath = write_script_multi_lang(content, language, cfg.OUT_DIR)
        # Register so the file appears in the session's download manifest.
        artifact_id, user_path = artifact_registry.register_artifact(
            filepath,
            metadata={
                "type": exp_type,
                "session_id": session_id,
                "user_request": user_input,
                "language": language
            }
        )
        results = {
            "success": True,
            "artifact_id": artifact_id,
            "path": user_path,
            "filename": os.path.basename(user_path),
            "type": exp_type,
            "language": language
        }
        return {
            "experimentCode": content,
            "experimentResults": results,
            **add_status("Experimenter", f"Created {exp_type}")
        }
    except Exception as e:
        log.error(f"Artifact creation failed: {e}")
        # Disk/registry failure: surface the error in state, keep running.
        return {
            "experimentResults": {"success": False, "error": str(e)},
            **add_status("Experimenter", "Artifact failed")
        }
def run_research_agent(state: AgentState) -> Dict[str, Any]:
    """Produce a cited research write-up and save it as a Word document.

    NOTE(review): despite the docstring's mention of web search, this
    currently only prompts the LLM — no live search is performed
    (placeholder for future integration), so "citations" are whatever
    the model writes.
    """
    log.info("--- RESEARCH ---")
    objective = state.get('coreObjectivePrompt', '')
    user_input = state.get('userInput', '')
    session_id = state.get('session_id', 'default')
    # Build research prompt
    prompt = f"""Conduct comprehensive research and provide factual information with citations.
TOPIC: {objective}
Provide:
1. Executive summary
2. Key findings (3-5 points)
3. Detailed analysis
4. Recommendations
Format with clear sections and cite sources as [1], [2], etc.
Include bibliography at end.
RESEARCH RESPONSE:"""
    try:
        response = llm.invoke(prompt)
        content = getattr(response, "content", "")
    except Exception as e:
        log.error(f"Research failed: {e}")
        # The error text becomes the document body so the run still completes.
        content = f"Research error: {e}"
    # Persist the research as a .docx and register it for download.
    try:
        filepath = write_document(content, prefix="research_")
        # Register artifact
        artifact_id, user_path = artifact_registry.register_artifact(
            filepath,
            metadata={
                "type": "research_document",
                "session_id": session_id,
                "user_request": user_input
            }
        )
        results = {
            "content": content,
            "artifact_id": artifact_id,
            "path": user_path,
            "filename": os.path.basename(user_path),
            "source_count": content.count('[')  # Rough citation count: every '[' counts, not only [n]
        }
        return {
            "researchResults": results,
            **add_status("Research", f"Research complete ({results['source_count']} citations)")
        }
    except Exception as e:
        log.error(f"Research document creation failed: {e}")
        # Document write failed; still return the raw content in state.
        return {
            "researchResults": {"content": content, "error": str(e)},
            **add_status("Research", "Document failed")
        }
def run_synthesis_agent(state: AgentState) -> Dict[str, Any]:
    """Compose the final user-facing response and record the exchange.

    Gathers experiment/research outcomes into a context block, asks the
    LLM for a narrative answer, appends a file-download manifest, and
    logs the exchange with the conversation context manager.
    """
    log.info("--- SYNTHESIS ---")
    objective = state.get('coreObjectivePrompt', '')
    user_input = state.get('userInput', '')
    exp_results = state.get('experimentResults', {})
    research_results = state.get('researchResults', {})
    session_id = state.get('session_id', 'default')
    # Build the context block fed to the LLM.
    context_parts = [f"USER REQUEST: {user_input}", f"OBJECTIVE: {objective}"]
    # Mention the generated artifact only when creation succeeded.
    if exp_results and exp_results.get('success'):
        context_parts.append(f"CREATED: {exp_results.get('type')} - {exp_results.get('filename')}")
    # Mention the research document when research produced content.
    if research_results and research_results.get('content'):
        context_parts.append(f"RESEARCH: {research_results.get('filename')}")
    context = "\n".join(context_parts)
    # Generate the narrative response.
    prompt = f"""Create final response for user.
{context}
Requirements:
- Explain what was created
- Describe how to use it
- Highlight key features
- Provide next steps
Create clear, helpful response:"""
    try:
        response = llm.invoke(prompt)
        final_text = getattr(response, "content", "")
    except Exception as e:
        log.error(f"Synthesis failed: {e}")
        # Degrade gracefully: return the raw context as the answer.
        final_text = f"Task completed. {context}"
    # Append a markdown manifest of every file produced in this session.
    manifest = artifact_registry.get_download_manifest(session_id)
    if manifest['total_files'] > 0:
        final_text += f"\n\n---\n\n## 📁 Your Files\n\n"
        final_text += f"**{manifest['total_files']} file(s) created:**\n\n"
        for file in manifest['files']:
            final_text += f"- **{file['filename']}** ({file['size_kb']} KB)\n"
            final_text += f"📍 `{file['path']}`\n\n"
    # Record the full exchange (including artifacts) for follow-up requests.
    artifacts = [f['filename'] for f in manifest['files']]
    context_manager.add_exchange(
        session_id=session_id,
        user_message=user_input,
        assistant_response=final_text,
        artifacts=artifacts,
        metadata={
            "tier": state.get('preferred_tier'),
            "cost": state.get('current_cost', 0.0)
        }
    )
    return {
        "draftResponse": final_text,
        **add_status("Synthesis", "Response complete")
    }
def run_qa_agent(state: AgentState) -> Dict[str, Any]:
    """Review the draft response; approve it or request a rework.

    Lite tier (rework budget 0) and any LLM failure auto-approve, and the
    review is skipped once the rework budget has been spent. A rejection
    increments ``rework_cycles`` and carries the reviewer feedback.
    """
    log.info("--- QA REVIEW ---")
    user_input = state.get('userInput', '')
    draft = state.get('draftResponse', '')
    cycles = state.get('rework_cycles', 0)
    limit = state.get('max_rework_cycles', 0)

    # Lite tier: QA is a pass-through.
    if limit == 0:
        return {"approved": True, **add_status("QA", "Approved (Lite tier)")}

    # Rework budget exhausted: approve without another review.
    if cycles >= limit:
        log.warning(f"Rework limit reached ({cycles}/{limit}), auto-approving")
        return {
            "approved": True,
            **add_status("QA", f"Approved (limit reached: {cycles}/{limit})"),
        }

    prompt = f"""Review this response against user request.
REQUEST: {user_input}
RESPONSE: {draft[:1000]}
Is this complete and satisfactory? Reply APPROVED or provide specific feedback.
REVIEW:"""

    try:
        verdict = getattr(llm.invoke(prompt), "content", "")
        if "APPROVED" in verdict.upper():
            return {"approved": True, **add_status("QA", "Approved")}
        # Rejected: count the cycle and pass the feedback along.
        next_cycle = cycles + 1
        log.info(f"QA requesting rework (cycle {next_cycle}/{limit})")
        return {
            "approved": False,
            "qaFeedback": verdict[:500],
            "rework_cycles": next_cycle,
            **add_status("QA", f"Rework needed (cycle {next_cycle}/{limit})"),
        }
    except Exception as exc:
        log.error(f"QA failed: {exc}")
        # Never block delivery on a broken reviewer.
        return {"approved": True, **add_status("QA", "Approved (error)")}
def run_observer_agent(state: AgentState) -> Dict[str, Any]:
    """Log lightweight run metrics (Full tier only)."""
    log.info("--- OBSERVER ---")
    reworks = state.get('rework_cycles', 0)
    metrics = {
        "execution_length": len(state.get('execution_path', [])),
        "total_cost": state.get('current_cost', 0.0),
        "rework_cycles": reworks,
        # More than one rework is treated as a quality signal.
        "efficiency": "good" if reworks <= 1 else "needs improvement",
    }
    log.info(f"📊 Metrics: {metrics}")
    return add_status("Observer", f"Monitored - {metrics['efficiency']}")
def run_archive_agent(state: AgentState) -> Dict[str, Any]:
    """Persist the completed task/result pair to long-term memory.

    Archiving is best-effort: failures are logged but never block the run.
    Fix: ``datetime.utcnow()`` is deprecated since Python 3.12 and returns
    a naive timestamp; an aware UTC timestamp is stored instead (the
    isoformat string now carries a ``+00:00`` offset).
    """
    log.info("--- ARCHIVE ---")
    objective = state.get('coreObjectivePrompt', '')
    draft = state.get('draftResponse', '')
    try:
        from datetime import timezone
        memory_manager.add_to_memory(
            f"Task: {objective}\nResult: {draft[:500]}",
            {"timestamp": datetime.now(timezone.utc).isoformat()}
        )
    except Exception as e:
        log.warning(f"Archive failed: {e}")
    return add_status("Archive", "Saved to memory")
| # ============================================================================= | |
| # SECTION 5: ARTIFACT CREATION HELPERS | |
| # ============================================================================= | |
def write_notebook(content: str) -> str:
    """Render mixed markdown/python LLM output into a .ipynb file.

    Python fenced blocks become code cells; the text between them becomes
    markdown cells. Content with no usable cells is saved as a single
    markdown cell. Returns the saved file path.
    """
    import re
    code_blocks = re.findall(r'```python\s*(.*?)\s*```', content, re.DOTALL)
    md_parts = re.split(r'```python\s*.*?\s*```', content, flags=re.DOTALL)

    # Interleave markdown and code in their original order.
    cells = []
    for idx in range(max(len(md_parts), len(code_blocks))):
        md = md_parts[idx].strip() if idx < len(md_parts) else ""
        code = code_blocks[idx].strip() if idx < len(code_blocks) else ""
        if md:
            cells.append(new_markdown_cell(md))
        if code:
            cells.append(new_code_cell(code))
    if not cells:
        cells = [new_markdown_cell("# Notebook\n\n" + content)]

    nb = new_notebook()
    nb['cells'] = cells

    destination = os.path.join(cfg.OUT_DIR, f"notebook_{uuid.uuid4().hex[:8]}.ipynb")
    nbformat.write(nb, destination)
    return destination
def write_document(content: str, prefix: str = "document_") -> str:
    """Write *content* into a .docx, one paragraph per blank-line break.

    Returns the saved file path; the filename gets a short random suffix.
    """
    doc = DocxDocument()
    for paragraph in (p.strip() for p in content.split("\n\n")):
        if paragraph:
            doc.add_paragraph(paragraph)
    destination = os.path.join(cfg.OUT_DIR, f"{prefix}{uuid.uuid4().hex[:8]}.docx")
    doc.save(destination)
    return destination
| # ============================================================================= | |
| # SECTION 6: ROUTING LOGIC | |
| # ============================================================================= | |
def route_by_tier(state: AgentState) -> str:
    """Return the validated tier name used to pick a tier graph."""
    requested = state.get('preferred_tier', cfg.DEFAULT_TIER)
    tier = cfg.validate_tier(requested)
    log.info(f"🎯 Routing to: {tier} tier")
    return tier
def should_rework(state: AgentState) -> str:
    """Route after QA: next node name based on approval and rework budget.

    Approved runs go to "observer" (tiers with monitoring) or "archive";
    rejected runs go back to "planning" while the rework budget lasts.

    Fix: run_qa_agent has ALREADY incremented ``rework_cycles`` before this
    router runs, so the counter is compared directly against the limit.
    The previous ``cycles + 1 >= max_cycles`` check counted the increment
    twice, so a tier advertising N reworks never actually performed them
    (standard's single rework never happened).
    """
    if state.get('approved'):
        tier = state.get('preferred_tier', cfg.DEFAULT_TIER)
        # Full tier adds a monitoring pass before archiving.
        return "observer" if cfg.should_include_monitoring(tier) else "archive"

    cycles = state.get('rework_cycles', 0)
    max_cycles = state.get('max_rework_cycles', 0)
    if cycles > max_cycles:
        # Safety net only: QA auto-approves at the limit, so this should not trigger.
        log.warning(f"Rework limit reached ({cycles}/{max_cycles}), forcing approval")
        return "archive"
    log.info(f"Rework cycle {cycles}/{max_cycles}")
    return "planning"
def route_execution_mode(state: AgentState) -> str:
    """Map the detected execution mode onto the next graph node name."""
    mode = state.get('execution_mode', 'coding')
    if mode == "research":
        return "research"
    if mode in ("coding", "hybrid"):
        return "experimenter"
    # simple_response (and any unexpected value) answers directly.
    return "synthesis"
| # ============================================================================= | |
| # SECTION 7: TIER-SPECIFIC GRAPHS | |
| # ============================================================================= | |
def build_lite_graph() -> StateGraph:
    """Lite tier: memory -> synthesis -> archive, no planning or QA."""
    workflow = StateGraph(AgentState)
    workflow.add_node("memory", run_memory_retrieval)
    workflow.add_node("synthesis", run_synthesis_agent)
    workflow.add_node("archive", run_archive_agent)
    workflow.set_entry_point("memory")
    # Strictly linear pipeline.
    for src, dst in (("memory", "synthesis"), ("synthesis", "archive")):
        workflow.add_edge(src, dst)
    workflow.add_edge("archive", END)
    return workflow
def build_standard_graph() -> StateGraph:
    """Standard tier: full agent path with a limited QA rework loop."""
    workflow = StateGraph(AgentState)
    node_map = {
        "memory": run_memory_retrieval,
        "intent": run_intent_clarification,
        "planning": run_planning_agent,
        "experimenter": run_experimenter_agent,
        "research": run_research_agent,
        "synthesis": run_synthesis_agent,
        "qa": run_qa_agent,
        "archive": run_archive_agent,
    }
    for name, fn in node_map.items():
        workflow.add_node(name, fn)

    workflow.set_entry_point("memory")
    workflow.add_edge("memory", "intent")
    workflow.add_edge("intent", "planning")

    # Fan out by execution mode after planning.
    workflow.add_conditional_edges(
        "planning",
        route_execution_mode,
        {
            "experimenter": "experimenter",
            "research": "research",
            "synthesis": "synthesis",
        },
    )
    workflow.add_edge("experimenter", "synthesis")
    workflow.add_edge("research", "synthesis")
    workflow.add_edge("synthesis", "qa")

    # QA either archives the result or sends the run back to planning.
    workflow.add_conditional_edges(
        "qa",
        should_rework,
        {
            "archive": "archive",
            "planning": "planning",
        },
    )
    workflow.add_edge("archive", END)
    return workflow
def build_full_graph() -> StateGraph:
    """Full tier: standard path plus an observer (monitoring) pass."""
    workflow = StateGraph(AgentState)
    node_map = {
        "memory": run_memory_retrieval,
        "intent": run_intent_clarification,
        "planning": run_planning_agent,
        "experimenter": run_experimenter_agent,
        "research": run_research_agent,
        "synthesis": run_synthesis_agent,
        "qa": run_qa_agent,
        "observer": run_observer_agent,
        "archive": run_archive_agent,
    }
    for name, fn in node_map.items():
        workflow.add_node(name, fn)

    workflow.set_entry_point("memory")
    workflow.add_edge("memory", "intent")
    workflow.add_edge("intent", "planning")

    # Fan out by execution mode after planning.
    workflow.add_conditional_edges(
        "planning",
        route_execution_mode,
        {
            "experimenter": "experimenter",
            "research": "research",
            "synthesis": "synthesis",
        },
    )
    workflow.add_edge("experimenter", "synthesis")
    workflow.add_edge("research", "synthesis")
    workflow.add_edge("synthesis", "qa")

    # QA can approve into monitoring, archive directly, or request a rework.
    workflow.add_conditional_edges(
        "qa",
        should_rework,
        {
            "observer": "observer",
            "archive": "archive",
            "planning": "planning",
        },
    )
    workflow.add_edge("observer", "archive")
    workflow.add_edge("archive", END)
    return workflow
| # ============================================================================= | |
| # SECTION 8: GRAPH COMPILATION (UPDATED) | |
| # ============================================================================= | |
# Build tier-specific graphs
lite_workflow = build_lite_graph()
standard_workflow = build_standard_graph()
full_workflow = build_full_graph()
# Compile with explicit recursion limits to prevent infinite loops
lite_app = lite_workflow.compile()
standard_app = standard_workflow.compile()
full_app = full_workflow.compile()
# Default app (standard) — used by callers that don't pick a tier explicitly.
main_app = standard_app
# CRITICAL: Set maximum recursion limits in config.
# These per-tier limits are passed to app.invoke() as a last line of defense
# against runaway rework loops; values are node-count budgets, not depths.
CONFIG_RECURSION_LIMITS = {
    "lite": 10,      # Memory → Synthesis → Archive (3 nodes + buffer)
    "standard": 25,  # Full path + 1 rework: ~12-14 nodes
    "full": 100      # Full path + 3 reworks: ~20-24 nodes
}
log.info("=" * 60)
log.info("✅ GRAPHS COMPILED SUCCESSFULLY")
log.info(f" - Lite: Memory → Synthesis → Archive (limit: {CONFIG_RECURSION_LIMITS['lite']})")
log.info(f" - Standard: Full path + 1 rework (limit: {CONFIG_RECURSION_LIMITS['standard']})")
log.info(f" - Full: Full path + 3 reworks (limit: {CONFIG_RECURSION_LIMITS['full']})")
log.info("=" * 60)
| # ============================================================================= | |
| # SECTION 9: EXECUTION ENTRY POINT | |
| # ============================================================================= | |
def execute_request(user_input: str, session_id: Optional[str] = None,
                    preferred_tier: Optional[str] = None) -> Dict[str, Any]:
    """Run one user request through the tier-appropriate graph.

    Args:
        user_input: User's request text.
        session_id: Session identifier; a random 8-hex id is generated
            when omitted.
        preferred_tier: "lite", "standard", or "full"; validated against
            the configured default when omitted or invalid.

    Returns:
        The final graph state dict. On execution failure the initial state
        is returned with an error message in ``draftResponse`` and
        ``approved`` set to False (this function never raises).

    Fix: recursion limits are now read from CONFIG_RECURSION_LIMITS
    instead of being hard-coded again here (the copies had drifted:
    this function used 30 for standard while the config table says 25).
    """
    # Initialize session and tier.
    session_id = session_id or uuid.uuid4().hex[:8]
    preferred_tier = cfg.validate_tier(preferred_tier or cfg.DEFAULT_TIER)
    tier_config = cfg.get_tier_config(preferred_tier)

    # Build the initial state with every AgentState field populated.
    initial_state = {
        "userInput": user_input,
        "session_id": session_id,
        "preferred_tier": preferred_tier,
        "is_follow_up": False,
        "conversation_context": None,
        "execution_mode": "coding",
        "retrievedMemory": None,
        "coreObjectivePrompt": "",
        "pmPlan": None,
        "experimentCode": None,
        "experimentResults": None,
        "researchResults": None,
        "draftResponse": "",
        "qaFeedback": None,
        "approved": False,
        "execution_path": [],
        "rework_cycles": 0,
        "max_rework_cycles": tier_config["qa_rework_cycles"],
        "status_updates": [],
        "current_cost": 0.0,
        "budget_limit": tier_config["max_cost"] or float('inf')
    }

    # Select the tier graph; limits come from the single config table.
    if preferred_tier == cfg.TIER_LITE:
        app = lite_app
    elif preferred_tier == cfg.TIER_FULL:
        app = full_app
    else:
        app = standard_app
    # CRITICAL: recursion limits prevent infinite rework loops.
    recursion_limit = CONFIG_RECURSION_LIMITS.get(
        preferred_tier, CONFIG_RECURSION_LIMITS["standard"]
    )

    log.info(f"🚀 Executing with {preferred_tier} tier (recursion_limit={recursion_limit})")
    try:
        config = {"recursion_limit": recursion_limit}
        final_state = app.invoke(initial_state, config=config)
        return final_state
    except Exception as e:
        log.exception(f"Execution failed: {e}")
        return {
            **initial_state,
            "draftResponse": f"❌ Execution error: {e}",
            "approved": False
        }
| # ============================================================================= | |
| # SECTION 10: LEGACY COMPATIBILITY | |
| # ============================================================================= | |
| # For backward compatibility with existing app_gradio.py | |
def apply_upgrades() -> bool:
    """Legacy no-op kept for compatibility with app_gradio.py callers.

    The simplified system compiles its graphs at import time, so there is
    nothing to upgrade; always returns True.
    """
    log.info("✅ Simplified graphs already active (no upgrade needed)")
    return True
| # Triage and planner apps for initial estimation | |
def build_triage_app():
    """Compile a one-node graph that short-circuits trivial greetings."""
    workflow = StateGraph(AgentState)

    def triage_node(state: AgentState) -> Dict[str, Any]:
        # A short message containing a greeting word gets an instant reply.
        text = state.get('userInput', '').lower()
        is_short = len(text.split()) < 5
        has_greeting = any(word in text for word in ('hello', 'hi', 'hey', 'greetings'))
        if has_greeting and is_short:
            return {
                "draftResponse": "Hello! How can I help you today?",
                **add_status("Triage", "Greeting detected"),
            }
        return add_status("Triage", "Task detected")

    workflow.add_node("triage", triage_node)
    workflow.set_entry_point("triage")
    workflow.add_edge("triage", END)
    return workflow.compile()
def build_planner_app():
    """Compile a one-node graph that produces a quick plan/cost estimate.

    Fix: the planner's LLM call was guarded by a bare ``except:``, which
    also swallows SystemExit/KeyboardInterrupt and hides the failure;
    it now catches ``Exception`` and logs the error before falling back.
    """
    workflow = StateGraph(AgentState)

    def planner_node(state: AgentState) -> Dict[str, Any]:
        # Ask the LLM for a rough plan; fall back to a canned one on failure.
        user_input = state.get('userInput', '')
        prompt = f"""Quick estimate for: {user_input}
Return JSON:
{{
"plan_steps": ["step 1", "step 2", "step 3"],
"estimated_llm_calls_per_loop": 3,
"max_loops_initial": 3
}}"""
        try:
            response = llm.invoke(prompt)
            plan = parse_json_safe(getattr(response, "content", ""))
        except Exception as e:
            log.warning(f"Planner estimate failed: {e}")
            plan = None
        if not plan:
            plan = {
                "plan_steps": ["Analyze request", "Create solution", "Review quality"],
                "estimated_llm_calls_per_loop": 3,
                "max_loops_initial": 3
            }
        # Cost estimate: per-loop price times planned loops plus one final pass.
        calls = plan.get('estimated_llm_calls_per_loop', 3)
        cost_per_loop = cfg.calculate_cost_per_call() * calls
        total_loops = plan.get('max_loops_initial', 3) + 1
        plan['cost_per_loop_usd'] = round(cost_per_loop, 4)
        plan['estimated_cost_usd'] = round(cost_per_loop * total_loops, 2)
        return {
            "pmPlan": plan,
            **add_status("Planner", "Estimate created")
        }

    workflow.add_node("planner", planner_node)
    workflow.set_entry_point("planner")
    workflow.add_edge("planner", END)
    return workflow.compile()
# Create triage and planner apps (compiled once at import time; both are
# single-node graphs used for quick pre-flight checks before a full run).
triage_app = build_triage_app()
planner_app = build_planner_app()
| # ============================================================================= | |
| # SECTION 11: EXPORTS | |
| # ============================================================================= | |
# Public API of this module. Fix: the former "configuration constants"
# entries ('INITIAL_MAX_REWORK_CYCLES', 'BUDGET_BUFFER_MULTIPLIER',
# 'MAX_COST_MULTIPLIER') were removed — they are never defined in this
# module, so listing them made `from <module> import *` raise AttributeError.
__all__ = [
    # State
    'AgentState',
    # Main execution
    'execute_request',
    # Tier-specific apps
    'lite_app',
    'standard_app',
    'full_app',
    'main_app',
    # Helper apps
    'triage_app',
    'planner_app',
    # Agent nodes (for testing)
    'run_memory_retrieval',
    'run_intent_clarification',
    'run_planning_agent',
    'run_experimenter_agent',
    'run_research_agent',
    'run_synthesis_agent',
    'run_qa_agent',
    'run_observer_agent',
    'run_archive_agent',
    # Helpers
    'parse_json_safe',
    'determine_execution_mode',
    'detect_language_priority',
    # Legacy compatibility
    'apply_upgrades',
]
| # ============================================================================= | |
| # INITIALIZATION LOG | |
| # ============================================================================= | |
# Startup banner: summarizes configured tiers and feature flags at import time.
log.info("=" * 60)
log.info("SIMPLIFIED 3-TIER GRAPH SYSTEM READY")
log.info("=" * 60)
log.info("Available tiers:")
# One line per configured tier, driven by graph_config.TIER_CONFIGS.
for tier_name, tier_conf in cfg.TIER_CONFIGS.items():
    log.info(f" • {tier_conf['name']}: {tier_conf['description']}")
log.info("=" * 60)
log.info("Features enabled:")
log.info(" ✅ Conversation context tracking")
log.info(" ✅ Universal file access")
log.info(" ✅ Research mode with citations")
log.info(" ✅ Multi-language support")
log.info(" ✅ No-loop guarantee")
log.info("=" * 60)