import click
import subprocess, traceback, json
import os, sys
import random
import importlib.metadata  # used below to read the installed litellm version


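# Start `ollama serve` in the background so Ollama-hosted models are reachable
# when the proxy is launched with an `ollama/...` model and no api_base.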
def run_ollama_serve():
    try:
        command = ["ollama", "serve"]

        # launch `ollama serve` detached, discarding its stdout/stderr
        with open(os.devnull, "w") as devnull:
            process = subprocess.Popen(command, stdout=devnull, stderr=devnull)
    except Exception as e:
        print(
            f"""
            LiteLLM Warning: proxy started with `ollama` model\n`ollama serve` failed with Exception {e}. \nEnsure you run `ollama serve`
            """
        )


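# Return True if something is already listening on `port` on localhost.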
def is_port_in_use(port):
    import socket

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex(("localhost", port)) == 0


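# Entrypoint for the proxy: handles the version/health/test/load-test helpers,
# saves the worker config, and finally serves the app with uvicorn.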
def run_server(
    host="0.0.0.0",
    port=8000,
    api_base=None,
    api_version="2023-07-01-preview",
    model=None,
    alias=None,
    add_key=None,
    headers=None,
    save=False,
    debug=False,
    detailed_debug=False,
    temperature=0.0,
    max_tokens=1000,
    request_timeout=10,
    drop_params=True,
    add_function_to_prompt=True,
    config=None,
    max_budget=100,
    telemetry=False,
    test=False,
    local=False,
    num_workers=1,
    test_async=False,
    num_requests=1,
    use_queue=False,
    health=False,
    version=False,
):
    global feature_telemetry
    args = locals()
    if local:
        from proxy_server import app, save_worker_config, usage_telemetry
    else:
        try:
            from litellm.proxy.proxy_server import app, save_worker_config, usage_telemetry
        except ImportError as e:
            if "litellm[proxy]" in str(e):
                # missing proxy dependencies - surface the original error
                raise e
            else:
                # fall back to a plain module import (e.g. running from a git checkout)
                from proxy_server import app, save_worker_config, usage_telemetry
    feature_telemetry = usage_telemetry
    if version:
        pkg_version = importlib.metadata.version("litellm")
        click.echo(f"\nLiteLLM: Current Version = {pkg_version}\n")
        return
    if model and "ollama" in model and api_base is None:
        run_ollama_serve()
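    # test_async: fire `num_requests` concurrent jobs at the /queue/request
    # endpoint and poll each returned URL until the job finishes or errors.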
    if test_async is True:
        import requests, concurrent.futures, time

        api_base = f"http://{host}:{port}"

        def _make_openai_completion():
            data = {
                "model": "gpt-3.5-turbo",
                "messages": [
                    {"role": "user", "content": "Write a short poem about the moon"}
                ],
            }

            response = requests.post(f"{api_base}/queue/request", json=data)
            response = response.json()

            while True:
                try:
                    url = response["url"]
                    polling_url = f"{api_base}{url}"
                    polling_response = requests.get(polling_url)
                    polling_response = polling_response.json()
                    print("\n RESPONSE FROM POLLING JOB", polling_response)
                    status = polling_response["status"]
                    if status == "finished":
                        # return the result so the caller can count this as a success
                        return polling_response["result"]
                    print(
                        f"POLLING JOB {polling_url}\nSTATUS: {status}, \n Response {polling_response}"
                    )
                    time.sleep(0.5)
                except Exception as e:
                    print("got exception in polling", e)
                    return None

        # number of concurrent calls to make against the proxy
        concurrent_calls = num_requests

        futures = []
        start_time = time.time()
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=concurrent_calls
        ) as executor:
            for _ in range(concurrent_calls):
                futures.append(executor.submit(_make_openai_completion))

        # wait for all requests to complete
        concurrent.futures.wait(futures)

        # summarize successes and failures
        successful_calls = 0
        failed_calls = 0

        for future in futures:
            if future.done():
                if future.result() is not None:
                    successful_calls += 1
                else:
                    failed_calls += 1
        end_time = time.time()
        print(f"Elapsed Time: {end_time - start_time}")
        print("Load test Summary:")
        print(f"Total Requests: {concurrent_calls}")
        print(f"Successful Calls: {successful_calls}")
        print(f"Failed Calls: {failed_calls}")
        return
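    # health: ask the running proxy to health-check every model in its config.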
    if health:
        import requests

        print("\nLiteLLM: Health Testing models in config")
        response = requests.get(url=f"http://{host}:{port}/health")
        print(json.dumps(response.json(), indent=4))
        return
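    # test: send a chat completion, a streaming chat completion, and a plain
    # completion through the proxy to verify it is serving requests.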
    if test != False:
        request_model = model or "gpt-3.5-turbo"
        click.echo(
            f"\nLiteLLM: Making a test ChatCompletions request to your proxy. Model={request_model}"
        )
        import openai

        if test == True:
            api_base = f"http://{host}:{port}"
        else:
            api_base = test
        client = openai.OpenAI(api_key="My API Key", base_url=api_base)

        response = client.chat.completions.create(
            model=request_model,
            messages=[
                {
                    "role": "user",
                    "content": "this is a test request, write a short poem",
                }
            ],
            max_tokens=256,
        )
        click.echo(f"\nLiteLLM: response from proxy {response}")

        print(
            f"\n LiteLLM: Making a test ChatCompletions + streaming request to proxy. Model={request_model}"
        )

        response = client.chat.completions.create(
            model=request_model,
            messages=[
                {
                    "role": "user",
                    "content": "this is a test request, write a short poem",
                }
            ],
            stream=True,
        )
        for chunk in response:
            click.echo(f"LiteLLM: streaming response from proxy {chunk}")
        print("\n making completion request to proxy")
        response = client.completions.create(
            model=request_model, prompt="this is a test request, write a short poem"
        )
        print(response)

        return
    else:
        if headers:
            headers = json.loads(headers)
        save_worker_config(
            model=model,
            alias=alias,
            api_base=api_base,
            api_version=api_version,
            debug=debug,
            detailed_debug=detailed_debug,
            temperature=temperature,
            max_tokens=max_tokens,
            request_timeout=request_timeout,
            max_budget=max_budget,
            telemetry=telemetry,
            drop_params=drop_params,
            add_function_to_prompt=add_function_to_prompt,
            headers=headers,
            save=save,
            config=config,
            use_queue=use_queue,
        )
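    # uvicorn serves the ASGI app; gunicorn is only imported on non-Windows hosts.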
    try:
        import uvicorn

        if os.name == "nt":
            pass
        else:
            import gunicorn.app.base
    except ImportError:
        raise ImportError(
            "uvicorn and gunicorn need to be installed. Run - `pip install 'litellm[proxy]'`"
        )

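    # If a config file was passed, read general_settings.database_url from it
    # (resolving `os.environ/...` references) and export it as DATABASE_URL.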
    if config is not None:
        """
        Allow user to pass in db url via config

        read from there and save it to os.environ['DATABASE_URL']
        """
        try:
            import yaml
        except ImportError:
            raise ImportError(
                "yaml needs to be imported. Run - `pip install 'litellm[proxy]'`"
            )

        if os.path.exists(config):
            with open(config, "r") as config_file:
                config = yaml.safe_load(config_file)
            general_settings = config.get("general_settings", {})
            database_url = general_settings.get("database_url", None)
            if database_url and database_url.startswith("os.environ/"):
                original_dir = os.getcwd()
                # add the repo root to sys.path so `import litellm` resolves when running from source
                sys.path.insert(0, os.path.abspath("../.."))
                import litellm

                database_url = litellm.get_secret(database_url)
                os.chdir(original_dir)
            if database_url is not None and isinstance(database_url, str):
                os.environ["DATABASE_URL"] = database_url

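    # When DATABASE_URL is set, push the Prisma schema before starting the server
    # so the proxy's database tables exist.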
    if os.getenv("DATABASE_URL", None) is not None:
        try:
            subprocess.run(["prisma"], capture_output=True)
            is_prisma_runnable = True
        except FileNotFoundError:
            is_prisma_runnable = False

        if is_prisma_runnable:
            # run prisma from the package directory, then restore the cwd
            original_dir = os.getcwd()
            abspath = os.path.abspath(__file__)
            dname = os.path.dirname(abspath)
            os.chdir(dname)
            try:
                subprocess.run(["prisma", "db", "push", "--accept-data-loss"])
            finally:
                os.chdir(original_dir)
        else:
            print(
                "Unable to connect to DB. DATABASE_URL found in environment, but prisma package not found."
            )
    if port == 8000 and is_port_in_use(port):
        # default port is taken - fall back to a random high port
        port = random.randint(1024, 49152)
    from litellm.proxy.proxy_server import app

    uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    run_server()