Spaces:
Paused
Paused
| from __future__ import annotations | |
| from abc import abstractmethod | |
| from enum import Enum | |
| from typing import ( | |
| Any, | |
| List, | |
| Optional, | |
| Dict, | |
| Tuple, | |
| TypeVar, | |
| ) | |
| from dataclasses import dataclass | |
| from pydantic import SecretStr | |
| from chromadb.config import ( | |
| Component, | |
| System, | |
| ) | |
| T = TypeVar("T") | |
| S = TypeVar("S") | |
| class AuthError(Exception): | |
| pass | |
| ClientAuthHeaders = Dict[str, SecretStr] | |
| class ClientAuthProvider(Component): | |
| """ | |
| ClientAuthProvider is responsible for providing authentication headers for | |
| client requests. Client implementations (in our case, just the FastAPI | |
| client) must inject these headers into their requests. | |
| """ | |
| def __init__(self, system: System) -> None: | |
| super().__init__(system) | |
| def authenticate(self) -> ClientAuthHeaders: | |
| pass | |
| class UserIdentity: | |
| """ | |
| UserIdentity represents the identity of a user. In general, not all fields | |
| will be populated, and the fields that are populated will depend on the | |
| authentication provider. | |
| The idea is that the AuthenticationProvider is responsible for populating | |
| _all_ information known about the user, and the AuthorizationProvider is | |
| responsible for making decisions based on that information. | |
| """ | |
| user_id: str | |
| tenant: Optional[str] = None | |
| databases: Optional[List[str]] = None | |
| # This can be used for any additional auth context which needs to be | |
| # propagated from the authentication provider to the authorization | |
| # provider. | |
| attributes: Optional[Dict[str, Any]] = None | |
| class ServerAuthenticationProvider(Component): | |
| """ | |
| ServerAuthenticationProvider is responsible for authenticating requests. If | |
| a ServerAuthenticationProvider is configured, it will be called by the | |
| server to authenticate requests. If no ServerAuthenticationProvider is | |
| configured, all requests will be authenticated. | |
| The ServerAuthenticationProvider should return a UserIdentity object if the | |
| request is authenticated for use by the ServerAuthorizationProvider. | |
| """ | |
| def __init__(self, system: System) -> None: | |
| super().__init__(system) | |
| self._ignore_auth_paths: Dict[ | |
| str, List[str] | |
| ] = system.settings.chroma_server_auth_ignore_paths | |
| self.overwrite_singleton_tenant_database_access_from_auth = ( | |
| system.settings.chroma_overwrite_singleton_tenant_database_access_from_auth | |
| ) | |
| def authenticate_or_raise(self, headers: Dict[str, str]) -> UserIdentity: | |
| pass | |
| def ignore_operation(self, verb: str, path: str) -> bool: | |
| if ( | |
| path in self._ignore_auth_paths.keys() | |
| and verb.upper() in self._ignore_auth_paths[path] | |
| ): | |
| return True | |
| return False | |
| def read_creds_or_creds_file(self) -> List[str]: | |
| _creds_file = None | |
| _creds = None | |
| if self._system.settings.chroma_server_authn_credentials_file: | |
| _creds_file = str( | |
| self._system.settings["chroma_server_authn_credentials_file"] | |
| ) | |
| if self._system.settings.chroma_server_authn_credentials: | |
| _creds = str(self._system.settings["chroma_server_authn_credentials"]) | |
| if not _creds_file and not _creds: | |
| raise ValueError( | |
| "No credentials file or credentials found in " | |
| "[chroma_server_authn_credentials]." | |
| ) | |
| if _creds_file and _creds: | |
| raise ValueError( | |
| "Both credentials file and credentials found." | |
| "Please provide only one." | |
| ) | |
| if _creds: | |
| return [c for c in _creds.split("\n") if c] | |
| elif _creds_file: | |
| with open(_creds_file, "r") as f: | |
| return f.readlines() | |
| raise ValueError("Should never happen") | |
| def singleton_tenant_database_if_applicable( | |
| self, user: Optional[UserIdentity] | |
| ) -> Tuple[Optional[str], Optional[str]]: | |
| """ | |
| If settings.chroma_overwrite_singleton_tenant_database_access_from_auth | |
| is False, this function always returns (None, None). | |
| If settings.chroma_overwrite_singleton_tenant_database_access_from_auth | |
| is True, follows the following logic: | |
| - If the user only has access to a single tenant, this function will | |
| return that tenant as its first return value. | |
| - If the user only has access to a single database, this function will | |
| return that database as its second return value. If the user has | |
| access to multiple tenants and/or databases, including "*", this | |
| function will return None for the corresponding value(s). | |
| - If the user has access to multiple tenants and/or databases this | |
| function will return None for the corresponding value(s). | |
| """ | |
| if not self.overwrite_singleton_tenant_database_access_from_auth or not user: | |
| return None, None | |
| tenant = None | |
| database = None | |
| if user.tenant and user.tenant != "*": | |
| tenant = user.tenant | |
| if user.databases and len(user.databases) == 1 and user.databases[0] != "*": | |
| database = user.databases[0] | |
| return tenant, database | |
| class AuthzAction(str, Enum): | |
| """ | |
| The set of actions that can be authorized by the authorization provider. | |
| """ | |
| RESET = "system:reset" | |
| CREATE_TENANT = "tenant:create_tenant" | |
| GET_TENANT = "tenant:get_tenant" | |
| CREATE_DATABASE = "db:create_database" | |
| GET_DATABASE = "db:get_database" | |
| LIST_COLLECTIONS = "db:list_collections" | |
| COUNT_COLLECTIONS = "db:count_collections" | |
| CREATE_COLLECTION = "db:create_collection" | |
| GET_OR_CREATE_COLLECTION = "db:get_or_create_collection" | |
| GET_COLLECTION = "collection:get_collection" | |
| DELETE_COLLECTION = "collection:delete_collection" | |
| UPDATE_COLLECTION = "collection:update_collection" | |
| ADD = "collection:add" | |
| DELETE = "collection:delete" | |
| GET = "collection:get" | |
| QUERY = "collection:query" | |
| COUNT = "collection:count" | |
| UPDATE = "collection:update" | |
| UPSERT = "collection:upsert" | |
| class AuthzResource: | |
| """ | |
| The resource being accessed in an authorization request. | |
| """ | |
| tenant: Optional[str] | |
| database: Optional[str] | |
| collection: Optional[str] | |
| class ServerAuthorizationProvider(Component): | |
| """ | |
| ServerAuthorizationProvider is responsible for authorizing requests. If a | |
| ServerAuthorizationProvider is configured, it will be called by the server | |
| to authorize requests. If no ServerAuthorizationProvider is configured, all | |
| requests will be authorized. | |
| ServerAuthorizationProvider should raise an exception if the request is not | |
| authorized. | |
| """ | |
| def __init__(self, system: System) -> None: | |
| super().__init__(system) | |
| def authorize_or_raise( | |
| self, user: UserIdentity, action: AuthzAction, resource: AuthzResource | |
| ) -> None: | |
| pass | |
| def read_config_or_config_file(self) -> List[str]: | |
| _config_file = None | |
| _config = None | |
| if self._system.settings.chroma_server_authz_config_file: | |
| _config_file = self._system.settings["chroma_server_authz_config_file"] | |
| if self._system.settings.chroma_server_authz_config: | |
| _config = str(self._system.settings["chroma_server_authz_config"]) | |
| if not _config_file and not _config: | |
| raise ValueError( | |
| "No authz configuration file or authz configuration found." | |
| ) | |
| if _config_file and _config: | |
| raise ValueError( | |
| "Both authz configuration file and authz configuration found." | |
| "Please provide only one." | |
| ) | |
| if _config: | |
| return [c for c in _config.split("\n") if c] | |
| elif _config_file: | |
| with open(_config_file, "r") as f: | |
| return f.readlines() | |
| raise ValueError("Should never happen") | |