# test time it takes to make 100 concurrent embedding requests to OpenaI
import sys, os
import traceback
from dotenv import load_dotenv

load_dotenv()  # pull OPENAI_API_KEY (and any proxy settings) from a local .env
import os, io
sys.path.insert(
    0, os.path.abspath("../..")
)  # Adds the parent directory to the system path
import pytest
import litellm

litellm.set_verbose=False  # keep litellm quiet; we only want our own summary output

# A deliberately large payload (~2.6k chars) so each embedding request does real work.
question = "embed this very long text" * 100

# make X concurrent calls to litellm.completion(model=gpt-35-turbo, messages=[]), pick a random question in questions array.
# Allow me to tune X concurrent calls.. Log question, output/exception, response time somewhere
# show me a summary of requests made, success full calls, failed calls. For failed calls show me the exceptions
import concurrent.futures
import random
import time
| # Function to make concurrent calls to OpenAI API | |
# Function to make concurrent calls to OpenAI API
def make_openai_completion(question):
    """Request an embedding for *question* via the local litellm proxy.

    Returns the embeddings response on success, or None on any failure.
    Failures are printed with their exception so the summary's "failed
    calls" count remains diagnosable (the original swallowed them silently).
    """
    try:
        start_time = time.time()
        # Imported inside the try so a missing/broken openai install is
        # reported as a failed call rather than crashing the whole run.
        import openai

        client = openai.OpenAI(
            api_key=os.environ["OPENAI_API_KEY"],
            base_url="http://0.0.0.0:8000",  # litellm proxy; remove base_url to hit OpenAI directly
        )
        response = client.embeddings.create(
            model="text-embedding-ada-002",
            input=[question],
        )
        # Measure before printing so stdout I/O doesn't inflate the timing.
        elapsed = time.time() - start_time
        # Print a compact summary instead of dumping the full embedding
        # vector (500 concurrent dumps would swamp stdout).
        print(f"Embedding ok in {elapsed:.2f}s (input {len(question)} chars)")
        return response
    except Exception as e:
        # Broad catch is intentional at this load-test boundary, but the
        # exception must be surfaced, not swallowed.
        print(f"Embedding request failed: {type(e).__name__}: {e}")
        return None
start_time = time.time()

# Number of concurrent calls (you can adjust this)
concurrent_calls = 500

# Fan out all requests at once and keep the resulting futures.
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_calls) as executor:
    futures = [
        executor.submit(make_openai_completion, question)
        for _ in range(concurrent_calls)
    ]

# Block until every request has finished (the executor's exit already
# waits, but be explicit about it).
concurrent.futures.wait(futures)

# Tally outcomes: a None result marks a failed call.
successful_calls = sum(1 for fut in futures if fut.result() is not None)
failed_calls = len(futures) - successful_calls

end_time = time.time()
duration = end_time - start_time

print("Load test Summary:")
print(f"Total Requests: {concurrent_calls}")
print(f"Successful Calls: {successful_calls}")
print(f"Failed Calls: {failed_calls}")
print(f"Total Time: {duration:.2f} seconds")

# # Display content of the logs
# with open("request_log.txt", "r") as log_file:
#     print("\nRequest Log:\n", log_file.read())
# with open("error_log.txt", "r") as error_log_file:
#     print("\nError Log:\n", error_log_file.read())