Spaces:
Running
Running
File size: 8,431 Bytes
31eedc3 b8392c6 31eedc3 b8392c6 31eedc3 b8392c6 31eedc3 dcabd54 31eedc3 b8392c6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
import time
import json
from typing import List, Dict, Any
from firebase_admin import messaging
from app.services.database import supabase
from app.utils.logging_config import log
# --- Data Fetching Functions ---
def fetch_user_notifications():
    """Load every row of the ``user_notifications`` table from Supabase.

    Returns:
        The rows found on the response, or an empty list when the response
        carries no data.

    Raises:
        RuntimeError: If the response reports an error.
    """
    log.debug("Fetching user_notifications from Supabase")
    started_at = time.time()
    response = supabase.table("user_notifications").select("*").execute()

    # The response is read either as an object with attributes or as a dict.
    is_mapping = isinstance(response, dict)
    rows = getattr(response, "data", None)
    if not rows:
        rows = response.get("data") if is_mapping else None
    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if is_mapping else None

    if failure:
        log.error("Failed to fetch user_notifications: %s", failure)
        raise RuntimeError(f"Failed to fetch user_notifications: {failure}")

    elapsed = time.time() - started_at
    rows = rows or []
    log.info("Fetched %d user_notifications in %.3fs", len(rows), elapsed)
    return rows
def fetch_event_attendance_for_events(event_ids: List[int]) -> List[Dict[str, Any]]:
    """Load the ``event_attendance`` rows belonging to the given events.

    Args:
        event_ids: Event identifiers to filter on. When empty, no query is
            issued.

    Returns:
        The rows found on the response, or an empty list when there is
        nothing to query or the response carries no data.

    Raises:
        RuntimeError: If the response reports an error.
    """
    if not event_ids:
        return []

    log.debug("Fetching event_attendance for event_ids: %s", event_ids)
    started_at = time.time()
    query = supabase.table("event_attendance").select("*")
    response = query.in_("event_id", event_ids).execute()

    # The response is read either as an object with attributes or as a dict.
    is_mapping = isinstance(response, dict)
    rows = getattr(response, "data", None)
    if not rows:
        rows = response.get("data") if is_mapping else None
    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if is_mapping else None

    if failure:
        log.error("Failed to fetch event_attendance: %s", failure)
        raise RuntimeError(f"Failed to fetch event_attendance: {failure}")

    elapsed = time.time() - started_at
    rows = rows or []
    log.info(
        "Fetched %d attendance rows for %d events in %.3fs",
        len(rows),
        len(event_ids),
        elapsed,
    )
    return rows
def fetch_fcm_tokens_for_users(user_ids: List[int]) -> List[Dict[str, Any]]:
    """Load ``user_id``/``fcm_token`` pairs for the given users.

    Args:
        user_ids: User identifiers to filter on. When empty, no query is
            issued.

    Returns:
        The rows found on the response, or an empty list when there is
        nothing to query or the response carries no data.

    Raises:
        RuntimeError: If the response reports an error.
    """
    if not user_ids:
        return []

    log.debug("Fetching FCM tokens for user_ids (count=%d)", len(user_ids))
    started_at = time.time()
    query = supabase.table("user_fcm_tokens").select("user_id,fcm_token")
    response = query.in_("user_id", user_ids).execute()

    # The response is read either as an object with attributes or as a dict.
    is_mapping = isinstance(response, dict)
    rows = getattr(response, "data", None)
    if not rows:
        rows = response.get("data") if is_mapping else None
    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if is_mapping else None

    if failure:
        log.error("Failed to fetch user_fcm_tokens: %s", failure)
        raise RuntimeError(f"Failed to fetch user_fcm_tokens: {failure}")

    elapsed = time.time() - started_at
    rows = rows or []
    log.info("Fetched %d FCM token rows in %.3fs", len(rows), elapsed)
    return rows
def upsert_notifications(rows: List[Dict[str, Any]]):
    """Upsert the given rows into the ``notifications`` table.

    Args:
        rows: Row payloads to upsert. When empty, no request is issued.

    Returns:
        The data found on the response, or ``None`` when there was nothing
        to upsert or the response carries no data.

    Raises:
        RuntimeError: If the response reports an error.
    """
    if not rows:
        return None

    log.debug("Upserting %d notification rows", len(rows))
    started_at = time.time()
    response = supabase.table("notifications").upsert(rows).execute()

    # The response is read either as an object with attributes or as a dict.
    is_mapping = isinstance(response, dict)
    written = getattr(response, "data", None)
    if not written:
        written = response.get("data") if is_mapping else None
    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if is_mapping else None

    if failure:
        log.error("Failed to upsert notifications: %s", failure)
        raise RuntimeError(f"Failed to upsert notifications: {failure}")

    elapsed = time.time() - started_at
    written_count = len(written) if written else 0
    log.info("Upserted %d notifications in %.3fs", written_count, elapsed)
    return written
