# retry_queue.py
# Persistent retry queue for failed Facebook posts: failed posts are enqueued
# to a JSON-lines file and re-attempted by a background worker thread.
import os
import json
import time
import logging
import requests
# Module-level logger; logging configuration is expected to be done by the application.
logger = logging.getLogger(__name__)
# Append-only JSON-lines file holding queued retry entries (one JSON object per line).
QUEUE_FILE = os.getenv("RETRY_QUEUE_FILE", "retry_queue.jsonl")
# Scratch file the queue is renamed to while a batch is being processed.
PROCESSING_FILE = QUEUE_FILE + ".pending"
# Give up on an entry after this many failed delivery attempts.
MAX_ATTEMPTS = int(os.getenv("RETRY_MAX_ATTEMPTS", "5"))
# First retry delay in seconds; the delay doubles on each subsequent failure.
BACKOFF_BASE = int(os.getenv("RETRY_BACKOFF_BASE", "60"))
# Seconds the worker sleeps between queue-processing passes.
PROCESS_INTERVAL = int(os.getenv("RETRY_PROCESS_INTERVAL", "300"))
def enqueue(item: dict):
    """Append *item* to the persistent retry queue (best effort; errors are only logged).

    Fills in bookkeeping defaults (attempts, created_at, next_try) on the
    caller's dict before writing it as one JSON line.
    """
    try:
        parent = os.path.dirname(QUEUE_FILE) or '.'
        os.makedirs(parent, exist_ok=True)
        with open(QUEUE_FILE, "a", encoding="utf-8") as queue:
            stamp = int(time.time())
            item.setdefault("attempts", 0)
            item.setdefault("created_at", stamp)
            item.setdefault("next_try", stamp)
            queue.write(json.dumps(item, default=str) + "\n")
        logger.info("Enqueued retry item: %s", item.get("type"))
    except Exception:
        # Enqueueing is best-effort; never let queue trouble break the caller.
        logger.exception("Failed to enqueue retry item")
def _process_entry(entry: dict) -> bool:
    """Attempt to process a single queue entry. Returns True on success.

    Entries of type "image" are re-uploaded to the page's /photos edge; any
    other type is posted to the page's /feed edge.  On failure the error text
    is stored on the entry under "last_error".
    """
    kind = entry.get("type")
    page_id = entry.get("page_id")
    token = entry.get("access_token")
    try:
        if kind == "image":
            # Re-upload the photo with its optional caption.
            url = f"https://graph.facebook.com/{page_id}/photos"
            form = {"access_token": token}
            caption = entry.get("caption")
            if caption:
                form["caption"] = caption
            with open(entry.get("image_path"), "rb") as handle:
                response = requests.post(url, files={"source": handle}, data=form, timeout=20)
        else:
            # message or default
            url = f"https://graph.facebook.com/{page_id}/feed"
            form = {"message": entry.get("message"), "access_token": token}
            link = entry.get("link")
            if link:
                form["link"] = link
            response = requests.post(url, data=form, timeout=20)
        response.raise_for_status()
        # Record the successful retry in the shared append-only log file.
        try:
            with open("log.txt", "a", encoding="utf-8") as lf:
                lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_SUCCESS type={kind} page={page_id} response={response.text}\n")
        except Exception:
            logger.exception("Failed to write retry success to log.txt")
        return True
    except requests.RequestException as err:
        # network or HTTP error
        logger.warning("Retry item failed (will retry if attempts remain): %s", err)
        entry["last_error"] = str(err)
        return False
    except Exception as err:
        # e.g. missing image file; still recorded so the batch logic can back off.
        logger.exception("Unexpected error processing retry item: %s", err)
        entry["last_error"] = str(err)
        return False
def _process_batch(entries: list) -> list:
    """Run every due entry once and return the entries that must be requeued.

    Entries whose next_try lies in the future are passed through untouched;
    failed entries get their attempt counter bumped and an exponentially
    backed-off next_try, unless they have exhausted MAX_ATTEMPTS.
    """
    pending = []
    now = int(time.time())
    for entry in entries:
        # Not yet due: push straight back without touching the attempt counter.
        if int(entry.get("next_try", 0)) > now:
            pending.append(entry)
            continue
        if _process_entry(entry):
            # Delivered: the entry simply drops out of the queue.
            continue
        attempts = int(entry.get("attempts", 0)) + 1
        if attempts >= MAX_ATTEMPTS:
            # give up and log
            try:
                with open("log.txt", "a", encoding="utf-8") as lf:
                    lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_GIVEUP type={entry.get('type')} page={entry.get('page_id')} attempts={attempts} error={entry.get('last_error')}\n")
            except Exception:
                logger.exception("Failed to write giveup to log.txt")
            continue
        # schedule retry with exponential backoff
        entry["attempts"] = attempts
        entry["next_try"] = now + BACKOFF_BASE * (2 ** (attempts - 1))
        pending.append(entry)
    return pending
def process_once():
    """Drain the retry queue once.

    The queue file is atomically renamed to the ".pending" scratch file, so
    concurrent enqueue() calls keep appending to a fresh queue file while this
    batch runs.  Entries that still need a retry are appended back onto the
    queue file; the scratch file is removed afterwards.

    NOTE(review): if persisting the requeued entries fails (or the process
    dies mid-batch), the finally-block still discards the pending file, so
    that batch is lost.  This matches the original best-effort design, but
    confirm it is acceptable for these posts.
    """
    try:
        # Atomically claim the current queue contents for this batch.
        # os.replace itself raises FileNotFoundError when the queue is empty,
        # so no racy exists() pre-check is needed.
        os.replace(QUEUE_FILE, PROCESSING_FILE)
    except FileNotFoundError:
        return  # nothing queued
    except OSError:
        logger.exception("Failed to claim retry queue for processing")
        return
    try:
        entries = []
        try:
            with open(PROCESSING_FILE, "r", encoding="utf-8") as pf:
                for ln in pf:
                    ln = ln.strip()
                    if not ln:
                        continue
                    try:
                        entries.append(json.loads(ln))
                    except ValueError:
                        # json.JSONDecodeError subclasses ValueError; skip the bad line.
                        logger.exception("Bad JSON line in retry queue")
        except OSError:
            logger.exception("Failed to read processing file")
        requeue = _process_batch(entries)
        # Persist entries that still need attempts before discarding the batch.
        if requeue:
            try:
                with open(QUEUE_FILE, "a", encoding="utf-8") as qf:
                    for item in requeue:
                        qf.write(json.dumps(item, default=str) + "\n")
            except OSError:
                logger.exception("Failed to write requeue items")
    finally:
        try:
            if os.path.exists(PROCESSING_FILE):
                os.remove(PROCESSING_FILE)
        except OSError:
            logger.exception("Failed to remove processing file")
def run_worker():
    """Run the retry queue forever: one processing pass, then sleep, repeat.

    A failing pass is logged and never kills the loop.
    """
    logger.info("Starting retry queue worker (interval %s seconds)", PROCESS_INTERVAL)
    while True:
        try:
            process_once()
        except Exception:
            logger.exception("Retry worker crashed")
        finally:
            # The sleep runs whether or not the pass succeeded.
            time.sleep(PROCESS_INTERVAL)
def start_worker_thread():
    """Launch run_worker() on a daemon thread and return the Thread object.

    The daemon flag lets the interpreter exit without waiting for the worker.
    """
    import threading
    worker = threading.Thread(target=run_worker, daemon=True)
    worker.start()
    return worker