#!/usr/bin/env python3 """ Simple Flask Backend for Shinyy's Face Swapper HTML Website """ from flask import Flask, request, jsonify, send_file from flask_cors import CORS import os from pathlib import Path import tempfile import shutil import uuid import glob import logging import sys import time from datetime import datetime try: import cv2 import numpy as np CV2_AVAILABLE = True except ImportError as e: print(f"Warning: OpenCV/NumPy not available: {e}") CV2_AVAILABLE = False cv2 = None np = None import base64 from io import BytesIO from PIL import Image import json import requests try: import imageio IMAGEIO_AVAILABLE = True except ImportError as e: print(f"Warning: imageio not available: {e}") IMAGEIO_AVAILABLE = False imageio = None # Import the face swapper try: from SinglePhoto import FaceSwapper FACE_SWAPPER_AVAILABLE = True except ImportError as e: print(f"Warning: FaceSwapper not available due to import error: {e}") print("Video processing will work in simulation mode only") FACE_SWAPPER_AVAILABLE = False FaceSwapper = None # Import enhanced face swapper if available try: from EnhancedFaceSwapper import EnhancedFaceSwapper from QualityPresets import QualityPresets, create_enhanced_swapper_with_quality ENHANCED_SWAPPER_AVAILABLE = True print("Enhanced face swapper loaded successfully!") except ImportError as e: print(f"Enhanced face swapper not available: {e}") ENHANCED_SWAPPER_AVAILABLE = False EnhancedFaceSwapper = None QualityPresets = None create_enhanced_swapper_with_quality = None app = Flask(__name__) CORS(app) # Enable CORS for all routes # Use different port to avoid conflicts - 7860 is required for Hugging Face WEB_SERVER_PORT = 7860 # Configure comprehensive logging def setup_logging(): """Setup detailed logging for console output""" # Create custom formatter for better readability class CustomFormatter(logging.Formatter): def format(self, record): # Add timestamp and format with colors timestamp = datetime.now().strftime('%H:%M:%S') level_color = { 'DEBUG': '\033[36m', # Cyan 'INFO': '\033[32m', # Green 'WARNING': '\033[33m', # Yellow 'ERROR': '\033[31m', # Red 'CRITICAL': '\033[35m', # Magenta }.get(record.levelname, '\033[0m') reset_color = '\033[0m' # Format: [TIME] LEVEL | MESSAGE return f"[{timestamp}] {level_color}{record.levelname}{reset_color} | {record.getMessage()}" # Setup root logger root_logger = logging.getLogger() root_logger.setLevel(logging.DEBUG) # Remove existing handlers for handler in root_logger.handlers[:]: root_logger.removeHandler(handler) # Add console handler with custom formatter console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.DEBUG) console_handler.setFormatter(CustomFormatter()) root_logger.addHandler(console_handler) return root_logger # Initialize logging logger = setup_logging() # Global video processing progress tracking video_progress = { 'processing': False, 'phase': 'idle', 'total_frames': 0, 'processed_frames': 0, 'current_frame_base64': None, 'start_time': None, 'mode': None, 'fps': None, 'resolution': None, 'file_size': None, 'processing_speed': None, 'avg_frame_time': None, 'estimated_total_time': None, 'countdown_time': None } # Disable Flask auto-reload and other restart triggers app.config['DEBUG'] = False app.config['TEMPLATES_AUTO_RELOAD'] = False app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0 # Initialize face swapper with GPU optimization if FACE_SWAPPER_AVAILABLE: try: # Try to initialize with GPU acceleration first try: swapper = FaceSwapper(gpu_enabled=True, gpu_id=0) gpu_info = swapper.get_gpu_info() print(f"FaceSwapper loaded with GPU acceleration!") print(f"GPU Info: {gpu_info}") except Exception as gpu_error: print(f"GPU initialization failed: {gpu_error}") print("Falling back to CPU-only mode...") swapper = FaceSwapper(gpu_enabled=False, gpu_id=-1) print("FaceSwapper loaded in CPU mode!") FACE_SWAPPER_AVAILABLE = True except Exception as e: print(f"Error loading FaceSwapper: {e}") swapper = None FACE_SWAPPER_AVAILABLE = False else: swapper = None print("Running in simulation mode - FaceSwapper not available") # Temporary storage for uploaded images UPLOAD_FOLDER = 'temp_uploads' os.makedirs(UPLOAD_FOLDER, exist_ok=True) # Temporary folder system for compressor transfers COMPRESSOR_TEMP_FOLDER = 'temp_compressor' os.makedirs(COMPRESSOR_TEMP_FOLDER, exist_ok=True) # Dictionary to track temp folders and their creation times temp_folders = {} # Log server startup logger.info("=" * 60) logger.info("SHINYY'S FACE SWAPPER SERVER STARTING") logger.info("=" * 60) logger.info(f"Upload folder: {UPLOAD_FOLDER}") logger.info(f"OpenCV Available: {CV2_AVAILABLE}") logger.info(f"Face Swapper Available: {FACE_SWAPPER_AVAILABLE}") logger.info("=" * 60) def base64_to_image(base64_string): """Convert base64 string to OpenCV image""" if not CV2_AVAILABLE: # Return PIL image if cv2 not available if 'base64,' in base64_string: base64_string = base64_string.split('base64,')[1] image_data = base64.b64decode(base64_string) return Image.open(BytesIO(image_data)) # Remove data URL prefix if present if 'base64,' in base64_string: base64_string = base64_string.split('base64,')[1] # Decode base64 image_data = base64.b64decode(base64_string) # Convert to PIL Image pil_image = Image.open(BytesIO(image_data)) # Convert to OpenCV format cv_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR) return cv_image def image_to_base64(image): """Convert image to base64 string""" if not CV2_AVAILABLE: # Handle PIL image if cv2 not available if isinstance(image, Image.Image): buffer = BytesIO() image.save(buffer, format='JPEG') image_str = base64.b64encode(buffer.getvalue()).decode() return f"data:image/jpeg;base64,{image_str}" else: # Assume it's already a base64 string return image # Convert to RGB for OpenCV images rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Convert to PIL Image pil_image = Image.fromarray(rgb_image) # Convert to base64 buffer = BytesIO() pil_image.save(buffer, format='JPEG') image_str = base64.b64encode(buffer.getvalue()).decode() return f"data:image/jpeg;base64,{image_str}" # Helper functions for enhanced multi-swap features def apply_face_alignment(image): """Apply basic face alignment to source image""" try: # Simple alignment using histogram equalization gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) aligned = cv2.equalizeHist(gray) aligned_bgr = cv2.cvtColor(aligned, cv2.COLOR_GRAY2BGR) return aligned_bgr except: return image def apply_enhanced_swap(source_path, target_path, source_face_idx, face_id, swap_hair, quality): """Apply enhanced face swap with better processing""" try: # Use higher quality processing for enhanced mode if quality in ['quality', 'ultra']: # Apply some preprocessing source_img = cv2.imread(source_path) source_img = cv2.bilateralFilter(source_img, 15, 80, 80) cv2.imwrite(source_path, source_img) return swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair) except: return swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair) def apply_precise_swap(source_path, target_path, source_face_idx, face_id, swap_hair, face_size): """Apply precise face swap with size control""" try: result = swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair) # Apply precise size adjustments if face_size == 'precise': result = cv2.bilateralFilter(result, 5, 50, 50) return result except: return swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair) def apply_artistic_swap(source_path, target_path, source_face_idx, face_id, swap_hair): """Apply artistic face swap with creative effects""" try: result = swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair) # Apply artistic filters result = cv2.detailEnhance(result, sigma_s=10, sigma_r=0.15) return result except: return swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair) def apply_face_enhancement(image, level): """Apply face enhancement based on level""" try: if level == 'subtle': image = cv2.bilateralFilter(image, 5, 30, 30) elif level == 'medium': image = cv2.bilateralFilter(image, 9, 50, 50) elif level == 'strong': image = cv2.detailEnhance(image, sigma_s=5, sigma_r=0.2) return image except: return image def apply_skin_tone_matching(swapped_face, target_image, face_id, level): """Apply skin tone matching between swapped and target""" try: faces = swapper.app.get(target_image) faces = sorted(faces, key=lambda x: x.bbox[0]) if face_id <= len(faces): face = faces[face_id - 1] x1, y1, x2, y2 = [int(v) for v in face.bbox] original_face = target_image[y1:y2, x1:x2] # Simple color balance adjustment if level == 'subtle': alpha = 0.3 elif level == 'medium': alpha = 0.5 else: # strong alpha = 0.7 blended = cv2.addWeighted(swapped_face, 1-alpha, original_face, alpha, 0) return blended return swapped_face except: return swapped_face def apply_face_size_adjustment(face, size_option): """Apply face size adjustments""" try: if size_option == 'shrink': scale = 0.9 elif size_option == 'expand': scale = 1.1 else: # precise scale = 0.95 h, w = face.shape[:2] new_h, new_w = int(h * scale), int(w * scale) resized = cv2.resize(face, (new_w, new_h)) if scale < 1.0: # Pad to original size pad_h = (h - new_h) // 2 pad_w = (w - new_w) // 2 # Ensure indices are integers to prevent slice errors pad_h = int(pad_h) pad_w = int(pad_w) padded = cv2.copyMakeBorder(resized, pad_h, h-new_h-pad_h, pad_w, w-new_w-pad_w, cv2.BORDER_REPLICATE) return padded else: # Crop to original size crop_h = (new_h - h) // 2 crop_w = (new_w - w) // 2 # Ensure indices are integers to prevent slice errors crop_h = int(crop_h) crop_w = int(crop_w) cropped = resized[crop_h:crop_h+h, crop_w:crop_w+w] return cropped except: return face def apply_lighting_preservation(swapped_face, original_face): """Preserve original lighting conditions""" try: # Convert to LAB color space for lighting preservation swapped_lab = cv2.cvtColor(swapped_face, cv2.COLOR_BGR2LAB) original_lab = cv2.cvtColor(original_face, cv2.COLOR_BGR2LAB) # Copy lighting from original swapped_lab[:,:,0] = original_lab[:,:,0] # Convert back to BGR result = cv2.cvtColor(swapped_lab, cv2.COLOR_LAB2BGR) return result except: return swapped_face def apply_auto_enhancement(face): """Apply automatic enhancement""" try: # Contrast and brightness adjustment enhanced = cv2.convertScaleAbs(face, alpha=1.1, beta=5) return enhanced except: return face def enhanced_face_alignment(source_img, target_img, source_face, target_face): """Enhanced face alignment using facial landmarks for better positioning""" try: # Get facial landmarks src_kps = source_face.kps dst_kps = target_face.kps # Use 5-point facial landmarks for better alignment # Points: 0=left eye, 1=right eye, 2=nose tip, 3=left mouth, 4=right mouth src_pts = np.array(src_kps, dtype=np.float32) dst_pts = np.array(dst_kps, dtype=np.float32) # Calculate similarity transform for better alignment than affine h, w = target_img.shape[:2] M = cv2.estimateAffinePartial2D(src_pts[:3], dst_pts[:3])[0] if M is not None: # Apply transform to source image for better alignment aligned_source = cv2.warpAffine(source_img, M, (w, h), borderMode=cv2.BORDER_REFLECT_101) return aligned_source else: return source_img except Exception as e: print(f"Face alignment enhancement failed: {e}") return source_img def advanced_color_matching(swapped_face, target_region, target_face_bbox): """Advanced color matching using LAB color space and histogram matching""" try: # Convert to LAB color space for better color separation swapped_lab = cv2.cvtColor(swapped_face, cv2.COLOR_BGR2LAB) target_lab = cv2.cvtColor(target_region, cv2.COLOR_BGR2LAB) # Apply histogram matching for each channel for i in range(3): # L, A, B channels swapped_hist = cv2.calcHist([swapped_lab], [i], None, [256], [0, 256]) target_hist = cv2.calcHist([target_lab], [i], None, [256], [0, 256]) # Normalize histograms swapped_hist = swapped_hist / swapped_hist.sum() target_hist = target_hist / target_hist.sum() # Create lookup table for histogram matching lut = create_histogram_lut(swapped_hist, target_hist) swapped_lab[:,:,i] = cv2.LUT(swapped_lab[:,:,i], lut) # Convert back to BGR enhanced_face = cv2.cvtColor(swapped_lab, cv2.COLOR_LAB2BGR) # Blend with original to maintain natural look alpha = 0.7 # 70% enhanced, 30% original final_face = cv2.addWeighted(enhanced_face, alpha, swapped_face, 1-alpha, 0) return final_face except Exception as e: print(f"Color matching enhancement failed: {e}") return swapped_face def create_histogram_lut(source_hist, target_hist): """Create lookup table for histogram matching""" lut = np.zeros(256, dtype=np.uint8) source_cdf = source_hist.cumsum() target_cdf = target_hist.cumsum() for i in range(256): source_val = source_cdf[i] target_idx = np.argmin(np.abs(target_cdf - source_val)) lut[i] = target_idx return lut def seamless_multi_band_blending(swapped_face, target_img, target_face_bbox): """Seamless blending using multi-band blending for natural integration""" try: x1, y1, x2, y2 = target_face_bbox # Create mask for face region mask = np.zeros(target_img.shape[:2], dtype=np.uint8) mask[y1:y2, x1:x2] = 255 # Apply Gaussian blur to mask for smooth edges mask_blurred = cv2.GaussianBlur(mask, (51, 51), 0) mask_blurred = mask_blurred.astype(np.float32) / 255.0 # Multi-band blending result = target_img.copy().astype(np.float32) # Create pyramid for seamless blending levels = 5 pyramid_swapped = create_gaussian_pyramid(swapped_face.astype(np.float32), levels) pyramid_target = create_gaussian_pyramid(target_img[y1:y2, x1:x2].astype(np.float32), levels) pyramid_mask = create_gaussian_pyramid(mask_blurred[y1:y2, x1:x2], levels) # Blend pyramids blended_pyramid = [] for i in range(levels): if i < len(pyramid_swapped) and i < len(pyramid_target) and i < len(pyramid_mask): blended = (pyramid_swapped[i] * pyramid_mask[i] + pyramid_target[i] * (1 - pyramid_mask[i])) blended_pyramid.append(blended) # Reconstruct from pyramid if blended_pyramid: blended_face = reconstruct_from_pyramid(blended_pyramid) result[y1:y2, x1:x2] = blended_face else: # Fallback to simple blending mask_3d = np.stack([mask_blurred[y1:y2, x1:x2]] * 3, axis=-1) result[y1:y2, x1:x2] = (swapped_face.astype(np.float32) * mask_3d + target_img[y1:y2, x1:x2].astype(np.float32) * (1 - mask_3d)) return result.astype(np.uint8) except Exception as e: print(f"Seamless blending failed: {e}") # Fallback to simple paste result = target_img.copy() x1, y1, x2, y2 = target_face_bbox result[y1:y2, x1:x2] = swapped_face return result def create_gaussian_pyramid(img, levels): """Create Gaussian pyramid for multi-band blending""" pyramid = [img] current = img for i in range(levels - 1): current = cv2.pyrDown(current) pyramid.append(current) return pyramid def reconstruct_from_pyramid(pyramid): """Reconstruct image from Gaussian pyramid""" result = pyramid[-1] for i in range(len(pyramid) - 2, -1, -1): result = cv2.pyrUp(result) if result.shape[:2] != pyramid[i].shape[:2]: result = cv2.resize(result, (pyramid[i].shape[1], pyramid[i].shape[0])) result = result + pyramid[i] return result def apply_edge_smoothing(face_img): """Apply edge smoothing to reduce blocky appearance in face swaps""" try: if not CV2_AVAILABLE: return face_img # Apply bilateral filter for edge-preserving smoothing # This reduces blocky edges while preserving important details smoothed = cv2.bilateralFilter(face_img, 5, 60, 60) # Apply subtle Gaussian blur to further smooth edges smoothed = cv2.GaussianBlur(smoothed, (3, 3), 0.5) # Blend with original to maintain natural look alpha = 0.9 # 90% smoothed, 10% original final_result = cv2.addWeighted(smoothed, alpha, face_img, 1-alpha, 0) return final_result except Exception as e: print(f"Edge smoothing failed: {e}") return face_img def smooth_face_blend(swapped_face, target_region, target_face_bbox): """Enhanced face blending with improved accuracy""" try: # Use the new seamless blending for better results # Create a dummy target image for the blending function h, w = target_region.shape[:2] dummy_target = np.zeros((h * 2, w * 2, 3), dtype=np.uint8) dummy_target[:h, :w] = target_region # Adjust bbox for the dummy target adjusted_bbox = (0, 0, w, h) # Apply seamless blending result = seamless_multi_band_blending(swapped_face, dummy_target, adjusted_bbox) # Extract the blended face region blended_face = result[:h, :w] return blended_face except Exception as e: print(f"Enhanced face blending error: {e}") # Fallback to minimal blending h, w = swapped_face.shape[:2] # Create elliptical mask for face shape center = (w // 2, h // 2) axes = (w // 2 - 5, h // 2 - 5) # Generate smooth elliptical mask mask = np.zeros((h, w), dtype=np.uint8) cv2.ellipse(mask, center, axes, 0, 0, 360, 255, -1) # Apply light Gaussian blur to mask mask_blurred = cv2.GaussianBlur(mask, (11, 11), 0) mask_blurred = mask_blurred.astype(np.float32) / 255.0 # Apply the feathered mask mask_3d = np.stack([mask_blurred] * 3, axis=-1) # Blend the face blended_face = (swapped_face.astype(np.float32) * mask_3d + target_region.astype(np.float32) * (1 - mask_3d)) return np.clip(blended_face, 0, 255).astype(np.uint8) def natural_color_match(swapped_face, target_region): """Enhanced color matching using LAB histogram matching for better accuracy""" try: # Use the advanced color matching for better results enhanced_face = advanced_color_matching(swapped_face, target_region, (0, 0, swapped_face.shape[1], swapped_face.shape[0])) return enhanced_face except Exception as e: print(f"Enhanced color matching failed, using fallback: {e}") # Fallback to minimal color matching try: # Convert to YCrCb color space for gentle skin tone adjustment swapped_ycrcb = cv2.cvtColor(swapped_face, cv2.COLOR_BGR2YCrCb) target_ycrcb = cv2.cvtColor(target_region, cv2.COLOR_BGR2YCrCb) # Get mean values for skin tone channels swapped_mean = np.mean(swapped_ycrcb, axis=(0, 1)) target_mean = np.mean(target_ycrcb, axis=(0, 1)) # Gentle color correction y_ratio = target_mean[0] / swapped_mean[0] if swapped_mean[0] > 0 else 1.0 y_ratio = np.clip(y_ratio, 0.95, 1.05) cr_diff = (target_mean[1] - swapped_mean[1]) * 0.1 cb_diff = (target_mean[2] - swapped_mean[2]) * 0.1 # Apply color correction corrected_ycrcb = swapped_ycrcb.copy() corrected_ycrcb[:, :, 0] = np.clip(corrected_ycrcb[:, :, 0] * y_ratio, 0, 255) corrected_ycrcb[:, :, 1] = np.clip(corrected_ycrcb[:, :, 1] + cr_diff, 0, 255) corrected_ycrcb[:, :, 2] = np.clip(corrected_ycrcb[:, :, 2] + cb_diff, 0, 255) # Convert back to BGR corrected_face = cv2.cvtColor(corrected_ycrcb, cv2.COLOR_YCrCb2BGR) # Blend with original alpha = 0.9 final_face = cv2.addWeighted(corrected_face, alpha, swapped_face, 1 - alpha, 0) return final_face except Exception as fallback_error: print(f"Fallback color matching also failed: {fallback_error}") return swapped_face @app.route('/') def index(): """Serve the main HTML page""" return send_file('index.html') @app.route('/compressor.html') def compressor(): """Serve the compressor HTML page""" return send_file('compressor.html') @app.route('/compressor') def compressor_redirect(): """Serve compressor page (redirect from /compressor)""" return send_file('compressor.html') @app.route('/compressor/') def compressor_with_slash(): """Serve compressor page with slash""" return send_file('compressor.html') @app.route('/index') def index_page(): """Serve index page (same as main route)""" return send_file('index.html') def extract_video_frames(video_path, frames_dir): """Extract frames from video file""" if not CV2_AVAILABLE: raise ImportError("OpenCV is required for video processing") if not os.path.exists(frames_dir): os.makedirs(frames_dir) cap = cv2.VideoCapture(video_path) frame_paths = [] idx = 0 while True: ret, frame = cap.read() if not ret: break frame_path = os.path.join(frames_dir, f"frame_{idx:05d}.jpg") cv2.imwrite(frame_path, frame) frame_paths.append(frame_path) idx += 1 cap.release() return frame_paths def create_video_from_frames(frames_dir, output_video_path, fps): """Create video from processed frames""" if not CV2_AVAILABLE: raise ImportError("OpenCV is required for video processing") frames = sorted([os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg')]) if not frames: raise ValueError("No frames found in directory") first_frame = cv2.imread(frames[0]) height, width, layers = first_frame.shape fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) for frame_path in frames: frame = cv2.imread(frame_path) out.write(frame) out.release() def get_video_fps(video_path): """Get FPS from video file""" if not CV2_AVAILABLE: return 30.0 # Default FPS if OpenCV not available cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) cap.release() return fps if fps > 0 else 30.0 @app.route('/api/gpu_status', methods=['GET']) def get_gpu_status(): """Get GPU acceleration status and information""" try: if not swapper: return jsonify({ 'gpu_available': False, 'message': 'Face swapper not initialized' }) gpu_info = swapper.get_gpu_info() return jsonify({ 'success': True, 'gpu_info': gpu_info, 'last_processing_time': getattr(swapper, 'last_processing_time', 0), 'message': 'GPU status retrieved successfully' }) except Exception as e: logger.error(f"GPU status error: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/swap', methods=['POST']) def swap_faces(): """Handle face swapping request with GPU optimization""" start_time = time.time() logger.info("FACE SWAP REQUEST RECEIVED") try: data = request.json source_image_data = data.get('source_image') target_image_data = data.get('target_image') source_face_idx = int(data.get('source_face_idx', 1)) target_face_idx = int(data.get('target_face_idx', 1)) selected_model = data.get('model', 'inswapper_128.onnx') logger.info(f"Request parameters:") logger.info(f"Source face index: {source_face_idx}") logger.info(f"Target face index: {target_face_idx}") logger.info(f"Using Model: {selected_model}") if not source_image_data or not target_image_data: logger.error("Missing source or target image") return jsonify({'error': 'Missing source or target image'}), 400 if not swapper: logger.error("Face swapper not initialized") return jsonify({'error': 'Face swapper not initialized'}), 500 logger.info("Converting base64 images to OpenCV format...") # Convert base64 to OpenCV images source_image = base64_to_image(source_image_data) target_image = base64_to_image(target_image_data) logger.info(f"Image dimensions:") logger.info(f"Source: {source_image.shape[1]}x{source_image.shape[0]}") logger.info(f"Target: {target_image.shape[1]}x{target_image.shape[0]}") # Save temporary files logger.info("Saving temporary files...") source_path = os.path.join(UPLOAD_FOLDER, 'source.jpg') target_path = os.path.join(UPLOAD_FOLDER, 'target.jpg') result_path = os.path.join(UPLOAD_FOLDER, 'result.jpg') cv2.imwrite(source_path, source_image) cv2.imwrite(target_path, target_image) logger.info(f"Temporary files saved:") logger.info(f"Source: {source_path}") logger.info(f"Target: {target_path}") logger.info(f"Result: {result_path}") # Perform face swap with GPU acceleration logger.info("Performing face swap...") swap_start = time.time() try: result = swapper.swap_faces( source_path, source_face_idx, target_path, target_face_idx, swap_hair=False, model_name=selected_model ) except TypeError: result = swapper.swap_faces( source_path, source_face_idx, target_path, target_face_idx, swap_hair=False ) swap_time = time.time() - swap_start logger.info(f"Face swap completed in {swap_time:.2f} seconds") # Apply edge smoothing to reduce blocky appearance logger.info("Applying edge smoothing...") result = apply_edge_smoothing(result) # Save result logger.info("Saving result image...") cv2.imwrite(result_path, result) # Convert result to base64 logger.info("Converting result to base64...") result_base64 = image_to_base64(result) total_time = time.time() - start_time logger.info(f"FACE SWAP COMPLETED SUCCESSFULLY") logger.info(f"Total processing time: {total_time:.2f} seconds") logger.info(f"Result size: {len(result_base64)} chars") # Include GPU status in response gpu_status = getattr(swapper, 'gpu_enabled', False) return jsonify({ 'success': True, 'result_image': result_base64, 'message': f'Face swap completed successfully! (GPU: {"Enabled" if gpu_status else "Disabled"})', 'processing_time': total_time, 'gpu_accelerated': gpu_status }) except Exception as e: logger.error(f"FACE SWAP ERROR: {str(e)}") logger.error(f"Error location: {type(e).__name__}") import traceback logger.error(f"Full traceback:\n{traceback.format_exc()}") return jsonify({'error': str(e)}), 500 @app.route('/api/detect_faces', methods=['POST']) def detect_faces(): """Detect faces in an image""" start_time = time.time() logger.info("FACE DETECTION REQUEST RECEIVED") try: data = request.json image_data = data.get('image') logger.info(f"Image data length: {len(image_data) if image_data else 0} chars") if not image_data: logger.error("Missing image data") return jsonify({'error': 'Missing image data'}), 400 if not swapper: logger.error("Face swapper not initialized") return jsonify({'error': 'Face swapper not initialized'}), 500 logger.info("Converting base64 image to OpenCV format...") # Convert base64 to OpenCV image image = base64_to_image(image_data) logger.info(f"Image dimensions: {image.shape[1]}x{image.shape[0]}") # Detect faces logger.info("Detecting faces in image...") detection_start = time.time() faces = swapper.app.get(image) detection_time = time.time() - detection_start logger.info(f"Face detection completed in {detection_time:.2f} seconds") logger.info(f"Found {len(faces)} face(s)") # Sort faces from left to right faces = sorted(faces, key=lambda x: x.bbox[0]) logger.info("Sorted faces from left to right") # Prepare face data logger.info("Preparing face data...") detected_faces = [] for i, face in enumerate(faces): x1, y1, x2, y2 = [int(v) for v in face.bbox] logger.info(f"Face {i+1}: bbox=({x1},{y1},{x2},{y2}), size={x2-x1}x{y2-y1}") # Extract face region face_region = image[y1:y2, x1:x2] # Convert to base64 face_base64 = image_to_base64(face_region) detected_faces.append({ 'id': i + 1, 'label': f'Face {i + 1}', 'image': face_base64, 'bbox': [x1, y1, x2, y2], 'x': x1, 'y': y1, 'width': x2 - x1, 'height': y2 - y1 }) total_time = time.time() - start_time logger.info(f"FACE DETECTION COMPLETED SUCCESSFULLY") logger.info(f"Total processing time: {total_time:.2f} seconds") logger.info(f"Detected {len(detected_faces)} faces with bounding boxes") return jsonify({ 'success': True, 'faces': detected_faces, 'message': f'Detected {len(detected_faces)} faces', 'processing_time': total_time }) except Exception as e: logger.error(f"FACE DETECTION ERROR: {str(e)}") logger.error(f"Error location: {type(e).__name__}") import traceback logger.error(f"Full traceback:\n{traceback.format_exc()}") return jsonify({'error': str(e)}), 500 @app.route('/api/enhanced_swap', methods=['POST']) def enhanced_swap_faces(): """Handle enhanced face swapping request with quality presets""" start_time = time.time() logger.info("ENHANCED SWAP REQUEST RECEIVED") try: data = request.json source_image_data = data.get('source_image') target_image_data = data.get('target_image') source_face_idx = int(data.get('source_face_idx', 1)) target_face_idx = int(data.get('target_face_idx', 1)) quality_preset = data.get('quality_preset', 'balanced') logger.info(f"Request parameters:") logger.info(f"Source image data length: {len(source_image_data) if source_image_data else 0} chars") logger.info(f"Target image data length: {len(target_image_data) if target_image_data else 0} chars") logger.info(f"Source face index: {source_face_idx}") logger.info(f"Target face index: {target_face_idx}") logger.info(f"Quality preset: {quality_preset}") if not source_image_data or not target_image_data: logger.error("Missing source or target image") return jsonify({'error': 'Missing source or target image'}), 400 # Check if enhanced swapper is available if not ENHANCED_SWAPPER_AVAILABLE: logger.warning("Enhanced swapper not available, falling back to basic swapper") return jsonify({ 'error': 'Enhanced swapper not available. Please install EnhancedFaceSwapper.py and QualityPresets.py', 'fallback_available': FACE_SWAPPER_AVAILABLE }), 501 # Create enhanced swapper with quality preset try: enhanced_swapper = create_enhanced_swapper_with_quality(quality_preset) preset_info = QualityPresets.get_preset(quality_preset) logger.info(f"Using quality preset: {preset_info['name']}") logger.info(f"Expected processing time: {preset_info['processing_time']}") logger.info(f"Quality score: {preset_info['quality_score']}") except Exception as e: logger.error(f"Failed to create enhanced swapper: {e}") return jsonify({'error': f'Failed to create enhanced swapper: {str(e)}'}), 500 # Convert base64 to images source_image = base64_to_image(source_image_data) target_image = base64_to_image(target_image_data) if source_image is None or target_image is None: logger.error("Failed to convert base64 to image") return jsonify({'error': 'Failed to decode images'}), 400 # Save temporary files timestamp = int(time.time()) source_path = os.path.join(UPLOAD_FOLDER, f'enhanced_source_{timestamp}.jpg') target_path = os.path.join(UPLOAD_FOLDER, f'enhanced_target_{timestamp}.jpg') # Save images based on their type if CV2_AVAILABLE and isinstance(source_image, np.ndarray): cv2.imwrite(source_path, source_image) elif isinstance(source_image, Image.Image): source_image.save(source_path) else: logger.error(f"Invalid source image type: {type(source_image)}") return jsonify({'error': 'Invalid source image format'}), 400 if CV2_AVAILABLE and isinstance(target_image, np.ndarray): cv2.imwrite(target_path, target_image) elif isinstance(target_image, Image.Image): target_image.save(target_path) else: logger.error(f"Invalid target image type: {type(target_image)}") return jsonify({'error': 'Invalid target image format'}), 400 logger.info(f"Saved temporary files: {source_path}, {target_path}") # Perform enhanced face swapping try: logger.info("Starting enhanced face swapping...") result_image = enhanced_swapper.swap_faces_enhanced( source_path, target_path, source_face_idx, target_face_idx ) if result_image is None: logger.error("Enhanced face swapping returned None") return jsonify({'error': 'Enhanced face swapping failed'}), 500 logger.info("Enhanced face swapping completed successfully") except Exception as e: logger.error(f"Enhanced face swapping error: {e}") logger.error(f"Error type: {type(e).__name__}") import traceback logger.error(f"Full traceback:\n{traceback.format_exc()}") return jsonify({'error': f'Enhanced face swapping failed: {str(e)}'}), 500 # Save result result_path = os.path.join(UPLOAD_FOLDER, f'enhanced_result_{timestamp}.jpg') if CV2_AVAILABLE: cv2.imwrite(result_path, result_image) else: result_pil = Image.fromarray(cv2.cvtColor(result_image, cv2.COLOR_BGR2RGB)) result_pil.save(result_path) # Convert to base64 result_base64 = image_to_base64(result_path) # Clean up temporary files try: os.unlink(source_path) os.unlink(target_path) os.unlink(result_path) except: pass processing_time = time.time() - start_time logger.info(f"Enhanced swapping completed in {processing_time:.2f} seconds") return jsonify({ 'result_image': result_base64, 'processing_time': processing_time, 'quality_preset': quality_preset, 'preset_info': preset_info, 'enhanced_features': { 'face_structure_matching': enhanced_swapper.face_structure_matching, 'color_correction': enhanced_swapper.color_correction_enabled, 'seamless_blending': enhanced_swapper.seamless_blending_enabled, 'detail_enhancement': enhanced_swapper.enhancement_enabled } }) except Exception as e: logger.error(f"Enhanced swap endpoint error: {e}") logger.error(f"Error type: {type(e).__name__}") import traceback logger.error(f"Full traceback:\n{traceback.format_exc()}") return jsonify({'error': str(e)}), 500 @app.route('/api/quality_presets', methods=['GET']) def get_quality_presets(): """Get available quality presets for enhanced face swapping""" try: if not ENHANCED_SWAPPER_AVAILABLE: return jsonify({ 'error': 'Enhanced swapper not available', 'fallback_available': FACE_SWAPPER_AVAILABLE, 'presets': [] }), 501 presets = QualityPresets.get_all_presets() preset_names = QualityPresets.get_preset_names() logger.info(f"Providing {len(presets)} quality presets: {preset_names}") return jsonify({ 'presets': presets, 'preset_names': preset_names, 'enhanced_available': ENHANCED_SWAPPER_AVAILABLE, 'default_preset': 'balanced' }) except Exception as e: logger.error(f"Error getting quality presets: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/multi_swap', methods=['POST']) def multi_swap(): """Handle multi-face swapping with advanced un-cut processing to prevent overlap""" start_time = time.time() logger.info("MULTI-SWAP REQUEST RECEIVED") try: data = request.json target_image_data = data.get('target_image') assignments = data.get('assignments', {}) selected_model = data.get('model', 'inswapper_128.onnx') logger.info(f"Request parameters:") logger.info(f"Using Model: {selected_model}") logger.info(f"Target image data length: {len(target_image_data) if target_image_data else 0} chars") logger.info(f"Number of face assignments: {len(assignments)}") for face_id, source_data in assignments.items(): logger.info(f"Face {face_id}: {len(source_data) if source_data else 0} chars source data") if not target_image_data: logger.error("Missing target image") return jsonify({'error': 'Missing target image'}), 400 if not swapper: logger.error("Face swapper not initialized") return jsonify({'error': 'Face swapper not initialized'}), 500 logger.info("Converting target image to OpenCV format...") target_image = base64_to_image(target_image_data) logger.info(f"Target image dimensions: {target_image.shape[1]}x{target_image.shape[0]}") # Start with target image result = target_image.copy() logger.info("Created result image copy") # Process each face swap sequentially face_ids = sorted(assignments.keys(), key=int) logger.info(f"Processing {len(face_ids)} face assignments in order: {face_ids}") processed_faces = 0 for target_face_id in face_ids: source_image_data = assignments[target_face_id] if source_image_data: logger.info(f"Processing face {target_face_id}...") # Convert source image source_image = base64_to_image(source_image_data) # Save temporary files logger.info(f"Saving temporary files for face {target_face_id}...") source_path = os.path.join(UPLOAD_FOLDER, f'source_{target_face_id}.jpg') target_path = os.path.join(UPLOAD_FOLDER, f'target_{target_face_id}.jpg') cv2.imwrite(source_path, source_image) cv2.imwrite(target_path, result) # Use current result, not original target # Perform face swap directly without cutting/pasting rectangles logger.info(f"Performing face swap for face {target_face_id}...") swap_start = time.time() try: swapped = swapper.swap_faces( source_path, 1, # Use first face from source target_path, int(target_face_id), swap_hair=False, model_name=selected_model ) except TypeError: swapped = swapper.swap_faces( source_path, 1, target_path, int(target_face_id), swap_hair=False ) swap_time = time.time() - swap_start logger.info(f"Face swap completed in {swap_time:.2f} seconds") if swapped is not None: # Directly assign the seamlessly blended output back to result result = swapped processed_faces += 1 logger.info(f"Successfully applied face {target_face_id} to result") else: logger.warning(f"Face {target_face_id} swap failed to return an image.") else: logger.warning(f"No source image data for face {target_face_id}") total_time = time.time() - start_time logger.info(f"MULTI-SWAP COMPLETED SUCCESSFULLY") logger.info(f"Total processing time: {total_time:.2f} seconds") logger.info(f"Processed {processed_faces}/{len(face_ids)} faces successfully") # Convert result to base64 logger.info("Converting final result to base64...") result_base64 = image_to_base64(result) logger.info(f"Result size: {len(result_base64)} chars") return jsonify({ 'success': True, 'result_image': result_base64, 'message': f'Multi-face swap completed! Processed {processed_faces}/{len(face_ids)} faces.', 'processing_time': total_time, 'faces_processed': processed_faces, 'total_faces': len(face_ids) }) except Exception as e: logger.error(f"MULTI-SWAP ERROR: {str(e)}") logger.error(f"Error location: {type(e).__name__}") import traceback logger.error(f"Full traceback:\n{traceback.format_exc()}") return jsonify({'error': str(e)}), 500 @app.route('/api/video_swap', methods=['POST']) def video_swap(): """Handle video face swapping - simplified like working app.py""" start_time = time.time() logger.info("VIDEO SWAP REQUEST RECEIVED") try: data = request.json source_image_data = data.get('source_image') video_data = data.get('video') source_face_idx = int(data.get('source_face_idx', 1)) target_face_idx = int(data.get('target_face_idx', 1)) selected_model = data.get('model', 'inswapper_128.onnx') logger.info(f"Request parameters:") logger.info(f" • Source face index: {source_face_idx}") logger.info(f" • Target face index: {target_face_idx}") logger.info(f" • Selected Model: {selected_model}") logger.info(f" • Source image data length: {len(source_image_data) if source_image_data else 0} chars") logger.info(f" • Video data length: {len(video_data) if video_data else 0} chars") if not source_image_data or not video_data: logger.error("Missing source image or video") return jsonify({'error': 'Missing source image or video'}), 400 if not swapper: logger.error("Face swapper not initialized") return jsonify({'error': 'Face swapper not initialized'}), 500 # Initialize progress tracking global video_progress video_progress = { 'processing': True, 'phase': 'initializing', 'total_frames': 0, 'processed_frames': 0, 'current_frame_base64': None, 'start_time': time.time(), 'mode': 'normal', 'fps': None, 'resolution': None, 'file_size': 0, 'processing_speed': 0, 'avg_frame_time': None, 'estimated_total_time': None, 'countdown_time': None } logger.info("Starting video processing") # Simplified video processing like app.py source_image = base64_to_image(source_image_data) source_path = os.path.join(UPLOAD_FOLDER, f'video_source_{int(time.time())}.jpg') if CV2_AVAILABLE and isinstance(source_image, np.ndarray): cv2.imwrite(source_path, source_image) elif isinstance(source_image, Image.Image): source_image.save(source_path) else: logger.error(f"Invalid source image format: {type(source_image)}") return jsonify({'error': 'Invalid source image format'}), 400 # Convert video data to file video_path = os.path.join(UPLOAD_FOLDER, f'input_video_{int(time.time())}.mp4') if 'base64,' in video_data: video_data = video_data.split('base64,')[1] video_bytes = base64.b64decode(video_data) with open(video_path, 'wb') as f: f.write(video_bytes) video_progress['file_size'] = os.path.getsize(video_path) if os.path.exists(video_path) else 0 logger.info(f"Video file size: {video_progress['file_size'] / (1024*1024):.2f} MB") # Setup processing directories frames_dir = os.path.join(UPLOAD_FOLDER, 'video_frames') swapped_dir = os.path.join(UPLOAD_FOLDER, 'swapped_frames') output_video_path = os.path.join(UPLOAD_FOLDER, f'output_swapped_video_{int(time.time())}.mp4') # Clean up and create directories if os.path.exists(frames_dir): shutil.rmtree(frames_dir) if os.path.exists(swapped_dir): shutil.rmtree(swapped_dir) os.makedirs(frames_dir, exist_ok=True) os.makedirs(swapped_dir, exist_ok=True) try: # Extract frames from video (like app.py) logger.info("Extracting frames from video...") video_progress['phase'] = 'extracting' frame_paths = extract_video_frames(video_path, frames_dir) video_progress['total_frames'] = len(frame_paths) logger.info(f"Extracted {len(frame_paths)} frames") # Get FPS from original video (like app.py) cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) video_progress['fps'] = int(fps) if fps > 0 else 30 video_progress['resolution'] = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) cap.release() logger.info(f"Video: {video_progress['fps']} FPS, {video_progress['resolution']} resolution") # Process frames with face swapping (simplified like app.py) logger.info("Swapping faces on frames...") video_progress['phase'] = 'swapping' processed_count = 0 last_update_time = time.time() # Process ALL frames sequentially (matching working app.py) for idx, frame_path in enumerate(frame_paths): frame_start_time = time.time() swapped_name = f"swapped_{idx:05d}.jpg" out_path = os.path.join(swapped_dir, swapped_name) try: # Simple face swap call (like app.py line 343) try: swapped_frame = swapper.swap_faces( source_path=source_path, source_face_idx=source_face_idx, target_path=frame_path, target_face_idx=target_face_idx, model_name=selected_model ) except TypeError: swapped_frame = swapper.swap_faces( source_path=source_path, source_face_idx=source_face_idx, target_path=frame_path, target_face_idx=target_face_idx ) # Save swapped frame (like app.py line 344) if CV2_AVAILABLE and isinstance(swapped_frame, np.ndarray): cv2.imwrite(out_path, swapped_frame) processed_count += 1 # Update progress tracking video_progress['processed_frames'] = processed_count # Update progress with frame preview every 10 frames if processed_count % 10 == 0 and CV2_AVAILABLE and isinstance(swapped_frame, np.ndarray): _, buffer = cv2.imencode('.jpg', swapped_frame) frame_base64 = base64.b64encode(buffer).decode('utf-8') video_progress['current_frame_base64'] = f"data:image/jpeg;base64,{frame_base64}" # Update progress every 5 frames if processed_count % 5 == 0: elapsed = time.time() - video_progress['start_time'] avg_time = elapsed / processed_count if processed_count > 0 else 0 remaining_frames = len(frame_paths) - processed_count remaining_time = avg_time * remaining_frames # Estimate total time with buffer buffer_time = 10 total_estimated = elapsed + remaining_time + buffer_time video_progress['countdown_time'] = round(total_estimated, 1) mins, secs = divmod(int(remaining_time), 60) logger.info(f"Processed {processed_count}/{len(frame_paths)} frames | Est. time left: {mins:02d}:{secs:02d}") except Exception as e: logger.error(f"Error processing frame {idx}: {e}") # Copy original frame if swap fails (like app.py line 345) if os.path.exists(frame_path): shutil.copy2(frame_path, out_path) processed_count += 1 video_progress['processed_frames'] = processed_count logger.info(f"Face swapping completed. Processed {processed_count} frames.") # Combine swapped frames into video (like app.py line 349-350) logger.info("Combining swapped frames into video...") video_progress['phase'] = 'rendering' create_video_from_frames(swapped_dir, output_video_path, int(video_progress['fps'])) # Convert output video to base64 for response with open(output_video_path, 'rb') as f: video_bytes = f.read() output_video_base64 = base64.b64encode(video_bytes).decode() output_video_data_url = f"data:video/mp4;base64,{output_video_base64}" # Clean up temporary files (like app.py line 355-357) try: shutil.rmtree(frames_dir) shutil.rmtree(swapped_dir) os.remove(video_path) os.remove(source_path) except Exception as cleanup_error: logger.warning(f"Cleanup warning: {cleanup_error}") # Final progress update processing_time = time.time() - video_progress['start_time'] video_progress['phase'] = 'complete' video_progress['processing'] = False logger.info(f"VIDEO SWAPPING COMPLETED SUCCESSFULLY") logger.info(f"Total processing time: {processing_time:.1f} seconds") logger.info(f"Frames processed: {processed_count}") return jsonify({ 'success': True, 'message': f'Video face swap completed successfully!', 'processing_mode': 'Standard face swap processing', 'quality_level': 'High Quality', 'frames_processed': processed_count, 'total_frames': len(frame_paths), 'output_fps': video_progress['fps'], 'processing_time': round(processing_time, 1), 'output_video': output_video_data_url, 'total_frames': len(frame_paths) }) except Exception as processing_error: logger.error(f"VIDEO PROCESSING ERROR: {str(processing_error)}") logger.error(f"Error location: {type(processing_error).__name__}") import traceback logger.error(f"Full traceback:\n{traceback.format_exc()}") # Reset progress on error video_progress['processing'] = False video_progress['phase'] = 'error' # Clean up on error try: if os.path.exists(frames_dir): shutil.rmtree(frames_dir) if os.path.exists(swapped_dir): shutil.rmtree(swapped_dir) if os.path.exists(video_path): os.remove(video_path) if os.path.exists(source_path): os.remove(source_path) except: pass return jsonify({'error': f'Video processing failed: {str(processing_error)}'}), 500 except Exception as e: logger.error(f"VIDEO SWAP ERROR: {str(e)}") logger.error(f"Error location: {type(e).__name__}") import traceback logger.error(f"Full traceback:\n{traceback.format_exc()}") return jsonify({'error': str(e)}), 500 @app.route('/api/video_progress', methods=['GET']) def video_progress_endpoint(): """Get current video processing progress""" return jsonify(video_progress) @app.route('/api/multi_combination_swap', methods=['POST']) def multi_combination_swap(): """Handle multiple source and target image combinations with GPU optimization""" start_time = time.time() logger.info("MULTI-COMBINATION SWAP REQUEST RECEIVED") try: data = request.json source_images = data.get('source_images', []) target_images = data.get('target_images', []) selected_model = data.get('model', 'inswapper_128.onnx') logger.info(f"Processing {len(source_images)} source images x {len(target_images)} target images") logger.info(f"Using Model: {selected_model}") if not source_images or not target_images: return jsonify({'error': 'Missing source or target images'}), 400 if not swapper: return jsonify({'error': 'Face swapper not initialized'}), 500 results = [] timestamp = int(time.time()) # Check if GPU batch processing is available and beneficial use_batch_processing = (hasattr(swapper, 'gpu_enabled') and swapper.gpu_enabled and len(source_images) * len(target_images) > 1) if use_batch_processing: logger.info("Using GPU batch processing for optimal performance") # Save all source and target images first source_paths = [] target_paths = [] for i, source_image_data in enumerate(source_images): source_image = base64_to_image(source_image_data) source_path = os.path.join(UPLOAD_FOLDER, f'batch_source_{timestamp}_{i}.jpg') cv2.imwrite(source_path, source_image) source_paths.append(source_path) for j, target_image_data in enumerate(target_images): target_image = base64_to_image(target_image_data) target_path = os.path.join(UPLOAD_FOLDER, f'batch_target_{timestamp}_{j}.jpg') cv2.imwrite(target_path, target_image) target_paths.append(target_path) # Use batch processing for GPU optimization try: # Process first source against all targets as batch for i, source_path in enumerate(source_paths): source_face_indices = [1] # Use first face from each source # Use the optimized batch method try: batch_results = swapper.swap_faces_batch( source_path=source_path, target_path=target_paths[0], # Will be overridden in batch processing source_face_indices=source_face_indices, target_face_indices=list(range(1, len(target_paths) + 1)), swap_hair=False, model_name=selected_model ) except TypeError: batch_results = swapper.swap_faces_batch( source_path=source_path, target_path=target_paths[0], source_face_indices=source_face_indices, target_face_indices=list(range(1, len(target_paths) + 1)), swap_hair=False ) # Convert batch results to response format for j, result in enumerate(batch_results): if j < len(target_paths): # Apply edge smoothing to reduce blocky appearance result = apply_edge_smoothing(result) result_base64 = image_to_base64(result) result_filename = f"combo_{timestamp}_source{i+1}_target{j+1}.jpg" result_path = os.path.join(UPLOAD_FOLDER, result_filename) cv2.imwrite(result_path, result) results.append({ 'combination_name': f'Source {i+1} x Target {j+1}', 'source_index': i, 'target_index': j, 'result_image': result_base64, 'download_url': f'/download/{result_filename}' }) logger.info(f"GPU batch processed: Source {i+1} x Target {j+1}") except Exception as batch_error: logger.warning(f"GPU batch processing failed: {batch_error}") logger.info("Falling back to individual processing...") use_batch_processing = False if not use_batch_processing: logger.info("Using individual processing (CPU or GPU fallback)") # Process all combinations individually (original method with GPU support) for i, source_image_data in enumerate(source_images): for j, target_image_data in enumerate(target_images): try: logger.info(f"Processing combination {i+1}x{j+1}...") # Convert base64 to OpenCV images source_image = base64_to_image(source_image_data) target_image = base64_to_image(target_image_data) # Save temporary files source_path = os.path.join(UPLOAD_FOLDER, f'combo_{timestamp}_source_{i}_{j}.jpg') target_path = os.path.join(UPLOAD_FOLDER, f'combo_{timestamp}_target_{i}_{j}.jpg') cv2.imwrite(source_path, source_image) cv2.imwrite(target_path, target_image) # Perform face swap with GPU acceleration try: swapped_result = swapper.swap_faces( source_path, 1, # Use first face from source (default for multi-combination) target_path, 1, # Use first face from target (default for multi-combination) swap_hair=False, model_name=selected_model ) except TypeError: swapped_result = swapper.swap_faces( source_path, 1, target_path, 1, swap_hair=False ) # Apply edge smoothing to reduce blocky appearance swapped_result = apply_edge_smoothing(swapped_result) # Convert result to base64 result_base64 = image_to_base64(swapped_result) # Save result file for download result_filename = f"combo_{timestamp}_source{i+1}_target{j+1}.jpg" result_path = os.path.join(UPLOAD_FOLDER, result_filename) cv2.imwrite(result_path, swapped_result) results.append({ 'combination_name': f'Source {i+1} x Target {j+1}', 'source_index': i, 'target_index': j, 'result_image': result_base64, 'download_url': f'/download/{result_filename}' }) logger.info(f"Successfully processed combination {i+1}x{j+1}") except Exception as e: logger.error(f"Error processing combination {i+1}x{j+1}: {e}") # Continue with other combinations continue if not results: return jsonify({'error': 'No combinations processed successfully'}), 500 total_time = time.time() - start_time gpu_status = getattr(swapper, 'gpu_enabled', False) logger.info(f"MULTI-COMBINATION SWAP COMPLETED") logger.info(f"Total combinations: {len(source_images) * len(target_images)}") logger.info(f"Successful: {len(results)}") logger.info(f"Processing time: {total_time:.2f}s") logger.info(f"GPU acceleration: {'Enabled' if gpu_status else 'Disabled'}") return jsonify({ 'success': True, 'message': f'Processed {len(results)} combinations successfully! (GPU: {"Enabled" if gpu_status else "Disabled"})', 'results': results, 'total_combinations': len(source_images) * len(target_images), 'successful_combinations': len(results), 'processing_time': total_time, 'gpu_accelerated': gpu_status, 'batch_processing_used': use_batch_processing }) except Exception as e: logger.error(f"MULTI-COMBINATION SWAP ERROR: {str(e)}") logger.error(f"Error location: {type(e).__name__}") import traceback logger.error(f"Full traceback:\n{traceback.format_exc()}") return jsonify({'error': str(e)}), 500 @app.route('/api/test', methods=['GET']) def test_endpoint(): """Simple test endpoint to check if server is responding""" return jsonify({ 'success': True, 'message': 'Server is working correctly', 'timestamp': int(time.time()) }) @app.route('/api/face_morph', methods=['POST']) def face_morph(): """Handle face morphing between two faces with real GIF/MP4 generation""" start_time = time.time() logger.info("FACE MORPH REQUEST RECEIVED") try: data = request.json source_image_data = data.get('source_image') target_image_data = data.get('target_image') source_face_idx = int(data.get('source_face_idx', 1)) target_face_idx = int(data.get('target_face_idx', 1)) morph_speed = data.get('morph_speed', 'normal') morph_frames = int(data.get('morph_frames', 30)) output_format = data.get('output_format', 'gif') selected_model = data.get('model', 'inswapper_128.onnx') logger.info(f"Request parameters:") logger.info(f" • Source face index: {source_face_idx}") logger.info(f" • Target face index: {target_face_idx}") logger.info(f" • Morph speed: {morph_speed}") logger.info(f" • Morph frames: {morph_frames}") logger.info(f" • Output format: {output_format}") logger.info(f" • Using Model: {selected_model}") if not source_image_data or not target_image_data: logger.error("Missing source or target image") return jsonify({'error': 'Missing source or target image'}), 400 if not swapper: logger.error("Face swapper not initialized") return jsonify({'error': 'Face swapper not initialized'}), 500 logger.info("Converting base64 images to OpenCV format...") # Convert base64 to OpenCV images source_image = base64_to_image(source_image_data) target_image = base64_to_image(target_image_data) logger.info(f"Image dimensions:") logger.info(f" • Source: {source_image.shape[1]}x{source_image.shape[0]}") logger.info(f" • Target: {target_image.shape[1]}x{target_image.shape[0]}") # Save temporary files logger.info("Saving temporary files...") source_path = os.path.join(UPLOAD_FOLDER, 'morph_source.jpg') target_path = os.path.join(UPLOAD_FOLDER, 'morph_target.jpg') cv2.imwrite(source_path, source_image) cv2.imwrite(target_path, target_image) logger.info(f"Temporary files saved: {source_path}, {target_path}") # Perform face swap to get both faces in same position try: source_swapped = swapper.swap_faces( source_path, source_face_idx, target_path, target_face_idx, swap_hair=False, model_name=selected_model ) except TypeError: source_swapped = swapper.swap_faces( source_path, source_face_idx, target_path, target_face_idx, swap_hair=False ) # Extract face regions faces = swapper.app.get(target_image) faces = sorted(faces, key=lambda x: x.bbox[0]) if target_face_idx <= len(faces): face = faces[target_face_idx - 1] x1, y1, x2, y2 = [int(v) for v in face.bbox] # Perform face swap to get full image with swapped face full_swapped_image = source_swapped.copy() # Generate morph frames (full images with animated face region) morph_frames_list = [] for i in range(morph_frames): # Calculate blend ratio (0.0 to 1.0) ratio = i / (morph_frames - 1) if morph_frames > 1 else 0 # Create morph frame by blending only the face region morph_frame = target_image.copy() # Extract face regions original_face = target_image[y1:y2, x1:x2] swapped_face = full_swapped_image[y1:y2, x1:x2] # Blend only the face region alpha = ratio beta = 1.0 - alpha morphed_face = cv2.addWeighted(swapped_face, alpha, original_face, beta, 0) # Paste morphed face back into full image morph_frame[y1:y2, x1:x2] = morphed_face # Convert to RGB PIL Image for GIF creation morphed_rgb = cv2.cvtColor(morph_frame, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(morphed_rgb) morph_frames_list.append(pil_image) # Calculate duration based on speed speed_duration_map = { 'slow': 5000, # 5 seconds total 'normal': 3000, # 3 seconds total 'fast': 1500 # 1.5 seconds total } total_duration = speed_duration_map.get(morph_speed, 3000) frame_duration = total_duration // morph_frames # Duration per frame in milliseconds # Generate output file timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if output_format == 'gif': # Create animated GIF gif_path = os.path.join(UPLOAD_FOLDER, f'face_morph_{timestamp}.gif') morph_frames_list[0].save( gif_path, save_all=True, append_images=morph_frames_list[1:], duration=frame_duration, loop=0, optimize=True ) # Convert GIF to base64 with open(gif_path, 'rb') as f: gif_data = f.read() gif_base64 = base64.b64encode(gif_data).decode() gif_data_url = f"data:image/gif;base64,{gif_base64}" return jsonify({ 'success': True, 'output_file': gif_data_url, 'download_url': f'/download/face_morph_{timestamp}.gif', 'total_duration': total_duration, 'output_format': output_format, 'frame_count': morph_frames, 'message': f'Face morph GIF created! {morph_frames} frames, {total_duration/1000:.1f}s duration.' }) elif output_format == 'mp4': # Create MP4 video using imageio mp4_path = os.path.join(UPLOAD_FOLDER, f'face_morph_{timestamp}.mp4') # Convert PIL frames to numpy arrays for imageio video_frames = [np.array(frame) for frame in morph_frames_list] # Write MP4 with imageio imageio.mimsave(mp4_path, video_frames, fps=(1000/frame_duration), quality=8) # For now, return first frame as preview (full MP4 download via separate endpoint) preview_base64 = image_to_base64(cv2.cvtColor(video_frames[0], cv2.COLOR_RGB2BGR)) return jsonify({ 'success': True, 'preview_image': preview_base64, 'download_url': f'/download/face_morph_{timestamp}.mp4', 'total_duration': total_duration, 'output_format': output_format, 'frame_count': morph_frames, 'message': f'Face morph MP4 created! {morph_frames} frames, {total_duration/1000:.1f}s duration.' }) else: return jsonify({'error': f'Target face index {target_face_idx} not found'}), 400 except Exception as e: logger.error(f"FACE MORPH ERROR: {str(e)}") logger.error(f"Error location: {type(e).__name__}") import traceback logger.error(f"Full traceback:\n{traceback.format_exc()}") return jsonify({'error': str(e)}), 500 @app.route('/download/') def download_file(filename): """Serve generated files for download""" try: file_path = os.path.join(UPLOAD_FOLDER, filename) if os.path.exists(file_path): return send_file(file_path, as_attachment=True) else: return jsonify({'error': 'File not found'}), 404 except Exception as e: print(f"Download error: {e}") return jsonify({'error': str(e)}), 500 # Pinterest integration removed - file not available # from pinterest_integration import search_pinterest_images, download_pinterest_images, scrape_pinterest_board def download_image_from_url(url, filename=None): """Download an image from URL and return base64 encoded data""" try: response = requests.get(url, timeout=10) response.raise_for_status() # Convert to base64 image_base64 = base64.b64encode(response.content).decode('utf-8') # Determine image type content_type = response.headers.get('content-type', 'image/jpeg') if 'png' in content_type: data_url = f"data:image/png;base64,{image_base64}" else: data_url = f"data:image/jpeg;base64,{image_base64}" return { 'success': True, 'data_url': data_url, 'size': len(response.content), 'content_type': content_type } except Exception as e: return {'success': False, 'error': f'Download failed: {str(e)}'} @app.route('/api/batch_detect_faces', methods=['POST']) def batch_detect_faces(): """Batch detect faces in multiple images for auto-harvesting""" try: data = request.json images = data.get('images', []) if not images: return jsonify({'error': 'No images provided'}), 400 if not swapper: return jsonify({'error': 'Face swapper not initialized'}), 500 results = [] for idx, image_data in enumerate(images): try: # Convert base64 to OpenCV image image = base64_to_image(image_data) # Detect faces faces = swapper.app.get(image) # Sort faces from left to right faces = sorted(faces, key=lambda x: x.bbox[0]) # Prepare face data detected_faces = [] for i, face in enumerate(faces): x1, y1, x2, y2 = [int(v) for v in face.bbox] # Extract face region face_region = image[y1:y2, x1:x2] # Convert to base64 face_base64 = image_to_base64(face_region) detected_faces.append({ 'id': i + 1, 'label': f'Face {i + 1}', 'image': face_base64, 'bbox': [x1, y1, x2, y2] }) results.append({ 'image_index': idx, 'success': True, 'faces': detected_faces, 'face_count': len(detected_faces) }) except Exception as e: results.append({ 'image_index': idx, 'success': False, 'error': str(e), 'faces': [], 'face_count': 0 }) return jsonify({ 'success': True, 'results': results, 'total_images': len(images), 'total_faces': sum(r['face_count'] for r in results) }) except Exception as e: print(f"Batch face detection error: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/pinterest/search', methods=['POST']) def pinterest_search(): """Search Pinterest for images using pinterest-dl""" try: data = request.get_json() if not data: return jsonify({'success': False, 'error': 'No JSON data received'}), 400 query = data.get('query', '').strip() per_page = min(data.get('per_page', 20), 50) # Limit to 50 max if not query: return jsonify({'success': False, 'error': 'Query parameter is required'}), 400 print(f"Pinterest search request: {query} (limit: {per_page})") # Search for images using pinterest-dl result = search_pinterest_images(query, per_page) if result['success']: print(f"Found {result['total_found']} images for: {query}") return jsonify(result) else: print(f"Pinterest search failed: {result.get('error', 'Unknown error')}") return jsonify(result), 500 except Exception as e: error_msg = f'Server error: {str(e)}' print(f"Pinterest search error: {error_msg}") return jsonify({'success': False, 'error': error_msg}), 500 @app.route('/api/pinterest/download', methods=['POST']) def pinterest_download(): """Download images from Pinterest using pinterest-dl""" try: data = request.get_json() if not data: return jsonify({'success': False, 'error': 'No JSON data received'}), 400 query = data.get('query', '').strip() num_images = min(data.get('num_images', 20), 100) # Limit to 100 max output_dir = data.get('output_dir', 'pinterest_downloads') if not query: return jsonify({'success': False, 'error': 'Query parameter is required'}), 400 print(f"Pinterest download request: {query} (count: {num_images})") # Download images using pinterest-dl result = download_pinterest_images(query, num_images, output_dir) if result['success']: print(f"Downloaded {result['total_downloaded']} images to: {result['output_directory']}") return jsonify(result) else: print(f"Pinterest download failed: {result.get('error', 'Unknown error')}") return jsonify(result), 500 except Exception as e: error_msg = f'Server error: {str(e)}' print(f"Pinterest download error: {error_msg}") return jsonify({'success': False, 'error': error_msg}), 500 @app.route('/api/pinterest/scrape', methods=['POST']) def pinterest_scrape(): """Scrape images from a Pinterest board using pinterest-dl""" try: data = request.get_json() if not data: return jsonify({'success': False, 'error': 'No JSON data received'}), 400 board_url = data.get('board_url', '').strip() num_images = min(data.get('num_images', 50), 200) # Limit to 200 max output_dir = data.get('output_dir', 'pinterest_downloads') if not board_url: return jsonify({'success': False, 'error': 'Board URL parameter is required'}), 400 # Validate Pinterest URL if not ('pinterest.com' in board_url and ('/board/' in board_url or '/pin/' in board_url)): return jsonify({'success': False, 'error': 'Invalid Pinterest board URL'}), 400 print(f"Pinterest scrape request: {board_url} (count: {num_images})") # Scrape board using pinterest-dl result = scrape_pinterest_board(board_url, num_images, output_dir) if result['success']: print(f"Scraped {result['total_scraped']} images from board") return jsonify(result) else: print(f"Pinterest scrape failed: {result.get('error', 'Unknown error')}") return jsonify(result), 500 except Exception as e: error_msg = f'Server error: {str(e)}' print(f"Pinterest scrape error: {error_msg}") return jsonify({'success': False, 'error': error_msg}), 500 @app.route('/pinterest_images/') def serve_pinterest_image(filename): """Serve Pinterest images from the downloads directory""" try: # Construct the file path pinterest_dir = Path("pinterest_downloads") file_path = pinterest_dir / filename # Security check - ensure file is within pinterest_downloads if not str(file_path).startswith(str(pinterest_dir.absolute())): return jsonify({'error': 'Invalid file path'}), 403 if file_path.exists() and file_path.is_file(): return send_file(str(file_path)) else: return jsonify({'error': 'File not found'}), 404 except Exception as e: print(f"Error serving Pinterest image: {e}") return jsonify({'error': 'Server error'}), 500 @app.route('/api/preset/save', methods=['POST']) def save_preset(): """Save an image to the presets folder on PC""" try: data = request.json image_url = data.get('url') title = data.get('title', 'untitled') if not image_url: return jsonify({'success': False, 'error': 'No image URL provided'}), 400 # Create presets folder if it doesn't exist presets_folder = os.path.join(os.getcwd(), 'presets') os.makedirs(presets_folder, exist_ok=True) # Download the image response = requests.get(image_url, timeout=30) response.raise_for_status() # Generate a safe filename safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_')).rstrip() timestamp = int(time.time()) filename = f"{safe_title}_{timestamp}.jpg" filepath = os.path.join(presets_folder, filename) # Save the image with open(filepath, 'wb') as f: f.write(response.content) # Convert to base64 for immediate use image_base64 = base64.b64encode(response.content).decode('utf-8') data_url = f"data:image/jpeg;base64,{image_base64}" return jsonify({ 'success': True, 'filename': filename, 'filepath': filepath, 'data_url': data_url, 'folder': presets_folder }) except Exception as e: print(f"Save preset error: {e}") return jsonify({'success': False, 'error': f'Failed to save preset: {str(e)}'}), 500 @app.route('/api/preset/save_all', methods=['POST']) def save_all_presets(): """Save multiple images to the presets folder on PC""" try: data = request.json images = data.get('images', []) if not images: return jsonify({'success': False, 'error': 'No images provided'}), 400 # Create presets folder if it doesn't exist presets_folder = os.path.join(os.getcwd(), 'presets') os.makedirs(presets_folder, exist_ok=True) saved_images = [] failed_images = [] for i, image in enumerate(images): try: image_url = image.get('url') title = image.get('title', f'image_{i}') if not image_url: failed_images.append({'title': title, 'error': 'No URL provided'}) continue # Download the image response = requests.get(image_url, timeout=30) response.raise_for_status() # Generate a safe filename safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_')).rstrip() timestamp = int(time.time()) + i # Add offset to avoid duplicates filename = f"{safe_title}_{timestamp}.jpg" filepath = os.path.join(presets_folder, filename) # Save the image with open(filepath, 'wb') as f: f.write(response.content) # Convert to base64 for immediate use image_base64 = base64.b64encode(response.content).decode('utf-8') data_url = f"data:image/jpeg;base64,{image_base64}" saved_images.append({ 'title': title, 'filename': filename, 'filepath': filepath, 'data_url': data_url, 'original_url': image_url }) except Exception as e: failed_images.append({'title': image.get('title', f'image_{i}'), 'error': str(e)}) return jsonify({ 'success': True, 'saved_count': len(saved_images), 'failed_count': len(failed_images), 'saved_images': saved_images, 'failed_images': failed_images, 'folder': presets_folder }) except Exception as e: print(f"Save all presets error: {e}") return jsonify({'success': False, 'error': f'Failed to save presets: {str(e)}'}), 500 @app.route('/api/preset/load', methods=['GET']) def load_presets(): """Load all saved presets from the presets folder""" try: presets_folder = os.path.join(os.getcwd(), 'presets') if not os.path.exists(presets_folder): return jsonify({'success': True, 'presets': []}) presets = [] # Get all image files in the presets folder for filename in os.listdir(presets_folder): if filename.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')): filepath = os.path.join(presets_folder, filename) try: # Get file modification time mod_time = os.path.getmtime(filepath) # Read image and convert to base64 with open(filepath, 'rb') as f: image_data = f.read() image_base64 = base64.b64encode(image_data).decode('utf-8') data_url = f"data:image/jpeg;base64,{image_base64}" # Extract title from filename (remove timestamp and extension) name_without_ext = os.path.splitext(filename)[0] # Remove timestamp if present (last part after underscore) parts = name_without_ext.rsplit('_', 1) if len(parts) > 1 and parts[-1].isdigit(): title = parts[0].replace('_', ' ').title() else: title = name_without_ext.replace('_', ' ').title() presets.append({ 'id': filename, # Use filename as ID 'filename': filename, 'title': title, 'filepath': filepath, 'data_url': data_url, 'saved_at': mod_time }) except Exception as e: print(f"Error loading preset {filename}: {e}") continue # Sort by saved_at (newest first) presets.sort(key=lambda x: x['saved_at'], reverse=True) return jsonify({'success': True, 'presets': presets}) except Exception as e: print(f"Load presets error: {e}") return jsonify({'success': False, 'error': f'Failed to load presets: {str(e)}'}), 500 @app.route('/api/preset/delete', methods=['POST']) def delete_preset(): """Delete a preset file from the presets folder""" try: data = request.json filename = data.get('filename') if not filename: return jsonify({'success': False, 'error': 'No filename provided'}), 400 presets_folder = os.path.join(os.getcwd(), 'presets') filepath = os.path.join(presets_folder, filename) if os.path.exists(filepath): os.remove(filepath) return jsonify({'success': True, 'deleted': filename}) else: return jsonify({'success': False, 'error': 'File not found'}), 404 except Exception as e: print(f"Delete preset error: {e}") return jsonify({'success': False, 'error': f'Failed to delete preset: {str(e)}'}), 500 @app.route('/preset/') def serve_preset(filename): """Serve a preset image file""" try: presets_folder = os.path.join(os.getcwd(), 'presets') filepath = os.path.join(presets_folder, filename) if os.path.exists(filepath): return send_file(filepath) else: return jsonify({'error': 'File not found'}), 404 except Exception as e: print(f"Serve preset error: {e}") return jsonify({'error': 'Failed to serve file'}), 500 @app.route('/api/preset/save_direct', methods=['POST']) def save_preset_direct(): """Save AI-generated image directly to presets folder""" try: if 'image' not in request.files: return jsonify({'error': 'No image file provided'}), 400 file = request.files['image'] title = request.form.get('title', 'AI Generated Face') if file.filename == '': return jsonify({'error': 'No file selected'}), 400 # Create presets directory if it doesn't exist presets_dir = os.path.join(os.getcwd(), 'presets') os.makedirs(presets_dir, exist_ok=True) # Generate unique filename timestamp = int(time.time()) import re safe_title = re.sub(r'[^a-zA-Z0-9]', '_', title)[:50] filename = f"ai_face_{safe_title}_{timestamp}.jpg" filepath = os.path.join(presets_dir, filename) # Save the file file.save(filepath) # Convert to base64 for immediate use data_url = image_to_base64(filepath) return jsonify({ 'success': True, 'filename': filename, 'filepath': filepath, 'folder': presets_dir, 'data_url': data_url, 'message': f'AI face saved as {filename}' }) except Exception as e: print(f"Save AI preset error: {e}") return jsonify({'error': f'Failed to save AI face: {str(e)}'}), 500 @app.route('/health', methods=['GET']) def health_check(): """Health check endpoint""" return jsonify({ 'status': 'healthy', 'face_swapper': 'loaded' if swapper else 'not loaded', 'upload_folder': UPLOAD_FOLDER }) @app.route('/api/perchance/generate', methods=['POST']) def perchance_generate(): """Proxy endpoint for Perchance AI image generation using iframe scraping""" try: data = request.get_json() prompt = data.get('prompt', '') model = data.get('model', 'realistic') if not prompt: return jsonify({'error': 'Prompt is required'}), 400 print(f"🎨 Perchance AI Request: {prompt[:50]}...") # Method 1: Try to scrape Perchance iframe for generated image try: from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import time # Setup Chrome options for headless browsing chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--window-size=1024,768") # Create driver driver = webdriver.Chrome(options=chrome_options) # Navigate to Perchance with prompt perchance_url = f"https://perchance.org/ai-text-to-image-generator?prompt={requests.utils.quote(prompt)}" driver.get(perchance_url) print("🔄 Waiting for Perchance to load...") time.sleep(3) # Wait for image to appear (up to 30 seconds) try: # Look for generated image image_element = WebDriverWait(driver, 30).until( EC.presence_of_element_located((By.CSS_SELECTOR, "img[src*='generated'], img[src*='perchance'], img[src*='image']")) ) image_src = image_element.get_attribute('src') print(f"✅ Found image: {image_src[:100]}...") # Download the image if image_src.startswith('data:'): # Data URL - use directly img_data = image_src.split(',')[1] data_url = image_src else: # Regular URL - download and convert img_response = requests.get(image_src, timeout=10) if img_response.status_code == 200: img_data = base64.b64encode(img_response.content).decode('utf-8') data_url = f"data:image/jpeg;base64,{img_data}" else: raise Exception("Failed to download image") driver.quit() return jsonify({ 'success': True, 'image_url': data_url, 'model': model, 'prompt': prompt }) except Exception as wait_error: print(f"❌ Timeout waiting for image: {wait_error}") driver.quit() raise wait_error except ImportError: print("❌ Selenium not available. Install with: pip install selenium") except Exception as selenium_error: print(f"❌ Selenium approach failed: {selenium_error}") # Method 2: Try requests with session to maintain cookies try: session = requests.Session() # First visit the page to get cookies page_url = f"https://perchance.org/ai-text-to-image-generator?prompt={requests.utils.quote(prompt)}" response = session.get(page_url, timeout=30) if response.status_code == 200: # Parse HTML to find image from bs4 import BeautifulSoup soup = BeautifulSoup(response.text, 'html.parser') # Look for any image that might be generated images = soup.find_all('img') for img in images: src = img.get('src', '') if ('generated' in src or 'perchance' in src or 'image' in src) and not src.startswith('data:image/svg'): print(f"✅ Found image in HTML: {src[:100]}...") # Download image if src.startswith('http'): img_response = session.get(src, timeout=10) if img_response.status_code == 200: img_data = base64.b64encode(img_response.content).decode('utf-8') data_url = f"data:image/jpeg;base64,{img_data}" return jsonify({ 'success': True, 'image_url': data_url, 'model': model, 'prompt': prompt }) except ImportError: print("❌ BeautifulSoup not available. Install with: pip install beautifulsoup4") except Exception as requests_error: print(f"❌ Requests approach failed: {requests_error}") # Method 3: Fallback - Create a better placeholder print("🔄 Creating enhanced placeholder image...") # Create a more sophisticated placeholder from PIL import Image, ImageDraw, ImageFont import textwrap import random # Create image with gradient background width, height = 512, 512 img = Image.new('RGB', (width, height), color='#2a2a2a') draw = ImageDraw.Draw(img) # Add gradient effect for y in range(height): color_value = int(42 + (y / height) * 20) # Dark gradient draw.line([(0, y), (width, y)], fill=f'#{color_value:02x}{color_value:02x}{color_value:02x}') # Add title try: font_title = ImageFont.truetype("arial.ttf", 28) font_text = ImageFont.truetype("arial.ttf", 18) font_small = ImageFont.truetype("arial.ttf", 14) except: font_title = ImageFont.load_default() font_text = ImageFont.load_default() font_small = ImageFont.load_default() # Draw title title = "🎨 Perchance AI Generated" draw.text((width//2 - font_title.getlength(title)//2, 40), title, fill='#ffff55', font=font_title) # Draw prompt text (wrapped) max_width = width - 80 lines = textwrap.wrap(prompt, width=max_width//font_text.getlength(' ')) y_offset = 120 for line in lines[:8]: # Limit to 8 lines draw.text((width//2 - font_text.getlength(line)//2, y_offset), line, fill='#ffffff', font=font_text) y_offset += 30 # Add loading animation hint loading_text = "⏳ Generating high-quality image..." draw.text((width//2 - font_small.getlength(loading_text)//2, 380), loading_text, fill='#4CAF50', font=font_small) # Add instruction instruction = "This is a preview. Real generation requires external API." draw.text((width//2 - font_small.getlength(instruction)//2, 420), instruction, fill='#ff6b6b', font=font_small) # Add random "AI art" elements for _ in range(20): x = random.randint(0, width) y = random.randint(0, height) size = random.randint(1, 3) color = random.choice(['#ffff55', '#4CAF50', '#ff6b6b', '#4a90e2']) draw.ellipse([x, y, x+size, y+size], fill=color) # Convert to data URL buffer = BytesIO() img.save(buffer, format='PNG') img_data = base64.b64encode(buffer.getvalue()).decode('utf-8') data_url = f"data:image/png;base64,{img_data}" return jsonify({ 'success': True, 'image_url': data_url, 'model': model, 'prompt': prompt, 'note': 'Enhanced placeholder. Real generation requires Perchance website.' }) except Exception as e: print(f"❌ Perchance generation error: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/save_to_compressor', methods=['POST']) def save_to_compressor(): """Save images to temporary folder for compressor and return folder info""" try: data = request.get_json() image_urls = data.get('image_urls', []) if not image_urls: return jsonify({'success': False, 'error': 'No images provided'}), 400 # Create unique temp folder folder_id = str(uuid.uuid4())[:8] temp_folder_path = os.path.join(COMPRESSOR_TEMP_FOLDER, folder_id) os.makedirs(temp_folder_path, exist_ok=True) # Track folder creation time for cleanup temp_folders[folder_id] = time.time() saved_files = [] for i, image_url in enumerate(image_urls): try: # Extract base64 data if 'base64,' in image_url: base64_data = image_url.split('base64,')[1] else: base64_data = image_url # Decode and save image_data = base64.b64decode(base64_data) filename = f'compressed_image_{i+1}.jpg' file_path = os.path.join(temp_folder_path, filename) with open(file_path, 'wb') as f: f.write(image_data) saved_files.append({ 'filename': filename, 'path': file_path, 'url': f'/api/compressor_temp/{folder_id}/{filename}' }) except Exception as e: print(f"Error saving image {i}: {e}") continue if not saved_files: # Clean up empty folder shutil.rmtree(temp_folder_path, ignore_errors=True) del temp_folders[folder_id] return jsonify({'success': False, 'error': 'Failed to save any images'}), 500 return jsonify({ 'success': True, 'folder_id': folder_id, 'files': saved_files, 'compressor_url': f'/compressor.html?folder={folder_id}' }) except Exception as e: print(f"Save to compressor error: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/compressor_temp//') def serve_compressor_temp(folder_id, filename): """Serve files from compressor temp folder""" try: file_path = os.path.join(COMPRESSOR_TEMP_FOLDER, folder_id, filename) if os.path.exists(file_path): return send_file(file_path) else: return jsonify({'error': 'File not found'}), 404 except Exception as e: print(f"Error serving compressor temp file: {e}") return jsonify({'error': 'Server error'}), 500 @app.route('/api/compressor_temp//list') def list_compressor_temp_files(folder_id): """List all files in a compressor temp folder""" try: folder_path = os.path.join(COMPRESSOR_TEMP_FOLDER, folder_id) if not os.path.exists(folder_path): return jsonify({'success': False, 'error': 'Folder not found'}), 404 files = [] for filename in os.listdir(folder_path): file_path = os.path.join(folder_path, filename) if os.path.isfile(file_path): files.append({ 'filename': filename, 'url': f'/api/compressor_temp/{folder_id}/{filename}', 'size': os.path.getsize(file_path) }) return jsonify({ 'success': True, 'folder_id': folder_id, 'files': files }) except Exception as e: print(f"Error listing compressor temp files: {e}") return jsonify({'success': False, 'error': str(e)}), 500 def cleanup_old_temp_folders(): """Clean up temp folders older than 1 hour""" current_time = time.time() folders_to_remove = [] for folder_id, creation_time in temp_folders.items(): if current_time - creation_time > 3600: # 1 hour folders_to_remove.append(folder_id) for folder_id in folders_to_remove: try: folder_path = os.path.join(COMPRESSOR_TEMP_FOLDER, folder_id) shutil.rmtree(folder_path, ignore_errors=True) del temp_folders[folder_id] print(f"Cleaned up old temp folder: {folder_id}") except Exception as e: print(f"Error cleaning up folder {folder_id}: {e}") @app.route('/api/preset/upload', methods=['POST']) def upload_preset(): """Upload a preset to the local presets folder via base64""" try: data = request.json if not data: return jsonify({'success': False, 'error': 'No data provided'}), 400 image_base64 = data.get('image') filename = data.get('filename') if not image_base64 or not filename: return jsonify({'success': False, 'error': 'Missing image or filename'}), 400 presets_folder = os.path.join(os.getcwd(), 'presets') os.makedirs(presets_folder, exist_ok=True) # Clean filename to ensure it doesn't overwrite unexpectedly or have bad chars safe_name = os.path.basename(filename) # Add timestamp to avoid collisions name_parts = os.path.splitext(safe_name) final_filename = f"{name_parts[0]}_{int(time.time())}{name_parts[1]}" file_path = os.path.join(presets_folder, final_filename) # Extract base64 data if 'base64,' in image_base64: image_base64 = image_base64.split('base64,')[1] image_data = base64.b64decode(image_base64) with open(file_path, 'wb') as f: f.write(image_data) return jsonify({ 'success': True, 'message': 'Preset uploaded successfully', 'filename': final_filename }) except Exception as e: print(f"Error uploading preset: {e}") return jsonify({'success': False, 'error': str(e)}), 500 if __name__ == '__main__': print("Starting Shinyy's Face Swapper Web Server...") print(f"Open your browser and go to: http://localhost:{WEB_SERVER_PORT}") app.run(host='0.0.0.0', port=WEB_SERVER_PORT, debug=False)