def delete_user_notifications_by_ids(ids: List[int]):
    """Delete the ``user_notifications`` rows whose ``id`` is in ``ids``.

    Args:
        ids: Primary keys of the rows to delete. When empty, no request is
            issued.

    Returns:
        The data found on the response, or ``None`` when there was nothing
        to delete or the response carries no data.

    Raises:
        RuntimeError: If the response reports an error.
    """
    if not ids:
        return None

    log.debug("Deleting %d user_notifications", len(ids))
    started_at = time.time()
    query = supabase.table("user_notifications").delete()
    response = query.in_("id", ids).execute()

    # The response is read either as an object with attributes or as a dict.
    is_mapping = isinstance(response, dict)
    removed = getattr(response, "data", None)
    if not removed:
        removed = response.get("data") if is_mapping else None
    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if is_mapping else None

    if failure:
        log.error("Failed to delete user_notifications: %s", failure)
        raise RuntimeError(f"Failed to delete user_notifications: {failure}")

    elapsed = time.time() - started_at
    removed_count = len(removed) if removed else 0
    log.info("Deleted %d user_notifications in %.3fs", removed_count, elapsed)
    return removed
def aggregate_notifications(notifications):
    """Group notification rows by their ``event_id``.

    Rows whose ``event_id`` is missing or falsy are skipped. The first row
    seen for an event supplies that group's ``title``, ``body``, ``type`` and
    ``priority`` (defaulting to ``"normal"``); later rows for the same event
    only contribute their ``user_id``, ``id`` and ``data``.

    Args:
        notifications: Iterable of dict-like rows.

    Returns:
        A dict mapping each event id to a dict with the keys ``title``,
        ``body``, ``type``, ``priority``, ``event_id``, ``user_ids`` (a set),
        ``ids`` (a list) and ``data`` (a list).

    Raises:
        KeyError: If a row with an event id lacks ``title``, ``body``,
            ``type``, ``user_id`` or ``id``.
    """
    grouped = {}
    for row in notifications:
        key = row.get("event_id")
        if not key:
            continue

        bucket = grouped.get(key)
        if bucket is None:
            # First row for this event: it defines the shared message fields.
            bucket = {
                "title": row["title"],
                "body": row["body"],
                "type": row["type"],
                "priority": row.get("priority", "normal"),
                "event_id": key,
                "user_ids": set(),
                "ids": [],
                "data": [],
            }
            grouped[key] = bucket

        bucket["user_ids"].add(row["user_id"])
        bucket["ids"].append(row["id"])
        bucket["data"].append(row.get("data", {}))

    log.debug("Aggregated notifications into %d events", len(grouped))
    return grouped
# A single FCM MulticastMessage may carry at most this many registration
# tokens; larger audiences have to be split across several messages.
_FCM_MAX_TOKENS_PER_MULTICAST = 500


def send_fcm(tokens, title, body, data):
    """Send one notification to a list of FCM registration tokens.

    The token list is split into chunks of at most
    ``_FCM_MAX_TOKENS_PER_MULTICAST`` so that large audiences do not make
    ``MulticastMessage`` reject the request.

    Args:
        tokens: List of FCM registration tokens. When empty, nothing is sent.
        title: Notification title.
        body: Notification body text.
        data: Optional mapping sent as the message's data payload. Keys and
            values are converted to strings first (see ``_sanitize_data``).

    Returns:
        ``None`` when ``tokens`` is empty. Otherwise a
        ``messaging.BatchResponse``: the response of the single send when one
        chunk was enough, or a merged response whose ``responses`` follow the
        order of ``tokens`` when several chunks were sent.

    Raises:
        Exception: Whatever building or sending a message raises is logged
            and re-raised. When a later chunk fails, earlier chunks have
            already been sent.
    """
    if not tokens:
        log.debug("No tokens provided to send_fcm")
        return None

    def _sanitize_data(d):
        """Return a copy of ``d`` whose keys and values are all strings."""
        if not d:
            return {}
        out = {}
        for k, v in d.items():
            key = str(k)
            if v is None:
                out[key] = ""
            elif isinstance(v, str):
                out[key] = v
            elif isinstance(v, (dict, list)):
                # Containers are sent as compact JSON; fall back to str()
                # when they hold something JSON cannot encode.
                try:
                    out[key] = json.dumps(v, separators=(',', ':'))
                except Exception:
                    out[key] = str(v)
            else:
                out[key] = str(v)
        return out

    safe_data = _sanitize_data(data)
    chunk_size = _FCM_MAX_TOKENS_PER_MULTICAST
    try:
        log.info("Sending FCM multicast message to %d tokens", len(tokens))
        batch_results = []
        for start in range(0, len(tokens), chunk_size):
            message = messaging.MulticastMessage(
                notification=messaging.Notification(title=title, body=body),
                data=safe_data,
                tokens=tokens[start:start + chunk_size],
            )
            batch_results.append(messaging.send_each_for_multicast(message))

        if len(batch_results) == 1:
            res = batch_results[0]
        else:
            # Present several chunk results to the caller as one response.
            merged = []
            for batch in batch_results:
                merged.extend(batch.responses)
            res = messaging.BatchResponse(merged)

        log.info("FCM multicast send result: %s", res)
        return res
    except Exception as e:
        log.exception("Failed to send FCM multicast message: %s", e)
        raise
