"""WhereIParked backend: user signup/login plus FCM push-notification endpoints.

The module wires a FastAPI application to MongoDB (via Motor) and to the
Firebase Admin SDK. Connections are opened on application startup and the
MongoDB client is closed on shutdown.
"""
import asyncio
import datetime
import logging
import os
from typing import List, Optional

import firebase_admin
import firebase_admin.messaging
import requests
from dotenv import load_dotenv
from fastapi import Body, FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from firebase_admin import credentials
from firebase_admin.messaging import UnregisteredError
from motor.motor_asyncio import AsyncIOMotorClient
from passlib.context import CryptContext
from pydantic import BaseModel, EmailStr, Field

# --- Load Environment Variables ---
load_dotenv()
MONGODB_URI = os.getenv("MONGODB_URI")
if not MONGODB_URI:
    raise ValueError("MONGODB_URI environment variable not set.")

# --- Configuration ---
DATABASE_NAME = "userAuthDB"
COLLECTION_NAME = "users"
TOKEN_COLLECTION_NAME = "fcmtokens"

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Password Hashing Setup ---
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# --- FastAPI App Initialization ---
app = FastAPI(
    title="Backend API",
    description="API for WhereIParked Backend OPerations.",
    version="1.5.0",
)

# --- CORS Configuration ---
# NOTE(review): a wildcard origin combined with allow_credentials=True lets any
# site make credentialed requests; restrict `origins` before production use.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Database Connection ---
client: Optional[AsyncIOMotorClient] = None
db = None
cred = credentials.Certificate("whereiparked-e9eb6-firebase-adminsdk-fbsvc-1691a99d68.json")
default_app = None


@app.on_event("startup")
async def startup_client():
    """Connect to MongoDB & Firebase on application startup.

    Raises:
        RuntimeError: if the MongoDB server cannot be reached. A Firebase
            initialization failure is logged but does not abort startup.
    """
    global client, db, default_app
    logger.info("Connecting to MongoDB...")
    try:
        client = AsyncIOMotorClient(MONGODB_URI)
        db = client[DATABASE_NAME]
        # Ping the server to check connection
        await client.admin.command('ping')
        logger.info("Successfully connected to MongoDB database: %s", DATABASE_NAME)
    except Exception as e:
        logger.error("Failed to connect to MongoDB: %s", e)
        raise RuntimeError(f"Could not connect to MongoDB: {e}") from e

    try:
        try:
            # get_app() raises ValueError when no default app exists yet;
            # this avoids reading the private firebase_admin._apps registry.
            default_app = firebase_admin.get_app()
            logger.info("Firebase Admin SDK already initialized.")
        except ValueError:
            default_app = firebase_admin.initialize_app(cred, {
                'projectId': 'whereiparked-e9eb6'
            })
            logger.info("Firebase Admin SDK initialized successfully.")
        logger.debug("Active Firebase app: %s", default_app.name)
    except Exception as e:
        # Deliberately non-fatal: the auth endpoints work without Firebase.
        logger.error("Error initializing/getting Firebase Admin SDK: %s", e)


@app.on_event("shutdown")
async def shutdown_client():
    """Disconnect from MongoDB on application shutdown."""
    global client
    if client:
        logger.info("Closing MongoDB connection...")
        client.close()
        logger.info("MongoDB connection closed.")


# --- Pydantic Models ---
class UserBase(BaseModel):
    """Base model for user data."""
    email: EmailStr = Field(..., example="user@example.com")


class UserCreate(UserBase):
    """Model for user creation (signup)."""
    username: str = Field(..., min_length=3, max_length=50, example="john_doe")
    password: str = Field(..., min_length=8, example="strongpassword123")


class UserLogin(BaseModel):
    """Model for user login."""
    email: EmailStr = Field(..., example="user@example.com")
    password: str = Field(..., example="strongpassword123")


class UserInDB(UserBase):
    """Model representing user data stored in the database."""
    username: str
    hashed_password: str


class Token(BaseModel):
    """Model for returning success/token (optional)."""
    message: str


class SignupResponse(BaseModel):
    """Response model for successful signup."""
    message: str = Field(..., example="Signup successful!")


class LoginSuccessResponse(BaseModel):
    """Response model for successful login."""
    message: str = Field(..., example="Login successful!")
    email: EmailStr = Field(..., example="user@example.com")


class TokenModel(BaseModel):
    """Request body for registering an FCM token against an email."""
    email: str
    token: str


