import os import json import time import logging import requests logger = logging.getLogger(__name__) QUEUE_FILE = os.getenv("RETRY_QUEUE_FILE", "retry_queue.jsonl") PROCESSING_FILE = QUEUE_FILE + ".pending" MAX_ATTEMPTS = int(os.getenv("RETRY_MAX_ATTEMPTS", "5")) BACKOFF_BASE = int(os.getenv("RETRY_BACKOFF_BASE", "60")) PROCESS_INTERVAL = int(os.getenv("RETRY_PROCESS_INTERVAL", "300")) def enqueue(item: dict): try: os.makedirs(os.path.dirname(QUEUE_FILE) or '.', exist_ok=True) with open(QUEUE_FILE, "a", encoding="utf-8") as f: item.setdefault("attempts", 0) item.setdefault("created_at", int(time.time())) item.setdefault("next_try", int(time.time())) f.write(json.dumps(item, default=str) + "\n") logger.info("Enqueued retry item: %s", item.get("type")) except Exception: logger.exception("Failed to enqueue retry item") def _process_entry(entry: dict) -> bool: """Attempt to process a single queue entry. Returns True on success.""" typ = entry.get("type") page_id = entry.get("page_id") token = entry.get("access_token") try: if typ == "image": image_path = entry.get("image_path") caption = entry.get("caption") url = f"https://graph.facebook.com/{page_id}/photos" data = {"access_token": token} if caption: data["caption"] = caption with open(image_path, "rb") as imgf: files = {"source": imgf} resp = requests.post(url, files=files, data=data, timeout=20) else: # message or default message = entry.get("message") link = entry.get("link") url = f"https://graph.facebook.com/{page_id}/feed" payload = {"message": message, "access_token": token} if link: payload["link"] = link resp = requests.post(url, data=payload, timeout=20) resp.raise_for_status() # on success, append to log.txt try: with open("log.txt", "a", encoding="utf-8") as lf: lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_SUCCESS type={typ} page={page_id} response={resp.text}\n") except Exception: logger.exception("Failed to write retry success to log.txt") return True except requests.RequestException as e: # network or HTTP error logger.warning("Retry item failed (will retry if attempts remain): %s", e) entry["last_error"] = str(e) return False except Exception as e: logger.exception("Unexpected error processing retry item: %s", e) entry["last_error"] = str(e) return False def _process_batch(entries: list) -> list: requeue = [] now = int(time.time()) for entry in entries: next_try = int(entry.get("next_try", 0)) if next_try > now: requeue.append(entry) continue success = _process_entry(entry) if not success: attempts = int(entry.get("attempts", 0)) + 1 if attempts >= MAX_ATTEMPTS: # give up and log try: with open("log.txt", "a", encoding="utf-8") as lf: lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_GIVEUP type={entry.get('type')} page={entry.get('page_id')} attempts={attempts} error={entry.get('last_error')}\n") except Exception: logger.exception("Failed to write giveup to log.txt") continue # schedule retry with exponential backoff delay = BACKOFF_BASE * (2 ** (attempts - 1)) entry["attempts"] = attempts entry["next_try"] = now + delay requeue.append(entry) return requeue def process_once(): if not os.path.exists(QUEUE_FILE): return try: # move to pending file for processing try: os.replace(QUEUE_FILE, PROCESSING_FILE) except FileNotFoundError: return entries = [] try: with open(PROCESSING_FILE, "r", encoding="utf-8") as pf: for ln in pf: ln = ln.strip() if not ln: continue try: entries.append(json.loads(ln)) except Exception: logger.exception("Bad JSON line in retry queue") except Exception: logger.exception("Failed to read processing file") requeue = _process_batch(entries) # append requeued items back to queue file if requeue: try: with open(QUEUE_FILE, "a", encoding="utf-8") as qf: for item in requeue: qf.write(json.dumps(item, default=str) + "\n") except Exception: logger.exception("Failed to write requeue items") finally: try: if os.path.exists(PROCESSING_FILE): os.remove(PROCESSING_FILE) except Exception: pass def run_worker(): logger.info("Starting retry queue worker (interval %s seconds)", PROCESS_INTERVAL) while True: try: process_once() except Exception: logger.exception("Retry worker crashed") time.sleep(PROCESS_INTERVAL) def start_worker_thread(): import threading t = threading.Thread(target=run_worker, daemon=True) t.start() return t