# SPOC_V1 / graph_merged.py
# (Hugging Face page header captured alongside the raw file: uploader
# "JatinAutonomousLabs", commit 2f6eb67 (verified), raw/history/blame links,
# reported size 84.3 kB. Commented out so the module parses as Python.)
"""
graph_merged.py - Unified AI Lab Graph System (Liberal Budget Policy v2.0)
=============================================================================
LIBERAL BUDGET POLICY:
- 20% budget buffer on user input
- Stop only at 120% of user budget
- 10x rework cycles (3 β†’ 10 β†’ 20 hard limit)
- 150 node execution path limit
- Always proceed to Experimenter unless explicitly rejected
This file contains both the BASE and UPGRADED graph implementations:
- BASE GRAPH: Simple workflow (Memory β†’ Intent β†’ PM β†’ Experimenter β†’ Synthesis β†’ QA β†’ Archivist)
- UPGRADED GRAPH: Enhanced workflow with governance, compliance, and observation layers
Author: AI Lab Team
Last Updated: 2025-10-08
Version: 2.0 - Liberal Budget Policy
"""
# =============================================================================
# SECTION 1: IMPORTS AND CONFIGURATION
# =============================================================================
import json
import re
import math
import os
import uuid
import shutil
import zipfile
import operator
from typing import TypedDict, List, Dict, Optional, Annotated, Any
from datetime import datetime
# LangChain imports
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
# Local imports
from memory_manager import memory_manager
from code_executor import execute_python_code
from logging_config import setup_logging, get_logger
# Multi-language support
from multi_language_support import (
detect_language,
extract_code_blocks_multi_lang,
execute_code,
detect_requested_output_types_enhanced,
write_script_multi_lang,
LANGUAGES
)
# Artifact generation libraries
import nbformat
from nbformat.v4 import new_notebook, new_markdown_cell, new_code_cell
import pandas as pd
from docx import Document as DocxDocument
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet
# Setup logging
setup_logging()
log = get_logger(__name__)
# =============================================================================
# SECTION 2: CONFIGURATION CONSTANTS (LIBERAL BUDGET POLICY)
# =============================================================================
# NOTE(review): the module docstring advertises the "liberal" policy (20%
# buffer, stop at 120%, 150-node path limit) but the values below were later
# tightened; the log banner reflects the current values, not the docstring.
OUT_DIR = os.environ.get("OUT_DIR", "/tmp")  # working dir for generated artifacts
os.makedirs(OUT_DIR, exist_ok=True)
EXPORTS_DIR = os.path.join(OUT_DIR, "exports")  # destination for export bundles
os.makedirs(EXPORTS_DIR, exist_ok=True)
# Cost and loop control - FIXED LIMITS
INITIAL_MAX_REWORK_CYCLES = 3  # QA->PM rework loops before forced completion (was 100)
BUDGET_BUFFER_MULTIPLIER = 1.10  # soft headroom over the user budget (was 1.20)
MAX_COST_MULTIPLIER = 1.15  # hard stop at 115% of user budget (was 1.20)
MAX_EXECUTION_PATH_LENGTH = 500  # max nodes visited in one run (was 150)
LOOP_DETECTION_WINDOW = 20  # trailing execution-path slice scanned for repeats
LOOP_THRESHOLD = 5  # repeats inside the window that count as a loop
# GPT-4o list prices per 1K tokens (USD)
GPT4O_INPUT_COST_PER_1K_TOKENS = 0.005
GPT4O_OUTPUT_COST_PER_1K_TOKENS = 0.015
# NOTE(review): 2.0 tokens per call looks implausibly low (the planner's own
# error fallback assumes ~1500), so derived cost estimates will be near zero
# -- confirm the intended unit.
AVG_TOKENS_PER_CALL = 2.0
# Canonical artifact identifiers understood by the experimenter
KNOWN_ARTIFACT_TYPES = {"notebook", "excel", "word", "pdf", "image", "repo", "script"}
log.info("=" * 60)
log.info("FIXED BUDGET POLICY ACTIVE")
log.info(f"Max rework cycles: {INITIAL_MAX_REWORK_CYCLES}")
log.info(f"Budget buffer: {int((BUDGET_BUFFER_MULTIPLIER - 1) * 100)}%")
log.info(f"Stop threshold: {int(MAX_COST_MULTIPLIER * 100)}% of budget")
log.info(f"Max path length: {MAX_EXECUTION_PATH_LENGTH}")
log.info("=" * 60)
# =============================================================================
# SECTION 3: STATE DEFINITION
# =============================================================================
class AgentState(TypedDict):
    """
    State shared across all agents in the workflow.

    Uses Annotated with operator.add for fields that multiple agents might
    update: the graph merges those fields by list concatenation instead of
    overwriting, so each node returns only its delta.
    """
    # Core inputs
    userInput: str  # raw request text from the user
    chatHistory: List[str]  # prior conversation turns
    coreObjectivePrompt: str  # refined objective produced by the Intent agent
    # Memory and planning
    retrievedMemory: Optional[str]  # similarity-search context, or None
    pmPlan: Dict  # plan dict produced by the Planner/PM agents
    # Execution
    experimentCode: Optional[str]  # generated code, if an experiment ran
    experimentResults: Optional[Dict]  # captured execution output
    # Output
    draftResponse: str  # synthesized answer shown to the user
    qaFeedback: Optional[str]  # QA critique that drives rework cycles
    approved: bool  # True once QA (or a forced-completion path) signs off
    # Workflow control
    execution_path: Annotated[List[str], operator.add]  # visited node names (append-merged)
    rework_cycles: int  # completed PM rework iterations
    max_loops: int  # configured rework ceiling
    status_updates: Annotated[List[Dict[str, str]], operator.add]  # progress log (append-merged)
    # Budget and governance (LIBERAL POLICY)
    current_cost: float  # accumulated LLM spend estimate (USD)
    budget: float  # user-supplied budget (USD)
    stop_threshold: float  # hard-stop spend level (docstring says 120% of budget, constants use 115% -- confirm)
    budget_exceeded: bool
    flexible_budget_mode: bool
    preferred_tier: Optional[str]
    auto_accept_approved_with_warning: bool
    # Reports (UPGRADED GRAPH ONLY)
    pragmatistReport: Optional[Dict]
    governanceReport: Optional[Dict]
    complianceReport: Optional[Dict]
    observerReport: Optional[Dict]
    knowledgeInsights: Optional[Dict]
# =============================================================================
# SECTION 4: HELPER FUNCTIONS
# =============================================================================
def ensure_list(state: AgentState, key: str) -> List:
    """Read state[key] defensively, always producing a list.

    Missing/None -> [], list -> unchanged, tuple -> converted, scalar -> wrapped.
    """
    if not state:
        return []
    value = state.get(key)
    if isinstance(value, list):
        return value
    if isinstance(value, tuple):
        return list(value)
    if value is None:
        return []
    return [value]
def ensure_int(state: AgentState, key: str, default: int = 0) -> int:
    """Read state[key] as an int, falling back to `default` on any problem."""
    try:
        value = state.get(key) if state else None
    except Exception:
        return default
    if value is None:
        return default
    try:
        return int(value)
    except Exception:
        return default
def sanitize_path(path: str) -> str:
    """Normalise a path to its absolute form (collapses '.'/'..' segments).

    NOTE(review): this does not resolve symlinks or confine the result to a
    sandbox root; callers needing containment must check the prefix themselves.
    """
    absolute = os.path.abspath(path)
    return absolute
def get_latest_status(state: AgentState) -> str:
    """Return the newest usable status message, or a generic placeholder."""
    updates = state.get('status_updates', [])
    if isinstance(updates, list):
        # Walk newest-first; accept either {"status": ...} dicts or bare strings.
        for entry in reversed(updates):
            if isinstance(entry, dict) and 'status' in entry:
                return entry['status']
            if isinstance(entry, str):
                return entry
    return "Processing..."
def add_status_update(node_name: str, status: str) -> Dict[str, Any]:
    """
    Create a single status-update entry for merging into an agent's return dict.

    Args:
        node_name: Name of the graph node reporting progress.
        status: Human-readable status message.

    Returns:
        {"status_updates": [entry]} so callers can `**`-merge it; the list form
        matches the Annotated[..., operator.add] accumulator in AgentState.
    """
    # BUG FIX: datetime.utcnow() is deprecated (Python 3.12+) and yields a
    # naive timestamp; emit an explicit timezone-aware UTC ISO-8601 string.
    from datetime import timezone
    return {
        "status_updates": [{
            "node": node_name,
            "status": status,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }]
    }
# =============================================================================
# SECTION 5: LLM INITIALIZATION AND JSON PARSING
# =============================================================================
# Initialize LLM
# Module-level shared ChatOpenAI client used by every agent node below.
# temperature=0.4 trades a little determinism for planning variety; retries
# and the 60s timeout guard against transient API failures.
llm = ChatOpenAI(model="gpt-4o", temperature=0.4, max_retries=3, request_timeout=60)
def parse_json_from_llm(llm_output: str) -> Optional[dict]:
    """
    Robust JSON extraction from LLM output.

    Tries multiple strategies, in order:
      1. Explicit ```json {...} ``` fenced block
      2. Any fenced code block with JSON-like content
      3. First balanced {...} substring (handles nested braces)
      4. Widest '{' .. '}' slice as a last resort
      5. ast.literal_eval for Python-style dicts (single quotes, True/None)
      6. Conservative cleanup (strip comments, trailing commas, quote fixes)

    Args:
        llm_output: Raw text returned by the LLM.

    Returns:
        Parsed dict (or list, coerced through a JSON round-trip) on success,
        None when every strategy fails.
    """
    import ast
    if not llm_output or not isinstance(llm_output, str) or not llm_output.strip():
        return None
    text = llm_output.strip()
    # Strategy 1: Explicit JSON fenced block
    match = re.search(r"```json\s*({.*?})\s*```", text, re.DOTALL | re.IGNORECASE)
    if match:
        candidate = match.group(1).strip()
        try:
            return json.loads(candidate)
        except Exception as e:
            log.debug(f"json.loads failed on triple-backtick json block: {e}")
    # Strategy 2: Any code fence with JSON-like content
    match2 = re.search(r"```(?:json|python|text)?\s*({.*?})\s*```", text, re.DOTALL | re.IGNORECASE)
    if match2:
        candidate = match2.group(1).strip()
        try:
            return json.loads(candidate)
        except Exception as e:
            log.debug(f"json.loads failed on fenced candidate: {e}")

    # Strategy 3: Find balanced {...} substring
    def find_balanced_brace_substring(s: str):
        start_idx = None
        depth = 0
        for i, ch in enumerate(s):
            if ch == '{':
                if start_idx is None:
                    start_idx = i
                depth += 1
            elif ch == '}':
                if depth > 0:
                    depth -= 1
                    if depth == 0 and start_idx is not None:
                        return s[start_idx:i + 1]
        return None

    candidate = find_balanced_brace_substring(text)
    # Strategy 4: Fallback heuristic - widest '{' .. '}' slice
    if not candidate:
        first = text.find('{')
        last = text.rfind('}')
        if first != -1 and last != -1 and last > first:
            candidate = text[first:last + 1]
    if candidate:
        # Try direct JSON parsing
        try:
            return json.loads(candidate)
        except Exception as e:
            log.debug(f"json.loads failed on candidate substring: {e}")
        # Try ast.literal_eval (handles single quotes / Python literals)
        try:
            parsed = ast.literal_eval(candidate)
            if isinstance(parsed, (dict, list)):
                # Round-trip through JSON to normalise to JSON-compatible types
                return json.loads(json.dumps(parsed))
        except Exception as e:
            log.debug(f"ast.literal_eval failed: {e}")
        # Conservative cleanup
        cleaned = candidate
        try:
            # Remove line comments
            cleaned = re.sub(r"//.*?$", "", cleaned, flags=re.MULTILINE)
            # Remove block comments
            cleaned = re.sub(r"/\*.*?\*/", "", cleaned, flags=re.DOTALL)
            # Remove trailing commas
            cleaned = re.sub(r",\s*([}\]])", r"\1", cleaned)

            # Replace single quotes with double quotes (carefully).
            # BUG FIX: the original pattern used a variable-width lookbehind
            # (?<=[:\{\[,]\s*), which Python's re module rejects at runtime
            # ("look-behind requires fixed-width pattern"), so this entire
            # cleanup step always raised and silently failed. Capture the
            # delimiter + whitespace instead and re-emit it in the result.
            def _single_to_double(m):
                prefix = m.group(1)
                inner = m.group(2)
                inner_escaped = inner.replace('"', '\\"')
                return f'{prefix}"{inner_escaped}"'
            cleaned = re.sub(r"([:\{\[,]\s*)'([^']*?)'", _single_to_double, cleaned)
            return json.loads(cleaned)
        except Exception as e:
            log.debug(f"json.loads still failed after cleanup: {e}")
    # All strategies failed
    log.error(f"parse_json_from_llm failed. Output preview: {text[:200]}")
    return None
