from __future__ import annotations

from typing import TYPE_CHECKING, Any
from uuid import UUID

from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.callbacks.base import AsyncCallbackHandler
from loguru import logger
from typing_extensions import override

from langflow.api.v1.schemas import ChatResponse, PromptResponse
from langflow.services.deps import get_chat_service, get_socket_service
from langflow.utils.util import remove_ansi_escape_codes

if TYPE_CHECKING:
    from langflow.services.socket.service import SocketIOService


# https://github.com/hwchase17/chat-langchain/blob/master/callback.py
class AsyncStreamingLLMCallbackHandleSIO(AsyncCallbackHandler):
    """Callback handler that streams LLM responses to a Socket.IO client."""

    def __init__(self, session_id: str):
        self.chat_service = get_chat_service()
        self.client_id = session_id
        self.socketio_service: SocketIOService = get_socket_service()
        self.sid = session_id
        # self.socketio_service = self.chat_service.active_connections[self.client_id]

    @property
    @override
    def ignore_chain(self) -> bool:
        """Whether to ignore chain callbacks."""
        # Must be a property to match the base class; a plain method here
        # would always be truthy, so chain callbacks would be skipped.
        return False

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:  # type: ignore[misc]
        """Stream each newly generated LLM token to the client."""
        resp = ChatResponse(message=token, type="stream", intermediate_steps="")
        await self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())

    async def on_tool_start(self, serialized: dict[str, Any], input_str: str, **kwargs: Any) -> Any:  # type: ignore[misc]
        """Run when the tool starts running, forwarding the tool input to the client."""
        resp = ChatResponse(
            message="",
            type="stream",
            intermediate_steps=f"Tool input: {input_str}",
        )
        await self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())

    async def on_tool_end(self, output: str, **kwargs: Any) -> Any:
        """Run when the tool ends running, streaming the output word by word."""
        observation_prefix = kwargs.get("observation_prefix", "Tool output: ")
        split_output = output.split()
        if not split_output:
            # Nothing to stream for an empty tool output.
            return
        first_word = split_output[0]
        rest_of_output = split_output[1:]
        # Prefix only the first word with the observation prefix.
        intermediate_steps = f"{observation_prefix}{first_word}"
        resp = ChatResponse(
            message="",
            type="stream",
            intermediate_steps=intermediate_steps,
        )
        rest_of_resps = [
            ChatResponse(
                message="",
                type="stream",
                intermediate_steps=word,
            )
            for word in rest_of_output
        ]
        resps = [resp, *rest_of_resps]
        # Emit the responses one by one to emulate a stream of tokens,
        # logging (rather than raising) any transport errors.
        try:
            for resp in resps:
                await self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())
        except Exception:  # noqa: BLE001
            logger.exception("Error sending response")

    async def on_tool_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: UUID | None = None,
        tags: list[str] | None = None,
        **kwargs: Any,
    ) -> None:
        """Run when the tool errors. No message is sent to the client."""

    async def on_text(self, text: str, **kwargs: Any) -> Any:  # type: ignore[misc]
        """Run on arbitrary text."""
        # This runs when the prompt is first sent to the LLM;
        # handling it here forwards the final formatted prompt to the frontend.
        if "Prompt after formatting" in text:
            text = text.replace("Prompt after formatting:\n", "")
            text = remove_ansi_escape_codes(text)
            resp = PromptResponse(
                prompt=text,
            )
            await self.socketio_service.emit_message(to=self.sid, data=resp.model_dump())

    async def on_agent_action(self, action: AgentAction, **kwargs: Any) -> None:  # type: ignore[misc]
        """Run on agent action, streaming the agent's thought line by line."""
        log = f"Thought: {action.log}"
        # Send each line as a separate message; a log without line breaks
        # is sent as a single message (str.split yields the whole string).
        for line in log.split("\n"):
            resp = ChatResponse(message="", type="stream", intermediate_steps=line)
            await self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())

    async def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:  # type: ignore[misc]
        """Run on agent end, streaming the final log to the client."""
        resp = ChatResponse(
            message="",
            type="stream",
            intermediate_steps=finish.log,
        )
        await self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())
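

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal example of attaching this handler to a streaming LangChain chat
# model so tokens are relayed to the Socket.IO client. The `ChatOpenAI` model,
# its parameters, and the session id are assumptions for illustration; any
# LangChain runnable that accepts callbacks would be wired the same way.
#
#     from langchain_openai import ChatOpenAI
#
#     handler = AsyncStreamingLLMCallbackHandleSIO(session_id="some-session-id")
#     llm = ChatOpenAI(model="gpt-4o-mini", streaming=True, callbacks=[handler])
#     result = await llm.ainvoke("Summarize the weather report.")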