# NOTE: The original paste carried hosting-page artifacts here ("Spaces:" /
# "Runtime error" — the crashed state of the Hugging Face Space this file was
# recovered from). They were not part of the program.
| from langgraph.graph import StateGraph, END | |
| from typing import TypedDict, Annotated, Sequence, Dict, Any, List | |
| from langchain_core.messages import BaseMessage | |
| import operator | |
class AgentState(TypedDict, total=True):
    """Mutable 'brain state' threaded through every node of the graph.

    Each node receives this dict and returns a partial update, which
    LangGraph merges back into the shared state before the next node runs.
    """

    user_query: str                 # Raw question from the user.
    video_id: str                   # Identifier of the video under analysis.
    plan: str                       # High-level strategy chosen by the planner.
    search_topic: str               # Topic string handed to the retriever.
    target_timestamps: List[float]  # Moments (seconds) the analyst should inspect.
    # operator.add tells LangGraph to concatenate new observations onto the
    # existing list rather than overwrite it.
    observations: Annotated[List[str], operator.add]
    final_answer: str               # Synthesized reply shown to the user.
    context: Dict[str, Any]         # References to the search indices and tools.
def create_agent_graph(perception_engine, memory_manager, video_dir: str = "data"):
    """Build the decision graph (the 'flowchart' of the AI).

    Args:
        perception_engine: Object exposing ``analyze_video_segment`` (vision)
            and ``generate_text`` (language) — used by the analyst and
            synthesizer nodes.
        memory_manager: Accepted for interface stability but not consulted
            here; the indices/tools arrive via ``state["context"]`` instead.
        video_dir: Directory containing ``<video_id>.mp4`` files. Defaults to
            ``"data"``, preserving the original hard-coded path.

    Returns:
        The compiled LangGraph workflow (planner -> retriever -> analyst ->
        synthesizer -> END).
    """

    # --- NODE 1: PLANNER ---
    def planner_node(state: AgentState):
        """Decide a strategy for the query (currently always SEARCH)."""
        query = state["user_query"]
        print(f"🤖 Planner: Receiving query -> '{query}'")
        # For now, we assume every query requires a search.
        # Future improvement: Distinguish between 'Summary' and 'Specific Search'.
        return {
            "plan": "SEARCH",
            "search_topic": query,
            "observations": [],  # seed the additive observations channel
        }

    # --- NODE 2: RETRIEVER (The Librarian) ---
    def retriever_node(state: AgentState):
        """Search both Text (Metadata) and Vision (CLIP) indices.

        Emits 'Memory Hint' observations plus a sorted, de-duplicated list of
        timestamps for the analyst to verify. Matches are assumed to be
        ``(timestamp, score)`` pairs — TODO confirm against the index impl.
        """
        search_topic = state["search_topic"]
        # Unpack tools from context.
        scout = state["context"]["scout"]
        visual_memory = state["context"]["vis_index"]
        text_memory = state["context"]["txt_index"]
        found_observations = []
        timestamps_to_investigate = []
        print(f"📚 Retriever: Looking up '{search_topic}'...")
        query_vector = scout.embed_text(search_topic)

        # A. Search Semantic Text Memory (Captions we generated earlier).
        if text_memory:
            # Find top 3 text matches.
            text_matches = text_memory.search(query_vector, top_k=3)
            # Filter matches that are actually relevant (Score > 0.35).
            relevant_text_matches = [match for match in text_matches if match[1] > 0.35]
            if relevant_text_matches:
                print(f" ✅ Found {len(relevant_text_matches)} relevant text records.")
                # Note: ideally we'd fetch the exact text from the index metadata
                # here. For this implementation we accept the timestamp and let
                # the Analyst re-verify visually.
                for timestamp, score in relevant_text_matches:
                    timestamps_to_investigate.append(timestamp)
                    found_observations.append(
                        f"Memory Hint: Something relevant might be at {timestamp:.1f}s (Confidence: {score:.2f})"
                    )
            else:
                print(" ⚠️ No strong text matches found. Switching to Visual Search.")

        # B. Visual Fallback (only when text search produced no observations).
        # We look for frames that *look* like the user's query.
        if not found_observations and visual_memory:
            visual_matches = visual_memory.search(query_vector, top_k=3)
            # Visual similarity needs a lower threshold usually.
            valid_visual_matches = [match for match in visual_matches if match[1] > 0.22]
            if valid_visual_matches:
                found_timestamps = [match[0] for match in valid_visual_matches]
                print(f" 🦅 Visual Scout suggests checking times: {found_timestamps}")
                timestamps_to_investigate.extend(found_timestamps)
            else:
                found_observations.append("No direct visual matches found.")

        # Deduplicate and sort so the analyst scans chronologically.
        unique_timestamps = sorted(set(timestamps_to_investigate))
        return {
            "observations": found_observations,
            "target_timestamps": unique_timestamps,
        }

    # --- NODE 3: ANALYST (The Eyes) ---
    def analyst_node(state: AgentState):
        """Visit each timestamp found by the Retriever and look closely."""
        video_id = state["video_id"]
        timestamps = state["target_timestamps"]
        search_topic = state["search_topic"]
        new_findings = []
        if not timestamps:
            return {"observations": ["Analyst: I have nowhere to look."]}
        print(f"👁️ Analyst: Zooming in on {len(timestamps)} moments...")
        # We give the Vision Model a very specific task so its answer is
        # either a description or an explicit 'Not visible'.
        verification_prompt = f"Look specifically for '{search_topic}'. If you see it, describe it in detail. If not, say 'Not visible'."
        for time_point in timestamps:
            description = perception_engine.analyze_video_segment(
                video_path=f"{video_dir}/{video_id}.mp4",
                start_time=time_point,
                end_time=time_point + 1.0,  # inspect a one-second window
                prompt=verification_prompt,
            )
            log_entry = f"Visual Inspection at {time_point:.1f}s: {description}"
            new_findings.append(log_entry)
            print(f" > {log_entry}")
        return {"observations": new_findings}

    # --- NODE 4: SYNTHESIZER (The Speaker) ---
    def synthesizer_node(state: AgentState):
        """Compile all observations into a final natural-language answer."""
        user_query = state["user_query"]
        all_evidence = state["observations"]
        if not all_evidence:
            return {"final_answer": "I'm sorry, I couldn't find any information about that in the video."}
        evidence_text = "\n".join(all_evidence)
        # ChatML-style prompt; the tokens below must match the model's template.
        system_prompt = f"""<|im_start|>system
You are a helpful video assistant.
Answer the user's question based strictly on the evidence below.
If the evidence contradicts itself, trust the 'Visual Inspection' over the 'Memory Hint'.
EVIDENCE COLLECTED:
{evidence_text}
<|im_end|>
<|im_start|>user
{user_query}
<|im_end|>
<|im_start|>assistant
"""
        # We use raw text generation here for direct control over the template.
        answer = perception_engine.generate_text(system_prompt, stop=["<|im_end|>"])
        return {"final_answer": answer}

    # --- GRAPH CONSTRUCTION ---
    workflow = StateGraph(AgentState)

    # Add Nodes.
    workflow.add_node("planner", planner_node)
    workflow.add_node("retriever", retriever_node)
    workflow.add_node("analyst", analyst_node)
    workflow.add_node("synthesizer", synthesizer_node)

    # Define Edges (The Flow): a fixed linear pipeline.
    workflow.set_entry_point("planner")
    workflow.add_edge("planner", "retriever")
    workflow.add_edge("retriever", "analyst")
    workflow.add_edge("analyst", "synthesizer")
    workflow.add_edge("synthesizer", END)

    return workflow.compile()