ishaq101's picture
Merge dev_new to main (improve retriever and add querying) (#13)
52999bc
"""Azure Blob Storage client wrapper."""
import uuid
from urllib.parse import quote

from azure.storage.blob.aio import BlobClient

from src.config.settings import settings
from src.middlewares.logging import get_logger
# Module-level logger for this wrapper, obtained from the project's logging
# middleware under the name "azure_blob".
logger = get_logger("azure_blob")
class AzureBlobStorage:
    """Async wrapper around Azure Blob Storage, authenticated with a SAS token.

    Every operation builds a short-lived client from a SAS URL of the form
    ``<account_url>/<container>/<blob>?<sas>`` and closes it when finished.
    """

    def __init__(self):
        """Read container name, SAS token and endpoint from ``settings``."""
        self.container_name = settings.azureai_container_name
        self.sas_token = settings.azureai_blob_sas
        # Trailing slash is dropped so URL joins below never produce "//".
        self.account_url = settings.azureai_container_endpoint.rstrip('/')

    def _sas_query(self) -> str:
        """Return the SAS token as a bare query string (no leading ``?``).

        Tokens are sometimes stored with their leading ``?``; stripping it
        avoids building ``...??sv=...`` URLs. A missing token yields ``""``.
        """
        return (self.sas_token or "").lstrip('?')

    def _blob_url(self, blob_name: str) -> str:
        """Return the SAS URL addressing ``blob_name`` in the container.

        The blob name is percent-encoded (``/`` kept as the virtual-directory
        separator) because the URL is parsed and unquoted again by the SDK;
        raw ``?``, ``#``, ``%`` or spaces would otherwise change which blob
        the URL points at.
        """
        encoded_name = quote(blob_name, safe="/")
        return f"{self.account_url}/{self.container_name}/{encoded_name}?{self._sas_query()}"

    def _container_url(self) -> str:
        """Return the SAS URL addressing the container itself."""
        return f"{self.account_url}/{self.container_name}?{self._sas_query()}"

    @staticmethod
    def _extension_of(filename: str) -> str:
        """Return the text after the last ``.`` in ``filename``, or ``'txt'``.

        Falls back to ``'txt'`` when there is no dot, when the name ends with
        a dot, or when the candidate extension contains a path separator
        (which would otherwise leak into the generated blob name).
        """
        _, dot, ext = filename.rpartition('.')
        if not dot or not ext or '/' in ext or '\\' in ext:
            return 'txt'
        return ext

    def _get_blob_client(self, blob_name: str) -> "BlobClient":
        """Get async blob client with SAS token."""
        return BlobClient.from_blob_url(self._blob_url(blob_name))

    async def upload_file(self, file_content: bytes, filename: str, user_id: str) -> str:
        """Upload file to Azure Blob Storage under a generated unique name.

        The blob is stored as ``<user_id>/<uuid4>.<ext>`` where ``ext`` is
        taken from ``filename`` (``txt`` when it has no usable extension).

        Args:
            file_content: Raw bytes to store.
            filename: Original file name; used for the extension and logging.
            user_id: Prefix (virtual directory) for the blob name.

        Returns:
            blob_name: Unique blob name in storage.

        Raises:
            Exception: Any error from the upload is logged and re-raised.
        """
        try:
            blob_name = f"{user_id}/{uuid.uuid4()}.{self._extension_of(filename)}"
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Uploading file {filename} to blob {blob_name}")
                await blob_client.upload_blob(file_content, overwrite=True)
            logger.info(f"Successfully uploaded {blob_name}")
            return blob_name
        except Exception as e:
            logger.error(f"Failed to upload file {filename}", error=str(e))
            raise

    async def download_file(self, blob_name: str) -> bytes:
        """Download file from Azure Blob Storage.

        Args:
            blob_name: Name of the blob inside the configured container.

        Returns:
            The full blob content as bytes.

        Raises:
            Exception: Any error from the download is logged and re-raised.
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Downloading blob {blob_name}")
                stream = await blob_client.download_blob()
                content = await stream.readall()
            logger.info(f"Successfully downloaded {blob_name}")
            return content
        except Exception as e:
            logger.error(f"Failed to download blob {blob_name}", error=str(e))
            raise

    async def upload_bytes(self, content: bytes, blob_name: str) -> str:
        """Upload bytes to Azure Blob Storage using a specific blob name.

        Unlike upload_file(), this does not generate a UUID name — caller
        controls the blob_name. Used for Parquet files where the name must be
        deterministic (derived from document_id). An existing blob with the
        same name is overwritten.

        Args:
            content: Raw bytes to store.
            blob_name: Exact blob name to write.

        Returns:
            The ``blob_name`` that was written.

        Raises:
            Exception: Any error from the upload is logged and re-raised.
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Uploading bytes to blob {blob_name}")
                await blob_client.upload_blob(content, overwrite=True)
            logger.info(f"Successfully uploaded {blob_name}")
            return blob_name
        except Exception as e:
            logger.error(f"Failed to upload bytes to {blob_name}", error=str(e))
            raise

    async def delete_file(self, blob_name: str) -> bool:
        """Delete file from Azure Blob Storage.

        Best-effort: failures are logged and reported through the return
        value instead of being raised.

        Args:
            blob_name: Name of the blob to delete.

        Returns:
            True when the blob was deleted, False when any error occurred.
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Deleting blob {blob_name}")
                await blob_client.delete_blob()
            logger.info(f"Successfully deleted {blob_name}")
            return True
        except Exception as e:
            logger.error(f"Failed to delete blob {blob_name}", error=str(e))
            return False

    async def delete_blobs_with_prefix(self, prefix: str) -> int:
        """Delete all blobs whose name starts with prefix. Returns count deleted.

        Used to delete all Parquet files for a document in one call.
        Best-effort: on failure the error is logged and the number of blobs
        deleted so far is returned.

        Args:
            prefix: Non-empty blob-name prefix to match.

        Returns:
            Number of blobs deleted (0 when ``prefix`` is empty).
        """
        # An empty prefix matches every blob in the container; refuse it so a
        # missing document id can never wipe the whole container.
        if not prefix:
            logger.error("Refusing to delete blobs with an empty prefix")
            return 0

        from azure.storage.blob.aio import ContainerClient

        deleted = 0
        try:
            async with ContainerClient.from_container_url(self._container_url()) as container:
                async for blob in container.list_blobs(name_starts_with=prefix):
                    await container.delete_blob(blob.name)
                    deleted += 1
            logger.info(f"Deleted {deleted} blobs with prefix {prefix}")
        except Exception as e:
            logger.error(f"Failed to delete blobs with prefix {prefix}", error=str(e))
        return deleted
# Shared module-level instance. It is constructed at import time, so the
# storage settings are read as soon as this module is imported.
blob_storage = AzureBlobStorage()