Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import base64 | |
| import os | |
| import time | |
| import jwt | |
| import logging | |
| from pathlib import Path | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # ===== API CONFIGURATION ===== | |
| ACCESS_KEY_ID = "AFyHfnQATghFdCMyAG3gRPbNY4TNKFGB" | |
| ACCESS_KEY_SECRET = "TTepeLyBterLNM3brYPGmdndBnnyKJBA" | |
| API_BASE_URL = "https://api-singapore.klingai.com" | |
| CREATE_TASK_ENDPOINT = f"{API_BASE_URL}/v1/images/generations" # Correct endpoint for single image | |
| # ===== AUTHENTICATION ===== | |
| def generate_jwt_token(): | |
| """Generate JWT token with error handling""" | |
| try: | |
| payload = { | |
| "iss": ACCESS_KEY_ID, | |
| "exp": int(time.time()) + 1800, | |
| "nbf": int(time.time()) - 5 | |
| } | |
| return jwt.encode(payload, ACCESS_KEY_SECRET, algorithm="HS256") | |
| except Exception as e: | |
| logger.error(f"JWT generation failed: {str(e)}") | |
| return None | |
| # ===== IMAGE VALIDATION ===== | |
| def validate_face_image(image_path): | |
| """Validate the image meets face transformation requirements""" | |
| try: | |
| # Check file exists | |
| if not os.path.exists(image_path): | |
| return False, "Image file not found" | |
| # Check file size (max 10MB) | |
| file_size = os.path.getsize(image_path) / (1024 * 1024) | |
| if file_size > 10: | |
| return False, "Image too large (max 10MB)" | |
| # Check file extension | |
| valid_extensions = ['.jpg', '.jpeg', '.png'] | |
| if not any(image_path.lower().endswith(ext) for ext in valid_extensions): | |
| return False, "Invalid format (only JPG/PNG)" | |
| return True, "" | |
| except Exception as e: | |
| return False, f"Validation error: {str(e)}" | |
| # ===== API FUNCTIONS ===== | |
| def create_face_task(image_base64, prompt): | |
| """Create face transformation task with 97% fidelity""" | |
| token = generate_jwt_token() | |
| if not token: | |
| return None, "Authentication failed" | |
| headers = { | |
| "Authorization": f"Bearer {token}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "model_name": "kling-v2.1", | |
| "prompt": prompt, | |
| "image": image_base64, | |
| "image_reference": "face", | |
| "image_fidelity": 0.97, # 97% face similarity | |
| "human_fidelity": 0.97, # 97% facial features | |
| "aspect_ratio": "1:1", | |
| "n": 1 | |
| } | |
| try: | |
| response = requests.post( | |
| CREATE_TASK_ENDPOINT, | |
| json=payload, | |
| headers=headers, | |
| timeout=30 | |
| ) | |
| # Check for API errors | |
| if response.status_code != 200: | |
| error_msg = f"API Error {response.status_code}" | |
| if response.text: | |
| error_msg += f": {response.text}" | |
| return None, error_msg | |
| data = response.json() | |
| if data.get("code") != 0: | |
| return None, f"API Error: {data.get('message', 'Unknown error')}" | |
| return data, None | |
| except requests.exceptions.RequestException as e: | |
| return None, f"Request failed: {str(e)}" | |
| def check_task_status(task_id): | |
| """Check task status with retries""" | |
| token = generate_jwt_token() | |
| if not token: | |
| return None, "Authentication failed" | |
| headers = {"Authorization": f"Bearer {token}"} | |
| status_url = f"{API_BASE_URL}/v1/images/generations/{task_id}" | |
| try: | |
| response = requests.get(status_url, headers=headers, timeout=30) | |
| response.raise_for_status() | |
| return response.json(), None | |
| except requests.exceptions.RequestException as e: | |
| return None, f"Status check failed: {str(e)}" | |
| # ===== CORE FUNCTION ===== | |
| def transform_face(image_path, prompt): | |
| """Full transformation workflow""" | |
| # Validate image | |
| is_valid, error_msg = validate_face_image(image_path) | |
| if not is_valid: | |
| return None, error_msg | |
| # Prepare image | |
| try: | |
| with open(image_path, "rb") as f: | |
| image_base64 = base64.b64encode(f.read()).decode('utf-8') | |
| except Exception as e: | |
| return None, f"Image processing failed: {str(e)}" | |
| # Create task | |
| task_data, error = create_face_task(image_base64, prompt) | |
| if error: | |
| return None, error | |
| task_id = task_data["data"]["task_id"] | |
| logger.info(f"Task created: {task_id}") | |
| # Check results (max 3 minutes) | |
| for _ in range(18): # 18 attempts * 10 seconds = 3 minutes | |
| time.sleep(10) | |
| status_data, error = check_task_status(task_id) | |
| if error: | |
| continue # Retry on transient errors | |
| status = status_data["data"]["task_status"] | |
| if status == "succeed": | |
| try: | |
| image_url = status_data["data"]["task_result"]["images"][0]["url"] | |
| response = requests.get(image_url, timeout=30) | |
| response.raise_for_status() | |
| output_path = f"/tmp/face_transform_{task_id}.png" | |
| with open(output_path, "wb") as f: | |
| f.write(response.content) | |
| return output_path, None | |
| except Exception as e: | |
| return None, f"Failed to save result: {str(e)}" | |
| elif status in ("failed", "canceled"): | |
| error_msg = status_data["data"].get("task_status_msg", "Unknown error") | |
| return None, f"Task failed: {error_msg}" | |
| return None, "Processing timed out after 3 minutes" | |
| # ===== GRADIO INTERFACE ===== | |
| with gr.Blocks(title="Face Transformer Pro") as app: | |
| gr.Markdown("## 🎭 Exact Face Transformation (97% Fidelity)") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### Input") | |
| image_input = gr.Image( | |
| type="filepath", | |
| label="Upload Clear Face Photo", | |
| sources=["upload"], | |
| height=300 | |
| ) | |
| prompt_input = gr.Textbox( | |
| label="Style Prompt", | |
| placeholder="Describe the transformation style (e.g. 'anime character', 'oil painting')" | |
| ) | |
| generate_btn = gr.Button("Transform", variant="primary") | |
| gr.Markdown("### Requirements") | |
| gr.Markdown(""" | |
| - **Single clear face** (front-facing recommended) | |
| - No glasses/masks/obstructions | |
| - Max 10MB (JPG/PNG only) | |
| - Min 300x300 resolution | |
| """) | |
| with gr.Column(): | |
| gr.Markdown("### Output") | |
| output_image = gr.Image( | |
| label="Transformed Result", | |
| interactive=False, | |
| height=400 | |
| ) | |
| output_file = gr.File( | |
| label="Download", | |
| file_types=["image/png"] | |
| ) | |
| status_output = gr.Textbox( | |
| label="Status", | |
| interactive=False | |
| ) | |
| generate_btn.click( | |
| fn=lambda img, prompt: transform_face(img, prompt) + (None,), | |
| inputs=[image_input, prompt_input], | |
| outputs=[output_image, output_file, status_output] | |
| ) | |
| if __name__ == "__main__": | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False | |
| ) |