Spaces:
Running
Running
| import time | |
| import json | |
| from typing import List, Dict, Any | |
| from firebase_admin import messaging | |
| from app.services.database import supabase | |
| from app.utils.logging_config import log | |
| # --- Data Fetching Functions --- | |
def fetch_user_notifications():
    """Read the pending rows from the ``user_notifications`` table.

    Selects all columns with no filter. The response is inspected for
    ``data`` / ``error`` either as attributes or, when the response is a
    plain dict, as keys.

    Returns:
        The list of rows, or an empty list when the response has no data.

    Raises:
        RuntimeError: if the response carries a truthy ``error``.
    """
    log.debug("Fetching user_notifications from Supabase")
    started_at = time.time()
    query = supabase.table("user_notifications").select("*")
    response = query.execute()

    response_is_dict = isinstance(response, dict)

    rows = getattr(response, "data", None)
    if not rows:
        rows = response.get("data") if response_is_dict else None

    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if response_is_dict else None

    if failure:
        log.error("Failed to fetch user_notifications: %s", failure)
        raise RuntimeError(f"Failed to fetch user_notifications: {failure}")

    rows = rows or []
    elapsed = time.time() - started_at
    log.info("Fetched %d user_notifications in %.3fs", len(rows), elapsed)
    return rows
def fetch_event_attendance_for_events(event_ids: List[int]) -> List[Dict[str, Any]]:
    """Load the ``event_attendance`` rows whose ``event_id`` is in *event_ids*.

    An empty *event_ids* short-circuits to ``[]`` without issuing a query.

    Returns:
        The matching rows, or an empty list when the response has no data.

    Raises:
        RuntimeError: if the response carries a truthy ``error``.
    """
    if not event_ids:
        return []

    def _field(response, name):
        # The response may expose its fields as attributes or as dict keys.
        value = getattr(response, name, None)
        if value:
            return value
        return response.get(name) if isinstance(response, dict) else None

    log.debug("Fetching event_attendance for event_ids: %s", event_ids)
    started_at = time.time()
    response = (
        supabase.table("event_attendance")
        .select("*")
        .in_("event_id", event_ids)
        .execute()
    )
    rows = _field(response, "data")
    failure = _field(response, "error")
    if failure:
        log.error("Failed to fetch event_attendance: %s", failure)
        raise RuntimeError(f"Failed to fetch event_attendance: {failure}")

    rows = rows or []
    elapsed = time.time() - started_at
    log.info(
        "Fetched %d attendance rows for %d events in %.3fs",
        len(rows),
        len(event_ids),
        elapsed,
    )
    return rows
def fetch_fcm_tokens_for_users(user_ids: List[int]) -> List[Dict[str, Any]]:
    """Load ``user_id`` / ``fcm_token`` pairs for the given users.

    Queries the ``user_fcm_tokens`` table for rows whose ``user_id`` is in
    *user_ids*. An empty *user_ids* returns ``[]`` without querying.

    Returns:
        The matching rows, or an empty list when the response has no data.

    Raises:
        RuntimeError: if the response carries a truthy ``error``.
    """
    if not user_ids:
        return []

    log.debug("Fetching FCM tokens for user_ids (count=%d)", len(user_ids))
    started_at = time.time()

    table = supabase.table("user_fcm_tokens")
    filtered = table.select("user_id,fcm_token").in_("user_id", user_ids)
    response = filtered.execute()

    # Fields may live on the response as attributes or as dict keys.
    as_dict = response if isinstance(response, dict) else None
    rows = getattr(response, "data", None) or (
        as_dict.get("data") if as_dict is not None else None
    )
    failure = getattr(response, "error", None) or (
        as_dict.get("error") if as_dict is not None else None
    )

    if failure:
        log.error("Failed to fetch user_fcm_tokens: %s", failure)
        raise RuntimeError(f"Failed to fetch user_fcm_tokens: {failure}")

    elapsed = time.time() - started_at
    row_count = len(rows) if rows else 0
    log.info("Fetched %d FCM token rows in %.3fs", row_count, elapsed)
    return rows if rows else []
