# AdGenesis-App / generator_function / multimodel_image_processor.py
# userIdc2024's picture
# Update generator_function/multimodel_image_processor.py
# c3706b1 verified
import os, zipfile, tempfile, logging, base64
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple, Optional
from PIL import Image
import io
from generator_function.image_function import generate_image
from prompt.prompt_services import get_prompts
from multimodel_services.replicate_generation_service import generate_image_with_model, convert_size_to_aspect_ratio
from multimodel_services.model_manager import is_gpt_model, get_all_parameters
from helpers_function.helper_meta_data import meta_data_helper_function
from helpers_function.helpers import upload_image_to_r2
from helpers_function.helpers import is_valid_image
from database.connections import get_results_collection as get_collection
from database.operations import start_job, finish_job
from util.session_state import current_uid
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Results-collection handle fetched once, at import time. The functions below
# compare it against None before every start_job/finish_job call, so a None
# value disables DB job logging rather than raising.
COL = get_collection()
def _resolve_user_id() -> str:
    """Return the user id to attribute work to.

    Uses the value reported by ``current_uid()`` when it is truthy; otherwise
    falls back to the ``DEFAULT_USER_ID`` environment variable, and finally to
    the literal ``"anonymous"``.
    """
    session_uid = current_uid()
    if session_uid:
        return session_uid
    return os.getenv("DEFAULT_USER_ID", "anonymous")
def process_zip_and_generate_images_multimodel(
    zip_path: str,
    category: str,
    size: str,
    quality: str,
    user_prompt: str,
    sentiment: str,
    platform: str,
    num_images: int,
    demo_mode: bool,
    existing_images: Optional[List[str]],
    blur: bool,
    uid: str,
    selected_model: str = "gpt_default",
    model_params: Optional[dict] = None,
    private_mode: bool = False,
) -> List[str]:
    """Generate image variations for a single image or for every image in a ZIP.

    Args:
        zip_path: Path to either a ``.zip`` archive (extension matched
            case-insensitively) or a single image file.
        category, size, quality, user_prompt, sentiment, platform, blur:
            Generation settings, forwarded unchanged to
            ``process_image_files_multimodel``.
        num_images: Variations requested per input image; forced to 1 when
            ``demo_mode`` is true.
        demo_mode: When true, limits generation to one variation per image.
        existing_images: Only used as the fallback return value when
            processing fails; it is NOT merged into a successful result.
        uid: User id forwarded to ``process_image_files_multimodel``.
        selected_model: Model identifier forwarded to the processing layer.
        model_params: Optional per-model parameter overrides.
        private_mode: Forwarded to the processing layer.

    Returns:
        The newly generated image URLs, de-duplicated with first-seen order
        preserved. On any error, ``existing_images`` (or ``[]`` if it is
        falsy) is returned and the exception is logged.
    """
    if demo_mode:
        num_images = 1

    # Tracks the extraction directory so it can be removed deterministically
    # instead of relying on the TemporaryDirectory finalizer at GC time.
    temp_dir: Optional[tempfile.TemporaryDirectory] = None
    try:
        # Case-insensitive so that "photos.ZIP" is extracted rather than being
        # treated as a single image file.
        if zip_path.lower().endswith(".zip"):
            temp_dir = extract_zip_file(zip_path)
            image_files = get_valid_image_files(temp_dir)
        else:
            image_files = [(os.path.basename(zip_path), zip_path)]

        results = process_image_files_multimodel(
            image_files, category, size, quality, user_prompt, sentiment, platform,
            num_images, blur, uid, selected_model, model_params, private_mode
        )

        # dict.fromkeys drops duplicates while keeping first-seen order.
        # Return only new images, not appended to existing ones.
        return list(dict.fromkeys(url for entry in results for url in entry["urls"]))
    except Exception:
        logger.exception(f"Global error during processing file: {zip_path}")
        return existing_images or []
    finally:
        if temp_dir is not None:
            try:
                temp_dir.cleanup()
            except Exception:
                # Best effort: a cleanup failure must not mask the real result.
                logger.exception("Failed to remove temporary extraction directory.")
def extract_zip_file(zip_path: str) -> tempfile.TemporaryDirectory:
    """Extract a ZIP archive into a fresh temporary directory.

    Args:
        zip_path: Path of the archive to extract.

    Returns:
        The ``tempfile.TemporaryDirectory`` holding the extracted files. The
        caller owns it and should call ``cleanup()`` when done.

    Raises:
        Whatever ``zipfile.ZipFile`` / ``extractall`` raise (for example
        ``zipfile.BadZipFile`` or ``OSError``). The temporary directory is
        removed before the exception propagates.
    """
    temp_dir = tempfile.TemporaryDirectory()
    try:
        # NOTE(review): the archive comes from an upload; extractall() is
        # relied on for member-path handling and no size limit is enforced.
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(temp_dir.name)
    except BaseException:
        # Do not leave a half-populated directory behind on failure.
        temp_dir.cleanup()
        raise
    logger.info(f"Extracted ZIP file: {zip_path}")
    return temp_dir
