| # revised_graph_upgraded.py | |
| # Updated agents with tiered pragmatist and flexible governance | |
| # Save as revised_graph_upgraded.py and replace your graph_upgraded.py as needed. | |
import json
import logging
import os
import re
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Import base graph & helpers
import graph as base_graph
from graph import AgentState, ensure_list, ensure_int
# add_status_update may not exist in older graph.py — provide a fallback
try:
    from graph import add_status_update  # type: ignore
except Exception:
    def add_status_update(node_name: str, message: str) -> Dict[str, Any]:
        """Fallback used when graph.py predates add_status_update.

        Emits a node-scoped status key so concurrent agents never collide
        on a shared 'status_update' entry, e.g.
        add_status_update('Pragmatist', 'done')
        -> {'node_status__Pragmatist': 'done'}.
        """
        scoped_key = "node_status__" + node_name
        return {scoped_key: message}
| from memory_manager import memory_manager | |
| from logging_config import get_logger | |
# Module-wide logger, namespaced to this file via logging_config.
log = get_logger(__name__)
# Reuse the base graph's LLM client if one is configured; None disables all LLM extras below.
llm = getattr(base_graph, "llm", None)
| # --- Utility helpers --- | |
def simple_cost_feasibility_check(pm_plan: Dict[str, Any]) -> Dict[str, Any]:
    """Run a quick heuristic cost/complexity screen over a PM plan.

    Looks only at the declared cost estimate and artifact type; returns a
    dict with an overall ``ok`` flag plus human-readable ``notes``.
    """
    notes = []
    ok = True
    # Coerce the declared estimate to a float; anything unparseable becomes None.
    try:
        est_cost = float(pm_plan.get("estimated_cost_usd", 0) or 0)
    except Exception:
        est_cost = None
    artifact = pm_plan.get("experiment_type", "word")
    if not est_cost:  # None and 0 both mean "no usable estimate"
        notes.append("No reliable estimated_cost_usd provided.")
        ok = False
    elif est_cost > 500:
        notes.append(f"High estimated cost: ${est_cost}. Governance advised.")
        ok = False
    elif est_cost > 200:
        notes.append(f"Moderately high estimated cost: ${est_cost}. Consider simplifications.")
    if artifact in ("repo", "notebook", "script"):
        notes.append(f"Artifact type '{artifact}' indicates engineering-heavy work.")
    return {"ok": ok, "notes": notes}
# --- Pragmatist (flexible, produces tiered options) ---
def _parse_estimated_cost(raw: Any) -> Optional[float]:
    """Best-effort conversion of a raw cost field to a float.

    Accepts numbers or strings such as "$1,200"; returns None when no
    numeric value can be recovered.
    """
    if raw is None or raw == "":
        return None
    try:
        return float(raw)
    except Exception:
        pass
    try:
        m = re.search(r"[\d,.]+", str(raw))
        if m:
            return float(m.group(0).replace(",", ""))
    except Exception:
        pass
    return None


