videopix commited on
Commit
aa1f7cf
·
verified ·
1 Parent(s): dc123fc

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +125 -126
app.py CHANGED
@@ -20,7 +20,6 @@ try:
20
  except Exception:
21
  ONNX_AVAILABLE = False
22
 
23
-
24
  # ============================================================
25
  # GLOBALS
26
  # ============================================================
@@ -34,139 +33,102 @@ gfpgan_session = None
34
  TASKS = {}
35
  executor = concurrent.futures.ThreadPoolExecutor(max_workers=6)
36
 
37
-
38
  # ============================================================
39
  # Helpers
40
  # ============================================================
41
- def download_file(url: str, dest_path: str, timeout: int = 120):
42
- try:
43
- r = requests.get(url, stream=True, timeout=timeout)
44
- r.raise_for_status()
45
- with open(dest_path, "wb") as f:
46
- for chunk in r.iter_content(8192):
47
- if chunk:
48
- f.write(chunk)
49
- return True
50
- except Exception as e:
51
- print("Download failed:", e)
52
- if os.path.exists(dest_path):
53
- try:
54
- os.remove(dest_path)
55
- except Exception:
56
- pass
57
- return False
58
-
59
-
60
  def ensure_swapper_model():
61
  path = os.path.join(BASE_DIR, "inswapper_128.onnx")
62
  if not os.path.isfile(path):
63
  raise RuntimeError(
64
- "inswapper_128.onnx not found. "
65
- "Place it in the same directory as app.py."
66
  )
67
  print("Found swapper model:", path)
68
  return path
69
 
70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
  # ============================================================
72
  # GFPGAN-Lite
73
  # ============================================================
74
  GFPGAN_URL = "https://huggingface.co/ai-forever/GFPGAN-Lite/resolve/main/gfpgan_lite.onnx"
75
- GFPGAN_FILENAME = "gfpgan_lite.onnx"
76
-
77
 
78
  def try_load_gfpgan():
79
  global gfpgan_session
80
  if not ONNX_AVAILABLE:
81
- print("onnxruntime not available — GFPGAN disabled")
82
  return
83
-
84
- path = os.path.join(BASE_DIR, GFPGAN_FILENAME)
85
  if not os.path.exists(path):
86
  print("Downloading GFPGAN-Lite...")
87
- if not download_file(GFPGAN_URL, path, 180):
88
- print("GFPGAN download failed")
89
  return
90
-
91
  try:
92
  gfpgan_session = ort.InferenceSession(path, providers=["CPUExecutionProvider"])
93
- print("Loaded GFPGAN-Lite:", path)
94
  except Exception as e:
95
  print("GFPGAN load failed:", e)
96
  gfpgan_session = None
97
 
98
-
99
- def try_load_enhancer():
100
- global enhancer_session
101
- if not ONNX_AVAILABLE:
102
- return
103
- path = os.path.join(BASE_DIR, "small_enhancer.onnx")
104
- if os.path.exists(path):
105
- try:
106
- enhancer_session = ort.InferenceSession(path, providers=["CPUExecutionProvider"])
107
- print("Loaded small enhancer:", path)
108
- except Exception as e:
109
- enhancer_session = None
110
- print("Small enhancer failed:", e)
111
-
112
-
113
  # ============================================================
114
  # Image helpers
115
  # ============================================================
116
- def swap_faces(target_img, target_face, source_face):
117
- return swapper.get(target_img, target_face, source_face, paste_back=True)
118
-
119
 
120
- def clarity_enhance(img):
121
  blur = cv2.GaussianBlur(img, (0, 0), 1.6)
122
  return cv2.addWeighted(img, 1.25, blur, -0.25, 0)
123
 
124
-
125
- def resize_keep_aspect_max512(img):
126
  h, w = img.shape[:2]
127
  if max(h, w) <= 512:
128
  return img
129
- scale = 512 / max(h, w)
130
- return cv2.resize(img, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_CUBIC)
131
-
132
 
133
  # ============================================================
134
  # Worker
135
  # ============================================================
136
- def run_task(task_id, src_bytes, tgt_bytes):
137
- TASKS[task_id]["status"] = "processing"
138
  try:
139
  src = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR)
140
  tgt = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR)
141
 
142
- if src is None or tgt is None:
143
- raise ValueError("Invalid image")
144
-
145
- src_faces = face_app.get(src)
146
- tgt_faces = face_app.get(tgt)
147
 
