AkashKumarave committed on
Commit
944692d
·
verified ·
1 Parent(s): 28842aa

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +257 -206
app.py CHANGED
@@ -1,223 +1,274 @@
1
- import gradio as gr
2
- import requests
 
3
  import base64
 
4
  from pathlib import Path
 
 
 
5
  import jwt
6
- import time
7
- import logging
8
- import os
9
 
10
- # Set up logging for debugging
11
- logging.basicConfig(level=logging.DEBUG)
12
- logger = logging.getLogger(__name__)
 
 
 
13
 
14
- # Kling AI API configuration (keys hardcoded as requested)
15
- ACCESS_KEY_ID = "AGBGmadNd9hakFYfahytyQQJtN8CJmDJ"
16
- ACCESS_KEY_SECRET = "dp3pAe4PpdmnAHCAPgEd3PyLmBQrkMde"
17
- API_URL = "https://api.klingai.com/v1/images/image2image" # Correct endpoint
18
 
19
- def generate_jwt_token():
20
- """Generate JWT token for Kling AI API authentication."""
21
- headers = {
22
- "alg": "HS256",
23
- "typ": "JWT"
24
- }
25
  payload = {
26
  "iss": ACCESS_KEY_ID,
27
- "exp": int(time.time()) + 1800, # Token expires in 30 minutes
28
- "nbf": int(time.time()) - 5 # Effective 5 seconds before current time
29
  }
30
- token = jwt.encode(payload, ACCESS_KEY_SECRET, headers=headers)
31
- logger.debug(f"Generated JWT token: {token}")
32
- return token
33
 
34
- def generate_image(image, prompt=""):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  """
36
- Call Kling AI API for single-reference face generation.
37
-
38
- Args:
39
- image: Uploaded image file (from Gradio, face image)
40
- prompt (str): Optional text prompt to guide transformation
41
-
42
- Returns:
43
- tuple: (Path to generated image or None, error message or None)
44
  """
45
- if not image:
46
- logger.error("No image uploaded")
47
- return None, "Error: Please upload a valid face image (PNG/JPEG, <10 MB, ≥512x512 pixels)."
48
-
49
- # Convert image to base64
50
- try:
51
- with open(image, "rb") as img_file:
52
- image_base64 = base64.b64encode(img_file.read()).decode("utf-8")
53
- logger.debug("Image converted to base64 successfully")
54
- except Exception as e:
55
- logger.error(f"Failed to process image: {str(e)}")
56
- return None, f"Error: Failed to process image. Ensure it’s a valid PNG/JPEG. Details: {str(e)}"
57
-
58
- headers = {
59
- "Authorization": f"Bearer {generate_jwt_token()}",
60
- "Content-Type": "application/json"
61
  }
62
-
63
- payload = {
64
- "model_name": "kling-v2-1", # V2.1 model for image-to-image
65
- "image": image_base64,
66
- "prompt": prompt or "Transform the face into a cartoon style while preserving identity",
67
- "strength": 0.97, # High reference strength for face preservation
68
- "output_format": "png"
69
  }
70
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
  try:
