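"""Multimodel image generation pipeline.

Accepts a single image file or a ZIP archive of images, builds prompt
variations for each image, generates new images with either the existing GPT
pipeline or a Replicate-hosted model, uploads the results to R2, and records
job status in the results collection when a database is configured.
"""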
import os, zipfile, tempfile, logging, base64
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple, Optional
from generator_function.image_function import generate_image
from prompt.prompt_services import get_prompts
from multimodel_services.replicate_generation_service import generate_image_with_model, convert_size_to_aspect_ratio
from multimodel_services.model_manager import is_gpt_model, get_all_parameters
from helpers_function.helper_meta_data import meta_data_helper_function
from helpers_function.helpers import upload_image_to_r2, is_valid_image
from database.connections import get_results_collection as get_collection
from database.operations import start_job, finish_job
from util.session_state import current_uid

logger = logging.getLogger(__name__)

# Resolved once at import time; may be None when no database is configured,
# which is why every use below is guarded with an explicit None check.
COL = get_collection()


def _resolve_user_id() -> str:
    return current_uid() or os.getenv("DEFAULT_USER_ID", "anonymous")


def process_zip_and_generate_images_multimodel(
    zip_path: str,
    category: str,
    size: str,
    quality: str,
    user_prompt: str,
    sentiment: str,
    platform: str,
    num_images: int,
    demo_mode: bool,
    existing_images: Optional[List[str]],
    blur: bool,
    uid: str,
    selected_model: str = "gpt_default",
    model_params: Optional[dict] = None,
) -> List[str]:
    """Enhanced image processor that supports both the GPT and multimodel approaches."""
    # Demo mode always generates exactly one image, regardless of the requested count.
    num_images = 1 if demo_mode else num_images
    try:
        if zip_path.endswith(".zip"):
            temp_dir = extract_zip_file(zip_path)
            image_files = get_valid_image_files(temp_dir)
        else:
            # A single image file was passed directly instead of a ZIP archive.
            image_files = [(os.path.basename(zip_path), zip_path)]
        results = process_image_files_multimodel(
            image_files, category, size, quality, user_prompt, sentiment, platform,
            num_images, blur, uid, selected_model, model_params
        )
        # Flatten the per-file URL lists and de-duplicate while preserving order.
        all_urls = [url for entry in results for url in entry["urls"]]
        seen, deduped = set(), []
        for u in all_urls:
            if u not in seen:
                seen.add(u)
                deduped.append(u)
        return (existing_images or []) + deduped
    except Exception:
        logger.exception(f"Unhandled error while processing file: {zip_path}")
        return existing_images or []


def extract_zip_file(zip_path: str) -> tempfile.TemporaryDirectory:
    # The TemporaryDirectory object (not just its path) is returned so the caller
    # keeps it alive; the directory is removed once the object is garbage-collected.
    temp_dir = tempfile.TemporaryDirectory()
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(temp_dir.name)
    logger.info(f"Extracted ZIP file: {zip_path}")
    return temp_dir


def get_valid_image_files(temp_dir: tempfile.TemporaryDirectory) -> List[Tuple[str, str]]:
    valid_files: List[Tuple[str, str]] = []
    # os.listdir is not recursive, so only top-level archive entries are
    # considered; macOS resource-fork entries ("__MACOSX") are skipped.
    for file in os.listdir(temp_dir.name):
        if "__MACOSX" in file:
            continue
        file_path = os.path.join(temp_dir.name, file)
        if is_valid_image(file):
            valid_files.append((file, file_path))
        else:
            logger.warning(f"Ignored non-image file: {file}")
    logger.info(f"Found {len(valid_files)} valid images.")
    return valid_files


def process_image_files_multimodel(image_files: List[Tuple[str, str]], category: str, size: str,
        quality: str, user_prompt: str, sentiment: str, platform: str, num_images: int, blur: bool,
        uid: str, selected_model: str, model_params: Optional[dict]) -> List[dict]:
    """Process image files with multimodel support."""
    final_results: List[dict] = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = []
        for file_name, file_path in image_files:
            job_id: Optional[str] = None
            if COL is not None:
                try:
                    settings = {
                        "size": size, "quality": quality, "sentiment": sentiment,
                        "platform": platform, "num_images": num_images, "blur": bool(blur),
                        "selected_model": selected_model, "model_params": model_params or {}
                    }
                    inputs = {"file_name": file_name, "mode": "img_or_zip_multimodel"}
                    job_id = start_job(
                        COL,
                        type="variation_multimodel",
                        created_by=uid,
                        category=category or "general",
                        inputs=inputs,
                        settings=settings,
                        user_prompt=user_prompt
                    )
                except Exception:
                    logger.exception("Failed to start DB job; continuing without DB logging.")
            futures.append(
                executor.submit(
                    process_single_image_multimodel,
                    file_name, file_path, category, size, quality, user_prompt, sentiment,
                    platform, num_images, blur, job_id, selected_model, model_params,
                )
            )
        for future in as_completed(futures):
            try:
                result = future.result()
                if result:
                    final_results.append(result)
            except Exception:
                logger.exception("Unhandled exception during image processing thread.")
    return final_results


def process_single_image_multimodel(file_name: str, file_path: str, category: str, size: str,
        quality: str, user_prompt: str, sentiment: str, platform: str, num_images: int, blur: bool,
        job_id: Optional[str], selected_model: str, model_params: Optional[dict]) -> Optional[dict]:
    """Process a single image with multimodel support."""
    try:
        image_urls = generate_images_from_prompts_multimodel(
            file_path, size, quality, category, sentiment, user_prompt, platform,
            num_images, blur, selected_model, model_params
        )
        if COL is not None and job_id:
            try:
                finish_job(COL, job_id, status=("completed" if image_urls else "failed"), outputs_urls=image_urls)
            except Exception:
                logger.exception("Failed to finish DB job.")
        if image_urls:
            return {"file_name": file_name, "urls": image_urls}
        return None
    except Exception as e:
        logger.error(f"Processing failed for {file_name}: {e}")
        if COL is not None and job_id:
            try:
                finish_job(COL, job_id, status="failed", outputs_urls=[])
            except Exception:
                logger.exception("Also failed to mark DB job as failed.")
        return None


def generate_images_from_prompts_multimodel(
    file_path: str, size: str, quality: str, category: str, sentiment: str, user_prompt: str,
    platform: str, num_images: int, blur: bool, selected_model: str, model_params: Optional[dict],
) -> List[str]:
    """Generate images using either the GPT or the multimodel approach."""
    image_urls: List[str] = []

    def worker(i: int) -> Optional[str]:
        try:
            if is_gpt_model(selected_model):
                # Use the existing GPT approach
                image_bytes = generate_image(file_path, size, quality, category, sentiment, user_prompt, platform, blur, i)
            else:
                # Use the multimodel approach
                image_bytes = generate_image_multimodel(
                    file_path, selected_model, category, sentiment, user_prompt,
                    platform, size, model_params, i
                )
            if not image_bytes:
                return None
            # Stamp metadata, then upload to R2 and return the public URL.
            image_with_metadata = meta_data_helper_function(image_bytes)
            s3_url = upload_image_to_r2(image_with_metadata)
            return s3_url
        except Exception as e:
            logger.error(f"Image generation failed: {e}")
            return None

    # max(1, ...) guards against num_images == 0, which would make max_workers invalid.
    with ThreadPoolExecutor(max_workers=max(1, min(10, num_images))) as executor:
        futures = [executor.submit(worker, i) for i in range(num_images)]
        for future in as_completed(futures):
            result = future.result()
            if result:
                image_urls.append(result)
    return image_urls


def generate_image_multimodel(file_path: str, model_name: str, category: str, sentiment: str,
        user_prompt: str, platform: str, size: str, model_params: Optional[dict],
        variation_index: int) -> Optional[bytes]:
    """Generate an image using the multimodel approach."""
    try:
        # Encode the source image as base64 for the prompt service and the model call
        with open(file_path, 'rb') as f:
            image_data = f.read()
        base64_image = base64.b64encode(image_data).decode()
        # Use the existing prompt service to get prompt variations
        prompt_variations = get_prompts(
            base64_image, category, user_prompt, sentiment, None
        )
        # Select a prompt based on the variation index
        if prompt_variations:
            selected_prompt = prompt_variations[variation_index % len(prompt_variations)]
        else:
            # Fallback prompt
            selected_prompt = f"Generate a high-quality {category or 'advertising'} image. {user_prompt}"
        # Prepare model parameters
        all_params = get_all_parameters(model_name, model_params)
        # Only convert size to an aspect ratio if the user did not provide one
        if model_name == "google/nano-banana" and "aspect_ratio" in all_params:
            # "match_input_image" is the default, so derive the ratio from the size
            if all_params["aspect_ratio"] == "match_input_image":
                converted_ratio = convert_size_to_aspect_ratio(size, model_name)
                all_params["aspect_ratio"] = converted_ratio
                logger.info(f"Converted size '{size}' to aspect ratio '{converted_ratio}' for {model_name}")
            else:
                logger.info(f"Using user-selected aspect ratio '{all_params['aspect_ratio']}' for {model_name}")
        logger.info(f"Final parameters for {model_name}: {all_params}")
        # Generate the image with the selected model
        generated_image_data = generate_image_with_model(
            model_name, selected_prompt, all_params, base64_image
        )
        return generated_image_data
    except Exception as e:
        logger.error(f"Multimodel generation failed: {e}")
        return None
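

# Illustrative usage sketch, not a production entry point: every value below is
# a placeholder, and the call assumes credentials for the database, R2 storage,
# and the model backends are already configured in the environment.
if __name__ == "__main__":
    urls = process_zip_and_generate_images_multimodel(
        zip_path="samples/products.zip",      # hypothetical archive path
        category="fashion",
        size="1024x1024",
        quality="standard",
        user_prompt="Studio shot on a neutral background",
        sentiment="positive",
        platform="instagram",
        num_images=2,
        demo_mode=False,
        existing_images=None,
        blur=False,
        uid="demo-user",                      # hypothetical user id
        selected_model="google/nano-banana",  # non-GPT models route to Replicate
        model_params={"aspect_ratio": "1:1"},
    )
    print(f"Generated {len(urls)} image URL(s): {urls}")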