| | """ |
| | EL HELAL Studio β Web Backend (FastAPI) |
| | Integrated with Auto-Cleanup and Custom Cropping |
| | """ |
| |
|
| | from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks |
| | from fastapi.responses import JSONResponse, FileResponse |
| | from fastapi.staticfiles import StaticFiles |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from contextlib import asynccontextmanager |
| | import uvicorn |
| | import shutil |
| | import os |
| | import json |
| | import uuid |
| | from pathlib import Path |
| | from PIL import Image |
| | import threading |
| | import sys |
| | import asyncio |
| | import time |
| |
|
| | |
| | sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'core'))) |
| |
|
| | |
| | import crop |
| | import process_images |
| | import color_steal |
| | import retouch |
| | from layout_engine import generate_layout, load_settings |
| |
|
| | |
| | WEB_DIR = Path(os.path.dirname(__file__)) / "web_storage" |
| | ROOT_DIR = Path(os.path.dirname(__file__)).parent |
| | STORAGE_DIR = ROOT_DIR / "storage" |
| |
|
| | UPLOAD_DIR = STORAGE_DIR / "uploads" |
| | PROCESSED_DIR = STORAGE_DIR / "processed" |
| | RESULT_DIR = STORAGE_DIR / "results" |
| |
|
| | for d in [UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR]: |
| | d.mkdir(parents=True, exist_ok=True) |
| |
|
| | |
| | models = { |
| | "model": None, |
| | "transform": None, |
| | "luts": color_steal.load_trained_curves(), |
| | "ready": False |
| | } |
| |
|
| | def warm_up_ai(): |
| | print("AI Model: Loading in background...") |
| | try: |
| | models["model"], _ = process_images.setup_model() |
| | models["transform"] = process_images.get_transform() |
| | models["ready"] = True |
| | print("AI Model: READY") |
| | except Exception as e: |
| | print(f"AI Model: FAILED to load - {e}") |
| |
|
| | async def cleanup_task(): |
| | """Background task to delete files older than 24 hours.""" |
| | while True: |
| | print("Cleanup: Checking for old files...") |
| | now = time.time() |
| | count = 0 |
| | for folder in [UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR]: |
| | for path in folder.glob("*"): |
| | if path.is_file() and (now - path.stat().st_mtime) > 86400: |
| | path.unlink() |
| | count += 1 |
| | if count > 0: print(f"Cleanup: Removed {count} old files.") |
| | await asyncio.sleep(3600) |
| |
|
| | @asynccontextmanager |
| | async def lifespan(app: FastAPI): |
| | |
| | threading.Thread(target=warm_up_ai, daemon=True).start() |
| | asyncio.create_task(cleanup_task()) |
| | yield |
| | |
| | pass |
| |
|
| | app = FastAPI(title="EL HELAL Studio API", lifespan=lifespan) |
| |
|
| | app.add_middleware( |
| | CORSMiddleware, |
| | allow_origins=["*"], |
| | allow_methods=["*"], |
| | allow_headers=["*"], |
| | ) |
| |
|
| | |
| |
|
| | @app.get("/") |
| | async def read_index(): |
| | return FileResponse(WEB_DIR / "index.html") |
| |
|
| | @app.get("/status") |
| | async def get_status(): |
| | return {"ai_ready": models["ready"]} |
| |
|
| | @app.post("/upload") |
| | async def upload_image(file: UploadFile = File(...)): |
| | file_id = str(uuid.uuid4()) |
| | ext = Path(file.filename).suffix |
| | file_path = UPLOAD_DIR / f"{file_id}{ext}" |
| | |
| | with file_path.open("wb") as buffer: |
| | shutil.copyfileobj(file.file, buffer) |
| | |
| | with Image.open(file_path) as img: |
| | from PIL import ImageOps |
| | |
| | img = ImageOps.exif_transpose(img) |
| |
|
| | |
| | width, height = img.size |
| |
|
| | |
| | img.thumbnail((200, 200), Image.BILINEAR) |
| | thumb_path = UPLOAD_DIR / f"{file_id}_thumb.jpg" |
| | if img.mode in ("RGBA", "LA"): |
| | bg = Image.new("RGB", img.size, (255, 255, 255)) |
| | bg.paste(img, mask=img.split()[-1]) |
| | bg.save(thumb_path, "JPEG", quality=60) |
| | else: |
| | img.convert("RGB").save(thumb_path, "JPEG", quality=60) |
| | return { |
| | "id": file_id, |
| | "filename": file.filename, |
| | "thumb_url": f"/static/uploads/{file_id}_thumb.jpg", |
| | "width": width, |
| | "height": height |
| | } |
| |
|
| | @app.post("/process/{file_id}") |
| | async def process_image( |
| | file_id: str, |
| | name: str = Form(""), |
| | id_number: str = Form(""), |
| | |
| | do_rmbg: bool = Form(True), |
| | do_color: bool = Form(True), |
| | do_retouch: bool = Form(True), |
| | do_crop: bool = Form(True), |
| | |
| | add_studio_name: bool = Form(True), |
| | add_logo: bool = Form(True), |
| | add_date: bool = Form(True), |
| | |
| | x1: int = Form(None), |
| | y1: int = Form(None), |
| | x2: int = Form(None), |
| | y2: int = Form(None) |
| | ): |
| | if not models["ready"]: |
| | return JSONResponse(status_code=503, content={"error": "AI Model not ready"}) |
| | |
| | files = list(UPLOAD_DIR.glob(f"{file_id}.*")) |
| | if not files: return JSONResponse(status_code=404, content={"error": "File not found"}) |
| | orig_path = files[0] |
| | |
| | try: |
| | temp_crop = PROCESSED_DIR / f"{file_id}_processed_crop.jpg" |
| | |
| | |
| | if x1 is not None and y1 is not None: |
| | print(f"Pipeline: Applying manual crop for {file_id} | Rect: ({x1}, {y1}, {x2}, {y2})") |
| | rect = (x1, y1, x2, y2) |
| | crop.apply_custom_crop(str(orig_path), str(temp_crop), rect) |
| | cropped_img = Image.open(temp_crop) |
| | elif do_crop: |
| | print(f"Pipeline: Applying auto crop for {file_id}...") |
| | crop.crop_to_4x6_opencv(str(orig_path), str(temp_crop)) |
| | cropped_img = Image.open(temp_crop) |
| | else: |
| | print(f"Pipeline: Skipping crop for {file_id}") |
| | cropped_img = Image.open(orig_path) |
| | |
| | |
| | if do_rmbg: |
| | print(f"Pipeline: Removing background for {file_id}...") |
| | processed_img = process_images.remove_background(models["model"], cropped_img, models["transform"]) |
| | print(f"Pipeline: BG Removal Done. Image Mode: {processed_img.mode}") |
| | else: |
| | print(f"Pipeline: Skipping background removal for {file_id}") |
| | processed_img = cropped_img |
| | |
| | |
| | if do_color and models["luts"]: |
| | print(f"Pipeline: Applying color grading for {file_id}...") |
| | graded_img = color_steal.apply_to_image(models["luts"], processed_img) |
| | print(f"Pipeline: Color Grading Done. Image Mode: {graded_img.mode}") |
| | else: |
| | print(f"Pipeline: Skipping color grading for {file_id}") |
| | graded_img = processed_img |
| | |
| | |
| | current_settings = load_settings() |
| | |
| | if do_retouch and current_settings.get("retouch", {}).get("enabled", False): |
| | retouch_cfg = current_settings["retouch"] |
| | print(f"Pipeline: Retouching face for {file_id} (Sensitivity: {retouch_cfg.get('sensitivity', 3.0)})") |
| | final_processed, count = retouch.retouch_image_pil( |
| | graded_img, |
| | sensitivity=retouch_cfg.get("sensitivity", 3.0), |
| | tone_smoothing=retouch_cfg.get("tone_smoothing", 0.6) |
| | ) |
| | print(f"Pipeline: Retouch Done. Blemishes: {count}. Image Mode: {final_processed.mode}") |
| | else: |
| | print(f"Pipeline: Retouching skipped for {file_id}") |
| | final_processed = graded_img |
| | |
| | print(f"Pipeline: Generating final layout for {file_id}...") |
| | final_layout = generate_layout( |
| | final_processed, name, id_number, |
| | add_studio_name=add_studio_name, |
| | add_logo=add_logo, |
| | add_date=add_date |
| | ) |
| |
|
| | result_path = RESULT_DIR / f"{file_id}_layout.jpg" |
| | final_layout.save(result_path, "JPEG", quality=95, dpi=(300, 300)) |
| |
|
| | |
| | preview_path = RESULT_DIR / f"{file_id}_preview.jpg" |
| | pw, ph = final_layout.size |
| | p_scale = 900 / pw if pw > 900 else 1.0 |
| | if p_scale < 1.0: |
| | preview_img = final_layout.resize((int(pw * p_scale), int(ph * p_scale)), Image.BILINEAR) |
| | preview_img.save(preview_path, "JPEG", quality=70) |
| | else: |
| | final_layout.save(preview_path, "JPEG", quality=70) |
| |
|
| | if temp_crop.exists(): temp_crop.unlink() |
| |
|
| | return { |
| | "id": file_id, |
| | "result_url": f"/static/results/{file_id}_layout.jpg", |
| | "preview_url": f"/static/results/{file_id}_preview.jpg" |
| | } |
| | except Exception as e: |
| | import traceback |
| | traceback.print_exc() |
| | return JSONResponse(status_code=500, content={"error": str(e)}) |
| |
|
| | @app.post("/clear-all") |
| | async def clear_all(): |
| | """Manually clear all uploaded and processed files.""" |
| | count = 0 |
| | try: |
| | for folder in [UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR]: |
| | for path in folder.glob("*"): |
| | if path.is_file() and not path.name.endswith(".gitkeep"): |
| | path.unlink() |
| | count += 1 |
| | return {"status": "success", "removed_count": count} |
| | except Exception as e: |
| | return JSONResponse(status_code=500, content={"error": f"Failed to clear storage: {str(e)}"}) |
| |
|
| | |
| | SETTINGS_PATH = ROOT_DIR / "config" / "settings.json" |
| |
|
| | @app.get("/settings") |
| | async def get_settings(): |
| | """Return current settings.json contents.""" |
| | try: |
| | if SETTINGS_PATH.exists(): |
| | with open(SETTINGS_PATH, "r") as f: |
| | return json.load(f) |
| | return {} |
| | except Exception as e: |
| | return JSONResponse(status_code=500, content={"error": str(e)}) |
| |
|
| | @app.post("/settings") |
| | async def update_settings(data: dict): |
| | """Merge incoming settings into settings.json (partial update).""" |
| | try: |
| | current = {} |
| | if SETTINGS_PATH.exists(): |
| | with open(SETTINGS_PATH, "r") as f: |
| | current = json.load(f) |
| | |
| | for key, val in data.items(): |
| | if key in current and isinstance(val, dict) and isinstance(current[key], dict): |
| | current[key].update(val) |
| | else: |
| | current[key] = val |
| | SETTINGS_PATH.parent.mkdir(parents=True, exist_ok=True) |
| | with open(SETTINGS_PATH, "w") as f: |
| | json.dump(current, f, indent=4, ensure_ascii=False) |
| | return {"status": "success"} |
| | except Exception as e: |
| | return JSONResponse(status_code=500, content={"error": str(e)}) |
| |
|
| | app.mount("/static", StaticFiles(directory=str(STORAGE_DIR)), name="static") |
| |
|
| | if __name__ == "__main__": |
| | |
| | port = int(os.environ.get("PORT", 7860)) |
| | uvicorn.run(app, host="0.0.0.0", port=port) |
| |
|