| import os |
| import shutil |
| from abc import ABC, abstractmethod |
| from typing import BinaryIO, Tuple |
|
|
| import boto3 |
| from botocore.exceptions import ClientError |
| from open_webui.config import ( |
| S3_ACCESS_KEY_ID, |
| S3_BUCKET_NAME, |
| S3_ENDPOINT_URL, |
| S3_REGION_NAME, |
| S3_SECRET_ACCESS_KEY, |
| STORAGE_PROVIDER, |
| UPLOAD_DIR, |
| ) |
| from open_webui.constants import ERROR_MESSAGES |
|
|
|
|
class StorageProvider(ABC):
    """Abstract interface that every storage backend in this module implements.

    Subclasses must provide all four operations below; instantiating a
    subclass that leaves any of them undefined raises ``TypeError``.
    """

    @abstractmethod
    def upload_file(self, file: BinaryIO, filename: str) -> Tuple[bytes, str]:
        """Store the content of ``file`` under ``filename``.

        Returns:
            A ``(contents, path)`` tuple: the stored bytes and a string that
            identifies where the backend placed them.
        """

    @abstractmethod
    def get_file(self, file_path: str) -> str:
        """Return a path string through which ``file_path`` can be accessed."""

    @abstractmethod
    def delete_file(self, file_path: str) -> None:
        """Remove the single file identified by ``file_path``."""

    @abstractmethod
    def delete_all_files(self) -> None:
        """Remove every file held by the backend."""
|
|
|
|
class LocalStorageProvider(StorageProvider):
    """Storage backend that keeps files directly inside ``UPLOAD_DIR``."""

    @staticmethod
    def upload_file(file: BinaryIO, filename: str) -> Tuple[bytes, str]:
        """Write the bytes read from ``file`` to ``UPLOAD_DIR/filename``.

        Returns:
            A ``(contents, path)`` tuple with the bytes written and the path
            of the file that now holds them.

        Raises:
            ValueError: if reading ``file`` yields no content.
        """
        data = file.read()
        if not data:
            raise ValueError(ERROR_MESSAGES.EMPTY_CONTENT)

        destination = f"{UPLOAD_DIR}/{filename}"
        with open(destination, "wb") as out:
            out.write(data)
        return data, destination

    @staticmethod
    def get_file(file_path: str) -> str:
        """Return ``file_path`` unchanged; local files need no download step."""
        return file_path

    @staticmethod
    def delete_file(file_path: str) -> None:
        """Delete the file in ``UPLOAD_DIR`` named by the last segment of ``file_path``.

        Only the text after the final ``/`` is used, so any leading directory
        or URI prefix in ``file_path`` is ignored. A missing file is reported
        on stdout rather than raised.
        """
        target = f"{UPLOAD_DIR}/{file_path.rsplit('/', 1)[-1]}"
        if not os.path.isfile(target):
            print(f"File {target} not found in local storage.")
            return
        os.remove(target)

    @staticmethod
    def delete_all_files() -> None:
        """Delete every entry inside ``UPLOAD_DIR`` (the directory itself is kept).

        Failures on individual entries are printed and skipped so that one
        bad entry does not stop the rest from being removed.
        """
        if not os.path.exists(UPLOAD_DIR):
            print(f"Directory {UPLOAD_DIR} not found in local storage.")
            return

        for entry in os.listdir(UPLOAD_DIR):
            entry_path = os.path.join(UPLOAD_DIR, entry)
            try:
                # Files and symlinks are unlinked; real directories are
                # removed recursively.
                if os.path.isfile(entry_path) or os.path.islink(entry_path):
                    os.unlink(entry_path)
                elif os.path.isdir(entry_path):
                    shutil.rmtree(entry_path)
            except Exception as err:
                print(f"Failed to delete {entry_path}. Reason: {err}")
