# NOTE: removed non-Python export residue ("Spaces:" / "Paused" UI artifacts).
| # graph.py - Fixed version with proper state handling for concurrent updates | |
| import json | |
| import re | |
| import math | |
| import os | |
| import uuid | |
| import shutil | |
| import zipfile | |
| import operator | |
| from typing import TypedDict, List, Dict, Optional, Annotated, Any | |
| from datetime import datetime | |
| from langchain_openai import ChatOpenAI | |
| from langgraph.graph import StateGraph, END | |
| from memory_manager import memory_manager | |
| from code_executor import execute_python_code | |
| from logging_config import setup_logging, get_logger | |
| # Artifact libs | |
| import nbformat | |
| from nbformat.v4 import new_notebook, new_markdown_cell, new_code_cell | |
| import pandas as pd | |
| from docx import Document | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | |
| from reportlab.lib.styles import getSampleStyleSheet | |
| # Add after other imports | |
| from multi_language_support import ( | |
| detect_language, | |
| extract_code_blocks_multi_lang, | |
| execute_code, | |
| detect_requested_output_types_enhanced, | |
| write_script_multi_lang, | |
| LANGUAGES | |
| ) | |
# Replace existing functions
# Alias the enhanced multi-language implementations over the legacy
# single-language helpers so the rest of this module (and older callers)
# transparently uses the new versions.
detect_requested_output_types = detect_requested_output_types_enhanced
write_script = write_script_multi_lang
# --- Configuration ---
# Root directory for all generated artifacts; overridable via the OUT_DIR env var.
OUT_DIR = os.environ.get("OUT_DIR", "/tmp")
os.makedirs(OUT_DIR, exist_ok=True)
# Subdirectory where exported/packaged deliverables are collected.
EXPORTS_DIR = os.path.join(OUT_DIR, "exports")
os.makedirs(EXPORTS_DIR, exist_ok=True)
# --- Helpers ---
def ensure_list(state, key):
    """Coerce ``state[key]`` into a list.

    Missing state, missing key, or an explicit None all yield [].
    Lists are returned unchanged (same object), tuples are copied into a
    new list, and any other value is wrapped as a one-element list.
    """
    value = state.get(key) if state else None
    if isinstance(value, list):
        return value
    if isinstance(value, tuple):
        return list(value)
    return [] if value is None else [value]
def ensure_int(state, key, default=0):
    """Coerce ``state[key]`` to an int, returning ``default`` on any failure.

    Failure covers: falsy/missing state, a missing key, an explicit None,
    and values that int() cannot convert.
    """
    try:
        raw = state.get(key) if state else None
    except Exception:
        return default
    if raw is None:
        return default
    try:
        return int(raw)
    except Exception:
        return default
def sanitize_path(path: str) -> str:
    # Normalize to an absolute path. NOTE(review): despite the name, this
    # performs no traversal/containment checks — "../" segments are resolved
    # by abspath, not rejected. Callers must not rely on it for security.
    return os.path.abspath(path)
# --- Setup ---
setup_logging()
log = get_logger(__name__)
# Maximum number of QA rework passes before giving up with a disclaimer.
INITIAL_MAX_REWORK_CYCLES = 3
# Rough GPT-4o pricing (USD per 1K tokens) used only for cost estimation.
GPT4O_INPUT_COST_PER_1K_TOKENS = 0.005
GPT4O_OUTPUT_COST_PER_1K_TOKENS = 0.015
# Average tokens per LLM call — presumably in units of 1K tokens, matching
# the per-1K rates it is multiplied with in run_planner_agent; TODO confirm.
AVG_TOKENS_PER_CALL = 2.0
# --- State ---
class AgentState(TypedDict):
    """Shared LangGraph state channel definitions for all agent nodes."""
    userInput: str
    chatHistory: List[str]
    coreObjectivePrompt: str
    retrievedMemory: Optional[str]
    pmPlan: Dict
    experimentCode: Optional[str]
    experimentResults: Optional[Dict]
    draftResponse: str
    qaFeedback: Optional[str]
    approved: bool
    # operator.add makes LangGraph concatenate list updates from multiple
    # nodes instead of overwriting them.
    execution_path: Annotated[List[str], operator.add]
    rework_cycles: int
    max_loops: int
    # Use Annotated with operator.add for fields that multiple agents might update
    status_updates: Annotated[List[Dict[str, str]], operator.add]  # Changed from status_update
    current_cost: float
    budget_exceeded: bool
    # Add other fields that might have concurrent updates
    pragmatistReport: Optional[Dict]
    governanceReport: Optional[Dict]
    complianceReport: Optional[Dict]
    observerReport: Optional[Dict]
    knowledgeInsights: Optional[Dict]
# Helper to get latest status
def get_latest_status(state: AgentState) -> str:
    """Return the most recent usable status entry, scanning newest-first.

    Dict entries contribute their 'status' value; bare strings are returned
    as-is. Falls back to "Processing..." when nothing usable exists.
    """
    updates = state.get('status_updates', [])
    if not isinstance(updates, list) or not updates:
        return "Processing..."
    for entry in reversed(updates):
        if isinstance(entry, str):
            return entry
        if isinstance(entry, dict) and 'status' in entry:
            return entry['status']
    return "Processing..."
