| # #### What this tests #### | |
| # # This tests the cost tracking function works with consecutive calls (~10 consecutive calls) | |
| # import sys, os, asyncio | |
| # import traceback | |
| # import pytest | |
| # sys.path.insert( | |
| # 0, os.path.abspath("../..") | |
| # ) # Adds the parent directory to the system path | |
| # import dotenv | |
| # dotenv.load_dotenv() | |
| # import litellm | |
| # from fastapi.testclient import TestClient | |
| # from fastapi import FastAPI | |
| # from litellm.proxy.proxy_server import router, save_worker_config, startup_event # Replace with the actual module where your FastAPI router is defined | |
| # filepath = os.path.dirname(os.path.abspath(__file__)) | |
| # config_fp = f"{filepath}/test_config.yaml" | |
| # save_worker_config(config=config_fp, model=None, alias=None, api_base=None, api_version=None, debug=True, temperature=None, max_tokens=None, request_timeout=600, max_budget=None, telemetry=False, drop_params=True, add_function_to_prompt=False, headers=None, save=False, use_queue=False) | |
| # app = FastAPI() | |
| # app.include_router(router) # Include your router in the test app | |
| # @app.on_event("startup") | |
| # async def wrapper_startup_event(): | |
| # await startup_event() | |
| # # Here you create a fixture that will be used by your tests | |
| # # Make sure the fixture returns TestClient(app) | |
| # @pytest.fixture(autouse=True) | |
| # def client(): | |
| # with TestClient(app) as client: | |
| # yield client | |
| # @pytest.mark.asyncio | |
| # async def test_proxy_cost_tracking(client): | |
| # """ | |
| # Get min cost. | |
| # Create new key. | |
| # Run 10 parallel calls. | |
| # Check cost for key at the end. | |
| # assert it's > min cost. | |
| # """ | |
| # model = "gpt-3.5-turbo" | |
| # messages = [{"role": "user", "content": "Hey, how's it going?"}] | |
| # number_of_calls = 1 | |
| # min_cost = litellm.completion_cost(model=model, messages=messages) * number_of_calls | |
| # try: | |
| # ### CREATE NEW KEY ### | |
| # test_data = { | |
| # "models": ["azure-model"], | |
| # } | |
| # # Your bearer token | |
| # token = os.getenv("PROXY_MASTER_KEY") | |
| # headers = { | |
| # "Authorization": f"Bearer {token}" | |
| # } | |
| # create_new_key = client.post("/key/generate", json=test_data, headers=headers) | |
| # key = create_new_key.json()["key"] | |
| # print(f"received key: {key}") | |
| # ### MAKE PARALLEL CALLS ### | |
| # async def test_chat_completions(): | |
| # # Your test data | |
| # test_data = { | |
| # "model": "azure-model", | |
| # "messages": messages | |
| # } | |
| # tmp_headers = { | |
| # "Authorization": f"Bearer {key}" | |
| # } | |
| # response = client.post("/v1/chat/completions", json=test_data, headers=tmp_headers) | |
| # assert response.status_code == 200 | |
| # result = response.json() | |
| # print(f"Received response: {result}") | |
| # tasks = [test_chat_completions() for _ in range(number_of_calls)] | |
| # chat_completions = await asyncio.gather(*tasks) | |
| # ### CHECK SPEND ### | |
| # get_key_spend = client.get(f"/key/info?key={key}", headers=headers) | |
| # assert get_key_spend.json()["info"]["spend"] > min_cost | |
| # # print(f"chat_completions: {chat_completions}") | |
| #     except Exception as e: | |
| #         pytest.fail(f"LiteLLM Proxy test failed. Exception - {str(e)}") | |
| # #### JUST TEST LOCAL PROXY SERVER | |
| # import requests, os | |
| # from concurrent.futures import ThreadPoolExecutor | |
| # import dotenv | |
| # dotenv.load_dotenv() | |
| # api_url = "http://0.0.0.0:8000/chat/completions" | |
| # def make_api_call(api_url): | |
| # # Your test data | |
| # test_data = { | |
| # "model": "azure-model", | |
| # "messages": [ | |
| # { | |
| # "role": "user", | |
| # "content": "hi" | |
| # }, | |
| # ], | |
| # "max_tokens": 10, | |
| # } | |
| # # Your bearer token | |
| # token = os.getenv("PROXY_MASTER_KEY") | |
| # headers = { | |
| # "Authorization": f"Bearer {token}" | |
| # } | |
| # print("testing proxy server") | |
| # response = requests.post(api_url, json=test_data, headers=headers) | |
| # return response.json() | |
| # # Number of parallel API calls | |
| # num_parallel_calls = 3 | |
| # # List to store results | |
| # results = [] | |
| # # Create a ThreadPoolExecutor | |
| # with ThreadPoolExecutor() as executor: | |
| # # Submit the API calls concurrently | |
| # futures = [executor.submit(make_api_call, api_url) for _ in range(num_parallel_calls)] | |
| # # Gather the results as they become available | |
| # for future in futures: | |
| # try: | |
| # result = future.result() | |
| # results.append(result) | |
| # except Exception as e: | |
| # print(f"Error: {e}") | |
| # # Print the results | |
| # for idx, result in enumerate(results, start=1): | |
| # print(f"Result {idx}: {result}") | |