RAG / modules /rag_indexer.py
Hanzo03's picture
initial commit
ccdd4a4
import chromadb
import json
COLLECTION_NAME = 'video_analysis_data'
# ... (rest of imports/constants)
def generate_text_summary(record):
    """
    Convert a structured detection record into a natural-language description.

    Every detection is grouped by its label and the per-label counts are
    listed in order of first appearance.

    Args:
        record: Mapping with the keys 'video_id', 'timestamp_sec' and
            'detections'. 'detections' is an iterable of mappings that each
            carry a 'label' key; it may be empty (or None).

    Returns:
        A single summary string. When there are no detections the string
        states that no objects were detected in the frame.

    Raises:
        KeyError: If 'video_id', 'timestamp_sec' or 'detections' is missing
            from the record, or a detection has no 'label'.
    """
    # Local import keeps this function self-contained without touching the
    # module's import block.
    from collections import Counter

    video_id = record['video_id']
    timestamp = record['timestamp_sec']
    detections = record['detections']

    prefix = f"Analysis of video '{video_id}' at {timestamp} seconds:"
    if not detections:
        return f"{prefix} No objects were detected in this frame."

    # Counter preserves first-seen order, so labels are reported in the
    # order in which they first occur in the detections list.
    object_counts = Counter(det['label'] for det in detections)

    # Use the singular noun for a count of one so the text reads naturally
    # ("1 instance of 'car'", not "1 instances of 'car'").
    object_descriptions = [
        f"{count} {'instance' if count == 1 else 'instances'} of '{label}'"
        for label, count in object_counts.items()
    ]
    return f"{prefix} Detected objects include: {', '.join(object_descriptions)}."
def index_analysis_data(json_file='raw_analysis.json', collection_name='video_analysis_data',
                        db_path='./chroma_db'):
    """
    Load raw analysis records, turn them into text documents and index them
    in a persistent ChromaDB collection.

    Args:
        json_file: Path to the JSON file holding the analysis records. Each
            record must provide 'video_id', 'timestamp_sec', 'frame_id' and
            'detections'.
        collection_name: Name of the ChromaDB collection to create or reuse.
        db_path: Directory of the on-disk ChromaDB store. Defaults to
            './chroma_db', the location that was previously hard-coded.

    Returns:
        None. Progress and errors are reported on stdout. A missing input
        file is reported and the function returns without touching the store.

    Raises:
        KeyError: If a record lacks one of the required keys.
        json.JSONDecodeError: If the input file is not valid JSON.
    """
    try:
        # JSON is UTF-8 by specification; do not rely on the platform's
        # default text encoding.
        with open(json_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)
    except FileNotFoundError:
        print(f"Error: {json_file} not found. Run 'video_analyzer.py' first.")
        return

    # Persistent client: data is stored locally under db_path.
    client = chromadb.PersistentClient(path=db_path)
    collection = client.get_or_create_collection(name=collection_name)

    documents = []
    metadatas = []
    ids = []
    print(f"Indexing {len(raw_data)} analysis records...")
    for i, record in enumerate(raw_data):
        doc_text = generate_text_summary(record)
        if doc_text:
            documents.append(doc_text)
            # Metadata is kept alongside each document for filtering and context.
            metadatas.append({
                'video_id': record['video_id'],
                'timestamp_sec': record['timestamp_sec'],
                'frame_id': record['frame_id'],
            })
            # NOTE(review): IDs are positional, so indexing a different input
            # file into the same collection reuses the same IDs. Consider
            # deriving IDs from video_id/frame_id if multiple files are indexed.
            ids.append(f"doc_{i}")

    if documents:
        # upsert instead of add: the IDs are deterministic, so a re-run must
        # replace the stored documents; add() would leave the existing
        # entries untouched while success is still reported below.
        # The collection's embedding function computes the embeddings.
        collection.upsert(
            documents=documents,
            metadatas=metadatas,
            ids=ids,
        )
        print(f"Successfully indexed {len(documents)} documents into ChromaDB collection '{collection_name}'.")
    else:
        print("No valid documents generated for indexing.")
# Script entry point: index the default analysis file when run directly.
if __name__ == "__main__":
    index_analysis_data()