# Helper to add status update
def add_status_update(node_name: str, status: str) -> Dict[str, Any]:
    """Create a single-entry state delta for the ``status_updates`` channel.

    Returns {"status_updates": [entry]} so LangGraph's operator.add reducer
    appends the entry rather than overwriting concurrent updates.

    Args:
        node_name: Name of the node reporting the status.
        status: Human-readable status message.
    """
    from datetime import timezone  # local import keeps this edit self-contained

    # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive
    # datetime; use an aware UTC timestamp (ISO string ends with "+00:00").
    return {
        "status_updates": [{
            "node": node_name,
            "status": status,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }]
    }
# --- LLM ---
# Shared GPT-4o client used by every node; retries/timeout guard against
# transient API failures.
llm = ChatOpenAI(model="gpt-4o", temperature=0.5, max_retries=3, request_timeout=60)
def parse_json_from_llm(llm_output: str) -> Optional[dict]:
    """
    More robust JSON extraction:
    - Looks for explicit ```json {} ``` blocks
    - Falls back to the last balanced {...} substring in the output
    - Tries ast.literal_eval for Python-like dicts
    - Performs conservative cleanup (remove trailing commas, comments, safe single->double quote) and retries
    Returns dict or None. Logs failures for debugging.
    """
    import re
    import json
    import ast
    from logging import getLogger
    logger = getLogger(__name__)
    if not llm_output or not isinstance(llm_output, str) or not llm_output.strip():
        return None
    text = llm_output.strip()
    # 1) explicit fenced JSON block
    match = re.search(r"```json\s*({.*?})\s*```", text, re.DOTALL | re.IGNORECASE)
    if match:
        candidate = match.group(1).strip()
        try:
            return json.loads(candidate)
        except Exception as e:
            logger.debug(f"json.loads failed on triple-backtick json block: {e}")
    # 2) any code-fence containing a JSON-like object
    match2 = re.search(r"```(?:json|python|text)?\s*({.*?})\s*```", text, re.DOTALL | re.IGNORECASE)
    if match2:
        candidate = match2.group(1).strip()
        try:
            return json.loads(candidate)
        except Exception as e:
            logger.debug(f"json.loads failed on fenced candidate: {e}")
    # 3) find first balanced {...} substring
    def find_balanced_brace_substring(s: str):
        start_idx = None
        depth = 0
        for i, ch in enumerate(s):
            if ch == '{':
                if start_idx is None:
                    start_idx = i
                depth += 1
            elif ch == '}':
                if depth > 0:
                    depth -= 1
                    if depth == 0 and start_idx is not None:
                        return s[start_idx:i+1]
        return None
    candidate = find_balanced_brace_substring(text)
    # 4) fallback: last { ... } block heuristically
    if not candidate:
        first = text.find('{')
        last = text.rfind('}')
        if first != -1 and last != -1 and last > first:
            candidate = text[first:last+1]
    if candidate:
        # try json.loads directly
        try:
            return json.loads(candidate)
        except Exception as e:
            logger.debug(f"json.loads failed on candidate substring: {e}")
        # try ast.literal_eval (handles single quotes & Python literals)
        try:
            parsed = ast.literal_eval(candidate)
            if isinstance(parsed, (dict, list)):
                # convert to a strict JSON-compatible dict/list
                return json.loads(json.dumps(parsed))
        except Exception as e:
            logger.debug(f"ast.literal_eval failed: {e}")
        # conservative cleanup: remove comments, trailing commas, and handle simple single-quote strings
        cleaned = candidate
        try:
            # remove line comments //...
            cleaned = re.sub(r"//.*?$", "", cleaned, flags=re.MULTILINE)
            # remove block comments /* ... */
            cleaned = re.sub(r"/\*.*?\*/", "", cleaned, flags=re.DOTALL)
            # remove trailing commas before } or ]
            cleaned = re.sub(r",\s*([}\]])", r"\1", cleaned)
            # replace single-quoted strings with double quotes when likely safe.
            # BUGFIX: the previous pattern used a variable-width lookbehind
            # (?<=[:\{\[,]\s*), which Python's re module rejects at runtime,
            # so this entire cleanup step always raised and the final retry
            # below never executed. The prefix is now a consuming group that
            # is re-emitted by the replacement callback.
            def _single_to_double(m):
                inner = m.group(2)
                inner_escaped = inner.replace('"', '\\"')
                return m.group(1) + f'"{inner_escaped}"'
            cleaned = re.sub(r"([:\{\[,]\s*)'([^']*?)'", _single_to_double, cleaned)
            # final attempt
            return json.loads(cleaned)
        except Exception as e:
            logger.debug(f"json.loads still failed after cleanup: {e}")
    # nothing parsed – log preview and return None
    logger.error("parse_json_from_llm failed to parse LLM output. LLM output preview (200 chars): %s", text[:200].replace("\n","\\n"))
    return None
