| | from __future__ import annotations |
| |
|
| | import asyncio |
| | import os |
| | import platform |
| | import sys |
| | from datetime import datetime, timezone |
| | from typing import TYPE_CHECKING |
| |
|
| | import httpx |
| | from loguru import logger |
| |
|
| | from langflow.services.base import Service |
| | from langflow.services.telemetry.opentelemetry import OpenTelemetry |
| | from langflow.services.telemetry.schema import ( |
| | ComponentPayload, |
| | PlaygroundPayload, |
| | RunPayload, |
| | ShutdownPayload, |
| | VersionPayload, |
| | ) |
| | from langflow.utils.version import get_version_info |
| |
|
| | if TYPE_CHECKING: |
| | from pydantic import BaseModel |
| |
|
| | from langflow.services.settings.service import SettingsService |
| |
|
| |
|
| | class TelemetryService(Service): |
| | name = "telemetry_service" |
| |
|
| | def __init__(self, settings_service: SettingsService): |
| | super().__init__() |
| | self.settings_service = settings_service |
| | self.base_url = settings_service.settings.telemetry_base_url |
| | self.telemetry_queue: asyncio.Queue = asyncio.Queue() |
| | self.client = httpx.AsyncClient(timeout=10.0) |
| | self.running = False |
| | self._stopping = False |
| |
|
| | self.ot = OpenTelemetry(prometheus_enabled=settings_service.settings.prometheus_enabled) |
| | self.architecture: str | None = None |
| | self.worker_task: asyncio.Task | None = None |
| | |
| | self.do_not_track = ( |
| | os.getenv("DO_NOT_TRACK", "False").lower() == "true" or settings_service.settings.do_not_track |
| | ) |
| |
|
| | async def telemetry_worker(self) -> None: |
| | while self.running: |
| | func, payload, path = await self.telemetry_queue.get() |
| | try: |
| | await func(payload, path) |
| | except Exception: |
| | logger.exception("Error sending telemetry data") |
| | finally: |
| | self.telemetry_queue.task_done() |
| |
|
| | async def send_telemetry_data(self, payload: BaseModel, path: str | None = None) -> None: |
| | if self.do_not_track: |
| | logger.debug("Telemetry tracking is disabled.") |
| | return |
| |
|
| | url = f"{self.base_url}" |
| | if path: |
| | url = f"{url}/{path}" |
| | try: |
| | payload_dict = payload.model_dump(by_alias=True, exclude_none=True, exclude_unset=True) |
| | response = await self.client.get(url, params=payload_dict) |
| | if response.status_code != httpx.codes.OK: |
| | logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}") |
| | else: |
| | logger.debug("Telemetry data sent successfully.") |
| | except httpx.HTTPStatusError: |
| | logger.exception("HTTP error occurred") |
| | except httpx.RequestError: |
| | logger.exception("Request error occurred") |
| | except Exception: |
| | logger.exception("Unexpected error occurred") |
| |
|
| | async def log_package_run(self, payload: RunPayload) -> None: |
| | await self._queue_event((self.send_telemetry_data, payload, "run")) |
| |
|
| | async def log_package_shutdown(self) -> None: |
| | payload = ShutdownPayload(time_running=(datetime.now(timezone.utc) - self._start_time).seconds) |
| | await self._queue_event(payload) |
| |
|
| | async def _queue_event(self, payload) -> None: |
| | if self.do_not_track or self._stopping: |
| | return |
| | await self.telemetry_queue.put(payload) |
| |
|
| | async def log_package_version(self) -> None: |
| | python_version = ".".join(platform.python_version().split(".")[:2]) |
| | version_info = get_version_info() |
| | if self.architecture is None: |
| | self.architecture = (await asyncio.to_thread(platform.architecture))[0] |
| | payload = VersionPayload( |
| | package=version_info["package"].lower(), |
| | version=version_info["version"], |
| | platform=platform.platform(), |
| | python=python_version, |
| | cache_type=self.settings_service.settings.cache_type, |
| | backend_only=self.settings_service.settings.backend_only, |
| | arch=self.architecture, |
| | auto_login=self.settings_service.auth_settings.AUTO_LOGIN, |
| | ) |
| | await self._queue_event((self.send_telemetry_data, payload, None)) |
| |
|
| | async def log_package_playground(self, payload: PlaygroundPayload) -> None: |
| | await self._queue_event((self.send_telemetry_data, payload, "playground")) |
| |
|
| | async def log_package_component(self, payload: ComponentPayload) -> None: |
| | await self._queue_event((self.send_telemetry_data, payload, "component")) |
| |
|
| | def start(self) -> None: |
| | if self.running or self.do_not_track: |
| | return |
| | try: |
| | self.running = True |
| | self._start_time = datetime.now(timezone.utc) |
| | self.worker_task = asyncio.create_task(self.telemetry_worker()) |
| | self.log_package_version_task = asyncio.create_task(self.log_package_version()) |
| | except Exception: |
| | logger.exception("Error starting telemetry service") |
| |
|
| | async def flush(self) -> None: |
| | if self.do_not_track: |
| | return |
| | try: |
| | await self.telemetry_queue.join() |
| | except Exception: |
| | logger.exception("Error flushing logs") |
| |
|
| | async def _cancel_task(self, task: asyncio.Task, cancel_msg: str) -> None: |
| | task.cancel(cancel_msg) |
| | try: |
| | await task |
| | except asyncio.CancelledError: |
| | current_task = asyncio.current_task() |
| | if sys.version_info >= (3, 11): |
| | if current_task and current_task.cancelling() > 0: |
| | raise |
| | elif current_task and hasattr(current_task, "_must_cancel") and current_task._must_cancel: |
| | raise |
| |
|
| | async def stop(self) -> None: |
| | if self.do_not_track or self._stopping: |
| | return |
| | try: |
| | self._stopping = True |
| | |
| | await self.flush() |
| | self.running = False |
| | if self.worker_task: |
| | await self._cancel_task(self.worker_task, "Cancel telemetry worker task") |
| | if self.log_package_version_task: |
| | await self._cancel_task(self.log_package_version_task, "Cancel telemetry log package version task") |
| | await self.client.aclose() |
| | except Exception: |
| | logger.exception("Error stopping tracing service") |
| |
|
| | async def teardown(self) -> None: |
| | await self.stop() |
| |
|