# revised_graph_upgraded.py
# Updated agents with tiered pragmatist and flexible governance.
# Save as revised_graph_upgraded.py and replace your graph_upgraded.py as needed.
"""
Upgraded agent nodes for the base ``graph`` workflow.

Defines replacement Pragmatist, Governance, Compliance, Observer and Knowledge
Curator nodes, plus :func:`apply_upgrades`, which rebuilds
``graph.main_app`` / ``graph.main_workflow`` with the upgraded routing
(Memory -> Intent -> PM -> Pragmatist -> Governance -> Experimenter -> ...).
"""
import json
import logging
import math
import os
import re
from collections import deque
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

# Import base graph & helpers
import graph as base_graph
from graph import AgentState, ensure_int, ensure_list

# add_status_update may not exist in older graph.py — provide a fallback
try:
    from graph import add_status_update  # type: ignore
except ImportError:
    def add_status_update(node_name: str, message: str) -> Dict[str, Any]:
        """
        Fallback: return a node-scoped status key so agents don't collide on 'status_update'.
        Example: add_status_update('Pragmatist','done') => {'node_status__Pragmatist': 'done'}
        """
        return {f"node_status__{node_name}": message}

from memory_manager import memory_manager
from logging_config import get_logger

log = get_logger(__name__)
# Bound once at import time: later reassignments of graph.llm are not seen here.
llm = getattr(base_graph, "llm", None)


# --- Shared constants ---

# Log files the Observer inspects by default (only those that exist are read).
_DEFAULT_LOG_CANDIDATES = ("logs/performance.log", "logs/ai_lab.log", "performance.log")

# First number inside a free-form cost string: "$1,200.50" -> "1,200.50",
# "approx. 300 USD" -> "300". A lone "." or "," is never matched.
_COST_NUMBER_RE = re.compile(r"\d[\d,]*(?:\.\d+)?|\.\d+")

# (tier name, multiplier applied to the base cost estimate, description)
_TIER_SPECS = (
    ("lite", 0.25, "Minimal extract (CSV/text) or short summary. Minimal engineering."),
    ("standard", 1.0, "Complete, tested script or notebook; limited UX – suitable for MVP."),
    ("full", 3.0, "Production-ready repo, packaging, tests, and deployment instructions."),
)

# A risk score strictly above this value counts as "blocking" (Pragmatist marks the
# plan infeasible unless flexible; Governance escalates/rejects). NOTE: the
# Pragmatist's own score tops out at 4 (steps 1 + artifact type 1 + cost 2), so
# with its current factors a plan is never marked infeasible on risk alone —
# this is the "don't block on complexity" policy. Revisit if factors are added.
_BLOCKING_RISK_SCORE = 4

# Regexes used by the Compliance secret scan (compiled once, not per call).
_SECRET_PATTERNS = (
    r"AKIA[0-9A-Z]{16}",
    # Any PEM private-key header: plain, RSA, EC, OPENSSH, ENCRYPTED, ...
    r"-----BEGIN (?:[A-Z0-9]+ )*PRIVATE KEY-----",
    r"AIza[0-9A-Za-z-_]{35}",
    r"(?i)secret[_-]?(key|token)\b",
    r"(?i)password\s*[:=]\s*['\"][^'\"]{6,}['\"]",
)
_SECRET_REGEXES = tuple((pattern, re.compile(pattern)) for pattern in _SECRET_PATTERNS)


# --- Utility helpers ---

