Spaces:
Paused
Paused
| from __future__ import annotations | |
| import os | |
| import inspect | |
| import logging | |
| import datetime | |
| import functools | |
| from typing import TYPE_CHECKING, Any, Union, Generic, TypeVar, Callable, Iterator, AsyncIterator, cast, overload | |
| from typing_extensions import Awaitable, ParamSpec, override, deprecated, get_origin | |
| import anyio | |
| import httpx | |
| import pydantic | |
| from ._types import NoneType | |
| from ._utils import is_given, extract_type_arg, is_annotated_type | |
| from ._models import BaseModel, is_basemodel | |
| from ._constants import RAW_RESPONSE_HEADER | |
| from ._streaming import Stream, AsyncStream, is_stream_class_type, extract_stream_chunk_type | |
| from ._exceptions import APIResponseValidationError | |
| if TYPE_CHECKING: | |
| from ._models import FinalRequestOptions | |
| from ._base_client import BaseClient | |
| P = ParamSpec("P") | |
| R = TypeVar("R") | |
| _T = TypeVar("_T") | |
| log: logging.Logger = logging.getLogger(__name__) | |
| class LegacyAPIResponse(Generic[R]): | |
| """This is a legacy class as it will be replaced by `APIResponse` | |
| and `AsyncAPIResponse` in the `_response.py` file in the next major | |
| release. | |
| For the sync client this will mostly be the same with the exception | |
| of `content` & `text` will be methods instead of properties. In the | |
| async client, all methods will be async. | |
| A migration script will be provided & the migration in general should | |
| be smooth. | |
| """ | |
| _cast_to: type[R] | |
| _client: BaseClient[Any, Any] | |
| _parsed_by_type: dict[type[Any], Any] | |
| _stream: bool | |
| _stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None | |
| _options: FinalRequestOptions | |
| http_response: httpx.Response | |
| def __init__( | |
| self, | |
| *, | |
| raw: httpx.Response, | |
| cast_to: type[R], | |
| client: BaseClient[Any, Any], | |
| stream: bool, | |
| stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None, | |
| options: FinalRequestOptions, | |
| ) -> None: | |
| self._cast_to = cast_to | |
| self._client = client | |
| self._parsed_by_type = {} | |
| self._stream = stream | |
| self._stream_cls = stream_cls | |
| self._options = options | |
| self.http_response = raw | |
| def parse(self, *, to: type[_T]) -> _T: | |
| ... | |
| def parse(self) -> R: | |
| ... | |
| def parse(self, *, to: type[_T] | None = None) -> R | _T: | |
| """Returns the rich python representation of this response's data. | |
| NOTE: For the async client: this will become a coroutine in the next major version. | |
| For lower-level control, see `.read()`, `.json()`, `.iter_bytes()`. | |
| You can customise the type that the response is parsed into through | |
| the `to` argument, e.g. | |
| ```py | |
| from anthropic import BaseModel | |
| class MyModel(BaseModel): | |
| foo: str | |
| obj = response.parse(to=MyModel) | |
| print(obj.foo) | |
| ``` | |
| We support parsing: | |
| - `BaseModel` | |
| - `dict` | |
| - `list` | |
| - `Union` | |
| - `str` | |
| - `int` | |
| - `float` | |
| - `httpx.Response` | |
| """ | |
| cache_key = to if to is not None else self._cast_to | |
| cached = self._parsed_by_type.get(cache_key) | |
| if cached is not None: | |
| return cached # type: ignore[no-any-return] | |
| parsed = self._parse(to=to) | |
| if is_given(self._options.post_parser): | |
| parsed = self._options.post_parser(parsed) | |
| self._parsed_by_type[cache_key] = parsed | |
| return parsed | |
| def headers(self) -> httpx.Headers: | |
| return self.http_response.headers | |
| def http_request(self) -> httpx.Request: | |
| return self.http_response.request | |
| def status_code(self) -> int: | |
| return self.http_response.status_code | |
| def url(self) -> httpx.URL: | |
| return self.http_response.url | |
| def method(self) -> str: | |
| return self.http_request.method | |
| def content(self) -> bytes: | |
| """Return the binary response content. | |
| NOTE: this will be removed in favour of `.read()` in the | |
| next major version. | |
| """ | |
| return self.http_response.content | |
| def text(self) -> str: | |
| """Return the decoded response content. | |
| NOTE: this will be turned into a method in the next major version. | |
| """ | |
| return self.http_response.text | |
| def http_version(self) -> str: | |
| return self.http_response.http_version | |
| def is_closed(self) -> bool: | |
| return self.http_response.is_closed | |
| def elapsed(self) -> datetime.timedelta: | |
| """The time taken for the complete request/response cycle to complete.""" | |
| return self.http_response.elapsed | |
| def _parse(self, *, to: type[_T] | None = None) -> R | _T: | |
| # unwrap `Annotated[T, ...]` -> `T` | |
| if to and is_annotated_type(to): | |
| to = extract_type_arg(to, 0) | |
| if self._stream: | |
| if to: | |
| if not is_stream_class_type(to): | |
| raise TypeError(f"Expected custom parse type to be a subclass of {Stream} or {AsyncStream}") | |
| return cast( | |
| _T, | |
| to( | |
| cast_to=extract_stream_chunk_type( | |
| to, | |
| failure_message="Expected custom stream type to be passed with a type argument, e.g. Stream[ChunkType]", | |
| ), | |
| response=self.http_response, | |
| client=cast(Any, self._client), | |
| ), | |
| ) | |
| if self._stream_cls: | |
| return cast( | |
| R, | |
| self._stream_cls( | |
| cast_to=extract_stream_chunk_type(self._stream_cls), | |
| response=self.http_response, | |
| client=cast(Any, self._client), | |
| ), | |
| ) | |
| stream_cls = cast("type[Stream[Any]] | type[AsyncStream[Any]] | None", self._client._default_stream_cls) | |
| if stream_cls is None: | |
| raise MissingStreamClassError() | |
| return cast( | |
| R, | |
| stream_cls( | |
| cast_to=self._cast_to, | |
| response=self.http_response, | |
| client=cast(Any, self._client), | |
| ), | |
| ) | |
| cast_to = to if to is not None else self._cast_to | |
| # unwrap `Annotated[T, ...]` -> `T` | |
| if is_annotated_type(cast_to): | |
| cast_to = extract_type_arg(cast_to, 0) | |
| if cast_to is NoneType: | |
| return cast(R, None) | |
| response = self.http_response | |
| if cast_to == str: | |
| return cast(R, response.text) | |
| if cast_to == int: | |
| return cast(R, int(response.text)) | |
| if cast_to == float: | |
| return cast(R, float(response.text)) | |
| origin = get_origin(cast_to) or cast_to | |
| if inspect.isclass(origin) and issubclass(origin, HttpxBinaryResponseContent): | |
| return cast(R, cast_to(response)) # type: ignore | |
| if origin == LegacyAPIResponse: | |
| raise RuntimeError("Unexpected state - cast_to is `APIResponse`") | |
| if inspect.isclass(origin) and issubclass(origin, httpx.Response): | |
| # Because of the invariance of our ResponseT TypeVar, users can subclass httpx.Response | |
| # and pass that class to our request functions. We cannot change the variance to be either | |
| # covariant or contravariant as that makes our usage of ResponseT illegal. We could construct | |
| # the response class ourselves but that is something that should be supported directly in httpx | |
| # as it would be easy to incorrectly construct the Response object due to the multitude of arguments. | |
| if cast_to != httpx.Response: | |
| raise ValueError(f"Subclasses of httpx.Response cannot be passed to `cast_to`") | |
| return cast(R, response) | |
| if inspect.isclass(origin) and not issubclass(origin, BaseModel) and issubclass(origin, pydantic.BaseModel): | |
| raise TypeError("Pydantic models must subclass our base model type, e.g. `from anthropic import BaseModel`") | |
| if ( | |
| cast_to is not object | |
| and not origin is list | |
| and not origin is dict | |
| and not origin is Union | |
| and not issubclass(origin, BaseModel) | |
| ): | |
| raise RuntimeError( | |
| f"Unsupported type, expected {cast_to} to be a subclass of {BaseModel}, {dict}, {list}, {Union}, {NoneType}, {str} or {httpx.Response}." | |
| ) | |
| # split is required to handle cases where additional information is included | |
| # in the response, e.g. application/json; charset=utf-8 | |
| content_type, *_ = response.headers.get("content-type", "*").split(";") | |
| if content_type != "application/json": | |
| if is_basemodel(cast_to): | |
| try: | |
| data = response.json() | |
| except Exception as exc: | |
| log.debug("Could not read JSON from response data due to %s - %s", type(exc), exc) | |
| else: | |
| return self._client._process_response_data( | |
| data=data, | |
| cast_to=cast_to, # type: ignore | |
| response=response, | |
| ) | |
| if self._client._strict_response_validation: | |
| raise APIResponseValidationError( | |
| response=response, | |
| message=f"Expected Content-Type response header to be `application/json` but received `{content_type}` instead.", | |
| body=response.text, | |
| ) | |
| # If the API responds with content that isn't JSON then we just return | |
| # the (decoded) text without performing any parsing so that you can still | |
| # handle the response however you need to. | |
| return response.text # type: ignore | |
| data = response.json() | |
| return self._client._process_response_data( | |
| data=data, | |
| cast_to=cast_to, # type: ignore | |
| response=response, | |
| ) | |
| def __repr__(self) -> str: | |
| return f"<APIResponse [{self.status_code} {self.http_response.reason_phrase}] type={self._cast_to}>" | |
| class MissingStreamClassError(TypeError): | |
| def __init__(self) -> None: | |
| super().__init__( | |
| "The `stream` argument was set to `True` but the `stream_cls` argument was not given. See `anthropic._streaming` for reference", | |
| ) | |
| def to_raw_response_wrapper(func: Callable[P, R]) -> Callable[P, LegacyAPIResponse[R]]: | |
| """Higher order function that takes one of our bound API methods and wraps it | |
| to support returning the raw `APIResponse` object directly. | |
| """ | |
| def wrapped(*args: P.args, **kwargs: P.kwargs) -> LegacyAPIResponse[R]: | |
| extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})} | |
| extra_headers[RAW_RESPONSE_HEADER] = "true" | |
| kwargs["extra_headers"] = extra_headers | |
| return cast(LegacyAPIResponse[R], func(*args, **kwargs)) | |
| return wrapped | |
| def async_to_raw_response_wrapper(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[LegacyAPIResponse[R]]]: | |
| """Higher order function that takes one of our bound API methods and wraps it | |
| to support returning the raw `APIResponse` object directly. | |
| """ | |
| async def wrapped(*args: P.args, **kwargs: P.kwargs) -> LegacyAPIResponse[R]: | |
| extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})} | |
| extra_headers[RAW_RESPONSE_HEADER] = "true" | |
| kwargs["extra_headers"] = extra_headers | |
| return cast(LegacyAPIResponse[R], await func(*args, **kwargs)) | |
| return wrapped | |
| class HttpxBinaryResponseContent: | |
| response: httpx.Response | |
| def __init__(self, response: httpx.Response) -> None: | |
| self.response = response | |
| def content(self) -> bytes: | |
| return self.response.content | |
| def text(self) -> str: | |
| return self.response.text | |
| def encoding(self) -> str | None: | |
| return self.response.encoding | |
| def charset_encoding(self) -> str | None: | |
| return self.response.charset_encoding | |
| def json(self, **kwargs: Any) -> Any: | |
| return self.response.json(**kwargs) | |
| def read(self) -> bytes: | |
| return self.response.read() | |
| def iter_bytes(self, chunk_size: int | None = None) -> Iterator[bytes]: | |
| return self.response.iter_bytes(chunk_size) | |
| def iter_text(self, chunk_size: int | None = None) -> Iterator[str]: | |
| return self.response.iter_text(chunk_size) | |
| def iter_lines(self) -> Iterator[str]: | |
| return self.response.iter_lines() | |
| def iter_raw(self, chunk_size: int | None = None) -> Iterator[bytes]: | |
| return self.response.iter_raw(chunk_size) | |
| def write_to_file( | |
| self, | |
| file: str | os.PathLike[str], | |
| ) -> None: | |
| """Write the output to the given file. | |
| Accepts a filename or any path-like object, e.g. pathlib.Path | |
| Note: if you want to stream the data to the file instead of writing | |
| all at once then you should use `.with_streaming_response` when making | |
| the API request, e.g. `client.with_streaming_response.foo().stream_to_file('my_filename.txt')` | |
| """ | |
| with open(file, mode="wb") as f: | |
| for data in self.response.iter_bytes(): | |
| f.write(data) | |
| def stream_to_file( | |
| self, | |
| file: str | os.PathLike[str], | |
| *, | |
| chunk_size: int | None = None, | |
| ) -> None: | |
| with open(file, mode="wb") as f: | |
| for data in self.response.iter_bytes(chunk_size): | |
| f.write(data) | |
| def close(self) -> None: | |
| return self.response.close() | |
| async def aread(self) -> bytes: | |
| return await self.response.aread() | |
| async def aiter_bytes(self, chunk_size: int | None = None) -> AsyncIterator[bytes]: | |
| return self.response.aiter_bytes(chunk_size) | |
| async def aiter_text(self, chunk_size: int | None = None) -> AsyncIterator[str]: | |
| return self.response.aiter_text(chunk_size) | |
| async def aiter_lines(self) -> AsyncIterator[str]: | |
| return self.response.aiter_lines() | |
| async def aiter_raw(self, chunk_size: int | None = None) -> AsyncIterator[bytes]: | |
| return self.response.aiter_raw(chunk_size) | |
| async def astream_to_file( | |
| self, | |
| file: str | os.PathLike[str], | |
| *, | |
| chunk_size: int | None = None, | |
| ) -> None: | |
| path = anyio.Path(file) | |
| async with await path.open(mode="wb") as f: | |
| async for data in self.response.aiter_bytes(chunk_size): | |
| await f.write(data) | |
| async def aclose(self) -> None: | |
| return await self.response.aclose() | |