def run_pragmatist_agent(state: AgentState) -> Dict[str, Any]:
    """Pragmatist Agent (improved).

    Performs a nuanced, non-blocking risk assessment of the PM plan and
    offers tiered (lite/standard/full) delivery options; doesn't block on
    complexity alone.

    Reads from state: pmPlan, preferred_tier, flexible_budget_mode,
    execution_path. Returns a partial state update containing
    'pragmatistReport', the extended 'execution_path', and a namespaced
    status entry.
    """
    log.info(">>> PRAGMATIST AGENT (improved)")
    path = ensure_list(state, "execution_path") + ["Pragmatist"]
    pm = state.get("pmPlan", {}) or {}

    est_cost = _parse_estimated_cost(pm.get("estimated_cost_usd"))
    exp_type = pm.get("experiment_type", "word")
    # Fall back to a type-based guess when no usable estimate exists.
    base_est = est_cost or (50.0 if exp_type in ["script", "repo"] else 5.0)

    # Tier definitions: multipliers scale the base estimate up or down.
    tiers = {
        "lite": {
            "multiplier": 0.25,
            "estimated_cost_usd": round(base_est * 0.25, 2),
            "description": "Minimal extract (CSV/text) or short summary. Minimal engineering."
        },
        "standard": {
            "multiplier": 1.0,
            "estimated_cost_usd": round(base_est * 1.0, 2),
            # Fixed mojibake ("β") in the original description text.
            "description": "Complete, tested script or notebook; limited UX — suitable for MVP."
        },
        "full": {
            "multiplier": 3.0,
            "estimated_cost_usd": round(base_est * 3.0, 2),
            "description": "Production-ready repo, packaging, tests, and deployment instructions."
        }
    }

    preferred = state.get("preferred_tier")
    flexible_mode = bool(state.get("flexible_budget_mode", False))

    # Accumulate risk factors; the score drives the coarse risk level below.
    risk_factors = []
    risk_score = 0

    # Complexity: long multi-step plans add risk.
    plan_steps = pm.get("plan_steps", [])
    if len(plan_steps) > 8:
        risk_factors.append("Complex multi-step plan")
        risk_score += 1

    # Artifact type: repos/notebooks imply heavier engineering.
    if exp_type in ("repo", "notebook"):
        risk_factors.append(f"Engineering-heavy artifact type: {exp_type}")
        risk_score += 1

    # Cost: missing estimates and large estimates both add risk.
    if est_cost is None:
        risk_factors.append("No cost estimate provided")
        risk_score += 1
    elif est_cost > 200:
        risk_factors.append(f"High estimated cost: ${est_cost}")
        risk_score += 2
    elif est_cost > 100:
        risk_factors.append(f"Moderate estimated cost: ${est_cost}")
        risk_score += 1

    # Risk is relative to the user's declared flexibility.
    if flexible_mode:
        risk_score = max(0, risk_score - 2)

    if risk_score <= 1:
        risk = "low"
    elif risk_score <= 3:
        risk = "medium"
    else:
        risk = "high"

    # Only mark infeasible when risk is genuinely high AND the user is inflexible.
    feasible = not (risk_score > 4 and not flexible_mode)

    # Tier recommendation honors an explicit user preference first.
    if preferred in tiers:
        recommended_tier = preferred
    elif est_cost is None:
        recommended_tier = "standard"
    elif est_cost > 500 and not flexible_mode:
        recommended_tier = "lite"
    else:
        recommended_tier = "standard"

    prag_report = {
        "ok": feasible,
        "risk_factors": risk_factors,
        "risk_level": risk,
        "risk_score": risk_score,
        "tier_options": tiers,
        "recommended_tier": recommended_tier,
        "explain": (
            f"Assessed {len(risk_factors)} risk factor(s). "
            f"Risk level: {risk}. Recommended tier: {recommended_tier}. "
            "User can proceed with any tier; higher tiers provide more complete deliverables."
        )
    }

    # Optional LLM optimizations for long plans (gated on complexity, not risk).
    if len(plan_steps) > 7 and llm:
        try:
            prompt = (
                "You are a pragmatic engineering advisor. Given this plan, suggest 2-3 ways to "
                "optimize implementation while preserving core value. Be specific and actionable. "
                "Return JSON {\"optimizations\": [...]}.\n\n"
                f"Plan: {json.dumps(pm, indent=2)}"
            )
            r = llm.invoke(prompt)
            # Renamed from 'parsed' -- this holds the parser *function*, not a result.
            parse_fn = getattr(base_graph, "parse_json_from_llm", None)
            recs = parse_fn(getattr(r, "content", "") or "") if callable(parse_fn) else None
            if isinstance(recs, dict):
                prag_report["optimizations"] = recs.get("optimizations", [])
        except Exception as e:
            log.debug(f"LLM optimizations failed: {e}")

    out = {"pragmatistReport": prag_report, "execution_path": path}
    out.update(add_status_update("Pragmatist", f"Risk: {risk}, Tier: {recommended_tier}"))
    return out
