# NOTE(review): removed non-Python paste artifacts ("Spaces:" / "Paused" / "Paused")
# that preceded the module docstring; they were export residue, not code.
| """ | |
| Calling + translation logic for anthropic's `/v1/messages` endpoint | |
| """ | |
| import copy | |
| import json | |
| from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast | |
| import httpx # type: ignore | |
| import litellm | |
| import litellm.litellm_core_utils | |
| import litellm.types | |
| import litellm.types.utils | |
| from litellm import LlmProviders | |
| from litellm.litellm_core_utils.core_helpers import map_finish_reason | |
| from litellm.llms.base_llm.chat.transformation import BaseConfig | |
| from litellm.llms.custom_httpx.http_handler import ( | |
| AsyncHTTPHandler, | |
| HTTPHandler, | |
| get_async_httpx_client, | |
| ) | |
| from litellm.types.llms.anthropic import ( | |
| ContentBlockDelta, | |
| ContentBlockStart, | |
| ContentBlockStop, | |
| MessageBlockDelta, | |
| MessageStartBlock, | |
| UsageDelta, | |
| ) | |
| from litellm.types.llms.openai import ( | |
| ChatCompletionRedactedThinkingBlock, | |
| ChatCompletionThinkingBlock, | |
| ChatCompletionToolCallChunk, | |
| ) | |
| from litellm.types.utils import ( | |
| Delta, | |
| GenericStreamingChunk, | |
| ModelResponseStream, | |
| StreamingChoices, | |
| Usage, | |
| ) | |
| from litellm.utils import CustomStreamWrapper, ModelResponse, ProviderConfigManager | |
| from ...base import BaseLLM | |
| from ..common_utils import AnthropicError, process_anthropic_headers | |
| from .transformation import AnthropicConfig | |
async def make_call(
    client: Optional[AsyncHTTPHandler],
    api_base: str,
    headers: dict,
    data: str,
    model: str,
    messages: list,
    logging_obj,
    timeout: Optional[Union[float, httpx.Timeout]],
    json_mode: bool,
) -> Tuple[Any, httpx.Headers]:
    """
    Open an async streaming POST against the Anthropic messages endpoint.

    Returns a tuple of (completion_stream, response_headers) where
    completion_stream is a ModelResponseIterator over the SSE lines.

    Raises:
        AnthropicError: on HTTP errors (status/headers preserved) or on any
            unexpected failure not already mapped by litellm.
    """
    http_client = client if client is not None else litellm.module_level_aclient

    try:
        response = await http_client.post(
            api_base, headers=headers, data=data, stream=True, timeout=timeout
        )
    except httpx.HTTPStatusError as e:
        # Prefer headers attached to the exception itself; fall back to the
        # headers on the attached response object.
        failed_response = getattr(e, "response", None)
        failed_headers = getattr(e, "headers", None)
        if failed_headers is None and failed_response:
            failed_headers = getattr(failed_response, "headers", None)
        raise AnthropicError(
            status_code=e.response.status_code,
            message=await e.response.aread(),
            headers=failed_headers,
        )
    except Exception as e:
        # Exceptions litellm already maps pass through unchanged; anything
        # else is wrapped as a generic 500 AnthropicError.
        if any(isinstance(e, known) for known in litellm.LITELLM_EXCEPTION_TYPES):
            raise e
        raise AnthropicError(status_code=500, message=str(e))

    completion_stream = ModelResponseIterator(
        streaming_response=response.aiter_lines(),
        sync_stream=False,
        json_mode=json_mode,
    )

    # LOGGING
    logging_obj.post_call(
        input=messages,
        api_key="",
        original_response=completion_stream,  # Pass the completion stream for logging
        additional_args={"complete_input_dict": data},
    )

    return completion_stream, response.headers
