"""Cloudflare R2 storage service for saving generated images."""
import os
import sys
from typing import Optional
from datetime import datetime
import uuid
# Put the parent directory of this file's directory at the front of sys.path
# (presumably where the `config` module imported below lives).
sys.path.insert(
    0,
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
)

# boto3 is optional at import time: record whether it could be imported
# instead of failing here. R2StorageService.__init__ checks this flag.
try:
    import boto3
    from botocore.exceptions import ClientError, BotoCoreError
except ImportError:
    BOTO3_AVAILABLE = False
    print("Warning: boto3 not installed. R2 storage will not be available.")
else:
    BOTO3_AVAILABLE = True

from config import settings
class R2StorageService:
    """Service for uploading images to Cloudflare R2 via its S3-compatible API.

    Objects are stored under a fixed folder prefix inside the configured
    bucket. URLs handed back to callers are either built from the custom
    public domain (``settings.r2_public_domain``) or, when none is set,
    presigned GET URLs that expire after ``PRESIGNED_URL_TTL_SECONDS``.
    """

    # Lifetime of presigned URLs in seconds: one week, which the original
    # code documents as the maximum R2 accepts.
    PRESIGNED_URL_TTL_SECONDS = 604800

    # Content type used for uploads unless the caller overrides it.
    DEFAULT_CONTENT_TYPE = "image/png"

    def __init__(self):
        """Create the S3-compatible client from the R2 settings.

        Raises:
            ImportError: If boto3 could not be imported.
            ValueError: If any of the endpoint, bucket name, access key or
                secret key settings is missing or empty.
        """
        if not BOTO3_AVAILABLE:
            raise ImportError("boto3 is required for R2 storage. Install it with: pip install boto3")

        required_settings = (
            settings.r2_endpoint,
            settings.r2_bucket_name,
            settings.r2_access_key,
            settings.r2_secret_key,
        )
        if not all(required_settings):
            raise ValueError("R2 credentials not configured. Please set R2_ENDPOINT, R2_BUCKET_NAME, R2_ACCESS_KEY, and R2_SECRET_KEY in .env")

        # Initialize S3-compatible client for R2
        self.s3_client = boto3.client(
            's3',
            endpoint_url=settings.r2_endpoint,
            aws_access_key_id=settings.r2_access_key,
            aws_secret_access_key=settings.r2_secret_key,
            region_name='auto',  # R2 doesn't use regions
        )
        self.bucket_name = settings.r2_bucket_name
        self.folder = "psyadgenesis"

    def _object_key(self, filename: str) -> str:
        """Return the bucket key (folder prefix + filename) for *filename*."""
        return f"{self.folder}/{filename}"

    def upload_image(
        self,
        image_bytes: bytes,
        filename: Optional[str] = None,
        niche: Optional[str] = None,
        content_type: str = DEFAULT_CONTENT_TYPE,
    ) -> str:
        """Upload an image to R2 and return a URL for it.

        Args:
            image_bytes: Image file bytes.
            filename: Optional filename. When omitted, one is generated as
                ``<niche or "ad">_<local timestamp>_<8 hex chars>.png``.
            niche: Optional niche name used as the generated filename prefix.
            content_type: MIME type stored with the object. Defaults to
                ``image/png``; pass another value when *filename* is not a
                PNG. Generated filenames always end in ``.png``.

        Returns:
            URL of the uploaded image, as produced by ``get_public_url``.

        Raises:
            Exception: If the upload or URL generation fails with a boto
                error; the original error is attached as ``__cause__``.
        """
        # Generate filename if not provided
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            unique_id = uuid.uuid4().hex[:8]
            niche_prefix = niche or "ad"
            filename = f"{niche_prefix}_{timestamp}_{unique_id}.png"

        r2_key = self._object_key(filename)

        try:
            # Note: R2 doesn't support the ACL parameter; public access goes
            # through a custom domain or presigned URLs instead.
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=r2_key,
                Body=image_bytes,
                ContentType=content_type,
            )
            # Single source of truth for URL construction, so uploads and
            # later lookups always agree.
            public_url = self.get_public_url(filename)
        except (ClientError, BotoCoreError) as e:
            print(f"Error uploading to R2: {e}")
            # Chain the cause so the boto traceback is not lost.
            raise Exception(f"Failed to upload image to R2: {str(e)}") from e

        print(f"Successfully uploaded image to R2: {r2_key}")
        print(f"R2 URL: {public_url}")
        return public_url

    def get_object_bytes(self, filename: str) -> Optional[bytes]:
        """Download image bytes from R2 by filename (server-side).

        Use this to proxy images when the client cannot reach R2
        (e.g. ERR_TUNNEL_CONNECTION_FAILED).

        Args:
            filename: Filename inside the service folder.

        Returns:
            The object's bytes, or ``None`` when the download fails with a
            boto error (deliberately best-effort: errors are not raised).
        """
        try:
            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=self._object_key(filename),
            )
            return response["Body"].read()
        except (ClientError, BotoCoreError):
            return None

    def get_public_url(self, filename: str) -> str:
        """Get a URL for an image stored in R2.

        Args:
            filename: Filename inside the service folder.

        Returns:
            ``https://<r2_public_domain>/<key>`` when a public domain is
            configured in settings; otherwise a presigned GET URL that
            expires after ``PRESIGNED_URL_TTL_SECONDS``.
        """
        r2_key = self._object_key(filename)

        # The setting may be absent altogether, hence getattr with a default.
        public_domain = getattr(settings, "r2_public_domain", None)
        if public_domain:
            return f"https://{public_domain}/{r2_key}"

        # Presigned URLs generated by boto3 include the bucket name in the
        # path, which is the expected form for R2.
        return self.s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': self.bucket_name, 'Key': r2_key},
            ExpiresIn=self.PRESIGNED_URL_TTL_SECONDS,
        )
# Module-wide cached service instance; stays None until a successful
# construction in get_r2_storage().
r2_storage: Optional[R2StorageService] = None


def get_r2_storage() -> Optional[R2StorageService]:
    """Return the shared R2 storage service, building it on first use.

    Returns:
        The cached ``R2StorageService`` instance, or ``None`` when
        construction raises ``ImportError`` or ``ValueError``. In that case
        nothing is cached, so the next call tries again.
    """
    global r2_storage

    # Fast path: an instance was already created by an earlier call.
    if r2_storage is not None:
        return r2_storage

    try:
        r2_storage = R2StorageService()
    except (ImportError, ValueError) as e:
        print(f"R2 storage not available: {e}")
        return None
    return r2_storage
|