Spaces:
Running
Running
Update app.py
Browse files
app.py
CHANGED
|
@@ -2,13 +2,12 @@ import os
|
|
| 2 |
import torch
|
| 3 |
import numpy as np
|
| 4 |
import cv2
|
| 5 |
-
import traceback
|
| 6 |
import gc
|
| 7 |
import time
|
| 8 |
from PIL import Image, ImageFilter
|
| 9 |
from transformers import SegformerImageProcessor, SegformerForSemanticSegmentation
|
| 10 |
from ultralytics import YOLO
|
| 11 |
-
from fastapi import FastAPI, File, UploadFile
|
| 12 |
from fastapi.responses import StreamingResponse
|
| 13 |
from fastapi.middleware.cors import CORSMiddleware
|
| 14 |
import io
|
|
@@ -27,7 +26,7 @@ DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
| 27 |
logger.info(f"Using Device: {DEVICE}")
|
| 28 |
logger.info(f"CUDA Available: {torch.cuda.is_available()}")
|
| 29 |
|
| 30 |
-
# CPU Optimization
|
| 31 |
if DEVICE.type == "cpu":
|
| 32 |
torch.set_num_threads(4)
|
| 33 |
torch.set_num_interop_threads(1)
|
|
@@ -81,32 +80,33 @@ def get_hair_and_exclude_masks(pil_image: Image.Image):
|
|
| 81 |
load_face_parser()
|
| 82 |
orig_w, orig_h = pil_image.size
|
| 83 |
img_small = pil_image.resize((128, 128), Image.BILINEAR)
|
| 84 |
-
|
| 85 |
inputs = face_processor(images=img_small, return_tensors="pt").to(DEVICE)
|
| 86 |
-
|
| 87 |
with torch.inference_mode():
|
| 88 |
out = face_parser(**inputs)
|
| 89 |
logits = out.logits
|
| 90 |
up = torch.nn.functional.interpolate(logits, size=(128, 128), mode="bilinear", align_corners=False)
|
| 91 |
probs = torch.softmax(up, dim=1)[0]
|
| 92 |
|
| 93 |
-
#
|
| 94 |
-
hair = (probs[13].cpu().numpy() > 0.
|
| 95 |
-
hair = cv2.GaussianBlur(hair, (3, 3),
|
| 96 |
|
| 97 |
-
# Face exclude
|
| 98 |
parsing = up.argmax(dim=1).squeeze(0).cpu().numpy()
|
| 99 |
face_cls = list(range(1,6)) + list(range(8,13)) + [17,18]
|
| 100 |
face_m = np.isin(parsing, face_cls).astype(np.float32)
|
| 101 |
-
|
| 102 |
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3,3))
|
| 103 |
face_m = cv2.dilate(face_m, kernel, iterations=1)
|
| 104 |
-
|
| 105 |
h, w = face_m.shape
|
| 106 |
forehead = np.zeros_like(face_m, dtype=np.float32)
|
| 107 |
forehead[:int(h * 0.32)] = 1.0
|
| 108 |
face_m = face_m * (1 - forehead * 0.45)
|
| 109 |
hair = hair * (1 - face_m)
|
|
|
|
| 110 |
hair = cv2.resize(hair, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)
|
| 111 |
|
| 112 |
# Exclude mask (nose + lips)
|
|
@@ -119,7 +119,6 @@ def get_hair_and_exclude_masks(pil_image: Image.Image):
|
|
| 119 |
|
| 120 |
return hair, exclude
|
| 121 |
|
| 122 |
-
|
| 123 |
@timed("Beard Mask")
|
| 124 |
def get_beard_mask_fast(pil_image: Image.Image, exclude_mask: np.ndarray):
|
| 125 |
model = load_beard_model()
|
|
@@ -130,7 +129,7 @@ def get_beard_mask_fast(pil_image: Image.Image, exclude_mask: np.ndarray):
|
|
| 130 |
results = model.predict(
|
| 131 |
img_array,
|
| 132 |
device=DEVICE.type,
|
| 133 |
-
conf=0.
|
| 134 |
iou=0.50,
|
| 135 |
imgsz=128,
|
| 136 |
half=(DEVICE.type == "cuda"),
|
|
@@ -141,10 +140,10 @@ def get_beard_mask_fast(pil_image: Image.Image, exclude_mask: np.ndarray):
|
|
| 141 |
mask = np.zeros((orig_h, orig_w), dtype=np.float32)
|
| 142 |
if results[0].masks is not None:
|
| 143 |
for i, cls in enumerate(results[0].boxes.cls):
|
| 144 |
-
if int(cls) == 0:
|
| 145 |
m = results[0].masks.data[i].cpu().numpy()
|
| 146 |
m = cv2.resize(m, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)
|
| 147 |
-
mask = np.maximum(mask, (m > 0.
|
| 148 |
|
| 149 |
mask = np.maximum(mask - exclude_mask, 0)
|
| 150 |
|
|
@@ -155,7 +154,7 @@ def get_beard_mask_fast(pil_image: Image.Image, exclude_mask: np.ndarray):
|
|
| 155 |
|
| 156 |
return mask
|
| 157 |
|
| 158 |
-
|
| 159 |
@timed("Color Transfer")
|
| 160 |
def apply_strong_grey_hair(image: Image.Image, hair_mask: np.ndarray, beard_mask: np.ndarray):
|
| 161 |
comb = np.maximum(hair_mask, beard_mask)
|
|
@@ -164,11 +163,21 @@ def apply_strong_grey_hair(image: Image.Image, hair_mask: np.ndarray, beard_mask
|
|
| 164 |
|
| 165 |
img = np.array(image).astype(np.float32) / 255.0
|
| 166 |
orig_lab = cv2.cvtColor((img * 255).astype(np.uint8), cv2.COLOR_RGB2LAB).astype(np.float32)
|
| 167 |
-
|
| 168 |
hsv = cv2.cvtColor((img * 255).astype(np.uint8), cv2.COLOR_RGB2HSV).astype(np.float32)
|
|
|
|
|
|
|
| 169 |
hsv_hair = hsv.copy()
|
| 170 |
-
|
| 171 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 172 |
|
| 173 |
hair_grey = cv2.cvtColor(hsv_hair.astype(np.uint8), cv2.COLOR_HSV2RGB).astype(np.float32) / 255.0
|
| 174 |
hair_grey_lab = cv2.cvtColor((hair_grey * 255).astype(np.uint8), cv2.COLOR_RGB2LAB).astype(np.float32)
|
|
@@ -178,9 +187,10 @@ def apply_strong_grey_hair(image: Image.Image, hair_mask: np.ndarray, beard_mask
|
|
| 178 |
mean_hair = np.mean(hair_grey_lab[hair_bin], axis=0)
|
| 179 |
std_hair = np.maximum(np.std(hair_grey_lab[hair_bin], axis=0), 5.0)
|
| 180 |
else:
|
| 181 |
-
mean_hair = np.array([
|
| 182 |
-
std_hair = np.array([
|
| 183 |
|
|
|
|
| 184 |
beard_bin = beard_mask > 0.5
|
| 185 |
if np.sum(beard_bin) > 30:
|
| 186 |
beard_pix = orig_lab[beard_bin]
|
|
@@ -198,14 +208,18 @@ def apply_strong_grey_hair(image: Image.Image, hair_mask: np.ndarray, beard_mask
|
|
| 198 |
|
| 199 |
final = hair_grey * hair_mask_3ch + final * (1 - hair_mask_3ch)
|
| 200 |
final = final * comb_3ch + img * (1 - comb_3ch)
|
| 201 |
-
final = final + (np.array([5,3,0], dtype=np.float32)/255.0 * comb[..., None] * 0.18)
|
| 202 |
-
final = np.clip(final * 255, 0, 255).astype(np.uint8)
|
| 203 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 204 |
result = Image.fromarray(final)
|
| 205 |
-
|
|
|
|
|
|
|
|
|
|
| 206 |
return result
|
| 207 |
|
| 208 |
-
|
| 209 |
# ====================== MAIN PROCESSING ======================
|
| 210 |
@timed("Total Processing")
|
| 211 |
def process_face_whitening(input_image: Image.Image):
|
|
@@ -231,7 +245,6 @@ def process_face_whitening(input_image: Image.Image):
|
|
| 231 |
|
| 232 |
return final_img
|
| 233 |
|
| 234 |
-
|
| 235 |
# ====================== FASTAPI ======================
|
| 236 |
app = FastAPI()
|
| 237 |
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
|
|
@@ -241,33 +254,30 @@ async def startup():
|
|
| 241 |
loop = asyncio.get_event_loop()
|
| 242 |
await loop.run_in_executor(executor, load_face_parser)
|
| 243 |
await loop.run_in_executor(executor, load_beard_model)
|
| 244 |
-
|
| 245 |
logger.info("✅ Models loaded")
|
| 246 |
logger.info("Running light warmup...")
|
| 247 |
dummy = Image.new("RGB", (256, 256))
|
| 248 |
_ = process_face_whitening(dummy)
|
| 249 |
logger.info("✅ Server Ready!")
|
| 250 |
|
| 251 |
-
|
| 252 |
@app.post("/age-face")
|
| 253 |
async def age_face(file: UploadFile = File(...)):
|
| 254 |
start_total = time.perf_counter()
|
| 255 |
contents = await file.read()
|
| 256 |
img = Image.open(io.BytesIO(contents)).convert("RGB")
|
| 257 |
-
|
| 258 |
loop = asyncio.get_event_loop()
|
| 259 |
result = await loop.run_in_executor(executor, process_face_whitening, img)
|
| 260 |
-
|
| 261 |
buf = io.BytesIO()
|
| 262 |
-
result.save(buf, format="JPEG", quality=
|
| 263 |
buf.seek(0)
|
| 264 |
-
|
| 265 |
total_time = (time.perf_counter() - start_total) * 1000
|
| 266 |
logger.info(f"✅ Total Request Time: {total_time:.1f} ms")
|
| 267 |
-
|
| 268 |
return StreamingResponse(buf, media_type="image/jpeg")
|
| 269 |
|
| 270 |
-
|
| 271 |
if __name__ == "__main__":
|
| 272 |
import uvicorn
|
| 273 |
uvicorn.run(app, host="0.0.0.0", port=7860, workers=1)
|
|
|
|
| 2 |
import torch
|
| 3 |
import numpy as np
|
| 4 |
import cv2
|
|
|
|
| 5 |
import gc
|
| 6 |
import time
|
| 7 |
from PIL import Image, ImageFilter
|
| 8 |
from transformers import SegformerImageProcessor, SegformerForSemanticSegmentation
|
| 9 |
from ultralytics import YOLO
|
| 10 |
+
from fastapi import FastAPI, File, UploadFile
|
| 11 |
from fastapi.responses import StreamingResponse
|
| 12 |
from fastapi.middleware.cors import CORSMiddleware
|
| 13 |
import io
|
|
|
|
| 26 |
logger.info(f"Using Device: {DEVICE}")
|
| 27 |
logger.info(f"CUDA Available: {torch.cuda.is_available()}")
|
| 28 |
|
| 29 |
+
# CPU Optimization
|
| 30 |
if DEVICE.type == "cpu":
|
| 31 |
torch.set_num_threads(4)
|
| 32 |
torch.set_num_interop_threads(1)
|
|
|
|
| 80 |
load_face_parser()
|
| 81 |
orig_w, orig_h = pil_image.size
|
| 82 |
img_small = pil_image.resize((128, 128), Image.BILINEAR)
|
| 83 |
+
|
| 84 |
inputs = face_processor(images=img_small, return_tensors="pt").to(DEVICE)
|
| 85 |
+
|
| 86 |
with torch.inference_mode():
|
| 87 |
out = face_parser(**inputs)
|
| 88 |
logits = out.logits
|
| 89 |
up = torch.nn.functional.interpolate(logits, size=(128, 128), mode="bilinear", align_corners=False)
|
| 90 |
probs = torch.softmax(up, dim=1)[0]
|
| 91 |
|
| 92 |
+
# Hair Mask
|
| 93 |
+
hair = (probs[13].cpu().numpy() > 0.035).astype(np.float32) # thoda lower threshold
|
| 94 |
+
hair = cv2.GaussianBlur(hair, (3, 3), 1.0)
|
| 95 |
|
| 96 |
+
# Face exclude mask
|
| 97 |
parsing = up.argmax(dim=1).squeeze(0).cpu().numpy()
|
| 98 |
face_cls = list(range(1,6)) + list(range(8,13)) + [17,18]
|
| 99 |
face_m = np.isin(parsing, face_cls).astype(np.float32)
|
| 100 |
+
|
| 101 |
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3,3))
|
| 102 |
face_m = cv2.dilate(face_m, kernel, iterations=1)
|
| 103 |
+
|
| 104 |
h, w = face_m.shape
|
| 105 |
forehead = np.zeros_like(face_m, dtype=np.float32)
|
| 106 |
forehead[:int(h * 0.32)] = 1.0
|
| 107 |
face_m = face_m * (1 - forehead * 0.45)
|
| 108 |
hair = hair * (1 - face_m)
|
| 109 |
+
|
| 110 |
hair = cv2.resize(hair, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)
|
| 111 |
|
| 112 |
# Exclude mask (nose + lips)
|
|
|
|
| 119 |
|
| 120 |
return hair, exclude
|
| 121 |
|
|
|
|
| 122 |
@timed("Beard Mask")
|
| 123 |
def get_beard_mask_fast(pil_image: Image.Image, exclude_mask: np.ndarray):
|
| 124 |
model = load_beard_model()
|
|
|
|
| 129 |
results = model.predict(
|
| 130 |
img_array,
|
| 131 |
device=DEVICE.type,
|
| 132 |
+
conf=0.28,
|
| 133 |
iou=0.50,
|
| 134 |
imgsz=128,
|
| 135 |
half=(DEVICE.type == "cuda"),
|
|
|
|
| 140 |
mask = np.zeros((orig_h, orig_w), dtype=np.float32)
|
| 141 |
if results[0].masks is not None:
|
| 142 |
for i, cls in enumerate(results[0].boxes.cls):
|
| 143 |
+
if int(cls) == 0: # beard class
|
| 144 |
m = results[0].masks.data[i].cpu().numpy()
|
| 145 |
m = cv2.resize(m, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)
|
| 146 |
+
mask = np.maximum(mask, (m > 0.42).astype(np.float32))
|
| 147 |
|
| 148 |
mask = np.maximum(mask - exclude_mask, 0)
|
| 149 |
|
|
|
|
| 154 |
|
| 155 |
return mask
|
| 156 |
|
| 157 |
+
# ====================== MAIN COLOR FUNCTION (Updated) ======================
|
| 158 |
@timed("Color Transfer")
|
| 159 |
def apply_strong_grey_hair(image: Image.Image, hair_mask: np.ndarray, beard_mask: np.ndarray):
|
| 160 |
comb = np.maximum(hair_mask, beard_mask)
|
|
|
|
| 163 |
|
| 164 |
img = np.array(image).astype(np.float32) / 255.0
|
| 165 |
orig_lab = cv2.cvtColor((img * 255).astype(np.uint8), cv2.COLOR_RGB2LAB).astype(np.float32)
|
|
|
|
| 166 |
hsv = cv2.cvtColor((img * 255).astype(np.uint8), cv2.COLOR_RGB2HSV).astype(np.float32)
|
| 167 |
+
|
| 168 |
+
# ====================== HAIR - Special Light Grey Treatment ======================
|
| 169 |
hsv_hair = hsv.copy()
|
| 170 |
+
|
| 171 |
+
# Strong desaturation for grey look
|
| 172 |
+
hsv_hair[..., 1] = hsv_hair[..., 1] * (1 - 0.78 * hair_mask)
|
| 173 |
+
|
| 174 |
+
# **Main Fix**: Much stronger + adaptive brightness for camera images
|
| 175 |
+
original_v = hsv[..., 2]
|
| 176 |
+
boost_amount = 145 * hair_mask # 88 → 145
|
| 177 |
+
hsv_hair[..., 2] = np.clip(
|
| 178 |
+
original_v + boost_amount - (original_v * 0.42 * hair_mask),
|
| 179 |
+
125, 245
|
| 180 |
+
)
|
| 181 |
|
| 182 |
hair_grey = cv2.cvtColor(hsv_hair.astype(np.uint8), cv2.COLOR_HSV2RGB).astype(np.float32) / 255.0
|
| 183 |
hair_grey_lab = cv2.cvtColor((hair_grey * 255).astype(np.uint8), cv2.COLOR_RGB2LAB).astype(np.float32)
|
|
|
|
| 187 |
mean_hair = np.mean(hair_grey_lab[hair_bin], axis=0)
|
| 188 |
std_hair = np.maximum(np.std(hair_grey_lab[hair_bin], axis=0), 5.0)
|
| 189 |
else:
|
| 190 |
+
mean_hair = np.array([135., 0., 0.])
|
| 191 |
+
std_hair = np.array([32., 12., 12.])
|
| 192 |
|
| 193 |
+
# Beard Color Transfer (same as before)
|
| 194 |
beard_bin = beard_mask > 0.5
|
| 195 |
if np.sum(beard_bin) > 30:
|
| 196 |
beard_pix = orig_lab[beard_bin]
|
|
|
|
| 208 |
|
| 209 |
final = hair_grey * hair_mask_3ch + final * (1 - hair_mask_3ch)
|
| 210 |
final = final * comb_3ch + img * (1 - comb_3ch)
|
|
|
|
|
|
|
| 211 |
|
| 212 |
+
# Extra cool white-grey tint (makes it look more natural silver-grey)
|
| 213 |
+
final = final + (np.array([12, 10, 8], dtype=np.float32)/255.0 * comb[..., None] * 0.25)
|
| 214 |
+
|
| 215 |
+
final = np.clip(final * 255, 0, 255).astype(np.uint8)
|
| 216 |
result = Image.fromarray(final)
|
| 217 |
+
|
| 218 |
+
# Better sharpening
|
| 219 |
+
result = result.filter(ImageFilter.UnsharpMask(radius=0.8, percent=80, threshold=2))
|
| 220 |
+
|
| 221 |
return result
|
| 222 |
|
|
|
|
| 223 |
# ====================== MAIN PROCESSING ======================
|
| 224 |
@timed("Total Processing")
|
| 225 |
def process_face_whitening(input_image: Image.Image):
|
|
|
|
| 245 |
|
| 246 |
return final_img
|
| 247 |
|
|
|
|
| 248 |
# ====================== FASTAPI ======================
|
| 249 |
app = FastAPI()
|
| 250 |
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
|
|
|
|
| 254 |
loop = asyncio.get_event_loop()
|
| 255 |
await loop.run_in_executor(executor, load_face_parser)
|
| 256 |
await loop.run_in_executor(executor, load_beard_model)
|
|
|
|
| 257 |
logger.info("✅ Models loaded")
|
| 258 |
logger.info("Running light warmup...")
|
| 259 |
dummy = Image.new("RGB", (256, 256))
|
| 260 |
_ = process_face_whitening(dummy)
|
| 261 |
logger.info("✅ Server Ready!")
|
| 262 |
|
|
|
|
| 263 |
@app.post("/age-face")
|
| 264 |
async def age_face(file: UploadFile = File(...)):
|
| 265 |
start_total = time.perf_counter()
|
| 266 |
contents = await file.read()
|
| 267 |
img = Image.open(io.BytesIO(contents)).convert("RGB")
|
| 268 |
+
|
| 269 |
loop = asyncio.get_event_loop()
|
| 270 |
result = await loop.run_in_executor(executor, process_face_whitening, img)
|
| 271 |
+
|
| 272 |
buf = io.BytesIO()
|
| 273 |
+
result.save(buf, format="JPEG", quality=92, optimize=True)
|
| 274 |
buf.seek(0)
|
| 275 |
+
|
| 276 |
total_time = (time.perf_counter() - start_total) * 1000
|
| 277 |
logger.info(f"✅ Total Request Time: {total_time:.1f} ms")
|
| 278 |
+
|
| 279 |
return StreamingResponse(buf, media_type="image/jpeg")
|
| 280 |
|
|
|
|
| 281 |
if __name__ == "__main__":
|
| 282 |
import uvicorn
|
| 283 |
uvicorn.run(app, host="0.0.0.0", port=7860, workers=1)
|