Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI | |
| from pydantic import BaseModel | |
| import requests | |
| from fastapi.middleware.cors import CORSMiddleware | |
# ASGI application object for this service.
app = FastAPI()

# CORS is fully open: any origin, any HTTP method and any request header
# is accepted by the middleware.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Registered Expo push tokens. This is a plain in-process set, so it starts
# empty every time the module is loaded and duplicates are collapsed.
EXPO_TOKENS = set()
class RegisterRequest(BaseModel):
    """Request body used to register a device for push notifications."""

    # Push token of the device being registered (required, no default).
    expo_token: str
class IncidentRequest(BaseModel):
    """Incident details used to build a push notification.

    Every field has a placeholder default, so ``IncidentRequest()`` can be
    constructed with no arguments.
    """

    # The field is annotated ``str``; the default was previously the int ``1``,
    # which contradicted the annotation. The string form yields the same
    # text when interpolated into a URL.
    incident_id: str = "1"
    # Notification title.
    title: str = "Dummy Title"
    # Notification body text.
    message: str = "message"
# Placeholder incident built purely from IncidentRequest's field defaults;
# notify_incident() reads its title, message and incident_id.
dummyData = IncidentRequest()
def register_device(data: RegisterRequest):
    """Remember a device's Expo push token.

    Args:
        data: Registration payload carrying ``expo_token``.

    Returns:
        The fixed acknowledgement ``{"status": "registered"}``.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file -- confirm it is actually registered with ``app``.
    token = data.expo_token
    EXPO_TOKENS.add(token)  # set semantics: re-registering is a no-op
    print(token)
    return {"status": "registered"}
def notify_incident():
    """Push the placeholder incident to every registered Expo token.

    One POST is issued per token in ``EXPO_TOKENS``. A failure for one token
    (network error, timeout, HTTP error status, or a non-JSON reply) is
    logged and counted, and the remaining tokens are still attempted.

    Returns:
        dict: ``{"sent": <requests that succeeded>, "failed": <requests
        that raised or returned an HTTP error status>}``.
    """
    # Local import keeps this block self-contained; the module-level import
    # list is not touched.
    import logging

    log = logging.getLogger(__name__)

    # NOTE(review): no route decorator is visible on this function in this
    # view of the file -- confirm it is actually registered with ``app``.
    data = dummyData
    responses = []
    failed = 0

    # Iterate over a snapshot so that a token registered while the loop is
    # running cannot change the set mid-iteration.
    for token in list(EXPO_TOKENS):
        payload = {
            "to": token,
            "sound": "default",
            "title": data.title,
            "body": data.message,
            "priority": "high",
            "data": {
                "url": f"myapp://incidents/{data.incident_id}"
            },
        }
        try:
            r = requests.post(
                "https://exp.host/--/api/v2/push/send",
                json=payload,
                timeout=5,
            )
            # Do not count 4xx/5xx replies as delivered.
            r.raise_for_status()
            responses.append(r.json())
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers a reply body that is not valid JSON.
            # The token itself is deliberately left out of the log line.
            failed += 1
            log.warning("Expo push request failed: %s", exc)

    # NOTE(review): a 2xx reply may still carry a ticket-level error in its
    # JSON body; that body is collected but not inspected here -- confirm
    # against the Expo push API docs whether it should be.
    return {"sent": len(responses), "failed": failed}