# =============================================================================
# SECTION 6: ARTIFACT DETECTION AND GENERATION
# =============================================================================
def detect_requested_output_types(text: str) -> Dict:
    """
    Detect what type of artifact the user wants (notebook, word, pdf, ...).

    Thin wrapper kept so the rest of this module has a stable local name;
    delegates to the enhanced detector in multi_language_support.

    Args:
        text: Free-form user request text.

    Returns:
        Detection dict; judging by callers in this module it includes at
        least "artifact_type" and "requires_artifact" keys.
    """
    return detect_requested_output_types_enhanced(text)
def normalize_experiment_type(exp_type: Optional[str], goal_text: str) -> str:
    """Map a free-form experiment type onto one of KNOWN_ARTIFACT_TYPES.

    Falls back to keyword detection on the goal text (defaulting to "word")
    when the type is missing or unrecognised.
    """
    if not exp_type:
        detection = detect_requested_output_types(goal_text or "")
        return detection.get("artifact_type") or "word"
    normalized = exp_type.strip().lower()
    if normalized in KNOWN_ARTIFACT_TYPES:
        return normalized
    # Alias table: any keyword hit maps the string onto a canonical type.
    aliases = {
        "notebook": ["notebook", "ipynb", "jupyter"],
        "excel": ["excel", "xlsx", "spreadsheet"],
        "word": ["word", "docx", "doc"],
        "pdf": ["pdf"],
        "repo": ["repo", "repository", "backend", "codebase"],
        "script": ["script", "python", "py"]
    }
    for canonical, keywords in aliases.items():
        for kw in keywords:
            if kw in normalized:
                return canonical
    # Unrecognised: fall back to detection on the goal text.
    detection = detect_requested_output_types(goal_text or "")
    return detection.get("artifact_type") or "word"
def write_notebook_from_text(llm_text: str, out_dir: Optional[str] = None) -> str:
    """Render LLM output into a .ipynb, alternating markdown and code cells.

    Returns the path of the notebook written under out_dir (default OUT_DIR).
    """
    target_dir = out_dir or OUT_DIR
    os.makedirs(target_dir, exist_ok=True)
    # Pull out fenced python blocks; fall back to any fenced block.
    code_blocks = re.findall(r"```python\s*(.*?)\s*```", llm_text, re.DOTALL)
    if not code_blocks:
        code_blocks = re.findall(r"```\s*(.*?)\s*```", llm_text, re.DOTALL)
    # Whatever sits between the fences becomes markdown.
    md_parts = re.split(r"```(?:python)?\s*.*?\s*```", llm_text, flags=re.DOTALL)
    nb = new_notebook()
    cells = []
    for idx in range(max(len(md_parts), len(code_blocks))):
        if idx < len(md_parts):
            md = md_parts[idx].strip()
            if md:
                cells.append(new_markdown_cell(md))
        if idx < len(code_blocks):
            code = code_blocks[idx].strip()
            if code:
                cells.append(new_code_cell(code))
    if not cells:
        cells = [new_markdown_cell("# Notebook\n\nNo content generated.")]
    nb['cells'] = cells
    uid = uuid.uuid4().hex[:10]
    dest = os.path.join(target_dir, f"generated_notebook_{uid}.ipynb")
    nbformat.write(nb, dest)
    return dest
def write_script(code_text: str, language_hint: Optional[str] = None, out_dir: Optional[str] = None) -> str:
    """
    Write a script file with the extension appropriate to its language.

    Thin wrapper around write_script_multi_lang (multi_language_support);
    returns the path of the written file.
    """
    return write_script_multi_lang(code_text, language_hint, out_dir)
def write_docx_from_text(text: str, out_dir: Optional[str] = None) -> str:
    """Write plain text into a .docx, one paragraph per blank-line chunk.

    Returns the path of the generated document.
    """
    target_dir = out_dir or OUT_DIR
    os.makedirs(target_dir, exist_ok=True)
    doc = DocxDocument()
    for chunk in text.split("\n\n"):
        paragraph = chunk.strip()
        if paragraph:
            doc.add_paragraph(paragraph)
    uid = uuid.uuid4().hex[:10]
    dest = os.path.join(target_dir, f"generated_doc_{uid}.docx")
    doc.save(dest)
    return dest
def write_excel_from_tables(maybe_table_text: str, out_dir: Optional[str] = None) -> str:
    """Best-effort conversion of text (JSON, CSV, or plain) into an .xlsx file.

    Falls back to a Word document describing the failure if Excel writing fails.
    """
    target_dir = out_dir or OUT_DIR
    os.makedirs(target_dir, exist_ok=True)
    uid = uuid.uuid4().hex[:10]
    dest = os.path.join(target_dir, f"generated_excel_{uid}.xlsx")
    try:
        try:
            # First interpretation: JSON rows / record.
            parsed = json.loads(maybe_table_text)
            if isinstance(parsed, list):
                df = pd.DataFrame(parsed)
            elif isinstance(parsed, dict):
                df = pd.DataFrame([parsed])
            else:
                df = pd.DataFrame({"content": [str(maybe_table_text)]})
        except Exception:
            # Not JSON (or frame building failed): try CSV, else one raw cell.
            if "," in maybe_table_text:
                from io import StringIO
                df = pd.read_csv(StringIO(maybe_table_text))
            else:
                df = pd.DataFrame({"content": [maybe_table_text]})
        df.to_excel(dest, index=False, engine="openpyxl")
        return dest
    except Exception as e:
        log.error(f"Excel creation failed: {e}")
        return write_docx_from_text(f"Excel error: {e}\n\n{maybe_table_text}", out_dir=target_dir)
def write_pdf_from_text(text: str, out_dir: Optional[str] = None) -> str:
    """Render plain text into a simple PDF (one flowable per paragraph).

    On any ReportLab failure, falls back to a Word document that embeds the
    error message. Returns the path of the file actually written.
    """
    target_dir = out_dir or OUT_DIR
    os.makedirs(target_dir, exist_ok=True)
    uid = uuid.uuid4().hex[:10]
    dest = os.path.join(target_dir, f"generated_doc_{uid}.pdf")
    try:
        styles = getSampleStyleSheet()
        story = []
        for chunk in text.split("\n\n"):
            paragraph = chunk.strip()
            if not paragraph:
                continue
            story.append(Paragraph(paragraph.replace("\n", "<br/>"), styles["Normal"]))
            story.append(Spacer(1, 8))
        SimpleDocTemplate(dest).build(story)
        return dest
    except Exception as e:
        log.error(f"PDF creation failed: {e}")
        return write_docx_from_text(f"PDF error: {e}\n\n{text}", out_dir=target_dir)
def build_repo_zip(files_map: Dict[str, str], repo_name: str = "generated_app",
                   out_dir: Optional[str] = None) -> str:
    """Materialise a {relative_path: content} map as a directory, then zip it.

    Values that are paths to existing files are copied verbatim; anything else
    is written out as text.

    Args:
        files_map: Mapping of repo-relative file path -> file content (or an
            existing source-file path to copy).
        repo_name: Base name for the generated directory and zip.
        out_dir: Destination directory (defaults to OUT_DIR).

    Returns:
        Path of the created .zip archive.
    """
    out_dir = out_dir or OUT_DIR
    os.makedirs(out_dir, exist_ok=True)
    uid = uuid.uuid4().hex[:8]
    repo_dir = os.path.join(out_dir, f"{repo_name}_{uid}")
    os.makedirs(repo_dir, exist_ok=True)
    repo_root = os.path.abspath(repo_dir)
    # Write all files
    for rel_path, content in files_map.items():
        dest = os.path.abspath(os.path.join(repo_dir, rel_path))
        # SECURITY FIX: files_map keys may come from LLM output; refuse paths
        # that escape the repo directory (absolute paths or ".." traversal).
        try:
            inside = os.path.commonpath([repo_root, dest]) == repo_root
        except ValueError:
            # Different drives / mixed absolute-relative -> definitely outside.
            inside = False
        if not inside:
            log.warning(f"build_repo_zip: skipping unsafe path {rel_path!r}")
            continue
        os.makedirs(os.path.dirname(dest), exist_ok=True)
        if isinstance(content, str) and os.path.exists(content):
            # Content looks like an existing file on disk -> copy it.
            shutil.copyfile(content, dest)
        else:
            with open(dest, "w", encoding="utf-8") as fh:
                fh.write(str(content))
    # Create ZIP
    zip_path = os.path.join(out_dir, f"{repo_name}_{uid}.zip")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _, files in os.walk(repo_dir):
            for f in files:
                full = os.path.join(root, f)
                arc = os.path.relpath(full, repo_dir)
                zf.write(full, arc)
    return zip_path
