Spaces:
Paused
Paused
# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
| from __future__ import annotations | |
| import os | |
| from typing import Any, Union, Mapping | |
| from typing_extensions import Self, override | |
| import httpx | |
| from . import resources, _constants, _exceptions | |
| from ._qs import Querystring | |
| from ._types import ( | |
| NOT_GIVEN, | |
| Omit, | |
| Headers, | |
| Timeout, | |
| NotGiven, | |
| Transport, | |
| ProxiesTypes, | |
| AsyncTransport, | |
| RequestOptions, | |
| ) | |
| from ._utils import ( | |
| is_given, | |
| get_async_library, | |
| ) | |
| from ._version import __version__ | |
| from ._streaming import Stream as Stream, AsyncStream as AsyncStream | |
| from ._exceptions import APIStatusError | |
| from ._tokenizers import ( | |
| TokenizerType, # type: ignore[import] | |
| sync_get_tokenizer, | |
| async_get_tokenizer, | |
| ) | |
| from ._base_client import ( | |
| DEFAULT_MAX_RETRIES, | |
| DEFAULT_CONNECTION_LIMITS, | |
| SyncAPIClient, | |
| AsyncAPIClient, | |
| SyncHttpxClientWrapper, | |
| AsyncHttpxClientWrapper, | |
| ) | |
| __all__ = [ | |
| "Timeout", | |
| "Transport", | |
| "ProxiesTypes", | |
| "RequestOptions", | |
| "resources", | |
| "Anthropic", | |
| "AsyncAnthropic", | |
| "Client", | |
| "AsyncClient", | |
| ] | |
class Anthropic(SyncAPIClient):
    """Synchronous API client.

    Exposes the `completions` and `messages` resources, plus the
    `with_raw_response` / `with_streaming_response` wrappers built around them.
    Credentials come from the `api_key` / `auth_token` options or their
    environment variables (see `__init__`).
    """

    completions: resources.Completions
    messages: resources.Messages
    with_raw_response: AnthropicWithRawResponse
    with_streaming_response: AnthropicWithStreamedResponse

    # client options
    api_key: str | None
    auth_token: str | None

    # constants
    HUMAN_PROMPT = _constants.HUMAN_PROMPT
    AI_PROMPT = _constants.AI_PROMPT

    def __init__(
        self,
        *,
        api_key: str | None = None,
        auth_token: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: Union[float, Timeout, None, NotGiven] = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        # Configure a custom httpx client.
        # We provide a `DefaultHttpxClient` class that you can pass to retain the default values we use for `limits`, `timeout` & `follow_redirects`.
        # See the [httpx documentation](https://www.python-httpx.org/api/#client) for more details.
        http_client: httpx.Client | None = None,
        # See httpx documentation for [custom transports](https://www.python-httpx.org/advanced/#custom-transports)
        transport: Transport | None = None,
        # See httpx documentation for [proxies](https://www.python-httpx.org/advanced/#http-proxying)
        proxies: ProxiesTypes | None = None,
        # See httpx documentation for [limits](https://www.python-httpx.org/advanced/#pool-limit-configuration)
        connection_pool_limits: httpx.Limits | None = None,
        # Enable or disable schema validation for data returned by the API.
        # When enabled an error APIResponseValidationError is raised
        # if the API responds with invalid data for the expected schema.
        #
        # This parameter may be removed or changed in the future.
        # If you rely on this feature, please open a GitHub issue
        # outlining your use-case to help us decide if it should be
        # part of our public interface in the future.
        _strict_response_validation: bool = False,
    ) -> None:
        """Construct a new synchronous anthropic client instance.

        This automatically infers the following arguments from their corresponding environment variables if they are not provided:
        - `api_key` from `ANTHROPIC_API_KEY`
        - `auth_token` from `ANTHROPIC_AUTH_TOKEN`
        - `base_url` from `ANTHROPIC_BASE_URL`, falling back to `https://api.anthropic.com`
        """
        if api_key is None:
            api_key = os.environ.get("ANTHROPIC_API_KEY")
        self.api_key = api_key

        if auth_token is None:
            auth_token = os.environ.get("ANTHROPIC_AUTH_TOKEN")
        self.auth_token = auth_token

        if base_url is None:
            base_url = os.environ.get("ANTHROPIC_BASE_URL")
        if base_url is None:
            base_url = "https://api.anthropic.com"

        super().__init__(
            version=__version__,
            base_url=base_url,
            max_retries=max_retries,
            timeout=timeout,
            http_client=http_client,
            transport=transport,
            proxies=proxies,
            limits=connection_pool_limits,
            custom_headers=default_headers,
            custom_query=default_query,
            _strict_response_validation=_strict_response_validation,
        )

        self._default_stream_cls = Stream

        self.completions = resources.Completions(self)
        self.messages = resources.Messages(self)
        self.with_raw_response = AnthropicWithRawResponse(self)
        self.with_streaming_response = AnthropicWithStreamedResponse(self)

    # NOTE(review): declared as an overriding property to match the upstream SDK
    # layout; the base-class declaration is not visible here -- confirm against
    # `_base_client`.
    @property
    @override
    def qs(self) -> Querystring:
        """Query-string serializer configured with `array_format="comma"`."""
        return Querystring(array_format="comma")

    @property
    @override
    def auth_headers(self) -> dict[str, str]:
        """Authentication headers for this client.

        The API-key header is preferred; the bearer-token header is used only
        when no API key is configured. Returns an empty dict when neither
        credential is set.
        """
        # Read each property once so the value tested is the value returned.
        api_key_auth = self._api_key_auth
        if api_key_auth:
            return api_key_auth

        bearer_auth = self._bearer_auth
        if bearer_auth:
            return bearer_auth

        return {}

    @property
    def _api_key_auth(self) -> dict[str, str]:
        """`X-Api-Key` header built from `self.api_key`, or `{}` when it is `None`."""
        api_key = self.api_key
        if api_key is None:
            return {}
        return {"X-Api-Key": api_key}

    @property
    def _bearer_auth(self) -> dict[str, str]:
        """`Authorization: Bearer …` header built from `self.auth_token`, or `{}` when it is `None`."""
        auth_token = self.auth_token
        if auth_token is None:
            return {}
        return {"Authorization": f"Bearer {auth_token}"}

    @property
    @override
    def default_headers(self) -> dict[str, str | Omit]:
        """Default headers for this client.

        Starts from the base client's defaults, adds the `X-Stainless-Async`
        and `anthropic-version` headers, then applies the user-supplied custom
        headers last so they win on key collisions.
        """
        return {
            **super().default_headers,
            "X-Stainless-Async": "false",
            "anthropic-version": "2023-06-01",
            **self._custom_headers,
        }

    @override
    def _validate_headers(self, headers: Headers, custom_headers: Headers) -> None:
        """Ensure an authentication method can be resolved.

        Passes when a configured credential has a matching non-empty header in
        `headers`, or when `custom_headers` explicitly marks `X-Api-Key` or
        `Authorization` as `Omit`.

        Raises:
            TypeError: if none of the above conditions hold.
        """
        if self.api_key and headers.get("X-Api-Key"):
            return
        if isinstance(custom_headers.get("X-Api-Key"), Omit):
            return

        if self.auth_token and headers.get("Authorization"):
            return
        if isinstance(custom_headers.get("Authorization"), Omit):
            return

        raise TypeError(
            '"Could not resolve authentication method. Expected either api_key or auth_token to be set. Or for one of the `X-Api-Key` or `Authorization` headers to be explicitly omitted"'
        )

    def copy(
        self,
        *,
        api_key: str | None = None,
        auth_token: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        http_client: httpx.Client | None = None,
        connection_pool_limits: httpx.Limits | None = None,
        max_retries: int | NotGiven = NOT_GIVEN,
        default_headers: Mapping[str, str] | None = None,
        set_default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        set_default_query: Mapping[str, object] | None = None,
        _extra_kwargs: Mapping[str, Any] = {},
    ) -> Self:
        """
        Create a new client instance re-using the same options given to the current client with optional overriding.

        Args:
            api_key, auth_token, base_url: replacements for the current values;
                a falsy value (including `None`) keeps the current one.
            timeout, max_retries: replacements; left as `NOT_GIVEN` they keep
                the current value.
            http_client: client to use instead of the current one.
            connection_pool_limits: new pool limits. Mutually exclusive with
                `http_client`, and rejected when the current client is not a
                `SyncHttpxClientWrapper`.
            default_headers / default_query: merged on top of the current
                custom headers / query parameters.
            set_default_headers / set_default_query: replace the current custom
                headers / query parameters entirely.
            _extra_kwargs: forwarded verbatim to the constructor.

        Raises:
            ValueError: if a `default_*` argument is combined with its `set_default_*`
                counterpart, or if `connection_pool_limits` conflicts with a custom HTTP client.
        """
        if default_headers is not None and set_default_headers is not None:
            raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive")

        if default_query is not None and set_default_query is not None:
            raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive")

        headers = self._custom_headers
        if default_headers is not None:
            headers = {**headers, **default_headers}
        elif set_default_headers is not None:
            headers = set_default_headers

        params = self._custom_query
        if default_query is not None:
            params = {**params, **default_query}
        elif set_default_query is not None:
            params = set_default_query

        if connection_pool_limits is not None:
            if http_client is not None:
                raise ValueError("The 'http_client' argument is mutually exclusive with 'connection_pool_limits'")

            if not isinstance(self._client, SyncHttpxClientWrapper):
                raise ValueError(
                    "A custom HTTP client has been set and is mutually exclusive with the 'connection_pool_limits' argument"
                )

            # No client is handed to the constructor in this branch.
            http_client = None
        else:
            # Only carry the limits over when they differ from the shared default object.
            if self._limits is not DEFAULT_CONNECTION_LIMITS:
                connection_pool_limits = self._limits
            else:
                connection_pool_limits = None

            http_client = http_client or self._client

        return self.__class__(
            api_key=api_key or self.api_key,
            auth_token=auth_token or self.auth_token,
            base_url=base_url or self.base_url,
            timeout=self.timeout if isinstance(timeout, NotGiven) else timeout,
            http_client=http_client,
            connection_pool_limits=connection_pool_limits,
            max_retries=max_retries if is_given(max_retries) else self.max_retries,
            default_headers=headers,
            default_query=params,
            **_extra_kwargs,
        )

    # Alias for `copy` for nicer inline usage, e.g.
    # client.with_options(timeout=10).foo.create(...)
    with_options = copy

    def count_tokens(
        self,
        text: str,
    ) -> int:
        """Count the number of tokens in a given string.

        Note that this is only accurate for older models, e.g. `claude-2.1`. For newer
        models this can only be used as a _very_ rough estimate, instead you should rely
        on the `usage` property in the response for exact counts.
        """
        # Note: tokenizer is untyped
        tokenizer = self.get_tokenizer()
        encoded_text = tokenizer.encode(text)  # type: ignore
        return len(encoded_text.ids)  # type: ignore

    def get_tokenizer(self) -> TokenizerType:
        """Return the tokenizer produced by `sync_get_tokenizer()`."""
        return sync_get_tokenizer()

    @override
    def _make_status_error(
        self,
        err_msg: str,
        *,
        body: object,
        response: httpx.Response,
    ) -> APIStatusError:
        """Build the exception matching `response.status_code`.

        Specific classes are used for 400, 401, 403, 404, 409, 422 and 429;
        any status >= 500 maps to `InternalServerError`; everything else
        falls back to the generic `APIStatusError`.
        """
        if response.status_code == 400:
            return _exceptions.BadRequestError(err_msg, response=response, body=body)

        if response.status_code == 401:
            return _exceptions.AuthenticationError(err_msg, response=response, body=body)

        if response.status_code == 403:
            return _exceptions.PermissionDeniedError(err_msg, response=response, body=body)

        if response.status_code == 404:
            return _exceptions.NotFoundError(err_msg, response=response, body=body)

        if response.status_code == 409:
            return _exceptions.ConflictError(err_msg, response=response, body=body)

        if response.status_code == 422:
            return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body)

        if response.status_code == 429:
            return _exceptions.RateLimitError(err_msg, response=response, body=body)

        if response.status_code >= 500:
            return _exceptions.InternalServerError(err_msg, response=response, body=body)
        return APIStatusError(err_msg, response=response, body=body)
