File size: 6,698 Bytes
7863e99
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca1d300
 
7863e99
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import gradio as gr
import requests
import io
from PIL import Image, ImageOps
import base64
import time

def load_image_from_url(url):
    """Download the image at ``url`` and return it as a PIL image.

    This is the click handler for the "Load Image" button, whose only output
    is the image editor, so it must return exactly one value.

    Args:
        url: Address of the image to fetch.

    Returns:
        The decoded ``PIL.Image.Image``.

    Raises:
        gr.Error: If ``url`` is empty, the download fails, or the payload
            cannot be decoded as an image. Gradio surfaces the message to
            the user instead of handing a malformed value to the editor.
    """
    if not url:
        raise gr.Error("Error: no image URL provided")

    try:
        # A timeout keeps the UI from hanging forever on an unresponsive host.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        image = Image.open(io.BytesIO(response.content))
        # Image.open is lazy; force decoding so corrupt data fails here.
        image.load()
    except (requests.RequestException, OSError, ValueError) as e:
        raise gr.Error(f"Error: {e}") from e
    return image

def send_to_api(key, prompt, image_url, mask_base64, path_points):
    """Submit a motion-brush video generation task to the API endpoint.

    Args:
        key: API key sent in the ``x-api-key`` header.
        prompt: Text prompt for the generated video.
        image_url: URL of the source image.
        mask_base64: Base64-encoded PNG mask; embedded as a ``data:`` URL.
        path_points: List of ``{"x": ..., "y": ...}`` points used as the
            dynamic mask trajectory.

    Returns:
        The ``task_id`` string reported by the API.

    Raises:
        gr.Error: If the request cannot be sent, the API answers with a
            non-200 status, the body is not JSON, or no ``task_id`` is
            present. Raising (rather than returning an error tuple) keeps the
            return type uniform so callers never forward an error as an id.
    """
    url = "https://api.goapi.ai/api/v1/task"
    payload = {
        "model": "kling",
        "task_type": "video_generation",
        "input": {
            "prompt": prompt,
            "negative_prompt": "",
            "cfg_scale": 0.5,
            "duration": 5,
            "image_url": image_url,
            "image_tail_url": "",
            "mode": "std",
            "version": "1.0",
            "motion_brush": {
                "mask_url": f"data:image/png;base64,{mask_base64}",
                "static_masks": [{"points": []}],
                "dynamic_masks": [{"points": path_points}],
            },
        },
    }
    headers = {"x-api-key": key}

    try:
        response = requests.post(url, headers=headers, json=payload, timeout=60)
    except requests.RequestException as e:
        raise gr.Error(f"Request failed: {e}") from e

    if response.status_code != 200:
        raise gr.Error(f"Request failed, status code: {response.status_code}")

    try:
        data = response.json()
    except ValueError as e:
        raise gr.Error("Request failed: response was not valid JSON") from e

    # "data" may be missing or null; treat both as an empty object.
    task_id = (data.get("data") or {}).get("task_id")
    if not task_id:
        raise gr.Error("Request failed: response did not contain a task_id")
    return task_id

def fetch_api(task_id, key, poll_interval=10, max_wait=1800):
    """Poll a task until it completes and return the resulting video URL.

    Args:
        task_id: Identifier of the task to poll.
        key: API key sent in the ``x-api-key`` header.
        poll_interval: Seconds to sleep between status checks.
        max_wait: Upper bound, in seconds, on the total time spent polling.

    Returns:
        The ``video_url`` string from the completed task's output.

    Raises:
        gr.Error: If a request fails, the API answers with a non-200 status,
            the task reports failure, the completed task has no video URL,
            or ``max_wait`` elapses before completion.
    """
    url = f"https://api.goapi.ai/api/v1/task/{task_id}"
    headers = {"x-api-key": key}
    deadline = time.monotonic() + max_wait

    while True:
        try:
            response = requests.get(url, headers=headers, timeout=60)
        except requests.RequestException as e:
            raise gr.Error(f"Request failed: {e}") from e

        if response.status_code != 200:
            raise gr.Error(f"Request failed, status code: {response.status_code}")

        try:
            task = response.json().get("data") or {}
        except ValueError as e:
            raise gr.Error("Request failed: response was not valid JSON") from e

        status = task.get("status", "")
        # NOTE(review): compared case-insensitively in case the API
        # capitalises status values -- confirm against the API reference.
        normalized = str(status).lower()

        if normalized == "completed":
            video_url = (task.get("output") or {}).get("video_url")
            if not video_url:
                raise gr.Error("Task completed but no video URL was returned")
            return video_url

        # NOTE(review): "failed" is assumed to be the terminal error status;
        # the deadline below still guarantees termination if it is not.
        if normalized == "failed":
            raise gr.Error(f"Task {task_id} failed")

        if time.monotonic() >= deadline:
            raise gr.Error(
                f"Task {task_id} did not complete within {max_wait} seconds"
            )

        print(f"Task status is '{status}'. Retrying in {poll_interval} seconds...")
        time.sleep(poll_interval)

def image_to_base64(image):
    """Encode ``image`` as PNG and return the bytes as base64 text.

    Args:
        image: Object exposing ``save(file_obj, format=...)``, such as a PIL
            image.

    Returns:
        The PNG data as a UTF-8 base64 string (no ``data:`` prefix).
    """
    with io.BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        encoded = base64.b64encode(png_buffer.getvalue())
    return str(encoded, "utf-8")

# Pixel value treated as part of the drawn path: fully opaque pure white.
_PATH_PIXEL = (255, 255, 255, 255)
# Maximum number of path points forwarded to the API.
_MAX_PATH_POINTS = 10


