Spaces:
Runtime error
Runtime error
File size: 7,123 Bytes
fca155a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Sequence, Dict, Any, List
from langchain_core.messages import BaseMessage
import operator
class AgentState(TypedDict):
    """
    Represents the 'Brain State' of the agent as it thinks.

    This dictionary is passed between all the nodes in the graph. Each node
    returns a partial dict of updates; LangGraph merges those updates back
    into this state before calling the next node.
    """
    user_query: str                 # raw question from the user, as received
    video_id: str                   # identifier of the video under analysis (used to build the file path)
    plan: str                       # planner's chosen strategy (currently always "SEARCH")
    search_topic: str               # what the retriever should look up (currently the query verbatim)
    target_timestamps: List[float]  # seconds into the video the analyst should inspect
    observations: Annotated[List[str], operator.add]  # 'add' means new observations are appended, not overwritten
    final_answer: str               # natural-language answer produced by the synthesizer
    context: Dict[str, Any]  # Holds references to the Search Index and Tools
                             # (expected keys: "scout", "vis_index", "txt_index" — see retriever_node)
def create_agent_graph(
    perception_engine,
    memory_manager,
    *,
    text_score_threshold: float = 0.35,
    visual_score_threshold: float = 0.22,
):
    """
    Builds the Decision Graph (The 'Flowchart' of the AI).

    The pipeline is linear: planner -> retriever -> analyst -> synthesizer -> END.
    Each node receives the current ``AgentState`` and returns a partial update.

    Args:
        perception_engine: Object exposing ``analyze_video_segment(...)`` and
            ``generate_text(prompt, stop=...)`` — used by the analyst and
            synthesizer nodes. (The retriever's embedder comes from
            ``state["context"]["scout"]``, not from this argument.)
        memory_manager: Accepted for interface compatibility; not used directly
            here — indices are read from ``state["context"]`` at run time.
        text_score_threshold: Minimum similarity score for a semantic-text
            match to count as relevant. Default 0.35 (the original constant).
        visual_score_threshold: Minimum score for a CLIP visual match; visual
            similarity usually needs a lower bar. Default 0.22.

    Returns:
        The compiled LangGraph workflow (a runnable app).
    """

    # --- NODE 1: PLANNER ---
    def planner_node(state: AgentState):
        """Decide a strategy for the query. Currently always plans a search."""
        query = state["user_query"]
        print(f"🤖 Planner: Receiving query -> '{query}'")
        # For now, we assume every query requires a search.
        # Future improvement: Distinguish between 'Summary' and 'Specific Search'.
        return {
            "plan": "SEARCH",
            "search_topic": query,
            "observations": []
        }

    # --- NODE 2: RETRIEVER (The Librarian) ---
    def retriever_node(state: AgentState):
        """
        Searches both Text (Metadata) and Vision (CLIP) indices.

        Text memory is tried first; visual search is only a fallback when the
        text pass produced no observations. Returns observation hints plus the
        deduplicated, sorted timestamps for the analyst to inspect.
        """
        search_topic = state["search_topic"]
        # Unpack tools from context
        scout = state["context"]["scout"]
        visual_memory = state["context"]["vis_index"]
        text_memory = state["context"]["txt_index"]
        found_observations = []
        timestamps_to_investigate = []
        print(f"📚 Retriever: Looking up '{search_topic}'...")
        # One embedding serves both the text and the visual index lookups.
        query_vector = scout.embed_text(search_topic)

        # A. Search Semantic Text Memory (Captions we generated earlier)
        if text_memory:
            # Find top 3 text matches; each match is a (timestamp, score) pair.
            text_matches = text_memory.search(query_vector, top_k=3)
            # Filter matches that are actually relevant (score above threshold)
            relevant_text_matches = [
                match for match in text_matches if match[1] > text_score_threshold
            ]
            if relevant_text_matches:
                print(f"  ✅ Found {len(relevant_text_matches)} relevant text records.")
                # Note: ideally we'd fetch the exact text from the index metadata here.
                # For this implementation, we rely on the generic system memory or
                # we accept the timestamp and let the Analyst re-verify.
                # Let's map these timestamps to potential investigation points.
                for timestamp, score in relevant_text_matches:
                    timestamps_to_investigate.append(timestamp)
                    found_observations.append(
                        f"Memory Hint: Something relevant might be at {timestamp:.1f}s (Confidence: {score:.2f})"
                    )
            else:
                print("  ⚠️ No strong text matches found. Switching to Visual Search.")

        # B. Visual Fallback (If text failed, or to double-check)
        # We look for frames that *look* like the user's query
        if not found_observations and visual_memory:
            visual_matches = visual_memory.search(query_vector, top_k=3)
            # Visual similarity needs a lower threshold usually
            valid_visual_matches = [
                match for match in visual_matches if match[1] > visual_score_threshold
            ]
            if valid_visual_matches:
                found_timestamps = [match[0] for match in valid_visual_matches]
                print(f"  🦅 Visual Scout suggests checking times: {found_timestamps}")
                timestamps_to_investigate.extend(found_timestamps)
            else:
                found_observations.append("No direct visual matches found.")

        # Remove duplicates and sort so the analyst scans chronologically.
        unique_timestamps = sorted(set(timestamps_to_investigate))
        return {
            "observations": found_observations,
            "target_timestamps": unique_timestamps
        }

    # --- NODE 3: ANALYST (The Eyes) ---
    def analyst_node(state: AgentState):
        """
        Visits the specific timestamps found by the Retriever and looks closely.

        Runs the vision model on a 1-second window at each candidate timestamp
        with a verification prompt, and appends one observation per inspection.
        """
        video_id = state["video_id"]
        timestamps = state["target_timestamps"]
        search_topic = state["search_topic"]
        new_findings = []
        if not timestamps:
            # Nothing to inspect — record that fact so the synthesizer sees it.
            return {"observations": ["Analyst: I have nowhere to look."]}
        print(f"👁️ Analyst: Zooming in on {len(timestamps)} moments...")
        # We give the Vision Model a very specific task
        verification_prompt = f"Look specifically for '{search_topic}'. If you see it, describe it in detail. If not, say 'Not visible'."
        for time_point in timestamps:
            # NOTE(review): path layout 'data/<video_id>.mp4' is assumed here —
            # confirm against wherever videos are ingested.
            description = perception_engine.analyze_video_segment(
                video_path=f"data/{video_id}.mp4",
                start_time=time_point,
                end_time=time_point + 1.0,
                prompt=verification_prompt
            )
            log_entry = f"Visual Inspection at {time_point:.1f}s: {description}"
            new_findings.append(log_entry)
            print(f"  > {log_entry}")
        return {"observations": new_findings}

    # --- NODE 4: SYNTHESIZER (The Speaker) ---
    def synthesizer_node(state: AgentState):
        """
        Compiles all observations into a final natural language answer.

        Builds a ChatML-formatted prompt from the accumulated evidence and
        asks the perception engine's text generator for the answer.
        """
        user_query = state["user_query"]
        all_evidence = state["observations"]
        if not all_evidence:
            return {"final_answer": "I'm sorry, I couldn't find any information about that in the video."}
        evidence_text = "\n".join(all_evidence)
        system_prompt = f"""<|im_start|>system
You are a helpful video assistant.
Answer the user's question based strictly on the evidence below.
If the evidence contradicts itself, trust the 'Visual Inspection' over the 'Memory Hint'.
EVIDENCE COLLECTED:
{evidence_text}
<|im_end|>
<|im_start|>user
{user_query}
<|im_end|>
<|im_start|>assistant
"""
        # We use the raw text generation here for direct control
        answer = perception_engine.generate_text(system_prompt, stop=["<|im_end|>"])
        return {"final_answer": answer}

    # --- GRAPH CONSTRUCTION ---
    workflow = StateGraph(AgentState)
    # Add Nodes
    workflow.add_node("planner", planner_node)
    workflow.add_node("retriever", retriever_node)
    workflow.add_node("analyst", analyst_node)
    workflow.add_node("synthesizer", synthesizer_node)
    # Define Edges (The Flow)
    workflow.set_entry_point("planner")
    workflow.add_edge("planner", "retriever")
    workflow.add_edge("retriever", "analyst")
    workflow.add_edge("analyst", "synthesizer")
    workflow.add_edge("synthesizer", END)
    return workflow.compile()
|