# memvid-mcp/utils/storage_handler.py
# Author: eldarski — commit 168b0da
# Memvid MCP Server (Agents-MCP-Hackathon Track 1): MCP server with 24 tools
# for video-based AI memory storage; dual storage with Modal GPU acceleration.
"""
Storage Handler - HuggingFace Dataset integration for persistent memory storage.
Handles uploading and downloading memory videos to/from HF datasets.
"""
import os
import json
import logging
from typing import Dict, Any, List, Optional
from pathlib import Path
import tempfile
import shutil
# Optional dependency: the handler still works without huggingface_hub,
# but remote backup/restore is disabled (HF_AVAILABLE is checked throughout).
try:
    from huggingface_hub import HfApi, create_repo, upload_file, hf_hub_download
    from huggingface_hub.utils import RepositoryNotFoundError
    HF_AVAILABLE = True
except ImportError:
    # Import-time warning only; StorageHandler instances then run local-only.
    logging.warning("HuggingFace Hub not available. Using local storage only.")
    HF_AVAILABLE = False
class StorageHandler:
"""
Handles persistent storage using HuggingFace datasets.
Provides backup and restore functionality for memory videos.
"""
def __init__(
    self, hf_token: Optional[str] = None, dataset_name: Optional[str] = None
):
    """
    Initialize the storage handler.

    Resolves the HF token from the argument or environment and enables the
    HuggingFace backend only when both the library and a token are present.

    Args:
        hf_token (str, optional): HuggingFace API token
        dataset_name (str, optional): Name of the HF dataset to use
    """
    self.logger = logging.getLogger(__name__)
    # Token resolution order: explicit argument, then common env vars.
    self.hf_token = (
        hf_token or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_HUB_TOKEN")
    )
    # Dataset name falls back to env var, then a fixed default.
    self.hf_dataset_default = "memvid-memory-store"
    self.dataset_name = dataset_name or os.getenv(
        "HF_DATASET_NAME", "memvid-memory-store"
    )
    self.hf_api = None
    self.hf_enabled = False
    if not (HF_AVAILABLE and self.hf_token):
        self.logger.info(
            "HuggingFace integration disabled - using local storage only"
        )
        return
    try:
        self.hf_api = HfApi(token=self.hf_token)
        self.hf_enabled = True
        self.logger.info(
            f"HuggingFace integration enabled with dataset: {self.dataset_name}"
        )
    except Exception as e:
        self.logger.warning(f"Failed to initialize HF API: {e}")
