Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import asyncio
import json
import os
import time
import uuid

import firebase_admin
from dotenv import load_dotenv
from fastapi import HTTPException
from firebase_admin import messaging, credentials
from firebase_admin import firestore as admin_firestore
from google.oauth2.service_account import Credentials
from pydantic import BaseModel
| # Load variables from a local .env file (if any) into the process environment | |
| # so that the os.getenv() lookups further down in this module can see them. | |
| load_dotenv() | |
class FcmRequest(BaseModel):
    """Request payload for sending one notification to a single device.

    The fields map directly onto the arguments ``FcmService.send_fcm`` passes
    to ``messaging.Message`` / ``messaging.Notification``.
    """

    # Passed as ``token=`` to ``messaging.Message`` (the target device).
    token: str
    # Notification title shown to the user.
    title: str
    # Notification body text shown to the user.
    body: str
    # Optional custom key/value payload. FCM only accepts string keys and
    # string values here, so the type is constrained at validation time
    # instead of letting a bad payload fail later inside ``messaging.send``.
    data: dict[str, str] | None = None
| class FcmService: | |
| """Singleton service for sending Firebase Cloud Messages.""" | |
| _instance = None | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._instance = super(FcmService, cls).__new__(cls) | |
| cls._instance._initialize() | |
| return cls._instance | |
| def _initialize(self): | |
| """Initialize Firebase Admin SDK once.""" | |
| service_account_json = os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON") | |
| if not service_account_json: | |
| raise RuntimeError( | |
| "FIREBASE_SERVICE_ACCOUNT_JSON not found in environment variables" | |
| ) | |
| # Load service account credentials from env JSON | |
| cred_info = json.loads(service_account_json) | |
| # Initialize Firebase only once | |
| if not firebase_admin._apps: | |
| cred = credentials.Certificate(cred_info) | |
| firebase_admin.initialize_app(cred) | |
| self.admin_key = os.getenv("FIREBASE_API_ADMIN_KEY", "super-secret-admin-key") | |
| async def send_fcm(self, request: FcmRequest): | |
| """Send an FCM notification to a specific device.""" | |
| try: | |
| message = messaging.Message( | |
| notification=messaging.Notification( | |
| title=request.title, | |
| body=request.body, | |
| ), | |
| data=request.data or {}, | |
| token=request.token, | |
| android=messaging.AndroidConfig( | |
| priority="high", | |
| notification=messaging.AndroidNotification( | |
| sound="default", | |
| channel_id="feedback_notifications", # optional but recommended | |
| ), | |
| ), | |
| apns=messaging.APNSConfig( | |
| payload=messaging.APNSPayload( | |
| aps=messaging.Aps( | |
| sound="default", | |
| content_available=True, | |
| ) | |
| ), | |
| headers={ | |
| "apns-priority": "10", # 10 = immediate | |
| }, | |
| ), | |
| ) | |
| response = messaging.send(message) | |
| return {"status": "success", "message_id": response} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def send_to_topic( | |
| self, | |
| title: str, | |
| body: str, | |
| topic: str = "work_updates", | |
| data: dict | None = None, | |
| channel_id="feedback_notifications", | |
| is_data_only_message: bool = False | |
| ): | |
| try: | |
| # Inject title/body only for data-only messages | |
| new_data = ( | |
| { | |
| "title": title, | |
| "body": body, | |
| **(data or {}) | |
| } | |
| if is_data_only_message | |
| else (data or {}) | |
| ) | |
| # Build message | |
| message = messaging.Message( | |
| notification=None if is_data_only_message else messaging.Notification( | |
| title=title, | |
| body=body | |
| ), | |
| data=new_data, | |
| topic=topic, | |
| # ANDROID | |
| android=messaging.AndroidConfig( | |
| priority="high", | |
| notification=None if is_data_only_message else messaging.AndroidNotification( | |
| sound="default", | |
| channel_id=channel_id, | |
| ), | |
| ), | |
| # IOS | |
| apns=messaging.APNSConfig( | |
| headers={"apns-priority": "10"} if not is_data_only_message else None, | |
| payload=messaging.APNSPayload( | |
| aps=messaging.Aps( | |
| sound=None if is_data_only_message else "default", | |
| content_available=True if is_data_only_message else None | |
| ) | |
| ) | |
| ), | |
| ) | |
| response = messaging.send(message) | |
| return {"status": "success", "message_id": response} | |
| except Exception as e: | |
| print(f"Error Sending Message to topic {topic}", e) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def send_broadcast(self, id:str, title: str, body: str, data: dict | None = None): | |
| """Send a broadcast notification and persist it to Firestore.""" | |
| broadcast_doc = { | |
| "id" : id, | |
| "title": title, | |
| "body": body, | |
| "data": data or {"type": "broadcast"}, | |
| "sent_at": int(time.time()), | |
| "created_at": admin_firestore.SERVER_TIMESTAMP, | |
| } | |
| # 1️⃣ Persist to Firestore | |
| try: | |
| db = admin_firestore.client() | |
| doc_ref = db.collection("broadcasts").document() | |
| doc_ref.set(broadcast_doc) # Admin SDK is sync, but non-blocking + safe inside async | |
| print(f"[Firestore] Broadcast persisted: {doc_ref.id}") | |
| except Exception as e: | |
| print(f"[Firestore ERROR] Failed to store broadcast: {e}") | |
| # 2️⃣ Send FCM broadcast | |
| print(f"send_broadcast: title={title} ; body={body}; data={data}") | |
| return await self.send_to_topic( | |
| title=title, | |
| body=body, | |
| topic=os.getenv("BROADCAST_TOPIC_NAME", "global_broadcast"), | |
| data=data or {"type": "broadcast"}, | |
| channel_id="broadcast_channel", | |
| is_data_only_message=False, | |
| ) | |
| # Module-level singleton instance shared by importers of this module. | |
| # Constructing it here runs FcmService._initialize() at import time, so the | |
| # import raises RuntimeError when FIREBASE_SERVICE_ACCOUNT_JSON is not set. | |
| fcm_service = FcmService() | |