# --- Artifact detection ---
# Canonical artifact-type identifiers recognized throughout this module.
KNOWN_ARTIFACT_TYPES = {"notebook","excel","word","pdf","image","repo","script"}
| #def detect_requested_output_types(text: str) -> Dict: | |
| # if not text: | |
| # return {"requires_artifact": False, "artifact_type": None, "artifact_hint": None} | |
| # t = text.lower() | |
| # if any(k in t for k in ["jupyter notebook", "jupyter", "notebook", "ipynb"]): | |
| # return {"requires_artifact": True, "artifact_type": "notebook", "artifact_hint": "jupyter notebook"} | |
| # if any(k in t for k in ["excel", ".xlsx", "spreadsheet", "csv"]): | |
| # return {"requires_artifact": True, "artifact_type": "excel", "artifact_hint": "Excel file"} | |
| # if any(k in t for k in ["word document", ".docx", "docx"]): | |
| # return {"requires_artifact": True, "artifact_type": "word", "artifact_hint": "Word document"} | |
| # if any(k in t for k in ["pdf", "pdf file"]): | |
| # return {"requires_artifact": True, "artifact_type": "pdf", "artifact_hint": "PDF document"} | |
| # if any(k in t for k in ["repo", "repository", "app repo", "backend", "codebase"]): | |
| # return {"requires_artifact": True, "artifact_type": "repo", "artifact_hint": "application repository"} | |
| # if any(k in t for k in [".py", "python script", "script"]): | |
| # return {"requires_artifact": True, "artifact_type": "script", "artifact_hint": "Python script"} | |
| # return {"requires_artifact": False, "artifact_type": None, "artifact_hint": None} | |
def normalize_experiment_type(exp_type: Optional[str], goal_text: str) -> str:
    """Map a free-form experiment type onto a canonical artifact type.

    Already-canonical values pass through; otherwise substring keywords are
    checked in priority order. When ``exp_type`` is missing or unrecognized,
    keyword detection on ``goal_text`` decides, defaulting to "word".
    """
    def _detect_fallback():
        detection = detect_requested_output_types(goal_text or "")
        return detection.get("artifact_type") or "word"

    if not exp_type:
        return _detect_fallback()
    s = exp_type.strip().lower()
    if s in KNOWN_ARTIFACT_TYPES:
        return s
    keyword_map = [
        (("notebook", "ipynb"), "notebook"),
        (("excel", "xlsx"), "excel"),
        (("word", "docx"), "word"),
        (("pdf",), "pdf"),
        (("repo", "repository", "backend"), "repo"),
        (("script", "python"), "script"),
    ]
    for keywords, canonical in keyword_map:
        if any(k in s for k in keywords):
            return canonical
    return _detect_fallback()
# --- Artifact builders ---
def write_notebook_from_text(llm_text: str, out_dir: Optional[str]=None) -> str:
    """Convert raw LLM output into a .ipynb file and return its path.

    ```python fences become code cells (falling back to any fence when no
    python-tagged ones exist); the prose between fences becomes markdown
    cells, interleaved in original order.
    """
    out_dir = out_dir or OUT_DIR
    os.makedirs(out_dir, exist_ok=True)
    code_blocks = re.findall(r"```python\s*(.*?)\s*```", llm_text, re.DOTALL)
    if not code_blocks:
        # No python-tagged fences: accept any fenced block.
        code_blocks = re.findall(r"```\s*(.*?)\s*```", llm_text, re.DOTALL)
    # Text surrounding the fences becomes the markdown cells.
    md_parts = re.split(r"```(?:python)?\s*.*?\s*```", llm_text, flags=re.DOTALL)
    nb = new_notebook()
    cells = []
    # Interleave markdown and code pairwise, skipping empty fragments.
    max_len = max(len(md_parts), len(code_blocks))
    for i in range(max_len):
        if i < len(md_parts) and md_parts[i].strip():
            cells.append(new_markdown_cell(md_parts[i].strip()))
        if i < len(code_blocks) and code_blocks[i].strip():
            cells.append(new_code_cell(code_blocks[i].strip()))
    if not cells:
        # Guarantee a non-empty, valid notebook even for empty input.
        cells = [new_markdown_cell("# Notebook\n\nNo content generated.")]
    nb['cells'] = cells
    uid = uuid.uuid4().hex[:10]
    filename = os.path.join(out_dir, f"generated_notebook_{uid}.ipynb")
    nbformat.write(nb, filename)
    return filename
| #def write_script(code_text: str, language_hint: Optional[str]=None, out_dir: Optional[str]=None) -> str: | |
| # out_dir = out_dir or OUT_DIR | |
| # os.makedirs(out_dir, exist_ok=True) | |
| # ext = ".txt" | |
| # if language_hint: | |
| # l = language_hint.lower() | |
| # if "python" in l: | |
| # ext = ".py" | |
| # elif "r" in l: | |
| # ext = ".R" | |
| # elif "java" in l: | |
| # ext = ".java" | |
| # elif "javascript" in l: | |
| # ext = ".js" | |
| # uid = uuid.uuid4().hex[:10] | |
| # filename = os.path.join(out_dir, f"generated_script_{uid}{ext}") | |
| # with open(filename, "w", encoding="utf-8") as f: | |
| # f.write(code_text) | |
| # return filename | |
def write_docx_from_text(text: str, out_dir: Optional[str]=None) -> str:
    """Render plain text into a .docx file and return its path.

    Each blank-line-separated chunk of ``text`` becomes one paragraph.
    """
    target_dir = out_dir or OUT_DIR
    os.makedirs(target_dir, exist_ok=True)
    document = Document()
    for chunk in text.split("\n\n"):
        paragraph = chunk.strip()
        if paragraph:
            document.add_paragraph(paragraph)
    unique_suffix = uuid.uuid4().hex[:10]
    filepath = os.path.join(target_dir, f"generated_doc_{unique_suffix}.docx")
    document.save(filepath)
    return filepath
