Spaces:
Sleeping
Sleeping
File size: 6,589 Bytes
b84b4a7 ff07255 b84b4a7 8fbc1f3 ff07255 b84b4a7 8fbc1f3 77e8127 b84b4a7 b7c0a0c b84b4a7 ff07255 b84b4a7 77e8127 b84b4a7 ff07255 b84b4a7 b7c0a0c b84b4a7 77e8127 b84b4a7 77e8127 b84b4a7 8fbc1f3 77e8127 b7c0a0c b84b4a7 77e8127 b84b4a7 77e8127 b84b4a7 b7c0a0c b84b4a7 77e8127 b84b4a7 b7c0a0c b84b4a7 b7c0a0c b84b4a7 ff07255 b7c0a0c ff07255 b84b4a7 77e8127 0e8ec15 b84b4a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
from fastapi import APIRouter, HTTPException, Depends, status, Request
from fastapi.security import OAuth2PasswordRequestForm
from db.mongo import users_collection
from core.security import hash_password, verify_password, create_access_token, get_current_user
from models.schemas import SignupForm, TokenResponse, DoctorCreate
from datetime import datetime
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(name)s - %(message)s'
)
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/signup", status_code=status.HTTP_201_CREATED)
async def signup(data: SignupForm):
logger.info(f"Signup attempt for email: {data.email}")
email = data.email.lower().strip()
existing = await users_collection.find_one({"email": email})
if existing:
logger.warning(f"Signup failed: Email already exists: {email}")
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Email already exists"
)
hashed_pw = hash_password(data.password)
user_doc = {
"email": email,
"full_name": data.full_name.strip(),
"password": hashed_pw,
"role": "patient",
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat(),
"device_token": data.device_token or "" # Store device token if provided
}
try:
result = await users_collection.insert_one(user_doc)
logger.info(f"User created successfully: {email}")
return {
"status": "success",
"id": str(result.inserted_id),
"email": email
}
except Exception as e:
logger.error(f"Failed to create user {email}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to create user: {str(e)}"
)
@router.post("/admin/doctors", status_code=status.HTTP_201_CREATED)
async def create_doctor(
data: DoctorCreate,
current_user: dict = Depends(get_current_user)
):
logger.info(f"Doctor creation attempt by {current_user.get('email')}")
if current_user.get('role') != 'admin':
logger.warning(f"Unauthorized doctor creation attempt by {current_user.get('email')}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can create doctor accounts"
)
email = data.email.lower().strip()
existing = await users_collection.find_one({"email": email})
if existing:
logger.warning(f"Doctor creation failed: Email already exists: {email}")
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Email already exists"
)
hashed_pw = hash_password(data.password)
doctor_doc = {
"email": email,
"full_name": data.full_name.strip(),
"password": hashed_pw,
"role": "doctor",
"specialty": data.specialty,
"license_number": data.license_number,
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat(),
"device_token": data.device_token or "" # Store device token if provided
}
try:
result = await users_collection.insert_one(doctor_doc)
logger.info(f"Doctor created successfully: {email}")
return {
"status": "success",
"id": str(result.inserted_id),
"email": email
}
except Exception as e:
logger.error(f"Failed to create doctor {email}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to create doctor: {str(e)}"
)
@router.post("/login", response_model=TokenResponse)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
logger.info(f"Login attempt for email: {form_data.username}")
user = await users_collection.find_one({"email": form_data.username.lower()})
if not user or not verify_password(form_data.password, user["password"]):
logger.warning(f"Login failed for {form_data.username}: Invalid credentials")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Update device token if provided in form_data (e.g., from frontend)
if hasattr(form_data, 'device_token') and form_data.device_token:
await users_collection.update_one(
{"email": user["email"]},
{"$set": {"device_token": form_data.device_token}}
)
logger.info(f"Device token updated for {form_data.username}")
access_token = create_access_token(data={"sub": user["email"]})
logger.info(f"Successful login for {form_data.username}")
return {
"access_token": access_token,
"token_type": "bearer",
"role": user.get("role", "patient")
}
@router.get("/me")
async def get_me(request: Request, current_user: dict = Depends(get_current_user)):
logger.info(f"Fetching user profile for {current_user['email']} at {datetime.utcnow().isoformat()}")
print(f"Headers: {request.headers}")
try:
user = await users_collection.find_one({"email": current_user["email"]})
if not user:
logger.warning(f"User not found: {current_user['email']}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
response = {
"id": str(user["_id"]),
"email": user["email"],
"full_name": user.get("full_name", ""),
"role": user.get("role", "patient"),
"specialty": user.get("specialty"),
"created_at": user.get("created_at"),
"updated_at": user.get("updated_at"),
"device_token": user.get("device_token", "") # Include device token in response
}
logger.info(f"User profile retrieved for {current_user['email']} at {datetime.utcnow().isoformat()}")
return response
except Exception as e:
logger.error(f"Database error for user {current_user['email']}: {str(e)} at {datetime.utcnow().isoformat()}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Database error: {str(e)}"
)
# Export the router as 'auth' for api.__init__.py
auth = router |