File size: 10,955 Bytes
cd42783
3a257f2
 
 
 
 
c95cf2c
f62694b
b6dcd96
c95cf2c
 
 
 
 
3a257f2
 
c95cf2c
cd42783
3a257f2
c95cf2c
fab14c7
 
 
 
3a257f2
fab14c7
c95cf2c
 
3a257f2
c95cf2c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3a257f2
c95cf2c
 
 
3a257f2
c95cf2c
 
 
cfcfe98
3a257f2
 
cfcfe98
c95cf2c
cfcfe98
c95cf2c
 
 
7421e11
cfcfe98
c95cf2c
883f8b8
c95cf2c
3a257f2
 
 
c95cf2c
 
 
3a257f2
c95cf2c
 
 
 
 
 
 
3a257f2
 
 
c95cf2c
7421e11
 
c95cf2c
b92d265
c95cf2c
 
 
 
 
 
 
3a257f2
c95cf2c
 
 
 
 
f62694b
7421e11
c95cf2c
 
 
 
 
3a257f2
c95cf2c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32be9f0
c95cf2c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f62694b
c95cf2c
f62694b
c95cf2c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
045423f
c95cf2c
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import gradio as gr
import requests
import base64
from pathlib import Path
import jwt
import time
import logging
import os

# Set up logging for debugging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Kling AI API configuration.
# SECURITY: credentials committed to source control should be treated as
# compromised. Prefer supplying KLING_ACCESS_KEY_ID / KLING_ACCESS_KEY_SECRET
# through the environment (e.g. Space secrets); the literals below are kept
# only as a backward-compatible fallback. `or` (rather than a .get default)
# also falls back when the variable is set but empty.
ACCESS_KEY_ID = os.environ.get("KLING_ACCESS_KEY_ID") or "AGBGmadNd9hakFYfahytyQQJtN8CJmDJ"
ACCESS_KEY_SECRET = os.environ.get("KLING_ACCESS_KEY_SECRET") or "dp3pAe4PpdmnAHCAPgEd3PyLmBQrkMde"
API_URL = "https://api.klingai.com/v1/predictions"  # Updated endpoint

def generate_jwt_token():
    """Generate a JWT for Kling AI API authentication.

    The token is signed with HS256 using ACCESS_KEY_SECRET, names
    ACCESS_KEY_ID as the issuer, becomes valid 5 seconds in the past and
    expires 30 minutes from now.

    Returns:
        str: The encoded token, ready to be placed after "Bearer ".
    """
    # Sample the clock once so "exp" and "nbf" are derived from the same instant.
    now = int(time.time())
    headers = {
        "alg": "HS256",
        "typ": "JWT",
    }
    payload = {
        "iss": ACCESS_KEY_ID,
        "exp": now + 1800,  # Token expires in 30 minutes
        "nbf": now - 5,  # Effective 5 seconds before current time
    }
    token = jwt.encode(payload, ACCESS_KEY_SECRET, algorithm="HS256", headers=headers)
    # PyJWT 1.x returns bytes; normalise so the Authorization header is not
    # rendered as "Bearer b'...'".
    if isinstance(token, bytes):
        token = token.decode("utf-8")
    # The token is a bearer credential, so its value is kept out of the logs.
    logger.debug("Generated JWT token (value withheld from logs)")
    return token

# User-facing messages for known (HTTP status, service code) combinations.
_KLING_ERROR_MESSAGES = {
    (404, 1202): "Error: Invalid request method (1202). Check https://klingai.com/global/dev for correct endpoint.",
    (404, 1203): "Error: Resource does not exist (1203). Verify model or endpoint.",
    (401, 1000): "Error: Authentication failed (1000). Verify API keys are correct.",
    (401, 1001): "Error: Authorization empty (1001). Ensure JWT token is included.",
    (401, 1002): "Error: Authorization invalid (1002). Check key format.",
    (401, 1003): "Error: Authorization not yet valid (1003). Adjust token start time (nbf).",
    (401, 1004): "Error: Authorization expired (1004). Generate a new token.",
    (429, 1100): "Error: Account exception (1100). Verify account configuration at https://klingai.com.",
    (429, 1101): "Error: Account in arrears (1101). Recharge your account.",
    (429, 1102): "Error: Resource pack depleted or expired (1102). Purchase additional resources.",
    (429, 1302): "Error: Rate limit exceeded (1302/1303). Reduce request frequency or contact support.",
    (429, 1303): "Error: Rate limit exceeded (1302/1303). Reduce request frequency or contact support.",
    (429, 1304): "Error: IP whitelisting issue (1304). Contact Kling AI support.",
    (403, 1103): "Error: Unauthorized access (1103). Verify account permissions.",
    (400, 1200): "Error: Invalid request parameters (1200). Check payload format.",
    (400, 1201): "Error: Invalid parameters (1201). Use PNG/JPEG (<10 MB, ≥512x512) with a clear face.",
    (400, 1300): "Error: Platform policy triggered (1300). Check input content.",
    (400, 1301): "Error: Content security policy triggered (1301). Ensure the image contains a clear face and avoid NSFW content.",
}
# The 5xxx service codes are recognised under any of these HTTP statuses.
_KLING_ERROR_MESSAGES.update({
    (status, code): message
    for status in (500, 503, 504)
    for code, message in (
        (5000, "Error: Server internal error (5000). Try again later."),
        (5001, "Error: Server unavailable due to maintenance (5001). Try again later."),
        (5002, "Error: Server timeout (5002). Try during off-peak hours."),
    )
})


