| """
|
| EL HELAL Studio β Web Backend (FastAPI)
|
| Integrated with Auto-Cleanup and Custom Cropping
|
| """
|
|
|
| from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks
|
| from fastapi.responses import JSONResponse, FileResponse, StreamingResponse
|
| from fastapi.staticfiles import StaticFiles
|
| from fastapi.middleware.cors import CORSMiddleware
|
| from contextlib import asynccontextmanager
|
| import uvicorn
|
| import shutil
|
| import os
|
| import json
|
| import uuid
|
| import zipfile
|
| import io
|
| from pathlib import Path
|
| from PIL import Image
|
| import threading
|
| import sys
|
| import asyncio
|
| import time
|
|
|
|
|
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'core')))
|
|
|
|
|
| import crop
|
| import process_images
|
| import color_steal
|
| import retouch
|
| import restoration
|
| from layout_engine import generate_layout, load_settings
|
|
|
|
|
| WEB_DIR = Path(os.path.dirname(__file__)) / "web_storage"
|
| ROOT_DIR = Path(os.path.dirname(__file__)).parent
|
| STORAGE_DIR = ROOT_DIR / "storage"
|
| ASSETS_DIR = ROOT_DIR / "assets"
|
|
|
| UPLOAD_DIR = STORAGE_DIR / "uploads"
|
| PROCESSED_DIR = STORAGE_DIR / "processed"
|
| RESULT_DIR = STORAGE_DIR / "results"
|
|
|
| for d in [UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR]:
|
| d.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
| models = {
|
| "model": None,
|
| "transform": None,
|
| "restoration": None,
|
| "luts": color_steal.load_trained_curves(),
|
| "ready": False
|
| }
|
|
|
| def warm_up_ai():
|
| print("AI Model: Loading in background...")
|
| try:
|
| models["model"], _ = process_images.setup_model()
|
| models["transform"] = process_images.get_transform()
|
| print("Restoration Model: API Mode Active")
|
|
|
| models["ready"] = True
|
| print("AI Model: READY")
|
| except Exception as e:
|
| print(f"AI Model: FAILED to load - {e}")
|
|
|
| async def cleanup_task():
|
| """Background task to delete files older than 24 hours."""
|
| while True:
|
| print("Cleanup: Checking for old files...")
|
| now = time.time()
|
| count = 0
|
| for folder in [UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR]:
|
| for path in folder.glob("*"):
|
| if path.is_file() and (now - path.stat().st_mtime) > 86400:
|
| path.unlink()
|
| count += 1
|
| if count > 0: print(f"Cleanup: Removed {count} old files.")
|
| await asyncio.sleep(3600)
|
|
|
| @asynccontextmanager
|
| async def lifespan(app: FastAPI):
|
|
|
| threading.Thread(target=warm_up_ai, daemon=True).start()
|
| asyncio.create_task(cleanup_task())
|
| yield
|
|
|
| pass
|
|
|
| app = FastAPI(title="EL HELAL Studio API", lifespan=lifespan)
|
|
|
| app.add_middleware(
|
| CORSMiddleware,
|
| allow_origins=["*"],
|
| allow_methods=["*"],
|
| allow_headers=["*"],
|
| )
|
|
|
|
|
|
|
| @app.get("/")
|
| async def read_index():
|
| return FileResponse(WEB_DIR / "index.html")
|
|
|
| @app.get("/status")
|
| async def get_status():
|
| return {"ai_ready": models["ready"]}
|
|
|
| @app.post("/upload")
|
| async def upload_image(file: UploadFile = File(...)):
|
| file_id = str(uuid.uuid4())
|
| ext = Path(file.filename).suffix
|
| file_path = UPLOAD_DIR / f"{file_id}{ext}"
|
|
|
| with file_path.open("wb") as buffer:
|
| shutil.copyfileobj(file.file, buffer)
|
|
|
| with Image.open(file_path) as img:
|
| from PIL import ImageOps
|
|
|
| img = ImageOps.exif_transpose(img)
|
|
|
|
|
| width, height = img.size
|
|
|
|
|
| img.thumbnail((200, 200), Image.BILINEAR)
|
| thumb_path = UPLOAD_DIR / f"{file_id}_thumb.jpg"
|
| if img.mode in ("RGBA", "LA"):
|
| bg = Image.new("RGB", img.size, (255, 255, 255))
|
| bg.paste(img, mask=img.split()[-1])
|
| bg.save(thumb_path, "JPEG", quality=60)
|
| else:
|
| img.convert("RGB").save(thumb_path, "JPEG", quality=60)
|
| return {
|
| "id": file_id,
|
| "filename": file.filename,
|
| "thumb_url": f"/static/uploads/{file_id}_thumb.jpg",
|
| "width": width,
|
| "height": height
|
| }
|
|
|
| @app.post("/process/{file_id}")
|
| async def process_image(
|
| file_id: str,
|
| name: str = Form(""),
|
| id_number: str = Form(""),
|
|
|
| do_restore: bool = Form(False),
|
| fidelity: float = Form(0.5),
|
| do_rmbg: bool = Form(True),
|
| do_color: bool = Form(True),
|
| do_retouch: bool = Form(True),
|
| do_crop: bool = Form(True),
|
|
|
| add_studio_name: bool = Form(True),
|
| add_logo: bool = Form(True),
|
| add_date: bool = Form(True),
|
|
|
| frame_color: str = Form(None),
|
| frame_name: str = Form(None),
|
|
|
| x1: int = Form(None),
|
| y1: int = Form(None),
|
| x2: int = Form(None),
|
| y2: int = Form(None)
|
| ):
|
| if not models["ready"]:
|
| return JSONResponse(status_code=503, content={"error": "AI Model not ready"})
|
|
|
| files = list(UPLOAD_DIR.glob(f"{file_id}.*"))
|
| if not files: return JSONResponse(status_code=404, content={"error": "File not found"})
|
| orig_path = files[0]
|
|
|
|
|
| layout_color = None
|
| if frame_color and frame_color.startswith("#"):
|
| try:
|
| c = frame_color.lstrip("#")
|
| layout_color = tuple(int(c[i:i+2], 16) for i in (0, 2, 4))
|
| except:
|
| pass
|
|
|
| try:
|
|
|
| current_source_path = orig_path
|
| if do_restore:
|
| print(f"Pipeline: Restoring face for {file_id} (Fidelity: {fidelity})...")
|
| restored_img_pil = restoration.restore_image(str(orig_path), fidelity=fidelity, return_pil=True)
|
|
|
|
|
|
|
| source_img = restored_img_pil
|
| else:
|
| source_img = Image.open(orig_path)
|
| from PIL import ImageOps
|
| source_img = ImageOps.exif_transpose(source_img)
|
|
|
|
|
| temp_crop = PROCESSED_DIR / f"{file_id}_processed_crop.png"
|
|
|
|
|
| if x1 is not None and y1 is not None:
|
| print(f"Pipeline: Applying manual crop for {file_id} | Rect: ({x1}, {y1}, {x2}, {y2})")
|
| rect = (x1, y1, x2, y2)
|
|
|
| if do_restore:
|
|
|
| restored_temp = PROCESSED_DIR / f"{file_id}_restored.png"
|
| source_img.save(restored_temp, "PNG")
|
| crop.apply_custom_crop(str(restored_temp), str(temp_crop), rect)
|
| cropped_img = Image.open(temp_crop)
|
| if restored_temp.exists(): restored_temp.unlink()
|
| else:
|
| crop.apply_custom_crop(str(orig_path), str(temp_crop), rect)
|
| cropped_img = Image.open(temp_crop)
|
| elif do_crop:
|
| print(f"Pipeline: Applying auto crop for {file_id}...")
|
| if do_restore:
|
| restored_temp = PROCESSED_DIR / f"{file_id}_restored.png"
|
| source_img.save(restored_temp, "PNG")
|
| crop.crop_to_4x6_opencv(str(restored_temp), str(temp_crop))
|
| cropped_img = Image.open(temp_crop)
|
| if restored_temp.exists(): restored_temp.unlink()
|
| else:
|
| crop.crop_to_4x6_opencv(str(orig_path), str(temp_crop))
|
| cropped_img = Image.open(temp_crop)
|
| else:
|
| print(f"Pipeline: Skipping crop for {file_id}")
|
| cropped_img = source_img
|
|
|
|
|
| if do_rmbg:
|
| print(f"Pipeline: Removing background for {file_id}...")
|
| processed_img = process_images.remove_background(models["model"], cropped_img, models["transform"])
|
| print(f"Pipeline: BG Removal Done. Image Mode: {processed_img.mode}")
|
| else:
|
| print(f"Pipeline: Skipping background removal for {file_id}")
|
| processed_img = cropped_img
|
|
|
|
|
| if do_color and models["luts"]:
|
| print(f"Pipeline: Applying color grading for {file_id}...")
|
| graded_img = color_steal.apply_to_image(models["luts"], processed_img)
|
| print(f"Pipeline: Color Grading Done. Image Mode: {graded_img.mode}")
|
| else:
|
| print(f"Pipeline: Skipping color grading for {file_id}")
|
| graded_img = processed_img
|
|
|
|
|
| current_settings = load_settings()
|
|
|
| if do_retouch and current_settings.get("retouch", {}).get("enabled", False):
|
| retouch_cfg = current_settings["retouch"]
|
| print(f"Pipeline: Retouching face for {file_id} (Sensitivity: {retouch_cfg.get('sensitivity', 3.0)})")
|
| final_processed, count = retouch.retouch_image_pil(
|
| graded_img,
|
| sensitivity=retouch_cfg.get("sensitivity", 3.0),
|
| tone_smoothing=retouch_cfg.get("tone_smoothing", 0.6)
|
| )
|
| print(f"Pipeline: Retouch Done. Blemishes: {count}. Image Mode: {final_processed.mode}")
|
| else:
|
| print(f"Pipeline: Retouching skipped for {file_id}")
|
| final_processed = graded_img
|
|
|
| print(f"Pipeline: Generating final layout for {file_id}...")
|
| final_layout = generate_layout(
|
| final_processed, name, id_number,
|
| add_studio_name=add_studio_name,
|
| add_logo=add_logo,
|
| add_date=add_date,
|
| frame_color=layout_color,
|
| frame_name=frame_name
|
| )
|
|
|
| result_path = RESULT_DIR / f"{file_id}_layout.jpg"
|
|
|
| final_layout.save(result_path, "JPEG", quality=100, subsampling=0, dpi=(300, 300))
|
|
|
|
|
| preview_path = RESULT_DIR / f"{file_id}_preview.jpg"
|
| pw, ph = final_layout.size
|
| p_scale = 900 / pw if pw > 900 else 1.0
|
| if p_scale < 1.0:
|
| preview_img = final_layout.resize((int(pw * p_scale), int(ph * p_scale)), Image.BILINEAR)
|
| preview_img.save(preview_path, "JPEG", quality=70)
|
| else:
|
| final_layout.save(preview_path, "JPEG", quality=70)
|
|
|
| if temp_crop.exists(): temp_crop.unlink()
|
|
|
| return {
|
| "id": file_id,
|
| "result_url": f"/static/results/{file_id}_layout.jpg",
|
| "preview_url": f"/static/results/{file_id}_preview.jpg"
|
| }
|
| except Exception as e:
|
| import traceback
|
| traceback.print_exc()
|
| return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
| @app.post("/clear-all")
|
| async def clear_all():
|
| """Manually clear all uploaded and processed files."""
|
| count = 0
|
| try:
|
| for folder in [UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR]:
|
| for path in folder.glob("*"):
|
| if path.is_file() and not path.name.endswith(".gitkeep"):
|
| path.unlink()
|
| count += 1
|
| return {"status": "success", "removed_count": count}
|
| except Exception as e:
|
| return JSONResponse(status_code=500, content={"error": f"Failed to clear storage: {str(e)}"})
|
|
|
| @app.get("/frames")
|
| async def list_frames():
|
| """List available frame overlays."""
|
| frames = []
|
| if ASSETS_DIR.exists():
|
| for f in ASSETS_DIR.glob("frame-*.png"):
|
| frames.append({
|
| "filename": f.name,
|
| "url": f"/assets/{f.name}"
|
| })
|
| return {"frames": frames}
|
|
|
| @app.post("/frames")
|
| async def upload_frame(file: UploadFile = File(...)):
|
| """Upload a new custom frame."""
|
| if not ASSETS_DIR.exists(): ASSETS_DIR.mkdir(parents=True)
|
|
|
|
|
| if file.content_type not in ["image/png", "image/jpeg", "image/webp"]:
|
| return JSONResponse(status_code=400, content={"error": "Invalid file type. Use PNG/JPG."})
|
|
|
| ext = Path(file.filename).suffix
|
| frame_id = f"frame-{uuid.uuid4().hex[:8]}"
|
| filename = f"{frame_id}{ext}" if ext else f"{frame_id}.png"
|
| file_path = ASSETS_DIR / filename
|
|
|
| try:
|
| with file_path.open("wb") as buffer:
|
| shutil.copyfileobj(file.file, buffer)
|
| return {
|
| "status": "success",
|
| "frame": {
|
| "filename": filename,
|
| "url": f"/assets/{filename}"
|
| }
|
| }
|
| except Exception as e:
|
| return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
| @app.delete("/frames/{filename}")
|
| async def delete_frame(filename: str):
|
| """Delete a frame file."""
|
|
|
| if ".." in filename or "/" in filename or "\\" in filename:
|
| return JSONResponse(status_code=400, content={"error": "Invalid filename"})
|
|
|
| if not filename.startswith("frame-"):
|
| return JSONResponse(status_code=400, content={"error": "Can only delete frame files"})
|
|
|
| file_path = ASSETS_DIR / filename
|
|
|
| if not file_path.exists():
|
| return JSONResponse(status_code=404, content={"error": "Frame not found"})
|
|
|
| try:
|
| file_path.unlink()
|
| return {"status": "success"}
|
| except Exception as e:
|
| return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
|
|
|
|
| @app.post("/backup/export")
|
| async def export_backup(client_data: dict):
|
| """Export settings, assets, and client data as a ZIP file."""
|
| mem_zip = io.BytesIO()
|
|
|
| with zipfile.ZipFile(mem_zip, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
|
|
|
| if SETTINGS_PATH.exists():
|
| zf.write(SETTINGS_PATH, arcname="settings.json")
|
|
|
|
|
| if ASSETS_DIR.exists():
|
| for f in ASSETS_DIR.glob("*"):
|
| if f.is_file():
|
| zf.write(f, arcname=f"assets/{f.name}")
|
|
|
|
|
| zf.writestr("client_data.json", json.dumps(client_data, indent=2))
|
|
|
| mem_zip.seek(0)
|
| filename = f"studio_backup_{int(time.time())}.zip"
|
| return StreamingResponse(
|
| mem_zip,
|
| media_type="application/zip",
|
| headers={"Content-Disposition": f"attachment; filename={filename}"}
|
| )
|
|
|
| @app.post("/backup/import")
|
| async def import_backup(file: UploadFile = File(...)):
|
| """Import a backup ZIP file."""
|
| if not file.filename.endswith(".zip"):
|
| return JSONResponse(status_code=400, content={"error": "Must be a .zip file"})
|
|
|
| try:
|
| content = await file.read()
|
| with zipfile.ZipFile(io.BytesIO(content)) as zf:
|
|
|
| if "settings.json" in zf.namelist():
|
|
|
| SETTINGS_PATH.parent.mkdir(parents=True, exist_ok=True)
|
| with open(SETTINGS_PATH, "wb") as f:
|
| f.write(zf.read("settings.json"))
|
|
|
|
|
| if not ASSETS_DIR.exists(): ASSETS_DIR.mkdir(parents=True, exist_ok=True)
|
| for name in zf.namelist():
|
| if name.startswith("assets/") and not name.endswith("/"):
|
|
|
| clean_name = os.path.basename(name)
|
| if clean_name:
|
| with open(ASSETS_DIR / clean_name, "wb") as f:
|
| f.write(zf.read(name))
|
|
|
|
|
| client_data = {}
|
| if "client_data.json" in zf.namelist():
|
| client_data = json.loads(zf.read("client_data.json"))
|
|
|
| return {"status": "success", "client_data": client_data}
|
|
|
| except Exception as e:
|
| return JSONResponse(status_code=500, content={"error": f"Import failed: {str(e)}"})
|
|
|
|
|
| SETTINGS_PATH = ROOT_DIR / "config" / "settings.json"
|
|
|
| @app.get("/settings")
|
| async def get_settings():
|
| """Return current settings.json contents."""
|
| try:
|
| if SETTINGS_PATH.exists():
|
| with open(SETTINGS_PATH, "r") as f:
|
| return json.load(f)
|
| return {}
|
| except Exception as e:
|
| return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
| @app.post("/settings")
|
| async def update_settings(data: dict):
|
| """Merge incoming settings into settings.json (partial update)."""
|
| try:
|
| current = {}
|
| if SETTINGS_PATH.exists():
|
| with open(SETTINGS_PATH, "r") as f:
|
| current = json.load(f)
|
|
|
| for key, val in data.items():
|
| if key in current and isinstance(val, dict) and isinstance(current[key], dict):
|
| current[key].update(val)
|
| else:
|
| current[key] = val
|
| SETTINGS_PATH.parent.mkdir(parents=True, exist_ok=True)
|
| with open(SETTINGS_PATH, "w") as f:
|
| json.dump(current, f, indent=4, ensure_ascii=False)
|
| return {"status": "success"}
|
| except Exception as e:
|
| return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
| app.mount("/static", StaticFiles(directory=str(STORAGE_DIR)), name="static")
|
| if ASSETS_DIR.exists():
|
| app.mount("/assets", StaticFiles(directory=str(ASSETS_DIR)), name="assets")
|
|
|
| if __name__ == "__main__":
|
|
|
| port = int(os.environ.get("PORT", 7860))
|
| uvicorn.run(app, host="0.0.0.0", port=port)
|
|
|