class InAppEventNotification(BaseModel):
    """Request body carrying the FCM token to notify."""
    token: str


# --- Helper Functions ---
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True when ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash ``password`` with the configured passlib context."""
    return pwd_context.hash(password)


def _require_db(caller: str):
    """Return the active database handle or raise 503 when it is not connected."""
    if db is None:
        logger.error("Database connection is not available in %s", caller)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Database service is unavailable.",
        )
    return db


async def get_user_by_email(email: str) -> Optional[dict]:
    """Retrieves a user from the database by email.

    Raises:
        HTTPException: 503 when the database is not connected, 500 when the
            query itself fails.
    """
    database = _require_db("get_user_by_email")
    try:
        return await database[COLLECTION_NAME].find_one({"email": email})
    except Exception as e:
        logger.error("Error fetching user by email (%s): %s", email, e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error accessing user data.",
        )


async def get_user_by_username(username: str) -> Optional[dict]:
    """Retrieves a user from the database by username.

    Raises:
        HTTPException: 503 when the database is not connected, 500 when the
            query itself fails.
    """
    database = _require_db("get_user_by_username")
    try:
        return await database[COLLECTION_NAME].find_one({"username": username})
    except Exception as e:
        logger.error("Error fetching user by username (%s): %s", username, e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error accessing user data.",
        )


# Not referenced elsewhere in the visible code; kept for external importers.
FCM_SERVER_KEY = os.getenv("FCM_SERVER_KEY")


async def send_push(token: str, title: str, body: str, platform: str = "web"):
    """Send one FCM message to ``token``.

    For ``platform == "app"`` the message carries a notification payload in
    addition to the data payload; any other platform gets a data-only message.

    Returns:
        The message ID string returned by FCM, or an ``{"error": ...}`` dict
        when the token is no longer registered (the token is then removed
        from the token collection on a best-effort basis).

    Raises:
        Exception: any other send failure is logged and re-raised.
    """
    payload = {"title": title, "body": body}
    try:
        if platform == "app":
            message = firebase_admin.messaging.Message(
                notification=firebase_admin.messaging.Notification(
                    title=title,
                    body=body,
                    # NOTE(review): a relative path is probably not a usable
                    # image URL for FCM - confirm the intended value.
                    image="./icon.png",
                ),
                data=payload,
                token=token,
            )
        else:
            message = firebase_admin.messaging.Message(data=payload, token=token)

        # messaging.send() is a blocking call; run it in the default executor
        # so the event loop keeps serving other requests meanwhile.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, firebase_admin.messaging.send, message
        )
        logger.info("Successfully sent message: %s", response)
        return response
    except UnregisteredError:
        logger.warning("Token unregistered, removing from DB: %s", token)
        if db is not None:
            try:
                # Awaited directly: an un-referenced background task may be
                # garbage-collected and its failure would go unnoticed.
                await db[TOKEN_COLLECTION_NAME].delete_one({"token": token})
            except Exception as e:
                logger.error("Failed to delete unregistered token: %s", e)
        return {"error": "Token unregistered, deleted from DB."}
    except Exception as e:
        logger.error("Error sending message: %s", e)
        raise


async def _broadcast(title: str, body: str, platform: str = "web") -> None:
    """Send ``title``/``body`` to every stored token, skipping tokens that fail."""
    database = _require_db("_broadcast")
    token_entries = await database[TOKEN_COLLECTION_NAME].find().to_list(length=None)
    for token_entry in token_entries:
        try:
            await send_push(token_entry["token"], title, body, platform)
        except Exception:
            # send_push already logged the cause; one bad token must not
            # prevent delivery to the remaining recipients.
            logger.warning("Skipping token after send failure; continuing broadcast.")


# --- API Endpoints ---
@app.post("/signup", response_model=SignupResponse, status_code=status.HTTP_201_CREATED)
async def signup(user_data: UserCreate = Body(...)):
    """Handles user registration.

    Raises:
        HTTPException: 400 when the email or username is already in use,
            503 when the database is unavailable, 500 when the insert fails.
    """
    database = _require_db("signup")

    if await get_user_by_email(user_data.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered.",
        )
    if await get_user_by_username(user_data.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already taken.",
        )

    user_in_db = UserInDB(
        username=user_data.username,
        email=user_data.email,
        hashed_password=get_password_hash(user_data.password),
    )
    # NOTE(review): the checks above and this insert are not atomic; unique
    # indexes on email/username would be needed to rule out duplicates.
    try:
        new_user = await database[COLLECTION_NAME].insert_one(user_in_db.dict(by_alias=True))
        logger.info("User created successfully with ID: %s", new_user.inserted_id)
    except Exception as e:
        logger.error("Error inserting user into database: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An error occurred while creating the account.",
        )
    return SignupResponse(message="Signup successful!")


@app.post("/login", response_model=LoginSuccessResponse)
async def login(login_data: UserLogin = Body(...)):
    """Handles user login. Returns user email on success.

    Raises:
        HTTPException: 401 when the email is unknown or the password does not
            match, 503 when the database is unavailable.
    """
    _require_db("login")
    user = await get_user_by_email(login_data.email)
    if not user or not verify_password(login_data.password, user["hashed_password"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password.",
            headers={"WWW-Authenticate": "Bearer"},
        )
    logger.info("User %s logged in successfully.", user["email"])
    return LoginSuccessResponse(message="Login successful!", email=user["email"])


@app.post("/api/save-token")
async def save_token(token_model: TokenModel):
    """Store an FCM token for an email unless that token is already stored."""
    database = _require_db("save_token")
    # Single atomic upsert instead of find-then-insert, so two concurrent
    # requests cannot both insert the same token. On insert MongoDB seeds the
    # document with the filter's equality field (`token`).
    await database[TOKEN_COLLECTION_NAME].update_one(
        {"token": token_model.token},
        {
            "$setOnInsert": {
                "email": token_model.email,
                "createdAt": datetime.datetime.now(datetime.timezone.utc),
            }
        },
        upsert=True,
    )
    return {"message": "Token saved"}


@app.post("/api/parked")
async def user_parked(requestBody: InAppEventNotification = Body(...)):
    """Notify a web client that parking was recorded."""
    await send_push(requestBody.token, "WhereIParked", "You parked your car. Timer started!")
    return {"message": "Notification sent"}


@app.post("/api/clear-location")
async def user_cleared(requestBody: InAppEventNotification = Body(...)):
    """Notify a web client that the parking location was cleared."""
    await send_push(requestBody.token, "WhereIParked", "Parking location cleared. Safe drive!")
    return {"message": "Notification sent"}


@app.post("/api/park-duration")
async def park_duration(requestBody: InAppEventNotification = Body(...), duration: str = ...):
    """Remind a web client how long the car has been parked.

    ``duration`` is a required query parameter (minutes, as text). It was
    previously declared as ``duration = str``, which made the ``str`` type
    itself the default value and crashed when the parameter was omitted.
    """
    await send_push(
        requestBody.token,
        "Reminder",
        "You’ve been parked for over " + duration + " minutes!",
    )
    return {"message": "Notification sent"}


@app.post("/admin/push")
async def admin_push(title: str = Body(...), body: str = Body(...)):
    """Broadcast a data-only message to every stored token."""
    await _broadcast(title, body)
    return {"message": "Notifications sent"}


@app.post("/api/app/parked")
async def app_user_parked(requestBody: InAppEventNotification = Body(...)):
    """Notify a mobile-app client that parking was recorded."""
    await send_push(requestBody.token, "WhereIParked", "You parked your car. Timer started!", "app")
    return {"message": "Notification sent"}


@app.post("/api/app/clear-location")
async def app_user_cleared(requestBody: InAppEventNotification = Body(...)):
    """Notify a mobile-app client that the parking location was cleared."""
    await send_push(requestBody.token, "WhereIParked", "Parking location cleared. Safe drive!", "app")
    return {"message": "Notification sent"}


@app.post("/api/app/park-duration")
async def app_park_duration(requestBody: InAppEventNotification = Body(...), duration: str = ...):
    """Remind a mobile-app client how long the car has been parked.

    ``duration`` is a required query parameter (minutes, as text).
    """
    await send_push(
        requestBody.token,
        "Reminder",
        "You’ve been parked for over " + duration + " minutes!",
        "app",
    )
    return {"message": "Notification sent"}


@app.post("/admin/app/push")
async def app_admin_push(title: str = Body(...), body: str = Body(...)):
    """Broadcast a notification message to every stored token (app format)."""
    await _broadcast(title, body, "app")
    return {"message": "Notifications sent"}


@app.get("/", status_code=status.HTTP_200_OK)
async def read_root():
    """Root endpoint to check if the API is running."""
    return {"message": "Authentication API is running!"}