# SendNotifications/app/services/notification_service.py
# Hydra-Bolt - added - b8392c6
import time
import json
from typing import List, Dict, Any
from firebase_admin import messaging
from app.services.database import supabase
from app.utils.logging_config import log
# --- Data Fetching Functions ---
def fetch_user_notifications():
    """Return every row of the ``user_notifications`` table.

    Returns:
        The rows reported by Supabase, or an empty list when the response
        carries no data.

    Raises:
        RuntimeError: If the Supabase response reports an error.
    """
    log.debug("Fetching user_notifications from Supabase")
    started_at = time.time()
    response = supabase.table("user_notifications").select("*").execute()

    # The response is read both as an attribute-bearing object and as a
    # plain dict, whichever yields a truthy value first.
    is_mapping = isinstance(response, dict)

    rows = getattr(response, "data", None)
    if not rows:
        rows = response.get("data") if is_mapping else None

    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if is_mapping else None

    if failure:
        log.error("Failed to fetch user_notifications: %s", failure)
        raise RuntimeError(f"Failed to fetch user_notifications: {failure}")

    rows = rows or []
    elapsed = time.time() - started_at
    log.info("Fetched %d user_notifications in %.3fs", len(rows), elapsed)
    return rows
def fetch_event_attendance_for_events(event_ids: List[int]) -> List[Dict[str, Any]]:
    """Return the ``event_attendance`` rows whose ``event_id`` is in *event_ids*.

    Args:
        event_ids: Event identifiers to look up. An empty list short-circuits
            to an empty result without querying Supabase.

    Returns:
        The matching rows, or an empty list when the response has no data.

    Raises:
        RuntimeError: If the Supabase response reports an error.
    """
    if not event_ids:
        return []

    log.debug("Fetching event_attendance for event_ids: %s", event_ids)
    t0 = time.time()
    query = supabase.table("event_attendance").select("*").in_("event_id", event_ids)
    result = query.execute()

    def _read(field):
        # Attribute first; fall back to dict lookup when the response is a dict.
        value = getattr(result, field, None)
        if value:
            return value
        return result.get(field) if isinstance(result, dict) else None

    rows = _read("data")
    err = _read("error")
    if err:
        log.error("Failed to fetch event_attendance: %s", err)
        raise RuntimeError(f"Failed to fetch event_attendance: {err}")

    took = time.time() - t0
    log.info(
        "Fetched %d attendance rows for %d events in %.3fs",
        len(rows or []),
        len(event_ids),
        took,
    )
    return rows or []
def fetch_fcm_tokens_for_users(user_ids: List[int]) -> List[Dict[str, Any]]:
    """Return ``user_id``/``fcm_token`` rows from ``user_fcm_tokens`` for *user_ids*.

    Args:
        user_ids: User identifiers to look up. An empty list short-circuits
            to an empty result without querying Supabase.

    Returns:
        The matching rows, or an empty list when the response has no data.

    Raises:
        RuntimeError: If the Supabase response reports an error.
    """
    if not user_ids:
        return []

    log.debug("Fetching FCM tokens for user_ids (count=%d)", len(user_ids))
    began = time.time()
    token_table = supabase.table("user_fcm_tokens")
    response = token_table.select("user_id,fcm_token").in_("user_id", user_ids).execute()

    # An empty stand-in dict makes the dict fallback yield None for
    # non-dict responses.
    as_dict = response if isinstance(response, dict) else {}
    token_rows = getattr(response, "data", None) or as_dict.get("data")
    problem = getattr(response, "error", None) or as_dict.get("error")

    if problem:
        log.error("Failed to fetch user_fcm_tokens: %s", problem)
        raise RuntimeError(f"Failed to fetch user_fcm_tokens: {problem}")

    seconds = time.time() - began
    token_rows = token_rows or []
    log.info("Fetched %d FCM token rows in %.3fs", len(token_rows), seconds)
    return token_rows
def upsert_notifications(rows: List[Dict[str, Any]]):
    """Upsert *rows* into the ``notifications`` table.

    Args:
        rows: Row dicts to upsert. An empty list is a no-op.

    Returns:
        The ``data`` reported by Supabase (``None`` when it is missing or
        empty), or ``None`` when *rows* is empty.

    Raises:
        RuntimeError: If the Supabase response reports an error.
    """
    if not rows:
        return None

    log.debug("Upserting %d notification rows", len(rows))
    t_start = time.time()
    outcome = supabase.table("notifications").upsert(rows).execute()

    dict_like = isinstance(outcome, dict)
    written = getattr(outcome, "data", None) or (
        outcome.get("data") if dict_like else None
    )
    problem = getattr(outcome, "error", None) or (
        outcome.get("error") if dict_like else None
    )

    if problem:
        log.error("Failed to upsert notifications: %s", problem)
        raise RuntimeError(f"Failed to upsert notifications: {problem}")

    elapsed = time.time() - t_start
    written_count = len(written or [])
    log.info("Upserted %d notifications in %.3fs", written_count, elapsed)
    return written
