Janus-backend / backend /app /graph.py
DevodG's picture
deploy: Janus full system stabilization
24f95f0
"""
Janus — LangGraph pipeline.
Deliberative graph topology:
[switchboard]
├─ requires_simulation=true → [mirofish] ┐
├─ requires_finance_data=true → [finance] ├→ [research] → [planner] → [verifier]
└─ (default) → [research] ┘ │
├─ pass → [synthesizer] → [END]
└─ fail, replans left (max 1; 2 if complexity is high/very_high) → [repair] → [planner]
Context from the context engine is injected into every LLM call.
"""
import uuid
import time
import logging
from typing import TypedDict, Dict, Any, Optional
from langgraph.graph import StateGraph, START, END
from app.agents import (
planner,
switchboard,
research,
synthesizer,
verifier,
mental_scratchpad,
)
from app.agents import mirofish_node, finance_node
logger = logging.getLogger(__name__)
class AgentState(TypedDict, total=False):
    """State dict handed to every node of the graph.

    Declared with ``total=False``, so every key is optional. ``run_case`` seeds
    ``case_id``, ``user_input``, ``route``, ``research``, ``planner``,
    ``verifier``, ``final``, ``errors`` and ``replan_count`` before the graph
    is invoked.
    """

    # Raw request text passed to run_case.
    user_input: str
    # UUID4 string generated by run_case; its first 8 characters prefix node log lines.
    case_id: str
    # Routing decision. The edge functions read requires_finance_data,
    # requires_simulation, confidence and complexity from it; switchboard_node
    # logs its "domain" entry.
    route: dict
    # Presumably the output of the mirofish (simulation) node — confirm in app.agents.
    simulation: dict
    # Presumably the output of the finance node — confirm in app.agents.
    finance: dict
    # Presumably the output of the research node — confirm in app.agents.
    research: dict
    # Presumably the output of the planner node — confirm in app.agents.
    planner: dict
    # Verifier verdict; after_verifier and verifier_node read its "passed" entry.
    verifier: dict
    # Presumably the synthesizer's final answer — confirm in app.agents.
    final: dict
    # Seeded as an empty list by run_case; presumably agents append failures — confirm.
    errors: list
    # Caller-supplied context; run_case only sets it when a truthy value is given.
    context: dict
    # Number of repair passes so far; incremented by repair_node and compared
    # against the replan budget in after_verifier.
    replan_count: int
    # Presumably the output of the mental_scratchpad node — confirm in app.agents.
    scratchpad: dict
def switchboard_node(state: AgentState) -> dict:
    """Run the switchboard agent on ``state`` and log how long it took.

    The log line also reports the ``domain`` entry found under the ``route``
    key of the agent's result. The result is returned unchanged.
    """
    started = time.perf_counter()
    outcome = switchboard.run(state)
    duration = time.perf_counter() - started

    # Resolved after the agent call so a failing agent surfaces its own error first.
    case_tag = state.get('case_id', '?')[:8]
    domain = outcome.get('route', {}).get('domain')
    logger.info(f"[{case_tag}] switchboard: {duration:.2f}s — domain={domain}")
    return outcome
async def mirofish_node_fn(state: AgentState) -> dict:
    """Await the mirofish agent on ``state`` and log the elapsed time.

    The awaited result is returned unchanged.
    """
    started = time.perf_counter()
    outcome = await mirofish_node.run(state)
    duration = time.perf_counter() - started

    case_tag = state.get('case_id', '?')[:8]
    logger.info(f"[{case_tag}] mirofish: {duration:.2f}s")
    return outcome
async def finance_node_fn(state: AgentState) -> dict:
    """Await the finance agent on ``state`` and log the elapsed time.

    The awaited result is returned unchanged.
    """
    started = time.perf_counter()
    outcome = await finance_node.run(state)
    duration = time.perf_counter() - started

    case_tag = state.get('case_id', '?')[:8]
    logger.info(f"[{case_tag}] finance: {duration:.2f}s")
    return outcome
def research_node(state: AgentState) -> dict:
    """Run the research agent on ``state`` and log the elapsed time.

    The agent's result is returned unchanged.
    """
    started = time.perf_counter()
    outcome = research.run(state)
    duration = time.perf_counter() - started

    case_tag = state.get('case_id', '?')[:8]
    logger.info(f"[{case_tag}] research: {duration:.2f}s")
    return outcome
def planner_node(state: AgentState) -> dict:
    """Run the planner agent on ``state`` and log the elapsed time.

    The agent's result is returned unchanged.
    """
    started = time.perf_counter()
    outcome = planner.run(state)
    duration = time.perf_counter() - started

    case_tag = state.get('case_id', '?')[:8]
    logger.info(f"[{case_tag}] planner: {duration:.2f}s")
    return outcome
def verifier_node(state: AgentState) -> dict:
    """Run the verifier agent on ``state`` and log timing plus the verdict.

    The verdict is the ``passed`` entry under the ``verifier`` key of the
    agent's result (``None`` when absent). The result is returned unchanged.
    """
    started = time.perf_counter()
    outcome = verifier.run(state)
    duration = time.perf_counter() - started

    passed = outcome.get("verifier", {}).get("passed")
    case_tag = state.get('case_id', '?')[:8]
    logger.info(f"[{case_tag}] verifier: {duration:.2f}s — passed={passed}")
    return outcome