def upsert_notifications(rows: List[Dict[str, Any]]):
    """Upsert *rows* into the ``notifications`` table.

    Does nothing and returns ``None`` when *rows* is empty.

    Returns:
        The ``data`` of the response (``None`` when the response has none).

    Raises:
        RuntimeError: if the response carries a truthy ``error``.
    """
    if not rows:
        return None

    log.debug("Upserting %d notification rows", len(rows))
    started_at = time.time()
    response = supabase.table("notifications").upsert(rows).execute()

    dict_like = isinstance(response, dict)

    written = getattr(response, "data", None)
    if not written:
        written = response.get("data") if dict_like else None

    failure = getattr(response, "error", None)
    if not failure:
        failure = response.get("error") if dict_like else None

    if failure:
        log.error("Failed to upsert notifications: %s", failure)
        raise RuntimeError(f"Failed to upsert notifications: {failure}")

    elapsed = time.time() - started_at
    log.info("Upserted %d notifications in %.3fs", len(written or []), elapsed)
    return written
def delete_user_notifications_by_ids(ids: List[int]):
    """Delete the ``user_notifications`` rows whose ``id`` is in *ids*.

    Does nothing and returns ``None`` when *ids* is empty.

    Returns:
        The ``data`` of the response (``None`` when the response has none).

    Raises:
        RuntimeError: if the response carries a truthy ``error``.
    """
    if not ids:
        return None

    def _pick(response, name):
        # Attribute first; fall back to a dict key for dict-shaped responses.
        found = getattr(response, name, None)
        if not found and isinstance(response, dict):
            return response.get(name)
        return found or None

    log.debug("Deleting %d user_notifications", len(ids))
    started_at = time.time()
    response = (
        supabase.table("user_notifications")
        .delete()
        .in_("id", ids)
        .execute()
    )
    removed = _pick(response, "data")
    failure = _pick(response, "error")

    if failure:
        log.error("Failed to delete user_notifications: %s", failure)
        raise RuntimeError(f"Failed to delete user_notifications: {failure}")

    elapsed = time.time() - started_at
    log.info("Deleted %d user_notifications in %.3fs", len(removed or []), elapsed)
    return removed
def aggregate_notifications(notifications):
    """Group notification rows by ``event_id``.

    Rows with a falsy ``event_id`` are skipped. The first row seen for an
    event supplies that group's ``title``, ``body``, ``type`` and
    ``priority`` (defaulting to ``"normal"``); every row contributes its
    ``user_id`` to the ``user_ids`` set, its ``id`` to ``ids`` and its
    ``data`` (default ``{}``) to the ``data`` list.

    Returns:
        A dict mapping each event id to its group dict.
    """
    grouped = {}
    for row in notifications:
        event_id = row.get("event_id")
        if not event_id:
            continue

        bucket = grouped.get(event_id)
        if bucket is None:
            # First row for this event defines the shared message fields.
            bucket = grouped[event_id] = {
                "title": row["title"],
                "body": row["body"],
                "type": row["type"],
                "priority": row.get("priority", "normal"),
                "event_id": event_id,
                "user_ids": set(),
                "ids": [],
                "data": [],
            }

        bucket["user_ids"].add(row["user_id"])
        bucket["ids"].append(row["id"])
        bucket["data"].append(row.get("data", {}))

    log.debug("Aggregated notifications into %d events", len(grouped))
    return grouped
# FCM rejects a MulticastMessage that carries more than this many tokens.
_FCM_MAX_TOKENS_PER_MULTICAST = 500


def send_fcm(tokens, title, body, data):
    """Send one notification to many device tokens through FCM multicast.

    Falsy and duplicate tokens are dropped so a device is not pushed twice.
    The remaining tokens are sent in batches of at most 500, the per-message
    limit of ``MulticastMessage``; larger audiences previously failed with
    ``ValueError`` before anything was sent.

    Args:
        tokens: Iterable of FCM registration tokens.
        title: Notification title.
        body: Notification body text.
        data: Optional mapping attached as the message's data payload. Keys
            and values are converted to strings: ``None`` becomes ``""``,
            dicts and lists are JSON-encoded, everything else goes through
            ``str()``.

    Returns:
        ``None`` when there is nothing to send. Otherwise the
        ``BatchResponse`` of the send; when several batches were needed,
        a single ``BatchResponse`` combining all per-token responses.

    Raises:
        Exception: whatever the Firebase SDK raises is logged and re-raised.
            Batches sent before the failure are not rolled back.
    """
    if not tokens:
        log.debug("No tokens provided to send_fcm")
        return None

    # dict.fromkeys de-duplicates while keeping the caller's order.
    unique_tokens = list(dict.fromkeys(t for t in tokens if t))
    if not unique_tokens:
        log.debug("No tokens provided to send_fcm")
        return None

    safe_data = {}
    for raw_key, value in (data or {}).items():
        if value is None:
            text = ""
        elif isinstance(value, str):
            text = value
        elif isinstance(value, (dict, list)):
            try:
                text = json.dumps(value, separators=(',', ':'))
            except Exception:
                # Best effort: an unserializable container still gets sent.
                text = str(value)
        else:
            text = str(value)
        safe_data[str(raw_key)] = text

    results = []
    try:
        for offset in range(0, len(unique_tokens), _FCM_MAX_TOKENS_PER_MULTICAST):
            batch = unique_tokens[offset:offset + _FCM_MAX_TOKENS_PER_MULTICAST]
            message = messaging.MulticastMessage(
                notification=messaging.Notification(title=title, body=body),
                data=safe_data,
                tokens=batch,
            )
            log.info("Sending FCM multicast message to %d tokens", len(batch))
            res = messaging.send_each_for_multicast(message)
            log.info("FCM multicast send result: %s", res)
            results.append(res)
    except Exception as e:
        log.exception("Failed to send FCM multicast message: %s", e)
        raise

    if len(results) == 1:
        return results[0]

    # Merge the per-batch results so callers still see one BatchResponse
    # with overall success_count / failure_count.
    merged = [single for res in results for single in res.responses]
    return messaging.BatchResponse(merged)
