import asyncio
import copy
import logging
from dataclasses import asdict
from typing import List, Optional, Union

import aiohttp

from lagent.llms.base_llm import AsyncLLMMixin, BaseLLM
from lagent.schema import ModelStatusCode
from lagent.utils.util import filter_suffix


class TritonClient(BaseLLM):
| | """TritonClient is a wrapper of TritonClient for LLM. |
| | |
| | Args: |
| | tritonserver_addr (str): the address in format "ip:port" of |
| | triton inference server |
| | model_name (str): the name of the model |
| | session_len (int): the context size |
| | max_tokens (int): the expected generated token numbers |
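
    Example:
        A minimal usage sketch. It assumes lmdeploy<0.5.2 is installed
        (`turbomind.chatbot` was removed afterwards) and that a Triton
        inference server is reachable; the address and model name below
        are placeholders:

        >>> llm = TritonClient('0.0.0.0:33337', model_name='internlm2-chat-7b')
        >>> print(llm.generate('hello'))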
| | """ |
| |
|
    def __init__(self,
                 tritonserver_addr: str,
                 model_name: str,
                 session_len: int = 32768,
                 log_level: str = 'WARNING',
                 **kwargs):
        super().__init__(path=None, **kwargs)
        try:
            from lmdeploy.serve.turbomind.chatbot import Chatbot, StatusCode
        except Exception as e:
            logging.error(f'{e}')
            raise RuntimeError(
                'turbomind.chatbot is no longer available: it was removed '
                'from lmdeploy in v0.5.2, so TritonClient requires an '
                'earlier lmdeploy version')
        # map lmdeploy's Triton status codes to lagent's model status codes
        self.state_map = {
            StatusCode.TRITON_STREAM_END: ModelStatusCode.END,
            StatusCode.TRITON_SERVER_ERR: ModelStatusCode.SERVER_ERR,
            StatusCode.TRITON_SESSION_CLOSED: ModelStatusCode.SESSION_CLOSED,
            StatusCode.TRITON_STREAM_ING: ModelStatusCode.STREAM_ING,
            StatusCode.TRITON_SESSION_OUT_OF_LIMIT:
            ModelStatusCode.SESSION_OUT_OF_LIMIT,
            StatusCode.TRITON_SESSION_INVALID_ARG:
            ModelStatusCode.SESSION_INVALID_ARG,
            StatusCode.TRITON_SESSION_READY: ModelStatusCode.SESSION_READY
        }
        self.chatbot = Chatbot(
            tritonserver_addr=tritonserver_addr,
            model_name=model_name,
            session_len=session_len,
            log_level=log_level,
            **kwargs)

    def generate(self,
                 inputs: Union[str, List[str]],
                 session_id: int = 2967,
                 request_id: str = '',
                 sequence_start: bool = True,
                 sequence_end: bool = True,
                 skip_special_tokens: bool = False,
                 **kwargs):
        """Start a new round of conversation in a session. Return the chat
        completions in non-stream mode.

        Args:
            inputs (str, List[str]): user's prompt(s) in this round
            session_id (int): the unique id of a session
            request_id (str): the unique id of this round of conversation
            sequence_start (bool): start flag of a session
            sequence_end (bool): end flag of a session
            skip_special_tokens (bool): Whether or not to remove special
                tokens in the decoding. Defaults to False.
        Returns:
            (a list of/batched) text/chat completion
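
        Example:
            An illustrative call, assuming `llm` is a connected
            `TritonClient` instance (see the class docstring):

            >>> llm.generate('Hi, pls intro yourself', session_id=1)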
| | """ |
| | from lmdeploy.serve.turbomind.chatbot import Session, get_logger |
| | if isinstance(inputs, str): |
| | inputs = [inputs] |
| | prompt = inputs |
| |
|
| | assert isinstance(session_id, int), \ |
| | f'INT session id is required, but got {type(session_id)}' |
| |
|
| | self.chatbot.cfg = self._update_gen_params(**kwargs) |
| | max_new_tokens = self.chatbot.cfg.max_new_tokens |
| |
|
| | logger = get_logger('service.ft', log_level=self.chatbot.log_level) |
| | logger.info(f'session {session_id}, request_id {request_id}, ' |
| | f'max_out_len {max_new_tokens}') |
| |
|
| | if self.chatbot._session is None: |
| | sequence_start = True |
| | self.chatbot._session = Session(session_id=session_id) |
| | elif self.chatbot._session.status == 0: |
| | logger.error(f'session {session_id} has been ended. Please set ' |
| | f'`sequence_start` be True if you want to restart it') |
| | return '' |
| |
|
| | self.chatbot._session.status = 1 |
| | self.chatbot._session.request_id = request_id |
| | self.chatbot._session.response = '' |
| |
|
| | status, res, _ = None, '', 0 |
| | for status, res, _ in self.chatbot._stream_infer( |
| | self.chatbot._session, |
| | prompt, |
| | max_new_tokens, |
| | sequence_start, |
| | sequence_end, |
| | skip_special_tokens=skip_special_tokens): |
| | status = self.state_map.get(status) |
| | if status < ModelStatusCode.END: |
| | return '' |
| | elif status == ModelStatusCode.END: |
| | self.chatbot._session.histories = ( |
| | self.chatbot._session.histories + |
| | self.chatbot._session.prompt + |
| | self.chatbot._session.response) |
| | |
| | res = filter_suffix(res, self.gen_params.get('stop_words')) |
| | return res |
| |
|
    def stream_chat(self,
                    inputs: List[dict],
                    session_id: int = 2967,
                    request_id: str = '',
                    sequence_start: bool = True,
                    sequence_end: bool = True,
                    skip_special_tokens: bool = False,
                    **kwargs):
        """Start a new round of conversation in a session. Yield the chat
        completions in stream mode.

        Args:
            inputs (List[dict]): user's inputs in this round of conversation
            session_id (int): the unique id of a session
            request_id (str): the unique id of this round of conversation
            sequence_start (bool): start flag of a session
            sequence_end (bool): end flag of a session
            skip_special_tokens (bool): Whether or not to remove special
                tokens in the decoding. Defaults to False.
        Yields:
            tuple(Status, str, int): status, text/chat completion,
                generated token number
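
        Example:
            An illustrative sketch of consuming the stream, assuming `llm`
            is a connected `TritonClient` and the chat template accepts
            OpenAI-style messages:

            >>> messages = [{'role': 'user', 'content': 'hello'}]
            >>> for status, text, _ in llm.stream_chat(messages):
            ...     print(status, text)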
| | """ |
| | from lmdeploy.serve.turbomind.chatbot import Session, get_logger |
| | assert isinstance(session_id, int), \ |
| | f'INT session id is required, but got {type(session_id)}' |
| |
|
| | self.chatbot.cfg = self._update_gen_params(**kwargs) |
| | max_new_tokens = self.chatbot.cfg.max_new_tokens |
| |
|
| | logger = get_logger('service.ft', log_level=self.chatbot.log_level) |
| | logger.info(f'session {session_id}, request_id {request_id}, ' |
| | f'max_out_len {max_new_tokens}') |
| |
|
| | if self.chatbot._session is None: |
| | sequence_start = True |
| | self.chatbot._session = Session(session_id=session_id) |
| | elif self.chatbot._session.status == 0: |
| | logger.error(f'session {session_id} has been ended. Please set ' |
| | f'`sequence_start` be True if you want to restart it') |
| | return ModelStatusCode.SESSION_CLOSED, '', 0 |
| |
|
| | self.chatbot._session.status = 1 |
| | self.chatbot._session.request_id = request_id |
| | self.chatbot._session.response = '' |
| |
|
| | prompt = self.template_parser(inputs) |
| | status, res, _ = None, '', 0 |
| | for status, res, _ in self.chatbot._stream_infer( |
| | self.chatbot._session, |
| | prompt, |
| | max_new_tokens, |
| | sequence_start, |
| | sequence_end, |
| | skip_special_tokens=skip_special_tokens): |
| | status = self.state_map.get(status) |
| | |
| | res = filter_suffix(res, self.gen_params.get('stop_words')) |
| | if status < ModelStatusCode.END: |
| | return status, res, _ |
| | elif status == ModelStatusCode.END: |
| | self.chatbot._session.histories = ( |
| | self.chatbot._session.histories + |
| | self.chatbot._session.prompt + |
| | self.chatbot._session.response) |
| | yield status, res, _ |
| | break |
| | else: |
| | yield status, res, _ |
| |
|
    def _update_gen_params(self, **kwargs):
        import mmengine
        new_gen_params = self.update_gen_params(**kwargs)
        self.gen_params['stop_words'] = new_gen_params.pop('stop_words')
        stop_words = self.chatbot._stop_words(
            self.gen_params.get('stop_words'))
        cfg = mmengine.Config(
            dict(
                session_len=self.chatbot.model.session_len,
                stop_words=stop_words,
                bad_words=self.chatbot.cfg.bad_words,
                **new_gen_params))
        return cfg


class LMDeployPipeline(BaseLLM):
| | """ |
| | |
| | Args: |
| | path (str): The path to the model. |
| | It could be one of the following options: |
| | - i) A local directory path of a turbomind model which is |
| | converted by `lmdeploy convert` command or download |
| | from ii) and iii). |
| | - ii) The model_id of a lmdeploy-quantized model hosted |
| | inside a model repo on huggingface.co, such as |
| | "InternLM/internlm-chat-20b-4bit", |
| | "lmdeploy/llama2-chat-70b-4bit", etc. |
| | - iii) The model_id of a model hosted inside a model repo |
| | on huggingface.co, such as "internlm/internlm-chat-7b", |
| | "Qwen/Qwen-7B-Chat ", "baichuan-inc/Baichuan2-7B-Chat" |
| | and so on. |
| | model_name (str): needed when model_path is a pytorch model on |
| | huggingface.co, such as "internlm-chat-7b", |
| | "Qwen-7B-Chat ", "Baichuan2-7B-Chat" and so on. |
| | tp (int): tensor parallel |
| | pipeline_cfg (dict): config of pipeline |
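
    Example:
        A minimal construction sketch; the model id below is a placeholder
        and any huggingface.co model supported by lmdeploy would do:

        >>> llm = LMDeployPipeline('internlm/internlm2-chat-7b', tp=1)
        >>> print(llm.generate('hi'))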
| | """ |
| |
|
    def __init__(self,
                 path: str,
                 model_name: Optional[str] = None,
                 tp: int = 1,
                 pipeline_cfg=dict(),
                 **kwargs):
        import lmdeploy
        from lmdeploy import (ChatTemplateConfig, TurbomindEngineConfig,
                              pipeline, version_info)

        self.str_version = lmdeploy.__version__
        self.version = version_info
        self.do_sample = kwargs.pop('do_sample', None)
        if self.do_sample is not None and self.version < (0, 6, 0):
            raise RuntimeError(
                '`do_sample` parameter is not supported by lmdeploy until '
                f'v0.6.0, but currently using lmdeploy {self.str_version}')
        super().__init__(path=path, **kwargs)
        backend_config = copy.deepcopy(pipeline_cfg)
        backend_config.update(tp=tp)
        # keep only the keys that TurbomindEngineConfig actually accepts
        backend_config = {
            k: v
            for k, v in backend_config.items()
            if hasattr(TurbomindEngineConfig, k)
        }
        backend_config = TurbomindEngineConfig(**backend_config)
        chat_template_config = ChatTemplateConfig(
            model_name=model_name) if model_name else None
        self.model = pipeline(
            model_path=self.path,
            backend_config=backend_config,
            chat_template_config=chat_template_config,
            log_level='WARNING')

    def generate(self,
                 inputs: Union[str, List[str]],
                 do_preprocess: Optional[bool] = None,
                 skip_special_tokens: bool = False,
                 return_dict: bool = False,
                 **kwargs):
        """Return the chat completions in non-stream mode.

        Args:
            inputs (Union[str, List[str]]): input texts to be completed.
            do_preprocess (bool): whether to pre-process the messages.
                Defaults to True, which means the chat_template will be
                applied.
            skip_special_tokens (bool): Whether or not to remove special
                tokens in the decoding. Defaults to False.
            return_dict (bool): whether to return responses as dicts.
                Defaults to False.
        Returns:
            (a list of/batched) text/chat completion
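
        Example:
            An illustrative batched call, assuming `llm` is an
            `LMDeployPipeline` instance (see the class docstring):

            >>> llm.generate(['1+1=', '2+2='], max_new_tokens=32)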
| | """ |
| | from lmdeploy.messages import GenerationConfig |
| | batched = True |
| | if isinstance(inputs, str): |
| | inputs = [inputs] |
| | batched = False |
| | prompt = inputs |
| | do_sample = kwargs.pop('do_sample', None) |
| | gen_params = self.update_gen_params(**kwargs) |
| |
|
| | if do_sample is None: |
| | do_sample = self.do_sample |
| | if do_sample is not None and self.version < (0, 6, 0): |
| | raise RuntimeError( |
| | '`do_sample` parameter is not supported by lmdeploy until ' |
| | f'v0.6.0, but currently using lmdeloy {self.str_version}') |
| | if self.version >= (0, 6, 0): |
| | if do_sample is None: |
| | do_sample = gen_params['top_k'] > 1 or gen_params[ |
| | 'temperature'] > 0 |
| | gen_params.update(do_sample=do_sample) |
| |
|
| | gen_config = GenerationConfig( |
| | skip_special_tokens=skip_special_tokens, **gen_params) |
| | response = self.model.batch_infer( |
| | prompt, gen_config=gen_config, do_preprocess=do_preprocess) |
| | texts = [resp.text for resp in response] |
| | |
| | texts = filter_suffix(texts, self.gen_params.get('stop_words')) |
| | for resp, text in zip(response, texts): |
| | resp.text = text |
| | if batched: |
| | return [asdict(resp) |
| | for resp in response] if return_dict else texts |
| | return asdict(response[0]) if return_dict else texts[0] |
| |
|
| |
|
class LMDeployServer(BaseLLM):
| | """ |
| | |
| | Args: |
| | path (str): The path to the model. |
| | It could be one of the following options: |
| | - i) A local directory path of a turbomind model which is |
| | converted by `lmdeploy convert` command or download from |
| | ii) and iii). |
| | - ii) The model_id of a lmdeploy-quantized model hosted |
| | inside a model repo on huggingface.co, such as |
| | "InternLM/internlm-chat-20b-4bit", |
| | "lmdeploy/llama2-chat-70b-4bit", etc. |
| | - iii) The model_id of a model hosted inside a model repo |
| | on huggingface.co, such as "internlm/internlm-chat-7b", |
| | "Qwen/Qwen-7B-Chat ", "baichuan-inc/Baichuan2-7B-Chat" |
| | and so on. |
| | model_name (str): needed when model_path is a pytorch model on |
| | huggingface.co, such as "internlm-chat-7b", |
| | "Qwen-7B-Chat ", "Baichuan2-7B-Chat" and so on. |
| | server_name (str): host ip for serving |
| | server_port (int): server port |
| | tp (int): tensor parallel |
| | log_level (str): set log level whose value among |
| | [CRITICAL, ERROR, WARNING, INFO, DEBUG] |
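
    Example:
        A minimal sketch: this spins up a local api_server for the given
        model (the model id below is a placeholder) and queries it:

        >>> llm = LMDeployServer('internlm/internlm2-chat-7b',
        ...                      server_port=23333)
        >>> print(llm.generate('hello'))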
| | """ |
| |
|
    def __init__(self,
                 path: str,
                 model_name: Optional[str] = None,
                 server_name: str = '0.0.0.0',
                 server_port: int = 23333,
                 tp: int = 1,
                 log_level: str = 'WARNING',
                 serve_cfg=dict(),
                 **kwargs):
        super().__init__(path=path, **kwargs)
        self.model_name = model_name
        # launch the api_server and obtain a client bound to it
        import lmdeploy
        self.client = lmdeploy.serve(
            model_path=self.path,
            model_name=model_name,
            server_name=server_name,
            server_port=server_port,
            tp=tp,
            log_level=log_level,
            **serve_cfg)

    def generate(self,
                 inputs: Union[str, List[str]],
                 session_id: int = 2967,
                 sequence_start: bool = True,
                 sequence_end: bool = True,
                 ignore_eos: bool = False,
                 skip_special_tokens: Optional[bool] = False,
                 timeout: int = 30,
                 **kwargs) -> List[str]:
        """Start a new round of conversation in a session. Return the chat
        completions in non-stream mode.

        Args:
            inputs (str, List[str]): user's prompt(s) in this round
            session_id (int): the unique id of a session
            sequence_start (bool): start flag of a session
            sequence_end (bool): end flag of a session
            ignore_eos (bool): indicator for ignoring eos
            skip_special_tokens (bool): Whether or not to remove special
                tokens in the decoding. Defaults to False.
            timeout (int): max time to wait for response
        Returns:
            (a list of/batched) text/chat completion
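
        Example:
            An illustrative call, assuming `llm` is a running
            `LMDeployServer` (see the class docstring):

            >>> llm.generate('hello', session_id=7)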
| | """ |
| |
|
| | batched = True |
| | if isinstance(inputs, str): |
| | inputs = [inputs] |
| | batched = False |
| |
|
| | gen_params = self.update_gen_params(**kwargs) |
| | max_new_tokens = gen_params.pop('max_new_tokens') |
| | gen_params.update(max_tokens=max_new_tokens) |
| |
|
| | resp = [''] * len(inputs) |
| | for text in self.client.completions_v1( |
| | self.model_name, |
| | inputs, |
| | session_id=session_id, |
| | sequence_start=sequence_start, |
| | sequence_end=sequence_end, |
| | stream=False, |
| | ignore_eos=ignore_eos, |
| | skip_special_tokens=skip_special_tokens, |
| | timeout=timeout, |
| | **gen_params): |
| | resp = [ |
| | resp[i] + item['text'] |
| | for i, item in enumerate(text['choices']) |
| | ] |
| | |
| | resp = filter_suffix(resp, self.gen_params.get('stop_words')) |
| | if not batched: |
| | return resp[0] |
| | return resp |
| |
|
    def stream_chat(self,
                    inputs: List[dict],
                    session_id: int = 0,
                    sequence_start: bool = True,
                    sequence_end: bool = True,
                    stream: bool = True,
                    ignore_eos: bool = False,
                    skip_special_tokens: Optional[bool] = False,
                    timeout: int = 30,
                    **kwargs):
        """Start a new round of conversation in a session. Yield the chat
        completions in stream mode.

        Args:
            inputs (List[dict]): user's inputs in this round of conversation
            session_id (int): the unique id of a session
            sequence_start (bool): start flag of a session
            sequence_end (bool): end flag of a session
            stream (bool): return in a streaming format if enabled
            ignore_eos (bool): indicator for ignoring eos
            skip_special_tokens (bool): Whether or not to remove special
                tokens in the decoding. Defaults to False.
            timeout (int): max time to wait for response
        Yields:
            tuple(Status, str, int): status, text/chat completion,
                generated token number
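
        Example:
            An illustrative sketch of consuming the stream, assuming `llm`
            is a running `LMDeployServer`:

            >>> messages = [{'role': 'user', 'content': 'hello'}]
            >>> for status, text, _ in llm.stream_chat(messages):
            ...     print(text)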
| | """ |
| | gen_params = self.update_gen_params(**kwargs) |
| | max_new_tokens = gen_params.pop('max_new_tokens') |
| | gen_params.update(max_tokens=max_new_tokens) |
| | prompt = self.template_parser(inputs) |
| |
|
| | resp = '' |
| | finished = False |
| | stop_words = self.gen_params.get('stop_words') |
| | for text in self.client.completions_v1( |
| | self.model_name, |
| | prompt, |
| | session_id=session_id, |
| | sequence_start=sequence_start, |
| | sequence_end=sequence_end, |
| | stream=stream, |
| | ignore_eos=ignore_eos, |
| | skip_special_tokens=skip_special_tokens, |
| | timeout=timeout, |
| | **gen_params): |
| | resp += text['choices'][0]['text'] |
| | if not resp: |
| | continue |
| | |
| | for sw in stop_words: |
| | if sw in resp: |
| | resp = filter_suffix(resp, stop_words) |
| | finished = True |
| | break |
| | yield ModelStatusCode.STREAM_ING, resp, None |
| | if finished: |
| | break |
| | yield ModelStatusCode.END, resp, None |
| |
|
| |
|
class LMDeployClient(LMDeployServer):
| | """ |
| | |
| | Args: |
| | url (str): communicating address 'http://<ip>:<port>' of |
| | api_server |
| | model_name (str): needed when model_path is a pytorch model on |
| | huggingface.co, such as "internlm-chat-7b", |
| | "Qwen-7B-Chat ", "Baichuan2-7B-Chat" and so on. |
| | """ |
| |
|
| | def __init__(self, url: str, model_name: str, **kwargs): |
| | BaseLLM.__init__(self, path=url, **kwargs) |
| | from lmdeploy.serve.openai.api_client import APIClient |
| | self.client = APIClient(url) |
| | self.model_name = model_name |
| |
|
| |
|
class AsyncLMDeployPipeline(AsyncLLMMixin, LMDeployPipeline):
| | """ |
| | |
| | Args: |
| | path (str): The path to the model. |
| | It could be one of the following options: |
| | - i) A local directory path of a turbomind model which is |
| | converted by `lmdeploy convert` command or download |
| | from ii) and iii). |
| | - ii) The model_id of a lmdeploy-quantized model hosted |
| | inside a model repo on huggingface.co, such as |
| | "InternLM/internlm-chat-20b-4bit", |
| | "lmdeploy/llama2-chat-70b-4bit", etc. |
| | - iii) The model_id of a model hosted inside a model repo |
| | on huggingface.co, such as "internlm/internlm-chat-7b", |
| | "Qwen/Qwen-7B-Chat ", "baichuan-inc/Baichuan2-7B-Chat" |
| | and so on. |
| | model_name (str): needed when model_path is a pytorch model on |
| | huggingface.co, such as "internlm-chat-7b", |
| | "Qwen-7B-Chat ", "Baichuan2-7B-Chat" and so on. |
| | tp (int): tensor parallel |
| | pipeline_cfg (dict): config of pipeline |
| | """ |

    async def generate(self,
                       inputs: Union[str, List[str]],
                       session_ids: Union[int, List[int], None] = None,
                       do_preprocess: Optional[bool] = None,
                       skip_special_tokens: bool = False,
                       return_dict: bool = False,
                       **kwargs):
        """Return the chat completions in non-stream mode.

        Args:
            inputs (Union[str, List[str]]): input texts to be completed.
            session_ids (Union[int, List[int]]): session id(s) to use for
                the request(s). Defaults to `range(len(inputs))`.
            do_preprocess (bool): whether to pre-process the messages.
                Defaults to True, which means the chat_template will be
                applied.
            skip_special_tokens (bool): Whether or not to remove special
                tokens in the decoding. Defaults to False.
            return_dict (bool): whether to return responses as dicts.
                Defaults to False.
        Returns:
            (a list of/batched) text/chat completion
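
        Example:
            An illustrative async call, assuming `llm` is an
            `AsyncLMDeployPipeline` instance:

            >>> import asyncio
            >>> print(asyncio.run(llm.generate('hello')))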
| | """ |
| | from lmdeploy.messages import GenerationConfig, Response |
| |
|
| | batched = True |
| | if isinstance(inputs, str): |
| | inputs = [inputs] |
| | batched = False |
| | if session_ids is None: |
| | session_ids = list(range(len(inputs))) |
| | elif isinstance(session_ids, (int, str)): |
| | session_ids = [session_ids] |
| | assert len(inputs) == len(session_ids) |
| |
|
| | prompt = inputs |
| | gen_params = self.update_gen_params(**kwargs) |
| | gen_config = GenerationConfig( |
| | skip_special_tokens=skip_special_tokens, **gen_params) |
| |
|
| | async def _inner_generate(uid, text): |
| | resp = Response('', 0, 0, uid) |
| | async for out in self.model.generate( |
| | text, |
| | uid, |
| | gen_config, |
| | stream_response=True, |
| | sequence_start=True, |
| | sequence_end=True, |
| | do_preprocess=do_preprocess, |
| | **kwargs): |
| | resp.text += out.response |
| | resp.generate_token_len = out.generate_token_len |
| | resp.input_token_len = out.input_token_len |
| | resp.finish_reason = out.finish_reason |
| | if out.token_ids: |
| | resp.token_ids.extend(out.token_ids) |
| | if out.logprobs: |
| | if resp.logprobs is None: |
| | resp.logprobs = [] |
| | resp.logprobs.extend(out.logprobs) |
| | return resp |
| |
|
| | response = await asyncio.gather(*[ |
| | _inner_generate(sid, inp) for sid, inp in zip(session_ids, prompt) |
| | ]) |
| | texts = [resp.text for resp in response] |
| | |
| | texts = filter_suffix(texts, self.gen_params.get('stop_words')) |
| | for resp, text in zip(response, texts): |
| | resp.text = text |
| | if batched: |
| | return [asdict(resp) |
| | for resp in response] if return_dict else texts |
| | return asdict(response[0]) if return_dict else texts[0] |
| |
|
| |
|
class AsyncLMDeployServer(AsyncLLMMixin, LMDeployServer):
| | """ |
| | |
| | Args: |
| | path (str): The path to the model. |
| | It could be one of the following options: |
| | - i) A local directory path of a turbomind model which is |
| | converted by `lmdeploy convert` command or download from |
| | ii) and iii). |
| | - ii) The model_id of a lmdeploy-quantized model hosted |
| | inside a model repo on huggingface.co, such as |
| | "InternLM/internlm-chat-20b-4bit", |
| | "lmdeploy/llama2-chat-70b-4bit", etc. |
| | - iii) The model_id of a model hosted inside a model repo |
| | on huggingface.co, such as "internlm/internlm-chat-7b", |
| | "Qwen/Qwen-7B-Chat ", "baichuan-inc/Baichuan2-7B-Chat" |
| | and so on. |
| | model_name (str): needed when model_path is a pytorch model on |
| | huggingface.co, such as "internlm-chat-7b", |
| | "Qwen-7B-Chat ", "Baichuan2-7B-Chat" and so on. |
| | server_name (str): host ip for serving |
| | server_port (int): server port |
| | tp (int): tensor parallel |
| | log_level (str): set log level whose value among |
| | [CRITICAL, ERROR, WARNING, INFO, DEBUG] |
| | """ |
| |
|
    async def generate(
        self,
        inputs: Union[str, List[str]],
        session_ids: Union[int, List[int], None] = None,
        sequence_start: bool = True,
        sequence_end: bool = True,
        ignore_eos: bool = False,
        skip_special_tokens: Optional[bool] = False,
        timeout: int = 30,
        **kwargs,
    ):
        """Start a new round of conversation in a session. Return the chat
        completions in non-stream mode.

        Args:
            inputs (str, List[str]): user's prompt(s) in this round
            session_ids (int, List[int]): session id(s)
            sequence_start (bool): start flag of a session
            sequence_end (bool): end flag of a session
            ignore_eos (bool): indicator for ignoring eos
            skip_special_tokens (bool): Whether or not to remove special
                tokens in the decoding. Defaults to False.
            timeout (int): max time to wait for response
        Returns:
            (a list of/batched) text/chat completion
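
        Example:
            An illustrative async call, assuming `llm` is an
            `AsyncLMDeployServer` with a live api_server behind it:

            >>> import asyncio
            >>> print(asyncio.run(llm.generate('hello')))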
| | """ |
| | from lmdeploy.serve.openai.api_client import json_loads |
| |
|
| | batched = True |
| | if isinstance(inputs, str): |
| | inputs = [inputs] |
| | batched = False |
| |
|
| | gen_params = self.update_gen_params(**kwargs) |
| | max_new_tokens = gen_params.pop('max_new_tokens') |
| | gen_params.update(max_tokens=max_new_tokens) |
| |
|
| | responses = [''] * len(inputs) |
| | pload = dict( |
| | model=self.model_name, |
| | prompt=inputs, |
| | sequence_start=sequence_start, |
| | sequence_end=sequence_end, |
| | stream=False, |
| | ignore_eos=ignore_eos, |
| | skip_special_tokens=skip_special_tokens, |
| | timeout=timeout, |
| | **gen_params) |
| | async with aiohttp.ClientSession( |
| | timeout=aiohttp.ClientTimeout(3 * 3600)) as session: |
| | async with session.post( |
| | self.client.completions_v1_url, |
| | headers=self.client.headers, |
| | json=pload) as resp: |
| | async for chunk in resp.content: |
| | if chunk: |
| | decoded = chunk.decode('utf-8') |
| | output = json_loads(decoded) |
| | responses = [ |
| | response + item['text'] for response, item in zip( |
| | responses, output['choices']) |
| | ] |
| | |
| | responses = filter_suffix(responses, self.gen_params.get('stop_words')) |
| | if not batched: |
| | return responses[0] |
| | return responses |
| |
|
    async def stream_chat(
        self,
        inputs: List[dict],
        session_id: Optional[int] = None,
        sequence_start: bool = True,
        sequence_end: bool = True,
        stream: bool = True,
        ignore_eos: bool = False,
        skip_special_tokens: Optional[bool] = False,
        timeout: int = 30,
        **kwargs,
    ):
        """Start a new round of conversation in a session. Yield the chat
        completions in stream mode.

        Args:
            inputs (List[dict]): user's inputs in this round of conversation
            session_id (int): the unique id of a session
            sequence_start (bool): start flag of a session
            sequence_end (bool): end flag of a session
            stream (bool): return in a streaming format if enabled
            ignore_eos (bool): indicator for ignoring eos
            skip_special_tokens (bool): Whether or not to remove special
                tokens in the decoding. Defaults to False.
            timeout (int): max time to wait for response
        Yields:
            tuple(Status, str, int): status, text/chat completion,
                generated token number
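
        Example:
            An illustrative sketch of consuming the async stream, assuming
            `llm` is an `AsyncLMDeployServer`:

            >>> async def demo():
            ...     messages = [{'role': 'user', 'content': 'hello'}]
            ...     async for status, text, _ in llm.stream_chat(messages):
            ...         print(text)
            >>> asyncio.run(demo())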
| | """ |
| | from lmdeploy.serve.openai.api_client import json_loads |
| |
|
| | gen_params = self.update_gen_params(**kwargs) |
| | max_new_tokens = gen_params.pop('max_new_tokens') |
| | gen_params.update(max_tokens=max_new_tokens) |
| | prompt = self.template_parser(inputs) |
| |
|
| | response = '' |
| | finished = False |
| | stop_words = self.gen_params.get('stop_words') |
| |
|
| | pload = dict( |
| | model=self.model_name, |
| | prompt=prompt, |
| | sequence_start=sequence_start, |
| | sequence_end=sequence_end, |
| | stream=stream, |
| | ignore_eos=ignore_eos, |
| | skip_special_tokens=skip_special_tokens, |
| | timeout=timeout, |
| | **gen_params) |
| | async with aiohttp.ClientSession( |
| | timeout=aiohttp.ClientTimeout(3 * 3600)) as session: |
| | async with session.post( |
| | self.client.completions_v1_url, |
| | headers=self.client.headers, |
| | json=pload) as resp: |
| | async for chunk in resp.content: |
| | if chunk: |
| | decoded = chunk.decode('utf-8') |
| | if not decoded.strip() or decoded.rstrip( |
| | ) == 'data: [DONE]': |
| | continue |
| | if decoded[:6] == 'data: ': |
| | decoded = decoded[6:] |
| | output = json_loads(decoded) |
| | response += output['choices'][0]['text'] |
| | if not response: |
| | continue |
| | |
| | for sw in stop_words: |
| | if sw in response: |
| | response = filter_suffix(response, stop_words) |
| | finished = True |
| | break |
| | yield ModelStatusCode.STREAM_ING, response, None |
| | if finished: |
| | break |
| | yield ModelStatusCode.END, response, None |
| |
|
| |
|
class AsyncLMDeployClient(AsyncLMDeployServer):
| | """ |
| | |
| | Args: |
| | url (str): communicating address 'http://<ip>:<port>' of |
| | api_server |
| | model_name (str): needed when model_path is a pytorch model on |
| | huggingface.co, such as "internlm-chat-7b", |
| | "Qwen-7B-Chat ", "Baichuan2-7B-Chat" and so on. |
| | """ |
| |
|
| | def __init__(self, url: str, model_name: str, **kwargs): |
| | BaseLLM.__init__(self, path=url, **kwargs) |
| | from lmdeploy.serve.openai.api_client import APIClient |
| | self.client = APIClient(url) |
| | self.model_name = model_name |
| |
|