def make_sync_call(
    client: Optional[HTTPHandler],
    api_base: str,
    headers: dict,
    data: str,
    model: str,
    messages: list,
    logging_obj,
    timeout: Optional[Union[float, httpx.Timeout]],
    json_mode: bool,
) -> Tuple[Any, httpx.Headers]:
    """
    Synchronous twin of `make_call`: open a streaming POST against the
    Anthropic messages endpoint and return (completion_stream, headers).

    Raises:
        AnthropicError: on HTTP errors, on any non-200 streamed response,
            or on unexpected failures not already mapped by litellm.
    """
    if client is None:
        client = litellm.module_level_client  # re-use a module level client

    try:
        response = client.post(
            api_base, headers=headers, data=data, stream=True, timeout=timeout
        )
    except httpx.HTTPStatusError as e:
        # Prefer headers on the exception; fall back to the attached response.
        failed_response = getattr(e, "response", None)
        failed_headers = getattr(e, "headers", None)
        if failed_headers is None and failed_response:
            failed_headers = getattr(failed_response, "headers", None)
        raise AnthropicError(
            status_code=e.response.status_code,
            message=e.response.read(),
            headers=failed_headers,
        )
    except Exception as e:
        if any(isinstance(e, known) for known in litellm.LITELLM_EXCEPTION_TYPES):
            raise e
        raise AnthropicError(status_code=500, message=str(e))

    # Surface non-200 streamed responses explicitly (they may not raise above).
    if response.status_code != 200:
        raise AnthropicError(
            status_code=response.status_code,
            message=response.read(),
            headers=getattr(response, "headers", None),
        )

    completion_stream = ModelResponseIterator(
        streaming_response=response.iter_lines(), sync_stream=True, json_mode=json_mode
    )

    # LOGGING
    logging_obj.post_call(
        input=messages,
        api_key="",
        original_response="first stream response received",
        additional_args={"complete_input_dict": data},
    )

    return completion_stream, response.headers