| # --- Governance (flexible decisions) --- | |
def run_governance_agent(state: AgentState) -> Dict[str, Any]:
    """Governance Agent (improved).

    Approves, warns, escalates, or rejects the chosen tier. Respects the
    user's explicit choices and only rejects on genuine blockers.

    Reads from state: pmPlan, pragmatistReport, preferred_tier,
    flexible_budget_mode, allow_escalation,
    auto_accept_approved_with_warning, current_budget/budget, userInput,
    coreObjectivePrompt. Returns 'governanceReport', the extended
    'execution_path', and a namespaced status entry.
    """
    log.info(">>> GOVERNANCE AGENT (improved)")
    path = ensure_list(state, "execution_path") + ["Governance"]
    pm = state.get("pmPlan", {}) or {}
    prag = state.get("pragmatistReport", {}) or {}

    # 'preferred' always resolves to a tier name ("standard" at worst).
    preferred = state.get("preferred_tier") or prag.get("recommended_tier") or "standard"
    tier_opts = prag.get("tier_options", {})
    chosen = tier_opts.get(preferred, tier_opts.get("standard", {}))
    try:
        chosen_cost = float(chosen.get("estimated_cost_usd", 0.0))
    except Exception:
        chosen_cost = 0.0

    flexible = bool(state.get("flexible_budget_mode", False))
    allow_escalate = bool(state.get("allow_escalation", False))
    auto_accept_warn = bool(state.get("auto_accept_approved_with_warning", False))

    # Start from approval; downgrade only when a concrete issue appears.
    decision = "approve"
    issues = []

    # --- Budget check ---
    budget = state.get("current_budget") or state.get("budget") or None
    if budget:
        try:
            budget_f = float(budget)
            if chosen_cost > budget_f:
                if flexible:
                    # User opted into flexible budgeting: warn, never block.
                    # (The original had a second, unreachable 'elif flexible'
                    # branch -- 'preferred' is always truthy -- merged here.)
                    issues.append(
                        f"Chosen tier (${chosen_cost}) exceeds budget (${budget_f}), "
                        f"but user enabled flexible budget mode."
                    )
                    decision = "approve_with_warning" if not auto_accept_warn else "approve"
                elif chosen_cost > budget_f * 2:
                    # Inflexible AND significantly over budget: hard stop.
                    issues.append(
                        f"Cost ${chosen_cost} is 2x over budget ${budget_f}. "
                        f"Enable flexible budget or reduce scope."
                    )
                    decision = "reject"
                else:
                    issues.append(f"Cost ${chosen_cost} exceeds budget ${budget_f}.")
                    decision = "require_escalation" if allow_escalate else "approve_with_warning"
        except Exception as e:
            issues.append(f"Could not parse budget: {e}")
            decision = "approve_with_warning"

    # --- Pragmatist risk check: only act on genuinely high risk ---
    risk_level = prag.get("risk_level")
    risk_score = prag.get("risk_score", 0)
    if risk_level == "high" and risk_score > 4:
        if not prag.get("ok", True):
            issues.append("Pragmatist identified blocking concerns.")
            if allow_escalate:
                decision = "require_escalation"
            else:
                decision = "approve_with_warning" if flexible else "reject"
        else:
            # High risk but feasible - just warn.
            issues.append(
                f"Complex request with {len(prag.get('risk_factors', []))} risk factors. "
                f"Proceeding with caution."
            )
            if decision == "approve":
                decision = "approve_with_warning"

    # --- Genuine blockers ---
    experiment_type = pm.get("experiment_type")
    plan_steps = pm.get("plan_steps", [])
    # FIX: guard against explicit None values stored under these keys --
    # state.get's default only applies when the key is entirely absent.
    request_text = (
        (state.get("userInput") or "") + " " + (state.get("coreObjectivePrompt") or "")
    ).lower()
    blockers = []
    if "scrape" in request_text and "million" in request_text:
        # Large-scale scraping: legal concern, but not blocking if the plan
        # already addresses compliance.
        if not any("legal" in str(step).lower() or "compliance" in str(step).lower() for step in plan_steps):
            issues.append(
                "Large-scale web scraping requires legal compliance consideration. "
                "Ensure plan addresses terms of service and data protection."
            )
            # Don't block - the plan can address this.
    if experiment_type in ["repo", "script"] and not plan_steps:
        blockers.append("No implementation plan provided for engineering task.")
    if blockers:
        issues.extend(blockers)
        decision = "reject"

    approved_bool = decision in ("approve", "approve_with_warning")

    # --- Optional LLM rationale (informative only) ---
    rationale = None
    if llm and decision in ("require_escalation", "approve_with_warning", "reject"):
        try:
            prompt = (
                "You are a governance advisor. Provide a 2-3 sentence rationale for this decision "
                "and list top 2 risks to monitor.\n\n"
                f"Decision: {decision}\n"
                f"Request: {(state.get('userInput') or '')[:200]}\n"
                f"Tier: {preferred} (${chosen_cost})\n"
                f"Budget: {budget}\n"
                f"Risk level: {risk_level}\n\n"
                "Be concise and actionable."
            )
            r = llm.invoke(prompt)
            rationale = (getattr(r, "content", "") or "")[:800]
        except Exception as e:
            log.debug(f"Rationale generation failed: {e}")

    gov_report = {
        "budget_ok": approved_bool,
        "issues": issues,
        "approved_for_experiment": approved_bool,
        "governanceDecision": decision,
        "chosen_tier": preferred,
        "chosen_cost_usd": chosen_cost,
        "rationale": rationale,
        "reasoning": (
            f"Decision: {decision}. "
            f"Risk: {risk_level}. "
            f"User mode: {'flexible' if flexible else 'standard'}. "
            f"{len(issues)} issue(s) noted."
        )
    }
    status_msg = {
        "approve": f"Approved {preferred} tier (${chosen_cost})",
        "approve_with_warning": f"Approved with warnings: {preferred} tier (${chosen_cost})",
        "require_escalation": "Manual approval required",
        "reject": "Request rejected - blocking issues found"
    }.get(decision, decision)
    out = {"governanceReport": gov_report, "execution_path": path}
    out.update(add_status_update("Governance", status_msg))
    return out