def write_excel_from_tables(maybe_table_text: str, out_dir: Optional[str]=None) -> str:
    """Best-effort conversion of text to an .xlsx file; returns the path.

    Interpretation order: JSON (list -> rows, dict -> one row), then CSV if
    the text contains a comma, then a single 'content' cell. On any
    Excel-level failure the payload is written to a .docx instead and that
    path is returned.
    """
    out_dir = out_dir or OUT_DIR
    os.makedirs(out_dir, exist_ok=True)
    uid = uuid.uuid4().hex[:10]
    filename = os.path.join(out_dir, f"generated_excel_{uid}.xlsx")
    try:
        try:
            parsed = json.loads(maybe_table_text)
            if isinstance(parsed, list):
                df = pd.DataFrame(parsed)
            elif isinstance(parsed, dict):
                df = pd.DataFrame([parsed])
            else:
                # Scalar JSON (number/string): store it as one cell.
                df = pd.DataFrame({"content":[str(maybe_table_text)]})
        except Exception:
            # Not JSON: a comma suggests CSV; otherwise one text cell.
            if "," in maybe_table_text:
                from io import StringIO
                df = pd.read_csv(StringIO(maybe_table_text))
            else:
                df = pd.DataFrame({"content":[maybe_table_text]})
        df.to_excel(filename, index=False, engine="openpyxl")
        return filename
    except Exception as e:
        log.error(f"Excel creation failed: {e}")
        # Degrade gracefully to a Word doc carrying the error and raw text.
        return write_docx_from_text(f"Excel error: {e}\n\n{maybe_table_text}", out_dir=out_dir)
def write_pdf_from_text(text: str, out_dir: Optional[str]=None) -> str:
    """Render plain text into a simple PDF via reportlab; returns the path.

    Falls back to a .docx (carrying the error message) if PDF generation
    fails for any reason.
    """
    out_dir = out_dir or OUT_DIR
    os.makedirs(out_dir, exist_ok=True)
    uid = uuid.uuid4().hex[:10]
    filename = os.path.join(out_dir, f"generated_doc_{uid}.pdf")
    try:
        doc = SimpleDocTemplate(filename)
        styles = getSampleStyleSheet()
        flowables = []
        for para in [p.strip() for p in text.split("\n\n") if p.strip()]:
            # reportlab Paragraphs need explicit <br/> markup for line breaks.
            flowables.append(Paragraph(para.replace("\n","<br/>"), styles["Normal"]))
            flowables.append(Spacer(1, 8))
        doc.build(flowables)
        return filename
    except Exception as e:
        log.error(f"PDF creation failed: {e}")
        return write_docx_from_text(f"PDF error: {e}\n\n{text}", out_dir=out_dir)
def build_repo_zip(files_map: Dict[str,str], repo_name: str="generated_app", out_dir: Optional[str]=None) -> str:
    """Materialize ``files_map`` as a directory tree and zip it.

    Each key is a path relative to the repo root; each value is either the
    literal file content or a path to an existing file to copy in.
    Returns the path of the created .zip archive.

    NOTE(review): keys are joined into the staging dir without traversal
    checks — presumably trusted input; confirm if keys can be attacker-fed.
    """
    base_dir = out_dir or OUT_DIR
    os.makedirs(base_dir, exist_ok=True)
    suffix = uuid.uuid4().hex[:8]
    staging_dir = os.path.join(base_dir, f"{repo_name}_{suffix}")
    os.makedirs(staging_dir, exist_ok=True)
    for relative_path, content in files_map.items():
        destination = os.path.join(staging_dir, relative_path)
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        if isinstance(content, str) and os.path.exists(content):
            # Value names an existing file on disk: copy it verbatim.
            shutil.copyfile(content, destination)
        else:
            with open(destination, "w", encoding="utf-8") as handle:
                handle.write(str(content))
    archive_path = os.path.join(base_dir, f"{repo_name}_{suffix}.zip")
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for directory, _, filenames in os.walk(staging_dir):
            for name in filenames:
                absolute = os.path.join(directory, name)
                archive.write(absolute, os.path.relpath(absolute, staging_dir))
    return archive_path
# --- Nodes ---
def run_triage_agent(state: AgentState):
    """Classify the user input as a greeting or a task.

    Greetings short-circuit with a canned draft response; anything else
    falls through to the main pipeline.
    """
    log.info("--- TRIAGE ---")
    prompt = f"Is this a greeting or a task? '{state.get('userInput','')}' Reply: 'greeting' or 'task'"
    reply = llm.invoke(prompt)
    verdict = (getattr(reply, "content", "") or "").lower()
    if 'greeting' in verdict:
        return {
            "draftResponse": "Hello! How can I help?",
            "execution_path": ["Triage"],
            **add_status_update("Triage", "Greeting")
        }
    return {
        "execution_path": ["Triage"],
        **add_status_update("Triage", "Task detected")
    }
