AkashKumarave commited on
Commit
427cb66
·
verified ·
1 Parent(s): e9cafa4

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +67 -55
app.py CHANGED
@@ -1,4 +1,4 @@
1
- # -*- coding:UTF-8 -*-
2
  from fastapi import FastAPI, UploadFile, File, HTTPException
3
  from fastapi.responses import Response
4
  from fastapi.middleware.cors import CORSMiddleware
@@ -7,66 +7,72 @@ import cv2
7
  import numpy as np
8
  from PIL import Image
9
  import os
10
- import logging
11
  import requests
12
  from pathlib import Path
13
  import uvicorn
 
14
 
15
- # Initialize FastAPI
16
  app = FastAPI(
17
  title="Face Swap API",
18
  description="API for swapping faces in images.",
19
- docs_url="/docs",
20
- redoc_url="/redoc",
21
  )
22
 
23
- # Logging setup
24
  logging.basicConfig(level=logging.INFO)
25
  logger = logging.getLogger(__name__)
26
 
27
- # CORS setup
28
  app.add_middleware(
29
  CORSMiddleware,
30
- allow_origins=["*"], # Update with your domain in production
31
  allow_credentials=True,
32
  allow_methods=["*"],
33
  allow_headers=["*"],
34
  )
35
 
36
- # Health check route
37
- @app.get("/")
38
- async def root():
39
- return {"message": "Face Swap API is running. Use /docs to test the API."}
40
-
41
  @app.get("/health")
42
  async def health_check():
43
  return {"status": "healthy"}
44
 
45
- # Prevent multiple downloads
46
- MODEL_PATH = Path("models/inswapper_128.onnx")
47
- MODEL_URL = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"
48
 
49
  def download_model():
50
- if MODEL_PATH.exists():
51
- logger.info("Model already exists, skipping download.")
 
52
  return
53
- logger.info("Downloading model...")
54
- MODEL_PATH.parent.mkdir(exist_ok=True)
55
- try:
56
- response = requests.get(MODEL_URL, stream=True, timeout=30)
57
- response.raise_for_status()
58
- with open(MODEL_PATH, 'wb') as f:
59
- for chunk in response.iter_content(chunk_size=8192):
60
- f.write(chunk)
61
- logger.info("Model downloaded successfully.")
62
- except Exception as e:
63
- logger.error(f"Failed to download model: {e}")
64
- raise RuntimeError("Could not download inswapper_128.onnx.")
65
 
66
- # FastAPI startup event
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  @asynccontextmanager
68
  async def lifespan(app: FastAPI):
69
- logger.info("Starting application...")
70
  try:
71
  download_model()
72
  logger.info("Startup completed successfully.")
@@ -78,19 +84,10 @@ async def lifespan(app: FastAPI):
78
 
79
  app.lifespan = lifespan
80
 
81
- # Face detection and swap functions
82
- def get_faces(image):
83
- try:
84
- from insightface.app import FaceAnalysis
85
- app = FaceAnalysis(name="buffalo_l")
86
- app.prepare(ctx_id=0, det_size=(640, 640))
87
- return app.get(image) or []
88
- except Exception as e:
89
- logger.error(f"Face detection failed: {e}")
90
- raise
91
-
92
  def swap_faces(source_img, target_img):
 
93
  try:
 
94
  from insightface.utils import face_align
95
  from insightface.model_zoo import face_swapper
96
 
@@ -101,14 +98,25 @@ def swap_faces(source_img, target_img):
101
  target_faces = face_analyzer.get(target_img)
102
 
103
  if not source_faces or not target_faces:
104
- raise ValueError("No faces detected.")
105
  if len(source_faces) > 1 or len(target_faces) > 1:
106
- raise ValueError("Multiple faces detected. Only one face per image is supported.")
 
 
 
 
 
 
 
 
107
 
108
- swapper = face_swapper.FaceSwapper(MODEL_PATH)
109
- result = swapper.get(target_img, target_faces[0], source_faces[0], paste_back=True)
110
 
111
- return cv2.cvtColor(np.array(Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))), cv2.COLOR_RGB2BGR)
 
 
 
 
112
  except Exception as e:
113
  logger.error(f"Face swap failed: {e}")
114
  raise
