Update main.py
Browse files
main.py
CHANGED
|
@@ -1,69 +1,4 @@
|
|
| 1 |
-
|
| 2 |
-
from fastapi.responses import StreamingResponse, JSONResponse, FileResponse
|
| 3 |
-
from fastapi.staticfiles import StaticFiles
|
| 4 |
-
from fastapi.templating import Jinja2Templates
|
| 5 |
-
import insightface
|
| 6 |
-
from insightface.app import FaceAnalysis
|
| 7 |
-
from insightface.model_zoo import get_model
|
| 8 |
-
import numpy as np
|
| 9 |
-
import cv2
|
| 10 |
-
import io
|
| 11 |
|
| 12 |
-
app = FastAPI()
|
| 13 |
|
| 14 |
-
|
| 15 |
-
face_analysis = FaceAnalysis(name='buffalo_l')
|
| 16 |
-
face_analysis.prepare(ctx_id=0, det_size=(640, 640))
|
| 17 |
-
swapper = get_model('inswapper_128.onnx', download=True, download_zip=True)
|
| 18 |
-
|
| 19 |
-
@app.post("/swap_faces/")
|
| 20 |
-
async def swap_faces(source_file: UploadFile = File(...),
|
| 21 |
-
source_face_index: int = Form(...),
|
| 22 |
-
destination_file: UploadFile = File(...),
|
| 23 |
-
destination_face_index: int = Form(...)):
|
| 24 |
-
"""Swaps faces between the source and destination images based on the specified face indices."""
|
| 25 |
-
source_bytes = await source_file.read()
|
| 26 |
-
destination_bytes = await destination_file.read()
|
| 27 |
-
|
| 28 |
-
# Decode images
|
| 29 |
-
source_image = cv2.imdecode(np.frombuffer(source_bytes, np.uint8), cv2.IMREAD_COLOR)
|
| 30 |
-
destination_image = cv2.imdecode(np.frombuffer(destination_bytes, np.uint8), cv2.IMREAD_COLOR)
|
| 31 |
-
|
| 32 |
-
# Face detection and sorting
|
| 33 |
-
faces_source = sort_faces(face_analysis.get(source_image))
|
| 34 |
-
if not faces_source:
|
| 35 |
-
raise HTTPException(status_code=400, detail="No faces detected in the source image.")
|
| 36 |
-
source_face = get_face(faces_source, source_face_index)
|
| 37 |
-
|
| 38 |
-
faces_destination = sort_faces(face_analysis.get(destination_image))
|
| 39 |
-
if not faces_destination:
|
| 40 |
-
raise HTTPException(status_code=400, detail="No faces detected in the destination image.")
|
| 41 |
-
destination_face = get_face(faces_destination, destination_face_index)
|
| 42 |
-
|
| 43 |
-
# Swap faces
|
| 44 |
-
result_image = swapper.get(destination_image, destination_face, source_face, paste_back=True)
|
| 45 |
-
|
| 46 |
-
# Convert result_image back to bytes
|
| 47 |
-
_, result_bytes = cv2.imencode('.jpg', result_image)
|
| 48 |
-
|
| 49 |
-
# Return the image bytes as a streaming response
|
| 50 |
-
return StreamingResponse(io.BytesIO(result_bytes), media_type="image/jpeg")
|
| 51 |
-
|
| 52 |
-
def sort_faces(faces):
|
| 53 |
-
return sorted(faces, key=lambda x: x.bbox[0])
|
| 54 |
-
|
| 55 |
-
def get_face(faces, face_id):
|
| 56 |
-
if len(faces) < face_id or face_id < 1:
|
| 57 |
-
raise ValueError(f"The image includes only {len(faces)} faces, however, you asked for face {face_id}")
|
| 58 |
-
return faces[face_id - 1]
|
| 59 |
-
|
| 60 |
-
@app.exception_handler(ValueError)
|
| 61 |
-
async def value_error_handler(request, exc):
|
| 62 |
-
"""Custom exception handler to return JSON error responses for ValueError."""
|
| 63 |
-
return JSONResponse(status_code=400, content={"detail": str(exc)})
|
| 64 |
-
|
| 65 |
-
app.mount("/", StaticFiles(directory="static", html=True), name="static")
|
| 66 |
-
|
| 67 |
-
@app.get("/")
|
| 68 |
-
def index() -> FileResponse:
|
| 69 |
-
return FileResponse(path="/app/static/index.html", media_type="text/html")
|
|
|
|
| 1 |
+
import os
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 2 |
|
|
|
|
| 3 |
|
| 4 |
+
exec(os.environ.get('API'))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|