def _parse_error_response(response):
    """Return (status_code, service_code) for a failed response; response may be None."""
    # NOTE: requests.Response is falsy for every 4xx/5xx status, so presence
    # must be tested with "is None" rather than truthiness.
    if response is None:
        return None, 0
    try:
        body = response.json() if response.text else {}
    except ValueError:
        # Error bodies are not guaranteed to be JSON (e.g. an HTML gateway page).
        body = {}
    if not isinstance(body, dict):
        body = {}
    return response.status_code, body.get("code", 0)


def _describe_http_error(status_code, service_code, details):
    """Map an HTTP status / service code pair to the user-facing error message."""
    try:
        known = _KLING_ERROR_MESSAGES.get((status_code, service_code))
    except TypeError:
        # An unhashable service code (e.g. a list) cannot match any known entry.
        known = None
    if known is not None:
        return known
    if status_code == 404:
        return "Error: Endpoint not found (404). Try https://api.klingai.com/v1/predictions or check https://klingai.com/global/dev."
    return f"Error: API request failed. HTTP {status_code}, Service Code {service_code}. Details: {details}"


def _extract_image_url(status_data):
    """Return the first non-empty image URL found in a status payload, or None."""
    result = status_data.get("result")
    data = status_data.get("data")
    candidates = (
        status_data.get("image_url"),
        result.get("image_url") if isinstance(result, dict) else None,
        status_data.get("output"),
        data.get("image") if isinstance(data, dict) else None,
    )
    for candidate in candidates:
        # Some fields may carry a list of URLs; use the first entry.
        if isinstance(candidate, (list, tuple)):
            candidate = candidate[0] if candidate else None
        if candidate:
            return candidate
    return None