# =============================================================================
# SECTION 7: BASE GRAPH AGENT NODES
# =============================================================================
# These are the core agents that form the foundation of the workflow.
def run_triage_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH node: classify the incoming message as small talk or work.

    Greetings get a canned reply in draftResponse; real tasks flow onward
    with only the path/status bookkeeping updated.
    """
    log.info("--- TRIAGE ---")
    user_text = state.get('userInput', '')
    prompt = f"Is this a greeting or a task? '{user_text}' Reply: 'greeting' or 'task'"
    reply = getattr(llm.invoke(prompt), "content", "") or ""
    result: Dict[str, Any] = {"execution_path": ["Triage"]}
    if 'greeting' in reply.lower():
        result["draftResponse"] = "Hello! How can I help?"
        result.update(add_status_update("Triage", "Greeting"))
    else:
        result.update(add_status_update("Triage", "Task detected"))
    return result
def run_planner_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Quick planning agent for cost estimation.
    Used for initial triage and budgeting.
    Robust behavior:
    - ask LLM for JSON-only output
    - try parse_json_from_llm (existing)
    - if that fails, attempt regex-extract JSON and json.loads
    - retry LLM with low temperature if parsing fails
    - fallback to heuristic plaintext -> plan conversion (partial result)
    - compute costs with sane clamping and document LLM preview when fallbacked

    Returns:
        Partial state update with pmPlan (always a dict), execution_path and
        a status entry; on primary LLM failure pmPlan is {"error": ...}.
    """
    # NOTE(review): these shadow the module-level imports ("logging" is unused
    # here); kept as-is.
    import json
    import re
    import logging
    log.info("--- PLANNER ---")
    path = ensure_list(state, 'execution_path') + ["Planner"]
    user_input = state.get('userInput', '') or ""
    # Build a strict JSON-only prompt with schema + example
    prompt = (
        "You are a planning assistant. **Return ONLY valid JSON** (no explanation, no markdown).\n"
        "Schema: {\"plan\": [string], \"estimated_llm_calls_per_loop\": integer}\n"
        "Example:\n"
        "{\n"
        " \"plan\": [\"Market research: 1 week\", \"MVP: 4 weeks\"],\n"
        " \"estimated_llm_calls_per_loop\": 4\n"
        "}\n\n"
        f"Create a plan for: {json.dumps(user_input)}\n"
        "Keep plan concise (each item one short sentence)."
    )

    # Helper: extract JSON blob with regex (object first, then array).
    def _extract_json_blob(text: str) -> Optional[str]:
        if not text:
            return None
        # Try to find outermost JSON object or array
        # Use non-greedy match between braces/brackets to capture first JSON-like blob
        m = re.search(r'(\{[\s\S]*\})', text)
        if m:
            return m.group(1)
        m2 = re.search(r'(\[[\s\S]*\])', text)
        if m2:
            return m2.group(1)
        return None

    # Try primary LLM call; a hard failure short-circuits with an error plan.
    try:
        response = llm.invoke(prompt)
    except Exception as e:
        log.exception("Planner: llm.invoke failed: %s", e)
        return {
            "pmPlan": {"error": "LLM invoke failed"},
            "execution_path": path,
            **add_status_update("Planner", "Error")
        }
    raw = (getattr(response, "content", "") or "").strip()
    # 1) Try existing parser if available
    plan_data = None
    try:
        # prefer using existing parse_json_from_llm if present
        plan_data = parse_json_from_llm(raw) if 'parse_json_from_llm' in globals() else None
    except Exception:
        plan_data = None
    # 2) If parse_json_from_llm failed, try regex extraction + json.loads
    if not plan_data:
        try:
            blob = _extract_json_blob(raw)
            if blob:
                plan_data = json.loads(blob)
        except Exception:
            plan_data = None
    # 3) Retry the LLM with stricter settings (if parse still failed)
    if not plan_data:
        try:
            log.info("Planner: parse failed, retrying with stricter instruction / low temp.")
            # Attempt a retry - adapt params to your llm wrapper if needed
            retry_prompt = prompt + "\n\nIMPORTANT: Return only JSON, nothing else."
            # Some wrappers accept params, try best-effort; ignore if it errors
            retry_resp = None
            try:
                retry_resp = llm.invoke(retry_prompt, params={"temperature": 0.0})
            except Exception:
                retry_resp = llm.invoke(retry_prompt)
            raw_retry = (getattr(retry_resp, "content", "") or "").strip()
            # attempt parse_json_from_llm on retry
            try:
                plan_data = parse_json_from_llm(raw_retry) if 'parse_json_from_llm' in globals() else None
            except Exception:
                plan_data = None
            if not plan_data:
                blob = _extract_json_blob(raw_retry)
                if blob:
                    try:
                        plan_data = json.loads(blob)
                        # Keep both outputs so llm_preview covers the retry too.
                        raw = raw + "\n\n--- retry ---\n\n" + raw_retry
                    except Exception:
                        plan_data = None
        except Exception as e:
            log.exception("Planner retry failed: %s", e)
    # 4) Final fallback: convert plaintext into a minimal plan so pipeline continues
    if not plan_data:
        log.warning("Planner: JSON parse failed, using plaintext fallback.")
        # heuristics to make steps: split on blank lines / sentence ends,
        # keep up to 8 distinct fragments, truncating long ones at 200 chars.
        fragments = [f.strip() for f in re.split(r'\n+|(?<=\.)\s+', raw) if f.strip()]
        steps = []
        for frag in fragments:
            s = frag if len(frag) <= 200 else frag[:197] + "..."
            if len(s) > 10 and s not in steps:
                steps.append(s)
            if len(steps) >= 8:
                break
        if not steps:
            steps = [
                "Clarify requirements with user.",
                "Perform market research and competitor analysis.",
                "Draft curriculum and module outline.",
                "Estimate effort and resources for MVP."
            ]
        plan_data = {
            "plan": steps,
            "estimated_llm_calls_per_loop": 3,
            # add debug info for observability
            "notes": "Derived from plaintext fallback; original LLM output not valid JSON.",
            "llm_preview": raw[:2000]
        }
    # Sanitize and normalize plan_data fields
    if not isinstance(plan_data, dict):
        plan_data = {"plan": ["Clarify requirements with user."], "estimated_llm_calls_per_loop": 3}
    if 'plan' not in plan_data or not isinstance(plan_data['plan'], list):
        # NOTE(review): the and/or chain turns a truthy non-list (e.g. a string)
        # into list(...) of its elements; kept as-is for compatibility.
        plan_data['plan'] = plan_data.get('plan') and list(plan_data.get('plan')) or ["Clarify requirements with user."]
    try:
        calls = int(plan_data.get('estimated_llm_calls_per_loop', 3))
    except Exception:
        calls = 3
    # clamp calls to sensible range
    calls = max(1, min(calls, 50))
    plan_data['estimated_llm_calls_per_loop'] = calls
    # Cost calculation: tokens * avg_cost_per_1k / 1000 -> USD
    avg_cost_per_1k = (GPT4O_INPUT_COST_PER_1K_TOKENS + GPT4O_OUTPUT_COST_PER_1K_TOKENS) / 2.0
    # AVG_TOKENS_PER_CALL is tokens per call
    try:
        avg_tokens = float(AVG_TOKENS_PER_CALL)
    except Exception:
        avg_tokens = 1500.0
    cost_per_loop = (calls * avg_tokens) * (avg_cost_per_1k / 1000.0)
    plan_data['max_loops_initial'] = INITIAL_MAX_REWORK_CYCLES
    plan_data['cost_per_loop_usd'] = max(0.01, round(cost_per_loop, 4))
    total_loops = INITIAL_MAX_REWORK_CYCLES + 1
    plan_data['estimated_cost_usd'] = round(cost_per_loop * total_loops, 2)
    # Detect artifact requirements (preserve existing behavior)
    detection = detect_requested_output_types(user_input)
    if detection.get('requires_artifact'):
        plan_data.setdefault('experiment_needed', True)
        plan_data.setdefault('experiment_type', detection.get('artifact_type'))
        plan_data.setdefault('experiment_goal', user_input)
    return {
        "pmPlan": plan_data,
        "execution_path": path,
        **add_status_update("Planner", "Plan created")
    }