# Seconds to wait between two polling cycles (10 minutes).
_NOTIFICATION_POLL_INTERVAL_SECONDS = 600


def _process_pending_notifications():
    """Run one polling cycle: fetch, send, record and clear notifications."""
    notifications = fetch_user_notifications()
    if not notifications:
        log.debug("No notifications to process. Sleeping for 10 minutes.")
        return

    agg = aggregate_notifications(notifications)
    attendance_rows = fetch_event_attendance_for_events(list(agg.keys()))

    # event_id -> user ids attending that event
    attendance_map: Dict[int, List[int]] = {}
    for row in attendance_rows:
        eid = row.get("event_id")
        uid = row.get("user_id")
        if eid is None or uid is None:
            continue
        attendance_map.setdefault(eid, []).append(uid)

    # NOTE(review): tokens are fetched for the users named on the queued rows
    # as well, but only attendees are targeted below - confirm the intent.
    all_user_ids = set()
    for group in agg.values():
        all_user_ids.update(group["user_ids"])
    for attendee_ids in attendance_map.values():
        all_user_ids.update(attendee_ids)

    token_rows = fetch_fcm_tokens_for_users(list(all_user_ids))

    # Keep every token of a user: a dict of user_id -> single token would
    # silently drop all but one row for users with several token rows.
    token_map: Dict[int, List[str]] = {}
    for row in token_rows:
        token = row["fcm_token"]
        if token:
            token_map.setdefault(row["user_id"], []).append(token)

    for event_id, notif in agg.items():
        # Isolate each event so one failure does not block the others; a
        # failed event keeps its queued rows and is retried next cycle.
        try:
            user_ids = attendance_map.get(event_id, [])

            # Flatten to a unique, order-preserving token list so a device
            # is not notified twice.
            tokens: List[str] = []
            seen = set()
            for uid in user_ids:
                for token in token_map.get(uid, []):
                    if token not in seen:
                        seen.add(token)
                        tokens.append(token)

            data = {
                "event_id": event_id,
                "type": notif["type"],
                "priority": notif["priority"],
            }
            response = send_fcm(tokens, notif["title"], notif["body"], data)
            log.info(
                "Sent notification for event %s to %d users. fcm_response=%s",
                event_id,
                len(tokens),
                getattr(response, 'success_count', response),
            )

            upsert_notifications([{
                "title": notif["title"],
                "message": notif["body"],
                "audience": "attendees",
                "event_id": event_id,
                "user_ids": user_ids,
                "status": "sent",
                "metadata": data,
            }])
            # Remove the queued rows only after the send and upsert succeeded.
            delete_user_notifications_by_ids(notif["ids"])
            log.debug("Deleted processed notifications for event %s.", event_id)
        except Exception as e:
            log.exception(
                "Failed to process notifications for event %s: %s", event_id, e
            )


def notification_service():
    """Poll for queued notifications forever and push them to attendees.

    Each cycle fetches all ``user_notifications`` rows, groups them by event,
    sends one FCM notification per event to the tokens of that event's
    attendees, records the result in ``notifications`` and deletes the
    processed queue rows. Errors are logged and never propagate; the loop
    sleeps ``_NOTIFICATION_POLL_INTERVAL_SECONDS`` after every cycle, whether
    it succeeded, failed or found nothing to do.

    This function does not return.
    """
    while True:
        try:
            _process_pending_notifications()
        except Exception as e:
            log.exception("Error in notification service main loop: %s", e)
        # Sleeping unconditionally keeps the loop from spinning when rows
        # remain queued after a cycle (e.g. rows without an event_id).
        time.sleep(_NOTIFICATION_POLL_INTERVAL_SECONDS)
|