modelx / src /states /socialAgentState.py
nivakaran's picture
Upload folder using huggingface_hub
752f5cc verified
"""
src/states/socialAgentState.py
Social Agent State - handles trending topics, events, people, social intelligence
FIXED: Added custom reducer for domain_insights to prevent InvalidUpdateError
"""
import operator
from typing import Optional, List, Dict, Any, Union
from typing_extensions import TypedDict, Annotated
# ============================================================================
# CUSTOM REDUCER (Fixes InvalidUpdateError for parallel node updates)
# ============================================================================
def reduce_domain_insights(
existing: List[Dict], new: Union[List[Dict], str]
) -> List[Dict]:
"""Custom reducer for domain_insights to handle concurrent updates"""
if isinstance(new, str) and new == "RESET":
return []
current = existing if isinstance(existing, list) else []
if isinstance(new, list):
return current + new
return current
class SocialAgentState(TypedDict, total=False):
"""
State for Social Agent.
Monitors trending topics, events, people, social sentiment across geographic scopes.
"""
# ===== ORCHESTRATOR/WORKER BOOKKEEPING =====
generated_tasks: List[Dict[str, Any]]
current_task: Optional[Dict[str, Any]]
tasks_for_workers: List[Dict[str, Any]]
worker: Optional[List[Dict[str, Any]]]
# ===== TOOL RESULTS =====
worker_results: Annotated[List[Dict[str, Any]], operator.add]
latest_worker_results: List[Dict[str, Any]]
# ===== CHANGE DETECTION =====
last_alerts_hash: Optional[int]
change_detected: bool
# ===== SOCIAL MEDIA MONITORING =====
social_media_results: Annotated[List[Dict[str, Any]], operator.add]
# ===== STRUCTURED FEED OUTPUT =====
geographic_feeds: Dict[str, List[Dict[str, Any]]] # {region: [posts]}
sri_lanka_feed: List[Dict[str, Any]] # Sri Lankan trending
asia_feed: List[Dict[str, Any]] # Asian trends
world_feed: List[Dict[str, Any]] # World trends
# ===== LLM PROCESSING =====
llm_summary: Optional[str]
structured_output: Dict[str, Any] # Final formatted output
# ===== FEED OUTPUT =====
final_feed: str
feed_history: Annotated[List[str], operator.add]
# ===== INTEGRATION WITH PARENT GRAPH =====
domain_insights: Annotated[List[Dict[str, Any]], reduce_domain_insights]
# ===== FEED AGGREGATOR =====
aggregator_stats: Dict[str, Any]
dataset_path: str