def run_memory_retrieval(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH node: pull prior-interaction memories relevant to the request.

    Performs a similarity lookup through memory_manager and flattens the hits
    into one newline-joined context string for downstream prompts.
    """
    log.info("--- MEMORY ---")
    path = ensure_list(state, 'execution_path') + ["Memory"]
    hits = memory_manager.retrieve_relevant_memories(state.get('userInput', ''))
    if hits:
        context = "\n".join(f"Memory: {doc.page_content}" for doc in hits)
    else:
        context = "No memories"
    return {
        "retrievedMemory": context,
        "execution_path": path,
        **add_status_update("Memory", "Memory retrieved")
    }
def run_intent_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH node: turn the raw request plus retrieved memory into a crisp,
    actionable objective stored in coreObjectivePrompt.
    """
    log.info("--- INTENT ---")
    path = ensure_list(state, 'execution_path') + ["Intent"]
    prompt = (
        f"Refine into clear objective.\n\n"
        f"Memory: {state.get('retrievedMemory')}\n\n"
        f"Request: {state.get('userInput', '')}\n\n"
        f"Core Objective:"
    )
    reply = llm.invoke(prompt)
    objective = getattr(reply, "content", "") or ""
    return {
        "coreObjectivePrompt": objective,
        "execution_path": path,
        **add_status_update("Intent", "Objective clarified")
    }
def validate_and_fix_pm_plan(plan: Dict, user_input: str) -> Dict:
    """
    Guardrail: if the user asked for a document/file deliverable, make sure the
    PM plan actually schedules an artifact (experiment_needed=True) rather than
    a text-only answer, and give the plan a minimal step list if it has none.
    """
    if not isinstance(plan, dict):
        plan = {}
    lowered = user_input.lower()
    # Keyword triggers meaning "the user wants a file, not just chat text".
    artifact_markers = [
        'docx', 'document', 'word', 'file', 'pdf', 'excel',
        'output -', 'output:', 'results in', 'deliver as',
        'save as', 'export to', 'create file', 'generate file',
        'avoid code', 'no code'  # If avoiding code, must want document
    ]
    wants_artifact = any(marker in lowered for marker in artifact_markers)
    if wants_artifact:
        log.info("πŸ” User requested document/file output - forcing artifact creation")
        # FORCE artifact creation
        if not plan.get("experiment_needed") or plan.get("experiment_needed") == False:
            log.warning("⚠️ Overriding experiment_needed from false to TRUE")
            plan["experiment_needed"] = True
        # Set document type
        if 'pdf' in lowered:
            plan["experiment_type"] = "pdf"
        elif 'excel' in lowered or 'xlsx' in lowered:
            plan["experiment_type"] = "excel"
        else:
            plan["experiment_type"] = "word"  # Default to Word
        # Ensure goal is set
        if not plan.get("experiment_goal"):
            plan["experiment_goal"] = user_input
        log.info(f"πŸ“‹ Forced: experiment_needed=True, type={plan.get('experiment_type')}")
    # Ensure basic plan structure
    if not plan.get("plan_steps") or not isinstance(plan["plan_steps"], list):
        plan["plan_steps"] = [
            "Analyze request",
            "Gather information",
            "Create document",
            "Format output"
        ]
    return plan
def run_pm_agent(state: AgentState) -> Dict[str, Any]:
    """
    FIXED: PM Agent with proper hard limits and language preference.

    Produces/updates pmPlan and increments rework_cycles. When rework hits the
    hard limit, or a loop is detected, it force-approves with a canned
    document-producing plan so the graph always terminates with a deliverable.
    """
    log.info("--- PM ---")
    # Get current state
    current_rework = ensure_int(state, 'rework_cycles', 0)
    max_loops_val = ensure_int(state, 'max_loops', INITIAL_MAX_REWORK_CYCLES)
    # CRITICAL FIX 1: Cap max_loops if unreasonable
    if max_loops_val > 10:
        log.warning(f"⚠️ max_loops {max_loops_val} TOO HIGH! Forcing to 3")
        max_loops_val = 3
        state["max_loops"] = 3  # NOTE(review): in-place state mutation, unusual for graph nodes
    # Calculate hard limit (never exceed 6 total reworks)
    hard_limit = min(max_loops_val * 2, 6)
    log.info(f"πŸ“Š Rework cycle: {current_rework}/{hard_limit} (max: {max_loops_val})")
    # CRITICAL FIX 2: Force completion if at hard limit
    if current_rework >= hard_limit:
        log.error(f"❌ Hard limit reached: {current_rework} >= {hard_limit}")
        path = ensure_list(state, 'execution_path') + ["PM"]
        user_input = state.get('userInput', '')
        # NOTE(review): is_research is computed here but never used on this branch.
        is_research = any(kw in user_input.lower() for kw in [
            'research', 'everything to start', 'comprehensive', 'guide to'
        ])
        # Create completion plan
        completion_plan = {
            "plan_steps": [
                "Create comprehensive research document",
                "Include market analysis and competitive review",
                "Add Python code examples for AI implementation",
                "Provide actionable implementation roadmap"
            ],
            "experiment_needed": True,
            "experiment_type": "word",  # Force document for research
            "experiment_goal": state.get('coreObjectivePrompt', user_input)
        }
        return {
            "pmPlan": completion_plan,
            "execution_path": path,
            "rework_cycles": current_rework,
            "approved": True,  # FORCE APPROVAL to complete
            "max_loops": 3,
            **add_status_update("PM", "Limit reached - completing with document")
        }
    # CRITICAL FIX 3: Check for loops (but not on first few cycles)
    if current_rework > 1:
        # detect_loop is defined elsewhere in this module (not in this view).
        if detect_loop(state):
            log.error("❌ Loop detected in PM agent")
            path = ensure_list(state, 'execution_path') + ["PM"]
            # Force completion
            return {
                "pmPlan": {
                    "plan_steps": ["Create research document with analysis"],
                    "experiment_needed": True,
                    "experiment_type": "word",
                    "experiment_goal": state.get('coreObjectivePrompt', '')
                },
                "execution_path": path,
                "rework_cycles": current_rework,
                "approved": True,
                **add_status_update("PM", "Loop detected - forcing completion")
            }
    # Increment cycle counter
    current_cycles = current_rework + 1
    path = ensure_list(state, 'execution_path') + ["PM"]
    # Build planning context
    user_input = state.get('userInput', '')
    context_parts = [
        f"=== USER REQUEST ===\n{user_input}",
        f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}",
    ]
    # Add language requirement
    if 'prefer python' in user_input.lower() or 'use python' in user_input.lower():
        context_parts.append("\n=== LANGUAGE REQUIREMENT ===\nUSER REQUIRES PYTHON")
    # Add feedback if rework
    if state.get('qaFeedback'):
        context_parts.append(f"\n=== QA FEEDBACK ===\n{state.get('qaFeedback')}")
        context_parts.append(f"\n=== REWORK CYCLE {current_cycles}/{hard_limit} ===")
    full_context = "\n".join(context_parts)
    # Detect if research request
    is_research = any(kw in user_input.lower() for kw in [
        'research', 'analysis', 'everything to start', 'comprehensive'
    ])
    # Create planning prompt
    prompt = f"""Create DETAILED execution plan.
{full_context}
{"IMPORTANT: This is a research request. Create a document with Python code examples." if is_research else ""}
Return JSON:
{{
"plan_steps": ["step 1", "step 2", ...],
"experiment_needed": true/false,
"experiment_type": "{"word" if is_research else "script|notebook|excel|word|pdf|repo"}",
"experiment_goal": "specific goal",
"key_requirements": ["req 1", "req 2", ...]
}}
Be specific and actionable.
"""
    # Get plan from LLM
    try:
        response = llm.invoke(prompt)
        plan = parse_json_from_llm(getattr(response, "content", "") or "")
    except Exception as e:
        log.warning(f"PM LLM failed: {e}")
        plan = None
    # CRITICAL: Validate and fix plan BEFORE using it
    user_input = state.get('userInput', '')
    plan = validate_and_fix_pm_plan(plan or {}, user_input)
    # Log what we're doing
    log.info(f"πŸ“‹ Final plan: experiment_needed={plan.get('experiment_needed')}, type={plan.get('experiment_type')}")
    # Fallback if still no plan structure
    if not plan.get("plan_steps"):
        plan["plan_steps"] = [
            "Analyze requirements",
            "Research market",
            "Create deliverable"
        ]
    # Normalize experiment type
    exp_type = normalize_experiment_type(
        plan.get('experiment_type'),
        plan.get('experiment_goal', '')
    )
    plan['experiment_type'] = exp_type
    # Ensure goal is set
    if plan.get('experiment_needed') and not plan.get('experiment_goal'):
        plan['experiment_goal'] = user_input
    # Add loop control info
    plan['max_loops_initial'] = max_loops_val
    plan['hard_limit'] = hard_limit
    return {
        "pmPlan": plan,
        "execution_path": path,
        "rework_cycles": current_cycles,
        "max_loops": max_loops_val,
        **add_status_update("PM", f"Plan created - Cycle {current_cycles}/{hard_limit}")
    }
def extract_user_language_preference(user_input: str) -> Optional[str]:
    """Extract the user's programming-language preference from free-form rules.

    Returns "python" when the text explicitly asks for Python (or asks for
    Python "unless" another language is named), otherwise None.
    """
    text = user_input.lower()
    # Check for explicit Python preference
    if any(p in text for p in ["prefer python", "use python", "always use python", "code", "provide script"]):
        return "python"
    # Check for "unless" clause: Python unless another language is requested
    if "unless" in text and "python" in text:
        # BUG FIX: the original compared the lowercased text against mixed-case
        # tokens ("PHP", "Scala", "MATLAB", "SQL", "R") that could never match;
        # compare lowercase. "r" is matched as a whole word so the bare letter
        # inside ordinary words doesn't count as a language mention.
        other_langs = ["typescript", "javascript", "java", "c++", "php", "scala", "matlab", "sql"]
        if any(lang in text for lang in other_langs):
            return None
        if re.search(r"\br\b", text):
            return None
        return "python"
    return None
def run_experimenter_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Generates and executes code/artifacts.

    Handles:
    - Multi-language code generation
    - Artifact creation (notebooks, scripts, documents)
    - Code execution
    - Result capture

    Returns a partial state update with experimentCode / experimentResults.
    """
    log.info("--- EXPERIMENTER ---")
    path = ensure_list(state, 'execution_path') + ["Experimenter"]
    pm = state.get('pmPlan', {}) or {}
    # Skip if no experiment needed
    if not pm.get('experiment_needed'):
        return {
            "experimentCode": None,
            "experimentResults": None,
            "execution_path": path,
            **add_status_update("Experimenter", "No experiment needed")
        }
    user_input = state.get('userInput', '')
    # CRITICAL: Check user's language preference FIRST
    forced_lang = extract_user_language_preference(user_input)
    # CRITICAL: Detect research requests
    is_research = any(kw in user_input.lower() for kw in [
        'research', 'analysis', 'everything to start',
        'comprehensive', 'guide to starting', 'study',
        'review', 'summarize'
    ])
    # Force document output for research, otherwise normalize the PM's type.
    # BUGFIX: exp_type and language were previously recomputed further down
    # and silently overwrote these values, making the research detection and
    # the forced-language preference dead code.
    if is_research:
        exp_type = 'word'
        log.info("πŸ” Research request detected - using document format")
    else:
        exp_type = normalize_experiment_type(
            pm.get('experiment_type'),
            pm.get('experiment_goal', '')
        )
    # Language priority: explicit user preference > goal-based detection > Python.
    detected = detect_requested_output_types_enhanced(pm.get('experiment_goal', ''))
    language = forced_lang or detected.get('language', 'python')
    log.info(f"πŸ“ Type: {exp_type}, Language: {language}")
    # Build rich context for the generation prompt.
    context_parts = [
        f"=== USER REQUEST ===\n{state.get('userInput', '')}",
        f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}",
        f"\n=== PLAN ===\n{json.dumps(pm.get('plan_steps', []), indent=2)}",
        f"\n=== REQUIREMENTS ===\n{json.dumps(pm.get('key_requirements', []), indent=2)}",
    ]
    if state.get('retrievedMemory'):
        context_parts.append(f"\n=== CONTEXT ===\n{state.get('retrievedMemory', '')}")
    if state.get('qaFeedback'):
        context_parts.append(f"\n=== FEEDBACK TO ADDRESS ===\n{state.get('qaFeedback', '')}")
    full_context = "\n".join(context_parts)
    # Get language configuration (display name used in prompts/status).
    lang_config = LANGUAGES.get(language)
    lang_name = lang_config.name if lang_config else "Code"
    if exp_type == 'word':
        # Research document prompt
        enhanced_prompt = f"""Create COMPREHENSIVE RESEARCH DOCUMENT.
USER REQUEST: {user_input}
OBJECTIVE: {state.get('coreObjectivePrompt', '')}
OUTPUT REQUIREMENTS:
- Complete research document in markdown format
- Include ALL relevant sections (market analysis, competitors, strategies, implementation)
- Add Python code EXAMPLES where relevant (AI integration, tools, APIs)
- Be SPECIFIC and ACTIONABLE - provide real data and analysis
- NO placeholders - complete, usable content only
- Structure: Executive Summary, Market Analysis, Competitive Review,
Curriculum Design, Implementation Plan, Tools & Tech Stack
Generate complete research document with Python examples:"""
    else:
        # Code generation prompt
        enhanced_prompt = f"""Create {lang_name} {exp_type}.