72
- logger.debug(f"Sending POST request to {API_URL} with payload: {payload}")
73
- response = requests.post(API_URL, json=payload, headers=headers, timeout=30)
74
- response.raise_for_status()
75
- data = response.json()
76
- logger.debug(f"API response: {data}")
77
-
78
- task_id = data.get("task_id") or data.get("taskId") or data.get("id")
79
- if not task_id:
80
- logger.error("No task ID returned in API response")
81
- return None, "Error: No task ID returned. Check API endpoint or keys at https://app.klingai.com/global/dev."
82
-
83
- # Poll for task completion
84
- status_url = f"https://api.klingai.com/v1/tasks/{task_id}" # Updated status endpoint
85
- for _ in range(60): # Poll for up to 5 minutes
86
- logger.debug(f"Polling status at {status_url}")
87
- status_response = requests.get(status_url, headers=headers, timeout=30)
88
- status_response.raise_for_status()
89
- status_data = status_response.json()
90
- logger.debug(f"Status response: {status_data}")
91
- status = status_data.get("status")
92
- if status == "succeeded":
93
- image_url = status_data.get("image_url") or status_data.get("result", {}).get("image_url") or status_data.get("output") or status_data.get("data", {}).get("image")
94
- if not image_url:
95
- logger.error("No image URL in API response")
96
- return None, "Error: No image URL in API response. Check API documentation at https://app.klingai.com/global/dev."
97
- # Download the image
98
- logger.debug(f"Downloading image from {image_url}")
99
- image_response = requests.get(image_url, timeout=30)
100
- image_response.raise_for_status()
101
- output_path = Path("/tmp/output_image.png") # Use /tmp for Spaces compatibility
102
- with open(output_path, "wb") as f:
103
- f.write(image_response.content)
104
- if not output_path.exists():
105
- logger.error("Output image file not created")
106
- return None, "Error: Failed to save output image."
107
- logger.debug(f"Image saved to {output_path}")
108
- return str(output_path), None
109
- elif status == "failed":
110
- logger.error("Image generation failed")
111
- return None, "Error: Image generation failed. Ensure the image contains a clear face and avoid NSFW or sensitive content."
112
- elif status == "processing" and status_data.get("progress", 0) >= 0.99:
113
- logger.error("Generation stuck at 99%")
114
- return None, "Error: Generation stuck at 99%. Check account credits or upgrade to a paid plan at https://app.klingai.com."
115
- time.sleep(5)
116
-
117
- logger.error("Image generation timed out")
118
- return None, "Error: Image generation timed out. Try during off-peak hours (e.g., 2 AM IST) or check account status."
119
-
120
- except requests.exceptions.HTTPError as e:
121
- status_code = e.response.status_code if e.response else None
122
- error_data = e.response.json() if e.response and e.response.text else {}
123
- service_code = error_data.get("code", 0)
124
- logger.error(f"HTTP error: {status_code}, Service Code: {service_code}, Details: {str(e)}")
125
-
126
- if status_code == 404:
127
- if service_code == 1202:
128
- return None, "Error: Invalid request method (1202). Check https://app.klingai.com/global/dev for correct endpoint."
129
- elif service_code == 1203:
130
- return None, "Error: Resource does not exist (1203). Verify model or endpoint."
131
- return None, f"Error: Endpoint not found (404). Verify https://api.klingai.com/v1/images/image2image at https://app.klingai.com/global/dev."
132
- elif status_code == 401:
133
- if service_code == 1000:
134
- return None, "Error: Authentication failed (1000). Verify API keys are correct."
135
- elif service_code == 1001:
136
- return None, "Error: Authorization empty (1001). Ensure JWT token is included."
137
- elif service_code == 1002:
138
- return None, "Error: Authorization invalid (1002). Check key format."
139
- elif service_code == 1003:
140
- return None, "Error: Authorization not yet valid (1003). Adjust token start time (nbf)."
141
- elif service_code == 1004:
142
- return None, "Error: Authorization expired (1004). Generate a new token."
143
- elif status_code == 429:
144
- if service_code == 1100:
145
- return None, "Error: Account exception (1100). Verify account configuration at https://app.klingai.com."
146
- elif service_code == 1101:
147
- return None, "Error: Account in arrears (1101). Recharge your account."
148
- elif service_code == 1102:
149
- return None, "Error: Resource pack depleted or expired (1102). Purchase additional resources."
150
- elif service_code in (1302, 1303):
151
- return None, "Error: Rate limit exceeded (1302/1303). Reduce request frequency or contact support."
152
- elif service_code == 1304:
153
- return None, "Error: IP whitelisting issue (1304). Contact Kling AI support."
154
- elif status_code == 403 and service_code == 1103:
155
- return None, "Error: Unauthorized access (1103). Verify account permissions."
156
- elif status_code == 400:
157
- if service_code == 1200:
158
- return None, "Error: Invalid request parameters (1200). Check payload format."
159
- elif service_code == 1201:
160
- return None, "Error: Invalid parameters (1201). Use PNG/JPEG (<10 MB, ≥512x512) with a clear face."
161
- elif service_code == 1300:
162
- return None, "Error: Platform policy triggered (1300). Check input content."
163
- elif service_code == 1301:
164
- return None, "Error: Content security policy triggered (1301). Ensure the image contains a clear face and avoid NSFW content."
165
- elif status_code in (500, 503, 504):
166
- if service_code == 5000:
167
- return None, "Error: Server internal error (5000). Try again later."
168
- elif service_code == 5001:
169
- return None, "Error: Server unavailable due to maintenance (5001). Try again later."
170
- elif service_code == 5002:
171
- return None, "Error: Server timeout (5002). Try during off-peak hours."
172
- return None, f"Error: API request failed. HTTP {status_code}, Service Code {service_code}. Details: {str(e)}"
173
- except requests.exceptions.RequestException as e:
174
- logger.error(f"Network error: {str(e)}")
175
- return None, f"Error: Network issue. Ensure stable internet, disable VPN, and try again. Details: {str(e)}"
176
-
177
- def chatbot_interface(image, prompt):
178
- """
179
- Gradio interface for single-reference face generation.
180
-
181
- Args:
182
- image: Uploaded image file (containing a face)
183
- prompt (str): Optional text prompt
184
-
185
- Returns:
186
- tuple: (Output image path for display, output image path for download, error message)
187
- """
188
- output_path, error = generate_image(image, prompt)
189
- if error:
190
- logger.debug(f"Error returned: {error}")
191
- return None, None, error
192
- logger.debug(f"Returning image path for display and download: {output_path}")
193
- return output_path, output_path, None
194
-
195
- # Define Gradio interface
196
- with gr.Blocks() as iface:
197
- gr.Markdown(
198
- """
199
- # Kling AI Single-Reference Face Generator
200
- Upload a PNG/JPEG image (<10 MB, ≥512x512 pixels) with a clear face to generate a transformed image using Kling AI V2.1 API (reference strength 0.97). Avoid NSFW or sensitive content.
201
- If errors persist, check the correct API endpoint at https://app.klingai.com/global/dev.
202
- """
203
- )
204
  with gr.Row():
