""" graph_merged.py - Unified AI Lab Graph System (Liberal Budget Policy v2.0) ============================================================================= LIBERAL BUDGET POLICY: - 20% budget buffer on user input - Stop only at 120% of user budget - 10x rework cycles (3 → 10 → 20 hard limit) - 150 node execution path limit - Always proceed to Experimenter unless explicitly rejected This file contains both the BASE and UPGRADED graph implementations: - BASE GRAPH: Simple workflow (Memory → Intent → PM → Experimenter → Synthesis → QA → Archivist) - UPGRADED GRAPH: Enhanced workflow with governance, compliance, and observation layers Author: AI Lab Team Last Updated: 2025-10-08 Version: 2.0 - Liberal Budget Policy """ # ============================================================================= # SECTION 1: IMPORTS AND CONFIGURATION # ============================================================================= import json import re import math import os import uuid import shutil import zipfile import operator from typing import TypedDict, List, Dict, Optional, Annotated, Any from datetime import datetime # LangChain imports from langchain_openai import ChatOpenAI from langgraph.graph import StateGraph, END # Local imports from memory_manager import memory_manager from code_executor import execute_python_code from logging_config import setup_logging, get_logger # Multi-language support from multi_language_support import ( detect_language, extract_code_blocks_multi_lang, execute_code, detect_requested_output_types_enhanced, write_script_multi_lang, LANGUAGES ) # Artifact generation libraries import nbformat from nbformat.v4 import new_notebook, new_markdown_cell, new_code_cell import pandas as pd from docx import Document as DocxDocument from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer from reportlab.lib.styles import getSampleStyleSheet # Setup logging setup_logging() log = get_logger(__name__) # 
============================================================================= # SECTION 2: CONFIGURATION CONSTANTS (LIBERAL BUDGET POLICY) # ============================================================================= # REPLACE from line 55 to wherever these constants end OUT_DIR = os.environ.get("OUT_DIR", "/tmp") os.makedirs(OUT_DIR, exist_ok=True) EXPORTS_DIR = os.path.join(OUT_DIR, "exports") os.makedirs(EXPORTS_DIR, exist_ok=True) # Cost and loop control - FIXED LIMITS INITIAL_MAX_REWORK_CYCLES = 3 # FIXED: Was 100 BUDGET_BUFFER_MULTIPLIER = 1.10 # FIXED: Was 1.20 MAX_COST_MULTIPLIER = 1.15 # FIXED: Was 1.20 MAX_EXECUTION_PATH_LENGTH = 500 # FIXED: Was 150 LOOP_DETECTION_WINDOW = 20 # NEW LOOP_THRESHOLD = 5 # NEW GPT4O_INPUT_COST_PER_1K_TOKENS = 0.005 GPT4O_OUTPUT_COST_PER_1K_TOKENS = 0.015 AVG_TOKENS_PER_CALL = 2.0 # Artifact types KNOWN_ARTIFACT_TYPES = {"notebook", "excel", "word", "pdf", "image", "repo", "script"} log.info("=" * 60) log.info("FIXED BUDGET POLICY ACTIVE") log.info(f"Max rework cycles: {INITIAL_MAX_REWORK_CYCLES}") log.info(f"Budget buffer: {int((BUDGET_BUFFER_MULTIPLIER - 1) * 100)}%") log.info(f"Stop threshold: {int(MAX_COST_MULTIPLIER * 100)}% of budget") log.info(f"Max path length: {MAX_EXECUTION_PATH_LENGTH}") log.info("=" * 60) # ============================================================================= # SECTION 3: STATE DEFINITION # ============================================================================= class AgentState(TypedDict): """ State shared across all agents in the workflow. Uses Annotated with operator.add for fields that multiple agents might update. 
""" # Core inputs userInput: str chatHistory: List[str] coreObjectivePrompt: str # Memory and planning retrievedMemory: Optional[str] pmPlan: Dict # Execution experimentCode: Optional[str] experimentResults: Optional[Dict] # Output draftResponse: str qaFeedback: Optional[str] approved: bool # Workflow control execution_path: Annotated[List[str], operator.add] rework_cycles: int max_loops: int status_updates: Annotated[List[Dict[str, str]], operator.add] # Budget and governance (LIBERAL POLICY) current_cost: float budget: float stop_threshold: float # NEW: 120% of budget budget_exceeded: bool flexible_budget_mode: bool preferred_tier: Optional[str] auto_accept_approved_with_warning: bool # Reports (UPGRADED GRAPH ONLY) pragmatistReport: Optional[Dict] governanceReport: Optional[Dict] complianceReport: Optional[Dict] observerReport: Optional[Dict] knowledgeInsights: Optional[Dict] # ============================================================================= # SECTION 4: HELPER FUNCTIONS # ============================================================================= def ensure_list(state: AgentState, key: str) -> List: """Safely extract a list from state, handling None and non-list types.""" v = state.get(key) if state else None if v is None: return [] if isinstance(v, list): return v if isinstance(v, tuple): return list(v) return [v] def ensure_int(state: AgentState, key: str, default: int = 0) -> int: """Safely extract an integer from state.""" try: v = state.get(key) if state else None if v is None: return default return int(v) except Exception: return default def sanitize_path(path: str) -> str: """Convert path to absolute path for security.""" return os.path.abspath(path) def get_latest_status(state: AgentState) -> str: """Get the most recent status update from the list.""" updates = state.get('status_updates', []) if updates and isinstance(updates, list): for update in reversed(updates): if isinstance(update, dict) and 'status' in update: return 
update['status'] elif isinstance(update, str): return update return "Processing..." def add_status_update(node_name: str, status: str) -> Dict[str, Any]: """ Create a status update entry. Returns a dict suitable for merging into agent return values. """ return { "status_updates": [{ "node": node_name, "status": status, "timestamp": datetime.utcnow().isoformat() }] } # ============================================================================= # SECTION 5: LLM INITIALIZATION AND JSON PARSING # ============================================================================= # Initialize LLM llm = ChatOpenAI(model="gpt-4o", temperature=0.4, max_retries=3, request_timeout=60) def parse_json_from_llm(llm_output: str) -> Optional[dict]: """ Robust JSON extraction from LLM output. Tries multiple strategies: 1. Explicit ```json {} ``` fenced blocks 2. Any fenced code block with JSON-like content 3. First balanced {...} substring 4. ast.literal_eval for Python-like dicts 5. Conservative cleanup (remove comments, trailing commas) Returns: dict or None on failure """ import ast if not llm_output or not isinstance(llm_output, str) or not llm_output.strip(): return None text = llm_output.strip() # Strategy 1: Explicit JSON fenced block match = re.search(r"```json\s*({.*?})\s*```", text, re.DOTALL | re.IGNORECASE) if match: candidate = match.group(1).strip() try: return json.loads(candidate) except Exception as e: log.debug(f"json.loads failed on triple-backtick json block: {e}") # Strategy 2: Any code fence with JSON-like content match2 = re.search(r"```(?:json|python|text)?\s*({.*?})\s*```", text, re.DOTALL | re.IGNORECASE) if match2: candidate = match2.group(1).strip() try: return json.loads(candidate) except Exception as e: log.debug(f"json.loads failed on fenced candidate: {e}") # Strategy 3: Find balanced {...} substring def find_balanced_brace_substring(s: str): start_idx = None depth = 0 for i, ch in enumerate(s): if ch == '{': if start_idx is None: start_idx = i depth += 
1 elif ch == '}': if depth > 0: depth -= 1 if depth == 0 and start_idx is not None: return s[start_idx:i+1] return None candidate = find_balanced_brace_substring(text) # Strategy 4: Fallback heuristic if not candidate: first = text.find('{') last = text.rfind('}') if first != -1 and last != -1 and last > first: candidate = text[first:last+1] if candidate: # Try direct JSON parsing try: return json.loads(candidate) except Exception as e: log.debug(f"json.loads failed on candidate substring: {e}") # Try ast.literal_eval (handles single quotes) try: parsed = ast.literal_eval(candidate) if isinstance(parsed, (dict, list)): return json.loads(json.dumps(parsed)) except Exception as e: log.debug(f"ast.literal_eval failed: {e}") # Conservative cleanup cleaned = candidate try: # Remove line comments cleaned = re.sub(r"//.*?$", "", cleaned, flags=re.MULTILINE) # Remove block comments cleaned = re.sub(r"/\*.*?\*/", "", cleaned, flags=re.DOTALL) # Remove trailing commas cleaned = re.sub(r",\s*([}\]])", r"\1", cleaned) # Replace single quotes with double quotes (carefully) def _single_to_double(m): inner = m.group(1) inner_escaped = inner.replace('"', '\\"') return f'"{inner_escaped}"' cleaned = re.sub(r"(?<=[:\{\[,]\s*)'([^']*?)'", _single_to_double, cleaned) return json.loads(cleaned) except Exception as e: log.debug(f"json.loads still failed after cleanup: {e}") # All strategies failed log.error(f"parse_json_from_llm failed. Output preview: {text[:200]}") return None # ============================================================================= # SECTION 6: ARTIFACT DETECTION AND GENERATION # ============================================================================= def detect_requested_output_types(text: str) -> Dict: """ Detect what type of artifact the user wants. Uses the enhanced version from multi_language_support. 
""" return detect_requested_output_types_enhanced(text) def normalize_experiment_type(exp_type: Optional[str], goal_text: str) -> str: """Normalize experiment type to known artifact types.""" if not exp_type: detection = detect_requested_output_types(goal_text or "") return detection.get("artifact_type") or "word" s = exp_type.strip().lower() if s in KNOWN_ARTIFACT_TYPES: return s # Map variations to known types mapping = { "notebook": ["notebook", "ipynb", "jupyter"], "excel": ["excel", "xlsx", "spreadsheet"], "word": ["word", "docx", "doc"], "pdf": ["pdf"], "repo": ["repo", "repository", "backend", "codebase"], "script": ["script", "python", "py"] } for known_type, keywords in mapping.items(): if any(kw in s for kw in keywords): return known_type # Fallback: use detection detection = detect_requested_output_types(goal_text or "") return detection.get("artifact_type") or "word" def write_notebook_from_text(llm_text: str, out_dir: Optional[str] = None) -> str: """Generate a Jupyter notebook from LLM output.""" out_dir = out_dir or OUT_DIR os.makedirs(out_dir, exist_ok=True) # Extract code blocks code_blocks = re.findall(r"```python\s*(.*?)\s*```", llm_text, re.DOTALL) if not code_blocks: code_blocks = re.findall(r"```\s*(.*?)\s*```", llm_text, re.DOTALL) # Split markdown sections md_parts = re.split(r"```(?:python)?\s*.*?\s*```", llm_text, flags=re.DOTALL) # Build notebook nb = new_notebook() cells = [] max_len = max(len(md_parts), len(code_blocks)) for i in range(max_len): if i < len(md_parts) and md_parts[i].strip(): cells.append(new_markdown_cell(md_parts[i].strip())) if i < len(code_blocks) and code_blocks[i].strip(): cells.append(new_code_cell(code_blocks[i].strip())) if not cells: cells = [new_markdown_cell("# Notebook\n\nNo content generated.")] nb['cells'] = cells uid = uuid.uuid4().hex[:10] filename = os.path.join(out_dir, f"generated_notebook_{uid}.ipynb") nbformat.write(nb, filename) return filename def write_script(code_text: str, language_hint: 
Optional[str] = None, out_dir: Optional[str] = None) -> str: """ Write a script file with appropriate extension. Uses multi-language support. """ return write_script_multi_lang(code_text, language_hint, out_dir) def write_docx_from_text(text: str, out_dir: Optional[str] = None) -> str: """Generate a Word document from text.""" out_dir = out_dir or OUT_DIR os.makedirs(out_dir, exist_ok=True) doc = DocxDocument() for para in [p.strip() for p in text.split("\n\n") if p.strip()]: doc.add_paragraph(para) uid = uuid.uuid4().hex[:10] filename = os.path.join(out_dir, f"generated_doc_{uid}.docx") doc.save(filename) return filename def write_excel_from_tables(maybe_table_text: str, out_dir: Optional[str] = None) -> str: """Generate an Excel file from table data.""" out_dir = out_dir or OUT_DIR os.makedirs(out_dir, exist_ok=True) uid = uuid.uuid4().hex[:10] filename = os.path.join(out_dir, f"generated_excel_{uid}.xlsx") try: # Try to parse as JSON try: parsed = json.loads(maybe_table_text) if isinstance(parsed, list): df = pd.DataFrame(parsed) elif isinstance(parsed, dict): df = pd.DataFrame([parsed]) else: df = pd.DataFrame({"content": [str(maybe_table_text)]}) except Exception: # Try CSV parsing if "," in maybe_table_text: from io import StringIO df = pd.read_csv(StringIO(maybe_table_text)) else: df = pd.DataFrame({"content": [maybe_table_text]}) df.to_excel(filename, index=False, engine="openpyxl") return filename except Exception as e: log.error(f"Excel creation failed: {e}") return write_docx_from_text(f"Excel error: {e}\n\n{maybe_table_text}", out_dir=out_dir) def write_pdf_from_text(text: str, out_dir: Optional[str] = None) -> str: """Generate a PDF from text.""" out_dir = out_dir or OUT_DIR os.makedirs(out_dir, exist_ok=True) uid = uuid.uuid4().hex[:10] filename = os.path.join(out_dir, f"generated_doc_{uid}.pdf") try: doc = SimpleDocTemplate(filename) styles = getSampleStyleSheet() flowables = [] for para in [p.strip() for p in text.split("\n\n") if p.strip()]: 
flowables.append(Paragraph(para.replace("\n", "
"), styles["Normal"])) flowables.append(Spacer(1, 8)) doc.build(flowables) return filename except Exception as e: log.error(f"PDF creation failed: {e}") return write_docx_from_text(f"PDF error: {e}\n\n{text}", out_dir=out_dir) def build_repo_zip(files_map: Dict[str, str], repo_name: str = "generated_app", out_dir: Optional[str] = None) -> str: """Build a repository as a ZIP file.""" out_dir = out_dir or OUT_DIR os.makedirs(out_dir, exist_ok=True) uid = uuid.uuid4().hex[:8] repo_dir = os.path.join(out_dir, f"{repo_name}_{uid}") os.makedirs(repo_dir, exist_ok=True) # Write all files for rel_path, content in files_map.items(): dest = os.path.join(repo_dir, rel_path) os.makedirs(os.path.dirname(dest), exist_ok=True) if isinstance(content, str) and os.path.exists(content): shutil.copyfile(content, dest) else: with open(dest, "w", encoding="utf-8") as fh: fh.write(str(content)) # Create ZIP zip_path = os.path.join(out_dir, f"{repo_name}_{uid}.zip") with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for root, _, files in os.walk(repo_dir): for f in files: full = os.path.join(root, f) arc = os.path.relpath(full, repo_dir) zf.write(full, arc) return zip_path # ============================================================================= # SECTION 7: BASE GRAPH AGENT NODES # ============================================================================= # These are the core agents that form the foundation of the workflow. def run_triage_agent(state: AgentState) -> Dict[str, Any]: """ BASE GRAPH: Determines if input is a greeting or task. Simple classifier to route casual interactions. """ log.info("--- TRIAGE ---") prompt = f"Is this a greeting or a task? '{state.get('userInput', '')}' Reply: 'greeting' or 'task'" response = llm.invoke(prompt) content = getattr(response, "content", "") or "" if 'greeting' in content.lower(): return { "draftResponse": "Hello! 
How can I help?", "execution_path": ["Triage"], **add_status_update("Triage", "Greeting") } return { "execution_path": ["Triage"], **add_status_update("Triage", "Task detected") } def run_planner_agent(state: AgentState) -> Dict[str, Any]: """ BASE GRAPH: Quick planning agent for cost estimation. Used for initial triage and budgeting. Robust behavior: - ask LLM for JSON-only output - try parse_json_from_llm (existing) - if that fails, attempt regex-extract JSON and json.loads - retry LLM with low temperature if parsing fails - fallback to heuristic plaintext -> plan conversion (partial result) - compute costs with sane clamping and document LLM preview when fallbacked """ import json import re import logging log.info("--- PLANNER ---") path = ensure_list(state, 'execution_path') + ["Planner"] user_input = state.get('userInput', '') or "" # Build a strict JSON-only prompt with schema + example prompt = ( "You are a planning assistant. **Return ONLY valid JSON** (no explanation, no markdown).\n" "Schema: {\"plan\": [string], \"estimated_llm_calls_per_loop\": integer}\n" "Example:\n" "{\n" " \"plan\": [\"Market research: 1 week\", \"MVP: 4 weeks\"],\n" " \"estimated_llm_calls_per_loop\": 4\n" "}\n\n" f"Create a plan for: {json.dumps(user_input)}\n" "Keep plan concise (each item one short sentence)." 
) # Helper: extract JSON blob with regex def _extract_json_blob(text: str) -> Optional[str]: if not text: return None # Try to find outermost JSON object or array # Use non-greedy match between braces/brackets to capture first JSON-like blob m = re.search(r'(\{[\s\S]*\})', text) if m: return m.group(1) m2 = re.search(r'(\[[\s\S]*\])', text) if m2: return m2.group(1) return None # Try primary LLM call try: response = llm.invoke(prompt) except Exception as e: log.exception("Planner: llm.invoke failed: %s", e) return { "pmPlan": {"error": "LLM invoke failed"}, "execution_path": path, **add_status_update("Planner", "Error") } raw = (getattr(response, "content", "") or "").strip() # 1) Try existing parser if available plan_data = None try: # prefer using existing parse_json_from_llm if present plan_data = parse_json_from_llm(raw) if 'parse_json_from_llm' in globals() else None except Exception: plan_data = None # 2) If parse_json_from_llm failed, try regex extraction + json.loads if not plan_data: try: blob = _extract_json_blob(raw) if blob: plan_data = json.loads(blob) except Exception: plan_data = None # 3) Retry the LLM with stricter settings (if parse still failed) if not plan_data: try: log.info("Planner: parse failed, retrying with stricter instruction / low temp.") # Attempt a retry - adapt params to your llm wrapper if needed retry_prompt = prompt + "\n\nIMPORTANT: Return only JSON, nothing else." 
# Some wrappers accept params, try best-effort; ignore if it errors retry_resp = None try: retry_resp = llm.invoke(retry_prompt, params={"temperature": 0.0}) except Exception: retry_resp = llm.invoke(retry_prompt) raw_retry = (getattr(retry_resp, "content", "") or "").strip() # attempt parse_json_from_llm on retry try: plan_data = parse_json_from_llm(raw_retry) if 'parse_json_from_llm' in globals() else None except Exception: plan_data = None if not plan_data: blob = _extract_json_blob(raw_retry) if blob: try: plan_data = json.loads(blob) raw = raw + "\n\n--- retry ---\n\n" + raw_retry except Exception: plan_data = None except Exception as e: log.exception("Planner retry failed: %s", e) # 4) Final fallback: convert plaintext into a minimal plan so pipeline continues if not plan_data: log.warning("Planner: JSON parse failed, using plaintext fallback.") # heuristics to make steps fragments = [f.strip() for f in re.split(r'\n+|(?<=\.)\s+', raw) if f.strip()] steps = [] for frag in fragments: s = frag if len(frag) <= 200 else frag[:197] + "..." if len(s) > 10 and s not in steps: steps.append(s) if len(steps) >= 8: break if not steps: steps = [ "Clarify requirements with user.", "Perform market research and competitor analysis.", "Draft curriculum and module outline.", "Estimate effort and resources for MVP." 
] plan_data = { "plan": steps, "estimated_llm_calls_per_loop": 3, # add debug info for observability "notes": "Derived from plaintext fallback; original LLM output not valid JSON.", "llm_preview": raw[:2000] } # Sanitize and normalize plan_data fields if not isinstance(plan_data, dict): plan_data = {"plan": ["Clarify requirements with user."], "estimated_llm_calls_per_loop": 3} if 'plan' not in plan_data or not isinstance(plan_data['plan'], list): plan_data['plan'] = plan_data.get('plan') and list(plan_data.get('plan')) or ["Clarify requirements with user."] try: calls = int(plan_data.get('estimated_llm_calls_per_loop', 3)) except Exception: calls = 3 # clamp calls to sensible range calls = max(1, min(calls, 50)) plan_data['estimated_llm_calls_per_loop'] = calls # Cost calculation: tokens * avg_cost_per_1k / 1000 -> USD avg_cost_per_1k = (GPT4O_INPUT_COST_PER_1K_TOKENS + GPT4O_OUTPUT_COST_PER_1K_TOKENS) / 2.0 # AVG_TOKENS_PER_CALL is tokens per call try: avg_tokens = float(AVG_TOKENS_PER_CALL) except Exception: avg_tokens = 1500.0 cost_per_loop = (calls * avg_tokens) * (avg_cost_per_1k / 1000.0) plan_data['max_loops_initial'] = INITIAL_MAX_REWORK_CYCLES plan_data['cost_per_loop_usd'] = max(0.01, round(cost_per_loop, 4)) total_loops = INITIAL_MAX_REWORK_CYCLES + 1 plan_data['estimated_cost_usd'] = round(cost_per_loop * total_loops, 2) # Detect artifact requirements (preserve existing behavior) detection = detect_requested_output_types(user_input) if detection.get('requires_artifact'): plan_data.setdefault('experiment_needed', True) plan_data.setdefault('experiment_type', detection.get('artifact_type')) plan_data.setdefault('experiment_goal', user_input) return { "pmPlan": plan_data, "execution_path": path, **add_status_update("Planner", "Plan created") } def run_memory_retrieval(state: AgentState) -> Dict[str, Any]: """ BASE GRAPH: Retrieve relevant memories for context. Uses vector similarity search on past interactions. 
""" log.info("--- MEMORY ---") path = ensure_list(state, 'execution_path') + ["Memory"] mems = memory_manager.retrieve_relevant_memories(state.get('userInput', '')) context = "\n".join([f"Memory: {m.page_content}" for m in mems]) if mems else "No memories" return { "retrievedMemory": context, "execution_path": path, **add_status_update("Memory", "Memory retrieved") } def run_intent_agent(state: AgentState) -> Dict[str, Any]: """ BASE GRAPH: Clarify and refine user intent. Transforms raw input into clear, actionable objective. """ log.info("--- INTENT ---") path = ensure_list(state, 'execution_path') + ["Intent"] prompt = ( f"Refine into clear objective.\n\n" f"Memory: {state.get('retrievedMemory')}\n\n" f"Request: {state.get('userInput', '')}\n\n" f"Core Objective:" ) response = llm.invoke(prompt) core_obj = getattr(response, "content", "") or "" return { "coreObjectivePrompt": core_obj, "execution_path": path, **add_status_update("Intent", "Objective clarified") } def validate_and_fix_pm_plan(plan: Dict, user_input: str) -> Dict: """ CRITICAL: Forces artifact creation if user requested document output. Prevents experiment_needed=false when user explicitly wants files. 
""" if not isinstance(plan, dict): plan = {} # Detect document/file requests needs_artifact = any(keyword in user_input.lower() for keyword in [ 'docx', 'document', 'word', 'file', 'pdf', 'excel', 'output -', 'output:', 'results in', 'deliver as', 'save as', 'export to', 'create file', 'generate file', 'avoid code', 'no code' # If avoiding code, must want document ]) if needs_artifact: log.info("🔍 User requested document/file output - forcing artifact creation") # FORCE artifact creation if not plan.get("experiment_needed") or plan.get("experiment_needed") == False: log.warning("⚠️ Overriding experiment_needed from false to TRUE") plan["experiment_needed"] = True # Set document type if 'pdf' in user_input.lower(): plan["experiment_type"] = "pdf" elif 'excel' in user_input.lower() or 'xlsx' in user_input.lower(): plan["experiment_type"] = "excel" else: plan["experiment_type"] = "word" # Default to Word # Ensure goal is set if not plan.get("experiment_goal"): plan["experiment_goal"] = user_input log.info(f"📋 Forced: experiment_needed=True, type={plan.get('experiment_type')}") # Ensure basic plan structure if not plan.get("plan_steps") or not isinstance(plan["plan_steps"], list): plan["plan_steps"] = [ "Analyze request", "Gather information", "Create document", "Format output" ] return plan def run_pm_agent(state: AgentState) -> Dict[str, Any]: """ FIXED: PM Agent with proper hard limits and language preference. """ log.info("--- PM ---") # Get current state current_rework = ensure_int(state, 'rework_cycles', 0) max_loops_val = ensure_int(state, 'max_loops', INITIAL_MAX_REWORK_CYCLES) # CRITICAL FIX 1: Cap max_loops if unreasonable if max_loops_val > 10: log.warning(f"⚠️ max_loops {max_loops_val} TOO HIGH! 
Forcing to 3") max_loops_val = 3 state["max_loops"] = 3 # Calculate hard limit (never exceed 6 total reworks) hard_limit = min(max_loops_val * 2, 6) log.info(f"📊 Rework cycle: {current_rework}/{hard_limit} (max: {max_loops_val})") # CRITICAL FIX 2: Force completion if at hard limit if current_rework >= hard_limit: log.error(f"❌ Hard limit reached: {current_rework} >= {hard_limit}") path = ensure_list(state, 'execution_path') + ["PM"] user_input = state.get('userInput', '') is_research = any(kw in user_input.lower() for kw in [ 'research', 'everything to start', 'comprehensive', 'guide to' ]) # Create completion plan completion_plan = { "plan_steps": [ "Create comprehensive research document", "Include market analysis and competitive review", "Add Python code examples for AI implementation", "Provide actionable implementation roadmap" ], "experiment_needed": True, "experiment_type": "word", # Force document for research "experiment_goal": state.get('coreObjectivePrompt', user_input) } return { "pmPlan": completion_plan, "execution_path": path, "rework_cycles": current_rework, "approved": True, # FORCE APPROVAL to complete "max_loops": 3, **add_status_update("PM", "Limit reached - completing with document") } # CRITICAL FIX 3: Check for loops (but not on first few cycles) if current_rework > 1: if detect_loop(state): log.error("❌ Loop detected in PM agent") path = ensure_list(state, 'execution_path') + ["PM"] # Force completion return { "pmPlan": { "plan_steps": ["Create research document with analysis"], "experiment_needed": True, "experiment_type": "word", "experiment_goal": state.get('coreObjectivePrompt', '') }, "execution_path": path, "rework_cycles": current_rework, "approved": True, **add_status_update("PM", "Loop detected - forcing completion") } # Increment cycle counter current_cycles = current_rework + 1 path = ensure_list(state, 'execution_path') + ["PM"] # Build planning context user_input = state.get('userInput', '') context_parts = [ f"=== USER REQUEST 
===\n{user_input}", f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}", ] # Add language requirement if 'prefer python' in user_input.lower() or 'use python' in user_input.lower(): context_parts.append("\n=== LANGUAGE REQUIREMENT ===\nUSER REQUIRES PYTHON") # Add feedback if rework if state.get('qaFeedback'): context_parts.append(f"\n=== QA FEEDBACK ===\n{state.get('qaFeedback')}") context_parts.append(f"\n=== REWORK CYCLE {current_cycles}/{hard_limit} ===") full_context = "\n".join(context_parts) # Detect if research request is_research = any(kw in user_input.lower() for kw in [ 'research', 'analysis', 'everything to start', 'comprehensive' ]) # Create planning prompt prompt = f"""Create DETAILED execution plan. {full_context} {"IMPORTANT: This is a research request. Create a document with Python code examples." if is_research else ""} Return JSON: {{ "plan_steps": ["step 1", "step 2", ...], "experiment_needed": true/false, "experiment_type": "{"word" if is_research else "script|notebook|excel|word|pdf|repo"}", "experiment_goal": "specific goal", "key_requirements": ["req 1", "req 2", ...] }} Be specific and actionable. 
""" # Get plan from LLM try: response = llm.invoke(prompt) plan = parse_json_from_llm(getattr(response, "content", "") or "") except Exception as e: log.warning(f"PM LLM failed: {e}") plan = None # CRITICAL: Validate and fix plan BEFORE using it user_input = state.get('userInput', '') plan = validate_and_fix_pm_plan(plan or {}, user_input) # Log what we're doing log.info(f"📋 Final plan: experiment_needed={plan.get('experiment_needed')}, type={plan.get('experiment_type')}") # Fallback if still no plan structure if not plan.get("plan_steps"): plan["plan_steps"] = [ "Analyze requirements", "Research market", "Create deliverable" ] # Normalize experiment type exp_type = normalize_experiment_type( plan.get('experiment_type'), plan.get('experiment_goal', '') ) plan['experiment_type'] = exp_type # Ensure goal is set if plan.get('experiment_needed') and not plan.get('experiment_goal'): plan['experiment_goal'] = user_input # Add loop control info plan['max_loops_initial'] = max_loops_val plan['hard_limit'] = hard_limit return { "pmPlan": plan, "execution_path": path, "rework_cycles": current_cycles, "max_loops": max_loops_val, **add_status_update("PM", f"Plan created - Cycle {current_cycles}/{hard_limit}") } def extract_user_language_preference(user_input: str) -> str: """Extract language preference from user rules.""" text = user_input.lower() # Check for explicit Python preference if any(p in text for p in ["prefer python", "use python", "always use python", "code", "provide script"]): return "python" # Check for "unless" clause if "unless" in text and "python" in text: # User wants Python unless explicitly stated otherwise if not any(lang in text for lang in ["typescript", "javascript", "java", "c++", "R", "PHP", "Scala", "MATLAB", "SQL"]): return "python" return None def run_experimenter_agent(state: AgentState) -> Dict[str, Any]: """ BASE GRAPH: Generates and executes code/artifacts. 
Handles: - Multi-language code generation - Artifact creation (notebooks, scripts, documents) - Code execution - Result capture """ log.info("--- EXPERIMENTER ---") path = ensure_list(state, 'execution_path') + ["Experimenter"] pm = state.get('pmPlan', {}) or {} # Skip if no experiment needed if not pm.get('experiment_needed'): return { "experimentCode": None, "experimentResults": None, "execution_path": path, **add_status_update("Experimenter", "No experiment needed") } # ⭐ ADD THIS SECTION ⭐ user_input = state.get('userInput', '') # CRITICAL: Check user's language preference FIRST forced_lang = extract_user_language_preference(user_input) # CRITICAL: Detect research requests is_research = any(kw in user_input.lower() for kw in [ 'research', 'analysis', 'everything to start', 'comprehensive', 'guide to starting', 'study', 'review', 'summarize' ]) # Force document output for research if is_research: exp_type = 'word' log.info("🔍 Research request detected - using document format") else: exp_type = normalize_experiment_type( pm.get('experiment_type'), pm.get('experiment_goal', '') ) # Use forced language or detect language = forced_lang or 'python' # Default to Python log.info(f"📝 Type: {exp_type}, Language: {language}") # ⭐ END NEW SECTION ⭐ # Detect language and artifact type detected = detect_requested_output_types_enhanced(pm.get('experiment_goal', '')) language = detected.get('language', 'python') exp_type = normalize_experiment_type(pm.get('experiment_type'), pm.get('experiment_goal', '')) goal = pm.get('experiment_goal', 'No goal') # Build rich context context_parts = [ f"=== USER REQUEST ===\n{state.get('userInput', '')}", f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}", f"\n=== PLAN ===\n{json.dumps(pm.get('plan_steps', []), indent=2)}", f"\n=== REQUIREMENTS ===\n{json.dumps(pm.get('key_requirements', []), indent=2)}", ] if state.get('retrievedMemory'): context_parts.append(f"\n=== CONTEXT ===\n{state.get('retrievedMemory', '')}") if 
state.get('qaFeedback'): context_parts.append(f"\n=== FEEDBACK TO ADDRESS ===\n{state.get('qaFeedback', '')}") full_context = "\n".join(context_parts) # Get language configuration lang_config = LANGUAGES.get(language) lang_name = lang_config.name if lang_config else "Code" # OLD PROMPT (DELETE): # enhanced_prompt = f"""Create {lang_name} {exp_type} artifact... # NEW PROMPT (USE THIS): if exp_type == 'word': # Research document prompt enhanced_prompt = f"""Create COMPREHENSIVE RESEARCH DOCUMENT. USER REQUEST: {user_input} OBJECTIVE: {state.get('coreObjectivePrompt', '')} OUTPUT REQUIREMENTS: - Complete research document in markdown format - Include ALL relevant sections (market analysis, competitors, strategies, implementation) - Add Python code EXAMPLES where relevant (AI integration, tools, APIs) - Be SPECIFIC and ACTIONABLE - provide real data and analysis - NO placeholders - complete, usable content only - Structure: Executive Summary, Market Analysis, Competitive Review, Curriculum Design, Implementation Plan, Tools & Tech Stack Generate complete research document with Python examples:""" else: # Code generation prompt enhanced_prompt = f"""Create {lang_name} {exp_type}. 
USER REQUEST: {user_input} REQUIREMENTS: - Write ONLY {lang_name} code (user specified) - Follow best practices - Include documentation - Production-ready - Include error handling Generate {lang_name} code:""" response = llm.invoke(enhanced_prompt) llm_text = getattr(response, "content", "") or "" # ⭐ ADD FOR RESEARCH DOCUMENTS ⭐ if exp_type == 'word': # Save as document doc_path = write_docx_from_text(llm_text, out_dir=OUT_DIR) txt_path = doc_path.replace('.docx', '.txt') # Also save as text with open(txt_path, 'w', encoding='utf-8') as f: f.write(llm_text) log.info(f"📄 Created: {os.path.basename(doc_path)}") return { "experimentCode": llm_text, "experimentResults": { "success": True, "paths": { "document": sanitize_path(doc_path), "text": sanitize_path(txt_path) }, "stdout": f"Research document created: {os.path.basename(doc_path)}", "language": "markdown", "artifact_type": "research_document" }, "execution_path": path, **add_status_update("Experimenter", "Research doc created") } # ⭐ END RESEARCH SECTION ⭐ # Continue with normal code handling... 
# Extract code blocks with language detection code_blocks = extract_code_blocks_multi_lang(llm_text) if code_blocks: # Use first detected language/code pair detected_lang, code_text = code_blocks[0] # Write script with proper extension script_path = write_script_multi_lang(code_text, detected_lang, out_dir=OUT_DIR) # Execute with appropriate runner exec_results = execute_code(code_text, detected_lang) results = { "success": exec_results.get("exit_code", 0) == 0, "paths": {"script": sanitize_path(script_path)}, "stdout": exec_results.get("stdout", ""), "stderr": exec_results.get("stderr", ""), "language": detected_lang, "context_used": len(full_context) } return { "experimentCode": code_text, "experimentResults": results, "execution_path": path, **add_status_update("Experimenter", f"{lang_name} script created") } # No code blocks found return { "experimentCode": None, "experimentResults": {"success": False, "error": "No code generated"}, "execution_path": path, **add_status_update("Experimenter", "Code generation failed") } def run_synthesis_agent(state: AgentState) -> Dict[str, Any]: """ BASE GRAPH: Synthesizes final response from all components. 
Combines: - User request - Execution results - Artifacts created - Insights and explanations """ log.info("--- SYNTHESIS ---") _state = state or {} path = ensure_list(_state, 'execution_path') + ["Synthesis"] exp_results = _state.get('experimentResults') pm_plan = _state.get('pmPlan', {}) or {} # Build synthesis context synthesis_context = [ f"=== USER REQUEST ===\n{_state.get('userInput', '')}", f"\n=== OBJECTIVE ===\n{_state.get('coreObjectivePrompt', '')}", f"\n=== PLAN ===\n{json.dumps(pm_plan.get('plan_steps', []), indent=2)}", ] artifact_details = [] artifact_message = "" # Process experiment results if exp_results and isinstance(exp_results, dict): paths = exp_results.get("paths") or {} if paths: artifact_lines = [] for artifact_type, artifact_path in paths.items(): artifact_lines.append(f"- **{artifact_type.title()}**: `{os.path.basename(artifact_path)}`") artifact_details.append(f"{artifact_type}: {artifact_path}") artifact_message = "\n\n**Artifacts Generated:**\n" + "\n".join(artifact_lines) synthesis_context.append(f"\n=== ARTIFACTS ===\n" + "\n".join(artifact_details)) if exp_results.get('stdout'): synthesis_context.append(f"\n=== OUTPUT ===\n{exp_results.get('stdout', '')}") if exp_results.get('stderr'): synthesis_context.append(f"\n=== ERRORS ===\n{exp_results.get('stderr', '')}") full_context = "\n".join(synthesis_context) # Generate final response synthesis_prompt = f"""Create FINAL RESPONSE after executing user's request. {full_context} Create comprehensive response that: - Directly addresses original request - Explains what was accomplished and HOW - References specific artifacts and explains PURPOSE - Provides context on how to USE deliverables - Highlights KEY INSIGHTS - Suggests NEXT STEPS if relevant - Be SPECIFIC about what was created.""" response = llm.invoke(synthesis_prompt) final_text = getattr(response, "content", "") or "" # After getting exp_results... 
# ⭐ ADD THIS SECTION ⭐ content_preview = "" if exp_results and isinstance(exp_results, dict): paths = exp_results.get("paths") or {} if paths: for artifact_type, artifact_path in paths.items(): if os.path.exists(artifact_path): try: with open(artifact_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() preview = content[:2000] # First 2000 chars content_preview += f"\n\n### 📄 {artifact_type.title()} Preview:\n```\n{preview}\n...\n```\n" except Exception as e: log.warning(f"Could not preview {artifact_path}: {e}") # ⭐ END NEW SECTION ⭐ # Then in final_text assembly: if artifact_message: final_text = final_text + "\n\n---\n" + artifact_message if content_preview: final_text = final_text + "\n\n---\n## Content Preview\n" + content_preview return { "draftResponse": final_text, "execution_path": path, **add_status_update("Synthesis", "Response synthesized") } def run_qa_agent(state: AgentState) -> Dict[str, Any]: """ BASE GRAPH: Quality assurance review. Validates: - Completeness - Correctness - Alignment with user request - Quality standards """ log.info("--- QA ---") path = ensure_list(state, 'execution_path') + ["QA"] # Build QA context qa_context = [ f"=== REQUEST ===\n{state.get('userInput', '')}", f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}", f"\n=== DRAFT ===\n{state.get('draftResponse', '')}", ] if state.get('experimentResults'): qa_context.append( f"\n=== ARTIFACTS ===\n" f"{json.dumps(state.get('experimentResults', {}).get('paths', {}), indent=2)}" ) prompt = f"""You are a QA reviewer. Review the draft response against the user's objective. {chr(10).join(qa_context)} Review Instructions: - Does the draft and its artifacts COMPLETELY satisfy ALL parts of the user's request? - Is the quality of the work high? - If this is a re-submission (rework cycle > 1), has the previous feedback been successfully addressed? 
Response Format (required JSON or a single word 'APPROVED'): Either return EXACTLY the single word: APPROVED Or return JSON like: {{ "approved": false, "feedback": "Specific, actionable items to fix (bullet list or numbered).", "required_changes": ["..."] }} """ try: response = llm.invoke(prompt) content = getattr(response, "content", "") or "" except Exception as e: log.exception(f"QA LLM call failed: {e}") return { "approved": False, "qaFeedback": "QA LLM failed; manual review required.", "execution_path": path, **add_status_update("QA", "QA failed") } # Check for simple APPROVED response if "APPROVED" in content.strip().upper() and len(content.strip()) <= 20: return { "approved": True, "qaFeedback": None, "execution_path": path, **add_status_update("QA", "Approved") } # Try JSON parsing parsed = parse_json_from_llm(content) if isinstance(parsed, dict): approved = bool(parsed.get("approved", False)) feedback = parsed.get("feedback") or parsed.get("qaFeedback") or parsed.get("required_changes") or "" # Normalize feedback to string if isinstance(feedback, list): feedback = "\n".join([str(x) for x in feedback]) elif not isinstance(feedback, str): feedback = str(feedback) return { "approved": approved, "qaFeedback": feedback if not approved else None, "execution_path": path, **add_status_update("QA", "QA completed") } # Fallback: treat as feedback (not approved) safe_feedback = content.strip()[:2000] or "QA produced no actionable output." return { "approved": False, "qaFeedback": safe_feedback, "execution_path": path, **add_status_update("QA", "QA needs rework") } def run_archivist_agent(state: AgentState) -> Dict[str, Any]: """ BASE GRAPH: Archives successful interactions to memory. Enables learning from past experiences. 
""" log.info("--- ARCHIVIST ---") path = ensure_list(state, 'execution_path') + ["Archivist"] summary_prompt = ( f"Summarize for memory.\n\n" f"Objective: {state.get('coreObjectivePrompt')}\n\n" f"Response: {state.get('draftResponse')}\n\n" f"Summary:" ) response = llm.invoke(summary_prompt) memory_manager.add_to_memory( getattr(response, "content", ""), {"objective": state.get('coreObjectivePrompt')} ) return { "execution_path": path, **add_status_update("Archivist", "Saved to memory") } def run_disclaimer_agent(state: AgentState) -> Dict[str, Any]: """ BASE GRAPH: Adds disclaimer when limits are reached. Handles budget or rework cycle exhaustion. """ log.warning("--- DISCLAIMER ---") path = ensure_list(state, 'execution_path') + ["Disclaimer"] reason = "Budget limit reached." if state.get('budget_exceeded') else "Rework limit reached." disclaimer = f"**DISCLAIMER: {reason} Draft may be incomplete.**\n\n---\n\n" final_response = disclaimer + state.get('draftResponse', "No response") return { "draftResponse": final_response, "execution_path": path, **add_status_update("Disclaimer", reason) } # ============================================================================= # SECTION 8: UPGRADED GRAPH AGENT NODES # ============================================================================= # These enhanced agents add governance, compliance, and observation layers. def run_pragmatist_agent(state: AgentState) -> Dict[str, Any]: """ UPGRADED GRAPH: Risk assessment and tier recommendations. 
Provides: - Risk analysis (low/medium/high) - Tiered delivery options (lite/standard/full) - Cost estimates per tier - Optimization suggestions """ log.info(">>> PRAGMATIST AGENT (improved)") path = ensure_list(state, "execution_path") + ["Pragmatist"] pm = state.get("pmPlan", {}) or {} # Parse estimated cost est_cost = None try: raw = pm.get("estimated_cost_usd", None) if raw is not None and raw != "": est_cost = float(raw) except Exception: try: s = str(raw) m = re.search(r"[\d,.]+", s) if m: est_cost = float(m.group(0).replace(",", "")) except Exception: est_cost = None exp_type = pm.get("experiment_type", "word") base_est = est_cost or (50.0 if exp_type in ["script", "repo"] else 5.0) # Define tier options tiers = { "lite": { "multiplier": 0.25, "estimated_cost_usd": round(base_est * 0.25, 2), "description": "Minimal extract (CSV/text) or short summary. Minimal engineering." }, "standard": { "multiplier": 1.0, "estimated_cost_usd": round(base_est * 1.0, 2), "description": "Complete, tested script or notebook; limited UX – suitable for MVP." }, "full": { "multiplier": 3.0, "estimated_cost_usd": round(base_est * 3.0, 2), "description": "Production-ready repo, packaging, tests, and deployment instructions." 
} } preferred = state.get("preferred_tier") flexible_mode = bool(state.get("flexible_budget_mode", False)) # Intelligent risk assessment risk_factors = [] risk_score = 0 # Check complexity plan_steps = pm.get("plan_steps", []) if len(plan_steps) > 8: risk_factors.append("Complex multi-step plan") risk_score += 1 # Check artifact type if exp_type in ("repo", "notebook"): risk_factors.append(f"Engineering-heavy artifact type: {exp_type}") risk_score += 1 # Check cost if est_cost is None: risk_factors.append("No cost estimate provided") risk_score += 1 elif est_cost > 200: risk_factors.append(f"High estimated cost: ${est_cost}") risk_score += 2 elif est_cost > 100: risk_factors.append(f"Moderate estimated cost: ${est_cost}") risk_score += 1 # Adjust risk for flexible mode if flexible_mode: risk_score = max(0, risk_score - 2) # Calculate risk level if risk_score <= 1: risk = "low" elif risk_score <= 3: risk = "medium" else: risk = "high" # Determine feasibility (LIBERAL: almost always feasible) feasible = True if risk_score > 5 and not flexible_mode: # Very high threshold feasible = False # Recommend tier if preferred in tiers: recommended_tier = preferred elif est_cost is None: recommended_tier = "standard" elif est_cost > 500 and not flexible_mode: recommended_tier = "lite" else: recommended_tier = "standard" prag_report = { "ok": feasible, "risk_factors": risk_factors, "risk_level": risk, "risk_score": risk_score, "tier_options": tiers, "recommended_tier": recommended_tier, "explain": ( f"Assessed {len(risk_factors)} risk factor(s). " f"Risk level: {risk}. Recommended tier: {recommended_tier}. " "User can proceed with any tier; higher tiers provide more complete deliverables." ) } # Optional LLM recommendations for high complexity if len(plan_steps) > 7 and llm: try: prompt = ( "You are a pragmatic engineering advisor. Given this plan, suggest 2-3 ways to " "optimize implementation while preserving core value. Be specific and actionable. 
" "Return JSON {\"optimizations\": [...]}.\n\n" f"Plan: {json.dumps(pm, indent=2)}" ) r = llm.invoke(prompt) recs = parse_json_from_llm(getattr(r, "content", "") or "") if isinstance(recs, dict): prag_report["optimizations"] = recs.get("optimizations", []) except Exception as e: log.debug(f"LLM optimizations failed: {e}") out = {"pragmatistReport": prag_report, "execution_path": path} out.update(add_status_update("Pragmatist", f"Risk: {risk}, Tier: {recommended_tier}")) return out def run_governance_agent(state: AgentState) -> Dict[str, Any]: """ FIXED: Full tier bypasses ALL budget checks completely. """ log.info(">>> GOVERNANCE AGENT (improved)") path = ensure_list(state, "execution_path") + ["Governance"] pm = state.get("pmPlan", {}) or {} prag = state.get("pragmatistReport", {}) or {} preferred = state.get("preferred_tier") or prag.get("recommended_tier") or "standard" tier_opts = prag.get("tier_options", {}) chosen = tier_opts.get(preferred, tier_opts.get("standard", {})) try: chosen_cost = float(chosen.get("estimated_cost_usd", 0.0)) except Exception: chosen_cost = 0.0 flexible = bool(state.get("flexible_budget_mode", False)) # CRITICAL FIX: Full tier = unlimited budget, bypass ALL checks if preferred == "full": log.info("🎯 FULL TIER SELECTED - UNLIMITED BUDGET MODE ACTIVATED") log.info(f" Cost estimate: ${chosen_cost} (ignored)") log.info(" Budget checks: DISABLED") log.info(" Rework limits: EXTENDED") gov_report = { "budget_ok": True, "issues": ["Full tier: budget checks disabled"], "approved_for_experiment": True, "governanceDecision": "approve", "chosen_tier": "full", "chosen_cost_usd": chosen_cost, "rationale": "Full tier: no budget constraints applied", "reasoning": "Full tier approval - unlimited mode" } # Update state to disable budget enforcement out = { "governanceReport": gov_report, "execution_path": path, "budget_exceeded": False, # Can never be true for full tier "stop_threshold": float('inf'), # Infinite threshold "max_loops": 10, # Allow more 
iterations } out.update(add_status_update("Governance", "Full tier approved - unlimited budget")) return out # For lite/standard tiers, do normal budget checks decision = "approve" issues = [] budget = state.get("budget") or 0.0 stop_threshold = state.get("stop_threshold") or 0.0 if stop_threshold > 0: try: threshold_f = float(stop_threshold) if chosen_cost > threshold_f: if flexible: issues.append(f"Cost ${chosen_cost} exceeds ${threshold_f}, flexible mode enabled") decision = "approve_with_warning" else: issues.append(f"Cost ${chosen_cost} exceeds ${threshold_f}") decision = "reject" except Exception as e: issues.append(f"Budget check error: {e}") risk_level = prag.get("risk_level") if risk_level == "high" and not prag.get("ok", True): issues.append("High risk identified") decision = "approve_with_warning" if flexible else "reject" approved_bool = decision in ("approve", "approve_with_warning") gov_report = { "budget_ok": approved_bool, "issues": issues, "approved_for_experiment": approved_bool, "governanceDecision": decision, "chosen_tier": preferred, "chosen_cost_usd": chosen_cost, "rationale": None, "reasoning": f"Decision: {decision}. {len(issues)} issue(s)." } out = {"governanceReport": gov_report, "execution_path": path} out.update(add_status_update("Governance", f"{decision.title()} {preferred} (${chosen_cost})")) return out def scan_text_for_secrets(text: str) -> Dict[str, Any]: """ UPGRADED GRAPH: Scan text for potential secrets/credentials. Used by Compliance agent. 
""" findings = [] if not text: return {"suspicious": False, "findings": findings} patterns = [ r"AKIA[0-9A-Z]{16}", # AWS access key r"-----BEGIN PRIVATE KEY-----", # Private key r"AIza[0-9A-Za-z-_]{35}", # Google API key r"(?i)secret[_-]?(key|token)\b", # Secret keywords r"(?i)password\s*[:=]\s*['\"][^'\"]{6,}['\"]" # Password patterns ] for p in patterns: for m in re.finditer(p, text): findings.append({"pattern": p, "match": m.group(0)}) return {"suspicious": len(findings) > 0, "findings": findings} def run_compliance_agent(state: AgentState) -> Dict[str, Any]: """ UPGRADED GRAPH: Security and compliance scanning. Scans for: - Exposed secrets/credentials - Sensitive data - Policy violations """ log.info(">>> COMPLIANCE AGENT") path = ensure_list(state, "execution_path") + ["Compliance"] exp = state.get("experimentResults", {}) or {} report = {"suspicious": False, "issues": [], "scanned": []} # Scan stdout/stderr for key in ("stdout", "stderr"): val = exp.get(key) if isinstance(val, str) and val.strip(): scan = scan_text_for_secrets(val) if scan.get("suspicious"): report["suspicious"] = True report["issues"].append({ "type": "text_secret", "where": key, "findings": scan["findings"] }) report["scanned"].append({"type": "text", "where": key}) # Scan generated files if isinstance(exp, dict) and "paths" in exp: paths = exp.get("paths") or {} if isinstance(paths, dict): for k, p in paths.items(): try: pstr = str(p) if os.path.exists(pstr) and os.path.isfile(pstr): with open(pstr, "r", encoding="utf-8", errors="ignore") as fh: sample = fh.read(20000) scan = scan_text_for_secrets(sample) if scan.get("suspicious"): report["suspicious"] = True report["issues"].append({ "type": "file_secret", "file": pstr, "findings": scan["findings"] }) report["scanned"].append({"type": "file", "file": pstr}) else: report["scanned"].append({ "type": "path", "value": pstr, "exists": os.path.exists(pstr) }) except Exception as e: report["scanned"].append({"file": p, "error": str(e)}) # Note 
if repo/zip artifact # Ensure paths exists safely paths = locals().get("paths") or state.get("paths") if "state" in locals() else {} # Note if repo/zip artifact if isinstance(paths, dict) and any( str(v).lower().endswith(".zip") for v in paths.values() ): report.setdefault("notes", []).append( "Zip-based or repo artifact detected – recommend manual review." ) # Final output out = {"complianceReport": report, "execution_path": path} out.update(add_status_update("Compliance", "Compliance checks complete")) return out def summarize_logs_for_observer(log_paths: Optional[list] = None, sample_lines: int = 200) -> str: """ UPGRADED GRAPH: Summarize system logs for observer. """ if not log_paths: candidates = ["logs/performance.log", "logs/ai_lab.log", "performance.log"] log_paths = [p for p in candidates if os.path.exists(p)] parts = [] errs = 0 warns = 0 for p in log_paths: try: with open(p, "r", encoding="utf-8", errors="ignore") as fh: lines = fh.readlines()[-sample_lines:] content = "".join(lines) errs += content.upper().count("ERROR") warns += content.upper().count("WARNING") parts.append(f"--- {p} (last {len(lines)} lines) ---\n{content[:2000]}") except Exception as e: parts.append(f"Could not read {p}: {e}") header = f"Log summary: {errs} ERROR(s), {warns} WARNING(s)" return header + "\n\n" + "\n\n".join(parts) def run_observer_agent(state: AgentState) -> Dict[str, Any]: """ UPGRADED GRAPH: System performance and health monitoring. 
Tracks: - Execution length - Rework cycles - Cost accumulation - Error patterns """ log.info(">>> OBSERVER AGENT") path = ensure_list(state, "execution_path") + ["Observer"] # Find log files log_candidates = [] for candidate in ["logs/performance.log", "logs/ai_lab.log", "performance.log"]: if os.path.exists(candidate): log_candidates.append(candidate) summary = summarize_logs_for_observer(log_candidates or None) exec_len = len(state.get("execution_path", []) or []) rework_cycles = ensure_int(state, "rework_cycles", 0) current_cost = state.get("current_cost", 0.0) obs = { "log_summary": summary[:4000], "execution_length": exec_len, "rework_cycles": rework_cycles, "current_cost": current_cost, "status": get_latest_status(state) } # Optional LLM recommendations if llm: try: prompt = ( "You are an Observer assistant. Given this runtime summary, provide 3 prioritized " "next actions to mitigate the top risks.\n\n" f"Runtime summary: {json.dumps(obs, indent=2)}\n\nReturn plain text." ) r = llm.invoke(prompt) obs["llm_recommendations"] = getattr(r, "content", "")[:1500] except Exception as e: obs["llm_recommendations_error"] = str(e) out = {"observerReport": obs, "execution_path": path} out.update(add_status_update("Observer", "Observer summary created")) return out def run_knowledge_curator_agent(state: AgentState) -> Dict[str, Any]: """ UPGRADED GRAPH: Curates and archives knowledge for future use. 
Captures: - Successful patterns - Common pitfalls - Best practices - User preferences """ log.info(">>> KNOWLEDGE CURATOR AGENT") path = ensure_list(state, "execution_path") + ["KnowledgeCurator"] core = state.get("coreObjectivePrompt", "") or state.get("userInput", "") pm = state.get("pmPlan", {}) or {} draft = state.get("draftResponse", "") or "" qa_feedback = state.get("qaFeedback", "") or "" summary_text = ( f"Objective: {core}\n\n" f"Plan Steps: {json.dumps(pm.get('plan_steps', []))}\n\n" f"Draft (first 1500 chars): {draft[:1500]}\n\n" f"QA Feedback: {qa_feedback[:1000]}" ) try: memory_manager.add_to_memory( summary_text, {"source": "knowledge_curator", "timestamp": datetime.utcnow().isoformat()} ) insights = {"added": True, "summary_snippet": summary_text[:500]} except Exception as e: insights = {"added": False, "error": str(e)} out = {"knowledgeInsights": insights, "execution_path": path} out.update(add_status_update("KnowledgeCurator", "Knowledge captured")) return out # ============================================================================= # SECTION 9: CONDITIONAL ROUTING FUNCTIONS (LIBERAL BUDGET POLICY) # ============================================================================= # Two versions of should_continue: # - should_continue: For BASE graph (routes to archivist_agent) # - should_continue_upgraded: For UPGRADED graph (routes to observer_agent) def should_continue(state: AgentState) -> str: """ BASE GRAPH: Determine next step after QA. Routes to: - archivist_agent: If approved (BASE GRAPH default) - disclaimer_agent: If budget exceeded by 120% or extreme rework limit - pm_agent: If needs rework (LIBERAL: allows many cycles) LIBERAL POLICY: Allow many rework cycles, only stop at 120% budget NOTE: This function is used by BOTH base and upgraded graphs. The base graph only has archivist_agent, while the upgraded graph will override the routing to use observer_agent. 
""" # Budget check - use 120% threshold (LIBERAL) current_cost = state.get("current_cost", 0.0) stop_threshold = state.get("stop_threshold") or 0.0 try: cost_f = float(current_cost) threshold_f = float(stop_threshold) # Only fail if cost exceeds 120% threshold if threshold_f > 0 and cost_f > threshold_f: log.warning(f"Cost ${cost_f} exceeds stop threshold ${threshold_f} (120% of budget)") return "disclaimer_agent" except Exception: pass # Check explicit budget_exceeded flag (should only be set at 120%) if state.get("budget_exceeded"): return "disclaimer_agent" try: rework = int(state.get("rework_cycles", 0)) max_loops_allowed = int(state.get("max_loops", 0)) except Exception: rework = state.get("rework_cycles", 0) or 0 max_loops_allowed = state.get("max_loops", 0) or 0 # If approved → success path (archivist for BASE graph) if state.get("approved"): return "archivist_agent" # LIBERAL POLICY: Allow up to 150% of max_loops before stopping liberal_max = int(max_loops_allowed * 1.5) if max_loops_allowed > 0 else 15 if rework > liberal_max: log.warning(f"Rework cycles {rework} exceeded liberal limit {liberal_max}") return "disclaimer_agent" # Default: Allow rework (liberal policy) log.info(f"Allowing rework cycle {rework} (liberal limit: {liberal_max})") return "pm_agent" def should_continue_upgraded(state: AgentState) -> str: """ UPGRADED GRAPH: Determine next step after QA (with Observer). 
Routes to: - observer_agent: If approved (goes to Observer → Archivist → Knowledge Curator) - disclaimer_agent: If budget exceeded by 120% or extreme rework limit - pm_agent: If needs rework (LIBERAL: allows many cycles) LIBERAL POLICY: Allow many rework cycles, only stop at 120% budget """ # Budget check - use 120% threshold (LIBERAL) current_cost = state.get("current_cost", 0.0) stop_threshold = state.get("stop_threshold") or 0.0 try: cost_f = float(current_cost) threshold_f = float(stop_threshold) # Only fail if cost exceeds 120% threshold if threshold_f > 0 and cost_f > threshold_f: log.warning(f"Cost ${cost_f} exceeds stop threshold ${threshold_f} (120% of budget)") return "disclaimer_agent" except Exception: pass # Check explicit budget_exceeded flag (should only be set at 120%) if state.get("budget_exceeded"): return "disclaimer_agent" try: rework = int(state.get("rework_cycles", 0)) max_loops_allowed = int(state.get("max_loops", 0)) except Exception: rework = state.get("rework_cycles", 0) or 0 max_loops_allowed = state.get("max_loops", 0) or 0 # If approved → success path (observer for UPGRADED graph) if state.get("approved"): return "observer_agent" # LIBERAL POLICY: Allow up to 150% of max_loops before stopping liberal_max = int(max_loops_allowed * 1.5) if max_loops_allowed > 0 else 15 if rework > liberal_max: log.warning(f"Rework cycles {rework} exceeded liberal limit {liberal_max}") return "disclaimer_agent" # Default: Allow rework (liberal policy) log.info(f"Allowing rework cycle {rework} (liberal limit: {liberal_max})") return "pm_agent" def should_run_experiment(state: AgentState) -> str: """ BASE GRAPH: Determine if experiment is needed. 
Routes to: - experimenter_agent: If experiment needed - synthesis_agent: If no experiment needed """ pm = state.get('pmPlan', {}) or {} return "experimenter_agent" if pm.get('experiment_needed') else "synthesis_agent" def detect_loop(state: AgentState) -> bool: """Detect execution loops - STRICT version.""" path = state.get("execution_path", []) # Check total length first if len(path) > MAX_EXECUTION_PATH_LENGTH: log.error(f"❌ Path limit: {len(path)} > {MAX_EXECUTION_PATH_LENGTH}") return True if len(path) < 10: return False # Deduplicate consecutive nodes deduplicated = [] prev = None for node in path: if node != prev: deduplicated.append(node) prev = node # Check recent window for repetitions recent = deduplicated[-LOOP_DETECTION_WINDOW:] from collections import Counter counts = Counter(recent) # Flag if any node repeats too much for node, count in counts.items(): if count >= LOOP_THRESHOLD: log.error(f"❌ Loop: {node} appeared {count}x in last {LOOP_DETECTION_WINDOW}") log.error(f"Path: {' → '.join(recent[-10:])}") return True # Check for alternating patterns if len(recent) >= 8: for i in range(len(recent) - 7): window = recent[i:i+8] if len(set(window)) == 2: # Only 2 unique nodes alternating log.error(f"❌ Alternating loop detected") return True return False def should_proceed_to_experimenter(state: AgentState) -> bool: """ Helper function to determine if safe to proceed to experimenter. Returns True if should proceed, False if should exit. 
""" exec_path = state.get("execution_path", []) governance_count = exec_path.count("Governance") log.info(f"🔍 Checking proceed: governance #{governance_count}, path length {len(exec_path)}") # CRITICAL: ALWAYS proceed on first governance approval if governance_count <= 1: log.info("✅ First governance approval - ALWAYS PROCEED") return True # Check path length if len(exec_path) > MAX_EXECUTION_PATH_LENGTH: log.error(f"❌ Path too long: {len(exec_path)} > {MAX_EXECUTION_PATH_LENGTH}") return False # Check for Memory duplication (indicates graph bug) double_memory_count = 0 for i in range(len(exec_path) - 1): if exec_path[i] == "Memory" and exec_path[i+1] == "Memory": double_memory_count += 1 if double_memory_count > 0 and governance_count > 2: log.error(f"❌ Memory duplication ({double_memory_count}) + multiple governance = loop") return False # Check governance frequency if governance_count > 5: log.error(f"❌ Too many governance calls: {governance_count} > 5") return False # Check budget current_cost = state.get("current_cost", 0.0) stop_threshold = state.get("stop_threshold", 0.0) if stop_threshold > 0: try: if current_cost > stop_threshold: log.error(f"❌ Budget exceeded: ${current_cost} > ${stop_threshold}") return False except Exception: pass log.info("✅ Safe to proceed") return True def governance_decider(state: AgentState) -> str: """ FIXED: Simplified governance routing. Always proceeds to experimenter unless there's a critical blocker. 
""" gov = state.get("governanceReport", {}) or {} decision = gov.get("governanceDecision", "approve") log.info(f"🔍 Governance decision: {decision}") # Check for explicit rejection if decision == "reject": log.warning("❌ Governance explicitly rejected") return "disclaimer_agent" # Use helper to determine if safe to proceed if should_proceed_to_experimenter(state): log.info("✅ PROCEEDING TO EXPERIMENTER") return "experimenter_agent" else: log.warning("❌ Cannot proceed - routing to disclaimer") return "disclaimer_agent" # ============================================================================= # SECTION 10: BASE GRAPH DEFINITION # ============================================================================= # Triage workflow (simple greeting vs task detection) triage_workflow = StateGraph(AgentState) triage_workflow.add_node("triage", run_triage_agent) triage_workflow.set_entry_point("triage") triage_workflow.add_edge("triage", END) triage_app = triage_workflow.compile() # Planner workflow (cost estimation) planner_workflow = StateGraph(AgentState) planner_workflow.add_node("planner", run_planner_agent) planner_workflow.set_entry_point("planner") planner_workflow.add_edge("planner", END) planner_app = planner_workflow.compile() # Main workflow (full execution pipeline) main_workflow = StateGraph(AgentState) # Add base nodes main_workflow.add_node("memory_retriever", run_memory_retrieval) main_workflow.add_node("intent_agent", run_intent_agent) main_workflow.add_node("pm_agent", run_pm_agent) main_workflow.add_node("experimenter_agent", run_experimenter_agent) main_workflow.add_node("synthesis_agent", run_synthesis_agent) main_workflow.add_node("qa_agent", run_qa_agent) main_workflow.add_node("archivist_agent", run_archivist_agent) main_workflow.add_node("disclaimer_agent", run_disclaimer_agent) # Set entry point main_workflow.set_entry_point("memory_retriever") # Define edges main_workflow.add_edge("memory_retriever", "intent_agent") 
main_workflow.add_edge("intent_agent", "pm_agent")
main_workflow.add_edge("experimenter_agent", "synthesis_agent")
main_workflow.add_edge("synthesis_agent", "qa_agent")
main_workflow.add_edge("archivist_agent", END)
main_workflow.add_edge("disclaimer_agent", END)

# Conditional edges
# No path map is supplied here, so the router's return value is used directly
# as the name of the next node.
main_workflow.add_conditional_edges("pm_agent", should_run_experiment)

# BASE GRAPH: Only routes to nodes that exist in base graph
main_workflow.add_conditional_edges("qa_agent", should_continue, {
    "archivist_agent": "archivist_agent",
    "pm_agent": "pm_agent",
    "disclaimer_agent": "disclaimer_agent",
})

# Compile base graph
main_app = main_workflow.compile()

log.info("=" * 60)
log.info("BASE GRAPH COMPILED")
log.info("Flow: Memory → Intent → PM → Experimenter → Synthesis → QA")
log.info(" → Archivist/Disclaimer → END")
log.info("=" * 60)


# =============================================================================
# SECTION 11: UPGRADED GRAPH DEFINITION (LIBERAL BUDGET POLICY)
# =============================================================================

def build_upgraded_graph() -> StateGraph:
    """
    Build (but do not compile) the upgraded workflow graph.

    Flow:
        memory_retriever → intent_agent → pm_agent → pragmatist_agent
            → governance_agent
        governance_agent (governance_decider):
            → experimenter_agent → compliance_agent → synthesis_agent
              → qa_agent
            → disclaimer_agent → END
        qa_agent (should_continue_upgraded):
            → observer_agent → archivist_agent → knowledge_curator_agent
              → END                                   (success)
            → pm_agent                                (rework)
            → disclaimer_agent → END                  (failure)

    ``memory_retriever`` is only the entry point: no edge in this graph
    leads back to it, so a rework cycle restarts at PM instead of
    re-running memory retrieval (this prevents a Memory → Memory loop).

    Returns:
        StateGraph: The uncompiled upgraded workflow over ``AgentState``;
        callers are expected to call ``.compile()`` on it.
    """
    upgraded_workflow = StateGraph(AgentState)

    # Register every node used by the upgraded flow.
    upgraded_workflow.add_node("memory_retriever", run_memory_retrieval)
    upgraded_workflow.add_node("intent_agent", run_intent_agent)
    upgraded_workflow.add_node("pm_agent", run_pm_agent)
    upgraded_workflow.add_node("pragmatist_agent", run_pragmatist_agent)
    upgraded_workflow.add_node("governance_agent", run_governance_agent)
    upgraded_workflow.add_node("experimenter_agent", run_experimenter_agent)
    upgraded_workflow.add_node("compliance_agent", run_compliance_agent)
    upgraded_workflow.add_node("synthesis_agent", run_synthesis_agent)
    upgraded_workflow.add_node("qa_agent", run_qa_agent)
    upgraded_workflow.add_node("observer_agent", run_observer_agent)
    upgraded_workflow.add_node("archivist_agent", run_archivist_agent)
    upgraded_workflow.add_node("knowledge_curator_agent", run_knowledge_curator_agent)
    upgraded_workflow.add_node("disclaimer_agent", run_disclaimer_agent)

    # CRITICAL: Set entry point
    upgraded_workflow.set_entry_point("memory_retriever")

    # CRITICAL: Linear flow - each node called ONCE per cycle.
    # No branches until governance.
    upgraded_workflow.add_edge("memory_retriever", "intent_agent")    # Memory → Intent
    upgraded_workflow.add_edge("intent_agent", "pm_agent")            # Intent → PM
    upgraded_workflow.add_edge("pm_agent", "pragmatist_agent")        # PM → Pragmatist
    upgraded_workflow.add_edge("pragmatist_agent", "governance_agent")  # Pragmatist → Governance

    # Governance conditional (first branch point)
    upgraded_workflow.add_conditional_edges(
        "governance_agent",
        governance_decider,
        {
            "experimenter_agent": "experimenter_agent",
            "disclaimer_agent": "disclaimer_agent",
        },
    )

    # CRITICAL: After experimenter - LINEAR (no loops back)
    upgraded_workflow.add_edge("experimenter_agent", "compliance_agent")
    upgraded_workflow.add_edge("compliance_agent", "synthesis_agent")
    upgraded_workflow.add_edge("synthesis_agent", "qa_agent")

    # QA conditional (second branch point)
    # CRITICAL: Rework goes back to PM, NOT Memory
    upgraded_workflow.add_conditional_edges(
        "qa_agent",
        should_continue_upgraded,
        {
            "observer_agent": "observer_agent",      # Success path
            "pm_agent": "pm_agent",                  # Rework path (NOT to Memory!)
            "disclaimer_agent": "disclaimer_agent",  # Failure path
        },
    )

    # Success path
    upgraded_workflow.add_edge("observer_agent", "archivist_agent")
    upgraded_workflow.add_edge("archivist_agent", "knowledge_curator_agent")
    upgraded_workflow.add_edge("knowledge_curator_agent", END)

    # Disclaimer path
    upgraded_workflow.add_edge("disclaimer_agent", END)

    return upgraded_workflow


# CRITICAL: Make sure NO OTHER edges exist that point to "memory_retriever".
# The ONLY way to reach Memory should be at the very start.


def apply_upgrades() -> bool:
    """
    Replace the module-level base graph with the upgraded graph.

    Builds the upgraded workflow via ``build_upgraded_graph()``, compiles it,
    and then rebinds the module globals ``main_app`` and ``main_workflow``.
    If building or compiling raises, the error is logged, the globals are
    left untouched, and False is returned.

    NOTE: Only the module attributes are rebound. Code that already executed
    ``from <this module> import main_app`` still holds the old base graph and
    must re-read the attribute after calling this function.

    The budget figures written to the log are derived from the module
    constants ``BUDGET_BUFFER_MULTIPLIER``, ``MAX_COST_MULTIPLIER`` and
    ``INITIAL_MAX_REWORK_CYCLES``, so the log cannot drift from the
    configured limits.

    Returns:
        bool: True if the upgraded graph was compiled and installed,
        False otherwise.
    """
    global main_app, main_workflow

    log.info("=" * 60)
    log.info("APPLYING GRAPH UPGRADES (LIBERAL BUDGET POLICY)")
    log.info("=" * 60)

    try:
        upgraded_workflow = build_upgraded_graph()
        compiled_app = upgraded_workflow.compile()
    except Exception as e:
        log.exception(f"❌ Failed to apply graph upgrades: {e}")
        return False

    # Rebind only after a successful compile so a failure keeps the base graph.
    main_app = compiled_app
    main_workflow = upgraded_workflow

    # round() rather than int(): e.g. 1.15 * 100 == 114.999... would truncate to 114.
    buffer_pct = round((BUDGET_BUFFER_MULTIPLIER - 1) * 100)
    stop_pct = round(MAX_COST_MULTIPLIER * 100)

    log.info("✅ GRAPH UPGRADE SUCCESSFUL")
    log.info("=" * 60)
    log.info("New flow: Memory → Intent → PM → Pragmatist → Governance")
    log.info(" → Experimenter → Compliance → Synthesis → QA")
    log.info(" → Observer → Archivist → Knowledge Curator → END")
    log.info("=" * 60)
    log.info("LIBERAL BUDGET POLICY FEATURES:")
    log.info(f" • {buffer_pct}% budget buffer applied")
    log.info(f" • Stop threshold: {stop_pct}% of user budget")
    log.info(f" • Initial max rework cycles: {INITIAL_MAX_REWORK_CYCLES}")
    log.info(" • Governance: Always proceed unless explicitly rejected")
    log.info("=" * 60)
    return True


# =============================================================================
# SECTION 12: EXPORTS
# =============================================================================

# Export all components
__all__ = [
    # State
    'AgentState',
    # Helper functions
    'ensure_list',
    'ensure_int',
    'sanitize_path',
    'add_status_update',
    'get_latest_status',
    # LLM and parsing
    'llm',
    'parse_json_from_llm',
    # Artifact functions
    'detect_requested_output_types',
    'normalize_experiment_type',
    'write_notebook_from_text',
    'write_script',
    'write_docx_from_text',
    'write_excel_from_tables',
    'write_pdf_from_text',
    'build_repo_zip',
    # Base agent nodes
    'run_triage_agent',
    'run_planner_agent',
    'run_memory_retrieval',
    'run_intent_agent',
    'run_pm_agent',
    'run_experimenter_agent',
    'run_synthesis_agent',
    'run_qa_agent',
    'run_archivist_agent',
    'run_disclaimer_agent',
    # Upgraded agent nodes
    'run_pragmatist_agent',
    'run_governance_agent',
    'run_compliance_agent',
    'run_observer_agent',
    'run_knowledge_curator_agent',
    # Routing functions
    'should_continue',
    'should_continue_upgraded',
    'should_run_experiment',
    'governance_decider',
    'detect_loop',
    # Graphs
    'triage_app',
    'planner_app',
    'main_app',
    'main_workflow',
    # Upgrade function
    'apply_upgrades',
    'build_upgraded_graph',
    # Configuration constants
    'INITIAL_MAX_REWORK_CYCLES',
    'BUDGET_BUFFER_MULTIPLIER',
    'MAX_COST_MULTIPLIER',
]

# Log initialization
log.info("=" * 60)
log.info("GRAPH MODULE INITIALIZED (LIBERAL BUDGET POLICY v2.0)")
log.info("Base graph available as: main_app")
log.info("To enable upgraded graph, call: apply_upgrades()")
log.info("=" * 60)