Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
File size: 6,116 Bytes
2d52ea0 e96aee8 2d52ea0 e96aee8 2d52ea0 0eee7a6 2d52ea0 0eee7a6 2d52ea0 86d056b 1e002f2 86d056b 1e002f2 86d056b 1e002f2 a6db207 1e002f2 a6db207 1e002f2 86d056b 1e002f2 86d056b 1e002f2 86d056b 1e002f2 86d056b 1e002f2 86d056b 1e002f2 86d056b 1e002f2 86d056b 1e002f2 86d056b e96aee8 86d056b 947806b 1e002f2 e96aee8 947806b e96aee8 1e002f2 e96aee8 947806b 2d52ea0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
import json
import os
import time
import uuid
from dotenv import load_dotenv
import firebase_admin
from firebase_admin import messaging, credentials
from firebase_admin import firestore as admin_firestore
from google.oauth2.service_account import Credentials
from pydantic import BaseModel
from fastapi import HTTPException
load_dotenv()
class FcmRequest(BaseModel):
token: str
title: str
body: str
data: dict | None = None
class FcmService:
"""Singleton service for sending Firebase Cloud Messages."""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(FcmService, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self):
"""Initialize Firebase Admin SDK once."""
service_account_json = os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON")
if not service_account_json:
raise RuntimeError(
"FIREBASE_SERVICE_ACCOUNT_JSON not found in environment variables"
)
# Load service account credentials from env JSON
cred_info = json.loads(service_account_json)
# Initialize Firebase only once
if not firebase_admin._apps:
cred = credentials.Certificate(cred_info)
firebase_admin.initialize_app(cred)
self.admin_key = os.getenv("FIREBASE_API_ADMIN_KEY", "super-secret-admin-key")
async def send_fcm(self, request: FcmRequest):
"""Send an FCM notification to a specific device."""
try:
message = messaging.Message(
notification=messaging.Notification(
title=request.title,
body=request.body,
),
data=request.data or {},
token=request.token,
android=messaging.AndroidConfig(
priority="high",
notification=messaging.AndroidNotification(
sound="default",
channel_id="feedback_notifications", # optional but recommended
),
),
apns=messaging.APNSConfig(
payload=messaging.APNSPayload(
aps=messaging.Aps(
sound="default",
content_available=True,
)
),
headers={
"apns-priority": "10", # 10 = immediate
},
),
)
response = messaging.send(message)
return {"status": "success", "message_id": response}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def send_to_topic(
self,
title: str,
body: str,
topic: str = "work_updates",
data: dict | None = None,
channel_id="feedback_notifications",
is_data_only_message: bool = False
):
try:
# Inject title/body only for data-only messages
new_data = (
{
"title": title,
"body": body,
**(data or {})
}
if is_data_only_message
else (data or {})
)
# Build message
message = messaging.Message(
notification=None if is_data_only_message else messaging.Notification(
title=title,
body=body
),
data=new_data,
topic=topic,
# ANDROID
android=messaging.AndroidConfig(
priority="high",
notification=None if is_data_only_message else messaging.AndroidNotification(
sound="default",
channel_id=channel_id,
),
),
# IOS
apns=messaging.APNSConfig(
headers={"apns-priority": "10"} if not is_data_only_message else None,
payload=messaging.APNSPayload(
aps=messaging.Aps(
sound=None if is_data_only_message else "default",
content_available=True if is_data_only_message else None
)
)
),
)
response = messaging.send(message)
return {"status": "success", "message_id": response}
except Exception as e:
print(f"Error Sending Message to topic {topic}", e)
raise HTTPException(status_code=500, detail=str(e))
async def send_broadcast(self, id:str, title: str, body: str, data: dict | None = None):
"""Send a broadcast notification and persist it to Firestore."""
broadcast_doc = {
"id" : id,
"title": title,
"body": body,
"data": data or {"type": "broadcast"},
"sent_at": int(time.time()),
"created_at": admin_firestore.SERVER_TIMESTAMP,
}
# 1️⃣ Persist to Firestore
try:
db = admin_firestore.client()
doc_ref = db.collection("broadcasts").document()
doc_ref.set(broadcast_doc) # Admin SDK is sync, but non-blocking + safe inside async
print(f"[Firestore] Broadcast persisted: {doc_ref.id}")
except Exception as e:
print(f"[Firestore ERROR] Failed to store broadcast: {e}")
# 2️⃣ Send FCM broadcast
print(f"send_broadcast: title={title} ; body={body}; data={data}")
return await self.send_to_topic(
title=title,
body=body,
topic=os.getenv("BROADCAST_TOPIC_NAME", "global_broadcast"),
data=data or {"type": "broadcast"},
channel_id="broadcast_channel",
is_data_only_message=False,
)
# ✅ Singleton instance
fcm_service = FcmService()
|