def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict (state slots may hold None)."""
    return value if isinstance(value, dict) else {}


def _plan_steps(pm: Dict[str, Any]) -> List[Any]:
    """Return ``pm['plan_steps']`` as a list: missing/None/empty -> [], scalar -> [scalar]."""
    steps = pm.get("plan_steps")
    if not steps:
        return []
    if isinstance(steps, (list, tuple)):
        return list(steps)
    return [steps]


def _parse_cost_usd(raw: Any) -> Optional[float]:
    """
    Best-effort conversion of a plan's ``estimated_cost_usd`` value to a float.

    Numbers and numeric strings are converted directly; otherwise the first
    number in the text is used ("$1,200" -> 1200.0, "approx. 300 USD" -> 300.0).
    Returns None when the value is None/"", contains no number, or is not
    finite (NaN/inf).
    """
    if raw is None or raw == "":
        return None
    try:
        value = float(raw)
    except (TypeError, ValueError, OverflowError):
        match = _COST_NUMBER_RE.search(str(raw))
        if not match:
            return None
        try:
            value = float(match.group(0).replace(",", ""))
        except (ValueError, OverflowError):
            return None
    return value if math.isfinite(value) else None


def _redact_secret(value: str, keep: int = 4) -> str:
    """Mask a matched secret, keeping only its first *keep* characters."""
    if len(value) <= keep:
        return "*" * len(value)
    return f"{value[:keep]}...[{len(value) - keep} chars redacted]"


def _path_values(paths: Any) -> List[Any]:
    """Flatten an experiment's ``paths`` entry (dict name->path, or list/tuple) into a list."""
    if isinstance(paths, dict):
        return list(paths.values())
    if isinstance(paths, (list, tuple)):
        return list(paths)
    return []


def simple_cost_feasibility_check(pm_plan: Dict[str, Any]) -> Dict[str, Any]:
    """
    Heuristic cost/complexity check for a PM plan.

    Returns ``{"ok": bool, "notes": [str, ...]}``. ``ok`` is False when no
    usable non-zero ``estimated_cost_usd`` is present or the estimate exceeds
    $500; an estimate above $200 only adds a note. Engineering-heavy artifact
    types ("repo", "notebook", "script") add an informational note without
    affecting ``ok``. Costs are parsed with the same rules as the Pragmatist.
    """
    pm_plan = pm_plan or {}
    report = {"ok": True, "notes": []}
    est_cost = _parse_cost_usd(pm_plan.get("estimated_cost_usd", 0))
    exp_type = pm_plan.get("experiment_type", "word")

    if not est_cost:  # None (missing/unparseable) or 0
        report["notes"].append("No reliable estimated_cost_usd provided.")
        report["ok"] = False
    elif est_cost > 500:
        report["notes"].append(f"High estimated cost: ${est_cost}. Governance advised.")
        report["ok"] = False
    elif est_cost > 200:
        report["notes"].append(f"Moderately high estimated cost: ${est_cost}. Consider simplifications.")

    if exp_type in ("repo", "notebook", "script"):
        report["notes"].append(f"Artifact type '{exp_type}' indicates engineering-heavy work.")
    return report


# --- Pragmatist (flexible, produces tiered options) ---

