Update app.py
Browse files
app.py
CHANGED
|
@@ -17,118 +17,17 @@ from datetime import datetime
|
|
| 17 |
REPO_ID = os.environ.get("HFPATH")
|
| 18 |
HF_TOKEN = os.environ.get("MAGIC")
|
| 19 |
login(HF_TOKEN)
|
| 20 |
-
|
| 21 |
-
api = HfApi()
|
| 22 |
-
|
| 23 |
-
# --- Download function ---
|
| 24 |
-
def download_from_hf(subfolder):
|
| 25 |
-
downloaded_files = []
|
| 26 |
-
|
| 27 |
-
# List all files in the repository
|
| 28 |
-
all_files = api.list_repo_files(repo_id=REPO_ID,repo_type="dataset")
|
| 29 |
-
|
| 30 |
-
# Filter files that start with the specified subfolder
|
| 31 |
-
for file in all_files:
|
| 32 |
-
if file.startswith(subfolder + "/"):
|
| 33 |
-
# Download the file
|
| 34 |
-
downloaded_file = hf_hub_download(repo_id=REPO_ID, filename=file,repo_type="dataset" )
|
| 35 |
-
downloaded_files.append(downloaded_file)
|
| 36 |
-
|
| 37 |
-
return downloaded_files
|
| 38 |
# --- Upload function ---
|
| 39 |
-
def upload_to_hf(
|
| 40 |
-
#filename = os.path.basename(filepath)
|
| 41 |
SUBFOLDER = datetime.now().strftime("%Y%m%d")
|
| 42 |
-
path_in_repo = f"{SUBFOLDER}"
|
| 43 |
api.upload_folder(
|
| 44 |
-
folder_path=
|
| 45 |
-
path_in_repo=
|
| 46 |
repo_id=REPO_ID,
|
| 47 |
repo_type="dataset"
|
| 48 |
)
|
| 49 |
-
return f"https://huggingface.co/datasets/{REPO_ID}/blob/main/{
|
| 50 |
-
# ------------------------------ ARGS ------------------------------
|
| 51 |
-
parser = argparse.ArgumentParser(description="Face Swapper (Multi-target, Male+Female Sources)")
|
| 52 |
-
parser.add_argument("--out_dir", default=os.getcwd())
|
| 53 |
-
parser.add_argument("--batch_size", default=32)
|
| 54 |
-
parser.add_argument("--cuda", action="store_true", default=False)
|
| 55 |
-
user_args = parser.parse_args()
|
| 56 |
-
|
| 57 |
-
USE_CUDA = user_args.cuda
|
| 58 |
-
DEF_OUTPUT_PATH = user_args.out_dir
|
| 59 |
-
BATCH_SIZE = int(user_args.batch_size)
|
| 60 |
-
|
| 61 |
-
# ------------------------------ DEVICE ------------------------------
|
| 62 |
-
PROVIDER = ["CPUExecutionProvider"]
|
| 63 |
-
if USE_CUDA and "CUDAExecutionProvider" in onnxruntime.get_available_providers():
|
| 64 |
-
PROVIDER = ["CUDAExecutionProvider", "CPUExecutionProvider"]
|
| 65 |
-
print(">>> Running on CUDA")
|
| 66 |
-
else:
|
| 67 |
-
USE_CUDA = False
|
| 68 |
-
print(">>> Running on CPU")
|
| 69 |
-
|
| 70 |
-
device = "cuda" if USE_CUDA else "cpu"
|
| 71 |
-
EMPTY_CACHE = lambda: torch.cuda.empty_cache() if device == "cuda" else None
|
| 72 |
-
|
| 73 |
-
# ------------------------------ MODELS ------------------------------
|
| 74 |
-
FACE_ANALYSER = insightface.app.FaceAnalysis(name="buffalo_l", providers=PROVIDER)
|
| 75 |
-
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640), det_thresh=0.6)
|
| 76 |
-
|
| 77 |
-
FACE_SWAPPER = Inswapper(
|
| 78 |
-
model_file="./assets/pretrained_models/inswapper_128.onnx",
|
| 79 |
-
batch_size=(BATCH_SIZE if USE_CUDA else 1),
|
| 80 |
-
providers=PROVIDER,
|
| 81 |
-
)
|
| 82 |
-
|
| 83 |
-
# ------------------------------ ENHANCERS ------------------------------
|
| 84 |
-
ENHANCER_CHOICES = ["NONE"] + get_available_enhancer_names()
|
| 85 |
-
|
| 86 |
-
# ------------------------------ CORE SWAP FUNC ------------------------------
|
| 87 |
-
def swap_on_frame(frame_bgr, analysed_source_male, analysed_source_female, enhancer_name="NONE"):
|
| 88 |
-
analysed_faces = FACE_ANALYSER.get(frame_bgr)
|
| 89 |
-
preds, matrs = [], []
|
| 90 |
-
|
| 91 |
-
for analysed_face in analysed_faces:
|
| 92 |
-
gender = analysed_face.get("gender", 1) # 1 = male, 0 = female
|
| 93 |
-
src_face = None
|
| 94 |
-
|
| 95 |
-
if analysed_source_male is not None and analysed_source_female is not None:
|
| 96 |
-
src_face = analysed_source_male if gender == 1 else analysed_source_female
|
| 97 |
-
elif analysed_source_male is not None and analysed_source_female is None:
|
| 98 |
-
if gender == 1:
|
| 99 |
-
src_face = analysed_source_male
|
| 100 |
-
elif analysed_source_female is not None and analysed_source_male is None:
|
| 101 |
-
if gender == 0:
|
| 102 |
-
src_face = analysed_source_female
|
| 103 |
-
|
| 104 |
-
if src_face is None:
|
| 105 |
-
continue
|
| 106 |
-
|
| 107 |
-
batch_pred, batch_matr = FACE_SWAPPER.get([frame_bgr], [analysed_face], [src_face])
|
| 108 |
-
preds.extend(batch_pred)
|
| 109 |
-
matrs.extend(batch_matr)
|
| 110 |
-
EMPTY_CACHE()
|
| 111 |
-
|
| 112 |
-
for p, m in zip(preds, matrs):
|
| 113 |
-
frame_bgr = paste_to_whole(
|
| 114 |
-
foreground=p,
|
| 115 |
-
background=frame_bgr,
|
| 116 |
-
matrix=m,
|
| 117 |
-
mask=None,
|
| 118 |
-
crop_mask=(0, 0, 0, 0),
|
| 119 |
-
blur_amount=0.1,
|
| 120 |
-
erode_amount=0.15,
|
| 121 |
-
blend_method="laplacian",
|
| 122 |
-
)
|
| 123 |
-
|
| 124 |
-
if enhancer_name != "NONE":
|
| 125 |
-
try:
|
| 126 |
-
model, runner = load_face_enhancer_model(name=enhancer_name, device=device)
|
| 127 |
-
frame_bgr = runner(frame_bgr, model)
|
| 128 |
-
except Exception as e:
|
| 129 |
-
print(f"[Enhancer] Error while running {enhancer_name}: {e}")
|
| 130 |
-
|
| 131 |
-
return frame_bgr
|
| 132 |
|
| 133 |
# ------------------------------ PROCESS ------------------------------
|
| 134 |
def swap_faces(target_files, male_file, female_file, enhancer_name="NONE"):
|
|
@@ -148,40 +47,37 @@ def swap_faces(target_files, male_file, female_file, enhancer_name="NONE"):
|
|
| 148 |
|
| 149 |
if analysed_source_male is None and analysed_source_female is None:
|
| 150 |
raise ValueError("❌ Cần ít nhất 1 khuôn mặt nguồn (Nam hoặc Nữ).")
|
| 151 |
-
|
|
|
|
| 152 |
with tempfile.TemporaryDirectory() as temp_dir:
|
|
|
|
| 153 |
output_files = []
|
| 154 |
|
| 155 |
for f in target_files:
|
| 156 |
target_path = f.name
|
| 157 |
ext = os.path.splitext(target_path)[-1].lower()
|
| 158 |
-
|
| 159 |
-
|
| 160 |
-
|
| 161 |
-
|
| 162 |
-
|
| 163 |
-
|
| 164 |
-
|
| 165 |
-
|
| 166 |
-
|
| 167 |
-
out_frame = swap_on_frame(frame_bgr, analysed_source_male, analysed_source_female, enhancer_name)
|
| 168 |
-
|
| 169 |
-
# Save the output image to a temporary file
|
| 170 |
-
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as temp_file:
|
| 171 |
-
#timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
|
| 172 |
-
#output_path = temp_file.name
|
| 173 |
-
#cv2.imwrite(output_path, out_frame) # Convert BGR to RGB before saving
|
| 174 |
-
#output_files.append(output_path)
|
| 175 |
-
# Lưu ảnh đầu ra vào thư mục tạm
|
| 176 |
-
output_path = os.path.join(temp_dir,temp_file.name )
|
| 177 |
-
cv2.imwrite(output_path, out_frame) # Lưu ảnh vào thư mục tạm
|
| 178 |
output_files.append(output_path)
|
| 179 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 180 |
|
| 181 |
-
upload_to_hf(temp_dir)
|
| 182 |
print(f"✔ Hoàn tất tất cả trong {time.time() - start_time:.2f}s")
|
| 183 |
return output_files
|
| 184 |
|
|
|
|
|
|
|
| 185 |
# ------------------------------ UI ------------------------------
|
| 186 |
with gr.Blocks() as demo:
|
| 187 |
gr.Markdown("## 🧑🦱➡👩 Face Swapper (Upload nhiều file target + nguồn nam/nữ) + Enhancer")
|
|
|
|
| 17 |
REPO_ID = os.environ.get("HFPATH")
|
| 18 |
HF_TOKEN = os.environ.get("MAGIC")
|
| 19 |
login(HF_TOKEN)
|
| 20 |
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 21 |
# --- Upload function ---
|
| 22 |
+
def upload_to_hf(folder_path):
|
|
|
|
| 23 |
SUBFOLDER = datetime.now().strftime("%Y%m%d")
|
|
|
|
| 24 |
api.upload_folder(
|
| 25 |
+
folder_path=folder_path,
|
| 26 |
+
path_in_repo=SUBFOLDER,
|
| 27 |
repo_id=REPO_ID,
|
| 28 |
repo_type="dataset"
|
| 29 |
)
|
| 30 |
+
return f"https://huggingface.co/datasets/{REPO_ID}/blob/main/{SUBFOLDER}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 31 |
|
| 32 |
# ------------------------------ PROCESS ------------------------------
|
| 33 |
def swap_faces(target_files, male_file, female_file, enhancer_name="NONE"):
|
|
|
|
| 47 |
|
| 48 |
if analysed_source_male is None and analysed_source_female is None:
|
| 49 |
raise ValueError("❌ Cần ít nhất 1 khuôn mặt nguồn (Nam hoặc Nữ).")
|
| 50 |
+
|
| 51 |
+
# Tạo thư mục tạm để lưu ảnh đầu ra
|
| 52 |
with tempfile.TemporaryDirectory() as temp_dir:
|
| 53 |
+
print(f"Temporary directory created at: {temp_dir}")
|
| 54 |
output_files = []
|
| 55 |
|
| 56 |
for f in target_files:
|
| 57 |
target_path = f.name
|
| 58 |
ext = os.path.splitext(target_path)[-1].lower()
|
| 59 |
+
|
| 60 |
+
# -------------------- IMAGE --------------------
|
| 61 |
+
if ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
|
| 62 |
+
frame_bgr = cv2.imread(target_path)
|
| 63 |
+
out_frame = swap_on_frame(frame_bgr, analysed_source_male, analysed_source_female, enhancer_name)
|
| 64 |
+
|
| 65 |
+
# Save the output image to a temporary file
|
| 66 |
+
output_path = os.path.join(temp_dir, f"output{len(output_files)}{ext}")
|
| 67 |
+
cv2.imwrite(output_path, out_frame) # Save image to temp directory
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 68 |
output_files.append(output_path)
|
| 69 |
|
| 70 |
+
# Upload the temporary directory to Hugging Face
|
| 71 |
+
if os.path.isdir(temp_dir):
|
| 72 |
+
upload_to_hf(temp_dir)
|
| 73 |
+
else:
|
| 74 |
+
print(f"Error: {temp_dir} is not a valid directory.")
|
| 75 |
|
|
|
|
| 76 |
print(f"✔ Hoàn tất tất cả trong {time.time() - start_time:.2f}s")
|
| 77 |
return output_files
|
| 78 |
|
| 79 |
+
|
| 80 |
+
|
| 81 |
# ------------------------------ UI ------------------------------
|
| 82 |
with gr.Blocks() as demo:
|
| 83 |
gr.Markdown("## 🧑🦱➡👩 Face Swapper (Upload nhiều file target + nguồn nam/nữ) + Enhancer")
|