File size: 4,390 Bytes
b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 752f5cc b4856f1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | """
src/states/combinedAgentState.py
COMPLETE - All original states preserved with proper typing and Reducer
"""
from __future__ import annotations
import operator
from typing import Optional, List, Dict, Any, Annotated, Union
from datetime import datetime
from pydantic import BaseModel, Field
# =============================================================================
# CUSTOM REDUCER (Fixes InvalidUpdateError & Enables Reset)
# =============================================================================
def reduce_insights(existing: List[Dict], new: Union[List[Dict], str]) -> List[Dict]:
"""
Custom reducer for domain_insights.
1. If new value is "RESET", clears the list (for continuous loops).
2. If new value is a list, appends it to existing list (for parallel agents).
"""
if isinstance(new, str) and new == "RESET":
return []
# Ensure existing is a list (handles initialization)
current = existing if isinstance(existing, list) else []
if isinstance(new, list):
return current + new
return current
# =============================================================================
# DATA MODELS
# =============================================================================
class RiskMetrics(BaseModel):
"""
Quantifiable indicators for the Operational Risk Radar.
Maps to the dashboard metrics in your project report.
"""
logistics_friction: float = Field(
default=0.0, description="Route risk score from mobility data"
)
compliance_volatility: float = Field(
default=0.0, description="Regulatory risk from political data"
)
market_instability: float = Field(
default=0.0, description="Market volatility from economic data"
)
opportunity_index: float = Field(
default=0.0, description="Positive growth signal score"
)
class CombinedAgentState(BaseModel):
"""
Main state for the Roger combined graph.
This is the parent state that receives outputs from all domain agents.
CRITICAL: All domain agents must write to 'domain_insights' field.
"""
# ===== INPUT FROM DOMAIN AGENTS =====
# This is where domain agents write their outputs
domain_insights: Annotated[List[Dict[str, Any]], reduce_insights] = Field(
default_factory=list,
description="Insights from domain agents (Social, Political, Economic, etc.)",
)
# ===== AGGREGATED OUTPUTS =====
# After FeedAggregator processes domain_insights
final_ranked_feed: List[Dict[str, Any]] = Field(
default_factory=list,
description="Ranked and deduplicated feed for National Activity Feed",
)
# NEW: Categorized feeds organized by domain for frontend sections
categorized_feeds: Dict[str, List[Dict[str, Any]]] = Field(
default_factory=lambda: {
"political": [],
"economical": [],
"social": [],
"meteorological": [],
"intelligence": [],
},
description="Feeds organized by domain category for frontend display",
)
# Dashboard snapshot for Operational Risk Radar
risk_dashboard_snapshot: Dict[str, Any] = Field(
default_factory=lambda: {
"logistics_friction": 0.0,
"compliance_volatility": 0.0,
"market_instability": 0.0,
"opportunity_index": 0.0,
"avg_confidence": 0.0,
"high_priority_count": 0,
"total_events": 0,
"last_updated": "",
},
description="Real-time risk and opportunity metrics dashboard",
)
# ===== EXECUTION CONTROL =====
# Loop control to prevent infinite recursion
run_count: int = Field(
default=0, description="Number of times graph has executed (safety counter)"
)
max_runs: int = Field(default=5, description="Maximum allowed loop iterations")
last_run_ts: Optional[datetime] = Field(
default=None, description="Timestamp of last execution"
)
# ===== ROUTING CONTROL =====
# CRITICAL: Used by DataRefreshRouter for conditional edges
# Must be Optional[str] - None means END, "GraphInitiator" means loop
route: Optional[str] = Field(
default=None, description="Router decision: None=END, 'GraphInitiator'=loop"
)
class Config:
arbitrary_types_allowed = True
|