205
- with gr.Column():
206
- image_input = gr.Image(type="filepath", label="Upload Face Image (PNG/JPEG, <10 MB, ≥512x512)")
207
- prompt_input = gr.Textbox(lines=2, placeholder="Enter an optional prompt (e.g., 'Turn this face into a cartoon')", label="Prompt")
208
- generate_button = gr.Button("Generate")
209
- with gr.Column():
210
- output_image = gr.Image(label="Generated Face Image")
211
- output_file = gr.File(label="Download Generated Image")
212
- error_message = gr.Textbox(label="Status/Error Message", interactive=False)
213
-
214
- generate_button.click(
215
- fn=chatbot_interface,
216
- inputs=[image_input, prompt_input],
217
- outputs=[output_image, output_file, error_message]
218
- )
219
-
220
- # Launch the interface
 
221
  if __name__ == "__main__":
222
- logger.debug("Launching Gradio interface")
223
- iface.launch(server_name="0.0.0.0", server_port=7860)
 
1
+ # app.py
2
+ import os
3
+ import time
4
  import base64
5
+ import json
6
  from pathlib import Path
7
+
8
+ import gradio as gr
9
+ import requests
10
  import jwt
11
+ from PIL import Image
 
 
12
 
13
+ # ──────────────────────────────────────────────────────────────────────────────
14
+ # CONFIG — set your keys as HF Space secrets or env vars for safety.
15
+ # (Falls back to the keys you shared.)
16
+ # ──────────────────────────────────────────────────────────────────────────────
17
+ ACCESS_KEY_ID = os.getenv("KLING_ACCESS_KEY_ID", "AGBGmadNd9hakFYfahytyQQJtN8CJmDJ")
18
+ ACCESS_KEY_SECRET = os.getenv("KLING_ACCESS_KEY_SECRET", "dp3pAe4PpdmnAHCAPgEd3PyLmBQrkMde")
19
 
