"""Cloudflare R2 storage service for saving generated images.""" import os import sys from typing import Optional from datetime import datetime import uuid # Add parent directory to path for imports sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) try: import boto3 from botocore.exceptions import ClientError, BotoCoreError BOTO3_AVAILABLE = True except ImportError: BOTO3_AVAILABLE = False print("Warning: boto3 not installed. R2 storage will not be available.") from config import settings class R2StorageService: """Service for uploading images to Cloudflare R2.""" def __init__(self): """Initialize R2 client.""" if not BOTO3_AVAILABLE: raise ImportError("boto3 is required for R2 storage. Install it with: pip install boto3") if not all([ settings.r2_endpoint, settings.r2_bucket_name, settings.r2_access_key, settings.r2_secret_key, ]): raise ValueError("R2 credentials not configured. Please set R2_ENDPOINT, R2_BUCKET_NAME, R2_ACCESS_KEY, and R2_SECRET_KEY in .env") # Initialize S3-compatible client for R2 self.s3_client = boto3.client( 's3', endpoint_url=settings.r2_endpoint, aws_access_key_id=settings.r2_access_key, aws_secret_access_key=settings.r2_secret_key, region_name='auto', # R2 doesn't use regions ) self.bucket_name = settings.r2_bucket_name self.folder = "psyadgenesis" def upload_image( self, image_bytes: bytes, filename: Optional[str] = None, niche: Optional[str] = None, ) -> str: """ Upload image to R2. Args: image_bytes: Image file bytes filename: Optional filename (if not provided, generates one) niche: Optional niche name for filename generation Returns: Public URL of the uploaded image """ # Generate filename if not provided if not filename: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") unique_id = str(uuid.uuid4())[:8] niche_prefix = niche or "ad" filename = f"{niche_prefix}_{timestamp}_{unique_id}.png" # Construct R2 key (path in bucket) r2_key = f"{self.folder}/{filename}" try: # Upload to R2 # Note: R2 doesn't support ACL parameter, use custom domain or presigned URLs for public access self.s3_client.put_object( Bucket=self.bucket_name, Key=r2_key, Body=image_bytes, ContentType='image/png', ) # Construct public URL # R2 public URLs work best with a custom domain # If custom domain is set via R2_PUBLIC_DOMAIN, use it # Otherwise, generate a presigned URL (valid for 1 year) # Note: R2 presigned URLs from boto3 include bucket name in path, which is correct for R2 if hasattr(settings, 'r2_public_domain') and settings.r2_public_domain: public_url = f"https://{settings.r2_public_domain}/{r2_key}" else: # Generate presigned URL (valid for 1 week - R2 maximum) # R2 presigned URLs include bucket name in path - this is correct for R2 # R2 limits presigned URLs to max 1 week (604800 seconds) public_url = self.s3_client.generate_presigned_url( 'get_object', Params={'Bucket': self.bucket_name, 'Key': r2_key}, ExpiresIn=604800 # 1 week (R2 maximum) ) print(f"Successfully uploaded image to R2: {r2_key}") print(f"R2 URL: {public_url}") return public_url except (ClientError, BotoCoreError) as e: print(f"Error uploading to R2: {e}") raise Exception(f"Failed to upload image to R2: {str(e)}") def get_public_url(self, filename: str) -> str: """ Get public URL for an image in R2. Args: filename: Filename in R2 Returns: Public URL """ r2_key = f"{self.folder}/{filename}" if hasattr(settings, 'r2_public_domain') and settings.r2_public_domain: return f"https://{settings.r2_public_domain}/{r2_key}" else: # Generate presigned URL (valid for 1 week - R2 maximum) # R2 presigned URLs include bucket name in path - this is correct for R2 # R2 limits presigned URLs to max 1 week (604800 seconds) return self.s3_client.generate_presigned_url( 'get_object', Params={'Bucket': self.bucket_name, 'Key': r2_key}, ExpiresIn=604800 # 1 week (R2 maximum) ) # Global instance (will be None if R2 not configured) r2_storage: Optional[R2StorageService] = None def get_r2_storage() -> Optional[R2StorageService]: """Get R2 storage service instance, creating it if needed.""" global r2_storage if r2_storage is None: try: r2_storage = R2StorageService() except (ImportError, ValueError) as e: print(f"R2 storage not available: {e}") return None return r2_storage