Spaces:
Sleeping
Sleeping
File size: 5,627 Bytes
5f2de2a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
import os
import json
import time
import logging
import requests
# Module-level logger; handlers and levels are left to the host application.
logger = logging.getLogger(__name__)

# Retry-queue settings. Each one can be overridden through the environment
# variable named in the lookup; the second argument is the built-in default.

# JSON-lines file that queued items are appended to.
QUEUE_FILE = os.environ.get("RETRY_QUEUE_FILE", "retry_queue.jsonl")
# Name the queue file is renamed to while a batch is being worked on.
PROCESSING_FILE = f"{QUEUE_FILE}.pending"
# Number of failed attempts after which an item is abandoned.
MAX_ATTEMPTS = int(os.environ.get("RETRY_MAX_ATTEMPTS", "5"))
# Delay in seconds before the first retry; doubled for each further failure.
BACKOFF_BASE = int(os.environ.get("RETRY_BACKOFF_BASE", "60"))
# Seconds the worker sleeps between passes over the queue.
PROCESS_INTERVAL = int(os.environ.get("RETRY_PROCESS_INTERVAL", "300"))
def enqueue(item: dict):
    """Append *item* to the retry queue file as one JSON line.

    Bookkeeping fields are filled in on *item* itself (the caller's dict is
    modified) when they are absent: ``attempts`` (0), and ``created_at`` and
    ``next_try`` (the current Unix time in whole seconds).  Values that JSON
    cannot encode natively are written as ``str(value)``.

    Nothing is raised: any failure is logged with its traceback, in which
    case the item may not have been written.
    """
    try:
        queue_dir = os.path.dirname(QUEUE_FILE) or '.'
        os.makedirs(queue_dir, exist_ok=True)
        with open(QUEUE_FILE, "a", encoding="utf-8") as queue_fh:
            item.setdefault("attempts", 0)
            # Both timestamps default to "now", so a new item is due at once.
            for stamp_field in ("created_at", "next_try"):
                item.setdefault(stamp_field, int(time.time()))
            record = json.dumps(item, default=str)
            queue_fh.write(record + "\n")
        logger.info("Enqueued retry item: %s", item.get("type"))
    except Exception:
        logger.exception("Failed to enqueue retry item")
def _process_entry(entry: dict) -> bool:
    """Make one delivery attempt for a queued item.

    An entry whose ``type`` is ``"image"`` uploads the file at ``image_path``
    to ``https://graph.facebook.com/<page_id>/photos``, adding ``caption``
    when one is set.  Any other type posts ``message`` (plus ``link`` when
    present) to ``https://graph.facebook.com/<page_id>/feed``.  A successful
    call is also recorded in ``log.txt``; a failure to write that record is
    logged and does not change the result.

    Returns True when the HTTP call succeeded and False otherwise.  On
    failure the error text is stored in ``entry["last_error"]``.
    """
    kind = entry.get("type")
    page_id = entry.get("page_id")
    token = entry.get("access_token")
    try:
        if kind == "image":
            image_path = entry.get("image_path")
            caption = entry.get("caption")
            endpoint = f"https://graph.facebook.com/{page_id}/photos"
            form = {"access_token": token}
            if caption:
                form["caption"] = caption
            # The upload has to finish while the image file is still open.
            with open(image_path, "rb") as image_fh:
                resp = requests.post(
                    endpoint,
                    files={"source": image_fh},
                    data=form,
                    timeout=20,
                )
        else:
            # Every non-image type is treated as a plain feed post.
            message = entry.get("message")
            link = entry.get("link")
            endpoint = f"https://graph.facebook.com/{page_id}/feed"
            form = {"message": message, "access_token": token}
            if link:
                form["link"] = link
            resp = requests.post(endpoint, data=form, timeout=20)

        # Turn 4xx/5xx responses into exceptions so they count as failures.
        resp.raise_for_status()

        # Record the success; a logging problem must not undo the delivery.
        try:
            with open("log.txt", "a", encoding="utf-8") as log_fh:
                log_fh.write(
                    f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_SUCCESS "
                    f"type={kind} page={page_id} response={resp.text}\n"
                )
        except Exception:
            logger.exception("Failed to write retry success to log.txt")
        return True
    except requests.RequestException as exc:
        # Network problem or HTTP error status.
        logger.warning("Retry item failed (will retry if attempts remain): %s", exc)
        entry["last_error"] = str(exc)
        return False
    except Exception as exc:
        # Anything else, e.g. the image file could not be opened.
        logger.exception("Unexpected error processing retry item: %s", exc)
        entry["last_error"] = str(exc)
        return False
