File size: 7,123 Bytes
fca155a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Sequence, Dict, Any, List
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    """
    Represents the 'Brain State' of the agent as it thinks.
    This dictionary is passed between all the nodes in the graph.

    Nodes return partial updates (a dict with a subset of these keys);
    LangGraph merges each update into the running state.
    """
    user_query: str                  # Raw question from the user, as received
    video_id: str                    # Identifier used to locate the video file (data/<video_id>.mp4)
    plan: str                        # Strategy chosen by the planner (currently always "SEARCH")
    search_topic: str                # Topic the retriever embeds and looks up (currently the raw query)
    target_timestamps: List[float]   # Seconds into the video the analyst should inspect
    observations: Annotated[List[str], operator.add] # 'add' means new observations are appended, not overwritten
    final_answer: str                # Natural-language answer produced by the synthesizer
    context: Dict[str, Any] # Holds references to the Search Index and Tools

def create_agent_graph(perception_engine, memory_manager):
    """
    Builds the Decision Graph (The 'Flowchart' of the AI).

    Pipeline (linear): planner -> retriever -> analyst -> synthesizer -> END.
    Each node is a closure over `perception_engine` that takes the shared
    AgentState and returns a partial state update for LangGraph to merge.

    Args:
        perception_engine: Must expose `embed_text` is NOT needed here
            (that lives on the 'scout' tool inside state["context"]), but it
            must provide `analyze_video_segment(video_path, start_time,
            end_time, prompt)` and `generate_text(prompt, stop=...)`.
        memory_manager: Accepted for interface compatibility; not used by
            any node in this graph (indices are read from state["context"]).

    Returns:
        The compiled LangGraph workflow, ready to `.invoke(...)`.
    """

    # --- NODE 1: PLANNER ---
    def planner_node(state: AgentState):
        """Decide a strategy for the query. Currently always 'SEARCH'."""
        query = state["user_query"]
        print(f"🤖 Planner: Receiving query -> '{query}'")
        # For now, we assume every query requires a search.
        # Future improvement: Distinguish between 'Summary' and 'Specific Search'.
        # NOTE: because 'observations' uses the operator.add reducer, returning
        # [] here appends nothing — it does NOT clear prior observations.
        return {
            "plan": "SEARCH",
            "search_topic": query,
            "observations": []
        }

    # --- NODE 2: RETRIEVER (The Librarian) ---
    def retriever_node(state: AgentState):
        """
        Searches both Text (Metadata) and Vision (CLIP) indices.

        Returns observation strings (memory hints / misses) plus the
        deduplicated, sorted timestamps the analyst should inspect.
        """
        search_topic = state["search_topic"]

        # Unpack tools from context
        scout = state["context"]["scout"]
        visual_memory = state["context"]["vis_index"]
        text_memory = state["context"]["txt_index"]

        found_observations = []
        timestamps_to_investigate = []

        print(f"📚 Retriever: Looking up '{search_topic}'...")
        # One shared embedding serves both the text and visual indices.
        query_vector = scout.embed_text(search_topic)

        # A. Search Semantic Text Memory (Captions we generated earlier)
        if text_memory:
            # Find top 3 text matches; each match is (timestamp, score).
            text_matches = text_memory.search(query_vector, top_k=3)

            # Filter matches that are actually relevant (Score > 0.35)
            relevant_text_matches = [match for match in text_matches if match[1] > 0.35]

            if relevant_text_matches:
                print(f"   ✅ Found {len(relevant_text_matches)} relevant text records.")

                # Note: ideally we'd fetch the exact text from the index metadata here.
                # For this implementation, we rely on the generic system memory or
                # we accept the timestamp and let the Analyst re-verify.
                # Let's map these timestamps to potential investigation points.
                for timestamp, score in relevant_text_matches:
                    timestamps_to_investigate.append(timestamp)
                    found_observations.append(f"Memory Hint: Something relevant might be at {timestamp:.1f}s (Confidence: {score:.2f})")

            else:
                print("   ⚠️ No strong text matches found. Switching to Visual Search.")

        # B. Visual Fallback (If text failed, or to double-check)
        # We look for frames that *look* like the user's query.
        # Only runs when the text search produced no observations.
        if not found_observations and visual_memory:
            visual_matches = visual_memory.search(query_vector, top_k=3)

            # Visual similarity needs a lower threshold usually
            valid_visual_matches = [match for match in visual_matches if match[1] > 0.22]

            if valid_visual_matches:
                found_timestamps = [match[0] for match in valid_visual_matches]
                print(f"   🦅 Visual Scout suggests checking times: {found_timestamps}")
                timestamps_to_investigate.extend(found_timestamps)
            else:
                found_observations.append("No direct visual matches found.")

        # Remove duplicates and sort so the analyst scans chronologically.
        unique_timestamps = sorted(set(timestamps_to_investigate))

        return {
            "observations": found_observations,
            "target_timestamps": unique_timestamps
        }

    # --- NODE 3: ANALYST (The Eyes) ---
    def analyst_node(state: AgentState):
        """
        Visits the specific timestamps found by the Retriever and looks closely.

        Runs the vision model over a 1-second window at each timestamp with a
        targeted verification prompt; returns one observation per window.
        """
        video_id = state["video_id"]
        timestamps = state["target_timestamps"]
        search_topic = state["search_topic"]
        new_findings = []

        if not timestamps:
            return {"observations": ["Analyst: I have nowhere to look."]}

        print(f"👁️ Analyst: Zooming in on {len(timestamps)} moments...")

        # We give the Vision Model a very specific task
        verification_prompt = f"Look specifically for '{search_topic}'. If you see it, describe it in detail. If not, say 'Not visible'."

        for time_point in timestamps:
            description = perception_engine.analyze_video_segment(
                video_path=f"data/{video_id}.mp4",
                start_time=time_point,
                end_time=time_point + 1.0,
                prompt=verification_prompt
            )

            log_entry = f"Visual Inspection at {time_point:.1f}s: {description}"
            new_findings.append(log_entry)
            print(f"   > {log_entry}")

        return {"observations": new_findings}

    # --- NODE 4: SYNTHESIZER (The Speaker) ---
    def synthesizer_node(state: AgentState):
        """
        Compiles all observations into a final natural language answer.

        Prefers 'Visual Inspection' evidence over 'Memory Hint' evidence
        (stated in the system prompt) and falls back to an apology when
        no evidence was collected at all.
        """
        user_query = state["user_query"]
        all_evidence = state["observations"]

        if not all_evidence:
            return {"final_answer": "I'm sorry, I couldn't find any information about that in the video."}

        evidence_text = "\n".join(all_evidence)

        # ChatML-style prompt assembled by hand for direct control over
        # the model's context (no chat-template wrapper involved).
        system_prompt = f"""<|im_start|>system
You are a helpful video assistant. 
Answer the user's question based strictly on the evidence below.
If the evidence contradicts itself, trust the 'Visual Inspection' over the 'Memory Hint'.

EVIDENCE COLLECTED:
{evidence_text}
<|im_end|>
<|im_start|>user
{user_query}
<|im_end|>
<|im_start|>assistant
"""
        # We use the raw text generation here for direct control
        answer = perception_engine.generate_text(system_prompt, stop=["<|im_end|>"])
        return {"final_answer": answer}

    # --- GRAPH CONSTRUCTION ---
    workflow = StateGraph(AgentState)

    # Add Nodes
    workflow.add_node("planner", planner_node)
    workflow.add_node("retriever", retriever_node)
    workflow.add_node("analyst", analyst_node)
    workflow.add_node("synthesizer", synthesizer_node)

    # Define Edges (The Flow) — a strictly linear pipeline, no branching.
    workflow.set_entry_point("planner")
    workflow.add_edge("planner", "retriever")
    workflow.add_edge("retriever", "analyst")
    workflow.add_edge("analyst", "synthesizer")
    workflow.add_edge("synthesizer", END)

    return workflow.compile()