class AnthropicChatCompletion(BaseLLM):
    """
    Calling logic for Anthropic's `/v1/messages` chat endpoint.

    Dispatches between the four execution modes (sync/async x streaming/
    non-streaming); request/response translation itself is delegated to the
    provider config (AnthropicConfig / BaseConfig transform_request /
    transform_response).
    """

    def __init__(self) -> None:
        super().__init__()

    async def acompletion_stream_function(
        self,
        model: str,
        messages: list,
        api_base: str,
        custom_prompt_dict: dict,
        model_response: ModelResponse,
        print_verbose: Callable,
        timeout: Union[float, httpx.Timeout],
        client: Optional[AsyncHTTPHandler],
        encoding,
        api_key,
        logging_obj,
        stream,
        _is_function_call,
        data: dict,
        json_mode: bool,
        optional_params=None,
        litellm_params=None,
        logger_fn=None,
        headers={},  # NOTE(review): mutable default argument; acceptable only while never mutated in place
    ):
        """
        Async streaming completion: forces `stream=True` on the payload, opens
        the SSE connection via `make_call`, and wraps the chunk iterator in a
        CustomStreamWrapper so callers consume OpenAI-format stream chunks.
        """
        data["stream"] = True

        completion_stream, headers = await make_call(
            client=client,
            api_base=api_base,
            headers=headers,
            data=json.dumps(data),
            model=model,
            messages=messages,
            logging_obj=logging_obj,
            timeout=timeout,
            json_mode=json_mode,
        )
        streamwrapper = CustomStreamWrapper(
            completion_stream=completion_stream,
            model=model,
            custom_llm_provider="anthropic",
            logging_obj=logging_obj,
            _response_headers=process_anthropic_headers(headers),
        )
        return streamwrapper

    async def acompletion_function(
        self,
        model: str,
        messages: list,
        api_base: str,
        custom_prompt_dict: dict,
        model_response: ModelResponse,
        print_verbose: Callable,
        timeout: Union[float, httpx.Timeout],
        encoding,
        api_key,
        logging_obj,
        stream,
        _is_function_call,
        data: dict,
        optional_params: dict,
        json_mode: bool,
        litellm_params: dict,
        provider_config: BaseConfig,
        logger_fn=None,
        headers={},  # NOTE(review): mutable default argument; acceptable only while never mutated in place
        client: Optional[AsyncHTTPHandler] = None,
    ) -> Union[ModelResponse, CustomStreamWrapper]:
        """
        Async non-streaming completion: POSTs the full payload, then translates
        the raw Anthropic response into an OpenAI-format ModelResponse via the
        provider config.

        Raises:
            AnthropicError: on any transport/HTTP failure (after logging it).
        """
        async_handler = client or get_async_httpx_client(
            llm_provider=litellm.LlmProviders.ANTHROPIC
        )

        try:
            response = await async_handler.post(
                api_base, headers=headers, json=data, timeout=timeout
            )
        except Exception as e:
            ## LOGGING
            logging_obj.post_call(
                input=messages,
                api_key=api_key,
                original_response=str(e),
                additional_args={"complete_input_dict": data},
            )
            # Pull status/headers/body off the exception (or its attached
            # response, if any) so the raised AnthropicError carries them.
            status_code = getattr(e, "status_code", 500)
            error_headers = getattr(e, "headers", None)
            error_text = getattr(e, "text", str(e))
            error_response = getattr(e, "response", None)
            if error_headers is None and error_response:
                error_headers = getattr(error_response, "headers", None)
            if error_response and hasattr(error_response, "text"):
                error_text = getattr(error_response, "text", error_text)
            raise AnthropicError(
                message=error_text,
                status_code=status_code,
                headers=error_headers,
            )

        return provider_config.transform_response(
            model=model,
            raw_response=response,
            model_response=model_response,
            logging_obj=logging_obj,
            api_key=api_key,
            request_data=data,
            messages=messages,
            optional_params=optional_params,
            litellm_params=litellm_params,
            encoding=encoding,
            json_mode=json_mode,
        )

    def completion(
        self,
        model: str,
        messages: list,
        api_base: str,
        custom_llm_provider: str,
        custom_prompt_dict: dict,
        model_response: ModelResponse,
        print_verbose: Callable,
        encoding,
        api_key,
        logging_obj,
        optional_params: dict,
        timeout: Union[float, httpx.Timeout],
        litellm_params: dict,
        acompletion=None,
        logger_fn=None,
        headers={},  # NOTE(review): mutable default argument; rebound (not mutated) below
        client=None,
    ):
        """
        Entry point for Anthropic chat completions.

        Builds auth headers and the provider-format request payload, then
        dispatches on (`acompletion`, `stream`) to one of four paths:
        async streaming, async non-streaming, sync streaming, or sync
        non-streaming.

        Raises:
            ValueError: if no provider chat config is found for the model.
            AnthropicError: on request failures in the sync non-streaming path.
        """
        # Deep-copy so popping control keys below doesn't mutate the caller's
        # dicts/lists.
        optional_params = copy.deepcopy(optional_params)
        stream = optional_params.pop("stream", None)
        json_mode: bool = optional_params.pop("json_mode", False)
        is_vertex_request: bool = optional_params.pop("is_vertex_request", False)
        _is_function_call = False
        messages = copy.deepcopy(messages)

        headers = AnthropicConfig().validate_environment(
            api_key=api_key,
            headers=headers,
            model=model,
            messages=messages,
            optional_params={**optional_params, "is_vertex_request": is_vertex_request},
            litellm_params=litellm_params,
        )

        config = ProviderConfigManager.get_provider_chat_config(
            model=model,
            provider=LlmProviders(custom_llm_provider),
        )
        if config is None:
            raise ValueError(
                f"Provider config not found for model: {model} and provider: {custom_llm_provider}"
            )

        # Translate OpenAI-format inputs into the Anthropic request body.
        data = config.transform_request(
            model=model,
            messages=messages,
            optional_params=optional_params,
            litellm_params=litellm_params,
            headers=headers,
        )

        ## LOGGING
        logging_obj.pre_call(
            input=messages,
            api_key=api_key,
            additional_args={
                "complete_input_dict": data,
                "api_base": api_base,
                "headers": headers,
            },
        )
        print_verbose(f"_is_function_call: {_is_function_call}")
        if acompletion is True:
            if (
                stream is True
            ):  # if function call - fake the streaming (need complete blocks for output parsing in openai format)
                print_verbose("makes async anthropic streaming POST request")
                data["stream"] = stream
                return self.acompletion_stream_function(
                    model=model,
                    messages=messages,
                    data=data,
                    api_base=api_base,
                    custom_prompt_dict=custom_prompt_dict,
                    model_response=model_response,
                    print_verbose=print_verbose,
                    encoding=encoding,
                    api_key=api_key,
                    logging_obj=logging_obj,
                    optional_params=optional_params,
                    stream=stream,
                    _is_function_call=_is_function_call,
                    json_mode=json_mode,
                    litellm_params=litellm_params,
                    logger_fn=logger_fn,
                    headers=headers,
                    timeout=timeout,
                    # Only forward the caller's client when it is actually an
                    # async handler; otherwise let make_call use the default.
                    client=(
                        client
                        if client is not None and isinstance(client, AsyncHTTPHandler)
                        else None
                    ),
                )
            else:
                return self.acompletion_function(
                    model=model,
                    messages=messages,
                    data=data,
                    api_base=api_base,
                    custom_prompt_dict=custom_prompt_dict,
                    model_response=model_response,
                    print_verbose=print_verbose,
                    encoding=encoding,
                    api_key=api_key,
                    provider_config=config,
                    logging_obj=logging_obj,
                    optional_params=optional_params,
                    stream=stream,
                    _is_function_call=_is_function_call,
                    litellm_params=litellm_params,
                    logger_fn=logger_fn,
                    headers=headers,
                    client=client,
                    json_mode=json_mode,
                    timeout=timeout,
                )
        else:
            ## COMPLETION CALL
            if (
                stream is True
            ):  # if function call - fake the streaming (need complete blocks for output parsing in openai format)
                data["stream"] = stream
                completion_stream, headers = make_sync_call(
                    client=client,
                    api_base=api_base,
                    headers=headers,  # type: ignore
                    data=json.dumps(data),
                    model=model,
                    messages=messages,
                    logging_obj=logging_obj,
                    timeout=timeout,
                    json_mode=json_mode,
                )
                return CustomStreamWrapper(
                    completion_stream=completion_stream,
                    model=model,
                    custom_llm_provider="anthropic",
                    logging_obj=logging_obj,
                    _response_headers=process_anthropic_headers(headers),
                )
            else:
                # Sync non-streaming: need a sync HTTP handler; build one if
                # the caller's client is missing or of the wrong type.
                if client is None or not isinstance(client, HTTPHandler):
                    client = HTTPHandler(timeout=timeout)  # type: ignore
                else:
                    client = client
                try:
                    response = client.post(
                        api_base,
                        headers=headers,
                        data=json.dumps(data),
                        timeout=timeout,
                    )
                except Exception as e:
                    # Same error-enrichment pattern as acompletion_function.
                    status_code = getattr(e, "status_code", 500)
                    error_headers = getattr(e, "headers", None)
                    error_text = getattr(e, "text", str(e))
                    error_response = getattr(e, "response", None)
                    if error_headers is None and error_response:
                        error_headers = getattr(error_response, "headers", None)
                    if error_response and hasattr(error_response, "text"):
                        error_text = getattr(error_response, "text", error_text)
                    raise AnthropicError(
                        message=error_text,
                        status_code=status_code,
                        headers=error_headers,
                    )

                return config.transform_response(
                    model=model,
                    raw_response=response,
                    model_response=model_response,
                    logging_obj=logging_obj,
                    api_key=api_key,
                    request_data=data,
                    messages=messages,
                    optional_params=optional_params,
                    litellm_params=litellm_params,
                    encoding=encoding,
                    json_mode=json_mode,
                )

    def embedding(self):
        # logic for parsing in - calling - parsing out model embedding calls
        pass
