File size: 8,431 Bytes
31eedc3
b8392c6
 
 
 
 
31eedc3
b8392c6
31eedc3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b8392c6
31eedc3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dcabd54
31eedc3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b8392c6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import time
import json
from typing import List, Dict, Any
from firebase_admin import messaging
from app.services.database import supabase
from app.utils.logging_config import log

# --- Data Fetching Functions ---
def fetch_user_notifications():
    """Read all rows from the ``user_notifications`` Supabase table.

    Returns:
        The ``data`` carried by the response, or an empty list when it is
        missing or empty.

    Raises:
        RuntimeError: If the response carries a truthy ``error``.
    """
    log.debug("Fetching user_notifications from Supabase")
    started_at = time.time()
    response = supabase.table("user_notifications").select("*").execute()

    # The response is read attribute-first; when the attribute is absent or
    # falsy, dict-style access is used if the response is a plain dict.
    dict_like = isinstance(response, dict)
    rows = getattr(response, "data", None)
    if not rows:
        rows = response.get("data") if dict_like else None
    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if dict_like else None

    if failure:
        log.error("Failed to fetch user_notifications: %s", failure)
        raise RuntimeError(f"Failed to fetch user_notifications: {failure}")

    elapsed = time.time() - started_at
    rows = rows or []
    log.info("Fetched %d user_notifications in %.3fs", len(rows), elapsed)
    return rows

def fetch_event_attendance_for_events(event_ids: List[int]) -> List[Dict[str, Any]]:
    """Read the ``event_attendance`` rows whose ``event_id`` is in *event_ids*.

    Args:
        event_ids: Event identifiers to filter on. When empty, no query is
            issued.

    Returns:
        The ``data`` carried by the response, or an empty list when it is
        missing or empty (or when *event_ids* is empty).

    Raises:
        RuntimeError: If the response carries a truthy ``error``.
    """
    if not event_ids:
        return []

    log.debug("Fetching event_attendance for event_ids: %s", event_ids)
    started_at = time.time()
    query = supabase.table("event_attendance").select("*")
    response = query.in_("event_id", event_ids).execute()

    # Attribute access first; dict lookup only as a fallback for plain dicts.
    dict_like = isinstance(response, dict)
    rows = getattr(response, "data", None)
    if not rows:
        rows = response.get("data") if dict_like else None
    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if dict_like else None

    if failure:
        log.error("Failed to fetch event_attendance: %s", failure)
        raise RuntimeError(f"Failed to fetch event_attendance: {failure}")

    elapsed = time.time() - started_at
    rows = rows or []
    log.info(
        "Fetched %d attendance rows for %d events in %.3fs",
        len(rows),
        len(event_ids),
        elapsed,
    )
    return rows

def fetch_fcm_tokens_for_users(user_ids: List[int]) -> List[Dict[str, Any]]:
    """Read ``user_id``/``fcm_token`` pairs from ``user_fcm_tokens`` for *user_ids*.

    Args:
        user_ids: User identifiers to filter on. When empty, no query is
            issued.

    Returns:
        The ``data`` carried by the response, or an empty list when it is
        missing or empty (or when *user_ids* is empty).

    Raises:
        RuntimeError: If the response carries a truthy ``error``.
    """
    if not user_ids:
        return []

    log.debug("Fetching FCM tokens for user_ids (count=%d)", len(user_ids))
    started_at = time.time()
    query = supabase.table("user_fcm_tokens").select("user_id,fcm_token")
    response = query.in_("user_id", user_ids).execute()

    # Attribute access first; dict lookup only as a fallback for plain dicts.
    dict_like = isinstance(response, dict)
    rows = getattr(response, "data", None)
    if not rows:
        rows = response.get("data") if dict_like else None
    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if dict_like else None

    if failure:
        log.error("Failed to fetch user_fcm_tokens: %s", failure)
        raise RuntimeError(f"Failed to fetch user_fcm_tokens: {failure}")

    elapsed = time.time() - started_at
    rows = rows or []
    log.info("Fetched %d FCM token rows in %.3fs", len(rows), elapsed)
    return rows

def upsert_notifications(rows: List[Dict[str, Any]]):
    """Upsert *rows* into the ``notifications`` Supabase table.

    Args:
        rows: Row dicts to upsert. When empty, no request is issued.

    Returns:
        ``None`` when *rows* is empty; otherwise the ``data`` read from the
        response (which may itself be ``None``).

    Raises:
        RuntimeError: If the response carries a truthy ``error``.
    """
    if not rows:
        return None

    log.debug("Upserting %d notification rows", len(rows))
    started_at = time.time()
    response = supabase.table("notifications").upsert(rows).execute()

    # Attribute access first; dict lookup only as a fallback for plain dicts.
    dict_like = isinstance(response, dict)
    written = getattr(response, "data", None)
    if not written:
        written = response.get("data") if dict_like else None
    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if dict_like else None

    if failure:
        log.error("Failed to upsert notifications: %s", failure)
        raise RuntimeError(f"Failed to upsert notifications: {failure}")

    elapsed = time.time() - started_at
    written_count = len(written) if written else 0
    log.info("Upserted %d notifications in %.3fs", written_count, elapsed)
    return written

