AkashKumarave commited on
Commit
9fca7c3
·
verified ·
1 Parent(s): 427cb66

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +65 -135
app.py CHANGED
@@ -1,163 +1,93 @@
1
  # -*- coding: UTF-8 -*-
2
- from fastapi import FastAPI, UploadFile, File, HTTPException
3
- from fastapi.responses import Response
4
- from fastapi.middleware.cors import CORSMiddleware
5
- from contextlib import asynccontextmanager
6
  import cv2
7
  import numpy as np
8
- from PIL import Image
9
- import os
10
- import requests
 
 
11
  from pathlib import Path
12
- import uvicorn
 
13
  import logging
14
 
15
- # Initialize FastAPI with explicit docs settings
16
- app = FastAPI(
17
- title="Face Swap API",
18
- description="API for swapping faces in images.",
19
- docs_url="/docs", # Explicitly set docs URL
20
- redoc_url="/redoc", # Explicitly set redoc URL
21
- )
22
-
23
  # Configure logging
24
  logging.basicConfig(level=logging.INFO)
25
  logger = logging.getLogger(__name__)
26
 
27
- # Add CORS middleware
28
- app.add_middleware(
29
- CORSMiddleware,
30
- allow_origins=["*"], # Update with your Framer domain in production
31
- allow_credentials=True,
32
- allow_methods=["*"],
33
- allow_headers=["*"],
34
- )
35
 
36
  # Health Check Endpoint
37
  @app.get("/health")
38
  async def health_check():
39
  return {"status": "healthy"}
40
 
41
- # Global flag to prevent multiple downloads
42
- MODEL_DOWNLOADED = False
 
43
 
44
  def download_model():
45
- global MODEL_DOWNLOADED
46
- if MODEL_DOWNLOADED:
47
- logger.info("Model already downloaded, skipping.")
48
- return
49
-
50
- model_dir = Path("models")
51
- model_path = model_dir / "inswapper_128.onnx"
52
- model_url = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"
53
-
54
- if not model_path.exists():
55
- logger.info("Model not found. Downloading inswapper_128.onnx...")
56
- model_dir.mkdir(exist_ok=True)
57
- try:
58
- response = requests.get(model_url, stream=True, timeout=30)
59
- response.raise_for_status()
60
- with open(model_path, 'wb') as f:
61
- for chunk in response.iter_content(chunk_size=8192):
62
- f.write(chunk)
63
- logger.info("Model downloaded successfully.")
64
- MODEL_DOWNLOADED = True
65
- except Exception as e:
66
- logger.error(f"Failed to download model: {e}")
67
- raise RuntimeError("Could not download inswapper_128.onnx. Please check logs.")
68
- else:
69
- logger.info("Model already exists at: %s", model_path)
70
- MODEL_DOWNLOADED = True
71
-
72
- # Use lifespan event handler
73
- @asynccontextmanager
74
- async def lifespan(app: FastAPI):
75
- logger.info("Starting up application...")
76
- try:
77
- download_model()
78
- logger.info("Startup completed successfully.")
79
- except Exception as e:
80
- logger.error(f"Startup failed: {e}")
81
- raise
82
- yield
83
- logger.info("Shutting down application...")
84
-
85
- app.lifespan = lifespan
86
-
87
  def swap_faces(source_img, target_img):
88
- """Perform face swapping using insightface and inswapper model."""
89
- try:
90
- from insightface.app import FaceAnalysis
91
- from insightface.utils import face_align
92
- from insightface.model_zoo import face_swapper
93
-
94
- face_analyzer = FaceAnalysis(name="buffalo_l")
95
- face_analyzer.prepare(ctx_id=0, det_size=(640, 640))
96
-
97
- source_faces = face_analyzer.get(source_img)
98
- target_faces = face_analyzer.get(target_img)
99
-
100
- if not source_faces or not target_faces:
101
- raise ValueError("No faces detected in one or both images.")
102
- if len(source_faces) > 1 or len(target_faces) > 1:
103
- raise ValueError("Multiple faces detected; only one face per image is supported.")
104
-
105
- source_face = source_faces[0]
106
- target_face = target_faces[0]
107
 
108
- model_path = Path("models/inswapper_128.onnx")
109
- if not model_path.exists():
110
- raise FileNotFoundError("Model file inswapper_128.onnx not found.")
111
- swapper = face_swapper.FaceSwapper(model_path)
112
 
113
- result = swapper.get(target_img, target_face, source_face, paste_back=True)
 
 
 
114
 
115
- target_pil = Image.fromarray(cv2.cvtColor(target_img, cv2.COLOR_BGR2RGB))
116
- result_pil = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
117
- result_pil = result_pil.resize(target_pil.size, Image.Resampling.LANCZOS)
118
 
119
- return cv2.cvtColor(np.array(result_pil), cv2.COLOR_RGB2BGR)
120
- except Exception as e:
121
- logger.error(f"Face swap failed: {e}")
122
- raise
123
-
124
- @app.post("/swap-face/")
125
- async def swap_face(source_file: UploadFile = File(...), target_file: UploadFile = File(...)):
126
  try:
