hf-actions commited on
Commit
5f2de2a
·
1 Parent(s): 8216f24

feat: persistent retry queue for failed FB posts; enqueue on failures; start worker

Browse files
Files changed (4) hide show
  1. app.py +7 -1
  2. generate_image.py +19 -2
  3. post_to_facebook.py +11 -0
  4. retry_queue.py +157 -0
app.py CHANGED
@@ -36,7 +36,7 @@ atexit.register(_close_asyncio_loop)
36
  from generate_wisdom import generate_wisdom
37
  from post_to_facebook import post_to_facebook
38
  from generate_image import generate_image, generate_and_post
39
- from generate_image import generate_and_post
40
 
41
 
42
  def generate_and_optionally_post(topic: str, do_post: bool):
@@ -357,4 +357,10 @@ Ultra-realistic photography style, fine-art cinematic composition, calming mood,
357
  else:
358
  logger.error("Token validation failed — immediate run will not start")
359
 
 
 
 
 
 
 
360
  demo.launch()
 
36
  from generate_wisdom import generate_wisdom
37
  from post_to_facebook import post_to_facebook
38
  from generate_image import generate_image, generate_and_post
39
+ from retry_queue import start_worker_thread
40
 
41
 
42
  def generate_and_optionally_post(topic: str, do_post: bool):
 
357
  else:
358
  logger.error("Token validation failed — immediate run will not start")
359
 
360
+ # start background retry worker for any enqueued failed posts
361
+ try:
362
+ start_worker_thread()
363
+ logger.info("Started retry queue worker thread")
364
+ except Exception:
365
+ logger.exception("Failed to start retry queue worker thread")
366
  demo.launch()
generate_image.py CHANGED
@@ -18,6 +18,7 @@ except Exception:
18
  openai_legacy = None
19
 
20
  import requests
 
21
 
22
  logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
23
  logger = logging.getLogger(__name__)
