cuatrolabs-tracker-ms / app /storage.py
Michael-Antony's picture
feat: implement task attachments API with MinIO storage integration
e0b1eb1
"""
MinIO storage client for task attachments.
"""
import asyncio
import io
from datetime import timedelta
from typing import Optional
from urllib.parse import quote

from minio import Minio
from minio.error import S3Error

from app.core.config import settings
from app.core.logging import get_logger
# Logger for this module, obtained from the project's logging helper.
logger = get_logger(__name__)
# Global MinIO client instance. Starts as None and is populated by
# get_minio_client() the first time a client is requested.
_minio_client: Optional[Minio] = None
def get_minio_client() -> Minio:
    """Return the shared MinIO client, building it on first use.

    The client is stored in the module-level ``_minio_client`` so later calls
    reuse the same instance.

    Returns:
        The module-wide ``Minio`` client.

    Raises:
        RuntimeError: If the access key or secret key setting is empty.
    """
    global _minio_client

    # Fast path: a client was already built by an earlier call.
    if _minio_client is not None:
        return _minio_client

    # Both credentials must be present before attempting to build a client.
    if not (settings.MINIO_ACCESS_KEY and settings.MINIO_SECRET_KEY):
        raise RuntimeError("MinIO credentials are not configured")

    _minio_client = Minio(
        endpoint=settings.MINIO_ENDPOINT,
        access_key=settings.MINIO_ACCESS_KEY,
        secret_key=settings.MINIO_SECRET_KEY,
        secure=settings.MINIO_SECURE,
        region=settings.MINIO_REGION,
    )

    # Credentials are deliberately left out of the log context.
    log_context = {
        "endpoint": settings.MINIO_ENDPOINT,
        "secure": settings.MINIO_SECURE,
    }
    logger.info("MinIO client initialized", extra=log_context)

    return _minio_client
class MinioStorageAdapter:
    """Async-friendly wrapper around a MinIO client, bound to a single bucket.

    The coroutine methods that talk to MinIO hand the SDK call to the event
    loop's default executor via ``run_in_executor(None, ...)`` and await the
    result.

    Note:
        Constructing an adapter calls ``_ensure_bucket()`` directly (not via
        the executor), so the bucket check runs on the calling thread.
    """

    def __init__(self, client: Minio, bucket: str):
        """Bind the adapter to ``client`` and ``bucket``; create the bucket if missing.

        Args:
            client: MinIO client used for every operation.
            bucket: Name of the bucket this adapter reads from and writes to.

        Raises:
            S3Error: If checking for or creating the bucket fails.
        """
        self.client = client
        self.bucket = bucket
        self._ensure_bucket()

    def _ensure_bucket(self) -> None:
        """Ensure the bucket exists, creating it if it does not.

        Raises:
            S3Error: If the existence check or the creation fails for any
                reason other than the bucket already being owned by us.
        """
        try:
            if self.client.bucket_exists(self.bucket):
                return
            self.client.make_bucket(self.bucket)
            logger.info(f"Created MinIO bucket: {self.bucket}")
        except S3Error as e:
            # Check-then-create is racy: another worker can create the bucket
            # between bucket_exists() and make_bucket(). In that case the
            # bucket is there, which is all this method needs.
            if e.code == "BucketAlreadyOwnedByYou":
                return
            logger.error(f"Failed to ensure bucket exists: {e}", exc_info=e)
            raise

    async def upload_file(
        self,
        object_key: str,
        file_data: bytes,
        mime_type: str
    ) -> str:
        """Upload file data directly to MinIO storage.

        Args:
            object_key: Object path/key in bucket
            file_data: File content as bytes
            mime_type: MIME type of the file

        Returns:
            Object key of uploaded file

        Raises:
            S3Error: If the upload fails; the error is logged and re-raised.
        """
        loop = asyncio.get_running_loop()
        # put_object reads from a stream, so wrap the in-memory bytes.
        file_stream = io.BytesIO(file_data)
        try:
            await loop.run_in_executor(
                None,
                lambda: self.client.put_object(
                    bucket_name=self.bucket,
                    object_name=object_key,
                    data=file_stream,
                    length=len(file_data),
                    content_type=mime_type
                )
            )
            logger.info("File uploaded to MinIO", extra={
                "bucket": self.bucket,
                "object_key": object_key,
                "size": len(file_data),
                "mime_type": mime_type
            })
            return object_key
        except S3Error as e:
            logger.error(f"Failed to upload file to MinIO: {e}", exc_info=e)
            raise

    async def get_public_url(self, object_key: str) -> str:
        """Build the direct (unsigned) URL for an object in this bucket.

        The URL is assembled from ``settings.MINIO_SECURE`` and
        ``settings.MINIO_ENDPOINT``; no request is made to MinIO.

        Args:
            object_key: Object path/key in bucket

        Returns:
            URL of the form ``<scheme>://<endpoint>/<bucket>/<key>`` with the
            key percent-encoded.
        """
        # NOTE(review): presumably this URL is only reachable when the bucket
        # allows anonymous reads -- confirm against the bucket policy.
        protocol = "https" if settings.MINIO_SECURE else "http"
        endpoint = f"{protocol}://{settings.MINIO_ENDPOINT}".rstrip('/')
        # Percent-encode the key so spaces, '#', '?', '%' and non-ASCII
        # characters cannot break or truncate the URL. '/' stays literal so
        # nested keys remain separate path segments.
        encoded_key = quote(object_key, safe="/")
        return f"{endpoint}/{self.bucket}/{encoded_key}"

    async def get_presigned_url(
        self,
        object_key: str,
        expiry_seconds: int = 3600
    ) -> str:
        """Generate presigned URL for secure file access.

        Args:
            object_key: Object path/key in bucket
            expiry_seconds: URL expiry time in seconds

        Returns:
            Presigned URL to access the file

        Raises:
            S3Error: If MinIO fails to produce the URL; the error is logged
                and re-raised.
        """
        loop = asyncio.get_running_loop()
        try:
            url = await loop.run_in_executor(
                None,
                lambda: self.client.presigned_get_object(
                    bucket_name=self.bucket,
                    object_name=object_key,
                    expires=timedelta(seconds=expiry_seconds)
                )
            )
            return url
        except S3Error as e:
            logger.error(f"Failed to generate presigned URL: {e}", exc_info=e)
            raise

    async def delete_file(self, object_key: str) -> None:
        """Delete file from MinIO storage.

        Args:
            object_key: Object path/key in bucket

        Raises:
            S3Error: If the removal fails; the error is logged and re-raised.
        """
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(
                None,
                lambda: self.client.remove_object(
                    bucket_name=self.bucket,
                    object_name=object_key
                )
            )
            logger.info("File deleted from MinIO", extra={
                "bucket": self.bucket,
                "object_key": object_key
            })
        except S3Error as e:
            logger.error(f"Failed to delete file from MinIO: {e}", exc_info=e)
            raise
def get_storage_adapter(bucket: Optional[str] = None) -> MinioStorageAdapter:
    """Build a ``MinioStorageAdapter`` on top of the shared MinIO client.

    Args:
        bucket: Bucket to bind the adapter to. When omitted (or empty),
            ``settings.STORAGE_BUCKET_TASK_ATTACHMENTS`` is used.

    Returns:
        A new ``MinioStorageAdapter`` for the chosen bucket.
    """
    storage_client = get_minio_client()

    # Fall back to the configured task-attachments bucket for any falsy value.
    if bucket:
        target_bucket = bucket
    else:
        target_bucket = settings.STORAGE_BUCKET_TASK_ATTACHMENTS

    return MinioStorageAdapter(storage_client, target_bucket)