class ModelResponseIterator:
    """
    Iterator over Anthropic SSE stream lines, translating each `data: {...}`
    event into an OpenAI-format ModelResponseStream chunk.

    Supports both sync (`__iter__`/`__next__`) and async
    (`__aiter__`/`__anext__`) consumption of the same underlying stream.
    """

    def __init__(
        self, streaming_response, sync_stream: bool, json_mode: Optional[bool] = False
    ):
        self.streaming_response = streaming_response
        self.response_iterator = self.streaming_response
        # Deltas accumulated for the current content block; reset on each
        # content_block_start event.
        self.content_blocks: List[ContentBlockDelta] = []
        # -1 means "no tool call seen yet"; incremented per tool_use block.
        self.tool_index = -1
        self.json_mode = json_mode

    def check_empty_tool_call_args(self) -> bool:
        """
        Check if the tool call block so far has been an empty string

        Returns True only when the accumulated block consists of
        input_json_delta events whose concatenated partial_json is empty.
        """
        args = ""
        # if text content block -> skip
        if len(self.content_blocks) == 0:
            return False

        if (
            self.content_blocks[0]["delta"]["type"] == "text_delta"
            or self.content_blocks[0]["delta"]["type"] == "thinking_delta"
        ):
            return False

        for block in self.content_blocks:
            if block["delta"]["type"] == "input_json_delta":
                args += block["delta"].get("partial_json", "")  # type: ignore

        if len(args) == 0:
            return True
        return False

    def _handle_usage(self, anthropic_usage_chunk: Union[dict, UsageDelta]) -> Usage:
        # Delegate token accounting to the shared AnthropicConfig helper.
        return AnthropicConfig().calculate_usage(
            usage_object=cast(dict, anthropic_usage_chunk), reasoning_content=None
        )

    def _content_block_delta_helper(
        self, chunk: dict
    ) -> Tuple[
        str,
        Optional[ChatCompletionToolCallChunk],
        List[Union[ChatCompletionThinkingBlock, ChatCompletionRedactedThinkingBlock]],
        Dict[str, Any],
    ]:
        """
        Helper function to handle the content block delta

        Dispatches on the delta payload: text, partial tool-call JSON,
        citation, or thinking/signature content. Also appends the delta to
        `self.content_blocks` for later empty-args detection.
        """

        text = ""
        tool_use: Optional[ChatCompletionToolCallChunk] = None
        provider_specific_fields = {}
        content_block = ContentBlockDelta(**chunk)  # type: ignore
        thinking_blocks: List[
            Union[ChatCompletionThinkingBlock, ChatCompletionRedactedThinkingBlock]
        ] = []

        self.content_blocks.append(content_block)
        if "text" in content_block["delta"]:
            text = content_block["delta"]["text"]
        elif "partial_json" in content_block["delta"]:
            tool_use = {
                "id": None,
                "type": "function",
                "function": {
                    "name": None,
                    "arguments": content_block["delta"]["partial_json"],
                },
                "index": self.tool_index,
            }
        elif "citation" in content_block["delta"]:
            provider_specific_fields["citation"] = content_block["delta"]["citation"]
        elif (
            "thinking" in content_block["delta"]
            or "signature" in content_block["delta"]
        ):
            thinking_blocks = [
                ChatCompletionThinkingBlock(
                    type="thinking",
                    thinking=content_block["delta"].get("thinking") or "",
                    signature=content_block["delta"].get("signature"),
                )
            ]
            provider_specific_fields["thinking_blocks"] = thinking_blocks

        return text, tool_use, thinking_blocks, provider_specific_fields

    def _handle_reasoning_content(
        self,
        thinking_blocks: List[
            Union[ChatCompletionThinkingBlock, ChatCompletionRedactedThinkingBlock]
        ],
    ) -> Optional[str]:
        """
        Handle the reasoning content

        Concatenates the `thinking` text of every block; returns None only
        when the input list is empty.
        """
        reasoning_content = None
        for block in thinking_blocks:
            thinking_content = cast(Optional[str], block.get("thinking"))
            if reasoning_content is None:
                reasoning_content = ""
            if thinking_content is not None:
                reasoning_content += thinking_content
        return reasoning_content

    def chunk_parser(self, chunk: dict) -> ModelResponseStream:
        """
        Translate one decoded Anthropic SSE event dict into a
        ModelResponseStream chunk.

        Raises:
            AnthropicError: when the stream delivers a `type: "error"` event.
            ValueError: when the chunk cannot be decoded.
        """
        try:
            type_chunk = chunk.get("type", "") or ""

            text = ""
            tool_use: Optional[ChatCompletionToolCallChunk] = None
            finish_reason = ""
            usage: Optional[Usage] = None
            provider_specific_fields: Dict[str, Any] = {}
            reasoning_content: Optional[str] = None
            thinking_blocks: Optional[
                List[
                    Union[
                        ChatCompletionThinkingBlock, ChatCompletionRedactedThinkingBlock
                    ]
                ]
            ] = None

            index = int(chunk.get("index", 0))
            if type_chunk == "content_block_delta":
                """
                Anthropic content chunk
                chunk = {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Hello'}}
                """
                (
                    text,
                    tool_use,
                    thinking_blocks,
                    provider_specific_fields,
                ) = self._content_block_delta_helper(chunk=chunk)
                if thinking_blocks:
                    reasoning_content = self._handle_reasoning_content(
                        thinking_blocks=thinking_blocks
                    )
            elif type_chunk == "content_block_start":
                """
                event: content_block_start
                data: {"type":"content_block_start","index":1,"content_block":{"type":"tool_use","id":"toolu_01T1x1fJ34qAmk2tNTrN7Up6","name":"get_weather","input":{}}}
                """
                content_block_start = ContentBlockStart(**chunk)  # type: ignore
                self.content_blocks = []  # reset content blocks when new block starts
                if content_block_start["content_block"]["type"] == "text":
                    text = content_block_start["content_block"]["text"]
                elif content_block_start["content_block"]["type"] == "tool_use":
                    self.tool_index += 1
                    tool_use = {
                        "id": content_block_start["content_block"]["id"],
                        "type": "function",
                        "function": {
                            "name": content_block_start["content_block"]["name"],
                            "arguments": "",
                        },
                        "index": self.tool_index,
                    }
                elif (
                    content_block_start["content_block"]["type"] == "redacted_thinking"
                ):
                    thinking_blocks = [
                        ChatCompletionRedactedThinkingBlock(
                            type="redacted_thinking",
                            data=content_block_start["content_block"]["data"],
                        )
                    ]
            elif type_chunk == "content_block_stop":
                ContentBlockStop(**chunk)  # type: ignore
                # check if tool call content block
                is_empty = self.check_empty_tool_call_args()
                if is_empty:
                    # Emit "{}" so downstream JSON parsing of the tool
                    # arguments never sees an empty string.
                    tool_use = {
                        "id": None,
                        "type": "function",
                        "function": {
                            "name": None,
                            "arguments": "{}",
                        },
                        "index": self.tool_index,
                    }
            elif type_chunk == "message_delta":
                """
                Anthropic
                chunk = {'type': 'message_delta', 'delta': {'stop_reason': 'max_tokens', 'stop_sequence': None}, 'usage': {'output_tokens': 10}}
                """
                # TODO - get usage from this chunk, set in response
                message_delta = MessageBlockDelta(**chunk)  # type: ignore
                finish_reason = map_finish_reason(
                    finish_reason=message_delta["delta"].get("stop_reason", "stop")
                    or "stop"
                )
                usage = self._handle_usage(anthropic_usage_chunk=message_delta["usage"])
            elif type_chunk == "message_start":
                """
                Anthropic
                chunk = {
                    "type": "message_start",
                    "message": {
                        "id": "msg_vrtx_011PqREFEMzd3REdCoUFAmdG",
                        "type": "message",
                        "role": "assistant",
                        "model": "claude-3-sonnet-20240229",
                        "content": [],
                        "stop_reason": null,
                        "stop_sequence": null,
                        "usage": {
                            "input_tokens": 270,
                            "output_tokens": 1
                        }
                    }
                }
                """
                message_start_block = MessageStartBlock(**chunk)  # type: ignore
                if "usage" in message_start_block["message"]:
                    usage = self._handle_usage(
                        anthropic_usage_chunk=message_start_block["message"]["usage"]
                    )
            elif type_chunk == "error":
                """
                {"type":"error","error":{"details":null,"type":"api_error","message":"Internal server error"}              }
                """
                _error_dict = chunk.get("error", {}) or {}
                message = _error_dict.get("message", None) or str(chunk)
                raise AnthropicError(
                    message=message,
                    status_code=500,  # it looks like Anthropic API does not return a status code in the chunk error - default to 500
                )

            text, tool_use = self._handle_json_mode_chunk(text=text, tool_use=tool_use)

            returned_chunk = ModelResponseStream(
                choices=[
                    StreamingChoices(
                        index=index,
                        delta=Delta(
                            content=text,
                            tool_calls=[tool_use] if tool_use is not None else None,
                            provider_specific_fields=(
                                provider_specific_fields
                                if provider_specific_fields
                                else None
                            ),
                            thinking_blocks=(
                                thinking_blocks if thinking_blocks else None
                            ),
                            reasoning_content=reasoning_content,
                        ),
                        finish_reason=finish_reason,
                    )
                ],
                usage=usage,
            )

            return returned_chunk

        except json.JSONDecodeError:
            raise ValueError(f"Failed to decode JSON from chunk: {chunk}")

    def _handle_json_mode_chunk(
        self, text: str, tool_use: Optional[ChatCompletionToolCallChunk]
    ) -> Tuple[str, Optional[ChatCompletionToolCallChunk]]:
        """
        If JSON mode is enabled, convert the tool call to a message.

        Anthropic returns the JSON schema as part of the tool call
        OpenAI returns the JSON schema as part of the content, this handles placing it in the content

        Args:
            text: str
            tool_use: Optional[ChatCompletionToolCallChunk]
        Returns:
            Tuple[str, Optional[ChatCompletionToolCallChunk]]
            text: The text to use in the content
            tool_use: The ChatCompletionToolCallChunk to use in the chunk response
        """
        if self.json_mode is True and tool_use is not None:
            message = AnthropicConfig._convert_tool_response_to_message(
                tool_calls=[tool_use]
            )
            if message is not None:
                text = message.content or ""
                tool_use = None

        return text, tool_use

    # Sync iterator
    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = self.response_iterator.__next__()
        except StopIteration:
            raise StopIteration
        except ValueError as e:
            raise RuntimeError(f"Error receiving chunk from stream: {e}")

        try:
            str_line = chunk
            if isinstance(chunk, bytes):  # Handle binary data
                str_line = chunk.decode("utf-8")  # Convert bytes to string
            # Strip any SSE field prefix before the "data:" marker.
            index = str_line.find("data:")
            if index != -1:
                str_line = str_line[index:]

            if str_line.startswith("data:"):
                data_json = json.loads(str_line[5:])
                return self.chunk_parser(chunk=data_json)
            else:
                # Non-data SSE lines (event names, keep-alives) become an
                # empty placeholder chunk.
                return GenericStreamingChunk(
                    text="",
                    is_finished=False,
                    finish_reason="",
                    usage=None,
                    index=0,
                    tool_use=None,
                )
        except StopIteration:
            raise StopIteration
        except ValueError as e:
            raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")

    # Async iterator
    def __aiter__(self):
        self.async_response_iterator = self.streaming_response.__aiter__()
        return self

    async def __anext__(self):
        try:
            chunk = await self.async_response_iterator.__anext__()
        except StopAsyncIteration:
            raise StopAsyncIteration
        except ValueError as e:
            raise RuntimeError(f"Error receiving chunk from stream: {e}")

        try:
            str_line = chunk
            if isinstance(chunk, bytes):  # Handle binary data
                str_line = chunk.decode("utf-8")  # Convert bytes to string
            # Strip any SSE field prefix before the "data:" marker.
            index = str_line.find("data:")
            if index != -1:
                str_line = str_line[index:]

            if str_line.startswith("data:"):
                data_json = json.loads(str_line[5:])
                return self.chunk_parser(chunk=data_json)
            else:
                # Non-data SSE lines (event names, keep-alives) become an
                # empty placeholder chunk.
                return GenericStreamingChunk(
                    text="",
                    is_finished=False,
                    finish_reason="",
                    usage=None,
                    index=0,
                    tool_use=None,
                )
        except StopAsyncIteration:
            raise StopAsyncIteration
        except ValueError as e:
            raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")

    def convert_str_chunk_to_generic_chunk(self, chunk: str) -> ModelResponseStream:
        """
        Convert a string chunk to a ModelResponseStream chunk

        Note: This is used for Anthropic pass through streaming logging

        We can move __anext__, and __next__ to use this function since it's common logic.
        Did not migrate them to minimize changes made in 1 PR.
        """
        str_line = chunk
        if isinstance(chunk, bytes):  # Handle binary data
            str_line = chunk.decode("utf-8")  # Convert bytes to string
        index = str_line.find("data:")
        if index != -1:
            str_line = str_line[index:]

        if str_line.startswith("data:"):
            data_json = json.loads(str_line[5:])
            return self.chunk_parser(chunk=data_json)
        else:
            # No data payload: return an empty ModelResponseStream placeholder.
            return ModelResponseStream()