Spaces:
Running
Running
eldarski
🎥 Memvid MCP Server - Hackathon Submission - Complete MCP server with 24 tools for video-based AI memory storage - Dual storage with Modal GPU acceleration - Ready for Agents-MCP-Hackathon Track 1
168b0da
| """ | |
| Metrics Collector - Tracks performance metrics for dual storage comparison. | |
| Provides background analytics and comparison reporting without user complexity. | |
| """ | |
| import json | |
| import time | |
| import logging | |
| from typing import Dict, List, Any, Optional | |
| from pathlib import Path | |
| from collections import defaultdict, deque | |
| import statistics | |
class MetricsCollector:
    """
    Collects and analyzes performance metrics for dual storage comparison.

    Keeps bounded in-memory histories of storage operations, search
    operations and head-to-head (memvid vs. vector) timing comparisons, plus
    simple per-client counters, and turns them into JSON reports.
    """

    def __init__(self, max_samples: int = 1000):
        """
        Initialize metrics collector.

        Args:
            max_samples (int): Maximum number of samples to keep in memory
                per history; each history is a ``deque(maxlen=max_samples)``,
                so the oldest sample is dropped once the limit is reached.
        """
        self.logger = logging.getLogger(__name__)
        self.max_samples = max_samples

        # Storage metrics
        self.storage_metrics = {
            "memvid": deque(maxlen=max_samples),
            "vector": deque(maxlen=max_samples),
        }

        # Search metrics
        self.search_metrics = {
            "memvid": deque(maxlen=max_samples),
            "vector": deque(maxlen=max_samples),
        }

        # Comparison metrics
        self.comparison_data = {
            "storage_comparisons": deque(maxlen=max_samples),
            "search_comparisons": deque(maxlen=max_samples),
        }

        # Client-specific metrics (an entry is created on first access)
        self.client_metrics = defaultdict(
            lambda: {
                "storage_count": 0,
                "search_count": 0,
                "total_data_stored": 0,
                "preferred_mode": "unknown",
            }
        )

        self.logger.info("MetricsCollector initialized")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _compute_speedup(memvid_time: float, vector_time: float) -> float:
        """Return slower/faster time ratio, or 1.0 when the faster time is not positive."""
        faster = min(memvid_time, vector_time)
        if faster > 0:
            return max(memvid_time, vector_time) / faster
        # A zero (or negative) timing cannot yield a meaningful ratio; use a
        # neutral factor instead of raising ZeroDivisionError.
        return 1.0

    @staticmethod
    def _for_client(metrics, client_id: str) -> List[Dict[str, Any]]:
        """Return the samples belonging to ``client_id`` (all samples when it is empty)."""
        return [m for m in metrics if not client_id or m["client_id"] == client_id]

    def _filtered_comparisons(self, client_id: str = "") -> tuple:
        """Return ``(storage_comparisons, search_comparisons)`` restricted to ``client_id``."""
        return (
            self._for_client(self.comparison_data["storage_comparisons"], client_id),
            self._for_client(self.comparison_data["search_comparisons"], client_id),
        )

    @staticmethod
    def _count_memvid_wins(comparisons: List[Dict[str, Any]]) -> int:
        """Count the comparisons whose recorded winner is memvid."""
        return sum(1 for c in comparisons if c["winner"] == "memvid")

    @classmethod
    def _summarize_comparisons(
        cls, comparisons: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Build the win-count / average-speedup summary for a non-empty comparison list."""
        memvid_wins = cls._count_memvid_wins(comparisons)
        avg_speedup = statistics.mean(c["speedup"] for c in comparisons)
        return {
            "memvid_wins": memvid_wins,
            "vector_wins": len(comparisons) - memvid_wins,
            "avg_speedup_factor": round(avg_speedup, 2),
            # memvid needs a strict majority; an even split reports "vector".
            "faster_backend": (
                "memvid" if memvid_wins > len(comparisons) / 2 else "vector"
            ),
        }

    @classmethod
    def _pick_by_win_share(
        cls,
        comparisons: List[Dict[str, Any]],
        memvid_message: str,
        vector_message: str,
        balanced_message: str,
    ) -> str:
        """Choose a message: memvid wins > 70%, memvid wins < 30%, or in between."""
        memvid_wins = cls._count_memvid_wins(comparisons)
        if memvid_wins > len(comparisons) * 0.7:
            return memvid_message
        if memvid_wins < len(comparisons) * 0.3:
            return vector_message
        return balanced_message

    @staticmethod
    def _avg_duration_ms(samples: List[Dict[str, Any]]) -> float:
        """Return the mean ``duration`` of the samples in milliseconds, rounded to 2 places."""
        return round(statistics.mean(m["duration"] for m in samples) * 1000, 2)

    # ------------------------------------------------------------------
    # Tracking
    # ------------------------------------------------------------------

    def track_storage_operation(
        self, backend: str, duration: float, data_size: int, client_id: str = ""
    ) -> None:
        """
        Track a storage operation.

        Args:
            backend (str): Storage backend (memvid/vector)
            duration (float): Operation duration in seconds
            data_size (int): Size of data stored in bytes
            client_id (str): Client identifier; per-client counters are only
                updated when this is non-empty

        Raises:
            KeyError: If ``backend`` is not one of the tracked backends.
        """
        metric = {
            "timestamp": time.time(),
            "backend": backend,
            "duration": duration,
            "data_size": data_size,
            "client_id": client_id,
        }
        self.storage_metrics[backend].append(metric)

        if client_id:
            client = self.client_metrics[client_id]
            client["storage_count"] += 1
            client["total_data_stored"] += data_size

    def track_search_operation(
        self, backend: str, duration: float, top_k: int, client_id: str = ""
    ) -> None:
        """
        Track a search operation.

        Args:
            backend (str): Storage backend (memvid/vector)
            duration (float): Operation duration in seconds
            top_k (int): Number of results requested
            client_id (str): Client identifier; per-client counters are only
                updated when this is non-empty

        Raises:
            KeyError: If ``backend`` is not one of the tracked backends.
        """
        metric = {
            "timestamp": time.time(),
            "backend": backend,
            "duration": duration,
            "top_k": top_k,
            "client_id": client_id,
        }
        self.search_metrics[backend].append(metric)

        if client_id:
            self.client_metrics[client_id]["search_count"] += 1

    def track_dual_storage_comparison(
        self, memvid_time: float, vector_time: float, data_size: int, client_id: str
    ) -> None:
        """
        Track dual storage comparison metrics.

        The winner is memvid only when it is strictly faster; equal timings
        are recorded as a vector win. The speedup is the slower time divided
        by the faster one, or 1.0 when the faster time is not positive.

        Args:
            memvid_time (float): Memvid storage time
            vector_time (float): Vector storage time
            data_size (int): Size of data stored
            client_id (str): Client identifier
        """
        comparison = {
            "timestamp": time.time(),
            "memvid_time": memvid_time,
            "vector_time": vector_time,
            "data_size": data_size,
            "client_id": client_id,
            "winner": "memvid" if memvid_time < vector_time else "vector",
            "speedup": self._compute_speedup(memvid_time, vector_time),
        }
        self.comparison_data["storage_comparisons"].append(comparison)

    def track_dual_search_comparison(
        self, memvid_time: float, vector_time: float, query: str, client_id: str
    ) -> None:
        """
        Track dual search comparison metrics.

        Only the length of the query is stored, not the query text. Winner
        and speedup follow the same rules as the storage comparison.

        Args:
            memvid_time (float): Memvid search time
            vector_time (float): Vector search time
            query (str): Search query
            client_id (str): Client identifier
        """
        comparison = {
            "timestamp": time.time(),
            "memvid_time": memvid_time,
            "vector_time": vector_time,
            "query_length": len(query),
            "client_id": client_id,
            "winner": "memvid" if memvid_time < vector_time else "vector",
            "speedup": self._compute_speedup(memvid_time, vector_time),
        }
        self.comparison_data["search_comparisons"].append(comparison)

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_comparison_report(self, client_id: str = "") -> str:
        """
        Generate comprehensive comparison report.

        Args:
            client_id (str): Client identifier (empty for global report)

        Returns:
            str: JSON string with comparison analytics. If building the
            report fails, a JSON object with a single ``error`` key is
            returned instead of raising.
        """
        try:
            report = {
                "report_timestamp": time.time(),
                "client_id": client_id or "global",
                "storage_mode": "dual",
                "summary": self._generate_summary(client_id),
                "performance_analysis": self._analyze_performance(client_id),
                "recommendations": self._generate_recommendations(client_id),
            }
            return json.dumps(report, indent=2)
        except Exception as e:
            # Reporting is best-effort: log and hand back an error payload.
            self.logger.error("Error generating comparison report: %s", e)
            return json.dumps({"error": f"Failed to generate report: {str(e)}"})

    def _generate_summary(self, client_id: str = "") -> Dict[str, Any]:
        """Generate performance summary (comparison counts plus per-kind win statistics)."""
        storage_comps, search_comps = self._filtered_comparisons(client_id)

        if not storage_comps and not search_comps:
            return {"message": "No comparison data available"}

        summary: Dict[str, Any] = {
            "total_comparisons": len(storage_comps) + len(search_comps),
            "storage_comparisons": len(storage_comps),
            "search_comparisons": len(search_comps),
        }

        # Each section is only present when that kind of comparison exists.
        if storage_comps:
            summary["storage_performance"] = self._summarize_comparisons(
                storage_comps
            )
        if search_comps:
            summary["search_performance"] = self._summarize_comparisons(search_comps)

        return summary

    def _analyze_performance(self, client_id: str = "") -> Dict[str, Any]:
        """Analyze detailed performance metrics per backend; empty histories are omitted."""
        analysis: Dict[str, Any] = {}

        # Analyze storage performance
        for backend in ("memvid", "vector"):
            samples = self._for_client(self.storage_metrics[backend], client_id)
            if samples:
                analysis[f"{backend}_storage"] = {
                    "avg_duration_ms": self._avg_duration_ms(samples),
                    "total_operations": len(samples),
                    "total_data_mb": round(
                        sum(m["data_size"] for m in samples) / (1024 * 1024), 2
                    ),
                }

        # Analyze search performance
        for backend in ("memvid", "vector"):
            samples = self._for_client(self.search_metrics[backend], client_id)
            if samples:
                analysis[f"{backend}_search"] = {
                    "avg_duration_ms": self._avg_duration_ms(samples),
                    "total_searches": len(samples),
                }

        return analysis

    def _generate_recommendations(self, client_id: str = "") -> List[str]:
        """Generate performance-based recommendations from the recorded comparisons."""
        storage_comps, search_comps = self._filtered_comparisons(client_id)

        if not storage_comps and not search_comps:
            return ["No comparison data available for recommendations"]

        recommendations: List[str] = []

        # Storage recommendations
        if storage_comps:
            recommendations.append(
                self._pick_by_win_share(
                    storage_comps,
                    "πΉ Memvid shows consistently faster storage - consider memvid_only mode for write-heavy workloads",
                    "β‘ Vector storage shows faster performance - consider vector_only mode for high-frequency storage",
                    "βοΈ Storage performance is balanced - dual mode provides good comparison data",
                )
            )

        # Search recommendations
        if search_comps:
            recommendations.append(
                self._pick_by_win_share(
                    search_comps,
                    "π Memvid shows superior search performance - excellent for semantic search workloads",
                    "π Vector search outperforms memvid - consider vector_only for search-heavy applications",
                    "π― Search performance varies - dual mode provides valuable insights",
                )
            )

        # Data size recommendations
        if storage_comps:
            avg_data_size = statistics.mean(c["data_size"] for c in storage_comps)
            if avg_data_size > 10000:  # Large chunks
                recommendations.append(
                    "π Large data chunks detected - memvid compression may provide storage efficiency benefits"
                )
            elif avg_data_size < 1000:  # Small chunks
                recommendations.append(
                    "β‘ Small data chunks detected - vector storage may have lower overhead"
                )

        return recommendations

    def export_metrics(self, format: str = "json") -> str:
        """
        Export metrics data.

        Args:
            format (str): Export format; only "json" (case-insensitive) is
                implemented

        Returns:
            str: JSON dump of all histories and per-client counters, or an
            ``"Error: ..."`` string for an unsupported format or a failed
            export (this method does not raise).
        """
        try:
            if format.lower() != "json":
                return f"Error: Unsupported format '{format}'. Supported: json"

            # deques/defaultdict are copied into plain lists/dicts so that
            # json can serialize them.
            export_data = {
                "export_timestamp": time.time(),
                "storage_metrics": {
                    "memvid": list(self.storage_metrics["memvid"]),
                    "vector": list(self.storage_metrics["vector"]),
                },
                "search_metrics": {
                    "memvid": list(self.search_metrics["memvid"]),
                    "vector": list(self.search_metrics["vector"]),
                },
                "comparison_data": {
                    "storage_comparisons": list(
                        self.comparison_data["storage_comparisons"]
                    ),
                    "search_comparisons": list(
                        self.comparison_data["search_comparisons"]
                    ),
                },
                "client_metrics": dict(self.client_metrics),
            }
            return json.dumps(export_data, indent=2)
        except Exception as e:
            # Log the failure as the report path does, then return the error text.
            self.logger.error("Error exporting metrics: %s", e)
            return f"Error exporting metrics: {str(e)}"