20
+ API_BASE = "https://api.klingai.com"
21
+ ENDPOINT_KOLORS = f"{API_BASE}/v1/images/kolors" # face/subject reference modes (image-to-image)
22
+ ENDPOINT_GENERATIONS = f"{API_BASE}/v1/images/generations" # listing (used as a fallback poller)
23
+ ENDPOINT_TASK = lambda tid: f"{API_BASE}/v1/tasks/{tid}" # primary poller
24
 
25
+ # ──────────────────────────────────────────────────────────────────────────────
26
+ # AUTH — Kling uses JWT: iss / exp / nbf with HS256 (no "access_key" field)
27
+ # ──────────────────────────────────────────────────────────────────────────────
28
+ def make_jwt() -> str:
29
+ headers = {"alg": "HS256", "typ": "JWT"}
30
+ now = int(time.time())
31
  payload = {
32
  "iss": ACCESS_KEY_ID,
33
+ "exp": now + 1800, # 30 minutes
34
+ "nbf": now - 5, # start now (minus small skew)
35
  }
36
+ return jwt.encode(payload, ACCESS_KEY_SECRET, algorithm="HS256", headers=headers)
 
 
37
 
38
+ # ──────────────────────────────────────────────────────────────────────────────
39
+ # HELPERS
40
+ # ──────────────────────────────────────────────────────────────────────────────
41
+ def ensure_image_ok(img_path: str):
42
+ with Image.open(img_path) as im:
43
+ im.verify() # quick integrity check
44
+
45
+ def b64_data_uri(img_path: str) -> str:
46
+ mime = "image/png" if img_path.lower().endswith(".png") else "image/jpeg"
47
+ with open(img_path, "rb") as f:
48
+ b = base64.b64encode(f.read()).decode("utf-8")
49
+ return f"data:{mime};base64,{b}"
50
+
51
+ def extract_task_id(resp_json: dict) -> str | None:
52
+ # Common shapes seen in the wild
53
+ if not resp_json:
54
+ return None
55
+ for key in ("task_id", "taskId", "id"):
56
+ if key in resp_json:
57
+ return str(resp_json[key])
58
+ data = resp_json.get("data") or {}
59
+ for key in ("task_id", "taskId", "id"):
60
+ if key in data:
61
+ return str(data[key])
62
+ # Sometimes nested deeper (e.g., {"task": {"id": ...}})
63
+ task = resp_json.get("task") or data.get("task") or {}
64
+ if "id" in task:
65
+ return str(task["id"])
66
+ return None
67
+
68
+ def extract_image_urls(resp_json: dict) -> list[str]:
69
+ if not resp_json:
70
+ return []
71
+ data = resp_json.get("data") or {}
72
+ # Typical: data.task_result.images = [{url: "..."}]
73
+ task_result = data.get("task_result") or {}
74
+ images = task_result.get("images") or []
75
+ urls = [img.get("url") for img in images if isinstance(img, dict) and img.get("url")]
76
+ if urls:
77
+ return urls
78
+ # Some variants: output, image_url, result.image_url
79
+ for k in ("output", "image_url"):
80
+ if k in resp_json and isinstance(resp_json[k], str):
81
+ return [resp_json[k]]
82
+ result = resp_json.get("result") or {}
83
+ if isinstance(result, dict) and result.get("image_url"):
84
+ return [result["image_url"]]
85
+ # Works array pattern
86
+ works = resp_json.get("works") or data.get("works") or []
87
+ urls = []
88
+ for w in works:
89
+ if isinstance(w, dict):
90
+ u = w.get("url") or w.get("imageUrl")
91
+ if u:
92
+ urls.append(u)
93
+ return urls
94
+
95
+ def download_to_file(url: str, out_path: Path) -> Path:
96
+ r = requests.get(url, timeout=60)
97
+ r.raise_for_status()
98
+ out_path.parent.mkdir(parents=True, exist_ok=True)
99
+ with open(out_path, "wb") as f:
100
+ f.write(r.content)
101
+ return out_path
102
+
103
+ def poll_for_result(task_id: str, headers: dict, timeout_s: int = 300, interval_s: float = 3.0) -> dict:
104
+ """Poll task endpoint first; fallback to listing."""
105
+ deadline = time.time() + timeout_s
106
+ last_error = None
107
+
108
+ while time.time() < deadline:
109
+ try:
110
+ # Preferred: direct task status
111
+ r = requests.get(ENDPOINT_TASK(task_id), headers=headers, timeout=30)
112
+ if r.status_code == 200:
113
+ j = r.json()
114
+ # Either "status_name":"succeed" or "data.task_status":"succeed"
115
+ status_name = (j.get("status_name")
116
+ or (j.get("data") or {}).get("task_status")
117
+ or (j.get("task") or {}).get("status_name"))
118
+ if isinstance(status_name, dict):
119
+ # Some SDKs wrap status as enum-like
120
+ status_name = status_name.get("value")
121
+ if status_name in ("succeed", "succeeded", "success", "SUCCEED"):
122
+ return j
123
+ if status_name in ("failed", "FAIL", "failed_with_error"):
124
+ return j
125
+ elif r.status_code in (401, 403, 404):
126
+ last_error = r.text
127
+ # Fallback: scan generations list
128
+ r2 = requests.get(ENDPOINT_GENERATIONS, headers=headers, params={"pageSize": 200}, timeout=30)
129
+ if r2.status_code == 200:
130
+ j2 = r2.json()
131
+ for item in (j2.get("data") or []):
132
+ if str(item.get("task_id")) == str(task_id):
133
+ status = item.get("task_status")
134
+ if status in ("succeed", "succeeded", "success"):
135
+ return item
136
+ if status in ("failed",):
137
+ return item
138
+ except requests.RequestException as e:
139
+ last_error = str(e)
140
+ time.sleep(interval_s)
141
+ raise TimeoutError(f"Polling timed out for task_id {task_id}. Last error: {last_error or 'n/a'}")
142
+
143
+ # ──────────────────────────────────────────────────────────────────────────────
144
+ # CORE CALL — Kolors face reference (single reference, faceStrength=97)
145
+ # ──────────────────────────────────────────────────────────────────────────────
146
+ def kling_face_reference(image_path: str, prompt: str, face_strength: int = 97, aspect_ratio: str = "1:1") -> tuple[str, str]:
147
  """
148
+ Returns (display_image_path, download_file_path)
 
 
 
 
 
 
 
149
  """
