import json
import time
from typing import Any, Dict, List

from firebase_admin import messaging

from app.services.database import supabase
from app.utils.logging_config import log

# Seconds to wait between polling passes over the user_notifications table.
POLL_INTERVAL_SECONDS = 600

# Upper bound on device tokens in a single FCM MulticastMessage; larger
# token lists are split into several messages by send_fcm().
FCM_MAX_TOKENS_PER_MULTICAST = 500


def _unwrap_response(resp: Any, action: str) -> Any:
    """Return the ``data`` payload of a Supabase response or raise on error.

    The response may expose ``data`` / ``error`` either as attributes or,
    when it is a plain dict, as keys; both shapes are accepted.

    Args:
        resp: Object returned by ``.execute()``.
        action: Short description used in the log line and exception text,
            e.g. ``"fetch user_notifications"``.

    Returns:
        The ``data`` payload, or ``None`` when it is absent or falsy.

    Raises:
        RuntimeError: If the response carries a truthy ``error``.
    """
    is_mapping = isinstance(resp, dict)
    data = getattr(resp, "data", None) or (resp.get("data") if is_mapping else None)
    error = getattr(resp, "error", None) or (resp.get("error") if is_mapping else None)
    if error:
        log.error("Failed to %s: %s", action, error)
        raise RuntimeError(f"Failed to {action}: {error}")
    return data


# --- Data Fetching Functions ---

def fetch_user_notifications():
    """Fetch every row of the ``user_notifications`` table.

    Returns:
        A list of row dicts; empty when the table has no rows.

    Raises:
        RuntimeError: If Supabase reports an error.
    """
    log.debug("Fetching user_notifications from Supabase")
    start = time.perf_counter()
    resp = supabase.table("user_notifications").select("*").execute()
    rows = _unwrap_response(resp, "fetch user_notifications") or []
    duration = time.perf_counter() - start
    log.info("Fetched %d user_notifications in %.3fs", len(rows), duration)
    return rows


def fetch_event_attendance_for_events(event_ids: List[int]) -> List[Dict[str, Any]]:
    """Fetch ``event_attendance`` rows whose ``event_id`` is in *event_ids*.

    Returns an empty list without querying when *event_ids* is empty.

    Raises:
        RuntimeError: If Supabase reports an error.
    """
    if not event_ids:
        return []
    log.debug("Fetching event_attendance for event_ids: %s", event_ids)
    start = time.perf_counter()
    resp = (
        supabase.table("event_attendance")
        .select("*")
        .in_("event_id", event_ids)
        .execute()
    )
    rows = _unwrap_response(resp, "fetch event_attendance") or []
    duration = time.perf_counter() - start
    log.info(
        "Fetched %d attendance rows for %d events in %.3fs",
        len(rows),
        len(event_ids),
        duration,
    )
    return rows


def fetch_fcm_tokens_for_users(user_ids: List[int]) -> List[Dict[str, Any]]:
    """Fetch ``user_id`` / ``fcm_token`` pairs for the given users.

    Returns an empty list without querying when *user_ids* is empty.

    Raises:
        RuntimeError: If Supabase reports an error.
    """
    if not user_ids:
        return []
    log.debug("Fetching FCM tokens for user_ids (count=%d)", len(user_ids))
    start = time.perf_counter()
    resp = (
        supabase.table("user_fcm_tokens")
        .select("user_id,fcm_token")
        .in_("user_id", user_ids)
        .execute()
    )
    rows = _unwrap_response(resp, "fetch user_fcm_tokens") or []
    duration = time.perf_counter() - start
    log.info("Fetched %d FCM token rows in %.3fs", len(rows), duration)
    return rows


def upsert_notifications(rows: List[Dict[str, Any]]):
    """Upsert *rows* into the ``notifications`` table.

    Returns:
        The ``data`` payload of the response, or ``None`` when *rows* is
        empty (no request is made in that case).

    Raises:
        RuntimeError: If Supabase reports an error.
    """
    if not rows:
        return None
    log.debug("Upserting %d notification rows", len(rows))
    start = time.perf_counter()
    resp = supabase.table("notifications").upsert(rows).execute()
    data = _unwrap_response(resp, "upsert notifications")
    duration = time.perf_counter() - start
    log.info("Upserted %d notifications in %.3fs", len(data or []), duration)
    return data


