File size: 11,973 Bytes
399b80c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
import logging
import os
import platform
import uuid
from collections.abc import Callable
from functools import wraps
from pathlib import Path
from typing import Any

from posthog import Posthog
from scarf import ScarfEventLogger

from mcp_use.logging import MCP_USE_DEBUG
from mcp_use.telemetry.events import (
    BaseTelemetryEvent,
    MCPAgentExecutionEvent,
)
from mcp_use.telemetry.utils import get_package_version

logger = logging.getLogger(__name__)


def singleton(cls):
    """Class decorator that turns ``cls`` into a lazily created singleton.

    The decorated name is rebound to a factory function. The first call
    constructs the instance with the arguments it was given; every later call
    returns that same instance and ignores its arguments.

    Args:
        cls: The class to wrap.

    Returns:
        A factory callable returning the single shared instance of ``cls``.
        The factory carries the class's ``__name__``, ``__qualname__``,
        ``__module__`` and ``__doc__``, and exposes the undecorated class as
        ``__wrapped__``.
    """
    instance = None

    # updated=() copies the identifying metadata onto the factory without
    # merging the class __dict__ (methods, properties, ...) into the function.
    @wraps(cls, updated=())
    def wrapper(*args, **kwargs):
        nonlocal instance
        if instance is None:
            instance = cls(*args, **kwargs)
        return instance

    return wrapper

def requires_telemetry(func: Callable) -> Callable:
    """Method decorator that makes the call a no-op when telemetry is off.

    The wrapped method runs only if the instance has at least one truthy
    client in ``_posthog_client`` or ``_scarf_client``.

    Args:
        func: The method to guard; it must take ``self`` as first argument.

    Returns:
        A wrapper that returns ``func``'s result when a client is available
        and ``None`` otherwise.
    """

    @wraps(func)
    def guarded(self, *args, **kwargs):
        if self._posthog_client or self._scarf_client:
            return func(self, *args, **kwargs)
        # Neither backend is available: skip the call entirely.
        return None

    return guarded


def get_cache_home() -> Path:
    """Return the base cache directory for the current platform.

    An absolute ``XDG_CACHE_HOME`` takes precedence on every platform.
    Otherwise the result depends on ``platform.system()``:

    * ``"Windows"``: ``LOCALAPPDATA``, then ``APPDATA``, then
      ``~/AppData/Local``.
    * ``"Darwin"``: ``~/Library/Caches``.
    * anything else: ``~/.cache``.

    Returns:
        The cache directory as a ``Path``; it is not created here.
    """
    # An explicit XDG override wins, but only when it is an absolute path.
    xdg_cache = os.getenv("XDG_CACHE_HOME")
    if xdg_cache:
        xdg_path = Path(xdg_cache)
        if xdg_path.is_absolute():
            return xdg_path

    os_name = platform.system()
    if os_name == "Darwin":  # macOS
        return Path.home() / "Library" / "Caches"
    if os_name != "Windows":  # Linux or other Unix
        return Path.home() / ".cache"

    # Windows: first non-empty application-data variable wins.
    for variable in ("LOCALAPPDATA", "APPDATA"):
        location = os.getenv(variable)
        if location:
            return Path(location)
    return Path.home() / "AppData" / "Local"


