# SPOC_V1 / graph_upgraded.py
# Source page metadata: uploaded by JatinAutonomousLabs, commit 2eb099b
# ("Update graph_upgraded.py", verified), 25.6 kB.
# revised_graph_upgraded.py
# Updated agents with tiered pragmatist and flexible governance
# Save as revised_graph_upgraded.py and replace your graph_upgraded.py as needed.
import os
import re
import json
import logging
from datetime import datetime
from typing import Optional, Dict, Any
# Import base graph & helpers
import graph as base_graph
from graph import AgentState, ensure_list, ensure_int
# add_status_update may not exist in older graph.py — provide a fallback
try:
    from graph import add_status_update  # type: ignore
except ImportError:
    # Older graph.py versions do not define add_status_update. Only an
    # ImportError is expected here (graph itself is already imported above),
    # so anything else should surface instead of being masked.
    def add_status_update(node_name: str, message: str) -> Dict[str, Any]:
        """
        Fallback: return a node-scoped status key so agents don't collide on 'status_update'.

        Example: add_status_update('Pragmatist','done') => {'node_status__Pragmatist': 'done'}
        """
        return {f"node_status__{node_name}": message}
from memory_manager import memory_manager
from logging_config import get_logger
# Module logger used by every agent defined in this file.
log = get_logger(__name__)
# Shared LLM client taken from the base graph module; None when graph.py
# defines no `llm`, in which case the agents skip their optional LLM calls.
# NOTE(review): this is read once at import time, so an `llm` assigned on
# base_graph afterwards is not picked up here — confirm that is intended.
llm = getattr(base_graph, "llm", None)
# --- Utility helpers ---
def simple_cost_feasibility_check(pm_plan: Dict[str, Any]) -> Dict[str, Any]:
    """
    Heuristic cost/complexity check for a PM plan.

    Args:
        pm_plan: plan dict. Reads ``estimated_cost_usd`` (a number or a
            formatted string such as ``"$1,200"``) and ``experiment_type``
            (default ``"word"``). ``None`` is treated as an empty plan.

    Returns:
        ``{"ok": bool, "notes": [str, ...]}``. ``ok`` is False when no usable
        positive cost is given or the cost exceeds 500 USD. Costs above 200 USD
        and engineering-heavy artifact types only add notes.
    """
    pm_plan = pm_plan or {}
    report = {"ok": True, "notes": []}
    raw = pm_plan.get("estimated_cost_usd", 0)
    est_cost: Optional[float]
    try:
        est_cost = float(raw or 0)
    except (TypeError, ValueError, OverflowError):
        # Formatted strings ("$1,200", "approx. 50 USD") are common in LLM
        # plans; take the first embedded number instead of discarding it.
        match = re.search(r"\d[\d,]*(?:\.\d+)?|\.\d+", str(raw))
        est_cost = float(match.group(0).replace(",", "")) if match else None
    exp_type = pm_plan.get("experiment_type", "word")
    # `not est_cost > 0` also rejects negative values and NaN.
    if est_cost is None or not est_cost > 0:
        report["notes"].append("No reliable estimated_cost_usd provided.")
        report["ok"] = False
    elif est_cost > 500:
        report["notes"].append(f"High estimated cost: ${est_cost}. Governance advised.")
        report["ok"] = False
    elif est_cost > 200:
        report["notes"].append(f"Moderately high estimated cost: ${est_cost}. Consider simplifications.")
    if exp_type in ("repo", "notebook", "script"):
        report["notes"].append(f"Artifact type '{exp_type}' indicates engineering-heavy work.")
    return report
# --- Pragmatist (flexible, produces tiered options) ---
# NOTE: The pragmatist and governance functions below were supplied as a
# drop-in replacement for the same-named functions of the original
# graph_upgraded.py (where the pragmatist started around line 60).
def _parse_cost_usd(raw: Any) -> Optional[float]:
    """
    Best-effort conversion of a plan's ``estimated_cost_usd`` value to float.

    Numbers and numeric strings are converted directly. Otherwise the first
    number embedded in the text is used (``"$1,200.50"`` -> 1200.5,
    ``"approx. 50 USD"`` -> 50.0). Returns None for None, ``""`` or text
    without a number.
    """
    if raw is None or raw == "":
        return None
    try:
        return float(raw)
    except (TypeError, ValueError, OverflowError):
        pass
    # A number must start with a digit (or be ".<digits>"), so stray
    # punctuation such as the "." in "approx." is not taken as the cost.
    match = re.search(r"\d[\d,]*(?:\.\d+)?|\.\d+", str(raw))
    if match is None:
        return None
    return float(match.group(0).replace(",", ""))