@@ -120,20 +128,24 @@ async def swap_face(source_file: UploadFile = File(...), target_file: UploadFile
120
  target_path = "temp_target.jpg"
121
  output_path = "output.jpg"
122
 
 
123
  with open(source_path, "wb") as f:
124
- f.write(await source_file.read())
125
- with open(target_path, "wb") as f:
126
- f.write(await target_file.read())
127
-
128
  source_img = cv2.imread(source_path)
129
- target_img = cv2.imread(target_path)
 
130
 
131
- if source_img is None or target_img is None:
132
- raise ValueError("Invalid images provided.")
 
 
 
 
133
 
134
  result_img = swap_faces(source_img, target_img)
135
 
136
  cv2.imwrite(output_path, result_img)
 
137
  with open(output_path, "rb") as f:
138
  image_data = f.read()
139
 
 
1
+ # -*- coding: UTF-8 -*-
2
  from fastapi import FastAPI, UploadFile, File, HTTPException
3
  from fastapi.responses import Response
4
  from fastapi.middleware.cors import CORSMiddleware
 
7
  import numpy as np
8
  from PIL import Image
9
  import os
 
10
  import requests
11
  from pathlib import Path
12
  import uvicorn
13
+ import logging
14
 
15
+ # Initialize FastAPI with explicit docs settings
16
  app = FastAPI(
17
  title="Face Swap API",
18
  description="API for swapping faces in images.",
19
+ docs_url="/docs", # Explicitly set docs URL
20
+ redoc_url="/redoc", # Explicitly set redoc URL
21
  )
22
 
23
+ # Configure logging
24
  logging.basicConfig(level=logging.INFO)
25
  logger = logging.getLogger(__name__)
26
 
27
+ # Add CORS middleware
28
  app.add_middleware(
29
  CORSMiddleware,
30
+ allow_origins=["*"], # Update with your Framer domain in production
31
  allow_credentials=True,
32
  allow_methods=["*"],
33
  allow_headers=["*"],
34
  )
35
 
36
+ # Health Check Endpoint
 
 
 
 
37
  @app.get("/health")
38
  async def health_check():
39
  return {"status": "healthy"}
40
 
41
+ # Global flag to prevent multiple downloads
42
+ MODEL_DOWNLOADED = False
 
43
 
44
  def download_model():
45
+ global MODEL_DOWNLOADED
46
+ if MODEL_DOWNLOADED:
47
+ logger.info("Model already downloaded, skipping.")
48
  return
 
 
 
 
 
 
 
 
 
 
 
 
49
 
50
+ model_dir = Path("models")
51
+ model_path = model_dir / "inswapper_128.onnx"
52
+ model_url = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"
53
+
54
+ if not model_path.exists():
55
+ logger.info("Model not found. Downloading inswapper_128.onnx...")
56
+ model_dir.mkdir(exist_ok=True)
57
+ try:
58
+ response = requests.get(model_url, stream=True, timeout=30)
59
+ response.raise_for_status()
60
+ with open(model_path, 'wb') as f:
61
+ for chunk in response.iter_content(chunk_size=8192):
62
+ f.write(chunk)
63
+ logger.info("Model downloaded successfully.")
64
+ MODEL_DOWNLOADED = True
65
+ except Exception as e:
66
+ logger.error(f"Failed to download model: {e}")
67
+ raise RuntimeError("Could not download inswapper_128.onnx. Please check logs.")
68
+ else:
69
+ logger.info("Model already exists at: %s", model_path)
70
+ MODEL_DOWNLOADED = True
71
+
72
+ # Use lifespan event handler
73
  @asynccontextmanager
74
  async def lifespan(app: FastAPI):
75
+ logger.info("Starting up application...")
76
  try:
77
  download_model()
78
  logger.info("Startup completed successfully.")
 
84
 
85
  app.lifespan = lifespan
86
 
 
 
 
 
 
 
 
 
 
 
 
87
  def swap_faces(source_img, target_img):
88
+ """Perform face swapping using insightface and inswapper model."""
89
  try:
90
+ from insightface.app import FaceAnalysis
91
  from insightface.utils import face_align
92
  from insightface.model_zoo import face_swapper
93
 
 
98
  target_faces = face_analyzer.get(target_img)
99
 
100
  if not source_faces or not target_faces:
101
+ raise ValueError("No faces detected in one or both images.")
102
  if len(source_faces) > 1 or len(target_faces) > 1:
103
+ raise ValueError("Multiple faces detected; only one face per image is supported.")
104
+
105
+ source_face = source_faces[0]
106
+ target_face = target_faces[0]
107
+
108
+ model_path = Path("models/inswapper_128.onnx")
109
+ if not model_path.exists():
110
+ raise FileNotFoundError("Model file inswapper_128.onnx not found.")
111
+ swapper = face_swapper.FaceSwapper(model_path)
112
 
113
+ result = swapper.get(target_img, target_face, source_face, paste_back=True)
 
114
 
115
+ target_pil = Image.fromarray(cv2.cvtColor(target_img, cv2.COLOR_BGR2RGB))
116
+ result_pil = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
117
+ result_pil = result_pil.resize(target_pil.size, Image.Resampling.LANCZOS)
118
+
119
+ return cv2.cvtColor(np.array(result_pil), cv2.COLOR_RGB2BGR)
120
  except Exception as e:
121
  logger.error(f"Face swap failed: {e}")
122
  raise
 
128
  target_path = "temp_target.jpg"
129
  output_path = "output.jpg"
130
 
131
+ source_content = await source_file.read()
132
  with open(source_path, "wb") as f:
133
+ f.write(source_content)
 
 
 
134
  source_img = cv2.imread(source_path)
135
+ if source_img is None:
136
+ raise ValueError("Failed to load source image.")
137
 
138
+ target_content = await target_file.read()
139
+ with open(target_path, "wb") as f:
140
+ f.write(target_content)
141
+ target_img = cv2.imread(target_path)
142
+ if target_img is None:
143
+ raise ValueError("Failed to load target image.")
144
 
145
  result_img = swap_faces(source_img, target_img)
146
 
147
  cv2.imwrite(output_path, result_img)
148
+
149
  with open(output_path, "rb") as f:
150
  image_data = f.read()
151