def delete_user_notifications_by_ids(ids: List[int]):
    """Delete the ``user_notifications`` rows whose ``id`` is in *ids*.

    Args:
        ids: Primary keys of the rows to delete. An empty list is a no-op.

    Returns:
        The ``data`` reported by Supabase (``None`` when it is missing or
        empty), or ``None`` when *ids* is empty.

    Raises:
        RuntimeError: If the Supabase response reports an error.
    """
    if ids:
        log.debug("Deleting %d user_notifications", len(ids))
        clock_start = time.time()
        reply = supabase.table("user_notifications").delete().in_("id", ids).execute()

        removed = getattr(reply, "data", None)
        if not removed:
            removed = reply.get("data") if isinstance(reply, dict) else None

        fault = getattr(reply, "error", None)
        if not fault:
            fault = reply.get("error") if isinstance(reply, dict) else None

        if fault:
            log.error("Failed to delete user_notifications: %s", fault)
            raise RuntimeError(f"Failed to delete user_notifications: {fault}")

        spent = time.time() - clock_start
        log.info("Deleted %d user_notifications in %.3fs", len(removed or []), spent)
        return removed

    return None
def aggregate_notifications(notifications):
    """Group queued notification rows by ``event_id``.

    Rows with a falsy ``event_id`` are ignored. The first row seen for an
    event supplies the group's title, body, type and priority (priority
    defaults to ``"normal"``); every row contributes its ``user_id``, ``id``
    and ``data`` (``{}`` when absent).

    Args:
        notifications: Iterable of row dicts.

    Returns:
        Dict mapping each event_id to a dict with keys ``title``, ``body``,
        ``type``, ``priority``, ``event_id``, ``user_ids`` (set), ``ids``
        (list) and ``data`` (list).
    """
    grouped = {}
    for row in notifications:
        eid = row.get("event_id")
        if eid:
            bucket = grouped.get(eid)
            if bucket is None:
                # First row for this event defines the shared message fields.
                bucket = {
                    "title": row["title"],
                    "body": row["body"],
                    "type": row["type"],
                    "priority": row.get("priority", "normal"),
                    "event_id": eid,
                    "user_ids": set(),
                    "ids": [],
                    "data": [],
                }
                grouped[eid] = bucket
            bucket["user_ids"].add(row["user_id"])
            bucket["ids"].append(row["id"])
            bucket["data"].append(row.get("data", {}))

    log.debug("Aggregated notifications into %d events", len(grouped))
    return grouped
def send_fcm(tokens, title, body, data):
    """Send one push notification to a list of FCM registration tokens.

    The Firebase Admin SDK rejects a ``MulticastMessage`` carrying more than
    500 tokens, so the token list is sent in batches of at most 500 and the
    per-token results are merged into a single ``BatchResponse``.

    Args:
        tokens: FCM registration tokens to deliver to.
        title: Notification title.
        body: Notification body text.
        data: Optional mapping attached as the message's data payload. Every
            key and value is converted to a string before sending.

    Returns:
        ``None`` when *tokens* is empty; otherwise a
        ``messaging.BatchResponse`` covering every token (the SDK's own
        response object when a single batch was enough).

    Raises:
        Exception: Whatever the SDK raises while building or sending a batch
            is logged and re-raised. Batches already sent before the failure
            are not rolled back.
    """
    if not tokens:
        log.debug("No tokens provided to send_fcm")
        return None

    # Upper bound on tokens per MulticastMessage enforced by the Admin SDK.
    max_tokens_per_message = 500

    def _sanitize_data(d):
        """Return a copy of *d* with string keys and string values."""
        if not d:
            return {}
        out = {}
        for k, v in d.items():
            key = str(k)
            if v is None:
                out[key] = ""
            elif isinstance(v, str):
                out[key] = v
            elif isinstance(v, (dict, list)):
                # Containers travel as compact JSON; fall back to str() when
                # they hold something JSON cannot encode.
                try:
                    out[key] = json.dumps(v, separators=(',', ':'))
                except (TypeError, ValueError):
                    out[key] = str(v)
            else:
                out[key] = str(v)
        return out

    safe_data = _sanitize_data(data)
    token_list = list(tokens)
    batch_results = []

    try:
        log.info("Sending FCM multicast message to %d tokens", len(token_list))
        for offset in range(0, len(token_list), max_tokens_per_message):
            message = messaging.MulticastMessage(
                notification=messaging.Notification(title=title, body=body),
                data=safe_data,
                tokens=token_list[offset:offset + max_tokens_per_message],
            )
            batch_results.append(messaging.send_each_for_multicast(message))
    except Exception as e:
        log.exception("Failed to send FCM multicast message: %s", e)
        raise

    if len(batch_results) == 1:
        res = batch_results[0]
    else:
        # Flatten the per-batch results so callers still get one response
        # object exposing success_count / failure_count / responses.
        res = messaging.BatchResponse(
            [single for batch in batch_results for single in batch.responses]
        )

    log.info("FCM multicast send result: %s", res)
    failed = getattr(res, "failure_count", 0)
    if failed:
        log.warning(
            "FCM reported %d failed sends out of %d tokens", failed, len(token_list)
        )
    return res
