SPOC_V1 / graph_merged_simple.py
JatinAutonomousLabs's picture
Update graph_merged_simple.py
9bba2c0 verified
raw
history blame
35.6 kB
"""
graph_merged_simple.py - Simplified 3-Tier Graph System
========================================================
Clean, maintainable 3-tier orchestration system.
- Lite: Fast responses (30s)
- Standard: Balanced quality (2min)
- Full: Premium refinement (10min)
Key Features:
- No loops by design
- Context-aware agents
- Research mode with citations
- Universal file tracking
- Clear tier differentiation
Author: AI Lab Team
Last Updated: 2025-10-10
Version: 3.0 - Simplified Architecture
"""
# =============================================================================
# SECTION 1: IMPORTS
# =============================================================================
import json
import math
import operator
import os
import uuid
from datetime import datetime, timezone
from typing import Annotated, Any, Dict, List, Optional, TypedDict
# LangChain
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
# Local modules
import graph_config as cfg
from context_manager import context_manager
from artifact_registry import artifact_registry
from memory_manager import memory_manager
from logging_config import get_logger
# Multi-language support
from multi_language_support import (
detect_language,
extract_code_blocks_multi_lang,
execute_code,
detect_requested_output_types_enhanced,
write_script_multi_lang,
LANGUAGES
)
# Artifact generation
import nbformat
from nbformat.v4 import new_notebook, new_markdown_cell, new_code_cell
import pandas as pd
from docx import Document as DocxDocument
# Setup
# Module-level logger for this graph module.
log = get_logger(__name__)
# Shared LLM client used by every agent node (GPT-4o, mildly creative temperature).
llm = ChatOpenAI(model="gpt-4o", temperature=0.4, max_retries=3, request_timeout=60)
# Ensure output directories exist before any artifact gets written.
os.makedirs(cfg.OUT_DIR, exist_ok=True)
os.makedirs(cfg.USER_ARTIFACTS_DIR, exist_ok=True)
log.info("=" * 60)
log.info("SIMPLIFIED 3-TIER GRAPH SYSTEM INITIALIZED")
log.info("=" * 60)
# =============================================================================
# SECTION 2: STATE DEFINITION
# =============================================================================
class AgentState(TypedDict):
    """Simplified state for the 3-tier system.

    Fields annotated with ``operator.add`` are accumulator channels:
    LangGraph concatenates list outputs from each node instead of
    overwriting them.
    """
    # Core inputs
    userInput: str                        # raw user request text
    session_id: str                       # conversation/session identifier
    # Conversation context
    is_follow_up: bool                    # True when this continues a prior exchange
    conversation_context: Optional[Dict]  # payload from context_manager.get_context()
    # Execution mode
    execution_mode: str  # research, coding, hybrid, simple_response
    preferred_tier: str  # lite, standard, full
    # Memory and planning
    retrievedMemory: Optional[str]        # newline-joined vector-memory snippets
    coreObjectivePrompt: str              # refined objective from the intent node
    pmPlan: Optional[Dict]                # plan dict produced by the planning node
    # Execution
    experimentCode: Optional[str]         # raw generated artifact content
    experimentResults: Optional[Dict]     # artifact metadata / success flag
    researchResults: Optional[Dict]       # research content + artifact metadata
    # Output
    draftResponse: str                    # final user-facing response text
    qaFeedback: Optional[str]             # reviewer feedback when not approved
    approved: bool                        # QA verdict
    # Workflow control (accumulated across nodes)
    execution_path: Annotated[List[str], operator.add]  # names of nodes visited
    rework_cycles: int                    # QA rework iterations performed so far
    max_rework_cycles: int                # tier-specific rework ceiling
    status_updates: Annotated[List[str], operator.add]  # human-readable progress log
    # Budget tracking
    current_cost: float                   # accumulated spend (USD)
    budget_limit: float                   # tier cap; inf when uncapped
# =============================================================================
# SECTION 3: HELPER FUNCTIONS
# =============================================================================
def add_status(node_name: str, status: str) -> Dict[str, Any]:
    """Build the partial-state update that records a node's status message."""
    update: Dict[str, Any] = {}
    update["status_updates"] = [f"{node_name}: {status}"]
    update["execution_path"] = [node_name]
    return update
