# Video-Scout — src/core/graph.py
# (first commit, fca155a)
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Sequence, Dict, Any, List
from langchain_core.messages import BaseMessage
import operator
class AgentState(TypedDict):
    """
    Represents the 'Brain State' of the agent as it thinks.
    This dictionary is passed between all the nodes in the graph.

    Nodes return partial dicts; the graph runtime merges them into this
    state. All fields use last-write-wins semantics except `observations`,
    which accumulates (see the Annotated reducer below).
    """
    user_query: str  # The raw question the user asked
    video_id: str  # Identifier of the video under analysis (maps to data/<video_id>.mp4)
    plan: str  # Strategy chosen by the planner node (currently always "SEARCH")
    search_topic: str  # Topic string the retriever embeds and searches for
    target_timestamps: List[float]  # Seconds into the video the analyst should inspect
    observations: Annotated[List[str], operator.add] # 'add' means new observations are appended, not overwritten
    final_answer: str  # Natural-language answer produced by the synthesizer node
    context: Dict[str, Any] # Holds references to the Search Index and Tools
def create_agent_graph(perception_engine, memory_manager, *,
                       text_threshold=0.35, visual_threshold=0.22):
    """
    Builds the Decision Graph (The 'Flowchart' of the AI).

    The compiled graph runs four nodes in a fixed linear sequence:
    planner -> retriever -> analyst -> synthesizer -> END.

    Args:
        perception_engine: Must provide ``analyze_video_segment(video_path,
            start_time, end_time, prompt)`` and ``generate_text(prompt, stop=...)``.
        memory_manager: Currently unused inside the graph.
            NOTE(review): accepted only for interface stability — consider
            wiring it in or removing it at the call sites.
        text_threshold: Minimum similarity score for a semantic-text match to
            count as relevant. Defaults to 0.35 (the previously hard-coded value).
        visual_threshold: Minimum similarity score for a CLIP visual match.
            Defaults to 0.22 — visual similarity usually needs a lower bar.

    Returns:
        The compiled workflow; invoke it with an ``AgentState`` dict.
    """
    # --- NODE 1: PLANNER ---
    def planner_node(state: AgentState):
        """Decides the strategy. Currently every query is routed to SEARCH."""
        query = state["user_query"]
        print(f"🤖 Planner: Receiving query -> '{query}'")
        # For now, we assume every query requires a search.
        # Future improvement: Distinguish between 'Summary' and 'Specific Search'.
        return {
            "plan": "SEARCH",
            "search_topic": query,
            "observations": []
        }

    # --- NODE 2: RETRIEVER (The Librarian) ---
    def retriever_node(state: AgentState):
        """
        Searches both Text (Metadata) and Vision (CLIP) indices.

        Returns hint observations plus the deduplicated, sorted timestamps
        the analyst should zoom in on.
        """
        search_topic = state["search_topic"]
        # Unpack tools from context
        scout = state["context"]["scout"]
        visual_memory = state["context"]["vis_index"]
        text_memory = state["context"]["txt_index"]

        found_observations = []
        timestamps_to_investigate = []

        print(f"📚 Retriever: Looking up '{search_topic}'...")
        query_vector = scout.embed_text(search_topic)

        # A. Search Semantic Text Memory (Captions we generated earlier)
        if text_memory:
            # Find top 3 text matches; each match is (timestamp, score)
            text_matches = text_memory.search(query_vector, top_k=3)
            # Filter matches that are actually relevant
            relevant_text_matches = [m for m in text_matches if m[1] > text_threshold]
            if relevant_text_matches:
                print(f"   ✅ Found {len(relevant_text_matches)} relevant text records.")
                # Note: ideally we'd fetch the exact text from the index metadata here.
                # For this implementation, we rely on the generic system memory or
                # we accept the timestamp and let the Analyst re-verify.
                for timestamp, score in relevant_text_matches:
                    timestamps_to_investigate.append(timestamp)
                    found_observations.append(
                        f"Memory Hint: Something relevant might be at {timestamp:.1f}s (Confidence: {score:.2f})"
                    )
            else:
                print("   ⚠️ No strong text matches found. Switching to Visual Search.")

        # B. Visual Fallback (If text failed, or to double-check)
        # We look for frames that *look* like the user's query
        if not found_observations and visual_memory:
            visual_matches = visual_memory.search(query_vector, top_k=3)
            # Visual similarity needs a lower threshold usually
            valid_visual_matches = [m for m in visual_matches if m[1] > visual_threshold]
            if valid_visual_matches:
                found_timestamps = [m[0] for m in valid_visual_matches]
                print(f"   🦅 Visual Scout suggests checking times: {found_timestamps}")
                timestamps_to_investigate.extend(found_timestamps)
            else:
                found_observations.append("No direct visual matches found.")

        # Remove duplicates and sort chronologically
        unique_timestamps = sorted(set(timestamps_to_investigate))
        return {
            "observations": found_observations,
            "target_timestamps": unique_timestamps
        }

    # --- NODE 3: ANALYST (The Eyes) ---
    def analyst_node(state: AgentState):
        """
        Visits the specific timestamps found by the Retriever and looks closely.

        Runs the vision model on a 1-second window at each candidate timestamp
        with a verification prompt, and logs one observation per window.
        """
        video_id = state["video_id"]
        timestamps = state["target_timestamps"]
        search_topic = state["search_topic"]
        new_findings = []

        if not timestamps:
            return {"observations": ["Analyst: I have nowhere to look."]}

        print(f"👁️ Analyst: Zooming in on {len(timestamps)} moments...")
        # We give the Vision Model a very specific task
        verification_prompt = f"Look specifically for '{search_topic}'. If you see it, describe it in detail. If not, say 'Not visible'."
        for time_point in timestamps:
            description = perception_engine.analyze_video_segment(
                video_path=f"data/{video_id}.mp4",  # NOTE(review): path layout is hard-coded here
                start_time=time_point,
                end_time=time_point + 1.0,
                prompt=verification_prompt
            )
            log_entry = f"Visual Inspection at {time_point:.1f}s: {description}"
            new_findings.append(log_entry)
            print(f"   > {log_entry}")
        return {"observations": new_findings}

    # --- NODE 4: SYNTHESIZER (The Speaker) ---
    def synthesizer_node(state: AgentState):
        """
        Compiles all observations into a final natural language answer.

        Uses a ChatML-style prompt and raw text generation for direct control
        over the model output.
        """
        user_query = state["user_query"]
        all_evidence = state["observations"]

        if not all_evidence:
            return {"final_answer": "I'm sorry, I couldn't find any information about that in the video."}

        evidence_text = "\n".join(all_evidence)
        system_prompt = f"""<|im_start|>system
You are a helpful video assistant.
Answer the user's question based strictly on the evidence below.
If the evidence contradicts itself, trust the 'Visual Inspection' over the 'Memory Hint'.
EVIDENCE COLLECTED:
{evidence_text}
<|im_end|>
<|im_start|>user
{user_query}
<|im_end|>
<|im_start|>assistant
"""
        # We use the raw text generation here for direct control
        answer = perception_engine.generate_text(system_prompt, stop=["<|im_end|>"])
        return {"final_answer": answer}

    # --- GRAPH CONSTRUCTION ---
    workflow = StateGraph(AgentState)

    # Add Nodes
    workflow.add_node("planner", planner_node)
    workflow.add_node("retriever", retriever_node)
    workflow.add_node("analyst", analyst_node)
    workflow.add_node("synthesizer", synthesizer_node)

    # Define Edges (The Flow): a strictly linear pipeline
    workflow.set_entry_point("planner")
    workflow.add_edge("planner", "retriever")
    workflow.add_edge("retriever", "analyst")
    workflow.add_edge("analyst", "synthesizer")
    workflow.add_edge("synthesizer", END)

    return workflow.compile()