def run_pragmatist_agent(state: AgentState) -> Dict[str, Any]:
    """
    Pragmatist Agent (improved):
    - More nuanced risk assessment
    - Doesn't block on complexity alone
    - Provides constructive guidance

    Reads from the state:
    - ``pmPlan``: ``estimated_cost_usd``, ``experiment_type`` and ``plan_steps``.
    - ``preferred_tier``.
    - ``flexible_budget_mode``.

    Returns:
    - ``pragmatistReport``, containing:
      - lite/standard/full tier options, with costs scaled 0.25x/1x/3x from the
        plan estimate or from a type-based default;
      - a scored risk assessment;
      - a recommended tier;
      - optional LLM ``optimizations`` for plans with more than 7 steps.
    - The extended ``execution_path``.
    - A node status update.
    """
    log.info(">>> PRAGMATIST AGENT (improved)")
    path = ensure_list(state, "execution_path") + ["Pragmatist"]
    pm = state.get("pmPlan", {}) or {}
    if not isinstance(pm, dict):
        # A malformed plan (e.g. unparsed LLM text) would crash on .get().
        pm = {}
    est_cost = _parse_cost_usd(pm.get("estimated_cost_usd", None))
    exp_type = pm.get("experiment_type", "word")
    # No (or zero) estimate: fall back to a rough default by artifact type.
    base_est = est_cost or (50.0 if exp_type in ["script", "repo"] else 5.0)
    # Tier definitions
    tiers = {
        "lite": {
            "multiplier": 0.25,
            "estimated_cost_usd": round(base_est * 0.25, 2),
            "description": "Minimal extract (CSV/text) or short summary. Minimal engineering."
        },
        "standard": {
            "multiplier": 1.0,
            "estimated_cost_usd": round(base_est * 1.0, 2),
            "description": "Complete, tested script or notebook; limited UX – suitable for MVP."
        },
        "full": {
            "multiplier": 3.0,
            "estimated_cost_usd": round(base_est * 3.0, 2),
            "description": "Production-ready repo, packaging, tests, and deployment instructions."
        }
    }
    preferred = state.get("preferred_tier")
    flexible_mode = bool(state.get("flexible_budget_mode", False))
    # Risk scoring: each factor adds points; flexible mode forgives 2 points.
    risk_factors = []
    risk_score = 0
    # `or []` also covers an explicit None, which len() would reject.
    plan_steps = pm.get("plan_steps") or []
    # Check complexity
    if len(plan_steps) > 8:
        risk_factors.append("Complex multi-step plan")
        risk_score += 1
    # Check artifact type
    if exp_type in ("repo", "notebook"):
        risk_factors.append(f"Engineering-heavy artifact type: {exp_type}")
        risk_score += 1
    # Check cost
    if est_cost is None:
        risk_factors.append("No cost estimate provided")
        risk_score += 1
    elif est_cost > 200:
        risk_factors.append(f"High estimated cost: ${est_cost}")
        risk_score += 2
    elif est_cost > 100:
        risk_factors.append(f"Moderate estimated cost: ${est_cost}")
        risk_score += 1
    # Risk is relative to the user's flexibility.
    if flexible_mode:
        risk_score = max(0, risk_score - 2)
    # Calculate risk level
    if risk_score <= 1:
        risk = "low"
    elif risk_score <= 3:
        risk = "medium"
    else:
        risk = "high"
    # Don't mark as "not ok" unless truly blocked.
    # NOTE(review): with the scoring above the maximum risk_score is 4
    # (1 steps + 1 artifact + 2 cost), so this branch is currently unreachable
    # and the report is always feasible. Kept as-is to avoid introducing new
    # rejections; revisit if more risk factors are added.
    feasible = True
    if risk_score > 4 and not flexible_mode:
        feasible = False
    # Recommend tier based on context
    if isinstance(preferred, str) and preferred in tiers:
        recommended_tier = preferred
    elif est_cost is None:
        recommended_tier = "standard"
    elif est_cost > 500 and not flexible_mode:
        recommended_tier = "lite"
    else:
        recommended_tier = "standard"
    prag_report = {
        "ok": feasible,
        "risk_factors": risk_factors,
        "risk_level": risk,
        "risk_score": risk_score,
        "tier_options": tiers,
        "recommended_tier": recommended_tier,
        "explain": (
            f"Assessed {len(risk_factors)} risk factor(s). "
            f"Risk level: {risk}. Recommended tier: {recommended_tier}. "
            "User can proceed with any tier; higher tiers provide more complete deliverables."
        )
    }
    # Optional LLM recommendations for high complexity (not high risk).
    # Best-effort: any failure is logged at debug level and ignored.
    if len(plan_steps) > 7 and llm:
        try:
            prompt = (
                "You are a pragmatic engineering advisor. Given this plan, suggest 2-3 ways to "
                "optimize implementation while preserving core value. Be specific and actionable. "
                "Return JSON {\"optimizations\": [...]}.\n\n"
                f"Plan: {json.dumps(pm, indent=2)}"
            )
            r = llm.invoke(prompt)
            parsed = getattr(base_graph, "parse_json_from_llm", None)
            recs = None
            if callable(parsed):
                recs = parsed(getattr(r, "content", "") or "")
            if isinstance(recs, dict):
                prag_report["optimizations"] = recs.get("optimizations", [])
        except Exception as e:
            log.debug(f"LLM optimizations failed: {e}")
    out = {"pragmatistReport": prag_report, "execution_path": path}
    out.update(add_status_update("Pragmatist", f"Risk: {risk}, Tier: {recommended_tier}"))
    return out