def run_planner_agent(state: AgentState):
    """Produce an initial plan with loop and cost estimates for the request."""
    log.info("--- PLANNER ---")
    path = ensure_list(state, 'execution_path') + ["Planner"]
    prompt = f"Create a plan for: '{state.get('userInput','')}'. JSON with 'plan' (list), 'estimated_llm_calls_per_loop' (int)"
    response = llm.invoke(prompt)
    plan_data = parse_json_from_llm(getattr(response, "content", "") or "")
    if not plan_data:
        return {
            "pmPlan": {"error": "Planning failed"},
            "execution_path": path,
            **add_status_update("Planner", "Error")
        }
    # Rough per-loop cost: calls * avg kilotokens * mean(input, output rate).
    calls = plan_data.get('estimated_llm_calls_per_loop', 3)
    mean_rate = (GPT4O_INPUT_COST_PER_1K_TOKENS + GPT4O_OUTPUT_COST_PER_1K_TOKENS) / 2
    cost_per_loop = calls * AVG_TOKENS_PER_CALL * mean_rate
    plan_data['max_loops_initial'] = INITIAL_MAX_REWORK_CYCLES
    plan_data['estimated_cost_usd'] = round(cost_per_loop * (INITIAL_MAX_REWORK_CYCLES + 1), 2)
    plan_data['cost_per_loop_usd'] = max(0.01, round(cost_per_loop, 3))
    # Pre-seed experiment fields when the request clearly asks for an artifact.
    detection = detect_requested_output_types(state.get('userInput',''))
    if detection.get('requires_artifact'):
        plan_data.setdefault('experiment_needed', True)
        plan_data.setdefault('experiment_type', detection.get('artifact_type'))
        plan_data.setdefault('experiment_goal', state.get('userInput',''))
    return {
        "pmPlan": plan_data,
        "execution_path": path,
        **add_status_update("Planner", "Plan created")
    }
def run_memory_retrieval(state: AgentState):
    """Fetch memories relevant to the user input and format them as context."""
    log.info("--- MEMORY ---")
    path = ensure_list(state, 'execution_path') + ["Memory"]
    memories = memory_manager.retrieve_relevant_memories(state.get('userInput',''))
    if memories:
        context = "\n".join(f"Memory: {m.page_content}" for m in memories)
    else:
        context = "No memories"
    return {
        "retrievedMemory": context,
        "execution_path": path,
        **add_status_update("Memory", "Memory retrieved")
    }
def run_intent_agent(state: AgentState):
    """Distill the raw request plus retrieved memory into a core objective."""
    log.info("--- INTENT ---")
    path = ensure_list(state, 'execution_path') + ["Intent"]
    prompt = f"Refine into clear objective.\n\nMemory: {state.get('retrievedMemory')}\n\nRequest: {state.get('userInput','')}\n\nCore Objective:"
    reply = llm.invoke(prompt)
    objective = getattr(reply, "content", "") or ""
    return {
        "coreObjectivePrompt": objective,
        "execution_path": path,
        **add_status_update("Intent", "Objective clarified")
    }
def run_pm_agent(state: AgentState):
    """Project-manager node: build (or rebuild, after QA feedback) the plan.

    Tracks rework cycles and short-circuits with a fallback plan once the
    configured loop budget is exhausted.
    """
    log.info("--- PM ---")
    # Ensure keys
    current_rework = ensure_int(state, 'rework_cycles', 0)
    max_loops_val = ensure_int(state, 'max_loops', INITIAL_MAX_REWORK_CYCLES)
    # If we've exhausted loops, short-circuit and produce fallback plan with a note
    if current_rework > max_loops_val:
        path = ensure_list(state, 'execution_path') + ["PM"]
        fallback_plan = {
            "plan_steps": ["Rework limit exceeded. Manual review required."],
            "experiment_needed": False,
            "experiment_type": "word",
            "experiment_goal": state.get('coreObjectivePrompt', state.get('userInput',''))
        }
        return {
            "pmPlan": fallback_plan,
            "execution_path": path,
            "rework_cycles": current_rework,
            **add_status_update("PM", "Rework limit hit - manual review")
        }
    # Normal behavior: increment rework count for this pass
    current_cycles = current_rework + 1
    path = ensure_list(state, 'execution_path') + ["PM"]
    context_parts = [
        f"=== USER REQUEST ===\n{state.get('userInput', '')}",
        f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}",
        f"\n=== MEMORY ===\n{state.get('retrievedMemory', 'None')}",
    ]
    if state.get('qaFeedback'):
        # On rework, surface both the QA feedback and the plan it criticized.
        context_parts.append(f"\n=== QA FEEDBACK (MUST FIX) ===\n{state.get('qaFeedback')}")
        context_parts.append(f"\n=== PREVIOUS PLAN ===\n{json.dumps(state.get('pmPlan', {}).get('plan_steps', []), indent=2)}")
    full_context = "\n".join(context_parts)
    # Detect language preference
    detected = detect_requested_output_types_enhanced(state.get('userInput', ''))
    language_hint = LANGUAGES[detected['language']].name if detected.get('language') else "appropriate language"
    prompt = f"""Create DETAILED, EXECUTABLE plan.
{full_context}
If code generation is needed, use {language_hint}.
Return JSON with:
{{
"plan_steps": [...],
"experiment_needed": true/false,
"experiment_type": "notebook|script|excel|word|pdf|repo",
"experiment_goal": "...",
"key_requirements": [...]
}}
Be concrete.
"""
    try:
        response = llm.invoke(prompt)
        plan = parse_json_from_llm(getattr(response, "content", "") or "")
    except Exception as e:
        log.warning("PM LLM failed: %s", e)
        plan = None
    if not plan:
        # LLM unavailable or unparseable: fall back to a generic plan seeded
        # by keyword detection on the raw request.
        detection = detect_requested_output_types(state.get('userInput', ''))
        plan = {
            "plan_steps": ["Analyze request", "Process information", "Create deliverable", "Review"],
            "experiment_needed": detection.get('requires_artifact', False),
            "experiment_type": detection.get('artifact_type', 'word'),
            "experiment_goal": state.get('coreObjectivePrompt', state.get('userInput', '')),
            "key_requirements": []
        }
    # Normalize experiment_type
    exp_type = normalize_experiment_type(plan.get('experiment_type'), plan.get('experiment_goal',''))
    plan['experiment_type'] = exp_type
    if plan.get('experiment_needed') and not plan.get('experiment_goal'):
        plan['experiment_goal'] = state.get('userInput','')
    # Attach loop control info
    plan['max_loops_initial'] = max_loops_val
    plan['estimated_cost_usd'] = plan.get('estimated_cost_usd', 0.0)
    return {
        "pmPlan": plan,
        "execution_path": path,
        "rework_cycles": current_cycles,
        "max_loops": max_loops_val,
        **add_status_update("PM", f"Plan created ({len(plan.get('plan_steps', []))} steps)")
    }