def delete_user_notifications_by_ids(ids: List[int]):
    """Delete the ``user_notifications`` rows whose ``id`` is in *ids*.

    Args:
        ids: Row identifiers to delete. When empty, no request is issued.

    Returns:
        ``None`` when *ids* is empty; otherwise the ``data`` read from the
        response (which may itself be ``None``).

    Raises:
        RuntimeError: If the response carries a truthy ``error``.
    """
    if not ids:
        return None

    log.debug("Deleting %d user_notifications", len(ids))
    started_at = time.time()
    deletion = supabase.table("user_notifications").delete()
    response = deletion.in_("id", ids).execute()

    # Attribute access first; dict lookup only as a fallback for plain dicts.
    dict_like = isinstance(response, dict)
    removed = getattr(response, "data", None)
    if not removed:
        removed = response.get("data") if dict_like else None
    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if dict_like else None

    if failure:
        log.error("Failed to delete user_notifications: %s", failure)
        raise RuntimeError(f"Failed to delete user_notifications: {failure}")

    elapsed = time.time() - started_at
    removed_count = len(removed) if removed else 0
    log.info("Deleted %d user_notifications in %.3fs", removed_count, elapsed)
    return removed

def aggregate_notifications(notifications):
    """Group notification rows by their ``event_id``.

    Rows whose ``event_id`` is missing or falsy are ignored. The first row
    seen for an event supplies that group's ``title``, ``body``, ``type`` and
    ``priority`` (``"normal"`` when the row has no ``priority``); every row
    contributes its ``user_id``, ``id`` and ``data`` (``{}`` when absent).

    Args:
        notifications: Iterable of row dicts.

    Returns:
        A dict mapping each event id to a dict with the keys ``title``,
        ``body``, ``type``, ``priority``, ``event_id``, ``user_ids`` (a set),
        ``ids`` (a list) and ``data`` (a list).

    Raises:
        KeyError: If a row with an ``event_id`` lacks ``user_id`` or ``id``,
            or if the first row of a group lacks ``title``, ``body`` or
            ``type``.
    """
    grouped = {}
    for row in notifications:
        eid = row.get("event_id")
        if eid:
            bucket = grouped.get(eid)
            if bucket is None:
                # First row for this event: it defines the shared fields.
                bucket = grouped[eid] = {
                    "title": row["title"],
                    "body": row["body"],
                    "type": row["type"],
                    "priority": row.get("priority", "normal"),
                    "event_id": eid,
                    "user_ids": set(),
                    "ids": [],
                    "data": [],
                }
            bucket["user_ids"].add(row["user_id"])
            bucket["ids"].append(row["id"])
            bucket["data"].append(row.get("data", {}))
    log.debug("Aggregated notifications into %d events", len(grouped))
    return grouped

def send_fcm(tokens, title, body, data):
    """Send one notification to every token via FCM multicast.

    A single ``MulticastMessage`` accepts at most 500 registration tokens, so
    larger token lists are split into consecutive batches of up to 500 and
    sent one batch after another.

    Args:
        tokens: Sequence of FCM registration tokens. When empty or ``None``,
            nothing is sent.
        title: Notification title.
        body: Notification body.
        data: Optional mapping attached as the message's data payload. Keys
            and values are converted to strings: ``None`` becomes ``""``,
            dicts/lists are JSON-encoded, everything else goes through
            ``str()``.

    Returns:
        ``None`` when there are no tokens. Otherwise the response of
        ``messaging.send_each_for_multicast``; when several batches were
        needed, a single ``messaging.BatchResponse`` combining the per-token
        responses of all batches, in token order.

    Raises:
        Exception: Whatever the FCM send raises is logged and re-raised. If a
            later batch fails, earlier batches have already been delivered.
    """
    if not tokens:
        log.debug("No tokens provided to send_fcm")
        return None

    def _sanitize_data(d):
        # FCM data payloads must map strings to strings.
        if not d:
            return {}
        out = {}
        for k, v in d.items():
            key = str(k)
            if v is None:
                out[key] = ""
            elif isinstance(v, str):
                out[key] = v
            elif isinstance(v, (dict, list)):
                try:
                    out[key] = json.dumps(v, separators=(',', ':'))
                except Exception:
                    # Best effort: fall back to the plain string form when
                    # the value is not JSON-serialisable.
                    out[key] = str(v)
            else:
                out[key] = str(v)
        return out

    max_tokens_per_message = 500  # limit enforced by messaging.MulticastMessage
    safe_data = _sanitize_data(data)
    notification = messaging.Notification(title=title, body=body)
    tokens = list(tokens)
    messages = [
        messaging.MulticastMessage(
            notification=notification,
            data=safe_data,
            tokens=tokens[start:start + max_tokens_per_message],
        )
        for start in range(0, len(tokens), max_tokens_per_message)
    ]
    try:
        log.info("Sending FCM multicast message to %d tokens", len(tokens))
        batch_results = [messaging.send_each_for_multicast(m) for m in messages]
        if len(batch_results) == 1:
            # Common case: hand back the library's response untouched.
            res = batch_results[0]
        else:
            merged = []
            for batch_result in batch_results:
                merged.extend(batch_result.responses)
            res = messaging.BatchResponse(merged)
        log.info("FCM multicast send result: %s", res)
        return res
    except Exception as e:
        log.exception("Failed to send FCM multicast message: %s", e)
        raise

