# ShinyySwapper / web_server.py
# ShinyyMineyyON's picture
# Update web_server.py
# e462113 verified
#!/usr/bin/env python3
"""
Simple Flask Backend for Shinyy's Face Swapper HTML Website
"""
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import os
from pathlib import Path
import tempfile
import shutil
import uuid
import glob
import logging
import sys
import time
from datetime import datetime
try:
import cv2
import numpy as np
CV2_AVAILABLE = True
except ImportError as e:
print(f"Warning: OpenCV/NumPy not available: {e}")
CV2_AVAILABLE = False
cv2 = None
np = None
import base64
from io import BytesIO
from PIL import Image
import json
import requests
try:
import imageio
IMAGEIO_AVAILABLE = True
except ImportError as e:
print(f"Warning: imageio not available: {e}")
IMAGEIO_AVAILABLE = False
imageio = None
# Import the face swapper
try:
from SinglePhoto import FaceSwapper
FACE_SWAPPER_AVAILABLE = True
except ImportError as e:
print(f"Warning: FaceSwapper not available due to import error: {e}")
print("Video processing will work in simulation mode only")
FACE_SWAPPER_AVAILABLE = False
FaceSwapper = None
# Import enhanced face swapper if available
try:
from EnhancedFaceSwapper import EnhancedFaceSwapper
from QualityPresets import QualityPresets, create_enhanced_swapper_with_quality
ENHANCED_SWAPPER_AVAILABLE = True
print("Enhanced face swapper loaded successfully!")
except ImportError as e:
print(f"Enhanced face swapper not available: {e}")
ENHANCED_SWAPPER_AVAILABLE = False
EnhancedFaceSwapper = None
QualityPresets = None
create_enhanced_swapper_with_quality = None
# Flask application object; every @app.route below registers on it.
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
# Use different port to avoid conflicts - 7860 is required for Hugging Face
WEB_SERVER_PORT = 7860
# Configure comprehensive logging
def setup_logging():
    """Configure the root logger to print colourised records to stdout.

    All handlers already attached to the root logger are removed and replaced
    by a single DEBUG-level ``StreamHandler`` writing to ``sys.stdout``.

    Returns:
        logging.Logger: the configured root logger.
    """

    class CustomFormatter(logging.Formatter):
        """Render records as ``[HH:MM:SS] LEVEL | message`` with ANSI colours."""

        RESET_COLOR = '\033[0m'
        LEVEL_COLORS = {
            'DEBUG': '\033[36m',     # Cyan
            'INFO': '\033[32m',      # Green
            'WARNING': '\033[33m',   # Yellow
            'ERROR': '\033[31m',     # Red
            'CRITICAL': '\033[35m',  # Magenta
        }

        def format(self, record):
            # Stamp with the moment the record was created rather than the
            # moment it happens to be formatted.
            timestamp = datetime.fromtimestamp(record.created).strftime('%H:%M:%S')
            level_color = self.LEVEL_COLORS.get(record.levelname, self.RESET_COLOR)
            # Format: [TIME] LEVEL | MESSAGE
            line = (f"[{timestamp}] {level_color}{record.levelname}"
                    f"{self.RESET_COLOR} | {record.getMessage()}")
            # Overriding format() bypasses the base class' traceback handling,
            # so append exception / stack text explicitly.
            if record.exc_info and not record.exc_text:
                record.exc_text = self.formatException(record.exc_info)
            if record.exc_text:
                line = f"{line}\n{record.exc_text}"
            if record.stack_info:
                line = f"{line}\n{self.formatStack(record.stack_info)}"
            return line

    # Setup root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    # Remove existing handlers (iterate over a copy: the list is mutated)
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
    # Add console handler with custom formatter
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(CustomFormatter())
    root_logger.addHandler(console_handler)
    return root_logger
# Initialize logging
# Module-wide logger: the root logger returned by setup_logging().
logger = setup_logging()
# Global video processing progress tracking.
# Every field starts out as None; the four status fields are then given
# their concrete idle values.
video_progress = dict.fromkeys(
    (
        'processing',
        'phase',
        'total_frames',
        'processed_frames',
        'current_frame_base64',
        'start_time',
        'mode',
        'fps',
        'resolution',
        'file_size',
        'processing_speed',
        'avg_frame_time',
        'estimated_total_time',
        'countdown_time',
    )
)
video_progress.update(
    processing=False,
    phase='idle',
    total_frames=0,
    processed_frames=0,
)
# Disable Flask auto-reload and other restart triggers
app.config['DEBUG'] = False
app.config['TEMPLATES_AUTO_RELOAD'] = False
# NOTE(review): 0 presumably makes files served via send_file uncached - confirm against Flask docs
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
# Build the module-level face swapper: GPU first, CPU as fallback.
# `swapper` ends up None whenever no FaceSwapper could be constructed.
swapper = None
if not FACE_SWAPPER_AVAILABLE:
    print("Running in simulation mode - FaceSwapper not available")
else:
    try:
        try:
            # Preferred path: GPU device 0
            swapper = FaceSwapper(gpu_enabled=True, gpu_id=0)
            gpu_info = swapper.get_gpu_info()
            print("FaceSwapper loaded with GPU acceleration!")
            print(f"GPU Info: {gpu_info}")
        except Exception as gpu_error:
            # Any GPU-side failure downgrades to a CPU-only instance
            print(f"GPU initialization failed: {gpu_error}")
            print("Falling back to CPU-only mode...")
            swapper = FaceSwapper(gpu_enabled=False, gpu_id=-1)
            print("FaceSwapper loaded in CPU mode!")
    except Exception as e:
        # CPU construction failed as well: run without a swapper
        print(f"Error loading FaceSwapper: {e}")
        swapper = None
        FACE_SWAPPER_AVAILABLE = False
    else:
        FACE_SWAPPER_AVAILABLE = True
# Temporary storage for uploaded images (created at import time if missing)
UPLOAD_FOLDER = 'temp_uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Temporary folder system for compressor transfers
COMPRESSOR_TEMP_FOLDER = 'temp_compressor'
os.makedirs(COMPRESSOR_TEMP_FOLDER, exist_ok=True)
# Dictionary to track temp folders and their creation times
# NOTE(review): nothing in this part of the file populates it - verify the key/value layout at the users
temp_folders = {}
# Log server startup
logger.info("=" * 60)
logger.info("SHINYY'S FACE SWAPPER SERVER STARTING")
logger.info("=" * 60)
logger.info(f"Upload folder: {UPLOAD_FOLDER}")
logger.info(f"OpenCV Available: {CV2_AVAILABLE}")
logger.info(f"Face Swapper Available: {FACE_SWAPPER_AVAILABLE}")
logger.info("=" * 60)
def base64_to_image(base64_string):
    """Decode a base64 string (optionally a data URL) into an image.

    Args:
        base64_string: raw base64 text, or a ``data:...;base64,...`` URL.

    Returns:
        A BGR ``numpy`` array when OpenCV is available, otherwise the decoded
        ``PIL.Image.Image``.

    Raises:
        Whatever ``base64.b64decode`` / ``Image.open`` raise on invalid input.
    """
    # Remove data URL prefix if present
    if 'base64,' in base64_string:
        base64_string = base64_string.split('base64,')[1]
    pil_image = Image.open(BytesIO(base64.b64decode(base64_string)))
    if not CV2_AVAILABLE:
        # Without OpenCV/NumPy the PIL image is handed back as-is
        return pil_image
    # Normalise to RGB first: palette and grayscale images decode to 2-D
    # arrays, which the RGB->BGR conversion below cannot handle.
    rgb_array = np.array(pil_image.convert('RGB'))
    return cv2.cvtColor(rgb_array, cv2.COLOR_RGB2BGR)
def image_to_base64(image):
    """Encode an image as a ``data:image/jpeg;base64,...`` string.

    Args:
        image: a ``PIL.Image.Image`` (any mode), or - when OpenCV is
            available - a BGR ``numpy`` array. When OpenCV is unavailable any
            non-PIL value is assumed to already be a base64 string and is
            returned unchanged.

    Returns:
        str: JPEG data URL (or the untouched input, see above).
    """
    if isinstance(image, Image.Image):
        # JPEG cannot store alpha or palette data, so normalise the mode
        pil_image = image if image.mode == 'RGB' else image.convert('RGB')
    elif not CV2_AVAILABLE:
        # Assume it's already a base64 string
        return image
    else:
        # OpenCV arrays are BGR; PIL expects RGB
        pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    buffer = BytesIO()
    pil_image.save(buffer, format='JPEG')
    image_str = base64.b64encode(buffer.getvalue()).decode()
    return f"data:image/jpeg;base64,{image_str}"
# Helper functions for enhanced multi-swap features
def apply_face_alignment(image):
    """Histogram-equalise a BGR image.

    Despite the name, no geometric alignment happens: the image is converted
    to grayscale, equalised, and converted back to three channels, so the
    colour information is discarded.

    Returns:
        The equalised 3-channel image, or ``image`` unchanged on failure.
    """
    try:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return cv2.cvtColor(cv2.equalizeHist(gray), cv2.COLOR_GRAY2BGR)
    except Exception as e:  # best-effort filter: never fail the caller
        print(f"Face alignment failed: {e}")
        return image
def apply_enhanced_swap(source_path, target_path, source_face_idx, face_id, swap_hair, quality):
    """Swap a face, optionally denoising the source image first.

    For ``quality`` in ``('quality', 'ultra')`` the source file is read,
    bilateral-filtered and written back to ``source_path`` (the file on disk
    is overwritten). Preprocessing is best-effort: if it fails the swap runs
    on the untouched file.

    Returns:
        Whatever ``swapper.swap_faces`` returns.

    Raises:
        Any error from ``swapper.swap_faces`` (it is attempted once).
    """
    if quality in ['quality', 'ultra']:
        try:
            source_img = cv2.imread(source_path)
            # imread signals an unreadable file by returning None
            if source_img is not None:
                source_img = cv2.bilateralFilter(source_img, 15, 80, 80)
                cv2.imwrite(source_path, source_img)
        except Exception as e:
            print(f"Enhanced swap preprocessing failed: {e}")
    return swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair)
def apply_precise_swap(source_path, target_path, source_face_idx, face_id, swap_hair, face_size):
    """Swap a face and, for ``face_size == 'precise'``, lightly smooth the result.

    The smoothing step is best-effort: if it fails, the unfiltered swap
    result is returned instead of re-running the whole swap.

    Returns:
        The (possibly bilateral-filtered) output of ``swapper.swap_faces``.

    Raises:
        Any error from ``swapper.swap_faces`` (it is attempted once).
    """
    result = swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair)
    # Apply precise size adjustments
    if face_size == 'precise':
        try:
            result = cv2.bilateralFilter(result, 5, 50, 50)
        except Exception as e:
            print(f"Precise swap smoothing failed: {e}")
    return result
def apply_artistic_swap(source_path, target_path, source_face_idx, face_id, swap_hair):
    """Swap a face and apply OpenCV's detail-enhance filter to the result.

    The filter is best-effort: if it fails, the plain swap result is returned
    instead of re-running the whole swap.

    Returns:
        The (possibly detail-enhanced) output of ``swapper.swap_faces``.

    Raises:
        Any error from ``swapper.swap_faces`` (it is attempted once).
    """
    result = swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair)
    try:
        # Apply artistic filters
        return cv2.detailEnhance(result, sigma_s=10, sigma_r=0.15)
    except Exception as e:
        print(f"Artistic swap filter failed: {e}")
        return result
def apply_face_enhancement(image, level):
    """Filter ``image`` according to ``level``.

    Args:
        image: image array accepted by the OpenCV filters below.
        level: ``'subtle'`` / ``'medium'`` select a bilateral filter of
            increasing strength, ``'strong'`` selects ``cv2.detailEnhance``;
            any other value leaves the image untouched.

    Returns:
        The filtered image, or ``image`` unchanged on failure.
    """
    try:
        if level == 'subtle':
            return cv2.bilateralFilter(image, 5, 30, 30)
        if level == 'medium':
            return cv2.bilateralFilter(image, 9, 50, 50)
        if level == 'strong':
            return cv2.detailEnhance(image, sigma_s=5, sigma_r=0.2)
        return image
    except Exception as e:  # best-effort filter: never fail the caller
        print(f"Face enhancement failed: {e}")
        return image
def apply_skin_tone_matching(swapped_face, target_image, face_id, level):
    """Blend the swapped face with the original face region of the target.

    Faces are detected in ``target_image`` and ordered left to right;
    ``face_id`` is the 1-based index into that ordering.

    Args:
        swapped_face: image to adjust.
        target_image: image containing the original face.
        face_id: 1-based face index; out-of-range values return the input.
        level: ``'subtle'`` (30% original), ``'medium'`` (50%), anything else
            is treated as strong (70%).

    Returns:
        The weighted blend, or ``swapped_face`` unchanged when the face is not
        found or blending fails (e.g. the two regions differ in size).
    """
    try:
        faces = sorted(swapper.app.get(target_image), key=lambda x: x.bbox[0])
        # Reject 0 and negatives explicitly: faces[face_id - 1] would
        # otherwise wrap around and silently pick the wrong face.
        if not 1 <= face_id <= len(faces):
            return swapped_face
        x1, y1, x2, y2 = [int(v) for v in faces[face_id - 1].bbox]
        # Negative coordinates would be interpreted as "from the end" by slicing
        x1, y1 = max(x1, 0), max(y1, 0)
        original_face = target_image[y1:y2, x1:x2]
        # Simple color balance adjustment
        if level == 'subtle':
            alpha = 0.3
        elif level == 'medium':
            alpha = 0.5
        else:  # strong
            alpha = 0.7
        return cv2.addWeighted(swapped_face, 1 - alpha, original_face, alpha, 0)
    except Exception as e:  # best-effort: never fail the caller
        print(f"Skin tone matching failed: {e}")
        return swapped_face
def apply_face_size_adjustment(face, size_option):
    """Rescale ``face`` about its centre while keeping the original canvas size.

    Args:
        face: image array.
        size_option: ``'shrink'`` scales by 0.9, ``'expand'`` by 1.1, anything
            else (treated as "precise") by 0.95.

    Returns:
        An image with the same height/width as ``face``: a shrunken image is
        padded back with replicated borders, an enlarged one is centre-cropped.
        ``face`` is returned unchanged on failure.
    """
    try:
        if size_option == 'shrink':
            scale = 0.9
        elif size_option == 'expand':
            scale = 1.1
        else:  # precise
            scale = 0.95
        h, w = face.shape[:2]
        new_h, new_w = int(h * scale), int(w * scale)
        resized = cv2.resize(face, (new_w, new_h))
        if scale < 1.0:
            # Pad to original size; the bottom/right side absorbs any odd pixel
            pad_h = (h - new_h) // 2
            pad_w = (w - new_w) // 2
            return cv2.copyMakeBorder(
                resized, pad_h, h - new_h - pad_h, pad_w, w - new_w - pad_w,
                cv2.BORDER_REPLICATE,
            )
        # Crop to original size
        crop_h = (new_h - h) // 2
        crop_w = (new_w - w) // 2
        return resized[crop_h:crop_h + h, crop_w:crop_w + w]
    except Exception as e:  # best-effort: never fail the caller
        print(f"Face size adjustment failed: {e}")
        return face
