"""Generate an image with OpenAI, Hugging Face or Replicate and optionally post it to Facebook.

Providers are tried in a configurable order (see `generate_image`). Credentials
and model names are read from the environment, which is populated from a local
`.env` file via `python-dotenv`.
"""

import base64
import logging
import os
import re
import time

import requests
from dotenv import load_dotenv

from generate_wisdom import generate_wisdom
from retry_queue import enqueue as enqueue_retry

# Load .env *before* building the OpenAI client: `OpenAI()` raises when
# OPENAI_API_KEY is not in the environment, which would otherwise push us onto
# the legacy code path even though a key is configured in .env.
load_dotenv()

try:
    from openai import OpenAI

    openai_client = OpenAI()
    openai_legacy = None
except Exception:
    try:
        import openai as openai_legacy

        openai_client = None
    except Exception:
        openai_client = None
        openai_legacy = None

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

# A long run of base64-alphabet characters; used to fish an encoded image out
# of loosely structured API responses.
_B64_RUN = re.compile(r"([A-Za-z0-9+/=]{200,})")

# Replicate prediction states that mean "not finished yet".
_REPLICATE_PENDING = ("starting", "processing", "queued")


def _fetch_bytes(url: str, timeout: int) -> bytes:
    """Download `url` and return the response body; raises on HTTP errors."""
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    return resp.content