def run_pragmatist_agent(state: AgentState) -> Dict[str, Any]:
    """
    Pragmatist Agent (improved):
    - More nuanced risk assessment
    - Doesn't block on complexity alone
    - Provides constructive guidance

    Reads ``pmPlan``, ``preferred_tier`` and ``flexible_budget_mode`` from
    *state*. Returns ``pragmatistReport`` (risk factors/level/score,
    lite/standard/full tier options with cost estimates, recommended tier and,
    when the LLM is available for long plans, ``optimizations``), the extended
    ``execution_path`` and a Pragmatist status update.
    """
    log.info(">>> PRAGMATIST AGENT (improved)")
    path = ensure_list(state, "execution_path") + ["Pragmatist"]
    pm = _as_dict(state.get("pmPlan"))

    est_cost = _parse_cost_usd(pm.get("estimated_cost_usd"))
    exp_type = pm.get("experiment_type", "word")
    # A missing *or zero* estimate falls back to a per-artifact-type baseline.
    base_est = est_cost or (50.0 if exp_type in ("script", "repo") else 5.0)

    tiers = {
        name: {
            "multiplier": multiplier,
            "estimated_cost_usd": round(base_est * multiplier, 2),
            "description": description,
        }
        for name, multiplier, description in _TIER_SPECS
    }

    preferred = state.get("preferred_tier")
    flexible_mode = bool(state.get("flexible_budget_mode", False))
    plan_steps = _plan_steps(pm)

    # Additive risk score: complexity (+1), engineering-heavy artifact (+1),
    # cost (+1 missing, +1 moderate >$100, +2 high >$200).
    risk_factors: List[str] = []
    risk_score = 0
    if len(plan_steps) > 8:
        risk_factors.append("Complex multi-step plan")
        risk_score += 1
    if exp_type in ("repo", "notebook"):
        risk_factors.append(f"Engineering-heavy artifact type: {exp_type}")
        risk_score += 1
    if est_cost is None:
        risk_factors.append("No cost estimate provided")
        risk_score += 1
    elif est_cost > 200:
        risk_factors.append(f"High estimated cost: ${est_cost}")
        risk_score += 2
    elif est_cost > 100:
        risk_factors.append(f"Moderate estimated cost: ${est_cost}")
        risk_score += 1

    # Risk is relative to the user's flexibility.
    if flexible_mode:
        risk_score = max(0, risk_score - 2)

    if risk_score <= 1:
        risk = "low"
    elif risk_score <= 3:
        risk = "medium"
    else:
        risk = "high"

    # Only mark "not ok" when truly blocked (see _BLOCKING_RISK_SCORE).
    feasible = True
    if risk_score > _BLOCKING_RISK_SCORE and not flexible_mode:
        feasible = False

    if isinstance(preferred, str) and preferred in tiers:
        recommended_tier = preferred
    elif est_cost is not None and est_cost > 500 and not flexible_mode:
        recommended_tier = "lite"
    else:
        recommended_tier = "standard"

    prag_report = {
        "ok": feasible,
        "risk_factors": risk_factors,
        "risk_level": risk,
        "risk_score": risk_score,
        "tier_options": tiers,
        "recommended_tier": recommended_tier,
        "explain": (
            f"Assessed {len(risk_factors)} risk factor(s). "
            f"Risk level: {risk}. Recommended tier: {recommended_tier}. "
            "User can proceed with any tier; higher tiers provide more complete deliverables."
        ),
    }

    # Optional LLM recommendations for long plans (complexity, not risk, triggers this).
    if len(plan_steps) > 7 and llm:
        try:
            prompt = (
                "You are a pragmatic engineering advisor. Given this plan, suggest 2-3 ways to "
                "optimize implementation while preserving core value. Be specific and actionable. "
                "Return JSON {\"optimizations\": [...]}.\n\n"
                f"Plan: {json.dumps(pm, indent=2)}"
            )
            response = llm.invoke(prompt)
            parse_json = getattr(base_graph, "parse_json_from_llm", None)
            if callable(parse_json):
                recs = parse_json(getattr(response, "content", "") or "")
                if isinstance(recs, dict):
                    prag_report["optimizations"] = recs.get("optimizations", [])
        except Exception as e:  # advisory only: never fail the node over it
            log.debug(f"LLM optimizations failed: {e}")

    out = {"pragmatistReport": prag_report, "execution_path": path}
    out.update(add_status_update("Pragmatist", f"Risk: {risk}, Tier: {recommended_tier}"))
    return out


# --- Governance (flexible decisions) ---