def apply_lighting_preservation(swapped_face, original_face):
    """Give ``swapped_face`` the lightness channel of ``original_face``.

    Both images are converted to LAB, the L channel of the original replaces
    that of the swapped face, and the result is converted back to BGR.

    Returns:
        The relit image, or ``swapped_face`` unchanged on failure (for
        instance when the two images differ in size).
    """
    try:
        swapped_lab = cv2.cvtColor(swapped_face, cv2.COLOR_BGR2LAB)
        original_lab = cv2.cvtColor(original_face, cv2.COLOR_BGR2LAB)
        # Copy lighting from original
        swapped_lab[:, :, 0] = original_lab[:, :, 0]
        return cv2.cvtColor(swapped_lab, cv2.COLOR_LAB2BGR)
    except Exception as e:  # best-effort: never fail the caller
        print(f"Lighting preservation failed: {e}")
        return swapped_face
def apply_auto_enhancement(face):
    """Apply a fixed contrast/brightness boost (gain 1.1, offset +5).

    Returns:
        The adjusted image, or ``face`` unchanged on failure.
    """
    try:
        return cv2.convertScaleAbs(face, alpha=1.1, beta=5)
    except Exception as e:  # best-effort: never fail the caller
        print(f"Auto enhancement failed: {e}")
        return face
def enhanced_face_alignment(source_img, target_img, source_face, target_face):
    """Warp the source image so its face landmarks line up with the target's.

    A partial-affine transform is estimated between the ``kps`` landmarks of
    the two faces and applied to ``source_img`` on a canvas the size of
    ``target_img``.

    NOTE(review): only the first three landmarks of each face are fed to the
    estimator - confirm that is intended.

    Returns:
        The warped source image, or ``source_img`` unchanged when no transform
        could be estimated or an error occurred.
    """
    try:
        source_landmarks = np.array(source_face.kps, dtype=np.float32)
        target_landmarks = np.array(target_face.kps, dtype=np.float32)
        canvas_h, canvas_w = target_img.shape[:2]
        transform, _inliers = cv2.estimateAffinePartial2D(
            source_landmarks[:3], target_landmarks[:3]
        )
        if transform is None:
            return source_img
        # Reflect at the borders so the warp leaves no black wedges
        return cv2.warpAffine(
            source_img,
            transform,
            (canvas_w, canvas_h),
            borderMode=cv2.BORDER_REFLECT_101,
        )
    except Exception as e:
        print(f"Face alignment enhancement failed: {e}")
        return source_img
def advanced_color_matching(swapped_face, target_region, target_face_bbox):
    """Histogram-match the swapped face to the target region in LAB space.

    Each LAB channel of ``swapped_face`` is remapped through a lookup table
    built from the two channel histograms; the remapped image is then mixed
    70/30 with the untouched swapped face. ``target_face_bbox`` is accepted
    but not used by this function.

    Returns:
        The colour-matched face, or ``swapped_face`` unchanged on failure.
    """
    try:
        face_lab = cv2.cvtColor(swapped_face, cv2.COLOR_BGR2LAB)
        region_lab = cv2.cvtColor(target_region, cv2.COLOR_BGR2LAB)
        for channel in (0, 1, 2):  # L, A, B
            face_hist = cv2.calcHist([face_lab], [channel], None, [256], [0, 256])
            region_hist = cv2.calcHist([region_lab], [channel], None, [256], [0, 256])
            # Turn raw counts into probability distributions
            face_hist = face_hist / face_hist.sum()
            region_hist = region_hist / region_hist.sum()
            mapping = create_histogram_lut(face_hist, region_hist)
            face_lab[:, :, channel] = cv2.LUT(face_lab[:, :, channel], mapping)
        matched = cv2.cvtColor(face_lab, cv2.COLOR_LAB2BGR)
        # 70% matched, 30% original keeps the result natural-looking
        weight = 0.7
        return cv2.addWeighted(matched, weight, swapped_face, 1 - weight, 0)
    except Exception as e:
        print(f"Color matching enhancement failed: {e}")
        return swapped_face
def create_histogram_lut(source_hist, target_hist):
    """Create a 256-entry lookup table for histogram matching.

    For every source intensity ``i`` the table holds the target bin whose
    cumulative distribution value is closest to the source CDF at ``i``
    (the lowest bin wins on ties).

    Args:
        source_hist: histogram of the image to remap, at least 256 bins
            (any shape; it is flattened by the cumulative sum).
        target_hist: histogram of the reference image, expected to have 256
            bins so that the indices fit the ``uint8`` table.

    Returns:
        numpy.ndarray: ``uint8`` array of shape ``(256,)``.
    """
    source_cdf = np.asarray(source_hist).cumsum()[:256]
    target_cdf = np.asarray(target_hist).cumsum()
    # One broadcast |target - source| table replaces the per-bin Python loop:
    # row i holds the distance from source_cdf[i] to every target bin.
    distances = np.abs(target_cdf[np.newaxis, :] - source_cdf[:, np.newaxis])
    lut = np.zeros(256, dtype=np.uint8)
    lut[:] = distances.argmin(axis=1)
    return lut
def _laplacian_from_gaussian(gaussian_pyramid):
    """Turn a Gaussian pyramid into a Laplacian one (coarsest level kept as-is)."""
    laplacian = []
    for finer, coarser in zip(gaussian_pyramid[:-1], gaussian_pyramid[1:]):
        upsampled = cv2.pyrUp(coarser)
        if upsampled.shape[:2] != finer.shape[:2]:
            upsampled = cv2.resize(upsampled, (finer.shape[1], finer.shape[0]))
        laplacian.append(finer - upsampled)
    laplacian.append(gaussian_pyramid[-1])
    return laplacian


def seamless_multi_band_blending(swapped_face, target_img, target_face_bbox):
    """Blend ``swapped_face`` into ``target_img`` with multi-band blending.

    A feathered mask of the bounding box is built, the swapped face and the
    target region are decomposed into Laplacian pyramids, the levels are mixed
    using a Gaussian pyramid of the mask, and the mixed pyramid is collapsed
    back into the target.

    Args:
        swapped_face: image covering exactly the bounding-box area.
        target_img: full target image.
        target_face_bbox: ``(x1, y1, x2, y2)`` integer pixel coordinates.

    Returns:
        numpy.ndarray: ``uint8`` copy of ``target_img`` with the face blended
        in. If blending fails the face is pasted in without feathering.
    """
    x1, y1, x2, y2 = target_face_bbox
    try:
        # Create mask for face region and soften its edges
        mask = np.zeros(target_img.shape[:2], dtype=np.uint8)
        mask[y1:y2, x1:x2] = 255
        mask_blurred = cv2.GaussianBlur(mask, (51, 51), 0).astype(np.float32) / 255.0
        face_mask = mask_blurred[y1:y2, x1:x2]

        levels = 5
        # Laplacian (band-pass) levels are what may be mixed and summed back;
        # summing raw Gaussian levels would add the image to itself.
        lap_swapped = _laplacian_from_gaussian(
            create_gaussian_pyramid(swapped_face.astype(np.float32), levels))
        lap_target = _laplacian_from_gaussian(
            create_gaussian_pyramid(target_img[y1:y2, x1:x2].astype(np.float32), levels))
        mask_pyramid = create_gaussian_pyramid(face_mask, levels)

        blended_pyramid = []
        for face_band, target_band, weight in zip(lap_swapped, lap_target, mask_pyramid):
            # Give the 2-D mask a channel axis so it broadcasts over colour
            weight = weight[..., np.newaxis]
            blended_pyramid.append(face_band * weight + target_band * (1.0 - weight))

        result = target_img.copy().astype(np.float32)
        result[y1:y2, x1:x2] = reconstruct_from_pyramid(blended_pyramid)
        # Clip before the cast: out-of-range floats would wrap around in uint8
        return np.clip(result, 0, 255).astype(np.uint8)
    except Exception as e:
        print(f"Seamless blending failed: {e}")
        # Fallback to simple paste
        result = target_img.copy()
        result[y1:y2, x1:x2] = swapped_face
        return result
def create_gaussian_pyramid(img, levels):
    """Build a list of progressively downsampled copies of ``img``.

    The first entry is ``img`` itself; each further entry is ``cv2.pyrDown``
    of the previous one, until the list holds ``levels`` images (a single
    entry when ``levels`` is 1 or less).
    """
    stack = [img]
    while len(stack) < levels:
        stack.append(cv2.pyrDown(stack[-1]))
    return stack
def reconstruct_from_pyramid(pyramid):
    """Collapse a pyramid into a single image.

    Starting from the last (coarsest) level, the running image is upsampled
    with ``cv2.pyrUp``, resized if its height/width do not match the next
    finer level, and that level is added to it.
    """
    collapsed = pyramid[-1]
    for finer in reversed(pyramid[:-1]):
        collapsed = cv2.pyrUp(collapsed)
        height, width = finer.shape[:2]
        if collapsed.shape[:2] != (height, width):
            # pyrUp always doubles, so odd-sized levels need a correction
            collapsed = cv2.resize(collapsed, (width, height))
        collapsed = collapsed + finer
    return collapsed
def apply_edge_smoothing(face_img):
    """Soften blocky swap artefacts in ``face_img``.

    The image is bilateral-filtered, lightly Gaussian-blurred, and the result
    is mixed 90/10 with the input. The input is returned unchanged when
    OpenCV is unavailable or any step fails.
    """
    try:
        if CV2_AVAILABLE:
            # Edge-preserving pass first, then a very mild blur on top
            softened = cv2.bilateralFilter(face_img, 5, 60, 60)
            softened = cv2.GaussianBlur(softened, (3, 3), 0.5)
            # 90% smoothed, 10% original
            mix = 0.9
            return cv2.addWeighted(softened, mix, face_img, 1 - mix, 0)
        return face_img
    except Exception as e:
        print(f"Edge smoothing failed: {e}")
        return face_img