def _process_pending_notifications():
    """Run one fetch -> aggregate -> send -> record -> delete pass.

    Returns:
        The number of events handled in this pass (0 when nothing was
        pending).
    """
    notifications = fetch_user_notifications()
    if not notifications:
        log.debug("No notifications to process. Sleeping for 10 minutes.")
        return 0

    agg = aggregate_notifications(notifications)
    attendance_rows = fetch_event_attendance_for_events(list(agg))

    # event_id -> attendee user ids, skipping incomplete rows.
    attendance_map: Dict[int, List[int]] = {}
    for row in attendance_rows:
        eid = row.get("event_id")
        uid = row.get("user_id")
        if eid is None or uid is None:
            continue
        attendance_map.setdefault(eid, []).append(uid)

    # NOTE(review): tokens are fetched for the notification rows' own
    # user_ids as well, but only attendees are used as recipients below --
    # confirm whether those users should also receive the push.
    all_user_ids = set()
    for group in agg.values():
        all_user_ids.update(group["user_ids"])
    for attendees in attendance_map.values():
        all_user_ids.update(attendees)

    token_rows = fetch_fcm_tokens_for_users(list(all_user_ids))

    # Keep every token per user: a one-token-per-user map silently drops
    # all but one device when a user has several token rows.
    tokens_by_user: Dict[int, List[str]] = {}
    for row in token_rows:
        uid = row.get("user_id")
        token = row.get("fcm_token")
        if uid is None or not token:
            continue
        tokens_by_user.setdefault(uid, []).append(token)

    for event_id, notif in agg.items():
        user_ids = attendance_map.get(event_id, [])
        # De-duplicate while preserving order so no device is pushed twice.
        tokens = list(dict.fromkeys(
            token
            for uid in user_ids
            for token in tokens_by_user.get(uid, ())
        ))
        data = {"event_id": event_id, "type": notif["type"], "priority": notif["priority"]}
        response = send_fcm(tokens, notif["title"], notif["body"], data)
        log.info("Sent notification for event %s to %d users. fcm_response=%s", event_id, len(tokens), getattr(response, 'success_count', response))

        upsert_notifications([{
            "title": notif["title"],
            "message": notif["body"],
            "audience": "attendees",
            "event_id": event_id,
            "user_ids": user_ids,
            "status": "sent",
            "metadata": data,
        }])
        # Delete only after the send and the record succeeded, so a failure
        # leaves the rows in place for the next pass.
        delete_user_notifications_by_ids(notif["ids"])
        log.debug("Deleted processed notifications for event %s.", event_id)

    return len(agg)


def notification_service():
    """Run the notification worker loop forever.

    Each iteration processes the pending ``user_notifications`` rows: they
    are grouped by event, pushed through FCM to the event's attendees,
    recorded in ``notifications`` and then deleted. Any exception raised
    during a pass is logged and the loop carries on.

    Every pass -- empty, successful or failed -- is followed by a
    10-minute pause. This guarantees that rows which can never be
    processed (for example rows without an ``event_id``, which are never
    aggregated or deleted) cannot turn the loop into a tight poll against
    the database.

    This function never returns.
    """
    while True:
        try:
            _process_pending_notifications()
        except Exception as e:
            log.exception("Error in notification service main loop: %s", e)
        time.sleep(600)