| # --- Compliance (keeps namespaced node_status__) --- | |
def scan_text_for_secrets(text: str) -> Dict[str, Any]:
    """Scan free text for credential-looking substrings.

    Returns {"suspicious": bool, "findings": [{"pattern", "match"}, ...]}.
    Patterns cover AWS access keys, PEM private-key headers, Google API
    keys, generic secret/token names, and inline password assignments.
    """
    if not text:
        return {"suspicious": False, "findings": []}
    patterns = (
        r"AKIA[0-9A-Z]{16}",
        r"-----BEGIN PRIVATE KEY-----",
        r"AIza[0-9A-Za-z-_]{35}",
        r"(?i)secret[_-]?(key|token)\b",
        r"(?i)password\s*[:=]\s*['\"][^'\"]{6,}['\"]",
    )
    findings = [
        {"pattern": pat, "match": hit.group(0)}
        for pat in patterns
        for hit in re.finditer(pat, text)
    ]
    return {"suspicious": bool(findings), "findings": findings}
def run_compliance_agent(state: AgentState) -> Dict[str, Any]:
    """Compliance Agent: scan experiment output for leaked secrets.

    Checks the experiment's stdout/stderr text and samples any files
    listed under experimentResults['paths'] (first 20 KB each) with
    scan_text_for_secrets. Returns 'complianceReport', the extended
    'execution_path', and a namespaced status entry. Never raises on
    unreadable files; those are recorded in the 'scanned' list instead.
    """
    log.info(">>> COMPLIANCE AGENT")
    path = ensure_list(state, "execution_path") + ["Compliance"]
    exp = state.get("experimentResults", {}) or {}
    report = {"suspicious": False, "issues": [], "scanned": []}

    # Scan inline text outputs.
    for key in ("stdout", "stderr"):
        val = exp.get(key)
        if isinstance(val, str) and val.strip():
            scan = scan_text_for_secrets(val)
            if scan.get("suspicious"):
                report["suspicious"] = True
                report["issues"].append({"type": "text_secret", "where": key, "findings": scan["findings"]})
            report["scanned"].append({"type": "text", "where": key})

    # Scan artifact files referenced by path.
    if isinstance(exp, dict) and "paths" in exp:
        paths = exp.get("paths") or {}
        if isinstance(paths, dict):
            for k, p in paths.items():
                try:
                    pstr = str(p)
                    if os.path.exists(pstr) and os.path.isfile(pstr):
                        # Only sample the first 20 KB to keep scans fast.
                        with open(pstr, "r", encoding="utf-8", errors="ignore") as fh:
                            sample = fh.read(20000)
                        scan = scan_text_for_secrets(sample)
                        if scan.get("suspicious"):
                            report["suspicious"] = True
                            report["issues"].append({"type": "file_secret", "file": pstr, "findings": scan["findings"]})
                        report["scanned"].append({"type": "file", "file": pstr})
                    else:
                        report["scanned"].append({"type": "path", "value": pstr, "exists": os.path.exists(pstr)})
                except Exception as e:
                    report["scanned"].append({"file": p, "error": str(e)})
        # Zip/repo artifacts cannot be text-scanned here; flag for a human.
        if any(str(v).lower().endswith(".zip") for v in (paths.values() if isinstance(paths, dict) else [])):
            # Fixed mojibake ("β") in the original note text.
            report.setdefault("notes", []).append("Zip-based or repo artifact detected — recommend manual review.")

    out = {"complianceReport": report, "execution_path": path}
    out.update(add_status_update("Compliance", "Compliance checks complete"))
    return out