148
- if not src_faces or not tgt_faces:
149
  raise ValueError("Face not detected")
150
 
151
- swapped = swap_faces(tgt, tgt_faces[0], src_faces[0])
152
- final = clarity_enhance(swapped)
153
- final = resize_keep_aspect_max512(final)
154
 
155
- out_path = f"/tmp/{task_id}.jpg"
156
- cv2.imwrite(out_path, final)
157
 
158
- TASKS[task_id] = {"status": "done", "result": out_path}
159
 
160
  except Exception as e:
161
- TASKS[task_id] = {"status": "failed", "error": str(e)}
162
  print(traceback.format_exc())
163
 
164
-
165
  # ============================================================
166
- # FastAPI + FULL UI
167
  # ============================================================
168
- app = FastAPI(title="FaceSwap (GFPGAN-Lite)")
169
-
170
 
171
  @app.get("/", response_class=HTMLResponse)
172
  def home():
@@ -181,81 +143,119 @@ def home():
181
  body{background:#0e1525;color:#fff;font-family:Arial;margin:0;padding:20px}
182
  .container{max-width:1100px;margin:auto}
183
  .flex{display:flex;gap:16px;flex-wrap:wrap}
184
- .zone{flex:1;min-width:280px;border:2px dashed #ffffff33;padding:18px;text-align:center;border-radius:10px;cursor:pointer}
185
- .preview img{max-width:100%;border-radius:10px}
 
186
  button{padding:10px 18px;border-radius:10px;border:none;background:#6c5ce7;color:white;font-weight:600}
 
187
  </style>
188
  </head>
189
  <body>
190
  <div class="container">
191
  <h2>FaceSwap</h2>
 
192
  <div class="flex">
193
- <label class="zone">
194
- <input type="file" id="src" hidden>
195
- Source Image
196
- </label>
197
- <label class="zone">
198
- <input type="file" id="tgt" hidden>
199
- Target Image
200
- </label>
201
  </div>
202
- <button onclick="start()">Start</button>
203
- <img id="out">
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
204
  </div>
 
 
205
  <script>
206
- let src,tgt;
207
- document.getElementById("src").onchange=e=>src=e.target.files[0];
208
- document.getElementById("tgt").onchange=e=>tgt=e.target.files[0];
209
- async function start(){
210
- if(!src||!tgt){alert("Select images");return;}
211
- let fd=new FormData();
212
- fd.append("source",src);
213
- fd.append("target",tgt);
214
- let r=await fetch("/swap-image",{method:"POST",body:fd});
215
- let j=await r.json();
216
- let id=j.task_id;
217
- let t=setInterval(async()=>{
218
- let s=await fetch("/task-status/"+id).then(r=>r.json());
219
- if(s.status==="done"){
220
- clearInterval(t);
221
- document.getElementById("out").src="/task-result/"+id;
222
- }
223
- },1000);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
  }
225
  </script>
226
  </body>
227
  </html>
228
  """
229
 
230
-
231
  @app.post("/swap-image")
232
  async def swap_image(source: UploadFile = File(...), target: UploadFile = File(...)):
233
- task_id = str(uuid.uuid4())
234
- TASKS[task_id] = {"status": "queued"}
235
- executor.submit(run_task, task_id, await source.read(), await target.read())
236
- return {"task_id": task_id}
237
-
238
-
239
- @app.get("/task-status/{task_id}")
240
- def task_status(task_id: str):
241
- if task_id not in TASKS:
242
  raise HTTPException(404)
243
- return TASKS[task_id]
244
-
245
 
246
- @app.get("/task-result/{task_id}")
247
- def task_result(task_id: str):
248
- task = TASKS.get(task_id)
249
- if not task or task["status"] != "done":
250
  raise HTTPException(404)
251
- return StreamingResponse(open(task["result"], "rb"), media_type="image/jpeg")
252
-
253
 
254
  # ============================================================
255
  # INIT
256
  # ============================================================
257
  print("Initializing models...")
258
-
259
  face_app = FaceAnalysis(name="buffalo_l")
260
  face_app.prepare(ctx_id=-1, det_size=(640, 640))
261
 
@@ -265,7 +265,6 @@ swapper = insightface.model_zoo.get_model(
265
  root=BASE_DIR
266
  )
267
 
268
- try_load_enhancer()
269
  try_load_gfpgan()
270
 
271
- print("FaceSwap API ready.")
 
20
  except Exception:
21
  ONNX_AVAILABLE = False
22
 
 
23
  # ============================================================
24
  # GLOBALS
25
  # ============================================================
 
33
  TASKS = {}
34
  executor = concurrent.futures.ThreadPoolExecutor(max_workers=6)
35
 
 
36
  # ============================================================
37
  # Helpers
38
  # ============================================================
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
  def ensure_swapper_model():
40
  path = os.path.join(BASE_DIR, "inswapper_128.onnx")
41
  if not os.path.isfile(path):
42
  raise RuntimeError(
43
+ "inswapper_128.onnx not found. Place it next to app.py"
 
44
  )
45
  print("Found swapper model:", path)
46
  return path
47
 
48
 
49
+ def download_file(url, dest, timeout=180):
50
+ try:
51
+ r = requests.get(url, stream=True, timeout=timeout)
52
+ r.raise_for_status()
53
+ with open(dest, "wb") as f:
54
+ for c in r.iter_content(8192):
55
+ if c:
56
+ f.write(c)
57
+ return True
58
+ except Exception as e:
59
+ print("Download failed:", e)
60
+ return False
61
+
62
  # ============================================================
63
  # GFPGAN-Lite
64
  # ============================================================
65
  GFPGAN_URL = "https://huggingface.co/ai-forever/GFPGAN-Lite/resolve/main/gfpgan_lite.onnx"
 
 
66
 
67
  def try_load_gfpgan():
68
  global gfpgan_session
69
  if not ONNX_AVAILABLE:
 
70
  return
71
+ path = os.path.join(BASE_DIR, "gfpgan_lite.onnx")
 
72
  if not os.path.exists(path):
73
  print("Downloading GFPGAN-Lite...")
74
+ if not download_file(GFPGAN_URL, path):
 
75
  return
 
76
  try:
77
  gfpgan_session = ort.InferenceSession(path, providers=["CPUExecutionProvider"])
78
+ print("GFPGAN-Lite loaded")
79
  except Exception as e:
80
  print("GFPGAN load failed:", e)
81
  gfpgan_session = None
82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
83
  # ============================================================
84
  # Image helpers
85
  # ============================================================
86
+ def swap_faces(target, t_face, s_face):
87
+ return swapper.get(target, t_face, s_face, paste_back=True)
 
88
 
89
+ def clarity(img):
90
  blur = cv2.GaussianBlur(img, (0, 0), 1.6)
91
  return cv2.addWeighted(img, 1.25, blur, -0.25, 0)
92
 
93
+ def resize_max512(img):
 
94
  h, w = img.shape[:2]
95
  if max(h, w) <= 512:
96
  return img
97
+ s = 512 / max(h, w)
98
+ return cv2.resize(img, (int(w * s), int(h * s)), interpolation=cv2.INTER_CUBIC)
 
99
 
100
  # ============================================================
101
  # Worker
102
  # ============================================================
103
+ def run_task(tid, src_bytes, tgt_bytes):
104
+ TASKS[tid]["status"] = "processing"
105
  try:
106
  src = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR)
107
  tgt = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR)
108
 
109
+ s_faces = face_app.get(src)
110
+ t_faces = face_app.get(tgt)
 
 
 
111
 
112
+ if not s_faces or not t_faces:
113
  raise ValueError("Face not detected")
114
 
115
+ out = swap_faces(tgt, t_faces[0], s_faces[0])
116
+ out = clarity(out)
117
+ out = resize_max512(out)
118
 
119
+ out_path = f"/tmp/{tid}.jpg"
120
+ cv2.imwrite(out_path, out)
121
 
122
+ TASKS[tid] = {"status": "done", "result": out_path}
123
 
124
  except Exception as e:
125
+ TASKS[tid] = {"status": "failed", "error": str(e)}
126
  print(traceback.format_exc())
127
 
 
128
  # ============================================================
129
+ # FastAPI
130
  # ============================================================
131
+ app = FastAPI(title="FaceSwap")
 
132
 
133
  @app.get("/", response_class=HTMLResponse)
134
  def home():
 
143
  body{background:#0e1525;color:#fff;font-family:Arial;margin:0;padding:20px}
144
  .container{max-width:1100px;margin:auto}
145
  .flex{display:flex;gap:16px;flex-wrap:wrap}
146
+ .box{flex:1;min-width:280px;background:#ffffff10;padding:12px;border-radius:10px;text-align:center}
147
+ .box img{max-width:100%;border-radius:10px}
148
+ .zone{border:2px dashed #ffffff33;padding:18px;border-radius:10px;cursor:pointer}
149
  button{padding:10px 18px;border-radius:10px;border:none;background:#6c5ce7;color:white;font-weight:600}
150
+ a.download{display:inline-block;margin-top:10px;color:#00cec9;text-decoration:none}
151
  </style>
152
  </head>
153
  <body>
154
  <div class="container">
155
  <h2>FaceSwap</h2>
156
+
157
  <div class="flex">
158
+ <label class="zone">
159
+ <input type="file" id="src" hidden>
160
+ Select SOURCE
161
+ </label>
162
+ <label class="zone">
163
+ <input type="file" id="tgt" hidden>
164
+ Select TARGET
165
+ </label>
166
  </div>
167
+
168
+ <div class="flex" style="margin-top:20px">
169
+ <div class="box">
170
+ <div>Source</div>
171
+ <img id="pSrc">
172
+ </div>
173
+ <div class="box">
174
+ <div>Target</div>
175
+ <img id="pTgt">
176
+ </div>
177
+ <div class="box">
178
+ <div>Output</div>
179
+ <img id="pOut">
180
+ <a id="dl" class="download" download="faceswap.jpg" style="display:none">Download</a>
181
+ </div>
182
+ </div>
183
+
184
+ <div style="margin-top:20px">
185
+ <button id="go">Start</button>
186
+ <span id="st" style="margin-left:10px"></span>
187
  </div>
188
+ </div>
189
+
190
  <script>
191
+ const s=document.getElementById("src"),
192
+ t=document.getElementById("tgt"),
193
+ ps=document.getElementById("pSrc"),
194
+ pt=document.getElementById("pTgt"),
195
+ po=document.getElementById("pOut"),
196
+ dl=document.getElementById("dl"),
197
+ st=document.getElementById("st");
198
+
199
+ s.onchange=()=>ps.src=URL.createObjectURL(s.files[0]);
200
+ t.onchange=()=>pt.src=URL.createObjectURL(t.files[0]);
201
+
202
+ document.getElementById("go").onclick=async()=>{
203
+ if(!s.files[0]||!t.files[0]) return alert("Select both images");
204
+ st.innerText="Processing...";
205
+ let fd=new FormData();
206
+ fd.append("source",s.files[0]);
207
+ fd.append("target",t.files[0]);
208
+ const r=await fetch("/swap-image",{method:"POST",body:fd});
209
+ const j=await r.json();
210
+ poll(j.task_id);
211
+ };
212
+
213
+ async function poll(id){
214
+ const r=await fetch("/task-status/"+id);
215
+ const j=await r.json();
216
+ if(j.status==="done"){
217
+ const img=await fetch("/task-result/"+id);
218
+ const b=await img.blob();
219
+ const u=URL.createObjectURL(b);
220
+ po.src=u;
221
+ dl.href=u;
222
+ dl.style.display="inline-block";
223
+ st.innerText="Done";
224
+ } else if(j.status==="failed"){
225
+ st.innerText="Failed: "+j.error;
226
+ } else {
227
+ setTimeout(()=>poll(id),1000);
228
+ }
229
  }
230
  </script>
231
  </body>
232
  </html>
233
  """
234
 
 
235
  @app.post("/swap-image")
236
  async def swap_image(source: UploadFile = File(...), target: UploadFile = File(...)):
237
+ tid = str(uuid.uuid4())
238
+ TASKS[tid] = {"status": "queued"}
239
+ executor.submit(run_task, tid, await source.read(), await target.read())
240
+ return {"task_id": tid}
241
+
242
+ @app.get("/task-status/{tid}")
243
+ def status(tid: str):
244
+ if tid not in TASKS:
 
245
  raise HTTPException(404)
246
+ return TASKS[tid]
 
247
 
248
+ @app.get("/task-result/{tid}")
249
+ def result(tid: str):
250
+ t = TASKS.get(tid)
251
+ if not t or t["status"] != "done":
252
  raise HTTPException(404)
253
+ return StreamingResponse(open(t["result"], "rb"), media_type="image/jpeg")
 
254
 
255
  # ============================================================
256
  # INIT
257
  # ============================================================
258
  print("Initializing models...")
 
259
  face_app = FaceAnalysis(name="buffalo_l")
260
  face_app.prepare(ctx_id=-1, det_size=(640, 640))
261
 
 
265
  root=BASE_DIR
266
  )
267
 
 
268
  try_load_gfpgan()
269
 
270
+ print("FaceSwap ready")