import io import uuid from contextlib import asynccontextmanager from pathlib import Path import numpy as np from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from PIL import Image, ImageFilter from rembg import new_session, remove BACKGROUNDS_DIR = Path("/app/backgrounds") _BG_WIDTH, _BG_HEIGHT = 1920, 1080 _BUILTIN_BACKGROUNDS = [ { "filename": "studio_white.jpg", "stops": [((210,210,215),0.0),((235,235,240),0.3),((248,248,252),0.6),((230,230,235),1.0)], "floor": (180, 180, 185), "vignette": 0.35, }, { "filename": "showroom_night.jpg", "stops": [((5,5,20),0.0),((10,12,40),0.3),((18,20,65),0.6),((8,10,30),1.0)], "floor": (12, 14, 45), "vignette": 0.5, }, { "filename": "canyon_sunset.jpg", "stops": [((180,60,20),0.0),((220,100,30),0.25),((240,150,40),0.5),((255,180,60),0.75),((200,80,25),1.0)], "floor": (120, 55, 20), "vignette": 0.4, }, { "filename": "mountain_road.jpg", "stops": [((80,160,220),0.0),((100,180,230),0.3),((50,120,80),0.6),((30,80,50),1.0)], "floor": (25, 70, 40), "vignette": 0.3, }, { "filename": "urban_garage.jpg", "stops": [((25,25,28),0.0),((38,38,42),0.3),((50,50,55),0.6),((30,30,34),1.0)], "floor": (20, 20, 22), "vignette": 0.45, }, ] def _linear_gradient(stops): arr = np.zeros((_BG_HEIGHT, _BG_WIDTH, 3), dtype=np.uint8) xs = np.linspace(0, 1, _BG_WIDTH) for xi, t in enumerate(xs): lc, lp = stops[0] rc, rp = stops[-1] for i in range(len(stops) - 1): c0, p0 = stops[i] c1, p1 = stops[i + 1] if p0 <= t <= p1: lc, lp, rc, rp = c0, p0, c1, p1 break blend = 0.0 if rp == lp else max(0.0, min(1.0, (t - lp) / (rp - lp))) arr[:, xi] = [int(lc[j] + (rc[j] - lc[j]) * blend) for j in range(3)] return Image.fromarray(arr, "RGB") def _add_floor(img, floor_color, ratio=0.78): arr = np.array(img).astype(np.float32) fy = int(_BG_HEIGHT * ratio) for y in range(fy, _BG_HEIGHT): a = (y - fy) / (_BG_HEIGHT - fy) arr[y] = arr[y] * (1 - a * 0.6) + np.array(floor_color) * (a * 0.6) return Image.fromarray(np.clip(arr, 0, 255).astype(np.uint8), "RGB") def _vignette(img, strength=0.4): arr = np.array(img).astype(np.float32) cy, cx = _BG_HEIGHT / 2, _BG_WIDTH / 2 y, x = np.mgrid[0:_BG_HEIGHT, 0:_BG_WIDTH] dist = np.sqrt(((x - cx) / cx) ** 2 + ((y - cy) / cy) ** 2) arr *= (1.0 - np.clip(dist * strength, 0, strength))[:, :, np.newaxis] return Image.fromarray(np.clip(arr, 0, 255).astype(np.uint8), "RGB") def generate_backgrounds(): BACKGROUNDS_DIR.mkdir(parents=True, exist_ok=True) for bg in _BUILTIN_BACKGROUNDS: path = BACKGROUNDS_DIR / bg["filename"] if path.exists(): print(f" [bg] exists: {bg['filename']}") continue img = _linear_gradient(bg["stops"]) img = _add_floor(img, bg["floor"]) img = _vignette(img, bg["vignette"]) img.save(str(path), "JPEG", quality=95, optimize=True) print(f" [bg] generated: {bg['filename']}") print(f" [bg] done") @asynccontextmanager async def lifespan(app_: FastAPI): generate_backgrounds() yield app = FastAPI( title="Car Background Removal API", description="Remove car backgrounds and composite onto showroom backgrounds.", version="1.0.0", lifespan=lifespan, ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) rembg_session = new_session("u2net") def refresh_backgrounds(): return { bg.stem: bg for bg in BACKGROUNDS_DIR.glob("*") if bg.suffix.lower() in {".jpg", ".jpeg", ".png", ".webp"} } def remove_background(image): img_bytes = io.BytesIO() image.save(img_bytes, format="PNG") img_bytes.seek(0) result_bytes = remove(img_bytes.read(), session=rembg_session) result = Image.open(io.BytesIO(result_bytes)).convert("RGBA") return result def add_drop_shadow(car_rgba, shadow_offset=(20, 40), shadow_blur_radius=35, shadow_opacity=140): alpha = car_rgba.split()[3] shadow_alpha = Image.new("L", car_rgba.size, 0) shadow_alpha.paste(alpha, (0, 0)) dark_shadow = Image.new("RGBA", car_rgba.size, (0, 0, 0, shadow_opacity)) dark_shadow.putalpha(shadow_alpha) blurred = dark_shadow.filter(ImageFilter.GaussianBlur(radius=shadow_blur_radius)) canvas_w = car_rgba.width + abs(shadow_offset[0]) + shadow_blur_radius * 2 canvas_h = car_rgba.height + abs(shadow_offset[1]) + shadow_blur_radius * 2 canvas = Image.new("RGBA", (canvas_w, canvas_h), (0, 0, 0, 0)) pad_x = shadow_blur_radius pad_y = shadow_blur_radius canvas.paste(blurred, (pad_x + shadow_offset[0], pad_y + shadow_offset[1]), blurred) canvas.paste(car_rgba, (pad_x, pad_y), car_rgba) return canvas, (pad_x, pad_y) def fit_car_to_background(car_with_shadow, background, car_height_ratio=0.65): bg_w, bg_h = background.size shadow_w, shadow_h = car_with_shadow.size target_height = int(bg_h * car_height_ratio) scale = target_height / shadow_h target_width = int(shadow_w * scale) resized = car_with_shadow.resize((target_width, target_height), Image.LANCZOS) paste_x = (bg_w - target_width) // 2 paste_y = bg_h - target_height - int(bg_h * 0.05) return resized, paste_x, paste_y def composite_car_on_background(car_rgba, background): car_w, car_h = car_rgba.size shadow_blur = max(8, int(car_h * 0.025)) shadow_offset = (int(car_w * 0.008), int(car_h * 0.03)) car_with_shadow, _ = add_drop_shadow( car_rgba, shadow_offset=shadow_offset, shadow_blur_radius=shadow_blur, shadow_opacity=120, ) resized_car, paste_x, paste_y = fit_car_to_background(car_with_shadow, background) result = background.copy().convert("RGBA") result.paste(resized_car, (paste_x, paste_y), resized_car) return result.convert("RGB") @app.get("/") def root(): return { "message": "Car Background Removal API is running.", "endpoints": { "POST /process": "Remove car background and composite onto a showroom background.", "GET /backgrounds": "List available background IDs.", "POST /upload-background": "Upload a custom background image.", }, } @app.get("/backgrounds") def list_backgrounds(): backgrounds = refresh_backgrounds() return {"available_background_ids": list(backgrounds.keys()), "count": len(backgrounds)} @app.post("/upload-background") async def upload_background(background_image: UploadFile = File(...)): if background_image.content_type not in {"image/jpeg", "image/png", "image/webp"}: raise HTTPException(status_code=400, detail="Unsupported image type.") raw_bytes = await background_image.read() if len(raw_bytes) == 0: raise HTTPException(status_code=400, detail="Uploaded file is empty.") try: img = Image.open(io.BytesIO(raw_bytes)).convert("RGB") except Exception as exc: raise HTTPException(status_code=400, detail=f"Could not open image: {exc}") bg_id = f"custom_{uuid.uuid4().hex[:12]}" save_path = BACKGROUNDS_DIR / f"{bg_id}.jpg" try: img.save(save_path, "JPEG", quality=92, optimize=True) except Exception as exc: raise HTTPException(status_code=500, detail=f"Could not save background: {exc}") return {"background_id": bg_id, "filename": save_path.name, "width": img.width, "height": img.height} @app.post("/process") async def process_car_image( car_image: UploadFile = File(...), background_id: str = Form(...), ): if car_image.content_type not in {"image/jpeg", "image/png", "image/webp"}: raise HTTPException(status_code=400, detail="Unsupported image type.") backgrounds = refresh_backgrounds() if background_id not in backgrounds: raise HTTPException( status_code=404, detail=f"Background not found. Available: {list(backgrounds.keys())}", ) raw_bytes = await car_image.read() if len(raw_bytes) == 0: raise HTTPException(status_code=400, detail="Uploaded file is empty.") try: car_img = Image.open(io.BytesIO(raw_bytes)).convert("RGB") except Exception as exc: raise HTTPException(status_code=400, detail=f"Could not open image: {exc}") try: car_rgba = remove_background(car_img) except Exception as exc: raise HTTPException(status_code=500, detail=f"Background removal failed: {exc}") try: background_img = Image.open(backgrounds[background_id]).convert("RGB") except Exception as exc: raise HTTPException(status_code=500, detail=f"Could not load background: {exc}") try: final_image = composite_car_on_background(car_rgba, background_img) except Exception as exc: raise HTTPException(status_code=500, detail=f"Compositing failed: {exc}") output_buffer = io.BytesIO() final_image.save(output_buffer, format="JPEG", quality=92, optimize=True) output_buffer.seek(0) filename = Path(car_image.filename or "car").stem return StreamingResponse( output_buffer, media_type="image/jpeg", headers={"Content-Disposition": f"attachment; filename={filename}_composited.jpg"}, )