Spaces:
Running
Running
herodevcode commited on
Commit ·
a0199e2
1
Parent(s): 6070720
Updated generate_image.py
Browse files- generate_image.py +29 -271
generate_image.py
CHANGED
|
@@ -6,19 +6,14 @@ from typing import List, Optional, Tuple
|
|
| 6 |
from runwayml import RunwayML
|
| 7 |
import mimetypes
|
| 8 |
from urllib.parse import urlparse
|
| 9 |
-
import replicate
|
| 10 |
|
| 11 |
def encode_image_to_data_uri(image_path: str) -> str:
|
| 12 |
"""Convert a local image file to a data URI."""
|
| 13 |
-
# Get the MIME type
|
| 14 |
mime_type, _ = mimetypes.guess_type(image_path)
|
| 15 |
if not mime_type or not mime_type.startswith('image/'):
|
| 16 |
raise ValueError(f"Unsupported image type for {image_path}")
|
| 17 |
-
|
| 18 |
-
# Read and encode the image
|
| 19 |
with open(image_path, 'rb') as image_file:
|
| 20 |
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
|
| 21 |
-
|
| 22 |
return f"data:{mime_type};base64,{encoded_string}"
|
| 23 |
|
| 24 |
def save_generated_image(image_url: str, filename: str = None, batch_folder: str = None) -> str:
|
|
@@ -33,33 +28,21 @@ def save_generated_image(image_url: str, filename: str = None, batch_folder: str
|
|
| 33 |
Returns:
|
| 34 |
Path to the saved image file
|
| 35 |
"""
|
| 36 |
-
# Create batch folder if not provided
|
| 37 |
if not batch_folder:
|
| 38 |
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
| 39 |
batch_folder = f"batch_{timestamp}"
|
| 40 |
-
|
| 41 |
-
# Create directory structure
|
| 42 |
output_dir = os.path.join("output", batch_folder)
|
| 43 |
os.makedirs(output_dir, exist_ok=True)
|
| 44 |
-
|
| 45 |
-
# Generate filename if not provided
|
| 46 |
if not filename:
|
| 47 |
timestamp = int(time.time())
|
| 48 |
filename = f"generated_{timestamp}.jpg"
|
| 49 |
-
|
| 50 |
-
# Ensure filename has extension
|
| 51 |
if not os.path.splitext(filename)[1]:
|
| 52 |
filename += ".jpg"
|
| 53 |
-
|
| 54 |
output_path = os.path.join(output_dir, filename)
|
| 55 |
-
|
| 56 |
-
# Download and save the image
|
| 57 |
response = requests.get(image_url)
|
| 58 |
response.raise_for_status()
|
| 59 |
-
|
| 60 |
with open(output_path, 'wb') as f:
|
| 61 |
f.write(response.content)
|
| 62 |
-
|
| 63 |
return output_path
|
| 64 |
|
| 65 |
def generate_image_with_references(
|
|
@@ -82,37 +65,23 @@ def generate_image_with_references(
|
|
| 82 |
seed: Optional seed for reproducible results
|
| 83 |
api_key: Optional API key (uses RUNWAYML_API_SECRET env var if not provided)
|
| 84 |
auto_tag_prompt: Whether to automatically append tags to prompt (default: True)
|
| 85 |
-
When False, expects user to manually include @character, @scene, @style in prompt
|
| 86 |
|
| 87 |
Returns:
|
| 88 |
Task ID for the generation request
|
| 89 |
"""
|
| 90 |
-
# Initialize client
|
| 91 |
client = RunwayML(api_key=api_key or os.environ.get("RUNWAYML_API_SECRET"))
|
| 92 |
-
|
| 93 |
-
# Validate inputs
|
| 94 |
if len(reference_image_paths) > 3:
|
| 95 |
raise ValueError("Maximum 3 reference images allowed")
|
| 96 |
-
|
| 97 |
if len(prompt_text) > 1000:
|
| 98 |
raise ValueError("Prompt text must be 1000 characters or less")
|
| 99 |
-
|
| 100 |
-
# Prepare reference images with standardized tags
|
| 101 |
reference_images = []
|
| 102 |
tags = []
|
| 103 |
-
|
| 104 |
-
# Keep track of used standard tags to avoid duplicates
|
| 105 |
used_standard_tags = set()
|
| 106 |
-
|
| 107 |
for i, image_path in enumerate(reference_image_paths):
|
| 108 |
if not os.path.exists(image_path):
|
| 109 |
raise FileNotFoundError(f"Image file not found: {image_path}")
|
| 110 |
-
|
| 111 |
-
# Create tag based on path structure, prioritizing standard categories
|
| 112 |
filename = os.path.splitext(os.path.basename(image_path))[0]
|
| 113 |
path_parts = image_path.split(os.sep)
|
| 114 |
-
|
| 115 |
-
# Look for standard category directories
|
| 116 |
tag = None
|
| 117 |
for part in path_parts:
|
| 118 |
if part == 'characters' and 'character' not in used_standard_tags:
|
|
@@ -127,66 +96,40 @@ def generate_image_with_references(
|
|
| 127 |
tag = 'style'
|
| 128 |
used_standard_tags.add('style')
|
| 129 |
break
|
| 130 |
-
|
| 131 |
-
# If no standard category found, create a custom tag from filename
|
| 132 |
if not tag:
|
| 133 |
tag = f"ref_{filename}".replace('-', '_').replace(' ', '_')[:16]
|
| 134 |
-
# Ensure tag starts with letter and is alphanumeric + underscore
|
| 135 |
tag = ''.join(c for c in tag if c.isalnum() or c == '_')
|
| 136 |
if not tag[0].isalpha():
|
| 137 |
tag = f"img_{tag}"
|
| 138 |
-
tag = tag[:16]
|
| 139 |
-
|
| 140 |
tags.append(tag)
|
| 141 |
-
|
| 142 |
-
# Convert to data URI
|
| 143 |
data_uri = encode_image_to_data_uri(image_path)
|
| 144 |
-
|
| 145 |
-
reference_images.append({
|
| 146 |
-
"uri": data_uri,
|
| 147 |
-
"tag": tag
|
| 148 |
-
})
|
| 149 |
-
|
| 150 |
-
# Handle prompt modification based on auto_tag_prompt setting
|
| 151 |
final_prompt = prompt_text
|
| 152 |
if auto_tag_prompt and tags:
|
| 153 |
-
# Auto-append tags to prompt
|
| 154 |
tag_mentions = " ".join([f"@{tag}" for tag in tags])
|
| 155 |
final_prompt = f"{prompt_text} using references: {tag_mentions}"
|
| 156 |
-
|
| 157 |
-
# Ensure we don't exceed character limit
|
| 158 |
if len(final_prompt) > 1000:
|
| 159 |
-
# Try without the descriptive text
|
| 160 |
tag_mentions = " ".join([f"@{tag}" for tag in tags])
|
| 161 |
final_prompt = f"{prompt_text} {tag_mentions}"
|
| 162 |
-
|
| 163 |
-
# If still too long, truncate prompt text
|
| 164 |
if len(final_prompt) > 1000:
|
| 165 |
available_chars = 1000 - len(tag_mentions) - 1
|
| 166 |
final_prompt = f"{prompt_text[:available_chars]} {tag_mentions}"
|
| 167 |
-
|
| 168 |
print(f"Using tags: {tags}")
|
| 169 |
if auto_tag_prompt:
|
| 170 |
print(f"Auto-tagged prompt: {final_prompt}")
|
| 171 |
else:
|
| 172 |
print(f"Manual tagging mode - use @{', @'.join(tags)} in your prompt")
|
| 173 |
print(f"Original prompt: {final_prompt}")
|
| 174 |
-
|
| 175 |
-
# Prepare the request parameters
|
| 176 |
create_params = {
|
| 177 |
"model": model,
|
| 178 |
"prompt_text": final_prompt,
|
| 179 |
"ratio": ratio,
|
| 180 |
"reference_images": reference_images
|
| 181 |
}
|
| 182 |
-
|
| 183 |
-
# Only include seed if it's not None
|
| 184 |
if seed is not None:
|
| 185 |
create_params["seed"] = seed
|
| 186 |
-
|
| 187 |
-
# Create the generation task
|
| 188 |
task = client.text_to_image.create(**create_params)
|
| 189 |
-
|
| 190 |
return task.id
|
| 191 |
|
| 192 |
def check_task_status(task_id: str, api_key: Optional[str] = None):
|
|
@@ -234,7 +177,6 @@ def generate_and_wait_for_result(
|
|
| 234 |
Returns:
|
| 235 |
Tuple of (task_id, saved_image_path)
|
| 236 |
"""
|
| 237 |
-
# Start the generation task
|
| 238 |
task_id = generate_image_with_references(
|
| 239 |
prompt_text=prompt_text,
|
| 240 |
reference_image_paths=reference_image_paths,
|
|
@@ -244,263 +186,79 @@ def generate_and_wait_for_result(
|
|
| 244 |
api_key=api_key,
|
| 245 |
auto_tag_prompt=auto_tag_prompt
|
| 246 |
)
|
| 247 |
-
|
| 248 |
print(f"Image generation started. Task ID: {task_id}")
|
| 249 |
print(f"Checking status every {wait_interval} seconds (max {max_retries} attempts)...")
|
| 250 |
-
|
| 251 |
-
# Wait and check status
|
| 252 |
for attempt in range(max_retries):
|
| 253 |
print(f"Attempt {attempt + 1}/{max_retries} - Waiting {wait_interval} seconds...")
|
| 254 |
time.sleep(wait_interval)
|
| 255 |
-
|
| 256 |
try:
|
| 257 |
status = check_task_status(task_id, api_key)
|
| 258 |
print(f"Status: {status.status}")
|
| 259 |
-
|
| 260 |
if status.status == "SUCCEEDED":
|
| 261 |
if hasattr(status, 'output') and status.output:
|
| 262 |
image_url = status.output[0]
|
| 263 |
print(f"Generation completed! Image URL: {image_url}")
|
| 264 |
-
|
| 265 |
-
# Save the image
|
| 266 |
saved_path = save_generated_image(image_url, filename, batch_folder)
|
| 267 |
print(f"Image saved to: {saved_path}")
|
| 268 |
-
|
| 269 |
return task_id, saved_path
|
| 270 |
else:
|
| 271 |
print("Task succeeded but no output found")
|
| 272 |
return task_id, None
|
| 273 |
-
|
| 274 |
elif status.status == "FAILED":
|
| 275 |
print("Task failed")
|
| 276 |
return task_id, None
|
| 277 |
-
|
| 278 |
elif status.status in ["PENDING", "RUNNING"]:
|
| 279 |
print("Task still in progress...")
|
| 280 |
continue
|
| 281 |
-
|
| 282 |
except Exception as e:
|
| 283 |
print(f"Error checking status: {e}")
|
| 284 |
if attempt == max_retries - 1:
|
| 285 |
print("Max retries reached. Task may still be processing.")
|
| 286 |
return task_id, None
|
| 287 |
-
|
| 288 |
print(f"Timeout after {max_retries} attempts. Task may still be processing.")
|
| 289 |
print(f"You can manually check status later using task ID: {task_id}")
|
| 290 |
return task_id, None
|
| 291 |
|
| 292 |
-
def generate_image_with_replicate_imagen(
|
| 293 |
-
prompt: str,
|
| 294 |
-
aspect_ratio: str = "1:1",
|
| 295 |
-
output_format: str = "jpg",
|
| 296 |
-
model: str = "google/imagen-4-fast",
|
| 297 |
-
safety_filter_level: str = "block_only_high",
|
| 298 |
-
filename: str = None,
|
| 299 |
-
api_token: Optional[str] = None
|
| 300 |
-
) -> str:
|
| 301 |
-
"""
|
| 302 |
-
Generate an image using Replicate's Google Imagen models.
|
| 303 |
-
|
| 304 |
-
Args:
|
| 305 |
-
prompt: Text prompt for image generation
|
| 306 |
-
aspect_ratio: Aspect ratio of the generated image (default: "1:1")
|
| 307 |
-
output_format: Format of the output image (default: "jpg")
|
| 308 |
-
model: Imagen model to use (default: "google/imagen-4-fast")
|
| 309 |
-
safety_filter_level: Safety filter level (default: "block_only_high")
|
| 310 |
-
filename: Optional filename for saved image
|
| 311 |
-
api_token: Optional API token (uses REPLICATE_API_TOKEN env var if not provided)
|
| 312 |
-
|
| 313 |
-
Returns:
|
| 314 |
-
Path to the saved image file
|
| 315 |
-
"""
|
| 316 |
-
# Set API token
|
| 317 |
-
if api_token:
|
| 318 |
-
os.environ["REPLICATE_API_TOKEN"] = api_token
|
| 319 |
-
elif not os.environ.get("REPLICATE_API_TOKEN"):
|
| 320 |
-
raise ValueError("REPLICATE_API_TOKEN environment variable must be set or api_token must be provided")
|
| 321 |
-
|
| 322 |
-
print(f"Generating image with model: {model}")
|
| 323 |
-
print(f"Prompt: {prompt}")
|
| 324 |
-
print(f"Aspect ratio: {aspect_ratio}, Format: {output_format}")
|
| 325 |
-
|
| 326 |
-
# Run the model
|
| 327 |
-
try:
|
| 328 |
-
output = replicate.run(
|
| 329 |
-
model,
|
| 330 |
-
input={
|
| 331 |
-
"prompt": prompt,
|
| 332 |
-
"aspect_ratio": aspect_ratio,
|
| 333 |
-
"output_format": output_format,
|
| 334 |
-
"safety_filter_level": safety_filter_level
|
| 335 |
-
}
|
| 336 |
-
)
|
| 337 |
-
|
| 338 |
-
# The output is a URL string
|
| 339 |
-
image_url = output
|
| 340 |
-
print(f"Image generated successfully: {image_url}")
|
| 341 |
-
|
| 342 |
-
# Save the image
|
| 343 |
-
saved_path = save_generated_image(image_url, filename)
|
| 344 |
-
print(f"Image saved to: {saved_path}")
|
| 345 |
-
|
| 346 |
-
return saved_path
|
| 347 |
-
|
| 348 |
-
except Exception as e:
|
| 349 |
-
print(f"Error generating image with Replicate Imagen: {e}")
|
| 350 |
-
raise
|
| 351 |
-
|
| 352 |
def main():
|
| 353 |
-
"
|
| 354 |
-
print("=== Image Generation Model Selection ===")
|
| 355 |
-
print("Available models:")
|
| 356 |
-
print("1. runway - RunwayML with reference images")
|
| 357 |
-
print("2. imagen-fast - Replicate's Google Imagen 4 Fast")
|
| 358 |
-
|
| 359 |
-
model_choice = input("Enter model choice (runway/imagen-fast): ").strip().lower()
|
| 360 |
-
|
| 361 |
-
if model_choice == "runway":
|
| 362 |
-
print("\n=== Testing RunwayML with Reference Images ===")
|
| 363 |
-
# Example reference images
|
| 364 |
-
reference_images = [
|
| 365 |
-
"assets/characters/japanese_guy.jpg",
|
| 366 |
-
"assets/scenes/f1-fields.jpg",
|
| 367 |
-
"assets/styles/f1-cockpit.jpg"
|
| 368 |
-
]
|
| 369 |
-
|
| 370 |
-
print("=== Manual Tagging Mode (Default for Testing) ===")
|
| 371 |
-
# Example with manual tagging (auto_tag_prompt=False)
|
| 372 |
-
manual_prompt = "@character in a @scene with @style composition, cinematic lighting, high detail"
|
| 373 |
-
|
| 374 |
-
try:
|
| 375 |
-
task_id, saved_path = generate_and_wait_for_result(
|
| 376 |
-
prompt_text=manual_prompt,
|
| 377 |
-
reference_image_paths=reference_images,
|
| 378 |
-
ratio="1920:1080",
|
| 379 |
-
filename="f1_driver_manual_tags.jpg",
|
| 380 |
-
auto_tag_prompt=False # Manual tagging mode
|
| 381 |
-
)
|
| 382 |
-
|
| 383 |
-
if saved_path:
|
| 384 |
-
print(f"Manual tagging success! Image saved to: {saved_path}")
|
| 385 |
-
else:
|
| 386 |
-
print(f"Manual tagging incomplete. Task ID: {task_id}")
|
| 387 |
-
|
| 388 |
-
except Exception as e:
|
| 389 |
-
print(f"Manual tagging error: {e}")
|
| 390 |
-
|
| 391 |
-
print("\n" + "="*50)
|
| 392 |
-
print("=== Auto Tagging Mode Example ===")
|
| 393 |
-
# Example with automatic tagging (auto_tag_prompt=True)
|
| 394 |
-
auto_prompt = "A Japanese F1 driver in a cockpit style setting on a racing field, cinematic lighting, high detail"
|
| 395 |
-
|
| 396 |
-
try:
|
| 397 |
-
task_id, saved_path = generate_and_wait_for_result(
|
| 398 |
-
prompt_text=auto_prompt,
|
| 399 |
-
reference_image_paths=reference_images,
|
| 400 |
-
ratio="1920:1080",
|
| 401 |
-
filename="f1_driver_auto_tags.jpg",
|
| 402 |
-
auto_tag_prompt=True # Auto tagging mode
|
| 403 |
-
)
|
| 404 |
-
|
| 405 |
-
if saved_path:
|
| 406 |
-
print(f"Auto tagging success! Image saved to: {saved_path}")
|
| 407 |
-
else:
|
| 408 |
-
print(f"Auto tagging incomplete. Task ID: {task_id}")
|
| 409 |
-
|
| 410 |
-
except Exception as e:
|
| 411 |
-
print(f"Auto tagging error: {e}")
|
| 412 |
-
|
| 413 |
-
elif model_choice == "imagen-fast":
|
| 414 |
-
print("\n=== Testing Replicate's Google Imagen 4 Fast ===")
|
| 415 |
-
|
| 416 |
-
# Get prompt from user or use default
|
| 417 |
-
prompt = input("Enter image prompt (or press Enter for default): ").strip()
|
| 418 |
-
if not prompt:
|
| 419 |
-
prompt = "A cinematic shot of a futuristic sports car racing through a neon-lit cyberpunk city at night, high detail, dramatic lighting"
|
| 420 |
-
|
| 421 |
-
# Get aspect ratio
|
| 422 |
-
aspect_ratio = input("Enter aspect ratio (default 16:9): ").strip() or "16:9"
|
| 423 |
-
|
| 424 |
-
# Get model version
|
| 425 |
-
model_version = input("Enter model version (fast/ultra, default fast): ").strip().lower() or "fast"
|
| 426 |
-
model_name = "google/imagen-4-fast" if model_version == "fast" else "google/imagen-4-ultra"
|
| 427 |
-
|
| 428 |
-
try:
|
| 429 |
-
saved_path = generate_image_with_replicate_imagen(
|
| 430 |
-
prompt=prompt,
|
| 431 |
-
aspect_ratio=aspect_ratio,
|
| 432 |
-
model=model_name,
|
| 433 |
-
filename="imagen_test.jpg"
|
| 434 |
-
)
|
| 435 |
-
print(f"Imagen generation success! Image saved to: {saved_path}")
|
| 436 |
-
|
| 437 |
-
except Exception as e:
|
| 438 |
-
print(f"Imagen generation error: {e}")
|
| 439 |
-
|
| 440 |
-
else:
|
| 441 |
-
print(f"Invalid model choice: {model_choice}")
|
| 442 |
-
print("Please choose either 'runway' or 'imagen-fast'")
|
| 443 |
-
|
| 444 |
-
def example_manual_tagging():
|
| 445 |
-
"""
|
| 446 |
-
Example function demonstrating manual tagging mode.
|
| 447 |
-
When auto_tag_prompt=False, users must include @character, @scene, @style in their prompts.
|
| 448 |
-
"""
|
| 449 |
reference_images = [
|
| 450 |
-
"assets/characters/
|
| 451 |
-
"assets/scenes/
|
| 452 |
-
"assets/styles/
|
| 453 |
]
|
| 454 |
-
|
| 455 |
-
|
| 456 |
-
prompt_with_tags = """
|
| 457 |
-
A futuristic @character standing in a @cyberpunk @scene
|
| 458 |
-
with @style aesthetic, glowing neon lights, 4k resolution
|
| 459 |
-
""".strip()
|
| 460 |
-
|
| 461 |
-
print("Manual Tagging Example:")
|
| 462 |
-
print(f"Prompt: {prompt_with_tags}")
|
| 463 |
-
|
| 464 |
try:
|
| 465 |
task_id, saved_path = generate_and_wait_for_result(
|
| 466 |
-
prompt_text=
|
| 467 |
reference_image_paths=reference_images,
|
| 468 |
-
|
| 469 |
-
filename="
|
|
|
|
| 470 |
)
|
| 471 |
-
|
|
|
|
|
|
|
|
|
|
| 472 |
except Exception as e:
|
| 473 |
-
print(f"
|
| 474 |
-
|
| 475 |
-
|
| 476 |
-
|
| 477 |
-
"""
|
| 478 |
-
Example function demonstrating auto tagging mode.
|
| 479 |
-
When auto_tag_prompt=True, tags are automatically appended to the prompt.
|
| 480 |
-
"""
|
| 481 |
-
reference_images = [
|
| 482 |
-
"assets/characters/warrior.jpg",
|
| 483 |
-
"assets/scenes/medieval_castle.jpg",
|
| 484 |
-
"assets/styles/oil_painting.jpg"
|
| 485 |
-
]
|
| 486 |
-
|
| 487 |
-
# Simple prompt without tag references
|
| 488 |
-
simple_prompt = "A brave warrior defending a castle, epic fantasy art"
|
| 489 |
-
|
| 490 |
-
print("Auto Tagging Example:")
|
| 491 |
-
print(f"Original prompt: {simple_prompt}")
|
| 492 |
-
|
| 493 |
try:
|
| 494 |
task_id, saved_path = generate_and_wait_for_result(
|
| 495 |
-
prompt_text=
|
| 496 |
reference_image_paths=reference_images,
|
| 497 |
-
|
| 498 |
-
filename="
|
|
|
|
| 499 |
)
|
| 500 |
-
|
|
|
|
|
|
|
|
|
|
| 501 |
except Exception as e:
|
| 502 |
-
print(f"
|
| 503 |
-
return None, None
|
| 504 |
|
| 505 |
if __name__ == "__main__":
|
| 506 |
main()
|
|
|
|
| 6 |
from runwayml import RunwayML
|
| 7 |
import mimetypes
|
| 8 |
from urllib.parse import urlparse
|
|
|
|
| 9 |
|
| 10 |
def encode_image_to_data_uri(image_path: str) -> str:
|
| 11 |
"""Convert a local image file to a data URI."""
|
|
|
|
| 12 |
mime_type, _ = mimetypes.guess_type(image_path)
|
| 13 |
if not mime_type or not mime_type.startswith('image/'):
|
| 14 |
raise ValueError(f"Unsupported image type for {image_path}")
|
|
|
|
|
|
|
| 15 |
with open(image_path, 'rb') as image_file:
|
| 16 |
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
|
|
|
|
| 17 |
return f"data:{mime_type};base64,{encoded_string}"
|
| 18 |
|
| 19 |
def save_generated_image(image_url: str, filename: str = None, batch_folder: str = None) -> str:
|
|
|
|
| 28 |
Returns:
|
| 29 |
Path to the saved image file
|
| 30 |
"""
|
|
|
|
| 31 |
if not batch_folder:
|
| 32 |
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
| 33 |
batch_folder = f"batch_{timestamp}"
|
|
|
|
|
|
|
| 34 |
output_dir = os.path.join("output", batch_folder)
|
| 35 |
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
|
|
|
| 36 |
if not filename:
|
| 37 |
timestamp = int(time.time())
|
| 38 |
filename = f"generated_{timestamp}.jpg"
|
|
|
|
|
|
|
| 39 |
if not os.path.splitext(filename)[1]:
|
| 40 |
filename += ".jpg"
|
|
|
|
| 41 |
output_path = os.path.join(output_dir, filename)
|
|
|
|
|
|
|
| 42 |
response = requests.get(image_url)
|
| 43 |
response.raise_for_status()
|
|
|
|
| 44 |
with open(output_path, 'wb') as f:
|
| 45 |
f.write(response.content)
|
|
|
|
| 46 |
return output_path
|
| 47 |
|
| 48 |
def generate_image_with_references(
|
|
|
|
| 65 |
seed: Optional seed for reproducible results
|
| 66 |
api_key: Optional API key (uses RUNWAYML_API_SECRET env var if not provided)
|
| 67 |
auto_tag_prompt: Whether to automatically append tags to prompt (default: True)
|
|
|
|
| 68 |
|
| 69 |
Returns:
|
| 70 |
Task ID for the generation request
|
| 71 |
"""
|
|
|
|
| 72 |
client = RunwayML(api_key=api_key or os.environ.get("RUNWAYML_API_SECRET"))
|
|
|
|
|
|
|
| 73 |
if len(reference_image_paths) > 3:
|
| 74 |
raise ValueError("Maximum 3 reference images allowed")
|
|
|
|
| 75 |
if len(prompt_text) > 1000:
|
| 76 |
raise ValueError("Prompt text must be 1000 characters or less")
|
|
|
|
|
|
|
| 77 |
reference_images = []
|
| 78 |
tags = []
|
|
|
|
|
|
|
| 79 |
used_standard_tags = set()
|
|
|
|
| 80 |
for i, image_path in enumerate(reference_image_paths):
|
| 81 |
if not os.path.exists(image_path):
|
| 82 |
raise FileNotFoundError(f"Image file not found: {image_path}")
|
|
|
|
|
|
|
| 83 |
filename = os.path.splitext(os.path.basename(image_path))[0]
|
| 84 |
path_parts = image_path.split(os.sep)
|
|
|
|
|
|
|
| 85 |
tag = None
|
| 86 |
for part in path_parts:
|
| 87 |
if part == 'characters' and 'character' not in used_standard_tags:
|
|
|
|
| 96 |
tag = 'style'
|
| 97 |
used_standard_tags.add('style')
|
| 98 |
break
|
|
|
|
|
|
|
| 99 |
if not tag:
|
| 100 |
tag = f"ref_{filename}".replace('-', '_').replace(' ', '_')[:16]
|
|
|
|
| 101 |
tag = ''.join(c for c in tag if c.isalnum() or c == '_')
|
| 102 |
if not tag[0].isalpha():
|
| 103 |
tag = f"img_{tag}"
|
| 104 |
+
tag = tag[:16]
|
|
|
|
| 105 |
tags.append(tag)
|
|
|
|
|
|
|
| 106 |
data_uri = encode_image_to_data_uri(image_path)
|
| 107 |
+
reference_images.append({"uri": data_uri, "tag": tag})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 108 |
final_prompt = prompt_text
|
| 109 |
if auto_tag_prompt and tags:
|
|
|
|
| 110 |
tag_mentions = " ".join([f"@{tag}" for tag in tags])
|
| 111 |
final_prompt = f"{prompt_text} using references: {tag_mentions}"
|
|
|
|
|
|
|
| 112 |
if len(final_prompt) > 1000:
|
|
|
|
| 113 |
tag_mentions = " ".join([f"@{tag}" for tag in tags])
|
| 114 |
final_prompt = f"{prompt_text} {tag_mentions}"
|
|
|
|
|
|
|
| 115 |
if len(final_prompt) > 1000:
|
| 116 |
available_chars = 1000 - len(tag_mentions) - 1
|
| 117 |
final_prompt = f"{prompt_text[:available_chars]} {tag_mentions}"
|
|
|
|
| 118 |
print(f"Using tags: {tags}")
|
| 119 |
if auto_tag_prompt:
|
| 120 |
print(f"Auto-tagged prompt: {final_prompt}")
|
| 121 |
else:
|
| 122 |
print(f"Manual tagging mode - use @{', @'.join(tags)} in your prompt")
|
| 123 |
print(f"Original prompt: {final_prompt}")
|
|
|
|
|
|
|
| 124 |
create_params = {
|
| 125 |
"model": model,
|
| 126 |
"prompt_text": final_prompt,
|
| 127 |
"ratio": ratio,
|
| 128 |
"reference_images": reference_images
|
| 129 |
}
|
|
|
|
|
|
|
| 130 |
if seed is not None:
|
| 131 |
create_params["seed"] = seed
|
|
|
|
|
|
|
| 132 |
task = client.text_to_image.create(**create_params)
|
|
|
|
| 133 |
return task.id
|
| 134 |
|
| 135 |
def check_task_status(task_id: str, api_key: Optional[str] = None):
|
|
|
|
| 177 |
Returns:
|
| 178 |
Tuple of (task_id, saved_image_path)
|
| 179 |
"""
|
|
|
|
| 180 |
task_id = generate_image_with_references(
|
| 181 |
prompt_text=prompt_text,
|
| 182 |
reference_image_paths=reference_image_paths,
|
|
|
|
| 186 |
api_key=api_key,
|
| 187 |
auto_tag_prompt=auto_tag_prompt
|
| 188 |
)
|
|
|
|
| 189 |
print(f"Image generation started. Task ID: {task_id}")
|
| 190 |
print(f"Checking status every {wait_interval} seconds (max {max_retries} attempts)...")
|
|
|
|
|
|
|
| 191 |
for attempt in range(max_retries):
|
| 192 |
print(f"Attempt {attempt + 1}/{max_retries} - Waiting {wait_interval} seconds...")
|
| 193 |
time.sleep(wait_interval)
|
|
|
|
| 194 |
try:
|
| 195 |
status = check_task_status(task_id, api_key)
|
| 196 |
print(f"Status: {status.status}")
|
|
|
|
| 197 |
if status.status == "SUCCEEDED":
|
| 198 |
if hasattr(status, 'output') and status.output:
|
| 199 |
image_url = status.output[0]
|
| 200 |
print(f"Generation completed! Image URL: {image_url}")
|
|
|
|
|
|
|
| 201 |
saved_path = save_generated_image(image_url, filename, batch_folder)
|
| 202 |
print(f"Image saved to: {saved_path}")
|
|
|
|
| 203 |
return task_id, saved_path
|
| 204 |
else:
|
| 205 |
print("Task succeeded but no output found")
|
| 206 |
return task_id, None
|
|
|
|
| 207 |
elif status.status == "FAILED":
|
| 208 |
print("Task failed")
|
| 209 |
return task_id, None
|
|
|
|
| 210 |
elif status.status in ["PENDING", "RUNNING"]:
|
| 211 |
print("Task still in progress...")
|
| 212 |
continue
|
|
|
|
| 213 |
except Exception as e:
|
| 214 |
print(f"Error checking status: {e}")
|
| 215 |
if attempt == max_retries - 1:
|
| 216 |
print("Max retries reached. Task may still be processing.")
|
| 217 |
return task_id, None
|
|
|
|
| 218 |
print(f"Timeout after {max_retries} attempts. Task may still be processing.")
|
| 219 |
print(f"You can manually check status later using task ID: {task_id}")
|
| 220 |
return task_id, None
|
| 221 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 222 |
def main():
|
| 223 |
+
print("\n=== Testing RunwayML with Reference Images ===")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 224 |
reference_images = [
|
| 225 |
+
"assets/characters/japanese_guy.jpg",
|
| 226 |
+
"assets/scenes/f1-fields.jpg",
|
| 227 |
+
"assets/styles/f1-cockpit.jpg"
|
| 228 |
]
|
| 229 |
+
print("=== Manual Tagging Mode ===")
|
| 230 |
+
manual_prompt = "@character in a @scene with @style composition, cinematic lighting, high detail"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 231 |
try:
|
| 232 |
task_id, saved_path = generate_and_wait_for_result(
|
| 233 |
+
prompt_text=manual_prompt,
|
| 234 |
reference_image_paths=reference_images,
|
| 235 |
+
ratio="1920:1080",
|
| 236 |
+
filename="f1_driver_manual_tags.jpg",
|
| 237 |
+
auto_tag_prompt=False
|
| 238 |
)
|
| 239 |
+
if saved_path:
|
| 240 |
+
print(f"Manual tagging success! Image saved to: {saved_path}")
|
| 241 |
+
else:
|
| 242 |
+
print(f"Manual tagging incomplete. Task ID: {task_id}")
|
| 243 |
except Exception as e:
|
| 244 |
+
print(f"Manual tagging error: {e}")
|
| 245 |
+
print("\n" + "="*50)
|
| 246 |
+
print("=== Auto Tagging Mode Example ===")
|
| 247 |
+
auto_prompt = "A Japanese F1 driver in a cockpit style setting on a racing field, cinematic lighting, high detail"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 248 |
try:
|
| 249 |
task_id, saved_path = generate_and_wait_for_result(
|
| 250 |
+
prompt_text=auto_prompt,
|
| 251 |
reference_image_paths=reference_images,
|
| 252 |
+
ratio="1920:1080",
|
| 253 |
+
filename="f1_driver_auto_tags.jpg",
|
| 254 |
+
auto_tag_prompt=True
|
| 255 |
)
|
| 256 |
+
if saved_path:
|
| 257 |
+
print(f"Auto tagging success! Image saved to: {saved_path}")
|
| 258 |
+
else:
|
| 259 |
+
print(f"Auto tagging incomplete. Task ID: {task_id}")
|
| 260 |
except Exception as e:
|
| 261 |
+
print(f"Auto tagging error: {e}")
|
|
|
|
| 262 |
|
| 263 |
if __name__ == "__main__":
|
| 264 |
main()
|