def synthesizer_node(state: AgentState) -> dict:
    """Run the synthesizer agent on ``state`` and log the elapsed time.

    The agent's result is returned unchanged.
    """
    started = time.perf_counter()
    outcome = synthesizer.run(state)
    duration = time.perf_counter() - started

    case_tag = state.get('case_id', '?')[:8]
    logger.info(f"[{case_tag}] synthesizer: {duration:.2f}s")
    return outcome
def mental_scratchpad_node(state: AgentState) -> dict:
    """Run the mental-scratchpad agent on ``state`` and log the elapsed time.

    The agent's result is returned unchanged.
    """
    started = time.perf_counter()
    outcome = mental_scratchpad.run(state)
    duration = time.perf_counter() - started

    case_tag = state.get('case_id', '?')[:8]
    logger.info(f"[{case_tag}] mental_scratchpad: {duration:.2f}s")
    return outcome
def repair_node(state: AgentState) -> dict:
    """Bump ``replan_count`` by one ahead of another planner pass.

    Returns a new dict holding every key of ``state`` with ``replan_count``
    replaced by the incremented value (a missing counter is treated as 0).
    The incoming ``state`` object is not modified.
    """
    attempt = state.get("replan_count", 0) + 1
    case_tag = state.get("case_id", "?")[:8]
    logger.info("[%s] repair: incrementing replan_count to %s", case_tag, attempt)

    updated = dict(state)
    updated["replan_count"] = attempt
    return updated
def after_switchboard(state: AgentState) -> str:
    """Pick the node that runs immediately after the switchboard.

    Rules are checked in order and the first match wins:

    1. ``route["requires_finance_data"]`` is truthy -> ``"finance"``
    2. ``route["requires_simulation"]`` is truthy -> ``"mirofish"``
    3. ``route["confidence"]`` (default 0.5) is below 0.45 and
       ``route["complexity"]`` (default ``"medium"``) is medium, high or
       very_high -> ``"mirofish"``
    4. ``route["complexity"]`` is high or very_high -> ``"mental_scratchpad"``
    5. anything else -> ``"research"``

    Args:
        state: Current graph state; only ``route`` and ``case_id`` are read.

    Returns:
        The name of the next node.
    """
    route = state.get("route", {})

    # Finance is cheap/structured and should run even if simulation is also needed.
    if route.get("requires_finance_data"):
        return "finance"

    if route.get("requires_simulation"):
        return "mirofish"

    if route.get("confidence", 0.5) < 0.45 and route.get(
        "complexity", "medium"
    ) in {"medium", "high", "very_high"}:
        # getLogger(__name__) returns the same object as the module-level logger.
        logging.getLogger(__name__).info(
            "[%s] switchboard triggered simulation due to low confidence",
            state.get("case_id", "?")[:8],
        )
        # Previously this branch only logged and fell through, so the
        # simulation it announced never ran. after_finance already routes the
        # same condition to mirofish.
        return "mirofish"

    # High Complexity Deliberation: Go through Scratchpad
    if route.get("complexity") in {"high", "very_high"}:
        return "mental_scratchpad"

    return "research"
def after_finance(state: AgentState) -> str:
    """Pick the node that follows the finance node.

    Returns ``"mirofish"`` when the route asks for simulation outright, or
    when its confidence (default 0.5) is below 0.45 on a medium, high or
    very_high complexity case (default ``"medium"``). Otherwise returns
    ``"research"``.
    """
    routing = state.get("route", {})

    wants_simulation = bool(routing.get("requires_simulation"))
    if not wants_simulation:
        is_uncertain = routing.get("confidence", 0.5) < 0.45
        # Complexity is only inspected when confidence is actually low.
        wants_simulation = is_uncertain and routing.get("complexity", "medium") in {
            "medium",
            "high",
            "very_high",
        }

    return "mirofish" if wants_simulation else "research"
def after_verifier(state: AgentState) -> str:
    """Decide whether a verified plan is repaired or synthesized.

    Returns ``"repair"`` only when the verifier reported ``passed`` as falsy
    and ``replan_count`` (default 0) is still below the replan budget.
    Otherwise returns ``"synthesizer"``. A missing ``passed`` entry counts as
    a pass.
    """
    verdict = state.get("verifier", {})
    complexity = state.get("route", {}).get("complexity", "medium")

    # Allow 2 replans (3 attempts total) for high/very_high complexity, otherwise 1 replan.
    if complexity in {"high", "very_high"}:
        replan_budget = 2
    else:
        replan_budget = 1

    if verdict.get("passed", True):
        return "synthesizer"
    if state.get("replan_count", 0) < replan_budget:
        return "repair"
    return "synthesizer"