def _append_to_log(entry: str) -> None:
    """Append a timestamped `entry` line to `log.txt` in the working directory."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    with open("log.txt", "a", encoding="utf-8") as lf:
        lf.write(f"[{stamp}] {entry}\n")


def _b64_from_hf_json(js: object) -> str | None:
    """Return the base64 image string found in a Hugging Face JSON body, if any."""
    b64 = None
    if isinstance(js, dict):
        for key in ("image", "generated_image", "data", "images"):
            if key not in js:
                continue
            val = js[key]
            if isinstance(val, str):
                b64 = val
                break
            if isinstance(val, list) and val:
                first = val[0]
                if isinstance(first, dict) and "image_base64" in first:
                    b64 = first["image_base64"]
                    break
                if isinstance(first, str):
                    b64 = first
                    break
    if not b64:
        # Last resort: look for anything that resembles a base64 blob.
        mobj = _B64_RUN.search(str(js))
        if mobj:
            b64 = mobj.group(1)
    return b64 or None


def _image_from_space(model_id: str, prompt: str, hf_token: str) -> bytes | None:
    """Try the public Space `/api/predict` endpoint for `model_id`.

    Returns image bytes, or None when the Space answered but the response held
    no usable image. Network, HTTP and parsing errors propagate to the caller.
    """
    owner, repo = model_id.split("/", 1)
    space_url = f"https://huggingface.co/spaces/{owner}/{repo}/api/predict"
    logger.info("Trying Space API %s", space_url)
    sp_resp = requests.post(
        space_url,
        headers={"Authorization": f"Bearer {hf_token}"},
        json={"data": [prompt]},
        timeout=180,
    )
    sp_resp.raise_for_status()
    js = sp_resp.json()
    # Many Spaces return {'data': [...]} where the first item is base64 or a URL.
    if not (isinstance(js, dict) and "data" in js):
        return None
    items = js["data"]
    if not isinstance(items, list) or not items:
        return None
    first = items[0]
    if not isinstance(first, str):
        return None
    # Check for a URL first: a long signed URL can contain a run of
    # base64-alphabet characters and must not be decoded as image data.
    if first.startswith("http"):
        return _fetch_bytes(first, timeout=60)
    mobj = _B64_RUN.search(first)
    if mobj:
        return base64.b64decode(mobj.group(1))
    return None


def generate_image_hf(
    prompt: str,
    model: str = "stabilityai/stable-diffusion-2",
    size: str = "1024x1024",
    steps: int = 20,
    guidance: float = 7.5,
) -> bytes:
    """Generate image bytes using the Hugging Face Inference API as a fallback.

    Tries the requested `model` first; if it is not available via the
    Inference API (HTTP 410/404) the Space `/api/predict` endpoint of the same
    name is attempted, then a small list of common SD models.

    Args:
        prompt: Text prompt sent as the model input.
        model: First model id to try (`owner/name`).
        size: `"<width>x<height>"`; falls back to 1024x1024 when unparsable.
        steps: Value sent as `num_inference_steps`.
        guidance: Value sent as `guidance_scale`.

    Returns:
        Raw image bytes.

    Raises:
        RuntimeError: When the `HF` token is missing, a JSON response carries
            no image, or every candidate failed without an HTTP error.
        requests.HTTPError: For HTTP failures other than 404/410, or the last
            404/410 once all candidates are exhausted.
    """
    load_dotenv()
    hf_token = os.getenv("HF")
    if not hf_token:
        logger.error("HF token missing in environment (HF)")
        raise RuntimeError("HF token missing in environment (HF)")

    try:
        width, height = (int(x) for x in size.split("x"))
    except Exception:
        width, height = 1024, 1024

    candidate_models = [model, "runwayml/stable-diffusion-v1-5", "prompthero/openjourney"]
    headers = {"Authorization": f"Bearer {hf_token}", "Accept": "application/json"}
    payload = {
        "inputs": prompt,
        "parameters": {
            "width": width,
            "height": height,
            "num_inference_steps": steps,
            "guidance_scale": guidance,
        },
        "options": {"wait_for_model": True},
    }

    last_exc = None
    for m in candidate_models:
        url = f"https://api-inference.huggingface.co/models/{m}"
        logger.info("Trying HF model %s (size=%sx%s, steps=%s)", m, width, height, steps)
        try:
            resp = requests.post(url, headers=headers, json=payload, timeout=120)
            resp.raise_for_status()
        except requests.HTTPError as e:
            status = getattr(e.response, "status_code", None)
            logger.warning("HF model %s returned HTTP %s", m, status)
            last_exc = e
            if status not in (404, 410):
                raise
            # Model not hosted on the Inference API: try a public Space of the
            # same name, then move on to the next candidate.
            try:
                image = _image_from_space(m, prompt, hf_token)
            except Exception:
                logger.warning("Space API attempt for %s failed", m)
            else:
                if image is not None:
                    return image
            continue

        content_type = resp.headers.get("content-type", "")
        if content_type.startswith("application/json"):
            b64 = _b64_from_hf_json(resp.json())
            if not b64:
                raise RuntimeError("No base64 image found in HF JSON response")
            return base64.b64decode(b64)

        # Otherwise assume binary image content.
        return resp.content

    # Loop exhausted: surface the last HTTP error if there was one.
    if last_exc:
        raise last_exc
    raise RuntimeError("Hugging Face image generation failed for all candidates")


def _replicate_candidates(model: str | None) -> list[str]:
    """Return Replicate models to try: primary first, then de-duplicated alternates.

    The primary is `model`, or `REPLICATE_MODEL` when `model` is None;
    alternates come from the comma-separated `REPLICATE_MODEL_ALTERNATES`.
    """
    primary = model if model is not None else os.getenv("REPLICATE_MODEL")
    candidates = [primary] if primary else []
    for part in os.getenv("REPLICATE_MODEL_ALTERNATES", "").split(","):
        part = part.strip()
        if part and part not in candidates:
            candidates.append(part)
    return candidates


def _read_file_like(obj: object) -> bytes | None:
    """Return `obj.read()` when `obj` is file-like and yields bytes, else None."""
    reader = getattr(obj, "read", None)
    if callable(reader):
        data = reader()
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
    return None


def generate_image_replicate(
    prompt: str,
    model: str | None = None,
    image_inputs: list | None = None,
    aspect_ratio: str = "match_input_image",
    output_format: str = "jpg",
) -> bytes:
    """Generate image bytes using the Replicate client as a final fallback.

    Requires `REPLICATE_API_TOKEN` in the environment and the `replicate`
    package. Models are tried in order: `model` (or `REPLICATE_MODEL`), then
    `REPLICATE_MODEL_ALTERNATES`.

    Returns:
        Raw image bytes.

    Raises:
        RuntimeError: When the token or model configuration is missing, or the
            output has an unrecognised shape.
        Exception: The last error from `replicate.run` when every candidate
            failed; import errors when `replicate` is not installed.
    """
    load_dotenv()
    token = os.getenv("REPLICATE_API_TOKEN")
    if not token:
        logger.error("REPLICATE_API_TOKEN missing in environment")
        raise RuntimeError("REPLICATE_API_TOKEN missing in environment")
    try:
        import replicate
    except Exception:
        logger.exception("Replicate client not installed")
        raise

    candidates = _replicate_candidates(model)
    if not candidates:
        logger.error("No Replicate model configured (set REPLICATE_MODEL or REPLICATE_MODEL_ALTERNATES)")
        raise RuntimeError("No Replicate model configured")

    input_payload = {
        "prompt": prompt,
        "image_input": image_inputs or [],
        "aspect_ratio": aspect_ratio,
        "output_format": output_format,
    }

    output = None
    last_exc = None
    for cand in candidates:
        logger.info("Calling Replicate candidate model %s", cand)
        try:
            output = replicate.run(cand, input=input_payload)
        except Exception as e:
            logger.warning("Replicate model %s failed: %s", cand, e)
            last_exc = e
            continue
        break  # first successful call wins

    if output is None:
        logger.error("All Replicate candidates failed")
        raise last_exc or RuntimeError("Replicate generation failed for all candidates")

    # Replicate often returns a URL or a list of URLs.
    logger.info("Replicate output type: %s", type(output))
    if isinstance(output, str) and output.startswith("http"):
        return _fetch_bytes(output, timeout=120)
    if isinstance(output, list) and output:
        first = output[0]
        if isinstance(first, str) and first.startswith("http"):
            return _fetch_bytes(first, timeout=120)
        if isinstance(first, bytes):
            return first
    if isinstance(output, bytes):
        return output
    if isinstance(output, dict):
        if "url" in output and isinstance(output["url"], str):
            return _fetch_bytes(output["url"], timeout=120)
        if "image" in output and isinstance(output["image"], str):
            try:
                return base64.b64decode(output["image"])
            except ValueError as e:
                # Not valid base64; fall through to the remaining checks.
                logger.warning("Replicate 'image' field is not valid base64: %s", e)

    # Newer Replicate clients hand back file-like objects (or a list of them)
    # instead of URLs; read the bytes directly when that is the case.
    item = output[0] if isinstance(output, list) and output else output
    data = _read_file_like(item)
    if data is not None:
        return data

    raise RuntimeError("Unknown Replicate output format")


def _bytes_from_prediction_output(output: object) -> bytes | None:
    """Resolve a finished prediction's `output` field to image bytes, or None."""
    # Output is commonly a list of URLs.
    if isinstance(output, list) and output:
        first = output[0]
        if isinstance(first, str) and first.startswith("http"):
            logger.info("Downloading output from %s", first)
            return _fetch_bytes(first, timeout=120)
        if isinstance(first, bytes):
            return first
    if isinstance(output, str) and output.startswith("http"):
        return _fetch_bytes(output, timeout=120)
    # Fallback: inspect nested structures.
    if isinstance(output, dict):
        for k in ("image", "url", "output"):
            v = output.get(k)
            if isinstance(v, str) and v.startswith("http"):
                return _fetch_bytes(v, timeout=120)
    return None


