Spaces:
Paused
Paused
| """ | |
| graph_merged.py - Unified AI Lab Graph System (Liberal Budget Policy v2.0) | |
| ============================================================================= | |
| LIBERAL BUDGET POLICY: | |
| - 20% budget buffer on user input | |
| - Stop only at 120% of user budget | |
| - 10x rework cycles (3 β 10 β 20 hard limit) | |
| - 150 node execution path limit | |
| - Always proceed to Experimenter unless explicitly rejected | |
| This file contains both the BASE and UPGRADED graph implementations: | |
| - BASE GRAPH: Simple workflow (Memory β Intent β PM β Experimenter β Synthesis β QA β Archivist) | |
| - UPGRADED GRAPH: Enhanced workflow with governance, compliance, and observation layers | |
| Author: AI Lab Team | |
| Last Updated: 2025-10-08 | |
| Version: 2.0 - Liberal Budget Policy | |
| """ | |
| # ============================================================================= | |
| # SECTION 1: IMPORTS AND CONFIGURATION | |
| # ============================================================================= | |
# Standard library
import json
import math
import operator
import os
import re
import shutil
import uuid
import zipfile
from datetime import datetime, timezone
from typing import TypedDict, List, Dict, Optional, Annotated, Any
# LangChain imports
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
# Artifact generation libraries
import nbformat
from nbformat.v4 import new_notebook, new_markdown_cell, new_code_cell
import pandas as pd
from docx import Document as DocxDocument
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet
# Local imports
from memory_manager import memory_manager
from code_executor import execute_python_code
from logging_config import setup_logging, get_logger
# Multi-language support
from multi_language_support import (
    detect_language,
    extract_code_blocks_multi_lang,
    execute_code,
    detect_requested_output_types_enhanced,
    write_script_multi_lang,
    LANGUAGES
)
# Configure logging once at import time; every function in this module
# logs through this shared module-level logger.
setup_logging()
log = get_logger(__name__)
# =============================================================================
# SECTION 2: CONFIGURATION CONSTANTS (FIXED BUDGET POLICY)
# =============================================================================
# Output directories, created eagerly at import time.
OUT_DIR = os.environ.get("OUT_DIR", "/tmp")
os.makedirs(OUT_DIR, exist_ok=True)
EXPORTS_DIR = os.path.join(OUT_DIR, "exports")
os.makedirs(EXPORTS_DIR, exist_ok=True)
# Cost and loop control - hard limits replacing the earlier liberal policy.
INITIAL_MAX_REWORK_CYCLES = 3    # max QA->PM rework loops (was 100)
BUDGET_BUFFER_MULTIPLIER = 1.10  # 10% buffer on the user's budget (was 1.20)
MAX_COST_MULTIPLIER = 1.15       # halt at 115% of budget (was 1.20)
MAX_EXECUTION_PATH_LENGTH = 500  # max nodes visited in one run (was 150)
LOOP_DETECTION_WINDOW = 20       # recent-node window used by loop detection
LOOP_THRESHOLD = 5               # repeats within the window that count as a loop
# GPT-4o pricing (USD per 1K tokens) used for cost estimation.
GPT4O_INPUT_COST_PER_1K_TOKENS = 0.005
GPT4O_OUTPUT_COST_PER_1K_TOKENS = 0.015
# NOTE(review): 2.0 tokens per call looks implausibly low -- the planner's
# own fallback assumes 1500.0 tokens/call; confirm the intended value before
# trusting the cost estimates derived from it.
AVG_TOKENS_PER_CALL = 2.0
# Artifact types the pipeline knows how to generate.
KNOWN_ARTIFACT_TYPES = {"notebook", "excel", "word", "pdf", "image", "repo", "script"}
log.info("=" * 60)
log.info("FIXED BUDGET POLICY ACTIVE")
log.info(f"Max rework cycles: {INITIAL_MAX_REWORK_CYCLES}")
log.info(f"Budget buffer: {int((BUDGET_BUFFER_MULTIPLIER - 1) * 100)}%")
log.info(f"Stop threshold: {int(MAX_COST_MULTIPLIER * 100)}% of budget")
log.info(f"Max path length: {MAX_EXECUTION_PATH_LENGTH}")
log.info("=" * 60)
| # ============================================================================= | |
| # SECTION 3: STATE DEFINITION | |
| # ============================================================================= | |
class AgentState(TypedDict):
    """
    State shared across all agents in the workflow.

    Fields wrapped in Annotated[..., operator.add] are accumulated
    (list-concatenated) when multiple nodes return updates for them.
    """
    # --- Core inputs ---
    userInput: str                  # raw user request
    chatHistory: List[str]          # prior conversation turns
    coreObjectivePrompt: str        # refined objective produced by the Intent agent
    # --- Memory and planning ---
    retrievedMemory: Optional[str]  # formatted memory context ("No memories" when empty)
    pmPlan: Dict                    # plan dict produced by Planner/PM agents
    # --- Execution ---
    experimentCode: Optional[str]
    experimentResults: Optional[Dict]
    # --- Output ---
    draftResponse: str
    qaFeedback: Optional[str]       # QA criticism fed back into PM rework cycles
    approved: bool                  # True once QA (or a forced completion) signs off
    # --- Workflow control ---
    execution_path: Annotated[List[str], operator.add]  # visited node names, accumulated
    rework_cycles: int
    max_loops: int
    status_updates: Annotated[List[Dict[str, str]], operator.add]  # entries from add_status_update()
    # --- Budget and governance ---
    current_cost: float
    budget: float
    stop_threshold: float           # absolute cost at which the run halts (derived from MAX_COST_MULTIPLIER)
    budget_exceeded: bool
    flexible_budget_mode: bool
    preferred_tier: Optional[str]
    auto_accept_approved_with_warning: bool
    # --- Reports (UPGRADED GRAPH ONLY) ---
    pragmatistReport: Optional[Dict]
    governanceReport: Optional[Dict]
    complianceReport: Optional[Dict]
    observerReport: Optional[Dict]
    knowledgeInsights: Optional[Dict]
| # ============================================================================= | |
| # SECTION 4: HELPER FUNCTIONS | |
| # ============================================================================= | |
def ensure_list(state: AgentState, key: str) -> List:
    """Coerce state[key] into a list.

    A missing/falsy state or a None value yields []; tuples are converted;
    any other scalar is wrapped in a single-element list. An existing list
    is returned as-is (the same object, not a copy).
    """
    value = None if not state else state.get(key)
    if isinstance(value, list):
        return value
    if isinstance(value, tuple):
        return list(value)
    return [] if value is None else [value]
def ensure_int(state: AgentState, key: str, default: int = 0) -> int:
    """Coerce state[key] to int, falling back to *default* for missing,
    None, or unconvertible values."""
    if not state:
        return default
    value = state.get(key)
    if value is None:
        return default
    try:
        return int(value)
    except Exception:
        return default
def sanitize_path(path: str) -> str:
    """Resolve *path* to an absolute path (normalizes relative segments)."""
    absolute = os.path.abspath(path)
    return absolute
def get_latest_status(state: AgentState) -> str:
    """Return the newest status string from state['status_updates'].

    Scans from newest to oldest, accepting either dict entries carrying a
    'status' key or bare strings. Falls back to "Processing..." when no
    usable entry exists.
    """
    updates = state.get('status_updates', [])
    if isinstance(updates, list) and updates:
        for entry in reversed(updates):
            if isinstance(entry, str):
                return entry
            if isinstance(entry, dict) and 'status' in entry:
                return entry['status']
    return "Processing..."
def add_status_update(node_name: str, status: str) -> Dict[str, Any]:
    """
    Create a status update entry for merging into an agent's return value.

    Args:
        node_name: Name of the graph node reporting the status.
        status: Human-readable status message.

    Returns:
        A dict with a single-element "status_updates" list, suitable for the
        Annotated[..., operator.add] accumulator in AgentState.
    """
    # Use timezone-aware UTC: datetime.utcnow() is deprecated and yields a
    # naive timestamp that is easy to misinterpret downstream.
    return {
        "status_updates": [{
            "node": node_name,
            "status": status,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }]
    }
# =============================================================================
# SECTION 5: LLM INITIALIZATION AND JSON PARSING
# =============================================================================
# Shared model client for every agent in this module; 3 retries and a 60s
# timeout guard against transient API failures.
llm = ChatOpenAI(model="gpt-4o", temperature=0.4, max_retries=3, request_timeout=60)
def parse_json_from_llm(llm_output: str) -> Optional[dict]:
    """
    Robust JSON extraction from LLM output.

    Tries multiple strategies, in order:
      1. Explicit ```json {} ``` fenced blocks
      2. Any fenced code block with JSON-like content
      3. First balanced {...} substring
      4. ast.literal_eval for Python-like dicts
      5. Conservative cleanup (remove comments, trailing commas, requote)

    Args:
        llm_output: Raw text returned by the model.

    Returns:
        dict (or list round-tripped through JSON) on success, None on failure.
    """
    import ast
    if not llm_output or not isinstance(llm_output, str) or not llm_output.strip():
        return None
    text = llm_output.strip()
    # Strategy 1: Explicit JSON fenced block
    match = re.search(r"```json\s*({.*?})\s*```", text, re.DOTALL | re.IGNORECASE)
    if match:
        candidate = match.group(1).strip()
        try:
            return json.loads(candidate)
        except Exception as e:
            log.debug(f"json.loads failed on triple-backtick json block: {e}")
    # Strategy 2: Any code fence with JSON-like content
    match2 = re.search(r"```(?:json|python|text)?\s*({.*?})\s*```", text, re.DOTALL | re.IGNORECASE)
    if match2:
        candidate = match2.group(1).strip()
        try:
            return json.loads(candidate)
        except Exception as e:
            log.debug(f"json.loads failed on fenced candidate: {e}")
    # Strategy 3: Find the first balanced {...} substring
    def find_balanced_brace_substring(s: str):
        start_idx = None
        depth = 0
        for i, ch in enumerate(s):
            if ch == '{':
                if start_idx is None:
                    start_idx = i
                depth += 1
            elif ch == '}':
                if depth > 0:
                    depth -= 1
                    if depth == 0 and start_idx is not None:
                        return s[start_idx:i+1]
        return None
    candidate = find_balanced_brace_substring(text)
    # Strategy 4: Fallback heuristic - widest {...} span
    if not candidate:
        first = text.find('{')
        last = text.rfind('}')
        if first != -1 and last != -1 and last > first:
            candidate = text[first:last+1]
    if candidate:
        # Try direct JSON parsing
        try:
            return json.loads(candidate)
        except Exception as e:
            log.debug(f"json.loads failed on candidate substring: {e}")
        # Try ast.literal_eval (handles single quotes, trailing commas)
        try:
            parsed = ast.literal_eval(candidate)
            if isinstance(parsed, (dict, list)):
                return json.loads(json.dumps(parsed))
        except Exception as e:
            log.debug(f"ast.literal_eval failed: {e}")
        # Conservative cleanup
        cleaned = candidate
        try:
            # Remove line comments
            cleaned = re.sub(r"//.*?$", "", cleaned, flags=re.MULTILINE)
            # Remove block comments
            cleaned = re.sub(r"/\*.*?\*/", "", cleaned, flags=re.DOTALL)
            # Remove trailing commas
            cleaned = re.sub(r",\s*([}\]])", r"\1", cleaned)
            # Replace single-quoted strings after :, {, [ or , with double quotes.
            # FIX: the previous pattern used a variable-width lookbehind
            # (?<=[:\{\[,]\s*), which Python's re rejects at compile time, so
            # this entire cleanup step always raised and returned None.
            # Capturing the prefix instead is equivalent and valid.
            def _single_to_double(m):
                prefix = m.group(1)
                inner_escaped = m.group(2).replace('"', '\\"')
                return f'{prefix}"{inner_escaped}"'
            cleaned = re.sub(r"([:\{\[,]\s*)'([^']*?)'", _single_to_double, cleaned)
            return json.loads(cleaned)
        except Exception as e:
            log.debug(f"json.loads still failed after cleanup: {e}")
    # All strategies failed
    log.error(f"parse_json_from_llm failed. Output preview: {text[:200]}")
    return None
