Spaces:
Sleeping
Sleeping
hf-actions
feat: persistent retry queue for failed FB posts; enqueue on failures; start worker
5f2de2a
| import os | |
| import time | |
| import base64 | |
| import logging | |
| from dotenv import load_dotenv | |
| from generate_wisdom import generate_wisdom | |
| try: | |
| from openai import OpenAI | |
| openai_client = OpenAI() | |
| openai_legacy = None | |
| except Exception: | |
| try: | |
| import openai as openai_legacy | |
| openai_client = None | |
| except Exception: | |
| openai_client = None | |
| openai_legacy = None | |
| import requests | |
| from retry_queue import enqueue as enqueue_retry | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") | |
| logger = logging.getLogger(__name__) | |
| def generate_image_hf(prompt: str, model: str = "stabilityai/stable-diffusion-2", size: str = "1024x1024", steps: int = 20, guidance: float = 7.5) -> bytes: | |
| """Generate image bytes using Hugging Face Inference API as a fallback. | |
| Tries the requested `model` first; if it's not available via the Inference API | |
| (HTTP 410/404) the function will try a small list of common SD models. | |
| """ | |
| load_dotenv() | |
| hf_token = os.getenv("HF") | |
| if not hf_token: | |
| logger.error("HF token missing in environment (HF)") | |
| raise RuntimeError("HF token missing in environment (HF)") | |
| try: | |
| width, height = (int(x) for x in size.split("x")) | |
| except Exception: | |
| width, height = 1024, 1024 | |
| candidate_models = [model, "runwayml/stable-diffusion-v1-5", "prompthero/openjourney"] | |
| last_exc = None | |
| for m in candidate_models: | |
| url = f"https://api-inference.huggingface.co/models/{m}" | |
| headers = {"Authorization": f"Bearer {hf_token}", "Accept": "application/json"} | |
| payload = { | |
| "inputs": prompt, | |
| "parameters": { | |
| "width": width, | |
| "height": height, | |
| "num_inference_steps": steps, | |
| "guidance_scale": guidance, | |
| }, | |
| "options": {"wait_for_model": True}, | |
| } | |
| logger.info("Trying HF model %s (size=%sx%s, steps=%s)", m, width, height, steps) | |
| try: | |
| resp = requests.post(url, headers=headers, json=payload, timeout=120) | |
| resp.raise_for_status() | |
| except requests.HTTPError as e: | |
| status = getattr(e.response, "status_code", None) | |
| logger.warning("HF model %s returned HTTP %s", m, status) | |
| last_exc = e | |
| # try next candidate if model not hosted on inference API (410/Gone or 404) | |
| if status in (404, 410): | |
| # attempt Space /api/predict for public Spaces (owner/model -> /spaces/owner/model) | |
| try: | |
| owner, repo = m.split("/", 1) | |
| space_url = f"https://huggingface.co/spaces/{owner}/{repo}/api/predict" | |
| logger.info("Trying Space API %s", space_url) | |
| sp_headers = {"Authorization": f"Bearer {hf_token}"} if hf_token else {} | |
| sp_payload = {"data": [prompt]} | |
| sp_resp = requests.post(space_url, headers=sp_headers, json=sp_payload, timeout=180) | |
| sp_resp.raise_for_status() | |
| js = sp_resp.json() | |
| # many Spaces return {'data': [...]} where the first item is base64 or url | |
| if isinstance(js, dict) and "data" in js: | |
| first = js["data"][0] | |
| if isinstance(first, str): | |
| # try base64 decode | |
| import re | |
| mobj = re.search(r"([A-Za-z0-9+/=]{200,})", first) | |
| if mobj: | |
| return base64.b64decode(mobj.group(1)) | |
| # if it's a direct URL, try fetching bytes | |
| if first.startswith("http"): | |
| r2 = requests.get(first, timeout=60) | |
| r2.raise_for_status() | |
| return r2.content | |
| # if we reach here, the Space didn't return usable image; continue to next model | |
| except Exception: | |
| logger.warning("Space API attempt for %s failed", m) | |
| continue | |
| raise | |
| content_type = resp.headers.get("content-type", "") | |
| if content_type.startswith("application/json"): | |
| js = resp.json() | |
| # Try common response shapes for base64-encoded image | |
| b64 = None | |
| if isinstance(js, dict): | |
| for key in ("image", "generated_image", "data", "images"): | |
| if key in js: | |
| val = js[key] | |
| if isinstance(val, str): | |
| b64 = val | |
| break | |
| if isinstance(val, list) and val: | |
| first = val[0] | |
| if isinstance(first, dict) and "image_base64" in first: | |
| b64 = first["image_base64"] | |
| break | |
| if isinstance(first, str): | |
| b64 = first | |
| break | |
| if not b64: | |
| import re | |
| txt = str(js) | |
| mobj = re.search(r"([A-Za-z0-9+/=]{200,})", txt) | |
| if mobj: | |
| b64 = mobj.group(1) | |
| if not b64: | |
| raise RuntimeError("No base64 image found in HF JSON response") | |
| return base64.b64decode(b64) | |
| # otherwise assume binary image content | |
| return resp.content | |
| # If loop exhausted, raise last exception | |
| if last_exc: | |
| raise last_exc | |
| raise RuntimeError("Hugging Face image generation failed for all candidates") | |
| def generate_image_replicate(prompt: str, model: str | None = None, image_inputs: list | None = None, aspect_ratio: str = "match_input_image", output_format: str = "jpg") -> bytes: | |
| """Generate image bytes using Replicate as a final fallback. | |
| Requires `REPLICATE_API_TOKEN` in the environment. | |
| """ | |
| load_dotenv() | |
| token = os.getenv("REPLICATE_API_TOKEN") | |
| if not token: | |
| logger.error("REPLICATE_API_TOKEN missing in environment") | |
| raise RuntimeError("REPLICATE_API_TOKEN missing in environment") | |
| try: | |
| import replicate | |
| except Exception: | |
| logger.exception("Replicate client not installed") | |
| raise | |
| # Build list of replicate model candidates: primary then alternates | |
| if model is None: | |
| primary = os.getenv("REPLICATE_MODEL") | |
| else: | |
| primary = model | |
| alternates = os.getenv("REPLICATE_MODEL_ALTERNATES", "") | |
| candidates = [] | |
| if primary: | |
| candidates.append(primary) | |
| if alternates: | |
| for part in alternates.split(","): | |
| part = part.strip() | |
| if part and part not in candidates: | |
| candidates.append(part) | |
| if not candidates: | |
| logger.error("No Replicate model configured (set REPLICATE_MODEL or REPLICATE_MODEL_ALTERNATES)") | |
| raise RuntimeError("No Replicate model configured") | |
| input_payload = { | |
| "prompt": prompt, | |
| "image_input": image_inputs or [], | |
| "aspect_ratio": aspect_ratio, | |
| "output_format": output_format, | |
| } | |
| last_exc = None | |
| for cand in candidates: | |
| logger.info("Calling Replicate candidate model %s", cand) | |
| try: | |
| output = replicate.run(cand, input=input_payload) | |
| # if succeed, proceed to handle output | |
| break | |
| except Exception as e: | |
| logger.warning("Replicate model %s failed: %s", cand, e) | |
| last_exc = e | |
| output = None | |
| continue | |
| if output is None: | |
| logger.error("All Replicate candidates failed") | |
| raise last_exc or RuntimeError("Replicate generation failed for all candidates") | |
| # Replicate often returns a URL or a list of URLs | |
| logger.info("Replicate output type: %s", type(output)) | |
| # handle common output shapes | |
| if isinstance(output, str) and output.startswith("http"): | |
| r = requests.get(output, timeout=120) | |
| r.raise_for_status() | |
| return r.content | |
| if isinstance(output, list) and output: | |
| first = output[0] | |
| if isinstance(first, str) and first.startswith("http"): | |
| r = requests.get(first, timeout=120) | |
| r.raise_for_status() | |
| return r.content | |
| if isinstance(first, bytes): | |
| return first | |
| if isinstance(output, bytes): | |
| return output | |
| if isinstance(output, dict): | |
| # try common fields | |
| if "url" in output and isinstance(output["url"], str): | |
| r = requests.get(output["url"], timeout=120) | |
| r.raise_for_status() | |
| return r.content | |
| if "image" in output and isinstance(output["image"], str): | |
| try: | |
| return base64.b64decode(output["image"]) | |
| except Exception: | |
| pass | |
| raise RuntimeError("Unknown Replicate output format") | |
| def generate_image_replicate_poll(prompt: str, model: str | None = None, image_inputs: list | None = None, aspect_ratio: str = "match_input_image", output_format: str = "jpg", check_interval: int = 8, timeout: int = 600) -> bytes: | |
| """Create a Replicate prediction and poll every `check_interval` seconds until finished. | |
| Saves logs and returns raw image bytes when available. Requires `REPLICATE_API_TOKEN` and | |
| `REPLICATE_MODEL` (or pass `model` param) to be set. | |
| """ | |
| load_dotenv() | |
| token = os.getenv("REPLICATE_API_TOKEN") | |
| if not token: | |
| logger.error("REPLICATE_API_TOKEN missing in environment") | |
| raise RuntimeError("REPLICATE_API_TOKEN missing in environment") | |
| # Build candidate list (primary then alternates) | |
| if model is None: | |
| primary = os.getenv("REPLICATE_MODEL") | |
| else: | |
| primary = model | |
| alternates = os.getenv("REPLICATE_MODEL_ALTERNATES", "") | |
| candidates = [] | |
| if primary: | |
| candidates.append(primary) | |
| if alternates: | |
| for part in alternates.split(","): | |
| part = part.strip() | |
| if part and part not in candidates: | |
| candidates.append(part) | |
| if not candidates: | |
| logger.error("No Replicate model configured (set REPLICATE_MODEL or REPLICATE_MODEL_ALTERNATES)") | |
| raise RuntimeError("No Replicate model configured (REPLICATE_MODEL missing)") | |
| url = "https://api.replicate.com/v1/predictions" | |
| headers = { | |
| "Authorization": f"Token {token}", | |
| "Content-Type": "application/json", | |
| "Accept": "application/json", | |
| } | |
| payload = { | |
| "version": model, | |
| "input": { | |
| "prompt": prompt, | |
| "image_input": image_inputs or [], | |
| "aspect_ratio": aspect_ratio, | |
| "output_format": output_format, | |
| }, | |
| } | |
| last_exc = None | |
| for cand in candidates: | |
| logger.info("Creating Replicate prediction for model %s", cand) | |
| payload["version"] = cand | |
| try: | |
| resp = requests.post(url, headers=headers, json=payload, timeout=30) | |
| resp.raise_for_status() | |
| except Exception as e: | |
| logger.warning("Replicate create failed for %s: %s", cand, e) | |
| last_exc = e | |
| continue | |
| pred = resp.json() | |
| pred_id = pred.get("id") | |
| if not pred_id: | |
| logger.warning("Replicate create returned no id for %s", cand) | |
| last_exc = RuntimeError("Replicate did not return a prediction id") | |
| continue | |
| logger.info("Replicate prediction created: %s (model=%s)", pred_id, cand) | |
| started = time.time() | |
| status = pred.get("status") | |
| while status in ("starting", "processing", "queued"): | |
| if time.time() - started > timeout: | |
| last_exc = RuntimeError("Replicate prediction timed out") | |
| break | |
| logger.info("Prediction %s status=%s — sleeping %ss", pred_id, status, check_interval) | |
| time.sleep(check_interval) | |
| r2 = requests.get(f"{url}/{pred_id}", headers=headers, timeout=30) | |
| r2.raise_for_status() | |
| pred = r2.json() | |
| status = pred.get("status") | |
| if status != "succeeded": | |
| detail = pred.get("error") or pred.get("output") | |
| logger.warning("Prediction %s failed with status=%s: %s", pred_id, status, detail) | |
| last_exc = RuntimeError(f"Replicate prediction failed: {detail}") | |
| continue | |
| logger.info("Prediction %s succeeded (model=%s)", pred_id, cand) | |
| output = pred.get("output") | |
| # output is commonly a list of urls | |
| if isinstance(output, list) and output: | |
| first = output[0] | |
| if isinstance(first, str) and first.startswith("http"): | |
| logger.info("Downloading output from %s", first) | |
| r3 = requests.get(first, timeout=120) | |
| r3.raise_for_status() | |
| return r3.content | |
| if isinstance(first, bytes): | |
| return first | |
| if isinstance(output, str) and output.startswith("http"): | |
| r3 = requests.get(output, timeout=120) | |
| r3.raise_for_status() | |
| return r3.content | |
| # fallback: try to inspect nested structures | |
| if isinstance(output, dict): | |
| for k in ("image", "url", "output"): | |
| v = output.get(k) | |
| if isinstance(v, str) and v.startswith("http"): | |
| r3 = requests.get(v, timeout=120) | |
| r3.raise_for_status() | |
| return r3.content | |
| raise RuntimeError("Unknown Replicate prediction output format") | |
| def generate_image(prompt: str, size: str = "1024x1024", provider_order: str | None = None) -> str: | |
| """Generate an image from `prompt`. Returns local path to saved image (PNG). | |
| Behavior: tries providers in the order specified by the `PROVIDER_ORDER` | |
| environment variable (comma-separated). Supported providers: `openai`, | |
| `huggingface` (or `hf`), and `replicate`. If a provider fails, the code | |
| moves to the next provider. Default: `openai,replicate`. | |
| """ | |
| load_dotenv() | |
| logger.info("Generating image for prompt: %s", prompt) | |
| def generate_image_openai(local_prompt: str, local_size: str) -> bytes: | |
| api_key = os.getenv("OPENAI_API_KEY") | |
| if not api_key: | |
| logger.error("OPENAI_API_KEY not set") | |
| raise RuntimeError("OPENAI_API_KEY not set") | |
| if openai_client is not None: | |
| resp = openai_client.images.generate(model="gpt-image-1", prompt=local_prompt, size=local_size) | |
| b64 = resp.data[0].b64_json | |
| return base64.b64decode(b64) | |
| elif openai_legacy is not None: | |
| openai_legacy.api_key = api_key | |
| resp = openai_legacy.Image.create(prompt=local_prompt, size=local_size, n=1) | |
| b64 = resp["data"][0]["b64_json"] | |
| return base64.b64decode(b64) | |
| else: | |
| raise RuntimeError("No OpenAI client available") | |
| # Provider order from env, default to openai then replicate | |
| if provider_order is None: | |
| provider_order = os.getenv("PROVIDER_ORDER", "openai,replicate") | |
| providers = [p.strip().lower() for p in provider_order.split(",") if p.strip()] | |
| if not providers: | |
| providers = ["openai", "replicate"] | |
| img_bytes = None | |
| last_exc = None | |
| for provider in providers: | |
| try: | |
| logger.info("Trying provider: %s", provider) | |
| if provider in ("openai", "oa"): | |
| img_bytes = generate_image_openai(prompt, size) | |
| elif provider in ("huggingface", "hf"): | |
| img_bytes = generate_image_hf(prompt, size=size) | |
| elif provider == "replicate": | |
| # use polling replicate fallback (checks every 8s) | |
| img_bytes = generate_image_replicate_poll(prompt, check_interval=8) | |
| else: | |
| logger.warning("Unknown provider '%s' — skipping", provider) | |
| continue | |
| # if generation succeeded, break loop | |
| if img_bytes: | |
| logger.info("Provider %s succeeded", provider) | |
| break | |
| except Exception as e: | |
| logger.exception("Provider %s failed: %s", provider, e) | |
| last_exc = e | |
| continue | |
| if not img_bytes: | |
| logger.error("All providers failed") | |
| if last_exc: | |
| raise SystemExit(1) from last_exc | |
| raise SystemExit(1) | |
| out_dir = os.path.join(os.getcwd(), "generated_images") | |
| os.makedirs(out_dir, exist_ok=True) | |
| ts = int(time.time()) | |
| filename = f"image_{ts}.png" | |
| path = os.path.join(out_dir, filename) | |
| with open(path, "wb") as f: | |
| f.write(img_bytes) | |
| logger.info("Saved generated image to %s", path) | |
| return path | |
| def post_image_to_facebook(page_id: str, access_token: str, image_path: str, caption: str | None = None) -> dict: | |
| url = f"https://graph.facebook.com/{page_id}/photos" | |
| data = {"access_token": access_token} | |
| if caption: | |
| data["caption"] = caption | |
| logger.info("Uploading image %s to Facebook page %s", image_path, page_id) | |
| with open(image_path, "rb") as imgf: | |
| files = {"source": imgf} | |
| resp = requests.post(url, files=files, data=data) | |
| try: | |
| resp.raise_for_status() | |
| except requests.HTTPError: | |
| logger.error("Facebook upload error: %s", resp.text) | |
| # write to log.txt | |
| try: | |
| with open("log.txt", "a", encoding="utf-8") as lf: | |
| lf.write(f"[{__import__('time').strftime('%Y-%m-%d %H:%M:%S')}] FB_IMAGE_POST_ERROR page={page_id} image={image_path} response={resp.text}\n") | |
| except Exception: | |
| logger.exception("Failed to write to log.txt") | |
| raise | |
| logger.info("Upload successful: %s", resp.json()) | |
| try: | |
| with open("log.txt", "a", encoding="utf-8") as lf: | |
| data = resp.json() | |
| lf.write(f"[{__import__('time').strftime('%Y-%m-%d %H:%M:%S')}] FB_IMAGE_POST_SUCCESS page={page_id} image={image_path} id={data.get('id')} post_id={data.get('post_id')}\n") | |
| except Exception: | |
| logger.exception("Failed to append image post info to log.txt") | |
| return resp.json() | |
| def generate_and_post(prompt: str, caption: str | None = None, post: bool = False, use_wisdom_as_prompt: bool = False, caption_template: str | None = None, use_wisdom_as_caption: bool = False, provider_order: str | None = None) -> dict: | |
| # If requested, generate a short wisdom text and use it (or append) as the image prompt | |
| image_prompt = prompt | |
| wisdom_text = None | |
| if use_wisdom_as_prompt: | |
| try: | |
| wisdom_text = generate_wisdom(prompt) | |
| # If no explicit prompt provided, use the wisdom as the image prompt | |
| if not image_prompt: | |
| image_prompt = wisdom_text | |
| else: | |
| # combine both: image prompt + the wisdom quote to guide imagery | |
| image_prompt = f"{image_prompt}. Quote: {wisdom_text}" | |
| except Exception as e: | |
| logger.exception("Failed to generate wisdom for image prompt: %s", e) | |
| # proceed using the original prompt | |
| img_path = generate_image(image_prompt, provider_order=provider_order) | |
| result = {"image_path": img_path} | |
| if wisdom_text: | |
| result["wisdom"] = wisdom_text | |
| if post: | |
| load_dotenv() | |
| page_id = os.getenv("FB_PAGE_ID") | |
| token = os.getenv("FB_PAGE_ACCESS_TOKEN") | |
| if not page_id or not token: | |
| logger.error("Missing FB_PAGE_ID or FB_PAGE_ACCESS_TOKEN in environment") | |
| raise SystemExit(1) | |
| # build final caption: explicit caption wins; then caption_template; then wisdom if requested | |
| final_caption = caption | |
| if not final_caption and caption_template: | |
| try: | |
| final_caption = caption_template.format(prompt=prompt or "", wisdom=wisdom_text or "") | |
| except Exception: | |
| logger.exception("Failed to format caption_template") | |
| final_caption = caption_template | |
| if not final_caption and use_wisdom_as_caption and wisdom_text: | |
| final_caption = wisdom_text | |
| try: | |
| res = post_image_to_facebook(page_id, token, img_path, final_caption) | |
| result["facebook"] = res | |
| except Exception as e: | |
| # enqueue a retry entry and return an 'enqueued' status instead of failing | |
| try: | |
| enqueue_retry({ | |
| "type": "image", | |
| "page_id": page_id, | |
| "access_token": token, | |
| "image_path": img_path, | |
| "caption": final_caption, | |
| }) | |
| logger.warning("Image post failed; enqueued for retry: %s", e) | |
| result["facebook"] = {"status": "enqueued", "reason": str(e)} | |
| except Exception: | |
| logger.exception("Failed to enqueue failed image post") | |
| raise | |
| return result | |
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser(description="Generate an image via OpenAI and optionally post to Facebook") | |
| parser.add_argument("-p", "--prompt", required=True, help="Prompt for image generation") | |
| parser.add_argument("--caption", help="Caption to use when posting to Facebook") | |
| parser.add_argument("--caption-template", help="Caption template, supports {prompt} and {wisdom}") | |
| parser.add_argument("--use-wisdom-as-caption", action="store_true", help="Use generated wisdom as caption if available") | |
| parser.add_argument("--post", action="store_true", help="Post image to Facebook after generation") | |
| args = parser.parse_args() | |
| res = generate_and_post( | |
| args.prompt, | |
| caption=args.caption, | |
| post=args.post, | |
| caption_template=args.caption_template, | |
| use_wisdom_as_caption=args.use_wisdom_as_caption, | |
| ) | |
| print(res) | |