# monk/generate_image.py (branch: hf-actions, commit 5f2de2a)
# feat: persistent retry queue for failed FB posts; enqueue on failures; start worker
import base64
import logging
import os
import time

import requests
from dotenv import load_dotenv

from generate_wisdom import generate_wisdom
from retry_queue import enqueue as enqueue_retry

# Prefer the modern OpenAI client; fall back to the legacy module; tolerate neither
# being installed (OpenAI is only one of several image providers used below).
try:
    from openai import OpenAI

    openai_client = OpenAI()
    openai_legacy = None
except Exception:
    try:
        import openai as openai_legacy

        openai_client = None
    except Exception:
        openai_client = None
        openai_legacy = None

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
def generate_image_hf(prompt: str, model: str = "stabilityai/stable-diffusion-2", size: str = "1024x1024", steps: int = 20, guidance: float = 7.5) -> bytes:
    """Generate image bytes using the Hugging Face Inference API as a fallback.

    Tries the requested `model` first; if it is not hosted on the Inference API
    (HTTP 404/410) the function tries that model's public Space API and then a
    small list of common Stable Diffusion models.

    Args:
        prompt: Text prompt for the image.
        model: Preferred HF model id (``owner/name``).
        size: Requested size as ``"WIDTHxHEIGHT"``; malformed values fall back to 1024x1024.
        steps: Number of inference steps.
        guidance: Guidance scale.

    Returns:
        Raw image bytes as returned by the API.

    Raises:
        RuntimeError: If the ``HF`` token is missing or no candidate produced an image.
        requests.HTTPError: For HTTP failures other than 404/410 from the Inference API.
    """
    import re  # hoisted: used for base64 extraction in two branches below

    load_dotenv()
    hf_token = os.getenv("HF")
    if not hf_token:
        logger.error("HF token missing in environment (HF)")
        raise RuntimeError("HF token missing in environment (HF)")
    # Parse "WxH"; any malformed value falls back to a square 1024 image.
    try:
        width, height = (int(x) for x in size.split("x"))
    except (ValueError, TypeError):
        width, height = 1024, 1024
    candidate_models = [model, "runwayml/stable-diffusion-v1-5", "prompthero/openjourney"]
    last_exc = None
    for m in candidate_models:
        url = f"https://api-inference.huggingface.co/models/{m}"
        headers = {"Authorization": f"Bearer {hf_token}", "Accept": "application/json"}
        payload = {
            "inputs": prompt,
            "parameters": {
                "width": width,
                "height": height,
                "num_inference_steps": steps,
                "guidance_scale": guidance,
            },
            "options": {"wait_for_model": True},
        }
        logger.info("Trying HF model %s (size=%sx%s, steps=%s)", m, width, height, steps)
        try:
            resp = requests.post(url, headers=headers, json=payload, timeout=120)
            resp.raise_for_status()
        except requests.HTTPError as e:
            status = getattr(e.response, "status_code", None)
            logger.warning("HF model %s returned HTTP %s", m, status)
            last_exc = e
            # 404/410 means the model is not hosted on the Inference API:
            # try the model's public Space endpoint, then move on to the next
            # candidate regardless of whether the Space yielded anything usable.
            if status in (404, 410):
                try:
                    owner, repo = m.split("/", 1)
                    space_url = f"https://huggingface.co/spaces/{owner}/{repo}/api/predict"
                    logger.info("Trying Space API %s", space_url)
                    sp_headers = {"Authorization": f"Bearer {hf_token}"} if hf_token else {}
                    sp_payload = {"data": [prompt]}
                    sp_resp = requests.post(space_url, headers=sp_headers, json=sp_payload, timeout=180)
                    sp_resp.raise_for_status()
                    js = sp_resp.json()
                    # Many Spaces return {'data': [...]} where the first item is base64 or a URL.
                    if isinstance(js, dict) and "data" in js:
                        first = js["data"][0]
                        if isinstance(first, str):
                            mobj = re.search(r"([A-Za-z0-9+/=]{200,})", first)
                            if mobj:
                                return base64.b64decode(mobj.group(1))
                            # direct URL: fetch the bytes
                            if first.startswith("http"):
                                r2 = requests.get(first, timeout=60)
                                r2.raise_for_status()
                                return r2.content
                    # Space responded but returned no usable image; fall through to next model.
                except Exception:
                    logger.warning("Space API attempt for %s failed", m)
                continue
            # Any other HTTP error is not recoverable by switching models.
            raise
        content_type = resp.headers.get("content-type", "")
        if content_type.startswith("application/json"):
            js = resp.json()
            # Try common response shapes for a base64-encoded image.
            b64 = None
            if isinstance(js, dict):
                for key in ("image", "generated_image", "data", "images"):
                    if key in js:
                        val = js[key]
                        if isinstance(val, str):
                            b64 = val
                            break
                        if isinstance(val, list) and val:
                            first = val[0]
                            if isinstance(first, dict) and "image_base64" in first:
                                b64 = first["image_base64"]
                                break
                            if isinstance(first, str):
                                b64 = first
                                break
            if not b64:
                # Last resort: scan the stringified JSON for a long base64 run.
                mobj = re.search(r"([A-Za-z0-9+/=]{200,})", str(js))
                if mobj:
                    b64 = mobj.group(1)
            if not b64:
                raise RuntimeError("No base64 image found in HF JSON response")
            return base64.b64decode(b64)
        # Otherwise assume the response body is the raw image.
        return resp.content
    # All candidates exhausted without producing an image.
    if last_exc:
        raise last_exc
    raise RuntimeError("Hugging Face image generation failed for all candidates")
