AkashKumarave commited on
Commit
c7d6b3a
·
verified ·
1 Parent(s): b8c2f1b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +32 -71
app.py CHANGED
@@ -7,18 +7,17 @@ import cv2
7
  import numpy as np
8
  from PIL import Image
9
  import os
10
- import shutil
11
  import logging
12
  import requests
13
  from pathlib import Path
14
  import uvicorn
15
 
16
- # Initialize FastAPI with explicit docs settings
17
  app = FastAPI(
18
  title="Face Swap API",
19
  description="API for swapping faces in images.",
20
- docs_url="/docs", # Explicitly set docs URL
21
- redoc_url="/redoc", # Explicitly set redoc URL
22
  )
23
 
24
  # Configure logging
@@ -28,37 +27,33 @@ logger = logging.getLogger(__name__)
28
  # Add CORS middleware
29
  app.add_middleware(
30
  CORSMiddleware,
31
- allow_origins=["*"], # Update with your Framer domain in production
32
  allow_credentials=True,
33
  allow_methods=["*"],
34
  allow_headers=["*"],
35
  )
36
 
37
- # Add a root endpoint to confirm the app is running
38
  @app.get("/")
39
  async def root():
40
  return {"message": "Welcome to the Face Swap API! Use /swap-face/ to swap faces or /docs to test the API."}
41
 
42
- # Add a health check endpoint
43
  @app.get("/health")
44
  async def health_check():
45
  return {"status": "healthy"}
46
 
47
- # Global flag to prevent multiple downloads
48
  MODEL_DOWNLOADED = False
49
 
50
  def download_model():
51
  global MODEL_DOWNLOADED
52
- if MODEL_DOWNLOADED:
53
- logger.info("Model already downloaded, skipping.")
54
- return
55
-
56
  model_dir = Path("models")
57
  model_path = model_dir / "inswapper_128.onnx"
58
  model_url = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"
59
 
60
  if not model_path.exists():
61
- logger.info("Model not found. Downloading inswapper_128.onnx...")
62
  model_dir.mkdir(exist_ok=True)
63
  try:
64
  response = requests.get(model_url, stream=True, timeout=30)
@@ -70,58 +65,43 @@ def download_model():
70
  MODEL_DOWNLOADED = True
71
  except Exception as e:
72
  logger.error(f"Failed to download model: {e}")
73
- raise RuntimeError("Could not download inswapper_128.onnx. Please check logs.")
74
  else:
75
- logger.info("Model already exists at: %s", model_path)
76
  MODEL_DOWNLOADED = True
77
 
78
- # Use lifespan event handler
79
  @asynccontextmanager
80
  async def lifespan(app: FastAPI):
81
- # Startup code
82
- logger.info("Starting up application...")
83
  try:
84
  download_model()
85
- logger.info("Startup completed successfully.")
86
  except Exception as e:
87
  logger.error(f"Startup failed: {e}")
88
  raise
89
  yield
90
- # Shutdown code (if any)
91
  logger.info("Shutting down application...")
92
 
93
  app.lifespan = lifespan
94
 
95
- def get_many_faces(image):
96
- """Simplified face detection using insightface."""
97
- try:
98
- from insightface.app import FaceAnalysis
99
- app = FaceAnalysis(name="buffalo_l")
100
- app.prepare(ctx_id=0, det_size=(640, 640))
101
- faces = app.get(image)
102
- return faces if faces else []
103
- except Exception as e:
104
- logger.error(f"Face detection failed: {e}")
105
- raise
106
-
107
  def swap_faces(source_img, target_img):
108
- """Perform face swapping using insightface and inswapper model."""
109
  try:
 
110
  from insightface.utils import face_align
111
  from insightface.model_zoo import face_swapper
112
 
113
  # Initialize face analysis
114
  face_analyzer = FaceAnalysis(name="buffalo_l")
115
- face_analyzer.prepare(ctx_id=0, det_size=(640, 640))
116
 
117
  # Detect faces
118
  source_faces = face_analyzer.get(source_img)
119
  target_faces = face_analyzer.get(target_img)
120
 
121
  if not source_faces or not target_faces:
122
- raise ValueError("No faces detected in one or both images.")
123
  if len(source_faces) > 1 or len(target_faces) > 1:
