Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Body | |
| from pydantic import BaseModel | |
| import uvicorn | |
| import firebase_admin | |
| from firebase_admin import messaging, credentials | |
| app = FastAPI() | |
| # Firebase Admin SDK initialization | |
| cred = credentials.Certificate("earn-by-ads-34cd4-firebase-adminsdk-23x4r-557fc24c1d.json") | |
| firebase_admin.initialize_app(cred) | |
| # Define request model | |
| class NotificationRequest(BaseModel): | |
| title: str | |
| body: str | |
| topic: str = "all" | |
| data: dict = None | |
| # Test endpoint | |
| def root(): | |
| return {"message": "FastAPI is running!"} | |
| # Example FastAPI endpoint for your server | |
| async def send_notification_to_token(notification: dict): | |
| try: | |
| # Extract notification details | |
| title = notification.get("title") | |
| body = notification.get("body") | |
| token = notification.get("token") | |
| data = notification.get("data", {}) | |
| # Create message for FCM | |
| message = messaging.Message( | |
| notification=messaging.Notification( | |
| title=title, | |
| body=body, | |
| ), | |
| data=data, | |
| token=token, # This is the key difference - send to a specific token | |
| ) | |
| # Send message | |
| response = messaging.send(message) | |
| return {"success": True, "message_id": response} | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| def send_notification(request: NotificationRequest = Body(...)): | |
| # Create the notification message | |
| message = messaging.Message( | |
| notification=messaging.Notification( | |
| title=request.title, | |
| body=request.body | |
| ), | |
| data=request.data, | |
| topic=request.topic | |
| ) | |
| # Send the notification | |
| try: | |
| response = messaging.send(message) | |
| return {"success": True, "message_id": response} | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |