import asyncio
from typing import Any, Optional

from openai import OpenAI
from langchain_openai import ChatOpenAI
from langchain_core.language_models.base import LanguageModelInput
from langchain_core.messages import (
    AIMessage,
    SystemMessage,
    BaseMessage,
)


class DeepSeekR1ChatOpenAI(ChatOpenAI):
    """ChatOpenAI variant that sends requests through a raw ``openai.OpenAI`` client.

    A dedicated client is built from the ``base_url`` / ``api_key`` kwargs so the
    request goes straight to the DeepSeek-compatible endpoint instead of through
    LangChain's own request path.

    NOTE(review): ``stop``, ``config`` and extra ``**kwargs`` are accepted for
    interface compatibility but are NOT forwarded to the API — confirm this is
    intentional before relying on stop sequences.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the LangChain base class, then attach a raw OpenAI client.

        ``base_url`` and ``api_key`` are read from ``kwargs`` (may be ``None``,
        in which case the OpenAI SDK falls back to its environment defaults).
        """
        super().__init__(*args, **kwargs)
        self.client = OpenAI(
            base_url=kwargs.get("base_url"),
            api_key=kwargs.get("api_key"),
        )

    @staticmethod
    def _to_message_history(input: LanguageModelInput) -> list[dict[str, Any]]:
        """Convert LangChain messages to OpenAI chat ``{"role", "content"}`` dicts.

        Anything that is neither a ``SystemMessage`` nor an ``AIMessage`` is
        treated as a user message (mirrors the original inline logic).
        # assumes ``input`` is an iterable of message objects with ``.content``;
        # a bare string (also a valid LanguageModelInput) would be iterated
        # char-by-char — TODO confirm callers always pass message lists.
        """
        history: list[dict[str, Any]] = []
        for message in input:
            if isinstance(message, SystemMessage):
                role = "system"
            elif isinstance(message, AIMessage):
                role = "assistant"
            else:
                role = "user"
            history.append({"role": role, "content": message.content})
        return history

    def _complete(self, input: LanguageModelInput) -> AIMessage:
        """Run one blocking chat-completion call and wrap the reply."""
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=self._to_message_history(input),
        )
        return AIMessage(content=response.choices[0].message.content)

    async def ainvoke(
        self,
        input: LanguageModelInput,
        config: Optional[dict] = None,
        *,
        stop: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> AIMessage:
        """Async completion.

        The OpenAI client call is synchronous; run it in a worker thread via
        ``asyncio.to_thread`` so the event loop is not blocked for the full
        duration of the HTTP request (the original awaited nothing and stalled
        the loop).
        """
        return await asyncio.to_thread(self._complete, input)

    def invoke(
        self,
        input: LanguageModelInput,
        config: Optional[dict] = None,
        *,
        stop: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> AIMessage:
        """Synchronous completion: convert messages, call the API, wrap the reply."""
        return self._complete(input)