# --- Governance (flexible decisions) ---
def run_governance_agent(state: AgentState) -> Dict[str, Any]:
    """
    Governance Agent (improved):
    - Respects user's explicit choices
    - Only rejects on genuine blockers
    - Provides clear reasoning

    Inputs:
    - The budget (``current_budget`` or ``budget``).
    - The pragmatist report.
    - Simple plan checks.

    The decision is one of ``"approve"``, ``"approve_with_warning"``,
    ``"require_escalation"`` or ``"reject"``. Only the first two set
    ``approved_for_experiment``.

    Returns:
    - A ``governanceReport``.
    - The extended ``execution_path``.
    - A node status update.
    """
    log.info(">>> GOVERNANCE AGENT (improved)")
    path = ensure_list(state, "execution_path") + ["Governance"]
    pm = state.get("pmPlan", {}) or {}
    if not isinstance(pm, dict):
        pm = {}
    prag = state.get("pragmatistReport", {}) or {}
    if not isinstance(prag, dict):
        prag = {}
    issues = []
    # Only an explicit state["preferred_tier"] counts as the user's own choice;
    # `preferred` below is always truthy because of its fallbacks, so it cannot
    # be used for that test.
    user_tier = state.get("preferred_tier")
    preferred = user_tier or prag.get("recommended_tier") or "standard"
    tier_opts = prag.get("tier_options", {}) or {}
    if not isinstance(tier_opts, dict):
        tier_opts = {}
    if preferred not in tier_opts and "standard" in tier_opts:
        # Keep the reported tier consistent with the tier that is actually costed.
        issues.append(f"Requested tier '{preferred}' is not available; using 'standard'.")
        preferred = "standard"
    chosen = tier_opts.get(preferred, {}) or {}
    try:
        chosen_cost = float(chosen.get("estimated_cost_usd", 0.0))
    except (AttributeError, TypeError, ValueError):
        chosen_cost = 0.0
    flexible = bool(state.get("flexible_budget_mode", False))
    allow_escalate = bool(state.get("allow_escalation", False))
    auto_accept_warn = bool(state.get("auto_accept_approved_with_warning", False))
    warn_decision = "approve" if auto_accept_warn else "approve_with_warning"
    # Start approved; later checks add issues and may change the decision.
    decision = "approve"
    # Check budget (a missing/zero budget means "no limit").
    budget = state.get("current_budget") or state.get("budget") or None
    if budget:
        try:
            budget_f = float(budget)
        except (TypeError, ValueError, OverflowError) as e:
            issues.append(f"Could not parse budget: {e}")
            decision = "approve_with_warning"
        else:
            if chosen_cost > budget_f:
                if flexible and user_tier:
                    # User explicitly chose this tier with flexible mode: warn, don't block.
                    issues.append(
                        f"Chosen tier (${chosen_cost}) exceeds budget (${budget_f}), "
                        f"but user enabled flexible budget mode."
                    )
                    decision = warn_decision
                elif flexible:
                    issues.append(f"Cost ${chosen_cost} exceeds budget ${budget_f}.")
                    decision = warn_decision
                elif chosen_cost > budget_f * 2:
                    # Only reject if inflexible AND significantly over budget.
                    issues.append(
                        f"Cost ${chosen_cost} is 2x over budget ${budget_f}. "
                        f"Enable flexible budget or reduce scope."
                    )
                    decision = "reject"
                else:
                    issues.append(f"Cost ${chosen_cost} exceeds budget ${budget_f}.")
                    decision = "require_escalation" if allow_escalate else "approve_with_warning"
    # Pragmatist risk: only act on genuinely high risk, not just complexity.
    risk_level = prag.get("risk_level")
    try:
        risk_score = float(prag.get("risk_score", 0) or 0)
    except (TypeError, ValueError):
        risk_score = 0.0
    # NOTE(review): run_pragmatist_agent's scoring tops out at 4, so with that
    # producer this "> 4" branch does not fire; it only guards against other
    # producers of pragmatistReport.
    if risk_level == "high" and risk_score > 4:
        if not prag.get("ok", True):
            issues.append("Pragmatist identified blocking concerns.")
            if allow_escalate:
                decision = "require_escalation"
            else:
                decision = "approve_with_warning" if flexible else "reject"
        else:
            # High risk but feasible - just warn
            issues.append(
                f"Complex request with {len(prag.get('risk_factors', []))} risk factors. "
                f"Proceeding with caution."
            )
            if decision == "approve":
                decision = "approve_with_warning"
    # Check for genuine blockers
    experiment_type = pm.get("experiment_type")
    # `or []` also covers an explicit None, which the iteration below rejects.
    plan_steps = pm.get("plan_steps") or []
    # None-safe: either field may be missing or None early in the graph.
    request_text = f"{state.get('userInput') or ''} {state.get('coreObjectivePrompt') or ''}".lower()
    blockers = []
    if "scrape" in request_text and "million" in request_text:
        # Large-scale scraping - legal concern but not blocking if properly addressed
        if not any("legal" in str(step).lower() or "compliance" in str(step).lower() for step in plan_steps):
            issues.append(
                "Large-scale web scraping requires legal compliance consideration. "
                "Ensure plan addresses terms of service and data protection."
            )
    # Check for missing critical components
    if experiment_type in ["repo", "script"] and not plan_steps:
        blockers.append("No implementation plan provided for engineering task.")
    # Only reject on genuine blockers
    if blockers:
        issues.extend(blockers)
        decision = "reject"
    approved_bool = decision in ("approve", "approve_with_warning")
    # LLM rationale (optional, informative; failures are logged and ignored)
    rationale = None
    if llm and decision in ("require_escalation", "approve_with_warning", "reject"):
        try:
            prompt = (
                "You are a governance advisor. Provide a 2-3 sentence rationale for this decision "
                "and list top 2 risks to monitor.\n\n"
                f"Decision: {decision}\n"
                f"Request: {str(state.get('userInput') or '')[:200]}\n"
                f"Tier: {preferred} (${chosen_cost})\n"
                f"Budget: {budget}\n"
                f"Risk level: {risk_level}\n\n"
                "Be concise and actionable."
            )
            r = llm.invoke(prompt)
            rationale = (getattr(r, "content", "") or "")[:800]
        except Exception as e:
            log.debug(f"Rationale generation failed: {e}")
    gov_report = {
        # NOTE(review): mirrors overall approval, not just the budget check.
        "budget_ok": approved_bool,
        "issues": issues,
        "approved_for_experiment": approved_bool,
        "governanceDecision": decision,
        "chosen_tier": preferred,
        "chosen_cost_usd": chosen_cost,
        "rationale": rationale,
        "reasoning": (
            f"Decision: {decision}. "
            f"Risk: {risk_level}. "
            f"User mode: {'flexible' if flexible else 'standard'}. "
            f"{len(issues)} issue(s) noted."
        )
    }
    status_msg = {
        "approve": f"Approved {preferred} tier (${chosen_cost})",
        "approve_with_warning": f"Approved with warnings: {preferred} tier (${chosen_cost})",
        "require_escalation": "Manual approval required",
        "reject": "Request rejected - blocking issues found"
    }.get(decision, decision)
    out = {"governanceReport": gov_report, "execution_path": path}
    out.update(add_status_update("Governance", status_msg))
    return out