def build_graph():
    """Assemble the pipeline's ``StateGraph`` and return its compiled form.

    Registers the nine nodes, makes ``switchboard`` the entry point, and wires
    the edges:

    * ``switchboard`` branches via ``after_switchboard``
    * ``mental_scratchpad`` and ``mirofish`` both lead to ``research``
    * ``finance`` branches via ``after_finance``
    * ``research`` -> ``planner`` -> ``verifier``
    * ``verifier`` branches via ``after_verifier`` to ``repair`` or ``synthesizer``
    * ``repair`` loops back to ``planner``; ``synthesizer`` ends the run
    """
    builder = StateGraph(AgentState)

    node_table = (
        ("switchboard", switchboard_node),
        ("research", research_node),
        ("mirofish", mirofish_node_fn),
        ("finance", finance_node_fn),
        ("planner", planner_node),
        ("verifier", verifier_node),
        ("repair", repair_node),
        ("synthesizer", synthesizer_node),
        ("mental_scratchpad", mental_scratchpad_node),
    )
    for node_name, handler in node_table:
        builder.add_node(node_name, handler)

    builder.set_entry_point("switchboard")

    # Every branch label maps to the node of the same name.
    switchboard_targets = {
        label: label
        for label in (
            "mirofish",
            "finance",
            "research",
            "synthesizer",
            "mental_scratchpad",
        )
    }
    builder.add_conditional_edges("switchboard", after_switchboard, switchboard_targets)

    builder.add_edge("mental_scratchpad", "research")
    builder.add_edge("mirofish", "research")

    finance_targets = {"mirofish": "mirofish", "research": "research"}
    builder.add_conditional_edges("finance", after_finance, finance_targets)

    builder.add_edge("research", "planner")
    builder.add_edge("planner", "verifier")

    verifier_targets = {"repair": "repair", "synthesizer": "synthesizer"}
    builder.add_conditional_edges("verifier", after_verifier, verifier_targets)

    builder.add_edge("repair", "planner")
    builder.add_edge("synthesizer", END)

    return builder.compile()
# Lazy graph compilation — prevents import-time crash if agents fail to load
# Cached return value of build_graph(); stays None until get_compiled_graph() succeeds.
_compiled_graph: Optional[Any] = None
# Error text recorded by get_compiled_graph() when a build fails; None while no build has failed.
_graph_build_error: Optional[str] = None
def get_compiled_graph():
    """Return the compiled graph, building it on first use.

    Call this at runtime rather than at import. A successful build is cached
    in ``_compiled_graph`` and reused. A failed build is cached too: its error
    text is stored in ``_graph_build_error`` and later calls raise
    ``RuntimeError`` without attempting another build.

    Returns:
        The object returned by ``build_graph()``.

    Raises:
        RuntimeError: If an earlier call already failed to build the graph.
        Exception: Whatever ``build_graph()`` raised, on the first failing call.
    """
    global _compiled_graph, _graph_build_error

    if _compiled_graph is not None:
        return _compiled_graph
    if _graph_build_error is not None:
        raise RuntimeError(f"Graph compilation previously failed: {_graph_build_error}")

    try:
        _compiled_graph = build_graph()
    except Exception as e:
        # str(e) is "" for an exception raised without a message; fall back to
        # repr(e) so the stored error is never blank.
        _graph_build_error = str(e) or repr(e)
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception("LangGraph build failed: %s", e)
        raise

    logger.info("LangGraph pipeline compiled successfully")
    return _compiled_graph
def graph_status():
    """Return graph compilation status without triggering compilation.

    Returns:
        ``{"status": "ready"}`` once a graph has been compiled,
        ``{"status": "failed", "error": <text>}`` after a failed build, and
        ``{"status": "not_compiled"}`` when no build has been attempted.
    """
    if _compiled_graph is not None:
        return {"status": "ready"}
    # Compare against None, as get_compiled_graph does. A truthiness test would
    # report an empty error string as "not_compiled" even though
    # get_compiled_graph treats it as a permanent failure.
    if _graph_build_error is not None:
        return {"status": "failed", "error": _graph_build_error}
    return {"status": "not_compiled"}
async def run_case(user_input: str, context: dict = None) -> dict:
    """Run one user request through the compiled graph and return the result.

    A fresh UUID4 case id is generated per call. The initial state carries
    empty ``route``, ``research``, ``planner``, ``verifier`` and ``final``
    dicts, an empty ``errors`` list and a ``replan_count`` of 0.

    Args:
        user_input: Request text stored under ``user_input``.
        context: Optional dict stored under ``context``; omitted from the
            state when falsy.

    Returns:
        Whatever the graph's ``ainvoke`` returns for the initial state.
    """
    compiled = get_compiled_graph()
    case_id = str(uuid.uuid4())
    started = time.perf_counter()
    logger.info("Starting case %s", case_id)

    seed_state = dict(
        case_id=case_id,
        user_input=user_input,
        route={},
        research={},
        planner={},
        verifier={},
        final={},
        errors=[],
        replan_count=0,
    )
    if context:
        seed_state["context"] = context

    outcome = await compiled.ainvoke(seed_state)

    duration = time.perf_counter() - started
    logger.info("Case %s completed in %.2fs", case_id, duration)
    return outcome