Spaces:
Running
Running
eldarski
🎥 Memvid MCP Server - Hackathon Submission - Complete MCP server with 24 tools for video-based AI memory storage - Dual storage with Modal GPU acceleration - Ready for Agents-MCP-Hackathon Track 1
168b0da
| """ | |
| Memvid Manager - Wrapper for memvid operations with error handling. | |
| Handles video-based memory storage, search, and chat functionality. | |
| """ | |
| import os | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, Any, List, Optional, Tuple | |
| import tempfile | |
| import shutil | |
| try: | |
| from memvid import MemvidEncoder, MemvidRetriever, MemvidChat | |
| MEMVID_AVAILABLE = True | |
| except ImportError: | |
| logging.warning("Memvid library not available. Using mock implementation.") | |
| MemvidEncoder = None | |
| MemvidRetriever = None | |
| MemvidChat = None | |
| MEMVID_AVAILABLE = False | |
| from .storage_handler import StorageHandler | |
class MemvidManager:
    """
    Manages memvid operations with HuggingFace dataset integration.
    Provides video-based memory storage for MCP server.

    On-disk layout, relative to ``data_dir``::

        <client_id>/metadata.json                 per-client bookkeeping
        <client_id>/chunks/chunk_NNNN.txt         raw text chunks
        <client_id>/chunks/chunk_NNNN.meta.json   per-chunk metadata sidecar
        <client_id>/videos/<name>.mp4             built memory video
        <client_id>/videos/<name>_index.json      index for that video

    Public methods never raise: failures are logged and reported through the
    returned string (plain text or JSON, depending on the method).
    """

    def __init__(self, data_dir: str = "data"):
        """
        Initialize the memvid manager.

        Args:
            data_dir (str): Base directory for storing memory data
        """
        self.data_dir = Path(data_dir)
        # parents=True so a nested data_dir such as "var/data" works on first run.
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.logger = logging.getLogger(__name__)
        # Initialize storage handler for HuggingFace integration
        self.storage_handler = StorageHandler()
        self.logger.info(f"MemvidManager initialized with data_dir: {self.data_dir}")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _validate_name(value: str, kind: str) -> str:
        """
        Reject identifiers that could escape their parent directory.

        Args:
            value (str): Identifier that will become a single path component
            kind (str): Label used in the error message (e.g. "client_id")
        Returns:
            str: The unchanged value when it is safe to use
        Raises:
            ValueError: If the value is empty, is "." or "..", or contains a
                path separator or NUL byte
        """
        if not isinstance(value, str) or not value.strip():
            raise ValueError(f"{kind} must be a non-empty string")
        has_separator = any(ch in value for ch in ("/", "\\", "\x00"))
        # Path(value).name != value also catches platform-specific forms
        # (e.g. a Windows drive prefix) that the character check misses.
        if value in (".", "..") or has_separator or Path(value).name != value:
            raise ValueError(f"Invalid {kind}: {value!r}")
        return value

    @staticmethod
    def _chunk_number(chunk_file: Path) -> int:
        """Return the numeric id of a ``chunk_<n>.txt`` file, or -1 if it has none."""
        try:
            return int(chunk_file.stem.rpartition("_")[2])
        except ValueError:
            return -1

    @staticmethod
    def _format_search_results(search_results: Any) -> List[Dict[str, Any]]:
        """
        Normalize a retriever search return value into ranked result dicts.

        Args:
            search_results: Either a ``(results, scores)`` tuple, a list of
                results, or any other single value
        Returns:
            list: Dicts with "text", "score" and 1-based "rank" keys
        """
        if isinstance(search_results, tuple):
            results, scores = search_results
            return [
                {
                    "text": result,
                    # A missing score falls back to 0.0 rather than failing.
                    "score": float(scores[i]) if i < len(scores) else 0.0,
                    "rank": i + 1,
                }
                for i, result in enumerate(results)
            ]
        if isinstance(search_results, list):
            # Just results without scores: use a default score of 1.0.
            return [
                {"text": result, "score": 1.0, "rank": i + 1}
                for i, result in enumerate(search_results)
            ]
        # Single result or other format
        return [{"text": str(search_results), "score": 1.0, "rank": 1}]

    def _get_client_dir(self, client_id: str) -> Path:
        """Get client-specific directory, creating it and its subdirectories."""
        client_dir = self.data_dir / self._validate_name(client_id, "client_id")
        for directory in (client_dir, client_dir / "chunks", client_dir / "videos"):
            directory.mkdir(parents=True, exist_ok=True)
        return client_dir

    def _get_memory_paths(self, client_id: str, memory_name: str) -> Tuple[Path, Path]:
        """Return ``(video_path, index_path)`` for a named memory of a client."""
        # Validate the memory name first so a bad name creates no directories.
        name = self._validate_name(memory_name, "memory_name")
        videos_dir = self._get_client_dir(client_id) / "videos"
        return videos_dir / f"{name}.mp4", videos_dir / f"{name}_index.json"

    def _get_metadata_path(self, client_id: str) -> Path:
        """Get path to client metadata file."""
        return self._get_client_dir(client_id) / "metadata.json"

    def _load_metadata(self, client_id: str) -> Dict[str, Any]:
        """
        Load client metadata.

        Returns the stored JSON object, or a fresh default record when the
        file is missing, unreadable, or does not contain a JSON object.
        """
        metadata_path = self._get_metadata_path(client_id)
        if metadata_path.exists():
            try:
                with open(metadata_path, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
                if isinstance(loaded, dict):
                    return loaded
                self.logger.error(
                    f"Error loading metadata for {client_id}: "
                    f"expected a JSON object, got {type(loaded).__name__}"
                )
            except Exception as e:
                self.logger.error(f"Error loading metadata for {client_id}: {e}")
        # Return default metadata
        return {
            "client_id": client_id,
            "total_chunks": 0,
            "total_memories": 0,
            "created_at": "",
            "last_updated": "",
        }

    def _save_metadata(self, client_id: str, metadata: Dict[str, Any]) -> None:
        """
        Save client metadata locally, then hand it to the storage handler.

        Both steps are best-effort: failures are logged, never raised. The
        ``last_updated`` (and, if unset, ``created_at``) keys of *metadata*
        are updated in place.
        """
        import datetime  # local import: not among the file-level imports

        try:
            metadata_path = self._get_metadata_path(client_id)
            metadata["last_updated"] = datetime.datetime.now().isoformat()
            if not metadata.get("created_at"):
                metadata["created_at"] = metadata["last_updated"]
            # Serialize before opening the file so a serialization error
            # cannot leave a truncated metadata.json behind.
            payload = json.dumps(metadata, indent=2)
            with open(metadata_path, "w", encoding="utf-8") as f:
                f.write(payload)
        except Exception as e:
            self.logger.error(f"Error saving metadata for {client_id}: {e}")
            return
        try:
            # Upload metadata to HuggingFace if enabled
            self.storage_handler.upload_client_metadata(client_id, metadata)
        except Exception as e:
            self.logger.warning(
                f"Metadata for {client_id} saved locally but upload failed: {e}"
            )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def store_memory(
        self, text: str, client_id: str, metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Store a text chunk in memory.

        Args:
            text (str): Text content to store
            client_id (str): Client identifier
            metadata (dict): Additional metadata, saved next to the chunk in a
                ``chunk_NNNN.meta.json`` sidecar file
        Returns:
            str: Success message with storage details, or an error message
        """
        import datetime  # local import: not among the file-level imports

        try:
            client_dir = self._get_client_dir(client_id)
            chunks_dir = client_dir / "chunks"
            # Load current metadata
            client_metadata = self._load_metadata(client_id)
            recorded = client_metadata.get("total_chunks", 0)
            if not isinstance(recorded, int):
                recorded = 0
            # Also consult the files on disk: if metadata.json was lost or
            # reset, numbering from it alone would overwrite existing chunks.
            highest_on_disk = max(
                (self._chunk_number(p) for p in chunks_dir.glob("chunk_*.txt")),
                default=0,
            )
            chunk_count = max(recorded, highest_on_disk) + 1
            # Create chunk filename
            chunk_filename = f"chunk_{chunk_count:04d}.txt"
            chunk_path = chunks_dir / chunk_filename
            # Prepare chunk metadata; system fields come last so caller
            # supplied keys cannot overwrite them.
            chunk_metadata = {
                **(metadata or {}),
                "chunk_id": chunk_count,
                "filename": chunk_filename,
                "text_length": len(text),
                "stored_at": datetime.datetime.now().isoformat(),
            }
            # Save chunk to file
            with open(chunk_path, "w", encoding="utf-8") as f:
                f.write(text)
            # Persist the chunk metadata (best-effort: the chunk is already
            # stored, so a sidecar failure must not fail the whole call).
            sidecar_path = chunks_dir / f"chunk_{chunk_count:04d}.meta.json"
            try:
                sidecar_path.write_text(
                    json.dumps(chunk_metadata, indent=2, default=str),
                    encoding="utf-8",
                )
            except Exception as e:
                self.logger.warning(
                    f"Could not save metadata for {chunk_filename}: {e}"
                )
            # Update client metadata
            client_metadata["total_chunks"] = chunk_count
            client_metadata["client_id"] = client_id
            self._save_metadata(client_id, client_metadata)
            return f"Successfully stored memory chunk {chunk_filename} for client {client_id}. Total chunks: {chunk_count}"
        except Exception as e:
            error_msg = f"Error storing memory: {str(e)}"
            self.logger.error(error_msg)
            return error_msg

    def build_memory_video(self, client_id: str, memory_name: str) -> str:
        """
        Build a memory video from stored chunks.

        Chunks are encoded in numeric chunk order; empty chunks are skipped.
        Rebuilding an existing memory name replaces its metadata entry.

        Args:
            client_id (str): Client identifier
            memory_name (str): Name for the memory video
        Returns:
            str: Success message with video details, or an error message
        """
        try:
            if not MEMVID_AVAILABLE:
                return "Error: Memvid library not available"
            video_path, index_path = self._get_memory_paths(client_id, memory_name)
            chunks_dir = self._get_client_dir(client_id) / "chunks"
            # Sort numerically: a plain name sort puts chunk_10000 before
            # chunk_9999 once ids outgrow the 4-digit zero padding.
            chunk_files = sorted(
                chunks_dir.glob("chunk_*.txt"),
                key=lambda p: (self._chunk_number(p), p.name),
            )
            if not chunk_files:
                return f"Error: No chunks found for client {client_id}"
            # Read all chunks, keeping only the non-empty ones
            chunks = []
            for chunk_file in chunk_files:
                try:
                    with open(chunk_file, "r", encoding="utf-8") as f:
                        content = f.read().strip()
                except Exception as e:
                    self.logger.warning(f"Error reading chunk {chunk_file}: {e}")
                    continue
                if content:
                    chunks.append(content)
            if not chunks:
                return f"Error: No valid chunks found for client {client_id}"
            # Initialize memvid encoder and add chunks
            encoder = MemvidEncoder()
            for chunk in chunks:
                encoder.add_text(chunk)
            # Create video with embeddings
            encoder.build_video(str(video_path), str(index_path))
            # Update metadata
            client_metadata = self._load_metadata(client_id)
            memories = client_metadata.get("memories", [])
            # Ensure memories is a list, not a dict
            if not isinstance(memories, list):
                memories = []
            # Drop any previous entry with this name so a rebuild does not
            # leave duplicates or inflate total_memories.
            memories = [
                m
                for m in memories
                if not (isinstance(m, dict) and m.get("name") == memory_name)
            ]
            memories.append(
                {
                    "name": memory_name,
                    "video_path": str(video_path),
                    "index_path": str(index_path),
                    "chunks_count": len(chunks),
                }
            )
            client_metadata["memories"] = memories
            client_metadata["total_memories"] = len(memories)
            self._save_metadata(client_id, client_metadata)
            # Upload to HuggingFace if enabled
            if video_path.exists() and index_path.exists():
                self.storage_handler.upload_memory_video(
                    client_id, memory_name, video_path, index_path
                )
            return f"Successfully built memory video '{memory_name}' for client {client_id} with {len(chunks)} chunks"
        except Exception as e:
            error_msg = f"Error building memory video: {str(e)}"
            self.logger.error(error_msg)
            return error_msg

    def search_memory(
        self, query: str, client_id: str, memory_name: str, top_k: int = 5
    ) -> str:
        """
        Search stored memories using semantic similarity.

        Accepts either a ``(results, scores)`` tuple or a plain result list
        from the retriever.

        Args:
            query (str): Search query
            client_id (str): Client identifier
            memory_name (str): Name of memory video to search
            top_k (int): Number of results to return
        Returns:
            str: JSON string with search results and scores, or a JSON object
                with an "error" key
        """
        try:
            if not MEMVID_AVAILABLE:
                return json.dumps({"error": "Memvid library not available"})
            video_path, index_path = self._get_memory_paths(client_id, memory_name)
            if not video_path.exists():
                return json.dumps(
                    {
                        "error": f"Memory video '{memory_name}' not found for client {client_id}"
                    }
                )
            # Initialize memvid retriever
            try:
                retriever = MemvidRetriever(str(video_path), str(index_path))
            except Exception as e:
                self.logger.error(f"Error loading memory video: {e}")
                return json.dumps({"error": f"Error loading memory video: {str(e)}"})
            # Perform search with proper error handling
            try:
                search_results = retriever.search(query, top_k=top_k)
                search_data = self._format_search_results(search_results)
                return json.dumps(
                    {
                        "query": query,
                        "client_id": client_id,
                        "memory_name": memory_name,
                        "total_results": len(search_data),
                        "results": search_data,
                    },
                    indent=2,
                )
            except Exception as search_error:
                self.logger.error(f"Search failed: {search_error}")
                return json.dumps(
                    {
                        "error": f"Search failed: {str(search_error)}",
                        "query": query,
                        "memory_name": memory_name,
                    }
                )
        except Exception as e:
            error_msg = f"Error searching memory: {str(e)}"
            self.logger.error(error_msg)
            return json.dumps({"error": error_msg})

    def chat_with_memory(self, query: str, client_id: str, memory_name: str) -> str:
        """
        Interactive chat with stored memory.

        Args:
            query (str): User question/query
            client_id (str): Client identifier
            memory_name (str): Name of memory video to query
        Returns:
            str: Response from the memvid chat session, or an error message
        """
        try:
            if not MEMVID_AVAILABLE:
                return "Error: Memvid library not available"
            video_path, index_path = self._get_memory_paths(client_id, memory_name)
            if not video_path.exists():
                return f"Error: Memory video '{memory_name}' not found for client {client_id}"
            # Initialize memvid chat and use its chat functionality
            chat = MemvidChat(str(video_path), str(index_path))
            return chat.chat(query)
        except Exception as e:
            error_msg = f"Error in chat_with_memory: {str(e)}"
            self.logger.error(error_msg)
            return error_msg

    def list_memories(self, client_id: str) -> str:
        """
        List all memory videos for a client.

        The list is built from the ``.mp4`` files actually present on disk,
        sorted by file name.

        Args:
            client_id (str): Client identifier
        Returns:
            str: JSON string with memory list, or a JSON object with an
                "error" key
        """
        try:
            client_metadata = self._load_metadata(client_id)
            videos_dir = self._get_client_dir(client_id) / "videos"
            # Get actual video files; sorted so the output order is stable.
            memories = [
                {
                    "name": video_file.stem,
                    "video_file": video_file.name,
                    "size_bytes": video_file.stat().st_size,
                    "has_index": (
                        videos_dir / f"{video_file.stem}_index.json"
                    ).exists(),
                }
                for video_file in sorted(videos_dir.glob("*.mp4"))
            ]
            return json.dumps(
                {
                    "client_id": client_id,
                    "total_memories": len(memories),
                    "total_chunks": client_metadata.get("total_chunks", 0),
                    "memories": memories,
                },
                indent=2,
            )
        except Exception as e:
            error_msg = f"Error listing memories: {str(e)}"
            self.logger.error(error_msg)
            return json.dumps({"error": error_msg})

    def get_memory_stats(self, client_id: str) -> str:
        """
        Get memory usage statistics for a client.

        Counts and sizes are computed from the files on disk; the timestamps
        come from the client metadata file.

        Args:
            client_id (str): Client identifier
        Returns:
            str: JSON string with statistics, or a JSON object with an
                "error" key
        """
        try:
            client_dir = self._get_client_dir(client_id)
            chunks_dir = client_dir / "chunks"
            videos_dir = client_dir / "videos"
            # Calculate storage usage
            chunks_size = sum(f.stat().st_size for f in chunks_dir.glob("*.txt"))
            # Only regular files: a directory's st_size is not stored data.
            videos_size = sum(
                f.stat().st_size for f in videos_dir.glob("*") if f.is_file()
            )
            total_size = chunks_size + videos_size
            # Count files
            chunk_count = sum(1 for _ in chunks_dir.glob("chunk_*.txt"))
            memory_count = sum(1 for _ in videos_dir.glob("*.mp4"))
            # Load metadata
            client_metadata = self._load_metadata(client_id)
            stats = {
                "client_id": client_id,
                "total_chunks": chunk_count,
                "total_memories": memory_count,
                "storage_usage": {
                    "chunks_size_bytes": chunks_size,
                    "videos_size_bytes": videos_size,
                    "total_size_bytes": total_size,
                    "chunks_size_mb": round(chunks_size / 1024 / 1024, 2),
                    "videos_size_mb": round(videos_size / 1024 / 1024, 2),
                    "total_size_mb": round(total_size / 1024 / 1024, 2),
                },
                "created_at": client_metadata.get("created_at", ""),
                "last_updated": client_metadata.get("last_updated", ""),
            }
            return json.dumps(stats, indent=2)
        except Exception as e:
            error_msg = f"Error getting memory stats: {str(e)}"
            self.logger.error(error_msg)
            return json.dumps({"error": error_msg})

    def delete_memory(self, client_id: str, memory_name: str) -> str:
        """
        Delete a specific memory video.

        Removes the local video, its JSON index and its FAISS index if
        present, then drops the memory from the client metadata.

        Args:
            client_id (str): Client identifier
            memory_name (str): Name of memory to delete
        Returns:
            str: Success/error message
        """
        try:
            video_path, index_path = self._get_memory_paths(client_id, memory_name)
            faiss_path = video_path.parent / f"{memory_name}_index.faiss"
            # Delete video file and index files, remembering what was removed
            deleted_files = []
            for label, path in (
                ("video", video_path),
                ("index", index_path),
                ("faiss_index", faiss_path),
            ):
                if path.exists():
                    path.unlink()
                    deleted_files.append(label)
            if not deleted_files:
                return f"Error: Memory '{memory_name}' not found for client {client_id}"
            # Update metadata
            client_metadata = self._load_metadata(client_id)
            memories = client_metadata.get("memories", [])
            # Same guard as build_memory_video: the files are already gone,
            # so a malformed "memories" value must not turn this into an error.
            if not isinstance(memories, list):
                memories = []
            memories = [
                m
                for m in memories
                if not (isinstance(m, dict) and m.get("name") == memory_name)
            ]
            client_metadata["memories"] = memories
            client_metadata["total_memories"] = len(memories)
            self._save_metadata(client_id, client_metadata)
            return f"Successfully deleted memory '{memory_name}' for client {client_id} ({', '.join(deleted_files)} files removed)"
        except Exception as e:
            error_msg = f"Error deleting memory: {str(e)}"
            self.logger.error(error_msg)
            return error_msg