File size: 2,569 Bytes
b4856f1
 
 
 
 
752f5cc
 
b4856f1
 
 
 
 
 
 
752f5cc
 
 
b4856f1
 
 
 
 
 
 
 
 
 
 
 
 
 
752f5cc
b4856f1
 
 
 
 
752f5cc
b4856f1
 
 
752f5cc
b4856f1
 
 
752f5cc
b4856f1
 
752f5cc
b4856f1
 
 
 
 
 
752f5cc
b4856f1
 
 
752f5cc
b4856f1
 
 
752f5cc
b4856f1
 
752f5cc
b4856f1
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""
src/states/intelligenceAgentState.py
Intelligence Agent State - Competitive Intelligence & Profile Monitoring
FIXED: Added custom reducer for domain_insights to prevent InvalidUpdateError
"""

import operator
from typing import Optional, List, Dict, Any, Union
from typing_extensions import TypedDict, Annotated


# ============================================================================
# CUSTOM REDUCER (Fixes InvalidUpdateError for parallel node updates)
# ============================================================================
def reduce_domain_insights(
    existing: List[Dict], new: Union[List[Dict], str]
) -> List[Dict]:
    """Custom reducer for domain_insights to handle concurrent updates"""
    if isinstance(new, str) and new == "RESET":
        return []
    current = existing if isinstance(existing, list) else []
    if isinstance(new, list):
        return current + new
    return current


class IntelligenceAgentState(TypedDict, total=False):
    """
    State for Intelligence Agent.
    Monitors competitors, profiles, product reviews, competitive intelligence.
    """

    # ===== ORCHESTRATOR/WORKER BOOKKEEPING =====
    generated_tasks: List[Dict[str, Any]]
    current_task: Optional[Dict[str, Any]]
    tasks_for_workers: List[Dict[str, Any]]
    worker: Optional[List[Dict[str, Any]]]

    # ===== TOOL RESULTS =====
    worker_results: Annotated[List[Dict[str, Any]], operator.add]
    latest_worker_results: Annotated[List[Dict[str, Any]], operator.add]

    # ===== CHANGE DETECTION =====
    last_alerts_hash: Optional[int]
    change_detected: bool

    # ===== SOCIAL MEDIA MONITORING =====
    social_media_results: Annotated[List[Dict[str, Any]], operator.add]

    # ===== STRUCTURED FEED OUTPUT =====
    profile_feeds: Dict[str, List[Dict[str, Any]]]  # {username: [posts]}
    competitor_feeds: Dict[str, List[Dict[str, Any]]]  # {competitor: [mentions]}
    product_review_feeds: Dict[str, List[Dict[str, Any]]]  # {product: [reviews]}
    local_intel: List[Dict[str, Any]]  # Local competitors
    global_intel: List[Dict[str, Any]]  # Global competitors

    # ===== LLM PROCESSING =====
    llm_summary: Optional[str]
    structured_output: Dict[str, Any]  # Final formatted output

    # ===== FEED OUTPUT =====
    final_feed: str
    feed_history: Annotated[List[str], operator.add]

    # ===== INTEGRATION WITH PARENT GRAPH =====
    domain_insights: Annotated[List[Dict[str, Any]], reduce_domain_insights]

    # ===== FEED AGGREGATOR =====
    aggregator_stats: Dict[str, Any]
    dataset_path: str