| # --- Observer --- | |
def summarize_logs_for_observer(log_paths: Optional[list] = None, sample_lines: int = 200) -> str:
    """Build a short textual digest of recent log activity.

    Reads the tail (up to *sample_lines* lines) of each path in
    *log_paths* — or of a default candidate set when none are given —
    and prefixes the digest with total ERROR/WARNING counts.
    """
    if not log_paths:
        defaults = ["logs/performance.log", "logs/ai_lab.log", "performance.log"]
        log_paths = [c for c in defaults if os.path.exists(c)]
    sections = []
    error_count = 0
    warning_count = 0
    for candidate in log_paths:
        try:
            with open(candidate, "r", encoding="utf-8", errors="ignore") as handle:
                tail = handle.readlines()[-sample_lines:]
        except Exception as exc:
            sections.append(f"Could not read {candidate}: {exc}")
            continue
        body = "".join(tail)
        upper = body.upper()
        error_count += upper.count("ERROR")
        warning_count += upper.count("WARNING")
        # Cap each section at 2000 chars so the digest stays small.
        sections.append(f"--- {candidate} (last {len(tail)} lines) ---\n{body[:2000]}")
    header = f"Log summary: {error_count} ERROR(s), {warning_count} WARNING(s)"
    return header + "\n\n" + "\n\n".join(sections)