124
- raise ValueError("Multiple faces detected; only one face per image is supported.")
125
 
126
  source_face = source_faces[0]
127
  target_face = target_faces[0]
@@ -130,14 +110,14 @@ def swap_faces(source_img, target_img):
130
  model_path = Path("models/inswapper_128.onnx")
131
  if not model_path.exists():
132
  raise FileNotFoundError("Model file inswapper_128.onnx not found.")
133
- swapper = face_swapper.FaceSwapper(model_path)
134
 
135
  # Perform face swap
136
  result = swapper.get(target_img, target_face, source_face, paste_back=True)
137
 
138
- # Resize to match target image size
139
- target_pil = Image.fromarray(cv2.cvtColor(target_img, cv2.COLOR_BGR2RGB))
140
  result_pil = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
 
141
  result_pil = result_pil.resize(target_pil.size, Image.Resampling.LANCZOS)
142
 
143
  return cv2.cvtColor(np.array(result_pil), cv2.COLOR_RGB2BGR)
@@ -146,51 +126,32 @@ def swap_faces(source_img, target_img):
146
  raise
147
 
148
  @app.post("/swap-face/")
149
- async def swap_face(source_file: UploadFile = File(...), target_file: UploadFile = File(...), doFaceEnhancer: bool = True):
150
  try:
151
- # Save uploaded files temporarily
152
- source_path = "temp_source.jpg"
153
- target_path = "temp_target.jpg"
154
- output_path = "output.jpg"
155
-
156
- # Read and save source image
157
  source_content = await source_file.read()
158
- with open(source_path, "wb") as f:
159
- f.write(source_content)
160
- source_img = cv2.imread(source_path)
161
  if source_img is None:
162
- raise ValueError("Failed to load source image.")
163
 
164
- # Read and save target image
165
  target_content = await target_file.read()
166
- with open(target_path, "wb") as f:
167
- f.write(target_content)
168
- target_img = cv2.imread(target_path)
169
  if target_img is None:
170
- raise ValueError("Failed to load target image.")
171
 
172
  # Perform face swap
173
  result_img = swap_faces(source_img, target_img)
174
 
175
- # Save the result
176
- cv2.imwrite(output_path, result_img)
177
-
178
- # Read the output image
179
- with open(output_path, "rb") as f:
180
- image_data = f.read()
181
-
182
- # Clean up temporary files
183
- for path in [source_path, target_path, output_path]:
184
- if os.path.exists(path):
185
- os.remove(path)
186
-
187
- # Return the swapped image
188
- return Response(content=image_data, media_type="image/jpeg")
189
 
190
  except Exception as e:
191
  logger.error("Error in swap_face: %s", str(e))
192
  raise HTTPException(status_code=500, detail=str(e))
193
 
194
  if __name__ == "__main__":
195
- # Hugging Face Spaces expects the app to run on port 7860
196
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
7
  import numpy as np
8
  from PIL import Image
9
  import os
 
10
  import logging
11
  import requests
12
  from pathlib import Path
13
  import uvicorn
14
 
15
+ # Initialize FastAPI
16
  app = FastAPI(
17
  title="Face Swap API",
18
  description="API for swapping faces in images.",
19
+ docs_url="/docs",
20
+ redoc_url="/redoc"
21
  )
22
 
23
  # Configure logging
 
27
  # Add CORS middleware
28
  app.add_middleware(
29
  CORSMiddleware,
30
+ allow_origins=["*"],
31
  allow_credentials=True,
32
  allow_methods=["*"],
33
  allow_headers=["*"],
34
  )
35
 
36
+ # Root endpoint
37
  @app.get("/")
38
  async def root():
39
  return {"message": "Welcome to the Face Swap API! Use /swap-face/ to swap faces or /docs to test the API."}
40
 
41
+ # Health check
42
  @app.get("/health")
43
  async def health_check():
44
  return {"status": "healthy"}
45
 
46
+ # Global flag for model download
47
  MODEL_DOWNLOADED = False
48
 
49
  def download_model():
50
  global MODEL_DOWNLOADED
 
 
 
 
51
  model_dir = Path("models")
52
  model_path = model_dir / "inswapper_128.onnx"