| # ============================================================================= | |
| # SECTION 6: ARTIFACT DETECTION AND GENERATION | |
| # ============================================================================= | |
def detect_requested_output_types(text: str) -> Dict:
    """
    Detect what type of artifact (notebook/excel/word/pdf/...) the user wants.

    Thin wrapper around detect_requested_output_types_enhanced from
    multi_language_support. Callers in this module expect the returned dict
    to carry at least 'artifact_type' and 'requires_artifact' keys.
    """
    return detect_requested_output_types_enhanced(text)
def normalize_experiment_type(exp_type: Optional[str], goal_text: str) -> str:
    """Map a free-form experiment type string onto one of KNOWN_ARTIFACT_TYPES.

    Falls back to content-based detection of *goal_text* (defaulting to
    "word") when the type is missing or cannot be recognized.
    """
    def _detect_default() -> str:
        detection = detect_requested_output_types(goal_text or "")
        return detection.get("artifact_type") or "word"

    if not exp_type:
        return _detect_default()
    lowered = exp_type.strip().lower()
    if lowered in KNOWN_ARTIFACT_TYPES:
        return lowered
    # Alias table: the first keyword hit wins and maps to the canonical type.
    aliases = {
        "notebook": ["notebook", "ipynb", "jupyter"],
        "excel": ["excel", "xlsx", "spreadsheet"],
        "word": ["word", "docx", "doc"],
        "pdf": ["pdf"],
        "repo": ["repo", "repository", "backend", "codebase"],
        "script": ["script", "python", "py"]
    }
    for canonical, keywords in aliases.items():
        if any(keyword in lowered for keyword in keywords):
            return canonical
    return _detect_default()
def write_notebook_from_text(llm_text: str, out_dir: Optional[str] = None) -> str:
    """Turn LLM output into a Jupyter notebook, interleaving markdown and
    code cells, and return the written .ipynb path."""
    target_dir = out_dir or OUT_DIR
    os.makedirs(target_dir, exist_ok=True)
    # Prefer explicit ```python fences; fall back to anonymous fences.
    code_blocks = re.findall(r"```python\s*(.*?)\s*```", llm_text, re.DOTALL)
    if not code_blocks:
        code_blocks = re.findall(r"```\s*(.*?)\s*```", llm_text, re.DOTALL)
    # Prose between the fences becomes markdown cells.
    md_parts = re.split(r"```(?:python)?\s*.*?\s*```", llm_text, flags=re.DOTALL)
    cells = []
    for idx in range(max(len(md_parts), len(code_blocks))):
        if idx < len(md_parts):
            prose = md_parts[idx].strip()
            if prose:
                cells.append(new_markdown_cell(prose))
        if idx < len(code_blocks):
            code = code_blocks[idx].strip()
            if code:
                cells.append(new_code_cell(code))
    if not cells:
        cells = [new_markdown_cell("# Notebook\n\nNo content generated.")]
    nb = new_notebook()
    nb['cells'] = cells
    token = uuid.uuid4().hex[:10]
    filename = os.path.join(target_dir, f"generated_notebook_{token}.ipynb")
    nbformat.write(nb, filename)
    return filename
def write_script(code_text: str, language_hint: Optional[str] = None, out_dir: Optional[str] = None) -> str:
    """
    Write a script file with an extension appropriate to its language.

    Thin wrapper that delegates to write_script_multi_lang from
    multi_language_support; returns the path of the written file.
    """
    return write_script_multi_lang(code_text, language_hint, out_dir)
def write_docx_from_text(text: str, out_dir: Optional[str] = None) -> str:
    """Write blank-line-separated paragraphs of *text* into a .docx file
    and return its path."""
    target_dir = out_dir or OUT_DIR
    os.makedirs(target_dir, exist_ok=True)
    document = DocxDocument()
    for chunk in text.split("\n\n"):
        paragraph = chunk.strip()
        if paragraph:
            document.add_paragraph(paragraph)
    token = uuid.uuid4().hex[:10]
    filename = os.path.join(target_dir, f"generated_doc_{token}.docx")
    document.save(filename)
    return filename
def write_excel_from_tables(maybe_table_text: str, out_dir: Optional[str] = None) -> str:
    """Create an .xlsx file from JSON, CSV, or raw text content.

    Falls back to a Word document (via write_docx_from_text) when Excel
    generation fails for any reason.
    """
    target_dir = out_dir or OUT_DIR
    os.makedirs(target_dir, exist_ok=True)
    token = uuid.uuid4().hex[:10]
    filename = os.path.join(target_dir, f"generated_excel_{token}.xlsx")
    try:
        try:
            parsed = json.loads(maybe_table_text)
        except Exception:
            # Not JSON: try CSV, otherwise store as a single text cell.
            if "," in maybe_table_text:
                from io import StringIO
                frame = pd.read_csv(StringIO(maybe_table_text))
            else:
                frame = pd.DataFrame({"content": [maybe_table_text]})
        else:
            if isinstance(parsed, list):
                frame = pd.DataFrame(parsed)
            elif isinstance(parsed, dict):
                frame = pd.DataFrame([parsed])
            else:
                frame = pd.DataFrame({"content": [str(maybe_table_text)]})
        frame.to_excel(filename, index=False, engine="openpyxl")
        return filename
    except Exception as e:
        log.error(f"Excel creation failed: {e}")
        return write_docx_from_text(f"Excel error: {e}\n\n{maybe_table_text}", out_dir=target_dir)
def write_pdf_from_text(text: str, out_dir: Optional[str] = None) -> str:
    """Render plain text into a simple PDF, one paragraph per
    blank-line-separated block; falls back to .docx on failure."""
    target_dir = out_dir or OUT_DIR
    os.makedirs(target_dir, exist_ok=True)
    token = uuid.uuid4().hex[:10]
    filename = os.path.join(target_dir, f"generated_doc_{token}.pdf")
    try:
        styles = getSampleStyleSheet()
        story = []
        for chunk in text.split("\n\n"):
            paragraph = chunk.strip()
            if not paragraph:
                continue
            story.append(Paragraph(paragraph.replace("\n", "<br/>"), styles["Normal"]))
            story.append(Spacer(1, 8))
        SimpleDocTemplate(filename).build(story)
        return filename
    except Exception as e:
        log.error(f"PDF creation failed: {e}")
        return write_docx_from_text(f"PDF error: {e}\n\n{text}", out_dir=target_dir)
def build_repo_zip(files_map: Dict[str, str], repo_name: str = "generated_app",
                   out_dir: Optional[str] = None) -> str:
    """Materialize *files_map* on disk and package it as a ZIP archive.

    Args:
        files_map: Mapping of relative paths inside the repo to either
            literal file content or the path of an existing file to copy.
        repo_name: Base name for the staging folder and the archive.
        out_dir: Destination directory (defaults to OUT_DIR).

    Returns:
        Path of the created .zip file.
    """
    target_dir = out_dir or OUT_DIR
    os.makedirs(target_dir, exist_ok=True)
    suffix = uuid.uuid4().hex[:8]
    staging = os.path.join(target_dir, f"{repo_name}_{suffix}")
    os.makedirs(staging, exist_ok=True)
    # Materialize each entry: a value naming an existing file is copied,
    # anything else is written out as text.
    for rel_path, content in files_map.items():
        destination = os.path.join(staging, rel_path)
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        if isinstance(content, str) and os.path.exists(content):
            shutil.copyfile(content, destination)
        else:
            with open(destination, "w", encoding="utf-8") as out_fh:
                out_fh.write(str(content))
    # Zip the staging tree, keeping paths relative to the repo root.
    zip_path = os.path.join(target_dir, f"{repo_name}_{suffix}.zip")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for dirpath, _, filenames in os.walk(staging):
            for name in filenames:
                absolute = os.path.join(dirpath, name)
                archive.write(absolute, os.path.relpath(absolute, staging))
    return zip_path
