"""
Image Processing Utilities

Handles image compression, conversion, and storage.
"""
import base64
import re
from io import BytesIO
from PIL import Image
import uuid
from datetime import datetime
import os
from pathlib import Path
from utils.storage import temp_images
# Directory where compressed JPEGs are written to disk. The path is relative,
# so it resolves against the process's current working directory.
IMAGE_STORAGE_DIR = Path("storage") / "images"

# Maximum age, in seconds, of a stored image before cleanup removes it (1 hour).
IMAGE_TTL_SECONDS = 60 * 60
async def compress_and_store_image(
    data_url: str,
    public_url: str,
    max_width: int = 960,
    max_height: int = 540,
    quality: int = 70
) -> str:
    """
    Compress an image given as a base64 data URL and return its public URL.

    The image is decoded, flattened onto a white background when it carries
    transparency, reduced to fit within ``max_width`` x ``max_height``
    (aspect ratio preserved), and re-encoded as JPEG. The result is stored
    in ``temp_images`` and written to ``IMAGE_STORAGE_DIR``; expired images
    are then cleaned up.

    NOTE: although declared ``async``, this function awaits nothing; the
    decoding, compression and disk write all run synchronously.

    Args:
        data_url: Base64 data URL (data:image/...;base64,...)
        public_url: Base URL for the server (a trailing "/" is ignored)
        max_width: Maximum width for resizing
        max_height: Maximum height for resizing
        quality: JPEG quality (1-100)

    Returns:
        Public URL to access the compressed image

    Raises:
        ValueError: If ``data_url`` is not a base64 image data URL.
        Exception: Any decoding, imaging or filesystem error is printed
            and re-raised unchanged.
    """
    try:
        # Extract base64 data from the data URL. Image subtypes are not
        # letters-only (e.g. "x-icon", "vnd.microsoft.icon"), and the payload
        # may be line-wrapped, so "." must also match newlines.
        matches = re.match(
            r'^data:image/[a-zA-Z0-9.+-]+;base64,(.+)$',
            data_url,
            re.DOTALL,
        )
        if not matches:
            raise ValueError('Invalid data URL format')

        # b64decode discards non-alphabet characters (such as newlines) by default.
        image_bytes = base64.b64decode(matches.group(1))

        # Open image with PIL
        image = Image.open(BytesIO(image_bytes))

        # Palette images may carry transparency; converting to RGBA exposes
        # it as a regular alpha band that can be used as a paste mask.
        if image.mode in ('P', 'PA'):
            image = image.convert('RGBA')

        if image.mode in ('RGBA', 'LA'):
            # JPEG has no alpha channel: flatten onto a white background,
            # using the alpha band (always the last band) as the mask.
            background = Image.new('RGB', image.size, (255, 255, 255))
            background.paste(image, mask=image.split()[-1])
            image = background
        elif image.mode not in ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr'):
            # Modes the JPEG writer cannot encode (e.g. 'I', 'F', 'I;16')
            # would otherwise make save() fail below.
            image = image.convert('RGB')

        # Resize maintaining aspect ratio (thumbnail() works in place)
        image.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)

        # Compress to JPEG
        output = BytesIO()
        image.save(output, format='JPEG', quality=quality, optimize=True)
        compressed_buffer = output.getvalue()

        # Use one timestamp for both the ID and the stored entry.
        created_at = datetime.now().timestamp()

        # Generate unique ID
        image_id = f"img_{int(created_at)}_{uuid.uuid4().hex[:9]}"

        # Store in memory
        temp_images[image_id] = {
            'buffer': compressed_buffer,
            'timestamp': created_at,
            'content_type': 'image/jpeg'
        }

        # Persist to disk so URLs survive process reloads.
        IMAGE_STORAGE_DIR.mkdir(parents=True, exist_ok=True)
        (IMAGE_STORAGE_DIR / f"{image_id}.jpg").write_bytes(compressed_buffer)

        # Clean up expired images
        cleanup_old_images()

        # Return public URL; strip a trailing slash to avoid "//api/images/...".
        return f"{public_url.rstrip('/')}/api/images/{image_id}"
    except Exception as e:
        print(f"❌ Image compression error: {str(e)}")
        raise
def cleanup_old_images():
    """Evict expired images from the in-memory store and the disk cache.

    An in-memory entry is expired when its recorded 'timestamp' is more than
    IMAGE_TTL_SECONDS in the past. On disk, every ``img_*.jpg`` file in
    IMAGE_STORAGE_DIR whose modification time is older than the same TTL is
    deleted. Errors on individual files are ignored (best effort).
    """
    now = datetime.now().timestamp()

    # Collect the expired keys first so the dict is not mutated mid-iteration.
    expired_ids = [
        key
        for key, entry in temp_images.items()
        if now - entry['timestamp'] > IMAGE_TTL_SECONDS
    ]
    for key in expired_ids:
        del temp_images[key]

    # The disk cache is swept by file mtime, so stale files are removed even
    # when this process holds no in-memory entry for them.
    if IMAGE_STORAGE_DIR.exists():
        for cached_file in IMAGE_STORAGE_DIR.glob("img_*.jpg"):
            try:
                file_age = now - cached_file.stat().st_mtime
                if file_age > IMAGE_TTL_SECONDS:
                    cached_file.unlink(missing_ok=True)
            except Exception:
                # Skip files that cannot be inspected or deleted.
                continue

    # Only in-memory evictions are counted in the log line.
    if expired_ids:
        print(f"🧹 Cleaned up {len(expired_ids)} old images")
|