def _coerce_int(value, default: int = 0) -> int:
    """Return *value* converted to int, or *default* if it cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def _process_batch(entries: list) -> list:
    """Attempt every due entry once and return the entries to keep queued.

    Args:
        entries: Queue records as loaded from the queue file.  Each record is
            expected to be a dict; anything else is logged and dropped.

    Returns:
        The entries that must be written back to the queue: those that are
        not due yet (unchanged) and those that failed but still have attempts
        left (with ``attempts`` and ``next_try`` updated).  Entries that
        succeeded, or that have now failed ``MAX_ATTEMPTS`` times, are left
        out; the latter are recorded in ``log.txt``.

    A record with a missing or unparseable ``next_try`` is treated as due
    now, and one with a missing or unparseable ``attempts`` as having had no
    attempts.  This keeps one malformed record from raising and aborting the
    pass for every other entry in the batch.
    """
    requeue = []
    now = int(time.time())
    for entry in entries:
        if not isinstance(entry, dict):
            # Log only the type: the value itself is of unknown content.
            logger.error(
                "Dropping malformed retry entry of type %s", type(entry).__name__
            )
            continue

        # Not due yet: keep it queued untouched.
        if _coerce_int(entry.get("next_try")) > now:
            requeue.append(entry)
            continue

        if _process_entry(entry):
            continue

        attempts = max(_coerce_int(entry.get("attempts")), 0) + 1
        if attempts >= MAX_ATTEMPTS:
            # Give up and log.  Logging is best-effort, so any failure here is
            # reported but never stops the rest of the batch.
            try:
                with open("log.txt", "a", encoding="utf-8") as lf:
                    lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_GIVEUP type={entry.get('type')} page={entry.get('page_id')} attempts={attempts} error={entry.get('last_error')}\n")
            except Exception:
                logger.exception("Failed to write giveup to log.txt")
            continue

        # Exponential backoff: BACKOFF_BASE, then 2x, 4x, ...  The delay is
        # counted from the failure itself rather than from the start of the
        # batch, which may be well in the past after several slow requests.
        delay = BACKOFF_BASE * (2 ** (attempts - 1))
        entry["attempts"] = attempts
        entry["next_try"] = int(time.time()) + delay
        requeue.append(entry)
    return requeue
def _load_pending_entries(path: str) -> list:
    """Read queue records from the JSON-lines file at *path*.

    Blank lines are ignored.  Lines that are not valid JSON, or that decode
    to something other than a JSON object, are logged and skipped.  If the
    file cannot be opened or read, the error is logged and the records read
    so far are returned.
    """
    entries = []
    try:
        with open(path, "r", encoding="utf-8") as pending:
            for raw_line in pending:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except (ValueError, RecursionError):
                    logger.exception("Bad JSON line in retry queue")
                    continue
                if not isinstance(record, dict):
                    # Downstream code calls dict methods on every record; a
                    # stray scalar or list would abort the whole pass.
                    logger.error(
                        "Skipping retry queue line that is not a JSON object"
                    )
                    continue
                entries.append(record)
    except Exception:
        # Deliberately broad: reading is best-effort and must not raise.
        logger.exception("Failed to read processing file")
    return entries


def process_once():
    """Run one pass over the retry queue.

    The queue file is renamed to the pending file, so items enqueued during
    the pass go to a fresh queue file.  The pending file's records are handed
    to ``_process_batch`` and whatever it returns is appended back to the
    queue file.  The pending file is removed at the end of the pass, whether
    or not the pass succeeded.

    Returns None.  Does nothing when there is no queue file.  Errors while
    reading the pending file or writing items back are logged, not raised;
    an exception raised by ``_process_batch`` propagates to the caller.

    NOTE(review): if the process stops mid-pass the pending file is left
    behind, and the next pass overwrites it, so its items are never retried.
    Recovering them would risk re-sending items that were already delivered.
    Confirm which trade-off is wanted.
    """
    try:
        # Take ownership of the current queue contents; no file, no work.
        os.replace(QUEUE_FILE, PROCESSING_FILE)
    except FileNotFoundError:
        return

    try:
        entries = _load_pending_entries(PROCESSING_FILE)
        requeue = _process_batch(entries)

        # Append (not overwrite): new items may have been enqueued meanwhile.
        if requeue:
            try:
                with open(QUEUE_FILE, "a", encoding="utf-8") as queue_fh:
                    queue_fh.writelines(
                        json.dumps(item, default=str) + "\n" for item in requeue
                    )
            except Exception:
                logger.exception("Failed to write requeue items")
    finally:
        try:
            os.remove(PROCESSING_FILE)
        except FileNotFoundError:
            pass
        except OSError:
            # Report it instead of hiding it; the next pass overwrites the file.
            logger.exception("Failed to remove retry processing file")
def run_worker():
    """Process the retry queue forever, pausing between passes.

    Each cycle calls ``process_once`` and then sleeps ``PROCESS_INTERVAL``
    seconds.  An ``Exception`` raised by a pass is logged with its traceback
    and the loop carries on.  This function never returns.
    """
    def _guarded_pass():
        # One failing pass must not end the loop.
        try:
            process_once()
        except Exception:
            logger.exception("Retry worker crashed")

    logger.info("Starting retry queue worker (interval %s seconds)", PROCESS_INTERVAL)
    while True:
        _guarded_pass()
        time.sleep(PROCESS_INTERVAL)
def start_worker_thread():
    """Start ``run_worker`` on a background thread and return that thread.

    The thread is created as a daemon thread, so it does not keep the
    interpreter alive at exit.
    """
    from threading import Thread

    worker = Thread(target=run_worker, daemon=True)
    worker.start()
    return worker
|