""" Modal Memvid Service - GPU-accelerated video memory processing This service provides: - GPU-accelerated video processing using memvid library - QR code generation and decoding optimization - Modal object storage for MP4 files - Auto-scaling based on video processing workload """ import os import time import json import modal from typing import List, Dict, Any, Optional # Modal App Configuration app = modal.App("memvid-video-service") # Docker image with all video processing dependencies memvid_image = ( modal.Image.debian_slim() .pip_install( [ "memvid>=0.1.0", "opencv-python-headless>=4.8.0", "pillow>=9.5.0", "qrcode>=7.4.2", "pyzbar>=0.1.9", # QR code decoding "numpy>=1.24.0", "torch>=2.0.0", # PyTorch for GPU acceleration ] ) .apt_install( [ "libzbar0", # For QR code decoding "ffmpeg", # For video processing "libgl1-mesa-glx", # OpenCV dependencies "libglib2.0-0", ] ) ) # Volume for persistent video storage videos_volume = modal.Volume.from_name("memvid-videos", create_if_missing=True) @app.function( image=memvid_image, gpu="T4", # GPU optimized for video processing volumes={"/storage": videos_volume}, timeout=900, # 15 minutes timeout for video processing cpu=4.0, # More CPU for video encoding memory=8192, # 8GB RAM for video processing ) def process_video_memory( text: str, client_id: str, metadata: Dict[str, Any] ) -> Dict[str, Any]: """ GPU-accelerated video memory processing on Modal Args: text: Text content to store as video memory client_id: Unique identifier for the client/user metadata: Additional metadata for the memory Returns: Dict with processing results and metrics """ import sys sys.path.append("/storage") from memvid import MemvidEncoder, MemvidRetriever import shutil import uuid start_time = time.time() processing_metrics = {"gpu_used": "T4", "cpu_count": 4, "memory_gb": 8} try: # Setup storage paths in Modal volume client_storage_path = f"/storage/{client_id}" os.makedirs(client_storage_path, exist_ok=True) print(f"๐ŸŽฌ Processing video memory for client: {client_id}") print(f"๐Ÿ“ Text content: {text[:100]}...") # Initialize memvid encoder with Modal storage encoder = MemvidEncoder() # Process video memory with GPU acceleration video_start_time = time.time() # Add text to encoder and build video encoder.add_text(text) # Create output paths video_file = f"{client_storage_path}/videos/memory_{int(time.time())}.mp4" index_file = ( f"{client_storage_path}/videos/memory_{int(time.time())}_index.json" ) # Ensure directories exist os.makedirs(os.path.dirname(video_file), exist_ok=True) # Build video with QR codes result = encoder.build_video(video_file, index_file) video_processing_time = time.time() - video_start_time processing_metrics["video_processing_time"] = video_processing_time # Get file information video_files = [] chunk_files = [] if os.path.exists(client_storage_path): # Find video files videos_dir = os.path.join(client_storage_path, "videos") if os.path.exists(videos_dir): for file in os.listdir(videos_dir): if file.endswith(".mp4"): file_path = os.path.join(videos_dir, file) file_size = os.path.getsize(file_path) video_files.append( { "filename": file, "size_bytes": file_size, "path": file_path, } ) # Find chunk files chunks_dir = os.path.join(client_storage_path, "chunks") if os.path.exists(chunks_dir): for file in os.listdir(chunks_dir): if file.endswith(".txt"): file_path = os.path.join(chunks_dir, file) file_size = os.path.getsize(file_path) chunk_files.append( { "filename": file, "size_bytes": file_size, "path": file_path, } ) # Calculate storage metrics total_video_size = sum(f["size_bytes"] for f in video_files) total_chunks_size = sum(f["size_bytes"] for f in chunk_files) processing_metrics.update( { "video_files_count": len(video_files), "chunk_files_count": len(chunk_files), "total_video_size": total_video_size, "total_chunks_size": total_chunks_size, "total_storage_size": total_video_size + total_chunks_size, } ) # Generate unique memory ID memory_id = f"modal_video_{client_id}_{int(time.time())}_{uuid.uuid4().hex[:8]}" total_time = time.time() - start_time processing_metrics["total_time"] = total_time print(f"โœ… Video memory processed successfully") print(f"๐Ÿ“Š Created {len(video_files)} videos, {len(chunk_files)} chunks") print(f"๐Ÿ’พ Total storage: {total_video_size + total_chunks_size} bytes") print(f"โฑ๏ธ Processing time: {total_time:.2f}s") return { "success": True, "memory_id": memory_id, "client_id": client_id, "video_files": video_files, "chunk_files": chunk_files, "processing_metrics": processing_metrics, "metadata": metadata, "storage_path": client_storage_path, "infrastructure": "Modal + T4 GPU + Volume Storage", } except Exception as e: print(f"โŒ Error in video processing: {str(e)}") processing_metrics["error_time"] = time.time() - start_time return { "success": False, "error": str(e), "processing_metrics": processing_metrics, "infrastructure": "Modal + T4 GPU + Volume Storage", } @app.function( image=memvid_image, gpu="T4", volumes={"/storage": videos_volume}, timeout=600, # 10 minutes timeout for search operations cpu=2.0, memory=4096, # 4GB RAM for search ) def search_video_memory( query: str, client_id: str, memory_name: Optional[str] = None, top_k: int = 5 ) -> Dict[str, Any]: """ GPU-accelerated video memory search on Modal Args: query: Search query text client_id: Client identifier to search within memory_name: Optional specific memory name filter top_k: Number of top results to return Returns: Dict with search results and metrics """ import sys sys.path.append("/storage") from memvid import MemvidEncoder, MemvidRetriever start_time = time.time() try: print(f"๐Ÿ” Searching video memory for query: {query}") print(f"๐Ÿ‘ค Client: {client_id}") # Initialize memvid retriever with Modal storage client_storage_path = f"/storage/{client_id}" # Find video files for this client videos_dir = os.path.join(client_storage_path, "videos") video_files = [] if os.path.exists(videos_dir): for file in os.listdir(videos_dir): if file.endswith(".mp4"): video_files.append(os.path.join(videos_dir, file)) if not video_files: return { "success": True, "query": query, "client_id": client_id, "results": [], "total_results": 0, "message": "No video memories found for this client", "processing_metrics": { "search_time": 0, "total_time": time.time() - start_time, "gpu_used": "T4", "infrastructure": "Modal + Video Processing", }, } # Perform video-based search search_start_time = time.time() # Search through available video files results = [] for video_file in video_files[:1]: # Search first video for now try: # Find corresponding index file index_file = video_file.replace(".mp4", "_index.json") if not os.path.exists(index_file): # Try alternative index file naming index_file = video_file.replace(".mp4", ".json") if not os.path.exists(index_file): print(f"No index file found for {video_file}") continue # Initialize retriever with video and index files retriever = MemvidRetriever(video_file, index_file) video_results = retriever.search(query, top_k=top_k) if video_results: results.extend(video_results) except Exception as e: print(f"Error searching video {video_file}: {e}") continue search_time = time.time() - search_start_time # Format results for consistency formatted_results = [] if isinstance(results, list): for i, result in enumerate(results[:top_k]): if isinstance(result, dict): formatted_results.append( { "memory_id": result.get("id", f"video_result_{i}"), "text": result.get("text", result.get("content", "")), "metadata": result.get("metadata", {}), "similarity_score": result.get( "score", 0.8 ), # Default score "video_file": result.get("video_file", ""), "chunk_file": result.get("chunk_file", ""), } ) elif isinstance(result, str): formatted_results.append( { "memory_id": f"video_result_{i}", "text": result, "metadata": {}, "similarity_score": 0.75, "video_file": "", "chunk_file": "", } ) elif isinstance(results, str): # Single result formatted_results.append( { "memory_id": "video_result_0", "text": results, "metadata": {}, "similarity_score": 0.8, "video_file": "", "chunk_file": "", } ) total_time = time.time() - start_time print(f"โœ… Video search completed") print(f"๐Ÿ“Š Found {len(formatted_results)} results") print(f"โฑ๏ธ Search time: {search_time:.2f}s, Total time: {total_time:.2f}s") return { "success": True, "query": query, "client_id": client_id, "results": formatted_results, "total_results": len(formatted_results), "processing_metrics": { "search_time": search_time, "total_time": total_time, "gpu_used": "T4", "infrastructure": "Modal + Video Processing", }, } except Exception as e: print(f"โŒ Error in video search: {str(e)}") return { "success": False, "error": str(e), "processing_time": time.time() - start_time, "results": [], "infrastructure": "Modal + T4 GPU + Volume Storage", } @app.function( image=memvid_image, volumes={"/storage": videos_volume}, timeout=60, ) def get_video_stats(client_id: str) -> Dict[str, Any]: """ Get statistics for video storage Args: client_id: Client identifier Returns: Dict with storage statistics """ import os import json try: client_storage_path = f"/storage/{client_id}" if not os.path.exists(client_storage_path): return { "client_id": client_id, "storage_type": "modal_video", "memory_count": 0, "total_video_size": 0, "total_chunks": 0, "infrastructure": "Modal + T4 GPU + Volume Storage", } # Count video files videos_dir = os.path.join(client_storage_path, "videos") video_count = 0 total_video_size = 0 if os.path.exists(videos_dir): for file in os.listdir(videos_dir): if file.endswith(".mp4"): video_count += 1 file_path = os.path.join(videos_dir, file) total_video_size += os.path.getsize(file_path) # Count chunk files chunks_dir = os.path.join(client_storage_path, "chunks") chunk_count = 0 total_chunks_size = 0 if os.path.exists(chunks_dir): for file in os.listdir(chunks_dir): if file.endswith(".txt"): chunk_count += 1 file_path = os.path.join(chunks_dir, file) total_chunks_size += os.path.getsize(file_path) # Get metadata if available metadata_file = os.path.join(client_storage_path, "metadata.json") first_memory = None last_memory = None if os.path.exists(metadata_file): try: with open(metadata_file, "r") as f: metadata = json.load(f) # Extract creation times if available first_memory = metadata.get("first_memory") last_memory = metadata.get("last_memory") except: pass return { "client_id": client_id, "storage_type": "modal_video", "memory_count": video_count, "total_video_size": total_video_size, "total_chunks": chunk_count, "total_chunks_size": total_chunks_size, "total_storage_size": total_video_size + total_chunks_size, "first_memory": first_memory, "last_memory": last_memory, "infrastructure": "Modal + T4 GPU + Volume Storage", "storage_path": client_storage_path, } except Exception as e: return { "client_id": client_id, "storage_type": "modal_video", "error": str(e), "infrastructure": "Modal + T4 GPU + Volume Storage", } # Client class for easy integration with DualStorageManager class ModalMemvidClient: """Client for interacting with Modal Memvid Service""" def __init__(self, modal_token: Optional[str] = None): """ Initialize Modal Memvid Client Args: modal_token: Optional Modal token (uses environment if not provided) """ if modal_token: os.environ["MODAL_TOKEN"] = modal_token # Test Modal connection try: import modal print("โœ… Modal Memvid Client initialized successfully") except Exception as e: print(f"โš ๏ธ Modal Memvid Client initialization warning: {e}") def store_memory( self, text: str, client_id: str, metadata: Dict[str, Any] ) -> Dict[str, Any]: """Store memory using Modal memvid service""" try: # Use the deployed app's function with correct Modal calling pattern import modal func = modal.Function.from_name( "memvid-video-service", "process_video_memory" ) return func.remote(text, client_id, metadata) except Exception as e: return {"success": False, "error": f"Modal memvid storage failed: {e}"} def search_memory( self, query: str, client_id: str, memory_name: Optional[str] = None, top_k: int = 5, ) -> Dict[str, Any]: """Search memory using Modal memvid service""" try: # Use the deployed app's function with correct Modal calling pattern import modal func = modal.Function.from_name( "memvid-video-service", "search_video_memory" ) return func.remote(query, client_id, memory_name, top_k) except Exception as e: return { "success": False, "error": f"Modal memvid search failed: {e}", "results": [], } def get_stats(self, client_id: str) -> Dict[str, Any]: """Get statistics using Modal memvid service""" try: # Use the deployed app's function with correct Modal calling pattern import modal func = modal.Function.from_name("memvid-video-service", "get_video_stats") return func.remote(client_id) except Exception as e: return {"success": False, "error": f"Modal memvid stats failed: {e}"} def list_memories(self, client_id: str) -> str: """List memories for client (Modal implementation)""" try: stats = self.get_stats(client_id) if stats.get( "success", True ): # Modal stats don't have success field currently memory_list = { "client_id": client_id, "storage_type": "modal_video", "memory_count": stats.get("memory_count", 0), "memories": [], # Modal doesn't currently track individual memory names "total_size": stats.get("total_storage_size", 0), "infrastructure": "Modal + T4 GPU + Volume Storage", } return json.dumps(memory_list, indent=2) else: return json.dumps( { "error": f"Failed to list memories: {stats.get('error', 'Unknown error')}" } ) except Exception as e: return json.dumps({"error": f"Modal memvid list_memories failed: {e}"}) def build_memory_video(self, client_id: str, memory_name: str) -> str: """Build memory video (Modal implementation)""" # For Modal, videos are built automatically during storage return f"Memory videos are automatically built during storage in Modal for client {client_id}. Memory name: {memory_name}" def chat_with_memory(self, query: str, client_id: str, memory_name: str) -> str: """Chat with memory using Modal memvid service""" try: # Use search as basis for chat search_results = self.search_memory(query, client_id, memory_name, top_k=3) if search_results.get("success", False): results = search_results.get("results", []) if results: # Simple chat response based on search results context = "\n".join( [result.get("text", "") for result in results[:2]] ) response = f"Based on your memories: {context}\n\nYour query '{query}' relates to the stored information above." return response else: return f"I couldn't find any relevant memories for '{query}' in your video storage." else: return f"Error accessing memories: {search_results.get('error', 'Unknown error')}" except Exception as e: return f"Modal memvid chat failed: {e}" def delete_memory(self, client_id: str, memory_name: str) -> str: """Delete memory (Modal implementation)""" # Modal currently doesn't support selective deletion return f"Memory deletion not yet implemented in Modal for client {client_id}, memory {memory_name}" def get_memory_stats(self, client_id: str) -> str: """Get memory statistics as JSON string""" try: stats = self.get_stats(client_id) return json.dumps(stats, indent=2) except Exception as e: return json.dumps({"error": f"Modal memvid get_memory_stats failed: {e}"}) if __name__ == "__main__": # Test the Modal functions locally print("๐Ÿงช Testing Modal Memvid Service...") # Test client client = ModalMemvidClient() # Test storage result = client.store_memory( "This is a test memory for Modal video storage with GPU acceleration", "test_client", {"test": True, "timestamp": time.time()}, ) print(f"๐ŸŽฌ Storage result: {result}") # Test search search_result = client.search_memory("test memory GPU", "test_client", top_k=3) print(f"๐Ÿ” Search result: {search_result}") # Test stats stats = client.get_stats("test_client") print(f"๏ฟฝ๏ฟฝ Stats: {stats}")