Esmaill1
Enhance image processing: use PNG for lossless intermediate steps and improve resizing quality with Lanczos resampling
bdef2db | """ | |
| EL HELAL Studio — Web Backend (FastAPI) | |
| Integrated with Auto-Cleanup and Custom Cropping | |
| """ | |
| from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks | |
| from fastapi.responses import JSONResponse, FileResponse, StreamingResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from contextlib import asynccontextmanager | |
| import uvicorn | |
| import shutil | |
| import os | |
| import json | |
| import uuid | |
| import zipfile | |
| import io | |
| from pathlib import Path | |
| from PIL import Image | |
| import threading | |
| import sys | |
| import asyncio | |
| import time | |
| # Add core directory to python path | |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'core'))) | |
| # Import existing tools | |
| import crop | |
| import process_images | |
| import color_steal | |
| import retouch | |
| import restoration | |
| from layout_engine import generate_layout, load_settings | |
# Setup Directories
# Everything is resolved relative to the folder that contains this module.
_BACKEND_DIR = Path(os.path.dirname(__file__))
WEB_DIR = _BACKEND_DIR / "web_storage"
ROOT_DIR = _BACKEND_DIR.parent
STORAGE_DIR = ROOT_DIR.joinpath("storage")
ASSETS_DIR = ROOT_DIR.joinpath("assets")
UPLOAD_DIR = STORAGE_DIR.joinpath("uploads")
PROCESSED_DIR = STORAGE_DIR.joinpath("processed")
RESULT_DIR = STORAGE_DIR.joinpath("results")

# The three working folders are created at import time so handlers can
# write into them without checking first.
for storage_dir in (UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR):
    storage_dir.mkdir(parents=True, exist_ok=True)
# Global Model State
# Shared, mutable registry that the request handlers read from.
models = dict(
    model=None,        # filled in by warm_up_ai()
    transform=None,    # filled in by warm_up_ai()
    restoration=None,  # never assigned in the visible code
    luts=color_steal.load_trained_curves(),  # loaded eagerly at import time
    ready=False,       # flipped to True once warm_up_ai() succeeds
)
def warm_up_ai():
    """Load the AI model and its transform into ``models`` and mark it ready.

    Any exception raised while loading is printed and swallowed, which
    leaves ``models["ready"]`` as ``False``.
    """
    print("AI Model: Loading in background...")
    try:
        network, _ = process_images.setup_model()
        models["model"] = network
        models["transform"] = process_images.get_transform()
        # No local restoration model initialization needed
        print("Restoration Model: API Mode Active")
        models["ready"] = True
        print("AI Model: READY")
    except Exception as load_error:
        print(f"AI Model: FAILED to load - {load_error}")