@singleton
class Telemetry:
    """
    Service for capturing anonymized telemetry data via PostHog and Scarf.
    If the environment variable `MCP_USE_ANONYMIZED_TELEMETRY=false`, telemetry will be disabled.

    The class is wrapped by ``singleton``, so calling ``Telemetry()`` always
    yields the same instance. Every public method is best-effort: backend
    failures are logged and never raised to the caller.
    """

    # File holding the persistent anonymous user ID.
    USER_ID_PATH = str(get_cache_home() / "mcp_use_3" / "telemetry_user_id")
    # File holding the package version last reported as a download.
    VERSION_DOWNLOAD_PATH = str(get_cache_home() / "mcp_use" / "download_version")
    PROJECT_API_KEY = "phc_lyTtbYwvkdSbrcMQNPiKiiRWrrM1seyKIMjycSvItEI"
    HOST = "https://eu.i.posthog.com"
    SCARF_GATEWAY_URL = "https://mcpuse.gateway.scarf.sh/events"
    # Fallback ID returned when the real one cannot be read or created.
    UNKNOWN_USER_ID = "UNKNOWN_USER_ID"

    # In-process cache of the resolved user ID (filled by ``user_id``).
    _curr_user_id = None

    def __init__(self):
        """Create the PostHog and Scarf clients unless telemetry is disabled.

        Telemetry is disabled when ``MCP_USE_ANONYMIZED_TELEMETRY`` equals
        ``false`` (case-insensitive). A client that fails to initialize is set
        to ``None`` and a warning is logged; the other client is unaffected.
        """
        telemetry_disabled = os.getenv("MCP_USE_ANONYMIZED_TELEMETRY", "true").lower() == "false"

        if telemetry_disabled:
            self._posthog_client = None
            self._scarf_client = None
            logger.debug("Telemetry disabled")
        else:
            logger.info("Anonymized telemetry enabled. Set MCP_USE_ANONYMIZED_TELEMETRY=false to disable.")

            # Initialize PostHog
            try:
                self._posthog_client = Posthog(
                    project_api_key=self.PROJECT_API_KEY,
                    host=self.HOST,
                    disable_geoip=False,
                    enable_exception_autocapture=True,
                )

                # Silence posthog's logging unless debug mode (level 2)
                if MCP_USE_DEBUG < 2:
                    posthog_logger = logging.getLogger("posthog")
                    posthog_logger.disabled = True

            except Exception as e:
                logger.warning(f"Failed to initialize PostHog telemetry: {e}")
                self._posthog_client = None

            # Initialize Scarf
            try:
                self._scarf_client = ScarfEventLogger(
                    endpoint_url=self.SCARF_GATEWAY_URL,
                    timeout=3.0,
                    verbose=MCP_USE_DEBUG >= 2,
                )

                # Silence scarf's logging unless debug mode (level 2)
                if MCP_USE_DEBUG < 2:
                    scarf_logger = logging.getLogger("scarf")
                    scarf_logger.disabled = True

            except Exception as e:
                logger.warning(f"Failed to initialize Scarf telemetry: {e}")
                self._scarf_client = None

    @staticmethod
    def _version_key(version: str) -> tuple[int, ...]:
        """Convert a dotted version string into a tuple of ints for ordering.

        Only the leading ASCII digits of each dot-separated segment are used,
        and parsing stops at the first segment without any, so ``"1.2.0rc1"``
        becomes ``(1, 2, 0)`` and ``"unknown"`` becomes ``()``.
        """
        key: list[int] = []
        for segment in version.strip().split("."):
            digits = ""
            for char in segment:
                if not "0" <= char <= "9":
                    break
                digits += char
            if not digits:
                break
            key.append(int(digits))
        return tuple(key)

    @property
    def user_id(self) -> str:
        """Get or create a persistent anonymous user ID.

        The ID is read from ``USER_ID_PATH``. When that file is missing or
        blank, a new UUID4 is generated and written there. The result is
        cached on the instance. After resolving the ID, the version-based
        download check in ``track_package_download`` is triggered.

        Returns:
            The anonymous user ID, or ``UNKNOWN_USER_ID`` if anything in the
            process raised.
        """
        if self._curr_user_id:
            return self._curr_user_id

        try:
            stored_user_id = ""
            if os.path.exists(self.USER_ID_PATH):
                with open(self.USER_ID_PATH) as f:
                    stored_user_id = f.read().strip()

            if stored_user_id:
                self._curr_user_id = stored_user_id
            else:
                # Missing file, or a blank one (e.g. left by an interrupted
                # write): an empty string must never be used as the ID.
                logger.debug(f"Creating user ID path: {self.USER_ID_PATH}")
                os.makedirs(os.path.dirname(self.USER_ID_PATH), exist_ok=True)
                new_user_id = str(uuid.uuid4())
                with open(self.USER_ID_PATH, "w") as f:
                    f.write(new_user_id)
                self._curr_user_id = new_user_id

                logger.debug(f"User ID path created: {self.USER_ID_PATH}")

            # Always check for version-based download tracking. The ID is
            # already cached at this point, so the nested ``self.user_id``
            # lookup inside track_package_download returns immediately.
            self.track_package_download(
                {
                    "triggered_by": "user_id_property",
                }
            )
        except Exception as e:
            logger.debug(f"Failed to get/create user ID: {e}")
            self._curr_user_id = self.UNKNOWN_USER_ID

        return self._curr_user_id

    @requires_telemetry
    def capture(self, event: BaseTelemetryEvent) -> None:
        """Capture a telemetry event.

        PostHog receives a copy of ``event.properties`` plus the package
        version. Scarf receives only the package version, the user ID and the
        event name. Failures of either backend are logged at debug level.

        Args:
            event: The event to report; its ``name`` and ``properties`` are read.
        """
        # Send to PostHog
        if self._posthog_client:
            try:
                # Add package version to all events
                properties = event.properties.copy()
                properties["mcp_use_version"] = get_package_version()

                self._posthog_client.capture(distinct_id=self.user_id, event=event.name, properties=properties)
            except Exception as e:
                logger.debug(f"Failed to track PostHog event {event.name}: {e}")

        # Send to Scarf
        if self._scarf_client:
            try:
                # Scarf gets a minimal flat payload: the event's own
                # properties are intentionally not forwarded here.
                properties = {}
                properties["mcp_use_version"] = get_package_version()
                properties["user_id"] = self.user_id
                properties["event"] = event.name

                self._scarf_client.log_event(properties=properties)
            except Exception as e:
                logger.debug(f"Failed to track Scarf event {event.name}: {e}")

    @requires_telemetry
    def track_package_download(self, properties: dict[str, Any] | None = None) -> None:
        """Track package download event specifically for Scarf analytics.

        An event is sent when ``VERSION_DOWNLOAD_PATH`` does not exist yet
        (first download) or when the installed version is numerically newer
        than the version stored there. In both cases the file is updated to
        the current version before the event is sent. Does nothing without a
        Scarf client; failures are logged at debug level.

        Args:
            properties: Optional extra properties merged into the event. The
                mapping passed in is not modified.
        """
        if self._scarf_client:
            try:
                current_version = get_package_version()
                should_track = False
                first_download = False

                # Check if version file exists
                if not os.path.exists(self.VERSION_DOWNLOAD_PATH):
                    # First download
                    should_track = True
                    first_download = True

                    # Create directory and save version
                    os.makedirs(os.path.dirname(self.VERSION_DOWNLOAD_PATH), exist_ok=True)
                    with open(self.VERSION_DOWNLOAD_PATH, "w") as f:
                        f.write(current_version)
                else:
                    # Read saved version
                    with open(self.VERSION_DOWNLOAD_PATH) as f:
                        saved_version = f.read().strip()

                    # Compare numerically, not as strings: lexicographic
                    # ordering puts "0.10.0" below "0.9.0" and would miss
                    # such upgrades.
                    if self._version_key(current_version) > self._version_key(saved_version):
                        should_track = True
                        first_download = False

                        # Update saved version
                        with open(self.VERSION_DOWNLOAD_PATH, "w") as f:
                            f.write(current_version)

                if should_track:
                    logger.debug(f"Tracking package download event with properties: {properties}")
                    # Add package version and user_id to event
                    event_properties = (properties or {}).copy()
                    event_properties["mcp_use_version"] = current_version
                    event_properties["user_id"] = self.user_id
                    event_properties["event"] = "package_download"
                    event_properties["first_download"] = first_download

                    self._scarf_client.log_event(properties=event_properties)
            except Exception as e:
                logger.debug(f"Failed to track Scarf package_download event: {e}")

    @requires_telemetry
    def track_agent_execution(
        self,
        execution_method: str,
        query: str,
        success: bool,
        model_provider: str,
        model_name: str,
        server_count: int,
        server_identifiers: list[dict[str, str]],
        total_tools_available: int,
        tools_available_names: list[str],
        max_steps_configured: int,
        memory_enabled: bool,
        use_server_manager: bool,
        max_steps_used: int | None,
        manage_connector: bool,
        external_history_used: bool,
        steps_taken: int | None = None,
        tools_used_count: int | None = None,
        tools_used_names: list[str] | None = None,
        response: str | None = None,
        execution_time_ms: int | None = None,
        error_type: str | None = None,
        conversation_history_length: int | None = None,
    ) -> None:
        """Track comprehensive agent execution.

        Every argument is forwarded unchanged, under the same name, to
        ``MCPAgentExecutionEvent``; the resulting event is then passed to
        ``capture``.
        """
        event = MCPAgentExecutionEvent(
            execution_method=execution_method,
            query=query,
            success=success,
            model_provider=model_provider,
            model_name=model_name,
            server_count=server_count,
            server_identifiers=server_identifiers,
            total_tools_available=total_tools_available,
            tools_available_names=tools_available_names,
            max_steps_configured=max_steps_configured,
            memory_enabled=memory_enabled,
            use_server_manager=use_server_manager,
            max_steps_used=max_steps_used,
            manage_connector=manage_connector,
            external_history_used=external_history_used,
            steps_taken=steps_taken,
            tools_used_count=tools_used_count,
            tools_used_names=tools_used_names,
            response=response,
            execution_time_ms=execution_time_ms,
            error_type=error_type,
            conversation_history_length=conversation_history_length,
        )
        self.capture(event)

    @requires_telemetry
    def flush(self) -> None:
        """Flush any queued telemetry events.

        Only the PostHog client is flushed; for Scarf a debug message is
        logged and nothing else happens.
        """
        # Flush PostHog
        if self._posthog_client:
            try:
                self._posthog_client.flush()
                logger.debug("PostHog client telemetry queue flushed")
            except Exception as e:
                logger.debug(f"Failed to flush PostHog client: {e}")

        # Scarf events are sent immediately, no flush needed
        if self._scarf_client:
            logger.debug("Scarf telemetry events sent immediately (no flush needed)")

    @requires_telemetry
    def shutdown(self) -> None:
        """Shutdown telemetry clients and flush remaining events.

        Only the PostHog client is shut down; for Scarf a debug message is
        logged and nothing else happens.
        """
        # Shutdown PostHog
        if self._posthog_client:
            try:
                self._posthog_client.shutdown()
                logger.debug("PostHog client shutdown successfully")
            except Exception as e:
                logger.debug(f"Error shutting down PostHog client: {e}")

        # Scarf doesn't require explicit shutdown
        if self._scarf_client:
            logger.debug("Scarf telemetry client shutdown (no action needed)")