def run_governance_agent(state: AgentState) -> Dict[str, Any]:
    """
    Governance Agent (improved):
    - Respects user's explicit choices
    - Only rejects on genuine blockers
    - Provides clear reasoning

    Decision is one of "approve", "approve_with_warning", "require_escalation"
    or "reject"; the first two set ``approved_for_experiment``. Returns
    ``governanceReport``, the extended ``execution_path`` and a status update.
    """
    log.info(">>> GOVERNANCE AGENT (improved)")
    path = ensure_list(state, "execution_path") + ["Governance"]
    pm = _as_dict(state.get("pmPlan"))
    prag = _as_dict(state.get("pragmatistReport"))

    # Whether the user explicitly picked a tier (vs. the recommended/default one).
    user_chose_tier = bool(state.get("preferred_tier"))
    preferred = state.get("preferred_tier") or prag.get("recommended_tier") or "standard"
    tier_opts = _as_dict(prag.get("tier_options"))
    if isinstance(preferred, str) and preferred in tier_opts:
        chosen = _as_dict(tier_opts[preferred])
    else:
        chosen = _as_dict(tier_opts.get("standard"))
        if "standard" in tier_opts:
            # Report the tier that was actually priced, not an unknown name.
            log.debug(f"Unknown tier {preferred!r}; falling back to 'standard'")
            preferred = "standard"
    try:
        chosen_cost = float(chosen.get("estimated_cost_usd", 0.0))
    except (TypeError, ValueError, OverflowError):
        chosen_cost = 0.0

    flexible = bool(state.get("flexible_budget_mode", False))
    allow_escalate = bool(state.get("allow_escalation", False))
    auto_accept_warn = bool(state.get("auto_accept_approved_with_warning", False))
    warn_decision = "approve" if auto_accept_warn else "approve_with_warning"

    # Start approved unless there's a blocker.
    decision = "approve"
    issues: List[str] = []

    # Budget check (a falsy budget, e.g. None/0, disables it).
    budget = state.get("current_budget") or state.get("budget") or None
    if budget:
        try:
            budget_f = float(budget)
        except (TypeError, ValueError, OverflowError) as e:
            issues.append(f"Could not parse budget: {e}")
            decision = "approve_with_warning"
        else:
            if chosen_cost > budget_f:
                if flexible and user_chose_tier:
                    # Explicit tier choice in flexible mode: warn, don't block.
                    issues.append(
                        f"Chosen tier (${chosen_cost}) exceeds budget (${budget_f}), "
                        f"but user enabled flexible budget mode."
                    )
                    decision = warn_decision
                elif flexible:
                    issues.append(f"Cost ${chosen_cost} exceeds budget ${budget_f}.")
                    decision = warn_decision
                elif chosen_cost > budget_f * 2:
                    # Only reject if inflexible AND significantly over budget.
                    issues.append(
                        f"Cost ${chosen_cost} is 2x over budget ${budget_f}. "
                        f"Enable flexible budget or reduce scope."
                    )
                    decision = "reject"
                else:
                    issues.append(f"Cost ${chosen_cost} exceeds budget ${budget_f}.")
                    decision = "require_escalation" if allow_escalate else "approve_with_warning"

    # Pragmatist risk: only genuinely high scores matter (see _BLOCKING_RISK_SCORE;
    # reports from the Pragmatist above never exceed it).
    risk_level = prag.get("risk_level")
    risk_score = prag.get("risk_score", 0)
    if risk_level == "high" and risk_score > _BLOCKING_RISK_SCORE:
        if not prag.get("ok", True):
            issues.append("Pragmatist identified blocking concerns.")
            if allow_escalate:
                decision = "require_escalation"
            else:
                decision = "approve_with_warning" if flexible else "reject"
        else:
            # High risk but feasible - just warn
            issues.append(
                f"Complex request with {len(prag.get('risk_factors', []))} risk factors. "
                f"Proceeding with caution."
            )
            if decision == "approve":
                decision = "approve_with_warning"

    # Genuine blockers / content checks.
    experiment_type = pm.get("experiment_type")
    plan_steps = _plan_steps(pm)
    request_text = f"{state.get('userInput') or ''} {state.get('coreObjectivePrompt') or ''}".lower()
    blockers: List[str] = []

    if "scrape" in request_text and "million" in request_text:
        # Large-scale scraping: a legal concern, not blocking if the plan addresses it.
        mentions_compliance = any(
            "legal" in str(step).lower() or "compliance" in str(step).lower()
            for step in plan_steps
        )
        if not mentions_compliance:
            issues.append(
                "Large-scale web scraping requires legal compliance consideration. "
                "Ensure plan addresses terms of service and data protection."
            )

    if experiment_type in ("repo", "script") and not plan_steps:
        blockers.append("No implementation plan provided for engineering task.")

    if blockers:
        issues.extend(blockers)
        decision = "reject"

    approved_bool = decision in ("approve", "approve_with_warning")

    # Optional LLM rationale (informative only).
    rationale = None
    if llm and decision in ("require_escalation", "approve_with_warning", "reject"):
        try:
            prompt = (
                "You are a governance advisor. Provide a 2-3 sentence rationale for this decision "
                "and list top 2 risks to monitor.\n\n"
                f"Decision: {decision}\n"
                f"Request: {str(state.get('userInput') or '')[:200]}\n"
                f"Tier: {preferred} (${chosen_cost})\n"
                f"Budget: {budget}\n"
                f"Risk level: {risk_level}\n\n"
                "Be concise and actionable."
            )
            response = llm.invoke(prompt)
            rationale = (getattr(response, "content", "") or "")[:800]
        except Exception as e:  # advisory only
            log.debug(f"Rationale generation failed: {e}")

    gov_report = {
        "budget_ok": approved_bool,
        "issues": issues,
        "approved_for_experiment": approved_bool,
        "governanceDecision": decision,
        "chosen_tier": preferred,
        "chosen_cost_usd": chosen_cost,
        "rationale": rationale,
        "reasoning": (
            f"Decision: {decision}. "
            f"Risk: {risk_level}. "
            f"User mode: {'flexible' if flexible else 'standard'}. "
            f"{len(issues)} issue(s) noted."
        ),
    }

    status_msg = {
        "approve": f"Approved {preferred} tier (${chosen_cost})",
        "approve_with_warning": f"Approved with warnings: {preferred} tier (${chosen_cost})",
        "require_escalation": "Manual approval required",
        "reject": "Request rejected - blocking issues found",
    }.get(decision, decision)

    out = {"governanceReport": gov_report, "execution_path": path}
    out.update(add_status_update("Governance", status_msg))
    return out