def generate_image(image, prompt=""):
    """
    Call Kling AI API for single-reference face generation.

    Submits the image as base64 to API_URL, polls the created task every
    5 seconds (up to 60 polls) and downloads the resulting image to a
    uniquely named temporary PNG file.

    Args:
        image: Path of the uploaded image file (from Gradio, face image)
        prompt (str): Optional text prompt to guide transformation

    Returns:
        tuple: (Path to generated image or None, error message or None)
    """
    # Local import: tempfile is not among the module-level imports.
    import tempfile

    if not image:
        logger.error("No image uploaded")
        return None, "Error: Please upload a valid face image (PNG/JPEG, <10 MB, ≥512x512 pixels)."

    # Convert image to base64
    try:
        with open(image, "rb") as img_file:
            image_base64 = base64.b64encode(img_file.read()).decode("utf-8")
        logger.debug("Image converted to base64 successfully")
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"Failed to process image: {str(e)}")
        return None, f"Error: Failed to process image. Ensure it’s a valid PNG/JPEG. Details: {str(e)}"

    headers = {
        "Authorization": f"Bearer {generate_jwt_token()}",
        "Content-Type": "application/json"
    }

    payload = {
        "image": image_base64,
        "prompt": prompt or "Transform the face into a cartoon style while preserving identity",
        "strength": 0.97,  # High reference strength for face preservation
        "output_format": "png"
    }

    try:
        # Log a summary only: the payload embeds the whole image as base64.
        logger.debug(
            "Sending POST request to %s (prompt=%r, image=%d base64 chars)",
            API_URL, payload["prompt"], len(image_base64),
        )
        response = requests.post(API_URL, json=payload, headers=headers, timeout=30)
        response.raise_for_status()
        data = response.json()
        logger.debug(f"API response: {data}")

        task_id = data.get("task_id") or data.get("taskId") or data.get("id")
        if not task_id:
            logger.error("No task ID returned in API response")
            return None, "Error: No task ID returned. Check API endpoint or keys at https://klingai.com/global/dev."

        # Poll for task completion
        status_url = f"{API_URL}/{task_id}"
        for _ in range(60):  # Poll for up to 5 minutes
            logger.debug(f"Polling status at {status_url}")
            status_response = requests.get(status_url, headers=headers, timeout=30)
            status_response.raise_for_status()
            status_data = status_response.json()
            logger.debug(f"Status response: {status_data}")
            status = status_data.get("status")
            if status == "succeeded":
                image_url = _extract_image_url(status_data)
                if not image_url:
                    logger.error("No image URL in API response")
                    return None, "Error: No image URL in API response. Check API documentation at https://klingai.com/global/dev."
                # Download the image
                logger.debug(f"Downloading image from {image_url}")
                image_response = requests.get(image_url, timeout=30)
                image_response.raise_for_status()
                # A unique file per request keeps concurrent users from
                # overwriting each other's result; the system temp dir is
                # /tmp on Linux, as before.
                try:
                    with tempfile.NamedTemporaryFile(
                        prefix="output_image_", suffix=".png", delete=False
                    ) as out_file:
                        out_file.write(image_response.content)
                        output_path = Path(out_file.name)
                except OSError as e:
                    logger.error(f"Output image file not created: {e}")
                    return None, "Error: Failed to save output image."
                logger.debug(f"Image saved to {output_path}")
                return str(output_path), None
            elif status == "failed":
                logger.error("Image generation failed")
                return None, "Error: Image generation failed. Ensure the image contains a clear face and avoid NSFW or sensitive content."
            # A missing or null progress value is treated as 0.
            progress = status_data.get("progress") or 0
            if status == "processing" and isinstance(progress, (int, float)) and progress >= 0.99:
                logger.error("Generation stuck at 99%")
                return None, "Error: Generation stuck at 99%. Check account credits or upgrade to a paid plan at https://klingai.com."
            time.sleep(5)

        logger.error("Image generation timed out")
        return None, "Error: Image generation timed out. Try during off-peak hours or check account status."

    except requests.exceptions.HTTPError as e:
        status_code, service_code = _parse_error_response(e.response)
        logger.error(f"HTTP error: {status_code}, Service Code: {service_code}, Details: {str(e)}")
        return None, _describe_http_error(status_code, service_code, str(e))
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error: {str(e)}")
        return None, f"Error: Network issue. Ensure stable internet, disable VPN, and try again. Details: {str(e)}"
    except ValueError as e:
        # Older requests releases raise a plain ValueError from .json() on a
        # non-JSON body; report it instead of crashing the UI callback.
        logger.error(f"Could not parse API response: {str(e)}")
        return None, f"Error: API returned a response that could not be parsed. Details: {str(e)}"

def chatbot_interface(image, prompt):
    """
    Gradio callback: run face generation and fan the result out to the UI.

    Args:
        image: Uploaded image file (containing a face)
        prompt (str): Optional text prompt

    Returns:
        tuple: (image path for the preview, the same path for the download
        widget, error message). On failure both paths are None; on success
        the error message is None.
    """
    result_path, failure = generate_image(image, prompt)

    # Success path first: the same file feeds both the preview and the download.
    if not failure:
        logger.debug(f"Returning image path for display and download: {result_path}")
        return result_path, result_path, None

    logger.debug(f"Error returned: {failure}")
    return None, None, failure

# Define Gradio interface: a two-column layout with inputs on the left and
# results on the right. Components appear in the order they are created.
with gr.Blocks() as iface:
    # Page header and usage instructions shown above the two columns.
    gr.Markdown(
        """
        # Kling AI Single-Reference Face Generator
        Upload a PNG/JPEG image (<10 MB, ≥512x512 pixels) with a clear face to generate a transformed image using Kling AI API (reference strength 0.97). Avoid NSFW or sensitive content.
        """
    )
    with gr.Row():
        # Left column: user inputs.
        with gr.Column():
            # type="filepath": generate_image open()s this value as a path on disk.
            image_input = gr.Image(type="filepath", label="Upload Face Image (PNG/JPEG, <10 MB, ≥512x512)")
            prompt_input = gr.Textbox(lines=2, placeholder="Enter an optional prompt (e.g., 'Turn this face into a cartoon')", label="Prompt")
            generate_button = gr.Button("Generate")
        # Right column: generated image preview, download link and status text.
        with gr.Column():
            output_image = gr.Image(label="Generated Face Image")
            output_file = gr.File(label="Download Generated Image")
            error_message = gr.Textbox(label="Status/Error Message", interactive=False)
    
    # The three outputs map positionally onto chatbot_interface's return tuple:
    # (display path, download path, error message).
    generate_button.click(
        fn=chatbot_interface,
        inputs=[image_input, prompt_input],
        outputs=[output_image, output_file, error_message]
    )

# Launch the interface only when executed as a script (not on import).
if __name__ == "__main__":
    logger.debug("Launching Gradio interface")
    # Bind to all network interfaces on port 7860.
    iface.launch(server_name="0.0.0.0", server_port=7860)