"""Azure Blob Storage client wrapper."""

import uuid
from urllib.parse import quote

from azure.storage.blob.aio import BlobClient, ContainerClient

from src.config.settings import settings
from src.middlewares.logging import get_logger

logger = get_logger("azure_blob")


class AzureBlobStorage:
    """Azure Blob Storage async client wrapper.

    Each operation opens a short-lived async client (via ``async with``)
    authenticated by the SAS token from settings, and closes it afterwards.
    """

    def __init__(self) -> None:
        self.container_name: str = settings.azureai_container_name
        # The token is appended after our own '?', so a token copied with a
        # leading '?' would otherwise produce '??' and corrupt its first param.
        self.sas_token: str = settings.azureai_blob_sas.lstrip('?')
        # Trailing '/' removed so the joined URLs never contain '//'.
        self.account_url: str = settings.azureai_container_endpoint.rstrip('/')

    def _get_blob_client(self, blob_name: str) -> BlobClient:
        """Get async blob client with SAS token.

        The blob name is percent-encoded as a single path segment ('/'
        included). Unencoded, characters such as '#', '?', '%' or spaces are
        parsed as URL syntax and address the wrong blob; encoding '/' keeps
        nested names like ``user_id/file.ext`` in one segment when the SDK
        splits the URL path. ``from_blob_url`` decodes it back to the
        original name.
        """
        sas_url = (
            f"{self.account_url}/{self.container_name}/"
            f"{quote(blob_name, safe='')}?{self.sas_token}"
        )
        return BlobClient.from_blob_url(sas_url)

    @staticmethod
    def _extension_for(filename: str) -> str:
        """Return the extension of ``filename`` if it is URL-safe, else ``'txt'``.

        Only a non-empty ASCII alphanumeric suffix after the last '.' is kept;
        an empty suffix (``"report."``) or one containing '/', '?', '#',
        spaces, etc. would corrupt the generated blob name.
        """
        _, dot, ext = filename.rpartition('.')
        if dot and ext.isascii() and ext.isalnum():
            return ext
        return 'txt'

    async def upload_file(self, file_content: bytes, filename: str, user_id: str) -> str:
        """Upload file to Azure Blob Storage under a freshly generated name.

        Args:
            file_content: Bytes to store.
            filename: Original file name; only its extension is used.
            user_id: Prefix (virtual directory) of the generated blob name.

        Returns:
            blob_name: Unique blob name in storage, ``"{user_id}/{uuid4}.{ext}"``.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            blob_name = f"{user_id}/{uuid.uuid4()}.{self._extension_for(filename)}"

            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Uploading file {filename} to blob {blob_name}")
                # Name is a fresh UUID, so overwrite only matters on a collision.
                await blob_client.upload_blob(file_content, overwrite=True)
                logger.info(f"Successfully uploaded {blob_name}")
                return blob_name
        except Exception as e:
            logger.error(f"Failed to upload file {filename}", error=str(e))
            raise

    async def download_file(self, blob_name: str) -> bytes:
        """Download file from Azure Blob Storage.

        Returns:
            The full blob content.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Downloading blob {blob_name}")
                stream = await blob_client.download_blob()
                content = await stream.readall()
                logger.info(f"Successfully downloaded {blob_name}")
                return content
        except Exception as e:
            logger.error(f"Failed to download blob {blob_name}", error=str(e))
            raise

    async def upload_bytes(self, content: bytes, blob_name: str) -> str:
        """Upload bytes to Azure Blob Storage using a specific blob name.

        Unlike upload_file(), this does not generate a UUID name — caller controls the blob_name.
        Used for Parquet files where the name must be deterministic (derived from document_id).
        An existing blob with the same name is overwritten.

        Returns:
            The blob_name that was written.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Uploading bytes to blob {blob_name}")
                await blob_client.upload_blob(content, overwrite=True)
                logger.info(f"Successfully uploaded {blob_name}")
                return blob_name
        except Exception as e:
            logger.error(f"Failed to upload bytes to {blob_name}", error=str(e))
            raise

    async def delete_file(self, blob_name: str) -> bool:
        """Delete file from Azure Blob Storage (best effort).

        Returns:
            True if the delete call succeeded; False on any error (logged, not raised).
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Deleting blob {blob_name}")
                await blob_client.delete_blob()
                logger.info(f"Successfully deleted {blob_name}")
                return True
        except Exception as e:
            logger.error(f"Failed to delete blob {blob_name}", error=str(e))
            return False

    async def delete_blobs_with_prefix(self, prefix: str) -> int:
        """Delete all blobs whose name starts with prefix. Returns count deleted.

        Used to delete all Parquet files for a document in one call.
        Best effort: errors are logged, not raised, and the count of blobs
        deleted before the failure is returned. An empty prefix is refused
        (returns 0) because it would match — and delete — every blob in
        the container.
        """
        if not prefix:
            logger.error("Refusing to delete blobs with an empty prefix")
            return 0

        container_url = f"{self.account_url}/{self.container_name}?{self.sas_token}"
        deleted = 0
        try:
            async with ContainerClient.from_container_url(container_url) as container:
                async for blob in container.list_blobs(name_starts_with=prefix):
                    await container.delete_blob(blob.name)
                    deleted += 1
            logger.info(f"Deleted {deleted} blobs with prefix {prefix}")
        except Exception as e:
            logger.error(f"Failed to delete blobs with prefix {prefix}", error=str(e))
        return deleted


# Singleton instance
blob_storage = AzureBlobStorage()