class AsyncAnthropic(AsyncAPIClient):
    """Asynchronous API client.

    Exposes the async `completions` and `messages` resources, plus the
    `with_raw_response` / `with_streaming_response` wrappers built around them.
    Credentials come from the `api_key` / `auth_token` options or their
    environment variables (see `__init__`).
    """

    completions: resources.AsyncCompletions
    messages: resources.AsyncMessages
    with_raw_response: AsyncAnthropicWithRawResponse
    with_streaming_response: AsyncAnthropicWithStreamedResponse

    # client options
    api_key: str | None
    auth_token: str | None

    # constants
    HUMAN_PROMPT = _constants.HUMAN_PROMPT
    AI_PROMPT = _constants.AI_PROMPT

    def __init__(
        self,
        *,
        api_key: str | None = None,
        auth_token: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: Union[float, Timeout, None, NotGiven] = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        # Configure a custom httpx client.
        # We provide a `DefaultAsyncHttpxClient` class that you can pass to retain the default values we use for `limits`, `timeout` & `follow_redirects`.
        # See the [httpx documentation](https://www.python-httpx.org/api/#asyncclient) for more details.
        http_client: httpx.AsyncClient | None = None,
        # See httpx documentation for [custom transports](https://www.python-httpx.org/advanced/#custom-transports)
        transport: AsyncTransport | None = None,
        # See httpx documentation for [proxies](https://www.python-httpx.org/advanced/#http-proxying)
        proxies: ProxiesTypes | None = None,
        # See httpx documentation for [limits](https://www.python-httpx.org/advanced/#pool-limit-configuration)
        connection_pool_limits: httpx.Limits | None = None,
        # Enable or disable schema validation for data returned by the API.
        # When enabled an error APIResponseValidationError is raised
        # if the API responds with invalid data for the expected schema.
        #
        # This parameter may be removed or changed in the future.
        # If you rely on this feature, please open a GitHub issue
        # outlining your use-case to help us decide if it should be
        # part of our public interface in the future.
        _strict_response_validation: bool = False,
    ) -> None:
        """Construct a new async anthropic client instance.

        This automatically infers the following arguments from their corresponding environment variables if they are not provided:
        - `api_key` from `ANTHROPIC_API_KEY`
        - `auth_token` from `ANTHROPIC_AUTH_TOKEN`
        - `base_url` from `ANTHROPIC_BASE_URL`, falling back to `https://api.anthropic.com`
        """
        if api_key is None:
            api_key = os.environ.get("ANTHROPIC_API_KEY")
        self.api_key = api_key

        if auth_token is None:
            auth_token = os.environ.get("ANTHROPIC_AUTH_TOKEN")
        self.auth_token = auth_token

        if base_url is None:
            base_url = os.environ.get("ANTHROPIC_BASE_URL")
        if base_url is None:
            base_url = "https://api.anthropic.com"

        super().__init__(
            version=__version__,
            base_url=base_url,
            max_retries=max_retries,
            timeout=timeout,
            http_client=http_client,
            transport=transport,
            proxies=proxies,
            limits=connection_pool_limits,
            custom_headers=default_headers,
            custom_query=default_query,
            _strict_response_validation=_strict_response_validation,
        )

        self._default_stream_cls = AsyncStream

        self.completions = resources.AsyncCompletions(self)
        self.messages = resources.AsyncMessages(self)
        self.with_raw_response = AsyncAnthropicWithRawResponse(self)
        self.with_streaming_response = AsyncAnthropicWithStreamedResponse(self)

    # NOTE(review): declared as an overriding property to match the upstream SDK
    # layout; the base-class declaration is not visible here -- confirm against
    # `_base_client`.
    @property
    @override
    def qs(self) -> Querystring:
        """Query-string serializer configured with `array_format="comma"`."""
        return Querystring(array_format="comma")

    @property
    @override
    def auth_headers(self) -> dict[str, str]:
        """Authentication headers for this client.

        The API-key header is preferred; the bearer-token header is used only
        when no API key is configured. Returns an empty dict when neither
        credential is set.
        """
        # Read each property once so the value tested is the value returned.
        api_key_auth = self._api_key_auth
        if api_key_auth:
            return api_key_auth

        bearer_auth = self._bearer_auth
        if bearer_auth:
            return bearer_auth

        return {}

    @property
    def _api_key_auth(self) -> dict[str, str]:
        """`X-Api-Key` header built from `self.api_key`, or `{}` when it is `None`."""
        api_key = self.api_key
        if api_key is None:
            return {}
        return {"X-Api-Key": api_key}

    @property
    def _bearer_auth(self) -> dict[str, str]:
        """`Authorization: Bearer …` header built from `self.auth_token`, or `{}` when it is `None`."""
        auth_token = self.auth_token
        if auth_token is None:
            return {}
        return {"Authorization": f"Bearer {auth_token}"}

    @property
    @override
    def default_headers(self) -> dict[str, str | Omit]:
        """Default headers for this client.

        Starts from the base client's defaults, adds the `X-Stainless-Async`
        header (naming the async library reported by `get_async_library()`)
        and the `anthropic-version` header, then applies the user-supplied
        custom headers last so they win on key collisions.
        """
        return {
            **super().default_headers,
            "X-Stainless-Async": f"async:{get_async_library()}",
            "anthropic-version": "2023-06-01",
            **self._custom_headers,
        }

    @override
    def _validate_headers(self, headers: Headers, custom_headers: Headers) -> None:
        """Ensure an authentication method can be resolved.

        Passes when a configured credential has a matching non-empty header in
        `headers`, or when `custom_headers` explicitly marks `X-Api-Key` or
        `Authorization` as `Omit`.

        Raises:
            TypeError: if none of the above conditions hold.
        """
        if self.api_key and headers.get("X-Api-Key"):
            return
        if isinstance(custom_headers.get("X-Api-Key"), Omit):
            return

        if self.auth_token and headers.get("Authorization"):
            return
        if isinstance(custom_headers.get("Authorization"), Omit):
            return

        raise TypeError(
            '"Could not resolve authentication method. Expected either api_key or auth_token to be set. Or for one of the `X-Api-Key` or `Authorization` headers to be explicitly omitted"'
        )

    def copy(
        self,
        *,
        api_key: str | None = None,
        auth_token: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        http_client: httpx.AsyncClient | None = None,
        connection_pool_limits: httpx.Limits | None = None,
        max_retries: int | NotGiven = NOT_GIVEN,
        default_headers: Mapping[str, str] | None = None,
        set_default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        set_default_query: Mapping[str, object] | None = None,
        _extra_kwargs: Mapping[str, Any] = {},
    ) -> Self:
        """
        Create a new client instance re-using the same options given to the current client with optional overriding.

        Args:
            api_key, auth_token, base_url: replacements for the current values;
                a falsy value (including `None`) keeps the current one.
            timeout, max_retries: replacements; left as `NOT_GIVEN` they keep
                the current value.
            http_client: client to use instead of the current one.
            connection_pool_limits: new pool limits. Mutually exclusive with
                `http_client`, and rejected when the current client is not an
                `AsyncHttpxClientWrapper`.
            default_headers / default_query: merged on top of the current
                custom headers / query parameters.
            set_default_headers / set_default_query: replace the current custom
                headers / query parameters entirely.
            _extra_kwargs: forwarded verbatim to the constructor.

        Raises:
            ValueError: if a `default_*` argument is combined with its `set_default_*`
                counterpart, or if `connection_pool_limits` conflicts with a custom HTTP client.
        """
        if default_headers is not None and set_default_headers is not None:
            raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive")

        if default_query is not None and set_default_query is not None:
            raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive")

        headers = self._custom_headers
        if default_headers is not None:
            headers = {**headers, **default_headers}
        elif set_default_headers is not None:
            headers = set_default_headers

        params = self._custom_query
        if default_query is not None:
            params = {**params, **default_query}
        elif set_default_query is not None:
            params = set_default_query

        if connection_pool_limits is not None:
            if http_client is not None:
                raise ValueError("The 'http_client' argument is mutually exclusive with 'connection_pool_limits'")

            if not isinstance(self._client, AsyncHttpxClientWrapper):
                raise ValueError(
                    "A custom HTTP client has been set and is mutually exclusive with the 'connection_pool_limits' argument"
                )

            # No client is handed to the constructor in this branch.
            http_client = None
        else:
            # Only carry the limits over when they differ from the shared default object.
            if self._limits is not DEFAULT_CONNECTION_LIMITS:
                connection_pool_limits = self._limits
            else:
                connection_pool_limits = None

            http_client = http_client or self._client

        return self.__class__(
            api_key=api_key or self.api_key,
            auth_token=auth_token or self.auth_token,
            base_url=base_url or self.base_url,
            timeout=self.timeout if isinstance(timeout, NotGiven) else timeout,
            http_client=http_client,
            connection_pool_limits=connection_pool_limits,
            max_retries=max_retries if is_given(max_retries) else self.max_retries,
            default_headers=headers,
            default_query=params,
            **_extra_kwargs,
        )

    # Alias for `copy` for nicer inline usage, e.g.
    # client.with_options(timeout=10).foo.create(...)
    with_options = copy

    async def count_tokens(
        self,
        text: str,
    ) -> int:
        """Count the number of tokens in a given string.

        Note that this is only accurate for older models, e.g. `claude-2.1`. For newer
        models this can only be used as a _very_ rough estimate, instead you should rely
        on the `usage` property in the response for exact counts.
        """
        # Note: tokenizer is untyped
        tokenizer = await self.get_tokenizer()
        encoded_text = tokenizer.encode(text)  # type: ignore
        return len(encoded_text.ids)  # type: ignore

    async def get_tokenizer(self) -> TokenizerType:
        """Return the tokenizer produced by awaiting `async_get_tokenizer()`."""
        return await async_get_tokenizer()

    @override
    def _make_status_error(
        self,
        err_msg: str,
        *,
        body: object,
        response: httpx.Response,
    ) -> APIStatusError:
        """Build the exception matching `response.status_code`.

        Specific classes are used for 400, 401, 403, 404, 409, 422 and 429;
        any status >= 500 maps to `InternalServerError`; everything else
        falls back to the generic `APIStatusError`.
        """
        if response.status_code == 400:
            return _exceptions.BadRequestError(err_msg, response=response, body=body)

        if response.status_code == 401:
            return _exceptions.AuthenticationError(err_msg, response=response, body=body)

        if response.status_code == 403:
            return _exceptions.PermissionDeniedError(err_msg, response=response, body=body)

        if response.status_code == 404:
            return _exceptions.NotFoundError(err_msg, response=response, body=body)

        if response.status_code == 409:
            return _exceptions.ConflictError(err_msg, response=response, body=body)

        if response.status_code == 422:
            return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body)

        if response.status_code == 429:
            return _exceptions.RateLimitError(err_msg, response=response, body=body)

        if response.status_code >= 500:
            return _exceptions.InternalServerError(err_msg, response=response, body=body)
        return APIStatusError(err_msg, response=response, body=body)