# --- Compliance (keeps namespaced node_status__) ---

def scan_text_for_secrets(text: str, *, redact: bool = False) -> Dict[str, Any]:
    """
    Regex-scan *text* for likely credentials.

    Returns ``{"suspicious": bool, "findings": [{"pattern": str, "match": str}]}``.
    With ``redact=True`` each ``match`` keeps only its first 4 characters so the
    secret itself is not copied into reports.
    """
    findings: List[Dict[str, str]] = []
    if not text:
        return {"suspicious": False, "findings": findings}
    for pattern, regex in _SECRET_REGEXES:
        for m in regex.finditer(text):
            hit = m.group(0)
            findings.append({"pattern": pattern, "match": _redact_secret(hit) if redact else hit})
    return {"suspicious": bool(findings), "findings": findings}


def run_compliance_agent(state: AgentState) -> Dict[str, Any]:
    """
    Compliance Agent: scan experiment output and artifacts for leaked secrets.

    Scans ``experimentResults['stdout'/'stderr']`` and the first 20000
    characters of every existing file listed in ``experimentResults['paths']``
    (a dict of name->path or a list). Matches are redacted in the report.
    Any ``.zip`` path adds a manual-review note.
    """
    log.info(">>> COMPLIANCE AGENT")
    path = ensure_list(state, "execution_path") + ["Compliance"]
    exp = _as_dict(state.get("experimentResults"))
    report = {"suspicious": False, "issues": [], "scanned": []}

    for key in ("stdout", "stderr"):
        val = exp.get(key)
        if isinstance(val, str) and val.strip():
            scan = scan_text_for_secrets(val, redact=True)
            if scan["suspicious"]:
                report["suspicious"] = True
                report["issues"].append({"type": "text_secret", "where": key, "findings": scan["findings"]})
            report["scanned"].append({"type": "text", "where": key})

    path_values = _path_values(exp.get("paths"))
    for p in path_values:
        try:
            pstr = str(p)
            if os.path.isfile(pstr):
                with open(pstr, "r", encoding="utf-8", errors="ignore") as fh:
                    sample = fh.read(20000)
                scan = scan_text_for_secrets(sample, redact=True)
                if scan["suspicious"]:
                    report["suspicious"] = True
                    report["issues"].append({"type": "file_secret", "file": pstr, "findings": scan["findings"]})
                report["scanned"].append({"type": "file", "file": pstr})
            else:
                report["scanned"].append({"type": "path", "value": pstr, "exists": os.path.exists(pstr)})
        except Exception as e:  # best-effort: one unreadable path must not abort the scan
            report["scanned"].append({"file": p, "error": str(e)})

    if any(str(v).lower().endswith(".zip") for v in path_values):
        report.setdefault("notes", []).append("Zip-based or repo artifact detected — recommend manual review.")

    out = {"complianceReport": report, "execution_path": path}
    out.update(add_status_update("Compliance", "Compliance checks complete"))
    return out


