Spaces:
Sleeping
Sleeping
hf-actions
feat: persistent retry queue for failed FB posts; enqueue on failures; start worker
5f2de2a
| import os | |
| import json | |
| import time | |
| import logging | |
| import requests | |
| logger = logging.getLogger(__name__) | |
| QUEUE_FILE = os.getenv("RETRY_QUEUE_FILE", "retry_queue.jsonl") | |
| PROCESSING_FILE = QUEUE_FILE + ".pending" | |
| MAX_ATTEMPTS = int(os.getenv("RETRY_MAX_ATTEMPTS", "5")) | |
| BACKOFF_BASE = int(os.getenv("RETRY_BACKOFF_BASE", "60")) | |
| PROCESS_INTERVAL = int(os.getenv("RETRY_PROCESS_INTERVAL", "300")) | |
| def enqueue(item: dict): | |
| try: | |
| os.makedirs(os.path.dirname(QUEUE_FILE) or '.', exist_ok=True) | |
| with open(QUEUE_FILE, "a", encoding="utf-8") as f: | |
| item.setdefault("attempts", 0) | |
| item.setdefault("created_at", int(time.time())) | |
| item.setdefault("next_try", int(time.time())) | |
| f.write(json.dumps(item, default=str) + "\n") | |
| logger.info("Enqueued retry item: %s", item.get("type")) | |
| except Exception: | |
| logger.exception("Failed to enqueue retry item") | |
| def _process_entry(entry: dict) -> bool: | |
| """Attempt to process a single queue entry. Returns True on success.""" | |
| typ = entry.get("type") | |
| page_id = entry.get("page_id") | |
| token = entry.get("access_token") | |
| try: | |
| if typ == "image": | |
| image_path = entry.get("image_path") | |
| caption = entry.get("caption") | |
| url = f"https://graph.facebook.com/{page_id}/photos" | |
| data = {"access_token": token} | |
| if caption: | |
| data["caption"] = caption | |
| with open(image_path, "rb") as imgf: | |
| files = {"source": imgf} | |
| resp = requests.post(url, files=files, data=data, timeout=20) | |
| else: | |
| # message or default | |
| message = entry.get("message") | |
| link = entry.get("link") | |
| url = f"https://graph.facebook.com/{page_id}/feed" | |
| payload = {"message": message, "access_token": token} | |
| if link: | |
| payload["link"] = link | |
| resp = requests.post(url, data=payload, timeout=20) | |
| resp.raise_for_status() | |
| # on success, append to log.txt | |
| try: | |
| with open("log.txt", "a", encoding="utf-8") as lf: | |
| lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_SUCCESS type={typ} page={page_id} response={resp.text}\n") | |
| except Exception: | |
| logger.exception("Failed to write retry success to log.txt") | |
| return True | |
| except requests.RequestException as e: | |
| # network or HTTP error | |
| logger.warning("Retry item failed (will retry if attempts remain): %s", e) | |
| entry["last_error"] = str(e) | |
| return False | |
| except Exception as e: | |
| logger.exception("Unexpected error processing retry item: %s", e) | |
| entry["last_error"] = str(e) | |
| return False | |
| def _process_batch(entries: list) -> list: | |
| requeue = [] | |
| now = int(time.time()) | |
| for entry in entries: | |
| next_try = int(entry.get("next_try", 0)) | |
| if next_try > now: | |
| requeue.append(entry) | |
| continue | |
| success = _process_entry(entry) | |
| if not success: | |
| attempts = int(entry.get("attempts", 0)) + 1 | |
| if attempts >= MAX_ATTEMPTS: | |
| # give up and log | |
| try: | |
| with open("log.txt", "a", encoding="utf-8") as lf: | |
| lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_GIVEUP type={entry.get('type')} page={entry.get('page_id')} attempts={attempts} error={entry.get('last_error')}\n") | |
| except Exception: | |
| logger.exception("Failed to write giveup to log.txt") | |
| continue | |
| # schedule retry with exponential backoff | |
| delay = BACKOFF_BASE * (2 ** (attempts - 1)) | |
| entry["attempts"] = attempts | |
| entry["next_try"] = now + delay | |
| requeue.append(entry) | |
| return requeue | |
| def process_once(): | |
| if not os.path.exists(QUEUE_FILE): | |
| return | |
| try: | |
| # move to pending file for processing | |
| try: | |
| os.replace(QUEUE_FILE, PROCESSING_FILE) | |
| except FileNotFoundError: | |
| return | |
| entries = [] | |
| try: | |
| with open(PROCESSING_FILE, "r", encoding="utf-8") as pf: | |
| for ln in pf: | |
| ln = ln.strip() | |
| if not ln: | |
| continue | |
| try: | |
| entries.append(json.loads(ln)) | |
| except Exception: | |
| logger.exception("Bad JSON line in retry queue") | |
| except Exception: | |
| logger.exception("Failed to read processing file") | |
| requeue = _process_batch(entries) | |
| # append requeued items back to queue file | |
| if requeue: | |
| try: | |
| with open(QUEUE_FILE, "a", encoding="utf-8") as qf: | |
| for item in requeue: | |
| qf.write(json.dumps(item, default=str) + "\n") | |
| except Exception: | |
| logger.exception("Failed to write requeue items") | |
| finally: | |
| try: | |
| if os.path.exists(PROCESSING_FILE): | |
| os.remove(PROCESSING_FILE) | |
| except Exception: | |
| pass | |
| def run_worker(): | |
| logger.info("Starting retry queue worker (interval %s seconds)", PROCESS_INTERVAL) | |
| while True: | |
| try: | |
| process_once() | |
| except Exception: | |
| logger.exception("Retry worker crashed") | |
| time.sleep(PROCESS_INTERVAL) | |
| def start_worker_thread(): | |
| import threading | |
| t = threading.Thread(target=run_worker, daemon=True) | |
| t.start() | |
| return t | |