class AnthropicWithRawResponse:
    """Mirror of the sync client's resources, each wrapped in its `...WithRawResponse` counterpart."""

    def __init__(self, client: Anthropic) -> None:
        # Wrap each resource already attached to the client.
        completions_resource = client.completions
        self.completions = resources.CompletionsWithRawResponse(completions_resource)

        messages_resource = client.messages
        self.messages = resources.MessagesWithRawResponse(messages_resource)
class AsyncAnthropicWithRawResponse:
    """Mirror of the async client's resources, each wrapped in its `Async...WithRawResponse` counterpart."""

    def __init__(self, client: AsyncAnthropic) -> None:
        # Wrap each resource already attached to the client.
        completions_resource = client.completions
        self.completions = resources.AsyncCompletionsWithRawResponse(completions_resource)

        messages_resource = client.messages
        self.messages = resources.AsyncMessagesWithRawResponse(messages_resource)
class AnthropicWithStreamedResponse:
    """Mirror of the sync client's resources, each wrapped in its `...WithStreamingResponse` counterpart."""

    def __init__(self, client: Anthropic) -> None:
        # Wrap each resource already attached to the client.
        completions_resource = client.completions
        self.completions = resources.CompletionsWithStreamingResponse(completions_resource)

        messages_resource = client.messages
        self.messages = resources.MessagesWithStreamingResponse(messages_resource)
class AsyncAnthropicWithStreamedResponse:
    """Mirror of the async client's resources, each wrapped in its `Async...WithStreamingResponse` counterpart."""

    def __init__(self, client: AsyncAnthropic) -> None:
        # Wrap each resource already attached to the client.
        completions_resource = client.completions
        self.completions = resources.AsyncCompletionsWithStreamingResponse(completions_resource)

        messages_resource = client.messages
        self.messages = resources.AsyncMessagesWithStreamingResponse(messages_resource)
# Short aliases for the two client classes; both names are listed in `__all__`.
Client = Anthropic
AsyncClient = AsyncAnthropic