CandidateExplorer / services /uploader /azure_blob_service.py
ishaq101's picture
Fix azure sas key for blob container & Fix create weight response
1249d8b
import os
import dotenv
from config.env_constant import EnvFilepath
dotenv.load_dotenv(EnvFilepath.ENVPATH)
from config.constant import AzureBlobConstants
from externals.storages.azure_blob import get_container_client
from fastapi import UploadFile
from utils.logger import get_logger
logger = get_logger("azure blob")
class AzureBlobStorageService:
    """Service for uploading files to Azure Blob Storage.

    The container client is requested from ``get_container_client()`` on every
    upload; this class does not cache one on the instance.
    """

    def __init__(self):
        # Blob name prefix taken from configuration. It is stored here but is
        # not used by the methods of this class.
        self.prefix = AzureBlobConstants.BLOB_PREFIX
        # No service-level client is created by this class. The attribute is
        # still defined so that close() has something well-defined to inspect.
        self.blob_service_client = None

    async def upload_pdf(self, file: "UploadFile") -> str:
        """Upload a file to the blob container and return the blob URL.

        The blob is named after the uploaded file's original filename and any
        existing blob with the same name is overwritten.

        Args:
            file: The FastAPI UploadFile object.

        Returns:
            The URL of the uploaded blob.

        Raises:
            ValueError: If ``file`` has no filename, since the blob name is
                derived from it.
        """
        # 1. Derive the blob name from the original filename.
        # NOTE: identical filenames overwrite each other (overwrite=True
        # below); consider a UUID-based name if collisions matter.
        blob_name = file.filename
        if not blob_name:
            # Fail before touching storage: there is no usable blob name.
            raise ValueError(
                "Uploaded file has no filename; cannot derive a blob name."
            )

        # 2. Get the blob client.
        container_client = await get_container_client()
        blob_client = container_client.get_blob_client(blob_name)

        # 3. Read the file content. This loads the whole payload into memory
        # in a single call; it is not streamed in chunks.
        content = await file.read()

        # 4. Upload the content with a PDF content type.
        # NOTE(review): the Azure SDK documents ``content_settings`` as a
        # ``ContentSettings`` object; a plain dict may not be accepted.
        # ``ContentSettings`` is not imported in this module -- confirm and
        # switch to it if uploads fail on this argument.
        await blob_client.upload_blob(
            content,
            blob_type="BlockBlob",
            overwrite=True,
            content_settings={"content_type": "application/pdf"},
        )

        # 5. Return the blob URL. Whether it is publicly reachable depends on
        # the container's access configuration.
        return blob_client.url

    # --- Cleanup for Graceful Shutdown (Optional but Recommended) ---
    async def close(self):
        """Close the attached blob service client, if there is one.

        Safe to call when no client was ever attached, and safe to call more
        than once.
        """
        client = getattr(self, "blob_service_client", None)
        if client is None:
            return
        await client.close()
        # Drop the reference so a repeated close() is a no-op.
        self.blob_service_client = None
# Module-level service instance, created once when this module is imported.
blob_storage_service = AzureBlobStorageService()