Spaces:
Sleeping
Sleeping
| import os | |
| import dotenv | |
| from config.env_constant import EnvFilepath | |
| dotenv.load_dotenv(EnvFilepath.ENVPATH) | |
| from config.constant import AzureBlobConstants | |
| from externals.storages.azure_blob import get_container_client | |
| from fastapi import UploadFile | |
| from utils.logger import get_logger | |
| logger = get_logger("azure blob") | |
class AzureBlobStorageService:
    """Service wrapper for uploading files to Azure Blob Storage.

    A container client is obtained on every upload through
    ``get_container_client``; the service itself only stores the configured
    blob prefix and, optionally, a service client to close on shutdown.
    """

    def __init__(self):
        # Prefix read from configuration. It is stored here but is not
        # applied to blob names by ``upload_pdf``.
        self.prefix = AzureBlobConstants.BLOB_PREFIX
        # No long-lived service client is created here. The attribute is
        # still defined so that ``close`` can be called safely.
        self.blob_service_client = None

    async def upload_pdf(self, file: "UploadFile") -> str:
        """Upload the given file to the blob container as a PDF.

        The blob is named after ``file.filename`` and an existing blob with
        the same name is replaced (``overwrite=True``).

        Args:
            file: The FastAPI UploadFile object.

        Returns:
            The URL of the uploaded blob.

        Raises:
            ValueError: If the upload carries no filename, since the filename
                is used as the blob name.
        """
        # The original filename is used as the blob name, so two uploads with
        # the same filename collide (consider a UUID to avoid that).
        blob_name = file.filename
        if not blob_name:
            # Fail fast, before a container client is acquired.
            raise ValueError("uploaded file has no filename to use as the blob name.")

        container_client = await get_container_client()
        blob_client = container_client.get_blob_client(blob_name)

        # This reads the whole upload into memory in a single call; it is not
        # chunked, so very large files are held in memory in full.
        content = await file.read()

        # NOTE(review): the Azure SDK documents ``content_settings`` as a
        # ``ContentSettings`` object -- confirm that a plain dict is accepted
        # by the SDK version in use.
        await blob_client.upload_blob(
            content,
            blob_type="BlockBlob",
            overwrite=True,
            content_settings={"content_type": "application/pdf"},
        )

        # This is the blob's URL; whether it is publicly reachable depends on
        # the container's access policy.
        return blob_client.url

    async def close(self):
        """Close the blob service client if this instance holds one.

        Does nothing when no client has been set, so it is always safe to
        call during shutdown.
        """
        # getattr also covers instances created without running __init__.
        client = getattr(self, "blob_service_client", None)
        if client is not None:
            await client.close()
# Module-level service instance, created once when this module is imported.
# Presumably imported elsewhere as a shared singleton -- verify against callers.
blob_storage_service = AzureBlobStorageService()