# Source: Janus-backend/backend/app/agents/planner.py
# Last commit by DevodG: "deploy: Janus full system stabilization" (24f95f0)
"""
Planner agent — MiroOrg v2.
Accepts Switchboard route + Research output + (optionally) Simulation and Finance outputs.
Produces a structured plan with steps, dependencies, and risk assessment.
"""
import logging
from app.agents._model import call_model, safe_parse
from app.config import load_prompt
from pydantic import BaseModel, Field
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.exceptions import OutputParserException
from typing import List, Optional
class PlannerOutput(BaseModel):
plan_steps: List[str] = Field(
description="Sequential list of steps to execute the plan"
)
resources_needed: List[str] = Field(description="Necessary tools or APIs")
dependencies: List[str] = Field(description="Prerequisites or sequence locks")
risk_level: str = Field(description="Risk assessment: low, medium, or high")
estimated_output: str = Field(
description="Brief outline of the expected final result"
)
replan_reason: Optional[str] = Field(
None, description="Why we are replanning, if applicable"
)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def _deterministic_plan(state: dict) -> dict:
route = state.get("route", {})
research = state.get("research", {})
deep_web = research.get("deep_web", {}) if isinstance(research, dict) else {}
simulation = state.get("simulation", {}) or {}
finance = state.get("finance", {}) or {}
intent = route.get("intent") or state.get("user_input", "the request")
steps = [f"Clarify the objective: {intent}."]
resources = []
dependencies = []
key_facts = research.get("key_facts", []) if isinstance(research.get("key_facts"), list) else []
if deep_web.get("top_sources"):
top_source = deep_web.get("top_sources", [])[0]
steps.append(
f"Anchor the answer in the highest-credibility source: {top_source.get('title', top_source.get('url', 'source'))}."
)
resources.append("deep_web_bundle")
elif key_facts:
steps.append("Use the strongest retrieved facts as the primary evidence base.")
if route.get("domain") == "finance":
steps.append("Separate factual market/company evidence from interpretation or advice.")
dependencies.append("credible source review")
resources.append("finance_domain_pack")
if finance:
steps.append("Incorporate the structured market data into the answer and note any stale or missing fields.")
resources.append("market_data")
if simulation:
steps.append("Compare the main evidence against the scenario view and explain what remains uncertain.")
resources.append("simulation_engine")
gaps = research.get("gaps", []) if isinstance(research.get("gaps"), list) else []
if gaps:
steps.append("Call out the most important gaps so the user knows what could change the conclusion.")
dependencies.append("gap disclosure")
steps.append("Return a decisive answer with confidence and next steps.")
confidence = float(research.get("confidence", 0.0) or 0.0)
if confidence >= 0.75:
risk_level = "low"
elif confidence >= 0.45:
risk_level = "medium"
else:
risk_level = "high"
return {
"plan_steps": steps,
"resources_needed": list(dict.fromkeys(resources)),
"dependencies": list(dict.fromkeys(dependencies)),
"risk_level": risk_level,
"estimated_output": f"A grounded answer for: {intent}",
"replan_reason": "deterministic fallback due to unavailable model synthesis",
}
def run(state: dict) -> dict:
    """Run the planner agent and attach its plan to the pipeline state.

    Builds a prompt from the upstream route, research, simulation, finance,
    verifier feedback and context entries, asks the model for a plan and
    parses the reply. If the model call raises, returns nothing, or the reply
    cannot be parsed into an error-free dict, the plan from
    ``_deterministic_plan`` is used instead.

    Args:
        state: Pipeline state dict. Sections that are missing or ``None`` are
            treated as empty.

    Returns:
        A shallow copy of ``state`` with the plan stored under ``"planner"``.
    """
    route = state.get("route", {})
    research = state.get("research", {})
    simulation = state.get("simulation", {})
    finance = state.get("finance", {})
    verifier = state.get("verifier", {})
    # ``.get(key, default)`` does not protect against an explicit None value.
    replan_count = state.get("replan_count") or 0
    context = state.get("context")
    if not isinstance(context, dict):
        context = {}

    prompt = load_prompt("planner")

    # Build context with all available upstream data.
    context_parts = [
        f"Route: {route}",
        f"Research findings: {research}",
    ]
    if simulation:
        context_parts.append(f"Simulation results: {simulation}")
    if finance:
        context_parts.append(f"Finance data: {finance}")
    if replan_count > 0 and verifier:
        context_parts.append(f"REPLAN #{replan_count} — Verifier feedback: {verifier}")

    memory = context.get("memory")
    similar_cases = memory.get("similar_cases", []) if isinstance(memory, dict) else []
    if similar_cases:
        context_parts.append(f"Similar cases: {similar_cases[:3]}")

    reflection = context.get("self_reflection")
    known_gaps = reflection.get("gaps", []) if isinstance(reflection, dict) else []
    if known_gaps:
        context_parts.append(f"Known gaps: {known_gaps[:3]}")

    # Prefer the raw user input; fall back to the routed intent, then to "".
    routed_intent = route.get("intent", "") if isinstance(route, dict) else ""
    user_request = state.get("user_input") or routed_intent or ""

    messages = [
        {"role": "system", "content": prompt},
        {
            "role": "user",
            "content": (
                f"User request: {user_request}\n\n"
                + "\n\n".join(context_parts)
            ),
        },
    ]

    result = None
    try:
        adaptive = context.get("adaptive_intelligence") or {}
        personality = adaptive.get("system_personality", {})
        raw_response = call_model(messages, personality=personality)
    except Exception as e:
        # Deliberate best-effort boundary: any model failure falls through to
        # the deterministic plan below instead of aborting the pipeline.
        logger.exception("[AGENT ERROR] planner: %s", e)
        raw_response = None

    if raw_response:
        parsed = safe_parse(raw_response)
        # Only an error-free dict counts as a usable plan.
        if isinstance(parsed, dict) and "error" not in parsed:
            result = parsed
        else:
            logger.warning("[AGENT PARSE FALLBACK] planner: using safe_parse fallback")

    if result is None:
        result = _deterministic_plan(state)

    return {**state, "planner": result}