| # ============================================================================= | |
| # SECTION 7: BASE GRAPH AGENT NODES | |
| # ============================================================================= | |
| # These are the core agents that form the foundation of the workflow. | |
def run_triage_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Classify the user input as a casual greeting vs. a task.
    Greetings short-circuit with a canned reply; tasks continue downstream.
    """
    log.info("--- TRIAGE ---")
    prompt = f"Is this a greeting or a task? '{state.get('userInput', '')}' Reply: 'greeting' or 'task'"
    reply = getattr(llm.invoke(prompt), "content", "") or ""
    result: Dict[str, Any] = {"execution_path": ["Triage"]}
    if 'greeting' in reply.lower():
        result["draftResponse"] = "Hello! How can I help?"
        result.update(add_status_update("Triage", "Greeting"))
    else:
        result.update(add_status_update("Triage", "Task detected"))
    return result
def run_planner_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Quick planning agent used for cost estimation.

    Behavior:
      - ask the LLM for JSON-only output (schema + example in the prompt)
      - parse via parse_json_from_llm, then regex extraction + json.loads
      - retry the LLM once with stricter instructions if parsing still fails
      - fall back to a heuristic plaintext -> plan conversion (keeps the
        pipeline moving and records the raw output for observability)
      - compute per-loop and total cost estimates with sane clamping

    Returns:
        Partial state update with pmPlan, execution_path and a status entry.
    """
    log.info("--- PLANNER ---")
    path = ensure_list(state, 'execution_path') + ["Planner"]
    user_input = state.get('userInput', '') or ""
    # Strict JSON-only prompt with schema + example
    prompt = (
        "You are a planning assistant. **Return ONLY valid JSON** (no explanation, no markdown).\n"
        "Schema: {\"plan\": [string], \"estimated_llm_calls_per_loop\": integer}\n"
        "Example:\n"
        "{\n"
        "  \"plan\": [\"Market research: 1 week\", \"MVP: 4 weeks\"],\n"
        "  \"estimated_llm_calls_per_loop\": 4\n"
        "}\n\n"
        f"Create a plan for: {json.dumps(user_input)}\n"
        "Keep plan concise (each item one short sentence)."
    )

    def _extract_json_blob(text: str) -> Optional[str]:
        """Best-effort regex grab of the outermost {...} or [...] blob."""
        if not text:
            return None
        obj = re.search(r'(\{[\s\S]*\})', text)
        if obj:
            return obj.group(1)
        arr = re.search(r'(\[[\s\S]*\])', text)
        if arr:
            return arr.group(1)
        return None

    # Primary LLM call; a transport failure aborts planning with an error plan.
    try:
        response = llm.invoke(prompt)
    except Exception as e:
        log.exception("Planner: llm.invoke failed: %s", e)
        return {
            "pmPlan": {"error": "LLM invoke failed"},
            "execution_path": path,
            **add_status_update("Planner", "Error")
        }
    raw = (getattr(response, "content", "") or "").strip()
    # 1) Existing robust parser (defined above in this module)
    try:
        plan_data = parse_json_from_llm(raw)
    except Exception:
        plan_data = None
    # 2) Regex extraction + json.loads
    if not plan_data:
        try:
            blob = _extract_json_blob(raw)
            plan_data = json.loads(blob) if blob else None
        except Exception:
            plan_data = None
    # 3) One retry with stricter instruction / low temperature
    if not plan_data:
        try:
            log.info("Planner: parse failed, retrying with stricter instruction / low temp.")
            retry_prompt = prompt + "\n\nIMPORTANT: Return only JSON, nothing else."
            # Some wrappers accept extra params; fall back to a plain invoke.
            try:
                retry_resp = llm.invoke(retry_prompt, params={"temperature": 0.0})
            except Exception:
                retry_resp = llm.invoke(retry_prompt)
            raw_retry = (getattr(retry_resp, "content", "") or "").strip()
            try:
                plan_data = parse_json_from_llm(raw_retry)
            except Exception:
                plan_data = None
            if not plan_data:
                blob = _extract_json_blob(raw_retry)
                if blob:
                    try:
                        plan_data = json.loads(blob)
                        raw = raw + "\n\n--- retry ---\n\n" + raw_retry
                    except Exception:
                        plan_data = None
        except Exception as e:
            log.exception("Planner retry failed: %s", e)
    # 4) Final fallback: convert plaintext into a minimal plan so the
    #    pipeline continues rather than dead-ending on a parse failure.
    if not plan_data:
        log.warning("Planner: JSON parse failed, using plaintext fallback.")
        fragments = [f.strip() for f in re.split(r'\n+|(?<=\.)\s+', raw) if f.strip()]
        steps = []
        for frag in fragments:
            s = frag if len(frag) <= 200 else frag[:197] + "..."
            if len(s) > 10 and s not in steps:
                steps.append(s)
            if len(steps) >= 8:
                break
        if not steps:
            steps = [
                "Clarify requirements with user.",
                "Perform market research and competitor analysis.",
                "Draft curriculum and module outline.",
                "Estimate effort and resources for MVP."
            ]
        plan_data = {
            "plan": steps,
            "estimated_llm_calls_per_loop": 3,
            # debug info for observability
            "notes": "Derived from plaintext fallback; original LLM output not valid JSON.",
            "llm_preview": raw[:2000]
        }
    # Sanitize and normalize plan_data fields
    if not isinstance(plan_data, dict):
        plan_data = {"plan": ["Clarify requirements with user."], "estimated_llm_calls_per_loop": 3}
    raw_steps = plan_data.get('plan')
    if not isinstance(raw_steps, list):
        # FIX: the old `x and list(x) or [...]` idiom ran list() on any truthy
        # value, exploding a plain-string plan into a list of characters.
        if isinstance(raw_steps, (tuple, set)):
            plan_data['plan'] = list(raw_steps)
        elif raw_steps:
            plan_data['plan'] = [str(raw_steps)]
        else:
            plan_data['plan'] = ["Clarify requirements with user."]
    try:
        calls = int(plan_data.get('estimated_llm_calls_per_loop', 3))
    except Exception:
        calls = 3
    calls = max(1, min(calls, 50))  # clamp to a sensible range
    plan_data['estimated_llm_calls_per_loop'] = calls
    # Cost model: calls/loop * avg tokens/call * blended USD per token.
    avg_cost_per_1k = (GPT4O_INPUT_COST_PER_1K_TOKENS + GPT4O_OUTPUT_COST_PER_1K_TOKENS) / 2.0
    try:
        avg_tokens = float(AVG_TOKENS_PER_CALL)
    except Exception:
        avg_tokens = 1500.0
    cost_per_loop = (calls * avg_tokens) * (avg_cost_per_1k / 1000.0)
    plan_data['max_loops_initial'] = INITIAL_MAX_REWORK_CYCLES
    plan_data['cost_per_loop_usd'] = max(0.01, round(cost_per_loop, 4))
    total_loops = INITIAL_MAX_REWORK_CYCLES + 1
    plan_data['estimated_cost_usd'] = round(cost_per_loop * total_loops, 2)
    # Detect artifact requirements (preserve existing behavior)
    detection = detect_requested_output_types(user_input)
    if detection.get('requires_artifact'):
        plan_data.setdefault('experiment_needed', True)
        plan_data.setdefault('experiment_type', detection.get('artifact_type'))
        plan_data.setdefault('experiment_goal', user_input)
    return {
        "pmPlan": plan_data,
        "execution_path": path,
        **add_status_update("Planner", "Plan created")
    }
def run_memory_retrieval(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Pull relevant past interactions into context.
    Delegates similarity search to memory_manager.
    """
    log.info("--- MEMORY ---")
    trail = ensure_list(state, 'execution_path') + ["Memory"]
    memories = memory_manager.retrieve_relevant_memories(state.get('userInput', ''))
    if memories:
        context = "\n".join(f"Memory: {m.page_content}" for m in memories)
    else:
        context = "No memories"
    return {
        "retrievedMemory": context,
        "execution_path": trail,
        **add_status_update("Memory", "Memory retrieved")
    }
def run_intent_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Distill the raw request (plus retrieved memory) into a
    single clear, actionable objective.
    """
    log.info("--- INTENT ---")
    trail = ensure_list(state, 'execution_path') + ["Intent"]
    prompt = (
        "Refine into clear objective.\n\n"
        f"Memory: {state.get('retrievedMemory')}\n\n"
        f"Request: {state.get('userInput', '')}\n\n"
        "Core Objective:"
    )
    objective = getattr(llm.invoke(prompt), "content", "") or ""
    return {
        "coreObjectivePrompt": objective,
        "execution_path": trail,
        **add_status_update("Intent", "Objective clarified")
    }
def validate_and_fix_pm_plan(plan: Dict, user_input: str) -> Dict:
    """
    Force artifact creation when the user explicitly asked for file output.

    Guards against the PM LLM returning experiment_needed=false even though
    the request clearly demands a deliverable (docx/pdf/excel/...).

    Args:
        plan: Parsed PM plan; non-dict values (including None) are coerced
            to an empty dict.
        user_input: Raw user request, scanned for file-output keywords.

    Returns:
        The (possibly mutated) plan dict; experiment_* fields are forced
        when an artifact was requested, and plan_steps is always a list.
    """
    if not isinstance(plan, dict):
        plan = {}
    lowered = user_input.lower()
    # Keywords that signal the user wants a concrete file artifact.
    needs_artifact = any(keyword in lowered for keyword in [
        'docx', 'document', 'word', 'file', 'pdf', 'excel',
        'output -', 'output:', 'results in', 'deliver as',
        'save as', 'export to', 'create file', 'generate file',
        'avoid code', 'no code'  # "no code" implies a document deliverable
    ])
    if needs_artifact:
        log.info("π User requested document/file output - forcing artifact creation")
        # FIX: a plain falsy test covers False/None/missing alike; the old
        # `not x or x == False` was redundant.
        if not plan.get("experiment_needed"):
            log.warning("β οΈ Overriding experiment_needed from false to TRUE")
            plan["experiment_needed"] = True
        # Choose the document type from explicit format mentions.
        if 'pdf' in lowered:
            plan["experiment_type"] = "pdf"
        elif 'excel' in lowered or 'xlsx' in lowered:
            plan["experiment_type"] = "excel"
        else:
            plan["experiment_type"] = "word"  # Default to Word
        # Ensure goal is set
        if not plan.get("experiment_goal"):
            plan["experiment_goal"] = user_input
        log.info(f"π Forced: experiment_needed=True, type={plan.get('experiment_type')}")
    # Guarantee a minimal plan structure either way.
    if not plan.get("plan_steps") or not isinstance(plan["plan_steps"], list):
        plan["plan_steps"] = [
            "Analyze request",
            "Gather information",
            "Create document",
            "Format output"
        ]
    return plan
def run_pm_agent(state: AgentState) -> Dict[str, Any]:
    """
    PM agent: turn the objective into a concrete execution plan.

    Enforces hard rework limits so the QA->PM loop cannot run away:
      - caps max_loops at 3 when the stored value is unreasonable (>10)
      - forces an approved completion plan once the hard limit is hit
      - forces completion when loop detection (detect_loop) fires

    Returns:
        Partial state update with pmPlan, rework_cycles, execution_path,
        max_loops, and (on forced completion) approved=True.
    """
    log.info("--- PM ---")
    current_rework = ensure_int(state, 'rework_cycles', 0)
    max_loops_val = ensure_int(state, 'max_loops', INITIAL_MAX_REWORK_CYCLES)
    # Cap max_loops if unreasonable. The in-place write keeps later readers
    # of this state dict (e.g. loop detection) consistent with the cap.
    if max_loops_val > 10:
        log.warning(f"β οΈ max_loops {max_loops_val} TOO HIGH! Forcing to 3")
        max_loops_val = 3
        state["max_loops"] = 3
    # Hard limit: never exceed 6 total reworks regardless of max_loops.
    hard_limit = min(max_loops_val * 2, 6)
    log.info(f"π Rework cycle: {current_rework}/{hard_limit} (max: {max_loops_val})")
    # Force completion once the hard limit is reached.
    if current_rework >= hard_limit:
        log.error(f"β Hard limit reached: {current_rework} >= {hard_limit}")
        path = ensure_list(state, 'execution_path') + ["PM"]
        user_input = state.get('userInput', '')
        completion_plan = {
            "plan_steps": [
                "Create comprehensive research document",
                "Include market analysis and competitive review",
                "Add Python code examples for AI implementation",
                "Provide actionable implementation roadmap"
            ],
            "experiment_needed": True,
            "experiment_type": "word",  # Force document for research
            "experiment_goal": state.get('coreObjectivePrompt', user_input)
        }
        return {
            "pmPlan": completion_plan,
            "execution_path": path,
            "rework_cycles": current_rework,
            "approved": True,  # force approval so the run completes
            "max_loops": 3,
            **add_status_update("PM", "Limit reached - completing with document")
        }
    # Loop check (skipped on the first couple of cycles).
    if current_rework > 1 and detect_loop(state):
        log.error("β Loop detected in PM agent")
        path = ensure_list(state, 'execution_path') + ["PM"]
        return {
            "pmPlan": {
                "plan_steps": ["Create research document with analysis"],
                "experiment_needed": True,
                "experiment_type": "word",
                "experiment_goal": state.get('coreObjectivePrompt', '')
            },
            "execution_path": path,
            "rework_cycles": current_rework,
            "approved": True,
            **add_status_update("PM", "Loop detected - forcing completion")
        }
    # Normal planning pass: increment the cycle counter.
    current_cycles = current_rework + 1
    path = ensure_list(state, 'execution_path') + ["PM"]
    # Build planning context
    user_input = state.get('userInput', '')
    context_parts = [
        f"=== USER REQUEST ===\n{user_input}",
        f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}",
    ]
    # Surface an explicit Python requirement to the planner LLM.
    if 'prefer python' in user_input.lower() or 'use python' in user_input.lower():
        context_parts.append("\n=== LANGUAGE REQUIREMENT ===\nUSER REQUIRES PYTHON")
    # On rework passes, feed QA's criticism back into the prompt.
    if state.get('qaFeedback'):
        context_parts.append(f"\n=== QA FEEDBACK ===\n{state.get('qaFeedback')}")
        context_parts.append(f"\n=== REWORK CYCLE {current_cycles}/{hard_limit} ===")
    full_context = "\n".join(context_parts)
    # Research-style requests get steered toward a Word document.
    is_research = any(kw in user_input.lower() for kw in [
        'research', 'analysis', 'everything to start', 'comprehensive'
    ])
    prompt = f"""Create DETAILED execution plan.
{full_context}
{"IMPORTANT: This is a research request. Create a document with Python code examples." if is_research else ""}
Return JSON:
{{
  "plan_steps": ["step 1", "step 2", ...],
  "experiment_needed": true/false,
  "experiment_type": "{"word" if is_research else "script|notebook|excel|word|pdf|repo"}",
  "experiment_goal": "specific goal",
  "key_requirements": ["req 1", "req 2", ...]
}}
Be specific and actionable.
"""
    # Get plan from LLM
    try:
        response = llm.invoke(prompt)
        plan = parse_json_from_llm(getattr(response, "content", "") or "")
    except Exception as e:
        log.warning(f"PM LLM failed: {e}")
        plan = None
    # Validate and fix the plan BEFORE using it (forces artifacts when the
    # user explicitly requested file output).
    plan = validate_and_fix_pm_plan(plan or {}, user_input)
    log.info(f"π Final plan: experiment_needed={plan.get('experiment_needed')}, type={plan.get('experiment_type')}")
    # Fallback if still no plan structure
    if not plan.get("plan_steps"):
        plan["plan_steps"] = [
            "Analyze requirements",
            "Research market",
            "Create deliverable"
        ]
    # Normalize experiment type against the known artifact types.
    plan['experiment_type'] = normalize_experiment_type(
        plan.get('experiment_type'),
        plan.get('experiment_goal', '')
    )
    # Ensure goal is set
    if plan.get('experiment_needed') and not plan.get('experiment_goal'):
        plan['experiment_goal'] = user_input
    # Loop-control bookkeeping carried along with the plan.
    plan['max_loops_initial'] = max_loops_val
    plan['hard_limit'] = hard_limit
    return {
        "pmPlan": plan,
        "execution_path": path,
        "rework_cycles": current_cycles,
        "max_loops": max_loops_val,
        **add_status_update("PM", f"Plan created - Cycle {current_cycles}/{hard_limit}")
    }
