Spaces:
Sleeping
Sleeping
File size: 5,929 Bytes
931fdaf 60688c0 931fdaf f706182 931fdaf f706182 931fdaf f706182 931fdaf f706182 931fdaf f706182 931fdaf f706182 931fdaf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
import os
import zipfile
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from image_functions import generate_image
from meta_data import meta_data_helper_function
from prompt_service import get_prompts
from r2_uploader import upload_image_to_r2
from mongo_logger import create_log, update_log_status
from helpers import encode_image_to_base64, is_valid_image
import logging
logger = logging.getLogger(__name__)
def process_zip_and_generate_images(
    zip_path, category, size, quality, user_prompt,
    sentiment, platform, num_images, demo_mode, existing_images, blur
):
    """Entry point: generate images for a ZIP archive or a single image file.

    Args:
        zip_path: Path to a ``.zip`` archive or a single image file.
        category, size, quality, user_prompt, sentiment, platform, blur:
            Passed through to the generation pipeline.
        num_images: Variants to generate per source image (forced to 1 in
            demo mode).
        demo_mode: When truthy, caps generation at one image.
        existing_images: Optional list of already-generated URLs to prepend.

    Returns:
        list[str]: All generated image URLs (appended to *existing_images*
        when given), or a single-element ``["Error: ..."]`` list on failure.
    """
    num_images = 1 if demo_mode else num_images
    temp_dir = None
    try:
        if zip_path.endswith('.zip'):
            temp_dir = extract_zip_file(zip_path)
            image_files = get_valid_image_files(temp_dir)
        else:
            # Single image: treat the given path as the only input file.
            image_files = [(os.path.basename(zip_path), zip_path)]
        results = process_image_files(image_files, category, size, quality, user_prompt, sentiment, platform, num_images, blur)
        all_urls = [url for entry in results for url in entry["urls"]]
        return existing_images + all_urls if existing_images else all_urls
    except Exception as e:
        logger.exception(f"Global error during processing file: {zip_path}")
        return [f"Error: {str(e)}"]
    finally:
        # Remove extracted files deterministically instead of relying on
        # garbage collection of the TemporaryDirectory object.
        if temp_dir is not None:
            temp_dir.cleanup()
def extract_zip_file(zip_path):
    """Extract *zip_path* into a fresh temporary directory.

    Returns the ``tempfile.TemporaryDirectory`` object itself; the caller
    owns its lifetime (the extracted files vanish once it is cleaned up).
    Any extraction error is logged with traceback and re-raised.
    """
    try:
        extract_dir = tempfile.TemporaryDirectory()
        with zipfile.ZipFile(zip_path, 'r') as archive:
            archive.extractall(extract_dir.name)
        logger.info(f"Extracted ZIP file: {zip_path}")
    except Exception:
        logger.exception(f"Failed to extract ZIP file: {zip_path}")
        raise
    return extract_dir
def get_valid_image_files(temp_dir):
    """Collect valid image files from an extracted archive.

    Walks the extracted tree recursively — zip archives commonly wrap
    their contents in a top-level folder, which the previous flat
    ``os.listdir`` scan missed entirely — while pruning macOS
    ``__MACOSX`` metadata directories.

    Args:
        temp_dir: ``tempfile.TemporaryDirectory`` holding the extracted
            archive contents.

    Returns:
        list[tuple[str, str]]: ``(file_name, absolute_path)`` pairs for
        every file accepted by ``is_valid_image``.
    """
    valid_files = []
    for root, dirs, files in os.walk(temp_dir.name):
        # Prune resource-fork metadata directories before descending.
        dirs[:] = [d for d in dirs if d != "__MACOSX"]
        for file in files:
            if is_valid_image(file):
                valid_files.append((file, os.path.join(root, file)))
            else:
                logger.warning(f"Ignored non-image or unwanted file: {file}")
    logger.info(f"Found {len(valid_files)} valid images.")
    return valid_files
