Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Simple Flask Backend for Shinyy's Face Swapper HTML Website | |
| """ | |
| from flask import Flask, request, jsonify, send_file | |
| from flask_cors import CORS | |
| import os | |
| from pathlib import Path | |
| import tempfile | |
| import shutil | |
| import uuid | |
| import glob | |
| import logging | |
| import sys | |
| import time | |
| from datetime import datetime | |
| try: | |
| import cv2 | |
| import numpy as np | |
| CV2_AVAILABLE = True | |
| except ImportError as e: | |
| print(f"Warning: OpenCV/NumPy not available: {e}") | |
| CV2_AVAILABLE = False | |
| cv2 = None | |
| np = None | |
| import base64 | |
| from io import BytesIO | |
| from PIL import Image | |
| import json | |
| import requests | |
| try: | |
| import imageio | |
| IMAGEIO_AVAILABLE = True | |
| except ImportError as e: | |
| print(f"Warning: imageio not available: {e}") | |
| IMAGEIO_AVAILABLE = False | |
| imageio = None | |
| # Import the face swapper | |
| try: | |
| from SinglePhoto import FaceSwapper | |
| FACE_SWAPPER_AVAILABLE = True | |
| except ImportError as e: | |
| print(f"Warning: FaceSwapper not available due to import error: {e}") | |
| print("Video processing will work in simulation mode only") | |
| FACE_SWAPPER_AVAILABLE = False | |
| FaceSwapper = None | |
| # Import enhanced face swapper if available | |
| try: | |
| from EnhancedFaceSwapper import EnhancedFaceSwapper | |
| from QualityPresets import QualityPresets, create_enhanced_swapper_with_quality | |
| ENHANCED_SWAPPER_AVAILABLE = True | |
| print("Enhanced face swapper loaded successfully!") | |
| except ImportError as e: | |
| print(f"Enhanced face swapper not available: {e}") | |
| ENHANCED_SWAPPER_AVAILABLE = False | |
| EnhancedFaceSwapper = None | |
| QualityPresets = None | |
| create_enhanced_swapper_with_quality = None | |
# Flask application object shared by every route in this module.
app = Flask(__name__)
# Allow cross-origin requests on all routes.
CORS(app)

# Port the web server listens on; 7860 is the port Hugging Face requires.
WEB_SERVER_PORT = 7860
| # Configure comprehensive logging | |
def setup_logging():
    """Configure the root logger for colourised console output.

    Every handler already attached to the root logger is removed and replaced
    by a single ``sys.stdout`` stream handler; both logger and handler are set
    to DEBUG.

    Returns:
        logging.Logger: The configured root logger.
    """
    reset_color = '\033[0m'
    # ANSI colour escape per level name; built once, not on every record.
    level_colors = {
        'DEBUG': '\033[36m',     # Cyan
        'INFO': '\033[32m',      # Green
        'WARNING': '\033[33m',   # Yellow
        'ERROR': '\033[31m',     # Red
        'CRITICAL': '\033[35m',  # Magenta
    }

    class CustomFormatter(logging.Formatter):
        """Render records as ``[HH:MM:SS] LEVEL | message``."""

        def format(self, record):
            # Stamp with the record's own creation time rather than the
            # moment it happens to be formatted.
            timestamp = datetime.fromtimestamp(record.created).strftime('%H:%M:%S')
            level_color = level_colors.get(record.levelname, reset_color)
            line = f"[{timestamp}] {level_color}{record.levelname}{reset_color} | {record.getMessage()}"
            # Keep tracebacks and stack info, which a message-only format
            # would silently drop (e.g. for logger.exception()).
            if record.exc_info:
                line = f"{line}\n{self.formatException(record.exc_info)}"
            if record.stack_info:
                line = f"{line}\n{self.formatStack(record.stack_info)}"
            return line

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    # Iterate over a copy: removeHandler mutates the handler list.
    for handler in list(root_logger.handlers):
        root_logger.removeHandler(handler)

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(CustomFormatter())
    root_logger.addHandler(console_handler)
    return root_logger
# Module-wide logger (the configured root logger).
logger = setup_logging()

# Shared state describing the video job currently being processed.
video_progress = dict(
    processing=False,
    phase='idle',
    total_frames=0,
    processed_frames=0,
    current_frame_base64=None,
    start_time=None,
    mode=None,
    fps=None,
    resolution=None,
    file_size=None,
    processing_speed=None,
    avg_frame_time=None,
    estimated_total_time=None,
    countdown_time=None,
)

# Turn off Flask auto-reload and other restart triggers.
app.config['DEBUG'] = False
app.config['TEMPLATES_AUTO_RELOAD'] = False
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
# Build the shared FaceSwapper: GPU first, CPU as fallback, None when the
# class could not be imported or neither construction succeeds.
swapper = None
if not FACE_SWAPPER_AVAILABLE:
    print("Running in simulation mode - FaceSwapper not available")
else:
    try:
        try:
            # First attempt: GPU acceleration.
            swapper = FaceSwapper(gpu_enabled=True, gpu_id=0)
            gpu_info = swapper.get_gpu_info()
            print(f"FaceSwapper loaded with GPU acceleration!")
            print(f"GPU Info: {gpu_info}")
        except Exception as gpu_error:
            # Second attempt: CPU only.
            print(f"GPU initialization failed: {gpu_error}")
            print("Falling back to CPU-only mode...")
            swapper = FaceSwapper(gpu_enabled=False, gpu_id=-1)
            print("FaceSwapper loaded in CPU mode!")
        FACE_SWAPPER_AVAILABLE = True
    except Exception as e:
        print(f"Error loading FaceSwapper: {e}")
        swapper = None
        FACE_SWAPPER_AVAILABLE = False
# Scratch directories: uploaded images, and files handed to the compressor.
UPLOAD_FOLDER = 'temp_uploads'
COMPRESSOR_TEMP_FOLDER = 'temp_compressor'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(COMPRESSOR_TEMP_FOLDER, exist_ok=True)

# Tracks temp folders and their creation times.
temp_folders = {}

# Startup banner.
_BANNER_RULE = "=" * 60
logger.info(_BANNER_RULE)
logger.info("SHINYY'S FACE SWAPPER SERVER STARTING")
logger.info(_BANNER_RULE)
logger.info(f"Upload folder: {UPLOAD_FOLDER}")
logger.info(f"OpenCV Available: {CV2_AVAILABLE}")
logger.info(f"Face Swapper Available: {FACE_SWAPPER_AVAILABLE}")
logger.info(_BANNER_RULE)
def base64_to_image(base64_string):
    """Decode a base64 string (optionally a data URL) into an image.

    Args:
        base64_string: Base64 image data; everything up to and including a
            ``base64,`` marker is discarded if present.

    Returns:
        A BGR ``numpy`` array when OpenCV is available, otherwise the decoded
        ``PIL.Image.Image``.
    """
    # Remove data URL prefix if present
    if 'base64,' in base64_string:
        base64_string = base64_string.split('base64,')[1]

    pil_image = Image.open(BytesIO(base64.b64decode(base64_string)))
    if not CV2_AVAILABLE:
        return pil_image

    # Normalise palette / greyscale / alpha images so the array handed to
    # cvtColor always has exactly three RGB channels.
    rgb_image = pil_image.convert('RGB')
    return cv2.cvtColor(np.array(rgb_image), cv2.COLOR_RGB2BGR)
def image_to_base64(image):
    """Encode an image as a ``data:image/jpeg;base64,...`` string.

    Args:
        image: A ``PIL.Image.Image`` or (when OpenCV is available) a BGR
            array. Without OpenCV any other value is assumed to already be a
            base64 string and is returned unchanged.

    Returns:
        str: JPEG data URL, or ``image`` itself in the pass-through case.
    """
    if isinstance(image, Image.Image):
        pil_image = image
    elif not CV2_AVAILABLE:
        # Assume it's already a base64 string
        return image
    else:
        # OpenCV images are BGR; PIL expects RGB.
        pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

    # JPEG cannot store alpha or palette data; convert such modes first
    # instead of letting save() fail.
    if pil_image.mode not in ('RGB', 'L', 'CMYK'):
        pil_image = pil_image.convert('RGB')

    buffer = BytesIO()
    pil_image.save(buffer, format='JPEG')
    image_str = base64.b64encode(buffer.getvalue()).decode()
    return f"data:image/jpeg;base64,{image_str}"
| # Helper functions for enhanced multi-swap features | |
def apply_face_alignment(image):
    """Histogram-equalise ``image`` and return it as a 3-channel BGR image.

    The image is converted to greyscale, equalised, and converted back to
    BGR, so the returned image no longer carries colour information.

    Args:
        image: BGR image array.

    Returns:
        The equalised image, or ``image`` unchanged if processing fails.
    """
    try:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        equalized = cv2.equalizeHist(gray)
        return cv2.cvtColor(equalized, cv2.COLOR_GRAY2BGR)
    except Exception as e:
        # Best effort: report and hand back the input (a bare ``except``
        # would also swallow KeyboardInterrupt / SystemExit).
        print(f"Face alignment failed: {e}")
        return image
def apply_enhanced_swap(source_path, target_path, source_face_idx, face_id, swap_hair, quality):
    """Swap faces, optionally pre-filtering the source image first.

    For ``quality`` of ``'quality'`` or ``'ultra'`` the source file is
    bilateral-filtered and written back to ``source_path`` before swapping.
    NOTE(review): this overwrites the caller's source file in place, so
    repeated calls filter an already-filtered image - confirm this is intended.

    Returns:
        Whatever ``swapper.swap_faces`` returns.

    Raises:
        Any exception raised by ``swapper.swap_faces``.
    """
    if quality in ('quality', 'ultra'):
        # Pre-processing is best effort and must never block the swap itself.
        try:
            source_img = cv2.imread(source_path)
            if source_img is not None:  # imread yields None for unreadable files
                source_img = cv2.bilateralFilter(source_img, 15, 80, 80)
                cv2.imwrite(source_path, source_img)
        except Exception as e:
            print(f"Enhanced swap preprocessing failed: {e}")

    # The swap runs exactly once; previously any failure triggered a second,
    # identical swap attempt from the except branch.
    return swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair)
def apply_precise_swap(source_path, target_path, source_face_idx, face_id, swap_hair, face_size):
    """Swap faces and, for ``face_size == 'precise'``, smooth the result.

    Returns:
        The swap result, bilateral-filtered when ``face_size`` is
        ``'precise'`` and the filter succeeds; otherwise the unfiltered
        result.

    Raises:
        Any exception raised by ``swapper.swap_faces``.
    """
    result = swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair)

    if face_size == 'precise':
        # Only the post-filter is best effort. A filter failure keeps the
        # result already computed instead of re-running the whole swap.
        try:
            result = cv2.bilateralFilter(result, 5, 50, 50)
        except Exception as e:
            print(f"Precise swap post-processing failed: {e}")
    return result
def apply_artistic_swap(source_path, target_path, source_face_idx, face_id, swap_hair):
    """Swap faces and apply a detail-enhancement filter to the result.

    Returns:
        The detail-enhanced swap result, or the plain swap result if the
        filter fails.

    Raises:
        Any exception raised by ``swapper.swap_faces``.
    """
    result = swapper.swap_faces(source_path, source_face_idx, target_path, face_id, swap_hair=swap_hair)

    # Only the filter is best effort. A filter failure keeps the result
    # already computed instead of re-running the whole swap.
    try:
        result = cv2.detailEnhance(result, sigma_s=10, sigma_r=0.15)
    except Exception as e:
        print(f"Artistic swap post-processing failed: {e}")
    return result
def apply_face_enhancement(image, level):
    """Filter ``image`` according to the enhancement ``level``.

    Args:
        image: Image array to enhance.
        level: ``'subtle'`` or ``'medium'`` (bilateral filter of increasing
            strength) or ``'strong'`` (detail enhancement). Any other value
            leaves the image untouched.

    Returns:
        The enhanced image, or ``image`` unchanged for an unknown level or if
        the filter fails.
    """
    try:
        if level == 'subtle':
            return cv2.bilateralFilter(image, 5, 30, 30)
        if level == 'medium':
            return cv2.bilateralFilter(image, 9, 50, 50)
        if level == 'strong':
            return cv2.detailEnhance(image, sigma_s=5, sigma_r=0.2)
        return image
    except Exception as e:
        # Best effort: report and hand back the input.
        print(f"Face enhancement failed: {e}")
        return image
def apply_skin_tone_matching(swapped_face, target_image, face_id, level):
    """Blend ``swapped_face`` with the matching face crop of ``target_image``.

    Faces are detected in ``target_image`` and ordered left to right;
    ``face_id`` is the 1-based position in that ordering.

    Args:
        swapped_face: Image to adjust.
        target_image: Image in which the original face is detected.
        face_id: 1-based index of the face to blend against.
        level: ``'subtle'`` (0.3), ``'medium'`` (0.5); anything else is
            treated as strong (0.7) weight for the original face.

    Returns:
        The weighted blend, or ``swapped_face`` unchanged when ``face_id`` is
        out of range or any step fails.
        NOTE(review): addWeighted presumably needs both inputs to have the
        same size, so a size mismatch ends in the fallback - confirm callers
        pass a matching crop.
    """
    try:
        faces = sorted(swapper.app.get(target_image), key=lambda f: f.bbox[0])
        # Require 1 <= face_id: 0 or a negative id would otherwise index from
        # the end of the list and silently pick the wrong face.
        if not 1 <= face_id <= len(faces):
            return swapped_face

        x1, y1, x2, y2 = [int(v) for v in faces[face_id - 1].bbox]
        # A negative start would wrap around when slicing.
        x1, y1 = max(x1, 0), max(y1, 0)
        original_face = target_image[y1:y2, x1:x2]

        # Simple color balance adjustment
        if level == 'subtle':
            alpha = 0.3
        elif level == 'medium':
            alpha = 0.5
        else:  # strong
            alpha = 0.7
        return cv2.addWeighted(swapped_face, 1 - alpha, original_face, alpha, 0)
    except Exception as e:
        # Best effort: report and hand back the input.
        print(f"Skin tone matching failed: {e}")
        return swapped_face
def apply_face_size_adjustment(face, size_option):
    """Rescale ``face`` about its centre while keeping the original size.

    Args:
        face: Image array.
        size_option: ``'shrink'`` (scale 0.9), ``'expand'`` (scale 1.1); any
            other value is treated as precise (scale 0.95).

    Returns:
        An image with the same height and width as ``face``: a shrunken
        image is padded by replicating its border, an enlarged one is
        centre-cropped. ``face`` is returned unchanged if processing fails.
    """
    try:
        if size_option == 'shrink':
            scale = 0.9
        elif size_option == 'expand':
            scale = 1.1
        else:  # precise
            scale = 0.95

        h, w = face.shape[:2]
        new_h, new_w = int(h * scale), int(w * scale)
        resized = cv2.resize(face, (new_w, new_h))

        if scale < 1.0:
            # Pad back to the original size; floor division already yields
            # ints, and the far side takes whatever remains.
            top = (h - new_h) // 2
            left = (w - new_w) // 2
            return cv2.copyMakeBorder(resized, top, h - new_h - top, left, w - new_w - left,
                                      cv2.BORDER_REPLICATE)

        # Crop the centre back to the original size.
        top = (new_h - h) // 2
        left = (new_w - w) // 2
        return resized[top:top + h, left:left + w]
    except Exception as e:
        # Best effort: report and hand back the input.
        print(f"Face size adjustment failed: {e}")
        return face
def apply_lighting_preservation(swapped_face, original_face):
    """Give ``swapped_face`` the lightness channel of ``original_face``.

    Both images are converted to LAB, channel 0 of the swapped image is
    replaced with channel 0 of the original, and the result is converted
    back to BGR.

    Returns:
        The relit image, or ``swapped_face`` unchanged if any step fails
        (for example when the channel assignment is rejected because the two
        images differ in size).
    """
    try:
        swapped_lab = cv2.cvtColor(swapped_face, cv2.COLOR_BGR2LAB)
        original_lab = cv2.cvtColor(original_face, cv2.COLOR_BGR2LAB)
        # Copy lighting from original
        swapped_lab[:, :, 0] = original_lab[:, :, 0]
        return cv2.cvtColor(swapped_lab, cv2.COLOR_LAB2BGR)
    except Exception as e:
        # Best effort: report and hand back the input.
        print(f"Lighting preservation failed: {e}")
        return swapped_face
def apply_auto_enhancement(face):
    """Apply a fixed contrast (x1.1) and brightness (+5) adjustment.

    Returns:
        The adjusted image, or ``face`` unchanged if the adjustment fails.
    """
    try:
        return cv2.convertScaleAbs(face, alpha=1.1, beta=5)
    except Exception as e:
        # Best effort: report and hand back the input.
        print(f"Auto enhancement failed: {e}")
        return face
def enhanced_face_alignment(source_img, target_img, source_face, target_face):
    """Warp ``source_img`` so its face landmarks line up with the target's.

    A partial affine transform is estimated from the first three entries of
    each face's ``kps`` and applied to ``source_img`` at the size of
    ``target_img``.

    Returns:
        The warped source image, or ``source_img`` unchanged when no
        transform could be estimated or any step fails.
    """
    try:
        landmarks_src = np.array(source_face.kps, dtype=np.float32)
        landmarks_dst = np.array(target_face.kps, dtype=np.float32)
        height, width = target_img.shape[:2]

        # Only the first three landmark points feed the estimate.
        transform = cv2.estimateAffinePartial2D(landmarks_src[:3], landmarks_dst[:3])[0]
        if transform is None:
            return source_img
        return cv2.warpAffine(source_img, transform, (width, height),
                              borderMode=cv2.BORDER_REFLECT_101)
    except Exception as e:
        print(f"Face alignment enhancement failed: {e}")
        return source_img
def advanced_color_matching(swapped_face, target_region, target_face_bbox):
    """Match the colours of ``swapped_face`` to ``target_region`` in LAB space.

    Each LAB channel of the swapped face is remapped through a histogram
    matching lookup table, then the result is mixed 70/30 with the original
    swapped face. ``target_face_bbox`` is accepted but not used.

    Returns:
        The colour-matched image, or ``swapped_face`` unchanged on failure.
    """
    try:
        face_lab = cv2.cvtColor(swapped_face, cv2.COLOR_BGR2LAB)
        reference_lab = cv2.cvtColor(target_region, cv2.COLOR_BGR2LAB)

        for channel in range(3):  # L, A, B channels
            face_hist = cv2.calcHist([face_lab], [channel], None, [256], [0, 256])
            reference_hist = cv2.calcHist([reference_lab], [channel], None, [256], [0, 256])
            # Normalise both histograms to sum to one before matching.
            face_hist = face_hist / face_hist.sum()
            reference_hist = reference_hist / reference_hist.sum()
            channel_lut = create_histogram_lut(face_hist, reference_hist)
            face_lab[:, :, channel] = cv2.LUT(face_lab[:, :, channel], channel_lut)

        matched = cv2.cvtColor(face_lab, cv2.COLOR_LAB2BGR)
        # 70% matched, 30% original
        weight = 0.7
        return cv2.addWeighted(matched, weight, swapped_face, 1 - weight, 0)
    except Exception as e:
        print(f"Color matching enhancement failed: {e}")
        return swapped_face
def create_histogram_lut(source_hist, target_hist):
    """Build a 256-entry uint8 lookup table for histogram matching.

    Entry ``i`` is the index whose cumulative target value is closest to the
    cumulative source value at ``i`` (the lowest index wins ties).
    """
    source_cdf = source_hist.cumsum()
    target_cdf = target_hist.cumsum()
    # Row i holds |target_cdf - source_cdf[i]|; the row-wise argmin is the
    # nearest target bin for source bin i.
    distances = np.abs(target_cdf[np.newaxis, :] - source_cdf[:256, np.newaxis])
    return distances.argmin(axis=1).astype(np.uint8)
def seamless_multi_band_blending(swapped_face, target_img, target_face_bbox):
    """Blend ``swapped_face`` into ``target_img`` with multi-band blending.

    The face and the target region under the bounding box are decomposed
    into Laplacian pyramids, mixed level by level with a feathered mask, and
    collapsed back into the region.

    Args:
        swapped_face: Image with the same shape as the bounding-box region.
        target_img: Image to blend into; it is not modified.
        target_face_bbox: ``(x1, y1, x2, y2)``; values are truncated to int.

    Returns:
        A uint8 copy of ``target_img`` with the face blended in. If blending
        fails the face is pasted into the region without blending.
    """
    # Detector boxes may arrive as floats; slicing needs ints.
    x1, y1, x2, y2 = (int(v) for v in target_face_bbox)
    try:
        # Feathered mask: solid inside the box, blurred towards its edges.
        mask = np.zeros(target_img.shape[:2], dtype=np.uint8)
        mask[y1:y2, x1:x2] = 255
        mask_blurred = cv2.GaussianBlur(mask, (51, 51), 0).astype(np.float32) / 255.0

        levels = 5
        face_gauss = create_gaussian_pyramid(swapped_face.astype(np.float32), levels)
        region_gauss = create_gaussian_pyramid(target_img[y1:y2, x1:x2].astype(np.float32), levels)
        mask_gauss = create_gaussian_pyramid(mask_blurred[y1:y2, x1:x2], levels)

        def to_laplacian(gaussian_levels):
            """Turn Gaussian levels into band-pass (Laplacian) levels."""
            bands = []
            for fine, coarse in zip(gaussian_levels[:-1], gaussian_levels[1:]):
                upsampled = cv2.resize(cv2.pyrUp(coarse), (fine.shape[1], fine.shape[0]))
                bands.append(fine - upsampled)
            # The coarsest level is kept as-is so the bands sum to the image.
            bands.append(gaussian_levels[-1])
            return bands

        # Summing Gaussian levels directly would add several blurred copies
        # of the image together; band-pass levels add back up to the image.
        blended_pyramid = []
        for face_band, region_band, weight in zip(to_laplacian(face_gauss),
                                                  to_laplacian(region_gauss),
                                                  mask_gauss):
            if face_band.ndim == 3:
                # Give the 2-D mask a channel axis so it broadcasts.
                weight = weight[..., np.newaxis]
            blended_pyramid.append(face_band * weight + region_band * (1 - weight))

        result = target_img.copy().astype(np.float32)
        result[y1:y2, x1:x2] = reconstruct_from_pyramid(blended_pyramid)
        # Clip before the cast so out-of-range values saturate, not wrap.
        return np.clip(result, 0, 255).astype(np.uint8)
    except Exception as e:
        print(f"Seamless blending failed: {e}")
        # Fallback to simple paste
        result = target_img.copy()
        result[y1:y2, x1:x2] = swapped_face
        return result
def create_gaussian_pyramid(img, levels):
    """Return ``[img, pyrDown(img), pyrDown(pyrDown(img)), ...]``.

    The list holds ``levels`` entries (at least one: ``img`` itself), each
    produced by applying ``cv2.pyrDown`` to the previous entry.
    """
    pyramid = [img]
    for _ in range(levels - 1):
        pyramid.append(cv2.pyrDown(pyramid[-1]))
    return pyramid
def reconstruct_from_pyramid(pyramid):
    """Collapse a pyramid by upsampling and summing its levels.

    Starting from the last (coarsest) level, the running result is
    upsampled with ``cv2.pyrUp``, resized if its height/width differ from
    the next finer level, and added to that level.
    """
    collapsed = pyramid[-1]
    for finer_level in reversed(pyramid[:-1]):
        collapsed = cv2.pyrUp(collapsed)
        if collapsed.shape[:2] != finer_level.shape[:2]:
            # pyrUp doubles the size, which can overshoot odd dimensions.
            collapsed = cv2.resize(collapsed, (finer_level.shape[1], finer_level.shape[0]))
        collapsed = collapsed + finer_level
    return collapsed
def apply_edge_smoothing(face_img):
    """Soften blocky edges in a swapped image.

    A bilateral filter followed by a light Gaussian blur is mixed 90/10 with
    the input. The input is returned untouched when OpenCV is unavailable or
    any step fails.
    """
    try:
        if CV2_AVAILABLE:
            # Edge-preserving smoothing first, then a subtle blur.
            softened = cv2.bilateralFilter(face_img, 5, 60, 60)
            softened = cv2.GaussianBlur(softened, (3, 3), 0.5)
            # 90% smoothed, 10% original
            blend_weight = 0.9
            return cv2.addWeighted(softened, blend_weight, face_img, 1 - blend_weight, 0)
        return face_img
    except Exception as e:
        print(f"Edge smoothing failed: {e}")
        return face_img
def smooth_face_blend(swapped_face, target_region, target_face_bbox):
    """Blend ``swapped_face`` over ``target_region`` and return the region.

    The region is placed in the top-left corner of a black canvas twice its
    size and handed to ``seamless_multi_band_blending``; the blended corner
    is returned. If that raises, an elliptical feathered mask is used to mix
    the two images instead. ``target_face_bbox`` is accepted but not used.
    """
    try:
        region_h, region_w = target_region.shape[:2]
        canvas = np.zeros((region_h * 2, region_w * 2, 3), dtype=np.uint8)
        canvas[:region_h, :region_w] = target_region
        # On the canvas the region starts at the origin.
        blended_canvas = seamless_multi_band_blending(
            swapped_face, canvas, (0, 0, region_w, region_h))
        return blended_canvas[:region_h, :region_w]
    except Exception as e:
        print(f"Enhanced face blending error: {e}")
        face_h, face_w = swapped_face.shape[:2]

        # Filled ellipse centred on the face, inset 5px on each axis.
        ellipse_mask = np.zeros((face_h, face_w), dtype=np.uint8)
        cv2.ellipse(ellipse_mask, (face_w // 2, face_h // 2),
                    (face_w // 2 - 5, face_h // 2 - 5), 0, 0, 360, 255, -1)

        # Feather the mask and repeat it across three channels.
        feather = cv2.GaussianBlur(ellipse_mask, (11, 11), 0).astype(np.float32) / 255.0
        feather_3d = np.stack([feather] * 3, axis=-1)

        mixed = (swapped_face.astype(np.float32) * feather_3d +
                 target_region.astype(np.float32) * (1 - feather_3d))
        return np.clip(mixed, 0, 255).astype(np.uint8)
def natural_color_match(swapped_face, target_region):
    """Match the colours of ``swapped_face`` to ``target_region``.

    ``advanced_color_matching`` is tried first. If it raises, a gentle
    YCrCb correction is applied instead: luma is scaled by a ratio clipped
    to [0.95, 1.05], each chroma channel is shifted by 10% of the mean
    difference, and the result is mixed 90/10 with the input. If that also
    fails, ``swapped_face`` is returned unchanged.
    """
    try:
        full_bbox = (0, 0, swapped_face.shape[1], swapped_face.shape[0])
        return advanced_color_matching(swapped_face, target_region, full_bbox)
    except Exception as e:
        print(f"Enhanced color matching failed, using fallback: {e}")

    # Fallback to minimal color matching
    try:
        face_ycc = cv2.cvtColor(swapped_face, cv2.COLOR_BGR2YCrCb)
        reference_ycc = cv2.cvtColor(target_region, cv2.COLOR_BGR2YCrCb)

        face_mean = np.mean(face_ycc, axis=(0, 1))
        reference_mean = np.mean(reference_ycc, axis=(0, 1))

        # Gentle correction: bounded luma gain, small chroma shifts.
        luma_gain = reference_mean[0] / face_mean[0] if face_mean[0] > 0 else 1.0
        luma_gain = np.clip(luma_gain, 0.95, 1.05)
        cr_shift = (reference_mean[1] - face_mean[1]) * 0.1
        cb_shift = (reference_mean[2] - face_mean[2]) * 0.1

        adjusted = face_ycc.copy()
        adjusted[:, :, 0] = np.clip(adjusted[:, :, 0] * luma_gain, 0, 255)
        adjusted[:, :, 1] = np.clip(adjusted[:, :, 1] + cr_shift, 0, 255)
        adjusted[:, :, 2] = np.clip(adjusted[:, :, 2] + cb_shift, 0, 255)

        corrected_bgr = cv2.cvtColor(adjusted, cv2.COLOR_YCrCb2BGR)
        mix = 0.9
        return cv2.addWeighted(corrected_bgr, mix, swapped_face, 1 - mix, 0)
    except Exception as fallback_error:
        print(f"Fallback color matching also failed: {fallback_error}")
        return swapped_face
def index():
    """Return ``index.html``, the main page."""
    page = 'index.html'
    return send_file(page)
def compressor():
    """Return ``compressor.html``, the compressor page."""
    page = 'compressor.html'
    return send_file(page)
def compressor_redirect():
    """Return ``compressor.html`` (the file is served directly, no HTTP redirect is issued)."""
    page = 'compressor.html'
    return send_file(page)
def compressor_with_slash():
    """Return ``compressor.html`` for the trailing-slash variant of the page."""
    page = 'compressor.html'
    return send_file(page)
def index_page():
    """Return ``index.html``, the same file the main page serves."""
    page = 'index.html'
    return send_file(page)
def extract_video_frames(video_path, frames_dir):
    """Write every frame of a video to ``frames_dir`` as numbered JPEGs.

    Frames are named ``frame_00000.jpg``, ``frame_00001.jpg``, ... in read
    order. ``frames_dir`` is created if it does not exist.

    Args:
        video_path: Path of the video to read.
        frames_dir: Directory that receives the frame files.

    Returns:
        list[str]: Paths of the frame files, in order. Empty when no frame
        could be read.

    Raises:
        ImportError: If OpenCV is not available.
    """
    if not CV2_AVAILABLE:
        raise ImportError("OpenCV is required for video processing")

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(frames_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    frame_paths = []
    try:
        idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = os.path.join(frames_dir, f"frame_{idx:05d}.jpg")
            cv2.imwrite(frame_path, frame)
            frame_paths.append(frame_path)
            idx += 1
    finally:
        # Release the capture even if reading or writing a frame raises.
        cap.release()
    return frame_paths
def create_video_from_frames(frames_dir, output_video_path, fps):
    """Encode the ``.jpg`` files in ``frames_dir`` into an ``mp4v`` video.

    Frames are written in sorted filename order; the video size is taken
    from the first frame.

    Args:
        frames_dir: Directory containing the frame images.
        output_video_path: Path of the video file to write.
        fps: Frame rate of the output video.

    Raises:
        ImportError: If OpenCV is not available.
        ValueError: If the directory holds no ``.jpg`` files or the first
            frame cannot be read.
    """
    if not CV2_AVAILABLE:
        raise ImportError("OpenCV is required for video processing")

    frames = sorted(os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg'))
    if not frames:
        raise ValueError("No frames found in directory")

    first_frame = cv2.imread(frames[0])
    if first_frame is None:
        # imread signals an unreadable file with None rather than raising.
        raise ValueError(f"Could not read first frame: {frames[0]}")
    height, width = first_frame.shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    try:
        for frame_path in frames:
            out.write(cv2.imread(frame_path))
    finally:
        # Release the writer even if a frame fails to load or encode.
        out.release()
def get_video_fps(video_path):
    """Return the frame rate OpenCV reports for ``video_path``.

    30.0 is returned when OpenCV is unavailable or the reported rate is not
    greater than zero.
    """
    default_fps = 30.0
    if not CV2_AVAILABLE:
        return default_fps

    capture = cv2.VideoCapture(video_path)
    reported = capture.get(cv2.CAP_PROP_FPS)
    capture.release()

    if reported > 0:
        return reported
    return default_fps
def get_gpu_status():
    """Report the face swapper's GPU information as JSON.

    Without an initialised swapper the payload is
    ``{'gpu_available': False, 'message': ...}``. Otherwise it carries
    ``gpu_info`` and ``last_processing_time`` (0 when the swapper has no such
    attribute). Any exception yields ``{'error': ...}`` with status 500.
    """
    try:
        if swapper:
            payload = {
                'success': True,
                'gpu_info': swapper.get_gpu_info(),
                'last_processing_time': getattr(swapper, 'last_processing_time', 0),
                'message': 'GPU status retrieved successfully'
            }
        else:
            payload = {
                'gpu_available': False,
                'message': 'Face swapper not initialized'
            }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"GPU status error: {str(e)}")
        return jsonify({'error': str(e)}), 500
def swap_faces():
    """Handle a face-swap request and return the swapped image as base64.

    Reads a JSON object body with ``source_image`` and ``target_image``
    (base64 strings), optional ``source_face_idx`` / ``target_face_idx``
    (default 1) and optional ``model`` (default ``inswapper_128.onnx``).

    Returns:
        A JSON response. On success it carries ``result_image``,
        ``message``, ``processing_time`` and ``gpu_accelerated``. Failures
        are ``{'error': ...}`` with status 400 for bad request data and 500
        otherwise.
    """
    start_time = time.time()
    logger.info("FACE SWAP REQUEST RECEIVED")
    temp_inputs = []  # per-request input files to delete afterwards
    try:
        # silent=True yields None for a missing or malformed body, so a bad
        # request gets a 400 below instead of an exception-driven 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            logger.error("Missing source or target image")
            return jsonify({'error': 'Missing source or target image'}), 400

        source_image_data = data.get('source_image')
        target_image_data = data.get('target_image')
        try:
            source_face_idx = int(data.get('source_face_idx', 1))
            target_face_idx = int(data.get('target_face_idx', 1))
        except (TypeError, ValueError):
            logger.error("Invalid face index")
            return jsonify({'error': 'Face indices must be integers'}), 400
        selected_model = data.get('model', 'inswapper_128.onnx')

        logger.info(f"Request parameters:")
        logger.info(f"Source face index: {source_face_idx}")
        logger.info(f"Target face index: {target_face_idx}")
        logger.info(f"Using Model: {selected_model}")

        if not source_image_data or not target_image_data:
            logger.error("Missing source or target image")
            return jsonify({'error': 'Missing source or target image'}), 400
        if not swapper:
            logger.error("Face swapper not initialized")
            return jsonify({'error': 'Face swapper not initialized'}), 500

        logger.info("Converting base64 images to OpenCV format...")
        source_image = base64_to_image(source_image_data)
        target_image = base64_to_image(target_image_data)
        logger.info(f"Image dimensions:")
        logger.info(f"Source: {source_image.shape[1]}x{source_image.shape[0]}")
        logger.info(f"Target: {target_image.shape[1]}x{target_image.shape[0]}")

        # Per-request input names: with fixed names, two concurrent requests
        # would overwrite each other's images before the swapper reads them.
        logger.info("Saving temporary files...")
        request_id = uuid.uuid4().hex
        source_path = os.path.join(UPLOAD_FOLDER, f'source_{request_id}.jpg')
        target_path = os.path.join(UPLOAD_FOLDER, f'target_{request_id}.jpg')
        # The result keeps its fixed name in case other code reads it.
        result_path = os.path.join(UPLOAD_FOLDER, 'result.jpg')
        temp_inputs.extend([source_path, target_path])
        cv2.imwrite(source_path, source_image)
        cv2.imwrite(target_path, target_image)
        logger.info(f"Temporary files saved:")
        logger.info(f"Source: {source_path}")
        logger.info(f"Target: {target_path}")
        logger.info(f"Result: {result_path}")

        logger.info("Performing face swap...")
        swap_start = time.time()
        try:
            result = swapper.swap_faces(
                source_path,
                source_face_idx,
                target_path,
                target_face_idx,
                swap_hair=False,
                model_name=selected_model
            )
        except TypeError:
            # Retry without model_name for swappers that do not accept it.
            result = swapper.swap_faces(
                source_path,
                source_face_idx,
                target_path,
                target_face_idx,
                swap_hair=False
            )
        swap_time = time.time() - swap_start
        logger.info(f"Face swap completed in {swap_time:.2f} seconds")

        # Apply edge smoothing to reduce blocky appearance
        logger.info("Applying edge smoothing...")
        result = apply_edge_smoothing(result)

        logger.info("Saving result image...")
        cv2.imwrite(result_path, result)

        logger.info("Converting result to base64...")
        result_base64 = image_to_base64(result)

        total_time = time.time() - start_time
        logger.info(f"FACE SWAP COMPLETED SUCCESSFULLY")
        logger.info(f"Total processing time: {total_time:.2f} seconds")
        logger.info(f"Result size: {len(result_base64)} chars")

        # Include GPU status in response
        gpu_status = getattr(swapper, 'gpu_enabled', False)
        return jsonify({
            'success': True,
            'result_image': result_base64,
            'message': f'Face swap completed successfully! (GPU: {"Enabled" if gpu_status else "Disabled"})',
            'processing_time': total_time,
            'gpu_accelerated': gpu_status
        })
    except Exception as e:
        logger.error(f"FACE SWAP ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
    finally:
        # The uniquely named inputs are useless after the request.
        for path in temp_inputs:
            try:
                os.remove(path)
            except OSError:
                # Never written, or already gone: nothing to clean up.
                pass
def detect_faces():
    """Detect the faces in an image and return them ordered left to right.

    Reads a JSON object body with ``image`` (a base64 string).

    Returns:
        A JSON response. On success ``faces`` lists, per face, its 1-based
        ``id`` in left-to-right order, a ``label``, a base64 crop
        (``image``), the detector's ``bbox`` and ``x``/``y``/``width``/
        ``height``. Failures are ``{'error': ...}`` with status 400 for a
        missing image and 500 otherwise.
    """
    start_time = time.time()
    logger.info("FACE DETECTION REQUEST RECEIVED")
    try:
        # silent=True yields None for a missing or malformed body, so a bad
        # request gets a 400 below instead of an exception-driven 500.
        data = request.get_json(silent=True)
        image_data = data.get('image') if isinstance(data, dict) else None
        logger.info(f"Image data length: {len(image_data) if image_data else 0} chars")
        if not image_data:
            logger.error("Missing image data")
            return jsonify({'error': 'Missing image data'}), 400
        if not swapper:
            logger.error("Face swapper not initialized")
            return jsonify({'error': 'Face swapper not initialized'}), 500

        logger.info("Converting base64 image to OpenCV format...")
        image = base64_to_image(image_data)
        img_h, img_w = image.shape[:2]
        logger.info(f"Image dimensions: {img_w}x{img_h}")

        logger.info("Detecting faces in image...")
        detection_start = time.time()
        faces = swapper.app.get(image)
        detection_time = time.time() - detection_start
        logger.info(f"Face detection completed in {detection_time:.2f} seconds")
        logger.info(f"Found {len(faces)} face(s)")

        # Sort faces from left to right
        faces = sorted(faces, key=lambda x: x.bbox[0])
        logger.info("Sorted faces from left to right")

        logger.info("Preparing face data...")
        detected_faces = []
        for i, face in enumerate(faces):
            x1, y1, x2, y2 = [int(v) for v in face.bbox]
            logger.info(f"Face {i+1}: bbox=({x1},{y1},{x2},{y2}), size={x2-x1}x{y2-y1}")

            # Crop with the box clamped to the image: a negative start would
            # wrap around when slicing and yield a wrong or empty crop.
            crop_x1, crop_y1 = max(x1, 0), max(y1, 0)
            crop_x2, crop_y2 = min(x2, img_w), min(y2, img_h)
            if crop_x2 <= crop_x1 or crop_y2 <= crop_y1:
                # Nothing to crop; skip this face rather than fail the
                # whole request. Ids of the other faces are unaffected.
                logger.warning(f"Face {i+1}: bounding box lies outside the image, skipping")
                continue
            face_region = image[crop_y1:crop_y2, crop_x1:crop_x2]
            face_base64 = image_to_base64(face_region)

            detected_faces.append({
                'id': i + 1,
                'label': f'Face {i + 1}',
                'image': face_base64,
                'bbox': [x1, y1, x2, y2],
                'x': x1,
                'y': y1,
                'width': x2 - x1,
                'height': y2 - y1
            })

        total_time = time.time() - start_time
        logger.info(f"FACE DETECTION COMPLETED SUCCESSFULLY")
        logger.info(f"Total processing time: {total_time:.2f} seconds")
        logger.info(f"Detected {len(detected_faces)} faces with bounding boxes")
        return jsonify({
            'success': True,
            'faces': detected_faces,
            'message': f'Detected {len(detected_faces)} faces',
            'processing_time': total_time
        })
    except Exception as e:
        logger.error(f"FACE DETECTION ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
def enhanced_swap_faces():
    """Handle an enhanced face swapping request with quality presets.

    Expects a JSON body with:
        source_image / target_image: base64-encoded images (required).
        source_face_idx / target_face_idx: face indices, default 1.
        quality_preset: preset name, default 'balanced'.

    Returns:
        A JSON response with ``result_image``, ``processing_time``,
        ``quality_preset``, ``preset_info`` and ``enhanced_features`` on
        success. On failure a JSON ``error`` is returned with status 400
        (missing or undecodable images), 501 (enhanced swapper modules not
        importable) or 500 (swapper creation or processing failure).
    """
    start_time = time.time()
    logger.info("ENHANCED SWAP REQUEST RECEIVED")
    # Every temporary file written by this request; removed in ``finally`` so
    # that early returns and exceptions do not leave files behind.
    temp_paths = []
    try:
        # A literal JSON ``null`` body yields None; treat it as an empty request.
        data = request.json or {}
        source_image_data = data.get('source_image')
        target_image_data = data.get('target_image')
        source_face_idx = int(data.get('source_face_idx', 1))
        target_face_idx = int(data.get('target_face_idx', 1))
        quality_preset = data.get('quality_preset', 'balanced')
        logger.info(f"Request parameters:")
        logger.info(f"Source image data length: {len(source_image_data) if source_image_data else 0} chars")
        logger.info(f"Target image data length: {len(target_image_data) if target_image_data else 0} chars")
        logger.info(f"Source face index: {source_face_idx}")
        logger.info(f"Target face index: {target_face_idx}")
        logger.info(f"Quality preset: {quality_preset}")

        if not source_image_data or not target_image_data:
            logger.error("Missing source or target image")
            return jsonify({'error': 'Missing source or target image'}), 400

        # Check if enhanced swapper is available
        if not ENHANCED_SWAPPER_AVAILABLE:
            logger.warning("Enhanced swapper not available, falling back to basic swapper")
            return jsonify({
                'error': 'Enhanced swapper not available. Please install EnhancedFaceSwapper.py and QualityPresets.py',
                'fallback_available': FACE_SWAPPER_AVAILABLE
            }), 501

        # Create enhanced swapper with quality preset
        try:
            enhanced_swapper = create_enhanced_swapper_with_quality(quality_preset)
            preset_info = QualityPresets.get_preset(quality_preset)
            logger.info(f"Using quality preset: {preset_info['name']}")
            logger.info(f"Expected processing time: {preset_info['processing_time']}")
            logger.info(f"Quality score: {preset_info['quality_score']}")
        except Exception as e:
            logger.error(f"Failed to create enhanced swapper: {e}")
            return jsonify({'error': f'Failed to create enhanced swapper: {str(e)}'}), 500

        # Convert base64 to images
        source_image = base64_to_image(source_image_data)
        target_image = base64_to_image(target_image_data)
        if source_image is None or target_image is None:
            logger.error("Failed to convert base64 to image")
            return jsonify({'error': 'Failed to decode images'}), 400

        # Whole-second timestamps collide when two requests arrive together,
        # so a random suffix keeps each request's files separate.
        request_id = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
        source_path = os.path.join(UPLOAD_FOLDER, f'enhanced_source_{request_id}.jpg')
        target_path = os.path.join(UPLOAD_FOLDER, f'enhanced_target_{request_id}.jpg')
        result_path = os.path.join(UPLOAD_FOLDER, f'enhanced_result_{request_id}.jpg')
        temp_paths.extend([source_path, target_path, result_path])

        # Save images based on their type
        if CV2_AVAILABLE and isinstance(source_image, np.ndarray):
            cv2.imwrite(source_path, source_image)
        elif isinstance(source_image, Image.Image):
            source_image.save(source_path)
        else:
            logger.error(f"Invalid source image type: {type(source_image)}")
            return jsonify({'error': 'Invalid source image format'}), 400

        if CV2_AVAILABLE and isinstance(target_image, np.ndarray):
            cv2.imwrite(target_path, target_image)
        elif isinstance(target_image, Image.Image):
            target_image.save(target_path)
        else:
            logger.error(f"Invalid target image type: {type(target_image)}")
            return jsonify({'error': 'Invalid target image format'}), 400
        logger.info(f"Saved temporary files: {source_path}, {target_path}")

        # Perform enhanced face swapping
        try:
            logger.info("Starting enhanced face swapping...")
            result_image = enhanced_swapper.swap_faces_enhanced(
                source_path, target_path, source_face_idx, target_face_idx
            )
            if result_image is None:
                logger.error("Enhanced face swapping returned None")
                return jsonify({'error': 'Enhanced face swapping failed'}), 500
            logger.info("Enhanced face swapping completed successfully")
        except Exception as e:
            logger.error(f"Enhanced face swapping error: {e}")
            logger.error(f"Error type: {type(e).__name__}")
            import traceback
            logger.error(f"Full traceback:\n{traceback.format_exc()}")
            return jsonify({'error': f'Enhanced face swapping failed: {str(e)}'}), 500

        # Save result
        if CV2_AVAILABLE:
            cv2.imwrite(result_path, result_image)
        else:
            # cv2 is None on this branch, so the channel order is reversed by
            # slicing instead of cv2.cvtColor.
            # NOTE(review): presumes the swapper returns a 3-channel BGR array
            # (the previous cv2.COLOR_BGR2RGB conversion assumed the same).
            Image.fromarray(result_image[:, :, ::-1]).save(result_path)

        # Convert to base64
        # NOTE(review): other handlers pass an image array to image_to_base64;
        # confirm that the helper also accepts a file path.
        result_base64 = image_to_base64(result_path)

        processing_time = time.time() - start_time
        logger.info(f"Enhanced swapping completed in {processing_time:.2f} seconds")
        return jsonify({
            'result_image': result_base64,
            'processing_time': processing_time,
            'quality_preset': quality_preset,
            'preset_info': preset_info,
            'enhanced_features': {
                'face_structure_matching': enhanced_swapper.face_structure_matching,
                'color_correction': enhanced_swapper.color_correction_enabled,
                'seamless_blending': enhanced_swapper.seamless_blending_enabled,
                'detail_enhancement': enhanced_swapper.enhancement_enabled
            }
        })
    except Exception as e:
        logger.error(f"Enhanced swap endpoint error: {e}")
        logger.error(f"Error type: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Best-effort cleanup: a file that was never written is not an error.
        for temp_path in temp_paths:
            try:
                os.unlink(temp_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
def get_quality_presets():
    """List the quality presets offered for enhanced face swapping.

    Returns:
        A JSON response with ``presets``, ``preset_names``,
        ``enhanced_available`` and ``default_preset``. When the enhanced
        swapper modules could not be imported, a 501 response with an empty
        ``presets`` list is returned; unexpected failures yield a 500.
    """
    try:
        if ENHANCED_SWAPPER_AVAILABLE:
            available = QualityPresets.get_all_presets()
            names = QualityPresets.get_preset_names()
            logger.info(f"Providing {len(available)} quality presets: {names}")
            payload = {
                'presets': available,
                'preset_names': names,
                'enhanced_available': ENHANCED_SWAPPER_AVAILABLE,
                'default_preset': 'balanced'
            }
            return jsonify(payload)

        # Enhanced modules missing: report it and say whether the basic
        # swapper can be used instead.
        unavailable = {
            'error': 'Enhanced swapper not available',
            'fallback_available': FACE_SWAPPER_AVAILABLE,
            'presets': []
        }
        return jsonify(unavailable), 501
    except Exception as e:
        logger.error(f"Error getting quality presets: {e}")
        return jsonify({'error': str(e)}), 500
def multi_swap():
    """Swap several faces of one target image, one assignment at a time.

    Expects a JSON body with:
        target_image: base64-encoded target image (required).
        assignments: mapping of target face id (integer-like key) to a
            base64-encoded source image; the first face of each source is
            used.
        model: model file name, default 'inswapper_128.onnx'.

    Each swap is applied to the output of the previous one, so the faces
    accumulate in a single result image.

    Returns:
        A JSON response with ``result_image`` and the processed/total face
        counts on success; a JSON ``error`` with status 400 (missing or
        undecodable target) or 500 (swapper missing or processing failure).
    """
    start_time = time.time()
    logger.info("MULTI-SWAP REQUEST RECEIVED")
    # Temporary files written by this request; removed in ``finally``.
    temp_paths = []
    try:
        # A literal JSON ``null`` body or ``null`` assignments become empty.
        data = request.json or {}
        target_image_data = data.get('target_image')
        assignments = data.get('assignments') or {}
        selected_model = data.get('model', 'inswapper_128.onnx')
        logger.info(f"Request parameters:")
        logger.info(f"Using Model: {selected_model}")
        logger.info(f"Target image data length: {len(target_image_data) if target_image_data else 0} chars")
        logger.info(f"Number of face assignments: {len(assignments)}")
        for face_id, source_data in assignments.items():
            logger.info(f"Face {face_id}: {len(source_data) if source_data else 0} chars source data")

        if not target_image_data:
            logger.error("Missing target image")
            return jsonify({'error': 'Missing target image'}), 400
        if not swapper:
            logger.error("Face swapper not initialized")
            return jsonify({'error': 'Face swapper not initialized'}), 500

        logger.info("Converting target image to OpenCV format...")
        target_image = base64_to_image(target_image_data)
        if target_image is None:
            logger.error("Failed to convert base64 to image")
            return jsonify({'error': 'Failed to decode target image'}), 400
        logger.info(f"Target image dimensions: {target_image.shape[1]}x{target_image.shape[0]}")

        # Start with target image
        result = target_image.copy()
        logger.info("Created result image copy")

        # Process each face swap sequentially
        face_ids = sorted(assignments.keys(), key=int)
        logger.info(f"Processing {len(face_ids)} face assignments in order: {face_ids}")

        # Per-request prefix so concurrent requests never share temp files.
        request_id = uuid.uuid4().hex
        processed_faces = 0
        for target_face_id in face_ids:
            source_image_data = assignments[target_face_id]
            if not source_image_data:
                logger.warning(f"No source image data for face {target_face_id}")
                continue

            logger.info(f"Processing face {target_face_id}...")
            # Convert source image
            source_image = base64_to_image(source_image_data)
            if source_image is None:
                # Skip this face, like a failed swap, rather than abort all.
                logger.warning(f"Could not decode source image for face {target_face_id}")
                continue

            # Save temporary files; the parsed integer (not the raw client
            # key) goes into the file name.
            logger.info(f"Saving temporary files for face {target_face_id}...")
            face_number = int(target_face_id)
            source_path = os.path.join(UPLOAD_FOLDER, f'source_{request_id}_{face_number}.jpg')
            target_path = os.path.join(UPLOAD_FOLDER, f'target_{request_id}_{face_number}.jpg')
            temp_paths.extend([source_path, target_path])
            cv2.imwrite(source_path, source_image)
            cv2.imwrite(target_path, result)  # Use current result, not original target

            # Perform face swap directly without cutting/pasting rectangles
            logger.info(f"Performing face swap for face {target_face_id}...")
            swap_start = time.time()
            try:
                swapped = swapper.swap_faces(
                    source_path,
                    1,  # Use first face from source
                    target_path,
                    face_number,
                    swap_hair=False,
                    model_name=selected_model
                )
            except TypeError:
                # Retry without ``model_name`` for swappers that reject it.
                swapped = swapper.swap_faces(
                    source_path,
                    1,
                    target_path,
                    face_number,
                    swap_hair=False
                )
            swap_time = time.time() - swap_start
            logger.info(f"Face swap completed in {swap_time:.2f} seconds")

            if swapped is not None:
                # Directly assign the seamlessly blended output back to result
                result = swapped
                processed_faces += 1
                logger.info(f"Successfully applied face {target_face_id} to result")
            else:
                logger.warning(f"Face {target_face_id} swap failed to return an image.")

        total_time = time.time() - start_time
        logger.info(f"MULTI-SWAP COMPLETED SUCCESSFULLY")
        logger.info(f"Total processing time: {total_time:.2f} seconds")
        logger.info(f"Processed {processed_faces}/{len(face_ids)} faces successfully")

        # Convert result to base64
        logger.info("Converting final result to base64...")
        result_base64 = image_to_base64(result)
        logger.info(f"Result size: {len(result_base64)} chars")
        return jsonify({
            'success': True,
            'result_image': result_base64,
            'message': f'Multi-face swap completed! Processed {processed_faces}/{len(face_ids)} faces.',
            'processing_time': total_time,
            'faces_processed': processed_faces,
            'total_faces': len(face_ids)
        })
    except Exception as e:
        logger.error(f"MULTI-SWAP ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Best-effort cleanup of the per-face temporary images.
        for temp_path in temp_paths:
            try:
                os.unlink(temp_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
def video_swap():
    """Swap a source face onto every frame of an uploaded video.

    Expects a JSON body with:
        source_image: base64-encoded source image (required).
        video: base64-encoded video, optionally as a data URL (required).
        source_face_idx / target_face_idx: face indices, default 1.
        model: model file name, default 'inswapper_128.onnx'.

    Progress is published through the module-level ``video_progress`` dict.
    It is marked as finished ('complete' or 'error') on every exit path taken
    after tracking starts, so progress readers never see a stale
    ``processing: True``.

    Returns:
        A JSON response with the result video as a data URL plus frame
        statistics on success; a JSON ``error`` with status 400 (missing or
        invalid input) or 500 (swapper missing or processing failure).
    """
    global video_progress
    start_time = time.time()
    logger.info("VIDEO SWAP REQUEST RECEIVED")
    # Set once this request has written the progress dict / an input file, so
    # the outer error handler can undo exactly what this request did.
    progress_started = False
    source_path = None
    video_path = None
    try:
        # A literal JSON ``null`` body yields None; treat it as empty.
        data = request.json or {}
        source_image_data = data.get('source_image')
        video_data = data.get('video')
        source_face_idx = int(data.get('source_face_idx', 1))
        target_face_idx = int(data.get('target_face_idx', 1))
        selected_model = data.get('model', 'inswapper_128.onnx')
        logger.info(f"Request parameters:")
        logger.info(f" • Source face index: {source_face_idx}")
        logger.info(f" • Target face index: {target_face_idx}")
        logger.info(f" • Selected Model: {selected_model}")
        logger.info(f" • Source image data length: {len(source_image_data) if source_image_data else 0} chars")
        logger.info(f" • Video data length: {len(video_data) if video_data else 0} chars")

        if not source_image_data or not video_data:
            logger.error("Missing source image or video")
            return jsonify({'error': 'Missing source image or video'}), 400
        if not swapper:
            logger.error("Face swapper not initialized")
            return jsonify({'error': 'Face swapper not initialized'}), 500

        # Initialize progress tracking
        video_progress = {
            'processing': True,
            'phase': 'initializing',
            'total_frames': 0,
            'processed_frames': 0,
            'current_frame_base64': None,
            'start_time': time.time(),
            'mode': 'normal',
            'fps': None,
            'resolution': None,
            'file_size': 0,
            'processing_speed': 0,
            'avg_frame_time': None,
            'estimated_total_time': None,
            'countdown_time': None
        }
        progress_started = True
        logger.info("Starting video processing")

        # Simplified video processing like app.py
        source_image = base64_to_image(source_image_data)
        source_path = os.path.join(UPLOAD_FOLDER, f'video_source_{int(time.time())}.jpg')
        if CV2_AVAILABLE and isinstance(source_image, np.ndarray):
            cv2.imwrite(source_path, source_image)
        elif isinstance(source_image, Image.Image):
            source_image.save(source_path)
        else:
            logger.error(f"Invalid source image format: {type(source_image)}")
            # Progress was already flagged as running; close it out.
            video_progress['processing'] = False
            video_progress['phase'] = 'error'
            return jsonify({'error': 'Invalid source image format'}), 400

        # Convert video data to file
        video_path = os.path.join(UPLOAD_FOLDER, f'input_video_{int(time.time())}.mp4')
        if 'base64,' in video_data:
            video_data = video_data.split('base64,')[1]
        video_bytes = base64.b64decode(video_data)
        with open(video_path, 'wb') as f:
            f.write(video_bytes)
        video_progress['file_size'] = os.path.getsize(video_path) if os.path.exists(video_path) else 0
        logger.info(f"Video file size: {video_progress['file_size'] / (1024*1024):.2f} MB")

        # Setup processing directories
        frames_dir = os.path.join(UPLOAD_FOLDER, 'video_frames')
        swapped_dir = os.path.join(UPLOAD_FOLDER, 'swapped_frames')
        output_video_path = os.path.join(UPLOAD_FOLDER, f'output_swapped_video_{int(time.time())}.mp4')

        # Clean up and create directories
        if os.path.exists(frames_dir):
            shutil.rmtree(frames_dir)
        if os.path.exists(swapped_dir):
            shutil.rmtree(swapped_dir)
        os.makedirs(frames_dir, exist_ok=True)
        os.makedirs(swapped_dir, exist_ok=True)

        try:
            # Extract frames from video (like app.py)
            logger.info("Extracting frames from video...")
            video_progress['phase'] = 'extracting'
            frame_paths = extract_video_frames(video_path, frames_dir)
            video_progress['total_frames'] = len(frame_paths)
            logger.info(f"Extracted {len(frame_paths)} frames")

            # Get FPS from original video (like app.py)
            cap = cv2.VideoCapture(video_path)
            fps = cap.get(cv2.CAP_PROP_FPS)
            # Fall back to 30 FPS when the container reports none.
            video_progress['fps'] = int(fps) if fps > 0 else 30
            video_progress['resolution'] = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
            cap.release()
            logger.info(f"Video: {video_progress['fps']} FPS, {video_progress['resolution']} resolution")

            # Process frames with face swapping (simplified like app.py)
            logger.info("Swapping faces on frames...")
            video_progress['phase'] = 'swapping'
            processed_count = 0

            # Process ALL frames sequentially (matching working app.py)
            for idx, frame_path in enumerate(frame_paths):
                swapped_name = f"swapped_{idx:05d}.jpg"
                out_path = os.path.join(swapped_dir, swapped_name)
                try:
                    try:
                        swapped_frame = swapper.swap_faces(
                            source_path=source_path,
                            source_face_idx=source_face_idx,
                            target_path=frame_path,
                            target_face_idx=target_face_idx,
                            model_name=selected_model
                        )
                    except TypeError:
                        # Retry without ``model_name`` for swappers that
                        # reject that keyword.
                        swapped_frame = swapper.swap_faces(
                            source_path=source_path,
                            source_face_idx=source_face_idx,
                            target_path=frame_path,
                            target_face_idx=target_face_idx
                        )

                    frame_is_array = CV2_AVAILABLE and isinstance(swapped_frame, np.ndarray)
                    if frame_is_array:
                        cv2.imwrite(out_path, swapped_frame)
                    elif os.path.exists(frame_path):
                        # No usable swap result: keep the original frame so
                        # the output video does not lose a frame.
                        logger.warning(f"Frame {idx}: swap returned no image, keeping original frame")
                        shutil.copy2(frame_path, out_path)
                    processed_count += 1

                    # Update progress tracking
                    video_progress['processed_frames'] = processed_count

                    # Update progress with frame preview every 10 frames
                    if processed_count % 10 == 0 and frame_is_array:
                        _, buffer = cv2.imencode('.jpg', swapped_frame)
                        frame_base64 = base64.b64encode(buffer).decode('utf-8')
                        video_progress['current_frame_base64'] = f"data:image/jpeg;base64,{frame_base64}"

                    # Update progress every 5 frames
                    if processed_count % 5 == 0:
                        elapsed = time.time() - video_progress['start_time']
                        avg_time = elapsed / processed_count if processed_count > 0 else 0
                        remaining_frames = len(frame_paths) - processed_count
                        remaining_time = avg_time * remaining_frames
                        # Estimate total time with buffer
                        buffer_time = 10
                        total_estimated = elapsed + remaining_time + buffer_time
                        video_progress['countdown_time'] = round(total_estimated, 1)
                        mins, secs = divmod(int(remaining_time), 60)
                        logger.info(f"Processed {processed_count}/{len(frame_paths)} frames | Est. time left: {mins:02d}:{secs:02d}")
                except Exception as e:
                    logger.error(f"Error processing frame {idx}: {e}")
                    # Copy original frame if swap fails
                    if os.path.exists(frame_path):
                        shutil.copy2(frame_path, out_path)
                    processed_count += 1
                    video_progress['processed_frames'] = processed_count

            logger.info(f"Face swapping completed. Processed {processed_count} frames.")

            # Combine swapped frames into video
            logger.info("Combining swapped frames into video...")
            video_progress['phase'] = 'rendering'
            create_video_from_frames(swapped_dir, output_video_path, int(video_progress['fps']))

            # Convert output video to base64 for response
            with open(output_video_path, 'rb') as f:
                video_bytes = f.read()
            output_video_base64 = base64.b64encode(video_bytes).decode()
            output_video_data_url = f"data:video/mp4;base64,{output_video_base64}"

            # Clean up temporary files
            try:
                shutil.rmtree(frames_dir)
                shutil.rmtree(swapped_dir)
                os.remove(video_path)
                os.remove(source_path)
            except Exception as cleanup_error:
                logger.warning(f"Cleanup warning: {cleanup_error}")

            # Final progress update
            processing_time = time.time() - video_progress['start_time']
            video_progress['phase'] = 'complete'
            video_progress['processing'] = False
            logger.info(f"VIDEO SWAPPING COMPLETED SUCCESSFULLY")
            logger.info(f"Total processing time: {processing_time:.1f} seconds")
            logger.info(f"Frames processed: {processed_count}")
            return jsonify({
                'success': True,
                'message': f'Video face swap completed successfully!',
                'processing_mode': 'Standard face swap processing',
                'quality_level': 'High Quality',
                'frames_processed': processed_count,
                'total_frames': len(frame_paths),
                'output_fps': video_progress['fps'],
                'processing_time': round(processing_time, 1),
                'output_video': output_video_data_url
            })
        except Exception as processing_error:
            logger.error(f"VIDEO PROCESSING ERROR: {str(processing_error)}")
            logger.error(f"Error location: {type(processing_error).__name__}")
            import traceback
            logger.error(f"Full traceback:\n{traceback.format_exc()}")
            # Reset progress on error
            video_progress['processing'] = False
            video_progress['phase'] = 'error'
            # Clean up on error
            try:
                if os.path.exists(frames_dir):
                    shutil.rmtree(frames_dir)
                if os.path.exists(swapped_dir):
                    shutil.rmtree(swapped_dir)
                if os.path.exists(video_path):
                    os.remove(video_path)
                if os.path.exists(source_path):
                    os.remove(source_path)
            except Exception as cleanup_error:
                logger.warning(f"Cleanup warning: {cleanup_error}")
            return jsonify({'error': f'Video processing failed: {str(processing_error)}'}), 500
    except Exception as e:
        logger.error(f"VIDEO SWAP ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        # Failures before frame processing (e.g. undecodable video data) must
        # not leave the progress flagged as running.
        if progress_started:
            video_progress['processing'] = False
            video_progress['phase'] = 'error'
        # Remove whatever input files this request managed to write.
        for leftover in (video_path, source_path):
            if leftover and os.path.exists(leftover):
                try:
                    os.remove(leftover)
                except OSError as cleanup_error:
                    logger.warning(f"Cleanup warning: {cleanup_error}")
        return jsonify({'error': str(e)}), 500
def video_progress_endpoint():
    """Report the module-level video progress state as a JSON response."""
    current_state = video_progress
    return jsonify(current_state)
def multi_combination_swap():
    """Swap every source image onto every target image.

    Expects a JSON body with:
        source_images / target_images: lists of base64-encoded images
            (both required and non-empty).
        model: model file name, default 'inswapper_128.onnx'.

    When the swapper reports ``gpu_enabled`` and there is more than one
    combination, ``swapper.swap_faces_batch`` is tried first. If it raises,
    its partial results are discarded and every combination is processed
    individually. Result images stay in UPLOAD_FOLDER because the response
    carries a download URL for each; the temporary input files are removed.

    Returns:
        A JSON response with a ``results`` list and summary statistics on
        success; a JSON ``error`` with status 400 (missing input) or 500
        (swapper missing, no combination succeeded, or unexpected failure).
    """
    start_time = time.time()
    logger.info("MULTI-COMBINATION SWAP REQUEST RECEIVED")
    # Temporary input files written by this request; removed in ``finally``.
    temp_paths = []
    try:
        # A literal JSON ``null`` body or ``null`` lists become empty.
        data = request.json or {}
        source_images = data.get('source_images') or []
        target_images = data.get('target_images') or []
        selected_model = data.get('model', 'inswapper_128.onnx')
        logger.info(f"Processing {len(source_images)} source images x {len(target_images)} target images")
        logger.info(f"Using Model: {selected_model}")

        if not source_images or not target_images:
            return jsonify({'error': 'Missing source or target images'}), 400
        if not swapper:
            return jsonify({'error': 'Face swapper not initialized'}), 500

        results = []
        timestamp = int(time.time())

        # Check if GPU batch processing is available and beneficial.
        # bool() keeps 'batch_processing_used' a real JSON boolean.
        use_batch_processing = bool(
            hasattr(swapper, 'gpu_enabled')
            and swapper.gpu_enabled
            and len(source_images) * len(target_images) > 1
        )

        if use_batch_processing:
            logger.info("Using GPU batch processing for optimal performance")
            # Save all source and target images first
            source_paths = []
            target_paths = []
            for i, source_image_data in enumerate(source_images):
                source_image = base64_to_image(source_image_data)
                source_path = os.path.join(UPLOAD_FOLDER, f'batch_source_{timestamp}_{i}.jpg')
                temp_paths.append(source_path)
                cv2.imwrite(source_path, source_image)
                source_paths.append(source_path)
            for j, target_image_data in enumerate(target_images):
                target_image = base64_to_image(target_image_data)
                target_path = os.path.join(UPLOAD_FOLDER, f'batch_target_{timestamp}_{j}.jpg')
                temp_paths.append(target_path)
                cv2.imwrite(target_path, target_image)
                target_paths.append(target_path)

            # Use batch processing for GPU optimization
            try:
                for i, source_path in enumerate(source_paths):
                    source_face_indices = [1]  # Use first face from each source
                    # NOTE(review): only target_paths[0] is passed, with one
                    # face index per target image; confirm that
                    # swap_faces_batch really maps these to separate targets.
                    target_face_indices = list(range(1, len(target_paths) + 1))
                    try:
                        batch_results = swapper.swap_faces_batch(
                            source_path=source_path,
                            target_path=target_paths[0],
                            source_face_indices=source_face_indices,
                            target_face_indices=target_face_indices,
                            swap_hair=False,
                            model_name=selected_model
                        )
                    except TypeError:
                        # Retry without ``model_name`` for swappers that
                        # reject that keyword.
                        batch_results = swapper.swap_faces_batch(
                            source_path=source_path,
                            target_path=target_paths[0],
                            source_face_indices=source_face_indices,
                            target_face_indices=target_face_indices,
                            swap_hair=False
                        )

                    # Convert batch results to response format
                    for j, result in enumerate(batch_results):
                        if j < len(target_paths):
                            # Apply edge smoothing to reduce blocky appearance
                            result = apply_edge_smoothing(result)
                            result_base64 = image_to_base64(result)
                            result_filename = f"combo_{timestamp}_source{i+1}_target{j+1}.jpg"
                            result_path = os.path.join(UPLOAD_FOLDER, result_filename)
                            cv2.imwrite(result_path, result)
                            results.append({
                                'combination_name': f'Source {i+1} x Target {j+1}',
                                'source_index': i,
                                'target_index': j,
                                'result_image': result_base64,
                                'download_url': f'/download/{result_filename}'
                            })
                            logger.info(f"GPU batch processed: Source {i+1} x Target {j+1}")
            except Exception as batch_error:
                logger.warning(f"GPU batch processing failed: {batch_error}")
                logger.info("Falling back to individual processing...")
                use_batch_processing = False
                # The fallback redoes every combination, so partial batch
                # output must be dropped to avoid duplicate entries.
                results.clear()

        if not use_batch_processing:
            logger.info("Using individual processing (CPU or GPU fallback)")
            # Process all combinations individually
            for i, source_image_data in enumerate(source_images):
                for j, target_image_data in enumerate(target_images):
                    try:
                        logger.info(f"Processing combination {i+1}x{j+1}...")
                        # Convert base64 to OpenCV images
                        source_image = base64_to_image(source_image_data)
                        target_image = base64_to_image(target_image_data)

                        # Save temporary files
                        source_path = os.path.join(UPLOAD_FOLDER, f'combo_{timestamp}_source_{i}_{j}.jpg')
                        target_path = os.path.join(UPLOAD_FOLDER, f'combo_{timestamp}_target_{i}_{j}.jpg')
                        temp_paths.extend([source_path, target_path])
                        cv2.imwrite(source_path, source_image)
                        cv2.imwrite(target_path, target_image)

                        # Perform face swap
                        try:
                            swapped_result = swapper.swap_faces(
                                source_path,
                                1,  # Use first face from source (default for multi-combination)
                                target_path,
                                1,  # Use first face from target (default for multi-combination)
                                swap_hair=False,
                                model_name=selected_model
                            )
                        except TypeError:
                            swapped_result = swapper.swap_faces(
                                source_path,
                                1,
                                target_path,
                                1,
                                swap_hair=False
                            )

                        # Apply edge smoothing to reduce blocky appearance
                        swapped_result = apply_edge_smoothing(swapped_result)

                        # Convert result to base64
                        result_base64 = image_to_base64(swapped_result)

                        # Save result file for download
                        result_filename = f"combo_{timestamp}_source{i+1}_target{j+1}.jpg"
                        result_path = os.path.join(UPLOAD_FOLDER, result_filename)
                        cv2.imwrite(result_path, swapped_result)
                        results.append({
                            'combination_name': f'Source {i+1} x Target {j+1}',
                            'source_index': i,
                            'target_index': j,
                            'result_image': result_base64,
                            'download_url': f'/download/{result_filename}'
                        })
                        logger.info(f"Successfully processed combination {i+1}x{j+1}")
                    except Exception as e:
                        logger.error(f"Error processing combination {i+1}x{j+1}: {e}")
                        # Continue with other combinations
                        continue

        if not results:
            return jsonify({'error': 'No combinations processed successfully'}), 500

        total_time = time.time() - start_time
        gpu_status = getattr(swapper, 'gpu_enabled', False)
        logger.info(f"MULTI-COMBINATION SWAP COMPLETED")
        logger.info(f"Total combinations: {len(source_images) * len(target_images)}")
        logger.info(f"Successful: {len(results)}")
        logger.info(f"Processing time: {total_time:.2f}s")
        logger.info(f"GPU acceleration: {'Enabled' if gpu_status else 'Disabled'}")
        return jsonify({
            'success': True,
            'message': f'Processed {len(results)} combinations successfully! (GPU: {"Enabled" if gpu_status else "Disabled"})',
            'results': results,
            'total_combinations': len(source_images) * len(target_images),
            'successful_combinations': len(results),
            'processing_time': total_time,
            'gpu_accelerated': gpu_status,
            'batch_processing_used': use_batch_processing
        })
    except Exception as e:
        logger.error(f"MULTI-COMBINATION SWAP ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Best-effort cleanup of temporary inputs; result files are kept for
        # the download URLs returned above.
        for temp_path in temp_paths:
            try:
                os.unlink(temp_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
def test_endpoint():
    """Liveness probe: reply with a fixed success message and the current Unix time."""
    payload = {
        'success': True,
        'message': 'Server is working correctly',
        'timestamp': int(time.time())
    }
    return jsonify(payload)
def face_morph():
    """Create a GIF or MP4 that morphs a target face into the swapped face.

    Expects a JSON body with:
        source_image / target_image: base64-encoded images (required).
        source_face_idx / target_face_idx: 1-based face indices, default 1.
        morph_speed: 'slow', 'normal' or 'fast' (total 5 s / 3 s / 1.5 s);
            unknown values use the 'normal' duration.
        morph_frames: number of frames, default 30, must be at least 1.
        output_format: 'gif' (default) or 'mp4'.
        model: model file name, default 'inswapper_128.onnx'.

    The swap is computed once; each frame blends the target's face rectangle
    with the same rectangle of the swapped image at an increasing ratio.

    Returns:
        A JSON response with the GIF as a data URL (or an MP4 preview frame)
        and a download URL on success; a JSON ``error`` with status 400
        (invalid input or face index), 501 (MP4 requested without imageio)
        or 500 (swapper missing or processing failure).
    """
    logger.info("FACE MORPH REQUEST RECEIVED")
    # Temporary input files written by this request; removed in ``finally``.
    temp_paths = []
    try:
        # A literal JSON ``null`` body yields None; treat it as empty.
        data = request.json or {}
        source_image_data = data.get('source_image')
        target_image_data = data.get('target_image')
        source_face_idx = int(data.get('source_face_idx', 1))
        target_face_idx = int(data.get('target_face_idx', 1))
        morph_speed = data.get('morph_speed', 'normal')
        morph_frames = int(data.get('morph_frames', 30))
        output_format = data.get('output_format', 'gif')
        selected_model = data.get('model', 'inswapper_128.onnx')
        logger.info(f"Request parameters:")
        logger.info(f" • Source face index: {source_face_idx}")
        logger.info(f" • Target face index: {target_face_idx}")
        logger.info(f" • Morph speed: {morph_speed}")
        logger.info(f" • Morph frames: {morph_frames}")
        logger.info(f" • Output format: {output_format}")
        logger.info(f" • Using Model: {selected_model}")

        if not source_image_data or not target_image_data:
            logger.error("Missing source or target image")
            return jsonify({'error': 'Missing source or target image'}), 400
        # Reject bad parameters before any expensive work: a frame count
        # below 1 would divide by zero and leave no frame to save.
        if morph_frames < 1:
            logger.error(f"Invalid morph frame count: {morph_frames}")
            return jsonify({'error': 'morph_frames must be at least 1'}), 400
        # Without this check an unknown format fell through every branch and
        # the view returned None.
        if output_format not in ('gif', 'mp4'):
            logger.error(f"Unsupported output format: {output_format}")
            return jsonify({'error': f'Unsupported output format: {output_format}'}), 400
        if output_format == 'mp4' and not IMAGEIO_AVAILABLE:
            logger.error("MP4 output requested but imageio is not available")
            return jsonify({'error': 'MP4 output requires imageio, which is not available'}), 501
        if not swapper:
            logger.error("Face swapper not initialized")
            return jsonify({'error': 'Face swapper not initialized'}), 500

        logger.info("Converting base64 images to OpenCV format...")
        source_image = base64_to_image(source_image_data)
        target_image = base64_to_image(target_image_data)
        if source_image is None or target_image is None:
            logger.error("Failed to convert base64 to image")
            return jsonify({'error': 'Failed to decode images'}), 400
        logger.info(f"Image dimensions:")
        logger.info(f" • Source: {source_image.shape[1]}x{source_image.shape[0]}")
        logger.info(f" • Target: {target_image.shape[1]}x{target_image.shape[0]}")

        # Save temporary files under per-request names so concurrent requests
        # cannot overwrite each other's inputs.
        logger.info("Saving temporary files...")
        request_id = uuid.uuid4().hex
        source_path = os.path.join(UPLOAD_FOLDER, f'morph_source_{request_id}.jpg')
        target_path = os.path.join(UPLOAD_FOLDER, f'morph_target_{request_id}.jpg')
        temp_paths.extend([source_path, target_path])
        cv2.imwrite(source_path, source_image)
        cv2.imwrite(target_path, target_image)
        logger.info(f"Temporary files saved: {source_path}, {target_path}")

        # Perform face swap to get both faces in same position
        try:
            source_swapped = swapper.swap_faces(
                source_path,
                source_face_idx,
                target_path,
                target_face_idx,
                swap_hair=False,
                model_name=selected_model
            )
        except TypeError:
            # Retry without ``model_name`` for swappers that reject it.
            source_swapped = swapper.swap_faces(
                source_path,
                source_face_idx,
                target_path,
                target_face_idx,
                swap_hair=False
            )
        if source_swapped is None:
            logger.error("Face swap returned no image")
            return jsonify({'error': 'Face swap failed to return an image'}), 500

        # Locate the target face; faces are ordered left to right.
        faces = swapper.app.get(target_image)
        faces = sorted(faces, key=lambda x: x.bbox[0])
        # The lower bound matters: index 0 or below would otherwise pick a
        # face through negative indexing.
        if not 1 <= target_face_idx <= len(faces):
            return jsonify({'error': f'Target face index {target_face_idx} not found'}), 400
        face = faces[target_face_idx - 1]
        x1, y1, x2, y2 = [int(v) for v in face.bbox]

        # Clamp the box to the image: coordinates outside it would make the
        # slices below wrap around or come out empty.
        image_height, image_width = target_image.shape[:2]
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(image_width, x2), min(image_height, y2)
        if x2 <= x1 or y2 <= y1:
            return jsonify({'error': f'Target face index {target_face_idx} has an empty face region'}), 400

        # The two face crops are the same for every frame, so take them once.
        original_face = target_image[y1:y2, x1:x2]
        swapped_face = source_swapped[y1:y2, x1:x2]

        # Generate morph frames (full images with animated face region)
        morph_frames_list = []
        for i in range(morph_frames):
            # Blend ratio runs from 0.0 (original) to 1.0 (swapped)
            ratio = i / (morph_frames - 1) if morph_frames > 1 else 0
            morph_frame = target_image.copy()
            morph_frame[y1:y2, x1:x2] = cv2.addWeighted(
                swapped_face, ratio, original_face, 1.0 - ratio, 0
            )
            # Convert to RGB PIL Image for GIF creation
            morphed_rgb = cv2.cvtColor(morph_frame, cv2.COLOR_BGR2RGB)
            morph_frames_list.append(Image.fromarray(morphed_rgb))

        # Calculate duration based on speed (total milliseconds)
        speed_duration_map = {
            'slow': 5000,    # 5 seconds total
            'normal': 3000,  # 3 seconds total
            'fast': 1500     # 1.5 seconds total
        }
        total_duration = speed_duration_map.get(morph_speed, 3000)
        # At least 1 ms per frame: with more frames than milliseconds the
        # floor division gives 0, which breaks the MP4 fps computation.
        frame_duration = max(1, total_duration // morph_frames)

        # Generate output file
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if output_format == 'gif':
            # Create animated GIF
            gif_path = os.path.join(UPLOAD_FOLDER, f'face_morph_{timestamp}.gif')
            morph_frames_list[0].save(
                gif_path,
                save_all=True,
                append_images=morph_frames_list[1:],
                duration=frame_duration,
                loop=0,
                optimize=True
            )
            # Convert GIF to base64
            with open(gif_path, 'rb') as f:
                gif_data = f.read()
            gif_base64 = base64.b64encode(gif_data).decode()
            gif_data_url = f"data:image/gif;base64,{gif_base64}"
            return jsonify({
                'success': True,
                'output_file': gif_data_url,
                'download_url': f'/download/face_morph_{timestamp}.gif',
                'total_duration': total_duration,
                'output_format': output_format,
                'frame_count': morph_frames,
                'message': f'Face morph GIF created! {morph_frames} frames, {total_duration/1000:.1f}s duration.'
            })

        # output_format == 'mp4' (validated above): write the video with imageio
        mp4_path = os.path.join(UPLOAD_FOLDER, f'face_morph_{timestamp}.mp4')
        video_frames = [np.array(frame) for frame in morph_frames_list]
        imageio.mimsave(mp4_path, video_frames, fps=(1000/frame_duration), quality=8)
        # Return the first frame as preview; the MP4 itself is fetched through
        # the download URL.
        preview_base64 = image_to_base64(cv2.cvtColor(video_frames[0], cv2.COLOR_RGB2BGR))
        return jsonify({
            'success': True,
            'preview_image': preview_base64,
            'download_url': f'/download/face_morph_{timestamp}.mp4',
            'total_duration': total_duration,
            'output_format': output_format,
            'frame_count': morph_frames,
            'message': f'Face morph MP4 created! {morph_frames} frames, {total_duration/1000:.1f}s duration.'
        })
    except Exception as e:
        logger.error(f"FACE MORPH ERROR: {str(e)}")
        logger.error(f"Error location: {type(e).__name__}")
        import traceback
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Best-effort cleanup of temporary inputs; the generated GIF/MP4 is
        # kept for the download URL returned above.
        for temp_path in temp_paths:
            try:
                os.unlink(temp_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
def download_file(filename):
    """Send a generated file from UPLOAD_FOLDER to the client as an attachment.

    Args:
        filename: Name of the file, relative to UPLOAD_FOLDER.

    Returns:
        The file response on success; otherwise a ``(json, status)`` pair:
        403 when the name resolves outside UPLOAD_FOLDER, 404 when no such
        regular file exists, 500 with the exception text on any other error.
    """
    try:
        base_dir = os.path.abspath(UPLOAD_FOLDER)
        file_path = os.path.abspath(os.path.join(base_dir, filename))
        # Reject names such as "../x" or absolute paths that would escape
        # the upload folder once normalised.
        if not file_path.startswith(os.path.join(base_dir, '')):
            return jsonify({'error': 'Invalid file path'}), 403
        # isfile (not exists) so a directory name yields 404 rather than an
        # exception inside send_file.
        if not os.path.isfile(file_path):
            return jsonify({'error': 'File not found'}), 404
        return send_file(file_path, as_attachment=True)
    except Exception as e:
        print(f"Download error: {e}")
        return jsonify({'error': str(e)}), 500
| # Pinterest integration removed - file not available | |
| # from pinterest_integration import search_pinterest_images, download_pinterest_images, scrape_pinterest_board | |
def download_image_from_url(url, filename=None):
    """Fetch an image over HTTP and wrap its bytes in a base64 data URL.

    Args:
        url: Address to GET (10 second timeout).
        filename: Accepted for callers' convenience; not used here.

    Returns:
        On success a dict with ``success=True``, ``data_url``, ``size``
        (number of bytes in the response body) and ``content_type`` (the
        response header, defaulting to ``image/jpeg``). On any failure a dict
        with ``success=False`` and an ``error`` message.
    """
    try:
        reply = requests.get(url, timeout=10)
        reply.raise_for_status()
        body = reply.content
        encoded = base64.b64encode(body).decode('utf-8')
        mime = reply.headers.get('content-type', 'image/jpeg')
        # Only PNG is singled out; every other content type is labelled JPEG.
        if 'png' in mime:
            wrapped = f"data:image/png;base64,{encoded}"
        else:
            wrapped = f"data:image/jpeg;base64,{encoded}"
        outcome = {
            'success': True,
            'data_url': wrapped,
            'size': len(body),
            'content_type': mime
        }
        return outcome
    except Exception as exc:
        return {'success': False, 'error': f'Download failed: {str(exc)}'}
def batch_detect_faces():
    """Detect faces in a batch of base64 images and return a crop of each face.

    Reads ``images`` (a list of base64 image strings) from the JSON request
    body. Each image is decoded with ``base64_to_image`` and passed to
    ``swapper.app.get``; detected faces are ordered left to right by the
    first bbox coordinate.

    Returns:
        JSON with ``results`` (one entry per input image holding
        ``image_index``, ``success``, ``faces`` and ``face_count``, plus
        ``error`` when that image failed), ``total_images`` and
        ``total_faces``. 400 when no images were sent, 500 when the swapper
        is not initialised or an unexpected error occurs.
    """
    try:
        data = request.json
        images = data.get('images', [])
        if not images:
            return jsonify({'error': 'No images provided'}), 400
        if not swapper:
            return jsonify({'error': 'Face swapper not initialized'}), 500
        results = []
        for idx, image_data in enumerate(images):
            try:
                # Convert base64 to OpenCV image
                image = base64_to_image(image_data)
                img_h, img_w = image.shape[:2]
                # Detect faces and sort them from left to right
                faces = sorted(swapper.app.get(image), key=lambda face: face.bbox[0])
                detected_faces = []
                for i, face in enumerate(faces):
                    x1, y1, x2, y2 = [int(v) for v in face.bbox]
                    # A bbox may extend past the image edge. Negative slice
                    # indices would wrap around and give a wrong or empty crop,
                    # so clamp the crop window to the image bounds.
                    crop_x1, crop_y1 = max(x1, 0), max(y1, 0)
                    crop_x2, crop_y2 = min(x2, img_w), min(y2, img_h)
                    if crop_x2 <= crop_x1 or crop_y2 <= crop_y1:
                        # Nothing of this face lies inside the image; skip it
                        # instead of failing the whole image on an empty crop.
                        continue
                    face_region = image[crop_y1:crop_y2, crop_x1:crop_x2]
                    face_base64 = image_to_base64(face_region)
                    detected_faces.append({
                        'id': i + 1,
                        'label': f'Face {i + 1}',
                        'image': face_base64,
                        # The detector's bbox is reported unchanged.
                        'bbox': [x1, y1, x2, y2]
                    })
                results.append({
                    'image_index': idx,
                    'success': True,
                    'faces': detected_faces,
                    'face_count': len(detected_faces)
                })
            except Exception as e:
                # One bad image must not abort the batch; record and move on.
                results.append({
                    'image_index': idx,
                    'success': False,
                    'error': str(e),
                    'faces': [],
                    'face_count': 0
                })
        return jsonify({
            'success': True,
            'results': results,
            'total_images': len(images),
            'total_faces': sum(r['face_count'] for r in results)
        })
    except Exception as e:
        print(f"Batch face detection error: {e}")
        return jsonify({'error': str(e)}), 500
def pinterest_search():
    """Run a Pinterest image search for the query in the JSON request body.

    Reads ``query`` (required) and ``per_page`` (default 20, capped at 50).
    Returns the helper's result dict as JSON: status 200 when it reports
    success, 500 when it does not, 400 for a missing body or empty query.

    NOTE(review): the import that would provide ``search_pinterest_images``
    appears to be commented out in this file. Confirm the name is defined
    elsewhere; if not, the call raises NameError, which is reported through
    the 500 handler below.
    """
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({'success': False, 'error': 'No JSON data received'}), 400
        query = payload.get('query', '').strip()
        per_page = min(payload.get('per_page', 20), 50)  # never more than 50
        if not query:
            return jsonify({'success': False, 'error': 'Query parameter is required'}), 400
        print(f"Pinterest search request: {query} (limit: {per_page})")
        outcome = search_pinterest_images(query, per_page)
        if not outcome['success']:
            print(f"Pinterest search failed: {outcome.get('error', 'Unknown error')}")
            return jsonify(outcome), 500
        print(f"Found {outcome['total_found']} images for: {query}")
        return jsonify(outcome)
    except Exception as exc:
        error_msg = f'Server error: {str(exc)}'
        print(f"Pinterest search error: {error_msg}")
        return jsonify({'success': False, 'error': error_msg}), 500
def pinterest_download():
    """Download Pinterest images for the query in the JSON request body.

    Reads ``query`` (required), ``num_images`` (default 20, capped at 100) and
    ``output_dir`` (default ``pinterest_downloads``). Returns the helper's
    result dict as JSON: status 200 when it reports success, 500 when it does
    not, 400 for a missing body or empty query.

    NOTE(review): the import that would provide ``download_pinterest_images``
    appears to be commented out in this file. Confirm the name is defined
    elsewhere; if not, the call raises NameError, which is reported through
    the 500 handler below.
    """
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({'success': False, 'error': 'No JSON data received'}), 400
        query = payload.get('query', '').strip()
        num_images = min(payload.get('num_images', 20), 100)  # never more than 100
        output_dir = payload.get('output_dir', 'pinterest_downloads')
        if not query:
            return jsonify({'success': False, 'error': 'Query parameter is required'}), 400
        print(f"Pinterest download request: {query} (count: {num_images})")
        outcome = download_pinterest_images(query, num_images, output_dir)
        if not outcome['success']:
            print(f"Pinterest download failed: {outcome.get('error', 'Unknown error')}")
            return jsonify(outcome), 500
        print(f"Downloaded {outcome['total_downloaded']} images to: {outcome['output_directory']}")
        return jsonify(outcome)
    except Exception as exc:
        error_msg = f'Server error: {str(exc)}'
        print(f"Pinterest download error: {error_msg}")
        return jsonify({'success': False, 'error': error_msg}), 500
def pinterest_scrape():
    """Scrape images from the Pinterest board URL in the JSON request body.

    Reads ``board_url`` (required), ``num_images`` (default 50, capped at 200)
    and ``output_dir`` (default ``pinterest_downloads``). The URL must contain
    ``pinterest.com`` and either ``/board/`` or ``/pin/``. Returns the helper's
    result dict as JSON: status 200 when it reports success, 500 when it does
    not, 400 for a missing body, empty URL or a URL failing the check.

    NOTE(review): the import that would provide ``scrape_pinterest_board``
    appears to be commented out in this file. Confirm the name is defined
    elsewhere; if not, the call raises NameError, which is reported through
    the 500 handler below.
    """
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({'success': False, 'error': 'No JSON data received'}), 400
        board_url = payload.get('board_url', '').strip()
        num_images = min(payload.get('num_images', 50), 200)  # never more than 200
        output_dir = payload.get('output_dir', 'pinterest_downloads')
        if not board_url:
            return jsonify({'success': False, 'error': 'Board URL parameter is required'}), 400
        # Substring check only: the host marker plus a board or pin path segment.
        has_host = 'pinterest.com' in board_url
        has_path = '/board/' in board_url or '/pin/' in board_url
        if not has_host or not has_path:
            return jsonify({'success': False, 'error': 'Invalid Pinterest board URL'}), 400
        print(f"Pinterest scrape request: {board_url} (count: {num_images})")
        outcome = scrape_pinterest_board(board_url, num_images, output_dir)
        if not outcome['success']:
            print(f"Pinterest scrape failed: {outcome.get('error', 'Unknown error')}")
            return jsonify(outcome), 500
        print(f"Scraped {outcome['total_scraped']} images from board")
        return jsonify(outcome)
    except Exception as exc:
        error_msg = f'Server error: {str(exc)}'
        print(f"Pinterest scrape error: {error_msg}")
        return jsonify({'success': False, 'error': error_msg}), 500
def serve_pinterest_image(filename):
    """Serve an image stored under the ``pinterest_downloads`` directory.

    Args:
        filename: Path of the image relative to ``pinterest_downloads``.

    Returns:
        The file response on success; otherwise a ``(json, status)`` pair:
        403 when the path resolves outside the directory, 404 when no such
        regular file exists, 500 on any other error.
    """
    try:
        pinterest_dir = Path("pinterest_downloads").resolve()
        file_path = (pinterest_dir / filename).resolve()
        # Security check - ensure the resolved file lies within
        # pinterest_downloads. Both sides are resolved to absolute paths
        # first: comparing a relative path string against an absolute prefix
        # can never match and would reject every request.
        try:
            file_path.relative_to(pinterest_dir)
        except ValueError:
            return jsonify({'error': 'Invalid file path'}), 403
        if file_path.is_file():
            # Pass the absolute path so the check above and send_file refer
            # to the same file.
            return send_file(str(file_path))
        return jsonify({'error': 'File not found'}), 404
    except Exception as e:
        print(f"Error serving Pinterest image: {e}")
        return jsonify({'error': 'Server error'}), 500
def save_preset():
    """Download the image at a URL and store it in the local ``presets`` folder.

    Reads ``url`` (required) and ``title`` (default ``untitled``) from the
    JSON request body. The file is written as
    ``<sanitised title>_<unix time>.jpg`` regardless of the downloaded
    content type.

    Returns:
        JSON with ``success``, ``filename``, ``filepath``, ``data_url`` and
        ``folder``; 400 when no URL was supplied, 500 on any failure.
    """
    try:
        payload = request.json
        source_url = payload.get('url')
        label = payload.get('title', 'untitled')
        if not source_url:
            return jsonify({'success': False, 'error': 'No image URL provided'}), 400
        # Make sure the destination folder exists before downloading.
        presets_folder = os.path.join(os.getcwd(), 'presets')
        os.makedirs(presets_folder, exist_ok=True)
        reply = requests.get(source_url, timeout=30)
        reply.raise_for_status()
        # Keep only alphanumerics, spaces, hyphens and underscores in the name.
        kept_chars = [ch for ch in label if ch.isalnum() or ch in (' ', '-', '_')]
        cleaned = "".join(kept_chars).rstrip()
        stamp = int(time.time())
        filename = f"{cleaned}_{stamp}.jpg"
        filepath = os.path.join(presets_folder, filename)
        with open(filepath, 'wb') as out:
            out.write(reply.content)
        # Also hand the bytes back inline so the caller can use them at once.
        encoded = base64.b64encode(reply.content).decode('utf-8')
        return jsonify({
            'success': True,
            'filename': filename,
            'filepath': filepath,
            'data_url': f"data:image/jpeg;base64,{encoded}",
            'folder': presets_folder
        })
    except Exception as exc:
        print(f"Save preset error: {exc}")
        return jsonify({'success': False, 'error': f'Failed to save preset: {str(exc)}'}), 500
def save_all_presets():
    """Download several images and store each in the local ``presets`` folder.

    Reads ``images`` from the JSON request body: a list of objects with
    ``url`` and an optional ``title`` (default ``image_<index>``). Each image
    is handled independently; a failure is recorded and the loop continues.

    Returns:
        JSON with ``success``, ``saved_count``, ``failed_count``,
        ``saved_images``, ``failed_images`` and ``folder``; 400 when the list
        is empty, 500 on an unexpected error.
    """
    try:
        payload = request.json
        entries = payload.get('images', [])
        if not entries:
            return jsonify({'success': False, 'error': 'No images provided'}), 400
        presets_folder = os.path.join(os.getcwd(), 'presets')
        os.makedirs(presets_folder, exist_ok=True)
        saved_images, failed_images = [], []
        for position, entry in enumerate(entries):
            try:
                source_url = entry.get('url')
                label = entry.get('title', f'image_{position}')
                if not source_url:
                    failed_images.append({'title': label, 'error': 'No URL provided'})
                    continue
                reply = requests.get(source_url, timeout=30)
                reply.raise_for_status()
                # Keep only alphanumerics, spaces, hyphens and underscores.
                kept_chars = [ch for ch in label if ch.isalnum() or ch in (' ', '-', '_')]
                cleaned = "".join(kept_chars).rstrip()
                # The list index is added to the timestamp so that items
                # saved within the same second get distinct names.
                stamp = int(time.time()) + position
                filename = f"{cleaned}_{stamp}.jpg"
                filepath = os.path.join(presets_folder, filename)
                with open(filepath, 'wb') as out:
                    out.write(reply.content)
                encoded = base64.b64encode(reply.content).decode('utf-8')
                saved_images.append({
                    'title': label,
                    'filename': filename,
                    'filepath': filepath,
                    'data_url': f"data:image/jpeg;base64,{encoded}",
                    'original_url': source_url
                })
            except Exception as exc:
                failed_images.append({
                    'title': entry.get('title', f'image_{position}'),
                    'error': str(exc)
                })
        return jsonify({
            'success': True,
            'saved_count': len(saved_images),
            'failed_count': len(failed_images),
            'saved_images': saved_images,
            'failed_images': failed_images,
            'folder': presets_folder
        })
    except Exception as exc:
        print(f"Save all presets error: {exc}")
        return jsonify({'success': False, 'error': f'Failed to save presets: {str(exc)}'}), 500
def load_presets():
    """Return every image in the local ``presets`` folder as a data URL.

    Files ending in .jpg, .jpeg, .png, .gif or .webp (case-insensitive) are
    read and base64-encoded. The title is derived from the file name: the
    extension and a trailing ``_<digits>`` suffix are dropped, underscores
    become spaces and the result is title-cased. Files that cannot be read
    are logged and skipped.

    Returns:
        JSON with ``success`` and ``presets`` (newest first by modification
        time); an empty list when the folder does not exist; 500 on an
        unexpected error.
    """
    # MIME type to advertise for each accepted extension, so that PNG, GIF
    # and WebP files are not mislabelled as JPEG in the data URL.
    mime_by_ext = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
    }
    try:
        presets_folder = os.path.join(os.getcwd(), 'presets')
        if not os.path.exists(presets_folder):
            return jsonify({'success': True, 'presets': []})
        presets = []
        for filename in os.listdir(presets_folder):
            if not filename.lower().endswith(tuple(mime_by_ext)):
                continue
            filepath = os.path.join(presets_folder, filename)
            try:
                mod_time = os.path.getmtime(filepath)
                with open(filepath, 'rb') as f:
                    image_data = f.read()
                image_base64 = base64.b64encode(image_data).decode('utf-8')
                name_without_ext, ext = os.path.splitext(filename)
                # Fall back to JPEG for names splitext reports no extension
                # for (e.g. a file literally called ".jpg").
                mime = mime_by_ext.get(ext.lower(), 'image/jpeg')
                data_url = f"data:{mime};base64,{image_base64}"
                # Remove timestamp if present (last part after underscore)
                parts = name_without_ext.rsplit('_', 1)
                if len(parts) > 1 and parts[-1].isdigit():
                    title = parts[0].replace('_', ' ').title()
                else:
                    title = name_without_ext.replace('_', ' ').title()
                presets.append({
                    'id': filename,  # Use filename as ID
                    'filename': filename,
                    'title': title,
                    'filepath': filepath,
                    'data_url': data_url,
                    'saved_at': mod_time
                })
            except Exception as e:
                print(f"Error loading preset {filename}: {e}")
                continue
        # Sort by saved_at (newest first)
        presets.sort(key=lambda x: x['saved_at'], reverse=True)
        return jsonify({'success': True, 'presets': presets})
    except Exception as e:
        print(f"Load presets error: {e}")
        return jsonify({'success': False, 'error': f'Failed to load presets: {str(e)}'}), 500
def delete_preset():
    """Delete one file from the local ``presets`` folder.

    Reads ``filename`` from the JSON request body.

    Returns:
        JSON ``{'success': True, 'deleted': filename}`` on success; 400 when
        the name is missing or resolves outside the presets folder, 404 when
        no such regular file exists, 500 on any other error.
    """
    try:
        data = request.json
        filename = data.get('filename')
        if not filename:
            return jsonify({'success': False, 'error': 'No filename provided'}), 400
        presets_folder = os.path.abspath(os.path.join(os.getcwd(), 'presets'))
        filepath = os.path.abspath(os.path.join(presets_folder, filename))
        # The name comes straight from the client. Refuse anything that,
        # once normalised, is not inside the presets folder ("../x", an
        # absolute path, or the folder itself), so this endpoint cannot be
        # used to delete arbitrary files.
        if not filepath.startswith(os.path.join(presets_folder, '')):
            return jsonify({'success': False, 'error': 'Invalid filename'}), 400
        if os.path.isfile(filepath):
            os.remove(filepath)
            return jsonify({'success': True, 'deleted': filename})
        return jsonify({'success': False, 'error': 'File not found'}), 404
    except Exception as e:
        print(f"Delete preset error: {e}")
        return jsonify({'success': False, 'error': f'Failed to delete preset: {str(e)}'}), 500
def serve_preset(filename):
    """Serve one image file from the local ``presets`` folder.

    Args:
        filename: Name of the file, relative to the presets folder.

    Returns:
        The file response on success; otherwise a ``(json, status)`` pair:
        403 when the name resolves outside the presets folder, 404 when no
        such regular file exists, 500 on any other error.
    """
    try:
        presets_folder = os.path.abspath(os.path.join(os.getcwd(), 'presets'))
        filepath = os.path.abspath(os.path.join(presets_folder, filename))
        # Refuse names that escape the presets folder once normalised
        # ("../x" or an absolute path).
        if not filepath.startswith(os.path.join(presets_folder, '')):
            return jsonify({'error': 'Invalid file path'}), 403
        if os.path.isfile(filepath):
            return send_file(filepath)
        return jsonify({'error': 'File not found'}), 404
    except Exception as e:
        print(f"Serve preset error: {e}")
        return jsonify({'error': 'Failed to serve file'}), 500
def save_preset_direct():
    """Store an uploaded image file in the local ``presets`` folder.

    Expects a multipart upload with an ``image`` file and an optional
    ``title`` form field (default ``AI Generated Face``). The file is saved as
    ``ai_face_<title>_<unix time>.jpg``, where every non-alphanumeric title
    character is replaced by ``_`` and the title is cut to 50 characters.

    Returns:
        JSON with ``success``, ``filename``, ``filepath``, ``folder``,
        ``data_url`` and ``message``; 400 when no file was provided or
        selected, 500 on any failure.
    """
    try:
        if 'image' not in request.files:
            return jsonify({'error': 'No image file provided'}), 400
        upload = request.files['image']
        label = request.form.get('title', 'AI Generated Face')
        if upload.filename == '':
            return jsonify({'error': 'No file selected'}), 400
        presets_dir = os.path.join(os.getcwd(), 'presets')
        os.makedirs(presets_dir, exist_ok=True)
        stamp = int(time.time())
        import re
        cleaned = re.sub(r'[^a-zA-Z0-9]', '_', label)
        cleaned = cleaned[:50]
        filename = f"ai_face_{cleaned}_{stamp}.jpg"
        filepath = os.path.join(presets_dir, filename)
        upload.save(filepath)
        # NOTE(review): other calls to image_to_base64 in this file pass an
        # image array, while this one passes a file path. Confirm the helper
        # accepts a path.
        inline_image = image_to_base64(filepath)
        return jsonify({
            'success': True,
            'filename': filename,
            'filepath': filepath,
            'folder': presets_dir,
            'data_url': inline_image,
            'message': f'AI face saved as {filename}'
        })
    except Exception as exc:
        print(f"Save AI preset error: {exc}")
        return jsonify({'error': f'Failed to save AI face: {str(exc)}'}), 500
def health_check():
    """Report server status as JSON.

    The payload holds a fixed ``status`` of ``healthy``, whether the
    module-level ``swapper`` object is truthy, and the upload folder path.
    """
    if swapper:
        swapper_state = 'loaded'
    else:
        swapper_state = 'not loaded'
    report = {
        'status': 'healthy',
        'face_swapper': swapper_state,
        'upload_folder': UPLOAD_FOLDER
    }
    return jsonify(report)
def perchance_generate():
    """Try to obtain a Perchance AI image for a prompt, else return a placeholder.

    Reads ``prompt`` (required) and ``model`` (default ``realistic``) from the
    JSON request body, then tries three strategies in order:

    1. Drive a headless Chrome via Selenium to the Perchance generator page
       and take the first ``<img>`` whose ``src`` matches the selector.
    2. Fetch the page with a ``requests`` session and scan the static HTML
       for a matching ``<img>``.
    3. Draw a 512x512 placeholder image containing the prompt text.

    A failure in strategy 1 or 2 is logged and the next strategy is tried.

    Returns:
        JSON with ``success``, ``image_url`` (a data URL), ``model`` and
        ``prompt`` (plus ``note`` for the placeholder); 400 when the prompt
        is empty, 500 on an unexpected error.
    """
    try:
        data = request.get_json()
        prompt = data.get('prompt', '')
        model = data.get('model', 'realistic')
        if not prompt:
            return jsonify({'error': 'Prompt is required'}), 400
        print(f"🎨 Perchance AI Request: {prompt[:50]}...")
        # Method 1: Try to scrape Perchance iframe for generated image
        try:
            from selenium import webdriver
            from selenium.webdriver.chrome.options import Options
            from selenium.webdriver.common.by import By
            from selenium.webdriver.support.ui import WebDriverWait
            from selenium.webdriver.support import expected_conditions as EC
            import time
            # Setup Chrome options for headless browsing
            chrome_options = Options()
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--window-size=1024,768")
            driver = webdriver.Chrome(options=chrome_options)
            # From here on the browser process exists. The finally clause
            # guarantees it is shut down on every path, including a failure
            # while navigating or sleeping.
            try:
                # Navigate to Perchance with prompt
                perchance_url = f"https://perchance.org/ai-text-to-image-generator?prompt={requests.utils.quote(prompt)}"
                driver.get(perchance_url)
                print("🔄 Waiting for Perchance to load...")
                time.sleep(3)
                try:
                    # Wait for an image to appear (up to 30 seconds)
                    image_element = WebDriverWait(driver, 30).until(
                        EC.presence_of_element_located((By.CSS_SELECTOR, "img[src*='generated'], img[src*='perchance'], img[src*='image']"))
                    )
                    image_src = image_element.get_attribute('src')
                    print(f"✅ Found image: {image_src[:100]}...")
                    if image_src.startswith('data:'):
                        # Data URL - use directly
                        data_url = image_src
                    else:
                        # Regular URL - download and convert
                        img_response = requests.get(image_src, timeout=10)
                        if img_response.status_code != 200:
                            raise Exception("Failed to download image")
                        img_data = base64.b64encode(img_response.content).decode('utf-8')
                        data_url = f"data:image/jpeg;base64,{img_data}"
                except Exception as wait_error:
                    print(f"❌ Timeout waiting for image: {wait_error}")
                    raise
            finally:
                driver.quit()
            return jsonify({
                'success': True,
                'image_url': data_url,
                'model': model,
                'prompt': prompt
            })
        except ImportError:
            print("❌ Selenium not available. Install with: pip install selenium")
        except Exception as selenium_error:
            print(f"❌ Selenium approach failed: {selenium_error}")
        # Method 2: Try requests with session to maintain cookies
        try:
            session = requests.Session()
            # First visit the page to get cookies
            page_url = f"https://perchance.org/ai-text-to-image-generator?prompt={requests.utils.quote(prompt)}"
            response = session.get(page_url, timeout=30)
            if response.status_code == 200:
                # Parse HTML to find image
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(response.text, 'html.parser')
                # Look for any image that might be generated
                for img in soup.find_all('img'):
                    src = img.get('src', '')
                    if ('generated' in src or 'perchance' in src or 'image' in src) and not src.startswith('data:image/svg'):
                        print(f"✅ Found image in HTML: {src[:100]}...")
                        # Only absolute http(s) URLs are downloaded
                        if src.startswith('http'):
                            img_response = session.get(src, timeout=10)
                            if img_response.status_code == 200:
                                img_data = base64.b64encode(img_response.content).decode('utf-8')
                                data_url = f"data:image/jpeg;base64,{img_data}"
                                return jsonify({
                                    'success': True,
                                    'image_url': data_url,
                                    'model': model,
                                    'prompt': prompt
                                })
        except ImportError:
            print("❌ BeautifulSoup not available. Install with: pip install beautifulsoup4")
        except Exception as requests_error:
            print(f"❌ Requests approach failed: {requests_error}")
        # Method 3: Fallback - Create a better placeholder
        print("🔄 Creating enhanced placeholder image...")
        from PIL import Image, ImageDraw, ImageFont
        import textwrap
        import random
        # Create image with gradient background
        width, height = 512, 512
        img = Image.new('RGB', (width, height), color='#2a2a2a')
        draw = ImageDraw.Draw(img)
        for y in range(height):
            color_value = int(42 + (y / height) * 20)  # Dark gradient
            draw.line([(0, y), (width, y)], fill=f'#{color_value:02x}{color_value:02x}{color_value:02x}')
        # Prefer Arial; fall back to the built-in font when it cannot be loaded
        try:
            font_title = ImageFont.truetype("arial.ttf", 28)
            font_text = ImageFont.truetype("arial.ttf", 18)
            font_small = ImageFont.truetype("arial.ttf", 14)
        except Exception:
            font_title = ImageFont.load_default()
            font_text = ImageFont.load_default()
            font_small = ImageFont.load_default()
        # Draw title
        title = "🎨 Perchance AI Generated"
        draw.text((width//2 - font_title.getlength(title)//2, 40), title, fill='#ffff55', font=font_title)
        # Draw prompt text (wrapped). textwrap needs an integer column count:
        # a float width raises TypeError as soon as a word longer than the
        # line has to be broken. getlength may return a float, so convert,
        # and guard against a zero-width space glyph.
        max_width = width - 80
        space_width = font_text.getlength(' ')
        if space_width > 0:
            wrap_columns = max(1, int(max_width // space_width))
        else:
            wrap_columns = 70
        lines = textwrap.wrap(prompt, width=wrap_columns)
        y_offset = 120
        for line in lines[:8]:  # Limit to 8 lines
            draw.text((width//2 - font_text.getlength(line)//2, y_offset), line, fill='#ffffff', font=font_text)
            y_offset += 30
        # Add loading animation hint
        loading_text = "⏳ Generating high-quality image..."
        draw.text((width//2 - font_small.getlength(loading_text)//2, 380), loading_text, fill='#4CAF50', font=font_small)
        # Add instruction
        instruction = "This is a preview. Real generation requires external API."
        draw.text((width//2 - font_small.getlength(instruction)//2, 420), instruction, fill='#ff6b6b', font=font_small)
        # Add random "AI art" elements
        for _ in range(20):
            x = random.randint(0, width)
            y = random.randint(0, height)
            size = random.randint(1, 3)
            color = random.choice(['#ffff55', '#4CAF50', '#ff6b6b', '#4a90e2'])
            draw.ellipse([x, y, x+size, y+size], fill=color)
        # Convert to data URL
        buffer = BytesIO()
        img.save(buffer, format='PNG')
        img_data = base64.b64encode(buffer.getvalue()).decode('utf-8')
        data_url = f"data:image/png;base64,{img_data}"
        return jsonify({
            'success': True,
            'image_url': data_url,
            'model': model,
            'prompt': prompt,
            'note': 'Enhanced placeholder. Real generation requires Perchance website.'
        })
    except Exception as e:
        print(f"❌ Perchance generation error: {e}")
        return jsonify({'error': str(e)}), 500
def save_to_compressor():
    """Write base64 images into a fresh compressor temp folder.

    Reads ``image_urls`` from the JSON request body: a list of data URLs or
    bare base64 strings. A folder named by the first 8 characters of a UUID4
    is created under COMPRESSOR_TEMP_FOLDER and its creation time is recorded
    in ``temp_folders``. Each image is saved as ``compressed_image_<n>.jpg``;
    images that fail to decode or write are logged and skipped.

    Returns:
        JSON with ``success``, ``folder_id``, ``files`` and
        ``compressor_url``; 400 when no images were sent, 500 when nothing
        could be saved (the folder is then removed) or on an unexpected error.
    """
    try:
        payload = request.get_json()
        image_urls = payload.get('image_urls', [])
        if not image_urls:
            return jsonify({'success': False, 'error': 'No images provided'}), 400
        folder_id = str(uuid.uuid4())[:8]
        temp_folder_path = os.path.join(COMPRESSOR_TEMP_FOLDER, folder_id)
        os.makedirs(temp_folder_path, exist_ok=True)
        # Remember when the folder was created so it can be expired later.
        temp_folders[folder_id] = time.time()
        saved_files = []
        for position, image_url in enumerate(image_urls):
            try:
                # Strip a data-URL prefix if there is one.
                encoded = image_url.split('base64,')[1] if 'base64,' in image_url else image_url
                raw_bytes = base64.b64decode(encoded)
                filename = f'compressed_image_{position+1}.jpg'
                file_path = os.path.join(temp_folder_path, filename)
                with open(file_path, 'wb') as out:
                    out.write(raw_bytes)
                saved_files.append({
                    'filename': filename,
                    'path': file_path,
                    'url': f'/api/compressor_temp/{folder_id}/{filename}'
                })
            except Exception as exc:
                print(f"Error saving image {position}: {exc}")
                continue
        if saved_files:
            return jsonify({
                'success': True,
                'folder_id': folder_id,
                'files': saved_files,
                'compressor_url': f'/compressor.html?folder={folder_id}'
            })
        # Nothing was written: drop the empty folder and its bookkeeping entry.
        shutil.rmtree(temp_folder_path, ignore_errors=True)
        del temp_folders[folder_id]
        return jsonify({'success': False, 'error': 'Failed to save any images'}), 500
    except Exception as exc:
        print(f"Save to compressor error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
def serve_compressor_temp(folder_id, filename):
    """Serve one file from a compressor temp folder.

    Args:
        folder_id: Name of the sub-folder under COMPRESSOR_TEMP_FOLDER.
        filename: Name of the file inside that sub-folder.

    Returns:
        The file response on success; otherwise a ``(json, status)`` pair:
        403 when the path resolves outside COMPRESSOR_TEMP_FOLDER, 404 when
        no such regular file exists, 500 on any other error.
    """
    try:
        base_dir = os.path.abspath(COMPRESSOR_TEMP_FOLDER)
        file_path = os.path.abspath(os.path.join(base_dir, folder_id, filename))
        # Both path components come from the client. Refuse anything that
        # escapes the temp root once normalised ("..", absolute paths).
        if not file_path.startswith(os.path.join(base_dir, '')):
            return jsonify({'error': 'Invalid file path'}), 403
        if os.path.isfile(file_path):
            return send_file(file_path)
        return jsonify({'error': 'File not found'}), 404
    except Exception as e:
        print(f"Error serving compressor temp file: {e}")
        return jsonify({'error': 'Server error'}), 500
def list_compressor_temp_files(folder_id):
    """List the regular files inside one compressor temp folder.

    Args:
        folder_id: Name of the sub-folder under COMPRESSOR_TEMP_FOLDER.

    Returns:
        JSON with ``success``, ``folder_id`` and ``files`` (each entry has
        ``filename``, ``url`` and ``size`` in bytes); 404 when the folder
        does not exist, 500 on any other error.
    """
    try:
        folder_path = os.path.join(COMPRESSOR_TEMP_FOLDER, folder_id)
        if not os.path.exists(folder_path):
            return jsonify({'success': False, 'error': 'Folder not found'}), 404
        listing = []
        for entry in os.listdir(folder_path):
            entry_path = os.path.join(folder_path, entry)
            if not os.path.isfile(entry_path):
                continue  # sub-directories and other non-files are ignored
            listing.append({
                'filename': entry,
                'url': f'/api/compressor_temp/{folder_id}/{entry}',
                'size': os.path.getsize(entry_path)
            })
        return jsonify({
            'success': True,
            'folder_id': folder_id,
            'files': listing
        })
    except Exception as exc:
        print(f"Error listing compressor temp files: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
def cleanup_old_temp_folders():
    """Remove compressor temp folders recorded more than one hour ago.

    Uses the creation times stored in ``temp_folders``. For each expired
    entry the folder is removed from disk and the entry is dropped from
    ``temp_folders``; a failure for one folder is logged without stopping
    the others.
    """
    now = time.time()
    # Collect the expired ids first so the dict is not modified while it is
    # being iterated.
    expired = [
        folder_id
        for folder_id, created_at in temp_folders.items()
        if now - created_at > 3600  # 1 hour
    ]
    for folder_id in expired:
        try:
            shutil.rmtree(os.path.join(COMPRESSOR_TEMP_FOLDER, folder_id), ignore_errors=True)
            del temp_folders[folder_id]
            print(f"Cleaned up old temp folder: {folder_id}")
        except Exception as exc:
            print(f"Error cleaning up folder {folder_id}: {exc}")
def upload_preset():
    """Store a base64-encoded image in the local ``presets`` folder.

    Reads ``image`` (a data URL or bare base64 string) and ``filename`` from
    the JSON request body. Only the base name of ``filename`` is kept, and the
    current unix time is inserted before the extension.

    Returns:
        JSON with ``success``, ``message`` and the final ``filename``; 400
        when the body, image or filename is missing, 500 on any failure.
    """
    try:
        payload = request.json
        if not payload:
            return jsonify({'success': False, 'error': 'No data provided'}), 400
        encoded = payload.get('image')
        requested_name = payload.get('filename')
        if not (encoded and requested_name):
            return jsonify({'success': False, 'error': 'Missing image or filename'}), 400
        presets_folder = os.path.join(os.getcwd(), 'presets')
        os.makedirs(presets_folder, exist_ok=True)
        # Drop any directory part of the supplied name, then add a timestamp
        # to avoid collisions.
        stem, extension = os.path.splitext(os.path.basename(requested_name))
        final_filename = f"{stem}_{int(time.time())}{extension}"
        destination = os.path.join(presets_folder, final_filename)
        # Strip a data-URL prefix if there is one.
        if 'base64,' in encoded:
            encoded = encoded.split('base64,')[1]
        raw_bytes = base64.b64decode(encoded)
        with open(destination, 'wb') as out:
            out.write(raw_bytes)
        return jsonify({
            'success': True,
            'message': 'Preset uploaded successfully',
            'filename': final_filename
        })
    except Exception as exc:
        print(f"Error uploading preset: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
if __name__ == '__main__':
    # Script entry point: print the local URL, then start the Flask server
    # on every network interface with debug mode off.
    print("Starting Shinyy's Face Swapper Web Server...")
    print(f"Open your browser and go to: http://localhost:{WEB_SERVER_PORT}")
    app.run(debug=False, port=WEB_SERVER_PORT, host='0.0.0.0')