from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import List, Optional

import litellm
from litellm._logging import print_verbose
from litellm.utils import get_optional_params

from ..llms.vllm.completion import handler as vllm_handler


def batch_completion(
    model: str,
    # Optional OpenAI params: see https://platform.openai.com/docs/api-reference/chat/create
    messages: List = [],
    functions: Optional[List] = None,
    function_call: Optional[str] = None,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    n: Optional[int] = None,
    stream: Optional[bool] = None,
    stop=None,
    max_tokens: Optional[int] = None,
    presence_penalty: Optional[float] = None,
    frequency_penalty: Optional[float] = None,
    logit_bias: Optional[dict] = None,
    user: Optional[str] = None,
    deployment_id=None,
    request_timeout: Optional[int] = None,
    timeout: Optional[int] = 600,
    max_workers: Optional[int] = 100,
    # Optional liteLLM function params
    **kwargs,
):
| """ | |
| Batch litellm.completion function for a given model. | |
| Args: | |
| model (str): The model to use for generating completions. | |
| messages (List, optional): List of messages to use as input for generating completions. Defaults to []. | |
| functions (List, optional): List of functions to use as input for generating completions. Defaults to []. | |
| function_call (str, optional): The function call to use as input for generating completions. Defaults to "". | |
| temperature (float, optional): The temperature parameter for generating completions. Defaults to None. | |
| top_p (float, optional): The top-p parameter for generating completions. Defaults to None. | |
| n (int, optional): The number of completions to generate. Defaults to None. | |
| stream (bool, optional): Whether to stream completions or not. Defaults to None. | |
| stop (optional): The stop parameter for generating completions. Defaults to None. | |
| max_tokens (float, optional): The maximum number of tokens to generate. Defaults to None. | |
| presence_penalty (float, optional): The presence penalty for generating completions. Defaults to None. | |
| frequency_penalty (float, optional): The frequency penalty for generating completions. Defaults to None. | |
| logit_bias (dict, optional): The logit bias for generating completions. Defaults to {}. | |
| user (str, optional): The user string for generating completions. Defaults to "". | |
| deployment_id (optional): The deployment ID for generating completions. Defaults to None. | |
| request_timeout (int, optional): The request timeout for generating completions. Defaults to None. | |
| max_workers (int,optional): The maximum number of threads to use for parallel processing. | |
| Returns: | |
| list: A list of completion results. | |
| """ | |
    args = locals()
    batch_messages = messages
    completions = []
    custom_llm_provider = None
    # extract a provider prefix if present, e.g. "vllm/facebook/opt-125m" -> "vllm"
    if model.split("/", 1)[0] in litellm.provider_list:
        custom_llm_provider = model.split("/", 1)[0]
        model = model.split("/", 1)[1]
    # vLLM handles batching natively, so hand the whole batch to its handler
    if custom_llm_provider == "vllm":
        optional_params = get_optional_params(
            functions=functions,
            function_call=function_call,
            temperature=temperature,
            top_p=top_p,
            n=n,
            stream=stream or False,
            stop=stop,
            max_tokens=max_tokens,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            logit_bias=logit_bias,
            user=user,
            # params to identify the model
            model=model,
            custom_llm_provider=custom_llm_provider,
        )
        results = vllm_handler.batch_completions(
            model=model,
            messages=batch_messages,
            custom_prompt_dict=litellm.custom_prompt_dict,
            optional_params=optional_params,
        )
    # for all non-vLLM models, fan out one litellm.completion call per message list
    else:

        def chunks(lst, n):
            """Yield successive n-sized chunks from lst."""
            for i in range(0, len(lst), n):
                yield lst[i : i + n]

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for sub_batch in chunks(batch_messages, 100):
                for message_list in sub_batch:
                    kwargs_modified = args.copy()
                    kwargs_modified.pop("max_workers")
                    kwargs_modified["messages"] = message_list
                    original_kwargs = {}
                    if "kwargs" in kwargs_modified:
                        original_kwargs = kwargs_modified.pop("kwargs")
                    future = executor.submit(
                        litellm.completion, **kwargs_modified, **original_kwargs
                    )
                    completions.append(future)

        # retrieve the results from the futures; if a request raised,
        # return the exception in its place
        results = []
        for future in completions:
            try:
                results.append(future.result())
            except Exception as exc:
                results.append(exc)

    return results
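
# Usage sketch (illustrative, not executed on import): batch several prompts
# against one model. The model name below is only an example.
#
#   import litellm
#
#   responses = litellm.batch_completion(
#       model="gpt-3.5-turbo",
#       messages=[
#           [{"role": "user", "content": "good morning?"}],
#           [{"role": "user", "content": "what's the capital of France?"}],
#       ],
#   )
#   # one result (or Exception) per message list, in order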


# send one request to multiple models,
# returning as soon as one of the LLMs responds
def batch_completion_models(*args, **kwargs):
    """
    Send a request to multiple language models concurrently and return the response
    as soon as one of the models responds.

    Args:
        *args: Variable-length positional arguments passed to the completion function.
        **kwargs: Additional keyword arguments:
            - models (str or list of str): The language models to send requests to.
            - Other keyword arguments to be passed to the completion function.

    Returns:
        str or None: The response from one of the language models, or None if no response is received.

    Note:
        This function utilizes a ThreadPoolExecutor to parallelize requests to multiple models.
        It sends requests concurrently and returns the response from the first model that responds.
    """
| if "model" in kwargs: | |
| kwargs.pop("model") | |
| if "models" in kwargs: | |
| models = kwargs["models"] | |
| kwargs.pop("models") | |
| futures = {} | |
| with ThreadPoolExecutor(max_workers=len(models)) as executor: | |
| for model in models: | |
| futures[model] = executor.submit( | |
| litellm.completion, *args, model=model, **kwargs | |
| ) | |
| for model, future in sorted( | |
| futures.items(), key=lambda x: models.index(x[0]) | |
| ): | |
| if future.result() is not None: | |
| return future.result() | |
| elif "deployments" in kwargs: | |
| deployments = kwargs["deployments"] | |
| kwargs.pop("deployments") | |
| kwargs.pop("model_list") | |
| nested_kwargs = kwargs.pop("kwargs", {}) | |
| futures = {} | |
| with ThreadPoolExecutor(max_workers=len(deployments)) as executor: | |
| for deployment in deployments: | |
| for key in kwargs.keys(): | |
| if ( | |
| key not in deployment | |
| ): # don't override deployment values e.g. model name, api base, etc. | |
| deployment[key] = kwargs[key] | |
| kwargs = {**deployment, **nested_kwargs} | |
| futures[deployment["model"]] = executor.submit( | |
| litellm.completion, **kwargs | |
| ) | |
            while futures:
                # wait for the first future to finish
                print_verbose("\n\n waiting for next result\n\n")
                done, _ = wait(futures.values(), return_when=FIRST_COMPLETED)
                print_verbose(f"done list\n{done}")
                for future in done:
                    try:
                        return future.result()
                    except Exception:
                        # if one deployment fails, keep waiting on the others
                        print_verbose(
                            "\n\ngot an exception, ignoring, removing from futures"
                        )
                        print_verbose(f"{futures}")
                        # drop the failed future so we don't wait on it again
                        futures = {
                            key: value
                            for key, value in futures.items()
                            if value is not future
                        }
                        print_verbose(f"new futures: {futures}")

                print_verbose("\n\ndone looping through futures\n\n")
                print_verbose(f"{futures}")

    return None  # if no response is received from any model
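
# Usage sketch (illustrative, not executed on import): race one prompt across
# several models and keep the first successful response. Model names are examples.
#
#   import litellm
#
#   response = litellm.batch_completion_models(
#       models=["gpt-3.5-turbo", "claude-3-haiku-20240307"],
#       messages=[{"role": "user", "content": "write a one-line poem"}],
#   )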


def batch_completion_models_all_responses(*args, **kwargs):
    """
    Send a request to multiple language models concurrently and return a list of responses
    from all models that respond.

    Args:
        *args: Variable-length positional arguments passed to the completion function.
        **kwargs: Additional keyword arguments:
            - models (str or list of str): The language models to send requests to.
            - Other keyword arguments to be passed to the completion function.

    Returns:
        list: A list of responses from the language models that responded.

    Note:
        This function utilizes a ThreadPoolExecutor to parallelize requests to multiple models.
        It sends requests concurrently and collects responses from all models that respond.
    """
    if "model" in kwargs:
        kwargs.pop("model")
    if "models" in kwargs:
        models = kwargs.pop("models")
    else:
        raise Exception("'models' param not in kwargs")

    responses = []
    with ThreadPoolExecutor(max_workers=len(models)) as executor:
        # submit every request before collecting any result, so the calls
        # actually run concurrently instead of serializing on .result()
        futures = [
            executor.submit(litellm.completion, *args, model=model, **kwargs)
            for model in models
        ]
        for future in futures:
            result = future.result()
            if result is not None:
                responses.append(result)

    return responses
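
# Usage sketch (illustrative, not executed on import): fan one prompt out to
# several models and collect every response. Model names are examples.
#
#   import litellm
#
#   responses = litellm.batch_completion_models_all_responses(
#       models=["gpt-3.5-turbo", "command-r"],
#       messages=[{"role": "user", "content": "is the earth round?"}],
#   )
#   for r in responses:
#       print(r.choices[0].message.content)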