### What this tests ####
# litellm CustomLogger callbacks: sync + async success / failure hooks for
# completion, streaming, and embedding calls, plus streaming + Redis caching.
import sys, os, time, inspect, asyncio, traceback
import pytest

sys.path.insert(0, os.path.abspath('../..'))
from litellm import completion, embedding
import litellm
from litellm.integrations.custom_logger import CustomLogger
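
# CustomLogger exposes sync hooks (log_pre_api_call, log_post_api_call,
# log_stream_event, log_success_event, log_failure_event) and async hooks
# (async_log_success_event, async_log_failure_event); litellm invokes the
# matching hooks around each request once a handler instance is registered
# via `litellm.callbacks = [handler]`.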
class MyCustomHandler(CustomLogger):
    complete_streaming_response_in_callback = ""

    def __init__(self):
        self.success: bool = False  # type: ignore
        self.failure: bool = False  # type: ignore
        self.async_success: bool = False  # type: ignore
        self.async_success_embedding: bool = False  # type: ignore
        self.async_failure: bool = False  # type: ignore
        self.async_failure_embedding: bool = False  # type: ignore

        self.async_completion_kwargs = None  # type: ignore
        self.async_embedding_kwargs = None  # type: ignore
        self.async_embedding_response = None  # type: ignore

        self.async_completion_kwargs_fail = None  # type: ignore
        self.async_embedding_kwargs_fail = None  # type: ignore

        self.stream_collected_response = None  # type: ignore
        self.sync_stream_collected_response = None  # type: ignore
        self.user = None  # type: ignore
        self.data_sent_to_api: dict = {}
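
    # --- synchronous hooks ---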
    def log_pre_api_call(self, model, messages, kwargs):
        print("Pre-API Call")
        # capture the exact payload litellm sent to the provider API
        self.data_sent_to_api = kwargs["additional_args"].get("complete_input_dict", {})

    def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
        print("Post-API Call")

    def log_stream_event(self, kwargs, response_obj, start_time, end_time):
        print("On Stream")

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        print("On Success")
        self.success = True
        if kwargs.get("stream") == True:
            self.sync_stream_collected_response = response_obj

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        print("On Failure")
        self.failure = True
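
    # --- async hooks, awaited on acompletion()/aembedding() calls ---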
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        print("On Async success")
        print(f"received kwargs user: {kwargs['user']}")
        self.async_success = True
        if kwargs.get("model") == "text-embedding-ada-002":
            self.async_success_embedding = True
            self.async_embedding_kwargs = kwargs
            self.async_embedding_response = response_obj
        if kwargs.get("stream") == True:
            self.stream_collected_response = response_obj
        self.async_completion_kwargs = kwargs
        self.user = kwargs.get("user", None)

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        print("On Async Failure")
        self.async_failure = True
        if kwargs.get("model") == "text-embedding-ada-002":
            self.async_failure_embedding = True
            self.async_embedding_kwargs_fail = kwargs
        self.async_completion_kwargs_fail = kwargs
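

# Minimal usage sketch (assumes OPENAI_API_KEY is set in the environment):
# register the handler, make one call, then inspect the flags the hooks set:
#
#   handler = MyCustomHandler()
#   litellm.callbacks = [handler]
#   completion(model="gpt-3.5-turbo", messages=[{"role": "user", "content": "hi"}])
#   assert handler.success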


class TmpFunction:
    complete_streaming_response_in_callback = ""
    async_success: bool = False

    async def async_test_logging_fn(self, kwargs, completion_obj, start_time, end_time):
        print("ON ASYNC LOGGING")
        self.async_success = True
        print(f'kwargs.get("complete_streaming_response"): {kwargs.get("complete_streaming_response")}')
        self.complete_streaming_response_in_callback = kwargs.get("complete_streaming_response")


def test_async_chat_openai_stream():
    try:
        tmp_function = TmpFunction()
        # litellm.set_verbose = True
        litellm.success_callback = [tmp_function.async_test_logging_fn]
        complete_streaming_response = ""

        async def call_gpt():
            nonlocal complete_streaming_response
            response = await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
                stream=True,
            )
            async for chunk in response:
                complete_streaming_response += chunk["choices"][0]["delta"]["content"] or ""
            print(complete_streaming_response)

        asyncio.run(call_gpt())
        complete_streaming_response = complete_streaming_response.strip("'")
        response1 = tmp_function.complete_streaming_response_in_callback["choices"][0]["message"]["content"]
        response2 = complete_streaming_response
        # assert [ord(c) for c in response1] == [ord(c) for c in response2]
        assert response1 == response2
        assert tmp_function.async_success == True
    except Exception as e:
        print(e)
        pytest.fail(f"An error occurred - {str(e)}")
# test_async_chat_openai_stream()


def test_completion_azure_stream_moderation_failure():
    try:
        customHandler = MyCustomHandler()
        litellm.callbacks = [customHandler]
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "how do i kill someone"},
        ]
        try:
            response = completion(
                model="azure/chatgpt-v-2", messages=messages, stream=True
            )
            for chunk in response:
                print(f"chunk: {chunk}")
                continue
        except Exception as e:
            print(e)
        time.sleep(1)
        assert customHandler.failure == True
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


def test_async_custom_handler_stream():
    try:
        # [PROD Test] - Do not DELETE
        # checks if the model response available in the async + stream callbacks is equal to the received response
        customHandler2 = MyCustomHandler()
        litellm.callbacks = [customHandler2]
        litellm.set_verbose = False
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "write 1 sentence about litellm being amazing"},
        ]
        complete_streaming_response = ""

        async def test_1():
            nonlocal complete_streaming_response
            response = await litellm.acompletion(
                model="azure/chatgpt-v-2", messages=messages, stream=True
            )
            async for chunk in response:
                complete_streaming_response += chunk["choices"][0]["delta"]["content"] or ""
            print(complete_streaming_response)

        asyncio.run(test_1())
        response_in_success_handler = customHandler2.stream_collected_response
        response_in_success_handler = response_in_success_handler["choices"][0]["message"]["content"]
        print("\n\n")
        print("response_in_success_handler: ", response_in_success_handler)
        print("complete_streaming_response: ", complete_streaming_response)
        assert response_in_success_handler == complete_streaming_response
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")
# test_async_custom_handler_stream()


def test_azure_completion_stream():
    # [PROD Test] - Do not DELETE
    # test if completion() + sync custom logger get the same complete stream response
    try:
        # checks if the model response available in the sync + stream callbacks is equal to the received response
        customHandler2 = MyCustomHandler()
        litellm.callbacks = [customHandler2]
        litellm.set_verbose = False
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "write 1 sentence about litellm being amazing"},
        ]
        complete_streaming_response = ""
        response = litellm.completion(
            model="azure/chatgpt-v-2", messages=messages, stream=True
        )
        for chunk in response:
            complete_streaming_response += chunk["choices"][0]["delta"]["content"] or ""
            print(complete_streaming_response)
        time.sleep(0.5)  # wait 1/2 second before checking callbacks
        response_in_success_handler = customHandler2.sync_stream_collected_response
        response_in_success_handler = response_in_success_handler["choices"][0]["message"]["content"]
        print("\n\n")
        print("response_in_success_handler: ", response_in_success_handler)
        print("complete_streaming_response: ", complete_streaming_response)
        assert response_in_success_handler == complete_streaming_response
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


@pytest.mark.asyncio  # requires the pytest-asyncio plugin to run async tests
async def test_async_custom_handler_completion():
    try:
        customHandler_success = MyCustomHandler()
        customHandler_failure = MyCustomHandler()
        # success
        assert customHandler_success.async_success == False
        litellm.callbacks = [customHandler_success]
        response = await litellm.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "hello from litellm test"}],
        )
        await asyncio.sleep(1)
        assert customHandler_success.async_success == True, "async success is not set to True even after success"
        assert customHandler_success.async_completion_kwargs.get("model") == "gpt-3.5-turbo"
        # failure
        litellm.callbacks = [customHandler_failure]
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "how do i kill someone"},
        ]
        assert customHandler_failure.async_failure == False
        try:
            response = await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=messages,
                api_key="my-bad-key",
            )
        except Exception:
            pass
        assert customHandler_failure.async_failure == True, "async failure is not set to True even after failure"
        assert customHandler_failure.async_completion_kwargs_fail.get("model") == "gpt-3.5-turbo"
        # expect a long error string, e.g. APIError("OpenAIException - Error code: 401 - ... 'code': 'invalid_api_key' ...") plus its traceback
        assert len(str(customHandler_failure.async_completion_kwargs_fail.get("exception"))) > 10
        litellm.callbacks = []
        print("Passed setting async failure")
    except Exception as e:
        pytest.fail(f"An exception occurred - {str(e)}")
# asyncio.run(test_async_custom_handler_completion())


@pytest.mark.asyncio  # requires the pytest-asyncio plugin to run async tests
async def test_async_custom_handler_embedding():
    try:
        customHandler_embedding = MyCustomHandler()
        litellm.callbacks = [customHandler_embedding]
        # success
        assert customHandler_embedding.async_success_embedding == False
        response = await litellm.aembedding(
            model="text-embedding-ada-002",
            input=["hello world"],
        )
        await asyncio.sleep(1)
        assert customHandler_embedding.async_success_embedding == True, "async_success_embedding is not set to True even after success"
        assert customHandler_embedding.async_embedding_kwargs.get("model") == "text-embedding-ada-002"
        assert customHandler_embedding.async_embedding_response["usage"]["prompt_tokens"] == 2
        print("Passed setting async success: Embedding")
        # failure
        assert customHandler_embedding.async_failure_embedding == False
        try:
            response = await litellm.aembedding(
                model="text-embedding-ada-002",
                input=["hello world"],
                api_key="my-bad-key",
            )
        except Exception:
            pass
        assert customHandler_embedding.async_failure_embedding == True, "async failure embedding is not set to True even after failure"
        assert customHandler_embedding.async_embedding_kwargs_fail.get("model") == "text-embedding-ada-002"
        # expect a long error string, e.g. APIError("OpenAIException - Error code: 401 - ... 'code': 'invalid_api_key' ...") plus its traceback
        assert len(str(customHandler_embedding.async_embedding_kwargs_fail.get("exception"))) > 10
    except Exception as e:
        pytest.fail(f"An exception occurred - {str(e)}")
# asyncio.run(test_async_custom_handler_embedding())


@pytest.mark.asyncio  # requires the pytest-asyncio plugin to run async tests
async def test_async_custom_handler_embedding_optional_param():
    """
    Tests that the OpenAI optional embedding params - `user` + `encoding_format` -
    are logged.
    """
    customHandler_optional_params = MyCustomHandler()
    litellm.callbacks = [customHandler_optional_params]
    response = await litellm.aembedding(
        model="azure/azure-embedding-model",
        input=["hello world"],
        user="John",
    )
    await asyncio.sleep(1)  # success callback is async
    assert customHandler_optional_params.user == "John"
    assert customHandler_optional_params.user == customHandler_optional_params.data_sent_to_api["user"]
# asyncio.run(test_async_custom_handler_embedding_optional_param())


@pytest.mark.asyncio  # requires the pytest-asyncio plugin to run async tests
async def test_async_custom_handler_embedding_optional_param_bedrock():
    """
    Tests that the OpenAI optional embedding params - `user` + `encoding_format` -
    are logged, but are NOT sent to a non-OpenAI/Azure endpoint (where they
    would raise errors).
    """
    litellm.drop_params = True
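    # drop_params=True makes litellm silently drop OpenAI-specific params the
    # provider does not support (here, `user` for Bedrock) instead of erroring;
    # the custom callback should still see the original value.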
    litellm.set_verbose = True
    customHandler_optional_params = MyCustomHandler()
    litellm.callbacks = [customHandler_optional_params]
    response = await litellm.aembedding(
        model="bedrock/amazon.titan-embed-text-v1",
        input=["hello world"],
        user="John",
    )
    await asyncio.sleep(1)  # success callback is async
    assert customHandler_optional_params.user == "John"
    assert "user" not in customHandler_optional_params.data_sent_to_api


def test_redis_cache_completion_stream():
    from litellm import Cache
    # Important Test - This tests if we can add to streaming cache, when custom callbacks are set
    import random
    try:
        print("\nrunning test_redis_cache_completion_stream")
        litellm.set_verbose = True
        random_number = random.randint(1, 100000)  # add a random number to ensure it's always adding / reading from cache
        messages = [{"role": "user", "content": f"write a one sentence poem about: {random_number}"}]
        litellm.cache = Cache(
            type="redis",
            host=os.environ['REDIS_HOST'],
            port=os.environ['REDIS_PORT'],
            password=os.environ['REDIS_PASSWORD'],
        )
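        # expected behavior: litellm assembles the streamed chunks into one
        # complete response and writes it to the cache once the stream ends,
        # so the second identical call below replays the same content.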
| print("test for caching, streaming + completion") | |
| response1 = completion(model="gpt-3.5-turbo", messages=messages, max_tokens=40, temperature=0.2, stream=True) | |
| response_1_content = "" | |
| for chunk in response1: | |
| print(chunk) | |
| response_1_content += chunk.choices[0].delta.content or "" | |
| print(response_1_content) | |
| time.sleep(0.1) # sleep for 0.1 seconds allow set cache to occur | |
| response2 = completion(model="gpt-3.5-turbo", messages=messages, max_tokens=40, temperature=0.2, stream=True) | |
| response_2_content = "" | |
| for chunk in response2: | |
| print(chunk) | |
| response_2_content += chunk.choices[0].delta.content or "" | |
| print("\nresponse 1", response_1_content) | |
| print("\nresponse 2", response_2_content) | |
| assert response_1_content == response_2_content, f"Response 1 != Response 2. Same params, Response 1{response_1_content} != Response 2{response_2_content}" | |
| litellm.success_callback = [] | |
| litellm._async_success_callback = [] | |
| litellm.cache = None | |
| except Exception as e: | |
| print(e) | |
| litellm.success_callback = [] | |
| raise e | |
| # test_redis_cache_completion_stream() |