# --- Observer ---

def summarize_logs_for_observer(log_paths: Optional[list] = None, sample_lines: int = 200) -> str:
    """
    Build a short text digest of the tail of each log file.

    Args:
        log_paths: files to read; when empty/None, the existing files among
            ``_DEFAULT_LOG_CANDIDATES`` are used.
        sample_lines: trailing lines kept per file (<= 0 keeps none).

    Returns:
        A header with case-insensitive substring counts of "ERROR" and
        "WARNING" across the sampled lines, then one section per file (text
        capped at 2000 characters). Unreadable files yield a "Could not read"
        line instead of raising.
    """
    if not log_paths:
        log_paths = [p for p in _DEFAULT_LOG_CANDIDATES if os.path.exists(p)]
    keep = max(0, sample_lines)

    sections: List[str] = []
    error_count = 0
    warning_count = 0
    for log_path in log_paths:
        try:
            with open(log_path, "r", encoding="utf-8", errors="ignore") as fh:
                # Stream the file and keep only the tail instead of loading it all.
                tail = list(deque(fh, maxlen=keep))
        except Exception as e:  # best-effort: a bad path must not break the Observer
            sections.append(f"Could not read {log_path}: {e}")
            continue
        content = "".join(tail)
        upper = content.upper()
        error_count += upper.count("ERROR")
        warning_count += upper.count("WARNING")
        sections.append(f"--- {log_path} (last {len(tail)} lines) ---\n{content[:2000]}")

    header = f"Log summary: {error_count} ERROR(s), {warning_count} WARNING(s)"
    return header + "\n\n" + "\n\n".join(sections)


def run_observer_agent(state: AgentState) -> Dict[str, Any]:
    """
    Observer Agent: snapshot runtime health (log digest, path length, rework
    cycles, cost) into ``observerReport``, optionally with LLM next actions.
    """
    log.info(">>> OBSERVER AGENT")
    path = ensure_list(state, "execution_path") + ["Observer"]

    summary = summarize_logs_for_observer()  # defaults to the existing candidate logs
    obs = {
        "log_summary": summary[:4000],
        # Length before this node appended itself.
        "execution_length": len(state.get("execution_path", []) or []),
        "rework_cycles": ensure_int(state, "rework_cycles", 0),
        "current_cost": state.get("current_cost", 0.0),
        "status": state.get("status_update"),
    }

    if llm:
        try:
            prompt = (
                "You are an Observer assistant. Given this runtime summary, provide 3 prioritized next actions to mitigate the top risks.\n\n"
                f"Runtime summary: {json.dumps(obs, indent=2)}\n\nReturn plain text."
            )
            response = llm.invoke(prompt)
            obs["llm_recommendations"] = (getattr(response, "content", "") or "")[:1500]
        except Exception as e:
            obs["llm_recommendations_error"] = str(e)

    out = {"observerReport": obs, "execution_path": path}
    out.update(add_status_update("Observer", "Observer summary created"))
    return out