| def _extract_code_blocks(text: str, lang_hint: Optional[str]=None) -> List[str]: | |
| if lang_hint and "python" in (lang_hint or "").lower(): | |
| blocks = re.findall(r"```python\s*(.*?)\s*```", text, re.DOTALL) | |
| if blocks: | |
| return blocks | |
| return re.findall(r"```(?:\w+)?\s*(.*?)\s*```", text, re.DOTALL) | |
def run_experimenter_agent(state: AgentState):
    """Generate, persist and execute a code artifact for the PM's plan.

    Skips cleanly when the plan needs no experiment; otherwise builds a rich
    prompt from the state, extracts the first fenced code block from the LLM
    reply, writes it to disk and executes it with the language-appropriate
    runner.
    """
    log.info("--- EXPERIMENTER ---")
    path = ensure_list(state, 'execution_path') + ["Experimenter"]
    pm = state.get('pmPlan', {}) or {}
    if not pm.get('experiment_needed'):
        return {
            "experimentCode": None,
            "experimentResults": None,
            "execution_path": path,
            **add_status_update("Experimenter", "No experiment needed")
        }
    # Detect language from plan or request. BUGFIX: this detection was
    # commented out, leaving `language` undefined and crashing below with a
    # NameError whenever an experiment actually ran.
    detected = detect_requested_output_types_enhanced(pm.get('experiment_goal', '') or state.get('userInput', ''))
    language = detected.get('language') or 'python'
    exp_type = normalize_experiment_type(pm.get('experiment_type'), pm.get('experiment_goal',''))
    goal = pm.get('experiment_goal', 'No goal')
    # Build rich context so the generated code reflects the full task.
    context_parts = [
        f"=== USER REQUEST ===\n{state.get('userInput', '')}",
        f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}",
        f"\n=== PLAN ===\n{json.dumps(pm.get('plan_steps', []), indent=2)}",
        f"\n=== REQUIREMENTS ===\n{json.dumps(pm.get('key_requirements', []), indent=2)}",
    ]
    if state.get('retrievedMemory'):
        context_parts.append(f"\n=== CONTEXT ===\n{state.get('retrievedMemory', '')}")
    if state.get('qaFeedback'):
        context_parts.append(f"\n=== FEEDBACK TO ADDRESS ===\n{state.get('qaFeedback', '')}")
    full_context = "\n".join(context_parts)
    # Get language config for display/prompting purposes.
    from multi_language_support import LANGUAGES
    lang_config = LANGUAGES.get(language)
    lang_name = lang_config.name if lang_config else "Code"
    # Enhanced prompt with language specification
    enhanced_prompt = f"""Create HIGH-QUALITY {lang_name} {exp_type} artifact.
{full_context}
GOAL: {goal}
LANGUAGE: {lang_name}
REQUIREMENTS:
- Write idiomatic {lang_name} code following best practices
- Include appropriate comments and documentation
- Use language-specific features and libraries
- PRODUCTION-READY, COMPLETE content (NO placeholders)
- Include error handling appropriate for {lang_name}
Generate complete {lang_name} code:"""
    response = llm.invoke(enhanced_prompt)
    llm_text = getattr(response, "content", "") or ""
    # Extract code blocks with language detection
    from multi_language_support import extract_code_blocks_multi_lang
    code_blocks = extract_code_blocks_multi_lang(llm_text)
    if not code_blocks:
        # BUGFIX: previously the function fell off the end here, returning
        # None and corrupting the LangGraph state; surface a failed
        # experiment instead so downstream nodes can react.
        return {
            "experimentCode": None,
            "experimentResults": {
                "success": False,
                "paths": {},
                "stdout": "",
                "stderr": "LLM response contained no extractable code block.",
                "language": language,
                "context_used": len(full_context)
            },
            "execution_path": path,
            **add_status_update("Experimenter", "No code block produced")
        }
    # Use the first detected language/code pair
    detected_lang, code_text = code_blocks[0]
    # Write script with proper extension, then run it with the matching runner.
    script_path = write_script_multi_lang(code_text, detected_lang, out_dir=OUT_DIR)
    exec_results = execute_code(code_text, detected_lang)
    results = {
        "success": exec_results.get("exit_code", 0) == 0,
        "paths": {"script": sanitize_path(script_path)},
        "stdout": exec_results.get("stdout", ""),
        "stderr": exec_results.get("stderr", ""),
        "language": detected_lang,
        "context_used": len(full_context)
    }
    return {
        "experimentCode": code_text,
        "experimentResults": results,
        "execution_path": path,
        **add_status_update("Experimenter", f"{lang_name} script created")
    }
