Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import base64 | |
| import os | |
| import time | |
| import jwt | |
| import logging | |
| from pathlib import Path | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ===== API CONFIGURATION =====
# Credentials are read from the environment so that no secret lives in the
# source file. Set KLING_ACCESS_KEY_ID and KLING_ACCESS_KEY_SECRET (for
# example as Space secrets) before launching the app.
# SECURITY: any key pair that was ever committed in this file must be
# considered leaked and should be revoked/rotated with the API provider.
ACCESS_KEY_ID = os.environ.get("KLING_ACCESS_KEY_ID", "")
ACCESS_KEY_SECRET = os.environ.get("KLING_ACCESS_KEY_SECRET", "")
if not (ACCESS_KEY_ID and ACCESS_KEY_SECRET):
    logger.warning(
        "KLING_ACCESS_KEY_ID / KLING_ACCESS_KEY_SECRET are not set; "
        "API requests will fail authentication"
    )

API_BASE_URL = "https://api-singapore.klingai.com"
CREATE_TASK_ENDPOINT = f"{API_BASE_URL}/v1/images/generations"  # SINGLE image endpoint
# ===== AUTHENTICATION =====
def generate_jwt_token():
    """Generate a short-lived JWT for API authentication.

    The token is signed with HS256 using ``ACCESS_KEY_SECRET`` and names
    ``ACCESS_KEY_ID`` as its issuer. It expires 30 minutes after creation
    and is marked valid from 5 seconds in the past (presumably to tolerate
    small clock differences with the server).

    Returns:
        str: The encoded token, ready to be used in a ``Bearer`` header.
    """
    # Sample the clock once so "exp" and "nbf" are derived from the same instant.
    now = int(time.time())
    payload = {
        "iss": ACCESS_KEY_ID,
        "exp": now + 1800,  # 30 minutes expiration
        "nbf": now - 5,  # Not before 5 seconds ago
    }
    token = jwt.encode(payload, ACCESS_KEY_SECRET, algorithm="HS256")
    # PyJWT 1.x returns bytes while 2.x returns str; without this, an f-string
    # header built from a bytes token would read "Bearer b'...'".
    if isinstance(token, bytes):
        token = token.decode("utf-8")
    return token
# ===== IMAGE PROCESSING =====
def prepare_image_base64(image_path):
    """Return the contents of the file at *image_path* as base64 text.

    The result is the bare base64 payload, without any ``data:`` URI prefix.
    If the file cannot be read, the error is logged and ``None`` is returned.
    """
    try:
        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
        encoded = base64.b64encode(raw_bytes)
    except Exception as exc:
        logger.error(f"Image processing failed: {str(exc)}")
        return None
    else:
        return encoded.decode('utf-8')
def validate_face_image(image_path, max_size_mb=10):
    """Check that *image_path* points to a usable, reasonably sized image file.

    Only file-level properties are checked (presence, regular file, non-empty,
    size limit); the image content itself is not inspected.

    Args:
        image_path: Path of the uploaded image. May be ``None`` or empty when
            nothing was uploaded.
        max_size_mb: Maximum accepted file size in megabytes (default 10).

    Returns:
        tuple[bool, str]: ``(True, "")`` when the file is acceptable,
        otherwise ``(False, message)`` with a human-readable reason.
    """
    # Nothing uploaded: report it plainly instead of surfacing a TypeError text.
    if not image_path:
        return False, "No image provided"

    try:
        # isfile (not exists) so that a directory is not accepted as an image.
        if not os.path.isfile(image_path):
            return False, "Image file not found"
        size_bytes = os.path.getsize(image_path)
    except (OSError, TypeError, ValueError) as e:
        return False, f"Validation error: {str(e)}"

    if size_bytes == 0:
        return False, "Image file is empty"
    if size_bytes > max_size_mb * 1024 * 1024:
        return False, f"Image too large (max {max_size_mb:g}MB)"
    return True, ""