def run_observer_agent(state: AgentState) -> Dict[str, Any]:
    """Observer Agent: gather runtime telemetry into an observerReport.

    Summarizes available log files plus execution-path length, rework
    cycles, and current cost; optionally asks the LLM for prioritized
    mitigation actions.
    """
    log.info(">>> OBSERVER AGENT")
    path = ensure_list(state, "execution_path") + ["Observer"]
    candidates = ["logs/performance.log", "logs/ai_lab.log", "performance.log"]
    existing_logs = [c for c in candidates if os.path.exists(c)]
    summary = summarize_logs_for_observer(existing_logs or None)
    obs = {
        "log_summary": summary[:4000],
        "execution_length": len(state.get("execution_path", []) or []),
        "rework_cycles": ensure_int(state, "rework_cycles", 0),
        "current_cost": state.get("current_cost", 0.0),
        "status": state.get("status_update")
    }
    if llm:
        try:
            prompt = (
                "You are an Observer assistant. Given this runtime summary, provide 3 prioritized next actions to mitigate the top risks.\n\n"
                f"Runtime summary: {json.dumps(obs, indent=2)}\n\nReturn plain text."
            )
            reply = llm.invoke(prompt)
            obs["llm_recommendations"] = getattr(reply, "content", "")[:1500]
        except Exception as exc:
            # Record (rather than raise) so observation never halts the run.
            obs["llm_recommendations_error"] = str(exc)
    out = {"observerReport": obs, "execution_path": path}
    out.update(add_status_update("Observer", "Observer summary created"))
    return out
| # --- Knowledge Curator --- | |
def run_knowledge_curator_agent(state: AgentState) -> Dict[str, Any]:
    """Knowledge Curator Agent: persist a condensed record of the run.

    Summarizes the objective, plan steps, draft, and QA feedback, then
    stores the summary via memory_manager. Storage failures are reported
    in the returned 'knowledgeInsights' dict rather than raised.
    """
    log.info(">>> KNOWLEDGE CURATOR AGENT")
    path = ensure_list(state, "execution_path") + ["KnowledgeCurator"]
    core = state.get("coreObjectivePrompt", "") or state.get("userInput", "")
    pm = state.get("pmPlan", {}) or {}
    draft = state.get("draftResponse", "") or ""
    qa_feedback = state.get("qaFeedback", "") or ""
    # Truncate long fields so memory entries stay compact.
    summary_text = (
        f"Objective: {core}\n\n"
        f"Plan Steps: {json.dumps(pm.get('plan_steps', []))}\n\n"
        f"Draft (first 1500 chars): {draft[:1500]}\n\n"
        f"QA Feedback: {qa_feedback[:1000]}"
    )
    try:
        # FIX: datetime.utcnow() is deprecated (and naive); use an explicit
        # timezone-aware UTC stamp instead.
        timestamp = datetime.now(timezone.utc).isoformat()
        memory_manager.add_to_memory(summary_text, {"source": "knowledge_curator", "timestamp": timestamp})
        insights = {"added": True, "summary_snippet": summary_text[:500]}
    except Exception as e:
        insights = {"added": False, "error": str(e)}
    out = {"knowledgeInsights": insights, "execution_path": path}
    out.update(add_status_update("KnowledgeCurator", "Knowledge captured"))
    return out