def run_synthesis_agent(state: AgentState):
    """Compose the final draft response from the plan and experiment output."""
    log.info("--- SYNTHESIS ---")
    _state = state or {}
    path = ensure_list(_state, 'execution_path') + ["Synthesis"]
    exp_results = _state.get('experimentResults')
    pm_plan = _state.get('pmPlan', {}) or {}
    synthesis_context = [
        f"=== USER REQUEST ===\n{_state.get('userInput', '')}",
        f"\n=== OBJECTIVE ===\n{_state.get('coreObjectivePrompt', '')}",
        f"\n=== PLAN ===\n{json.dumps(pm_plan.get('plan_steps', []), indent=2)}",
    ]
    artifact_message = ""
    if exp_results and isinstance(exp_results, dict):
        paths = exp_results.get("paths") or {}
        if paths:
            # Two renderings of the same paths: markdown bullets for the user,
            # plain lines for the synthesis prompt.
            bullet_lines = []
            detail_lines = []
            for artifact_type, artifact_path in paths.items():
                bullet_lines.append(f"- **{artifact_type.title()}**: `{os.path.basename(artifact_path)}`")
                detail_lines.append(f"{artifact_type}: {artifact_path}")
            artifact_message = "\n\n**Artifacts Generated:**\n" + "\n".join(bullet_lines)
            synthesis_context.append(f"\n=== ARTIFACTS ===\n" + "\n".join(detail_lines))
        if exp_results.get('stdout'):
            synthesis_context.append(f"\n=== OUTPUT ===\n{exp_results.get('stdout', '')}")
        if exp_results.get('stderr'):
            synthesis_context.append(f"\n=== ERRORS ===\n{exp_results.get('stderr', '')}")
    full_context = "\n".join(synthesis_context)
    synthesis_prompt = f"""Create FINAL RESPONSE after executing user's request.
{full_context}
Create comprehensive response that:
- Directly addresses original request
- Explains what was accomplished and HOW
- References specific artifacts and explains PURPOSE
- Provides context on how to USE deliverables
- Highlights KEY INSIGHTS
- Suggests NEXT STEPS if relevant
- Be SPECIFIC about what was created."""
    reply = llm.invoke(synthesis_prompt)
    final_text = getattr(reply, "content", "") or ""
    if artifact_message:
        final_text = final_text + "\n\n---\n" + artifact_message
    return {
        "draftResponse": final_text,
        "execution_path": path,
        **add_status_update("Synthesis", "Response synthesized")
    }
def run_qa_agent(state: AgentState):
    """QA gate: review the draft against the user's objective.

    Accepts either a bare 'APPROVED' reply or a JSON verdict; anything else
    becomes rework feedback. Never raises — LLM failures degrade to a
    not-approved result so the graph can route to rework/disclaimer.
    """
    log.info("--- QA ---")
    path = ensure_list(state, 'execution_path') + ["QA"]
    qa_context = [
        f"=== REQUEST ===\n{state.get('userInput', '')}",
        f"\n=== OBJECTIVE ===\n{state.get('coreObjectivePrompt', '')}",
        f"\n=== DRAFT ===\n{state.get('draftResponse', '')}",
    ]
    if state.get('experimentResults'):
        qa_context.append(f"\n=== ARTIFACTS ===\n{json.dumps(state.get('experimentResults', {}).get('paths', {}), indent=2)}")
    prompt = f"""You are a QA reviewer. Review the draft response against the user's objective.
{chr(10).join(qa_context)}
Review Instructions:
- Does the draft and its artifacts COMPLETELY satisfy ALL parts of the user's request?
- Is the quality of the work high?
- If this is a re-submission (rework cycle > 1), has the previous feedback been successfully addressed?
Response Format (required JSON or a single word 'APPROVED'):
Either return EXACTLY the single word:
APPROVED
Or return JSON like:
{{
"approved": false,
"feedback": "Specific, actionable items to fix (bullet list or numbered).",
"required_changes": ["..."]
}}
"""
    try:
        response = llm.invoke(prompt)
        content = getattr(response, "content", "") or ""
    except Exception as e:
        log.exception("QA LLM call failed: %s", e)
        return {
            "approved": False,
            "qaFeedback": "QA LLM failed; manual review required.",
            "execution_path": path,
            **add_status_update("QA", "QA failed")
        }
    # If LLM returned APPROVED word, treat as approved
    # (the length cap guards against 'APPROVED' buried in a longer sentence).
    if "APPROVED" in content.strip().upper() and len(content.strip()) <= 20:
        return {
            "approved": True,
            "qaFeedback": None,
            "execution_path": path,
            **add_status_update("QA", "Approved")
        }
    # Else try JSON parse
    parsed = parse_json_from_llm(content)
    if isinstance(parsed, dict):
        approved = bool(parsed.get("approved", False))
        # Accept any of the feedback-shaped keys the model might choose.
        feedback = parsed.get("feedback") or parsed.get("qaFeedback") or parsed.get("required_changes") or ""
        # Normalize feedback to string
        if isinstance(feedback, list):
            feedback = "\n".join([str(x) for x in feedback])
        elif not isinstance(feedback, str):
            feedback = str(feedback)
        return {
            "approved": approved,
            "qaFeedback": feedback if not approved else None,
            "execution_path": path,
            **add_status_update("QA", "QA completed")
        }
    # Fallback: return raw text as feedback (not approved)
    safe_feedback = content.strip()[:2000] or "QA produced no actionable output."
    return {
        "approved": False,
        "qaFeedback": safe_feedback,
        "execution_path": path,
        **add_status_update("QA", "QA needs rework")
    }