def generate_image_replicate_poll(
    prompt: str,
    model: str | None = None,
    image_inputs: list | None = None,
    aspect_ratio: str = "match_input_image",
    output_format: str = "jpg",
    check_interval: int = 8,
    timeout: int = 600,
) -> bytes:
    """Create a Replicate prediction over HTTP and poll until it finishes.

    Polls every `check_interval` seconds for at most `timeout` seconds per
    candidate model. Requires `REPLICATE_API_TOKEN` and `REPLICATE_MODEL`
    (or the `model` argument / `REPLICATE_MODEL_ALTERNATES`).

    Returns:
        Raw image bytes of the first candidate that succeeds.

    Raises:
        RuntimeError: When the token or model configuration is missing, a
            prediction fails, times out, or returns an unrecognised output.
        Exception: The last request error when every candidate failed.
    """
    load_dotenv()
    token = os.getenv("REPLICATE_API_TOKEN")
    if not token:
        logger.error("REPLICATE_API_TOKEN missing in environment")
        raise RuntimeError("REPLICATE_API_TOKEN missing in environment")

    candidates = _replicate_candidates(model)
    if not candidates:
        logger.error("No Replicate model configured (set REPLICATE_MODEL or REPLICATE_MODEL_ALTERNATES)")
        raise RuntimeError("No Replicate model configured (REPLICATE_MODEL missing)")

    url = "https://api.replicate.com/v1/predictions"
    headers = {
        "Authorization": f"Token {token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    model_input = {
        "prompt": prompt,
        "image_input": image_inputs or [],
        "aspect_ratio": aspect_ratio,
        "output_format": output_format,
    }

    last_exc = None
    for cand in candidates:
        logger.info("Creating Replicate prediction for model %s", cand)
        payload = {"version": cand, "input": model_input}
        try:
            resp = requests.post(url, headers=headers, json=payload, timeout=30)
            resp.raise_for_status()
            pred = resp.json()
        except Exception as e:
            logger.warning("Replicate create failed for %s: %s", cand, e)
            last_exc = e
            continue

        pred_id = pred.get("id")
        if not pred_id:
            logger.warning("Replicate create returned no id for %s", cand)
            last_exc = RuntimeError("Replicate did not return a prediction id")
            continue
        logger.info("Replicate prediction created: %s (model=%s)", pred_id, cand)

        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        deadline = time.monotonic() + timeout
        status = pred.get("status")
        timed_out = False
        try:
            while status in _REPLICATE_PENDING:
                if time.monotonic() > deadline:
                    timed_out = True
                    break
                logger.info("Prediction %s status=%s — sleeping %ss", pred_id, status, check_interval)
                time.sleep(check_interval)
                poll = requests.get(f"{url}/{pred_id}", headers=headers, timeout=30)
                poll.raise_for_status()
                pred = poll.json()
                status = pred.get("status")
        except (requests.RequestException, ValueError) as e:
            # A polling failure only disqualifies this candidate.
            logger.warning("Polling prediction %s failed: %s", pred_id, e)
            last_exc = e
            continue

        if timed_out:
            logger.warning("Prediction %s timed out after %ss (status=%s)", pred_id, timeout, status)
            last_exc = RuntimeError("Replicate prediction timed out")
            continue

        if status != "succeeded":
            detail = pred.get("error") or pred.get("output")
            logger.warning("Prediction %s failed with status=%s: %s", pred_id, status, detail)
            last_exc = RuntimeError(f"Replicate prediction failed: {detail}")
            continue

        logger.info("Prediction %s succeeded (model=%s)", pred_id, cand)
        image = _bytes_from_prediction_output(pred.get("output"))
        if image is not None:
            return image
        logger.warning("Prediction %s returned an unrecognised output shape", pred_id)
        last_exc = RuntimeError("Unknown Replicate prediction output format")

    # Every candidate failed: report why the last one did.
    if last_exc is not None:
        raise last_exc
    raise RuntimeError("Replicate prediction failed for all candidates")