53
  model_url = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"
54
 
55
  if not model_path.exists():
56
+ logger.info("Downloading inswapper_128.onnx...")
57
  model_dir.mkdir(exist_ok=True)
58
  try:
59
  response = requests.get(model_url, stream=True, timeout=30)
 
65
  MODEL_DOWNLOADED = True
66
  except Exception as e:
67
  logger.error(f"Failed to download model: {e}")
68
+ raise RuntimeError("Could not download inswapper_128.onnx.")
69
  else:
70
+ logger.info("Model already exists.")
71
  MODEL_DOWNLOADED = True
72
 
 
73
  @asynccontextmanager
74
  async def lifespan(app: FastAPI):
75
+ logger.info("Starting application...")
 
76
  try:
77
  download_model()
78
+ logger.info("Application started successfully.")
79
  except Exception as e:
80
  logger.error(f"Startup failed: {e}")
81
  raise
82
  yield
 
83
  logger.info("Shutting down application...")
84
 
85
  app.lifespan = lifespan
86
 
 
 
 
 
 
 
 
 
 
 
 
 
87
  def swap_faces(source_img, target_img):
 
88
  try:
89
+ from insightface.app import FaceAnalysis
90
  from insightface.utils import face_align
91
  from insightface.model_zoo import face_swapper
92
 
93
  # Initialize face analysis
94
  face_analyzer = FaceAnalysis(name="buffalo_l")
95
+ face_analyzer.prepare(ctx_id=-1, det_size=(640, 640))
96
 
97
  # Detect faces
98
  source_faces = face_analyzer.get(source_img)
99
  target_faces = face_analyzer.get(target_img)
100
 
101
  if not source_faces or not target_faces:
102
+ raise ValueError("No faces detected.")
103
  if len(source_faces) > 1 or len(target_faces) > 1:
104
+ raise ValueError("Multiple faces detected; only one per image supported.")
105
 
106
  source_face = source_faces[0]
107
  target_face = target_faces[0]
 
110
  model_path = Path("models/inswapper_128.onnx")
111
  if not model_path.exists():
112
  raise FileNotFoundError("Model file inswapper_128.onnx not found.")
113
+ swapper = face_swapper.FaceSwapper(str(model_path))
114
 
115
  # Perform face swap
116
  result = swapper.get(target_img, target_face, source_face, paste_back=True)
117
 
118
+ # Resize result to match target image size
 
119
  result_pil = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
120
+ target_pil = Image.fromarray(cv2.cvtColor(target_img, cv2.COLOR_BGR2RGB))
121
  result_pil = result_pil.resize(target_pil.size, Image.Resampling.LANCZOS)
122
 
123
  return cv2.cvtColor(np.array(result_pil), cv2.COLOR_RGB2BGR)
 
126
  raise
127
 
128
  @app.post("/swap-face/")
129
+ async def swap_face(source_file: UploadFile = File(...), target_file: UploadFile = File(...)):
130
  try:
131
+ # Read source image
 
 
 
 
 
132
  source_content = await source_file.read()
133
+ source_np = np.frombuffer(source_content, np.uint8)
134
+ source_img = cv2.imdecode(source_np, cv2.IMREAD_COLOR)
 
135
  if source_img is None:
136
+ raise ValueError("Invalid source image.")
137
 
138
+ # Read target image
139
  target_content = await target_file.read()
140
+ target_np = np.frombuffer(target_content, np.uint8)
141
+ target_img = cv2.imdecode(target_np, cv2.IMREAD_COLOR)
 
142
  if target_img is None:
143
+ raise ValueError("Invalid target image.")
144
 
145
  # Perform face swap
146
  result_img = swap_faces(source_img, target_img)
147
 
148
+ # Convert result to bytes
149
+ _, img_encoded = cv2.imencode(".jpg", result_img)
150
+ return Response(content=img_encoded.tobytes(), media_type="image/jpeg")
 
 
 
 
 
 
 
 
 
 
 
151
 
152
  except Exception as e:
153
  logger.error("Error in swap_face: %s", str(e))
154
  raise HTTPException(status_code=500, detail=str(e))
155
 
156
  if __name__ == "__main__":
157
+ uvicorn.run(app, host="0.0.0.0", port=7860)