| # --- Wiring / injection into existing main_workflow --- | |
def apply_upgrades():
    """Rebuild the main workflow graph with upgraded routing.

    CRITICAL: compiles a brand-new StateGraph (a compiled graph cannot be
    modified in place) and replaces base_graph.main_app / main_workflow.
    Returns True on success, False on any failure (which is logged).

    Note: mojibake "β" characters in the original log strings were
    restored to the intended "✓"/"→"/"✗" glyphs.
    """
    log.info("Applying graph upgrades (rebuilding graph with proper routing)")
    try:
        from langgraph.graph import StateGraph, END
        # Import the unmodified agent functions from the base graph.
        from graph import (
            run_memory_retrieval,
            run_intent_agent,
            run_pm_agent,
            run_experimenter_agent,
            run_synthesis_agent,
            run_qa_agent,
            run_archivist_agent,
            run_disclaimer_agent,
            should_continue,
            should_run_experiment
        )

        # Create a BRAND NEW graph.
        new_workflow = StateGraph(AgentState)

        # Add all nodes (imported base functions plus local upgraded ones).
        new_workflow.add_node("memory_retriever", run_memory_retrieval)
        new_workflow.add_node("intent_agent", run_intent_agent)
        new_workflow.add_node("pm_agent", run_pm_agent)
        new_workflow.add_node("pragmatist_agent", run_pragmatist_agent)  # Local upgraded
        new_workflow.add_node("governance_agent", run_governance_agent)  # Local upgraded
        new_workflow.add_node("experimenter_agent", run_experimenter_agent)
        new_workflow.add_node("compliance_agent", run_compliance_agent)  # Local upgraded
        new_workflow.add_node("synthesis_agent", run_synthesis_agent)
        new_workflow.add_node("qa_agent", run_qa_agent)
        new_workflow.add_node("observer_agent", run_observer_agent)  # Local upgraded
        new_workflow.add_node("archivist_agent", run_archivist_agent)
        new_workflow.add_node("knowledge_curator_agent", run_knowledge_curator_agent)  # Local upgraded
        new_workflow.add_node("disclaimer_agent", run_disclaimer_agent)
        log.info("✓ All nodes added to new graph")

        # Set entry point and the standard flow: Memory → Intent → PM.
        new_workflow.set_entry_point("memory_retriever")
        new_workflow.add_edge("memory_retriever", "intent_agent")
        new_workflow.add_edge("intent_agent", "pm_agent")

        # NEW ROUTING: PM → Pragmatist → Governance.
        new_workflow.add_edge("pm_agent", "pragmatist_agent")
        new_workflow.add_edge("pragmatist_agent", "governance_agent")
        log.info("✓ New routing added: PM → Pragmatist → Governance")

        # Governance conditional: approved → Experimenter, otherwise → PM.
        def governance_decider(state: AgentState):
            """Decide next step based on governance decision."""
            gov = state.get("governanceReport", {}) or {}
            decision = gov.get("governanceDecision", "approve")
            approved = gov.get("approved_for_experiment", True)
            log.info(f"Governance decision: {decision}, approved: {approved}")
            if approved and decision in ("approve", "approve_with_warning"):
                return "experimenter_agent"
            # Rejected or requires escalation - loop back to PM for rework.
            return "pm_agent"

        new_workflow.add_conditional_edges(
            "governance_agent",
            governance_decider,
            {
                "experimenter_agent": "experimenter_agent",
                "pm_agent": "pm_agent"
            }
        )
        log.info("✓ Governance conditional routing added")

        # Continue standard flow: Experimenter → Compliance → Synthesis → QA.
        new_workflow.add_edge("experimenter_agent", "compliance_agent")
        new_workflow.add_edge("compliance_agent", "synthesis_agent")
        new_workflow.add_edge("synthesis_agent", "qa_agent")

        # QA conditional routing (uses imported should_continue).
        new_workflow.add_conditional_edges(
            "qa_agent",
            should_continue,
            {
                "observer_agent": "observer_agent",
                "pm_agent": "pm_agent",
                "disclaimer_agent": "disclaimer_agent"
            }
        )
        log.info("✓ QA conditional routing added")

        # Final success path: Observer → Archivist → Knowledge Curator → END.
        new_workflow.add_edge("observer_agent", "archivist_agent")
        new_workflow.add_edge("archivist_agent", "knowledge_curator_agent")
        new_workflow.add_edge("knowledge_curator_agent", END)
        # Disclaimer path (failure/limit reached).
        new_workflow.add_edge("disclaimer_agent", END)
        log.info("✓ Final flow edges added")

        # CRITICAL: compile the NEW graph and REPLACE the old one.
        base_graph.main_app = new_workflow.compile()
        base_graph.main_workflow = new_workflow  # Also update workflow reference
        log.info("=" * 60)
        log.info("✓ GRAPH REBUILD SUCCESSFUL")
        log.info("=" * 60)
        log.info("New flow: Memory → Intent → PM → Pragmatist → Governance")
        log.info("          → Experimenter → Compliance → Synthesis → QA")
        log.info("          → Observer → Archivist → Knowledge Curator → END")
        log.info("=" * 60)
        return True
    except Exception as e:
        log.exception(f"✗ Failed to rebuild graph: {e}")
        return False