| | from __future__ import annotations |
| |
|
| | import os |
| | import traceback |
| | import types |
| | from datetime import datetime, timezone |
| | from typing import TYPE_CHECKING, Any |
| |
|
| | from loguru import logger |
| |
|
| | from langflow.schema.data import Data |
| | from langflow.services.tracing.base import BaseTracer |
| |
|
| | if TYPE_CHECKING: |
| | from collections.abc import Sequence |
| | from uuid import UUID |
| |
|
| | from langchain.callbacks.base import BaseCallbackHandler |
| |
|
| | from langflow.graph.vertex.base import Vertex |
| | from langflow.services.tracing.schema import Log |
| |
|
| |
|
| | class LangSmithTracer(BaseTracer): |
| | def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID): |
| | try: |
| | self._ready = self.setup_langsmith() |
| | if not self._ready: |
| | return |
| | from langsmith.run_trees import RunTree |
| |
|
| | self.trace_name = trace_name |
| | self.trace_type = trace_type |
| | self.project_name = project_name |
| | self.trace_id = trace_id |
| | self._run_tree = RunTree( |
| | project_name=self.project_name, |
| | name=self.trace_name, |
| | run_type=self.trace_type, |
| | id=self.trace_id, |
| | ) |
| | self._run_tree.add_event({"name": "Start", "time": datetime.now(timezone.utc).isoformat()}) |
| | self._children: dict[str, RunTree] = {} |
| | except Exception: |
| | logger.debug("Error setting up LangSmith tracer") |
| | self._ready = False |
| |
|
| | @property |
| | def ready(self): |
| | return self._ready |
| |
|
| | def setup_langsmith(self) -> bool: |
| | if os.getenv("LANGCHAIN_API_KEY") is None: |
| | return False |
| | try: |
| | from langsmith import Client |
| |
|
| | self._client = Client() |
| | except ImportError: |
| | logger.exception("Could not import langsmith. Please install it with `pip install langsmith`.") |
| | return False |
| | os.environ["LANGCHAIN_TRACING_V2"] = "true" |
| | return True |
| |
|
| | def add_trace( |
| | self, |
| | trace_id: str, |
| | trace_name: str, |
| | trace_type: str, |
| | inputs: dict[str, Any], |
| | metadata: dict[str, Any] | None = None, |
| | vertex: Vertex | None = None, |
| | ) -> None: |
| | if not self._ready or not self._run_tree: |
| | return |
| | processed_inputs = {} |
| | if inputs: |
| | processed_inputs = self._convert_to_langchain_types(inputs) |
| | child = self._run_tree.create_child( |
| | name=trace_name, |
| | run_type=trace_type, |
| | inputs=processed_inputs, |
| | ) |
| | if metadata: |
| | child.add_metadata(self._convert_to_langchain_types(metadata)) |
| | self._children[trace_name] = child |
| | self._child_link: dict[str, str] = {} |
| |
|
| | def _convert_to_langchain_types(self, io_dict: dict[str, Any]): |
| | converted = {} |
| | for key, value in io_dict.items(): |
| | converted[key] = self._convert_to_langchain_type(value) |
| | return converted |
| |
|
| | def _convert_to_langchain_type(self, value): |
| | from langflow.schema.message import Message |
| |
|
| | if isinstance(value, dict): |
| | value = {key: self._convert_to_langchain_type(val) for key, val in value.items()} |
| | elif isinstance(value, list): |
| | value = [self._convert_to_langchain_type(v) for v in value] |
| | elif isinstance(value, Message): |
| | if "prompt" in value: |
| | value = value.load_lc_prompt() |
| | elif value.sender: |
| | value = value.to_lc_message() |
| | else: |
| | value = value.to_lc_document() |
| | elif isinstance(value, Data): |
| | value = value.to_lc_document() |
| | elif isinstance(value, types.GeneratorType): |
| | |
| | value = str(value) |
| | return value |
| |
|
| | def end_trace( |
| | self, |
| | trace_id: str, |
| | trace_name: str, |
| | outputs: dict[str, Any] | None = None, |
| | error: Exception | None = None, |
| | logs: Sequence[Log | dict] = (), |
| | ): |
| | if not self._ready or trace_name not in self._children: |
| | return |
| | child = self._children[trace_name] |
| | raw_outputs = {} |
| | processed_outputs = {} |
| | if outputs: |
| | raw_outputs = outputs |
| | processed_outputs = self._convert_to_langchain_types(outputs) |
| | if logs: |
| | logs_dicts = [log if isinstance(log, dict) else log.model_dump() for log in logs] |
| | child.add_metadata(self._convert_to_langchain_types({"logs": {log.get("name"): log for log in logs_dicts}})) |
| | child.add_metadata(self._convert_to_langchain_types({"outputs": raw_outputs})) |
| | child.end(outputs=processed_outputs, error=self._error_to_string(error)) |
| | if error: |
| | child.patch() |
| | else: |
| | child.post() |
| | self._child_link[trace_name] = child.get_url() |
| |
|
| | def _error_to_string(self, error: Exception | None): |
| | error_message = None |
| | if error: |
| | string_stacktrace = traceback.format_exception(error) |
| | error_message = f"{error.__class__.__name__}: {error}\n\n{string_stacktrace}" |
| | return error_message |
| |
|
| | def end( |
| | self, |
| | inputs: dict[str, Any], |
| | outputs: dict[str, Any], |
| | error: Exception | None = None, |
| | metadata: dict[str, Any] | None = None, |
| | ) -> None: |
| | if not self._ready or not self._run_tree: |
| | return |
| | self._run_tree.add_metadata({"inputs": inputs}) |
| | if metadata: |
| | self._run_tree.add_metadata(metadata) |
| | self._run_tree.end(outputs=outputs, error=self._error_to_string(error)) |
| | self._run_tree.post() |
| | self._run_link = self._run_tree.get_url() |
| |
|
| | def get_langchain_callback(self) -> BaseCallbackHandler | None: |
| | return None |
| |
|