# --- Knowledge Curator ---

def run_knowledge_curator_agent(state: AgentState) -> Dict[str, Any]:
    """
    Knowledge Curator Agent: store a compact run summary (objective, plan
    steps, draft excerpt, QA feedback excerpt) via ``memory_manager.add_to_memory``.
    """
    log.info(">>> KNOWLEDGE CURATOR AGENT")
    path = ensure_list(state, "execution_path") + ["KnowledgeCurator"]
    core = state.get("coreObjectivePrompt", "") or state.get("userInput", "")
    pm = _as_dict(state.get("pmPlan"))
    # Coerce to str: slicing a dict/list payload would otherwise raise.
    draft = str(state.get("draftResponse") or "")
    qa_feedback = str(state.get("qaFeedback") or "")

    summary_text = (
        f"Objective: {core}\n\n"
        f"Plan Steps: {json.dumps(_plan_steps(pm), default=str)}\n\n"
        f"Draft (first 1500 chars): {draft[:1500]}\n\n"
        f"QA Feedback: {qa_feedback[:1000]}"
    )
    # Naive UTC ISO string (same format as the deprecated datetime.utcnow()).
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    try:
        memory_manager.add_to_memory(summary_text, {"source": "knowledge_curator", "timestamp": timestamp})
        insights = {"added": True, "summary_snippet": summary_text[:500]}
    except Exception as e:
        insights = {"added": False, "error": str(e)}

    out = {"knowledgeInsights": insights, "execution_path": path}
    out.update(add_status_update("KnowledgeCurator", "Knowledge captured"))
    return out


# --- Wiring / injection into existing main_workflow ---

