Spaces:
Running
Running
| import logging | |
| import os | |
| import platform | |
| import uuid | |
| from collections.abc import Callable | |
| from functools import wraps | |
| from pathlib import Path | |
| from typing import Any | |
| from posthog import Posthog | |
| from scarf import ScarfEventLogger | |
| from mcp_use.logging import MCP_USE_DEBUG | |
| from mcp_use.telemetry.events import ( | |
| BaseTelemetryEvent, | |
| MCPAgentExecutionEvent, | |
| ) | |
| from mcp_use.telemetry.utils import get_package_version | |
| logger = logging.getLogger(__name__) | |
def singleton(cls):
    """Class decorator that makes every construction return one shared instance.

    The first call builds the instance with whatever arguments it receives;
    later calls ignore their arguments and hand back that same object.
    Note that the decorated name becomes a factory function, not the class.
    """
    cached = None

    def wrapper(*args, **kwargs):
        nonlocal cached
        # Build lazily on the first call only.
        if cached is None:
            cached = cls(*args, **kwargs)
        return cached

    return wrapper
def requires_telemetry(func: Callable) -> Callable:
    """Decorator that skips function execution if telemetry is disabled.

    Telemetry counts as disabled when both ``self._posthog_client`` and
    ``self._scarf_client`` are falsy; in that case the wrapped method is not
    called and ``None`` is returned. Otherwise the call is forwarded unchanged.

    Args:
        func: Method whose first positional argument is the instance carrying
            the ``_posthog_client`` and ``_scarf_client`` attributes.

    Returns:
        The guarded method, keeping the name and docstring of ``func``.
    """

    # wraps() keeps __name__/__doc__/__wrapped__ so decorated methods stay introspectable.
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if not self._posthog_client and not self._scarf_client:
            return None
        return func(self, *args, **kwargs)

    return wrapper
def get_cache_home() -> Path:
    """Return the cache root directory for the current platform.

    An absolute ``XDG_CACHE_HOME`` wins on every platform. Otherwise the
    result depends on ``platform.system()``: ``LOCALAPPDATA`` / ``APPDATA``
    (or ``~/AppData/Local``) on Windows, ``~/Library/Caches`` on macOS and
    ``~/.cache`` everywhere else.
    """
    # Explicit override: honoured only when it names an absolute path.
    xdg_override: str | None = os.getenv("XDG_CACHE_HOME")
    if xdg_override:
        candidate = Path(xdg_override)
        if candidate.is_absolute():
            return candidate

    os_name = platform.system()

    if os_name == "Darwin":  # macOS
        return Path.home() / "Library" / "Caches"

    if os_name != "Windows":  # Linux or other Unix
        return Path.home() / ".cache"

    # Windows: the first non-empty variable wins, local app data preferred.
    for variable in ("LOCALAPPDATA", "APPDATA"):
        location = os.getenv(variable)
        if location:
            return Path(location)
    return Path.home() / "AppData" / "Local"