# --- Compliance (keeps namespaced node_status__) ---
# Credential-looking patterns, compiled once at import time.
_SECRET_PATTERNS = (
    re.compile(r"AKIA[0-9A-Z]{16}"),  # AKIA-prefixed access key ids
    # PEM private-key headers, incl. typed ones ("RSA", "EC", "OPENSSH", ...).
    re.compile(r"-----BEGIN (?:[A-Z0-9]+ )*PRIVATE KEY-----"),
    re.compile(r"AIza[0-9A-Za-z-_]{35}"),  # AIza-prefixed API keys
    re.compile(r"(?i)secret[_-]?(key|token)\b"),
    re.compile(r"(?i)password\s*[:=]\s*['\"][^'\"]{6,}['\"]"),
)


def scan_text_for_secrets(text: str) -> Dict[str, Any]:
    """
    Search *text* for strings that look like credentials.

    Returns ``{"suspicious": bool, "findings": [...]}``. Each finding has the
    form ``{"pattern": <regex source>, "match": <redacted match>,
    "start": <offset>}``.

    Matches are redacted to their first 4 characters followed by ``***``, so
    reports built from the findings do not themselves carry the secret.
    ``start`` locates the hit for manual review. Empty or None *text* yields
    no findings.
    """
    findings = []
    if not text:
        return {"suspicious": False, "findings": findings}
    for pattern in _SECRET_PATTERNS:
        for m in pattern.finditer(text):
            findings.append({
                "pattern": pattern.pattern,
                "match": m.group(0)[:4] + "***",
                "start": m.start(),
            })
    return {"suspicious": bool(findings), "findings": findings}