def get_valid_image_files(temp_dir: tempfile.TemporaryDirectory) -> List[Tuple[str, str]]:
    """Collect the image files found anywhere under an extracted archive.

    The directory tree is walked recursively so that archives created by
    zipping a folder (images nested one or more levels deep) are handled.
    Anything whose name contains ``__MACOSX`` is skipped silently. Only regular
    directory entries reported as files are offered to ``is_valid_image``,
    which is called with the bare file name.

    Args:
        temp_dir: Temporary directory holding the extracted archive.

    Returns:
        ``(file_name, absolute_path)`` tuples in a deterministic (sorted)
        traversal order. Files rejected by ``is_valid_image`` are logged and
        omitted.
    """
    valid_files: List[Tuple[str, str]] = []
    for root, dirnames, filenames in os.walk(temp_dir.name):
        # Prune metadata folders in place so os.walk never descends into them;
        # sorting keeps the traversal order stable across platforms.
        dirnames[:] = sorted(d for d in dirnames if "__MACOSX" not in d)
        for file in sorted(filenames):
            if "__MACOSX" in file:
                continue
            if is_valid_image(file):
                valid_files.append((file, os.path.join(root, file)))
            else:
                logger.warning(f"Ignored non-image file: {file}")
    logger.info(f"Found {len(valid_files)} valid images.")
    return valid_files
def process_image_files_multimodel(image_files: List[Tuple[str, str]], category: str, size: str,
        quality: str, user_prompt: str, sentiment: str, platform: str, num_images: int, blur: bool,
        uid: str, selected_model: str, model_params: Optional[dict], private_mode: bool) -> List[dict]:
    """Fan out image generation over a thread pool, one task per input file.

    For each ``(file_name, file_path)`` pair a DB job is opened first (unless
    ``COL`` is None or ``private_mode`` is set), then
    ``process_single_image_multimodel`` is submitted to a 5-worker pool.

    Returns:
        The truthy results of the submitted tasks, in completion order.
        Tasks that raise are logged and skipped.
    """

    def _open_job(file_name: str) -> Optional[str]:
        # Returns the new job id, or None when DB logging is off or fails.
        if COL is None or private_mode:
            return None
        try:
            settings = {
                "size": size, "quality": quality, "sentiment": sentiment,
                "platform": platform, "num_images": num_images, "blur": bool(blur),
                "selected_model": selected_model, "model_params": model_params or {}
            }
            inputs = {"file_name": file_name, "mode": "img_or_zip_multimodel"}
            return start_job(
                COL,
                type="variation",
                created_by=uid,
                category=category or "general",
                inputs=inputs,
                settings=settings,
                user_prompt=user_prompt
            )
        except Exception:
            logger.exception("Failed to start DB job; continuing without DB logging.")
            return None

    collected: List[dict] = []
    with ThreadPoolExecutor(max_workers=5) as pool:
        # The job is opened while building each submit() call, so job creation
        # and task submission stay interleaved per file.
        pending = [
            pool.submit(
                process_single_image_multimodel,
                name, path, category, size, quality, user_prompt, sentiment,
                platform, num_images, blur, _open_job(name), selected_model,
                model_params, private_mode,
            )
            for name, path in image_files
        ]
        for done in as_completed(pending):
            try:
                outcome = done.result()
            except Exception:
                logger.exception("Unhandled exception during image processing thread.")
                continue
            if outcome:
                collected.append(outcome)
    return collected
def process_single_image_multimodel(file_name: str, file_path: str, category: str, size: str,
        quality: str, user_prompt: str, sentiment: str, platform: str, num_images: int, blur: bool,
        job_id: Optional[str], selected_model: str, model_params: Optional[dict], private_mode: bool) -> Optional[dict]:
    """Generate the variations for one input image and close its DB job.

    Args:
        file_name: Name reported back in the result dict and in log messages.
        file_path: Path of the source image handed to the generator.
        job_id: DB job to finish, or None when no job was opened.
        private_mode: When true, no DB job is finished.
        (remaining arguments are forwarded unchanged to
        ``generate_images_from_prompts_multimodel``)

    Returns:
        ``{"file_name": ..., "urls": [...]}`` when at least one image was
        produced; otherwise None. Never raises: failures are logged with their
        traceback and the job, if any, is marked ``"failed"``.
    """

    def _close_job(status: str, urls: List[str], failure_message: str) -> None:
        # DB bookkeeping is best effort; it must never break image processing.
        if COL is None or not job_id or private_mode:
            return
        try:
            finish_job(COL, job_id, status=status, outputs_urls=urls)
        except Exception:
            logger.exception(failure_message)

    try:
        image_urls = generate_images_from_prompts_multimodel(
            file_path, size, quality, category, sentiment, user_prompt, platform,
            num_images, blur, selected_model, model_params, private_mode
        )
    except Exception as e:
        # logger.exception keeps the traceback, matching the other handlers
        # in this module.
        logger.exception(f"Processing failed for {file_name}: {e}")
        _close_job("failed", [], "Also failed to mark DB job as failed.")
        return None

    _close_job("completed" if image_urls else "failed", image_urls, "Failed to finish DB job.")
    if image_urls:
        return {"file_name": file_name, "urls": image_urls}
    return None