async def cleanup_task(folders=None, max_age_seconds=86400, interval_seconds=3600):
    """Periodically delete stale files from the storage folders.

    Runs forever: each pass removes regular files whose modification time
    is older than ``max_age_seconds``, then sleeps ``interval_seconds``.

    Args:
        folders: Iterable of ``pathlib.Path`` directories to sweep. When
            ``None`` (the default) the upload, processed and result
            folders are used.
        max_age_seconds: Age threshold for deletion (default 24 hours).
        interval_seconds: Pause between passes (default one hour).
    """
    if folders is None:
        folders = [UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR]
    targets = list(folders)

    while True:
        print("Cleanup: Checking for old files...")
        now = time.time()
        count = 0
        for folder in targets:
            for path in folder.glob("*"):
                # A single unreadable or already-deleted file must not
                # propagate: an exception here would end this task for the
                # lifetime of the process.
                try:
                    if path.is_file() and (now - path.stat().st_mtime) > max_age_seconds:
                        path.unlink()
                        count += 1
                except FileNotFoundError:
                    # Removed by someone else between listing and deletion.
                    continue
                except OSError as e:
                    print(f"Cleanup: Could not remove {path.name} - {e}")
        if count > 0:
            print(f"Cleanup: Removed {count} old files.")
        await asyncio.sleep(interval_seconds)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start background work, stop it on shutdown.

    On startup the AI model is loaded on a daemon thread and the periodic
    storage cleanup is scheduled on the event loop. On shutdown the cleanup
    task is cancelled and awaited.
    """
    # Startup
    threading.Thread(target=warm_up_ai, daemon=True).start()
    # Keep a reference: the event loop only holds weak references to tasks,
    # so an unreferenced task may be garbage-collected mid-execution.
    cleanup = asyncio.create_task(cleanup_task())
    try:
        yield
    finally:
        # Shutdown
        cleanup.cancel()
        try:
            await cleanup
        except asyncio.CancelledError:
            # Expected result of the cancel() above.
            pass
        except Exception as e:
            # The task had already died; report it instead of failing shutdown.
            print(f"Cleanup: task ended with error - {e}")
app = FastAPI(title="EL HELAL Studio API", lifespan=lifespan)

# CORS is wide open: every origin, method and header is accepted.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
| # ── API Endpoints ── | |
async def read_index():
    """Serve ``index.html`` from ``WEB_DIR``."""
    # NOTE(review): no route decorator is visible on this handler in this
    # view — confirm it is registered on `app`.
    index_page = WEB_DIR / "index.html"
    return FileResponse(index_page)
async def get_status():
    """Report whether the AI model has finished loading."""
    ai_ready = models["ready"]
    return {"ai_ready": ai_ready}
async def upload_image(file: UploadFile = File(...)):
    """Store an uploaded image and build a small JPEG thumbnail for the UI.

    The upload is saved under a fresh UUID (keeping the client's file
    extension), decoded, rotated according to its EXIF orientation, and
    reduced to a thumbnail of at most 200x200.

    Returns:
        A dict with the new ``id``, the original ``filename``, the
        ``thumb_url`` and the ``width``/``height`` measured after EXIF
        transposition. If the payload cannot be decoded as an image, a
        400 ``JSONResponse`` is returned and the stored upload is removed.
    """
    from PIL import ImageOps

    file_id = str(uuid.uuid4())
    # The filename can be missing on a multipart part; treat it as "no extension".
    ext = Path(file.filename or "").suffix
    file_path = UPLOAD_DIR / f"{file_id}{ext}"
    with file_path.open("wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    try:
        with Image.open(file_path) as opened:
            # Handle EXIF orientation (rotation). The result is an
            # independent image, so it stays usable after the file closes.
            img = ImageOps.exif_transpose(opened)
            img.load()
    except OSError:
        # Pillow reports unidentified or truncated images as OSError
        # subclasses. Do not keep a file that can never be processed.
        file_path.unlink(missing_ok=True)
        return JSONResponse(
            status_code=400,
            content={"error": "Uploaded file is not a valid image"},
        )

    # Get original dimensions after transposition for the web cropper
    width, height = img.size

    # Create a faster, smaller thumbnail for the UI (200x200 is plenty for the 72px grid)
    img.thumbnail((200, 200), Image.BILINEAR)
    thumb_path = UPLOAD_DIR / f"{file_id}_thumb.jpg"
    if img.mode in ("RGBA", "LA"):
        # JPEG has no alpha channel: flatten onto white, using alpha as the mask.
        bg = Image.new("RGB", img.size, (255, 255, 255))
        bg.paste(img, mask=img.split()[-1])
        bg.save(thumb_path, "JPEG", quality=60)
    else:
        img.convert("RGB").save(thumb_path, "JPEG", quality=60)

    return {
        "id": file_id,
        "filename": file.filename,
        "thumb_url": f"/static/uploads/{file_id}_thumb.jpg",
        "width": width,
        "height": height
    }
async def process_image(
    file_id: str,
    name: str = Form(""),
    id_number: str = Form(""),
    # Steps toggles
    do_restore: bool = Form(False),
    fidelity: float = Form(0.5),
    do_rmbg: bool = Form(True),
    do_color: bool = Form(True),
    do_retouch: bool = Form(True),
    do_crop: bool = Form(True),
    # Branding toggles
    add_studio_name: bool = Form(True),
    add_logo: bool = Form(True),
    add_date: bool = Form(True),
    # Layout customization
    frame_color: str = Form(None),
    frame_name: str = Form(None),
    # Optional manual crop coordinates
    x1: int = Form(None),
    y1: int = Form(None),
    x2: int = Form(None),
    y2: int = Form(None)
):
    """Run the processing pipeline on a previously uploaded image.

    Steps, each controlled by its toggle: face restoration, crop (manual
    rectangle if ``x1``/``y1`` are given, otherwise automatic when
    ``do_crop`` is set), background removal, color grading, retouch, and
    final layout generation. The layout is saved as a full-quality JPEG
    plus a preview that is downscaled to 900px wide when larger.

    Returns:
        A dict with ``id``, ``result_url`` and ``preview_url`` on success.
        Otherwise a ``JSONResponse``: 503 while the model is loading, 404
        for an unknown ``file_id``, 400 for an incomplete manual crop
        rectangle, and 500 if any pipeline step raises.
    """
    # NOTE(review): every step below is synchronous, so the event loop is
    # blocked for the whole duration of this handler.
    if not models["ready"]:
        return JSONResponse(status_code=503, content={"error": "AI Model not ready"})

    # Upload ids are generated with str(uuid.uuid4()). Rejecting anything
    # else keeps glob wildcards and path separators out of the file paths
    # built from file_id below.
    try:
        id_is_valid = str(uuid.UUID(file_id)) == file_id
    except ValueError:
        id_is_valid = False
    if not id_is_valid:
        return JSONResponse(status_code=404, content={"error": "File not found"})

    files = list(UPLOAD_DIR.glob(f"{file_id}.*"))
    if not files:
        return JSONResponse(status_code=404, content={"error": "File not found"})
    orig_path = files[0]

    manual_crop = x1 is not None and y1 is not None
    if manual_crop and (x2 is None or y2 is None):
        return JSONResponse(
            status_code=400,
            content={"error": "Manual crop requires x1, y1, x2 and y2"},
        )

    # Parse Frame Color ("#RRGGBB" -> (r, g, b)); malformed values are ignored.
    layout_color = None
    if frame_color and frame_color.startswith("#"):
        c = frame_color.lstrip("#")
        try:
            layout_color = tuple(int(c[i:i+2], 16) for i in (0, 2, 4))
        except ValueError:
            layout_color = None

    # Use PNG for intermediate files to prevent generation loss
    temp_crop = PROCESSED_DIR / f"{file_id}_processed_crop.png"
    restored_temp = PROCESSED_DIR / f"{file_id}_restored.png"

    try:
        # 0. FACE RESTORATION (Step 0)
        if do_restore:
            print(f"Pipeline: Restoring face for {file_id} (Fidelity: {fidelity})...")
            source_img = restoration.restore_image(str(orig_path), fidelity=fidelity, return_pil=True)
        else:
            # NOTE(review): indentation was lost in this view; EXIF
            # transposition is assumed to apply only to this non-restored
            # branch — confirm against the original file.
            from PIL import ImageOps
            with Image.open(orig_path) as opened:
                source_img = ImageOps.exif_transpose(opened)

        # 1. CROP (Manual, Auto, or Skip)
        if manual_crop or do_crop:
            if manual_crop:
                print(f"Pipeline: Applying manual crop for {file_id} | Rect: ({x1}, {y1}, {x2}, {y2})")
            else:
                print(f"Pipeline: Applying auto crop for {file_id}...")

            # The crop helpers take file paths, so a restored (in-memory)
            # image is written out losslessly first.
            crop_input = orig_path
            if do_restore:
                source_img.save(restored_temp, "PNG")
                crop_input = restored_temp

            if manual_crop:
                crop.apply_custom_crop(str(crop_input), str(temp_crop), (x1, y1, x2, y2))
            else:
                crop.crop_to_4x6_opencv(str(crop_input), str(temp_crop))

            cropped_img = Image.open(temp_crop)
            # Decode now so the temp file can be deleted afterwards.
            cropped_img.load()
        else:
            print(f"Pipeline: Skipping crop for {file_id}")
            cropped_img = source_img

        # 2. BACKGROUND REMOVAL
        if do_rmbg:
            print(f"Pipeline: Removing background for {file_id}...")
            processed_img = process_images.remove_background(models["model"], cropped_img, models["transform"])
            print(f"Pipeline: BG Removal Done. Image Mode: {processed_img.mode}")
        else:
            print(f"Pipeline: Skipping background removal for {file_id}")
            processed_img = cropped_img

        # 3. COLOR GRADING
        if do_color and models["luts"]:
            print(f"Pipeline: Applying color grading for {file_id}...")
            graded_img = color_steal.apply_to_image(models["luts"], processed_img)
            print(f"Pipeline: Color Grading Done. Image Mode: {graded_img.mode}")
        else:
            print(f"Pipeline: Skipping color grading for {file_id}")
            graded_img = processed_img

        # 4. RETOUCH
        current_settings = load_settings()
        # Retouch happens if BOTH the UI checkbox is checked AND it's enabled in global settings
        if do_retouch and current_settings.get("retouch", {}).get("enabled", False):
            retouch_cfg = current_settings["retouch"]
            print(f"Pipeline: Retouching face for {file_id} (Sensitivity: {retouch_cfg.get('sensitivity', 3.0)})")
            final_processed, count = retouch.retouch_image_pil(
                graded_img,
                sensitivity=retouch_cfg.get("sensitivity", 3.0),
                tone_smoothing=retouch_cfg.get("tone_smoothing", 0.6)
            )
            print(f"Pipeline: Retouch Done. Blemishes: {count}. Image Mode: {final_processed.mode}")
        else:
            print(f"Pipeline: Retouching skipped for {file_id}")
            final_processed = graded_img

        print(f"Pipeline: Generating final layout for {file_id}...")
        final_layout = generate_layout(
            final_processed, name, id_number,
            add_studio_name=add_studio_name,
            add_logo=add_logo,
            add_date=add_date,
            frame_color=layout_color,
            frame_name=frame_name
        )

        result_path = RESULT_DIR / f"{file_id}_layout.jpg"
        # Save high-quality JPEG (100% quality, no chroma subsampling)
        final_layout.save(result_path, "JPEG", quality=100, subsampling=0, dpi=(300, 300))

        # 5. Generate a lightweight WEB PREVIEW (max 900px width) for the UI
        preview_path = RESULT_DIR / f"{file_id}_preview.jpg"
        pw, ph = final_layout.size
        if pw > 900:
            p_scale = 900 / pw
            preview_img = final_layout.resize((int(pw * p_scale), int(ph * p_scale)), Image.BILINEAR)
        else:
            preview_img = final_layout
        preview_img.save(preview_path, "JPEG", quality=70)

        return {
            "id": file_id,
            "result_url": f"/static/results/{file_id}_layout.jpg",
            "preview_url": f"/static/results/{file_id}_preview.jpg"
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse(status_code=500, content={"error": str(e)})
    finally:
        # Remove intermediates on success and on failure alike.
        for leftover in (restored_temp, temp_crop):
            try:
                leftover.unlink(missing_ok=True)
            except OSError as cleanup_error:
                print(f"Pipeline: Could not remove {leftover.name} - {cleanup_error}")
async def clear_all():
    """Delete every stored upload, intermediate and result file.

    Files whose name ends in ``.gitkeep`` are preserved. Returns the number
    of files removed, or a 500 ``JSONResponse`` if a deletion fails.
    """
    removed = 0
    try:
        for storage_dir in (UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR):
            for entry in storage_dir.glob("*"):
                if not entry.is_file() or entry.name.endswith(".gitkeep"):
                    continue
                entry.unlink()
                removed += 1
    except Exception as exc:
        return JSONResponse(status_code=500, content={"error": f"Failed to clear storage: {str(exc)}"})
    return {"status": "success", "removed_count": removed}
async def list_frames():
    """List available frame overlays.

    Returns every ``frame-*`` file in the assets folder whose extension is
    one that ``upload_frame`` can store (PNG, JPEG, WebP), sorted by path
    so the order is stable between calls.
    """
    if not ASSETS_DIR.exists():
        return {"frames": []}

    # Must cover what upload_frame accepts; otherwise uploaded JPEG/WebP
    # frames are never listed.
    frame_suffixes = {".png", ".jpg", ".jpeg", ".webp"}
    frames = [
        {"filename": f.name, "url": f"/assets/{f.name}"}
        for f in sorted(ASSETS_DIR.glob("frame-*"))
        if f.is_file() and f.suffix.lower() in frame_suffixes
    ]
    return {"frames": frames}
async def upload_frame(file: UploadFile = File(...)):
    """Upload a new custom frame.

    Only PNG, JPEG and WebP content types are accepted. The stored file is
    named ``frame-<8 hex chars><ext>``, where the extension is chosen from
    the validated content type rather than from the client's filename, so
    a file sent as an image cannot be saved with a non-image extension
    such as ``.html``.

    Returns:
        ``{"status": "success", "frame": {...}}`` on success, a 400
        ``JSONResponse`` for an unsupported type, or 500 if writing fails.
    """
    extension_by_type = {
        "image/png": ".png",
        "image/jpeg": ".jpg",
        "image/webp": ".webp",
    }
    # Validation
    ext = extension_by_type.get(file.content_type)
    if ext is None:
        return JSONResponse(status_code=400, content={"error": "Invalid file type. Use PNG/JPG."})

    ASSETS_DIR.mkdir(parents=True, exist_ok=True)
    filename = f"frame-{uuid.uuid4().hex[:8]}{ext}"
    file_path = ASSETS_DIR / filename
    try:
        with file_path.open("wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        return {
            "status": "success",
            "frame": {
                "filename": filename,
                "url": f"/assets/{filename}"
            }
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
async def delete_frame(filename: str):
    """Delete one frame file from the assets folder.

    The name is rejected (400) if it contains ``..`` or a path separator,
    or if it does not start with ``frame-``. A missing file yields 404 and
    a failed deletion yields 500.
    """
    # Security check: prevent directory traversal and ensure it's a frame file
    forbidden_tokens = ("..", "/", "\\")
    if any(token in filename for token in forbidden_tokens):
        return JSONResponse(status_code=400, content={"error": "Invalid filename"})
    if not filename.startswith("frame-"):
        return JSONResponse(status_code=400, content={"error": "Can only delete frame files"})

    target = ASSETS_DIR / filename
    if not target.exists():
        return JSONResponse(status_code=404, content={"error": "Frame not found"})

    try:
        target.unlink()
    except Exception as exc:
        return JSONResponse(status_code=500, content={"error": str(exc)})
    return {"status": "success"}
| # ── Backup & Restore API ── | |
async def export_backup(client_data: dict):
    """Build an in-memory ZIP backup and stream it to the client.

    The archive holds ``settings.json`` (if present), every regular file
    in the assets folder under ``assets/``, and the supplied
    ``client_data`` serialized as ``client_data.json``.
    """
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, mode="w", compression=zipfile.ZIP_DEFLATED) as bundle:
        # 1. Config
        if SETTINGS_PATH.exists():
            bundle.write(SETTINGS_PATH, arcname="settings.json")

        # 2. Assets (Frames, Logos)
        if ASSETS_DIR.exists():
            asset_files = (entry for entry in ASSETS_DIR.glob("*") if entry.is_file())
            for asset in asset_files:
                bundle.write(asset, arcname=f"assets/{asset.name}")

        # 3. Client Data
        bundle.writestr("client_data.json", json.dumps(client_data, indent=2))

    # Rewind so the response streams from the first byte.
    archive.seek(0)
    filename = f"studio_backup_{int(time.time())}.zip"
    disposition = f"attachment; filename={filename}"
    return StreamingResponse(
        archive,
        media_type="application/zip",
        headers={"Content-Disposition": disposition},
    )
async def import_backup(file: UploadFile = File(...)):
    """Import a backup ZIP file.

    Restores ``settings.json`` and the ``assets/`` files from the archive
    and returns the embedded client data. Both JSON documents are parsed
    before anything is written, so a corrupt backup is rejected without
    partially overwriting the current configuration.

    Returns:
        ``{"status": "success", "client_data": ...}`` on success; a 400
        ``JSONResponse`` if the upload is not a ``.zip``, is not a valid
        archive, or contains invalid JSON; 500 for any other failure.
    """
    if not (file.filename or "").endswith(".zip"):
        return JSONResponse(status_code=400, content={"error": "Must be a .zip file"})

    try:
        content = await file.read()
        with zipfile.ZipFile(io.BytesIO(content)) as zf:
            names = zf.namelist()

            # Validate both JSON members up front (raises ValueError if bad).
            settings_raw = None
            if "settings.json" in names:
                settings_raw = zf.read("settings.json")
                json.loads(settings_raw)
            client_data = {}
            if "client_data.json" in names:
                client_data = json.loads(zf.read("client_data.json"))

            # 1. Restore Config
            if settings_raw is not None:
                # Ensure config dir exists
                SETTINGS_PATH.parent.mkdir(parents=True, exist_ok=True)
                with open(SETTINGS_PATH, "wb") as f:
                    f.write(settings_raw)

            # 2. Restore Assets
            ASSETS_DIR.mkdir(parents=True, exist_ok=True)
            for name in names:
                if name.startswith("assets/") and not name.endswith("/"):
                    # Safe extraction: keep only the final path component
                    # and skip entries that would resolve to a directory.
                    clean_name = os.path.basename(name)
                    if clean_name and clean_name not in (".", ".."):
                        with open(ASSETS_DIR / clean_name, "wb") as f:
                            f.write(zf.read(name))

        return {"status": "success", "client_data": client_data}
    except (zipfile.BadZipFile, ValueError) as e:
        # Malformed archive or JSON is a client error, not a server fault.
        return JSONResponse(status_code=400, content={"error": f"Import failed: {str(e)}"})
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": f"Import failed: {str(e)}"})
| # ── Settings API ── | |
# Location of the persisted settings file. The backup handlers defined
# earlier in the file also use this name, which works because it is only
# looked up when they are called.
SETTINGS_PATH = ROOT_DIR.joinpath("config", "settings.json")
async def get_settings():
    """Return the parsed contents of ``settings.json``.

    Returns an empty dict when the file does not exist, and a 500
    ``JSONResponse`` if it cannot be read or parsed.
    """
    try:
        if not SETTINGS_PATH.exists():
            return {}
        with open(SETTINGS_PATH, "r") as handle:
            return json.load(handle)
    except Exception as exc:
        return JSONResponse(status_code=500, content={"error": str(exc)})
async def update_settings(data: dict):
    """Merge incoming settings into settings.json (partial update).

    Top-level keys whose old and new values are both dicts are merged one
    level deep; any other key is replaced outright. The result is written
    to a temporary file and then swapped into place, so a failed write
    cannot leave a truncated ``settings.json`` behind.

    Returns:
        ``{"status": "success"}``, or a 500 ``JSONResponse`` on failure.
    """
    try:
        current = {}
        if SETTINGS_PATH.exists():
            with open(SETTINGS_PATH, "r") as f:
                current = json.load(f)

        # Deep merge one level
        for key, val in data.items():
            if key in current and isinstance(val, dict) and isinstance(current[key], dict):
                current[key].update(val)
            else:
                current[key] = val

        SETTINGS_PATH.parent.mkdir(parents=True, exist_ok=True)
        # Write beside the target, then replace: os.replace swaps the file
        # in a single step, leaving the old settings intact on any error.
        tmp_path = SETTINGS_PATH.with_name(SETTINGS_PATH.name + ".tmp")
        try:
            with open(tmp_path, "w") as f:
                json.dump(current, f, indent=4, ensure_ascii=False)
            os.replace(tmp_path, SETTINGS_PATH)
        finally:
            # After a successful replace the temp name no longer exists.
            if tmp_path.exists():
                tmp_path.unlink()
        return {"status": "success"}
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
# Uploaded/processed images are served from the storage folder.
app.mount("/static", StaticFiles(directory=str(STORAGE_DIR)), name="static")

# Create the assets folder before mounting it. upload_frame() creates it
# lazily and returns "/assets/..." URLs, which would 404 if the mount had
# been skipped because the folder was missing at import time.
ASSETS_DIR.mkdir(parents=True, exist_ok=True)
app.mount("/assets", StaticFiles(directory=str(ASSETS_DIR)), name="assets")
if __name__ == "__main__":
    # Hugging Face Spaces uses port 7860 by default; PORT overrides it.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)