| """Azure Blob Storage client wrapper.""" |
|
|
import uuid
from urllib.parse import quote

from azure.storage.blob.aio import BlobClient

from src.config.settings import settings
from src.middlewares.logging import get_logger
|
|
# Module-level logger used by every AzureBlobStorage method below to report
# upload/download/delete progress and failures.
logger = get_logger("azure_blob")
|
|
|
|
class AzureBlobStorage:
    """Azure Blob Storage async client wrapper.

    Connection details are read from ``settings`` once, when the instance is
    created: ``azureai_container_endpoint`` (base URL, trailing ``/`` removed),
    ``azureai_container_name`` and ``azureai_blob_sas`` (the SAS token appended
    to every request URL). Each operation opens its own async client and closes
    it with ``async with`` before returning.
    """

    def __init__(self):
        self.container_name = settings.azureai_container_name
        # The token is appended after our own "?" when URLs are built, so a
        # leading "?" in the configured value would yield "??" and corrupt the
        # first SAS field. `or ""` avoids an AttributeError if it is unset.
        self.sas_token = (settings.azureai_blob_sas or "").lstrip('?')
        self.account_url = settings.azureai_container_endpoint.rstrip('/')

    def _get_blob_client(self, blob_name: str) -> BlobClient:
        """Return an async ``BlobClient`` for *blob_name*, authorised by the SAS token.

        The blob name is percent-encoded (keeping ``/`` as the virtual-directory
        separator) before it is placed in the URL. This stops characters such as
        ``?``, ``#`` or ``%`` in a name from being read as the query string, a
        fragment or an escape sequence when the URL is parsed. Names made only of
        letters, digits, ``-``, ``_``, ``.``, ``~`` and ``/`` are left unchanged.
        """
        encoded_name = quote(blob_name, safe='/')
        sas_url = f"{self.account_url}/{self.container_name}/{encoded_name}?{self.sas_token}"
        return BlobClient.from_blob_url(sas_url)

    async def upload_file(self, file_content: bytes, filename: str, user_id: str) -> str:
        """Upload file to Azure Blob Storage under a freshly generated name.

        The blob is stored as ``{user_id}/{uuid4}.{ext}``, where ``ext`` is the
        text after the last ``.`` in *filename*. ``"txt"`` is used instead when
        there is no usable extension: no dot, nothing after the last dot, or
        characters other than letters/digits (e.g. ``"dir.v2/report"``). This
        keeps the generated name from ending in ``.`` or gaining extra path
        segments taken from the client-supplied file name.

        Args:
            file_content: Raw bytes to store; an existing blob is overwritten.
            filename: Original file name; only its extension is used.
            user_id: First path segment of the generated blob name.

        Returns:
            blob_name: Unique blob name in storage.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            _, dot, ext = filename.rpartition('.')
            if not (dot and ext.isalnum()):
                ext = 'txt'
            blob_name = f"{user_id}/{uuid.uuid4()}.{ext}"

            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Uploading file {filename} to blob {blob_name}")
                await blob_client.upload_blob(file_content, overwrite=True)

            logger.info(f"Successfully uploaded {blob_name}")
            return blob_name

        except Exception as e:
            logger.error(f"Failed to upload file {filename}", error=str(e))
            raise

    async def download_file(self, blob_name: str) -> bytes:
        """Download file from Azure Blob Storage and return its full contents.

        The whole blob is read into memory.

        Raises:
            Exception: Any SDK error (for example, for a blob that does not
                exist) is logged and re-raised unchanged.
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Downloading blob {blob_name}")
                stream = await blob_client.download_blob()
                content = await stream.readall()

            logger.info(f"Successfully downloaded {blob_name}")
            return content

        except Exception as e:
            logger.error(f"Failed to download blob {blob_name}", error=str(e))
            raise

    async def upload_bytes(self, content: bytes, blob_name: str) -> str:
        """Upload bytes to Azure Blob Storage using a specific blob name.

        Unlike upload_file(), this does not generate a UUID name — caller controls the blob_name.
        Used for Parquet files where the name must be deterministic (derived from document_id).
        An existing blob with the same name is overwritten.

        Returns:
            The *blob_name* that was written.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Uploading bytes to blob {blob_name}")
                await blob_client.upload_blob(content, overwrite=True)
            logger.info(f"Successfully uploaded {blob_name}")
            return blob_name
        except Exception as e:
            logger.error(f"Failed to upload bytes to {blob_name}", error=str(e))
            raise

    async def delete_file(self, blob_name: str) -> bool:
        """Delete file from Azure Blob Storage (best-effort).

        Returns:
            True if the delete call succeeded. On any error, it is logged and
            False is returned; nothing is raised.
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Deleting blob {blob_name}")
                await blob_client.delete_blob()

            logger.info(f"Successfully deleted {blob_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to delete blob {blob_name}", error=str(e))
            return False

    async def delete_blobs_with_prefix(self, prefix: str) -> int:
        """Delete all blobs whose name starts with prefix. Returns count deleted.

        Used to delete all Parquet files for a document in one call.

        An empty (or None) prefix is refused: it would match every blob in the
        container. In that case nothing is deleted and 0 is returned. Errors are
        logged, not raised. The returned count then covers only the blobs
        deleted before the failure.
        """
        if not prefix:
            logger.error("Refusing to delete blobs with an empty prefix (it would match the whole container)")
            return 0

        from azure.storage.blob.aio import ContainerClient

        container_url = f"{self.account_url}/{self.container_name}?{self.sas_token}"
        deleted = 0
        try:
            async with ContainerClient.from_container_url(container_url) as container:
                async for blob in container.list_blobs(name_starts_with=prefix):
                    await container.delete_blob(blob.name)
                    deleted += 1
            logger.info(f"Deleted {deleted} blobs with prefix {prefix}")
        except Exception as e:
            logger.error(f"Failed to delete blobs with prefix {prefix}", error=str(e))
        return deleted
|
|
|
|
| |
# Shared module-level instance. It is created when this module is first
# imported, so AzureBlobStorage.__init__ reads the storage settings at that time.
blob_storage = AzureBlobStorage()
|
|