from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Sequence, Dict, Any, List
from langchain_core.messages import BaseMessage
import operator


class AgentState(TypedDict):
    """
    Represents the 'Brain State' of the agent as it thinks.

    This dictionary is passed between all the nodes in the graph; each node
    returns a partial dict whose keys are merged back into the state.
    """
    user_query: str                 # The raw question from the user.
    video_id: str                   # Identifier used to locate data/<video_id>.mp4.
    plan: str                       # High-level strategy chosen by the planner (currently always "SEARCH").
    search_topic: str               # The topic handed to the retrieval indices.
    target_timestamps: List[float]  # Moments (seconds) the analyst should inspect.
    # 'operator.add' means new observations are APPENDED by the reducer,
    # never overwritten — returning [] from a node is therefore a no-op.
    observations: Annotated[List[str], operator.add]
    final_answer: str               # Natural-language answer produced by the synthesizer.
    context: Dict[str, Any]         # Holds references to the Search Index and Tools.


def create_agent_graph(perception_engine, memory_manager):
    """
    Builds the Decision Graph (The 'Flowchart' of the AI).

    Args:
        perception_engine: Provides ``embed``-free vision/LLM calls used here:
            ``analyze_video_segment(...)`` and ``generate_text(...)``.
        memory_manager: Accepted for interface compatibility; not referenced
            by any node in this graph (tools are read from ``state["context"]``).

    Returns:
        A compiled LangGraph app: planner -> retriever -> analyst -> synthesizer -> END.
    """

    # --- NODE 1: PLANNER ---
    def planner_node(state: AgentState):
        """Decide the strategy for answering the query.

        For now, we assume every query requires a search.
        Future improvement: Distinguish between 'Summary' and 'Specific Search'.
        """
        query = state["user_query"]
        print(f"🤖 Planner: Receiving query -> '{query}'")
        # NOTE(review): "observations": [] is intended as a reset, but because
        # the state uses an operator.add reducer this appends nothing and does
        # NOT clear observations from a previous run — confirm intent.
        return {
            "plan": "SEARCH",
            "search_topic": query,
            "observations": []
        }

    # --- NODE 2: RETRIEVER (The Librarian) ---
    def retriever_node(state: AgentState):
        """
        Searches both Text (Metadata) and Vision (CLIP) indices.

        Returns observation strings plus a sorted, de-duplicated list of
        timestamps for the analyst to verify visually.
        """
        search_topic = state["search_topic"]

        # Unpack tools from context (injected by the caller at invoke time).
        scout = state["context"]["scout"]
        visual_memory = state["context"]["vis_index"]
        text_memory = state["context"]["txt_index"]

        found_observations = []
        timestamps_to_investigate = []

        print(f"📚 Retriever: Looking up '{search_topic}'...")
        # One shared embedding queries both the text and visual indices.
        query_vector = scout.embed_text(search_topic)

        # A. Search Semantic Text Memory (Captions we generated earlier)
        if text_memory:
            # Find top 3 text matches; each match is a (timestamp, score) pair.
            text_matches = text_memory.search(query_vector, top_k=3)
            # Filter matches that are actually relevant (Score > 0.35)
            relevant_text_matches = [match for match in text_matches if match[1] > 0.35]

            if relevant_text_matches:
                print(f" ✅ Found {len(relevant_text_matches)} relevant text records.")
                # Note: ideally we'd fetch the exact text from the index metadata here.
                # For this implementation, we rely on the generic system memory or
                # we accept the timestamp and let the Analyst re-verify.
                # Let's map these timestamps to potential investigation points.
                for timestamp, score in relevant_text_matches:
                    timestamps_to_investigate.append(timestamp)
                    found_observations.append(f"Memory Hint: Something relevant might be at {timestamp:.1f}s (Confidence: {score:.2f})")
            else:
                print(" ⚠️ No strong text matches found. Switching to Visual Search.")

        # B. Visual Fallback (If text failed, or to double-check)
        # We look for frames that *look* like the user's query.
        # Only runs when the text search produced nothing, to avoid duplicate hints.
        if not found_observations and visual_memory:
            visual_matches = visual_memory.search(query_vector, top_k=3)
            # Visual similarity needs a lower threshold usually
            valid_visual_matches = [match for match in visual_matches if match[1] > 0.22]

            if valid_visual_matches:
                found_timestamps = [match[0] for match in valid_visual_matches]
                print(f" 🦅 Visual Scout suggests checking times: {found_timestamps}")
                timestamps_to_investigate.extend(found_timestamps)
            else:
                found_observations.append("No direct visual matches found.")

        # Remove duplicates and sort chronologically for the analyst.
        unique_timestamps = sorted(set(timestamps_to_investigate))

        return {
            "observations": found_observations,
            "target_timestamps": unique_timestamps
        }

    # --- NODE 3: ANALYST (The Eyes) ---
    def analyst_node(state: AgentState):
        """
        Visits the specific timestamps found by the Retriever and looks closely.

        Runs the vision model on a ~1s window around each candidate timestamp
        with a targeted verification prompt, appending one observation per moment.
        """
        video_id = state["video_id"]
        timestamps = state["target_timestamps"]
        search_topic = state["search_topic"]
        new_findings = []

        if not timestamps:
            return {"observations": ["Analyst: I have nowhere to look."]}

        print(f"👁️ Analyst: Zooming in on {len(timestamps)} moments...")

        # We give the Vision Model a very specific task
        verification_prompt = f"Look specifically for '{search_topic}'. If you see it, describe it in detail. If not, say 'Not visible'."

        for time_point in timestamps:
            description = perception_engine.analyze_video_segment(
                video_path=f"data/{video_id}.mp4",
                start_time=time_point,
                end_time=time_point + 1.0,
                prompt=verification_prompt
            )
            log_entry = f"Visual Inspection at {time_point:.1f}s: {description}"
            new_findings.append(log_entry)
            print(f" > {log_entry}")

        return {"observations": new_findings}

    # --- NODE 4: SYNTHESIZER (The Speaker) ---
    def synthesizer_node(state: AgentState):
        """
        Compiles all observations into a final natural language answer.

        Builds a ChatML-style prompt containing the evidence and lets the
        language model answer strictly from it.
        """
        user_query = state["user_query"]
        all_evidence = state["observations"]

        if not all_evidence:
            return {"final_answer": "I'm sorry, I couldn't find any information about that in the video."}

        evidence_text = "\n".join(all_evidence)

        system_prompt = f"""<|im_start|>system
You are a helpful video assistant. Answer the user's question based strictly on the evidence below. If the evidence contradicts itself, trust the 'Visual Inspection' over the 'Memory Hint'.

EVIDENCE COLLECTED:
{evidence_text}
<|im_end|>
<|im_start|>user
{user_query}
<|im_end|>
<|im_start|>assistant
"""
        # We use the raw text generation here for direct control
        answer = perception_engine.generate_text(system_prompt, stop=["<|im_end|>"])

        return {"final_answer": answer}

    # --- GRAPH CONSTRUCTION ---
    workflow = StateGraph(AgentState)

    # Add Nodes
    workflow.add_node("planner", planner_node)
    workflow.add_node("retriever", retriever_node)
    workflow.add_node("analyst", analyst_node)
    workflow.add_node("synthesizer", synthesizer_node)

    # Define Edges (The Flow): a fixed linear pipeline, no conditional routing yet.
    workflow.set_entry_point("planner")
    workflow.add_edge("planner", "retriever")
    workflow.add_edge("retriever", "analyst")
    workflow.add_edge("analyst", "synthesizer")
    workflow.add_edge("synthesizer", END)

    return workflow.compile()