wipapi / app.py
rohanshaw's picture
Update app.py
b70d470 verified
import os
import logging
import firebase_admin.messaging
import requests
import datetime
import asyncio
from fastapi import FastAPI, HTTPException, status, Body
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, EmailStr
from motor.motor_asyncio import AsyncIOMotorClient
from passlib.context import CryptContext
from dotenv import load_dotenv
from typing import Optional, List
import firebase_admin
from firebase_admin import credentials
# --- Load Environment Variables ---
load_dotenv()
MONGODB_URI = os.getenv("MONGODB_URI")
if not MONGODB_URI:
raise ValueError("MONGODB_URI environment variable not set.")
# --- Configuration ---
DATABASE_NAME = "userAuthDB"
COLLECTION_NAME = "users"
TOKEN_COLLECTION_NAME = "fcmtokens"
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Password Hashing Setup ---
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# --- FastAPI App Initialization ---
app = FastAPI(
title="Backend API",
description="API for WhereIParked Backend OPerations.",
version="1.5.0",
)
# --- CORS Configuration ---
origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- Database Connection ---
client: Optional[AsyncIOMotorClient] = None
db = None
cred = credentials.Certificate("whereiparked-e9eb6-firebase-adminsdk-fbsvc-1691a99d68.json")
default_app = None
@app.on_event("startup")
async def startup_client():
"""Connect to MongoDB & Firebase on application startup."""
global client, db, default_app
logger.info("Connecting to MongoDB...")
try:
client = AsyncIOMotorClient(MONGODB_URI)
db = client[DATABASE_NAME]
# Ping the server to check connection
await client.admin.command('ping')
logger.info(f"Successfully connected to MongoDB database: {DATABASE_NAME}")
try:
if not firebase_admin._apps:
default_app = firebase_admin.initialize_app(cred, {
'projectId': 'whereiparked-e9eb6'
})
logger.info("Firebase Admin SDK initialized successfully.")
else:
default_app = firebase_admin.get_app()
logger.info("Firebase Admin SDK already initialized.")
print(firebase_admin._apps)
except Exception as e:
logger.error(f"Error initializing/getting Firebase Admin SDK: {e}")
except Exception as e:
logger.error(f"Failed to connect to MongoDB: {e}")
raise RuntimeError(f"Could not connect to MongoDB: {e}")
@app.on_event("shutdown")
async def shutdown_client():
"""Disconnect from MongoDB on application shutdown."""
global client
if client:
logger.info("Closing MongoDB connection...")
client.close()
logger.info("MongoDB connection closed.")
# --- Pydantic Models ---
class UserBase(BaseModel):
"""Base model for user data."""
email: EmailStr = Field(..., example="user@example.com")
class UserCreate(UserBase):
"""Model for user creation (signup)."""
username: str = Field(..., min_length=3, max_length=50, example="john_doe")
password: str = Field(..., min_length=8, example="strongpassword123")
class UserLogin(BaseModel):
"""Model for user login."""
email: EmailStr = Field(..., example="user@example.com")
password: str = Field(..., example="strongpassword123")
class UserInDB(UserBase):
"""Model representing user data stored in the database."""
username: str
hashed_password: str
class Token(BaseModel):
"""Model for returning success/token (optional)."""
message: str
class SignupResponse(BaseModel):
"""Response model for successful signup."""
message: str = Field(..., example="Signup successful!")
class LoginSuccessResponse(BaseModel):
"""Response model for successful login."""
message: str = Field(..., example="Login successful!")
email: EmailStr = Field(..., example="user@example.com")
class TokenModel(BaseModel):
email: str
token: str
class InAppEventNotification(BaseModel):
token: str
# --- Helper Functions ---
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
async def get_user_by_email(email: str) -> Optional[dict]:
"""Retrieves a user from the database by email."""
if db is None:
# Log the error and raise an appropriate HTTP exception
logger.error("Database connection is not available in get_user_by_email")
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service is unavailable.")
try:
user = await db[COLLECTION_NAME].find_one({"email": email})
return user # Returns dict or None
except Exception as e:
logger.error(f"Error fetching user by email ({email}): {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error accessing user data.")
async def get_user_by_username(username: str) -> Optional[dict]:
"""Retrieves a user from the database by username."""
if db is None:
logger.error("Database connection is not available in get_user_by_username")
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service is unavailable.")
try:
user = await db[COLLECTION_NAME].find_one({"username": username})
return user # Returns dict or None
except Exception as e:
logger.error(f"Error fetching user by username ({username}): {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error accessing user data.")
FCM_SERVER_KEY = os.getenv("FCM_SERVER_KEY")
async def send_push(token: str, title: str, body: str, platform: str = "web"):
try:
if platform=="app":
message = firebase_admin.messaging.Message(
notification=firebase_admin.messaging.Notification(
title=title,
body=body,
image="./icon.png"
),
data={
"title": title,
"body": body
},
token=token,
)
else:
message = firebase_admin.messaging.Message(
data={
"title": title,
"body": body
},
token=token,
)
response = firebase_admin.messaging.send(message)
print('Successfully sent message:', response)
return response
except UnregisteredError:
logger.warning(f"Token unregistered, removing from DB: {token}")
asyncio.create_task(db[TOKEN_COLLECTION_NAME].delete_one({"token": token}))
return {"error": "Token unregistered, deleted from DB."}
except Exception as e:
logger.error(f"Error sending message: {e}")
raise
from firebase_admin.messaging import UnregisteredError
# --- API Endpoints ---
@app.post("/signup", response_model=SignupResponse, status_code=status.HTTP_201_CREATED)
async def signup(user_data: UserCreate = Body(...)):
"""Handles user registration."""
if db is None: # Check added for safety, though startup should handle it
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service is unavailable.")
existing_user_email = await get_user_by_email(user_data.email)
if existing_user_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered.",
)
existing_user_username = await get_user_by_username(user_data.username)
if existing_user_username:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already taken.",
)
hashed_password = get_password_hash(user_data.password)
user_in_db = UserInDB(
username=user_data.username,
email=user_data.email,
hashed_password=hashed_password
)
try:
new_user = await db[COLLECTION_NAME].insert_one(user_in_db.dict(by_alias=True)) # Use dict() for Pydantic v1/v2 compatibility
logger.info(f"User created successfully with ID: {new_user.inserted_id}")
except Exception as e:
logger.error(f"Error inserting user into database: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An error occurred while creating the account.",
)
return SignupResponse(message="Signup successful!")
# Updated: Changed response_model to LoginSuccessResponse
@app.post("/login", response_model=LoginSuccessResponse)
async def login(login_data: UserLogin = Body(...)):
"""Handles user login. Returns user email on success."""
if db is None:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service is unavailable.")
user = await get_user_by_email(login_data.email)
if not user or not verify_password(login_data.password, user["hashed_password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password.",
headers={"WWW-Authenticate": "Bearer"},
)
# Return success message and user email
logger.info(f"User {user['email']} logged in successfully.")
return LoginSuccessResponse(
message="Login successful!",
email=user["email"]
# username=user["username"] # Optionally include username
)
@app.post("/api/save-token")
async def save_token(token_model: TokenModel):
existing = await db.fcmtokens.find_one({"token": token_model.token})
if not existing:
await db.fcmtokens.insert_one({
"email": token_model.email,
"token": token_model.token,
"createdAt": datetime.datetime.utcnow()
})
return {"message": "Token saved"}
@app.post("/api/parked")
async def user_parked(requestBody: InAppEventNotification = Body(...)):
await send_push(requestBody.token, "WhereIParked", "You parked your car. Timer started!")
return {"message": "Notification sent"}
@app.post("/api/clear-location")
async def user_cleared(requestBody: InAppEventNotification = Body(...)):
await send_push(requestBody.token, "WhereIParked", "Parking location cleared. Safe drive!")
return {"message": "Notification sent"}
@app.post("/api/park-duration")
async def park_duration(requestBody: InAppEventNotification = Body(...), duration = str):
await send_push(requestBody.token, "Reminder", "You’ve been parked for over " + duration + " minutes!")
return {"message": "Notification sent"}
@app.post("/admin/push")
async def admin_push(title: str = Body(...), body: str = Body(...)):
tokens = await db.fcmtokens.find().to_list(length=None)
for token_entry in tokens:
await send_push(token_entry["token"], title, body)
return {"message": "Notifications sent"}
@app.post("/api/app/parked")
async def app_user_parked(requestBody: InAppEventNotification = Body(...)):
await send_push(requestBody.token, "WhereIParked", "You parked your car. Timer started!", "app")
return {"message": "Notification sent"}
@app.post("/api/app/clear-location")
async def app_user_cleared(requestBody: InAppEventNotification = Body(...)):
await send_push(requestBody.token, "WhereIParked", "Parking location cleared. Safe drive!", "app")
return {"message": "Notification sent"}
@app.post("/api/app/park-duration")
async def app_park_duration(requestBody: InAppEventNotification = Body(...), duration = str):
await send_push(requestBody.token, "Reminder", "You’ve been parked for over " + duration + " minutes!", "app")
return {"message": "Notification sent"}
@app.post("/admin/app/push")
async def app_admin_push(title: str = Body(...), body: str = Body(...)):
tokens = await db.fcmtokens.find().to_list(length=None)
for token_entry in tokens:
await send_push(token_entry["token"], title, body, "app")
return {"message": "Notifications sent"}
@app.get("/", status_code=status.HTTP_200_OK)
async def read_root():
"""Root endpoint to check if the API is running."""
return {"message": "Authentication API is running!"}