File size: 5,627 Bytes
5f2de2a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import json
import logging
import os
import threading
import time

import requests

logger = logging.getLogger(__name__)

# Queue configuration — every value is overridable via environment variables.
QUEUE_FILE = os.getenv("RETRY_QUEUE_FILE", "retry_queue.jsonl")
# Scratch file the worker atomically renames the queue to while draining it.
PROCESSING_FILE = QUEUE_FILE + ".pending"
# Give up on an entry after this many failed delivery attempts.
MAX_ATTEMPTS = int(os.getenv("RETRY_MAX_ATTEMPTS", "5"))
# Base delay in seconds; actual delay doubles with each attempt (exponential backoff).
BACKOFF_BASE = int(os.getenv("RETRY_BACKOFF_BASE", "60"))
# Seconds the worker sleeps between queue-draining passes.
PROCESS_INTERVAL = int(os.getenv("RETRY_PROCESS_INTERVAL", "300"))


def enqueue(item: dict):
    """Append *item* as one JSON line to the retry queue file.

    Fills in bookkeeping defaults the worker relies on:
    ``attempts`` (failure count so far), ``created_at`` and ``next_try``
    (epoch seconds; a fresh item is eligible immediately).

    Best-effort: any failure is logged and swallowed so queueing a retry
    can never break the caller's primary code path. Note that *item* is
    mutated in place by the ``setdefault`` calls.
    """
    try:
        # QUEUE_FILE may be a bare filename, hence the '.' fallback.
        os.makedirs(os.path.dirname(QUEUE_FILE) or '.', exist_ok=True)
        # Single timestamp so created_at and next_try are guaranteed equal
        # (two separate time.time() calls could straddle a second boundary).
        now = int(time.time())
        item.setdefault("attempts", 0)
        item.setdefault("created_at", now)
        item.setdefault("next_try", now)
        with open(QUEUE_FILE, "a", encoding="utf-8") as f:
            # default=str stringifies non-JSON types rather than raising.
            f.write(json.dumps(item, default=str) + "\n")
        logger.info("Enqueued retry item: %s", item.get("type"))
    except Exception:
        logger.exception("Failed to enqueue retry item")


def _process_entry(entry: dict) -> bool:
    """Attempt to process a single queue entry. Returns True on success."""
    typ = entry.get("type")
    page_id = entry.get("page_id")
    token = entry.get("access_token")
    try:
        if typ == "image":
            # Photo upload: multipart POST, file handle held open for the
            # duration of the request.
            data = {"access_token": token}
            caption = entry.get("caption")
            if caption:
                data["caption"] = caption
            url = f"https://graph.facebook.com/{page_id}/photos"
            with open(entry.get("image_path"), "rb") as imgf:
                resp = requests.post(
                    url, files={"source": imgf}, data=data, timeout=20
                )
        else:
            # message or default
            url = f"https://graph.facebook.com/{page_id}/feed"
            payload = {"message": entry.get("message"), "access_token": token}
            link = entry.get("link")
            if link:
                payload["link"] = link
            resp = requests.post(url, data=payload, timeout=20)

        resp.raise_for_status()
        # on success, append to log.txt
        try:
            with open("log.txt", "a", encoding="utf-8") as lf:
                lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_SUCCESS type={typ} page={page_id} response={resp.text}\n")
        except Exception:
            logger.exception("Failed to write retry success to log.txt")
        return True
    except requests.RequestException as e:
        # network or HTTP error
        logger.warning("Retry item failed (will retry if attempts remain): %s", e)
        entry["last_error"] = str(e)
        return False
    except Exception as e:
        # e.g. missing image file or bad entry shape — recorded, not raised.
        logger.exception("Unexpected error processing retry item: %s", e)
        entry["last_error"] = str(e)
        return False


def _process_batch(entries: list) -> list:
    requeue = []
    now = int(time.time())
    for entry in entries:
        next_try = int(entry.get("next_try", 0))
        if next_try > now:
            requeue.append(entry)
            continue
        success = _process_entry(entry)
        if not success:
            attempts = int(entry.get("attempts", 0)) + 1
            if attempts >= MAX_ATTEMPTS:
                # give up and log
                try:
                    with open("log.txt", "a", encoding="utf-8") as lf:
                        lf.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FB_RETRY_GIVEUP type={entry.get('type')} page={entry.get('page_id')} attempts={attempts} error={entry.get('last_error')}\n")
                except Exception:
                    logger.exception("Failed to write giveup to log.txt")
                continue
            # schedule retry with exponential backoff
            delay = BACKOFF_BASE * (2 ** (attempts - 1))
            entry["attempts"] = attempts
            entry["next_try"] = now + delay
            requeue.append(entry)
    return requeue


def process_once():
    """Drain the retry queue once.

    Atomically renames QUEUE_FILE to PROCESSING_FILE (so concurrent
    ``enqueue`` calls start a fresh queue file), processes each JSONL
    entry, and appends anything that must be retried back onto
    QUEUE_FILE. Returns silently when there is no queue file.

    NOTE(review): if the process dies between the rename and the requeue
    write, the batch in PROCESSING_FILE is lost on the next cleanup —
    at-most-once semantics. Confirm that is acceptable for this queue.
    """
    # EAFP: attempt the rename directly instead of an exists() pre-check,
    # which was racy against other workers taking the file first.
    try:
        os.replace(QUEUE_FILE, PROCESSING_FILE)
    except FileNotFoundError:
        return  # nothing queued
    try:
        entries = []
        try:
            with open(PROCESSING_FILE, "r", encoding="utf-8") as pf:
                for ln in pf:
                    ln = ln.strip()
                    if not ln:
                        continue
                    try:
                        entries.append(json.loads(ln))
                    except Exception:
                        # Skip the corrupt line; keep processing the rest.
                        logger.exception("Bad JSON line in retry queue")
        except Exception:
            logger.exception("Failed to read processing file")

        requeue = _process_batch(entries)

        # Append requeued items back to the (possibly recreated) queue file.
        if requeue:
            try:
                with open(QUEUE_FILE, "a", encoding="utf-8") as qf:
                    for item in requeue:
                        qf.write(json.dumps(item, default=str) + "\n")
            except Exception:
                logger.exception("Failed to write requeue items")
    finally:
        # Best-effort cleanup of the scratch file.
        try:
            os.remove(PROCESSING_FILE)
        except FileNotFoundError:
            pass
        except Exception:
            logger.exception("Failed to remove processing file")


def run_worker():
    """Process the retry queue forever, sleeping PROCESS_INTERVAL seconds
    between passes. Intended to be run on a background (daemon) thread;
    see start_worker_thread()."""
    logger.info("Starting retry queue worker (interval %s seconds)", PROCESS_INTERVAL)
    while True:
        try:
            process_once()
        except Exception:
            # Never let one bad pass kill the worker loop.
            logger.exception("Retry worker crashed")
        time.sleep(PROCESS_INTERVAL)


def start_worker_thread():
    """Start run_worker() on a daemon thread and return the Thread.

    daemon=True means the worker dies with the process: a batch being
    processed at shutdown is not flushed back to the queue file.
    """
    # Named thread so it is identifiable in thread dumps / debuggers.
    t = threading.Thread(target=run_worker, daemon=True, name="retry-queue-worker")
    t.start()
    return t