def generate_image_replicate(prompt: str, model: str | None = None, image_inputs: list | None = None, aspect_ratio: str = "match_input_image", output_format: str = "jpg") -> bytes:
    """Generate image bytes using Replicate (python client) as a final fallback.

    Candidate models are the `model` argument (or `REPLICATE_MODEL`), followed by
    any comma-separated entries in `REPLICATE_MODEL_ALTERNATES`.

    Args:
        prompt: Text prompt for the image.
        model: Explicit Replicate model id; overrides `REPLICATE_MODEL` when given.
        image_inputs: Optional list of input image references passed through as-is.
        aspect_ratio: Passed through to the model input.
        output_format: Passed through to the model input.

    Returns:
        Raw image bytes.

    Raises:
        RuntimeError: Missing token/model config, or unrecognized output shape.
    """
    load_dotenv()
    token = os.getenv("REPLICATE_API_TOKEN")
    if not token:
        logger.error("REPLICATE_API_TOKEN missing in environment")
        raise RuntimeError("REPLICATE_API_TOKEN missing in environment")
    try:
        import replicate
    except Exception:
        logger.exception("Replicate client not installed")
        raise
    # Build list of replicate model candidates: primary then alternates.
    primary = os.getenv("REPLICATE_MODEL") if model is None else model
    candidates = []
    if primary:
        candidates.append(primary)
    for part in os.getenv("REPLICATE_MODEL_ALTERNATES", "").split(","):
        part = part.strip()
        if part and part not in candidates:
            candidates.append(part)
    if not candidates:
        logger.error("No Replicate model configured (set REPLICATE_MODEL or REPLICATE_MODEL_ALTERNATES)")
        raise RuntimeError("No Replicate model configured")
    input_payload = {
        "prompt": prompt,
        "image_input": image_inputs or [],
        "aspect_ratio": aspect_ratio,
        "output_format": output_format,
    }
    last_exc = None
    output = None  # defensive: defined even if every candidate raises
    for cand in candidates:
        logger.info("Calling Replicate candidate model %s", cand)
        try:
            output = replicate.run(cand, input=input_payload)
            break  # success: handle output below
        except Exception as e:
            logger.warning("Replicate model %s failed: %s", cand, e)
            last_exc = e
            output = None
    if output is None:
        logger.error("All Replicate candidates failed")
        raise last_exc or RuntimeError("Replicate generation failed for all candidates")
    # Replicate often returns a URL or a list of URLs.
    logger.info("Replicate output type: %s", type(output))

    def _download(link: str) -> bytes:
        # One place for the fetch-and-check pattern used by several shapes below.
        r = requests.get(link, timeout=120)
        r.raise_for_status()
        return r.content

    # Handle common output shapes.
    if isinstance(output, str) and output.startswith("http"):
        return _download(output)
    if isinstance(output, list) and output:
        first = output[0]
        if isinstance(first, str) and first.startswith("http"):
            return _download(first)
        if isinstance(first, bytes):
            return first
    if isinstance(output, bytes):
        return output
    if isinstance(output, dict):
        # Try common fields.
        if "url" in output and isinstance(output["url"], str):
            return _download(output["url"])
        if "image" in output and isinstance(output["image"], str):
            try:
                return base64.b64decode(output["image"])
            except Exception:
                pass
    raise RuntimeError("Unknown Replicate output format")