def run_archivist_agent(state: AgentState):
    """Summarize the finished exchange and persist it to long-term memory."""
    log.info("--- ARCHIVIST ---")
    path = ensure_list(state, 'execution_path') + ["Archivist"]
    summary_prompt = f"Summarize for memory.\n\nObjective: {state.get('coreObjectivePrompt')}\n\nResponse: {state.get('draftResponse')}\n\nSummary:"
    reply = llm.invoke(summary_prompt)
    summary_text = getattr(reply, "content", "")
    memory_manager.add_to_memory(summary_text, {"objective": state.get('coreObjectivePrompt')})
    return {
        "execution_path": path,
        **add_status_update("Archivist", "Saved to memory")
    }
def run_disclaimer_agent(state: AgentState):
    """Prefix the draft with a disclaimer when budget/rework limits are hit."""
    log.warning("--- DISCLAIMER ---")
    path = ensure_list(state, 'execution_path') + ["Disclaimer"]
    if state.get('budget_exceeded'):
        reason = "Budget limit reached."
    else:
        reason = "Rework limit reached."
    disclaimer = f"**DISCLAIMER: {reason} Draft may be incomplete.**\n\n---\n\n"
    return {
        "draftResponse": disclaimer + state.get('draftResponse', "No response"),
        "execution_path": path,
        **add_status_update("Disclaimer", reason)
    }
def should_continue(state: AgentState):
    """Route after QA: archive on approval, disclaim on limits, else rework."""
    # Budget exhaustion takes precedence over everything else.
    if state.get("budget_exceeded"):
        return "disclaimer_agent"
    # Approval check is independent of the loop counters below.
    if state.get("approved"):
        return "archivist_agent"
    try:
        rework = int(state.get("rework_cycles", 0))
        max_loops_allowed = int(state.get("max_loops", 0))
    except Exception:
        # Un-convertible values: fall back to the raw (or zero) values.
        rework = state.get("rework_cycles", 0) or 0
        max_loops_allowed = state.get("max_loops", 0) or 0
    if rework > max_loops_allowed:
        return "disclaimer_agent"
    # Otherwise loop back so the PM can plan the next rework pass.
    return "pm_agent"
def should_run_experiment(state: AgentState):
    """Route from PM: run the experimenter only when the plan requires one."""
    plan = state.get('pmPlan') or {}
    if plan.get('experiment_needed'):
        return "experimenter_agent"
    return "synthesis_agent"
#--- Build graphs ---
# Stand-alone triage graph: single node used to classify the incoming message.
triage_workflow = StateGraph(AgentState)
triage_workflow.add_node("triage", run_triage_agent)
triage_workflow.set_entry_point("triage")
triage_workflow.add_edge("triage", END)
triage_app = triage_workflow.compile()
# Stand-alone planner graph: produces the initial plan/cost estimate.
planner_workflow = StateGraph(AgentState)
planner_workflow.add_node("planner", run_planner_agent)
planner_workflow.set_entry_point("planner")
planner_workflow.add_edge("planner", END)
planner_app = planner_workflow.compile()
# Main pipeline: memory -> intent -> PM -> (experiment?) -> synthesis -> QA,
# with QA looping back to PM until approved or limits route to the disclaimer.
main_workflow = StateGraph(AgentState)
main_workflow.add_node("memory_retriever", run_memory_retrieval)
main_workflow.add_node("intent_agent", run_intent_agent)
main_workflow.add_node("pm_agent", run_pm_agent)
main_workflow.add_node("experimenter_agent", run_experimenter_agent)
main_workflow.add_node("synthesis_agent", run_synthesis_agent)
main_workflow.add_node("qa_agent", run_qa_agent)
main_workflow.add_node("archivist_agent", run_archivist_agent)
main_workflow.add_node("disclaimer_agent", run_disclaimer_agent)
main_workflow.set_entry_point("memory_retriever")
main_workflow.add_edge("memory_retriever", "intent_agent")
main_workflow.add_edge("intent_agent", "pm_agent")
main_workflow.add_edge("experimenter_agent", "synthesis_agent")
main_workflow.add_edge("synthesis_agent", "qa_agent")
main_workflow.add_edge("archivist_agent", END)
main_workflow.add_edge("disclaimer_agent", END)
# pm_agent routes via should_run_experiment (experimenter vs synthesis).
main_workflow.add_conditional_edges("pm_agent", should_run_experiment)
# qa_agent routes via should_continue (approve / rework / give up).
main_workflow.add_conditional_edges("qa_agent", should_continue, {
    "archivist_agent": "archivist_agent",
    "pm_agent": "pm_agent",
    "disclaimer_agent": "disclaimer_agent"
})
main_app = main_workflow.compile()