def generate_images_from_prompts_multimodel(
    file_path: str, size: str, quality: str, category: str, sentiment: str, user_prompt: str,
    platform: str, num_images: int, blur: bool, selected_model: str, model_params: Optional[dict],
    private_mode: bool,
) -> List[str]:
    """Generate up to ``num_images`` variations of one image concurrently.

    Each variation is produced by ``generate_image`` when
    ``is_gpt_model(selected_model)`` is true, otherwise by
    ``generate_image_multimodel``. The resulting bytes are passed through
    ``meta_data_helper_function`` and then either returned inline as a
    ``data:image/png;base64,...`` string (``private_mode``) or uploaded via
    ``upload_image_to_r2``.

    Returns:
        The successful results ordered by variation index. Failed variations
        are logged and omitted. An empty list is returned when ``num_images``
        is zero or negative.
    """
    # ThreadPoolExecutor rejects max_workers <= 0 with ValueError, so there is
    # nothing to do (and nothing to spin up) for a non-positive request.
    if num_images <= 0:
        return []

    def worker(i: int) -> Optional[str]:
        try:
            if is_gpt_model(selected_model):
                # Use existing GPT approach
                image_bytes = generate_image(
                    file_path, size, quality, category, sentiment, user_prompt, platform, blur, i
                )
            else:
                # Use multimodel approach
                image_bytes = generate_image_multimodel(
                    file_path, selected_model, category, sentiment, user_prompt,
                    platform, size, model_params, i, num_images
                )
            if not image_bytes:
                return None
            image_with_metadata = meta_data_helper_function(image_bytes)
            if private_mode:
                return "data:image/png;base64," + base64.b64encode(image_with_metadata).decode("utf-8")
            return upload_image_to_r2(image_with_metadata)
        except Exception as e:
            # Keep the traceback; one failed variation must not stop the rest.
            logger.exception(f"Image generation failed: {e}")
            return None

    with ThreadPoolExecutor(max_workers=min(10, num_images)) as executor:
        # map() yields results in submission order, so output is deterministic;
        # worker() never raises, so iterating the results is safe.
        outcomes = list(executor.map(worker, range(num_images)))
    return [url for url in outcomes if url]
def generate_image_multimodel(file_path: str, model_name: str, category: str, sentiment: str,
        user_prompt: str, platform: str, size: str, model_params: Optional[dict],
        variation_index: int, num_images: int) -> Optional[bytes]:
    """Produce one image variation with a non-GPT model.

    Steps: read and base64-encode the source image, ask ``get_prompts`` for
    prompt variations, pick one by ``variation_index`` (wrapping around), build
    the model parameters with ``get_all_parameters`` and call
    ``generate_image_with_model``.

    Returns:
        Whatever ``generate_image_with_model`` returns, or None when any step
        raises (the error is logged).
    """
    try:
        # Source image as base64 text, used for both prompting and generation.
        with open(file_path, 'rb') as source:
            base64_image = base64.b64encode(source.read()).decode()

        candidates = get_prompts(
            base64_image, category, user_prompt, sentiment, None, num_images
        )
        if candidates:
            # Wrap around so any index maps onto an available prompt.
            selected_prompt = candidates[variation_index % len(candidates)]
        else:
            # No prompts came back: fall back to a generic one.
            selected_prompt = f"Generate a high-quality {category or 'advertising'} image. {user_prompt}"

        all_params = get_all_parameters(model_name, model_params)

        # For google/nano-banana the aspect ratio is derived from `size` only
        # while it still holds the "match_input_image" default; any other value
        # is treated as an explicit user choice and left untouched.
        # NOTE(review): all_params is mutated in place — confirm that
        # get_all_parameters returns a fresh dict per call.
        if model_name == "google/nano-banana" and "aspect_ratio" in all_params:
            if all_params["aspect_ratio"] != "match_input_image":
                logger.info(f"Using user-selected aspect ratio '{all_params['aspect_ratio']}' for {model_name}")
            else:
                all_params["aspect_ratio"] = convert_size_to_aspect_ratio(size, model_name)

        return generate_image_with_model(
            model_name, selected_prompt, all_params, base64_image
        )
    except Exception as e:
        logger.error(f"Multimodel generation failed: {e}")
        return None