File size: 4,424 Bytes
027123c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52999bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
027123c
 
 
 
 
 
 
 
 
 
 
 
 
 
52999bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
027123c
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""Azure Blob Storage client wrapper."""

from azure.storage.blob.aio import BlobClient
from src.config.settings import settings
from src.middlewares.logging import get_logger
import uuid

# Module-level logger for this wrapper, created via the project's get_logger helper under the name "azure_blob".
logger = get_logger("azure_blob")


class AzureBlobStorage:
    """Async wrapper around a single Azure Blob Storage container.

    Every operation opens a short-lived SDK client authorised with the
    container SAS token from settings and closes it when the operation ends.
    """

    def __init__(self):
        # Connection details are read once from application settings.
        self.container_name = settings.azureai_container_name
        self.sas_token = settings.azureai_blob_sas
        # Trailing '/' removed so the endpoint joins cleanly with container/blob paths.
        self.account_url = settings.azureai_container_endpoint.rstrip('/')

    @staticmethod
    def _extension_for(filename: str) -> str:
        """Return a storage-safe extension for *filename*, defaulting to ``txt``.

        The extension is the text after the last ``.``. It is kept only when it
        is non-empty ASCII letters/digits. Anything else falls back to ``txt``,
        so a client-supplied filename cannot put a trailing dot, a path
        separator or URL metacharacters into the generated blob name.
        Examples: ``"report."``, ``"a.b/c"`` and ``"x.pdf?y"`` all give ``txt``.
        """
        _, dot, ext = filename.rpartition('.')
        if dot and ext.isascii() and ext.isalnum():
            return ext
        return 'txt'

    def _get_blob_client(self, blob_name: str) -> "BlobClient":
        """Build an async client for *blob_name*, authorised by the SAS token.

        Uses the ``BlobClient`` constructor instead of concatenating a URL by
        hand. The SDK then percent-encodes the blob name, so ``#``, ``?``,
        ``%`` or spaces are safe, and appends the SAS query string itself.
        ``/`` inside names is kept as a virtual directory separator.
        """
        return BlobClient(
            account_url=self.account_url,
            container_name=self.container_name,
            blob_name=blob_name,
            # An empty token means "no credential", never "account key".
            credential=self.sas_token or None,
        )

    async def upload_file(self, file_content: bytes, filename: str, user_id: str) -> str:
        """Upload a user file under a freshly generated, unique blob name.

        Args:
            file_content: Raw bytes to store.
            filename: Original client-side filename. Only a sanitised
                extension of it is used in the blob name.
            user_id: Becomes the first path segment of the blob name.

        Returns:
            blob_name: The generated name, ``"<user_id>/<uuid4>.<ext>"``.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            ext = self._extension_for(filename)
            blob_name = f"{user_id}/{uuid.uuid4()}.{ext}"

            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Uploading file {filename} to blob {blob_name}")
                await blob_client.upload_blob(file_content, overwrite=True)

            logger.info(f"Successfully uploaded {blob_name}")
            return blob_name

        except Exception as e:
            logger.error(f"Failed to upload file {filename}", error=str(e))
            raise

    async def download_file(self, blob_name: str) -> bytes:
        """Download a blob and return its entire content as bytes.

        The whole blob is read into memory via ``readall()``.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Downloading blob {blob_name}")
                stream = await blob_client.download_blob()
                content = await stream.readall()

            logger.info(f"Successfully downloaded {blob_name}")
            return content

        except Exception as e:
            logger.error(f"Failed to download blob {blob_name}", error=str(e))
            raise

    async def upload_bytes(self, content: bytes, blob_name: str) -> str:
        """Upload bytes to Azure Blob Storage using a specific blob name.

        Unlike upload_file(), this does not generate a UUID name; the caller
        controls the blob_name. It is used for Parquet files, where the name
        must be deterministic (derived from document_id). An existing blob
        with the same name is overwritten.

        Returns:
            The blob_name that was written.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Uploading bytes to blob {blob_name}")
                await blob_client.upload_blob(content, overwrite=True)
            logger.info(f"Successfully uploaded {blob_name}")
            return blob_name
        except Exception as e:
            logger.error(f"Failed to upload bytes to {blob_name}", error=str(e))
            raise

    async def delete_file(self, blob_name: str) -> bool:
        """Delete a single blob (best-effort).

        Returns:
            True on success. False on any failure, which is logged rather
            than raised.
        """
        try:
            async with self._get_blob_client(blob_name) as blob_client:
                logger.info(f"Deleting blob {blob_name}")
                await blob_client.delete_blob()

            logger.info(f"Successfully deleted {blob_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to delete blob {blob_name}", error=str(e))
            return False

    async def delete_blobs_with_prefix(self, prefix: str) -> int:
        """Delete all blobs whose name starts with *prefix* (best-effort).

        Used to delete all Parquet files for a document in one call. Errors
        are logged rather than raised.

        Args:
            prefix: Blob-name prefix to match. Must be non-empty, because an
                empty prefix would match, and delete, every blob in the
                container.

        Returns:
            Number of blobs deleted. This is the count reached before any
            failure, and ``0`` when *prefix* is empty.
        """
        if not prefix:
            # Guard against wiping the whole container by accident.
            logger.error("Refusing to delete blobs with an empty prefix")
            return 0

        from azure.storage.blob.aio import ContainerClient

        deleted = 0
        try:
            async with ContainerClient(
                account_url=self.account_url,
                container_name=self.container_name,
                credential=self.sas_token or None,
            ) as container:
                async for blob in container.list_blobs(name_starts_with=prefix):
                    await container.delete_blob(blob.name)
                    deleted += 1
            logger.info(f"Deleted {deleted} blobs with prefix {prefix}")
        except Exception as e:
            logger.error(f"Failed to delete blobs with prefix {prefix}", error=str(e))
        return deleted


# Shared module-level instance, constructed at import time; its __init__ reads
# the container name, SAS token and endpoint from `settings`.
blob_storage: AzureBlobStorage = AzureBlobStorage()