Video-Scout / src /memory /manager.py
ashleshp's picture
first commit
fca155a
from src.interfaces.base import MemoryManager
from pathlib import Path
from typing import List, Dict, Any
import json
class SimpleMemoryManager(MemoryManager):
"""
A lightweight file-based memory manager using JSON.
Perfect for single-video sessions.
"""
def __init__(self, storage_dir: Path):
self.storage_dir = storage_dir
if not self.storage_dir.exists():
self.storage_dir.mkdir(parents=True)
def _get_file_path(self, video_id: str) -> Path:
return self.storage_dir / f"{video_id}_metadata.json"
def _load_data(self, video_id: str) -> Dict[str, Any]:
path = self._get_file_path(video_id)
if not path.exists():
raise FileNotFoundError(f"Metadata not found for {video_id}")
with open(path, "r") as f:
return json.load(f)
def _save_data(self, video_id: str, data: Dict[str, Any]) -> None:
path = self._get_file_path(video_id)
with open(path, "w") as f:
json.dump(data, f, indent=2)
def initialize_storage(self, video_id: str) -> None:
"""Sets up the storage structure for a new video."""
data = {
"video_id": video_id,
"events": [],
"entities": {},
"summary": ""
}
self._save_data(video_id, data)
def commit_event(self, video_id: str, timestamp: str, description: str, metadata: Dict[str, Any]) -> None:
"""Saves a new event to the timeline."""
data = self._load_data(video_id)
event = {
"timestamp": timestamp,
"description": description,
"metadata": metadata
}
data["events"].append(event)
self._save_data(video_id, data)
def query_knowledge(self, video_id: str, query: str) -> List[Dict[str, Any]]:
"""Searches the existing knowledge base."""
data = self._load_data(video_id)
results = []
# Simple keyword search
query_lower = query.lower()
for event in data["events"]:
if query_lower in event["description"].lower():
results.append(event)
return results
def get_summary(self, video_id: str) -> str:
data = self._load_data(video_id)
return data.get("summary", "")
def save_summary(self, video_id: str, summary_text: str) -> None:
"""Updates the global summary for the video."""
data = self._load_data(video_id)
data["summary"] = summary_text
self._save_data(video_id, data)