| """ |
| Image Processing Utilities |
| Handles image compression, conversion, and storage |
| """ |
|
|
| import base64 |
| import re |
| from io import BytesIO |
| from PIL import Image |
| import uuid |
| from datetime import datetime |
| import os |
| from pathlib import Path |
|
|
| from utils.storage import temp_images |
|
|
| IMAGE_STORAGE_DIR = Path("storage/images") |
| IMAGE_TTL_SECONDS = 3600 |
|
|
|
|
| async def compress_and_store_image( |
| data_url: str, |
| public_url: str, |
| max_width: int = 960, |
| max_height: int = 540, |
| quality: int = 70 |
| ) -> str: |
| """ |
| Compress image from data URL and return public URL |
| |
| Args: |
| data_url: Base64 data URL (data:image/...;base64,...) |
| public_url: Base URL for the server |
| max_width: Maximum width for resizing |
| max_height: Maximum height for resizing |
| quality: JPEG quality (1-100) |
| |
| Returns: |
| Public URL to access the compressed image |
| """ |
| try: |
| |
| matches = re.match(r'^data:image/[a-zA-Z]+;base64,(.+)$', data_url) |
| if not matches: |
| raise ValueError('Invalid data URL format') |
| |
| base64_data = matches.group(1) |
| image_bytes = base64.b64decode(base64_data) |
| |
| |
| image = Image.open(BytesIO(image_bytes)) |
| |
| |
| if image.mode in ('RGBA', 'LA', 'P'): |
| background = Image.new('RGB', image.size, (255, 255, 255)) |
| if image.mode == 'P': |
| image = image.convert('RGBA') |
| background.paste(image, mask=image.split()[-1] if image.mode in ('RGBA', 'LA') else None) |
| image = background |
| |
| |
| image.thumbnail((max_width, max_height), Image.Resampling.LANCZOS) |
| |
| |
| output = BytesIO() |
| image.save(output, format='JPEG', quality=quality, optimize=True) |
| compressed_buffer = output.getvalue() |
| |
| |
| image_id = f"img_{int(datetime.now().timestamp())}_{uuid.uuid4().hex[:9]}" |
| |
| |
| temp_images[image_id] = { |
| 'buffer': compressed_buffer, |
| 'timestamp': datetime.now().timestamp(), |
| 'content_type': 'image/jpeg' |
| } |
|
|
| |
| IMAGE_STORAGE_DIR.mkdir(parents=True, exist_ok=True) |
| (IMAGE_STORAGE_DIR / f"{image_id}.jpg").write_bytes(compressed_buffer) |
| |
| |
| cleanup_old_images() |
| |
| |
| return f"{public_url}/api/images/{image_id}" |
| |
| except Exception as e: |
| print(f"❌ Image compression error: {str(e)}") |
| raise |
|
|
| def cleanup_old_images(): |
| """Remove images older than 1 hour from memory and disk.""" |
| current_time = datetime.now().timestamp() |
| to_remove = [] |
| |
| for image_id, data in temp_images.items(): |
| if current_time - data['timestamp'] > IMAGE_TTL_SECONDS: |
| to_remove.append(image_id) |
| |
| for image_id in to_remove: |
| del temp_images[image_id] |
|
|
| |
| |
| if IMAGE_STORAGE_DIR.exists(): |
| for p in IMAGE_STORAGE_DIR.glob("img_*.jpg"): |
| try: |
| if current_time - p.stat().st_mtime > IMAGE_TTL_SECONDS: |
| p.unlink(missing_ok=True) |
| except Exception: |
| continue |
| |
| if to_remove: |
| print(f"🧹 Cleaned up {len(to_remove)} old images") |
|
|
|
|