def run_compliance_agent(state: AgentState) -> Dict[str, Any]:
    """
    Scan experiment outputs and listed artifact files for leaked secrets.

    Scanned sources:
    - Text: ``experimentResults["stdout"]`` and ``["stderr"]``.
    - Files: each existing path in the ``experimentResults["paths"]`` dict.
      Only the first 20000 characters of each file are scanned.

    Per-file failures are recorded in ``scanned`` instead of aborting the
    scan. A ``notes`` entry is added when any listed path ends in ``.zip``.

    Returns:
    - ``complianceReport``, with ``suspicious``, ``issues``, ``scanned`` and
      optionally ``notes``.
    - The extended ``execution_path``.
    - A node status update.
    """
    log.info(">>> COMPLIANCE AGENT")
    path = ensure_list(state, "execution_path") + ["Compliance"]
    exp = state.get("experimentResults", {}) or {}
    if not isinstance(exp, dict):
        # Non-dict results (e.g. a bare string) have nothing to scan and would
        # otherwise crash on .get().
        exp = {}
    report = {"suspicious": False, "issues": [], "scanned": []}
    for key in ("stdout", "stderr"):
        val = exp.get(key)
        if not (isinstance(val, str) and val.strip()):
            continue
        scan = scan_text_for_secrets(val)
        if scan.get("suspicious"):
            report["suspicious"] = True
            report["issues"].append({"type": "text_secret", "where": key, "findings": scan["findings"]})
        report["scanned"].append({"type": "text", "where": key})
    paths = exp.get("paths") or {}
    if isinstance(paths, dict):
        for p in paths.values():
            try:
                pstr = str(p)
                if os.path.isfile(pstr):
                    with open(pstr, "r", encoding="utf-8", errors="ignore") as fh:
                        sample = fh.read(20000)
                    scan = scan_text_for_secrets(sample)
                    if scan.get("suspicious"):
                        report["suspicious"] = True
                        report["issues"].append({"type": "file_secret", "file": pstr, "findings": scan["findings"]})
                    report["scanned"].append({"type": "file", "file": pstr})
                else:
                    report["scanned"].append({"type": "path", "value": pstr, "exists": os.path.exists(pstr)})
            except Exception as e:  # best-effort: record the failure, keep scanning
                report["scanned"].append({"type": "error", "file": p, "error": str(e)})
        if any(str(v).lower().endswith(".zip") for v in paths.values()):
            report.setdefault("notes", []).append(
                "Zip-based or repo artifact detected — recommend manual review."
            )
    out = {"complianceReport": report, "execution_path": path}
    out.update(add_status_update("Compliance", "Compliance checks complete"))
    return out
