Spaces:
Sleeping
Sleeping
File size: 2,947 Bytes
91d209c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
"""
Image Processing Utilities
Handles image compression, conversion, and storage
"""
import base64
import re
from io import BytesIO
from PIL import Image
import uuid
from datetime import datetime
import os
from utils.storage import temp_images
async def compress_and_store_image(
    data_url: str,
    public_url: str,
    max_width: int = 960,
    max_height: int = 540,
    quality: int = 70
) -> str:
    """
    Compress an image supplied as a base64 data URL and return a public URL.

    The image is decoded, flattened onto a white background if it has
    transparency, downscaled to fit within (max_width, max_height) while
    keeping aspect ratio, re-encoded as JPEG, and stored in the in-memory
    `temp_images` cache under a unique id.

    Args:
        data_url: Base64 data URL (data:image/...;base64,...)
        public_url: Base URL for the server (no trailing slash expected)
        max_width: Maximum width for resizing
        max_height: Maximum height for resizing
        quality: JPEG quality (1-100)
    Returns:
        Public URL to access the compressed image
    Raises:
        ValueError: if `data_url` is not a well-formed image data URL.
        Exception: any PIL decode/encode error is logged and re-raised.
    """
    try:
        # Extract the base64 payload. The subtype pattern accepts '+', '.',
        # '-' and digits so MIME types like image/svg+xml are not rejected.
        matches = re.match(r'^data:image/[\w.+-]+;base64,(.+)$', data_url)
        if not matches:
            raise ValueError('Invalid data URL format')
        base64_data = matches.group(1)
        image_bytes = base64.b64decode(base64_data)

        image = Image.open(BytesIO(image_bytes))

        # Flatten transparency onto white: JPEG has no alpha channel.
        if image.mode in ('RGBA', 'LA', 'P'):
            if image.mode == 'P':
                # Palette images may carry transparency only after
                # expansion to RGBA.
                image = image.convert('RGBA')
            background = Image.new('RGB', image.size, (255, 255, 255))
            # Last band of RGBA/LA is the alpha channel; use it as mask.
            background.paste(image, mask=image.split()[-1])
            image = background

        # Guard against modes JPEG cannot encode (e.g. 'I', 'F', 'CMYK'):
        # normalize everything remaining to plain RGB.
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Resize in place, preserving aspect ratio (never upscales).
        image.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)

        # Re-encode as JPEG into an in-memory buffer.
        output = BytesIO()
        image.save(output, format='JPEG', quality=quality, optimize=True)
        compressed_buffer = output.getvalue()

        # Unique id: timestamp for rough ordering + random suffix to avoid
        # collisions within the same second.
        image_id = f"img_{int(datetime.now().timestamp())}_{uuid.uuid4().hex[:9]}"

        # Store in the shared in-memory cache served by /api/images/<id>.
        temp_images[image_id] = {
            'buffer': compressed_buffer,
            'timestamp': datetime.now().timestamp(),
            'content_type': 'image/jpeg'
        }

        # Opportunistic eviction of entries older than 1 hour.
        cleanup_old_images()

        return f"{public_url}/api/images/{image_id}"
    except Exception as e:
        print(f"❌ Image compression error: {str(e)}")
        raise
def cleanup_old_images():
    """Evict cached images whose timestamp is more than one hour old.

    Mutates the shared `temp_images` dict in place; stale keys are
    collected first so the dict is never modified while iterating.
    """
    cutoff = datetime.now().timestamp() - 3600  # entries older than 1 hour
    stale = [key for key, entry in temp_images.items()
             if entry['timestamp'] < cutoff]
    for key in stale:
        del temp_images[key]
    if stale:
        print(f"🧹 Cleaned up {len(stale)} old images")
|