127
- source_path = "temp_source.jpg"
128
- target_path = "temp_target.jpg"
129
- output_path = "output.jpg"
130
-
131
- source_content = await source_file.read()
132
- with open(source_path, "wb") as f:
133
- f.write(source_content)
134
- source_img = cv2.imread(source_path)
135
- if source_img is None:
136
- raise ValueError("Failed to load source image.")
137
-
138
- target_content = await target_file.read()
139
- with open(target_path, "wb") as f:
140
- f.write(target_content)
141
- target_img = cv2.imread(target_path)
142
- if target_img is None:
143
- raise ValueError("Failed to load target image.")
144
-
145
- result_img = swap_faces(source_img, target_img)
146
-
147
- cv2.imwrite(output_path, result_img)
148
-
149
- with open(output_path, "rb") as f:
150
- image_data = f.read()
151
-
152
- for path in [source_path, target_path, output_path]:
153
- if os.path.exists(path):
154
- os.remove(path)
155
-
156
- return Response(content=image_data, media_type="image/jpeg")
157
-
158
  except Exception as e:
159
- logger.error("Error in swap_face: %s", str(e))
160
- raise HTTPException(status_code=500, detail=str(e))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
161
 
162
  if __name__ == "__main__":
 
163
  uvicorn.run(app, host="0.0.0.0", port=7860)
 
1
  # -*- coding: UTF-8 -*-
2
+ import gradio as gr
 
 
 
3
  import cv2
4
  import numpy as np
5
+ from fastapi import FastAPI
6
+ from fastapi.middleware.cors import CORSMiddleware
7
+ from contextlib import asynccontextmanager
8
+ from insightface.app import FaceAnalysis
9
+ from insightface.model_zoo.face_swapper import FaceSwapper
10
  from pathlib import Path
11
+ import requests
12
+ import os
13
  import logging
14
 
 
 
 
 
 
 
 
 
15
  # Configure logging
16
  logging.basicConfig(level=logging.INFO)
17
  logger = logging.getLogger(__name__)
18
 
19
+ # Initialize FastAPI
20
+ app = FastAPI(title="Face Swap API", description="Gradio-based Face Swap App")
21
+ app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
 
 
 
 
 
22
 
23
  # Health Check Endpoint
24
  @app.get("/health")
25
  async def health_check():
26
  return {"status": "healthy"}
27
 
28
+ # Download Model Function
29
+ MODEL_PATH = Path("models/inswapper_128.onnx")
30
+ MODEL_URL = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"
31
 
32
  def download_model():
33
+ if not MODEL_PATH.exists():
34
+ logger.info("Downloading model...")
35
+ os.makedirs("models", exist_ok=True)
36
+ response = requests.get(MODEL_URL, stream=True, timeout=30)
37
+ response.raise_for_status()
38
+ with open(MODEL_PATH, 'wb') as f:
39
+ for chunk in response.iter_content(chunk_size=8192):
40
+ f.write(chunk)
41
+ logger.info("Model downloaded successfully.")
42
+
43
+ download_model()
44
+
45
+ # Initialize Face Analyzer & Swapper
46
+ face_analyzer = FaceAnalysis(name="buffalo_l")
47
+ face_analyzer.prepare(ctx_id=0, det_size=(640, 640))
48
+ face_swapper = FaceSwapper(MODEL_PATH)
49
+
50
+ # Face Swap Function
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
  def swap_faces(source_img, target_img):
52
+ source_img = cv2.cvtColor(np.array(source_img), cv2.COLOR_RGB2BGR)
53
+ target_img = cv2.cvtColor(np.array(target_img), cv2.COLOR_RGB2BGR)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54
 
55
+ source_faces = face_analyzer.get(source_img)
56
+ target_faces = face_analyzer.get(target_img)
 
 
57
 
58
+ if not source_faces or not target_faces:
59
+ return "Error: No faces detected in one or both images."
60
+ if len(source_faces) > 1 or len(target_faces) > 1:
61
+ return "Error: Multiple faces detected; only one face per image is supported."
62
 
63
+ result_img = face_swapper.get(target_img, target_faces[0], source_faces[0], paste_back=True)
64
+ return cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)
 
65
 
66
+ # Gradio UI
67
+ def gradio_face_swap(source_img, target_img):
 
 
 
 
 
68
  try:
69
+ result = swap_faces(source_img, target_img)
70
+ if isinstance(result, str):
71
+ return result # Error message
72
+ return result
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  except Exception as e:
74
+ return f"Error: {str(e)}"
75
+
76
+ gr_interface = gr.Interface(
77
+ fn=gradio_face_swap,
78
+ inputs=[
79
+ gr.Image(label="Upload Source Face"),
80
+ gr.Image(label="Upload Target Image")
81
+ ],
82
+ outputs=gr.Image(label="Swapped Face"),
83
+ title="AI Face Swap",
84
+ description="Upload two images: one with the source face and another where you want the face swapped.",
85
+ live=True
86
+ )
87
+
88
+ # Launch Gradio
89
+ app = gr.mount_gradio_app(app, gr_interface, path="/")
90
 
91
  if __name__ == "__main__":
92
+ import uvicorn
93
  uvicorn.run(app, host="0.0.0.0", port=7860)