def generate_image_replicate_poll(prompt: str, model: str | None = None, image_inputs: list | None = None, aspect_ratio: str = "match_input_image", output_format: str = "jpg", check_interval: int = 8, timeout: int = 600) -> bytes:
    """Create a Replicate prediction via the HTTP API and poll until it finishes.

    Polls every `check_interval` seconds up to `timeout` seconds per candidate.
    Candidates are `model` (or `REPLICATE_MODEL`) plus `REPLICATE_MODEL_ALTERNATES`.
    Requires `REPLICATE_API_TOKEN` in the environment.

    Returns:
        Raw image bytes downloaded from the prediction output.

    Raises:
        RuntimeError: Missing config, all candidates failed, or unknown output shape.
    """
    load_dotenv()
    token = os.getenv("REPLICATE_API_TOKEN")
    if not token:
        logger.error("REPLICATE_API_TOKEN missing in environment")
        raise RuntimeError("REPLICATE_API_TOKEN missing in environment")
    # Build candidate list (primary then alternates).
    primary = os.getenv("REPLICATE_MODEL") if model is None else model
    candidates = []
    if primary:
        candidates.append(primary)
    for part in os.getenv("REPLICATE_MODEL_ALTERNATES", "").split(","):
        part = part.strip()
        if part and part not in candidates:
            candidates.append(part)
    if not candidates:
        logger.error("No Replicate model configured (set REPLICATE_MODEL or REPLICATE_MODEL_ALTERNATES)")
        raise RuntimeError("No Replicate model configured (REPLICATE_MODEL missing)")
    url = "https://api.replicate.com/v1/predictions"
    headers = {
        "Authorization": f"Token {token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    last_exc = None
    for cand in candidates:
        logger.info("Creating Replicate prediction for model %s", cand)
        # Build the payload per candidate (avoids seeding "version" with a
        # possibly-None `model` and patching it later).
        payload = {
            "version": cand,
            "input": {
                "prompt": prompt,
                "image_input": image_inputs or [],
                "aspect_ratio": aspect_ratio,
                "output_format": output_format,
            },
        }
        try:
            resp = requests.post(url, headers=headers, json=payload, timeout=30)
            resp.raise_for_status()
        except Exception as e:
            logger.warning("Replicate create failed for %s: %s", cand, e)
            last_exc = e
            continue
        pred = resp.json()
        pred_id = pred.get("id")
        if not pred_id:
            logger.warning("Replicate create returned no id for %s", cand)
            last_exc = RuntimeError("Replicate did not return a prediction id")
            continue
        logger.info("Replicate prediction created: %s (model=%s)", pred_id, cand)
        started = time.time()
        status = pred.get("status")
        cand_exc = None  # per-candidate failure detail (timeout / poll error)
        while status in ("starting", "processing", "queued"):
            if time.time() - started > timeout:
                cand_exc = RuntimeError("Replicate prediction timed out")
                break
            logger.info("Prediction %s status=%s — sleeping %ss", pred_id, status, check_interval)
            time.sleep(check_interval)
            try:
                r2 = requests.get(f"{url}/{pred_id}", headers=headers, timeout=30)
                r2.raise_for_status()
            except Exception as poll_err:
                # A transient polling failure should not abort the remaining
                # candidates; record it and fall through to the next model.
                logger.warning("Polling prediction %s failed: %s", pred_id, poll_err)
                cand_exc = poll_err
                break
            pred = r2.json()
            status = pred.get("status")
        if status != "succeeded":
            detail = pred.get("error") or pred.get("output")
            logger.warning("Prediction %s failed with status=%s: %s", pred_id, status, detail)
            # Keep the specific timeout/poll error when we have one, so it is
            # not masked by the generic failure message.
            last_exc = cand_exc or RuntimeError(f"Replicate prediction failed: {detail}")
            continue
        logger.info("Prediction %s succeeded (model=%s)", pred_id, cand)
        output = pred.get("output")
        # Output is commonly a list of URLs.
        if isinstance(output, list) and output:
            first = output[0]
            if isinstance(first, str) and first.startswith("http"):
                logger.info("Downloading output from %s", first)
                r3 = requests.get(first, timeout=120)
                r3.raise_for_status()
                return r3.content
            if isinstance(first, bytes):
                return first
        if isinstance(output, str) and output.startswith("http"):
            r3 = requests.get(output, timeout=120)
            r3.raise_for_status()
            return r3.content
        # Fallback: try to inspect nested structures.
        if isinstance(output, dict):
            for k in ("image", "url", "output"):
                v = output.get(k)
                if isinstance(v, str) and v.startswith("http"):
                    r3 = requests.get(v, timeout=120)
                    r3.raise_for_status()
                    return r3.content
        raise RuntimeError("Unknown Replicate prediction output format")
def generate_image(prompt: str, size: str = "1024x1024", provider_order: str | None = None) -> str:
    """Generate an image from `prompt` and return the local path of the saved PNG.

    Providers are tried in the order given by `provider_order` (or the
    `PROVIDER_ORDER` environment variable, comma-separated). Supported names:
    `openai` (alias `oa`), `huggingface` (alias `hf`), and `replicate`. When a
    provider fails, the next one is tried. Default order: `openai,replicate`.

    Raises:
        SystemExit: When every configured provider fails.
    """
    load_dotenv()
    logger.info("Generating image for prompt: %s", prompt)

    def generate_image_openai(local_prompt: str, local_size: str) -> bytes:
        # Works with whichever OpenAI client flavor was importable at module load.
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            logger.error("OPENAI_API_KEY not set")
            raise RuntimeError("OPENAI_API_KEY not set")
        if openai_client is not None:
            resp = openai_client.images.generate(model="gpt-image-1", prompt=local_prompt, size=local_size)
            return base64.b64decode(resp.data[0].b64_json)
        if openai_legacy is not None:
            openai_legacy.api_key = api_key
            resp = openai_legacy.Image.create(prompt=local_prompt, size=local_size, n=1)
            return base64.b64decode(resp["data"][0]["b64_json"])
        raise RuntimeError("No OpenAI client available")

    # Resolve provider order: explicit argument wins, then env, then default.
    order = provider_order if provider_order is not None else os.getenv("PROVIDER_ORDER", "openai,replicate")
    providers = [name.strip().lower() for name in order.split(",") if name.strip()]
    if not providers:
        providers = ["openai", "replicate"]

    img_bytes = None
    last_exc = None
    for provider in providers:
        try:
            logger.info("Trying provider: %s", provider)
            if provider in ("openai", "oa"):
                img_bytes = generate_image_openai(prompt, size)
            elif provider in ("huggingface", "hf"):
                img_bytes = generate_image_hf(prompt, size=size)
            elif provider == "replicate":
                # Polling fallback: checks prediction status every 8 seconds.
                img_bytes = generate_image_replicate_poll(prompt, check_interval=8)
            else:
                logger.warning("Unknown provider '%s' — skipping", provider)
                continue
            if img_bytes:
                logger.info("Provider %s succeeded", provider)
                break
        except Exception as e:
            logger.exception("Provider %s failed: %s", provider, e)
            last_exc = e

    if not img_bytes:
        logger.error("All providers failed")
        if last_exc:
            raise SystemExit(1) from last_exc
        raise SystemExit(1)

    out_dir = os.path.join(os.getcwd(), "generated_images")
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, f"image_{int(time.time())}.png")
    with open(path, "wb") as f:
        f.write(img_bytes)
    logger.info("Saved generated image to %s", path)
    return path
