"""
Storage Handler - HuggingFace Dataset integration for persistent memory storage.
Handles uploading and downloading memory videos to/from HF datasets.
"""
import os
import json
import logging
from typing import Dict, Any, List, Optional
from pathlib import Path
import tempfile
import shutil
try:
from huggingface_hub import HfApi, create_repo, upload_file, hf_hub_download
from huggingface_hub.utils import RepositoryNotFoundError
HF_AVAILABLE = True
except ImportError:
logging.warning("HuggingFace Hub not available. Using local storage only.")
HF_AVAILABLE = False


class StorageHandler:
    """
    Handles persistent storage using HuggingFace datasets.
    Provides backup and restore functionality for memory videos.
    """

    def __init__(
        self, hf_token: Optional[str] = None, dataset_name: Optional[str] = None
    ):
        """
        Initialize the storage handler.

        Args:
            hf_token (str, optional): HuggingFace API token
            dataset_name (str, optional): Name of the HF dataset to use
        """
        self.logger = logging.getLogger(__name__)
        # Get HF token from environment or parameter
        self.hf_token = (
            hf_token or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_HUB_TOKEN")
        )
        # Set default dataset name
        self.dataset_name = dataset_name or os.getenv(
            "HF_DATASET_NAME", "memvid-memory-store"
        )
        # Initialize HF API if available
        self.hf_api = None
        self.hf_enabled = False
        if HF_AVAILABLE and self.hf_token:
            try:
                self.hf_api = HfApi(token=self.hf_token)
                self.hf_enabled = True
                self.logger.info(
                    f"HuggingFace integration enabled with dataset: {self.dataset_name}"
                )
            except Exception as e:
                self.logger.warning(f"Failed to initialize HF API: {e}")
        else:
            self.logger.info(
                "HuggingFace integration disabled - using local storage only"
            )
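
    # A minimal configuration sketch. The env var names below are the ones
    # read in __init__; the dataset id and token values are illustrative
    # placeholders, not values taken from this codebase:
    #
    #   export HF_TOKEN="hf_..."              # or HUGGINGFACE_HUB_TOKEN
    #   export HF_DATASET_NAME="your-username/memvid-memory-store"
    #
    #   handler = StorageHandler()            # reads token/dataset from env
    #   handler = StorageHandler(hf_token="hf_...", dataset_name="me/my-store")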

    def ensure_dataset_exists(self) -> bool:
        """
        Ensure the HF dataset exists, create if it doesn't.

        Returns:
            bool: True if dataset exists or was created successfully
        """
        if not self.hf_enabled:
            return False
        try:
            # Try to get dataset info
            self.hf_api.dataset_info(self.dataset_name)
            self.logger.info(f"Dataset {self.dataset_name} already exists")
            return True
        except RepositoryNotFoundError:
            try:
                # Create the dataset
                create_repo(
                    repo_id=self.dataset_name,
                    repo_type="dataset",
                    token=self.hf_token,
                    private=True,  # Make it private by default
                )
                self.logger.info(f"Created new dataset: {self.dataset_name}")
                return True
            except Exception as e:
                self.logger.error(f"Failed to create dataset {self.dataset_name}: {e}")
                return False
        except Exception as e:
            self.logger.error(f"Error checking dataset {self.dataset_name}: {e}")
            return False

    def upload_memory_video(
        self, client_id: str, memory_name: str, video_path: Path, index_path: Path
    ) -> bool:
        """
        Upload memory video and index to HF dataset.

        Args:
            client_id (str): Client identifier
            memory_name (str): Memory video name
            video_path (Path): Local path to video file
            index_path (Path): Local path to index file

        Returns:
            bool: True if upload successful
        """
        if not self.hf_enabled:
            self.logger.info("HF upload skipped - not enabled")
            return False
        if not self.ensure_dataset_exists():
            return False
        try:
            # Upload video file
            video_remote_path = f"{client_id}/videos/{memory_name}.mp4"
            upload_file(
                path_or_fileobj=str(video_path),
                path_in_repo=video_remote_path,
                repo_id=self.dataset_name,
                repo_type="dataset",
                token=self.hf_token,
            )
            # Upload index file
            index_remote_path = f"{client_id}/videos/{memory_name}_index.json"
            upload_file(
                path_or_fileobj=str(index_path),
                path_in_repo=index_remote_path,
                repo_id=self.dataset_name,
                repo_type="dataset",
                token=self.hf_token,
            )
            self.logger.info(
                f"Successfully uploaded memory '{memory_name}' for client {client_id}"
            )
            return True
        except Exception as e:
            self.logger.error(f"Failed to upload memory video: {e}")
            return False
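
    # For reference, the remote layout implied by the path templates used in
    # this class (client ids and memory names are illustrative placeholders):
    #
    #   {client_id}/
    #       metadata.json
    #       videos/
    #           {memory_name}.mp4
    #           {memory_name}_index.json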

    def download_memory_video(
        self, client_id: str, memory_name: str, local_videos_dir: Path
    ) -> bool:
        """
        Download memory video and index from HF dataset.

        Args:
            client_id (str): Client identifier
            memory_name (str): Memory video name
            local_videos_dir (Path): Local directory to save files

        Returns:
            bool: True if download successful
        """
        if not self.hf_enabled:
            self.logger.info("HF download skipped - not enabled")
            return False
        try:
            video_remote_path = f"{client_id}/videos/{memory_name}.mp4"
            video_local_path = local_videos_dir / f"{memory_name}.mp4"
            index_remote_path = f"{client_id}/videos/{memory_name}_index.json"
            index_local_path = local_videos_dir / f"{memory_name}_index.json"
            # hf_hub_download reproduces the full repo path
            # ({client_id}/videos/...) under local_dir, so download into a
            # temporary directory and move the files to their intended
            # locations rather than pointing local_dir at the parent of the
            # videos directory.
            with tempfile.TemporaryDirectory() as temp_dir:
                downloaded_video = hf_hub_download(
                    repo_id=self.dataset_name,
                    filename=video_remote_path,
                    repo_type="dataset",
                    token=self.hf_token,
                    local_dir=temp_dir,
                )
                shutil.move(downloaded_video, str(video_local_path))
                downloaded_index = hf_hub_download(
                    repo_id=self.dataset_name,
                    filename=index_remote_path,
                    repo_type="dataset",
                    token=self.hf_token,
                    local_dir=temp_dir,
                )
                shutil.move(downloaded_index, str(index_local_path))
            self.logger.info(
                f"Successfully downloaded memory '{memory_name}' for client {client_id}"
            )
            return True
        except Exception as e:
            self.logger.error(f"Failed to download memory video: {e}")
            return False

    def upload_client_metadata(self, client_id: str, metadata: Dict[str, Any]) -> bool:
        """
        Upload client metadata to HF dataset.

        Args:
            client_id (str): Client identifier
            metadata (dict): Client metadata

        Returns:
            bool: True if upload successful
        """
        if not self.hf_enabled:
            return False
        if not self.ensure_dataset_exists():
            return False
        temp_path = None
        try:
            # Create temporary file for metadata
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".json", delete=False
            ) as f:
                json.dump(metadata, f, indent=2)
                temp_path = f.name
            # Upload metadata
            remote_path = f"{client_id}/metadata.json"
            upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=remote_path,
                repo_id=self.dataset_name,
                repo_type="dataset",
                token=self.hf_token,
            )
            self.logger.info(f"Successfully uploaded metadata for client {client_id}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to upload metadata: {e}")
            return False
        finally:
            # Clean up the temp file whether or not the upload succeeded
            if temp_path and os.path.exists(temp_path):
                os.unlink(temp_path)

    def download_client_metadata(self, client_id: str) -> Optional[Dict[str, Any]]:
        """
        Download client metadata from HF dataset.

        Args:
            client_id (str): Client identifier

        Returns:
            dict or None: Client metadata if successful
        """
        if not self.hf_enabled:
            return None
        try:
            # Download metadata to a temporary directory
            remote_path = f"{client_id}/metadata.json"
            with tempfile.TemporaryDirectory() as temp_dir:
                local_path = hf_hub_download(
                    repo_id=self.dataset_name,
                    filename=remote_path,
                    repo_type="dataset",
                    token=self.hf_token,
                    local_dir=temp_dir,
                )
                # Read metadata before the temp directory is removed
                with open(local_path, "r") as f:
                    metadata = json.load(f)
            self.logger.info(
                f"Successfully downloaded metadata for client {client_id}"
            )
            return metadata
        except Exception as e:
            self.logger.error(f"Failed to download metadata: {e}")
            return None

    def list_client_memories(self, client_id: str) -> List[str]:
        """
        List available memory videos for a client in HF dataset.

        Args:
            client_id (str): Client identifier

        Returns:
            list: List of memory names
        """
        if not self.hf_enabled:
            return []
        try:
            # Get dataset files
            files = self.hf_api.list_repo_files(
                repo_id=self.dataset_name, repo_type="dataset"
            )
            # Filter for this client's video files
            memory_names = []
            prefix = f"{client_id}/videos/"
            for file_path in files:
                if file_path.startswith(prefix) and file_path.endswith(".mp4"):
                    # Extract memory name from path
                    filename = file_path[len(prefix) :]
                    memory_name = filename[:-4]  # Remove .mp4 extension
                    memory_names.append(memory_name)
            return memory_names
        except Exception as e:
            self.logger.error(f"Failed to list client memories: {e}")
            return []

    def backup_client_data(self, client_id: str, local_client_dir: Path) -> bool:
        """
        Backup all client data to HF dataset.

        Args:
            client_id (str): Client identifier
            local_client_dir (Path): Local client directory

        Returns:
            bool: True if backup successful
        """
        if not self.hf_enabled:
            self.logger.info("HF backup skipped - not enabled")
            return False
        try:
            success_count = 0
            total_files = 0
            # Upload all video files
            videos_dir = local_client_dir / "videos"
            if videos_dir.exists():
                for video_file in videos_dir.glob("*.mp4"):
                    memory_name = video_file.stem
                    index_file = videos_dir / f"{memory_name}_index.json"
                    if index_file.exists():
                        total_files += 2
                        if self.upload_memory_video(
                            client_id, memory_name, video_file, index_file
                        ):
                            success_count += 2
            # Upload metadata
            metadata_file = local_client_dir / "metadata.json"
            if metadata_file.exists():
                total_files += 1
                with open(metadata_file, "r") as f:
                    metadata = json.load(f)
                if self.upload_client_metadata(client_id, metadata):
                    success_count += 1
            self.logger.info(
                f"Backup completed: {success_count}/{total_files} files uploaded for client {client_id}"
            )
            return success_count == total_files
        except Exception as e:
            self.logger.error(f"Failed to backup client data: {e}")
            return False

    def restore_client_data(self, client_id: str, local_client_dir: Path) -> bool:
        """
        Restore client data from HF dataset.

        Args:
            client_id (str): Client identifier
            local_client_dir (Path): Local client directory

        Returns:
            bool: True if restore successful
        """
        if not self.hf_enabled:
            self.logger.info("HF restore skipped - not enabled")
            return False
        try:
            # Ensure local directories exist
            local_client_dir.mkdir(parents=True, exist_ok=True)
            (local_client_dir / "videos").mkdir(exist_ok=True)
            (local_client_dir / "chunks").mkdir(exist_ok=True)
            # Restore metadata
            metadata = self.download_client_metadata(client_id)
            if metadata:
                metadata_file = local_client_dir / "metadata.json"
                with open(metadata_file, "w") as f:
                    json.dump(metadata, f, indent=2)
            # Restore memory videos
            memory_names = self.list_client_memories(client_id)
            videos_dir = local_client_dir / "videos"
            success_count = 0
            for memory_name in memory_names:
                if self.download_memory_video(client_id, memory_name, videos_dir):
                    success_count += 1
            self.logger.info(
                f"Restore completed: {success_count}/{len(memory_names)} memories restored for client {client_id}"
            )
            return success_count == len(memory_names)
        except Exception as e:
            self.logger.error(f"Failed to restore client data: {e}")
            return False

    def get_storage_info(self) -> Dict[str, Any]:
        """
        Get storage handler information and status.

        Returns:
            dict: Storage information
        """
        info = {
            "hf_available": HF_AVAILABLE,
            "hf_enabled": self.hf_enabled,
            "dataset_name": self.dataset_name,
            "has_token": bool(self.hf_token),
            "storage_mode": "hybrid" if self.hf_enabled else "local_only",
        }
        if self.hf_enabled:
            try:
                dataset_exists = self.ensure_dataset_exists()
                info["dataset_exists"] = dataset_exists
            except Exception as e:
                info["dataset_error"] = str(e)
        return info
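

if __name__ == "__main__":
    # A minimal smoke-test sketch. StorageHandler falls back to local-only
    # mode when no token is configured; note that get_storage_info() will
    # create the (private) dataset if HF is enabled and it does not exist yet.
    logging.basicConfig(level=logging.INFO)

    handler = StorageHandler()
    print(handler.get_storage_info())

    # Typical backup/restore calls (the client id and directory below are
    # illustrative placeholders, not values taken from this codebase):
    #
    #   client_dir = Path("data/clients/client_123")
    #   handler.backup_client_data("client_123", client_dir)   # local -> HF
    #   handler.restore_client_data("client_123", client_dir)  # HF -> local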