USER REQUEST: {user_input}
REQUIREMENTS:
- Write ONLY {lang_name} code (user specified)
- Follow best practices
- Include documentation
- Production-ready
- Include error handling
Generate {lang_name} code:"""
    response = llm.invoke(enhanced_prompt)
    llm_text = getattr(response, "content", "") or ""
    if exp_type == 'word':
        # Persist the research document as .docx plus a plain-text copy.
        doc_path = write_docx_from_text(llm_text, out_dir=OUT_DIR)
        txt_path = doc_path.replace('.docx', '.txt')
        with open(txt_path, 'w', encoding='utf-8') as f:
            f.write(llm_text)
        log.info(f"πŸ“„ Created: {os.path.basename(doc_path)}")
        return {
            "experimentCode": llm_text,
            "experimentResults": {
                "success": True,
                "paths": {
                    "document": sanitize_path(doc_path),
                    "text": sanitize_path(txt_path)
                },
                "stdout": f"Research document created: {os.path.basename(doc_path)}",
                "language": "markdown",
                "artifact_type": "research_document"
            },
            "execution_path": path,
            **add_status_update("Experimenter", "Research doc created")
        }
    # Extract code blocks with language detection
    code_blocks = extract_code_blocks_multi_lang(llm_text)
    if code_blocks:
        # Use first detected language/code pair
        detected_lang, code_text = code_blocks[0]
        # Write script with proper extension
        script_path = write_script_multi_lang(code_text, detected_lang, out_dir=OUT_DIR)
        # Execute with appropriate runner
        exec_results = execute_code(code_text, detected_lang)
        results = {
            "success": exec_results.get("exit_code", 0) == 0,
            "paths": {"script": sanitize_path(script_path)},
            "stdout": exec_results.get("stdout", ""),
            "stderr": exec_results.get("stderr", ""),
            "language": detected_lang,
            "context_used": len(full_context)
        }
        return {
            "experimentCode": code_text,
            "experimentResults": results,
            "execution_path": path,
            **add_status_update("Experimenter", f"{lang_name} script created")
        }
    # No code blocks found in the LLM output.
    return {
        "experimentCode": None,
        "experimentResults": {"success": False, "error": "No code generated"},
        "execution_path": path,
        **add_status_update("Experimenter", "Code generation failed")
    }
def run_synthesis_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Synthesizes the final response from all components.

    Combines the user request, execution results, generated artifacts and
    an LLM-written narrative into a single draft response, appending an
    artifact list and inline file previews when available.
    """
    log.info("--- SYNTHESIS ---")
    st = state or {}
    path = ensure_list(st, 'execution_path') + ["Synthesis"]
    results = st.get('experimentResults')
    plan = st.get('pmPlan', {}) or {}

    # Assemble the context the LLM will summarize.
    sections = [
        f"=== USER REQUEST ===\n{st.get('userInput', '')}",
        f"\n=== OBJECTIVE ===\n{st.get('coreObjectivePrompt', '')}",
        f"\n=== PLAN ===\n{json.dumps(plan.get('plan_steps', []), indent=2)}",
    ]

    artifact_message = ""
    if results and isinstance(results, dict):
        artifact_paths = results.get("paths") or {}
        if artifact_paths:
            bullet_lines = [
                f"- **{kind.title()}**: `{os.path.basename(p)}`"
                for kind, p in artifact_paths.items()
            ]
            detail_lines = [f"{kind}: {p}" for kind, p in artifact_paths.items()]
            artifact_message = "\n\n**Artifacts Generated:**\n" + "\n".join(bullet_lines)
            sections.append(f"\n=== ARTIFACTS ===\n" + "\n".join(detail_lines))
        if results.get('stdout'):
            sections.append(f"\n=== OUTPUT ===\n{results.get('stdout', '')}")
        if results.get('stderr'):
            sections.append(f"\n=== ERRORS ===\n{results.get('stderr', '')}")

    full_context = "\n".join(sections)
    # Generate the narrative response.
    synthesis_prompt = f"""Create FINAL RESPONSE after executing user's request.
{full_context}
Create comprehensive response that:
- Directly addresses original request
- Explains what was accomplished and HOW
- References specific artifacts and explains PURPOSE
- Provides context on how to USE deliverables
- Highlights KEY INSIGHTS
- Suggests NEXT STEPS if relevant
- Be SPECIFIC about what was created."""
    final_text = getattr(llm.invoke(synthesis_prompt), "content", "") or ""

    # Build inline previews for artifact files that exist on disk.
    content_preview = ""
    if results and isinstance(results, dict):
        for kind, artifact_path in (results.get("paths") or {}).items():
            if os.path.exists(artifact_path):
                try:
                    with open(artifact_path, 'r', encoding='utf-8', errors='ignore') as f:
                        preview = f.read()[:2000]  # First 2000 chars
                    content_preview += f"\n\n### πŸ“„ {kind.title()} Preview:\n```\n{preview}\n...\n```\n"
                except Exception as e:
                    log.warning(f"Could not preview {artifact_path}: {e}")

    # Append artifact list and previews to the narrative.
    if artifact_message:
        final_text = final_text + "\n\n---\n" + artifact_message
    if content_preview:
        final_text = final_text + "\n\n---\n## Content Preview\n" + content_preview
    return {
        "draftResponse": final_text,
        "execution_path": path,
        **add_status_update("Synthesis", "Response synthesized")
    }
def run_qa_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Quality assurance review.

    Checks the draft for completeness, correctness, alignment with the
    user's request and general quality; returns either approval or
    actionable feedback for a rework cycle.
    """
    log.info("--- QA ---")
    path = ensure_list(state, 'execution_path') + ["QA"]

    # Assemble the review context.
    review_sections = [
        f"=== REQUEST ===\n{state.get('userInput', '')}",
        f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}",
        f"\n=== DRAFT ===\n{state.get('draftResponse', '')}",
    ]
    if state.get('experimentResults'):
        review_sections.append(
            f"\n=== ARTIFACTS ===\n"
            f"{json.dumps(state.get('experimentResults', {}).get('paths', {}), indent=2)}"
        )
    prompt = f"""You are a QA reviewer. Review the draft response against the user's objective.
{chr(10).join(review_sections)}
Review Instructions:
- Does the draft and its artifacts COMPLETELY satisfy ALL parts of the user's request?
- Is the quality of the work high?
- If this is a re-submission (rework cycle > 1), has the previous feedback been successfully addressed?
Response Format (required JSON or a single word 'APPROVED'):
Either return EXACTLY the single word:
APPROVED
Or return JSON like:
{{
"approved": false,
"feedback": "Specific, actionable items to fix (bullet list or numbered).",
"required_changes": ["..."]
}}
"""
    try:
        verdict = getattr(llm.invoke(prompt), "content", "") or ""
    except Exception as e:
        log.exception(f"QA LLM call failed: {e}")
        return {
            "approved": False,
            "qaFeedback": "QA LLM failed; manual review required.",
            "execution_path": path,
            **add_status_update("QA", "QA failed")
        }
    # Fast path: the bare word APPROVED (allowing minor decoration).
    stripped = verdict.strip()
    if "APPROVED" in stripped.upper() and len(stripped) <= 20:
        return {
            "approved": True,
            "qaFeedback": None,
            "execution_path": path,
            **add_status_update("QA", "Approved")
        }
    # Structured verdict: parse JSON and normalize the feedback field.
    parsed = parse_json_from_llm(verdict)
    if isinstance(parsed, dict):
        approved = bool(parsed.get("approved", False))
        feedback = parsed.get("feedback") or parsed.get("qaFeedback") or parsed.get("required_changes") or ""
        if isinstance(feedback, list):
            feedback = "\n".join(str(item) for item in feedback)
        elif not isinstance(feedback, str):
            feedback = str(feedback)
        return {
            "approved": approved,
            "qaFeedback": None if approved else feedback,
            "execution_path": path,
            **add_status_update("QA", "QA completed")
        }
    # Fallback: treat the raw (truncated) output as feedback, not approved.
    fallback = stripped[:2000] or "QA produced no actionable output."
    return {
        "approved": False,
        "qaFeedback": fallback,
        "execution_path": path,
        **add_status_update("QA", "QA needs rework")
    }
def run_archivist_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Archives successful interactions to memory.

    Asks the LLM for a compact summary of the objective/response pair and
    stores it so future runs can learn from past experiences.
    """
    log.info("--- ARCHIVIST ---")
    path = ensure_list(state, 'execution_path') + ["Archivist"]
    archive_prompt = f"Summarize for memory.\n\nObjective: {state.get('coreObjectivePrompt')}\n\nResponse: {state.get('draftResponse')}\n\nSummary:"
    summary = llm.invoke(archive_prompt)
    memory_manager.add_to_memory(
        getattr(summary, "content", ""),
        {"objective": state.get('coreObjectivePrompt')}
    )
    return {
        "execution_path": path,
        **add_status_update("Archivist", "Saved to memory")
    }
def run_disclaimer_agent(state: AgentState) -> Dict[str, Any]:
    """
    BASE GRAPH: Prepends a disclaimer when limits were reached.

    Handles both budget exhaustion and rework-cycle exhaustion; the
    draft is preserved but flagged as possibly incomplete.
    """
    log.warning("--- DISCLAIMER ---")
    path = ensure_list(state, 'execution_path') + ["Disclaimer"]
    if state.get('budget_exceeded'):
        reason = "Budget limit reached."
    else:
        reason = "Rework limit reached."
    banner = f"**DISCLAIMER: {reason} Draft may be incomplete.**\n\n---\n\n"
    return {
        "draftResponse": banner + state.get('draftResponse', "No response"),
        "execution_path": path,
        **add_status_update("Disclaimer", reason)
    }
# =============================================================================
# SECTION 8: UPGRADED GRAPH AGENT NODES
# =============================================================================
# These enhanced agents add governance, compliance, and observation layers.
def _parse_cost_estimate(raw: Any) -> Optional[float]:
    """Best-effort numeric parse of a PM cost estimate.

    Accepts plain numbers as well as strings such as "$1,234.50".
    Returns None when no numeric value can be recovered.
    """
    if raw is None or raw == "":
        return None
    try:
        return float(raw)
    except Exception:
        pass
    try:
        m = re.search(r"[\d,.]+", str(raw))
        if m:
            return float(m.group(0).replace(",", ""))
    except Exception:
        pass
    return None