class Telemetry:
    """
    Service for capturing anonymized telemetry data via PostHog and Scarf.

    If the environment variable `MCP_USE_ANONYMIZED_TELEMETRY=false`, telemetry will be disabled.

    Both clients are optional: each is set to ``None`` when telemetry is
    disabled or when its initialization fails, and every method checks the
    client before using it. Telemetry failures are logged and never raised.
    """

    # Files holding the anonymous user id and the last version reported to Scarf.
    USER_ID_PATH = str(get_cache_home() / "mcp_use_3" / "telemetry_user_id")
    VERSION_DOWNLOAD_PATH = str(get_cache_home() / "mcp_use" / "download_version")

    # PostHog project settings.
    PROJECT_API_KEY = "phc_lyTtbYwvkdSbrcMQNPiKiiRWrrM1seyKIMjycSvItEI"
    HOST = "https://eu.i.posthog.com"

    # Scarf event gateway.
    SCARF_GATEWAY_URL = "https://mcpuse.gateway.scarf.sh/events"

    # Placeholder id used when the real id cannot be read or created.
    UNKNOWN_USER_ID = "UNKNOWN_USER_ID"

    # In-memory cache of the user id; filled on first access to `user_id`.
    _curr_user_id: str | None = None

    def __init__(self):
        """Create the PostHog and Scarf clients unless telemetry is disabled."""
        telemetry_disabled = os.getenv("MCP_USE_ANONYMIZED_TELEMETRY", "true").lower() == "false"

        if telemetry_disabled:
            self._posthog_client = None
            self._scarf_client = None
            logger.debug("Telemetry disabled")
            return

        logger.info("Anonymized telemetry enabled. Set MCP_USE_ANONYMIZED_TELEMETRY=false to disable.")

        # Initialize PostHog
        try:
            self._posthog_client = Posthog(
                project_api_key=self.PROJECT_API_KEY,
                host=self.HOST,
                disable_geoip=False,
                enable_exception_autocapture=True,
            )

            # Silence posthog's logging unless debug mode (level 2)
            if MCP_USE_DEBUG < 2:
                posthog_logger = logging.getLogger("posthog")
                posthog_logger.disabled = True
        except Exception as e:
            logger.warning(f"Failed to initialize PostHog telemetry: {e}")
            self._posthog_client = None

        # Initialize Scarf
        try:
            self._scarf_client = ScarfEventLogger(
                endpoint_url=self.SCARF_GATEWAY_URL,
                timeout=3.0,
                verbose=MCP_USE_DEBUG >= 2,
            )

            # Silence scarf's logging unless debug mode (level 2)
            if MCP_USE_DEBUG < 2:
                scarf_logger = logging.getLogger("scarf")
                scarf_logger.disabled = True
        except Exception as e:
            logger.warning(f"Failed to initialize Scarf telemetry: {e}")
            self._scarf_client = None

    @staticmethod
    def _is_newer_version(current: str, saved: str) -> bool:
        """Return True when ``current`` is a later version than ``saved``.

        Purely numeric dotted versions are compared component by component,
        so "1.10.0" is newer than "1.9.0". Anything else (empty string,
        pre-release suffixes, ...) falls back to plain string ordering.
        """

        def numeric_key(version: str) -> tuple[int, ...] | None:
            parts = version.strip().split(".")
            if not all(part.isdecimal() for part in parts):
                return None
            return tuple(int(part) for part in parts)

        current_key = numeric_key(current)
        saved_key = numeric_key(saved)
        if current_key is None or saved_key is None:
            return current > saved
        return current_key > saved_key

    # Exposed as a property: every use in this class reads `self.user_id` as a value.
    @property
    def user_id(self) -> str:
        """Get or create a persistent anonymous user ID.

        The id is read from ``USER_ID_PATH``; when the file is missing or
        empty a new UUID4 is generated and stored there. The result is cached
        on the instance. Any failure yields ``UNKNOWN_USER_ID`` instead of
        raising.
        """
        if self._curr_user_id:
            return self._curr_user_id

        try:
            stored_id = ""
            if os.path.exists(self.USER_ID_PATH):
                with open(self.USER_ID_PATH, encoding="utf-8") as f:
                    stored_id = f.read().strip()

            if stored_id:
                self._curr_user_id = stored_id
            else:
                # Missing or empty file: an empty id would never be cached, so create a new one.
                logger.debug(f"Creating user ID path: {self.USER_ID_PATH}")
                os.makedirs(os.path.dirname(self.USER_ID_PATH), exist_ok=True)
                new_user_id = str(uuid.uuid4())
                with open(self.USER_ID_PATH, "w", encoding="utf-8") as f:
                    f.write(new_user_id)
                self._curr_user_id = new_user_id
                logger.debug(f"User ID path created: {self.USER_ID_PATH}")

            # Always check for version-based download tracking.
            # The id is already cached here, so the nested `self.user_id` read returns at once.
            self.track_package_download(
                {
                    "triggered_by": "user_id_property",
                }
            )
        except Exception as e:
            logger.debug(f"Failed to get/create user ID: {e}")
            self._curr_user_id = self.UNKNOWN_USER_ID

        return self._curr_user_id

    def capture(self, event: BaseTelemetryEvent) -> None:
        """Capture a telemetry event.

        PostHog receives a copy of ``event.properties`` plus the package
        version; Scarf receives only the package version, user id and event
        name. Failures of either backend are logged at debug level.
        """
        # Send to PostHog
        if self._posthog_client:
            try:
                # Add package version to all events
                properties = event.properties.copy()
                properties["mcp_use_version"] = get_package_version()

                self._posthog_client.capture(distinct_id=self.user_id, event=event.name, properties=properties)
            except Exception as e:
                logger.debug(f"Failed to track PostHog event {event.name}: {e}")

        # Send to Scarf
        if self._scarf_client:
            try:
                # Scarf gets a fixed, flat payload rather than the event's own properties
                properties = {
                    "mcp_use_version": get_package_version(),
                    "user_id": self.user_id,
                    "event": event.name,
                }

                self._scarf_client.log_event(properties=properties)
            except Exception as e:
                logger.debug(f"Failed to track Scarf event {event.name}: {e}")

    def track_package_download(self, properties: dict[str, Any] | None = None) -> None:
        """Track package download event specifically for Scarf analytics.

        An event is sent when ``VERSION_DOWNLOAD_PATH`` does not exist yet
        (first download) or when the running package version is newer than
        the version stored there. The stored version is updated in both cases.

        Args:
            properties: Extra properties merged into the event; not mutated.
        """
        if not self._scarf_client:
            return

        try:
            # presumably a version string such as "1.2.3" — confirm against get_package_version
            current_version = get_package_version()

            first_download = not os.path.exists(self.VERSION_DOWNLOAD_PATH)
            if first_download:
                os.makedirs(os.path.dirname(self.VERSION_DOWNLOAD_PATH), exist_ok=True)
                should_track = True
            else:
                with open(self.VERSION_DOWNLOAD_PATH, encoding="utf-8") as f:
                    saved_version = f.read().strip()
                # Numeric comparison: plain string ordering ranks "1.10.0" below "1.9.0".
                should_track = self._is_newer_version(current_version, saved_version)

            if not should_track:
                return

            # Remember the reported version before sending the event
            with open(self.VERSION_DOWNLOAD_PATH, "w", encoding="utf-8") as f:
                f.write(current_version)

            logger.debug(f"Tracking package download event with properties: {properties}")
            # Add package version and user_id to event
            event_properties = (properties or {}).copy()
            event_properties["mcp_use_version"] = current_version
            event_properties["user_id"] = self.user_id
            event_properties["event"] = "package_download"
            event_properties["first_download"] = first_download

            self._scarf_client.log_event(properties=event_properties)
        except Exception as e:
            logger.debug(f"Failed to track Scarf package_download event: {e}")

    def track_agent_execution(
        self,
        execution_method: str,
        query: str,
        success: bool,
        model_provider: str,
        model_name: str,
        server_count: int,
        server_identifiers: list[dict[str, str]],
        total_tools_available: int,
        tools_available_names: list[str],
        max_steps_configured: int,
        memory_enabled: bool,
        use_server_manager: bool,
        max_steps_used: int | None,
        manage_connector: bool,
        external_history_used: bool,
        steps_taken: int | None = None,
        tools_used_count: int | None = None,
        tools_used_names: list[str] | None = None,
        response: str | None = None,
        execution_time_ms: int | None = None,
        error_type: str | None = None,
        conversation_history_length: int | None = None,
    ) -> None:
        """Track comprehensive agent execution.

        Every argument is forwarded unchanged, under the same name, to
        ``MCPAgentExecutionEvent``; the resulting event is passed to `capture`.
        """
        event = MCPAgentExecutionEvent(
            execution_method=execution_method,
            query=query,
            success=success,
            model_provider=model_provider,
            model_name=model_name,
            server_count=server_count,
            server_identifiers=server_identifiers,
            total_tools_available=total_tools_available,
            tools_available_names=tools_available_names,
            max_steps_configured=max_steps_configured,
            memory_enabled=memory_enabled,
            use_server_manager=use_server_manager,
            max_steps_used=max_steps_used,
            manage_connector=manage_connector,
            external_history_used=external_history_used,
            steps_taken=steps_taken,
            tools_used_count=tools_used_count,
            tools_used_names=tools_used_names,
            response=response,
            execution_time_ms=execution_time_ms,
            error_type=error_type,
            conversation_history_length=conversation_history_length,
        )
        self.capture(event)

    def flush(self) -> None:
        """Flush any queued telemetry events."""
        # Flush PostHog
        if self._posthog_client:
            try:
                self._posthog_client.flush()
                logger.debug("PostHog client telemetry queue flushed")
            except Exception as e:
                logger.debug(f"Failed to flush PostHog client: {e}")

        # Scarf events are sent immediately, no flush needed
        if self._scarf_client:
            logger.debug("Scarf telemetry events sent immediately (no flush needed)")

    def shutdown(self) -> None:
        """Shutdown telemetry clients and flush remaining events."""
        # Shutdown PostHog
        if self._posthog_client:
            try:
                self._posthog_client.shutdown()
                logger.debug("PostHog client shutdown successfully")
            except Exception as e:
                logger.debug(f"Error shutting down PostHog client: {e}")

        # Scarf doesn't require explicit shutdown
        if self._scarf_client:
            logger.debug("Scarf telemetry client shutdown (no action needed)")