def _collect_path_points(path_layer, path_direction):
    """Return path-pixel coordinates of ``path_layer`` in scan order.

    The scan order is what encodes the direction of motion: row-major for the
    horizontal directions, column-major for the vertical ones, reversed along
    the inner axis for "Right to Left" and "Bottom to Top". An unrecognised
    direction yields an empty list.
    """
    width, height = path_layer.size
    if path_direction == "Left to Right":
        coords = ((x, y) for y in range(height) for x in range(width))
    elif path_direction == "Right to Left":
        coords = ((x, y) for y in range(height) for x in range(width - 1, -1, -1))
    elif path_direction == "Top to Bottom":
        coords = ((x, y) for x in range(width) for y in range(height))
    elif path_direction == "Bottom to Top":
        coords = ((x, y) for x in range(width) for y in range(height - 1, -1, -1))
    else:
        return []

    pixels = path_layer.load()
    return [{"x": x, "y": y} for x, y in coords if pixels[x, y] == _PATH_PIXEL]


def generate_mask_and_path(editor_value, path_direction, key, prompt, original_image_url):
    """Build the motion-brush mask and path, submit the task, and await the video.

    Args:
        editor_value: Image editor value; its ``"layers"`` entry must hold at
            least three layers: dynamic mask, static mask, and path.
        path_direction: One of "Left to Right", "Right to Left",
            "Top to Bottom", "Bottom to Top"; selects the pixel scan order.
        key: API key.
        prompt: Text prompt for the video.
        original_image_url: URL of the source image sent to the API.

    Returns:
        A 4-tuple ``(composite_image, selected_points, task_id, video_url)``.

    Raises:
        gr.Error: If fewer than three layers are available, or if task
            submission or polling does not produce a usable result.
    """
    layers = (editor_value or {}).get("layers") or []
    if len(layers) < 3:
        # The handler feeds four outputs; raising gives the user a readable
        # message instead of a bare None that cannot fill them.
        raise gr.Error(
            "Three layers are required: dynamic mask, static mask, and path."
        )
    green_layer, black_layer, path_layer = layers[:3]

    green_mask = ImageOps.colorize(
        ImageOps.grayscale(green_layer), black="black", white="green"
    )
    # The static mask is solid black wherever layer 2 was painted. Previously
    # it was derived from green_layer, so static strokes that overlapped
    # dynamic strokes were rendered green.
    # NOTE(review): black is taken from the variable name -- confirm it is
    # the colour the API expects for static regions.
    black_mask = Image.new("RGB", black_layer.size, "black")

    composite_image = Image.new("RGB", green_mask.size, "white")
    composite_image.paste(green_mask, mask=green_layer)
    composite_image.paste(black_mask, mask=black_layer)

    path_points = _collect_path_points(path_layer, path_direction)
    # Thin the path to at most _MAX_PATH_POINTS evenly spaced samples.
    step = max(len(path_points) // _MAX_PATH_POINTS, 1)
    selected_points = path_points[::step][:_MAX_PATH_POINTS]

    mask_base64 = image_to_base64(composite_image)

    task_id = send_to_api(key, prompt, original_image_url, mask_base64, selected_points)
    if not task_id or not isinstance(task_id, str):
        # Guards against an error tuple or None being polled as a task id.
        raise gr.Error(f"Task submission failed: {task_id}")

    video_url = fetch_api(task_id, key)
    if not video_url or not isinstance(video_url, str):
        raise gr.Error(f"Fetching the video failed: {video_url}")

    return composite_image, selected_points, task_id, video_url

# Gradio UI: load a background image, paint three layers on it, then submit
# the resulting mask and path to the video generation API.
with gr.Blocks() as interface:
    gr.Markdown("# Video Motion Generation Tool")

    gr.Markdown("---")
    gr.Markdown("### 1. Input Background Image URL")
    with gr.Row():
        url_input = gr.Textbox(label="Input Background Image URL", placeholder="Enter the image URL")
        load_image_btn = gr.Button("Load Image")

    gr.Markdown("---")
    gr.Markdown("### 2. Use the Brush Tool to Edit the Image")
    gr.Markdown("Layer 1 will generate a dynamic mask, Layer 2 is a static mask, and Layer 3 will generate path points.")
    with gr.Row():
        # The brush is fixed to pure white; path extraction matches on
        # fully opaque white pixels.
        image_editor = gr.ImageEditor(
            type="pil",
            brush=gr.Brush(default_size=20, colors=["#FFFFFF"], color_mode="fixed"),
            layers=True,
            interactive=True,
            label="Drawing Tool to Generate Mask and Path",
            height=700,
        )
    with gr.Row():
        prompt_input = gr.Textbox(label="Prompt", placeholder="Enter Prompt")

    with gr.Row():
        # Mask the key on screen so the secret is not shown in plain text.
        key_input = gr.Textbox(label="API Key", placeholder="Enter PiAPI Key", type="password")

    with gr.Row():
        direction_input = gr.Dropdown(
            choices=["Left to Right", "Right to Left", "Top to Bottom", "Bottom to Top"], label="Select Path Direction"
        )
        submit_btn = gr.Button("Generate")

    with gr.Row():
        output_composite_image = gr.Image(label="Generated Composite Image")
        output_path_points = gr.Textbox(label="Path Point Data")
        output_task_id = gr.Textbox(label="Task ID")
        output_video = gr.Video(label="Generated Video Link")

    # Load the image at the given URL into the editor as the background.
    load_image_btn.click(
        fn=load_image_from_url,
        inputs=[url_input],
        outputs=[image_editor],
    )

    # Build the mask and path, submit the task, and show the results.
    submit_btn.click(
        fn=generate_mask_and_path,
        inputs=[image_editor, direction_input, key_input, prompt_input, url_input],
        outputs=[output_composite_image, output_path_points, output_task_id, output_video],
    )

interface.launch()