def run_pragmatist_agent(state: AgentState) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: Risk assessment and tier recommendations.

    Provides:
    - Risk analysis (low/medium/high)
    - Tiered delivery options (lite/standard/full)
    - Cost estimates per tier
    - Optimization suggestions

    Returns a partial state update containing ``pragmatistReport``.
    """
    log.info(">>> PRAGMATIST AGENT (improved)")
    path = ensure_list(state, "execution_path") + ["Pragmatist"]
    pm = state.get("pmPlan", {}) or {}
    # Robust cost parsing moved to a dedicated helper (previously a fragile
    # nested try/except that referenced `raw` across scopes).
    est_cost = _parse_cost_estimate(pm.get("estimated_cost_usd", None))
    exp_type = pm.get("experiment_type", "word")
    # Engineering-heavy artifacts get a higher default base estimate.
    base_est = est_cost or (50.0 if exp_type in ["script", "repo"] else 5.0)
    # Define tier options
    tiers = {
        "lite": {
            "multiplier": 0.25,
            "estimated_cost_usd": round(base_est * 0.25, 2),
            "description": "Minimal extract (CSV/text) or short summary. Minimal engineering."
        },
        "standard": {
            "multiplier": 1.0,
            "estimated_cost_usd": round(base_est * 1.0, 2),
            "description": "Complete, tested script or notebook; limited UX – suitable for MVP."
        },
        "full": {
            "multiplier": 3.0,
            "estimated_cost_usd": round(base_est * 3.0, 2),
            "description": "Production-ready repo, packaging, tests, and deployment instructions."
        }
    }
    preferred = state.get("preferred_tier")
    flexible_mode = bool(state.get("flexible_budget_mode", False))
    # Intelligent risk assessment: accumulate a score from simple heuristics.
    risk_factors = []
    risk_score = 0
    # Check complexity
    plan_steps = pm.get("plan_steps", [])
    if len(plan_steps) > 8:
        risk_factors.append("Complex multi-step plan")
        risk_score += 1
    # Check artifact type
    if exp_type in ("repo", "notebook"):
        risk_factors.append(f"Engineering-heavy artifact type: {exp_type}")
        risk_score += 1
    # Check cost
    if est_cost is None:
        risk_factors.append("No cost estimate provided")
        risk_score += 1
    elif est_cost > 200:
        risk_factors.append(f"High estimated cost: ${est_cost}")
        risk_score += 2
    elif est_cost > 100:
        risk_factors.append(f"Moderate estimated cost: ${est_cost}")
        risk_score += 1
    # Flexible budget mode tolerates more risk.
    if flexible_mode:
        risk_score = max(0, risk_score - 2)
    # Map score to a coarse risk level.
    if risk_score <= 1:
        risk = "low"
    elif risk_score <= 3:
        risk = "medium"
    else:
        risk = "high"
    # Determine feasibility (LIBERAL: almost always feasible)
    feasible = True
    if risk_score > 5 and not flexible_mode:  # Very high threshold
        feasible = False
    # Recommend tier: honor an explicit preference, otherwise default to
    # standard unless the cost is extreme and the budget is not flexible.
    if preferred in tiers:
        recommended_tier = preferred
    elif est_cost is None:
        recommended_tier = "standard"
    elif est_cost > 500 and not flexible_mode:
        recommended_tier = "lite"
    else:
        recommended_tier = "standard"
    prag_report = {
        "ok": feasible,
        "risk_factors": risk_factors,
        "risk_level": risk,
        "risk_score": risk_score,
        "tier_options": tiers,
        "recommended_tier": recommended_tier,
        "explain": (
            f"Assessed {len(risk_factors)} risk factor(s). "
            f"Risk level: {risk}. Recommended tier: {recommended_tier}. "
            "User can proceed with any tier; higher tiers provide more complete deliverables."
        )
    }
    # Optional LLM recommendations for high-complexity plans (best effort).
    if len(plan_steps) > 7 and llm:
        try:
            prompt = (
                "You are a pragmatic engineering advisor. Given this plan, suggest 2-3 ways to "
                "optimize implementation while preserving core value. Be specific and actionable. "
                "Return JSON {\"optimizations\": [...]}.\n\n"
                f"Plan: {json.dumps(pm, indent=2)}"
            )
            r = llm.invoke(prompt)
            recs = parse_json_from_llm(getattr(r, "content", "") or "")
            if isinstance(recs, dict):
                prag_report["optimizations"] = recs.get("optimizations", [])
        except Exception as e:
            log.debug(f"LLM optimizations failed: {e}")
    out = {"pragmatistReport": prag_report, "execution_path": path}
    out.update(add_status_update("Pragmatist", f"Risk: {risk}, Tier: {recommended_tier}"))
    return out
def run_governance_agent(state: AgentState) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: Budget/approval gate before the experimenter runs.

    The "full" tier bypasses ALL budget checks completely; lite/standard
    tiers are checked against the stop threshold and the pragmatist's
    risk assessment.

    Returns a partial state update containing ``governanceReport`` (plus
    budget-control overrides when the full tier is selected).
    """
    log.info(">>> GOVERNANCE AGENT (improved)")
    path = ensure_list(state, "execution_path") + ["Governance"]
    prag = state.get("pragmatistReport", {}) or {}
    # Tier preference: explicit user choice, else pragmatist recommendation.
    preferred = state.get("preferred_tier") or prag.get("recommended_tier") or "standard"
    tier_opts = prag.get("tier_options", {})
    chosen = tier_opts.get(preferred, tier_opts.get("standard", {}))
    try:
        chosen_cost = float(chosen.get("estimated_cost_usd", 0.0))
    except Exception:
        chosen_cost = 0.0
    flexible = bool(state.get("flexible_budget_mode", False))
    # CRITICAL: Full tier = unlimited budget, bypass ALL checks
    if preferred == "full":
        log.info("🎯 FULL TIER SELECTED - UNLIMITED BUDGET MODE ACTIVATED")
        log.info(f" Cost estimate: ${chosen_cost} (ignored)")
        log.info(" Budget checks: DISABLED")
        log.info(" Rework limits: EXTENDED")
        gov_report = {
            "budget_ok": True,
            "issues": ["Full tier: budget checks disabled"],
            "approved_for_experiment": True,
            "governanceDecision": "approve",
            "chosen_tier": "full",
            "chosen_cost_usd": chosen_cost,
            "rationale": "Full tier: no budget constraints applied",
            "reasoning": "Full tier approval - unlimited mode"
        }
        # Update state to disable budget enforcement downstream.
        out = {
            "governanceReport": gov_report,
            "execution_path": path,
            "budget_exceeded": False,  # Can never be true for full tier
            "stop_threshold": float('inf'),  # Infinite threshold
            "max_loops": 10,  # Allow more iterations
        }
        out.update(add_status_update("Governance", "Full tier approved - unlimited budget"))
        return out
    # For lite/standard tiers, do normal budget checks.
    # NOTE: the previously-unused `pm` and `budget` locals were removed;
    # only stop_threshold gates spend here.
    decision = "approve"
    issues = []
    stop_threshold = state.get("stop_threshold") or 0.0
    if stop_threshold > 0:
        try:
            threshold_f = float(stop_threshold)
            if chosen_cost > threshold_f:
                if flexible:
                    issues.append(f"Cost ${chosen_cost} exceeds ${threshold_f}, flexible mode enabled")
                    decision = "approve_with_warning"
                else:
                    issues.append(f"Cost ${chosen_cost} exceeds ${threshold_f}")
                    decision = "reject"
        except Exception as e:
            issues.append(f"Budget check error: {e}")
    # High risk from the pragmatist can veto unless flexible mode is on.
    risk_level = prag.get("risk_level")
    if risk_level == "high" and not prag.get("ok", True):
        issues.append("High risk identified")
        decision = "approve_with_warning" if flexible else "reject"
    approved_bool = decision in ("approve", "approve_with_warning")
    gov_report = {
        "budget_ok": approved_bool,
        "issues": issues,
        "approved_for_experiment": approved_bool,
        "governanceDecision": decision,
        "chosen_tier": preferred,
        "chosen_cost_usd": chosen_cost,
        "rationale": None,
        "reasoning": f"Decision: {decision}. {len(issues)} issue(s)."
    }
    out = {"governanceReport": gov_report, "execution_path": path}
    out.update(add_status_update("Governance", f"{decision.title()} {preferred} (${chosen_cost})"))
    return out
def scan_text_for_secrets(text: str) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: Scan text for potential secrets/credentials.

    Runs a fixed set of regexes (AWS access keys, private-key headers,
    Google API keys, secret/password keywords) over *text* and records
    every hit. Used by Compliance agent.
    """
    if not text:
        return {"suspicious": False, "findings": []}
    secret_patterns = (
        r"AKIA[0-9A-Z]{16}",  # AWS access key
        r"-----BEGIN PRIVATE KEY-----",  # Private key
        r"AIza[0-9A-Za-z-_]{35}",  # Google API key
        r"(?i)secret[_-]?(key|token)\b",  # Secret keywords
        r"(?i)password\s*[:=]\s*['\"][^'\"]{6,}['\"]",  # Password patterns
    )
    findings = [
        {"pattern": pattern, "match": hit.group(0)}
        for pattern in secret_patterns
        for hit in re.finditer(pattern, text)
    ]
    return {"suspicious": bool(findings), "findings": findings}
def run_compliance_agent(state: AgentState) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: Security and compliance scanning.

    Scans for:
    - Exposed secrets/credentials in captured stdout/stderr
    - Secrets inside generated artifact files (first 20 KB sampled)
    - Zip/repo artifacts that warrant manual review
    """
    log.info(">>> COMPLIANCE AGENT")
    path = ensure_list(state, "execution_path") + ["Compliance"]
    exp = state.get("experimentResults", {}) or {}
    report = {"suspicious": False, "issues": [], "scanned": []}
    # Scan stdout/stderr
    for key in ("stdout", "stderr"):
        val = exp.get(key)
        if isinstance(val, str) and val.strip():
            scan = scan_text_for_secrets(val)
            if scan.get("suspicious"):
                report["suspicious"] = True
                report["issues"].append({
                    "type": "text_secret",
                    "where": key,
                    "findings": scan["findings"]
                })
            report["scanned"].append({"type": "text", "where": key})
    # BUGFIX: resolve artifact paths once up front. The previous code
    # re-derived `paths` later via `locals().get("paths") or
    # state.get("paths") if "state" in locals() else {}`, whose operator
    # precedence applied the conditional only to the fallback expression.
    paths = exp.get("paths") or {}
    # Scan generated files
    if isinstance(paths, dict):
        for k, p in paths.items():
            try:
                pstr = str(p)
                if os.path.exists(pstr) and os.path.isfile(pstr):
                    # Sample only the first 20 KB of each file.
                    with open(pstr, "r", encoding="utf-8", errors="ignore") as fh:
                        sample = fh.read(20000)
                    scan = scan_text_for_secrets(sample)
                    if scan.get("suspicious"):
                        report["suspicious"] = True
                        report["issues"].append({
                            "type": "file_secret",
                            "file": pstr,
                            "findings": scan["findings"]
                        })
                    report["scanned"].append({"type": "file", "file": pstr})
                else:
                    report["scanned"].append({
                        "type": "path",
                        "value": pstr,
                        "exists": os.path.exists(pstr)
                    })
            except Exception as e:
                report["scanned"].append({"file": p, "error": str(e)})
        # Note if repo/zip artifact
        if any(str(v).lower().endswith(".zip") for v in paths.values()):
            report.setdefault("notes", []).append(
                "Zip-based or repo artifact detected – recommend manual review."
            )
    # Final output
    out = {"complianceReport": report, "execution_path": path}
    out.update(add_status_update("Compliance", "Compliance checks complete"))
    return out
def summarize_logs_for_observer(log_paths: Optional[list] = None, sample_lines: int = 200) -> str:
    """
    UPGRADED GRAPH: Summarize system logs for observer.

    Reads the tail of each log file (up to *sample_lines* lines), counts
    ERROR/WARNING occurrences across all of them and returns a header
    followed by truncated per-file excerpts.
    """
    if not log_paths:
        # Fall back to whichever default log locations actually exist.
        log_paths = [
            candidate
            for candidate in ("logs/performance.log", "logs/ai_lab.log", "performance.log")
            if os.path.exists(candidate)
        ]
    error_total = 0
    warning_total = 0
    sections = []
    for log_file in log_paths:
        try:
            with open(log_file, "r", encoding="utf-8", errors="ignore") as fh:
                tail = fh.readlines()[-sample_lines:]
            body = "".join(tail)
            upper = body.upper()
            error_total += upper.count("ERROR")
            warning_total += upper.count("WARNING")
            sections.append(f"--- {log_file} (last {len(tail)} lines) ---\n{body[:2000]}")
        except Exception as e:
            sections.append(f"Could not read {log_file}: {e}")
    header = f"Log summary: {error_total} ERROR(s), {warning_total} WARNING(s)"
    return header + "\n\n" + "\n\n".join(sections)