def parse_json_safe(text: str) -> Optional[Dict]:
    """Parse JSON out of raw LLM output using progressively looser strategies.

    Tries, in order:
      1. Direct ``json.loads`` on the whole text.
      2. The contents of a fenced ```json ... ``` block.
      3. The widest ``{...}`` span found anywhere in the text.

    Args:
        text: Raw model output (may be empty or wrapped in prose).

    Returns:
        The parsed object, or None if nothing parseable was found.
    """
    import re

    if not text:
        return None

    # Strategy 1: the text is already pure JSON.
    # BUG FIX: previously a bare `except:` which also swallowed
    # KeyboardInterrupt/SystemExit; catch decoding errors only.
    try:
        return json.loads(text)
    except (json.JSONDecodeError, TypeError):
        pass

    # Strategy 2: JSON inside a fenced ```json ... ``` block.
    match = re.search(r'```json\s*({.*?})\s*```', text, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(1))
        except json.JSONDecodeError:
            pass

    # Strategy 3: greedy grab from the first '{' to the last '}'.
    match = re.search(r'{.*}', text, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            pass

    return None
def determine_execution_mode(user_input: str, context: Dict) -> str:
    """Classify a request as research, hybrid, coding, or simple_response.

    Keyword lists come from graph_config. Research beats everything else;
    research combined with a code request becomes hybrid. Note that the
    `context` argument is accepted for interface compatibility but is not
    consulted by the current heuristics.
    """
    lowered = user_input.lower()

    wants_research = any(kw in lowered for kw in cfg.RESEARCH_KEYWORDS)
    wants_code = any(kw in lowered for kw in cfg.CODE_KEYWORDS)
    wants_no_code = any(kw in lowered for kw in cfg.NO_CODE_KEYWORDS)

    # Research dominates: hybrid only when code is also explicitly requested.
    if wants_research:
        return "hybrid" if wants_code else "research"

    # Coding: an explicit code request (or no "no code" signal) plus a build verb.
    if wants_code or not wants_no_code:
        build_verbs = ('create', 'build', 'implement', 'write', 'develop')
        if any(verb in lowered for verb in build_verbs):
            return "coding"

    # Pure question/explanation requests.
    if wants_no_code:
        return "simple_response"

    # Ambiguous requests default to coding.
    return "coding"
def detect_language_priority(user_input: str, detected: str) -> str:
    """Resolve the target language: explicit keyword > detected > default."""
    lowered = user_input.lower()

    # 1. An explicit user preference always wins.
    explicit = next(
        (lang for lang, kws in cfg.LANGUAGE_PREFERENCE_KEYWORDS.items()
         if any(kw in lowered for kw in kws)),
        None,
    )
    if explicit is not None:
        log.info(f"🎯 Explicit language preference: {explicit}")
        return explicit

    # 2. Trust automatic detection only when it differs from the default.
    if detected and detected != cfg.DEFAULT_LANGUAGE:
        log.info(f"🔍 Detected language: {detected}")
        return detected

    # 3. Fall back to the configured default language.
    log.info(f"📝 Using default language: {cfg.DEFAULT_LANGUAGE}")
    return cfg.DEFAULT_LANGUAGE
# =============================================================================
# SECTION 4: CORE AGENT NODES
# =============================================================================
def run_memory_retrieval(state: AgentState) -> Dict[str, Any]:
    """Retrieve relevant context from memory."""
    log.info("--- MEMORY RETRIEVAL ---")
    user_input = state.get('userInput', '')
    session_id = state.get('session_id', 'default')

    # Conversation-level context: follow-up detection and prior artifacts.
    conv_context = context_manager.get_context(session_id, user_input)

    # Vector-store memories; failure degrades gracefully to empty context.
    memory_context = ""
    try:
        memories = memory_manager.retrieve_relevant_memories(user_input)
        if memories:
            memory_context = "\n".join(f"- {m.page_content}" for m in memories)
    except Exception as e:
        log.warning(f"Memory retrieval failed: {e}")

    result: Dict[str, Any] = {
        "retrievedMemory": memory_context,
        "is_follow_up": conv_context["is_follow_up"],
        "conversation_context": conv_context,
    }
    result.update(add_status("Memory", "Context retrieved"))
    return result
def run_intent_clarification(state: AgentState) -> Dict[str, Any]:
    """Clarify user intent with conversation context.

    Produces `coreObjectivePrompt` (a refined objective) and
    `execution_mode` (keyword-classified). Falls back to the raw
    request text if the LLM call fails or returns nothing.
    """
    log.info("--- INTENT CLARIFICATION ---")
    user_input = state.get('userInput', '')
    conv_context = state.get('conversation_context', {})
    memory = state.get('retrievedMemory', '')
    # Build context-aware prompt: follow-ups include prior conversation and
    # artifact history; fresh requests use vector-memory context instead.
    if conv_context.get("is_follow_up"):
        prompt = f"""You are clarifying a FOLLOW-UP request.
{conv_context.get('context', '')}
{conv_context.get('artifacts_context', '')}
CURRENT FOLLOW-UP REQUEST: {user_input}
Refine into clear objective that:
1. References previous conversation
2. Builds on existing work
3. Is specific and actionable
REFINED OBJECTIVE (2-3 sentences):"""
    else:
        prompt = f"""Refine user request into clear objective.
MEMORY CONTEXT:
{memory}
REQUEST: {user_input}
REFINED OBJECTIVE (2-3 sentences):"""
    try:
        response = llm.invoke(prompt)
        # An empty model reply falls back to the raw request.
        objective = getattr(response, "content", "") or user_input
    except Exception as e:
        log.error(f"Intent clarification failed: {e}")
        objective = user_input
    # Keyword-based classification of how to execute this request.
    execution_mode = determine_execution_mode(user_input, conv_context)
    log.info(f"📋 Execution mode: {execution_mode}")
    return {
        "coreObjectivePrompt": objective,
        "execution_mode": execution_mode,
        **add_status("Intent", f"Mode: {execution_mode}")
    }
def run_planning_agent(state: AgentState) -> Dict[str, Any]:
    """Ask the LLM for an execution plan; fall back to a static plan on failure.

    The plan gains an 'estimated_cost' field and is stored as `pmPlan`.
    """
    log.info("--- PLANNING ---")
    objective = state.get('coreObjectivePrompt', '')
    user_input = state.get('userInput', '')  # NOTE(review): read but unused below
    execution_mode = state.get('execution_mode', 'coding')
    prompt = f"""Create execution plan.
OBJECTIVE: {objective}
MODE: {execution_mode}
Return JSON:
{{
"plan_steps": ["step 1", "step 2", ...],
"experiment_needed": true/false,
"experiment_type": "script|notebook|word|excel|pdf",
"estimated_calls": 3
}}"""
    try:
        response = llm.invoke(prompt)
        plan = parse_json_safe(getattr(response, "content", ""))
    except Exception as e:
        log.error(f"Planning failed: {e}")
        plan = None
    # Fallback plan when the LLM call fails or returns unparseable/non-dict JSON.
    if not plan or not isinstance(plan, dict):
        plan = {
            "plan_steps": ["Analyze request", "Create deliverable"],
            "experiment_needed": execution_mode in ["coding", "hybrid"],
            "experiment_type": "word" if execution_mode == "research" else "script",
            "estimated_calls": 3
        }
    # Rough cost estimate: planned LLM calls x configured per-call cost.
    calls = plan.get('estimated_calls', 3)
    cost_per_call = cfg.calculate_cost_per_call()
    plan['estimated_cost'] = round(calls * cost_per_call, 2)
    return {
        "pmPlan": plan,
        **add_status("Planning", f"Plan created ({len(plan.get('plan_steps', []))} steps)")
    }
def run_experimenter_agent(state: AgentState) -> Dict[str, Any]:
    """Generate a code/document artifact per the plan and register it.

    Skips entirely when the plan says no experiment is needed. On success
    the state gains `experimentCode` (raw generated content) and
    `experimentResults` (artifact metadata); on failure `experimentResults`
    carries success=False and the error message.
    """
    log.info("--- EXPERIMENTER ---")
    plan = state.get('pmPlan', {}) or {}
    objective = state.get('coreObjectivePrompt', '')
    user_input = state.get('userInput', '')
    session_id = state.get('session_id', 'default')
    if not plan.get('experiment_needed'):
        return add_status("Experimenter", "Skipped - not needed")
    # Detect target language (explicit > detected > default) and artifact type.
    detected = detect_requested_output_types_enhanced(objective)
    language = detect_language_priority(user_input, detected.get('language', 'python'))
    exp_type = plan.get('experiment_type', 'script')
    log.info(f"📝 Creating: {exp_type} in {language}")
    # Build generation prompt
    prompt = f"""Create {exp_type} artifact.
OBJECTIVE: {objective}
LANGUAGE: {language}
REQUIREMENTS:
- Production quality
- Complete implementation
- No placeholders
- Include documentation
Generate complete {exp_type}:"""
    try:
        response = llm.invoke(prompt)
        content = getattr(response, "content", "")
    except Exception as e:
        log.error(f"Generation failed: {e}")
        return {
            "experimentResults": {"success": False, "error": str(e)},
            **add_status("Experimenter", "Failed")
        }
    # Write the artifact to disk, then register it for user download.
    try:
        if exp_type == "notebook":
            filepath = write_notebook(content)
        elif exp_type in ["word", "document"]:
            filepath = write_document(content)
        else:  # script (default for any other type)
            filepath = write_script_multi_lang(content, language, cfg.OUT_DIR)
        # Register artifact so it appears in the session download manifest.
        artifact_id, user_path = artifact_registry.register_artifact(
            filepath,
            metadata={
                "type": exp_type,
                "session_id": session_id,
                "user_request": user_input,
                "language": language
            }
        )
        results = {
            "success": True,
            "artifact_id": artifact_id,
            "path": user_path,
            "filename": os.path.basename(user_path),
            "type": exp_type,
            "language": language
        }
        return {
            "experimentCode": content,
            "experimentResults": results,
            **add_status("Experimenter", f"Created {exp_type}")
        }
    except Exception as e:
        log.error(f"Artifact creation failed: {e}")
        return {
            "experimentResults": {"success": False, "error": str(e)},
            **add_status("Experimenter", "Artifact failed")
        }
def run_research_agent(state: AgentState) -> Dict[str, Any]:
    """Conduct research with citations (placeholder for web search integration).

    Today this is purely LLM-generated (no live web search). The research
    text is written to a Word document and registered as a downloadable
    artifact; the raw text is also kept in `researchResults["content"]`.
    """
    log.info("--- RESEARCH ---")
    objective = state.get('coreObjectivePrompt', '')
    user_input = state.get('userInput', '')
    session_id = state.get('session_id', 'default')
    # Build research prompt
    prompt = f"""Conduct comprehensive research and provide factual information with citations.
TOPIC: {objective}
Provide:
1. Executive summary
2. Key findings (3-5 points)
3. Detailed analysis
4. Recommendations
Format with clear sections and cite sources as [1], [2], etc.
Include bibliography at end.
RESEARCH RESPONSE:"""
    try:
        response = llm.invoke(prompt)
        content = getattr(response, "content", "")
    except Exception as e:
        log.error(f"Research failed: {e}")
        content = f"Research error: {e}"
    # Persist the research text as a .docx artifact.
    try:
        filepath = write_document(content, prefix="research_")
        # Register artifact for the session download manifest.
        artifact_id, user_path = artifact_registry.register_artifact(
            filepath,
            metadata={
                "type": "research_document",
                "session_id": session_id,
                "user_request": user_input
            }
        )
        results = {
            "content": content,
            "artifact_id": artifact_id,
            "path": user_path,
            "filename": os.path.basename(user_path),
            "source_count": content.count('[')  # Rough citation count: counts every '[', not unique sources
        }
        return {
            "researchResults": results,
            **add_status("Research", f"Research complete ({results['source_count']} citations)")
        }
    except Exception as e:
        log.error(f"Research document creation failed: {e}")
        # Keep the research text even when the document could not be written.
        return {
            "researchResults": {"content": content, "error": str(e)},
            **add_status("Research", "Document failed")
        }
def run_synthesis_agent(state: AgentState) -> Dict[str, Any]:
    """Compose the final user-facing response and record the exchange.

    Summarizes experiment/research outcomes via the LLM, appends a
    file-download manifest for the session, and logs the exchange with
    context_manager so follow-ups can reference it.
    """
    log.info("--- SYNTHESIS ---")
    objective = state.get('coreObjectivePrompt', '')
    user_input = state.get('userInput', '')
    exp_results = state.get('experimentResults', {})
    research_results = state.get('researchResults', {})
    session_id = state.get('session_id', 'default')
    # Build context summary fed to the LLM.
    context_parts = [f"USER REQUEST: {user_input}", f"OBJECTIVE: {objective}"]
    # Add experiment results (only successful ones)
    if exp_results and exp_results.get('success'):
        context_parts.append(f"CREATED: {exp_results.get('type')} - {exp_results.get('filename')}")
    # Add research results
    if research_results and research_results.get('content'):
        context_parts.append(f"RESEARCH: {research_results.get('filename')}")
    context = "\n".join(context_parts)
    # Generate the final response text.
    prompt = f"""Create final response for user.
{context}
Requirements:
- Explain what was created
- Describe how to use it
- Highlight key features
- Provide next steps
Create clear, helpful response:"""
    try:
        response = llm.invoke(prompt)
        final_text = getattr(response, "content", "")
    except Exception as e:
        log.error(f"Synthesis failed: {e}")
        # Degrade to a terse summary built from the raw context.
        final_text = f"Task completed. {context}"
    # Append a downloadable-files section when artifacts were produced.
    manifest = artifact_registry.get_download_manifest(session_id)
    if manifest['total_files'] > 0:
        final_text += f"\n\n---\n\n## 📁 Your Files\n\n"
        final_text += f"**{manifest['total_files']} file(s) created:**\n\n"
        for file in manifest['files']:
            final_text += f"- **{file['filename']}** ({file['size_kb']} KB)\n"
            final_text += f" 📍 `{file['path']}`\n\n"
    # Track in context manager so later turns see this exchange.
    artifacts = [f['filename'] for f in manifest['files']]
    context_manager.add_exchange(
        session_id=session_id,
        user_message=user_input,
        assistant_response=final_text,
        artifacts=artifacts,
        metadata={
            "tier": state.get('preferred_tier'),
            "cost": state.get('current_cost', 0.0)
        }
    )
    return {
        "draftResponse": final_text,
        **add_status("Synthesis", "Response complete")
    }
def run_qa_agent(state: AgentState) -> Dict[str, Any]:
    """Quality assurance review.

    Approval rules:
      * max_rework_cycles == 0 (Lite tier) -> auto-approve, no LLM call.
      * rework limit already reached -> auto-approve.
      * otherwise the LLM reviews; "APPROVED" anywhere in the reply approves,
        anything else becomes qaFeedback and increments rework_cycles.
      * LLM errors approve by design (fail-open).
    """
    log.info("--- QA REVIEW ---")
    user_input = state.get('userInput', '')
    draft = state.get('draftResponse', '')
    rework_cycles = state.get('rework_cycles', 0)
    max_rework = state.get('max_rework_cycles', 0)
    # Skip QA entirely if the tier allows no reworks.
    if max_rework == 0:
        return {
            "approved": True,
            **add_status("QA", "Approved (Lite tier)")
        }
    # CRITICAL FIX: Check if at limit BEFORE trying to rework
    if rework_cycles >= max_rework:
        log.warning(f"Rework limit reached ({rework_cycles}/{max_rework}), auto-approving")
        return {
            "approved": True,
            **add_status("QA", f"Approved (limit reached: {rework_cycles}/{max_rework})")
        }
    # QA review — only the first 1000 chars of the draft are shown to the model.
    prompt = f"""Review this response against user request.
REQUEST: {user_input}
RESPONSE: {draft[:1000]}
Is this complete and satisfactory? Reply APPROVED or provide specific feedback.
REVIEW:"""
    try:
        response = llm.invoke(prompt)
        content = getattr(response, "content", "")
        if "APPROVED" in content.upper():
            return {
                "approved": True,
                **add_status("QA", "Approved")
            }
        else:
            # Rejection: increment the rework counter and keep truncated feedback.
            new_cycles = rework_cycles + 1
            log.info(f"QA requesting rework (cycle {new_cycles}/{max_rework})")
            return {
                "approved": False,
                "qaFeedback": content[:500],
                "rework_cycles": new_cycles,
                **add_status("QA", f"Rework needed (cycle {new_cycles}/{max_rework})")
            }
    except Exception as e:
        log.error(f"QA failed: {e}")
        return {
            "approved": True,  # Approve on error (fail-open)
            **add_status("QA", "Approved (error)")
        }
def run_observer_agent(state: AgentState) -> Dict[str, Any]:
    """Performance monitoring (Full tier only)."""
    log.info("--- OBSERVER ---")
    reworks = state.get('rework_cycles', 0)
    metrics = {
        "execution_length": len(state.get('execution_path', [])),
        "total_cost": state.get('current_cost', 0.0),
        "rework_cycles": reworks,
        # One-or-fewer reworks counts as an efficient run.
        "efficiency": "good" if reworks <= 1 else "needs improvement",
    }
    log.info(f"📊 Metrics: {metrics}")
    return add_status("Observer", f"Monitored - {metrics['efficiency']}")
def run_archive_agent(state: AgentState) -> Dict[str, Any]:
    """Archive the finished execution to long-term memory (best-effort).

    Failures are logged and swallowed: archiving must never fail the run.
    """
    log.info("--- ARCHIVE ---")
    objective = state.get('coreObjectivePrompt', '')
    draft = state.get('draftResponse', '')
    try:
        memory_manager.add_to_memory(
            f"Task: {objective}\nResult: {draft[:500]}",
            # BUG FIX: datetime.utcnow() is deprecated (Python 3.12+) and
            # returns a naive datetime; use an explicit timezone-aware UTC
            # timestamp instead.
            {"timestamp": datetime.now(timezone.utc).isoformat()}
        )
    except Exception as e:
        log.warning(f"Archive failed: {e}")
    # NOTE: the status reads "Saved to memory" even when archiving failed —
    # preserved intentionally, since archiving is advisory.
    return add_status("Archive", "Saved to memory")
# =============================================================================
# SECTION 5: ARTIFACT CREATION HELPERS
# =============================================================================
def write_notebook(content: str) -> str:
    """Create a Jupyter notebook from LLM output mixing prose and code.

    Splits `content` on ```python fenced blocks: fenced code becomes code
    cells and the surrounding prose becomes markdown cells, interleaved in
    their original order.

    Returns:
        Path of the .ipynb file written to cfg.OUT_DIR.
    """
    import re
    # Extract fenced code; the complementary split yields the prose between.
    code_blocks = re.findall(r'```python\s*(.*?)\s*```', content, re.DOTALL)
    md_parts = re.split(r'```python\s*.*?\s*```', content, flags=re.DOTALL)
    # Interleave prose/code pairs; md_parts is typically one element longer
    # than code_blocks, hence iterating to the max of both lengths.
    nb = new_notebook()
    cells = []
    for i in range(max(len(md_parts), len(code_blocks))):
        if i < len(md_parts) and md_parts[i].strip():
            cells.append(new_markdown_cell(md_parts[i].strip()))
        if i < len(code_blocks) and code_blocks[i].strip():
            cells.append(new_code_cell(code_blocks[i].strip()))
    # Degenerate case: whitespace-only content still yields one cell.
    if not cells:
        cells = [new_markdown_cell("# Notebook\n\n" + content)]
    nb['cells'] = cells
    # Random short suffix avoids filename collisions in the output directory.
    filename = f"notebook_{uuid.uuid4().hex[:8]}.ipynb"
    filepath = os.path.join(cfg.OUT_DIR, filename)
    nbformat.write(nb, filepath)
    return filepath
def write_document(content: str, prefix: str = "document_") -> str:
    """Write `content` to a .docx file in cfg.OUT_DIR and return its path.

    Each blank-line-separated chunk becomes one paragraph; empty chunks
    are skipped.
    """
    doc = DocxDocument()
    for chunk in content.split("\n\n"):
        text = chunk.strip()
        if text:
            doc.add_paragraph(text)
    filename = f"{prefix}{uuid.uuid4().hex[:8]}.docx"
    destination = os.path.join(cfg.OUT_DIR, filename)
    doc.save(destination)
    return destination
# =============================================================================
# SECTION 6: ROUTING LOGIC
# =============================================================================
def route_by_tier(state: AgentState) -> str:
    """Return the validated tier name used to pick the tier-specific graph."""
    requested = state.get('preferred_tier', cfg.DEFAULT_TIER)
    tier = cfg.validate_tier(requested)
    log.info(f"🎯 Routing to: {tier} tier")
    return tier
def should_rework(state: AgentState) -> str:
    """QA routing: choose the next node after the QA review.

    Returns one of "observer", "archive", or "planning". The returned key
    must exist in the calling graph's conditional-edge mapping.

    NOTE(review): the standard graph's QA mapping only handles "archive" and
    "planning" — if cfg.should_include_monitoring() ever returned True for
    the standard tier, the "observer" key would be unmapped; confirm config.
    """
    if state.get('approved'):
        # Approved: monitored tiers (Full) visit the observer first.
        tier = state.get('preferred_tier', cfg.DEFAULT_TIER)
        if cfg.should_include_monitoring(tier):
            return "observer"
        else:
            return "archive"
    else:
        # CRITICAL FIX: Check the rework cycles BEFORE incrementing
        cycles = state.get('rework_cycles', 0)
        max_cycles = state.get('max_rework_cycles', 0)
        # Add 1 because we're about to do another rework
        if cycles + 1 >= max_cycles:
            log.warning(f"Rework limit reached ({cycles + 1}/{max_cycles}), forcing approval")
            return "archive"  # Force completion
        else:
            log.info(f"Rework cycle {cycles + 1}/{max_cycles}")
            return "planning"  # Rework
def route_execution_mode(state: AgentState) -> str:
    """Map the request's execution mode onto the next graph node name."""
    mode = state.get('execution_mode', 'coding')
    if mode == "research":
        return "research"
    if mode in ("coding", "hybrid"):
        return "experimenter"
    # simple_response (and anything unrecognized) goes straight to synthesis.
    return "synthesis"
# =============================================================================
# SECTION 7: TIER-SPECIFIC GRAPHS
# =============================================================================
def build_lite_graph() -> StateGraph:
    """Lite tier: a strictly linear memory -> synthesis -> archive pipeline."""
    workflow = StateGraph(AgentState)

    workflow.add_node("memory", run_memory_retrieval)
    workflow.add_node("synthesis", run_synthesis_agent)
    workflow.add_node("archive", run_archive_agent)

    # No planning, no QA — just the straight-through path.
    workflow.set_entry_point("memory")
    for src, dst in (("memory", "synthesis"),
                     ("synthesis", "archive"),
                     ("archive", END)):
        workflow.add_edge(src, dst)
    return workflow
def build_standard_graph() -> StateGraph:
    """Standard tier: full pipeline with QA and a single rework pass."""
    workflow = StateGraph(AgentState)

    nodes = {
        "memory": run_memory_retrieval,
        "intent": run_intent_clarification,
        "planning": run_planning_agent,
        "experimenter": run_experimenter_agent,
        "research": run_research_agent,
        "synthesis": run_synthesis_agent,
        "qa": run_qa_agent,
        "archive": run_archive_agent,
    }
    for name, fn in nodes.items():
        workflow.add_node(name, fn)

    workflow.set_entry_point("memory")
    workflow.add_edge("memory", "intent")
    workflow.add_edge("intent", "planning")

    # Planning fans out by execution mode; work paths rejoin at synthesis.
    workflow.add_conditional_edges(
        "planning",
        route_execution_mode,
        {
            "experimenter": "experimenter",
            "research": "research",
            "synthesis": "synthesis",
        },
    )
    workflow.add_edge("experimenter", "synthesis")
    workflow.add_edge("research", "synthesis")
    workflow.add_edge("synthesis", "qa")

    # QA either archives (done) or loops back to planning for one rework.
    workflow.add_conditional_edges(
        "qa",
        should_rework,
        {
            "archive": "archive",
            "planning": "planning",
        },
    )
    workflow.add_edge("archive", END)
    return workflow
def build_full_graph() -> StateGraph:
    """Full tier: premium pipeline with observer monitoring and 3 reworks."""
    workflow = StateGraph(AgentState)

    nodes = {
        "memory": run_memory_retrieval,
        "intent": run_intent_clarification,
        "planning": run_planning_agent,
        "experimenter": run_experimenter_agent,
        "research": run_research_agent,
        "synthesis": run_synthesis_agent,
        "qa": run_qa_agent,
        "observer": run_observer_agent,
        "archive": run_archive_agent,
    }
    for name, fn in nodes.items():
        workflow.add_node(name, fn)

    workflow.set_entry_point("memory")
    workflow.add_edge("memory", "intent")
    workflow.add_edge("intent", "planning")

    # Planning fans out by execution mode; work paths rejoin at synthesis.
    workflow.add_conditional_edges(
        "planning",
        route_execution_mode,
        {
            "experimenter": "experimenter",
            "research": "research",
            "synthesis": "synthesis",
        },
    )
    workflow.add_edge("experimenter", "synthesis")
    workflow.add_edge("research", "synthesis")
    workflow.add_edge("synthesis", "qa")

    # QA routes to observer (approved + monitored), archive, or rework.
    workflow.add_conditional_edges(
        "qa",
        should_rework,
        {
            "observer": "observer",
            "archive": "archive",
            "planning": "planning",
        },
    )
    workflow.add_edge("observer", "archive")
    workflow.add_edge("archive", END)
    return workflow
# =============================================================================
# SECTION 8: GRAPH COMPILATION (UPDATED)
# =============================================================================
# Build tier-specific graphs
# Build tier-specific graphs
lite_workflow = build_lite_graph()
standard_workflow = build_standard_graph()
full_workflow = build_full_graph()
# Compile the graphs into runnable apps.
lite_app = lite_workflow.compile()
standard_app = standard_workflow.compile()
full_app = full_workflow.compile()
# Default app (standard)
main_app = standard_app
# Per-tier recursion ceilings intended as a safety net against unexpected
# cycles when invoking the apps.
CONFIG_RECURSION_LIMITS = {
    "lite": 10,  # Memory → Synthesis → Archive (3 nodes + buffer)
    "standard": 25,  # Full path + 1 rework: ~12-14 nodes
    "full": 100  # Full path + 3 reworks: ~20-24 nodes
}
log.info("=" * 60)
log.info("✅ GRAPHS COMPILED SUCCESSFULLY")
log.info(f" - Lite: Memory → Synthesis → Archive (limit: {CONFIG_RECURSION_LIMITS['lite']})")
log.info(f" - Standard: Full path + 1 rework (limit: {CONFIG_RECURSION_LIMITS['standard']})")
log.info(f" - Full: Full path + 3 reworks (limit: {CONFIG_RECURSION_LIMITS['full']})")
log.info("=" * 60)
# =============================================================================
# SECTION 9: EXECUTION ENTRY POINT
# =============================================================================
def execute_request(user_input: str, session_id: Optional[str] = None,
                    preferred_tier: Optional[str] = None) -> Dict[str, Any]:
    """Main execution entry point.

    Args:
        user_input: User's request text.
        session_id: Session identifier; a random 8-hex-char id is generated
            when omitted.
        preferred_tier: "lite", "standard", or "full" (validated via cfg).

    Returns:
        The final graph state dict; on failure, the initial state with
        `draftResponse` set to an error message and `approved` False.
    """
    # Initialize identifiers and tier configuration.
    session_id = session_id or uuid.uuid4().hex[:8]
    preferred_tier = cfg.validate_tier(preferred_tier or cfg.DEFAULT_TIER)
    tier_config = cfg.get_tier_config(preferred_tier)

    # Build initial state covering every AgentState field.
    initial_state = {
        "userInput": user_input,
        "session_id": session_id,
        "preferred_tier": preferred_tier,
        "is_follow_up": False,
        "conversation_context": None,
        "execution_mode": "coding",
        "retrievedMemory": None,
        "coreObjectivePrompt": "",
        "pmPlan": None,
        "experimentCode": None,
        "experimentResults": None,
        "researchResults": None,
        "draftResponse": "",
        "qaFeedback": None,
        "approved": False,
        "execution_path": [],
        "rework_cycles": 0,
        "max_rework_cycles": tier_config["qa_rework_cycles"],
        "status_updates": [],
        "current_cost": 0.0,
        "budget_limit": tier_config["max_cost"] or float('inf')
    }

    # Select the compiled graph for the tier.
    if preferred_tier == cfg.TIER_LITE:
        app = lite_app
    elif preferred_tier == cfg.TIER_FULL:
        app = full_app
    else:
        app = standard_app

    # CRITICAL: recursion limits prevent runaway loops.
    # BUG FIX: limits were previously re-hardcoded here (10/30/100) and had
    # drifted from CONFIG_RECURSION_LIMITS ("standard": 25 vs 30); use the
    # single source of truth instead.
    recursion_limit = CONFIG_RECURSION_LIMITS.get(
        preferred_tier, CONFIG_RECURSION_LIMITS["standard"]
    )

    log.info(f"🚀 Executing with {preferred_tier} tier (recursion_limit={recursion_limit})")
    # Execute the graph with the recursion-limit config.
    try:
        config = {"recursion_limit": recursion_limit}
        final_state = app.invoke(initial_state, config=config)
        return final_state
    except Exception as e:
        log.exception(f"Execution failed: {e}")
        return {
            **initial_state,
            "draftResponse": f"❌ Execution error: {e}",
            "approved": False
        }
# =============================================================================
# SECTION 10: LEGACY COMPATIBILITY
# =============================================================================
# For backward compatibility with existing app_gradio.py
def apply_upgrades() -> bool:
    """Legacy no-op kept for app_gradio.py compatibility.

    The simplified graphs are built already "upgraded" at import time, so
    there is nothing to apply; always reports success.
    """
    log.info("✅ Simplified graphs already active (no upgrade needed)")
    return True
# Triage and planner apps for initial estimation
def build_triage_app():
    """Compile a one-node graph that short-circuits bare greetings."""
    workflow = StateGraph(AgentState)

    def triage_node(state: AgentState) -> Dict[str, Any]:
        # A short message containing a greeting word gets a canned reply.
        text = state.get('userInput', '').lower()
        is_greeting = any(word in text for word in ('hello', 'hi', 'hey', 'greetings'))
        is_short = len(text.split()) < 5
        if is_greeting and is_short:
            return {
                "draftResponse": "Hello! How can I help you today?",
                **add_status("Triage", "Greeting detected")
            }
        return add_status("Triage", "Task detected")

    workflow.add_node("triage", triage_node)
    workflow.set_entry_point("triage")
    workflow.add_edge("triage", END)
    return workflow.compile()
def build_planner_app():
    """Compile a one-node graph producing a quick plan and cost estimate."""
    workflow = StateGraph(AgentState)

    def planner_node(state: AgentState) -> Dict[str, Any]:
        user_input = state.get('userInput', '')
        prompt = f"""Quick estimate for: {user_input}
Return JSON:
{{
"plan_steps": ["step 1", "step 2", "step 3"],
"estimated_llm_calls_per_loop": 3,
"max_loops_initial": 3
}}"""
        try:
            response = llm.invoke(prompt)
            plan = parse_json_safe(getattr(response, "content", ""))
        # BUG FIX: was a bare `except:` that silently trapped everything,
        # including KeyboardInterrupt/SystemExit; narrow and log instead.
        except Exception as e:
            log.warning(f"Planner estimate failed: {e}")
            plan = None
        # Fallback plan when the LLM call or JSON parsing fails.
        if not plan:
            plan = {
                "plan_steps": ["Analyze request", "Create solution", "Review quality"],
                "estimated_llm_calls_per_loop": 3,
                "max_loops_initial": 3
            }
        # Cost: per-loop LLM calls x per-call cost x (planned loops + 1 final pass).
        calls = plan.get('estimated_llm_calls_per_loop', 3)
        cost_per_loop = cfg.calculate_cost_per_call() * calls
        total_loops = plan.get('max_loops_initial', 3) + 1
        plan['cost_per_loop_usd'] = round(cost_per_loop, 4)
        plan['estimated_cost_usd'] = round(cost_per_loop * total_loops, 2)
        return {
            "pmPlan": plan,
            **add_status("Planner", "Estimate created")
        }

    workflow.add_node("planner", planner_node)
    workflow.set_entry_point("planner")
    workflow.add_edge("planner", END)
    return workflow.compile()
# Create triage and planner apps
triage_app = build_triage_app()
planner_app = build_planner_app()
# =============================================================================
# SECTION 11: EXPORTS
# =============================================================================
# BUG FIX: __all__ previously listed INITIAL_MAX_REWORK_CYCLES,
# BUDGET_BUFFER_MULTIPLIER and MAX_COST_MULTIPLIER without ever defining
# them in this module, so `from graph_merged_simple import *` raised
# AttributeError. Re-export them from graph_config when available and only
# advertise the ones that actually exist.
_REEXPORTED_CONSTANTS = []
for _const_name in ('INITIAL_MAX_REWORK_CYCLES',
                    'BUDGET_BUFFER_MULTIPLIER',
                    'MAX_COST_MULTIPLIER'):
    if hasattr(cfg, _const_name):
        globals()[_const_name] = getattr(cfg, _const_name)
        _REEXPORTED_CONSTANTS.append(_const_name)

__all__ = [
    # State
    'AgentState',
    # Main execution
    'execute_request',
    # Tier-specific apps
    'lite_app',
    'standard_app',
    'full_app',
    'main_app',
    # Helper apps
    'triage_app',
    'planner_app',
    # Agent nodes (for testing)
    'run_memory_retrieval',
    'run_intent_clarification',
    'run_planning_agent',
    'run_experimenter_agent',
    'run_research_agent',
    'run_synthesis_agent',
    'run_qa_agent',
    'run_observer_agent',
    'run_archive_agent',
    # Helpers
    'parse_json_safe',
    'determine_execution_mode',
    'detect_language_priority',
    # Legacy compatibility
    'apply_upgrades',
    # Configuration constants (re-exported from graph_config when available)
] + _REEXPORTED_CONSTANTS
# =============================================================================
# INITIALIZATION LOG
# =============================================================================
# Startup banner: list configured tiers and feature flags at import time.
log.info("=" * 60)
log.info("SIMPLIFIED 3-TIER GRAPH SYSTEM READY")
log.info("=" * 60)
log.info("Available tiers:")
for tier_name, tier_conf in cfg.TIER_CONFIGS.items():
    log.info(f" • {tier_conf['name']}: {tier_conf['description']}")
log.info("=" * 60)
log.info("Features enabled:")
log.info(" ✅ Conversation context tracking")
log.info(" ✅ Universal file access")
log.info(" ✅ Research mode with citations")
log.info(" ✅ Multi-language support")
log.info(" ✅ No-loop guarantee")
log.info("=" * 60)