def _generate_image_openai(prompt: str, size: str) -> bytes:
    """Generate image bytes with whichever OpenAI client is available."""
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        logger.error("OPENAI_API_KEY not set")
        raise RuntimeError("OPENAI_API_KEY not set")

    client = openai_client
    if client is None and openai_legacy is not None and hasattr(openai_legacy, "OpenAI"):
        # The v1 SDK is installed but the import-time client could not be
        # built (e.g. the key was set later); build one now.
        client = openai_legacy.OpenAI(api_key=api_key)

    if client is not None:
        resp = client.images.generate(model="gpt-image-1", prompt=prompt, size=size)
        return base64.b64decode(resp.data[0].b64_json)
    if openai_legacy is not None:
        openai_legacy.api_key = api_key
        # The legacy endpoint returns URLs unless base64 is requested explicitly.
        resp = openai_legacy.Image.create(prompt=prompt, size=size, n=1, response_format="b64_json")
        return base64.b64decode(resp["data"][0]["b64_json"])
    raise RuntimeError("No OpenAI client available")


def generate_image(prompt: str, size: str = "1024x1024", provider_order: str | None = None) -> str:
    """Generate an image from `prompt` and return the local path of the saved file.

    Providers are tried in the order given by `provider_order`, or the
    `PROVIDER_ORDER` environment variable (comma-separated) when it is None.
    Supported providers: `openai` (or `oa`), `huggingface` (or `hf`), and
    `replicate`. If a provider fails, the next one is tried.
    Default: `openai,replicate`.

    The image is written to `generated_images/image_<unix-seconds>.png` under
    the current working directory.

    Raises:
        SystemExit: With code 1 when every provider failed; the last provider
            error is attached as `__cause__`.
    """
    load_dotenv()
    logger.info("Generating image for prompt: %s", prompt)

    if provider_order is None:
        provider_order = os.getenv("PROVIDER_ORDER", "openai,replicate")
    providers = [p.strip().lower() for p in provider_order.split(",") if p.strip()]
    if not providers:
        providers = ["openai", "replicate"]

    img_bytes = None
    last_exc = None
    for provider in providers:
        try:
            logger.info("Trying provider: %s", provider)
            if provider in ("openai", "oa"):
                img_bytes = _generate_image_openai(prompt, size)
            elif provider in ("huggingface", "hf"):
                img_bytes = generate_image_hf(prompt, size=size)
            elif provider == "replicate":
                # Use the polling Replicate fallback (checks every 8s).
                img_bytes = generate_image_replicate_poll(prompt, check_interval=8)
            else:
                logger.warning("Unknown provider '%s' — skipping", provider)
                continue
        except Exception as e:
            logger.exception("Provider %s failed: %s", provider, e)
            last_exc = e
            continue
        if img_bytes:
            logger.info("Provider %s succeeded", provider)
            break

    if not img_bytes:
        logger.error("All providers failed")
        if last_exc:
            raise SystemExit(1) from last_exc
        raise SystemExit(1)

    out_dir = os.path.join(os.getcwd(), "generated_images")
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, f"image_{int(time.time())}.png")
    with open(path, "wb") as f:
        f.write(img_bytes)
    logger.info("Saved generated image to %s", path)
    return path