def delete_user_notifications_by_ids(ids: List[int]):
    """Delete ``user_notifications`` rows whose ``id`` is in *ids*.

    Returns:
        The ``data`` payload of the response, or ``None`` when *ids* is
        empty (no request is made in that case).

    Raises:
        RuntimeError: If Supabase reports an error.
    """
    if not ids:
        return None
    log.debug("Deleting %d user_notifications", len(ids))
    start = time.perf_counter()
    resp = supabase.table("user_notifications").delete().in_("id", ids).execute()
    data = _unwrap_response(resp, "delete user_notifications")
    duration = time.perf_counter() - start
    log.info("Deleted %d user_notifications in %.3fs", len(data or []), duration)
    return data


def aggregate_notifications(notifications):
    """Group pending notification rows by ``event_id``.

    Rows with a missing or falsy ``event_id`` are skipped. The title, body,
    type and priority of each group come from the first row seen for that
    event; later rows only contribute their ``user_id``, ``id`` and ``data``.

    Returns:
        A dict mapping ``event_id`` to a dict with the keys ``title``,
        ``body``, ``type``, ``priority``, ``event_id``, ``user_ids`` (set),
        ``ids`` (list) and ``data`` (list).

    Raises:
        KeyError: If a row lacks ``title``, ``body``, ``type``, ``user_id``
            or ``id``.
    """
    agg = {}
    for notif in notifications:
        event_id = notif.get("event_id")
        if not event_id:
            continue
        if event_id not in agg:
            agg[event_id] = {
                "title": notif["title"],
                "body": notif["body"],
                "type": notif["type"],
                "priority": notif.get("priority", "normal"),
                "event_id": event_id,
                "user_ids": set(),
                "ids": [],
                "data": [],
            }
        group = agg[event_id]
        group["user_ids"].add(notif["user_id"])
        group["ids"].append(notif["id"])
        group["data"].append(notif.get("data", {}))
    log.debug("Aggregated notifications into %d events", len(agg))
    return agg


def _sanitize_data(d):
    """Convert an arbitrary mapping into the ``str -> str`` dict FCM accepts.

    ``None`` becomes ``""``, strings pass through, dicts and lists are
    JSON-encoded compactly, and everything else goes through ``str()``.
    """
    if not d:
        return {}
    out = {}
    for k, v in d.items():
        key = str(k)
        if v is None:
            out[key] = ""
        elif isinstance(v, str):
            out[key] = v
        elif isinstance(v, (dict, list)):
            try:
                out[key] = json.dumps(v, separators=(",", ":"))
            except (TypeError, ValueError, RecursionError):
                # Not JSON-serialisable; fall back to the plain repr.
                out[key] = str(v)
        else:
            out[key] = str(v)
    return out


def send_fcm(tokens, title, body, data):
    """Send one notification to every device token in *tokens*.

    Token lists longer than ``FCM_MAX_TOKENS_PER_MULTICAST`` are split into
    several multicast messages, because a single ``MulticastMessage``
    rejects more than that many tokens.

    Args:
        tokens: Device registration tokens.
        title: Notification title.
        body: Notification body.
        data: Extra payload; values are stringified via ``_sanitize_data``.

    Returns:
        ``None`` when *tokens* is empty; otherwise a
        ``messaging.BatchResponse`` covering all tokens (the per-batch
        responses are merged when more than one batch was needed).

    Raises:
        Exception: Whatever the Firebase SDK raises; it is logged and
            re-raised. Batches sent before the failure are not rolled back.
    """
    if not tokens:
        log.debug("No tokens provided to send_fcm")
        return None

    tokens = list(tokens)
    safe_data = _sanitize_data(data)
    batch_results = []
    try:
        for offset in range(0, len(tokens), FCM_MAX_TOKENS_PER_MULTICAST):
            batch = tokens[offset:offset + FCM_MAX_TOKENS_PER_MULTICAST]
            message = messaging.MulticastMessage(
                notification=messaging.Notification(title=title, body=body),
                data=safe_data,
                tokens=batch,
            )
            log.info("Sending FCM multicast message to %d tokens", len(batch))
            res = messaging.send_each_for_multicast(message)
            log.info("FCM multicast send result: %s", res)
            batch_results.append(res)
    except Exception as e:
        log.exception("Failed to send FCM multicast message: %s", e)
        raise

    if len(batch_results) == 1:
        return batch_results[0]
    # Merge per-batch results so callers still receive a single BatchResponse.
    merged = [single for res in batch_results for single in res.responses]
    return messaging.BatchResponse(merged)