# --- Observer ---
def summarize_logs_for_observer(log_paths: Optional[list] = None, sample_lines: int = 200) -> str:
    """
    Build a short text digest of the tail of one or more log files.

    Args:
        log_paths: files to read. When empty/None, the existing files among
            ``logs/performance.log``, ``logs/ai_lab.log`` and
            ``performance.log`` are used.
        sample_lines: number of trailing lines to read per file. Values <= 0
            read no lines. (Previously 0 read the whole file via ``[-0:]``.)

    Returns:
        A header with case-insensitive ERROR/WARNING substring counts over the
        sampled lines. It is followed by up to 2000 characters of each file's
        sample, or a "Could not read" line for files that failed.
    """
    from collections import deque  # local import: only needed here

    if not log_paths:
        candidates = ["logs/performance.log", "logs/ai_lab.log", "performance.log"]
        log_paths = [p for p in candidates if os.path.exists(p)]
    parts = []
    errs = 0
    warns = 0
    for p in log_paths:
        try:
            with open(p, "r", encoding="utf-8", errors="ignore") as fh:
                # deque(maxlen=N) streams the file and keeps only the last N
                # lines, instead of loading every line with readlines().
                lines = list(deque(fh, maxlen=max(0, sample_lines)))
        except Exception as e:  # best-effort: report unreadable files inline
            parts.append(f"Could not read {p}: {e}")
            continue
        content = "".join(lines)
        upper = content.upper()
        errs += upper.count("ERROR")
        warns += upper.count("WARNING")
        parts.append(f"--- {p} (last {len(lines)} lines) ---\n{content[:2000]}")
    header = f"Log summary: {errs} ERROR(s), {warns} WARNING(s)"
    return header + "\n\n" + "\n\n".join(parts)
def run_observer_agent(state: AgentState) -> Dict[str, Any]:
    """
    Produce a runtime health snapshot for the current run.

    Collects:
    - A log digest, via ``summarize_logs_for_observer`` on the default log files.
    - The execution-path length.
    - ``rework_cycles``, ``current_cost`` and ``status_update`` from the state.

    When an LLM is configured, it is asked for three prioritized next actions.
    Failures are recorded under ``llm_recommendations_error`` instead of being
    raised.

    Returns ``observerReport``, the extended ``execution_path`` and a node
    status update.
    """
    log.info(">>> OBSERVER AGENT")
    path = ensure_list(state, "execution_path") + ["Observer"]
    # Passing None lets the summarizer discover the existing default log files
    # itself, so no separate existence scan is needed here.
    summary = summarize_logs_for_observer(None)
    exec_len = len(state.get("execution_path", []) or [])
    rework_cycles = ensure_int(state, "rework_cycles", 0)
    current_cost = state.get("current_cost", 0.0)
    obs = {
        "log_summary": summary[:4000],
        "execution_length": exec_len,
        "rework_cycles": rework_cycles,
        "current_cost": current_cost,
        "status": state.get("status_update")
    }
    if llm:
        try:
            # default=str: this JSON is only prompt text, so stringifying any
            # non-JSON state value is preferable to failing the whole request.
            prompt = (
                "You are an Observer assistant. Given this runtime summary, provide 3 prioritized next actions to mitigate the top risks.\n\n"
                f"Runtime summary: {json.dumps(obs, indent=2, default=str)}\n\nReturn plain text."
            )
            r = llm.invoke(prompt)
            # `or ""` guards against a None content, which would otherwise make
            # the slice raise and be misreported as an LLM error.
            obs["llm_recommendations"] = (getattr(r, "content", "") or "")[:1500]
        except Exception as e:
            obs["llm_recommendations_error"] = str(e)
    out = {"observerReport": obs, "execution_path": path}
    out.update(add_status_update("Observer", "Observer summary created"))
    return out
