File size: 6,698 Bytes
7863e99 ca1d300 7863e99 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
import gradio as gr
import requests
import io
from PIL import Image, ImageOps
import base64
import time
def load_image_from_url(url):
try:
response = requests.get(url)
response.raise_for_status()
image = Image.open(io.BytesIO(response.content))
return image
except Exception as e:
return None, f"Error: {e}"
def send_to_api(key, prompt, image_url, mask_base64, path_points):
"""Send the image and mask to the API endpoint."""
url = "https://api.goapi.ai/api/v1/task"
payload = {
"model": "kling",
"task_type": "video_generation",
"input": {
"prompt": prompt,
"negative_prompt": "",
"cfg_scale": 0.5,
"duration": 5,
"image_url": image_url,
"image_tail_url": "",
"mode": "std",
"version": "1.0",
"motion_brush": {
"mask_url": f"data:image/png;base64,{mask_base64}",
"static_masks": [{"points": []}],
"dynamic_masks": [{"points": path_points}]
}
}
}
headers = {
"x-api-key": key
}
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
data = response.json()
task_id = data.get("data", {}).get("task_id")
return task_id if task_id else None
else:
return f"Request failed, status code: {response.status_code}", None
def fetch_api(task_id, key):
"""Fetch task status and return video URL, retrying every 20 seconds until task is completed."""
url = f"https://api.goapi.ai/api/v1/task/{task_id}"
headers = {
"x-api-key": key
}
while True:
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
status = data.get("data", {}).get("status", "")
if status == "completed":
video_url = data.get("data", {}).get("output", {}).get("video_url", "Error video URL")
return video_url
else:
print(f"Task status is '{status}'. Retrying in 10 seconds...")
else:
return f"Request failed, status code: {response.status_code}", None
time.sleep(10)
def image_to_base64(image):
"""Convert a PIL Image to a base64-encoded PNG string."""
buffered = io.BytesIO()
image.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
return img_base64
def generate_mask_and_path(editor_value, path_direction, key, prompt, original_image_url):
layers = editor_value.get("layers", [])
if len(layers) < 3:
return None
green_layer = layers[0]
green_mask = ImageOps.colorize(
ImageOps.grayscale(green_layer), black="black", white="green"
)
black_layer = layers[1]
black_mask = ImageOps.colorize(
ImageOps.grayscale(green_layer), black="black", white="green"
)
width, height = green_mask.size
composite_image = Image.new("RGB", (width, height), "white")
composite_image.paste(green_mask, mask=green_layer)
composite_image.paste(black_mask, mask=black_layer)
path_layer = layers[2]
path_array = path_layer.load()
path_points = []
# Generate path points based on selected direction
if path_direction == "Left to Right":
for y in range(height):
for x in range(width):
if path_array[x, y] == (255, 255, 255, 255):
path_points.append({"x": x, "y": y})
elif path_direction == "Right to Left":
for y in range(height):
for x in range(width - 1, -1, -1):
if path_array[x, y] == (255, 255, 255, 255):
path_points.append({"x": x, "y": y})
elif path_direction == "Top to Bottom":
for x in range(width):
for y in range(height):
if path_array[x, y] == (255, 255, 255, 255):
path_points.append({"x": x, "y": y})
elif path_direction == "Bottom to Top":
for x in range(width):
for y in range(height - 1, -1, -1):
if path_array[x, y] == (255, 255, 255, 255):
path_points.append({"x": x, "y": y})
selected_points = []
if path_points:
step = max(len(path_points) // 10, 1)
selected_points = path_points[::step][:10]
original_image = original_image_url
mask_base64 = image_to_base64(composite_image)
task_id = send_to_api(key, prompt, original_image, mask_base64, selected_points)
video_url = fetch_api(task_id, key)
return composite_image, selected_points, task_id, video_url
with gr.Blocks() as interface:
gr.Markdown("# Video Motion Generation Tool")
gr.Markdown("---")
gr.Markdown("### 1. Input Background Image URL")
with gr.Row():
url_input = gr.Textbox(label="Input Background Image URL", placeholder="Enter the image URL")
load_image_btn = gr.Button("Load Image")
gr.Markdown("---")
gr.Markdown("### 2. Use the Brush Tool to Edit the Image")
gr.Markdown("Layer 1 will generate a dynamic mask, Layer 2 is a static mask, and Layer 3 will generate path points.")
with gr.Row():
image_editor = gr.ImageEditor(
type="pil",
brush=gr.Brush(default_size=20, colors=["#FFFFFF"], color_mode="fixed"),
layers=True,
interactive=True,
label="Drawing Tool to Generate Mask and Path",
height=700,
)
with gr.Row():
prompt_input = gr.Textbox(label="Prompt", placeholder="Enter Prompt")
with gr.Row():
key_input = gr.Textbox(label="API Key", placeholder="Enter PiAPI Key")
with gr.Row():
direction_input = gr.Dropdown(
choices=["Left to Right", "Right to Left", "Top to Bottom", "Bottom to Top"], label="Select Path Direction"
)
submit_btn = gr.Button("Generate")
with gr.Row():
output_composite_image = gr.Image(label="Generated Composite Image")
output_path_points = gr.Textbox(label="Path Point Data")
output_task_id = gr.Textbox(label="Task ID")
output_video = gr.Video(label="Generated Video Link")
load_image_btn.click(
fn=load_image_from_url,
inputs=[url_input],
outputs=[image_editor],
)
submit_btn.click(
fn=generate_mask_and_path,
inputs=[image_editor, direction_input, key_input, prompt_input, url_input],
outputs=[output_composite_image, output_path_points, output_task_id, output_video],
)
interface.launch()
|