def smooth_face_blend(swapped_face, target_region, target_face_bbox):
    """Blend ``swapped_face`` over ``target_region``.

    The region is placed in the top-left corner of a black canvas twice its
    size and handed to ``seamless_multi_band_blending``; the top-left corner
    of the result is returned. If that raises, a feathered elliptical mask is
    used to mix the two images directly. ``target_face_bbox`` is accepted but
    not used by this function.
    """
    try:
        region_h, region_w = target_region.shape[:2]
        canvas = np.zeros((region_h * 2, region_w * 2, 3), dtype=np.uint8)
        canvas[:region_h, :region_w] = target_region
        # On the canvas the face occupies the top-left corner
        canvas_bbox = (0, 0, region_w, region_h)
        blended_canvas = seamless_multi_band_blending(swapped_face, canvas, canvas_bbox)
        return blended_canvas[:region_h, :region_w]
    except Exception as e:
        print(f"Enhanced face blending error: {e}")

    # Fallback: elliptical feathered mask sized to the swapped face
    face_h, face_w = swapped_face.shape[:2]
    ellipse_mask = np.zeros((face_h, face_w), dtype=np.uint8)
    cv2.ellipse(
        ellipse_mask,
        (face_w // 2, face_h // 2),
        (face_w // 2 - 5, face_h // 2 - 5),
        0, 0, 360, 255, -1,
    )
    feather = cv2.GaussianBlur(ellipse_mask, (11, 11), 0).astype(np.float32) / 255.0
    feather_3ch = np.stack([feather] * 3, axis=-1)
    mixed = (swapped_face.astype(np.float32) * feather_3ch +
             target_region.astype(np.float32) * (1 - feather_3ch))
    return np.clip(mixed, 0, 255).astype(np.uint8)
def natural_color_match(swapped_face, target_region):
    """Match the colours of ``swapped_face`` to ``target_region``.

    ``advanced_color_matching`` is tried first. If it raises, a gentle
    correction in YCrCb space is applied instead: luma is scaled by a ratio
    clamped to [0.95, 1.05], the chroma channels are shifted by 10% of the
    mean difference, and the result is mixed 90/10 with the input. If that
    fails too, ``swapped_face`` is returned unchanged.
    """
    try:
        whole_face_bbox = (0, 0, swapped_face.shape[1], swapped_face.shape[0])
        return advanced_color_matching(swapped_face, target_region, whole_face_bbox)
    except Exception as e:
        print(f"Enhanced color matching failed, using fallback: {e}")

    try:
        face_ycrcb = cv2.cvtColor(swapped_face, cv2.COLOR_BGR2YCrCb)
        region_ycrcb = cv2.cvtColor(target_region, cv2.COLOR_BGR2YCrCb)
        face_mean = np.mean(face_ycrcb, axis=(0, 1))
        region_mean = np.mean(region_ycrcb, axis=(0, 1))

        # Luma: multiplicative gain, tightly clamped
        if face_mean[0] > 0:
            luma_gain = region_mean[0] / face_mean[0]
        else:
            luma_gain = 1.0
        luma_gain = np.clip(luma_gain, 0.95, 1.05)
        # Chroma: small additive nudges towards the target means
        cr_shift = (region_mean[1] - face_mean[1]) * 0.1
        cb_shift = (region_mean[2] - face_mean[2]) * 0.1

        adjusted = face_ycrcb.copy()
        adjusted[:, :, 0] = np.clip(adjusted[:, :, 0] * luma_gain, 0, 255)
        adjusted[:, :, 1] = np.clip(adjusted[:, :, 1] + cr_shift, 0, 255)
        adjusted[:, :, 2] = np.clip(adjusted[:, :, 2] + cb_shift, 0, 255)

        adjusted_bgr = cv2.cvtColor(adjusted, cv2.COLOR_YCrCb2BGR)
        mix = 0.9
        return cv2.addWeighted(adjusted_bgr, mix, swapped_face, 1 - mix, 0)
    except Exception as fallback_error:
        print(f"Fallback color matching also failed: {fallback_error}")
        return swapped_face
@app.route('/')
def index():
    """Return index.html for the site root."""
    landing_page = 'index.html'
    return send_file(landing_page)
@app.route('/compressor.html')
def compressor():
    """Return compressor.html when requested by its file name."""
    page = 'compressor.html'
    return send_file(page)
@app.route('/compressor')
def compressor_redirect():
    """Return compressor.html for the extension-less /compressor URL (served directly, no HTTP redirect)."""
    page = 'compressor.html'
    return send_file(page)
@app.route('/compressor/')
def compressor_with_slash():
    """Return compressor.html for the trailing-slash URL."""
    page = 'compressor.html'
    return send_file(page)
@app.route('/index')
def index_page():
    """Return index.html for /index (same file as the site root)."""
    landing_page = 'index.html'
    return send_file(landing_page)
def extract_video_frames(video_path, frames_dir):
    """Write every frame of a video to ``frames_dir`` as JPEG files.

    Frames are named ``frame_00000.jpg``, ``frame_00001.jpg``, ... in read
    order. The directory is created if it does not exist.

    Args:
        video_path: path of the video to read.
        frames_dir: directory receiving the frame images.

    Returns:
        list[str]: paths of the written frames, in order (empty when no frame
        could be read).

    Raises:
        ImportError: if OpenCV is not available.
    """
    if not CV2_AVAILABLE:
        raise ImportError("OpenCV is required for video processing")
    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(frames_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    frame_paths = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = os.path.join(frames_dir, f"frame_{len(frame_paths):05d}.jpg")
            cv2.imwrite(frame_path, frame)
            frame_paths.append(frame_path)
    finally:
        # Release the capture even if reading or writing a frame raised
        cap.release()
    return frame_paths
def create_video_from_frames(frames_dir, output_video_path, fps):
    """Encode the ``.jpg`` files of ``frames_dir`` into an ``mp4v`` video.

    Frames are written in sorted file-name order; the video size is taken
    from the first frame.

    Args:
        frames_dir: directory containing the frame images.
        output_video_path: path of the video file to create.
        fps: frame rate of the output video.

    Raises:
        ImportError: if OpenCV is not available.
        ValueError: if the directory holds no ``.jpg`` files or the first
            frame cannot be read.
    """
    if not CV2_AVAILABLE:
        raise ImportError("OpenCV is required for video processing")
    frames = sorted(
        os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg')
    )
    if not frames:
        raise ValueError("No frames found in directory")
    first_frame = cv2.imread(frames[0])
    if first_frame is None:
        # imread reports an unreadable file by returning None
        raise ValueError(f"Could not read frame: {frames[0]}")
    height, width = first_frame.shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    try:
        for frame_path in frames:
            out.write(cv2.imread(frame_path))
    finally:
        # Always finalise the container, even when a frame fails to encode
        out.release()
def get_video_fps(video_path):
    """Return the frame rate reported for ``video_path``.

    30.0 is returned when OpenCV is unavailable or the reported rate is not
    positive.
    """
    fallback_fps = 30.0
    if not CV2_AVAILABLE:
        return fallback_fps
    capture = cv2.VideoCapture(video_path)
    reported_fps = capture.get(cv2.CAP_PROP_FPS)
    capture.release()
    if reported_fps > 0:
        return reported_fps
    return fallback_fps
@app.route('/api/gpu_status', methods=['GET'])
def get_gpu_status():
    """Report the swapper's GPU information as JSON.

    Responds with ``gpu_available: False`` when no swapper exists, with the
    swapper's ``get_gpu_info()`` result otherwise, and with HTTP 500 plus the
    error text if anything raises.
    """
    try:
        if swapper:
            payload = {
                'success': True,
                'gpu_info': swapper.get_gpu_info(),
                'last_processing_time': getattr(swapper, 'last_processing_time', 0),
                'message': 'GPU status retrieved successfully',
            }
        else:
            payload = {
                'gpu_available': False,
                'message': 'Face swapper not initialized',
            }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"GPU status error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/swap', methods=['POST'])
def swap_faces():
    """Handle a single face-swap request.

    Expects a JSON body with ``source_image`` and ``target_image`` (base64),
    optional 1-based ``source_face_idx`` / ``target_face_idx`` (default 1) and
    optional ``model`` (default ``inswapper_128.onnx``).

    Returns:
        JSON with ``success``, ``result_image`` (JPEG data URL), ``message``,
        ``processing_time`` and ``gpu_accelerated``; HTTP 400 for a bad
        request, HTTP 500 when the swapper is missing or processing fails.
    """
    start_time = time.time()
    logger.info("FACE SWAP REQUEST RECEIVED")
    temp_inputs = []
    try:
        # silent=True yields None instead of raising on a missing/invalid body
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            logger.error("Missing or invalid JSON body")
            return jsonify({'error': 'Missing or invalid JSON body'}), 400
        source_image_data = data.get('source_image')
        target_image_data = data.get('target_image')
        try:
            source_face_idx = int(data.get('source_face_idx', 1))
            target_face_idx = int(data.get('target_face_idx', 1))
        except (TypeError, ValueError):
            logger.error("Face indices must be integers")
            return jsonify({'error': 'Face indices must be integers'}), 400
        selected_model = data.get('model', 'inswapper_128.onnx')
        logger.info("Request parameters:")
        logger.info(f"Source face index: {source_face_idx}")
        logger.info(f"Target face index: {target_face_idx}")
        logger.info(f"Using Model: {selected_model}")
        if not source_image_data or not target_image_data:
            logger.error("Missing source or target image")
            return jsonify({'error': 'Missing source or target image'}), 400
        if not swapper:
            logger.error("Face swapper not initialized")
            return jsonify({'error': 'Face swapper not initialized'}), 500

        logger.info("Converting base64 images to OpenCV format...")
        source_image = base64_to_image(source_image_data)
        target_image = base64_to_image(target_image_data)
        logger.info("Image dimensions:")
        logger.info(f"Source: {source_image.shape[1]}x{source_image.shape[0]}")
        logger.info(f"Target: {target_image.shape[1]}x{target_image.shape[0]}")

        # Per-request input names: fixed names would let concurrent requests
        # overwrite each other's files between imwrite and the swap.
        logger.info("Saving temporary files...")
        request_id = uuid.uuid4().hex
        source_path = os.path.join(UPLOAD_FOLDER, f'source_{request_id}.jpg')
        target_path = os.path.join(UPLOAD_FOLDER, f'target_{request_id}.jpg')
        # NOTE(review): result.jpg keeps its fixed name in case other code
        # reads it - confirm, then make it per-request as well.
        result_path = os.path.join(UPLOAD_FOLDER, 'result.jpg')
        temp_inputs = [source_path, target_path]
        cv2.imwrite(source_path, source_image)
        cv2.imwrite(target_path, target_image)
        logger.info("Temporary files saved:")
        logger.info(f"Source: {source_path}")
        logger.info(f"Target: {target_path}")
        logger.info(f"Result: {result_path}")

        logger.info("Performing face swap...")
        swap_start = time.time()
        try:
            result = swapper.swap_faces(
                source_path,
                source_face_idx,
                target_path,
                target_face_idx,
                swap_hair=False,
                model_name=selected_model
            )
        except TypeError:
            # Swapper builds without the model_name keyword: retry without it
            result = swapper.swap_faces(
                source_path,
                source_face_idx,
                target_path,
                target_face_idx,
                swap_hair=False
            )
        swap_time = time.time() - swap_start
        logger.info(f"Face swap completed in {swap_time:.2f} seconds")

        # Apply edge smoothing to reduce blocky appearance
        logger.info("Applying edge smoothing...")
        result = apply_edge_smoothing(result)
        logger.info("Saving result image...")
        cv2.imwrite(result_path, result)
        logger.info("Converting result to base64...")
        result_base64 = image_to_base64(result)

        total_time = time.time() - start_time
        logger.info("FACE SWAP COMPLETED SUCCESSFULLY")
        logger.info(f"Total processing time: {total_time:.2f} seconds")
        logger.info(f"Result size: {len(result_base64)} chars")
        # Include GPU status in response
        gpu_status = getattr(swapper, 'gpu_enabled', False)
        return jsonify({
            'success': True,
            'result_image': result_base64,
            'message': f'Face swap completed successfully! (GPU: {"Enabled" if gpu_status else "Disabled"})',
            'processing_time': total_time,
            'gpu_accelerated': gpu_status
        })
    except Exception as e:
        logger.error(f"FACE SWAP ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
    finally:
        # The per-request inputs are only needed for the duration of the swap
        for path in temp_inputs:
            try:
                os.remove(path)
            except OSError:
                pass
@app.route('/api/detect_faces', methods=['POST'])
def detect_faces():
    """Detect faces in an uploaded image.

    Expects a JSON body with ``image`` (base64). Faces are ordered left to
    right and numbered from 1.

    Returns:
        JSON with ``success``, ``faces`` (id, label, JPEG crop, bbox, x, y,
        width, height per face), ``message`` and ``processing_time``; HTTP 400
        for a bad request, HTTP 500 when the swapper is missing or detection
        fails.
    """
    start_time = time.time()
    logger.info("FACE DETECTION REQUEST RECEIVED")
    try:
        # silent=True yields None instead of raising on a missing/invalid body
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            logger.error("Missing or invalid JSON body")
            return jsonify({'error': 'Missing or invalid JSON body'}), 400
        image_data = data.get('image')
        logger.info(f"Image data length: {len(image_data) if image_data else 0} chars")
        if not image_data:
            logger.error("Missing image data")
            return jsonify({'error': 'Missing image data'}), 400
        if not swapper:
            logger.error("Face swapper not initialized")
            return jsonify({'error': 'Face swapper not initialized'}), 500

        logger.info("Converting base64 image to OpenCV format...")
        image = base64_to_image(image_data)
        img_h, img_w = image.shape[:2]
        logger.info(f"Image dimensions: {img_w}x{img_h}")

        logger.info("Detecting faces in image...")
        detection_start = time.time()
        faces = swapper.app.get(image)
        detection_time = time.time() - detection_start
        logger.info(f"Face detection completed in {detection_time:.2f} seconds")
        logger.info(f"Found {len(faces)} face(s)")
        # Sort faces from left to right
        faces = sorted(faces, key=lambda x: x.bbox[0])
        logger.info("Sorted faces from left to right")

        logger.info("Preparing face data...")
        detected_faces = []
        for i, face in enumerate(faces):
            x1, y1, x2, y2 = [int(v) for v in face.bbox]
            logger.info(f"Face {i+1}: bbox=({x1},{y1},{x2},{y2}), size={x2-x1}x{y2-y1}")
            # Crop with coordinates clamped to the image: a negative start
            # would be read as "from the end" and give a wrong/empty crop.
            crop_x1, crop_y1 = max(x1, 0), max(y1, 0)
            crop_x2, crop_y2 = min(x2, img_w), min(y2, img_h)
            if crop_x2 <= crop_x1 or crop_y2 <= crop_y1:
                logger.warning(f"Face {i+1}: bounding box lies outside the image, skipping")
                continue
            face_region = image[crop_y1:crop_y2, crop_x1:crop_x2]
            face_base64 = image_to_base64(face_region)
            # The id stays tied to the left-to-right position, even if an
            # earlier face was skipped.
            detected_faces.append({
                'id': i + 1,
                'label': f'Face {i + 1}',
                'image': face_base64,
                'bbox': [x1, y1, x2, y2],
                'x': x1,
                'y': y1,
                'width': x2 - x1,
                'height': y2 - y1
            })

        total_time = time.time() - start_time
        logger.info("FACE DETECTION COMPLETED SUCCESSFULLY")
        logger.info(f"Total processing time: {total_time:.2f} seconds")
        logger.info(f"Detected {len(detected_faces)} faces with bounding boxes")
        return jsonify({
            'success': True,
            'faces': detected_faces,
            'message': f'Detected {len(detected_faces)} faces',
            'processing_time': total_time
        })
    except Exception as e:
        logger.error(f"FACE DETECTION ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/enhanced_swap', methods=['POST'])
def enhanced_swap_faces():
    """Handle a face-swap request using the enhanced swapper and a quality preset.

    Expects a JSON body with ``source_image`` and ``target_image`` (base64),
    optional 1-based ``source_face_idx`` / ``target_face_idx`` (default 1) and
    optional ``quality_preset`` (default ``balanced``).

    Returns:
        JSON with ``success``, ``result_image`` (JPEG data URL),
        ``processing_time``, ``quality_preset``, ``preset_info`` and
        ``enhanced_features``; HTTP 400 for a bad request, 501 when the
        enhanced swapper is not installed, 500 on processing errors.
    """
    start_time = time.time()
    logger.info("ENHANCED SWAP REQUEST RECEIVED")
    temp_paths = []
    try:
        # silent=True yields None instead of raising on a missing/invalid body
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            logger.error("Missing or invalid JSON body")
            return jsonify({'error': 'Missing or invalid JSON body'}), 400
        source_image_data = data.get('source_image')
        target_image_data = data.get('target_image')
        try:
            source_face_idx = int(data.get('source_face_idx', 1))
            target_face_idx = int(data.get('target_face_idx', 1))
        except (TypeError, ValueError):
            logger.error("Face indices must be integers")
            return jsonify({'error': 'Face indices must be integers'}), 400
        quality_preset = data.get('quality_preset', 'balanced')
        logger.info("Request parameters:")
        logger.info(f"Source image data length: {len(source_image_data) if source_image_data else 0} chars")
        logger.info(f"Target image data length: {len(target_image_data) if target_image_data else 0} chars")
        logger.info(f"Source face index: {source_face_idx}")
        logger.info(f"Target face index: {target_face_idx}")
        logger.info(f"Quality preset: {quality_preset}")
        if not source_image_data or not target_image_data:
            logger.error("Missing source or target image")
            return jsonify({'error': 'Missing source or target image'}), 400
        # Check if enhanced swapper is available
        if not ENHANCED_SWAPPER_AVAILABLE:
            logger.warning("Enhanced swapper not available, falling back to basic swapper")
            return jsonify({
                'error': 'Enhanced swapper not available. Please install EnhancedFaceSwapper.py and QualityPresets.py',
                'fallback_available': FACE_SWAPPER_AVAILABLE
            }), 501
        # Create enhanced swapper with quality preset
        try:
            enhanced_swapper = create_enhanced_swapper_with_quality(quality_preset)
            preset_info = QualityPresets.get_preset(quality_preset)
            logger.info(f"Using quality preset: {preset_info['name']}")
            logger.info(f"Expected processing time: {preset_info['processing_time']}")
            logger.info(f"Quality score: {preset_info['quality_score']}")
        except Exception as e:
            logger.error(f"Failed to create enhanced swapper: {e}")
            return jsonify({'error': f'Failed to create enhanced swapper: {str(e)}'}), 500

        # Convert base64 to images
        source_image = base64_to_image(source_image_data)
        target_image = base64_to_image(target_image_data)
        if source_image is None or target_image is None:
            logger.error("Failed to convert base64 to image")
            return jsonify({'error': 'Failed to decode images'}), 400

        # Per-request names: second-resolution timestamps collide when two
        # requests arrive within the same second.
        request_id = uuid.uuid4().hex
        source_path = os.path.join(UPLOAD_FOLDER, f'enhanced_source_{request_id}.jpg')
        target_path = os.path.join(UPLOAD_FOLDER, f'enhanced_target_{request_id}.jpg')
        temp_paths = [source_path, target_path]
        # Save images based on their type
        for label, image, path in (('source', source_image, source_path),
                                   ('target', target_image, target_path)):
            if CV2_AVAILABLE and isinstance(image, np.ndarray):
                cv2.imwrite(path, image)
            elif isinstance(image, Image.Image):
                # JPEG cannot store alpha/palette modes
                image.convert('RGB').save(path)
            else:
                logger.error(f"Invalid {label} image type: {type(image)}")
                return jsonify({'error': f'Invalid {label} image format'}), 400
        logger.info(f"Saved temporary files: {source_path}, {target_path}")

        # Perform enhanced face swapping
        try:
            logger.info("Starting enhanced face swapping...")
            result_image = enhanced_swapper.swap_faces_enhanced(
                source_path, target_path, source_face_idx, target_face_idx
            )
            if result_image is None:
                logger.error("Enhanced face swapping returned None")
                return jsonify({'error': 'Enhanced face swapping failed'}), 500
            logger.info("Enhanced face swapping completed successfully")
        except Exception as e:
            logger.error(f"Enhanced face swapping error: {e}")
            logger.error(f"Error type: {type(e).__name__}")
            import traceback
            logger.error(f"Full traceback:\n{traceback.format_exc()}")
            return jsonify({'error': f'Enhanced face swapping failed: {str(e)}'}), 500

        # Encode the in-memory result directly; the encoder takes an image,
        # not a file path, so no result file is written.
        if CV2_AVAILABLE or isinstance(result_image, Image.Image):
            result_base64 = image_to_base64(result_image)
        else:
            # No OpenCV: reverse the channel axis by slicing.
            # assumes the swapper returns a BGR array - TODO confirm
            result_base64 = image_to_base64(Image.fromarray(result_image[:, :, ::-1]))

        processing_time = time.time() - start_time
        logger.info(f"Enhanced swapping completed in {processing_time:.2f} seconds")
        return jsonify({
            'success': True,
            'result_image': result_base64,
            'processing_time': processing_time,
            'quality_preset': quality_preset,
            'preset_info': preset_info,
            'enhanced_features': {
                'face_structure_matching': enhanced_swapper.face_structure_matching,
                'color_correction': enhanced_swapper.color_correction_enabled,
                'seamless_blending': enhanced_swapper.seamless_blending_enabled,
                'detail_enhancement': enhanced_swapper.enhancement_enabled
            }
        })
    except Exception as e:
        logger.error(f"Enhanced swap endpoint error: {e}")
        logger.error(f"Error type: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Clean up temporary files on every exit path, not only on success
        for path in temp_paths:
            try:
                os.unlink(path)
            except OSError:
                pass
@app.route('/api/quality_presets', methods=['GET'])
def get_quality_presets():
    """Return the quality presets exposed by the enhanced face swapper.

    Responds with HTTP 501 and an empty ``presets`` list when the enhanced
    swapper module could not be imported, HTTP 500 when querying the presets
    raises, and otherwise a JSON object with the presets, their names and
    the default preset name.
    """
    try:
        if ENHANCED_SWAPPER_AVAILABLE:
            all_presets = QualityPresets.get_all_presets()
            names = QualityPresets.get_preset_names()
            logger.info(f"Providing {len(all_presets)} quality presets: {names}")
            payload = {
                'presets': all_presets,
                'preset_names': names,
                'enhanced_available': ENHANCED_SWAPPER_AVAILABLE,
                'default_preset': 'balanced',
            }
            return jsonify(payload)

        # Enhanced module missing: tell the client whether the basic swapper works.
        unavailable = {
            'error': 'Enhanced swapper not available',
            'fallback_available': FACE_SWAPPER_AVAILABLE,
            'presets': [],
        }
        return jsonify(unavailable), 501
    except Exception as exc:
        logger.error(f"Error getting quality presets: {exc}")
        return jsonify({'error': str(exc)}), 500
@app.route('/api/multi_swap', methods=['POST'])
def multi_swap():
    """Swap several faces of one target image, applying assignments in order.

    Expects a JSON object with:
        target_image: base64-encoded image whose faces are replaced.
        assignments: mapping of target face index to a base64 source image;
            entries with an empty value are skipped.
        model: optional model file name (default ``inswapper_128.onnx``).

    Every swap is applied on top of the previous swap's output, so the result
    accumulates across assignments.

    Returns:
        200 with ``result_image`` (base64) and processing statistics,
        400 for a malformed request, or 500 when the swapper is not
        initialised or processing raises.
    """
    start_time = time.time()
    logger.info("MULTI-SWAP REQUEST RECEIVED")
    # Every temporary file written for this request; removed in ``finally``.
    temp_paths = []
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            logger.error("Request body is not a JSON object")
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        target_image_data = data.get('target_image')
        assignments = data.get('assignments') or {}
        selected_model = data.get('model', 'inswapper_128.onnx')
        if not isinstance(assignments, dict):
            logger.error(f"Invalid assignments type: {type(assignments)}")
            return jsonify({'error': 'assignments must be an object mapping face index to image'}), 400

        logger.info(f"Request parameters:")
        logger.info(f"Using Model: {selected_model}")
        logger.info(f"Target image data length: {len(target_image_data) if target_image_data else 0} chars")
        logger.info(f"Number of face assignments: {len(assignments)}")
        for face_id, source_data in assignments.items():
            logger.info(f"Face {face_id}: {len(source_data) if source_data else 0} chars source data")

        if not target_image_data:
            logger.error("Missing target image")
            return jsonify({'error': 'Missing target image'}), 400
        if not swapper:
            logger.error("Face swapper not initialized")
            return jsonify({'error': 'Face swapper not initialized'}), 500

        # Face ids arrive as JSON object keys; reject anything non-numeric
        # up front instead of failing with an opaque 500 while sorting.
        try:
            face_ids = sorted(assignments, key=int)
        except (TypeError, ValueError):
            logger.error(f"Non-numeric face ids in assignments: {list(assignments)}")
            return jsonify({'error': 'Face ids in assignments must be integers'}), 400

        logger.info("Converting target image to OpenCV format...")
        target_image = base64_to_image(target_image_data)
        logger.info(f"Target image dimensions: {target_image.shape[1]}x{target_image.shape[0]}")

        # Start with target image
        result = target_image.copy()
        logger.info("Created result image copy")
        logger.info(f"Processing {len(face_ids)} face assignments in order: {face_ids}")

        # Unique per request so concurrent requests cannot overwrite each
        # other's temporary files.
        request_token = uuid.uuid4().hex
        processed_faces = 0
        for target_face_id in face_ids:
            source_image_data = assignments[target_face_id]
            if not source_image_data:
                logger.warning(f"No source image data for face {target_face_id}")
                continue

            logger.info(f"Processing face {target_face_id}...")
            source_image = base64_to_image(source_image_data)

            logger.info(f"Saving temporary files for face {target_face_id}...")
            source_path = os.path.join(UPLOAD_FOLDER, f'source_{request_token}_{target_face_id}.jpg')
            target_path = os.path.join(UPLOAD_FOLDER, f'target_{request_token}_{target_face_id}.jpg')
            temp_paths.extend([source_path, target_path])
            cv2.imwrite(source_path, source_image)
            cv2.imwrite(target_path, result)  # Use current result, not original target

            logger.info(f"Performing face swap for face {target_face_id}...")
            swap_start = time.time()
            try:
                swapped = swapper.swap_faces(
                    source_path,
                    1,  # Use first face from source
                    target_path,
                    int(target_face_id),
                    swap_hair=False,
                    model_name=selected_model
                )
            except TypeError:
                # Retry without ``model_name`` for swapper implementations
                # whose swap_faces() does not accept that keyword.
                swapped = swapper.swap_faces(
                    source_path,
                    1,
                    target_path,
                    int(target_face_id),
                    swap_hair=False
                )
            swap_time = time.time() - swap_start
            logger.info(f"Face swap completed in {swap_time:.2f} seconds")

            if swapped is not None:
                # The swap output becomes the input of the next assignment.
                result = swapped
                processed_faces += 1
                logger.info(f"Successfully applied face {target_face_id} to result")
            else:
                logger.warning(f"Face {target_face_id} swap failed to return an image.")

        total_time = time.time() - start_time
        logger.info(f"MULTI-SWAP COMPLETED SUCCESSFULLY")
        logger.info(f"Total processing time: {total_time:.2f} seconds")
        logger.info(f"Processed {processed_faces}/{len(face_ids)} faces successfully")

        logger.info("Converting final result to base64...")
        result_base64 = image_to_base64(result)
        logger.info(f"Result size: {len(result_base64)} chars")
        return jsonify({
            'success': True,
            'result_image': result_base64,
            'message': f'Multi-face swap completed! Processed {processed_faces}/{len(face_ids)} faces.',
            'processing_time': total_time,
            'faces_processed': processed_faces,
            'total_faces': len(face_ids)
        })
    except Exception as e:
        logger.error(f"MULTI-SWAP ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Best-effort cleanup: the intermediates are useless once the
        # response has been built (or the request has failed).
        for temp_path in temp_paths:
            try:
                os.remove(temp_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
@app.route('/api/video_swap', methods=['POST'])
def video_swap():
    """Swap a face into every frame of an uploaded video.

    Expects a JSON object with:
        source_image: base64 image providing the face.
        video: base64 (optionally data-URL prefixed) video file.
        source_face_idx / target_face_idx: face indices (default 1).
        model: optional model file name (default ``inswapper_128.onnx``).

    Progress is published through the module-level ``video_progress`` dict,
    which ``/api/video_progress`` serves. Frames whose swap fails, or whose
    swap yields no image, are replaced by the original frame so the output
    keeps the full frame sequence.

    Returns:
        200 with the re-encoded video as a base64 data URL plus statistics,
        400 for a malformed request, or 500 when processing fails.
    """
    global video_progress
    logger.info("VIDEO SWAP REQUEST RECEIVED")
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            logger.error("Request body is not a JSON object")
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        source_image_data = data.get('source_image')
        video_data = data.get('video')
        source_face_idx = int(data.get('source_face_idx', 1))
        target_face_idx = int(data.get('target_face_idx', 1))
        selected_model = data.get('model', 'inswapper_128.onnx')

        logger.info(f"Request parameters:")
        logger.info(f" • Source face index: {source_face_idx}")
        logger.info(f" • Target face index: {target_face_idx}")
        logger.info(f" • Selected Model: {selected_model}")
        logger.info(f" • Source image data length: {len(source_image_data) if source_image_data else 0} chars")
        logger.info(f" • Video data length: {len(video_data) if video_data else 0} chars")

        if not source_image_data or not video_data:
            logger.error("Missing source image or video")
            return jsonify({'error': 'Missing source image or video'}), 400
        if not swapper:
            logger.error("Face swapper not initialized")
            return jsonify({'error': 'Face swapper not initialized'}), 500

        # Initialize progress tracking
        video_progress = {
            'processing': True,
            'phase': 'initializing',
            'total_frames': 0,
            'processed_frames': 0,
            'current_frame_base64': None,
            'start_time': time.time(),
            'mode': 'normal',
            'fps': None,
            'resolution': None,
            'file_size': 0,
            'processing_speed': 0,
            'avg_frame_time': None,
            'estimated_total_time': None,
            'countdown_time': None
        }
        logger.info("Starting video processing")

        # Paths are computed before the try block so that the error handler
        # can always reference them during cleanup.
        source_path = os.path.join(UPLOAD_FOLDER, f'video_source_{int(time.time())}.jpg')
        video_path = os.path.join(UPLOAD_FOLDER, f'input_video_{int(time.time())}.mp4')
        output_video_path = os.path.join(UPLOAD_FOLDER, f'output_swapped_video_{int(time.time())}.mp4')
        # NOTE(review): the frame directories are shared by all requests, so
        # two overlapping video swaps would interfere with each other.
        frames_dir = os.path.join(UPLOAD_FOLDER, 'video_frames')
        swapped_dir = os.path.join(UPLOAD_FOLDER, 'swapped_frames')

        # Everything from here on runs inside the try block so that any
        # failure resets ``video_progress`` instead of leaving it stuck on
        # ``processing: True``.
        try:
            source_image = base64_to_image(source_image_data)
            if CV2_AVAILABLE and isinstance(source_image, np.ndarray):
                cv2.imwrite(source_path, source_image)
            elif isinstance(source_image, Image.Image):
                source_image.save(source_path)
            else:
                logger.error(f"Invalid source image format: {type(source_image)}")
                video_progress['processing'] = False
                video_progress['phase'] = 'error'
                return jsonify({'error': 'Invalid source image format'}), 400

            # Convert video data to file
            if 'base64,' in video_data:
                video_data = video_data.split('base64,')[1]
            with open(video_path, 'wb') as f:
                f.write(base64.b64decode(video_data))
            video_progress['file_size'] = os.path.getsize(video_path) if os.path.exists(video_path) else 0
            logger.info(f"Video file size: {video_progress['file_size'] / (1024*1024):.2f} MB")

            # Start from empty frame directories.
            if os.path.exists(frames_dir):
                shutil.rmtree(frames_dir)
            if os.path.exists(swapped_dir):
                shutil.rmtree(swapped_dir)
            os.makedirs(frames_dir, exist_ok=True)
            os.makedirs(swapped_dir, exist_ok=True)

            logger.info("Extracting frames from video...")
            video_progress['phase'] = 'extracting'
            frame_paths = extract_video_frames(video_path, frames_dir)
            video_progress['total_frames'] = len(frame_paths)
            logger.info(f"Extracted {len(frame_paths)} frames")

            # Read FPS and resolution from the original video.
            cap = cv2.VideoCapture(video_path)
            fps = cap.get(cv2.CAP_PROP_FPS)
            video_progress['fps'] = int(fps) if fps > 0 else 30
            video_progress['resolution'] = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
            cap.release()
            logger.info(f"Video: {video_progress['fps']} FPS, {video_progress['resolution']} resolution")

            logger.info("Swapping faces on frames...")
            video_progress['phase'] = 'swapping'
            processed_count = 0
            for idx, frame_path in enumerate(frame_paths):
                out_path = os.path.join(swapped_dir, f"swapped_{idx:05d}.jpg")
                wrote_swapped = False
                try:
                    try:
                        swapped_frame = swapper.swap_faces(
                            source_path=source_path,
                            source_face_idx=source_face_idx,
                            target_path=frame_path,
                            target_face_idx=target_face_idx,
                            model_name=selected_model
                        )
                    except TypeError:
                        # Retry without ``model_name`` for swapper
                        # implementations that do not accept that keyword.
                        swapped_frame = swapper.swap_faces(
                            source_path=source_path,
                            source_face_idx=source_face_idx,
                            target_path=frame_path,
                            target_face_idx=target_face_idx
                        )
                    if CV2_AVAILABLE and isinstance(swapped_frame, np.ndarray):
                        # imwrite reports failure through its return value.
                        wrote_swapped = bool(cv2.imwrite(out_path, swapped_frame))
                        # Publish a preview for every 10th processed frame.
                        if wrote_swapped and (processed_count + 1) % 10 == 0:
                            _, buffer = cv2.imencode('.jpg', swapped_frame)
                            frame_base64 = base64.b64encode(buffer).decode('utf-8')
                            video_progress['current_frame_base64'] = f"data:image/jpeg;base64,{frame_base64}"
                    else:
                        logger.warning(f"Frame {idx}: swap returned no image, keeping original frame")
                except Exception as e:
                    logger.error(f"Error processing frame {idx}: {e}")

                if not wrote_swapped:
                    # Keep the original frame so the output video does not
                    # lose frames when a swap fails or returns nothing.
                    if not os.path.exists(frame_path):
                        continue
                    shutil.copy2(frame_path, out_path)

                processed_count += 1
                video_progress['processed_frames'] = processed_count

                # Refresh the time estimate every 5 frames.
                if processed_count % 5 == 0:
                    elapsed = time.time() - video_progress['start_time']
                    avg_time = elapsed / processed_count
                    remaining_time = avg_time * (len(frame_paths) - processed_count)
                    buffer_time = 10  # fixed allowance added to the estimate
                    video_progress['countdown_time'] = round(elapsed + remaining_time + buffer_time, 1)
                    mins, secs = divmod(int(remaining_time), 60)
                    logger.info(f"Processed {processed_count}/{len(frame_paths)} frames | Est. time left: {mins:02d}:{secs:02d}")

            logger.info(f"Face swapping completed. Processed {processed_count} frames.")

            logger.info("Combining swapped frames into video...")
            video_progress['phase'] = 'rendering'
            create_video_from_frames(swapped_dir, output_video_path, int(video_progress['fps']))

            # Convert output video to base64 for response
            with open(output_video_path, 'rb') as f:
                output_video_base64 = base64.b64encode(f.read()).decode()
            output_video_data_url = f"data:video/mp4;base64,{output_video_base64}"

            # Clean up temporary files
            try:
                shutil.rmtree(frames_dir)
                shutil.rmtree(swapped_dir)
                os.remove(video_path)
                os.remove(source_path)
            except Exception as cleanup_error:
                logger.warning(f"Cleanup warning: {cleanup_error}")

            # Final progress update
            processing_time = time.time() - video_progress['start_time']
            video_progress['phase'] = 'complete'
            video_progress['processing'] = False
            logger.info(f"VIDEO SWAPPING COMPLETED SUCCESSFULLY")
            logger.info(f"Total processing time: {processing_time:.1f} seconds")
            logger.info(f"Frames processed: {processed_count}")
            return jsonify({
                'success': True,
                'message': f'Video face swap completed successfully!',
                'processing_mode': 'Standard face swap processing',
                'quality_level': 'High Quality',
                'frames_processed': processed_count,
                'total_frames': len(frame_paths),
                'output_fps': video_progress['fps'],
                'processing_time': round(processing_time, 1),
                'output_video': output_video_data_url
            })
        except Exception as processing_error:
            logger.error(f"VIDEO PROCESSING ERROR: {str(processing_error)}")
            logger.error(f"Error location: {type(processing_error).__name__}")
            import traceback
            logger.error(f"Full traceback:\n{traceback.format_exc()}")
            # Reset progress on error
            video_progress['processing'] = False
            video_progress['phase'] = 'error'
            # Clean up on error
            try:
                if os.path.exists(frames_dir):
                    shutil.rmtree(frames_dir)
                if os.path.exists(swapped_dir):
                    shutil.rmtree(swapped_dir)
                if os.path.exists(video_path):
                    os.remove(video_path)
                if os.path.exists(source_path):
                    os.remove(source_path)
            except OSError as cleanup_error:
                logger.warning(f"Cleanup warning: {cleanup_error}")
            return jsonify({'error': f'Video processing failed: {str(processing_error)}'}), 500
    except Exception as e:
        logger.error(f"VIDEO SWAP ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/video_progress', methods=['GET'])
def video_progress_endpoint():
    """Serve the module-level ``video_progress`` tracker as a JSON response."""
    current_state = video_progress
    return jsonify(current_state)
@app.route('/api/multi_combination_swap', methods=['POST'])
def multi_combination_swap():
    """Swap every source image onto every target image.

    Expects a JSON object with:
        source_images: list of base64 images providing faces.
        target_images: list of base64 images receiving faces.
        model: optional model file name (default ``inswapper_128.onnx``).

    When the swapper reports ``gpu_enabled`` and there is more than one
    combination, ``swapper.swap_faces_batch`` is tried first; if it raises,
    its partial output is discarded and every combination is processed
    individually. Result images stay in ``UPLOAD_FOLDER`` for download;
    the temporary input files are removed.

    Returns:
        200 with one entry per successful combination, 400 for a malformed
        request, or 500 when nothing could be processed.
    """
    start_time = time.time()
    logger.info("MULTI-COMBINATION SWAP REQUEST RECEIVED")

    def _discard(paths):
        """Best-effort removal of temporary input files."""
        for path in paths:
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {path}: {cleanup_error}")

    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        source_images = data.get('source_images') or []
        target_images = data.get('target_images') or []
        selected_model = data.get('model', 'inswapper_128.onnx')
        if not isinstance(source_images, list) or not isinstance(target_images, list):
            return jsonify({'error': 'source_images and target_images must be lists'}), 400

        logger.info(f"Processing {len(source_images)} source images x {len(target_images)} target images")
        logger.info(f"Using Model: {selected_model}")
        if not source_images or not target_images:
            return jsonify({'error': 'Missing source or target images'}), 400
        if not swapper:
            return jsonify({'error': 'Face swapper not initialized'}), 500

        results = []
        timestamp = int(time.time())

        # Check if GPU batch processing is available and beneficial
        use_batch_processing = (hasattr(swapper, 'gpu_enabled') and
                                swapper.gpu_enabled and
                                len(source_images) * len(target_images) > 1)

        if use_batch_processing:
            logger.info("Using GPU batch processing for optimal performance")
            source_paths = []
            target_paths = []
            try:
                # Save all source and target images first
                for i, source_image_data in enumerate(source_images):
                    source_image = base64_to_image(source_image_data)
                    source_path = os.path.join(UPLOAD_FOLDER, f'batch_source_{timestamp}_{i}.jpg')
                    source_paths.append(source_path)
                    cv2.imwrite(source_path, source_image)
                for j, target_image_data in enumerate(target_images):
                    target_image = base64_to_image(target_image_data)
                    target_path = os.path.join(UPLOAD_FOLDER, f'batch_target_{timestamp}_{j}.jpg')
                    target_paths.append(target_path)
                    cv2.imwrite(target_path, target_image)

                for i, source_path in enumerate(source_paths):
                    source_face_indices = [1]  # Use first face from each source
                    # NOTE(review): only target_paths[0] is handed to
                    # swap_faces_batch and the target *indices* enumerate the
                    # targets; confirm this matches the swapper's batch API.
                    try:
                        batch_results = swapper.swap_faces_batch(
                            source_path=source_path,
                            target_path=target_paths[0],
                            source_face_indices=source_face_indices,
                            target_face_indices=list(range(1, len(target_paths) + 1)),
                            swap_hair=False,
                            model_name=selected_model
                        )
                    except TypeError:
                        # Retry without ``model_name`` for swapper
                        # implementations that do not accept that keyword.
                        batch_results = swapper.swap_faces_batch(
                            source_path=source_path,
                            target_path=target_paths[0],
                            source_face_indices=source_face_indices,
                            target_face_indices=list(range(1, len(target_paths) + 1)),
                            swap_hair=False
                        )

                    # Convert batch results to response format
                    for j, result in enumerate(batch_results):
                        if j >= len(target_paths):
                            continue
                        # Apply edge smoothing to reduce blocky appearance
                        result = apply_edge_smoothing(result)
                        result_base64 = image_to_base64(result)
                        result_filename = f"combo_{timestamp}_source{i+1}_target{j+1}.jpg"
                        result_path = os.path.join(UPLOAD_FOLDER, result_filename)
                        cv2.imwrite(result_path, result)
                        results.append({
                            'combination_name': f'Source {i+1} x Target {j+1}',
                            'source_index': i,
                            'target_index': j,
                            'result_image': result_base64,
                            'download_url': f'/download/{result_filename}'
                        })
                        logger.info(f"GPU batch processed: Source {i+1} x Target {j+1}")
            except Exception as batch_error:
                logger.warning(f"GPU batch processing failed: {batch_error}")
                logger.info("Falling back to individual processing...")
                # The fallback redoes every combination, so keeping partial
                # batch output would produce duplicate entries.
                results = []
                use_batch_processing = False
            finally:
                _discard(source_paths + target_paths)

        if not use_batch_processing:
            logger.info("Using individual processing (CPU or GPU fallback)")
            for i, source_image_data in enumerate(source_images):
                for j, target_image_data in enumerate(target_images):
                    combo_inputs = []
                    try:
                        logger.info(f"Processing combination {i+1}x{j+1}...")
                        source_image = base64_to_image(source_image_data)
                        target_image = base64_to_image(target_image_data)

                        # Save temporary files
                        source_path = os.path.join(UPLOAD_FOLDER, f'combo_{timestamp}_source_{i}_{j}.jpg')
                        target_path = os.path.join(UPLOAD_FOLDER, f'combo_{timestamp}_target_{i}_{j}.jpg')
                        combo_inputs.extend([source_path, target_path])
                        cv2.imwrite(source_path, source_image)
                        cv2.imwrite(target_path, target_image)

                        try:
                            swapped_result = swapper.swap_faces(
                                source_path,
                                1,  # Use first face from source (default for multi-combination)
                                target_path,
                                1,  # Use first face from target (default for multi-combination)
                                swap_hair=False,
                                model_name=selected_model
                            )
                        except TypeError:
                            # Same keyword-compatibility retry as above.
                            swapped_result = swapper.swap_faces(
                                source_path,
                                1,
                                target_path,
                                1,
                                swap_hair=False
                            )

                        # Apply edge smoothing to reduce blocky appearance
                        swapped_result = apply_edge_smoothing(swapped_result)
                        result_base64 = image_to_base64(swapped_result)

                        # Save result file for download
                        result_filename = f"combo_{timestamp}_source{i+1}_target{j+1}.jpg"
                        result_path = os.path.join(UPLOAD_FOLDER, result_filename)
                        cv2.imwrite(result_path, swapped_result)
                        results.append({
                            'combination_name': f'Source {i+1} x Target {j+1}',
                            'source_index': i,
                            'target_index': j,
                            'result_image': result_base64,
                            'download_url': f'/download/{result_filename}'
                        })
                        logger.info(f"Successfully processed combination {i+1}x{j+1}")
                    except Exception as e:
                        # One failed combination must not abort the others.
                        logger.error(f"Error processing combination {i+1}x{j+1}: {e}")
                    finally:
                        _discard(combo_inputs)

        if not results:
            return jsonify({'error': 'No combinations processed successfully'}), 500

        total_time = time.time() - start_time
        gpu_status = getattr(swapper, 'gpu_enabled', False)
        gpu_label = 'Enabled' if gpu_status else 'Disabled'
        total_combinations = len(source_images) * len(target_images)
        logger.info(f"MULTI-COMBINATION SWAP COMPLETED")
        logger.info(f"Total combinations: {total_combinations}")
        logger.info(f"Successful: {len(results)}")
        logger.info(f"Processing time: {total_time:.2f}s")
        logger.info(f"GPU acceleration: {gpu_label}")
        return jsonify({
            'success': True,
            'message': f'Processed {len(results)} combinations successfully! (GPU: {gpu_label})',
            'results': results,
            'total_combinations': total_combinations,
            'successful_combinations': len(results),
            'processing_time': total_time,
            'gpu_accelerated': gpu_status,
            'batch_processing_used': use_batch_processing
        })
    except Exception as e:
        logger.error(f"MULTI-COMBINATION SWAP ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/test', methods=['GET'])
def test_endpoint():
    """Liveness probe: reply with a fixed success message and the current Unix time."""
    now = int(time.time())
    payload = {
        'success': True,
        'message': 'Server is working correctly',
        'timestamp': now,
    }
    return jsonify(payload)
@app.route('/api/face_morph', methods=['POST'])
def face_morph():
    """Create an animated morph from the target face to the swapped-in face.

    Expects a JSON object with:
        source_image / target_image: base64 images.
        source_face_idx / target_face_idx: 1-based face indices (default 1).
        morph_speed: ``slow``, ``normal`` or ``fast``; unknown values use
            the ``normal`` duration.
        morph_frames: number of frames to generate (default 30, at least 1).
        output_format: ``gif`` or ``mp4``.
        model: optional model file name (default ``inswapper_128.onnx``).

    The source face is swapped onto the target image once; each frame then
    cross-fades the target face's bounding box between the original and the
    swapped image.

    Returns:
        200 with the animation (GIF data URL, or MP4 preview frame plus
        download URL), 400 for invalid parameters or an unknown face index,
        501 when MP4 output is requested without imageio, or 500 when
        processing fails.
    """
    logger.info("FACE MORPH REQUEST RECEIVED")
    # Temporary swap inputs written for this request; removed in ``finally``.
    temp_paths = []
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            logger.error("Request body is not a JSON object")
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        source_image_data = data.get('source_image')
        target_image_data = data.get('target_image')
        morph_speed = data.get('morph_speed', 'normal')
        output_format = data.get('output_format', 'gif')
        selected_model = data.get('model', 'inswapper_128.onnx')
        try:
            source_face_idx = int(data.get('source_face_idx', 1))
            target_face_idx = int(data.get('target_face_idx', 1))
            morph_frames = int(data.get('morph_frames', 30))
        except (TypeError, ValueError):
            logger.error("Non-integer face index or frame count")
            return jsonify({'error': 'source_face_idx, target_face_idx and morph_frames must be integers'}), 400

        logger.info(f"Request parameters:")
        logger.info(f" • Source face index: {source_face_idx}")
        logger.info(f" • Target face index: {target_face_idx}")
        logger.info(f" • Morph speed: {morph_speed}")
        logger.info(f" • Morph frames: {morph_frames}")
        logger.info(f" • Output format: {output_format}")
        logger.info(f" • Using Model: {selected_model}")

        if not source_image_data or not target_image_data:
            logger.error("Missing source or target image")
            return jsonify({'error': 'Missing source or target image'}), 400
        # Zero or negative frame counts would divide by zero below and leave
        # no first frame to save.
        if morph_frames < 1:
            logger.error(f"Invalid morph frame count: {morph_frames}")
            return jsonify({'error': 'morph_frames must be at least 1'}), 400
        # Reject unknown formats before doing any work; otherwise the view
        # would fall through without returning a response.
        if output_format not in ('gif', 'mp4'):
            logger.error(f"Unsupported output format: {output_format}")
            return jsonify({'error': f'Unsupported output format: {output_format}'}), 400
        if output_format == 'mp4' and not IMAGEIO_AVAILABLE:
            logger.error("MP4 output requested but imageio is not available")
            return jsonify({'error': 'MP4 output requires imageio, which is not available'}), 501
        if not swapper:
            logger.error("Face swapper not initialized")
            return jsonify({'error': 'Face swapper not initialized'}), 500

        logger.info("Converting base64 images to OpenCV format...")
        source_image = base64_to_image(source_image_data)
        target_image = base64_to_image(target_image_data)
        logger.info(f"Image dimensions:")
        logger.info(f" • Source: {source_image.shape[1]}x{source_image.shape[0]}")
        logger.info(f" • Target: {target_image.shape[1]}x{target_image.shape[0]}")

        # Unique names so concurrent requests cannot overwrite each other.
        logger.info("Saving temporary files...")
        request_token = uuid.uuid4().hex
        source_path = os.path.join(UPLOAD_FOLDER, f'morph_source_{request_token}.jpg')
        target_path = os.path.join(UPLOAD_FOLDER, f'morph_target_{request_token}.jpg')
        temp_paths.extend([source_path, target_path])
        cv2.imwrite(source_path, source_image)
        cv2.imwrite(target_path, target_image)
        logger.info(f"Temporary files saved: {source_path}, {target_path}")

        # Swap once to obtain the end state of the morph.
        try:
            source_swapped = swapper.swap_faces(
                source_path,
                source_face_idx,
                target_path,
                target_face_idx,
                swap_hair=False,
                model_name=selected_model
            )
        except TypeError:
            # Retry without ``model_name`` for swapper implementations whose
            # swap_faces() does not accept that keyword.
            source_swapped = swapper.swap_faces(
                source_path,
                source_face_idx,
                target_path,
                target_face_idx,
                swap_hair=False
            )
        if source_swapped is None:
            logger.error("Face swap returned no image")
            return jsonify({'error': 'Face swap failed to produce an image'}), 500

        # Locate the target face; faces are ordered left to right.
        faces = sorted(swapper.app.get(target_image), key=lambda x: x.bbox[0])
        # Indices are 1-based; a value below 1 would otherwise index from
        # the end of the list and select the wrong face.
        if not 1 <= target_face_idx <= len(faces):
            return jsonify({'error': f'Target face index {target_face_idx} not found'}), 400

        face = faces[target_face_idx - 1]
        img_height, img_width = target_image.shape[:2]
        x1, y1, x2, y2 = [int(v) for v in face.bbox]
        # Clamp to the image so a box that extends past an edge does not
        # produce negative slice indices.
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(img_width, x2), min(img_height, y2)
        if x2 <= x1 or y2 <= y1:
            return jsonify({'error': f'Target face index {target_face_idx} has an empty region'}), 400

        # The two end states of the cross-fade do not change per frame.
        original_face = target_image[y1:y2, x1:x2]
        swapped_face = source_swapped[y1:y2, x1:x2]

        morph_frames_list = []
        for i in range(morph_frames):
            # Blend ratio runs from 0.0 (original) to 1.0 (fully swapped).
            ratio = i / (morph_frames - 1) if morph_frames > 1 else 0
            morph_frame = target_image.copy()
            morph_frame[y1:y2, x1:x2] = cv2.addWeighted(
                swapped_face, ratio, original_face, 1.0 - ratio, 0
            )
            # PIL expects RGB, OpenCV images are BGR.
            morph_frames_list.append(
                Image.fromarray(cv2.cvtColor(morph_frame, cv2.COLOR_BGR2RGB))
            )

        # Total animation length in milliseconds for each speed setting.
        speed_duration_map = {
            'slow': 5000,    # 5 seconds total
            'normal': 3000,  # 3 seconds total
            'fast': 1500     # 1.5 seconds total
        }
        total_duration = speed_duration_map.get(morph_speed, 3000)
        # At least 1 ms per frame so the fps computation cannot divide by zero.
        frame_duration = max(1, total_duration // morph_frames)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if output_format == 'gif':
            gif_path = os.path.join(UPLOAD_FOLDER, f'face_morph_{timestamp}.gif')
            morph_frames_list[0].save(
                gif_path,
                save_all=True,
                append_images=morph_frames_list[1:],
                duration=frame_duration,
                loop=0,
                optimize=True
            )
            with open(gif_path, 'rb') as f:
                gif_base64 = base64.b64encode(f.read()).decode()
            gif_data_url = f"data:image/gif;base64,{gif_base64}"
            return jsonify({
                'success': True,
                'output_file': gif_data_url,
                'download_url': f'/download/face_morph_{timestamp}.gif',
                'total_duration': total_duration,
                'output_format': output_format,
                'frame_count': morph_frames,
                'message': f'Face morph GIF created! {morph_frames} frames, {total_duration/1000:.1f}s duration.'
            })

        # output_format == 'mp4' (validated above)
        mp4_path = os.path.join(UPLOAD_FOLDER, f'face_morph_{timestamp}.mp4')
        video_frames = [np.array(frame) for frame in morph_frames_list]
        imageio.mimsave(mp4_path, video_frames, fps=(1000 / frame_duration), quality=8)
        # Only the first frame is inlined as a preview; the MP4 itself is
        # fetched through the download URL.
        preview_base64 = image_to_base64(cv2.cvtColor(video_frames[0], cv2.COLOR_RGB2BGR))
        return jsonify({
            'success': True,
            'preview_image': preview_base64,
            'download_url': f'/download/face_morph_{timestamp}.mp4',
            'total_duration': total_duration,
            'output_format': output_format,
            'frame_count': morph_frames,
            'message': f'Face morph MP4 created! {morph_frames} frames, {total_duration/1000:.1f}s duration.'
        })
    except Exception as e:
        logger.error(f"FACE MORPH ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Best-effort removal of the temporary swap inputs.
        for temp_path in temp_paths:
            try:
                os.remove(temp_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
@app.route('/download/<filename>')
def download_file(filename):
    """Serve a generated file from ``UPLOAD_FOLDER`` as an attachment.

    Args:
        filename: bare file name inside ``UPLOAD_FOLDER``.

    Returns:
        The file as a download, 400 when the name is not a bare file name,
        404 when no such regular file exists, or 500 on unexpected errors.
    """
    try:
        # Only bare file names are valid: reject directory references and
        # either kind of path separator so the request cannot leave
        # UPLOAD_FOLDER.
        if filename in ('', '.', '..') or '/' in filename or '\\' in filename:
            return jsonify({'error': 'Invalid file name'}), 400

        # Use an absolute path so the existence check and send_file agree on
        # the file regardless of how relative paths are resolved.
        file_path = os.path.abspath(os.path.join(UPLOAD_FOLDER, filename))
        # isfile (not exists) so directories are reported as missing.
        if not os.path.isfile(file_path):
            return jsonify({'error': 'File not found'}), 404
        return send_file(file_path, as_attachment=True)
    except Exception as e:
        logger.error(f"Download error: {e}")
        return jsonify({'error': str(e)}), 500
# Pinterest integration removed - file not available
# from pinterest_integration import search_pinterest_images, download_pinterest_images, scrape_pinterest_board
def download_image_from_url(url, filename=None):
    """Fetch an image over HTTP and package it as a base64 data URL.

    Args:
        url: address of the image to download.
        filename: accepted for interface compatibility; not used here.

    Returns:
        On success a dict with ``success=True``, the ``data_url``, the
        payload ``size`` in bytes and the reported ``content_type``.
        On any failure a dict with ``success=False`` and an ``error`` text.
    """
    try:
        reply = requests.get(url, timeout=10)
        reply.raise_for_status()

        payload = reply.content
        encoded = base64.b64encode(payload).decode('utf-8')

        # Anything that does not announce itself as PNG is labelled JPEG.
        mime = reply.headers.get('content-type', 'image/jpeg')
        data_url = (
            f"data:image/png;base64,{encoded}"
            if 'png' in mime
            else f"data:image/jpeg;base64,{encoded}"
        )

        return {
            'success': True,
            'data_url': data_url,
            'size': len(payload),
            'content_type': mime,
        }
    except Exception as e:
        return {'success': False, 'error': f'Download failed: {str(e)}'}
@app.route('/api/batch_detect_faces', methods=['POST'])
def batch_detect_faces():
    """Detect faces in several images and return a cropped preview of each.

    Expects a JSON object with ``images``: a list of base64 images.

    Each image is processed independently; a failure in one image is
    reported in that image's entry and does not abort the batch. Faces are
    numbered from 1 in left-to-right order.

    Returns:
        200 with one result entry per image and the total face count,
        400 for a malformed request, or 500 when the swapper is not
        initialised or an unexpected error occurs.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        images = data.get('images') or []
        if not isinstance(images, list):
            return jsonify({'error': 'images must be a list'}), 400
        if not images:
            return jsonify({'error': 'No images provided'}), 400
        if not swapper:
            return jsonify({'error': 'Face swapper not initialized'}), 500

        results = []
        for idx, image_data in enumerate(images):
            try:
                image = base64_to_image(image_data)
                img_height, img_width = image.shape[:2]

                # Sort faces from left to right
                faces = sorted(swapper.app.get(image), key=lambda x: x.bbox[0])

                detected_faces = []
                for i, face in enumerate(faces):
                    x1, y1, x2, y2 = [int(v) for v in face.bbox]
                    # Crop with coordinates clamped to the image: a box that
                    # extends past an edge would otherwise yield negative
                    # slice indices and an empty or wrong crop.
                    cx1, cy1 = max(0, x1), max(0, y1)
                    cx2, cy2 = min(img_width, x2), min(img_height, y2)
                    if cx2 <= cx1 or cy2 <= cy1:
                        logger.warning(f"Image {idx}: face {i + 1} has an empty region, skipping")
                        continue
                    face_base64 = image_to_base64(image[cy1:cy2, cx1:cx2])
                    detected_faces.append({
                        # Ids keep the left-to-right position even when an
                        # earlier face was skipped.
                        'id': i + 1,
                        'label': f'Face {i + 1}',
                        'image': face_base64,
                        'bbox': [x1, y1, x2, y2]
                    })

                results.append({
                    'image_index': idx,
                    'success': True,
                    'faces': detected_faces,
                    'face_count': len(detected_faces)
                })
            except Exception as e:
                # Record the failure and continue with the next image.
                results.append({
                    'image_index': idx,
                    'success': False,
                    'error': str(e),
                    'faces': [],
                    'face_count': 0
                })

        return jsonify({
            'success': True,
            'results': results,
            'total_images': len(images),
            'total_faces': sum(r['face_count'] for r in results)
        })
    except Exception as e:
        logger.error(f"Batch face detection error: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/pinterest/search', methods=['POST'])
def pinterest_search():
    """Search Pinterest for images matching a query.

    Expects a JSON object with ``query`` (required) and ``per_page``
    (optional, default 20, limited to the range 1-50).

    Returns:
        200 with the search result, 400 for invalid input, 501 when the
        Pinterest integration is not loaded, or 500 when the search fails.
    """
    try:
        # The integration module is optional; resolve the helper at call
        # time so a missing import gives a clear 501 instead of a NameError.
        search_fn = globals().get('search_pinterest_images')
        if not callable(search_fn):
            logger.error("Pinterest search requested but the integration is not available")
            return jsonify({'success': False, 'error': 'Pinterest integration is not available'}), 501

        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'No JSON data received'}), 400

        query = str(data.get('query') or '').strip()
        if not query:
            return jsonify({'success': False, 'error': 'Query parameter is required'}), 400
        try:
            per_page = int(data.get('per_page', 20))
        except (TypeError, ValueError):
            return jsonify({'success': False, 'error': 'per_page must be an integer'}), 400
        per_page = max(1, min(per_page, 50))  # Limit to 50 max

        logger.info(f"Pinterest search request: {query} (limit: {per_page})")
        result = search_fn(query, per_page)
        if result['success']:
            logger.info(f"Found {result['total_found']} images for: {query}")
            return jsonify(result)
        logger.error(f"Pinterest search failed: {result.get('error', 'Unknown error')}")
        return jsonify(result), 500
    except Exception as e:
        error_msg = f'Server error: {str(e)}'
        logger.error(f"Pinterest search error: {error_msg}")
        return jsonify({'success': False, 'error': error_msg}), 500
@app.route('/api/pinterest/download', methods=['POST'])
def pinterest_download():
    """Download Pinterest images matching a query.

    Expects a JSON object with ``query`` (required), ``num_images``
    (optional, default 20, limited to the range 1-100) and ``output_dir``
    (optional, default ``pinterest_downloads``).

    Returns:
        200 with the download result, 400 for invalid input, 501 when the
        Pinterest integration is not loaded, or 500 when the download fails.
    """
    try:
        # The integration module is optional; resolve the helper at call
        # time so a missing import gives a clear 501 instead of a NameError.
        download_fn = globals().get('download_pinterest_images')
        if not callable(download_fn):
            logger.error("Pinterest download requested but the integration is not available")
            return jsonify({'success': False, 'error': 'Pinterest integration is not available'}), 501

        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'No JSON data received'}), 400

        query = str(data.get('query') or '').strip()
        if not query:
            return jsonify({'success': False, 'error': 'Query parameter is required'}), 400
        try:
            num_images = int(data.get('num_images', 20))
        except (TypeError, ValueError):
            return jsonify({'success': False, 'error': 'num_images must be an integer'}), 400
        num_images = max(1, min(num_images, 100))  # Limit to 100 max
        # NOTE(review): output_dir is client-controlled and passed through
        # unchanged; confirm the downloader confines it to a safe location.
        output_dir = data.get('output_dir', 'pinterest_downloads')

        logger.info(f"Pinterest download request: {query} (count: {num_images})")
        result = download_fn(query, num_images, output_dir)
        if result['success']:
            logger.info(f"Downloaded {result['total_downloaded']} images to: {result['output_directory']}")
            return jsonify(result)
        logger.error(f"Pinterest download failed: {result.get('error', 'Unknown error')}")
        return jsonify(result), 500
    except Exception as e:
        error_msg = f'Server error: {str(e)}'
        logger.error(f"Pinterest download error: {error_msg}")
        return jsonify({'success': False, 'error': error_msg}), 500
@app.route('/api/pinterest/scrape', methods=['POST'])
def pinterest_scrape():
    """Scrape images from a Pinterest board or pin URL.

    Expects a JSON object with ``board_url`` (required), ``num_images``
    (optional, default 50, limited to the range 1-200) and ``output_dir``
    (optional, default ``pinterest_downloads``).

    Returns:
        200 with the scrape result, 400 for invalid input, 501 when the
        Pinterest integration is not loaded, or 500 when scraping fails.
    """
    try:
        # The integration module is optional; resolve the helper at call
        # time so a missing import gives a clear 501 instead of a NameError.
        scrape_fn = globals().get('scrape_pinterest_board')
        if not callable(scrape_fn):
            logger.error("Pinterest scrape requested but the integration is not available")
            return jsonify({'success': False, 'error': 'Pinterest integration is not available'}), 501

        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'No JSON data received'}), 400

        board_url = str(data.get('board_url') or '').strip()
        if not board_url:
            return jsonify({'success': False, 'error': 'Board URL parameter is required'}), 400
        try:
            num_images = int(data.get('num_images', 50))
        except (TypeError, ValueError):
            return jsonify({'success': False, 'error': 'num_images must be an integer'}), 400
        num_images = max(1, min(num_images, 200))  # Limit to 200 max
        # NOTE(review): output_dir is client-controlled and passed through
        # unchanged; confirm the scraper confines it to a safe location.
        output_dir = data.get('output_dir', 'pinterest_downloads')

        # Validate Pinterest URL (substring check only, as before).
        looks_like_pinterest = 'pinterest.com' in board_url and (
            '/board/' in board_url or '/pin/' in board_url
        )
        if not looks_like_pinterest:
            return jsonify({'success': False, 'error': 'Invalid Pinterest board URL'}), 400

        logger.info(f"Pinterest scrape request: {board_url} (count: {num_images})")
        result = scrape_fn(board_url, num_images, output_dir)
        if result['success']:
            logger.info(f"Scraped {result['total_scraped']} images from board")
            return jsonify(result)
        logger.error(f"Pinterest scrape failed: {result.get('error', 'Unknown error')}")
        return jsonify(result), 500
    except Exception as e:
        error_msg = f'Server error: {str(e)}'
        logger.error(f"Pinterest scrape error: {error_msg}")
        return jsonify({'success': False, 'error': error_msg}), 500
@app.route('/pinterest_images/<path:filename>')
def serve_pinterest_image(filename):
    """Serve a file from the ``pinterest_downloads`` directory.

    Args:
        filename: path of the file relative to ``pinterest_downloads``;
            may contain sub-directories.

    Returns:
        The file, 403 when the path resolves outside the downloads
        directory, 404 when no such file exists, or 500 on unexpected
        errors.
    """
    try:
        # Resolve both paths to absolute form before comparing them.
        # Comparing the relative request path against an absolute prefix can
        # never match, which would reject every request.
        pinterest_dir = Path("pinterest_downloads").resolve()
        file_path = (pinterest_dir / filename).resolve()

        # Security check - ensure file is within pinterest_downloads
        try:
            file_path.relative_to(pinterest_dir)
        except ValueError:
            return jsonify({'error': 'Invalid file path'}), 403

        if file_path.is_file():
            return send_file(str(file_path))
        return jsonify({'error': 'File not found'}), 404
    except Exception as e:
        logger.error(f"Error serving Pinterest image: {e}")
        return jsonify({'error': 'Server error'}), 500
@app.route('/api/preset/save', methods=['POST'])
def save_preset():
    """Download an image by URL and store it in the local ``presets`` folder.

    Request JSON:
        url:   required http(s) URL of the image to download.
        title: optional label used to build the filename (default
               ``untitled``).

    Returns JSON with the stored ``filename``/``filepath``, the ``folder``
    and a base64 ``data_url`` of the downloaded bytes.  Malformed requests
    get a 400; download or write failures a 500.
    """
    try:
        # silent=True: a missing/invalid JSON body becomes None instead of an
        # exception, so it can be answered with a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'No JSON data received'}), 400

        image_url = data.get('url')
        title = str(data.get('title') or 'untitled')

        if not image_url:
            return jsonify({'success': False, 'error': 'No image URL provided'}), 400
        if not isinstance(image_url, str) or not image_url.lower().startswith(('http://', 'https://')):
            return jsonify({'success': False, 'error': 'Image URL must be an http(s) URL'}), 400

        # Create presets folder if it doesn't exist
        presets_folder = os.path.join(os.getcwd(), 'presets')
        os.makedirs(presets_folder, exist_ok=True)

        # NOTE(review): the server fetches a caller-supplied URL; consider
        # restricting target hosts if this endpoint is exposed publicly.
        response = requests.get(image_url, timeout=30)
        response.raise_for_status()

        # Keep only filename-safe characters; fall back when nothing survives
        # so the file is never named just "_<timestamp>.jpg".
        safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_')).rstrip()
        safe_title = safe_title or 'untitled'
        timestamp = int(time.time())
        filename = f"{safe_title}_{timestamp}.jpg"
        filepath = os.path.join(presets_folder, filename)

        with open(filepath, 'wb') as f:
            f.write(response.content)

        # Convert to base64 for immediate use
        image_base64 = base64.b64encode(response.content).decode('utf-8')
        data_url = f"data:image/jpeg;base64,{image_base64}"

        return jsonify({
            'success': True,
            'filename': filename,
            'filepath': filepath,
            'data_url': data_url,
            'folder': presets_folder
        })
    except Exception as e:
        print(f"Save preset error: {e}")
        return jsonify({'success': False, 'error': f'Failed to save preset: {str(e)}'}), 500
@app.route('/api/preset/save_all', methods=['POST'])
def save_all_presets():
    """Download several images by URL into the local ``presets`` folder.

    Request JSON:
        images: non-empty list of objects with ``url`` (required) and
                ``title`` (optional, defaults to ``image_<index>``).

    Each image is processed independently: a failure is recorded in
    ``failed_images`` and does not stop the batch.  The response reports
    saved/failed counts plus per-image details.
    """
    try:
        data = request.get_json(silent=True)
        images = data.get('images', []) if isinstance(data, dict) else []
        if not images or not isinstance(images, list):
            return jsonify({'success': False, 'error': 'No images provided'}), 400

        # Create presets folder if it doesn't exist
        presets_folder = os.path.join(os.getcwd(), 'presets')
        os.makedirs(presets_folder, exist_ok=True)

        saved_images = []
        failed_images = []
        for i, image in enumerate(images):
            # Decide the fallback title up front so the error handler never
            # has to touch ``image`` again (a non-dict entry used to raise
            # inside the handler and abort the whole batch).
            title = f'image_{i}'
            try:
                title = image.get('title') or title
                image_url = image.get('url')
                if not image_url:
                    failed_images.append({'title': title, 'error': 'No URL provided'})
                    continue

                # Download the image
                response = requests.get(image_url, timeout=30)
                response.raise_for_status()

                # Generate a safe filename
                safe_title = "".join(
                    c for c in str(title) if c.isalnum() or c in (' ', '-', '_')
                ).rstrip()
                timestamp = int(time.time()) + i  # Add offset to avoid duplicates
                filename = f"{safe_title}_{timestamp}.jpg"
                filepath = os.path.join(presets_folder, filename)

                with open(filepath, 'wb') as f:
                    f.write(response.content)

                # Convert to base64 for immediate use
                image_base64 = base64.b64encode(response.content).decode('utf-8')
                data_url = f"data:image/jpeg;base64,{image_base64}"
                saved_images.append({
                    'title': title,
                    'filename': filename,
                    'filepath': filepath,
                    'data_url': data_url,
                    'original_url': image_url
                })
            except Exception as e:
                failed_images.append({'title': title, 'error': str(e)})

        return jsonify({
            'success': True,
            'saved_count': len(saved_images),
            'failed_count': len(failed_images),
            'saved_images': saved_images,
            'failed_images': failed_images,
            'folder': presets_folder
        })
    except Exception as e:
        print(f"Save all presets error: {e}")
        return jsonify({'success': False, 'error': f'Failed to save presets: {str(e)}'}), 500
@app.route('/api/preset/load', methods=['GET'])
def load_presets():
    """Return every image stored in the ``presets`` folder, newest first.

    Each entry carries the filename (also used as ``id``), a display title
    derived from the filename, the file path, a base64 ``data_url`` and the
    file's modification time as ``saved_at``.  A missing folder yields an
    empty list.  Files that cannot be read are logged and skipped.
    """
    try:
        presets_folder = os.path.join(os.getcwd(), 'presets')
        if not os.path.exists(presets_folder):
            return jsonify({'success': True, 'presets': []})

        # Recognised extensions and the MIME type to advertise for each.  The
        # data URL used to claim image/jpeg for every file, including
        # png/gif/webp.
        mime_by_ext = {
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.webp': 'image/webp',
        }

        presets = []
        for filename in os.listdir(presets_folder):
            lowered = filename.lower()
            mime = next((m for ext, m in mime_by_ext.items() if lowered.endswith(ext)), None)
            if mime is None:
                continue

            filepath = os.path.join(presets_folder, filename)
            try:
                # Get file modification time
                mod_time = os.path.getmtime(filepath)

                # Read image and convert to base64
                with open(filepath, 'rb') as f:
                    image_data = f.read()
                image_base64 = base64.b64encode(image_data).decode('utf-8')
                data_url = f"data:{mime};base64,{image_base64}"

                # Title = filename without extension and without a trailing
                # "_<digits>" timestamp suffix, underscores shown as spaces.
                name_without_ext = os.path.splitext(filename)[0]
                parts = name_without_ext.rsplit('_', 1)
                if len(parts) > 1 and parts[-1].isdigit():
                    title = parts[0].replace('_', ' ').title()
                else:
                    title = name_without_ext.replace('_', ' ').title()

                presets.append({
                    'id': filename,  # Use filename as ID
                    'filename': filename,
                    'title': title,
                    'filepath': filepath,
                    'data_url': data_url,
                    'saved_at': mod_time
                })
            except Exception as e:
                print(f"Error loading preset {filename}: {e}")
                continue

        # Sort by saved_at (newest first)
        presets.sort(key=lambda x: x['saved_at'], reverse=True)
        return jsonify({'success': True, 'presets': presets})
    except Exception as e:
        print(f"Load presets error: {e}")
        return jsonify({'success': False, 'error': f'Failed to load presets: {str(e)}'}), 500
@app.route('/api/preset/delete', methods=['POST'])
def delete_preset():
    """Delete one preset file from the ``presets`` folder.

    Request JSON:
        filename: name of a file located directly inside ``presets``.

    Returns 400 when the filename is missing or points outside the presets
    folder, 404 when no such file exists, and 500 on unexpected errors.
    """
    try:
        data = request.get_json(silent=True)
        filename = data.get('filename') if isinstance(data, dict) else None
        if not filename:
            return jsonify({'success': False, 'error': 'No filename provided'}), 400

        presets_folder = os.path.realpath(os.path.join(os.getcwd(), 'presets'))
        filepath = os.path.realpath(os.path.join(presets_folder, str(filename)))

        # The filename comes from the client: without this check a value such
        # as "../../some/file" would delete files outside the presets folder.
        if os.path.dirname(filepath) != presets_folder:
            return jsonify({'success': False, 'error': 'Invalid filename'}), 400

        if os.path.isfile(filepath):
            os.remove(filepath)
            return jsonify({'success': True, 'deleted': filename})
        return jsonify({'success': False, 'error': 'File not found'}), 404
    except Exception as e:
        print(f"Delete preset error: {e}")
        return jsonify({'success': False, 'error': f'Failed to delete preset: {str(e)}'}), 500
@app.route('/preset/<filename>')
def serve_preset(filename):
    """Serve a preset image file from the ``presets`` folder.

    Only regular files located directly inside the presets folder are
    served; anything else (missing file, directory, or a name that resolves
    outside the folder) yields a 404.
    """
    try:
        presets_folder = os.path.realpath(os.path.join(os.getcwd(), 'presets'))
        filepath = os.path.realpath(os.path.join(presets_folder, filename))

        # Reject names such as ".." (or backslash paths on Windows) that
        # resolve outside the folder, and directories, which exists() accepted.
        inside_presets = os.path.dirname(filepath) == presets_folder
        if inside_presets and os.path.isfile(filepath):
            return send_file(filepath)
        return jsonify({'error': 'File not found'}), 404
    except Exception as e:
        print(f"Serve preset error: {e}")
        return jsonify({'error': 'Failed to serve file'}), 500
@app.route('/api/preset/save_direct', methods=['POST'])
def save_preset_direct():
    """Store an uploaded (AI-generated) image in the ``presets`` folder.

    Reads the multipart file field ``image`` and the optional form field
    ``title``; the file is written as ``ai_face_<title>_<timestamp>.jpg`` and
    returned to the caller as a data URL built by ``image_to_base64``.
    """
    try:
        uploads = request.files
        if 'image' not in uploads:
            return jsonify({'error': 'No image file provided'}), 400

        upload = uploads['image']
        title = request.form.get('title', 'AI Generated Face')
        if upload.filename == '':
            return jsonify({'error': 'No file selected'}), 400

        # Make sure the destination directory is there.
        presets_dir = os.path.join(os.getcwd(), 'presets')
        os.makedirs(presets_dir, exist_ok=True)

        # Build the target name: sanitised title (max 50 chars) + timestamp.
        import re
        stamp = int(time.time())
        cleaned = re.sub(r'[^a-zA-Z0-9]', '_', title)
        safe_title = cleaned[:50]
        filename = f"ai_face_{safe_title}_{stamp}.jpg"
        filepath = os.path.join(presets_dir, filename)

        upload.save(filepath)

        payload = {
            'success': True,
            'filename': filename,
            'filepath': filepath,
            'folder': presets_dir,
            # Data URL of the stored file for immediate use by the client.
            'data_url': image_to_base64(filepath),
            'message': f'AI face saved as {filename}'
        }
        return jsonify(payload)
    except Exception as e:
        print(f"Save AI preset error: {e}")
        return jsonify({'error': f'Failed to save AI face: {str(e)}'}), 500
@app.route('/health', methods=['GET'])
def health_check():
    """Report service status, swapper availability and the upload folder."""
    if swapper:
        swapper_state = 'loaded'
    else:
        swapper_state = 'not loaded'

    report = {
        'status': 'healthy',
        'face_swapper': swapper_state,
        'upload_folder': UPLOAD_FOLDER
    }
    return jsonify(report)
def _perchance_image_via_selenium(prompt):
    """Try to scrape a generated image from Perchance with headless Chrome.

    Returns a ``data:`` URL string on success, or ``None`` when Selenium is
    not installed or any step of the scrape fails.  The browser is always
    closed, including when navigation itself raises.
    """
    try:
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options
        from selenium.webdriver.common.by import By
        from selenium.webdriver.support.ui import WebDriverWait
        from selenium.webdriver.support import expected_conditions as EC
    except ImportError:
        print("❌ Selenium not available. Install with: pip install selenium")
        return None

    driver = None
    try:
        # Setup Chrome options for headless browsing
        chrome_options = Options()
        for flag in ("--headless", "--no-sandbox", "--disable-dev-shm-usage",
                     "--disable-gpu", "--window-size=1024,768"):
            chrome_options.add_argument(flag)
        driver = webdriver.Chrome(options=chrome_options)

        # Navigate to Perchance with prompt
        perchance_url = f"https://perchance.org/ai-text-to-image-generator?prompt={requests.utils.quote(prompt)}"
        driver.get(perchance_url)
        print("🔄 Waiting for Perchance to load...")
        time.sleep(3)

        try:
            # Wait for an image to appear (up to 30 seconds)
            image_element = WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "img[src*='generated'], img[src*='perchance'], img[src*='image']"))
            )
            image_src = image_element.get_attribute('src')
            print(f"✅ Found image: {image_src[:100]}...")

            if image_src.startswith('data:'):
                # Already a data URL - use directly
                return image_src

            # Regular URL - download and convert
            img_response = requests.get(image_src, timeout=10)
            if img_response.status_code != 200:
                raise Exception("Failed to download image")
            img_data = base64.b64encode(img_response.content).decode('utf-8')
            return f"data:image/jpeg;base64,{img_data}"
        except Exception as wait_error:
            print(f"❌ Timeout waiting for image: {wait_error}")
            raise
    except Exception as selenium_error:
        print(f"❌ Selenium approach failed: {selenium_error}")
        return None
    finally:
        # Runs on every path so a failed navigation cannot leak a Chrome process.
        if driver is not None:
            try:
                driver.quit()
            except Exception as quit_error:
                print(f"❌ Failed to close browser: {quit_error}")


def _perchance_image_via_requests(prompt):
    """Try to find a generated image in the static Perchance HTML.

    Returns a ``data:`` URL string for the first matching, downloadable
    ``<img>``, or ``None`` when nothing usable is found or a step fails.
    """
    try:
        # A session keeps the cookies from the page visit for the image fetch.
        with requests.Session() as session:
            page_url = f"https://perchance.org/ai-text-to-image-generator?prompt={requests.utils.quote(prompt)}"
            response = session.get(page_url, timeout=30)
            if response.status_code != 200:
                return None

            from bs4 import BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')

            # Look for any image that might be generated
            for img in soup.find_all('img'):
                src = img.get('src', '')
                looks_generated = 'generated' in src or 'perchance' in src or 'image' in src
                if not looks_generated or src.startswith('data:image/svg'):
                    continue
                print(f"✅ Found image in HTML: {src[:100]}...")
                if not src.startswith('http'):
                    continue
                img_response = session.get(src, timeout=10)
                if img_response.status_code == 200:
                    img_data = base64.b64encode(img_response.content).decode('utf-8')
                    return f"data:image/jpeg;base64,{img_data}"
    except ImportError:
        print("❌ BeautifulSoup not available. Install with: pip install beautifulsoup4")
    except Exception as requests_error:
        print(f"❌ Requests approach failed: {requests_error}")
    return None


def _perchance_placeholder_data_url(prompt):
    """Render a 512x512 PNG placeholder showing the prompt; return a data URL."""
    from PIL import Image, ImageDraw, ImageFont
    import textwrap
    import random

    # Create image with gradient background
    width, height = 512, 512
    img = Image.new('RGB', (width, height), color='#2a2a2a')
    draw = ImageDraw.Draw(img)
    for y in range(height):
        color_value = int(42 + (y / height) * 20)  # Dark gradient
        draw.line([(0, y), (width, y)], fill=f'#{color_value:02x}{color_value:02x}{color_value:02x}')

    # Prefer Arial; fall back to Pillow's built-in font when it is missing.
    try:
        font_title = ImageFont.truetype("arial.ttf", 28)
        font_text = ImageFont.truetype("arial.ttf", 18)
        font_small = ImageFont.truetype("arial.ttf", 14)
    except (OSError, ImportError):
        font_title = ImageFont.load_default()
        font_text = ImageFont.load_default()
        font_small = ImageFont.load_default()

    # Draw title
    title = "🎨 Perchance AI Generated"
    draw.text((width//2 - font_title.getlength(title)//2, 40), title, fill='#ffff55', font=font_title)

    # Draw prompt text (wrapped).  textwrap needs a positive *int* width:
    # getlength() returns a float, which made wrap() raise on long words, and
    # a zero-width space glyph would divide by zero.
    max_width = width - 80
    space_width = font_text.getlength(' ') or 1
    wrap_width = max(1, int(max_width // space_width))
    y_offset = 120
    for line in textwrap.wrap(prompt, width=wrap_width)[:8]:  # Limit to 8 lines
        draw.text((width//2 - font_text.getlength(line)//2, y_offset), line, fill='#ffffff', font=font_text)
        y_offset += 30

    # Add loading animation hint
    loading_text = "⏳ Generating high-quality image..."
    draw.text((width//2 - font_small.getlength(loading_text)//2, 380), loading_text, fill='#4CAF50', font=font_small)

    # Add instruction
    instruction = "This is a preview. Real generation requires external API."
    draw.text((width//2 - font_small.getlength(instruction)//2, 420), instruction, fill='#ff6b6b', font=font_small)

    # Add random "AI art" elements
    for _ in range(20):
        x = random.randint(0, width)
        y = random.randint(0, height)
        size = random.randint(1, 3)
        color = random.choice(['#ffff55', '#4CAF50', '#ff6b6b', '#4a90e2'])
        draw.ellipse([x, y, x+size, y+size], fill=color)

    # Convert to data URL
    buffer = BytesIO()
    img.save(buffer, format='PNG')
    img_data = base64.b64encode(buffer.getvalue()).decode('utf-8')
    return f"data:image/png;base64,{img_data}"


@app.route('/api/perchance/generate', methods=['POST'])
def perchance_generate():
    """Proxy endpoint for Perchance AI image generation.

    Request JSON:
        prompt: required text prompt.
        model:  optional label echoed back in the response (default
                ``realistic``).

    Tries, in order: scraping the page with headless Chrome, scraping the
    static HTML with ``requests``, and finally rendering a local placeholder
    image.  The response always carries ``image_url`` as a data URL; the
    placeholder response additionally includes a ``note``.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        prompt = data.get('prompt', '')
        model = data.get('model', 'realistic')

        # Non-string prompts cannot be sliced or URL-quoted further down.
        if not prompt or not isinstance(prompt, str):
            return jsonify({'error': 'Prompt is required'}), 400

        print(f"🎨 Perchance AI Request: {prompt[:50]}...")

        # Methods 1 and 2: real scraping attempts, first hit wins.
        data_url = _perchance_image_via_selenium(prompt) or _perchance_image_via_requests(prompt)
        if data_url:
            return jsonify({
                'success': True,
                'image_url': data_url,
                'model': model,
                'prompt': prompt
            })

        # Method 3: Fallback - Create a better placeholder
        print("🔄 Creating enhanced placeholder image...")
        return jsonify({
            'success': True,
            'image_url': _perchance_placeholder_data_url(prompt),
            'model': model,
            'prompt': prompt,
            'note': 'Enhanced placeholder. Real generation requires Perchance website.'
        })
    except Exception as e:
        print(f"❌ Perchance generation error: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/save_to_compressor', methods=['POST'])
def save_to_compressor():
    """Save base64 images into a fresh temp folder for the compressor page.

    Request JSON:
        image_urls: non-empty list of base64 strings or ``data:`` URLs.

    Each image is written as ``compressed_image_<n>.jpg`` inside a new folder
    under ``COMPRESSOR_TEMP_FOLDER``; the folder's creation time is recorded
    in ``temp_folders``.  Entries that fail to decode are logged and skipped.
    Returns the folder id, the saved files and the compressor page URL, a 400
    for a malformed request, or a 500 if nothing could be saved.
    """
    try:
        data = request.get_json(silent=True)
        image_urls = data.get('image_urls', []) if isinstance(data, dict) else []
        # Require a real list: a bare string would otherwise be iterated
        # character by character.
        if not image_urls or not isinstance(image_urls, list):
            return jsonify({'success': False, 'error': 'No images provided'}), 400

        # Create unique temp folder
        folder_id = str(uuid.uuid4())[:8]
        temp_folder_path = os.path.join(COMPRESSOR_TEMP_FOLDER, folder_id)
        os.makedirs(temp_folder_path, exist_ok=True)

        # Track folder creation time for cleanup
        temp_folders[folder_id] = time.time()

        saved_files = []
        for i, image_url in enumerate(image_urls):
            try:
                # Strip a "data:...;base64," prefix when present
                if 'base64,' in image_url:
                    base64_data = image_url.split('base64,')[1]
                else:
                    base64_data = image_url

                image_data = base64.b64decode(base64_data)
                filename = f'compressed_image_{i+1}.jpg'
                file_path = os.path.join(temp_folder_path, filename)
                with open(file_path, 'wb') as f:
                    f.write(image_data)

                saved_files.append({
                    'filename': filename,
                    'path': file_path,
                    'url': f'/api/compressor_temp/{folder_id}/{filename}'
                })
            except Exception as e:
                print(f"Error saving image {i}: {e}")
                continue

        if not saved_files:
            # Clean up empty folder; pop() tolerates the entry already being
            # gone (e.g. removed by a concurrent cleanup).
            shutil.rmtree(temp_folder_path, ignore_errors=True)
            temp_folders.pop(folder_id, None)
            return jsonify({'success': False, 'error': 'Failed to save any images'}), 500

        return jsonify({
            'success': True,
            'folder_id': folder_id,
            'files': saved_files,
            'compressor_url': f'/compressor.html?folder={folder_id}'
        })
    except Exception as e:
        print(f"Save to compressor error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/compressor_temp/<folder_id>/<filename>')
def serve_compressor_temp(folder_id, filename):
    """Serve one file from a compressor temp folder.

    The resolved path must stay inside ``COMPRESSOR_TEMP_FOLDER``; a request
    that escapes it is rejected with 403.  Missing files (or directories)
    yield 404, unexpected errors 500.
    """
    try:
        temp_root = os.path.realpath(COMPRESSOR_TEMP_FOLDER)
        file_path = os.path.realpath(os.path.join(temp_root, folder_id, filename))

        # A folder_id of ".." would otherwise expose files that sit next to
        # the temp root.  join(root, '') yields the root with a trailing
        # separator so sibling directories sharing a prefix do not match.
        if not file_path.startswith(os.path.join(temp_root, '')):
            return jsonify({'error': 'Invalid file path'}), 403

        if os.path.isfile(file_path):
            return send_file(file_path)
        return jsonify({'error': 'File not found'}), 404
    except Exception as e:
        print(f"Error serving compressor temp file: {e}")
        return jsonify({'error': 'Server error'}), 500
@app.route('/api/compressor_temp/<folder_id>/list')
def list_compressor_temp_files(folder_id):
    """List the regular files in one compressor temp folder.

    ``folder_id`` must name a directory located directly inside
    ``COMPRESSOR_TEMP_FOLDER``; anything else yields a 404.  Each listed file
    is reported with its name, download URL and size in bytes.
    """
    try:
        temp_root = os.path.realpath(COMPRESSOR_TEMP_FOLDER)
        folder_path = os.path.realpath(os.path.join(temp_root, folder_id))

        # Only direct children of the temp root are valid; this blocks ids
        # such as ".." that would list directories outside it.
        is_child = os.path.dirname(folder_path) == temp_root
        if not is_child or not os.path.isdir(folder_path):
            return jsonify({'success': False, 'error': 'Folder not found'}), 404

        files = []
        for filename in os.listdir(folder_path):
            file_path = os.path.join(folder_path, filename)
            if os.path.isfile(file_path):
                files.append({
                    'filename': filename,
                    'url': f'/api/compressor_temp/{folder_id}/{filename}',
                    'size': os.path.getsize(file_path)
                })

        return jsonify({
            'success': True,
            'folder_id': folder_id,
            'files': files
        })
    except Exception as e:
        print(f"Error listing compressor temp files: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
def cleanup_old_temp_folders():
    """Remove compressor temp folders that were created over an hour ago.

    Ages are taken from the creation timestamps stored in ``temp_folders``;
    each expired folder is deleted from disk and dropped from that mapping.
    """
    now = time.time()
    max_age_seconds = 3600  # 1 hour

    # Collect the expired ids first so the mapping is not mutated mid-iteration.
    expired_ids = [
        fid for fid, created_at in temp_folders.items()
        if now - created_at > max_age_seconds
    ]

    for folder_id in expired_ids:
        try:
            stale_path = os.path.join(COMPRESSOR_TEMP_FOLDER, folder_id)
            shutil.rmtree(stale_path, ignore_errors=True)
            del temp_folders[folder_id]
            print(f"Cleaned up old temp folder: {folder_id}")
        except Exception as e:
            print(f"Error cleaning up folder {folder_id}: {e}")
@app.route('/api/preset/upload', methods=['POST'])
def upload_preset():
    """Store a base64-encoded image in the local ``presets`` folder.

    Reads ``image`` (raw base64 or a ``data:`` URL) and ``filename`` from the
    JSON body.  The file is written under the basename of ``filename`` with a
    Unix timestamp inserted before the extension, and that final name is
    returned to the caller.
    """
    try:
        payload = request.json
        if not payload:
            return jsonify({'success': False, 'error': 'No data provided'}), 400

        encoded = payload.get('image')
        filename = payload.get('filename')
        if not (encoded and filename):
            return jsonify({'success': False, 'error': 'Missing image or filename'}), 400

        presets_folder = os.path.join(os.getcwd(), 'presets')
        os.makedirs(presets_folder, exist_ok=True)

        # Drop any directory part of the client's name, then add a timestamp
        # between stem and extension to avoid collisions.
        stem, extension = os.path.splitext(os.path.basename(filename))
        final_filename = f"{stem}_{int(time.time())}{extension}"
        destination = os.path.join(presets_folder, final_filename)

        # Keep only the payload after a "base64," marker when one is present.
        raw_b64 = encoded.split('base64,')[1] if 'base64,' in encoded else encoded
        image_bytes = base64.b64decode(raw_b64)

        with open(destination, 'wb') as out_file:
            out_file.write(image_bytes)

        result = {
            'success': True,
            'message': 'Preset uploaded successfully',
            'filename': final_filename
        }
        return jsonify(result)
    except Exception as e:
        print(f"Error uploading preset: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
if __name__ == '__main__':
    # Script entry point: print the startup banner, then run Flask's built-in
    # server on every interface at the configured port with debug disabled.
    for banner_line in (
        "Starting Shinyy's Face Swapper Web Server...",
        f"Open your browser and go to: http://localhost:{WEB_SERVER_PORT}",
    ):
        print(banner_line)

    server_options = {'host': '0.0.0.0', 'port': WEB_SERVER_PORT, 'debug': False}
    app.run(**server_options)