HariLogicgo commited on
Commit
078d18a
Β·
1 Parent(s): 91eee68

replced with old

Browse files
Files changed (1) hide show
  1. app.py +40 -54
app.py CHANGED
@@ -1,7 +1,7 @@
1
  import os
2
- import uuid
3
  import shutil
4
- import glob
5
  import cv2
6
  import numpy as np
7
  import threading
@@ -128,73 +128,59 @@ async def shutdown_db():
128
  logger.info("MongoDB connection closed")
129
 
130
  # -------------------------------------------------
131
- # Multi-user safe face swap
132
  # -------------------------------------------------
133
- MAX_CONCURRENT_SWAPS = 2
134
- swap_semaphore = threading.Semaphore(MAX_CONCURRENT_SWAPS)
135
-
136
- def cleanup_old_dirs(base_dir, age_seconds=3600):
137
- for folder in glob.glob(os.path.join(base_dir, "*")):
138
- if os.path.isdir(folder):
139
- try:
140
- shutil.rmtree(folder, ignore_errors=True)
141
- except Exception:
142
- pass
143
-
144
- def face_swap_and_enhance_multiuser(src_img, tgt_img):
145
- session_id = uuid.uuid4().hex
146
- upload_dir = os.path.join(UPLOAD_DIR, session_id)
147
- result_dir = os.path.join(RESULT_DIR, session_id)
148
- os.makedirs(upload_dir, exist_ok=True)
149
- os.makedirs(result_dir, exist_ok=True)
150
-
151
- # Try acquiring GPU slot
152
- acquired = swap_semaphore.acquire(blocking=False)
153
- if not acquired:
154
- return None, None, "⚠️ GPU is busy. Please try again in a few seconds."
155
 
 
 
 
 
156
  try:
157
- src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
158
- tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
 
 
 
159
 
160
- src_faces = face_analysis_app.get(src_bgr)
161
- tgt_faces = face_analysis_app.get(tgt_bgr)
162
- if not src_faces or not tgt_faces:
163
- return None, None, "❌ Face not detected in one of the images"
164
 
165
- swapped_path = os.path.join(upload_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg")
166
- swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0])
167
- if swapped_bgr is None:
168
- return None, None, "❌ Face swap failed"
169
- cv2.imwrite(swapped_path, swapped_bgr)
170
 
171
- cmd = f"python {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {result_dir} --bg_upsampler realesrgan --face_upsample"
172
- result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
173
- if result.returncode != 0:
174
- return None, None, f"❌ CodeFormer failed:\n{result.stderr}"
175
 
176
- final_files = glob.glob(os.path.join(result_dir, "final_results", "*.png"))
177
- if not final_files:
178
- return None, None, "❌ No enhanced image found"
179
 
180
- final_path = final_files[0]
181
- final_img = cv2.cvtColor(cv2.imread(final_path), cv2.COLOR_BGR2RGB)
 
 
182
 
183
- return final_img, final_path, ""
 
 
 
 
 
 
 
 
184
 
185
  except Exception as e:
186
  return None, None, f"❌ Error: {str(e)}"
187
 
188
- finally:
189
- swap_semaphore.release()
190
- cleanup_old_dirs(UPLOAD_DIR)
191
- cleanup_old_dirs(RESULT_DIR)
192
-
193
  # -------------------------------------------------
194
  # Gradio UI
195
  # -------------------------------------------------
196
  with gr.Blocks() as demo:
197
- gr.Markdown("## Face Swap & Enhancement")
198
 
199
  with gr.Row():
200
  src_input = gr.Image(type="numpy", label="Upload Your Face")
@@ -206,7 +192,7 @@ with gr.Blocks() as demo:
206
  error_box = gr.Textbox(label="Logs / Errors", interactive=False)
207
 
208
  def process(src, tgt):
209
- img, path, err = face_swap_and_enhance_multiuser(src, tgt)
210
  return img, path, err
