Spaces:
Running
Running
eldarski
🔥 Memvid MCP Server - Hackathon Submission - Complete MCP server with 24 tools for video-based AI memory storage - Dual storage with Modal GPU acceleration - Ready for Agents-MCP-Hackathon Track 1
168b0da
| """ | |
| Storage Handler - HuggingFace Dataset integration for persistent memory storage. | |
| Handles uploading and downloading memory videos to/from HF datasets. | |
| """ | |
| import os | |
| import json | |
| import logging | |
| from typing import Dict, Any, List, Optional | |
| from pathlib import Path | |
| import tempfile | |
| import shutil | |
| try: | |
| from huggingface_hub import HfApi, create_repo, upload_file, hf_hub_download | |
| from huggingface_hub.utils import RepositoryNotFoundError | |
| HF_AVAILABLE = True | |
| except ImportError: | |
| logging.warning("HuggingFace Hub not available. Using local storage only.") | |
| HF_AVAILABLE = False | |
class StorageHandler:
    """
    Handles persistent storage using HuggingFace datasets.

    Provides backup and restore functionality for memory videos. Every remote
    operation is best-effort: when HF integration is disabled or a hub call
    fails, methods log the problem and return a falsy value (False / None /
    []) so purely local operation keeps working.
    """

    def __init__(
        self, hf_token: Optional[str] = None, dataset_name: Optional[str] = None
    ):
        """
        Initialize the storage handler.

        Args:
            hf_token (str, optional): HuggingFace API token. Falls back to the
                HF_TOKEN, then HUGGINGFACE_HUB_TOKEN environment variables.
            dataset_name (str, optional): Name of the HF dataset to use. Falls
                back to HF_DATASET_NAME, then "memvid-memory-store".
        """
        self.logger = logging.getLogger(__name__)
        # Token resolution order: explicit argument, then the two env vars.
        self.hf_token = (
            hf_token or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_HUB_TOKEN")
        )
        self.dataset_name = dataset_name or os.getenv(
            "HF_DATASET_NAME", "memvid-memory-store"
        )
        # Remote storage requires both the hub library and a token.
        self.hf_api = None
        self.hf_enabled = False
        if HF_AVAILABLE and self.hf_token:
            try:
                self.hf_api = HfApi(token=self.hf_token)
                self.hf_enabled = True
                self.logger.info(
                    f"HuggingFace integration enabled with dataset: {self.dataset_name}"
                )
            except Exception as e:
                # Stay usable in local-only mode if the API client fails.
                self.logger.warning(f"Failed to initialize HF API: {e}")
        else:
            self.logger.info(
                "HuggingFace integration disabled - using local storage only"
            )

    def ensure_dataset_exists(self) -> bool:
        """
        Ensure the HF dataset exists, create if it doesn't.

        Returns:
            bool: True if dataset exists or was created successfully.
        """
        if not self.hf_enabled:
            return False
        try:
            # Probe for the repo; raises RepositoryNotFoundError if absent.
            self.hf_api.dataset_info(self.dataset_name)
            self.logger.info(f"Dataset {self.dataset_name} already exists")
            return True
        except RepositoryNotFoundError:
            try:
                create_repo(
                    repo_id=self.dataset_name,
                    repo_type="dataset",
                    token=self.hf_token,
                    private=True,  # Make it private by default
                )
                self.logger.info(f"Created new dataset: {self.dataset_name}")
                return True
            except Exception as e:
                self.logger.error(f"Failed to create dataset {self.dataset_name}: {e}")
                return False
        except Exception as e:
            self.logger.error(f"Error checking dataset {self.dataset_name}: {e}")
            return False

    def _upload_file(self, local_path: str, remote_path: str) -> None:
        """Upload one local file to `remote_path` in the dataset repo.

        Raises whatever `upload_file` raises; callers handle failures.
        """
        upload_file(
            path_or_fileobj=local_path,
            path_in_repo=remote_path,
            repo_id=self.dataset_name,
            repo_type="dataset",
            token=self.hf_token,
        )

    def _download_file(self, remote_path: str, dest_path: Path) -> None:
        """Download `remote_path` from the dataset repo to exactly `dest_path`.

        hf_hub_download recreates the repo's directory layout under
        `local_dir`, so we download into a temporary directory first and then
        copy the resulting file to the requested destination. Raises on
        failure; callers handle errors.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            cached_path = hf_hub_download(
                repo_id=self.dataset_name,
                filename=remote_path,
                repo_type="dataset",
                token=self.hf_token,
                local_dir=temp_dir,
            )
            shutil.copy2(cached_path, dest_path)

    def upload_memory_video(
        self, client_id: str, memory_name: str, video_path: Path, index_path: Path
    ) -> bool:
        """
        Upload memory video and index to HF dataset.

        Args:
            client_id (str): Client identifier
            memory_name (str): Memory video name
            video_path (Path): Local path to video file
            index_path (Path): Local path to index file

        Returns:
            bool: True if upload successful
        """
        if not self.hf_enabled:
            self.logger.info("HF upload skipped - not enabled")
            return False
        if not self.ensure_dataset_exists():
            return False
        try:
            self._upload_file(
                str(video_path), f"{client_id}/videos/{memory_name}.mp4"
            )
            self._upload_file(
                str(index_path), f"{client_id}/videos/{memory_name}_index.json"
            )
            self.logger.info(
                f"Successfully uploaded memory '{memory_name}' for client {client_id}"
            )
            return True
        except Exception as e:
            self.logger.error(f"Failed to upload memory video: {e}")
            return False

    def download_memory_video(
        self, client_id: str, memory_name: str, local_videos_dir: Path
    ) -> bool:
        """
        Download memory video and index from HF dataset.

        Args:
            client_id (str): Client identifier
            memory_name (str): Memory video name
            local_videos_dir (Path): Local directory to save files

        Returns:
            bool: True if download successful
        """
        if not self.hf_enabled:
            self.logger.info("HF download skipped - not enabled")
            return False
        try:
            # FIX: the previous implementation passed
            # local_dir=local_videos_dir.parent, which left the files at
            # <parent>/<client_id>/videos/... instead of in local_videos_dir.
            # _download_file places each file at the exact destination path.
            local_videos_dir.mkdir(parents=True, exist_ok=True)
            self._download_file(
                f"{client_id}/videos/{memory_name}.mp4",
                local_videos_dir / f"{memory_name}.mp4",
            )
            self._download_file(
                f"{client_id}/videos/{memory_name}_index.json",
                local_videos_dir / f"{memory_name}_index.json",
            )
            self.logger.info(
                f"Successfully downloaded memory '{memory_name}' for client {client_id}"
            )
            return True
        except Exception as e:
            self.logger.error(f"Failed to download memory video: {e}")
            return False

    def upload_client_metadata(self, client_id: str, metadata: Dict[str, Any]) -> bool:
        """
        Upload client metadata to HF dataset.

        Args:
            client_id (str): Client identifier
            metadata (dict): Client metadata

        Returns:
            bool: True if upload successful
        """
        if not self.hf_enabled:
            return False
        if not self.ensure_dataset_exists():
            return False
        temp_path = None
        try:
            # Serialize metadata to a temp file so upload_file can read it.
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".json", delete=False
            ) as f:
                json.dump(metadata, f, indent=2)
                temp_path = f.name
            self._upload_file(temp_path, f"{client_id}/metadata.json")
            self.logger.info(f"Successfully uploaded metadata for client {client_id}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to upload metadata: {e}")
            return False
        finally:
            # FIX: previously the temp file leaked when the upload raised;
            # always clean it up.
            if temp_path is not None and os.path.exists(temp_path):
                os.unlink(temp_path)

    def download_client_metadata(self, client_id: str) -> Optional[Dict[str, Any]]:
        """
        Download client metadata from HF dataset.

        Args:
            client_id (str): Client identifier

        Returns:
            dict or None: Client metadata if successful
        """
        if not self.hf_enabled:
            return None
        try:
            remote_path = f"{client_id}/metadata.json"
            # Download into a temp dir and read before it is cleaned up.
            with tempfile.TemporaryDirectory() as temp_dir:
                local_path = hf_hub_download(
                    repo_id=self.dataset_name,
                    filename=remote_path,
                    repo_type="dataset",
                    token=self.hf_token,
                    local_dir=temp_dir,
                )
                with open(local_path, "r") as f:
                    metadata = json.load(f)
            self.logger.info(
                f"Successfully downloaded metadata for client {client_id}"
            )
            return metadata
        except Exception as e:
            self.logger.error(f"Failed to download metadata: {e}")
            return None

    def list_client_memories(self, client_id: str) -> List[str]:
        """
        List available memory videos for a client in HF dataset.

        Args:
            client_id (str): Client identifier

        Returns:
            list: List of memory names
        """
        if not self.hf_enabled:
            return []
        try:
            files = self.hf_api.list_repo_files(
                repo_id=self.dataset_name, repo_type="dataset"
            )
            # Memory names are the .mp4 basenames under <client_id>/videos/.
            prefix = f"{client_id}/videos/"
            return [
                file_path[len(prefix):-4]  # strip prefix and ".mp4"
                for file_path in files
                if file_path.startswith(prefix) and file_path.endswith(".mp4")
            ]
        except Exception as e:
            self.logger.error(f"Failed to list client memories: {e}")
            return []

    def backup_client_data(self, client_id: str, local_client_dir: Path) -> bool:
        """
        Backup all client data to HF dataset.

        Args:
            client_id (str): Client identifier
            local_client_dir (Path): Local client directory

        Returns:
            bool: True if backup successful (all discovered files uploaded)
        """
        if not self.hf_enabled:
            self.logger.info("HF backup skipped - not enabled")
            return False
        try:
            success_count = 0
            total_files = 0
            # Upload every video that has a matching index file.
            videos_dir = local_client_dir / "videos"
            if videos_dir.exists():
                for video_file in videos_dir.glob("*.mp4"):
                    memory_name = video_file.stem
                    index_file = videos_dir / f"{memory_name}_index.json"
                    if index_file.exists():
                        total_files += 2  # video + index are one logical unit
                        if self.upload_memory_video(
                            client_id, memory_name, video_file, index_file
                        ):
                            success_count += 2
            # Upload metadata if present.
            metadata_file = local_client_dir / "metadata.json"
            if metadata_file.exists():
                total_files += 1
                with open(metadata_file, "r") as f:
                    metadata = json.load(f)
                if self.upload_client_metadata(client_id, metadata):
                    success_count += 1
            self.logger.info(
                f"Backup completed: {success_count}/{total_files} files uploaded for client {client_id}"
            )
            return success_count == total_files
        except Exception as e:
            self.logger.error(f"Failed to backup client data: {e}")
            return False

    def restore_client_data(self, client_id: str, local_client_dir: Path) -> bool:
        """
        Restore client data from HF dataset.

        Args:
            client_id (str): Client identifier
            local_client_dir (Path): Local client directory

        Returns:
            bool: True if restore successful (all listed memories downloaded)
        """
        if not self.hf_enabled:
            self.logger.info("HF restore skipped - not enabled")
            return False
        try:
            # FIX: parents=True so restore works even when the client
            # directory's parent does not exist yet.
            local_client_dir.mkdir(parents=True, exist_ok=True)
            (local_client_dir / "videos").mkdir(exist_ok=True)
            (local_client_dir / "chunks").mkdir(exist_ok=True)
            # Restore metadata first (best-effort; missing metadata is not fatal).
            metadata = self.download_client_metadata(client_id)
            if metadata:
                metadata_file = local_client_dir / "metadata.json"
                with open(metadata_file, "w") as f:
                    json.dump(metadata, f, indent=2)
            # Restore every memory video listed in the remote dataset.
            memory_names = self.list_client_memories(client_id)
            videos_dir = local_client_dir / "videos"
            success_count = 0
            for memory_name in memory_names:
                if self.download_memory_video(client_id, memory_name, videos_dir):
                    success_count += 1
            self.logger.info(
                f"Restore completed: {success_count}/{len(memory_names)} memories restored for client {client_id}"
            )
            return success_count == len(memory_names)
        except Exception as e:
            self.logger.error(f"Failed to restore client data: {e}")
            return False

    def get_storage_info(self) -> Dict[str, Any]:
        """
        Get storage handler information and status.

        Returns:
            dict: Storage information (availability flags, dataset name,
                storage mode, and — when enabled — whether the dataset exists)
        """
        info = {
            "hf_available": HF_AVAILABLE,
            "hf_enabled": self.hf_enabled,
            "dataset_name": self.dataset_name,
            "has_token": bool(self.hf_token),
            "storage_mode": "hybrid" if self.hf_enabled else "local_only",
        }
        if self.hf_enabled:
            try:
                info["dataset_exists"] = self.ensure_dataset_exists()
            except Exception as e:
                info["dataset_error"] = str(e)
        return info