| | from __future__ import annotations |
| |
|
| | import os |
| | import inspect |
| | from typing import Any, Union, Mapping, TypeVar, Callable, Awaitable, cast, overload |
| | from functools import cached_property |
| | from typing_extensions import Self, override |
| |
|
| | import httpx |
| |
|
| | from .._types import NOT_GIVEN, Omit, Timeout, NotGiven |
| | from .._utils import is_given |
| | from .._client import Anthropic, AsyncAnthropic |
| | from .._compat import model_copy |
| | from .._models import FinalRequestOptions |
| | from .._streaming import Stream, AsyncStream |
| | from .._exceptions import AnthropicError |
| | from .._base_client import DEFAULT_MAX_RETRIES, BaseClient |
| | from ..resources.beta import Beta, AsyncBeta |
| | from ..resources.messages import Messages, AsyncMessages |
| | from ..resources.beta.messages import Messages as BetaMessages, AsyncMessages as AsyncBetaMessages |
| |
|
| | AzureADTokenProvider = Callable[[], str] |
| | AsyncAzureADTokenProvider = Callable[[], "str | Awaitable[str]"] |
| | _HttpxClientT = TypeVar("_HttpxClientT", bound=Union[httpx.Client, httpx.AsyncClient]) |
| | _DefaultStreamT = TypeVar("_DefaultStreamT", bound=Union[Stream[Any], AsyncStream[Any]]) |
| |
|
| |
|
| | class MutuallyExclusiveAuthError(AnthropicError): |
| | def __init__(self) -> None: |
| | super().__init__( |
| | "The `api_key` and `azure_ad_token_provider` arguments are mutually exclusive; Only one can be passed at a time" |
| | ) |
| |
|
| |
|
| | class BaseFoundryClient(BaseClient[_HttpxClientT, _DefaultStreamT]): ... |
| |
|
| |
|
| | class MessagesFoundry(Messages): |
| | @cached_property |
| | @override |
| | def batches(self) -> None: |
| | """Batches endpoint is not supported for Anthropic Foundry client.""" |
| | return None |
| |
|
| |
|
| | class BetaFoundryMessages(BetaMessages): |
| | @cached_property |
| | @override |
| | def batches(self) -> None: |
| | """Batches endpoint is not supported for Anthropic Foundry client.""" |
| | return None |
| |
|
| |
|
| | class BetaFoundry(Beta): |
| | @cached_property |
| | @override |
| | def messages(self) -> BetaMessages: |
| | """Return beta messages resource instance with excluded unsupported endpoints.""" |
| | return BetaFoundryMessages(self._client) |
| |
|
| |
|
| | class AsyncMessagesFoundry(AsyncMessages): |
| | @cached_property |
| | @override |
| | def batches(self) -> None: |
| | """Batches endpoint is not supported for Anthropic Foundry client.""" |
| | return None |
| |
|
| |
|
| | class AsyncBetaFoundryMessages(AsyncBetaMessages): |
| | @cached_property |
| | @override |
| | def batches(self) -> None: |
| | """Batches endpoint is not supported for Anthropic Foundry client.""" |
| | return None |
| |
|
| |
|
| | class AsyncBetaFoundry(AsyncBeta): |
| | @cached_property |
| | @override |
| | def messages(self) -> AsyncBetaMessages: |
| | """Return beta messages resource instance with excluded unsupported endpoints.""" |
| | return AsyncBetaFoundryMessages(self._client) |
| |
|
| |
|
| | |
| |
|
| |
|
| | class AnthropicFoundry(BaseFoundryClient[httpx.Client, Stream[Any]], Anthropic): |
| | @overload |
| | def __init__( |
| | self, |
| | *, |
| | resource: str | None = None, |
| | api_key: str | None = None, |
| | azure_ad_token_provider: AzureADTokenProvider | None = None, |
| | timeout: float | Timeout | None | NotGiven = NOT_GIVEN, |
| | max_retries: int = DEFAULT_MAX_RETRIES, |
| | default_headers: Mapping[str, str] | None = None, |
| | default_query: Mapping[str, object] | None = None, |
| | http_client: httpx.Client | None = None, |
| | _strict_response_validation: bool = False, |
| | ) -> None: ... |
| |
|
| | @overload |
| | def __init__( |
| | self, |
| | *, |
| | base_url: str, |
| | api_key: str | None = None, |
| | azure_ad_token_provider: AzureADTokenProvider | None = None, |
| | timeout: float | Timeout | None | NotGiven = NOT_GIVEN, |
| | max_retries: int = DEFAULT_MAX_RETRIES, |
| | default_headers: Mapping[str, str] | None = None, |
| | default_query: Mapping[str, object] | None = None, |
| | http_client: httpx.Client | None = None, |
| | _strict_response_validation: bool = False, |
| | ) -> None: ... |
| |
|
| | def __init__( |
| | self, |
| | *, |
| | resource: str | None = None, |
| | api_key: str | None = None, |
| | azure_ad_token_provider: AzureADTokenProvider | None = None, |
| | base_url: str | None = None, |
| | timeout: float | Timeout | None | NotGiven = NOT_GIVEN, |
| | max_retries: int = DEFAULT_MAX_RETRIES, |
| | default_headers: Mapping[str, str] | None = None, |
| | default_query: Mapping[str, object] | None = None, |
| | http_client: httpx.Client | None = None, |
| | _strict_response_validation: bool = False, |
| | ) -> None: |
| | """Construct a new synchronous Anthropic Foundry client instance. |
| | |
| | This automatically infers the following arguments from their corresponding environment variables if they are not provided: |
| | - `api_key` from `ANTHROPIC_FOUNDRY_API_KEY` |
| | - `resource` from `ANTHROPIC_FOUNDRY_RESOURCE` |
| | - `base_url` from `ANTHROPIC_FOUNDRY_BASE_URL` |
| | |
| | Args: |
| | resource: Your Foundry resource name, e.g. `example-resource` for `https://example-resource.services.ai.azure.com/anthropic/` |
| | azure_ad_token_provider: A function that returns an Azure Active Directory token, will be invoked on every request. |
| | """ |
| | api_key = api_key if api_key is not None else os.environ.get("ANTHROPIC_FOUNDRY_API_KEY") |
| | resource = resource if resource is not None else os.environ.get("ANTHROPIC_FOUNDRY_RESOURCE") |
| | base_url = base_url if base_url is not None else os.environ.get("ANTHROPIC_FOUNDRY_BASE_URL") |
| |
|
| | if api_key is None and azure_ad_token_provider is None: |
| | raise AnthropicError( |
| | "Missing credentials. Please pass one of `api_key`, `azure_ad_token_provider`, or the `ANTHROPIC_FOUNDRY_API_KEY` environment variable." |
| | ) |
| |
|
| | if base_url is None: |
| | if resource is None: |
| | raise ValueError( |
| | "Must provide one of the `base_url` or `resource` arguments, or the `ANTHROPIC_FOUNDRY_RESOURCE` environment variable" |
| | ) |
| | base_url = f"https://{resource}.services.ai.azure.com/anthropic/" |
| | elif resource is not None: |
| | raise ValueError("base_url and resource are mutually exclusive") |
| |
|
| | super().__init__( |
| | api_key=api_key, |
| | base_url=base_url, |
| | timeout=timeout, |
| | max_retries=max_retries, |
| | default_headers=default_headers, |
| | default_query=default_query, |
| | http_client=http_client, |
| | _strict_response_validation=_strict_response_validation, |
| | ) |
| | self._azure_ad_token_provider = azure_ad_token_provider |
| |
|
| | @cached_property |
| | @override |
| | def models(self) -> None: |
| | """Models endpoint is not supported for Anthropic Foundry client.""" |
| | return None |
| |
|
| | @cached_property |
| | @override |
| | def messages(self) -> MessagesFoundry: |
| | """Return messages resource instance with excluded unsupported endpoints.""" |
| | return MessagesFoundry(client=self) |
| |
|
| | @cached_property |
| | @override |
| | def beta(self) -> Beta: |
| | """Return beta resource instance with excluded unsupported endpoints.""" |
| | return BetaFoundry(self) |
| |
|
| | @override |
| | def copy( |
| | self, |
| | *, |
| | api_key: str | None = None, |
| | azure_ad_token_provider: AzureADTokenProvider | None = None, |
| | auth_token: str | None = None, |
| | base_url: str | httpx.URL | None = None, |
| | timeout: float | Timeout | None | NotGiven = NOT_GIVEN, |
| | http_client: httpx.Client | None = None, |
| | max_retries: int | NotGiven = NOT_GIVEN, |
| | default_headers: Mapping[str, str] | None = None, |
| | set_default_headers: Mapping[str, str] | None = None, |
| | default_query: Mapping[str, object] | None = None, |
| | set_default_query: Mapping[str, object] | None = None, |
| | _extra_kwargs: Mapping[str, Any] = {}, |
| | ) -> Self: |
| | """ |
| | Create a new client instance re-using the same options given to the current client with optional overriding. |
| | """ |
| | return super().copy( |
| | api_key=api_key, |
| | auth_token=auth_token, |
| | base_url=base_url, |
| | timeout=timeout, |
| | http_client=http_client, |
| | max_retries=max_retries, |
| | default_headers=default_headers, |
| | set_default_headers=set_default_headers, |
| | default_query=default_query, |
| | set_default_query=set_default_query, |
| | _extra_kwargs={ |
| | "azure_ad_token_provider": azure_ad_token_provider or self._azure_ad_token_provider, |
| | **_extra_kwargs, |
| | }, |
| | ) |
| |
|
| | with_options = copy |
| |
|
| | def _get_azure_ad_token(self) -> str | None: |
| | provider = self._azure_ad_token_provider |
| | if provider is not None: |
| | token = provider() |
| | if not token or not isinstance(token, str): |
| | raise ValueError( |
| | f"Expected `azure_ad_token_provider` argument to return a string but it returned {token}", |
| | ) |
| | return token |
| |
|
| | return None |
| |
|
| | @override |
| | def _prepare_options(self, options: FinalRequestOptions) -> FinalRequestOptions: |
| | headers: dict[str, str | Omit] = {**options.headers} if is_given(options.headers) else {} |
| |
|
| | options = model_copy(options) |
| | options.headers = headers |
| |
|
| | azure_ad_token = self._get_azure_ad_token() |
| | if azure_ad_token is not None: |
| | if headers.get("Authorization") is None: |
| | headers["Authorization"] = f"Bearer {azure_ad_token}" |
| | elif self.api_key is not None: |
| | if headers.get("api-key") is None: |
| | assert self.api_key is not None |
| | headers["api-key"] = self.api_key |
| | else: |
| | |
| | raise ValueError("Unable to handle auth") |
| |
|
| | return options |
| |
|
| |
|
| | class AsyncAnthropicFoundry(BaseFoundryClient[httpx.AsyncClient, AsyncStream[Any]], AsyncAnthropic): |
| | @overload |
| | def __init__( |
| | self, |
| | *, |
| | resource: str | None = None, |
| | api_key: str | None = None, |
| | azure_ad_token_provider: AsyncAzureADTokenProvider | None = None, |
| | timeout: float | Timeout | None | NotGiven = NOT_GIVEN, |
| | max_retries: int = DEFAULT_MAX_RETRIES, |
| | default_headers: Mapping[str, str] | None = None, |
| | default_query: Mapping[str, object] | None = None, |
| | http_client: httpx.AsyncClient | None = None, |
| | _strict_response_validation: bool = False, |
| | ) -> None: ... |
| |
|
| | @overload |
| | def __init__( |
| | self, |
| | *, |
| | base_url: str, |
| | api_key: str | None = None, |
| | azure_ad_token_provider: AsyncAzureADTokenProvider | None = None, |
| | timeout: float | Timeout | None | NotGiven = NOT_GIVEN, |
| | max_retries: int = DEFAULT_MAX_RETRIES, |
| | default_headers: Mapping[str, str] | None = None, |
| | default_query: Mapping[str, object] | None = None, |
| | http_client: httpx.AsyncClient | None = None, |
| | _strict_response_validation: bool = False, |
| | ) -> None: ... |
| |
|
| | def __init__( |
| | self, |
| | *, |
| | resource: str | None = None, |
| | api_key: str | None = None, |
| | azure_ad_token_provider: AsyncAzureADTokenProvider | None = None, |
| | base_url: str | None = None, |
| | timeout: float | Timeout | None | NotGiven = NOT_GIVEN, |
| | max_retries: int = DEFAULT_MAX_RETRIES, |
| | default_headers: Mapping[str, str] | None = None, |
| | default_query: Mapping[str, object] | None = None, |
| | http_client: httpx.AsyncClient | None = None, |
| | _strict_response_validation: bool = False, |
| | ) -> None: |
| | """Construct a new asynchronous Anthropic Foundry client instance. |
| | |
| | This automatically infers the following arguments from their corresponding environment variables if they are not provided: |
| | - `api_key` from `ANTHROPIC_FOUNDRY_API_KEY` |
| | - `resource` from `ANTHROPIC_FOUNDRY_RESOURCE` |
| | - `base_url` from `ANTHROPIC_FOUNDRY_BASE_URL` |
| | |
| | Args: |
| | resource: Your Foundry resource name, e.g. `example-resource` for `https://example-resource.services.ai.azure.com/anthropic/` |
| | azure_ad_token_provider: A function that returns an Azure Active Directory token, will be invoked on every request. |
| | """ |
| | api_key = api_key if api_key is not None else os.environ.get("ANTHROPIC_FOUNDRY_API_KEY") |
| | resource = resource if resource is not None else os.environ.get("ANTHROPIC_FOUNDRY_RESOURCE") |
| | base_url = base_url if base_url is not None else os.environ.get("ANTHROPIC_FOUNDRY_BASE_URL") |
| |
|
| | if api_key is None and azure_ad_token_provider is None: |
| | raise AnthropicError( |
| | "Missing credentials. Please pass one of `api_key`, `azure_ad_token_provider`, or the `ANTHROPIC_FOUNDRY_API_KEY` environment variable." |
| | ) |
| |
|
| | if base_url is None: |
| | if resource is None: |
| | raise ValueError( |
| | "Must provide one of the `base_url` or `resource` arguments, or the `ANTHROPIC_FOUNDRY_RESOURCE` environment variable" |
| | ) |
| | base_url = f"https://{resource}.services.ai.azure.com/anthropic/" |
| | elif resource is not None: |
| | raise ValueError("base_url and resource are mutually exclusive") |
| |
|
| | super().__init__( |
| | api_key=api_key, |
| | base_url=base_url, |
| | timeout=timeout, |
| | max_retries=max_retries, |
| | default_headers=default_headers, |
| | default_query=default_query, |
| | http_client=http_client, |
| | _strict_response_validation=_strict_response_validation, |
| | ) |
| | self._azure_ad_token_provider = azure_ad_token_provider |
| |
|
| | @cached_property |
| | @override |
| | def models(self) -> None: |
| | """Models endpoint is not supported for Azure Anthropic client.""" |
| | return None |
| |
|
| | @cached_property |
| | @override |
| | def messages(self) -> AsyncMessagesFoundry: |
| | """Return messages resource instance with excluded unsupported endpoints.""" |
| | return AsyncMessagesFoundry(client=self) |
| |
|
| | @cached_property |
| | @override |
| | def beta(self) -> AsyncBetaFoundry: |
| | """Return beta resource instance with excluded unsupported endpoints.""" |
| | return AsyncBetaFoundry(client=self) |
| |
|
| | @override |
| | def copy( |
| | self, |
| | *, |
| | api_key: str | None = None, |
| | azure_ad_token_provider: AsyncAzureADTokenProvider | None = None, |
| | auth_token: str | None = None, |
| | base_url: str | httpx.URL | None = None, |
| | timeout: float | Timeout | None | NotGiven = NOT_GIVEN, |
| | http_client: httpx.AsyncClient | None = None, |
| | max_retries: int | NotGiven = NOT_GIVEN, |
| | default_headers: Mapping[str, str] | None = None, |
| | set_default_headers: Mapping[str, str] | None = None, |
| | default_query: Mapping[str, object] | None = None, |
| | set_default_query: Mapping[str, object] | None = None, |
| | _extra_kwargs: Mapping[str, Any] = {}, |
| | ) -> Self: |
| | """ |
| | Create a new client instance re-using the same options given to the current client with optional overriding. |
| | """ |
| | return super().copy( |
| | api_key=api_key, |
| | auth_token=auth_token, |
| | base_url=base_url, |
| | timeout=timeout, |
| | http_client=http_client, |
| | max_retries=max_retries, |
| | default_headers=default_headers, |
| | set_default_headers=set_default_headers, |
| | default_query=default_query, |
| | set_default_query=set_default_query, |
| | _extra_kwargs={ |
| | "azure_ad_token_provider": azure_ad_token_provider or self._azure_ad_token_provider, |
| | **_extra_kwargs, |
| | }, |
| | ) |
| |
|
| | with_options = copy |
| |
|
| | async def _get_azure_ad_token(self) -> str | None: |
| | provider = self._azure_ad_token_provider |
| | if provider is not None: |
| | token = provider() |
| | if inspect.isawaitable(token): |
| | token = await token |
| | if not token or not isinstance(cast(Any, token), str): |
| | raise ValueError( |
| | f"Expected `azure_ad_token_provider` argument to return a string but it returned {token}", |
| | ) |
| | return str(token) |
| |
|
| | return None |
| |
|
| | @override |
| | async def _prepare_options(self, options: FinalRequestOptions) -> FinalRequestOptions: |
| | headers: dict[str, str | Omit] = {**options.headers} if is_given(options.headers) else {} |
| |
|
| | options = model_copy(options) |
| | options.headers = headers |
| |
|
| | azure_ad_token = await self._get_azure_ad_token() |
| | if azure_ad_token is not None: |
| | if headers.get("Authorization") is None: |
| | headers["Authorization"] = f"Bearer {azure_ad_token}" |
| | elif self.api_key is not None: |
| | assert self.api_key is not None |
| | if headers.get("api-key") is None: |
| | headers["api-key"] = self.api_key |
| | else: |
| | |
| | raise ValueError("Unable to handle auth") |
| |
|
| | return options |
| |
|