File size: 14,844 Bytes
168b0da
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
"""
Storage Handler - HuggingFace Dataset integration for persistent memory storage.
Handles uploading and downloading memory videos to/from HF datasets.
"""

import os
import json
import logging
from typing import Dict, Any, List, Optional
from pathlib import Path
import tempfile
import shutil

try:
    from huggingface_hub import HfApi, create_repo, upload_file, hf_hub_download
    from huggingface_hub.utils import RepositoryNotFoundError

    HF_AVAILABLE = True
except ImportError:
    logging.warning("HuggingFace Hub not available. Using local storage only.")
    HF_AVAILABLE = False


class StorageHandler:
    """
    Handles persistent storage using HuggingFace datasets.
    Provides backup and restore functionality for memory videos.

    Falls back to local-only mode when the ``huggingface_hub`` package is
    unavailable or no API token is configured; in that mode every remote
    operation is a no-op that returns its "failure" sentinel.
    """

    def __init__(
        self, hf_token: Optional[str] = None, dataset_name: Optional[str] = None
    ):
        """
        Initialize the storage handler.

        Args:
            hf_token (str, optional): HuggingFace API token
            dataset_name (str, optional): Name of the HF dataset to use
        """
        self.logger = logging.getLogger(__name__)

        # Get HF token from environment or parameter
        self.hf_token = (
            hf_token or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_HUB_TOKEN")
        )

        # Set default dataset name
        self.dataset_name = dataset_name or os.getenv(
            "HF_DATASET_NAME", "memvid-memory-store"
        )

        # Initialize HF API if available; hf_enabled gates every remote call.
        self.hf_api = None
        self.hf_enabled = False

        if HF_AVAILABLE and self.hf_token:
            try:
                self.hf_api = HfApi(token=self.hf_token)
                self.hf_enabled = True
                self.logger.info(
                    f"HuggingFace integration enabled with dataset: {self.dataset_name}"
                )
            except Exception as e:
                self.logger.warning(f"Failed to initialize HF API: {e}")
        else:
            self.logger.info(
                "HuggingFace integration disabled - using local storage only"
            )

    def ensure_dataset_exists(self) -> bool:
        """
        Ensure the HF dataset exists, create if it doesn't.

        Returns:
            bool: True if dataset exists or was created successfully
        """
        if not self.hf_enabled:
            return False

        try:
            # Try to get dataset info
            self.hf_api.dataset_info(self.dataset_name)
            self.logger.info(f"Dataset {self.dataset_name} already exists")
            return True
        except RepositoryNotFoundError:
            try:
                # Create the dataset (private by default to protect client data)
                create_repo(
                    repo_id=self.dataset_name,
                    repo_type="dataset",
                    token=self.hf_token,
                    private=True,  # Make it private by default
                )
                self.logger.info(f"Created new dataset: {self.dataset_name}")
                return True
            except Exception as e:
                self.logger.error(f"Failed to create dataset {self.dataset_name}: {e}")
                return False
        except Exception as e:
            self.logger.error(f"Error checking dataset {self.dataset_name}: {e}")
            return False

    def upload_memory_video(
        self, client_id: str, memory_name: str, video_path: Path, index_path: Path
    ) -> bool:
        """
        Upload memory video and index to HF dataset.

        Args:
            client_id (str): Client identifier
            memory_name (str): Memory video name
            video_path (Path): Local path to video file
            index_path (Path): Local path to index file

        Returns:
            bool: True if upload successful
        """
        if not self.hf_enabled:
            self.logger.info("HF upload skipped - not enabled")
            return False

        if not self.ensure_dataset_exists():
            return False

        try:
            # Upload video file
            video_remote_path = f"{client_id}/videos/{memory_name}.mp4"
            upload_file(
                path_or_fileobj=str(video_path),
                path_in_repo=video_remote_path,
                repo_id=self.dataset_name,
                repo_type="dataset",
                token=self.hf_token,
            )

            # Upload index file
            index_remote_path = f"{client_id}/videos/{memory_name}_index.json"
            upload_file(
                path_or_fileobj=str(index_path),
                path_in_repo=index_remote_path,
                repo_id=self.dataset_name,
                repo_type="dataset",
                token=self.hf_token,
            )

            self.logger.info(
                f"Successfully uploaded memory '{memory_name}' for client {client_id}"
            )
            return True

        except Exception as e:
            self.logger.error(f"Failed to upload memory video: {e}")
            return False

    def download_memory_video(
        self, client_id: str, memory_name: str, local_videos_dir: Path
    ) -> bool:
        """
        Download memory video and index from HF dataset.

        Args:
            client_id (str): Client identifier
            memory_name (str): Memory video name
            local_videos_dir (Path): Local directory to save files

        Returns:
            bool: True if download successful and files placed directly in
                ``local_videos_dir`` as ``{memory_name}.mp4`` and
                ``{memory_name}_index.json``
        """
        if not self.hf_enabled:
            self.logger.info("HF download skipped - not enabled")
            return False

        try:
            video_remote_path = f"{client_id}/videos/{memory_name}.mp4"
            index_remote_path = f"{client_id}/videos/{memory_name}_index.json"
            video_local_path = local_videos_dir / f"{memory_name}.mp4"
            index_local_path = local_videos_dir / f"{memory_name}_index.json"

            local_videos_dir.mkdir(parents=True, exist_ok=True)

            # BUGFIX: hf_hub_download mirrors the *repo* path
            # ({client_id}/videos/...) underneath local_dir, so downloading
            # straight into local_videos_dir.parent left the files in a
            # nested directory that callers never read.  Download into a
            # throwaway directory instead, then move the files to the exact
            # paths the rest of the system expects.
            with tempfile.TemporaryDirectory() as temp_dir:
                downloaded_video = hf_hub_download(
                    repo_id=self.dataset_name,
                    filename=video_remote_path,
                    repo_type="dataset",
                    token=self.hf_token,
                    local_dir=temp_dir,
                    local_dir_use_symlinks=False,
                )
                downloaded_index = hf_hub_download(
                    repo_id=self.dataset_name,
                    filename=index_remote_path,
                    repo_type="dataset",
                    token=self.hf_token,
                    local_dir=temp_dir,
                    local_dir_use_symlinks=False,
                )

                shutil.move(downloaded_video, str(video_local_path))
                shutil.move(downloaded_index, str(index_local_path))

            self.logger.info(
                f"Successfully downloaded memory '{memory_name}' for client {client_id}"
            )
            return True

        except Exception as e:
            self.logger.error(f"Failed to download memory video: {e}")
            return False

    def upload_client_metadata(self, client_id: str, metadata: Dict[str, Any]) -> bool:
        """
        Upload client metadata to HF dataset.

        Args:
            client_id (str): Client identifier
            metadata (dict): Client metadata

        Returns:
            bool: True if upload successful
        """
        if not self.hf_enabled:
            return False

        if not self.ensure_dataset_exists():
            return False

        try:
            # Create temporary file for metadata
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".json", delete=False
            ) as f:
                json.dump(metadata, f, indent=2)
                temp_path = f.name

            # BUGFIX: unlink in a finally block so the temp file is removed
            # even when the upload raises (previously it leaked on failure).
            try:
                remote_path = f"{client_id}/metadata.json"
                upload_file(
                    path_or_fileobj=temp_path,
                    path_in_repo=remote_path,
                    repo_id=self.dataset_name,
                    repo_type="dataset",
                    token=self.hf_token,
                )
            finally:
                os.unlink(temp_path)

            self.logger.info(f"Successfully uploaded metadata for client {client_id}")
            return True

        except Exception as e:
            self.logger.error(f"Failed to upload metadata: {e}")
            return False

    def download_client_metadata(self, client_id: str) -> Optional[Dict[str, Any]]:
        """
        Download client metadata from HF dataset.

        Args:
            client_id (str): Client identifier

        Returns:
            dict or None: Client metadata if successful
        """
        if not self.hf_enabled:
            return None

        try:
            # Download metadata to temporary file
            remote_path = f"{client_id}/metadata.json"

            with tempfile.TemporaryDirectory() as temp_dir:
                local_path = hf_hub_download(
                    repo_id=self.dataset_name,
                    filename=remote_path,
                    repo_type="dataset",
                    token=self.hf_token,
                    local_dir=temp_dir,
                    local_dir_use_symlinks=False,
                )

                # Read metadata
                with open(local_path, "r") as f:
                    metadata = json.load(f)

                self.logger.info(
                    f"Successfully downloaded metadata for client {client_id}"
                )
                return metadata

        except Exception as e:
            self.logger.error(f"Failed to download metadata: {e}")
            return None

    def list_client_memories(self, client_id: str) -> List[str]:
        """
        List available memory videos for a client in HF dataset.

        Args:
            client_id (str): Client identifier

        Returns:
            list: List of memory names
        """
        if not self.hf_enabled:
            return []

        try:
            # Get dataset files
            files = self.hf_api.list_repo_files(
                repo_id=self.dataset_name, repo_type="dataset"
            )

            # Filter for this client's video files
            memory_names = []
            prefix = f"{client_id}/videos/"

            for file_path in files:
                if file_path.startswith(prefix) and file_path.endswith(".mp4"):
                    # Extract memory name from path
                    filename = file_path[len(prefix) :]
                    memory_name = filename[:-4]  # Remove .mp4 extension
                    memory_names.append(memory_name)

            return memory_names

        except Exception as e:
            self.logger.error(f"Failed to list client memories: {e}")
            return []

    def backup_client_data(self, client_id: str, local_client_dir: Path) -> bool:
        """
        Backup all client data to HF dataset.

        Args:
            client_id (str): Client identifier
            local_client_dir (Path): Local client directory

        Returns:
            bool: True if backup successful (every discovered file uploaded)
        """
        if not self.hf_enabled:
            self.logger.info("HF backup skipped - not enabled")
            return False

        try:
            success_count = 0
            total_files = 0

            # Upload all video files (a video is only backed up when its
            # companion index file exists — both travel together)
            videos_dir = local_client_dir / "videos"
            if videos_dir.exists():
                for video_file in videos_dir.glob("*.mp4"):
                    memory_name = video_file.stem
                    index_file = videos_dir / f"{memory_name}_index.json"

                    if index_file.exists():
                        total_files += 2
                        if self.upload_memory_video(
                            client_id, memory_name, video_file, index_file
                        ):
                            success_count += 2

            # Upload metadata
            metadata_file = local_client_dir / "metadata.json"
            if metadata_file.exists():
                total_files += 1
                with open(metadata_file, "r") as f:
                    metadata = json.load(f)
                if self.upload_client_metadata(client_id, metadata):
                    success_count += 1

            self.logger.info(
                f"Backup completed: {success_count}/{total_files} files uploaded for client {client_id}"
            )
            return success_count == total_files

        except Exception as e:
            self.logger.error(f"Failed to backup client data: {e}")
            return False

    def restore_client_data(self, client_id: str, local_client_dir: Path) -> bool:
        """
        Restore client data from HF dataset.

        Args:
            client_id (str): Client identifier
            local_client_dir (Path): Local client directory

        Returns:
            bool: True if restore successful (all listed memories downloaded)
        """
        if not self.hf_enabled:
            self.logger.info("HF restore skipped - not enabled")
            return False

        try:
            # Ensure local directories exist (parents=True so a missing
            # ancestor directory does not abort the restore)
            local_client_dir.mkdir(parents=True, exist_ok=True)
            (local_client_dir / "videos").mkdir(exist_ok=True)
            (local_client_dir / "chunks").mkdir(exist_ok=True)

            # Restore metadata (best-effort: a missing metadata file does
            # not fail the restore)
            metadata = self.download_client_metadata(client_id)
            if metadata:
                metadata_file = local_client_dir / "metadata.json"
                with open(metadata_file, "w") as f:
                    json.dump(metadata, f, indent=2)

            # Restore memory videos
            memory_names = self.list_client_memories(client_id)
            videos_dir = local_client_dir / "videos"

            success_count = 0
            for memory_name in memory_names:
                if self.download_memory_video(client_id, memory_name, videos_dir):
                    success_count += 1

            self.logger.info(
                f"Restore completed: {success_count}/{len(memory_names)} memories restored for client {client_id}"
            )
            return success_count == len(memory_names)

        except Exception as e:
            self.logger.error(f"Failed to restore client data: {e}")
            return False

    def get_storage_info(self) -> Dict[str, Any]:
        """
        Get storage handler information and status.

        Returns:
            dict: Storage information
        """
        info = {
            "hf_available": HF_AVAILABLE,
            "hf_enabled": self.hf_enabled,
            "dataset_name": self.dataset_name,
            "has_token": bool(self.hf_token),
            "storage_mode": "hybrid" if self.hf_enabled else "local_only",
        }

        if self.hf_enabled:
            try:
                dataset_exists = self.ensure_dataset_exists()
                info["dataset_exists"] = dataset_exists
            except Exception as e:
                info["dataset_error"] = str(e)

        return info