# --- Knowledge Curator ---
def run_knowledge_curator_agent(state: AgentState) -> Dict[str, Any]:
    """
    Persist a condensed record of this run to long-term memory.

    Builds a text summary from:
    - The objective (``coreObjectivePrompt``, or ``userInput`` as fallback).
    - The PM plan steps.
    - The first 1500 characters of the draft.
    - The first 1000 characters of the QA feedback.

    The summary is stored via ``memory_manager.add_to_memory`` with
    source/timestamp metadata. The timestamp is a timezone-aware UTC ISO-8601
    string. Storage failures are recorded in ``knowledgeInsights`` rather
    than raised.
    """
    from datetime import timezone  # local import: only needed here

    log.info(">>> KNOWLEDGE CURATOR AGENT")
    path = ensure_list(state, "execution_path") + ["KnowledgeCurator"]
    core = state.get("coreObjectivePrompt", "") or state.get("userInput", "")
    pm = state.get("pmPlan", {}) or {}
    if not isinstance(pm, dict):
        pm = {}
    draft = state.get("draftResponse", "") or ""
    qa_feedback = state.get("qaFeedback", "") or ""
    # Drafts/QA feedback may arrive as structured data (dict/list); coerce to
    # text so the character slices below cannot raise outside the try block.
    if not isinstance(draft, str):
        draft = str(draft)
    if not isinstance(qa_feedback, str):
        qa_feedback = str(qa_feedback)
    summary_text = (
        f"Objective: {core}\n\n"
        f"Plan Steps: {json.dumps(pm.get('plan_steps', []), default=str)}\n\n"
        f"Draft (first 1500 chars): {draft[:1500]}\n\n"
        f"QA Feedback: {qa_feedback[:1000]}"
    )
    try:
        # datetime.utcnow() is deprecated (Python 3.12) and returns a naive value.
        memory_manager.add_to_memory(
            summary_text,
            {"source": "knowledge_curator", "timestamp": datetime.now(timezone.utc).isoformat()},
        )
        insights = {"added": True, "summary_snippet": summary_text[:500]}
    except Exception as e:
        insights = {"added": False, "error": str(e)}
    out = {"knowledgeInsights": insights, "execution_path": path}
    out.update(add_status_update("KnowledgeCurator", "Knowledge captured"))
    return out
