# sanatan_ai/modules/firebase/messaging.py
# Uploaded by vikramvasudevan using huggingface_hub (commit e96aee8, verified).
import asyncio
import json
import os
import time
import uuid

import firebase_admin
from dotenv import load_dotenv
from fastapi import HTTPException
from firebase_admin import messaging, credentials
from firebase_admin import firestore as admin_firestore
from google.oauth2.service_account import Credentials
from pydantic import BaseModel
# Load variables from a local .env file (if one exists) into the process
# environment so the os.getenv() lookups further down can see them.
load_dotenv()
# Request payload for sending a notification to a single device; consumed by
# FcmService.send_fcm.
class FcmRequest(BaseModel):
    # Device registration token, passed as the message's `token` target.
    token: str
    # Notification title shown to the user.
    title: str
    # Notification body text shown to the user.
    body: str
    # Optional custom key/value payload; an empty dict is sent when omitted.
    # NOTE(review): FCM data payloads are expected to be str -> str; this
    # annotation does not enforce that - confirm callers only pass strings.
    data: dict | None = None
class FcmService:
"""Singleton service for sending Firebase Cloud Messages."""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(FcmService, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self):
"""Initialize Firebase Admin SDK once."""
service_account_json = os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON")
if not service_account_json:
raise RuntimeError(
"FIREBASE_SERVICE_ACCOUNT_JSON not found in environment variables"
)
# Load service account credentials from env JSON
cred_info = json.loads(service_account_json)
# Initialize Firebase only once
if not firebase_admin._apps:
cred = credentials.Certificate(cred_info)
firebase_admin.initialize_app(cred)
self.admin_key = os.getenv("FIREBASE_API_ADMIN_KEY", "super-secret-admin-key")
async def send_fcm(self, request: FcmRequest):
"""Send an FCM notification to a specific device."""
try:
message = messaging.Message(
notification=messaging.Notification(
title=request.title,
body=request.body,
),
data=request.data or {},
token=request.token,
android=messaging.AndroidConfig(
priority="high",
notification=messaging.AndroidNotification(
sound="default",
channel_id="feedback_notifications", # optional but recommended
),
),
apns=messaging.APNSConfig(
payload=messaging.APNSPayload(
aps=messaging.Aps(
sound="default",
content_available=True,
)
),
headers={
"apns-priority": "10", # 10 = immediate
},
),
)
response = messaging.send(message)
return {"status": "success", "message_id": response}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def send_to_topic(
self,
title: str,
body: str,
topic: str = "work_updates",
data: dict | None = None,
channel_id="feedback_notifications",
is_data_only_message: bool = False
):
try:
# Inject title/body only for data-only messages
new_data = (
{
"title": title,
"body": body,
**(data or {})
}
if is_data_only_message
else (data or {})
)
# Build message
message = messaging.Message(
notification=None if is_data_only_message else messaging.Notification(
title=title,
body=body
),
data=new_data,
topic=topic,
# ANDROID
android=messaging.AndroidConfig(
priority="high",
notification=None if is_data_only_message else messaging.AndroidNotification(
sound="default",
channel_id=channel_id,
),
),
# IOS
apns=messaging.APNSConfig(
headers={"apns-priority": "10"} if not is_data_only_message else None,
payload=messaging.APNSPayload(
aps=messaging.Aps(
sound=None if is_data_only_message else "default",
content_available=True if is_data_only_message else None
)
)
),
)
response = messaging.send(message)
return {"status": "success", "message_id": response}
except Exception as e:
print(f"Error Sending Message to topic {topic}", e)
raise HTTPException(status_code=500, detail=str(e))
async def send_broadcast(self, id:str, title: str, body: str, data: dict | None = None):
"""Send a broadcast notification and persist it to Firestore."""
broadcast_doc = {
"id" : id,
"title": title,
"body": body,
"data": data or {"type": "broadcast"},
"sent_at": int(time.time()),
"created_at": admin_firestore.SERVER_TIMESTAMP,
}
# 1️⃣ Persist to Firestore
try:
db = admin_firestore.client()
doc_ref = db.collection("broadcasts").document()
doc_ref.set(broadcast_doc) # Admin SDK is sync, but non-blocking + safe inside async
print(f"[Firestore] Broadcast persisted: {doc_ref.id}")
except Exception as e:
print(f"[Firestore ERROR] Failed to store broadcast: {e}")
# 2️⃣ Send FCM broadcast
print(f"send_broadcast: title={title} ; body={body}; data={data}")
return await self.send_to_topic(
title=title,
body=body,
topic=os.getenv("BROADCAST_TOPIC_NAME", "global_broadcast"),
data=data or {"type": "broadcast"},
channel_id="broadcast_channel",
is_data_only_message=False,
)
# Shared module-level instance. Constructing it here runs
# FcmService._initialize() at import time, which raises RuntimeError when
# FIREBASE_SERVICE_ACCOUNT_JSON is not set.
fcm_service = FcmService()