|
|
|
|
class S3StorageProvider(StorageProvider):
    """Storage backend that keeps files in an S3 bucket.

    Every operation also goes through ``LocalStorageProvider``: uploads are
    first written to ``UPLOAD_DIR``, downloads land in ``UPLOAD_DIR``, and
    deletions remove the local copy after the S3 object.
    """

    def __init__(self):
        """Create the boto3 S3 client from the module's S3 configuration values."""
        self.s3_client = boto3.client(
            "s3",
            region_name=S3_REGION_NAME,
            endpoint_url=S3_ENDPOINT_URL,
            aws_access_key_id=S3_ACCESS_KEY_ID,
            aws_secret_access_key=S3_SECRET_ACCESS_KEY,
        )
        self.bucket_name = S3_BUCKET_NAME

    @staticmethod
    def _split_s3_path(file_path: str) -> Tuple[str, str]:
        """Split ``<scheme>//<bucket>/<key>`` into ``(bucket, key)``.

        The key is everything after the first ``/`` that follows the bucket,
        so keys that themselves contain ``/`` are returned intact.

        Raises:
            ValueError: if ``file_path`` has no bucket or no key part.
        """
        _, _, remainder = file_path.partition("//")
        bucket_name, _, key = remainder.partition("/")
        if not bucket_name or not key:
            raise ValueError(f"Invalid S3 file path: {file_path}")
        return bucket_name, key

    def upload_file(self, file: BinaryIO, filename: str) -> Tuple[bytes, str]:
        """Handles uploading of the file to S3 storage.

        The content is written to ``UPLOAD_DIR`` first and that local file is
        then uploaded to the configured bucket with ``filename`` as the key.

        Returns:
            A ``(contents, uri)`` tuple: the uploaded bytes and the
            ``s3://<bucket>/<filename>`` location string.

        Raises:
            ValueError: if ``file`` is empty (raised by the local write step).
            RuntimeError: if the upload to S3 fails.
        """
        # Reuse the bytes already read by the local write instead of
        # re-opening the file (the previous re-read left a handle unclosed).
        contents, file_path = LocalStorageProvider.upload_file(file, filename)
        try:
            self.s3_client.upload_file(file_path, self.bucket_name, filename)
        except (ClientError, boto3.exceptions.S3UploadFailedError) as e:
            # boto3's managed upload reports failures as S3UploadFailedError.
            raise RuntimeError(f"Error uploading file to S3: {e}") from e
        return contents, f"s3://{self.bucket_name}/{filename}"

    def get_file(self, file_path: str) -> str:
        """Handles downloading of the file from S3 storage.

        ``file_path`` is expected in the ``s3://<bucket>/<key>`` form returned
        by ``upload_file``. The object is downloaded into ``UPLOAD_DIR`` under
        the last segment of its key.

        Returns:
            The local path of the downloaded file.

        Raises:
            ValueError: if ``file_path`` has no bucket or key part.
            RuntimeError: if the download from S3 fails.
        """
        bucket_name, key = self._split_s3_path(file_path)
        # Use only the final key segment locally so that keys containing "/"
        # do not require sub-directories under UPLOAD_DIR; this matches the
        # name LocalStorageProvider.delete_file derives from the same path.
        local_file_path = f"{UPLOAD_DIR}/{key.split('/')[-1]}"
        try:
            self.s3_client.download_file(bucket_name, key, local_file_path)
        except ClientError as e:
            raise RuntimeError(f"Error downloading file from S3: {e}") from e
        return local_file_path

    def delete_file(self, file_path: str) -> None:
        """Handles deletion of the file from S3 storage.

        Deletes the object from the configured bucket, then removes the local
        copy in ``UPLOAD_DIR``.

        Raises:
            ValueError: if an ``s3://`` path has no bucket or key part.
            RuntimeError: if the S3 delete request fails.
        """
        if file_path.startswith("s3://"):
            # Keep the full key so objects whose key contains "/" are targeted
            # correctly instead of only their last segment.
            _, key = self._split_s3_path(file_path)
        else:
            # Non-URI input: fall back to the last path segment as the key.
            key = file_path.split("/")[-1]

        try:
            self.s3_client.delete_object(Bucket=self.bucket_name, Key=key)
        except ClientError as e:
            raise RuntimeError(f"Error deleting file from S3: {e}") from e

        # Always delete from local storage
        LocalStorageProvider.delete_file(file_path)

    def delete_all_files(self) -> None:
        """Handles deletion of all files from S3 storage.

        Deletes every object in the configured bucket, then clears
        ``UPLOAD_DIR``.

        Raises:
            RuntimeError: if listing or deleting objects fails.
        """
        try:
            # A single list_objects_v2 call returns at most one page of keys,
            # so walk every page to make sure the whole bucket is cleared.
            paginator = self.s3_client.get_paginator("list_objects_v2")
            for page in paginator.paginate(Bucket=self.bucket_name):
                for content in page.get("Contents", []):
                    self.s3_client.delete_object(
                        Bucket=self.bucket_name, Key=content["Key"]
                    )
        except ClientError as e:
            raise RuntimeError(f"Error deleting all files from S3: {e}") from e

        # Always delete from local storage
        LocalStorageProvider.delete_all_files()
|
|
|
|
def get_storage_provider(storage_provider: str):
    """Build the storage backend selected by ``storage_provider``.

    Args:
        storage_provider: ``"local"`` for ``LocalStorageProvider`` or
            ``"s3"`` for ``S3StorageProvider``.

    Returns:
        A new instance of the selected provider class.

    Raises:
        RuntimeError: if ``storage_provider`` is any other value.
    """
    if storage_provider == "local":
        return LocalStorageProvider()
    if storage_provider == "s3":
        return S3StorageProvider()
    raise RuntimeError(f"Unsupported storage provider: {storage_provider}")
|
|
|
|
# Module-level storage backend, created once at import time from the
# configured STORAGE_PROVIDER value.
Storage = get_storage_provider(STORAGE_PROVIDER)
|
|