# ===== API FUNCTIONS =====
def create_face_task(image_base64, prompt, timeout=(10, 60)):
    """Submit a face-referenced image generation task to the API.

    Args:
        image_base64: Base64-encoded reference image (no ``data:`` prefix).
        prompt: Text description of the desired style.
        timeout: ``requests`` timeout, either a number of seconds or a
            ``(connect, read)`` tuple. Prevents the call from blocking
            forever when the server does not answer.

    Returns:
        dict | None: The decoded JSON response, or ``None`` when the request
        failed, returned an HTTP error status, or the body was not valid JSON
        (the error is logged).
    """
    headers = {
        "Authorization": f"Bearer {generate_jwt_token()}",
        "Content-Type": "application/json",
    }
    payload = {
        "model_name": "kling-v2.1",
        "prompt": prompt,
        "image": image_base64,
        "image_reference": "face",  # use the uploaded image as a face reference
        "image_fidelity": 0.97,
        "human_fidelity": 0.97,
        "aspect_ratio": "1:1",
        "n": 1,  # request a single image
    }
    try:
        response = requests.post(
            CREATE_TASK_ENDPOINT,
            json=payload,
            headers=headers,
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()
    # RequestException covers network/HTTP failures; ValueError covers a
    # response body that is not valid JSON.
    except (requests.RequestException, ValueError) as e:
        logger.error(f"API Error: {str(e)}")
        return None
def check_task_status(task_id, timeout=(10, 30)):
    """Fetch the current state of a previously created generation task.

    Args:
        task_id: Identifier returned by the task-creation call.
        timeout: ``requests`` timeout, either a number of seconds or a
            ``(connect, read)`` tuple, so a stalled connection cannot block
            the polling loop indefinitely.

    Returns:
        dict | None: The decoded JSON response, or ``None`` when the request
        failed, returned an HTTP error status, or the body was not valid JSON
        (the error is logged).
    """
    headers = {"Authorization": f"Bearer {generate_jwt_token()}"}
    try:
        response = requests.get(
            f"{API_BASE_URL}/v1/images/generations/{task_id}",
            headers=headers,
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError) as e:
        logger.error(f"Status Check Error: {str(e)}")
        return None
# ===== MAIN FUNCTION =====
def _download_result_image(image_url, output_path, timeout=(10, 60)):
    """Download *image_url* to *output_path*; raise on network or HTTP errors."""
    response = requests.get(image_url, timeout=timeout)
    # Without this check an HTTP error page would be written out as a ".png".
    response.raise_for_status()
    with open(output_path, "wb") as f:
        f.write(response.content)
    return output_path


def transform_face(image_path, prompt, poll_interval=10, max_attempts=18):
    """Run the full transformation workflow for one uploaded image.

    Steps: validate the inputs, base64-encode the image, create the remote
    task, poll until it finishes, then download the generated image.

    Args:
        image_path: Path to the uploaded face image.
        prompt: Text description of the desired style.
        poll_interval: Seconds to wait before each status check (default 10).
        max_attempts: Maximum number of status checks (default 18, i.e. about
            3 minutes with the default interval).

    Returns:
        tuple: ``(output_path, None)`` on success, or ``(None, message)``
        describing why the transformation did not complete. This function
        does not raise; unexpected errors are logged and reported as a message.
    """
    # Reject an empty prompt up front instead of spending an API call on it.
    if not prompt or not str(prompt).strip():
        return None, "Please enter a style prompt"

    # Validate image
    is_valid, error_msg = validate_face_image(image_path)
    if not is_valid:
        return None, error_msg

    try:
        # Prepare image
        image_base64 = prepare_image_base64(image_path)
        if not image_base64:
            return None, "Failed to process image"

        # Create task
        task_data = create_face_task(image_base64, prompt)
        if not task_data or task_data.get("code") != 0:
            return None, "Failed to start transformation"
        task_id = task_data["data"]["task_id"]
        logger.info(f"Task created: {task_id}")

        # Check results: max_attempts x poll_interval seconds in total
        for _ in range(max_attempts):
            time.sleep(poll_interval)
            status_data = check_task_status(task_id)
            if not status_data:
                continue  # status request failed; try again on the next tick

            # Tolerate a response without a "data" object and keep polling
            # rather than aborting the whole workflow with a KeyError.
            data = status_data.get("data") or {}
            task_status = data.get("task_status")

            if task_status == "succeed":
                image_url = data["task_result"]["images"][0]["url"]
                output_path = f"/tmp/face_result_{task_id}.png"
                return _download_result_image(image_url, output_path), None
            if task_status in ("failed", "canceled"):
                return None, data.get("task_status_msg") or "Task failed"

        return None, "Processing timed out"
    except Exception as e:
        # Top-level boundary for the UI: log the traceback, report a message.
        logger.exception("Face transformation failed")
        return None, f"Error: {str(e)}"
# ===== GRADIO INTERFACE =====
def run_transformation(image_path, prompt):
    """Click handler: run the transformation and map its result to the UI.

    ``transform_face`` returns ``(output_path, error_message)``, while the
    click event has three outputs in the order image preview, download file,
    status text. The result path therefore feeds both the preview and the
    download component, and the message goes to the status box.
    """
    result_path, error_msg = transform_face(image_path, prompt)
    if result_path is None:
        return None, None, error_msg or "Transformation failed"
    return result_path, result_path, "Transformation complete"


with gr.Blocks(title="Face Transformer") as app:
    gr.Markdown("# Exact Face Transformation (97% Match)")
    gr.Markdown("Upload ONE face photo and describe your desired style")
    with gr.Row():
        with gr.Column():
            image_input = gr.Image(
                type="filepath",
                label="Upload Face Photo",
                sources=["upload"],
                height=300,
            )
            prompt_input = gr.Textbox(
                label="Style Prompt",
                placeholder="e.g. 'anime character', 'oil painting'",
            )
            generate_btn = gr.Button("Transform", variant="primary")
            gr.Markdown("### Requirements")
            gr.Markdown(
                "- **Single clear face photo**\n"
                "- Front-facing works best\n"
                "- No glasses/masks\n"
                "- Max 10MB (JPG/PNG)\n"
                "- Min 300x300px\n"
            )
        with gr.Column():
            output_image = gr.Image(label="Transformed Result", height=400)
            output_file = gr.File(label="Download")
            status_output = gr.Textbox(label="Status")

    # Return values of the handler are assigned to `outputs` positionally.
    generate_btn.click(
        fn=run_transformation,
        inputs=[image_input, prompt_input],
        outputs=[output_image, output_file, status_output],
    )

if __name__ == "__main__":
    # Listen on all interfaces on port 7860.
    app.launch(server_name="0.0.0.0", server_port=7860)