Spaces:
Paused
Paused
| import importlib | |
| import json | |
| import os | |
| import random | |
| import subprocess | |
| import sys | |
| import urllib.parse as urlparse | |
| from typing import TYPE_CHECKING, Any, Optional, Union | |
| import click | |
| import httpx | |
| from dotenv import load_dotenv | |
| if TYPE_CHECKING: | |
| from fastapi import FastAPI | |
| else: | |
| FastAPI = Any | |
| sys.path.append(os.getcwd()) | |
| config_filename = "litellm.secrets" | |
| litellm_mode = os.getenv("LITELLM_MODE", "DEV") # "PRODUCTION", "DEV" | |
| if litellm_mode == "DEV": | |
| load_dotenv() | |
| from enum import Enum | |
| telemetry = None | |
| class LiteLLMDatabaseConnectionPool(Enum): | |
| database_connection_pool_limit = 10 | |
| database_connection_pool_timeout = 60 | |
| def append_query_params(url, params) -> str: | |
| from litellm._logging import verbose_proxy_logger | |
| verbose_proxy_logger.debug(f"url: {url}") | |
| verbose_proxy_logger.debug(f"params: {params}") | |
| parsed_url = urlparse.urlparse(url) | |
| parsed_query = urlparse.parse_qs(parsed_url.query) | |
| parsed_query.update(params) | |
| encoded_query = urlparse.urlencode(parsed_query, doseq=True) | |
| modified_url = urlparse.urlunparse(parsed_url._replace(query=encoded_query)) | |
| return modified_url # type: ignore | |
| class ProxyInitializationHelpers: | |
| def _echo_litellm_version(): | |
| pkg_version = importlib.metadata.version("litellm") # type: ignore | |
| click.echo(f"\nLiteLLM: Current Version = {pkg_version}\n") | |
| def _run_health_check(host, port): | |
| print("\nLiteLLM: Health Testing models in config") # noqa | |
| response = httpx.get(url=f"http://{host}:{port}/health") | |
| print(json.dumps(response.json(), indent=4)) # noqa | |
| def _run_test_chat_completion( | |
| host: str, | |
| port: int, | |
| model: str, | |
| test: Union[bool, str], | |
| ): | |
| request_model = model or "gpt-3.5-turbo" | |
| click.echo( | |
| f"\nLiteLLM: Making a test ChatCompletions request to your proxy. Model={request_model}" | |
| ) | |
| import openai | |
| api_base = f"http://{host}:{port}" | |
| if isinstance(test, str): | |
| api_base = test | |
| else: | |
| raise ValueError("Invalid test value") | |
| client = openai.OpenAI(api_key="My API Key", base_url=api_base) | |
| response = client.chat.completions.create( | |
| model=request_model, | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": "this is a test request, write a short poem", | |
| } | |
| ], | |
| max_tokens=256, | |
| ) | |
| click.echo(f"\nLiteLLM: response from proxy {response}") | |
| print( # noqa | |
| f"\n LiteLLM: Making a test ChatCompletions + streaming r equest to proxy. Model={request_model}" | |
| ) | |
| stream_response = client.chat.completions.create( | |
| model=request_model, | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": "this is a test request, write a short poem", | |
| } | |
| ], | |
| stream=True, | |
| ) | |
| for chunk in stream_response: | |
| click.echo(f"LiteLLM: streaming response from proxy {chunk}") | |
| print("\n making completion request to proxy") # noqa | |
| completion_response = client.completions.create( | |
| model=request_model, prompt="this is a test request, write a short poem" | |
| ) | |
| print(completion_response) # noqa | |
| def _get_default_unvicorn_init_args( | |
| host: str, | |
| port: int, | |
| log_config: Optional[str] = None, | |
| ) -> dict: | |
| """ | |
| Get the arguments for `uvicorn` worker | |
| """ | |
| import litellm | |
| uvicorn_args = { | |
| "app": "litellm.proxy.proxy_server:app", | |
| "host": host, | |
| "port": port, | |
| } | |
| if log_config is not None: | |
| print(f"Using log_config: {log_config}") # noqa | |
| uvicorn_args["log_config"] = log_config | |
| elif litellm.json_logs: | |
| print("Using json logs. Setting log_config to None.") # noqa | |
| uvicorn_args["log_config"] = None | |
| return uvicorn_args | |
| def _init_hypercorn_server( | |
| app: FastAPI, | |
| host: str, | |
| port: int, | |
| ssl_certfile_path: str, | |
| ssl_keyfile_path: str, | |
| ): | |
| """ | |
| Initialize litellm with `hypercorn` | |
| """ | |
| import asyncio | |
| from hypercorn.asyncio import serve | |
| from hypercorn.config import Config | |
| print( # noqa | |
| f"\033[1;32mLiteLLM Proxy: Starting server on {host}:{port} using Hypercorn\033[0m\n" # noqa | |
| ) # noqa | |
| config = Config() | |
| config.bind = [f"{host}:{port}"] | |
| if ssl_certfile_path is not None and ssl_keyfile_path is not None: | |
| print( # noqa | |
| f"\033[1;32mLiteLLM Proxy: Using SSL with certfile: {ssl_certfile_path} and keyfile: {ssl_keyfile_path}\033[0m\n" # noqa | |
| ) | |
| config.certfile = ssl_certfile_path | |
| config.keyfile = ssl_keyfile_path | |
| # hypercorn serve raises a type warning when passing a fast api app - even though fast API is a valid type | |
| asyncio.run(serve(app, config)) # type: ignore | |
| def _run_gunicorn_server( | |
| host: str, | |
| port: int, | |
| app: FastAPI, | |
| num_workers: int, | |
| ssl_certfile_path: str, | |
| ssl_keyfile_path: str, | |
| ): | |
| """ | |
| Run litellm with `gunicorn` | |
| """ | |
| if os.name == "nt": | |
| pass | |
| else: | |
| import gunicorn.app.base | |
| # Gunicorn Application Class | |
| class StandaloneApplication(gunicorn.app.base.BaseApplication): | |
| def __init__(self, app, options=None): | |
| self.options = options or {} # gunicorn options | |
| self.application = app # FastAPI app | |
| super().__init__() | |
| _endpoint_str = ( | |
| f"curl --location 'http://0.0.0.0:{port}/chat/completions' \\" | |
| ) | |
| curl_command = ( | |
| _endpoint_str | |
| + """ | |
| --header 'Content-Type: application/json' \\ | |
| --data ' { | |
| "model": "gpt-3.5-turbo", | |
| "messages": [ | |
| { | |
| "role": "user", | |
| "content": "what llm are you" | |
| } | |
| ] | |
| }' | |
| \n | |
| """ | |
| ) | |
| print() # noqa | |
| print( # noqa | |
| '\033[1;34mLiteLLM: Test your local proxy with: "litellm --test" This runs an openai.ChatCompletion request to your proxy [In a new terminal tab]\033[0m\n' | |
| ) | |
| print( # noqa | |
| f"\033[1;34mLiteLLM: Curl Command Test for your local proxy\n {curl_command} \033[0m\n" | |
| ) | |
| print( # noqa | |
| "\033[1;34mDocs: https://docs.litellm.ai/docs/simple_proxy\033[0m\n" | |
| ) # noqa | |
| print( # noqa | |
| f"\033[1;34mSee all Router/Swagger docs on http://0.0.0.0:{port} \033[0m\n" | |
| ) # noqa | |
| def load_config(self): | |
| # note: This Loads the gunicorn config - has nothing to do with LiteLLM Proxy config | |
| if self.cfg is not None: | |
| config = { | |
| key: value | |
| for key, value in self.options.items() | |
| if key in self.cfg.settings and value is not None | |
| } | |
| else: | |
| config = {} | |
| for key, value in config.items(): | |
| if self.cfg is not None: | |
| self.cfg.set(key.lower(), value) | |
| def load(self): | |
| # gunicorn app function | |
| return self.application | |
| print( # noqa | |
| f"\033[1;32mLiteLLM Proxy: Starting server on {host}:{port} with {num_workers} workers\033[0m\n" # noqa | |
| ) | |
| gunicorn_options = { | |
| "bind": f"{host}:{port}", | |
| "workers": num_workers, # default is 1 | |
| "worker_class": "uvicorn.workers.UvicornWorker", | |
| "preload": True, # Add the preload flag, | |
| "accesslog": "-", # Log to stdout | |
| "timeout": 600, # default to very high number, bedrock/anthropic.claude-v2:1 can take 30+ seconds for the 1st chunk to come in | |
| "access_log_format": '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s', | |
| } | |
| if ssl_certfile_path is not None and ssl_keyfile_path is not None: | |
| print( # noqa | |
| f"\033[1;32mLiteLLM Proxy: Using SSL with certfile: {ssl_certfile_path} and keyfile: {ssl_keyfile_path}\033[0m\n" # noqa | |
| ) | |
| gunicorn_options["certfile"] = ssl_certfile_path | |
| gunicorn_options["keyfile"] = ssl_keyfile_path | |
| StandaloneApplication(app=app, options=gunicorn_options).run() # Run gunicorn | |
| def _run_ollama_serve(): | |
| try: | |
| command = ["ollama", "serve"] | |
| with open(os.devnull, "w") as devnull: | |
| subprocess.Popen(command, stdout=devnull, stderr=devnull) | |
| except Exception as e: | |
| print( # noqa | |
| f""" | |
| LiteLLM Warning: proxy started with `ollama` model\n`ollama serve` failed with Exception{e}. \nEnsure you run `ollama serve` | |
| """ | |
| ) # noqa | |
| def _is_port_in_use(port): | |
| import socket | |
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
| return s.connect_ex(("localhost", port)) == 0 | |
| def _get_loop_type(): | |
| """Helper function to determine the event loop type based on platform""" | |
| if sys.platform in ("win32", "cygwin", "cli"): | |
| return None # Let uvicorn choose the default loop on Windows | |
| return "uvloop" | |
| def run_server( # noqa: PLR0915 | |
| host, | |
| port, | |
| api_base, | |
| api_version, | |
| model, | |
| alias, | |
| add_key, | |
| headers, | |
| save, | |
| debug, | |
| detailed_debug, | |
| temperature, | |
| max_tokens, | |
| request_timeout, | |
| drop_params, | |
| add_function_to_prompt, | |
| config, | |
| max_budget, | |
| telemetry, | |
| test, | |
| local, | |
| num_workers, | |
| test_async, | |
| iam_token_db_auth, | |
| num_requests, | |
| use_queue, | |
| health, | |
| version, | |
| run_gunicorn, | |
| run_hypercorn, | |
| ssl_keyfile_path, | |
| ssl_certfile_path, | |
| log_config, | |
| use_prisma_migrate, | |
| ): | |
| args = locals() | |
| if local: | |
| from proxy_server import ( | |
| KeyManagementSettings, | |
| ProxyConfig, | |
| app, | |
| save_worker_config, | |
| ) | |
| else: | |
| try: | |
| from .proxy_server import ( | |
| KeyManagementSettings, | |
| ProxyConfig, | |
| app, | |
| save_worker_config, | |
| ) | |
| except ImportError as e: | |
| if "litellm[proxy]" in str(e): | |
| # user is missing a proxy dependency, ask them to pip install litellm[proxy] | |
| raise e | |
| else: | |
| # this is just a local/relative import error, user git cloned litellm | |
| from proxy_server import ( | |
| KeyManagementSettings, | |
| ProxyConfig, | |
| app, | |
| save_worker_config, | |
| ) | |
| if version is True: | |
| ProxyInitializationHelpers._echo_litellm_version() | |
| return | |
| if model and "ollama" in model and api_base is None: | |
| ProxyInitializationHelpers._run_ollama_serve() | |
| if health is True: | |
| ProxyInitializationHelpers._run_health_check(host, port) | |
| return | |
| if test is True: | |
| ProxyInitializationHelpers._run_test_chat_completion(host, port, model, test) | |
| return | |
| else: | |
| if headers: | |
| headers = json.loads(headers) | |
| save_worker_config( | |
| model=model, | |
| alias=alias, | |
| api_base=api_base, | |
| api_version=api_version, | |
| debug=debug, | |
| detailed_debug=detailed_debug, | |
| temperature=temperature, | |
| max_tokens=max_tokens, | |
| request_timeout=request_timeout, | |
| max_budget=max_budget, | |
| telemetry=telemetry, | |
| drop_params=drop_params, | |
| add_function_to_prompt=add_function_to_prompt, | |
| headers=headers, | |
| save=save, | |
| config=config, | |
| use_queue=use_queue, | |
| ) | |
| try: | |
| import uvicorn | |
| except Exception: | |
| raise ImportError( | |
| "uvicorn, gunicorn needs to be imported. Run - `pip install 'litellm[proxy]'`" | |
| ) | |
| db_connection_pool_limit = 100 | |
| db_connection_timeout = 60 | |
| general_settings = {} | |
| ### GET DB TOKEN FOR IAM AUTH ### | |
| if iam_token_db_auth: | |
| from litellm.proxy.auth.rds_iam_token import generate_iam_auth_token | |
| db_host = os.getenv("DATABASE_HOST") | |
| db_port = os.getenv("DATABASE_PORT") | |
| db_user = os.getenv("DATABASE_USER") | |
| db_name = os.getenv("DATABASE_NAME") | |
| db_schema = os.getenv("DATABASE_SCHEMA") | |
| token = generate_iam_auth_token( | |
| db_host=db_host, db_port=db_port, db_user=db_user | |
| ) | |
| # print(f"token: {token}") | |
| _db_url = f"postgresql://{db_user}:{token}@{db_host}:{db_port}/{db_name}" | |
| if db_schema: | |
| _db_url += f"?schema={db_schema}" | |
| os.environ["DATABASE_URL"] = _db_url | |
| os.environ["IAM_TOKEN_DB_AUTH"] = "True" | |
| ### DECRYPT ENV VAR ### | |
| from litellm.secret_managers.aws_secret_manager import decrypt_env_var | |
| if ( | |
| os.getenv("USE_AWS_KMS", None) is not None | |
| and os.getenv("USE_AWS_KMS") == "True" | |
| ): | |
| ## V2 IMPLEMENTATION OF AWS KMS - USER WANTS TO DECRYPT MULTIPLE KEYS IN THEIR ENV | |
| new_env_var = decrypt_env_var() | |
| for k, v in new_env_var.items(): | |
| os.environ[k] = v | |
| if config is not None: | |
| """ | |
| Allow user to pass in db url via config | |
| read from there and save it to os.env['DATABASE_URL'] | |
| """ | |
| try: | |
| import asyncio | |
| except Exception: | |
| raise ImportError( | |
| "yaml needs to be imported. Run - `pip install 'litellm[proxy]'`" | |
| ) | |
| proxy_config = ProxyConfig() | |
| _config = asyncio.run(proxy_config.get_config(config_file_path=config)) | |
| ### LITELLM SETTINGS ### | |
| litellm_settings = _config.get("litellm_settings", None) | |
| if ( | |
| litellm_settings is not None | |
| and "json_logs" in litellm_settings | |
| and litellm_settings["json_logs"] is True | |
| ): | |
| import litellm | |
| litellm.json_logs = True | |
| litellm._turn_on_json() | |
| ### GENERAL SETTINGS ### | |
| general_settings = _config.get("general_settings", {}) | |
| if general_settings is None: | |
| general_settings = {} | |
| if general_settings: | |
| ### LOAD SECRET MANAGER ### | |
| key_management_system = general_settings.get( | |
| "key_management_system", None | |
| ) | |
| proxy_config.initialize_secret_manager(key_management_system) | |
| key_management_settings = general_settings.get( | |
| "key_management_settings", None | |
| ) | |
| if key_management_settings is not None: | |
| import litellm | |
| litellm._key_management_settings = KeyManagementSettings( | |
| **key_management_settings | |
| ) | |
| database_url = general_settings.get("database_url", None) | |
| if database_url is None: | |
| # Check if all required variables are provided | |
| database_host = os.getenv("DATABASE_HOST") | |
| database_username = os.getenv("DATABASE_USERNAME") | |
| database_password = os.getenv("DATABASE_PASSWORD") | |
| database_name = os.getenv("DATABASE_NAME") | |
| if ( | |
| database_host | |
| and database_username | |
| and database_password | |
| and database_name | |
| ): | |
| # Construct DATABASE_URL from the provided variables | |
| database_url = f"postgresql://{database_username}:{database_password}@{database_host}/{database_name}" | |
| os.environ["DATABASE_URL"] = database_url | |
| db_connection_pool_limit = general_settings.get( | |
| "database_connection_pool_limit", | |
| LiteLLMDatabaseConnectionPool.database_connection_pool_limit.value, | |
| ) | |
| db_connection_timeout = general_settings.get( | |
| "database_connection_pool_timeout", | |
| LiteLLMDatabaseConnectionPool.database_connection_pool_timeout.value, | |
| ) | |
| if database_url and database_url.startswith("os.environ/"): | |
| original_dir = os.getcwd() | |
| # set the working directory to where this script is | |
| sys.path.insert( | |
| 0, os.path.abspath("../..") | |
| ) # Adds the parent directory to the system path - for litellm local dev | |
| import litellm | |
| from litellm import get_secret_str | |
| database_url = get_secret_str(database_url, default_value=None) | |
| os.chdir(original_dir) | |
| if database_url is not None and isinstance(database_url, str): | |
| os.environ["DATABASE_URL"] = database_url | |
| if ( | |
| os.getenv("DATABASE_URL", None) is not None | |
| or os.getenv("DIRECT_URL", None) is not None | |
| ): | |
| try: | |
| from litellm.secret_managers.main import get_secret | |
| if os.getenv("DATABASE_URL", None) is not None: | |
| ### add connection pool + pool timeout args | |
| params = { | |
| "connection_limit": db_connection_pool_limit, | |
| "pool_timeout": db_connection_timeout, | |
| } | |
| database_url = get_secret("DATABASE_URL", default_value=None) | |
| modified_url = append_query_params(database_url, params) | |
| os.environ["DATABASE_URL"] = modified_url | |
| if os.getenv("DIRECT_URL", None) is not None: | |
| ### add connection pool + pool timeout args | |
| params = { | |
| "connection_limit": db_connection_pool_limit, | |
| "pool_timeout": db_connection_timeout, | |
| } | |
| database_url = os.getenv("DIRECT_URL") | |
| modified_url = append_query_params(database_url, params) | |
| os.environ["DIRECT_URL"] = modified_url | |
| ### | |
| subprocess.run(["prisma"], capture_output=True) | |
| is_prisma_runnable = True | |
| except FileNotFoundError: | |
| is_prisma_runnable = False | |
| if is_prisma_runnable: | |
| from litellm.proxy.db.check_migration import check_prisma_schema_diff | |
| from litellm.proxy.db.prisma_client import ( | |
| PrismaManager, | |
| should_update_prisma_schema, | |
| ) | |
| if ( | |
| should_update_prisma_schema( | |
| general_settings.get("disable_prisma_schema_update") | |
| ) | |
| is False | |
| ): | |
| check_prisma_schema_diff(db_url=None) | |
| else: | |
| PrismaManager.setup_database(use_migrate=use_prisma_migrate) | |
| else: | |
| print( # noqa | |
| f"Unable to connect to DB. DATABASE_URL found in environment, but prisma package not found." # noqa | |
| ) | |
| if port == 4000 and ProxyInitializationHelpers._is_port_in_use(port): | |
| port = random.randint(1024, 49152) | |
| import litellm | |
| if detailed_debug is True: | |
| litellm._turn_on_debug() | |
| # DO NOT DELETE - enables global variables to work across files | |
| from litellm.proxy.proxy_server import app # noqa | |
| uvicorn_args = ProxyInitializationHelpers._get_default_unvicorn_init_args( | |
| host=host, | |
| port=port, | |
| log_config=log_config, | |
| ) | |
| if run_gunicorn is False and run_hypercorn is False: | |
| if ssl_certfile_path is not None and ssl_keyfile_path is not None: | |
| print( # noqa | |
| f"\033[1;32mLiteLLM Proxy: Using SSL with certfile: {ssl_certfile_path} and keyfile: {ssl_keyfile_path}\033[0m\n" # noqa | |
| ) | |
| uvicorn_args["ssl_keyfile"] = ssl_keyfile_path | |
| uvicorn_args["ssl_certfile"] = ssl_certfile_path | |
| loop_type = ProxyInitializationHelpers._get_loop_type() | |
| if loop_type: | |
| uvicorn_args["loop"] = loop_type | |
| uvicorn.run( | |
| **uvicorn_args, | |
| workers=num_workers, | |
| ) | |
| elif run_gunicorn is True: | |
| ProxyInitializationHelpers._run_gunicorn_server( | |
| host=host, | |
| port=port, | |
| app=app, | |
| num_workers=num_workers, | |
| ssl_certfile_path=ssl_certfile_path, | |
| ssl_keyfile_path=ssl_keyfile_path, | |
| ) | |
| elif run_hypercorn is True: | |
| ProxyInitializationHelpers._init_hypercorn_server( | |
| app=app, | |
| host=host, | |
| port=port, | |
| ssl_certfile_path=ssl_certfile_path, | |
| ssl_keyfile_path=ssl_keyfile_path, | |
| ) | |
| if __name__ == "__main__": | |
| run_server() | |