""" Janus — LangGraph pipeline. Deliberative graph topology: [switchboard] │ ├─ requires_simulation=true → [mirofish] ┐ ├─ requires_finance_data=true → [finance] ├→ [research] → [planner] → [verifier] └─ (default) → [research] ┘ │ ├─ pass → [synthesizer] → [END] └─ fail once → [repair] → [planner] Context from the context engine is injected into every LLM call. """ import uuid import time import logging from typing import TypedDict, Dict, Any, Optional from langgraph.graph import StateGraph, START, END from app.agents import ( planner, switchboard, research, synthesizer, verifier, mental_scratchpad, ) from app.agents import mirofish_node, finance_node logger = logging.getLogger(__name__) class AgentState(TypedDict, total=False): user_input: str case_id: str route: dict simulation: dict finance: dict research: dict planner: dict verifier: dict final: dict errors: list context: dict replan_count: int scratchpad: dict def switchboard_node(state: AgentState) -> dict: t0 = time.perf_counter() result = switchboard.run(state) elapsed = time.perf_counter() - t0 logger.info( f"[{state.get('case_id', '?')[:8]}] switchboard: {elapsed:.2f}s — domain={result.get('route', {}).get('domain')}" ) return result async def mirofish_node_fn(state: AgentState) -> dict: t0 = time.perf_counter() result = await mirofish_node.run(state) elapsed = time.perf_counter() - t0 logger.info(f"[{state.get('case_id', '?')[:8]}] mirofish: {elapsed:.2f}s") return result async def finance_node_fn(state: AgentState) -> dict: t0 = time.perf_counter() result = await finance_node.run(state) elapsed = time.perf_counter() - t0 logger.info(f"[{state.get('case_id', '?')[:8]}] finance: {elapsed:.2f}s") return result def research_node(state: AgentState) -> dict: t0 = time.perf_counter() result = research.run(state) elapsed = time.perf_counter() - t0 logger.info(f"[{state.get('case_id', '?')[:8]}] research: {elapsed:.2f}s") return result def planner_node(state: AgentState) -> dict: t0 = time.perf_counter() result = planner.run(state) elapsed = time.perf_counter() - t0 logger.info(f"[{state.get('case_id', '?')[:8]}] planner: {elapsed:.2f}s") return result def verifier_node(state: AgentState) -> dict: t0 = time.perf_counter() result = verifier.run(state) elapsed = time.perf_counter() - t0 verdict = result.get("verifier", {}).get("passed") logger.info( f"[{state.get('case_id', '?')[:8]}] verifier: {elapsed:.2f}s — passed={verdict}" ) return result def synthesizer_node(state: AgentState) -> dict: t0 = time.perf_counter() result = synthesizer.run(state) elapsed = time.perf_counter() - t0 logger.info(f"[{state.get('case_id', '?')[:8]}] synthesizer: {elapsed:.2f}s") return result def mental_scratchpad_node(state: AgentState) -> dict: t0 = time.perf_counter() result = mental_scratchpad.run(state) elapsed = time.perf_counter() - t0 logger.info(f"[{state.get('case_id', '?')[:8]}] mental_scratchpad: {elapsed:.2f}s") return result def repair_node(state: AgentState) -> dict: next_replan = state.get("replan_count", 0) + 1 logger.info( "[%s] repair: incrementing replan_count to %s", state.get("case_id", "?")[:8], next_replan, ) return {**state, "replan_count": next_replan} def after_switchboard(state: AgentState) -> str: route = state.get("route", {}) # Finance is cheap/structured and should run even if simulation is also needed. if route.get("requires_finance_data"): return "finance" if route.get("requires_simulation"): return "mirofish" if route.get("confidence", 0.5) < 0.45 and route.get( "complexity", "medium" ) in {"medium", "high", "very_high"}: logger.info( "[%s] switchboard triggered simulation due to low confidence", state.get("case_id", "?")[:8], ) # High Complexity Deliberation: Go through Scratchpad if route.get("complexity") in {"high", "very_high"}: return "mental_scratchpad" return "research" def after_finance(state: AgentState) -> str: route = state.get("route", {}) if route.get("requires_simulation"): return "mirofish" if route.get("confidence", 0.5) < 0.45 and route.get( "complexity", "medium" ) in {"medium", "high", "very_high"}: return "mirofish" return "research" def after_verifier(state: AgentState) -> str: verifier_result = state.get("verifier", {}) route = state.get("route", {}) complexity = route.get("complexity", "medium") # Allow 2 replans (3 attempts total) for high/very_high complexity, otherwise 1 replan. max_replans = 2 if complexity in {"high", "very_high"} else 1 if not verifier_result.get("passed", True) and state.get("replan_count", 0) < max_replans: return "repair" return "synthesizer" def build_graph(): g = StateGraph(AgentState) g.add_node("switchboard", switchboard_node) g.add_node("research", research_node) g.add_node("mirofish", mirofish_node_fn) g.add_node("finance", finance_node_fn) g.add_node("planner", planner_node) g.add_node("verifier", verifier_node) g.add_node("repair", repair_node) g.add_node("synthesizer", synthesizer_node) g.add_node("mental_scratchpad", mental_scratchpad_node) g.set_entry_point("switchboard") g.add_conditional_edges( "switchboard", after_switchboard, { "mirofish": "mirofish", "finance": "finance", "research": "research", "synthesizer": "synthesizer", "mental_scratchpad": "mental_scratchpad", }, ) g.add_edge("mental_scratchpad", "research") g.add_edge("mirofish", "research") g.add_conditional_edges( "finance", after_finance, {"mirofish": "mirofish", "research": "research"}, ) g.add_edge("research", "planner") g.add_edge("planner", "verifier") g.add_conditional_edges( "verifier", after_verifier, {"repair": "repair", "synthesizer": "synthesizer"}, ) g.add_edge("repair", "planner") g.add_edge("synthesizer", END) return g.compile() # Lazy graph compilation — prevents import-time crash if agents fail to load _compiled_graph = None _graph_build_error = None def get_compiled_graph(): """Lazy graph compilation with error handling. Call at runtime, not import.""" global _compiled_graph, _graph_build_error if _compiled_graph is not None: return _compiled_graph if _graph_build_error is not None: raise RuntimeError(f"Graph compilation previously failed: {_graph_build_error}") try: _compiled_graph = build_graph() logger.info("LangGraph pipeline compiled successfully") return _compiled_graph except Exception as e: _graph_build_error = str(e) logger.error(f"LangGraph build failed: {e}") raise def graph_status(): """Return graph compilation status without triggering compilation.""" if _compiled_graph is not None: return {"status": "ready"} if _graph_build_error: return {"status": "failed", "error": _graph_build_error} return {"status": "not_compiled"} async def run_case(user_input: str, context: dict = None) -> dict: """Run the optimized agent pipeline on user input.""" graph = get_compiled_graph() case_id = str(uuid.uuid4()) t0 = time.perf_counter() logger.info("Starting case %s", case_id) initial_state = { "case_id": case_id, "user_input": user_input, "route": {}, "research": {}, "planner": {}, "verifier": {}, "final": {}, "errors": [], "replan_count": 0, } if context: initial_state["context"] = context result = await graph.ainvoke(initial_state) elapsed = time.perf_counter() - t0 logger.info("Case %s completed in %.2fs", case_id, elapsed) return result