# --- Wiring / injection into existing main_workflow ---
def apply_upgrades(max_governance_passes: int = 3):
    """
    Rebuild the main workflow graph with upgraded routing.
    CRITICAL: Creates a NEW graph instead of modifying the compiled one.

    Builds a fresh ``StateGraph(AgentState)`` wired as
    Memory -> Intent -> PM -> Pragmatist -> Governance -> Experimenter ->
    Compliance -> Synthesis -> QA. QA is routed by the base graph's
    ``should_continue`` to one of:
    - Observer -> Archivist -> Knowledge Curator -> END;
    - back to PM;
    - Disclaimer -> END.

    The compiled app replaces ``base_graph.main_app``, and the workflow
    replaces ``base_graph.main_workflow``.

    Args:
        max_governance_passes: number of governance runs, counted as
            "Governance" entries in ``execution_path``. After that many runs,
            a non-approved decision routes to the disclaimer agent instead of
            looping back to the PM agent. This bounds the
            PM -> Pragmatist -> Governance cycle when a plan keeps being
            rejected or escalated.

    Returns:
        True on success; False if any import or graph-construction step
        failed (the exception is logged).
    """
    log.info("Applying graph upgrades (rebuilding graph with proper routing)")
    try:
        from langgraph.graph import StateGraph, END
        # Import the missing agent functions from base graph
        from graph import (
            run_memory_retrieval,
            run_intent_agent,
            run_pm_agent,
            run_experimenter_agent,
            run_synthesis_agent,
            run_qa_agent,
            run_archivist_agent,
            run_disclaimer_agent,
            should_continue,
            should_run_experiment
        )
        # Create BRAND NEW graph
        new_workflow = StateGraph(AgentState)
        # Add all nodes (using imported functions and local upgraded ones)
        new_workflow.add_node("memory_retriever", run_memory_retrieval)
        new_workflow.add_node("intent_agent", run_intent_agent)
        new_workflow.add_node("pm_agent", run_pm_agent)
        new_workflow.add_node("pragmatist_agent", run_pragmatist_agent)  # Local upgraded
        new_workflow.add_node("governance_agent", run_governance_agent)  # Local upgraded
        new_workflow.add_node("experimenter_agent", run_experimenter_agent)
        new_workflow.add_node("compliance_agent", run_compliance_agent)  # Local upgraded
        new_workflow.add_node("synthesis_agent", run_synthesis_agent)
        new_workflow.add_node("qa_agent", run_qa_agent)
        new_workflow.add_node("observer_agent", run_observer_agent)  # Local upgraded
        new_workflow.add_node("archivist_agent", run_archivist_agent)
        new_workflow.add_node("knowledge_curator_agent", run_knowledge_curator_agent)  # Local upgraded
        new_workflow.add_node("disclaimer_agent", run_disclaimer_agent)
        log.info("✅ All nodes added to new graph")
        # Set entry point
        new_workflow.set_entry_point("memory_retriever")
        # Standard flow: Memory → Intent → PM
        new_workflow.add_edge("memory_retriever", "intent_agent")
        new_workflow.add_edge("intent_agent", "pm_agent")
        # NEW ROUTING: PM → Pragmatist → Governance
        new_workflow.add_edge("pm_agent", "pragmatist_agent")
        new_workflow.add_edge("pragmatist_agent", "governance_agent")
        log.info("✅ New routing added: PM → Pragmatist → Governance")

        # Governance conditional: approved → Experimenter; otherwise back to PM,
        # or to Disclaimer once the retry budget is used up.
        def governance_decider(state: AgentState):
            """Decide next step based on governance decision"""
            gov = state.get("governanceReport", {}) or {}
            decision = gov.get("governanceDecision", "approve")
            approved = gov.get("approved_for_experiment", True)
            log.info(f"Governance decision: {decision}, approved: {approved}")
            if approved and decision in ("approve", "approve_with_warning"):
                return "experimenter_agent"
            # Rejected or requires escalation. run_governance_agent appends
            # "Governance" to execution_path on every run, so its count bounds
            # how often we loop back to the PM agent.
            governance_passes = list(state.get("execution_path") or []).count("Governance")
            if governance_passes >= max_governance_passes:
                log.warning(
                    f"Governance did not approve after {governance_passes} pass(es); "
                    "routing to disclaimer_agent"
                )
                return "disclaimer_agent"
            return "pm_agent"

        new_workflow.add_conditional_edges(
            "governance_agent",
            governance_decider,
            {
                "experimenter_agent": "experimenter_agent",
                "pm_agent": "pm_agent",
                "disclaimer_agent": "disclaimer_agent"
            }
        )
        log.info("✅ Governance conditional routing added")
        # Continue standard flow: Experimenter → Compliance → Synthesis → QA
        new_workflow.add_edge("experimenter_agent", "compliance_agent")
        new_workflow.add_edge("compliance_agent", "synthesis_agent")
        new_workflow.add_edge("synthesis_agent", "qa_agent")
        # QA conditional routing (use imported should_continue)
        new_workflow.add_conditional_edges(
            "qa_agent",
            should_continue,
            {
                "observer_agent": "observer_agent",
                "pm_agent": "pm_agent",
                "disclaimer_agent": "disclaimer_agent"
            }
        )
        log.info("✅ QA conditional routing added")
        # Final success path: Observer → Archivist → Knowledge Curator → END
        new_workflow.add_edge("observer_agent", "archivist_agent")
        new_workflow.add_edge("archivist_agent", "knowledge_curator_agent")
        new_workflow.add_edge("knowledge_curator_agent", END)
        # Disclaimer path (failure/limit reached)
        new_workflow.add_edge("disclaimer_agent", END)
        log.info("✅ Final flow edges added")
        # CRITICAL: Compile NEW graph and REPLACE old one
        base_graph.main_app = new_workflow.compile()
        base_graph.main_workflow = new_workflow  # Also update workflow reference
        log.info("=" * 60)
        log.info("✅ GRAPH REBUILD SUCCESSFUL")
        log.info("=" * 60)
        log.info("New flow: Memory → Intent → PM → Pragmatist → Governance")
        log.info(" → Experimenter → Compliance → Synthesis → QA")
        log.info(" → Observer → Archivist → Knowledge Curator → END")
        log.info("=" * 60)
        return True
    except Exception as e:
        # Top-level boundary: log with traceback and report failure to caller.
        log.exception(f"❌ Failed to rebuild graph: {e}")
        return False