def post_image_to_facebook(page_id: str, access_token: str, image_path: str, caption: str | None = None) -> dict:
    """Upload a local image to a Facebook Page via the Graph API `/photos` edge.

    Outcomes (success or failure) are also appended to a local `log.txt` on a
    best-effort basis.

    Args:
        page_id: Facebook Page id.
        access_token: Page access token.
        image_path: Path to the local image file to upload.
        caption: Optional caption attached to the photo.

    Returns:
        Parsed JSON response from the Graph API (contains `id` / `post_id`).

    Raises:
        requests.HTTPError: If the upload fails (response body is logged first).
    """
    url = f"https://graph.facebook.com/{page_id}/photos"
    data = {"access_token": access_token}
    if caption:
        data["caption"] = caption
    logger.info("Uploading image %s to Facebook page %s", image_path, page_id)
    with open(image_path, "rb") as imgf:
        files = {"source": imgf}
        # Explicit timeout so a stalled upload cannot hang the process forever.
        resp = requests.post(url, files=files, data=data, timeout=120)
    try:
        resp.raise_for_status()
    except requests.HTTPError:
        logger.error("Facebook upload error: %s", resp.text)
        # Best-effort append to the local audit log; never mask the HTTP error.
        try:
            with open("log.txt", "a", encoding="utf-8") as lf:
                lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_IMAGE_POST_ERROR page={page_id} image={image_path} response={resp.text}\n")
        except Exception:
            logger.exception("Failed to write to log.txt")
        raise
    result = resp.json()  # parse once; reused for logging and the return value
    logger.info("Upload successful: %s", result)
    try:
        with open("log.txt", "a", encoding="utf-8") as lf:
            lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_IMAGE_POST_SUCCESS page={page_id} image={image_path} id={result.get('id')} post_id={result.get('post_id')}\n")
    except Exception:
        logger.exception("Failed to append image post info to log.txt")
    return result