def _group_tokens_by_user(token_rows):
    """Map each user_id to the ordered list of its distinct, non-empty FCM tokens."""
    tokens_by_user: Dict[int, List[str]] = {}
    for row in token_rows:
        uid = row.get("user_id")
        token = row.get("fcm_token")
        if uid is None or not token:
            continue
        user_tokens = tokens_by_user.setdefault(uid, [])
        if token not in user_tokens:
            user_tokens.append(token)
    return tokens_by_user


def _run_notification_cycle():
    """Send, record and clear every queued notification once.

    Returns:
        The number of events that were processed (0 when nothing was queued
        or no queued row carried an event_id).
    """
    notifications = fetch_user_notifications()
    if not notifications:
        return 0

    agg = aggregate_notifications(notifications)
    if not agg:
        # aggregate_notifications drops rows without an event_id; they are
        # neither sent nor deleted, so report them instead of failing silently.
        log.warning(
            "Fetched %d user_notifications but none has an event_id; nothing to send.",
            len(notifications),
        )
        return 0

    attendance_rows = fetch_event_attendance_for_events(list(agg))
    attendance_map: Dict[int, List[int]] = {}
    for row in attendance_rows:
        eid = row.get("event_id")
        uid = row.get("user_id")
        if eid is None or uid is None:
            continue
        attendance_map.setdefault(eid, []).append(uid)

    # NOTE(review): tokens are fetched for the user_ids recorded on the queued
    # rows as well as for attendees, but only attendees are targeted below --
    # confirm whether the queued rows' users should also be notified.
    all_user_ids = set()
    for entry in agg.values():
        all_user_ids.update(entry["user_ids"])
    for attendee_ids in attendance_map.values():
        all_user_ids.update(attendee_ids)

    # A user may have several rows in user_fcm_tokens; keep every token
    # rather than letting later rows overwrite earlier ones.
    tokens_by_user = _group_tokens_by_user(
        fetch_fcm_tokens_for_users(list(all_user_ids))
    )

    for event_id, notif in agg.items():
        user_ids = attendance_map.get(event_id, [])
        reachable = [uid for uid in dict.fromkeys(user_ids) if tokens_by_user.get(uid)]
        # dict.fromkeys de-duplicates while preserving order, so no token
        # receives the same push twice.
        tokens = list(
            dict.fromkeys(t for uid in reachable for t in tokens_by_user[uid])
        )

        data = {"event_id": event_id, "type": notif["type"], "priority": notif["priority"]}
        response = send_fcm(tokens, notif["title"], notif["body"], data)
        log.info(
            "Sent notification for event %s to %d users. fcm_response=%s",
            event_id,
            len(reachable),
            getattr(response, 'success_count', response),
        )

        upsert_notifications([{
            "title": notif["title"],
            "message": notif["body"],
            "audience": "attendees",
            "event_id": event_id,
            "user_ids": user_ids,
            "status": "sent",
            "metadata": data,
        }])
        delete_user_notifications_by_ids(notif["ids"])
        log.debug("Deleted processed notifications for event %s.", event_id)

    return len(agg)


def notification_service():
    """Run the notification worker forever, polling every 10 minutes.

    Each cycle fetches the queued ``user_notifications`` rows, groups them by
    event, pushes one FCM notification per event to every token of every
    attendee, records the send in ``notifications`` and deletes the processed
    queue rows. Any exception raised during a cycle is logged and the loop
    continues; the remaining events of that cycle are retried on the next one.

    The worker sleeps exactly once after every cycle, whether it succeeded,
    failed or found nothing to do, so rows that can never be processed cannot
    cause a tight re-poll loop.
    """
    poll_interval_seconds = 600
    while True:
        try:
            if _run_notification_cycle() == 0:
                log.debug("No notifications to process. Sleeping for 10 minutes.")
        except Exception as e:
            log.exception("Error in notification service main loop: %s", e)
        time.sleep(poll_interval_seconds)