Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import base64 | |
| import os | |
| import time | |
| import jwt | |
| import logging | |
| from pathlib import Path | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # ===== CONFIGURATION ===== | |
| ACCESS_KEY_ID = "AFyHfnQATghFdCMyAG3gRPbNY4TNKFGB" | |
| ACCESS_KEY_SECRET = "TTepeLyBterLNM3brYPGmdndBnnyKJBA" | |
| BASE_URL = "https://api-singapore.klingai.com" | |
| CREATE_TASK_URL = f"{BASE_URL}/v1/images/generations" | |
| # ===== UTILITY FUNCTIONS ===== | |
| def generate_jwt_token(): | |
| """Generate JWT token for API authentication""" | |
| payload = { | |
| "iss": ACCESS_KEY_ID, | |
| "exp": int(time.time()) + 1800, # Expires in 30 mins | |
| "nbf": int(time.time()) - 5 # Not before 5 seconds ago | |
| } | |
| return jwt.encode(payload, ACCESS_KEY_SECRET, algorithm="HS256") | |
| def validate_image(image_path): | |
| """Check image meets API requirements""" | |
| try: | |
| img_size = os.path.getsize(image_path) / 1024 / 1024 # MB | |
| if img_size > 10: | |
| return False, "Image too large (max 10MB)" | |
| # Add actual dimension check if possible (requires PIL) | |
| return True, "" | |
| except Exception as e: | |
| return False, f"Image validation error: {str(e)}" | |
| # ===== API FUNCTIONS ===== | |
| def create_image_task(image_base64, prompt): | |
| """Create image generation task""" | |
| headers = { | |
| "Authorization": f"Bearer {generate_jwt_token()}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "model_name": "kling-v2", # Best for image-to-image | |
| "prompt": prompt, | |
| "image": image_base64, | |
| "resolution": "2k", | |
| "n": 1, | |
| "aspect_ratio": "1:1" | |
| } | |
| try: | |
| response = requests.post(CREATE_TASK_URL, json=payload, headers=headers) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"API request failed: {str(e)}") | |
| return None | |
| def get_task_result(task_id): | |
| """Retrieve task results""" | |
| headers = {"Authorization": f"Bearer {generate_jwt_token()}"} | |
| task_url = f"{BASE_URL}/v1/images/generations/{task_id}" | |
| try: | |
| response = requests.get(task_url, headers=headers) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"Task status check failed: {str(e)}") | |
| return None | |
| # ===== MAIN PROCESSING ===== | |
| def generate_image(image_path, prompt=""): | |
| """Handle end-to-end image generation""" | |
| # Validate input image | |
| is_valid, error_msg = validate_image(image_path) | |
| if not is_valid: | |
| return None, error_msg | |
| # Prepare image data | |
| try: | |
| with open(image_path, "rb") as img_file: | |
| image_base64 = base64.b64encode(img_file.read()).decode("utf-8") | |
| except Exception as e: | |
| return None, f"Image processing error: {str(e)}" | |
| # Create generation task | |
| task_response = create_image_task(image_base64, prompt or "Transform into a vibrant cartoon style") | |
| if not task_response or task_response.get("code") != 0: | |
| return None, "Failed to create task" | |
| task_id = task_response["data"]["task_id"] | |
| logger.info(f"Created task: {task_id}") | |
| # Poll for results (max 10 mins) | |
| for _ in range(60): | |
| task_data = get_task_result(task_id) | |
| if not task_data: | |
| time.sleep(5) | |
| continue | |
| status = task_data["data"]["task_status"] | |
| if status == "succeed": | |
| image_url = task_data["data"]["task_result"]["images"][0]["url"] | |
| try: | |
| img_data = requests.get(image_url).content | |
| output_path = Path(f"/tmp/kling_output_{task_id}.png") | |
| with open(output_path, "wb") as f: | |
| f.write(img_data) | |
| return str(output_path), None | |
| except Exception as e: | |
| return None, f"Failed to save image: {str(e)}" | |
| elif status in ("failed", "canceled"): | |
| return None, f"Task failed: {task_data.get('task_status_msg', 'Unknown error')}" | |
| time.sleep(10) | |
| return None, "Task timed out after 10 minutes" | |
| # ===== GRADIO INTERFACE ===== | |
| def chatbot_interface(image, prompt): | |
| if not image: | |
| return None, None, "Please upload an image first" | |
| output_path, error = generate_image(image, prompt) | |
| if error: | |
| return None, None, error | |
| return output_path, output_path, "Generation successful!" | |
| with gr.Blocks(title="Kling AI Image Transformer") as app: | |
| gr.Markdown("# π¨ Kling AI Image-to-Image Generator") | |
| gr.Markdown("Transform images using Kling AI's Kolors technology") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("## Input Settings") | |
| image_input = gr.Image( | |
| type="filepath", | |
| label="Upload Image", | |
| sources=["upload"], | |
| ) | |
| prompt_input = gr.Textbox( | |
| lines=2, | |
| label="Transformation Prompt", | |
| placeholder="Describe how you want to transform the image (e.g. 'vibrant watercolor painting')" | |
| ) | |
| generate_btn = gr.Button("Generate", variant="primary") | |
| gr.Markdown("### Requirements") | |
| gr.Markdown(""" | |
| - Max image size: 10MB | |
| - Min dimensions: 300x300px | |
| - Supported formats: JPG, PNG | |
| - Aspect ratio: Between 1:2.5 and 2.5:1 | |
| """) | |
| with gr.Column(): | |
| gr.Markdown("## Output") | |
| output_image = gr.Image(label="Generated Image", interactive=False) | |
| output_file = gr.File(label="Download Result", file_types=["image/png"]) | |
| status_output = gr.Textbox(label="Status", interactive=False) | |
| generate_btn.click( | |
| fn=chatbot_interface, | |
| inputs=[image_input, prompt_input], | |
| outputs=[output_image, output_file, status_output] | |
| ) | |
| if __name__ == "__main__": | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| show_error=True | |
| ) |