File size: 16,869 Bytes
168b0da
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
"""
Dual Storage Manager - Orchestrates memvid and vector storage with performance comparison.
Provides unified interface for dual storage modes with background metrics collection.
"""

import os
import json
import time
import logging
from typing import Dict, Any, Optional
from pathlib import Path

from .memvid_manager import MemvidManager
from .vector_storage_manager import VectorStorageManager

# Modal services imports (with fallback for local development)
try:
    import sys
    from pathlib import Path

    # Add parent directory to path for Modal service imports
    parent_dir = Path(__file__).parent.parent
    if str(parent_dir) not in sys.path:
        sys.path.insert(0, str(parent_dir))

    from modal_vector_service import ModalVectorClient
    from modal_memvid_service import ModalMemvidClient

    MODAL_AVAILABLE = True
    print("βœ… Modal services imported successfully")
except ImportError as e:
    print(f"⚠️ Modal services not available, using local implementations: {e}")
    MODAL_AVAILABLE = False
from .metrics_collector import MetricsCollector


class DualStorageManager:
    """
    Orchestrates dual storage between memvid (video-based) and vector storage.
    Provides unified interface with configurable storage modes and performance tracking.
    """

    def __init__(self, data_dir: str = "data"):
        """
        Initialize dual storage manager with Modal-first architecture.

        Configuration is read from environment variables:

        - ``STORAGE_MODE``: ``memvid_only``, ``vector_only`` or ``dual``
          (default ``dual``). The value is stripped and lower-cased; any other
          value is logged as a warning and replaced by ``dual`` so that every
          method sees one of the three supported modes.
        - ``ENABLE_PERFORMANCE_TRACKING``: a ``MetricsCollector`` is created
          only when this equals ``true`` (case-insensitive, default ``true``).
        - ``MODAL_TOKEN``: when set and the Modal client modules were imported
          successfully, the Modal clients are used as backends. If they are
          unavailable, or constructing them raises, the local managers are
          used instead.

        Args:
            data_dir (str): Base directory for storing data
        """
        self.logger = logging.getLogger(__name__)

        # Get storage mode from environment, rejecting unsupported values
        # up front instead of letting each method interpret them differently.
        requested_mode = os.getenv("STORAGE_MODE", "dual").strip().lower()
        if requested_mode not in ("memvid_only", "vector_only", "dual"):
            self.logger.warning(
                "Unknown STORAGE_MODE %r; falling back to 'dual'", requested_mode
            )
            requested_mode = "dual"
        self.storage_mode = requested_mode
        self.enable_metrics = (
            os.getenv("ENABLE_PERFORMANCE_TRACKING", "true").lower() == "true"
        )

        # Check for Modal configuration
        modal_token = os.getenv("MODAL_TOKEN")

        # Initialize storage backends (Modal-first with local fallback)
        if MODAL_AVAILABLE and modal_token:
            print("🚀 Initializing Modal-powered storage backends...")
            try:
                self.memvid_manager = ModalMemvidClient(modal_token=modal_token)
                self.vector_manager = ModalVectorClient(modal_token=modal_token)
                # Only flagged as Modal once both clients were constructed.
                self.using_modal = True
                print("✅ Modal services initialized successfully")
            except Exception as e:
                print(f"⚠️ Modal initialization failed, falling back to local: {e}")
                self._init_local_backends(data_dir)
        else:
            print("🏠 Using local storage backends...")
            self._init_local_backends(data_dir)

        # Initialize metrics collector
        self.metrics = MetricsCollector() if self.enable_metrics else None

        infrastructure = "Modal" if self.using_modal else "Local"
        self.logger.info(
            f"DualStorageManager initialized with mode: {self.storage_mode}"
        )
        print(f"🏗️ Infrastructure: {infrastructure}")
        print(
            f"📊 Performance tracking: {'enabled' if self.enable_metrics else 'disabled'}"
        )

    def _init_local_backends(self, data_dir: str) -> None:
        """Create the local memvid/vector managers and mark Modal as unused."""
        self.memvid_manager = MemvidManager(data_dir)
        # The vector manager reuses the memvid manager's storage handler
        # (shared HF storage).
        self.vector_manager = VectorStorageManager(
            data_dir, storage_handler=self.memvid_manager.storage_handler
        )
        self.using_modal = False

    def set_storage_mode(self, mode: str, client_id: str = "") -> str:
        """
        Set storage mode at runtime.

        Args:
            mode (str): Storage mode (memvid_only, vector_only, dual)
            client_id (str): Optional client-specific setting

        Returns:
            str: Success message
        """
        valid_modes = ["memvid_only", "vector_only", "dual"]
        if mode not in valid_modes:
            return f"Error: Invalid mode '{mode}'. Valid modes: {valid_modes}"

        self.storage_mode = mode
        return f"Storage mode set to: {mode}" + (
            f" for client {client_id}" if client_id else " (global)"
        )

    def get_storage_mode(self, client_id: str = "") -> str:
        """
        Get current storage mode.

        Args:
            client_id (str): Client identifier (for future client-specific modes)

        Returns:
            str: Current storage mode information
        """
        return json.dumps(
            {
                "storage_mode": self.storage_mode,
                "metrics_enabled": self.enable_metrics,
                "backends_available": {
                    "memvid": True,
                    "vector": self.vector_manager is not None,
                },
            },
            indent=2,
        )

    def store_memory(
        self, text: str, client_id: str, metadata: Dict[str, Any] = None
    ) -> str:
        """
        Universal memory storage interface.

        Args:
            text (str): Text content to store
            client_id (str): Client identifier
            metadata (dict): Additional metadata

        Returns:
            str: Storage result message
        """
        try:
            if self.storage_mode == "memvid_only":
                return self._store_memvid_only(text, client_id, metadata)
            elif self.storage_mode == "vector_only":
                return self._store_vector_only(text, client_id, metadata)
            else:  # dual mode
                return self._store_dual_mode(text, client_id, metadata)

        except Exception as e:
            error_msg = f"Error in store_memory: {str(e)}"
            self.logger.error(error_msg)
            return error_msg

    def search_memory(
        self, query: str, client_id: str, memory_name: str, top_k: int = 5
    ) -> str:
        """
        Universal memory search interface.

        Args:
            query (str): Search query
            client_id (str): Client identifier
            memory_name (str): Memory name to search
            top_k (int): Number of results

        Returns:
            str: Search results
        """
        try:
            if self.storage_mode == "memvid_only":
                return self._search_memvid_only(query, client_id, memory_name, top_k)
            elif self.storage_mode == "vector_only":
                return self._search_vector_only(query, client_id, memory_name, top_k)
            else:  # dual mode
                return self._search_dual_mode(query, client_id, memory_name, top_k)

        except Exception as e:
            error_msg = f"Error in search_memory: {str(e)}"
            self.logger.error(error_msg)
            return json.dumps({"error": error_msg})

    def get_memory_stats(self, client_id: str) -> str:
        """
        Get aggregated memory statistics based on storage mode.

        Args:
            client_id (str): Client identifier

        Returns:
            str: JSON string with statistics
        """
        try:
            if self.storage_mode == "dual" and self.metrics:
                return self.metrics.get_comparison_report(client_id)
            elif self.storage_mode == "memvid_only":
                return self.memvid_manager.get_memory_stats(client_id)
            elif self.storage_mode == "vector_only" and self.vector_manager:
                return self.vector_manager.get_stats(client_id)
            else:
                # Fallback to memvid stats
                return self.memvid_manager.get_memory_stats(client_id)

        except Exception as e:
            error_msg = f"Error getting memory stats: {str(e)}"
            self.logger.error(error_msg)
            return json.dumps({"error": error_msg})

    def delete_memory(self, client_id: str, memory_name: str) -> str:
        """
        Universal memory deletion interface.

        Args:
            client_id (str): Client identifier
            memory_name (str): Memory name to delete

        Returns:
            str: Deletion result
        """
        try:
            results = []

            if self.storage_mode in ["memvid_only", "dual"]:
                result = self.memvid_manager.delete_memory(client_id, memory_name)
                results.append(f"Memvid: {result}")

            if self.storage_mode in ["vector_only", "dual"] and self.vector_manager:
                result = self.vector_manager.delete_memory(client_id, memory_name)
                results.append(f"Vector: {result}")

            return " | ".join(results) if results else "No storage backends available"

        except Exception as e:
            error_msg = f"Error deleting memory: {str(e)}"
            self.logger.error(error_msg)
            return error_msg

    def list_memories(self, client_id: str) -> str:
        """
        Universal memory listing interface.

        Args:
            client_id (str): Client identifier

        Returns:
            str: JSON string with memory list
        """
        try:
            # Use memvid as primary source for listing
            return self.memvid_manager.list_memories(client_id)
        except Exception as e:
            error_msg = f"Error listing memories: {str(e)}"
            self.logger.error(error_msg)
            return json.dumps({"error": error_msg})

    def build_memory_video(self, client_id: str, memory_name: str) -> str:
        """
        Build memory video from stored chunks (memvid-specific).

        Args:
            client_id (str): Client identifier
            memory_name (str): Name for the memory video

        Returns:
            str: Build result message
        """
        try:
            return self.memvid_manager.build_memory_video(client_id, memory_name)
        except Exception as e:
            error_msg = f"Error in build_memory_video: {str(e)}"
            self.logger.error(error_msg)
            return error_msg

    def chat_with_memory(self, query: str, client_id: str, memory_name: str) -> str:
        """
        Universal chat interface.

        Args:
            query (str): User query
            client_id (str): Client identifier
            memory_name (str): Memory name to chat with

        Returns:
            str: Chat response
        """
        try:
            # Use memvid for chat (better for conversational AI)
            return self.memvid_manager.chat_with_memory(query, client_id, memory_name)
        except Exception as e:
            error_msg = f"Error in chat_with_memory: {str(e)}"
            self.logger.error(error_msg)
            return error_msg

    # Private methods for storage mode implementations

    def _store_memvid_only(
        self, text: str, client_id: str, metadata: Dict[str, Any]
    ) -> str:
        """Store using memvid only."""
        start_time = time.time()
        result = self.memvid_manager.store_memory(text, client_id, metadata)

        if self.metrics:
            self.metrics.track_storage_operation(
                "memvid", time.time() - start_time, len(text)
            )

        return result

    def _store_vector_only(
        self, text: str, client_id: str, metadata: Dict[str, Any]
    ) -> str:
        """Store using vector storage only."""
        if not self.vector_manager:
            return "Error: Vector storage not available (Modal credentials needed)"

        start_time = time.time()
        result = self.vector_manager.store_memory(text, client_id, metadata)

        if self.metrics:
            self.metrics.track_storage_operation(
                "vector", time.time() - start_time, len(text)
            )

        return result

    def _store_dual_mode(
        self, text: str, client_id: str, metadata: Dict[str, Any]
    ) -> str:
        """Store using both storage backends with performance comparison."""
        results = []

        # Store in memvid
        start_time = time.time()
        memvid_result = self.memvid_manager.store_memory(text, client_id, metadata)
        memvid_time = time.time() - start_time
        results.append(f"Memvid({memvid_time:.3f}s): {memvid_result}")

        # Store in vector (if available)
        if self.vector_manager:
            start_time = time.time()
            vector_result = self.vector_manager.store_memory(text, client_id, metadata)
            vector_time = time.time() - start_time
            results.append(f"Vector({vector_time:.3f}s): {vector_result}")

            # Track comparison metrics
            if self.metrics:
                self.metrics.track_dual_storage_comparison(
                    memvid_time, vector_time, len(text), client_id
                )
        else:
            results.append("Vector: Not available (Modal credentials needed)")

        return " | ".join(results)

    def _search_memvid_only(
        self, query: str, client_id: str, memory_name: str, top_k: int
    ) -> str:
        """Search using memvid only."""
        start_time = time.time()
        result = self.memvid_manager.search_memory(query, client_id, memory_name, top_k)

        if self.metrics:
            self.metrics.track_search_operation(
                "memvid", time.time() - start_time, top_k
            )

        # Convert dict to JSON string for MCP interface
        if isinstance(result, dict):
            return json.dumps(result, indent=2)
        return result

    def _search_vector_only(
        self, query: str, client_id: str, memory_name: str, top_k: int
    ) -> str:
        """Search using vector storage only."""
        if not self.vector_manager:
            return json.dumps(
                {"error": "Vector storage not available (Modal credentials needed)"}
            )

        start_time = time.time()
        result = self.vector_manager.search_memory(query, client_id, top_k=top_k)

        if self.metrics:
            self.metrics.track_search_operation(
                "vector", time.time() - start_time, top_k
            )

        # Convert dict to JSON string for MCP interface
        if isinstance(result, dict):
            return json.dumps(result, indent=2)
        return result

    def _search_dual_mode(
        self, query: str, client_id: str, memory_name: str, top_k: int
    ) -> str:
        """Search using both backends with performance comparison."""

        # Search memvid first
        memvid_data = {"error": "Memvid search not attempted"}
        memvid_time = 0

        start_time = time.time()
        memvid_result = self.memvid_manager.search_memory(
            query, client_id, memory_name, top_k
        )
        memvid_time = time.time() - start_time

        # Handle memvid result - Modal clients should return dicts
        memvid_data = (
            memvid_result
            if isinstance(memvid_result, dict)
            else {
                "error": f"Unexpected memvid type: {type(memvid_result)}",
                "content": str(memvid_result)[:200],
            }
        )

        # Search vector second
        vector_data = {"error": "Vector search not attempted"}
        vector_time = 0

        if self.vector_manager:
            start_time = time.time()
            vector_result = self.vector_manager.search_memory(
                query, client_id, memory_name=memory_name, top_k=top_k
            )
            vector_time = time.time() - start_time

            # Handle vector result - Modal clients should return dicts
            vector_data = (
                vector_result
                if isinstance(vector_result, dict)
                else {
                    "error": f"Unexpected vector type: {type(vector_result)}",
                    "content": str(vector_result)[:200],
                }
            )
        else:
            vector_data = {"error": "Vector storage not available"}

        # Track comparison metrics
        if self.metrics:
            self.metrics.track_dual_search_comparison(
                memvid_time, vector_time, query, client_id
            )

        # Return comparison results
        return json.dumps(
            {
                "query": query,
                "client_id": client_id,
                "memory_name": memory_name,
                "dual_search_results": {
                    "memvid": {
                        "time_ms": round(memvid_time * 1000, 2),
                        "results": memvid_data,
                    },
                    "vector": {
                        "time_ms": round(vector_time * 1000, 2),
                        "results": vector_data,
                    },
                },
                "performance_winner": (
                    "memvid" if memvid_time < vector_time else "vector"
                ),
            },
            indent=2,
        )