from __future__ import annotations import re import json import logging import mimetypes from datetime import datetime import shutil from pathlib import Path from typing import List, Dict, Any import os from google import genai from google.genai import types, errors as genai_errors from constants import ( ROOT, PLAN_PATH, DEFAULT_SETTINGS, GEMINI_SETTINGS_KEYS, LOG_NAME, STYLE_VIEW_ORDER, ) class PromptTask(Dict[str, Any]): """Typed mapping representing a single prompt item (slide, filename, prompt, order).""" slide: str filename: str prompt: str order: int def slugify(text: str) -> str: """Convert a slide label to a filesystem-friendly slug.""" text = text.lower() text = re.sub(r"[^a-z0-9]+", "-", text) text = text.strip("-") return text or "slide" def output_root(brand: str, collection: str) -> Path: """Base directory for images under outputs//collection//images.""" return ROOT / "outputs" / slugify(brand) / "collection" / slugify(collection) / "images" def parse_plan(plan_path: Path) -> List[PromptTask]: """Pull every FILENAME/PROMPT pair from plan.md, keeping slide context.""" lines = plan_path.read_text(encoding="utf-8").splitlines() tasks: List[PromptTask] = [] current_slide = "slide" order = 0 i = 0 while i < len(lines): line = lines[i].strip() # Capture slide headers (e.g., "Slide 6", "Slide 6A", "Slides 8–19") slide_match = re.match(r"slide[s]?\s+([\w–-]+)", line, re.IGNORECASE) if slide_match: current_slide = line i += 1 continue file_match = re.match(r"FILENAME:\s*(.+)", line, re.IGNORECASE) if file_match: filename = file_match.group(1).strip() # Advance to the PROMPT line j = i + 1 while j < len(lines) and not lines[j].strip().lower().startswith("prompt:"): j += 1 if j >= len(lines): raise ValueError(f"PROMPT missing for {filename}") prompt_line = lines[j].strip() prompt = prompt_line.split("PROMPT:", 1)[1].strip() # Capture any prompt continuation lines until the next FILENAME/Slide header k = j + 1 continuation: List[str] = [] while k < len(lines): next_line = lines[k].strip() if next_line == "": k += 1 continue if re.match(r"(FILENAME:|Slide[s]?\s+|< Text Content)", next_line, re.IGNORECASE): break continuation.append(next_line) k += 1 if continuation: prompt = " ".join([prompt] + continuation) tasks.append( { "slide": current_slide, "filename": filename, "prompt": prompt, "order": order, } ) order += 1 i = k continue i += 1 return tasks def setup_logging(out_root: Path, mode: str, level: str = "INFO", log_path: Path | None = None) -> logging.Logger: """Configure stdout + file logging; file goes under outputs//collection//images//run.log.""" out_dir = out_root / mode out_dir.mkdir(parents=True, exist_ok=True) log_file = log_path or out_dir / "run.log" numeric_level = getattr(logging, level.upper(), logging.INFO) formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") handlers: list[logging.Handler] = [logging.StreamHandler()] handlers[0].setFormatter(formatter) file_handler = logging.FileHandler(log_file, encoding="utf-8") file_handler.setFormatter(formatter) handlers.append(file_handler) logging.basicConfig(level=numeric_level, handlers=handlers, force=True) logger = logging.getLogger(LOG_NAME) logger.setLevel(numeric_level) logger.info("Logging initialized (mode=%s, file=%s, level=%s)", mode, log_file, level.upper()) return logger def clean_output_dir(out_root: Path, mode: str, logger: logging.Logger | None = None) -> None: """Remove all files under the given mode folder to start from a clean slate.""" target = out_root / mode if target.exists(): if logger: logger.info("Cleaning output directory %s", target) shutil.rmtree(target) def anchor_part(prompt: str, logger: logging.Logger) -> tuple[None, None]: """Anchor images via folder are removed; function retained for signature compatibility.""" return None, None def part_from_path(path: Path) -> types.Part: """Load an image file as a genai Part with an inferred MIME type.""" mime, _ = mimetypes.guess_type(path) if not mime: mime = "image/jpeg" data = path.read_bytes() return types.Part.from_bytes(data=data, mime_type=mime) def detect_style_view(filename: str) -> tuple[str, str] | None: """Return (style_code, view) for style view images; else None.""" m = re.match(r"^(MG-[A-Z]-SS\d{2}-\d{3})_(hero|front|back)\.", filename, re.IGNORECASE) if not m: return None style_code, view = m.group(1), m.group(2).lower() return style_code, view def reorder_tasks_for_styles(tasks: List[PromptTask]) -> List[PromptTask]: """Group style views and order front->back->hero; keep non-style in original positions.""" style_map: dict[str, list[PromptTask]] = {} for t in tasks: sv = detect_style_view(t["filename"]) if sv: code, view = sv style_map.setdefault(code, []).append(t | {"_style_view": view}) final: list[PromptTask] = [] processed: set[str] = set() for t in tasks: sv = detect_style_view(t["filename"]) if not sv: final.append(t) continue code, _ = sv if code in processed: continue processed.add(code) grouped = style_map.get(code, []) grouped.sort(key=lambda x: (STYLE_VIEW_ORDER.get(x.get("_style_view", "other"), 99), x["order"])) # remove helper key before returning for g in grouped: g.pop("_style_view", None) final.append(g) return final def load_settings(settings_path: Path | None) -> dict[str, str]: """Load settings JSON (if present) limited to known keys.""" path = settings_path or DEFAULT_SETTINGS if not path.exists(): return {} try: data = json.loads(path.read_text(encoding="utf-8")) return {k: v for k, v in data.items() if k in GEMINI_SETTINGS_KEYS and v} except json.JSONDecodeError as exc: # noqa: BLE001 raise SystemExit(f"settings file {path} is not valid JSON: {exc}") def resolve_api_key(settings: dict[str, str]) -> str: """Get API key from env first, then settings file; env wins.""" if os.environ.get("GEMINI_API_KEY"): return os.environ["GEMINI_API_KEY"] if os.environ.get("GOOGLE_API_KEY"): return os.environ["GOOGLE_API_KEY"] key = settings.get("GEMINI_API_KEY") or settings.get("GOOGLE_API_KEY") if key: return key raise SystemExit( "GEMINI_API_KEY/GOOGLE_API_KEY is not set. Set the env var or create settings.json (see settings.example.json)." ) def generate_images( tasks: List[PromptTask], mode: str, limit: int | None, api_key: str, logger: logging.Logger, out_root: Path, timestamp: str, ) -> None: """Generate images for the provided tasks list and write a manifest.""" client = genai.Client(api_key=api_key) to_run = tasks if mode == "full" else tasks[: limit or 2] logger.info("Starting generation: %s tasks (mode=%s)", len(to_run), mode) style_state: dict[str, dict[str, Path]] = {} manifest = [] for task in to_run: slide_slug = slugify(task["slide"]) out_dir = out_root / mode / slide_slug out_dir.mkdir(parents=True, exist_ok=True) out_path = out_dir / task["filename"] logger.info("Generating %s (slide: %s)", task["filename"], task["slide"]) style_view = detect_style_view(task["filename"]) anchor, anchor_code = anchor_part(task["prompt"], logger) anchor_used = None contents: list[types.Part | str] = [] if style_view: style_code, view = style_view state = style_state.get(style_code, {}) preferred_path: Path | None = None if view == "hero": preferred_path = None # first in chain, prompt-only elif view == "front": preferred_path = state.get("hero") anchor_used = "hero" if preferred_path else None elif view == "back": preferred_path = state.get("front") or state.get("hero") anchor_used = "front" if state.get("front") else ("hero" if state.get("hero") else None) if preferred_path and preferred_path.exists(): try: contents.append(part_from_path(preferred_path)) anchor_used = anchor_used or "previous" except Exception as exc: # noqa: BLE001 logger.exception("Failed to load prior view %s as anchor: %s", preferred_path, exc) if not contents and anchor: contents.append(anchor) anchor_used = anchor_used or (f"face:{anchor_code}" if anchor_code else "face") contents.append(task["prompt"]) try: response = client.models.generate_content( model="gemini-2.5-flash-image", contents=contents, config=types.GenerateContentConfig( response_modalities=["image"], ), ) except genai_errors.ClientError as exc: # noqa: BLE001 if exc.status_code == 401: logger.error( "401 Unauthorized. This usually means the key is missing, the wrong key type (use Google AI Studio key), or Vertex mode requires OAuth." ) logger.exception("Generation failed for %s: %s", task["filename"], exc) continue except Exception as exc: # noqa: BLE001 logger.exception("Generation failed for %s: %s", task["filename"], exc) continue parts = getattr(response, "parts", None) if not parts: logger.warning("Response had no parts for %s; skipping", task["filename"]) continue image_part = next((p for p in parts if getattr(p, "inline_data", None)), None) if not image_part: logger.warning("No image part returned for %s; skipping", task["filename"]) continue try: image = image_part.as_image() image.save(out_path) logger.info("Saved %s", out_path) except Exception as exc: # noqa: BLE001 logger.exception("Failed to save %s: %s", out_path, exc) continue if style_view: style_code, view = style_view style_state.setdefault(style_code, {})[view] = out_path manifest.append( { "slide": task["slide"], "filename": task["filename"], "prompt": task["prompt"], "path": str(out_path.relative_to(ROOT)), "anchor": anchor_used, "anchor_face": anchor_code, } ) if manifest: manifest_path = out_root / mode / f"manifest_{timestamp}.json" manifest_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8") logger.info("Manifest written to %s", manifest_path) else: logger.warning("No images were generated; manifest not written") def run_generation( mode: str = "full", limit: int | None = None, settings_path: Path | None = None, brand: str = "mango", collection: str = "hot-summer-ss26", log_level: str = "INFO", clean: bool = False, ) -> None: """Programmatic entrypoint to parse plan.md and generate Gemini images.""" tasks = parse_plan(PLAN_PATH) tasks = reorder_tasks_for_styles(tasks) if not tasks: raise SystemExit("No prompts found in plan.md") if mode == "sample" and limit is not None and limit <= 0: raise SystemExit("limit must be positive for sample mode") out_root = output_root(brand, collection) if clean: clean_output_dir(out_root, mode) timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") logger = setup_logging(out_root, mode, log_level) settings = load_settings(settings_path) if "GOOGLE_GENAI_USE_VERTEXAI" in settings and "GOOGLE_GENAI_USE_VERTEXAI" not in os.environ: os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = str(settings["GOOGLE_GENAI_USE_VERTEXAI"]).lower() api_key = resolve_api_key(settings) prompts_dir = ROOT / "outputs" / slugify(brand) / "collection" / slugify(collection) / "prompts" prompts_dir.mkdir(parents=True, exist_ok=True) prompts_path = prompts_dir / f"images_prompts_{timestamp}.json" prompts_payload = [{"slide": t["slide"], "filename": t["filename"], "prompt": t["prompt"], "order": t["order"]} for t in tasks] prompts_path.write_text(json.dumps(prompts_payload, indent=2), encoding="utf-8") logger.info("Prompts saved to %s", prompts_path) generate_images(tasks, mode, limit, api_key, logger, out_root, timestamp) # ---------------- Reusable runner for external callers ---------------- # def run_prompt_list( prompt_items: List[Dict[str, Any]], brand: str, collection: str, mode: str, api_key: str | None, logger: logging.Logger, ) -> List[Dict[str, Any]]: """ Run a list of prompts (each dict: prompt, filename) through Gemini and save to images/. Returns manifest entries. Includes simple anchoring for style views (hero -> front -> back) using previously generated images for the same style code. """ out_root = output_root(brand, collection) / mode out_root.mkdir(parents=True, exist_ok=True) # Auth resolution: prefer explicit api_key, else settings.json (no env reliance) settings_path = ROOT / "settings.json" settings = {} if settings_path.exists(): try: settings = json.loads(settings_path.read_text(encoding="utf-8")) except Exception: settings = {} use_vertex = str(settings.get("GOOGLE_GENAI_USE_VERTEXAI", "")).lower() == "true" if not api_key: api_key = settings.get("GEMINI_API_KEY") or settings.get("GOOGLE_API_KEY") project = ( settings.get("GOOGLE_VERTEX_PROJECT") or settings.get("GOOGLE_CLOUD_PROJECT") or settings.get("GCLOUD_PROJECT") ) location = settings.get("GOOGLE_VERTEX_LOCATION") or settings.get("GOOGLE_CLOUD_LOCATION") or "us-central1" logger.info( "[gemini] auth resolution: api_key=%s use_vertex=%s project=%s location=%s", "yes" if api_key else "no", use_vertex, project or "none", location, ) if use_vertex: if not project: raise SystemExit( "Gemini Vertex auth missing project. Set GOOGLE_VERTEX_PROJECT or GOOGLE_CLOUD_PROJECT in settings.json." ) client = genai.Client(vertexai={"project": project, "location": location}) logger.info("[gemini] using Vertex ADC project=%s location=%s", project, location) elif api_key: client = genai.Client(api_key=api_key) logger.info("[gemini] using API key auth") else: raise SystemExit( "Gemini auth missing: set GEMINI_API_KEY/GOOGLE_API_KEY in settings.json or set GOOGLE_GENAI_USE_VERTEXAI=true with GOOGLE_CLOUD_PROJECT in settings.json" ) manifest = [] style_state: dict[str, dict[str, Path]] = {} for item in prompt_items: prompt = item["prompt"] filename = item.get("filename") or f"prompt_{len(manifest)+1}.png" slide_slug = slugify(item.get("slide", "adhoc")) if item.get("out_path"): out_path = (ROOT / item["out_path"]).resolve() if not Path(item["out_path"]).is_absolute() else Path(item["out_path"]) out_path.parent.mkdir(parents=True, exist_ok=True) else: out_dir = out_root / slide_slug out_dir.mkdir(parents=True, exist_ok=True) out_path = out_dir / filename logger.info("[run_prompt_list] %s -> %s", filename, out_path) contents: list[types.Part | str] = [] # Style chaining: if filename matches style view, attach prior image anchor_used = None style_view = detect_style_view(filename) if style_view: code, view = style_view state = style_state.get(code, {}) preferred_path: Path | None = None if view == "hero": preferred_path = None elif view == "front": preferred_path = state.get("hero") anchor_used = "hero" if preferred_path else None elif view == "back": preferred_path = state.get("front") or state.get("hero") anchor_used = "front" if state.get("front") else ("hero" if state.get("hero") else None) if preferred_path and preferred_path.exists(): try: contents.append(part_from_path(preferred_path)) anchor_used = anchor_used or "previous" except Exception as exc: # noqa: BLE001 logger.exception("Failed to load prior view %s as anchor: %s", preferred_path, exc) contents.append(prompt) try: resp = client.models.generate_content( model="gemini-2.5-flash-image", contents=contents, config=types.GenerateContentConfig(response_modalities=["image"]), ) image_part = None if hasattr(resp, "parts") and resp.parts: image_part = next((p for p in resp.parts if getattr(p, "inline_data", None)), None) if not image_part and hasattr(resp, "candidates"): for cand in resp.candidates or []: content = getattr(cand, "content", None) parts = getattr(content, "parts", []) if content else [] for part in parts or []: if getattr(part, "inline_data", None): image_part = part break if image_part: break if not image_part: logger.warning("[run_prompt_list] no image returned for %s", filename) manifest.append({"filename": filename, "status": "no_image"}) continue image = image_part.as_image() image.save(out_path) manifest.append({"filename": filename, "status": "ok", "path": str(out_path.relative_to(ROOT)), "anchor": anchor_used}) if style_view: code, view = style_view style_state.setdefault(code, {})[view] = out_path except Exception as exc: # noqa: BLE001 logger.exception("[run_prompt_list] failed for %s: %s", filename, exc) manifest.append({"filename": filename, "status": f"error:{exc}"}) return manifest def run_prompt_list_vertex_chain( prompt_items: List[Dict[str, Any]], brand: str, collection: str, mode: str, logger: logging.Logger, temp: float = 1.0, top_p: float = 0.95, ) -> List[Dict[str, Any]]: """ Multi-turn Vertex image chain per style (hero → front → back) with image feedback. End-to-end flow: 1) Group prompts by style/slide so each style runs as one mini-session. 2) HERO: call Gemini Vertex with the hero prompt (no anchors). Save the returned image. 3) FRONT: send the original hero prompt as a user turn, the hero image as a *model* turn, then the front prompt as a user turn. Generate and save the front image. 4) BACK: send hero prompt + hero image (model turn) + front prompt + front image (model turn), then the back prompt. Generate and save the back image. 5) Persist outputs under `outputs//collection//images//...` and record a manifest entry per view. Notes: - Uses Vertex client with explicit safety + image config (1:1, 1K) and temperature/top_p controls. - If any of the three views fail, the function logs an error for that view and continues to the next style. """ from collections import defaultdict out_root = output_root(brand, collection) / mode out_root.mkdir(parents=True, exist_ok=True) client = genai.Client(vertexai=True) cfg = types.GenerateContentConfig( temperature=temp, top_p=top_p, max_output_tokens=32768, response_modalities=["TEXT", "IMAGE"], safety_settings=[ types.SafetySetting(category="HARM_CATEGORY_HATE_SPEECH", threshold="OFF"), types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="OFF"), types.SafetySetting(category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="OFF"), types.SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="OFF"), ], image_config=types.ImageConfig( aspect_ratio="1:1", image_size="1K", output_mime_type="image/png", ), ) def to_model_image_content(img_bytes: bytes) -> types.Content: """Wrap prior image bytes as a model-role content part for chaining.""" return types.Content( role="model", parts=[ types.Part.from_text(text="`"), types.Part.from_bytes(data=img_bytes, mime_type="image/png"), ], ) def extract_first_image(resp) -> bytes | None: """Extract the first inline image payload from a Vertex response object.""" for cand in getattr(resp, "candidates", []) or []: parts = getattr(getattr(cand, "content", None), "parts", []) or [] for part in parts: if getattr(part, "inline_data", None) and getattr(part.inline_data, "data", None): return part.inline_data.data return None grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list) for itm in prompt_items: grouped[itm.get("slide") or itm.get("style_name") or "unknown"].append(itm) manifest: List[Dict[str, Any]] = [] for slide, items in grouped.items(): logger.info("[vertex-chain] style=%s items=%d", slide, len(items)) hero = next((i for i in items if "_hero" in i.get("filename", "")), None) front = next((i for i in items if "_front" in i.get("filename", "")), None) back = next((i for i in items if "_back" in i.get("filename", "")), None) if not (hero and front and back): logger.warning("[vertex-chain] skip %s missing hero/front/back", slide) continue def resolve_out_path(itm: Dict[str, Any]) -> Path: """Resolve the output path for an item, creating parent folders as needed.""" op = itm.get("out_path") if op: p = Path(op) if not p.is_absolute(): p = ROOT / p p.parent.mkdir(parents=True, exist_ok=True) return p out_dir = out_root / (itm.get("slide") or slide) out_dir.mkdir(parents=True, exist_ok=True) return out_dir / itm.get("filename", "out.png") # HERO # 1) Hero request: single user turn with hero prompt. hero_resp = client.models.generate_content( model="gemini-2.5-flash-image", contents=[types.Content(role="user", parts=[types.Part.from_text(text=hero["prompt"])])], config=cfg, ) hero_img = extract_first_image(hero_resp) if not hero_img: manifest.append({"filename": hero.get("filename"), "status": "error", "path": None}) logger.error("[vertex-chain] no hero image for %s", slide) continue hero_path = resolve_out_path(hero) hero_path.write_bytes(hero_img) manifest.append({"filename": hero.get("filename"), "status": "ok", "path": str(hero_path.relative_to(ROOT))}) # FRONT # 2) Front request: feed hero prompt (user) + hero image (model turn) + front prompt (user). contents_front = [ types.Content(role="user", parts=[types.Part.from_text(text=hero["prompt"])]), to_model_image_content(hero_img), types.Content(role="user", parts=[types.Part.from_text(text=front["prompt"])]), ] front_resp = client.models.generate_content( model="gemini-2.5-flash-image", contents=contents_front, config=cfg, ) front_img = extract_first_image(front_resp) if not front_img: manifest.append({"filename": front.get("filename"), "status": "error", "path": None}) logger.error("[vertex-chain] no front image for %s", slide) continue front_path = resolve_out_path(front) front_path.write_bytes(front_img) manifest.append({"filename": front.get("filename"), "status": "ok", "path": str(front_path.relative_to(ROOT))}) # BACK # 3) Back request: hero prompt (user) + hero image (model) + front prompt (user) + front image (model) + back prompt (user). contents_back = [ types.Content(role="user", parts=[types.Part.from_text(text=hero["prompt"])]), to_model_image_content(hero_img), types.Content(role="user", parts=[types.Part.from_text(text=front["prompt"])]), to_model_image_content(front_img), types.Content(role="user", parts=[types.Part.from_text(text=back["prompt"])]), ] back_resp = client.models.generate_content( model="gemini-2.5-flash-image", contents=contents_back, config=cfg, ) back_img = extract_first_image(back_resp) if not back_img: manifest.append({"filename": back.get("filename"), "status": "error", "path": None}) logger.error("[vertex-chain] no back image for %s", slide) continue back_path = resolve_out_path(back) back_path.write_bytes(back_img) manifest.append({"filename": back.get("filename"), "status": "ok", "path": str(back_path.relative_to(ROOT))}) return manifest