def apply_upgrades(max_governance_passes: int = 3) -> bool:
    """
    Rebuild the main workflow graph with upgraded routing.
    CRITICAL: Creates a NEW graph instead of modifying the compiled one.

    Args:
        max_governance_passes: once Governance has run this many times in a
            run (counted from "Governance" entries in ``execution_path``,
            including passes that were approved), a further non-approval routes
            to ``disclaimer_agent`` instead of looping back to PM. This bounds
            the reject/escalate loop, which otherwise only ends when LangGraph's
            recursion limit raises. Values below 1 are treated as 1.

    Returns:
        True if ``graph.main_app`` / ``graph.main_workflow`` were replaced,
        False if the rebuild failed (the exception is logged).
    """
    log.info("Applying graph upgrades (rebuilding graph with proper routing)")
    pass_limit = max(1, int(max_governance_passes))
    try:
        from langgraph.graph import StateGraph, END

        # Agent functions that are not upgraded here come from the base graph.
        from graph import (
            run_memory_retrieval,
            run_intent_agent,
            run_pm_agent,
            run_experimenter_agent,
            run_synthesis_agent,
            run_qa_agent,
            run_archivist_agent,
            run_disclaimer_agent,
            should_continue,
        )

        new_workflow = StateGraph(AgentState)

        new_workflow.add_node("memory_retriever", run_memory_retrieval)
        new_workflow.add_node("intent_agent", run_intent_agent)
        new_workflow.add_node("pm_agent", run_pm_agent)
        new_workflow.add_node("pragmatist_agent", run_pragmatist_agent)  # Local upgraded
        new_workflow.add_node("governance_agent", run_governance_agent)  # Local upgraded
        new_workflow.add_node("experimenter_agent", run_experimenter_agent)
        new_workflow.add_node("compliance_agent", run_compliance_agent)  # Local upgraded
        new_workflow.add_node("synthesis_agent", run_synthesis_agent)
        new_workflow.add_node("qa_agent", run_qa_agent)
        new_workflow.add_node("observer_agent", run_observer_agent)  # Local upgraded
        new_workflow.add_node("archivist_agent", run_archivist_agent)
        new_workflow.add_node("knowledge_curator_agent", run_knowledge_curator_agent)  # Local upgraded
        new_workflow.add_node("disclaimer_agent", run_disclaimer_agent)
        log.info("✅ All nodes added to new graph")

        new_workflow.set_entry_point("memory_retriever")

        # Standard flow: Memory → Intent → PM
        new_workflow.add_edge("memory_retriever", "intent_agent")
        new_workflow.add_edge("intent_agent", "pm_agent")

        # NEW ROUTING: PM → Pragmatist → Governance
        new_workflow.add_edge("pm_agent", "pragmatist_agent")
        new_workflow.add_edge("pragmatist_agent", "governance_agent")
        log.info("✅ New routing added: PM → Pragmatist → Governance")

        def governance_decider(state: AgentState) -> str:
            """Approved -> experimenter; otherwise retry via PM until the pass limit, then disclaimer."""
            gov = state.get("governanceReport", {}) or {}
            decision = gov.get("governanceDecision", "approve")
            approved = gov.get("approved_for_experiment", True)
            log.info(f"Governance decision: {decision}, approved: {approved}")

            if approved and decision in ("approve", "approve_with_warning"):
                return "experimenter_agent"

            passes = sum(1 for step in (state.get("execution_path") or []) if step == "Governance")
            if passes >= pass_limit:
                log.warning(
                    f"Governance not approved after {passes} pass(es) (limit {pass_limit}); "
                    "routing to disclaimer"
                )
                return "disclaimer_agent"
            # Rejected or requires escalation - loop back to PM
            return "pm_agent"

        new_workflow.add_conditional_edges(
            "governance_agent",
            governance_decider,
            {
                "experimenter_agent": "experimenter_agent",
                "pm_agent": "pm_agent",
                "disclaimer_agent": "disclaimer_agent",
            },
        )
        log.info(f"✅ Governance conditional routing added (max {pass_limit} pass(es))")

        # Continue standard flow: Experimenter → Compliance → Synthesis → QA
        new_workflow.add_edge("experimenter_agent", "compliance_agent")
        new_workflow.add_edge("compliance_agent", "synthesis_agent")
        new_workflow.add_edge("synthesis_agent", "qa_agent")

        # QA conditional routing (base graph's should_continue)
        new_workflow.add_conditional_edges(
            "qa_agent",
            should_continue,
            {
                "observer_agent": "observer_agent",
                "pm_agent": "pm_agent",
                "disclaimer_agent": "disclaimer_agent",
            },
        )
        log.info("✅ QA conditional routing added")

        # Final success path: Observer → Archivist → Knowledge Curator → END
        new_workflow.add_edge("observer_agent", "archivist_agent")
        new_workflow.add_edge("archivist_agent", "knowledge_curator_agent")
        new_workflow.add_edge("knowledge_curator_agent", END)

        # Disclaimer path (failure/limit reached)
        new_workflow.add_edge("disclaimer_agent", END)
        log.info("✅ Final flow edges added")

        # CRITICAL: Compile NEW graph and REPLACE old one
        base_graph.main_app = new_workflow.compile()
        base_graph.main_workflow = new_workflow

        log.info("=" * 60)
        log.info("✅ GRAPH REBUILD SUCCESSFUL")
        log.info("=" * 60)
        log.info("New flow: Memory → Intent → PM → Pragmatist → Governance")
        log.info("          → Experimenter → Compliance → Synthesis → QA")
        log.info("          → Observer → Archivist → Knowledge Curator → END")
        log.info("=" * 60)
        return True
    except Exception as e:
        log.exception(f"❌ Failed to rebuild graph: {e}")
        return False