"""
Storage Handler - HuggingFace Dataset integration for persistent memory storage.

Handles uploading and downloading memory videos to/from HF datasets.
"""

import os
import json
import logging
from typing import Dict, Any, List, Optional
from pathlib import Path
import tempfile
import shutil

try:
    from huggingface_hub import HfApi, create_repo, upload_file, hf_hub_download
    from huggingface_hub.utils import RepositoryNotFoundError

    HF_AVAILABLE = True
except ImportError:
    logging.warning("HuggingFace Hub not available. Using local storage only.")
    HF_AVAILABLE = False


class StorageHandler:
    """
    Handles persistent storage using HuggingFace datasets.

    Provides backup and restore functionality for memory videos. Every public
    method degrades gracefully: when the HF integration is disabled (library
    missing, no token, or API initialisation failed) the methods return a
    "nothing done" value (``False``, ``None`` or ``[]``) instead of raising.

    Remote layout used inside the dataset repository::

        {client_id}/metadata.json
        {client_id}/videos/{memory_name}.mp4
        {client_id}/videos/{memory_name}_index.json
    """

    def __init__(
        self, hf_token: Optional[str] = None, dataset_name: Optional[str] = None
    ):
        """
        Initialize the storage handler.

        Args:
            hf_token (str, optional): HuggingFace API token. Falls back to the
                ``HF_TOKEN`` and then ``HUGGINGFACE_HUB_TOKEN`` environment
                variables.
            dataset_name (str, optional): Name of the HF dataset to use. Falls
                back to the ``HF_DATASET_NAME`` environment variable and then
                to ``"memvid-memory-store"``.
        """
        self.logger = logging.getLogger(__name__)

        # Get HF token from environment or parameter
        self.hf_token = (
            hf_token or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_HUB_TOKEN")
        )

        # Set default dataset name
        self.dataset_name = dataset_name or os.getenv(
            "HF_DATASET_NAME", "memvid-memory-store"
        )

        # Initialize HF API if available
        self.hf_api = None
        self.hf_enabled = False

        if HF_AVAILABLE and self.hf_token:
            try:
                self.hf_api = HfApi(token=self.hf_token)
                self.hf_enabled = True
                self.logger.info(
                    f"HuggingFace integration enabled with dataset: {self.dataset_name}"
                )
            except Exception as e:
                self.logger.warning(f"Failed to initialize HF API: {e}")
        else:
            self.logger.info(
                "HuggingFace integration disabled - using local storage only"
            )

    def ensure_dataset_exists(self) -> bool:
        """
        Ensure the HF dataset exists, create if it doesn't.

        Returns:
            bool: True if dataset exists or was created successfully; False if
            the integration is disabled or the check/creation failed.
        """
        if not self.hf_enabled:
            return False

        try:
            # Try to get dataset info
            self.hf_api.dataset_info(self.dataset_name)
        except RepositoryNotFoundError:
            # The repository is missing: fall through to creation below.
            pass
        except Exception as e:
            self.logger.error(f"Error checking dataset {self.dataset_name}: {e}")
            return False
        else:
            self.logger.info(f"Dataset {self.dataset_name} already exists")
            return True

        try:
            # Create the dataset
            create_repo(
                repo_id=self.dataset_name,
                repo_type="dataset",
                token=self.hf_token,
                private=True,  # Make it private by default
            )
        except Exception as e:
            self.logger.error(f"Failed to create dataset {self.dataset_name}: {e}")
            return False

        self.logger.info(f"Created new dataset: {self.dataset_name}")
        return True

    def upload_memory_video(
        self, client_id: str, memory_name: str, video_path: Path, index_path: Path
    ) -> bool:
        """
        Upload memory video and index to HF dataset.

        Args:
            client_id (str): Client identifier
            memory_name (str): Memory video name
            video_path (Path): Local path to video file
            index_path (Path): Local path to index file

        Returns:
            bool: True if upload successful
        """
        if not self.hf_enabled:
            self.logger.info("HF upload skipped - not enabled")
            return False

        if not self.ensure_dataset_exists():
            return False

        # Video first, then its index; both live under the client's videos/ prefix.
        uploads = (
            (video_path, f"{client_id}/videos/{memory_name}.mp4"),
            (index_path, f"{client_id}/videos/{memory_name}_index.json"),
        )

        try:
            for local_path, remote_path in uploads:
                upload_file(
                    path_or_fileobj=str(local_path),
                    path_in_repo=remote_path,
                    repo_id=self.dataset_name,
                    repo_type="dataset",
                    token=self.hf_token,
                )

            self.logger.info(
                f"Successfully uploaded memory '{memory_name}' for client {client_id}"
            )
            return True

        except Exception as e:
            self.logger.error(f"Failed to upload memory video: {e}")
            return False

    @staticmethod
    def _move_into_place(source_path: Path, target_path: Path) -> Path:
        """
        Move a downloaded file to its final location, replacing any old copy.

        Args:
            source_path (Path): File to move
            target_path (Path): Final file path; parent directories are created

        Returns:
            Path: ``target_path``
        """
        source_path = Path(source_path)
        target_path = Path(target_path)
        target_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source_path), str(target_path))
        return target_path

    def _download_to(self, remote_path: str, target_path: Path) -> Path:
        """
        Download one dataset file and store it at exactly ``target_path``.

        ``hf_hub_download`` recreates the repository-relative path underneath
        ``local_dir``, so the file is first fetched into a throw-away staging
        directory and then moved to the requested location. The staging
        directory is created next to the target so the move stays on one
        filesystem.

        Args:
            remote_path (str): Path of the file inside the dataset repository
            target_path (Path): Local file path to write

        Returns:
            Path: ``target_path``

        Raises:
            Exception: Whatever ``hf_hub_download`` or the file move raises.
        """
        target_path = Path(target_path)
        target_path.parent.mkdir(parents=True, exist_ok=True)

        with tempfile.TemporaryDirectory(dir=str(target_path.parent)) as staging_dir:
            downloaded = hf_hub_download(
                repo_id=self.dataset_name,
                filename=remote_path,
                repo_type="dataset",
                token=self.hf_token,
                local_dir=staging_dir,
                local_dir_use_symlinks=False,
            )
            return self._move_into_place(Path(downloaded), target_path)

    def download_memory_video(
        self, client_id: str, memory_name: str, local_videos_dir: Path
    ) -> bool:
        """
        Download memory video and index from HF dataset.

        The files are written directly into ``local_videos_dir`` as
        ``{memory_name}.mp4`` and ``{memory_name}_index.json``.

        Args:
            client_id (str): Client identifier
            memory_name (str): Memory video name
            local_videos_dir (Path): Local directory to save files

        Returns:
            bool: True if download successful
        """
        if not self.hf_enabled:
            self.logger.info("HF download skipped - not enabled")
            return False

        try:
            videos_dir = Path(local_videos_dir)

            # Download video file, then index file
            for suffix in (".mp4", "_index.json"):
                file_name = f"{memory_name}{suffix}"
                self._download_to(
                    f"{client_id}/videos/{file_name}", videos_dir / file_name
                )

            self.logger.info(
                f"Successfully downloaded memory '{memory_name}' for client {client_id}"
            )
            return True

        except Exception as e:
            self.logger.error(f"Failed to download memory video: {e}")
            return False

    def upload_client_metadata(self, client_id: str, metadata: Dict[str, Any]) -> bool:
        """
        Upload client metadata to HF dataset.

        Args:
            client_id (str): Client identifier
            metadata (dict): Client metadata (must be JSON-serialisable)

        Returns:
            bool: True if upload successful
        """
        if not self.hf_enabled:
            return False

        if not self.ensure_dataset_exists():
            return False

        temp_path = None
        try:
            # Create temporary file for metadata
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".json", delete=False, encoding="utf-8"
            ) as f:
                temp_path = f.name
                json.dump(metadata, f, indent=2)

            # Upload metadata
            upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=f"{client_id}/metadata.json",
                repo_id=self.dataset_name,
                repo_type="dataset",
                token=self.hf_token,
            )

            self.logger.info(f"Successfully uploaded metadata for client {client_id}")
            return True

        except Exception as e:
            self.logger.error(f"Failed to upload metadata: {e}")
            return False

        finally:
            # Clean up temp file on every path, including failed uploads.
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError as cleanup_error:
                    self.logger.debug(
                        f"Could not remove temp file {temp_path}: {cleanup_error}"
                    )

    def download_client_metadata(self, client_id: str) -> Optional[Dict[str, Any]]:
        """
        Download client metadata from HF dataset.

        Args:
            client_id (str): Client identifier

        Returns:
            dict or None: Client metadata if successful
        """
        if not self.hf_enabled:
            return None

        try:
            # Download metadata to temporary file
            with tempfile.TemporaryDirectory() as temp_dir:
                local_path = hf_hub_download(
                    repo_id=self.dataset_name,
                    filename=f"{client_id}/metadata.json",
                    repo_type="dataset",
                    token=self.hf_token,
                    local_dir=temp_dir,
                    local_dir_use_symlinks=False,
                )

                # Read metadata before the temporary directory is removed
                with open(local_path, "r", encoding="utf-8") as f:
                    metadata = json.load(f)

            self.logger.info(
                f"Successfully downloaded metadata for client {client_id}"
            )
            return metadata

        except Exception as e:
            self.logger.error(f"Failed to download metadata: {e}")
            return None

    def list_client_memories(self, client_id: str) -> List[str]:
        """
        List available memory videos for a client in HF dataset.

        Args:
            client_id (str): Client identifier

        Returns:
            list: List of memory names (the ``.mp4`` file names under the
            client's ``videos/`` prefix, without the extension)
        """
        if not self.hf_enabled:
            return []

        try:
            # Get dataset files
            files = self.hf_api.list_repo_files(
                repo_id=self.dataset_name, repo_type="dataset"
            )

            # Filter for this client's video files and strip prefix/extension
            prefix = f"{client_id}/videos/"
            extension = ".mp4"
            return [
                file_path[len(prefix) : -len(extension)]
                for file_path in files
                if file_path.startswith(prefix) and file_path.endswith(extension)
            ]

        except Exception as e:
            self.logger.error(f"Failed to list client memories: {e}")
            return []

    def backup_client_data(self, client_id: str, local_client_dir: Path) -> bool:
        """
        Backup all client data to HF dataset.

        Uploads every ``videos/*.mp4`` that has a matching ``*_index.json``
        plus ``metadata.json`` when present. Videos without an index file are
        skipped (with a warning) and do not count towards the result.

        Args:
            client_id (str): Client identifier
            local_client_dir (Path): Local client directory

        Returns:
            bool: True if backup successful (every attempted file uploaded)
        """
        if not self.hf_enabled:
            self.logger.info("HF backup skipped - not enabled")
            return False

        try:
            client_dir = Path(local_client_dir)
            success_count = 0
            total_files = 0

            # Upload all video files
            videos_dir = client_dir / "videos"
            if videos_dir.exists():
                for video_file in videos_dir.glob("*.mp4"):
                    memory_name = video_file.stem
                    index_file = videos_dir / f"{memory_name}_index.json"

                    if not index_file.exists():
                        self.logger.warning(
                            f"Skipping memory '{memory_name}' for client "
                            f"{client_id}: index file not found"
                        )
                        continue

                    # A memory counts as two files: the video and its index.
                    total_files += 2
                    if self.upload_memory_video(
                        client_id, memory_name, video_file, index_file
                    ):
                        success_count += 2

            # Upload metadata
            metadata_file = client_dir / "metadata.json"
            if metadata_file.exists():
                total_files += 1
                with open(metadata_file, "r", encoding="utf-8") as f:
                    metadata = json.load(f)

                if self.upload_client_metadata(client_id, metadata):
                    success_count += 1

            self.logger.info(
                f"Backup completed: {success_count}/{total_files} files uploaded for client {client_id}"
            )
            return success_count == total_files

        except Exception as e:
            self.logger.error(f"Failed to backup client data: {e}")
            return False

    def restore_client_data(self, client_id: str, local_client_dir: Path) -> bool:
        """
        Restore client data from HF dataset.

        Creates ``local_client_dir`` (including missing parents) with its
        ``videos`` and ``chunks`` sub-directories, writes ``metadata.json``
        when remote metadata is available, and downloads every listed memory.

        Args:
            client_id (str): Client identifier
            local_client_dir (Path): Local client directory

        Returns:
            bool: True if restore successful (every listed memory downloaded)
        """
        if not self.hf_enabled:
            self.logger.info("HF restore skipped - not enabled")
            return False

        try:
            client_dir = Path(local_client_dir)
            videos_dir = client_dir / "videos"

            # Ensure local directories exist
            client_dir.mkdir(parents=True, exist_ok=True)
            videos_dir.mkdir(exist_ok=True)
            (client_dir / "chunks").mkdir(exist_ok=True)

            # Restore metadata; an empty dict is still valid metadata.
            metadata = self.download_client_metadata(client_id)
            if metadata is not None:
                metadata_file = client_dir / "metadata.json"
                with open(metadata_file, "w", encoding="utf-8") as f:
                    json.dump(metadata, f, indent=2)

            # Restore memory videos
            memory_names = self.list_client_memories(client_id)
            success_count = sum(
                1
                for memory_name in memory_names
                if self.download_memory_video(client_id, memory_name, videos_dir)
            )

            self.logger.info(
                f"Restore completed: {success_count}/{len(memory_names)} memories restored for client {client_id}"
            )
            return success_count == len(memory_names)

        except Exception as e:
            self.logger.error(f"Failed to restore client data: {e}")
            return False

    def get_storage_info(self) -> Dict[str, Any]:
        """
        Get storage handler information and status.

        Note: when the integration is enabled this calls
        ``ensure_dataset_exists``, which may create the dataset if missing.

        Returns:
            dict: Storage information
        """
        info = {
            "hf_available": HF_AVAILABLE,
            "hf_enabled": self.hf_enabled,
            "dataset_name": self.dataset_name,
            "has_token": bool(self.hf_token),
            "storage_mode": "hybrid" if self.hf_enabled else "local_only",
        }

        if self.hf_enabled:
            try:
                info["dataset_exists"] = self.ensure_dataset_exists()
            except Exception as e:
                info["dataset_error"] = str(e)

        return info