150
+ if not image_path:
151
+ raise gr.Error("Please upload a face/reference image.")
152
+ ensure_image_ok(image_path)
153
+
154
+ token = make_jwt()
155
+ headers_json = {
156
+ "Authorization": f"Bearer {token}",
157
+ "Content-Type": "application/json",
 
 
 
 
 
 
 
 
158
  }
159
+ headers_multipart = {
160
+ "Authorization": f"Bearer {token}",
 
 
 
 
 
161
  }
162
+
163
+ # First try: multipart/form-data (send file as `imageReference`)
164
+ data_multipart = {
165
+ "prompt": (None, prompt),
166
+ "reference": (None, "face"),
167
+ "faceStrength": (None, str(max(1, min(100, int(face_strength))))),
168
+ "faceNo": (None, "1"), # single face reference
169
+ "imageCount": (None, "1"),
170
+ "aspect_ratio": (None, aspect_ratio),
171
+ }
172
+ files = {
173
+ "imageReference": (os.path.basename(image_path), open(image_path, "rb"),
174
+ "image/png" if image_path.lower().endswith(".png") else "image/jpeg")
175
+ }
176
+
177
+ # Attempt 1 — multipart
178
  try:
179
+ resp = requests.post(ENDPOINT_KOLORS, headers=headers_multipart, files=files, data=data_multipart, timeout=60)
180
+ if resp.status_code == 200:
181
+ j = resp.json()
182
+ else:
183
+ # Read JSON anyway if possible
184
+ try:
185
+ j = resp.json()
186
+ except Exception:
187
+ j = {"code": resp.status_code, "message": resp.text}
188
+ finally:
189
+ # Close file handle if opened
190
+ try:
191
+ files["imageReference"][1].close()
192
+ except Exception:
193
+ pass
194
+
195
+ task_id = extract_task_id(j)
196
+
197
+ # If Kolors rejected multipart or no task_id, try JSON with data URI
198
+ if not task_id:
199
+ payload = {
200
+ "prompt": prompt,
201
+ "reference": "face",
202
+ "faceStrength": max(1, min(100, int(face_strength))),
203
+ "faceNo": 1,
204
+ "imageCount": 1,
205
+ "aspect_ratio": aspect_ratio,
206
+ "imageReference": b64_data_uri(image_path),
207
+ }
208
+ resp2 = requests.post(ENDPOINT_KOLORS, headers=headers_json, json=payload, timeout=60)
209
+ try:
210
+ j = resp2.json()
211
+ except Exception:
212
+ j = {"code": resp2.status_code, "message": resp2.text}
213
+ task_id = extract_task_id(j)
214
+
215
+ if not task_id:
216
+ code = j.get("code") or j.get("service_code") or "?"
217
+ msg = j.get("message") or j.get("error") or f"HTTP {resp.status_code if 'resp' in locals() else '?'}"
218
+ raise gr.Error(f"Create task failed. Code: {code}. Message: {msg}")
219
+
220
+ # Poll
221
+ result_json = poll_for_result(task_id, headers=headers_json, timeout_s=420, interval_s=3.0)
222
+
223
+ # Gather image URLs
224
+ urls = extract_image_urls(result_json)
225
+ if not urls:
226
+ # Some APIs return the latest object on /v1/images/generations with same task_id
227
+ try:
228
+ listing = requests.get(ENDPOINT_GENERATIONS, headers=headers_json, params={"pageSize": 200}, timeout=30).json()
229
+ for item in (listing.get("data") or []):
230
+ if str(item.get("task_id")) == str(task_id):
231
+ urls = extract_image_urls(item)
232
+ if urls:
233
+ break
234
+ except Exception:
235
+ pass
236
+
237
+ if not urls:
238
+ raise gr.Error(f"Task {task_id} succeeded but no image URL found in response.")
239
+
240
+ # Download first image
241
+ out_dir = Path("outputs")
242
+ out_dir.mkdir(parents=True, exist_ok=True)
243
+ out_path = out_dir / f"kling_face_{task_id}.png"
244
+ download_to_file(urls[0], out_path)
245
+
246
+ # Return same path for preview and download
247
+ return str(out_path), str(out_path)
248
+
249
+ # ──────────────────────────────────────────────────────────────────────────────
250
+ # GRADIO UI
251
+ # ──────────────────────────────────────────────────────────────────────────────
252
+ with gr.Blocks(title="Kling AI — Image to Image (Face Reference)") as demo:
253
+ gr.Markdown("### Kling AI — Image-to-Image (Single Face Reference)\nUpload a face image and a prompt. Strength defaults to 97.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
254
  with gr.Row():
255
+ in_image = gr.Image(type="filepath", label="Reference Face Image (PNG/JPG)")
256
+ in_prompt = gr.Textbox(label="Prompt", placeholder="e.g., Ultra-detailed portrait, soft light, studio background", lines=2)
257
+ with gr.Row():
258
+ in_strength = gr.Slider(1, 100, value=97, step=1, label="Face Reference Strength")
259
+ in_aspect = gr.Dropdown(choices=["1:1", "3:4", "4:3", "2:3", "3:2", "16:9", "9:16", "21:9"], value="1:1", label="Aspect Ratio")
260
+ btn = gr.Button("Generate", variant="primary")
261
+
262
+ out_img = gr.Image(label="Generated Image", show_download_button=False)
263
+ out_file = gr.File(label="Download Image")
264
+
265
+ def run(image, prompt, strength, aspect):
266
+ if not prompt or not prompt.strip():
267
+ raise gr.Error("Please enter a prompt.")
268
+ return kling_face_reference(image, prompt.strip(), int(strength), aspect)
269
+
270
+ btn.click(fn=run, inputs=[in_image, in_prompt, in_strength, in_aspect], outputs=[out_img, out_file])
271
+
272
  if __name__ == "__main__":
273
+ # On HF Spaces, just `python app.py` is enough — no need to set host/port.
274
+ demo.launch()