File size: 6,318 Bytes
85a7fa6
a594839
1a8f2c7
dc04565
85a7fa6
ae39b5f
3a53c8d
ae39b5f
1a8f2c7
3a53c8d
 
 
 
 
deecbc4
 
dc04565
3a53c8d
1a8f2c7
3a53c8d
1a8f2c7
3a53c8d
3a257f2
90ebabe
3a53c8d
 
3a257f2
90ebabe
 
3a53c8d
 
 
 
 
 
 
 
 
 
 
 
dc04565
3a53c8d
90ebabe
3a53c8d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90ebabe
3a53c8d
 
 
 
dc04565
3a53c8d
 
dc04565
3a53c8d
 
90ebabe
3a53c8d
 
 
 
90ebabe
22fcb4a
90ebabe
3a53c8d
c3f22c6
90ebabe
22fcb4a
ae39b5f
90ebabe
3a53c8d
 
90ebabe
 
 
 
 
85a7fa6
3a53c8d
 
 
013dbb5
ae39b5f
85a7fa6
 
3a53c8d
 
 
 
 
 
 
 
013dbb5
3a53c8d
dc04565
c0c3ada
3a53c8d
ae39b5f
3a53c8d
 
 
90ebabe
3a53c8d
ae39b5f
 
85a7fa6
013dbb5
 
 
ae39b5f
 
3a53c8d
dc04565
 
85a7fa6
1a8f2c7
045423f
90ebabe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import gradio as gr
import requests
import base64
import os
import time
import jwt
import logging
from pathlib import Path

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ===== API CONFIGURATION =====
ACCESS_KEY_ID = "AFyHfnQATghFdCMyAG3gRPbNY4TNKFGB"
ACCESS_KEY_SECRET = "TTepeLyBterLNM3brYPGmdndBnnyKJBA"
API_BASE_URL = "https://api-singapore.klingai.com"
CREATE_TASK_ENDPOINT = f"{API_BASE_URL}/v1/images/generations"  # SINGLE image endpoint

# ===== AUTHENTICATION =====
def generate_jwt_token():
    """Generate JWT token for API authentication"""
    payload = {
        "iss": ACCESS_KEY_ID,
        "exp": int(time.time()) + 1800,  # 30 minutes expiration
        "nbf": int(time.time()) - 5      # Not before 5 seconds ago
    }
    return jwt.encode(payload, ACCESS_KEY_SECRET, algorithm="HS256")

# ===== IMAGE PROCESSING =====
def prepare_image_base64(image_path):
    """Convert image to base64 without prefix"""
    try:
        with open(image_path, "rb") as img_file:
            return base64.b64encode(img_file.read()).decode('utf-8')
    except Exception as e:
        logger.error(f"Image processing failed: {str(e)}")
        return None

def validate_face_image(image_path):
    """Validate the image meets face transformation requirements"""
    try:
        # Check file exists
        if not os.path.exists(image_path):
            return False, "Image file not found"
            
        # Check file size (max 10MB)
        file_size = os.path.getsize(image_path) / (1024 * 1024)
        if file_size > 10:
            return False, "Image too large (max 10MB)"
            
        return True, ""
    except Exception as e:
        return False, f"Validation error: {str(e)}"

# ===== API FUNCTIONS =====
def create_face_task(image_base64, prompt):
    """Create face transformation task with 97% fidelity"""
    headers = {
        "Authorization": f"Bearer {generate_jwt_token()}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "model_name": "kling-v2.1",  # Best for face preservation
        "prompt": prompt,
        "image": image_base64,
        "image_reference": "face",    # Critical for face control
        "image_fidelity": 0.97,       # 97% similarity
        "human_fidelity": 0.97,       # 97% facial features
        "aspect_ratio": "1:1",
        "n": 1
    }
    
    try:
        response = requests.post(CREATE_TASK_ENDPOINT, json=payload, headers=headers)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"API Error: {str(e)}")
        return None

def check_task_status(task_id):
    headers = {"Authorization": f"Bearer {generate_jwt_token()}"}
    try:
        response = requests.get(
            f"{API_BASE_URL}/v1/images/generations/{task_id}",
            headers=headers
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"Status Check Error: {str(e)}")
        return None

# ===== MAIN FUNCTION =====
def transform_face(image_path, prompt):
    """Full transformation workflow"""
    # Validate image
    is_valid, error_msg = validate_face_image(image_path)
    if not is_valid:
        return None, error_msg
    
    try:
        # Prepare image
        image_base64 = prepare_image_base64(image_path)
        if not image_base64:
            return None, "Failed to process image"
        
        # Create task
        task_data = create_face_task(image_base64, prompt)
        if not task_data or task_data.get("code") != 0:
            return None, "Failed to start transformation"
        
        task_id = task_data["data"]["task_id"]
        logger.info(f"Task created: {task_id}")
        
        # Check results (max 3 minutes)
        for _ in range(18):  # 18 attempts × 10 seconds
            time.sleep(10)
            status_data = check_task_status(task_id)
            if not status_data:
                continue
                
            if status_data["data"]["task_status"] == "succeed":
                image_url = status_data["data"]["task_result"]["images"][0]["url"]
                img_data = requests.get(image_url).content
                output_path = f"/tmp/face_result_{task_id}.png"
                with open(output_path, "wb") as f:
                    f.write(img_data)
                return output_path, None
                
            elif status_data["data"]["task_status"] in ("failed", "canceled"):
                error_msg = status_data["data"].get("task_status_msg", "Task failed")
                return None, error_msg
        
        return None, "Processing timed out"
        
    except Exception as e:
        return None, f"Error: {str(e)}"

# ===== GRADIO INTERFACE =====
with gr.Blocks(title="Face Transformer") as app:
    gr.Markdown("# 🎭 Exact Face Transformation (97% Match)")
    gr.Markdown("Upload ONE face photo for style transformation (97% similarity)")
    
    with gr.Row():
        with gr.Column():
            image_input = gr.Image(
                type="filepath",
                label="Upload Face Photo",
                sources=["upload"],
                height=300
            )
            prompt_input = gr.Textbox(
                label="Style Prompt", 
                placeholder="e.g. 'anime character', 'watercolor portrait'"
            )
            generate_btn = gr.Button("Transform", variant="primary")
            
            gr.Markdown("### Requirements")
            gr.Markdown("""
            - **Single clear face photo**
            - Front-facing works best
            - No glasses/masks
            - Max 10MB (JPG/PNG)
            - Min 300x300px
            """)
            
        with gr.Column():
            output_image = gr.Image(label="Result", interactive=False, height=400)
            output_file = gr.File(label="Download Result")
            status_output = gr.Textbox(label="Status", interactive=False)
    
    generate_btn.click(
        fn=lambda img, prompt: transform_face(img, prompt) + (None,),
        inputs=[image_input, prompt_input],
        outputs=[output_image, output_file, status_output]
    )

if __name__ == "__main__":
    app.launch(server_name="0.0.0.0", server_port=7860)