211
 
212
  btn.click(process, [src_input, tgt_input], [output_img, download, error_box])
@@ -269,7 +255,7 @@ async def perform_faceswap(request: FaceSwapRequest):
269
  src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB)
270
  tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB)
271
 
272
- final_img, final_path, err = face_swap_and_enhance_multiuser(src_rgb, tgt_rgb)
273
  if err:
274
  raise HTTPException(status_code=500, detail=err)
275
 
 
1
  import os
2
+ os.environ["OMP_NUM_THREADS"] = "1"
3
  import shutil
4
+ import uuid
5
  import cv2
6
  import numpy as np
7
  import threading
 
128
  logger.info("MongoDB connection closed")
129
 
130
  # -------------------------------------------------
131
+ # Lock for face swap
132
  # -------------------------------------------------
133
+ swap_lock = threading.Lock()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
134
 
135
+ # -------------------------------------------------
136
+ # Pipeline Function
137
+ # -------------------------------------------------
138
+ def face_swap_and_enhance(src_img, tgt_img):
139
  try:
140
+ with swap_lock:
141
+ shutil.rmtree(UPLOAD_DIR, ignore_errors=True)
142
+ shutil.rmtree(RESULT_DIR, ignore_errors=True)
143
+ os.makedirs(UPLOAD_DIR, exist_ok=True)
144
+ os.makedirs(RESULT_DIR, exist_ok=True)
145
 
146
+ src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
147
+ tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
 
 
148
 
149
+ src_faces = face_analysis_app.get(src_bgr)
150
+ tgt_faces = face_analysis_app.get(tgt_bgr)
151
+ if not src_faces or not tgt_faces:
152
+ return None, None, "❌ Face not detected in one of the images"
 
153
 
154
+ swapped_path = os.path.join(UPLOAD_DIR, f"swapped_{uuid.uuid4().hex[:8]}.jpg")
155
+ swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0])
156
+ if swapped_bgr is None:
157
+ return None, None, "❌ Face swap failed"
158
 
159
+ cv2.imwrite(swapped_path, swapped_bgr)
 
 
160
 
161
+ cmd = f"python {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {RESULT_DIR} --bg_upsampler realesrgan --face_upsample"
162
+ result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
163
+ if result.returncode != 0:
164
+ return None, None, f"❌ CodeFormer failed:\n{result.stderr}"
165
 
166
+ final_results_dir = os.path.join(RESULT_DIR, "final_results")
167
+ final_files = [f for f in os.listdir(final_results_dir) if f.endswith(".png")]
168
+ if not final_files:
169
+ return None, None, "❌ No enhanced image found"
170
+
171
+ final_path = os.path.join(final_results_dir, final_files[0])
172
+ final_img = cv2.cvtColor(cv2.imread(final_path), cv2.COLOR_BGR2RGB)
173
+
174
+ return final_img, final_path, ""
175
 
176
  except Exception as e:
177
  return None, None, f"❌ Error: {str(e)}"
178
 
 
 
 
 
 
179
  # -------------------------------------------------
180
  # Gradio UI
181
  # -------------------------------------------------
182
  with gr.Blocks() as demo:
183
+ gr.Markdown("Face Swap")
184
 
185
  with gr.Row():
186
  src_input = gr.Image(type="numpy", label="Upload Your Face")
 
192
  error_box = gr.Textbox(label="Logs / Errors", interactive=False)
193
 
194
  def process(src, tgt):
195
+ img, path, err = face_swap_and_enhance(src, tgt)
196
  return img, path, err
197
 
198
  btn.click(process, [src_input, tgt_input], [output_img, download, error_box])
 
255
  src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB)
256
  tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB)
257
 
258
+ final_img, final_path, err = face_swap_and_enhance(src_rgb, tgt_rgb)
259
  if err:
260
  raise HTTPException(status_code=500, detail=err)
261