def generate_and_post(prompt: str, caption: str | None = None, post: bool = False, use_wisdom_as_prompt: bool = False, caption_template: str | None = None, use_wisdom_as_caption: bool = False, provider_order: str | None = None) -> dict:
    """Generate an image (optionally guided by a generated wisdom quote) and
    optionally post it to a Facebook Page.

    Caption precedence when posting: explicit `caption`, then `caption_template`
    (supports `{prompt}` and `{wisdom}`), then the wisdom text when
    `use_wisdom_as_caption` is set. If posting fails, the post is enqueued on the
    persistent retry queue instead of raising.

    Returns:
        Dict with `image_path`, plus `wisdom` and/or `facebook` when applicable.
    """
    image_prompt = prompt
    wisdom_text = None
    if use_wisdom_as_prompt:
        try:
            wisdom_text = generate_wisdom(prompt)
            # Without an explicit prompt the wisdom drives the image; otherwise
            # append the quote so the imagery reflects it.
            image_prompt = wisdom_text if not image_prompt else f"{image_prompt}. Quote: {wisdom_text}"
        except Exception as e:
            logger.exception("Failed to generate wisdom for image prompt: %s", e)
            # Proceed with the original prompt unchanged.

    img_path = generate_image(image_prompt, provider_order=provider_order)
    result = {"image_path": img_path}
    if wisdom_text:
        result["wisdom"] = wisdom_text
    if not post:
        return result

    load_dotenv()
    page_id = os.getenv("FB_PAGE_ID")
    token = os.getenv("FB_PAGE_ACCESS_TOKEN")
    if not page_id or not token:
        logger.error("Missing FB_PAGE_ID or FB_PAGE_ACCESS_TOKEN in environment")
        raise SystemExit(1)

    # Build the final caption following the documented precedence.
    final_caption = caption
    if not final_caption and caption_template:
        try:
            final_caption = caption_template.format(prompt=prompt or "", wisdom=wisdom_text or "")
        except Exception:
            logger.exception("Failed to format caption_template")
            final_caption = caption_template
    if not final_caption and use_wisdom_as_caption and wisdom_text:
        final_caption = wisdom_text

    try:
        result["facebook"] = post_image_to_facebook(page_id, token, img_path, final_caption)
    except Exception as e:
        # Enqueue a retry entry and report 'enqueued' instead of failing hard.
        try:
            enqueue_retry({
                "type": "image",
                "page_id": page_id,
                "access_token": token,
                "image_path": img_path,
                "caption": final_caption,
            })
            logger.warning("Image post failed; enqueued for retry: %s", e)
            result["facebook"] = {"status": "enqueued", "reason": str(e)}
        except Exception:
            logger.exception("Failed to enqueue failed image post")
            raise
    return result
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Generate an image via OpenAI and optionally post to Facebook")
    parser.add_argument("-p", "--prompt", required=True, help="Prompt for image generation")
    parser.add_argument("--caption", help="Caption to use when posting to Facebook")
    parser.add_argument("--caption-template", help="Caption template, supports {prompt} and {wisdom}")
    parser.add_argument("--use-wisdom-as-caption", action="store_true", help="Use generated wisdom as caption if available")
    # Expose options generate_and_post already supports but the CLI previously did not.
    parser.add_argument("--use-wisdom-as-prompt", action="store_true", help="Generate a wisdom quote and fold it into the image prompt")
    parser.add_argument("--provider-order", help="Comma-separated provider order (e.g. 'openai,hf,replicate'); overrides PROVIDER_ORDER")
    parser.add_argument("--post", action="store_true", help="Post image to Facebook after generation")
    args = parser.parse_args()
    res = generate_and_post(
        args.prompt,
        caption=args.caption,
        post=args.post,
        use_wisdom_as_prompt=args.use_wisdom_as_prompt,
        caption_template=args.caption_template,
        use_wisdom_as_caption=args.use_wisdom_as_caption,
        provider_order=args.provider_order,
    )
    print(res)