def extract_user_language_preference(user_input: str) -> Optional[str]:
    """Extract the user's programming-language preference from free text.

    Args:
        user_input: Raw user request text.

    Returns:
        "python" when the user explicitly asks for Python (or phrases a
        "use Python unless ..." clause without naming another language);
        otherwise None (no explicit preference detected).
    """
    text = user_input.lower()
    # Explicit Python preference (generic "code"/"provide script" requests
    # also default to Python in this project).
    if any(p in text for p in ["prefer python", "use python", "always use python", "code", "provide script"]):
        return "python"
    # "Use Python unless ..." clause: honor Python unless another language
    # is actually named.
    # BUG FIX: the old list compared upper-case names ("R", "PHP", "Scala",
    # "MATLAB", "SQL") against lower-cased text, so they could never match.
    # Compare lower-cased names with word boundaries instead — a bare
    # substring test for "r" would match almost any sentence.
    if "unless" in text and "python" in text:
        other_langs = ["typescript", "javascript", "java", "c++", "r", "php", "scala", "matlab", "sql"]
        if not any(
            re.search(r"(?<![a-z0-9])" + re.escape(lang) + r"(?![a-z0-9])", text)
            for lang in other_langs
        ):
            return "python"
    return None
def run_experimenter_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Generates and executes code/artifacts.

    Handles:
    - Multi-language code generation
    - Artifact creation (notebooks, scripts, documents)
    - Code execution
    - Result capture

    Returns experiment code/results plus the updated execution path.
    """
    log.info("--- EXPERIMENTER ---")
    path = ensure_list(state, 'execution_path') + ["Experimenter"]
    pm = state.get('pmPlan', {}) or {}
    # Skip if no experiment needed
    if not pm.get('experiment_needed'):
        return {
            "experimentCode": None,
            "experimentResults": None,
            "execution_path": path,
            **add_status_update("Experimenter", "No experiment needed")
        }
    user_input = state.get('userInput', '')
    # The user's explicit language preference always wins over detection.
    forced_lang = extract_user_language_preference(user_input)
    # Research-style requests are delivered as documents, not executable code.
    is_research = any(kw in user_input.lower() for kw in [
        'research', 'analysis', 'everything to start',
        'comprehensive', 'guide to starting', 'study',
        'review', 'summarize'
    ])
    if is_research:
        exp_type = 'word'
        log.info("π Research request detected - using document format")
    else:
        exp_type = normalize_experiment_type(
            pm.get('experiment_type'),
            pm.get('experiment_goal', '')
        )
    # BUG FIX: a second detection pass previously overwrote BOTH `language`
    # and `exp_type` unconditionally, discarding the forced language and
    # the research document format computed above.  Detection is now only
    # a fallback when the user did not force a language.
    if forced_lang:
        language = forced_lang
    else:
        detected = detect_requested_output_types_enhanced(pm.get('experiment_goal', ''))
        language = detected.get('language', 'python')
    log.info(f"π Type: {exp_type}, Language: {language}")
    # Build rich context for the generation prompt.
    context_parts = [
        f"=== USER REQUEST ===\n{state.get('userInput', '')}",
        f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}",
        f"\n=== PLAN ===\n{json.dumps(pm.get('plan_steps', []), indent=2)}",
        f"\n=== REQUIREMENTS ===\n{json.dumps(pm.get('key_requirements', []), indent=2)}",
    ]
    if state.get('retrievedMemory'):
        context_parts.append(f"\n=== CONTEXT ===\n{state.get('retrievedMemory', '')}")
    if state.get('qaFeedback'):
        context_parts.append(f"\n=== FEEDBACK TO ADDRESS ===\n{state.get('qaFeedback', '')}")
    full_context = "\n".join(context_parts)
    # Get language configuration (display name falls back to "Code").
    lang_config = LANGUAGES.get(language)
    lang_name = lang_config.name if lang_config else "Code"
    if exp_type == 'word':
        # Research document prompt
        enhanced_prompt = f"""Create COMPREHENSIVE RESEARCH DOCUMENT.
USER REQUEST: {user_input}
OBJECTIVE: {state.get('coreObjectivePrompt', '')}
OUTPUT REQUIREMENTS:
- Complete research document in markdown format
- Include ALL relevant sections (market analysis, competitors, strategies, implementation)
- Add Python code EXAMPLES where relevant (AI integration, tools, APIs)
- Be SPECIFIC and ACTIONABLE - provide real data and analysis
- NO placeholders - complete, usable content only
- Structure: Executive Summary, Market Analysis, Competitive Review,
Curriculum Design, Implementation Plan, Tools & Tech Stack
Generate complete research document with Python examples:"""
    else:
        # Code generation prompt
        enhanced_prompt = f"""Create {lang_name} {exp_type}.