def _index_attendees_by_event(attendance_rows: List[Dict[str, Any]]) -> Dict[Any, List[Any]]:
    """Group attendance rows into ``{event_id: [user_id, ...]}``, skipping incomplete rows."""
    attendees: Dict[Any, List[Any]] = {}
    for row in attendance_rows:
        event_id = row.get("event_id")
        user_id = row.get("user_id")
        if event_id is None or user_id is None:
            continue
        attendees.setdefault(event_id, []).append(user_id)
    return attendees


def _index_tokens_by_user(token_rows: List[Dict[str, Any]]) -> Dict[Any, List[str]]:
    """Group token rows into ``{user_id: [fcm_token, ...]}`` so no row for a user is dropped."""
    tokens_by_user: Dict[Any, List[str]] = {}
    for row in token_rows:
        token = row.get("fcm_token")
        if token:
            tokens_by_user.setdefault(row["user_id"], []).append(token)
    return tokens_by_user


def _tokens_for_users(user_ids: List[Any], tokens_by_user: Dict[Any, List[str]]) -> List[str]:
    """Return the tokens of *user_ids*, de-duplicated in first-seen order."""
    seen = set()
    tokens: List[str] = []
    for user_id in user_ids:
        for token in tokens_by_user.get(user_id, ()):
            if token not in seen:
                seen.add(token)
                tokens.append(token)
    return tokens


def _run_notification_pass() -> bool:
    """Process all pending ``user_notifications`` once.

    Pending rows are grouped per event; each event's attendees are pushed one
    FCM notification, a summary row is upserted into ``notifications`` and
    the processed ``user_notifications`` rows are deleted.

    A failure while handling one event is logged and does not stop the
    remaining events; the failed event's rows are left in place, so a later
    pass picks them up again.

    Returns:
        ``False`` when there were no pending rows, ``True`` otherwise.

    Raises:
        RuntimeError: If one of the initial bulk fetches fails.
    """
    notifications = fetch_user_notifications()
    if not notifications:
        return False

    agg = aggregate_notifications(notifications)
    attendance_map = _index_attendees_by_event(
        fetch_event_attendance_for_events(list(agg))
    )

    # Tokens are fetched for both the notified users and the attendees.
    # NOTE(review): only attendees are actually targeted below; confirm
    # whether the users named on the rows should receive the push as well.
    all_user_ids = set()
    for entry in agg.values():
        all_user_ids.update(entry["user_ids"])
    for attendee_ids in attendance_map.values():
        all_user_ids.update(attendee_ids)
    tokens_by_user = _index_tokens_by_user(
        fetch_fcm_tokens_for_users(list(all_user_ids))
    )

    for event_id, notif in agg.items():
        try:
            user_ids = attendance_map.get(event_id, [])
            tokens = _tokens_for_users(user_ids, tokens_by_user)
            data = {"event_id": event_id, "type": notif["type"], "priority": notif["priority"]}
            response = send_fcm(tokens, notif["title"], notif["body"], data)
            log.info(
                "Sent notification for event %s to %d users. fcm_response=%s",
                event_id,
                len(tokens),
                getattr(response, 'success_count', response),
            )
            upsert_notifications([{
                "title": notif["title"],
                "message": notif["body"],
                "audience": "attendees",
                "event_id": event_id,
                "user_ids": user_ids,
                "status": "sent",
                "metadata": data,
            }])
            delete_user_notifications_by_ids(notif["ids"])
            log.debug("Deleted processed notifications for event %s.", event_id)
        except Exception as e:
            # Isolate failures so one bad event cannot block the others.
            log.exception("Failed to process notifications for event %s: %s", event_id, e)
    return True


def notification_service():
    """Run the notification worker forever, polling every 10 minutes.

    Each iteration performs one processing pass and then sleeps for 600
    seconds. Any exception escaping a pass is logged and the loop continues.
    This function never returns.
    """
    while True:
        try:
            if not _run_notification_pass():
                log.debug("No notifications to process. Sleeping for 10 minutes.")
        except Exception as e:
            log.exception("Error in notification service main loop: %s", e)
        time.sleep(600)