def run_observer_agent(state: AgentState) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: System performance and health monitoring.

    Collects a log digest plus runtime counters (execution-path length,
    rework cycles, accumulated cost) and, when an LLM is available,
    asks it for prioritized mitigation actions.
    """
    log.info(">>> OBSERVER AGENT")
    path = ensure_list(state, "execution_path") + ["Observer"]
    # Only pass log files that actually exist on disk.
    existing_logs = [
        candidate
        for candidate in ("logs/performance.log", "logs/ai_lab.log", "performance.log")
        if os.path.exists(candidate)
    ]
    summary = summarize_logs_for_observer(existing_logs or None)
    obs = {
        "log_summary": summary[:4000],
        "execution_length": len(state.get("execution_path", []) or []),
        "rework_cycles": ensure_int(state, "rework_cycles", 0),
        "current_cost": state.get("current_cost", 0.0),
        "status": get_latest_status(state)
    }
    # Optional LLM recommendations (best effort, errors recorded not raised).
    if llm:
        try:
            advice_prompt = (
                "You are an Observer assistant. Given this runtime summary, provide 3 prioritized "
                "next actions to mitigate the top risks.\n\n"
                f"Runtime summary: {json.dumps(obs, indent=2)}\n\nReturn plain text."
            )
            reply = llm.invoke(advice_prompt)
            obs["llm_recommendations"] = getattr(reply, "content", "")[:1500]
        except Exception as e:
            obs["llm_recommendations_error"] = str(e)
    result = {"observerReport": obs, "execution_path": path}
    result.update(add_status_update("Observer", "Observer summary created"))
    return result
def run_knowledge_curator_agent(state: AgentState) -> Dict[str, Any]:
    """
    UPGRADED GRAPH: Curates and archives knowledge for future use.

    Persists a compact summary of the objective, plan, draft and QA
    feedback into long-term memory (best effort; failures are reported
    in the returned insights rather than raised).
    """
    log.info(">>> KNOWLEDGE CURATOR AGENT")
    path = ensure_list(state, "execution_path") + ["KnowledgeCurator"]
    objective = state.get("coreObjectivePrompt", "") or state.get("userInput", "")
    plan = state.get("pmPlan", {}) or {}
    draft = state.get("draftResponse", "") or ""
    feedback = state.get("qaFeedback", "") or ""
    # Truncate long fields so the memory entry stays compact.
    summary_text = (
        f"Objective: {objective}\n\n"
        f"Plan Steps: {json.dumps(plan.get('plan_steps', []))}\n\n"
        f"Draft (first 1500 chars): {draft[:1500]}\n\n"
        f"QA Feedback: {feedback[:1000]}"
    )
    try:
        memory_manager.add_to_memory(
            summary_text,
            {"source": "knowledge_curator", "timestamp": datetime.utcnow().isoformat()}
        )
        insights = {"added": True, "summary_snippet": summary_text[:500]}
    except Exception as e:
        insights = {"added": False, "error": str(e)}
    result = {"knowledgeInsights": insights, "execution_path": path}
    result.update(add_status_update("KnowledgeCurator", "Knowledge captured"))
    return result
# =============================================================================
# SECTION 9: CONDITIONAL ROUTING FUNCTIONS (LIBERAL BUDGET POLICY)
# =============================================================================
# Two versions of should_continue:
# - should_continue: For BASE graph (routes to archivist_agent)
# - should_continue_upgraded: For UPGRADED graph (routes to observer_agent)
def should_continue(state: AgentState) -> str:
    """
    BASE GRAPH: Determine next step after QA.

    Routes to:
    - archivist_agent: If approved (BASE GRAPH default)
    - disclaimer_agent: If budget exceeded by 120% or extreme rework limit
    - pm_agent: If needs rework (LIBERAL: allows many cycles)

    LIBERAL POLICY: only stop at 120% of budget, and tolerate up to
    150% of max_loops rework cycles before giving up.
    """
    # Hard stop: spend has crossed the 120% threshold.
    spend = state.get("current_cost", 0.0)
    threshold = state.get("stop_threshold") or 0.0
    try:
        spend_f = float(spend)
        threshold_f = float(threshold)
        if 0 < threshold_f < spend_f:
            log.warning(f"Cost ${spend_f} exceeds stop threshold ${threshold_f} (120% of budget)")
            return "disclaimer_agent"
    except Exception:
        pass
    # Explicit flag set elsewhere (should only happen at 120%).
    if state.get("budget_exceeded"):
        return "disclaimer_agent"
    # Coerce loop counters defensively.
    try:
        cycles = int(state.get("rework_cycles", 0))
        loop_cap = int(state.get("max_loops", 0))
    except Exception:
        cycles = state.get("rework_cycles", 0) or 0
        loop_cap = state.get("max_loops", 0) or 0
    # Approved work heads to the archivist (success path for the base graph).
    if state.get("approved"):
        return "archivist_agent"
    # LIBERAL POLICY: allow up to 150% of max_loops before stopping.
    liberal_cap = int(loop_cap * 1.5) if loop_cap > 0 else 15
    if cycles > liberal_cap:
        log.warning(f"Rework cycles {cycles} exceeded liberal limit {liberal_cap}")
        return "disclaimer_agent"
    log.info(f"Allowing rework cycle {cycles} (liberal limit: {liberal_cap})")
    return "pm_agent"
def should_continue_upgraded(state: AgentState) -> str:
    """
    UPGRADED GRAPH: Determine next step after QA (with Observer).

    Routes to:
    - observer_agent: If approved (Observer β†’ Archivist β†’ Knowledge Curator)
    - disclaimer_agent: If budget exceeded by 120% or extreme rework limit
    - pm_agent: If needs rework (LIBERAL: allows many cycles)

    Same liberal policy as should_continue(), but approval routes into
    the observation layer instead of directly to the archivist.
    """
    # Hard stop: spend has crossed the 120% threshold.
    spend = state.get("current_cost", 0.0)
    threshold = state.get("stop_threshold") or 0.0
    try:
        spend_f = float(spend)
        threshold_f = float(threshold)
        if 0 < threshold_f < spend_f:
            log.warning(f"Cost ${spend_f} exceeds stop threshold ${threshold_f} (120% of budget)")
            return "disclaimer_agent"
    except Exception:
        pass
    # Explicit flag set elsewhere (should only happen at 120%).
    if state.get("budget_exceeded"):
        return "disclaimer_agent"
    # Coerce loop counters defensively.
    try:
        cycles = int(state.get("rework_cycles", 0))
        loop_cap = int(state.get("max_loops", 0))
    except Exception:
        cycles = state.get("rework_cycles", 0) or 0
        loop_cap = state.get("max_loops", 0) or 0
    # Approved work heads into the observation layer.
    if state.get("approved"):
        return "observer_agent"
    # LIBERAL POLICY: allow up to 150% of max_loops before stopping.
    liberal_cap = int(loop_cap * 1.5) if loop_cap > 0 else 15
    if cycles > liberal_cap:
        log.warning(f"Rework cycles {cycles} exceeded liberal limit {liberal_cap}")
        return "disclaimer_agent"
    log.info(f"Allowing rework cycle {cycles} (liberal limit: {liberal_cap})")
    return "pm_agent"
def should_run_experiment(state: AgentState) -> str:
    """
    BASE GRAPH: Determine if experiment is needed.

    Routes to:
    - experimenter_agent: If the PM plan requests an experiment
    - synthesis_agent: Otherwise
    """
    plan = state.get('pmPlan') or {}
    if plan.get('experiment_needed'):
        return "experimenter_agent"
    return "synthesis_agent"
def detect_loop(state: AgentState) -> bool:
    """Detect execution loops - STRICT version.

    Flags a loop when the path exceeds the absolute length cap, when any
    node repeats too often inside the recent window, or when two nodes
    strictly alternate over an 8-entry stretch.
    """
    trail = state.get("execution_path", [])
    # Absolute path-length ceiling.
    if len(trail) > MAX_EXECUTION_PATH_LENGTH:
        log.error(f"❌ Path limit: {len(trail)} > {MAX_EXECUTION_PATH_LENGTH}")
        return True
    # Too short to exhibit a meaningful loop.
    if len(trail) < 10:
        return False
    # Collapse consecutive duplicates so A,A,A,B counts as A,B.
    collapsed = []
    last_seen = None
    for node in trail:
        if node != last_seen:
            collapsed.append(node)
            last_seen = node
    recent = collapsed[-LOOP_DETECTION_WINDOW:]
    from collections import Counter
    # Flag if any node repeats too much within the window.
    for node, count in Counter(recent).items():
        if count >= LOOP_THRESHOLD:
            log.error(f"❌ Loop: {node} appeared {count}x in last {LOOP_DETECTION_WINDOW}")
            log.error(f"Path: {' β†’ '.join(recent[-10:])}")
            return True
    # A,B,A,B… style alternation over any 8-node window.
    if len(recent) >= 8:
        for start in range(len(recent) - 7):
            if len(set(recent[start:start + 8])) == 2:  # Only 2 unique nodes alternating
                log.error(f"❌ Alternating loop detected")
                return True
    return False
def should_proceed_to_experimenter(state: AgentState) -> bool:
    """
    Helper function to determine if safe to proceed to experimenter.

    Returns True if should proceed, False if should exit.
    Checks, in order: first-approval fast path, execution-path length,
    Memory-node duplication, governance-call frequency, then budget.
    """
    exec_path = state.get("execution_path", [])
    governance_count = exec_path.count("Governance")
    log.info(f"πŸ” Checking proceed: governance #{governance_count}, path length {len(exec_path)}")
    # CRITICAL: ALWAYS proceed on first governance approval
    if governance_count <= 1:
        log.info("βœ… First governance approval - ALWAYS PROCEED")
        return True
    # Check path length
    if len(exec_path) > MAX_EXECUTION_PATH_LENGTH:
        log.error(f"❌ Path too long: {len(exec_path)} > {MAX_EXECUTION_PATH_LENGTH}")
        return False
    # Check for Memory duplication (indicates graph bug)
    double_memory_count = 0
    for i in range(len(exec_path) - 1):
        if exec_path[i] == "Memory" and exec_path[i + 1] == "Memory":
            double_memory_count += 1
    if double_memory_count > 0 and governance_count > 2:
        log.error(f"❌ Memory duplication ({double_memory_count}) + multiple governance = loop")
        return False
    # Check governance frequency
    if governance_count > 5:
        log.error(f"❌ Too many governance calls: {governance_count} > 5")
        return False
    # Check budget.
    # BUGFIX: a stored None for stop_threshold previously crashed the
    # `stop_threshold > 0` comparison (it ran outside the try block);
    # coerce with `or 0.0` like the other routing helpers do.
    current_cost = state.get("current_cost") or 0.0
    stop_threshold = state.get("stop_threshold") or 0.0
    if stop_threshold > 0:
        try:
            if current_cost > stop_threshold:
                log.error(f"❌ Budget exceeded: ${current_cost} > ${stop_threshold}")
                return False
        except Exception:
            pass
    log.info("βœ… Safe to proceed")
    return True
def governance_decider(state: AgentState) -> str:
    """
    Route the run after the governance node.

    The run goes to the experimenter unless governance explicitly rejected
    it or the safety helper reports a blocker.

    Returns:
        str: "experimenter_agent" on approval, "disclaimer_agent" otherwise.
    """
    report = state.get("governanceReport", {}) or {}
    verdict = report.get("governanceDecision", "approve")
    log.info(f"πŸ” Governance decision: {verdict}")

    # An explicit rejection short-circuits every other check.
    if verdict == "reject":
        log.warning("❌ Governance explicitly rejected")
        return "disclaimer_agent"

    # Otherwise defer to the loop/budget safety helper.
    if not should_proceed_to_experimenter(state):
        log.warning("❌ Cannot proceed - routing to disclaimer")
        return "disclaimer_agent"

    log.info("βœ… PROCEEDING TO EXPERIMENTER")
    return "experimenter_agent"
# =============================================================================
# SECTION 10: BASE GRAPH DEFINITION
# =============================================================================
def _single_node_workflow(node_name, node_fn):
    """Build a one-node workflow: entry β†’ node β†’ END."""
    wf = StateGraph(AgentState)
    wf.add_node(node_name, node_fn)
    wf.set_entry_point(node_name)
    wf.add_edge(node_name, END)
    return wf

# Triage workflow (simple greeting vs task detection)
triage_workflow = _single_node_workflow("triage", run_triage_agent)
triage_app = triage_workflow.compile()

# Planner workflow (cost estimation)
planner_workflow = _single_node_workflow("planner", run_planner_agent)
planner_app = planner_workflow.compile()
# Main workflow (full execution pipeline)
main_workflow = StateGraph(AgentState)

# Register the base pipeline nodes (table-driven, order preserved).
for _node_name, _node_fn in (
    ("memory_retriever", run_memory_retrieval),
    ("intent_agent", run_intent_agent),
    ("pm_agent", run_pm_agent),
    ("experimenter_agent", run_experimenter_agent),
    ("synthesis_agent", run_synthesis_agent),
    ("qa_agent", run_qa_agent),
    ("archivist_agent", run_archivist_agent),
    ("disclaimer_agent", run_disclaimer_agent),
):
    main_workflow.add_node(_node_name, _node_fn)
del _node_name, _node_fn

# Memory retrieval is the single entry point.
main_workflow.set_entry_point("memory_retriever")

# Unconditional edges.
for _src, _dst in (
    ("memory_retriever", "intent_agent"),
    ("intent_agent", "pm_agent"),
    ("experimenter_agent", "synthesis_agent"),
    ("synthesis_agent", "qa_agent"),
    ("archivist_agent", END),
    ("disclaimer_agent", END),
):
    main_workflow.add_edge(_src, _dst)
del _src, _dst

# Conditional edges
main_workflow.add_conditional_edges("pm_agent", should_run_experiment)
# BASE GRAPH: Only routes to nodes that exist in base graph
main_workflow.add_conditional_edges("qa_agent", should_continue, {
    "archivist_agent": "archivist_agent",
    "pm_agent": "pm_agent",
    "disclaimer_agent": "disclaimer_agent"
})

# Compile base graph
main_app = main_workflow.compile()
log.info("=" * 60)
log.info("BASE GRAPH COMPILED")
log.info("Flow: Memory β†’ Intent β†’ PM β†’ Experimenter β†’ Synthesis β†’ QA")
log.info(" β†’ Archivist/Disclaimer β†’ END")
log.info("=" * 60)
# =============================================================================
# SECTION 11: UPGRADED GRAPH DEFINITION (LIBERAL BUDGET POLICY)
# =============================================================================
def build_upgraded_graph() -> StateGraph:
    """
    Build the upgraded workflow graph (governance/compliance/observer layers).

    The layout is deliberately linear with exactly two branch points
    (governance and QA) so that the Memory node can never be re-entered
    mid-run - this is what prevents the Memory β†’ Memory loop.
    """
    wf = StateGraph(AgentState)

    # Register every agent node exactly once.
    for node_name, node_fn in (
        ("memory_retriever", run_memory_retrieval),
        ("intent_agent", run_intent_agent),
        ("pm_agent", run_pm_agent),
        ("pragmatist_agent", run_pragmatist_agent),
        ("governance_agent", run_governance_agent),
        ("experimenter_agent", run_experimenter_agent),
        ("compliance_agent", run_compliance_agent),
        ("synthesis_agent", run_synthesis_agent),
        ("qa_agent", run_qa_agent),
        ("observer_agent", run_observer_agent),
        ("archivist_agent", run_archivist_agent),
        ("knowledge_curator_agent", run_knowledge_curator_agent),
        ("disclaimer_agent", run_disclaimer_agent),
    ):
        wf.add_node(node_name, node_fn)

    # Memory is the single entry point; nothing may route back to it.
    wf.set_entry_point("memory_retriever")

    # Linear pre-governance segment: each node runs once per cycle,
    # no branches until governance.
    for src, dst in (
        ("memory_retriever", "intent_agent"),
        ("intent_agent", "pm_agent"),
        ("pm_agent", "pragmatist_agent"),
        ("pragmatist_agent", "governance_agent"),
    ):
        wf.add_edge(src, dst)

    # First branch point: governance releases the run to the experimenter
    # or ends it through the disclaimer.
    wf.add_conditional_edges(
        "governance_agent",
        governance_decider,
        {
            "experimenter_agent": "experimenter_agent",
            "disclaimer_agent": "disclaimer_agent",
        },
    )

    # Linear post-experiment segment (no loops back).
    for src, dst in (
        ("experimenter_agent", "compliance_agent"),
        ("compliance_agent", "synthesis_agent"),
        ("synthesis_agent", "qa_agent"),
    ):
        wf.add_edge(src, dst)

    # Second branch point: QA. Rework returns to PM - never to Memory.
    wf.add_conditional_edges(
        "qa_agent",
        should_continue_upgraded,
        {
            "observer_agent": "observer_agent",      # success path
            "pm_agent": "pm_agent",                  # rework path (NOT to Memory!)
            "disclaimer_agent": "disclaimer_agent",  # failure path
        },
    )

    # Success tail and disclaimer exit.
    wf.add_edge("observer_agent", "archivist_agent")
    wf.add_edge("archivist_agent", "knowledge_curator_agent")
    wf.add_edge("knowledge_curator_agent", END)
    wf.add_edge("disclaimer_agent", END)

    return wf
# CRITICAL: Make sure NO OTHER edges exist that point to "memory_retriever"
# The ONLY way to reach Memory should be at the very start
def apply_upgrades() -> bool:
    """
    Swap the compiled base graph for the upgraded workflow in place.

    Rebuilds the module-level ``main_app`` (and ``main_workflow``) with the
    governance, compliance, and observation layers added.

    LIBERAL BUDGET POLICY ACTIVE:
    - 20% budget buffer
    - Stop only at 120% of budget
    - 10 β†’ 20 rework cycle limits
    - Always proceed unless explicitly rejected

    Returns:
        bool: True if successful, False otherwise (the previously compiled
        graph remains active on failure).
    """
    global main_app, main_workflow

    banner = "=" * 60
    log.info(banner)
    log.info("APPLYING GRAPH UPGRADES (LIBERAL BUDGET POLICY)")
    log.info(banner)

    try:
        # Build and compile first, then swap the module-level handles.
        upgraded = build_upgraded_graph()
        main_app = upgraded.compile()
        main_workflow = upgraded

        log.info("βœ… GRAPH UPGRADE SUCCESSFUL")
        log.info(banner)
        log.info("New flow: Memory β†’ Intent β†’ PM β†’ Pragmatist β†’ Governance")
        log.info(" β†’ Experimenter β†’ Compliance β†’ Synthesis β†’ QA")
        log.info(" β†’ Observer β†’ Archivist β†’ Knowledge Curator β†’ END")
        log.info(banner)
        log.info("LIBERAL BUDGET POLICY FEATURES:")
        log.info(" β€’ 20% budget buffer applied")
        log.info(" β€’ Stop threshold: 120% of user budget")
        log.info(" β€’ Rework cycles: 10 β†’ 15 β†’ 20 (initial β†’ liberal β†’ hard)")
        log.info(" β€’ Governance: Always proceed unless explicitly rejected")
        log.info(banner)
        return True
    except Exception as e:
        log.exception(f"❌ Failed to apply graph upgrades: {e}")
        return False
# =============================================================================
# SECTION 12: EXPORTS
# =============================================================================
# Export all components
__all__ = [
    # Shared state type
    "AgentState",
    # Small utility helpers
    "ensure_list",
    "ensure_int",
    "sanitize_path",
    "add_status_update",
    "get_latest_status",
    # LLM handle and response parsing
    "llm",
    "parse_json_from_llm",
    # Artifact generation
    "detect_requested_output_types",
    "normalize_experiment_type",
    "write_notebook_from_text",
    "write_script",
    "write_docx_from_text",
    "write_excel_from_tables",
    "write_pdf_from_text",
    "build_repo_zip",
    # Base-graph agent nodes
    "run_triage_agent",
    "run_planner_agent",
    "run_memory_retrieval",
    "run_intent_agent",
    "run_pm_agent",
    "run_experimenter_agent",
    "run_synthesis_agent",
    "run_qa_agent",
    "run_archivist_agent",
    "run_disclaimer_agent",
    # Upgraded-graph agent nodes
    "run_pragmatist_agent",
    "run_governance_agent",
    "run_compliance_agent",
    "run_observer_agent",
    "run_knowledge_curator_agent",
    # Routing / loop-detection functions
    "should_continue",
    "should_continue_upgraded",
    "should_run_experiment",
    "governance_decider",
    "detect_loop",
    # Compiled graphs and the mutable main workflow handle
    "triage_app",
    "planner_app",
    "main_app",
    "main_workflow",
    # Upgrade entry points
    "apply_upgrades",
    "build_upgraded_graph",
    # Budget-policy configuration constants
    "INITIAL_MAX_REWORK_CYCLES",
    "BUDGET_BUFFER_MULTIPLIER",
    "MAX_COST_MULTIPLIER",
]
# Log initialization banner so the active graph mode is visible at import time.
log.info("=" * 60)
log.info("GRAPH MODULE INITIALIZED (LIBERAL BUDGET POLICY v2.0)")
# These two messages have no placeholders, so the f-string prefixes were
# unnecessary (ruff F541); plain literals produce identical output.
log.info("Base graph available as: main_app")
log.info("To enable upgraded graph, call: apply_upgrades()")
log.info("=" * 60)