def ensure_dataset_exists(self) -> bool:
"""
Ensure the HF dataset exists, create if it doesn't.
Returns:
bool: True if dataset exists or was created successfully
"""
if not self.hf_enabled:
return False
try:
# Try to get dataset info
self.hf_api.dataset_info(self.dataset_name)
self.logger.info(f"Dataset {self.dataset_name} already exists")
return True
except RepositoryNotFoundError:
try:
# Create the dataset
create_repo(
repo_id=self.dataset_name,
repo_type="dataset",
token=self.hf_token,
private=True, # Make it private by default
)
self.logger.info(f"Created new dataset: {self.dataset_name}")
return True
except Exception as e:
self.logger.error(f"Failed to create dataset {self.dataset_name}: {e}")
return False
except Exception as e:
self.logger.error(f"Error checking dataset {self.dataset_name}: {e}")
return False
def upload_memory_video(
self, client_id: str, memory_name: str, video_path: Path, index_path: Path
) -> bool:
"""
Upload memory video and index to HF dataset.
Args:
client_id (str): Client identifier
memory_name (str): Memory video name
video_path (Path): Local path to video file
index_path (Path): Local path to index file
Returns:
bool: True if upload successful
"""
if not self.hf_enabled:
self.logger.info("HF upload skipped - not enabled")
return False
if not self.ensure_dataset_exists():
return False
try:
# Upload video file
video_remote_path = f"{client_id}/videos/{memory_name}.mp4"
upload_file(
path_or_fileobj=str(video_path),
path_in_repo=video_remote_path,
repo_id=self.dataset_name,
repo_type="dataset",
token=self.hf_token,
)
# Upload index file
index_remote_path = f"{client_id}/videos/{memory_name}_index.json"
upload_file(
path_or_fileobj=str(index_path),
path_in_repo=index_remote_path,
repo_id=self.dataset_name,
repo_type="dataset",
token=self.hf_token,
)
self.logger.info(
f"Successfully uploaded memory '{memory_name}' for client {client_id}"
)
return True
except Exception as e:
self.logger.error(f"Failed to upload memory video: {e}")
return False
def download_memory_video(
self, client_id: str, memory_name: str, local_videos_dir: Path
) -> bool:
"""
Download memory video and index from HF dataset.
Args:
client_id (str): Client identifier
memory_name (str): Memory video name
local_videos_dir (Path): Local directory to save files
Returns:
bool: True if download successful
"""
if not self.hf_enabled:
self.logger.info("HF download skipped - not enabled")
return False
try:
# Download video file
video_remote_path = f"{client_id}/videos/{memory_name}.mp4"
video_local_path = local_videos_dir / f"{memory_name}.mp4"
hf_hub_download(
repo_id=self.dataset_name,
filename=video_remote_path,
repo_type="dataset",
token=self.hf_token,
local_dir=str(local_videos_dir.parent),
local_dir_use_symlinks=False,
)
# Download index file
index_remote_path = f"{client_id}/videos/{memory_name}_index.json"
index_local_path = local_videos_dir / f"{memory_name}_index.json"
hf_hub_download(
repo_id=self.dataset_name,
filename=index_remote_path,
repo_type="dataset",
token=self.hf_token,
local_dir=str(local_videos_dir.parent),
local_dir_use_symlinks=False,
)
self.logger.info(
f"Successfully downloaded memory '{memory_name}' for client {client_id}"
)
return True
except Exception as e:
self.logger.error(f"Failed to download memory video: {e}")
return False
def upload_client_metadata(self, client_id: str, metadata: Dict[str, Any]) -> bool:
"""
Upload client metadata to HF dataset.
Args:
client_id (str): Client identifier
metadata (dict): Client metadata
Returns:
bool: True if upload successful
"""
if not self.hf_enabled:
return False
if not self.ensure_dataset_exists():
return False
try:
# Create temporary file for metadata
with tempfile.NamedTemporaryFile(
mode="w", suffix=".json", delete=False
) as f:
json.dump(metadata, f, indent=2)
temp_path = f.name
# Upload metadata
remote_path = f"{client_id}/metadata.json"
upload_file(
path_or_fileobj=temp_path,
path_in_repo=remote_path,
repo_id=self.dataset_name,
repo_type="dataset",
token=self.hf_token,
)
# Clean up temp file
os.unlink(temp_path)
self.logger.info(f"Successfully uploaded metadata for client {client_id}")
return True
except Exception as e:
self.logger.error(f"Failed to upload metadata: {e}")
return False
def download_client_metadata(self, client_id: str) -> Optional[Dict[str, Any]]:
"""
Download client metadata from HF dataset.
Args:
client_id (str): Client identifier
Returns:
dict or None: Client metadata if successful
"""
if not self.hf_enabled:
return None
try:
# Download metadata to temporary file
remote_path = f"{client_id}/metadata.json"
with tempfile.TemporaryDirectory() as temp_dir:
local_path = hf_hub_download(
repo_id=self.dataset_name,
filename=remote_path,
repo_type="dataset",
token=self.hf_token,
local_dir=temp_dir,
local_dir_use_symlinks=False,
)
# Read metadata
with open(local_path, "r") as f:
metadata = json.load(f)
self.logger.info(
f"Successfully downloaded metadata for client {client_id}"
)
return metadata
except Exception as e:
self.logger.error(f"Failed to download metadata: {e}")
return None
def list_client_memories(self, client_id: str) -> List[str]:
"""
List available memory videos for a client in HF dataset.
Args:
client_id (str): Client identifier
Returns:
list: List of memory names
"""
if not self.hf_enabled:
return []
try:
# Get dataset files
files = self.hf_api.list_repo_files(
repo_id=self.dataset_name, repo_type="dataset"
)
# Filter for this client's video files
memory_names = []
prefix = f"{client_id}/videos/"
for file_path in files:
if file_path.startswith(prefix) and file_path.endswith(".mp4"):
# Extract memory name from path
filename = file_path[len(prefix) :]
memory_name = filename[:-4] # Remove .mp4 extension
memory_names.append(memory_name)
return memory_names
except Exception as e:
self.logger.error(f"Failed to list client memories: {e}")
return []
def backup_client_data(self, client_id: str, local_client_dir: Path) -> bool:
"""
Backup all client data to HF dataset.
Args:
client_id (str): Client identifier
local_client_dir (Path): Local client directory
Returns:
bool: True if backup successful
"""
if not self.hf_enabled:
self.logger.info("HF backup skipped - not enabled")
return False
try:
success_count = 0
total_files = 0
# Upload all video files
videos_dir = local_client_dir / "videos"
if videos_dir.exists():
for video_file in videos_dir.glob("*.mp4"):
memory_name = video_file.stem
index_file = videos_dir / f"{memory_name}_index.json"
if index_file.exists():
total_files += 2
if self.upload_memory_video(
client_id, memory_name, video_file, index_file
):
success_count += 2
# Upload metadata
metadata_file = local_client_dir / "metadata.json"
if metadata_file.exists():
total_files += 1
with open(metadata_file, "r") as f:
metadata = json.load(f)
if self.upload_client_metadata(client_id, metadata):
success_count += 1
self.logger.info(
f"Backup completed: {success_count}/{total_files} files uploaded for client {client_id}"
)
return success_count == total_files
except Exception as e:
self.logger.error(f"Failed to backup client data: {e}")
return False
def restore_client_data(self, client_id: str, local_client_dir: Path) -> bool:
"""
Restore client data from HF dataset.
Args:
client_id (str): Client identifier
local_client_dir (Path): Local client directory
Returns:
bool: True if restore successful
"""
if not self.hf_enabled:
self.logger.info("HF restore skipped - not enabled")
return False
try:
# Ensure local directories exist
local_client_dir.mkdir(exist_ok=True)
(local_client_dir / "videos").mkdir(exist_ok=True)
(local_client_dir / "chunks").mkdir(exist_ok=True)
# Restore metadata
metadata = self.download_client_metadata(client_id)
if metadata:
metadata_file = local_client_dir / "metadata.json"
with open(metadata_file, "w") as f:
json.dump(metadata, f, indent=2)
# Restore memory videos
memory_names = self.list_client_memories(client_id)
videos_dir = local_client_dir / "videos"
success_count = 0
for memory_name in memory_names:
if self.download_memory_video(client_id, memory_name, videos_dir):
success_count += 1
self.logger.info(
f"Restore completed: {success_count}/{len(memory_names)} memories restored for client {client_id}"
)
return success_count == len(memory_names)
except Exception as e:
self.logger.error(f"Failed to restore client data: {e}")
return False
def get_storage_info(self) -> Dict[str, Any]:
    """
    Get storage handler information and status.

    Returns:
        dict: Storage information
    """
    info: Dict[str, Any] = {
        "hf_available": HF_AVAILABLE,
        "hf_enabled": self.hf_enabled,
        "dataset_name": self.dataset_name,
        "has_token": bool(self.hf_token),
        "storage_mode": "hybrid" if self.hf_enabled else "local_only",
    }
    if not self.hf_enabled:
        return info
    # Probing the dataset may hit the network; surface errors in the report
    # instead of raising.
    try:
        info["dataset_exists"] = self.ensure_dataset_exists()
    except Exception as e:
        info["dataset_error"] = str(e)
    return info