def post_image_to_facebook(
    page_id: str,
    access_token: str,
    image_path: str,
    caption: str | None = None,
    timeout: int = 120,
) -> dict:
    """Upload `image_path` to the Facebook page's `/photos` edge.

    The outcome (success or HTTP error) is appended to `log.txt`; failure to
    write that file is logged and otherwise ignored.

    Args:
        page_id: Facebook page id.
        access_token: Page access token.
        image_path: Path of the image file to upload.
        caption: Optional caption for the photo.
        timeout: Seconds to wait for the upload before giving up, so a stalled
            connection cannot hang the caller indefinitely.

    Returns:
        The parsed JSON response from the Graph API.

    Raises:
        requests.HTTPError: When Facebook answers with an error status.
    """
    url = f"https://graph.facebook.com/{page_id}/photos"
    data = {"access_token": access_token}
    if caption:
        data["caption"] = caption
    logger.info("Uploading image %s to Facebook page %s", image_path, page_id)
    with open(image_path, "rb") as imgf:
        resp = requests.post(url, files={"source": imgf}, data=data, timeout=timeout)

    try:
        resp.raise_for_status()
    except requests.HTTPError:
        logger.error("Facebook upload error: %s", resp.text)
        try:
            _append_to_log(f"FB_IMAGE_POST_ERROR page={page_id} image={image_path} response={resp.text}")
        except Exception:
            logger.exception("Failed to write to log.txt")
        raise

    result = resp.json()
    logger.info("Upload successful: %s", result)
    try:
        _append_to_log(
            f"FB_IMAGE_POST_SUCCESS page={page_id} image={image_path} "
            f"id={result.get('id')} post_id={result.get('post_id')}"
        )
    except Exception:
        logger.exception("Failed to append image post info to log.txt")
    return result


