Leadgen-AdGenesis / image_processor.py
userIdc2024's picture
Update image_processor.py
34ea641 verified
import os
import zipfile
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from meta_data import meta_data_helper_function
from image_functions import generate_image
from prompt_service import get_prompts
from r2_uploader import upload_image_to_r2
from mongo_logger import create_log, update_log_status
from helpers import encode_image_to_base64, is_valid_image
import logging
logger = logging.getLogger(__name__)
def process_zip_and_generate_images(
    zip_path, category, size, quality, user_prompt,
    sentiment, platform, num_images, demo_mode, existing_images, blur
):
    """Generate images for one source image or for every image in a ZIP archive.

    Args:
        zip_path: Location of the input. Either a filesystem path (``str`` or
            ``os.PathLike``) or an upload object that exposes the path as
            ``.name``.
        category, size, quality, user_prompt, sentiment, platform, blur:
            Forwarded unchanged to ``process_image_files``.
        num_images: Requested number of images per source image. Overridden
            to 1 in demo mode, and to 3 for the "Google Display Network"
            platform outside demo mode.
        demo_mode: When truthy, only one image per source image is produced.
        existing_images: Optional list of earlier URLs; the new URLs are
            appended to a copy of it.

    Returns:
        A list of image URLs (earlier ones first when ``existing_images`` is
        non-empty), or a single-element list ``["Error: <message>"]`` when
        anything in the pipeline raises.
    """
    if demo_mode:
        num_images = 1
    elif platform == "Google Display Network":
        num_images = 3

    temp_dir = None
    try:
        # Plain paths are used as-is; anything else is expected to carry its
        # path in ``.name``. (``Path.name`` is only the basename, so PathLike
        # objects must not take the ``.name`` route.)
        if isinstance(zip_path, (str, os.PathLike)):
            source_path = os.fspath(zip_path)
        else:
            source_path = zip_path.name

        if source_path.lower().endswith('.zip'):
            temp_dir = extract_zip_file(source_path)
            image_files = get_valid_image_files(temp_dir)
        else:
            image_files = [(os.path.basename(source_path), source_path)]

        results = process_image_files(
            image_files, category, size, quality, user_prompt,
            sentiment, platform, num_images, blur
        )
        all_urls = [url for entry in results for url in entry["urls"]]
        return existing_images + all_urls if existing_images else all_urls
    except Exception as e:
        logger.exception(f"Global error during processing file: {zip_path}")
        return [f"Error: {str(e)}"]
    finally:
        # All worker threads have finished by now, so the extracted files are
        # no longer needed; remove them instead of waiting for finalization.
        if temp_dir is not None:
            try:
                temp_dir.cleanup()
            except OSError:
                logger.warning(f"Could not remove temporary directory: {temp_dir.name}")
def extract_zip_file(zip_path):
    """Extract a ZIP archive into a fresh temporary directory.

    Args:
        zip_path: Path of the archive to extract.

    Returns:
        The ``tempfile.TemporaryDirectory`` that holds the extracted files.
        The caller owns it; the files are removed when it is cleaned up.

    Raises:
        Exception: Whatever ``tempfile`` or ``zipfile`` raised, re-raised
            unchanged after being logged.
    """
    temp_dir = None
    try:
        temp_dir = tempfile.TemporaryDirectory()
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir.name)
        logger.info(f"Extracted ZIP file: {zip_path}")
        return temp_dir
    except Exception:
        logger.exception(f"Failed to extract ZIP file: {zip_path}")
        # Nobody will receive the directory, so remove it (and any partially
        # extracted content) right away rather than relying on finalization.
        if temp_dir is not None:
            temp_dir.cleanup()
        raise
def get_valid_image_files(temp_dir):
    """Collect the image files found anywhere under an extraction directory.

    The directory is walked recursively so that archives which wrap their
    images in a folder are handled too. Directories whose name contains
    ``__MACOSX`` are skipped entirely.

    Args:
        temp_dir: Object whose ``.name`` attribute is the directory to scan.

    Returns:
        A list of ``(file_name, file_path)`` tuples, ordered by sorted
        directory and file names.

    Raises:
        OSError: If a directory cannot be listed.
    """
    def _fail(error):
        # os.walk swallows listing errors by default; surface them instead.
        raise error

    valid_files = []
    for current_dir, subdirs, files in os.walk(temp_dir.name, onerror=_fail):
        # Prune in place so os.walk never descends into macOS metadata
        # folders; sorting keeps the traversal order deterministic.
        subdirs[:] = sorted(d for d in subdirs if "__MACOSX" not in d)
        for file in sorted(files):
            if is_valid_image(file) and "__MACOSX" not in file:
                valid_files.append((file, os.path.join(current_dir, file)))
            else:
                logger.warning(f"Ignored non-image or unwanted file: {file}")
    logger.info(f"Found {len(valid_files)} valid images.")
    return valid_files
