|
|
|
|
|
import os |
|
|
|
|
|
os.environ.setdefault("MPLCONFIGDIR", "/tmp/matplotlib") |
|
|
os.environ.setdefault("XDG_CACHE_HOME", "/tmp/.cache") |
|
|
os.environ.setdefault("HF_HOME", "/tmp/hf_cache") |
|
|
os.environ.setdefault("INSIGHTFACE_HOME", "/tmp/.insightface") |
|
|
|
|
|
os.environ.setdefault("HOME", "/tmp") |
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
import urllib.request |
|
|
from flask import Flask, request, jsonify, send_file |
|
|
from io import BytesIO |
|
|
import base64 |
|
|
import json |
|
|
import re |
|
|
|
|
|
import insightface |
|
|
from insightface.app import FaceAnalysis |
|
|
|
|
|
|
|
|
model_url = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx" |
|
|
CACHE_DIR = os.environ.get("INSIGHTFACE_HOME", "/tmp/.insightface") |
|
|
model_path = os.path.join(CACHE_DIR, "models", "inswapper_128.onnx") |
|
|
|
|
|
|
|
|
os.makedirs(os.path.dirname(model_path), exist_ok=True) |
|
|
if not os.path.exists(model_path): |
|
|
print("Đang tải mô hình từ URL...") |
|
|
urllib.request.urlretrieve(model_url, model_path) |
|
|
print("Hoàn tất tải mô hình!") |
|
|
|
|
|
|
|
|
swapper = insightface.model_zoo.get_model(model_path) |
|
|
|
|
|
|
|
|
app = FaceAnalysis(name='buffalo_l', root=CACHE_DIR) |
|
|
app.prepare(ctx_id=-1, det_size=(640, 640), det_thresh=0.3) |
|
|
|
|
|
|
|
|
flask_app = Flask(__name__) |
|
|
|
|
|
def get_gender(face): |
|
|
"""Xác định giới tính của khuôn mặt.""" |
|
|
return "male" if face.gender > 0.5 else "female" |
|
|
|
|
|
def process_image(file): |
|
|
"""Chuyển đổi tệp tin ảnh thành mảng NumPy.""" |
|
|
return cv2.imdecode(np.frombuffer(file.read(), np.uint8), cv2.IMREAD_COLOR) |
|
|
|
|
|
def swap_faces(source_img, target_img): |
|
|
"""Hoán đổi khuôn mặt từ ảnh nguồn sang ảnh đích.""" |
|
|
source_img = process_image(source_img) |
|
|
target_img = process_image(target_img) |
|
|
|
|
|
source_faces = app.get(source_img) |
|
|
target_faces = app.get(target_img) |
|
|
|
|
|
if not source_faces: |
|
|
return {"error": "Không tìm thấy khuôn mặt trong ảnh nguồn."}, 400 |
|
|
if not target_faces: |
|
|
return {"error": "Không tìm thấy khuôn mặt trong ảnh đích."}, 400 |
|
|
|
|
|
source_face = source_faces[0] |
|
|
result_img = target_img.copy() |
|
|
for target_face in target_faces: |
|
|
result_img = swapper.get(result_img, target_face, source_face, paste_back=True) |
|
|
|
|
|
_, buffer = cv2.imencode('.jpg', result_img) |
|
|
img_base64 = base64.b64encode(buffer).decode('utf-8') |
|
|
return img_base64 |
|
|
|
|
|
def swap_faces_gender_based(source_male, source_female, target_img): |
|
|
"""Hoán đổi khuôn mặt theo giới tính.""" |
|
|
source_male = process_image(source_male) |
|
|
source_female = process_image(source_female) |
|
|
target_img = process_image(target_img) |
|
|
|
|
|
source_male_faces = app.get(source_male) |
|
|
source_female_faces = app.get(source_female) |
|
|
target_faces = app.get(target_img) |
|
|
|
|
|
if not source_male_faces or not source_female_faces: |
|
|
return {"error": "Không tìm thấy khuôn mặt trong ảnh nguồn."}, 400 |
|
|
if not target_faces: |
|
|
return {"error": "Không tìm thấy khuôn mặt trong ảnh đích."}, 400 |
|
|
|
|
|
source_male_face = source_male_faces[0] |
|
|
source_female_face = source_female_faces[0] |
|
|
|
|
|
result_img = target_img.copy() |
|
|
for target_face in target_faces: |
|
|
gender = get_gender(target_face) |
|
|
swap_face = source_male_face if gender == "male" else source_female_face |
|
|
result_img = swapper.get(result_img, target_face, swap_face, paste_back=True) |
|
|
|
|
|
_, buffer = cv2.imencode('.jpg', result_img) |
|
|
img_base64 = base64.b64encode(buffer).decode('utf-8') |
|
|
return img_base64 |
|
|
|
|
|
|
|
|
def swap_faces_multi(source_files, target_file, mapping=None): |
|
|
"""Hoán đổi nhiều khuôn mặt: mỗi khuôn mặt đích dùng 1 khuôn mặt nguồn riêng. |
|
|
- source_files: list các FileStorage cho ảnh nguồn (mỗi ảnh nguồn nên chỉ có 1 mặt) |
|
|
- target_file: FileStorage cho ảnh đích (có nhiều mặt) |
|
|
- mapping: list ánh xạ index mặt đích -> index ảnh nguồn |
|
|
* Cho phép phần tử là số nguyên (chỉ mục ảnh nguồn), None/null, hoặc -1 để "bỏ qua" mặt đích tương ứng. |
|
|
* Nếu mapping ngắn hơn số mặt đích, các mặt còn lại sẽ bị bỏ qua. |
|
|
* Nếu mapping dài hơn, phần dư sẽ bị cắt bỏ. |
|
|
* Nếu mapping=None và số nguồn == số mặt đích, tự map trái→phải; nếu khác, tự map tối đa có thể và bỏ qua phần còn lại. |
|
|
""" |
|
|
|
|
|
source_imgs = [process_image(f) for f in source_files] |
|
|
target_img = process_image(target_file) |
|
|
|
|
|
if target_img is None: |
|
|
return {"error": "Ảnh đích không hợp lệ."}, 400 |
|
|
if any(img is None for img in source_imgs): |
|
|
return {"error": "Ít nhất một ảnh nguồn không hợp lệ."}, 400 |
|
|
|
|
|
|
|
|
source_faces_list = [app.get(img) for img in source_imgs] |
|
|
target_faces = app.get(target_img) |
|
|
|
|
|
if not target_faces: |
|
|
return {"error": "Không tìm thấy khuôn mặt trong ảnh đích."}, 400 |
|
|
|
|
|
|
|
|
source_single_faces = [] |
|
|
for faces in source_faces_list: |
|
|
if faces and len(faces) > 0: |
|
|
source_single_faces.append(faces[0]) |
|
|
else: |
|
|
source_single_faces.append(None) |
|
|
|
|
|
|
|
|
def face_center_x(face): |
|
|
x1, y1, x2, y2 = face.bbox.astype(int).tolist() |
|
|
return (x1 + x2) / 2.0 |
|
|
target_faces_sorted = sorted(target_faces, key=face_center_x) |
|
|
|
|
|
|
|
|
if mapping is None: |
|
|
if len(source_single_faces) == len(target_faces_sorted): |
|
|
mapping = list(range(len(source_single_faces))) |
|
|
else: |
|
|
limit = min(len(source_single_faces), len(target_faces_sorted)) |
|
|
mapping = list(range(limit)) + [None] * (len(target_faces_sorted) - limit) |
|
|
|
|
|
|
|
|
if len(mapping) < len(target_faces_sorted): |
|
|
mapping = list(mapping) + [None] * (len(target_faces_sorted) - len(mapping)) |
|
|
elif len(mapping) > len(target_faces_sorted): |
|
|
mapping = list(mapping)[:len(target_faces_sorted)] |
|
|
|
|
|
|
|
|
result_img = target_img.copy() |
|
|
for t_idx, target_face in enumerate(target_faces_sorted): |
|
|
s_idx = mapping[t_idx] |
|
|
|
|
|
if s_idx is None or s_idx == -1: |
|
|
continue |
|
|
|
|
|
if not isinstance(s_idx, int) or s_idx < 0 or s_idx >= len(source_single_faces): |
|
|
continue |
|
|
src_face = source_single_faces[s_idx] |
|
|
if src_face is None: |
|
|
|
|
|
continue |
|
|
result_img = swapper.get(result_img, target_face, src_face, paste_back=True) |
|
|
|
|
|
_, buffer = cv2.imencode('.jpg', result_img) |
|
|
img_base64 = base64.b64encode(buffer).decode('utf-8') |
|
|
return img_base64 |
|
|
|
|
|
|
|
|
@flask_app.route('/swap-multi', methods=['POST']) |
|
|
def swap_multi(): |
|
|
"""API hoán đổi nhiều khuôn mặt. |
|
|
Cách gọi 1 (tự map): gửi nhiều file nguồn cùng key `sources` (hoặc `sources[]`), và 1 file `target`. |
|
|
Nếu số ảnh nguồn == số mặt đích, hệ thống tự map trái→phải. Nếu khác, tự map tối đa có thể; phần còn lại bỏ qua. |
|
|
|
|
|
Cách gọi 2 (map tuỳ chỉnh): ngoài `sources` và `target`, gửi thêm field form `map` là JSON array |
|
|
(ví dụ: `[2,0,null,-1]`). Độ dài sẽ được chuẩn hoá bằng với số mặt đích. Mỗi phần tử: |
|
|
- số nguyên: chỉ mục ảnh nguồn tương ứng; |
|
|
- `null` hoặc `-1`: bỏ qua khuôn mặt đích ở vị trí đó. |
|
|
|
|
|
Cách gọi 3: gửi file nguồn với key dạng `sources[<index>]` (ví dụ: `sources[2]`, `sources[3]`). |
|
|
Chỉ mục trong key sẽ được sử dụng để gán file vào vị trí tương ứng trong danh sách nguồn. |
|
|
""" |
|
|
if 'target' not in request.files: |
|
|
return jsonify({"error": "Thiếu ảnh đích (field 'target')."}), 400 |
|
|
|
|
|
|
|
|
source_files = [] |
|
|
indexed_sources = {} |
|
|
|
|
|
|
|
|
index_pattern = re.compile(r'sources\[(\d+)\]') |
|
|
|
|
|
|
|
|
for key in request.files: |
|
|
if key == 'sources' or key == 'sources[]': |
|
|
|
|
|
source_files.extend(request.files.getlist(key)) |
|
|
elif index_pattern.match(key): |
|
|
|
|
|
match = index_pattern.match(key) |
|
|
index = int(match.group(1)) |
|
|
indexed_sources[index] = request.files[key] |
|
|
|
|
|
|
|
|
if indexed_sources: |
|
|
|
|
|
max_index = max(indexed_sources.keys()) |
|
|
|
|
|
source_files_with_index = [None] * (max_index + 1) |
|
|
|
|
|
for index, file in indexed_sources.items(): |
|
|
source_files_with_index[index] = file |
|
|
|
|
|
while source_files_with_index and source_files_with_index[-1] is None: |
|
|
source_files_with_index.pop() |
|
|
|
|
|
source_files = source_files_with_index + source_files |
|
|
else: |
|
|
|
|
|
source_files = source_files or request.files.getlist('sources[]') |
|
|
|
|
|
if not source_files: |
|
|
return jsonify({"error": "Thiếu ảnh nguồn (field 'sources', 'sources[]', hoặc 'sources[<index>]')."}), 400 |
|
|
|
|
|
target_file = request.files['target'] |
|
|
|
|
|
|
|
|
mapping = None |
|
|
map_text = request.form.get('map') |
|
|
if map_text: |
|
|
try: |
|
|
mapping = json.loads(map_text) |
|
|
except Exception: |
|
|
return jsonify({"error": "Giá trị 'map' không phải JSON hợp lệ."}), 400 |
|
|
if not isinstance(mapping, list) or not all((x is None) or isinstance(x, int) for x in mapping): |
|
|
return jsonify({"error": "'map' phải là mảng, mỗi phần tử là số nguyên, -1 hoặc null để bỏ qua."}), 400 |
|
|
|
|
|
|
|
|
result = swap_faces_multi(source_files, target_file, mapping) |
|
|
if isinstance(result, tuple): |
|
|
return jsonify(result[0]), result[1] |
|
|
return jsonify({"image_base64": result}) |
|
|
|
|
|
@flask_app.route('/swap', methods=['POST']) |
|
|
def swap(): |
|
|
"""API hoán đổi khuôn mặt cơ bản.""" |
|
|
if 'source' not in request.files or 'target' not in request.files: |
|
|
return jsonify({"error": "Thiếu ảnh nguồn hoặc ảnh đích."}), 400 |
|
|
|
|
|
source_img = request.files['source'] |
|
|
target_img = request.files['target'] |
|
|
|
|
|
result = swap_faces(source_img, target_img) |
|
|
|
|
|
if isinstance(result, tuple): |
|
|
return jsonify(result[0]), result[1] |
|
|
|
|
|
return jsonify({"image_base64": result}) |
|
|
|
|
|
@flask_app.route('/swap-2', methods=['POST']) |
|
|
def swap_2(): |
|
|
"""API hoán đổi khuôn mặt dựa trên giới tính.""" |
|
|
if 'source-male' not in request.files or 'source-female' not in request.files or 'target' not in request.files: |
|
|
return jsonify({"error": "Thiếu ảnh nguồn nam, nữ hoặc ảnh đích."}), 400 |
|
|
|
|
|
source_male = request.files['source-male'] |
|
|
source_female = request.files['source-female'] |
|
|
target_img = request.files['target'] |
|
|
|
|
|
result = swap_faces_gender_based(source_male, source_female, target_img) |
|
|
|
|
|
if isinstance(result, tuple): |
|
|
return jsonify(result[0]), result[1] |
|
|
|
|
|
return jsonify({"image_base64": result}) |
|
|
|
|
|
|
|
|
@flask_app.route('/analyze', methods=['POST']) |
|
|
def analyze_face(): |
|
|
if 'image' not in request.files: |
|
|
return jsonify({'error': 'Thiếu file ảnh'}), 400 |
|
|
|
|
|
image_file = request.files['image'] |
|
|
img = cv2.imdecode(np.frombuffer(image_file.read(), np.uint8), cv2.IMREAD_COLOR) |
|
|
|
|
|
if img is None: |
|
|
return jsonify({'error': 'Ảnh không hợp lệ'}), 400 |
|
|
|
|
|
|
|
|
modelFile = "models/res10_300x300_ssd_iter_140000.caffemodel" |
|
|
configFile = "models/deploy.prototxt" |
|
|
net = cv2.dnn.readNetFromCaffe(configFile, modelFile) |
|
|
|
|
|
h, w = img.shape[:2] |
|
|
blob = cv2.dnn.blobFromImage(img, 1.0, (300, 300), [104, 117, 123], False, False) |
|
|
net.setInput(blob) |
|
|
detections = net.forward() |
|
|
|
|
|
face_data = [] |
|
|
for i in range(detections.shape[2]): |
|
|
confidence = detections[0, 0, i, 2] |
|
|
if confidence > 0.5: |
|
|
box = detections[0, 0, i, 3:7] * np.array([w, h, w, h]) |
|
|
(x1, y1, x2, y2) = box.astype("int") |
|
|
face_data.append({ |
|
|
'face_rectangle': { |
|
|
'left': x1 / w, |
|
|
'top': y1 / h, |
|
|
'width': (x2 - x1) / w, |
|
|
'height': (y2 - y1) / h |
|
|
} |
|
|
}) |
|
|
|
|
|
return jsonify({ |
|
|
'media_info_list': [{ |
|
|
'media_extra': { |
|
|
'faces': face_data |
|
|
} |
|
|
}] |
|
|
}) |
|
|
|
|
|
if __name__ == '__main__': |
|
|
port = int(os.environ.get('PORT', 7860)) |
|
|
flask_app.run(host='0.0.0.0', port=port, debug=False) |