|
|
import os |
|
|
from tempfile import NamedTemporaryFile |
|
|
import streamlit as st |
|
|
import cv2 |
|
|
import numpy as np |
|
|
import insightface |
|
|
from insightface.app import FaceAnalysis |
|
|
import time |
|
|
import requests |
|
|
|
|
|
app = '' |
|
|
swapper = '' |
|
|
st.set_page_config(page_title="FaceSwap App by Adil Khan") |
|
|
|
|
|
def download_model(): |
|
|
url = "https://cdn.adikhanofficial.com/python/insightface/models/inswapper_128.onnx" |
|
|
filename = url.split('/')[-1] |
|
|
filepath = os.path.join(os.path.dirname(__file__),filename) |
|
|
|
|
|
if not os.path.exists(filepath): |
|
|
print(f"Downloading {filename}...") |
|
|
response = requests.get(url) |
|
|
with open(filepath, 'wb') as file: |
|
|
file.write(response.content) |
|
|
print(f"{filename} downloaded successfully.") |
|
|
else: |
|
|
print(f"{filename} already exists in the directory.") |
|
|
|
|
|
def swap_faces(target_image, target_face, source_face): |
|
|
try: |
|
|
return swapper.get(target_image, target_face, source_face, paste_back=True) |
|
|
except Exception as e: |
|
|
st.error(f"Error during swaping: {e}") |
|
|
|
|
|
|
|
|
def image_faceswap_app(): |
|
|
st.title("Face Swapper for Image") |
|
|
source_image = st.file_uploader("Upload Source Image", type=["jpg", "jpeg", "png"]) |
|
|
target_image = st.file_uploader("Upload Target Image", type=["jpg", "jpeg", "png"]) |
|
|
if source_image and target_image: |
|
|
with st.spinner("Swapping... Please wait."): |
|
|
try: |
|
|
source_image = cv2.imdecode(np.frombuffer(source_image.read(), np.uint8), -1) |
|
|
target_image = cv2.imdecode(np.frombuffer(target_image.read(), np.uint8), -1) |
|
|
source_image = cv2.cvtColor(source_image, cv2.COLOR_BGR2RGB) |
|
|
target_image = cv2.cvtColor(target_image, cv2.COLOR_BGR2RGB) |
|
|
source_faces = app.get(source_image) |
|
|
source_faces = sorted(source_faces, key=lambda x: x.bbox[0]) |
|
|
if len(source_faces) == 0: |
|
|
raise ValueError("No faces found in the source image.") |
|
|
source_face = source_faces[0] |
|
|
target_faces = app.get(target_image) |
|
|
target_faces = sorted(target_faces, key=lambda x: x.bbox[0]) |
|
|
if len(target_faces) == 0: |
|
|
raise ValueError("No faces found in the target image.") |
|
|
target_face = target_faces[0] |
|
|
swapped_image = swap_faces(target_image, target_face, source_face) |
|
|
message_placeholder = st.empty() |
|
|
message_placeholder.success("Swapped Successfully!") |
|
|
col1, col2, col3 = st.columns([1, 1, 1]) |
|
|
with col1: |
|
|
st.image(source_image, caption="Source Image", use_column_width=True) |
|
|
with col2: |
|
|
st.image(target_image, caption="Target Image", use_column_width=True) |
|
|
with col3: |
|
|
st.image(swapped_image, caption="Swapped Image", use_column_width=True) |
|
|
except Exception as e: |
|
|
st.error(f"Error during image processing: {e}") |
|
|
|
|
|
|
|
|
def process_video(source_img, video_path, output_video_path): |
|
|
try: |
|
|
cap = cv2.VideoCapture(video_path) |
|
|
fps = cap.get(cv2.CAP_PROP_FPS) |
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
|
|
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) |
|
|
source_faces = app.get(source_img) |
|
|
source_faces = sorted(source_faces, key=lambda x: x.bbox[0]) |
|
|
if len(source_faces) == 0: |
|
|
raise ValueError("No faces found in the source image.") |
|
|
source_face = source_faces[0] |
|
|
progress_placeholder = st.empty() |
|
|
frame_count = 0 |
|
|
start_time = time.time() |
|
|
while True: |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
target_faces = app.get(frame) |
|
|
target_faces = sorted(target_faces, key=lambda x: x.bbox[0]) |
|
|
if len(target_faces) > 0: |
|
|
frame = swap_faces(frame, target_faces[0], source_face) |
|
|
out.write(frame) |
|
|
elapsed_time = time.time() - start_time |
|
|
frames_per_second = frame_count / elapsed_time if elapsed_time > 0 else 0 |
|
|
remaining_time_seconds = max(0, (total_frames - frame_count) / frames_per_second) if frames_per_second > 0 else 0 |
|
|
remaining_minutes, remaining_seconds = divmod(remaining_time_seconds, 60) |
|
|
elapsed_minutes, elapsed_seconds = divmod(elapsed_time, 60) |
|
|
progress_placeholder.text( |
|
|
f"Processed Frames: {frame_count}/{total_frames} | Elapsed Time: {int(elapsed_minutes)}m {int(elapsed_seconds)}s | Remaining Time: {int(remaining_minutes)}m {int(remaining_seconds)}s") |
|
|
frame_count += 1 |
|
|
cap.release() |
|
|
out.release() |
|
|
except Exception as e: |
|
|
st.error(f"Error during video processing: {e}") |
|
|
|
|
|
|
|
|
def video_faceswap_app(): |
|
|
st.title("Face Swapper for Video") |
|
|
source_image = st.file_uploader("Upload Source Face Image", type=["jpg", "jpeg", "png"]) |
|
|
if source_image is not None: |
|
|
source_image = cv2.imdecode(np.frombuffer(source_image.read(), np.uint8), -1) |
|
|
target_video = st.file_uploader("Upload Target Video", type=["mp4"]) |
|
|
if target_video is not None: |
|
|
temp_video = NamedTemporaryFile(delete=False, suffix=".mp4") |
|
|
temp_video.write(target_video.read()) |
|
|
output_video_path = os.path.splitext(temp_video.name)[0] + '_output.mp4' |
|
|
status_placeholder = st.empty() |
|
|
try: |
|
|
with st.spinner("Processing... This may take a while."): |
|
|
process_video(source_image, temp_video.name, output_video_path) |
|
|
status_placeholder.success("Processing complete!") |
|
|
st.subheader("Your video is ready:") |
|
|
st.video(output_video_path) |
|
|
except Exception as e: |
|
|
st.error(f"Error during video processing: {e}") |
|
|
|
|
|
|
|
|
def main(): |
|
|
app_selection = st.sidebar.radio("Select App", ("Image Face Swapping", "Video Face Swapping")) |
|
|
if app_selection == "Image Face Swapping": |
|
|
image_faceswap_app() |
|
|
elif app_selection == "Video Face Swapping": |
|
|
video_faceswap_app() |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
app = FaceAnalysis(name='buffalo_l') |
|
|
app.prepare(ctx_id=0, det_size=(640, 640)) |
|
|
download_model() |
|
|
swapper = insightface.model_zoo.get_model('inswapper_128.onnx', root=os.path.dirname(__file__)) |
|
|
main() |
|
|
|