USER REQUEST: {user_input}
REQUIREMENTS:
- Write ONLY {lang_name} code (user specified)
- Follow best practices
- Include documentation
- Production-ready
- Include error handling
Generate {lang_name} code:"""
    response = llm.invoke(enhanced_prompt)
    llm_text = getattr(response, "content", "") or ""
    if exp_type == 'word':
        # Persist the research document as .docx plus a plain-text copy.
        doc_path = write_docx_from_text(llm_text, out_dir=OUT_DIR)
        txt_path = doc_path.replace('.docx', '.txt')
        with open(txt_path, 'w', encoding='utf-8') as f:
            f.write(llm_text)
        log.info(f"π Created: {os.path.basename(doc_path)}")
        return {
            "experimentCode": llm_text,
            "experimentResults": {
                "success": True,
                "paths": {
                    "document": sanitize_path(doc_path),
                    "text": sanitize_path(txt_path)
                },
                "stdout": f"Research document created: {os.path.basename(doc_path)}",
                "language": "markdown",
                "artifact_type": "research_document"
            },
            "execution_path": path,
            **add_status_update("Experimenter", "Research doc created")
        }
    # Normal code handling: extract code blocks with language detection.
    code_blocks = extract_code_blocks_multi_lang(llm_text)
    if code_blocks:
        # Use first detected language/code pair
        detected_lang, code_text = code_blocks[0]
        # Write script with proper extension
        script_path = write_script_multi_lang(code_text, detected_lang, out_dir=OUT_DIR)
        # Execute with appropriate runner
        exec_results = execute_code(code_text, detected_lang)
        results = {
            "success": exec_results.get("exit_code", 0) == 0,
            "paths": {"script": sanitize_path(script_path)},
            "stdout": exec_results.get("stdout", ""),
            "stderr": exec_results.get("stderr", ""),
            "language": detected_lang,
            "context_used": len(full_context)
        }
        return {
            "experimentCode": code_text,
            "experimentResults": results,
            "execution_path": path,
            **add_status_update("Experimenter", f"{lang_name} script created")
        }
    # No code blocks found
    return {
        "experimentCode": None,
        "experimentResults": {"success": False, "error": "No code generated"},
        "execution_path": path,
        **add_status_update("Experimenter", "Code generation failed")
    }
def run_synthesis_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Synthesizes final response from all components.

    Combines:
    - User request
    - Execution results
    - Artifacts created
    - Insights and explanations

    Returns the synthesized ``draftResponse`` with an artifact list and
    text-artifact previews appended.
    """
    log.info("--- SYNTHESIS ---")
    _state = state or {}
    path = ensure_list(_state, 'execution_path') + ["Synthesis"]
    exp_results = _state.get('experimentResults')
    pm_plan = _state.get('pmPlan', {}) or {}
    # Build synthesis context
    synthesis_context = [
        f"=== USER REQUEST ===\n{_state.get('userInput', '')}",
        f"\n=== OBJECTIVE ===\n{_state.get('coreObjectivePrompt', '')}",
        f"\n=== PLAN ===\n{json.dumps(pm_plan.get('plan_steps', []), indent=2)}",
    ]
    artifact_details = []
    artifact_message = ""
    # Process experiment results
    if exp_results and isinstance(exp_results, dict):
        paths = exp_results.get("paths") or {}
        if paths:
            artifact_lines = []
            for artifact_type, artifact_path in paths.items():
                artifact_lines.append(f"- **{artifact_type.title()}**: `{os.path.basename(artifact_path)}`")
                artifact_details.append(f"{artifact_type}: {artifact_path}")
            artifact_message = "\n\n**Artifacts Generated:**\n" + "\n".join(artifact_lines)
            synthesis_context.append(f"\n=== ARTIFACTS ===\n" + "\n".join(artifact_details))
        if exp_results.get('stdout'):
            synthesis_context.append(f"\n=== OUTPUT ===\n{exp_results.get('stdout', '')}")
        if exp_results.get('stderr'):
            synthesis_context.append(f"\n=== ERRORS ===\n{exp_results.get('stderr', '')}")
    full_context = "\n".join(synthesis_context)
    # Generate final response
    synthesis_prompt = f"""Create FINAL RESPONSE after executing user's request.
{full_context}
Create comprehensive response that:
- Directly addresses original request
- Explains what was accomplished and HOW
- References specific artifacts and explains PURPOSE
- Provides context on how to USE deliverables
- Highlights KEY INSIGHTS
- Suggests NEXT STEPS if relevant
- Be SPECIFIC about what was created."""
    response = llm.invoke(synthesis_prompt)
    final_text = getattr(response, "content", "") or ""
    # Build a short preview of each text-based artifact.
    # BUG FIX: previously every artifact was opened in text mode, so binary
    # formats (.docx, .xlsx, .pdf, .zip, ...) produced garbage previews;
    # binary extensions are now skipped.
    _binary_exts = {'.docx', '.xlsx', '.pptx', '.pdf', '.zip', '.png', '.jpg', '.jpeg'}
    content_preview = ""
    if exp_results and isinstance(exp_results, dict):
        paths = exp_results.get("paths") or {}
        for artifact_type, artifact_path in paths.items():
            ext = os.path.splitext(str(artifact_path))[1].lower()
            if ext in _binary_exts:
                continue  # no readable text preview for binary files
            if os.path.exists(artifact_path):
                try:
                    with open(artifact_path, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read()
                    preview = content[:2000]  # First 2000 chars
                    content_preview += f"\n\n### π {artifact_type.title()} Preview:\n```\n{preview}\n...\n```\n"
                except Exception as e:
                    log.warning(f"Could not preview {artifact_path}: {e}")
    # Append artifact list and previews to the synthesized answer.
    if artifact_message:
        final_text = final_text + "\n\n---\n" + artifact_message
    if content_preview:
        final_text = final_text + "\n\n---\n## Content Preview\n" + content_preview
    return {
        "draftResponse": final_text,
        "execution_path": path,
        **add_status_update("Synthesis", "Response synthesized")
    }
def run_qa_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Quality assurance review.

    Validates:
    - Completeness
    - Correctness
    - Alignment with user request
    - Quality standards

    Returns ``approved`` plus optional ``qaFeedback`` driving the rework loop.
    """
    log.info("--- QA ---")
    path = ensure_list(state, 'execution_path') + ["QA"]
    # Build QA context
    qa_context = [
        f"=== REQUEST ===\n{state.get('userInput', '')}",
        f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}",
        f"\n=== DRAFT ===\n{state.get('draftResponse', '')}",
    ]
    if state.get('experimentResults'):
        qa_context.append(
            f"\n=== ARTIFACTS ===\n"
            f"{json.dumps(state.get('experimentResults', {}).get('paths', {}), indent=2)}"
        )
    prompt = f"""You are a QA reviewer. Review the draft response against the user's objective.
{chr(10).join(qa_context)}
Review Instructions:
- Does the draft and its artifacts COMPLETELY satisfy ALL parts of the user's request?
- Is the quality of the work high?
- If this is a re-submission (rework cycle > 1), has the previous feedback been successfully addressed?
Response Format (required JSON or a single word 'APPROVED'):
Either return EXACTLY the single word:
APPROVED
Or return JSON like:
{{
"approved": false,
"feedback": "Specific, actionable items to fix (bullet list or numbered).",
"required_changes": ["..."]
}}
"""
    try:
        response = llm.invoke(prompt)
        content = getattr(response, "content", "") or ""
    except Exception as e:
        log.exception(f"QA LLM call failed: {e}")
        # Fail closed: an unreviewed draft is never auto-approved.
        return {
            "approved": False,
            "qaFeedback": "QA LLM failed; manual review required.",
            "execution_path": path,
            **add_status_update("QA", "QA failed")
        }
    # Check for a bare "APPROVED" verdict.
    # BUG FIX: the old check ('"APPROVED" in content and len(content) <= 20')
    # also approved short NEGATIVE verdicts such as "NOT APPROVED".  Require
    # APPROVED to stand alone (surrounding punctuation/whitespace allowed).
    if re.fullmatch(r"[^A-Za-z]*APPROVED[^A-Za-z]*", content.strip().upper()):
        return {
            "approved": True,
            "qaFeedback": None,
            "execution_path": path,
            **add_status_update("QA", "Approved")
        }
    # Try JSON parsing
    parsed = parse_json_from_llm(content)
    if isinstance(parsed, dict):
        approved = bool(parsed.get("approved", False))
        feedback = parsed.get("feedback") or parsed.get("qaFeedback") or parsed.get("required_changes") or ""
        # Normalize feedback to string
        if isinstance(feedback, list):
            feedback = "\n".join([str(x) for x in feedback])
        elif not isinstance(feedback, str):
            feedback = str(feedback)
        return {
            "approved": approved,
            "qaFeedback": feedback if not approved else None,
            "execution_path": path,
            **add_status_update("QA", "QA completed")
        }
    # Fallback: treat the raw (truncated) content as feedback, not approved.
    safe_feedback = content.strip()[:2000] or "QA produced no actionable output."
    return {
        "approved": False,
        "qaFeedback": safe_feedback,
        "execution_path": path,
        **add_status_update("QA", "QA needs rework")
    }
def run_archivist_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Archives successful interactions to memory.

    Enables learning from past experiences.  Archiving is best-effort:
    a summarization or storage failure is logged and never propagated,
    so a memory outage cannot crash the graph (consistent with the
    guarded LLM calls in the other agents).
    """
    log.info("--- ARCHIVIST ---")
    path = ensure_list(state, 'execution_path') + ["Archivist"]
    summary_prompt = (
        f"Summarize for memory.\n\n"
        f"Objective: {state.get('coreObjectivePrompt')}\n\n"
        f"Response: {state.get('draftResponse')}\n\n"
        f"Summary:"
    )
    try:
        response = llm.invoke(summary_prompt)
        memory_manager.add_to_memory(
            getattr(response, "content", ""),
            {"objective": state.get('coreObjectivePrompt')}
        )
        status = "Saved to memory"
    except Exception as e:
        # ROBUSTNESS FIX: previously an LLM or memory failure here raised
        # and aborted the whole run at the final step.
        log.warning(f"Archivist failed to save memory: {e}")
        status = "Memory save failed (non-fatal)"
    return {
        "execution_path": path,
        **add_status_update("Archivist", status)
    }
def run_disclaimer_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Prepends a disclaimer when a hard limit is hit.

    Triggered when either the budget (120% threshold) or the rework-cycle
    limit is exhausted; the draft is kept but flagged as possibly incomplete.
    """
    log.warning("--- DISCLAIMER ---")
    path = ensure_list(state, 'execution_path') + ["Disclaimer"]
    # Pick the reason based on which limit tripped.
    if state.get('budget_exceeded'):
        reason = "Budget limit reached."
    else:
        reason = "Rework limit reached."
    banner = f"**DISCLAIMER: {reason} Draft may be incomplete.**\n\n---\n\n"
    draft = state.get('draftResponse', "No response")
    return {
        "draftResponse": banner + draft,
        "execution_path": path,
        **add_status_update("Disclaimer", reason)
    }
| # ============================================================================= | |
| # SECTION 8: UPGRADED GRAPH AGENT NODES | |
| # ============================================================================= | |
| # These enhanced agents add governance, compliance, and observation layers. | |
def run_pragmatist_agent(state: AgentState) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: Risk assessment and tier recommendations.
    Provides:
    - Risk analysis (low/medium/high)
    - Tiered delivery options (lite/standard/full)
    - Cost estimates per tier
    - Optimization suggestions

    Reads ``pmPlan`` (plan steps, experiment type, optional
    ``estimated_cost_usd``), ``preferred_tier`` and ``flexible_budget_mode``
    from the state; writes ``pragmatistReport``.  Liberal policy: the plan
    is reported as feasible in almost all cases.
    """
    log.info(">>> PRAGMATIST AGENT (improved)")
    path = ensure_list(state, "execution_path") + ["Pragmatist"]
    pm = state.get("pmPlan", {}) or {}
    # Parse estimated cost: try a direct float() first; if the plan carries
    # a formatted string (e.g. "$1,200"), fall back to extracting the first
    # numeric token and stripping thousands separators.
    est_cost = None
    try:
        raw = pm.get("estimated_cost_usd", None)
        if raw is not None and raw != "":
            est_cost = float(raw)
    except Exception:
        try:
            s = str(raw)
            m = re.search(r"[\d,.]+", s)
            if m:
                est_cost = float(m.group(0).replace(",", ""))
        except Exception:
            est_cost = None
    exp_type = pm.get("experiment_type", "word")
    # Baseline when no estimate: engineering artifacts assumed $50, documents $5.
    # NOTE(review): an explicit estimate of 0.0 is falsy and also falls back
    # to the default baseline — confirm that is intended.
    base_est = est_cost or (50.0 if exp_type in ["script", "repo"] else 5.0)
    # Define tier options (estimated cost = baseline x multiplier)
    tiers = {
        "lite": {
            "multiplier": 0.25,
            "estimated_cost_usd": round(base_est * 0.25, 2),
            "description": "Minimal extract (CSV/text) or short summary. Minimal engineering."
        },
        "standard": {
            "multiplier": 1.0,
            "estimated_cost_usd": round(base_est * 1.0, 2),
            "description": "Complete, tested script or notebook; limited UX β suitable for MVP."
        },
        "full": {
            "multiplier": 3.0,
            "estimated_cost_usd": round(base_est * 3.0, 2),
            "description": "Production-ready repo, packaging, tests, and deployment instructions."
        }
    }
    preferred = state.get("preferred_tier")
    flexible_mode = bool(state.get("flexible_budget_mode", False))
    # Intelligent risk assessment: additive score over independent factors.
    risk_factors = []
    risk_score = 0
    # Check complexity (long multi-step plans are riskier)
    plan_steps = pm.get("plan_steps", [])
    if len(plan_steps) > 8:
        risk_factors.append("Complex multi-step plan")
        risk_score += 1
    # Check artifact type (repos/notebooks need more engineering effort)
    if exp_type in ("repo", "notebook"):
        risk_factors.append(f"Engineering-heavy artifact type: {exp_type}")
        risk_score += 1
    # Check cost (missing estimate counts as a risk of its own)
    if est_cost is None:
        risk_factors.append("No cost estimate provided")
        risk_score += 1
    elif est_cost > 200:
        risk_factors.append(f"High estimated cost: ${est_cost}")
        risk_score += 2
    elif est_cost > 100:
        risk_factors.append(f"Moderate estimated cost: ${est_cost}")
        risk_score += 1
    # Flexible-budget mode deliberately discounts two points of risk.
    if flexible_mode:
        risk_score = max(0, risk_score - 2)
    # Map the numeric score onto a coarse risk level.
    if risk_score <= 1:
        risk = "low"
    elif risk_score <= 3:
        risk = "medium"
    else:
        risk = "high"
    # Determine feasibility (LIBERAL: almost always feasible; the maximum
    # attainable score here is 5, so infeasible only with flexible mode off
    # AND an even higher future scoring scheme).
    feasible = True
    if risk_score > 5 and not flexible_mode:  # Very high threshold
        feasible = False
    # Recommend tier: honor an explicit preference, otherwise default to
    # "standard" unless the estimate is very large without flexible mode.
    if preferred in tiers:
        recommended_tier = preferred
    elif est_cost is None:
        recommended_tier = "standard"
    elif est_cost > 500 and not flexible_mode:
        recommended_tier = "lite"
    else:
        recommended_tier = "standard"
    prag_report = {
        "ok": feasible,
        "risk_factors": risk_factors,
        "risk_level": risk,
        "risk_score": risk_score,
        "tier_options": tiers,
        "recommended_tier": recommended_tier,
        "explain": (
            f"Assessed {len(risk_factors)} risk factor(s). "
            f"Risk level: {risk}. Recommended tier: {recommended_tier}. "
            "User can proceed with any tier; higher tiers provide more complete deliverables."
        )
    }
    # Optional LLM recommendations for high-complexity plans (best-effort).
    if len(plan_steps) > 7 and llm:
        try:
            prompt = (
                "You are a pragmatic engineering advisor. Given this plan, suggest 2-3 ways to "
                "optimize implementation while preserving core value. Be specific and actionable. "
                "Return JSON {\"optimizations\": [...]}.\n\n"
                f"Plan: {json.dumps(pm, indent=2)}"
            )
            r = llm.invoke(prompt)
            recs = parse_json_from_llm(getattr(r, "content", "") or "")
            if isinstance(recs, dict):
                prag_report["optimizations"] = recs.get("optimizations", [])
        except Exception as e:
            log.debug(f"LLM optimizations failed: {e}")
    out = {"pragmatistReport": prag_report, "execution_path": path}
    out.update(add_status_update("Pragmatist", f"Risk: {risk}, Tier: {recommended_tier}"))
    return out
def run_governance_agent(state: AgentState) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: Budget/approval gate ahead of experimentation.

    The "full" tier bypasses ALL budget checks completely (unlimited budget
    mode, extended rework limits).  "lite"/"standard" tiers are validated
    against the configured ``stop_threshold`` and the Pragmatist risk
    assessment.  Writes ``governanceReport`` with the decision.
    """
    log.info(">>> GOVERNANCE AGENT (improved)")
    path = ensure_list(state, "execution_path") + ["Governance"]
    prag = state.get("pragmatistReport", {}) or {}
    # Tier preference: explicit user choice > Pragmatist recommendation > standard.
    preferred = state.get("preferred_tier") or prag.get("recommended_tier") or "standard"
    tier_opts = prag.get("tier_options", {})
    chosen = tier_opts.get(preferred, tier_opts.get("standard", {}))
    try:
        chosen_cost = float(chosen.get("estimated_cost_usd", 0.0))
    except Exception:
        chosen_cost = 0.0
    flexible = bool(state.get("flexible_budget_mode", False))
    # CRITICAL: Full tier = unlimited budget, bypass ALL checks
    if preferred == "full":
        log.info("π― FULL TIER SELECTED - UNLIMITED BUDGET MODE ACTIVATED")
        log.info(f"   Cost estimate: ${chosen_cost} (ignored)")
        log.info("   Budget checks: DISABLED")
        log.info("   Rework limits: EXTENDED")
        gov_report = {
            "budget_ok": True,
            "issues": ["Full tier: budget checks disabled"],
            "approved_for_experiment": True,
            "governanceDecision": "approve",
            "chosen_tier": "full",
            "chosen_cost_usd": chosen_cost,
            "rationale": "Full tier: no budget constraints applied",
            "reasoning": "Full tier approval - unlimited mode"
        }
        # Update state to disable budget enforcement
        out = {
            "governanceReport": gov_report,
            "execution_path": path,
            "budget_exceeded": False,  # Can never be true for full tier
            "stop_threshold": float('inf'),  # Infinite threshold
            "max_loops": 10,  # Allow more iterations
        }
        out.update(add_status_update("Governance", "Full tier approved - unlimited budget"))
        return out
    # For lite/standard tiers, do normal budget checks
    decision = "approve"
    issues = []
    stop_threshold = state.get("stop_threshold") or 0.0
    if stop_threshold > 0:
        try:
            threshold_f = float(stop_threshold)
            if chosen_cost > threshold_f:
                # Flexible mode downgrades a budget overrun to a warning.
                if flexible:
                    issues.append(f"Cost ${chosen_cost} exceeds ${threshold_f}, flexible mode enabled")
                    decision = "approve_with_warning"
                else:
                    issues.append(f"Cost ${chosen_cost} exceeds ${threshold_f}")
                    decision = "reject"
        except Exception as e:
            issues.append(f"Budget check error: {e}")
    # High risk flagged as infeasible by the Pragmatist can also reject.
    risk_level = prag.get("risk_level")
    if risk_level == "high" and not prag.get("ok", True):
        issues.append("High risk identified")
        decision = "approve_with_warning" if flexible else "reject"
    approved_bool = decision in ("approve", "approve_with_warning")
    gov_report = {
        "budget_ok": approved_bool,
        "issues": issues,
        "approved_for_experiment": approved_bool,
        "governanceDecision": decision,
        "chosen_tier": preferred,
        "chosen_cost_usd": chosen_cost,
        "rationale": None,
        "reasoning": f"Decision: {decision}. {len(issues)} issue(s)."
    }
    out = {"governanceReport": gov_report, "execution_path": path}
    out.update(add_status_update("Governance", f"{decision.title()} {preferred} (${chosen_cost})"))
    return out
def scan_text_for_secrets(text: str) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: Scan text for potential secrets/credentials.

    Used by the Compliance agent.  Returns a dict with ``suspicious``
    (bool) and ``findings`` (list of pattern/match pairs).
    """
    if not text:
        return {"suspicious": False, "findings": []}
    # Known credential signatures checked in order.
    patterns = [
        r"AKIA[0-9A-Z]{16}",                            # AWS access key
        r"-----BEGIN PRIVATE KEY-----",                 # Private key
        r"AIza[0-9A-Za-z-_]{35}",                       # Google API key
        r"(?i)secret[_-]?(key|token)\b",                # Secret keywords
        r"(?i)password\s*[:=]\s*['\"][^'\"]{6,}['\"]"   # Password patterns
    ]
    findings = [
        {"pattern": pattern, "match": match.group(0)}
        for pattern in patterns
        for match in re.finditer(pattern, text)
    ]
    return {"suspicious": bool(findings), "findings": findings}
def run_compliance_agent(state: AgentState) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: Security and compliance scanning.

    Scans for:
    - Exposed secrets/credentials
    - Sensitive data
    - Policy violations

    Writes ``complianceReport`` with ``suspicious``, ``issues`` and the
    list of locations ``scanned``.
    """
    log.info(">>> COMPLIANCE AGENT")
    path = ensure_list(state, "execution_path") + ["Compliance"]
    exp = state.get("experimentResults", {}) or {}
    report = {"suspicious": False, "issues": [], "scanned": []}
    # Scan stdout/stderr
    for key in ("stdout", "stderr"):
        val = exp.get(key)
        if isinstance(val, str) and val.strip():
            scan = scan_text_for_secrets(val)
            if scan.get("suspicious"):
                report["suspicious"] = True
                report["issues"].append({
                    "type": "text_secret",
                    "where": key,
                    "findings": scan["findings"]
                })
            report["scanned"].append({"type": "text", "where": key})
    # Scan generated files (only the first 20 kB of each is sampled).
    artifact_paths = exp.get("paths") or {}
    if isinstance(artifact_paths, dict):
        for k, p in artifact_paths.items():
            try:
                pstr = str(p)
                if os.path.exists(pstr) and os.path.isfile(pstr):
                    with open(pstr, "r", encoding="utf-8", errors="ignore") as fh:
                        sample = fh.read(20000)
                    scan = scan_text_for_secrets(sample)
                    if scan.get("suspicious"):
                        report["suspicious"] = True
                        report["issues"].append({
                            "type": "file_secret",
                            "file": pstr,
                            "findings": scan["findings"]
                        })
                    report["scanned"].append({"type": "file", "file": pstr})
                else:
                    # Record missing/non-file paths so the report is complete.
                    report["scanned"].append({
                        "type": "path",
                        "value": pstr,
                        "exists": os.path.exists(pstr)
                    })
            except Exception as e:
                report["scanned"].append({"file": p, "error": str(e)})
    # Note if repo/zip artifact.
    # BUG FIX: the previous lookup
    #   paths = locals().get("paths") or state.get("paths") if "state" in locals() else {}
    # relied on conditional-expression precedence and locals() introspection
    # and could read the wrong mapping; take the paths mapping directly from
    # the experiment results instead.
    if isinstance(artifact_paths, dict) and any(
        str(v).lower().endswith(".zip") for v in artifact_paths.values()
    ):
        report.setdefault("notes", []).append(
            "Zip-based or repo artifact detected - recommend manual review."
        )
    # Final output
    out = {"complianceReport": report, "execution_path": path}
    out.update(add_status_update("Compliance", "Compliance checks complete"))
    return out
def summarize_logs_for_observer(log_paths: Optional[list] = None, sample_lines: int = 200) -> str:
    """
    UPGRADED GRAPH: Summarize system logs for the observer.

    Tails up to ``sample_lines`` lines from each log file (default
    candidates when none are given), counts ERROR/WARNING occurrences,
    and returns a header plus per-file excerpts.
    """
    if not log_paths:
        candidates = ["logs/performance.log", "logs/ai_lab.log", "performance.log"]
        log_paths = [c for c in candidates if os.path.exists(c)]
    sections = []
    error_count = 0
    warning_count = 0
    for log_path in log_paths:
        try:
            with open(log_path, "r", encoding="utf-8", errors="ignore") as handle:
                tail = handle.readlines()[-sample_lines:]
        except Exception as exc:
            sections.append(f"Could not read {log_path}: {exc}")
            continue
        text = "".join(tail)
        upper = text.upper()
        error_count += upper.count("ERROR")
        warning_count += upper.count("WARNING")
        sections.append(f"--- {log_path} (last {len(tail)} lines) ---\n{text[:2000]}")
    header = f"Log summary: {error_count} ERROR(s), {warning_count} WARNING(s)"
    return header + "\n\n" + "\n\n".join(sections)
def run_observer_agent(state: AgentState) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: System performance and health monitoring.

    Tracks:
    - Execution length
    - Rework cycles
    - Cost accumulation
    - Error patterns
    """
    log.info(">>> OBSERVER AGENT")
    path = ensure_list(state, "execution_path") + ["Observer"]
    # Collect whichever well-known log files are present on disk.
    log_candidates = [
        candidate
        for candidate in ("logs/performance.log", "logs/ai_lab.log", "performance.log")
        if os.path.exists(candidate)
    ]
    summary = summarize_logs_for_observer(log_candidates or None)
    obs = {
        "log_summary": summary[:4000],
        "execution_length": len(state.get("execution_path", []) or []),
        "rework_cycles": ensure_int(state, "rework_cycles", 0),
        "current_cost": state.get("current_cost", 0.0),
        "status": get_latest_status(state),
    }
    # Optional LLM recommendations (best-effort; errors are recorded inline).
    if llm:
        try:
            prompt = (
                "You are an Observer assistant. Given this runtime summary, provide 3 prioritized "
                "next actions to mitigate the top risks.\n\n"
                f"Runtime summary: {json.dumps(obs, indent=2)}\n\nReturn plain text."
            )
            r = llm.invoke(prompt)
            obs["llm_recommendations"] = getattr(r, "content", "")[:1500]
        except Exception as e:
            obs["llm_recommendations_error"] = str(e)
    out = {"observerReport": obs, "execution_path": path}
    out.update(add_status_update("Observer", "Observer summary created"))
    return out
def run_knowledge_curator_agent(state: AgentState) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: Curates and archives knowledge for future use.

    Captures:
    - Successful patterns
    - Common pitfalls
    - Best practices
    - User preferences

    Writes ``knowledgeInsights`` describing whether the memory write
    succeeded; failures are recorded in the report, never raised.
    """
    log.info(">>> KNOWLEDGE CURATOR AGENT")
    path = ensure_list(state, "execution_path") + ["KnowledgeCurator"]
    core = state.get("coreObjectivePrompt", "") or state.get("userInput", "")
    pm = state.get("pmPlan", {}) or {}
    draft = state.get("draftResponse", "") or ""
    qa_feedback = state.get("qaFeedback", "") or ""
    # Truncate long fields so the memory entry stays compact.
    summary_text = (
        f"Objective: {core}\n\n"
        f"Plan Steps: {json.dumps(pm.get('plan_steps', []))}\n\n"
        f"Draft (first 1500 chars): {draft[:1500]}\n\n"
        f"QA Feedback: {qa_feedback[:1000]}"
    )
    try:
        # MODERNIZATION: datetime.utcnow() is deprecated (Python 3.12+);
        # use an explicit timezone-aware UTC timestamp instead.
        from datetime import timezone
        timestamp = datetime.now(timezone.utc).isoformat()
        memory_manager.add_to_memory(
            summary_text,
            {"source": "knowledge_curator", "timestamp": timestamp}
        )
        insights = {"added": True, "summary_snippet": summary_text[:500]}
    except Exception as e:
        insights = {"added": False, "error": str(e)}
    out = {"knowledgeInsights": insights, "execution_path": path}
    out.update(add_status_update("KnowledgeCurator", "Knowledge captured"))
    return out
| # ============================================================================= | |
| # SECTION 9: CONDITIONAL ROUTING FUNCTIONS (LIBERAL BUDGET POLICY) | |
| # ============================================================================= | |
| # Two versions of should_continue: | |
| # - should_continue: For BASE graph (routes to archivist_agent) | |
| # - should_continue_upgraded: For UPGRADED graph (routes to observer_agent) | |
def should_continue(state: AgentState) -> str:
    """
    BASE GRAPH: Determine next step after QA.

    Routes to:
    - archivist_agent: If approved (BASE GRAPH default)
    - disclaimer_agent: If budget exceeded by 120% or extreme rework limit
    - pm_agent: If needs rework (LIBERAL: allows many cycles)

    LIBERAL POLICY: Allow many rework cycles, only stop at 120% budget.

    NOTE: This function is used by BOTH base and upgraded graphs.
    The base graph only has archivist_agent, while the upgraded graph
    will override the routing to use observer_agent.
    """
    # Budget check - use 120% threshold (LIBERAL)
    current_cost = state.get("current_cost", 0.0)
    stop_threshold = state.get("stop_threshold") or 0.0
    try:
        cost_f = float(current_cost)
        threshold_f = float(stop_threshold)
        # Only fail if cost exceeds the 120% threshold.
        if threshold_f > 0 and cost_f > threshold_f:
            log.warning(f"Cost ${cost_f} exceeds stop threshold ${threshold_f} (120% of budget)")
            return "disclaimer_agent"
    except (TypeError, ValueError):
        # Non-numeric cost/threshold: skip the budget gate rather than crash.
        pass
    # Check explicit budget_exceeded flag (should only be set at 120%)
    if state.get("budget_exceeded"):
        return "disclaimer_agent"
    # FIX: coerce each counter independently. The old combined except-branch
    # could leave a non-int (e.g. a string from state) behind and raise a
    # TypeError in the `rework > liberal_max` comparison below.
    try:
        rework = int(state.get("rework_cycles", 0) or 0)
    except (TypeError, ValueError):
        rework = 0
    try:
        max_loops_allowed = int(state.get("max_loops", 0) or 0)
    except (TypeError, ValueError):
        max_loops_allowed = 0
    # If approved -> success path (archivist for BASE graph).
    if state.get("approved"):
        return "archivist_agent"
    # LIBERAL POLICY: allow up to 150% of max_loops before stopping.
    liberal_max = int(max_loops_allowed * 1.5) if max_loops_allowed > 0 else 15
    if rework > liberal_max:
        log.warning(f"Rework cycles {rework} exceeded liberal limit {liberal_max}")
        return "disclaimer_agent"
    # Default: allow rework (liberal policy).
    log.info(f"Allowing rework cycle {rework} (liberal limit: {liberal_max})")
    return "pm_agent"
def should_continue_upgraded(state: AgentState) -> str:
    """
    UPGRADED GRAPH: Determine next step after QA (with Observer).

    Routes to:
    - observer_agent: If approved (goes to Observer -> Archivist -> Knowledge Curator)
    - disclaimer_agent: If budget exceeded by 120% or extreme rework limit
    - pm_agent: If needs rework (LIBERAL: allows many cycles)

    LIBERAL POLICY: Allow many rework cycles, only stop at 120% budget.
    """
    # Budget check - use 120% threshold (LIBERAL)
    current_cost = state.get("current_cost", 0.0)
    stop_threshold = state.get("stop_threshold") or 0.0
    try:
        cost_f = float(current_cost)
        threshold_f = float(stop_threshold)
        # Only fail if cost exceeds the 120% threshold.
        if threshold_f > 0 and cost_f > threshold_f:
            log.warning(f"Cost ${cost_f} exceeds stop threshold ${threshold_f} (120% of budget)")
            return "disclaimer_agent"
    except (TypeError, ValueError):
        # Non-numeric cost/threshold: skip the budget gate rather than crash.
        pass
    # Check explicit budget_exceeded flag (should only be set at 120%)
    if state.get("budget_exceeded"):
        return "disclaimer_agent"
    # FIX: coerce each counter independently. The old combined except-branch
    # could leave a non-int (e.g. a string from state) behind and raise a
    # TypeError in the `rework > liberal_max` comparison below.
    try:
        rework = int(state.get("rework_cycles", 0) or 0)
    except (TypeError, ValueError):
        rework = 0
    try:
        max_loops_allowed = int(state.get("max_loops", 0) or 0)
    except (TypeError, ValueError):
        max_loops_allowed = 0
    # If approved -> success path (observer for UPGRADED graph).
    if state.get("approved"):
        return "observer_agent"
    # LIBERAL POLICY: allow up to 150% of max_loops before stopping.
    liberal_max = int(max_loops_allowed * 1.5) if max_loops_allowed > 0 else 15
    if rework > liberal_max:
        log.warning(f"Rework cycles {rework} exceeded liberal limit {liberal_max}")
        return "disclaimer_agent"
    # Default: allow rework (liberal policy).
    log.info(f"Allowing rework cycle {rework} (liberal limit: {liberal_max})")
    return "pm_agent"
def should_run_experiment(state: AgentState) -> str:
    """
    BASE GRAPH: Decide whether an experiment step is required.

    Routes to:
    - experimenter_agent: when the PM plan flags experiment_needed
    - synthesis_agent: otherwise (skip straight to synthesis)
    """
    plan = state.get('pmPlan', {}) or {}
    if plan.get('experiment_needed'):
        return "experimenter_agent"
    return "synthesis_agent"
def detect_loop(state: AgentState) -> bool:
    """
    Detect execution loops - STRICT version.

    Three checks, in order: absolute path-length ceiling, a single node
    dominating the recent (deduplicated) window, and two nodes ping-ponging.
    """
    path = state.get("execution_path", [])
    # Hard ceiling on total path length takes priority over pattern checks.
    if len(path) > MAX_EXECUTION_PATH_LENGTH:
        log.error(f"β Path limit: {len(path)} > {MAX_EXECUTION_PATH_LENGTH}")
        return True
    # Too little history to judge.
    if len(path) < 10:
        return False
    # Collapse consecutive repeats (A A B -> A B) before pattern analysis.
    deduplicated = []
    prev = None
    for node in path:
        if node != prev:
            deduplicated.append(node)
            prev = node
    recent = deduplicated[-LOOP_DETECTION_WINDOW:]
    from collections import Counter
    # One node dominating the recent window signals a cycle.
    for node, count in Counter(recent).items():
        if count >= LOOP_THRESHOLD:
            log.error(f"β Loop: {node} appeared {count}x in last {LOOP_DETECTION_WINDOW}")
            log.error(f"Path: {' β '.join(recent[-10:])}")
            return True
    # Ping-pong check: because consecutive repeats were collapsed above, any
    # 8-wide window containing exactly 2 distinct nodes MUST alternate ABAB...
    if len(recent) >= 8:
        for start in range(len(recent) - 7):
            if len(set(recent[start:start + 8])) == 2:
                log.error(f"β Alternating loop detected")
                return True
    return False
def should_proceed_to_experimenter(state: AgentState) -> bool:
    """
    Helper function to determine if safe to proceed to experimenter.

    Checks, in order: first-governance fast path, execution-path length,
    Memory-node duplication (graph-bug indicator), governance-call
    frequency, and budget.

    Returns:
        bool: True if should proceed, False if should exit.
    """
    exec_path = state.get("execution_path", [])
    governance_count = exec_path.count("Governance")
    log.info(f"π Checking proceed: governance #{governance_count}, path length {len(exec_path)}")
    # CRITICAL: ALWAYS proceed on first governance approval
    if governance_count <= 1:
        log.info("β First governance approval - ALWAYS PROCEED")
        return True
    # Check path length
    if len(exec_path) > MAX_EXECUTION_PATH_LENGTH:
        log.error(f"β Path too long: {len(exec_path)} > {MAX_EXECUTION_PATH_LENGTH}")
        return False
    # Check for Memory duplication (indicates graph bug): count adjacent
    # Memory->Memory pairs via zip instead of index arithmetic.
    double_memory_count = sum(
        1 for a, b in zip(exec_path, exec_path[1:]) if a == "Memory" and b == "Memory"
    )
    if double_memory_count > 0 and governance_count > 2:
        log.error(f"β Memory duplication ({double_memory_count}) + multiple governance = loop")
        return False
    # Check governance frequency
    if governance_count > 5:
        log.error(f"β Too many governance calls: {governance_count} > 5")
        return False
    # Check budget.
    # FIX: coerce BEFORE comparing. The old code evaluated
    # `stop_threshold > 0` on the raw state value outside its try block,
    # so a string threshold raised TypeError instead of being skipped.
    try:
        current_cost = float(state.get("current_cost", 0.0))
        stop_threshold = float(state.get("stop_threshold", 0.0) or 0.0)
    except (TypeError, ValueError):
        current_cost, stop_threshold = 0.0, 0.0
    if stop_threshold > 0 and current_cost > stop_threshold:
        log.error(f"β Budget exceeded: ${current_cost} > ${stop_threshold}")
        return False
    log.info("β Safe to proceed")
    return True
def governance_decider(state: AgentState) -> str:
    """
    FIXED: Simplified governance routing.

    Always proceeds to experimenter unless there is a critical blocker:
    an explicit governance rejection, or the safety helper reporting an
    unsafe state (loop / budget / path length).
    """
    report = state.get("governanceReport", {}) or {}
    verdict = report.get("governanceDecision", "approve")
    log.info(f"π Governance decision: {verdict}")
    # Explicit rejection is the only governance verdict that hard-stops.
    if verdict == "reject":
        log.warning("β Governance explicitly rejected")
        return "disclaimer_agent"
    # Delegate the loop/budget/path-length safety checks to the helper.
    if not should_proceed_to_experimenter(state):
        log.warning("β Cannot proceed - routing to disclaimer")
        return "disclaimer_agent"
    log.info("β PROCEEDING TO EXPERIMENTER")
    return "experimenter_agent"
| # ============================================================================= | |
| # SECTION 10: BASE GRAPH DEFINITION | |
| # ============================================================================= | |
# Triage workflow (simple greeting vs task detection)
# Single-node graph (triage -> END): classifies the user input before the
# full pipeline is invoked. Compiled app is exported as `triage_app`.
triage_workflow = StateGraph(AgentState)
triage_workflow.add_node("triage", run_triage_agent)
triage_workflow.set_entry_point("triage")
triage_workflow.add_edge("triage", END)
triage_app = triage_workflow.compile()
# Planner workflow (cost estimation)
# Single-node graph (planner -> END): produces the cost estimate used by the
# liberal budget policy. Compiled app is exported as `planner_app`.
planner_workflow = StateGraph(AgentState)
planner_workflow.add_node("planner", run_planner_agent)
planner_workflow.set_entry_point("planner")
planner_workflow.add_edge("planner", END)
planner_app = planner_workflow.compile()
# Main workflow (full execution pipeline)
# Base graph: Memory -> Intent -> PM -> [Experimenter] -> Synthesis -> QA
# -> Archivist/Disclaimer. Compiled app is exported as `main_app` and may be
# replaced later by apply_upgrades().
main_workflow = StateGraph(AgentState)
# Add base nodes
main_workflow.add_node("memory_retriever", run_memory_retrieval)
main_workflow.add_node("intent_agent", run_intent_agent)
main_workflow.add_node("pm_agent", run_pm_agent)
main_workflow.add_node("experimenter_agent", run_experimenter_agent)
main_workflow.add_node("synthesis_agent", run_synthesis_agent)
main_workflow.add_node("qa_agent", run_qa_agent)
main_workflow.add_node("archivist_agent", run_archivist_agent)
main_workflow.add_node("disclaimer_agent", run_disclaimer_agent)
# Set entry point
main_workflow.set_entry_point("memory_retriever")
# Define edges
main_workflow.add_edge("memory_retriever", "intent_agent")
main_workflow.add_edge("intent_agent", "pm_agent")
main_workflow.add_edge("experimenter_agent", "synthesis_agent")
main_workflow.add_edge("synthesis_agent", "qa_agent")
main_workflow.add_edge("archivist_agent", END)
main_workflow.add_edge("disclaimer_agent", END)
# Conditional edges
# CONSISTENCY FIX: declare the explicit path map (as already done for
# qa_agent below) so every reachable target is visible at wiring time.
main_workflow.add_conditional_edges("pm_agent", should_run_experiment, {
    "experimenter_agent": "experimenter_agent",
    "synthesis_agent": "synthesis_agent"
})
# BASE GRAPH: Only routes to nodes that exist in base graph
main_workflow.add_conditional_edges("qa_agent", should_continue, {
    "archivist_agent": "archivist_agent",
    "pm_agent": "pm_agent",
    "disclaimer_agent": "disclaimer_agent"
})
# Compile base graph
main_app = main_workflow.compile()
log.info("=" * 60)
log.info("BASE GRAPH COMPILED")
log.info("Flow: Memory β Intent β PM β Experimenter β Synthesis β QA")
log.info(" β Archivist/Disclaimer β END")
log.info("=" * 60)
| # ============================================================================= | |
| # SECTION 11: UPGRADED GRAPH DEFINITION (LIBERAL BUDGET POLICY) | |
| # ============================================================================= | |
def build_upgraded_graph() -> StateGraph:
    """
    Build the UPGRADED workflow graph (governance, compliance, observation).

    The wiring is deliberately linear up to the governance gate, and the QA
    rework edge returns to pm_agent - never to memory_retriever - which is
    what prevents the Memory -> Memory loop.
    """
    workflow = StateGraph(AgentState)
    # Register every node exactly once; (name, callable) pairs keep the
    # inventory scannable in one place.
    node_table = [
        ("memory_retriever", run_memory_retrieval),
        ("intent_agent", run_intent_agent),
        ("pm_agent", run_pm_agent),
        ("pragmatist_agent", run_pragmatist_agent),
        ("governance_agent", run_governance_agent),
        ("experimenter_agent", run_experimenter_agent),
        ("compliance_agent", run_compliance_agent),
        ("synthesis_agent", run_synthesis_agent),
        ("qa_agent", run_qa_agent),
        ("observer_agent", run_observer_agent),
        ("archivist_agent", run_archivist_agent),
        ("knowledge_curator_agent", run_knowledge_curator_agent),
        ("disclaimer_agent", run_disclaimer_agent),
    ]
    for node_name, node_fn in node_table:
        workflow.add_node(node_name, node_fn)
    # Entry point: Memory is reachable ONLY here, never via a back-edge.
    workflow.set_entry_point("memory_retriever")
    # Linear spine up to the first branch point - each node runs once per cycle.
    for src, dst in [
        ("memory_retriever", "intent_agent"),
        ("intent_agent", "pm_agent"),
        ("pm_agent", "pragmatist_agent"),
        ("pragmatist_agent", "governance_agent"),
    ]:
        workflow.add_edge(src, dst)
    # Branch point 1: governance either releases the work to the
    # experimenter or aborts to the disclaimer.
    workflow.add_conditional_edges(
        "governance_agent",
        governance_decider,
        {
            "experimenter_agent": "experimenter_agent",
            "disclaimer_agent": "disclaimer_agent",
        },
    )
    # Post-experiment spine: strictly forward, no loops back.
    workflow.add_edge("experimenter_agent", "compliance_agent")
    workflow.add_edge("compliance_agent", "synthesis_agent")
    workflow.add_edge("synthesis_agent", "qa_agent")
    # Branch point 2: QA approves (observer), requests rework (back to
    # pm_agent, NOT to Memory), or fails out (disclaimer).
    workflow.add_conditional_edges(
        "qa_agent",
        should_continue_upgraded,
        {
            "observer_agent": "observer_agent",
            "pm_agent": "pm_agent",
            "disclaimer_agent": "disclaimer_agent",
        },
    )
    # Success tail: Observer -> Archivist -> Knowledge Curator -> END.
    workflow.add_edge("observer_agent", "archivist_agent")
    workflow.add_edge("archivist_agent", "knowledge_curator_agent")
    workflow.add_edge("knowledge_curator_agent", END)
    # Failure tail.
    workflow.add_edge("disclaimer_agent", END)
    return workflow
| # CRITICAL: Make sure NO OTHER edges exist that point to "memory_retriever" | |
| # The ONLY way to reach Memory should be at the very start | |
def apply_upgrades() -> bool:
    """
    Apply upgraded graph to replace the base graph.

    Rebuilds the module-level main_app / main_workflow with the upgraded
    workflow, adding governance, compliance, and observation layers. On
    failure the base graph remains active.

    LIBERAL BUDGET POLICY ACTIVE:
    - 20% budget buffer
    - Stop only at 120% of budget
    - 10 -> 20 rework cycle limits
    - Always proceed unless explicitly rejected

    Returns:
        bool: True if successful, False otherwise
    """
    global main_app, main_workflow
    log.info("=" * 60)
    log.info("APPLYING GRAPH UPGRADES (LIBERAL BUDGET POLICY)")
    log.info("=" * 60)
    try:
        # Build upgraded graph
        upgraded_workflow = build_upgraded_graph()
        # Compile and rebind the module-level handles so callers that access
        # them via the module (graph_merged.main_app) see the new graph.
        main_app = upgraded_workflow.compile()
        main_workflow = upgraded_workflow
        log.info("β GRAPH UPGRADE SUCCESSFUL")
        log.info("=" * 60)
        log.info("New flow: Memory β Intent β PM β Pragmatist β Governance")
        log.info(" β Experimenter β Compliance β Synthesis β QA")
        log.info(" β Observer β Archivist β Knowledge Curator β END")
        log.info("=" * 60)
        log.info("LIBERAL BUDGET POLICY FEATURES:")
        log.info(" β’ 20% budget buffer applied")
        log.info(" β’ Stop threshold: 120% of user budget")
        log.info(" β’ Rework cycles: 10 β 15 β 20 (initial β liberal β hard)")
        log.info(" β’ Governance: Always proceed unless explicitly rejected")
        log.info("=" * 60)
        return True
    except Exception as e:
        # IDIOM FIX: lazy %-style args instead of an f-string; output is
        # unchanged and log.exception already appends the traceback.
        log.exception("β Failed to apply graph upgrades: %s", e)
        return False
| # ============================================================================= | |
| # SECTION 12: EXPORTS | |
| # ============================================================================= | |
# Export all components
# Public API declaration (__all__): star-imports and API docs expose only
# these names. Grouped to mirror the section layout of this module.
__all__ = [
    # State
    'AgentState',
    # Helper functions
    'ensure_list',
    'ensure_int',
    'sanitize_path',
    'add_status_update',
    'get_latest_status',
    # LLM and parsing
    'llm',
    'parse_json_from_llm',
    # Artifact functions
    'detect_requested_output_types',
    'normalize_experiment_type',
    'write_notebook_from_text',
    'write_script',
    'write_docx_from_text',
    'write_excel_from_tables',
    'write_pdf_from_text',
    'build_repo_zip',
    # Base agent nodes
    'run_triage_agent',
    'run_planner_agent',
    'run_memory_retrieval',
    'run_intent_agent',
    'run_pm_agent',
    'run_experimenter_agent',
    'run_synthesis_agent',
    'run_qa_agent',
    'run_archivist_agent',
    'run_disclaimer_agent',
    # Upgraded agent nodes
    'run_pragmatist_agent',
    'run_governance_agent',
    'run_compliance_agent',
    'run_observer_agent',
    'run_knowledge_curator_agent',
    # Routing functions
    'should_continue',
    'should_continue_upgraded',
    'should_run_experiment',
    'governance_decider',
    'detect_loop',
    # Graphs (main_app/main_workflow are rebound by apply_upgrades)
    'triage_app',
    'planner_app',
    'main_app',
    'main_workflow',
    # Upgrade function
    'apply_upgrades',
    'build_upgraded_graph',
    # Configuration constants
    'INITIAL_MAX_REWORK_CYCLES',
    'BUDGET_BUFFER_MULTIPLIER',
    'MAX_COST_MULTIPLIER',
]
# Log initialization banner (module import side effect).
log.info("=" * 60)
log.info("GRAPH MODULE INITIALIZED (LIBERAL BUDGET POLICY v2.0)")
# LINT FIX (F541): these messages contain no placeholders, so the f-prefix
# was inert; plain literals emit identical output.
log.info("Base graph available as: main_app")
log.info("To enable upgraded graph, call: apply_upgrades()")
log.info("=" * 60)