import gradio as gr import requests import os import tempfile import time import json from PIL import Image import io import base64 import urllib.request from google.cloud import storage from google.oauth2 import service_account import json import shutil import logging # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Get API endpoint from environment variables with fallback API_HOST = os.environ.get("FACESWAP_API_HOST", "162.243.118.138") API_PORT = os.environ.get("FACESWAP_API_PORT", "8080") API_URL = f"http://{API_HOST}:{API_PORT}/faceswap" logger.info(f"Using API endpoint: {API_URL}") def setup_gcs_auth(): """Set up GCS authentication using service account from environment variable""" service_account_json = os.environ.get("SERVICE_ACCOUNT") if service_account_json: try: service_account_info = json.loads(service_account_json) credentials = service_account.Credentials.from_service_account_info(service_account_info) return credentials except Exception as e: logger.error(f"Error setting up GCS auth: {e}") return None return None # Get GCS credentials gcs_credentials = setup_gcs_auth() def download_from_gcs(gcs_url): """Download a file from GCS using authenticated client""" try: # Parse the GCS URL if gcs_url.startswith("https://storage.googleapis.com/"): path = gcs_url.replace("https://storage.googleapis.com/", "") bucket_name, blob_path = path.split("/", 1) storage_client = storage.Client(credentials=gcs_credentials) if gcs_credentials else storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_path) temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".gif") blob.download_to_filename(temp_file.name) return temp_file.name else: temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".gif") urllib.request.urlretrieve(gcs_url, temp_file.name) return temp_file.name except Exception as e: logger.error(f"Error downloading from GCS: {e}") return None def faceswap_process(gif_file, face_file): """Process faceswap by calling the API""" if gif_file is None or face_file is None: return None, "Please upload both a GIF and a face image." temp_gif = tempfile.NamedTemporaryFile(suffix='.gif', delete=False) temp_face = tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) try: # Handle gif_file - could be a PIL Image (from gr.Image) or a filepath if isinstance(gif_file, str): # It's a filepath shutil.copy(gif_file, temp_gif.name) elif hasattr(gif_file, 'save'): # It's a PIL Image gif_file.save(temp_gif.name, 'GIF') elif hasattr(gif_file, 'read'): # It's a file-like object with open(temp_gif.name, 'wb') as f: f.write(gif_file.read()) else: # Try as numpy array from PIL import Image Image.fromarray(gif_file).save(temp_gif.name, 'GIF') # Handle face_file - could be a PIL Image (from gr.Image) or a filepath if isinstance(face_file, str): # It's a filepath shutil.copy(face_file, temp_face.name) elif hasattr(face_file, 'save'): # It's a PIL Image face_file.save(temp_face.name, 'JPEG') elif hasattr(face_file, 'read'): # It's a file-like object with open(temp_face.name, 'wb') as f: f.write(face_file.read()) else: # Try as numpy array from PIL import Image Image.fromarray(face_file).save(temp_face.name, 'JPEG') files = { 'gif_file': ('input.gif', open(temp_gif.name, 'rb'), 'image/gif'), 'face_file': ('face.jpg', open(temp_face.name, 'rb'), 'image/jpeg') } start_time = time.time() response = requests.post(API_URL, files=files) # Clean up temporary files os.unlink(temp_gif.name) os.unlink(temp_face.name) if response.status_code == 200: result = response.json() if result.get('status') == 'success': output_url = result.get('url') time_taken = result.get('time_taken', "Unknown") # Download the GIF from GCS local_gif_path = download_from_gcs(output_url) if local_gif_path: with open(local_gif_path, 'rb') as f: gif_data = f.read() gif_base64 = base64.b64encode(gif_data).decode('utf-8') os.unlink(local_gif_path) return f"data:image/gif;base64,{gif_base64}", f"✅ Faceswap completed in {time_taken}!" else: return None, "❌ Error downloading the result GIF" else: return None, f"❌ Error: {result.get('message', 'Unknown error')}" else: return None, f"❌ API Error: Status code {response.status_code}" except Exception as e: try: os.unlink(temp_gif.name) os.unlink(temp_face.name) except: pass logger.exception("Error in faceswap_process") return None, f"❌ Error: {str(e)}" # Custom CSS custom_css = """ .centered-title { text-align: center; margin-bottom: 1.5rem; } .container { max-width: 900px; margin: 0 auto; } .output-container img { max-width: 100%; height: auto; display: block; margin: 0 auto; } /* Loading animation for output area only */ .loading-container { text-align: center; padding: 30px; } .loading-spinner { display: inline-block; width: 50px; height: 50px; border: 5px solid rgba(0, 0, 0, 0.1); border-radius: 50%; border-top-color: #3498db; animation: spin 1s ease-in-out infinite; } @keyframes spin { to { transform: rotate(360deg); } } .loading-text { margin-top: 15px; font-weight: bold; color: #555; } """ with gr.Blocks(title="Easel x GifSwap", css=custom_css) as demo: with gr.Column(elem_classes="container"): gr.HTML( """
Upload a GIF and a face image to swap faces in the GIF.