def generate_and_post(
    prompt: str,
    caption: str | None = None,
    post: bool = False,
    use_wisdom_as_prompt: bool = False,
    caption_template: str | None = None,
    use_wisdom_as_caption: bool = False,
    provider_order: str | None = None,
) -> dict:
    """Generate an image for `prompt` and optionally post it to Facebook.

    Args:
        prompt: Base prompt for image generation (and for wisdom generation).
        caption: Explicit caption; takes precedence over everything else.
        post: When True, upload the image using `FB_PAGE_ID` and
            `FB_PAGE_ACCESS_TOKEN` from the environment.
        use_wisdom_as_prompt: Generate a wisdom text and use it as the image
            prompt (or append it to `prompt` when one is given).
        caption_template: `str.format` template supporting `{prompt}` and
            `{wisdom}`; used when no explicit caption is given.
        use_wisdom_as_caption: Use the wisdom text as caption when neither a
            caption nor a template produced one.
        provider_order: Passed through to `generate_image`.

    Returns:
        Dict with `image_path`, plus `wisdom` when one was generated and
        `facebook` (API response or an `enqueued` status) when `post` is True.

    Raises:
        SystemExit: When posting is requested but Facebook credentials are
            missing, or image generation failed for every provider.
    """
    image_prompt = prompt
    wisdom_text = None

    # Wisdom is needed either to steer the image or to caption the post; the
    # caption case must not depend on `use_wisdom_as_prompt` being set.
    wants_wisdom = use_wisdom_as_prompt or (post and use_wisdom_as_caption and not caption)
    if wants_wisdom:
        try:
            wisdom_text = generate_wisdom(prompt)
        except Exception as e:
            # Proceed using the original prompt.
            logger.exception("Failed to generate wisdom for image prompt: %s", e)

    if use_wisdom_as_prompt and wisdom_text:
        if not image_prompt:
            # No explicit prompt: the wisdom is the image prompt.
            image_prompt = wisdom_text
        else:
            # Combine both: image prompt + the wisdom quote to guide imagery.
            image_prompt = f"{image_prompt}. Quote: {wisdom_text}"

    img_path = generate_image(image_prompt, provider_order=provider_order)
    result = {"image_path": img_path}
    if wisdom_text:
        result["wisdom"] = wisdom_text

    if not post:
        return result

    load_dotenv()
    page_id = os.getenv("FB_PAGE_ID")
    token = os.getenv("FB_PAGE_ACCESS_TOKEN")
    if not page_id or not token:
        logger.error("Missing FB_PAGE_ID or FB_PAGE_ACCESS_TOKEN in environment")
        raise SystemExit(1)

    # Final caption: explicit caption wins; then caption_template; then wisdom.
    final_caption = caption
    if not final_caption and caption_template:
        try:
            final_caption = caption_template.format(prompt=prompt or "", wisdom=wisdom_text or "")
        except Exception:
            logger.exception("Failed to format caption_template")
            final_caption = caption_template
    if not final_caption and use_wisdom_as_caption and wisdom_text:
        final_caption = wisdom_text

    try:
        result["facebook"] = post_image_to_facebook(page_id, token, img_path, final_caption)
    except Exception as e:
        # Enqueue a retry entry and report 'enqueued' instead of failing.
        try:
            enqueue_retry({
                "type": "image",
                "page_id": page_id,
                "access_token": token,
                "image_path": img_path,
                "caption": final_caption,
            })
        except Exception:
            logger.exception("Failed to enqueue failed image post")
            raise
        logger.warning("Image post failed; enqueued for retry: %s", e)
        result["facebook"] = {"status": "enqueued", "reason": str(e)}

    return result


def main() -> None:
    """Command-line entry point: parse arguments, generate, optionally post."""
    import argparse

    parser = argparse.ArgumentParser(description="Generate an image via OpenAI and optionally post to Facebook")
    parser.add_argument("-p", "--prompt", required=True, help="Prompt for image generation")
    parser.add_argument("--caption", help="Caption to use when posting to Facebook")
    parser.add_argument("--caption-template", help="Caption template, supports {prompt} and {wisdom}")
    parser.add_argument("--use-wisdom-as-caption", action="store_true", help="Use generated wisdom as caption if available")
    parser.add_argument("--use-wisdom-as-prompt", action="store_true", help="Append generated wisdom to the image prompt")
    parser.add_argument("--provider-order", help="Comma-separated provider order (overrides PROVIDER_ORDER)")
    parser.add_argument("--post", action="store_true", help="Post image to Facebook after generation")
    args = parser.parse_args()

    res = generate_and_post(
        args.prompt,
        caption=args.caption,
        post=args.post,
        use_wisdom_as_prompt=args.use_wisdom_as_prompt,
        caption_template=args.caption_template,
        use_wisdom_as_caption=args.use_wisdom_as_caption,
        provider_order=args.provider_order,
    )
    print(res)


if __name__ == "__main__":
    main()