def process_image_files(image_files, category, size, quality, user_prompt, sentiment, platform, num_images, blur):
    """Fan out per-file processing across a small thread pool.

    A Mongo log entry is created for each file before submission, and a
    shared progress list is passed to every worker. Only files that
    produced at least one URL contribute to the returned list.

    Returns:
        list[dict]: ``{"file_name": ..., "urls": [...]}`` entries, in
        completion order.
    """
    final_results = []
    logs = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        pending = []
        for file_name, file_path in image_files:
            log_id = create_log(category, file_name, user_prompt)
            logs.append(f"Processing {file_name}")
            pending.append(executor.submit(
                process_single_image, file_name, file_path, category,
                size, quality, user_prompt, sentiment, platform,
                num_images, blur, log_id, logs,
            ))
        for done in as_completed(pending):
            try:
                outcome = done.result()
            except Exception:
                logger.exception("Unhandled exception during image processing thread.")
            else:
                if outcome:
                    final_results.append(outcome)
    return final_results
def process_single_image(file_name, file_path, category, size, quality, user_prompt, sentiment, platform, num_images, blur, log_id, logs):
    """Generate images for one source file and record the outcome.

    Updates the Mongo log entry identified by *log_id* with the final
    status and appends a human-readable message to *logs* on failure.

    Returns:
        dict | None: ``{"file_name": ..., "urls": [...]}`` when at least
        one image URL was produced, otherwise ``None``.
    """
    try:
        image_urls = generate_images_from_prompts(file_path, size, quality, category, sentiment, user_prompt, platform, num_images, blur)
        status = "completed" if image_urls else "failed"
        message = "Completed successfully" if image_urls else "No images generated"
        update_log_status(log_id, status, urls=image_urls, message=message)
        if image_urls:
            return {
                "file_name": file_name,
                "urls": image_urls
            }
        return None
    except Exception as e:
        error_msg = f"Processing failed for {file_name}: {e}"
        # .exception keeps the traceback that plain .error dropped.
        logger.exception(error_msg)
        logs.append(error_msg)
        update_log_status(log_id, "failed", urls=[], message=str(e))
        return None
def generate_images_from_prompts(file_path, size, quality, category, sentiment, user_prompt, platform, num_images, blur):
    """Generate *num_images* variants of one source image concurrently.

    Each worker generates image bytes, embeds metadata, and uploads the
    result to R2. Failed attempts are logged and skipped.

    Returns:
        list[str]: URLs of successfully uploaded images, in completion
        order (may be shorter than *num_images*, or empty).
    """
    # Guard: ThreadPoolExecutor raises ValueError when max_workers <= 0,
    # so min(10, 0) would crash instead of yielding an empty result.
    if num_images < 1:
        return []

    def _worker(index):
        # One generate -> metadata -> upload attempt; returns URL or None.
        try:
            image_bytes = generate_image(file_path, size, quality, category, sentiment, user_prompt, platform, blur, index)
            if not image_bytes:
                logger.error("Image generation returned empty bytes.")
                return None
            logger.info("Successfully generated image bytes.")
            try:
                image_with_metadata = meta_data_helper_function(image_bytes)
            except Exception as e:
                logger.error(f"Metadata generation failed: {e}")
                return None
            s3_url = upload_image_to_r2(image_with_metadata)
            logger.info(f"Generated and uploaded image to: {s3_url}")
            return s3_url
        except Exception as e:
            logger.error(f"Image generation failed: {e}")
            return None

    image_urls = []
    with ThreadPoolExecutor(max_workers=min(10, num_images)) as executor:
        futures = [executor.submit(_worker, i) for i in range(num_images)]
        for future in as_completed(futures):
            url = future.result()
            if url:
                image_urls.append(url)
    return image_urls
def log_error(message):
    """Record *message* at ERROR level on this module's logger."""
    logger.error(message)