def _deliver_event(event_id, notif, attendance_map, token_map):
    """Push one aggregated event notification, record it, and clear its queue rows."""
    user_ids = attendance_map.get(event_id, [])
    # A user may have several devices and a token may appear in several rows;
    # dict.fromkeys de-duplicates while keeping a stable order.
    tokens = list(
        dict.fromkeys(
            token for uid in user_ids for token in token_map.get(uid, ())
        )
    )
    data = {
        "event_id": event_id,
        "type": notif["type"],
        "priority": notif["priority"],
    }
    response = send_fcm(tokens, notif["title"], notif["body"], data)
    log.info(
        "Sent notification for event %s to %d users. fcm_response=%s",
        event_id,
        len(tokens),
        getattr(response, "success_count", response),
    )
    upsert_payload = [{
        "title": notif["title"],
        "message": notif["body"],
        "audience": "attendees",
        "event_id": event_id,
        "user_ids": user_ids,
        "status": "sent",
        "metadata": data,
    }]
    upsert_notifications(upsert_payload)
    delete_user_notifications_by_ids(notif["ids"])
    log.debug("Deleted processed notifications for event %s.", event_id)


def _process_pending_notifications():
    """Run one polling pass: fetch, aggregate, deliver and clean up."""
    notifications = fetch_user_notifications()
    if not notifications:
        log.debug("No notifications to process. Sleeping for 10 minutes.")
        return

    agg = aggregate_notifications(notifications)
    attendance_rows = fetch_event_attendance_for_events(list(agg.keys()))

    attendance_map: Dict[int, List[int]] = {}
    for row in attendance_rows:
        eid = row.get("event_id")
        uid = row.get("user_id")
        if eid is None or uid is None:
            continue
        attendance_map.setdefault(eid, []).append(uid)

    # NOTE(review): tokens are requested for the users who queued the
    # notifications as well as for attendees, although only attendees are
    # pushed to below - confirm whether the queuing users should also
    # receive the push.
    all_user_ids = set()
    for group in agg.values():
        all_user_ids.update(group["user_ids"])
    for attendee_ids in attendance_map.values():
        all_user_ids.update(attendee_ids)

    token_rows = fetch_fcm_tokens_for_users(list(all_user_ids))
    # Keep every token per user: one user can own more than one device.
    token_map: Dict[int, List[str]] = {}
    for row in token_rows:
        uid = row.get("user_id")
        token = row.get("fcm_token")
        if uid is None or not token:
            continue
        token_map.setdefault(uid, []).append(token)

    for event_id, notif in agg.items():
        # Isolate failures so one bad event does not block the others; its
        # queue rows stay in place and are retried on the next pass.
        try:
            _deliver_event(event_id, notif, attendance_map, token_map)
        except Exception as e:
            log.exception(
                "Failed to process notifications for event %s: %s", event_id, e
            )


def notification_service():
    """Poll forever, delivering queued notifications to event attendees.

    Each pass is followed by a ``POLL_INTERVAL_SECONDS`` sleep whether it
    succeeded, found nothing to do, or failed. Exceptions raised during a
    pass are logged and never propagate, so this function does not return.
    """
    while True:
        try:
            _process_pending_notifications()
        except Exception as e:
            log.exception("Error in notification service main loop: %s", e)
        time.sleep(POLL_INTERVAL_SECONDS)