def process_image_files(image_files, category, size, quality, user_prompt, sentiment, platform, num_images, blur):
    """Run ``process_single_image`` for every source image on a thread pool.

    Args:
        image_files: Iterable of ``(file_name, file_path)`` pairs.
        category, size, quality, user_prompt, sentiment, platform,
        num_images, blur: Forwarded unchanged to ``process_single_image``.

    Returns:
        The truthy results of the workers, in completion order. Workers that
        raise are logged and skipped.
    """
    logs = []
    collected = []
    with ThreadPoolExecutor(max_workers=5) as pool:
        pending = []
        for name, path in image_files:
            # A log entry is created up front so the worker can update it.
            log_id = create_log(category, name, user_prompt)
            logs.append(f"Processing {name}")
            task = pool.submit(
                process_single_image,
                name, path, category, size, quality, user_prompt,
                sentiment, platform, num_images, blur, log_id, logs,
            )
            pending.append(task)

        for finished in as_completed(pending):
            try:
                outcome = finished.result()
            except Exception:
                logger.exception("Unhandled exception during image processing thread.")
                continue
            if outcome:
                collected.append(outcome)
    return collected
def process_single_image(file_name, file_path, category, size, quality, user_prompt, sentiment, platform, num_images, blur, log_id, logs):
    """Generate images for one source file and record the outcome in its log entry.

    Args:
        file_name: Name of the source image; used in messages and in the result.
        file_path: Path of the source image handed to image generation.
        category, size, quality, user_prompt, sentiment, platform,
        num_images, blur: Forwarded to ``generate_images_from_prompts``.
        log_id: Identifier passed to ``update_log_status``.
        logs: Shared list; an error line is appended to it on failure.

    Returns:
        ``{"file_name": ..., "urls": [...]}`` when at least one image was
        produced, otherwise ``None``.
    """
    try:
        # NOTE: the prompt-generation step (get_prompts) is currently bypassed;
        # the source image goes straight to image generation.
        image_urls = generate_images_from_prompts(file_path, size, quality, category, sentiment, user_prompt, platform, num_images, blur)
        status = "completed" if image_urls else "failed"
        message = "Completed successfully" if image_urls else "No images generated"
        update_log_status(log_id, status, urls=image_urls, message=message)
        if image_urls:
            return {
                "file_name": file_name,
                "urls": image_urls
            }
        return None
    except Exception as e:
        error_msg = f"Processing failed for {file_name}: {e}"
        # logger.exception keeps the traceback, matching the other handlers
        # in this module.
        logger.exception(error_msg)
        logs.append(error_msg)
        try:
            update_log_status(log_id, "failed", urls=[], message=str(e))
        except Exception:
            # A failing status update must not mask the original error or
            # escape the worker thread.
            logger.exception(f"Could not record failure status for {file_name}")
        return None
def generate_images_from_prompts(file_path, size, quality, category, sentiment, user_prompt, platform, num_images, blur):
    """Generate and upload ``num_images`` images for one source image.

    Each variation index ``i`` in ``range(num_images)`` is handled on its own
    thread: ``generate_image`` -> ``meta_data_helper_function`` ->
    ``upload_image_to_r2``.

    Args:
        file_path: Path of the source image.
        size, quality, category, sentiment, user_prompt, platform, blur:
            Forwarded unchanged to ``generate_image``.
        num_images: Number of variations to attempt. ``None``, zero and
            negative values produce an empty result.

    Returns:
        The upload URLs of the variations that succeeded, ordered by
        variation index. Failed variations are logged and left out.
    """
    if not num_images or num_images < 1:
        # ThreadPoolExecutor raises ValueError for max_workers <= 0.
        return []

    def worker(i):
        # Failures are reported by returning None so that one bad variation
        # does not discard the others.
        try:
            image_bytes = generate_image(file_path, size, quality, category, sentiment, user_prompt, platform, blur, i)
            if not image_bytes:
                logger.error("Image generation returned empty bytes.")
                return None
            logger.info("Successfully generated image bytes.")
            try:
                image_with_metadata = meta_data_helper_function(image_bytes)
            except Exception as e:
                logger.exception(f"Metadata generation failed: {e}")
                return None
            uploaded_url = upload_image_to_r2(image_with_metadata)
            logger.info(f"Generated and uploaded image to: {uploaded_url}")
            return uploaded_url
        except Exception as e:
            logger.exception(f"Image generation failed: {e}")
            return None

    with ThreadPoolExecutor(max_workers=min(10, num_images)) as executor:
        # map() yields results in submission order, so the URL order is stable
        # across runs instead of depending on which thread finishes first.
        return [url for url in executor.map(worker, range(num_images)) if url]
def log_error(message):
    """Emit *message* on the module logger at ERROR level."""
    logger.log(logging.ERROR, message)