@@ -490,8 +491,24 @@ def generate_and_post(prompt: str, caption: str | None = None, post: bool = Fals
490
  if not final_caption and use_wisdom_as_caption and wisdom_text:
491
  final_caption = wisdom_text
492
 
493
- res = post_image_to_facebook(page_id, token, img_path, final_caption)
494
- result["facebook"] = res
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
495
  return result
496
 
497
 
 
18
  openai_legacy = None
19
 
20
  import requests
21
+ from retry_queue import enqueue as enqueue_retry
22
 
23
  logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
24
  logger = logging.getLogger(__name__)
 
491
  if not final_caption and use_wisdom_as_caption and wisdom_text:
492
  final_caption = wisdom_text
493
 
494
+ try:
495
+ res = post_image_to_facebook(page_id, token, img_path, final_caption)
496
+ result["facebook"] = res
497
+ except Exception as e:
498
+ # enqueue a retry entry and return an 'enqueued' status instead of failing
499
+ try:
500
+ enqueue_retry({
501
+ "type": "image",
502
+ "page_id": page_id,
503
+ "access_token": token,
504
+ "image_path": img_path,
505
+ "caption": final_caption,
506
+ })
507
+ logger.warning("Image post failed; enqueued for retry: %s", e)
508
+ result["facebook"] = {"status": "enqueued", "reason": str(e)}
509
+ except Exception:
510
+ logger.exception("Failed to enqueue failed image post")
511
+ raise
512
  return result
513
 
514
 
post_to_facebook.py CHANGED
@@ -6,6 +6,7 @@ import socket
6
  import time
7
  from requests.exceptions import RequestException
8
  from dotenv import load_dotenv
 
9
 
10
  logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
11
  logger = logging.getLogger(__name__)
@@ -31,6 +32,11 @@ def post_to_facebook(page_id: str, access_token: str, message: str, link: str |
31
  except Exception as e:
32
  logger.error("DNS resolution failed for graph.facebook.com: %s", e)
33
  _append_log(f"[{__import__('time').strftime('%Y-%m-%d %H:%M:%S')}] FB_DNS_RESOLUTION_FAILED {e}")
 
 
 
 
 
34
  raise
35
 
36
  # Retry with exponential backoff for transient network issues
@@ -52,6 +58,11 @@ def post_to_facebook(page_id: str, access_token: str, message: str, link: str |
52
  else:
53
  logger.error("All Facebook POST attempts failed")
54
  _append_log(f"[{__import__('time').strftime('%Y-%m-%d %H:%M:%S')}] FB_POST_ERROR page={page_id} message={message[:120]!r} exception={e}")
 
 
 
 
 
55
  raise
56
  data = resp.json()
57
  logger.info("Post successful: %s", data)
 
6
  import time
7
  from requests.exceptions import RequestException
8
  from dotenv import load_dotenv
9
+ from retry_queue import enqueue as enqueue_retry
10
 
11
  logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
12
  logger = logging.getLogger(__name__)
 
32
  except Exception as e:
33
  logger.error("DNS resolution failed for graph.facebook.com: %s", e)
34
  _append_log(f"[{__import__('time').strftime('%Y-%m-%d %H:%M:%S')}] FB_DNS_RESOLUTION_FAILED {e}")
35
+ # enqueue for later retry
36
+ try:
37
+ enqueue_retry({"type": "message", "page_id": page_id, "access_token": access_token, "message": message, "link": link})
38
+ except Exception:
39
+ logger.exception("Failed to enqueue message after DNS failure")
40
  raise
41
 
42
  # Retry with exponential backoff for transient network issues
 
58
  else:
59
  logger.error("All Facebook POST attempts failed")
60
  _append_log(f"[{__import__('time').strftime('%Y-%m-%d %H:%M:%S')}] FB_POST_ERROR page={page_id} message={message[:120]!r} exception={e}")
61
+ # enqueue for retry
62
+ try:
63
+ enqueue_retry({"type": "message", "page_id": page_id, "access_token": access_token, "message": message, "link": link})
64
+ except Exception:
65
+ logger.exception("Failed to enqueue failed message post")
66
  raise
67
  data = resp.json()
68
  logger.info("Post successful: %s", data)
retry_queue.py ADDED
@@ -0,0 +1,157 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import time
4
+ import logging
5
+ import requests
6
+
7
+ logger = logging.getLogger(__name__)
8
+
9
+ QUEUE_FILE = os.getenv("RETRY_QUEUE_FILE", "retry_queue.jsonl")
10
+ PROCESSING_FILE = QUEUE_FILE + ".pending"
11
+ MAX_ATTEMPTS = int(os.getenv("RETRY_MAX_ATTEMPTS", "5"))
12
+ BACKOFF_BASE = int(os.getenv("RETRY_BACKOFF_BASE", "60"))
13
+ PROCESS_INTERVAL = int(os.getenv("RETRY_PROCESS_INTERVAL", "300"))
14
+
15
+
16
+ def enqueue(item: dict):
17
+ try:
18
+ os.makedirs(os.path.dirname(QUEUE_FILE) or '.', exist_ok=True)
19
+ with open(QUEUE_FILE, "a", encoding="utf-8") as f:
20
+ item.setdefault("attempts", 0)
21
+ item.setdefault("created_at", int(time.time()))
22
+ item.setdefault("next_try", int(time.time()))
23
+ f.write(json.dumps(item, default=str) + "\n")
24
+ logger.info("Enqueued retry item: %s", item.get("type"))
25
+ except Exception:
26
+ logger.exception("Failed to enqueue retry item")
27
+
28
+
29
+ def _process_entry(entry: dict) -> bool:
30
+ """Attempt to process a single queue entry. Returns True on success."""
31
+ typ = entry.get("type")
32
+ page_id = entry.get("page_id")
33
+ token = entry.get("access_token")
34
+ try:
35
+ if typ == "image":
36
+ image_path = entry.get("image_path")
37
+ caption = entry.get("caption")
38
+ url = f"https://graph.facebook.com/{page_id}/photos"
39
+ data = {"access_token": token}
40
+ if caption:
41
+ data["caption"] = caption
42
+ with open(image_path, "rb") as imgf:
43
+ files = {"source": imgf}
44
+ resp = requests.post(url, files=files, data=data, timeout=20)
45
+ else:
46
+ # message or default
47
+ message = entry.get("message")
48
+ link = entry.get("link")
49
+ url = f"https://graph.facebook.com/{page_id}/feed"
50
+ payload = {"message": message, "access_token": token}
51
+ if link:
52
+ payload["link"] = link
53
+ resp = requests.post(url, data=payload, timeout=20)
54
+
55
+ resp.raise_for_status()
56
+ # on success, append to log.txt
57
+ try:
58
+ with open("log.txt", "a", encoding="utf-8") as lf:
59
+ lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_SUCCESS type={typ} page={page_id} response={resp.text}\n")
60
+ except Exception:
61
+ logger.exception("Failed to write retry success to log.txt")
62
+ return True
63
+ except requests.RequestException as e:
64
+ # network or HTTP error
65
+ logger.warning("Retry item failed (will retry if attempts remain): %s", e)
66
+ entry["last_error"] = str(e)
67
+ return False
68
+ except Exception as e:
69
+ logger.exception("Unexpected error processing retry item: %s", e)
70
+ entry["last_error"] = str(e)
71
+ return False
72
+
73
+
74
+ def _process_batch(entries: list) -> list:
75
+ requeue = []
76
+ now = int(time.time())
77
+ for entry in entries:
78
+ next_try = int(entry.get("next_try", 0))
79
+ if next_try > now:
80
+ requeue.append(entry)
81
+ continue
82
+ success = _process_entry(entry)
83
+ if not success:
84
+ attempts = int(entry.get("attempts", 0)) + 1
85
+ if attempts >= MAX_ATTEMPTS:
86
+ # give up and log
87
+ try:
88
+ with open("log.txt", "a", encoding="utf-8") as lf:
89
+ lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_GIVEUP type={entry.get('type')} page={entry.get('page_id')} attempts={attempts} error={entry.get('last_error')}\n")
90
+ except Exception:
91
+ logger.exception("Failed to write giveup to log.txt")
92
+ continue
93
+ # schedule retry with exponential backoff
94
+ delay = BACKOFF_BASE * (2 ** (attempts - 1))
95
+ entry["attempts"] = attempts
96
+ entry["next_try"] = now + delay
97
+ requeue.append(entry)
98
+ return requeue
99
+
100
+
101
+ def process_once():
102
+ if not os.path.exists(QUEUE_FILE):
103
+ return
104
+ try:
105
+ # move to pending file for processing
106
+ try:
107
+ os.replace(QUEUE_FILE, PROCESSING_FILE)
108
+ except FileNotFoundError:
109
+ return
110
+ entries = []
111
+ try:
112
+ with open(PROCESSING_FILE, "r", encoding="utf-8") as pf:
113
+ for ln in pf:
114
+ ln = ln.strip()
115
+ if not ln:
116
+ continue
117
+ try:
118
+ entries.append(json.loads(ln))
119
+ except Exception:
120
+ logger.exception("Bad JSON line in retry queue")
121
+ except Exception:
122
+ logger.exception("Failed to read processing file")
123
+
124
+ requeue = _process_batch(entries)
125
+
126
+ # append requeued items back to queue file
127
+ if requeue:
128
+ try:
129
+ with open(QUEUE_FILE, "a", encoding="utf-8") as qf:
130
+ for item in requeue:
131
+ qf.write(json.dumps(item, default=str) + "\n")
132
+ except Exception:
133
+ logger.exception("Failed to write requeue items")
134
+
135
+ finally:
136
+ try:
137
+ if os.path.exists(PROCESSING_FILE):
138
+ os.remove(PROCESSING_FILE)
139
+ except Exception:
140
+ pass
141
+
142
+
143
+ def run_worker():
144
+ logger.info("Starting retry queue worker (interval %s seconds)", PROCESS_INTERVAL)
145
+ while True:
146
+ try:
147
+ process_once()
148
+ except Exception:
149
+ logger.exception("Retry worker crashed")
150
+ time.sleep(PROCESS_INTERVAL)
151
+
152
+
153
+ def start_worker_thread():
154
+ import threading
155
+ t = threading.Thread(target=run_worker, daemon=True)
156
+ t.start()
157
+ return t