CPS-API / api /routes.py
Ali2206's picture
Update api/routes.py
f9eed38 verified
raw
history blame
9.37 kB
from fastapi import APIRouter, HTTPException, Depends, Body
from fastapi.security import OAuth2PasswordRequestForm
from models.schemas import SignupForm, TokenResponse, PatientCreate, DoctorCreate, AppointmentCreate
from db.mongo import users_collection, patients_collection, appointments_collection
from core.security import hash_password, verify_password, create_access_token, get_current_user
from datetime import datetime
from bson import ObjectId
from bson.errors import InvalidId
from typing import Optional
from pydantic import BaseModel
# Router on which every endpoint defined in this module is registered.
router = APIRouter()
# --- SIGNUP ---
@router.post("/signup")
async def signup(data: SignupForm):
    """Register a new patient account.

    The e-mail is lower-cased and stripped before both the uniqueness
    lookup and storage.

    Raises:
        HTTPException(403): the requested role is not "patient".
        HTTPException(409): a user with the same e-mail already exists.
    """
    if data.role != "patient":
        raise HTTPException(status_code=403, detail="Only patients can sign up via this route")

    normalized_email = data.email.lower().strip()
    if await users_collection.find_one({"email": normalized_email}):
        raise HTTPException(status_code=409, detail="Email already exists")

    password_hash = hash_password(data.password)
    new_user = dict(
        email=normalized_email,
        full_name=data.full_name.strip(),
        password=password_hash,
        role="patient",
        created_at=datetime.utcnow(),
    )
    await users_collection.insert_one(new_user)

    return {"success": True, "message": "Patient account created"}
# --- ADMIN: Create doctor account ---
@router.post("/admin/create-doctor")
async def create_doctor(data: DoctorCreate):
    """Create a doctor account.

    The e-mail is normalised (lower-cased, stripped) once and that same
    value is used for both the duplicate check and the stored document, so
    "Doc@X.com " and "doc@x.com" are treated as the same account.

    Raises:
        HTTPException(409): a user with the same normalised e-mail exists.
    """
    # NOTE(review): this route carries no authentication dependency, so as
    # written any caller can create doctor accounts — confirm whether access
    # is restricted elsewhere (e.g. at router-include level).
    email = data.email.lower().strip()

    existing = await users_collection.find_one({"email": email})
    if existing:
        raise HTTPException(status_code=409, detail="Email already exists")

    hashed_pw = hash_password(data.password)
    user_doc = {
        "matricule": data.matricule,
        "email": email,
        "full_name": data.full_name.strip(),
        "specialty": data.specialty,
        "password": hashed_pw,
        "role": "doctor",
        "created_at": datetime.utcnow(),
    }
    await users_collection.insert_one(user_doc)
    return {"success": True, "message": "Doctor account created"}
# --- GET ALL DOCTORS ---
@router.get("/doctors")
async def list_doctors():
    """Return every user whose role is "doctor" as a list of plain dicts.

    Missing text fields default to an empty string; a missing
    ``created_at`` is returned as ``None``.
    """
    doctor_cursor = users_collection.find({"role": "doctor"})
    return [
        {
            "id": str(entry["_id"]),
            "full_name": entry.get("full_name", ""),
            "email": entry.get("email", ""),
            "matricule": entry.get("matricule", ""),
            "specialty": entry.get("specialty", ""),
            "created_at": entry.get("created_at"),
        }
        async for entry in doctor_cursor
    ]
# --- LOGIN ---
@router.post("/login", response_model=TokenResponse)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate with e-mail (sent as the form's ``username``) and password.

    Returns a bearer token whose ``sub`` claim is the stored e-mail.

    Raises:
        HTTPException(401): unknown e-mail or password verification failed.
    """
    login_email = form_data.username.lower().strip()
    account = await users_collection.find_one({"email": login_email})

    # Short-circuit keeps verify_password from running when no account exists.
    credentials_ok = bool(account) and verify_password(form_data.password, account["password"])
    if not credentials_ok:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    token = create_access_token(data={"sub": account["email"]})
    return {"access_token": token, "token_type": "bearer"}
# --- GET CURRENT USER ---
@router.get("/me")
async def get_me(current_user: dict = Depends(get_current_user)):
    """Return the profile of the authenticated user.

    ``specialty`` is only populated for users whose role is "doctor".

    Raises:
        HTTPException(404): no user document matches the caller's e-mail.
    """
    account = await users_collection.find_one({"email": current_user["email"]})
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    is_doctor = account.get("role") == "doctor"
    specialty = account.get("specialty", None) if is_doctor else None

    profile = {
        # The frontend needs the id to submit an appointment.
        "id": str(account["_id"]),
        "email": account["email"],
        "full_name": account.get("full_name", ""),
        "role": account.get("role", "unknown"),
        "specialty": specialty,
        "created_at": account.get("created_at", ""),
    }
    return profile
# --- ADD NEW PATIENT ---
@router.post("/patients")
async def add_patient(data: PatientCreate, current_user: dict = Depends(get_current_user)):
    """Create a patient record owned by the calling doctor.

    Returns the new document's id as a string plus a confirmation message.

    Raises:
        HTTPException(403): the caller's role is not "doctor".
    """
    caller_role = current_user.get("role")
    if caller_role != "doctor":
        raise HTTPException(status_code=403, detail="Only doctors can add patients")

    record = data.dict()
    # Stored as a datetime at midnight rather than the bare date value.
    record["date_of_birth"] = datetime.combine(data.date_of_birth, datetime.min.time())
    # A missing contact is stored as an empty dict.
    record["contact"] = data.contact.dict() if data.contact else {}
    record["created_by"] = current_user["email"]
    record["created_at"] = datetime.utcnow()

    outcome = await patients_collection.insert_one(record)
    return {"id": str(outcome.inserted_id), "message": "Patient created successfully"}
# --- GET ALL PATIENTS ---
@router.get("/patients")
async def list_patients(current_user: dict = Depends(get_current_user)):
    """List the patient records whose ``created_by`` is the calling doctor.

    Raises:
        HTTPException(403): the caller's role is not "doctor".
    """
    if current_user.get("role") != "doctor":
        raise HTTPException(status_code=403, detail="Only doctors can view patients")

    owned_by_caller = {"created_by": current_user["email"]}
    return [
        {
            "id": str(record["_id"]),
            "full_name": record.get("full_name", ""),
            "date_of_birth": record.get("date_of_birth"),
            "gender": record.get("gender", ""),
            "notes": record.get("notes", ""),
        }
        async for record in patients_collection.find(owned_by_caller)
    ]
# --- COUNT PATIENTS ---
@router.get("/patients-count")
async def count_patients(current_user: dict = Depends(get_current_user)):
    """Return how many patient records have the calling doctor as ``created_by``.

    Raises:
        HTTPException(403): the caller's role is not "doctor".
    """
    caller_is_doctor = current_user.get("role") == "doctor"
    if not caller_is_doctor:
        raise HTTPException(status_code=403, detail="Only doctors can count patients")

    owned_by_caller = {"created_by": current_user["email"]}
    total = await patients_collection.count_documents(owned_by_caller)
    return {"count": total}
# --- CREATE APPOINTMENT ---
@router.post("/appointments")
async def create_appointment(data: AppointmentCreate, current_user: dict = Depends(get_current_user)):
    """Book an appointment for the calling patient with the given doctor.

    After the appointment is stored, the patient is added to the doctor's
    patient list unless a record for that patient/doctor pair already exists.

    Raises:
        HTTPException(403): the caller's role is not "patient".
        HTTPException(400): ``data.doctor_id`` is not a valid ObjectId.
        HTTPException(404): the caller's user document, or the doctor,
            cannot be found.
    """
    if current_user.get("role") != "patient":
        raise HTTPException(status_code=403, detail="Only patients can book appointments")

    # Validate the id up front: a malformed value would otherwise surface as
    # an unhandled exception (HTTP 500) while building the document.
    try:
        doctor_oid = ObjectId(data.doctor_id)
    except (InvalidId, TypeError):
        raise HTTPException(status_code=400, detail="Invalid doctor ID") from None

    # Get patient user info
    patient_user = await users_collection.find_one({"email": current_user["email"]})
    if not patient_user:
        raise HTTPException(status_code=404, detail="Patient user not found")

    # Resolve the doctor once, before writing anything, so no appointment or
    # patient record is ever stored against a non-existent doctor.
    doctor_email = await get_doctor_email_by_id(data.doctor_id)
    if doctor_email is None:
        raise HTTPException(status_code=404, detail="Doctor not found")

    # Insert appointment
    # NOTE(review): assumes data.date is a datetime.date and data.time a
    # datetime.time, as the combine()/strftime() calls require — confirm
    # against AppointmentCreate.
    appointment_doc = {
        "patient_id": patient_user["_id"],
        "doctor_id": doctor_oid,
        # Stored as a datetime at midnight of the requested day.
        "date": datetime.combine(data.date, datetime.min.time()),
        "time": data.time.strftime("%H:%M:%S"),
        "reason": data.reason,
        "created_by": current_user["email"],
        "created_at": datetime.utcnow(),
    }
    await appointments_collection.insert_one(appointment_doc)

    # Auto-add to doctor's patient list if not already
    existing = await patients_collection.find_one({
        "user_email": current_user["email"],
        "created_by": doctor_email,
    })
    if not existing:
        await patients_collection.insert_one({
            "full_name": patient_user.get("full_name", ""),
            "user_email": patient_user["email"],
            "gender": "",
            "created_by": doctor_email,
            "created_at": datetime.utcnow(),
        })

    return {"message": "Appointment booked successfully"}
# --- Helper function ---
async def get_doctor_email_by_id(doctor_id: str) -> Optional[str]:
    """Return the e-mail of the doctor with the given id, or ``None``.

    ``None`` is returned when ``doctor_id`` is not a valid ObjectId or when
    no user with that id and role "doctor" exists. Errors raised by the
    database query itself are no longer swallowed, so an outage is not
    reported as "doctor not found".
    """
    try:
        doctor_oid = ObjectId(doctor_id)
    except (InvalidId, TypeError):
        # Malformed or wrongly-typed id: treat as "no such doctor".
        return None

    doc = await users_collection.find_one({"_id": doctor_oid, "role": "doctor"})
    return doc["email"] if doc else None
# --- LIST DOCTOR'S APPOINTMENTS ---
@router.get("/appointments/doctor")
async def list_doctor_appointments(current_user: dict = Depends(get_current_user)):
    """List the appointments booked with the calling doctor.

    Each entry carries the patient's name and e-mail ("Unknown" / "" when
    the patient cannot be resolved). ``date`` is formatted as YYYY-MM-DD when
    it is stored as a datetime and returned unchanged otherwise.

    Raises:
        HTTPException(403): the caller's role is not "doctor".
    """
    if current_user.get("role") != "doctor":
        raise HTTPException(status_code=403, detail="Only doctors can view this")

    # NOTE(review): relies on get_current_user exposing "_id"; other routes
    # in this module re-fetch the user by e-mail instead — confirm.
    cursor = appointments_collection.find({"doctor_id": ObjectId(current_user["_id"])})

    # One lookup per distinct patient instead of one per appointment.
    patients_by_id: dict = {}
    appointments = []
    async for a in cursor:
        patient = None
        raw_patient_id = a.get("patient_id")
        # A missing id can never match a user; skip it explicitly, because
        # ObjectId(None) would mint a fresh id and trigger a pointless query.
        if raw_patient_id is not None:
            try:
                patient_id = (
                    raw_patient_id
                    if isinstance(raw_patient_id, ObjectId)
                    else ObjectId(raw_patient_id)
                )
            except (InvalidId, TypeError):
                # Unparseable stored id: fall back to the "Unknown" patient.
                patient_id = None
            if patient_id is not None:
                if patient_id not in patients_by_id:
                    patients_by_id[patient_id] = await users_collection.find_one({"_id": patient_id})
                patient = patients_by_id[patient_id]

        appointment_date = a.get("date")
        if isinstance(appointment_date, datetime):
            appointment_date = appointment_date.strftime("%Y-%m-%d")

        appointments.append({
            "_id": str(a["_id"]),
            "doctor_id": str(a["doctor_id"]),
            "patient": {
                "full_name": patient.get("full_name", "Unknown") if patient else "Unknown",
                "email": patient.get("email", "") if patient else "",
            },
            "date": appointment_date,
            "time": a.get("time", ""),
            "reason": a.get("reason", ""),
        })
    return appointments
# --- LIST PATIENT'S APPOINTMENTS ---
@router.get("/appointments/patient")
async def list_my_appointments(current_user: dict = Depends(get_current_user)):
    """List the calling patient's appointments.

    Every stored field is returned; ``_id``, ``patient_id`` and
    ``doctor_id`` are converted to strings.

    Raises:
        HTTPException(403): the caller's role is not "patient".
        HTTPException(404): no user document matches the caller's e-mail.
    """
    if current_user.get("role") != "patient":
        raise HTTPException(status_code=403, detail="Only patients can view their appointments")

    account = await users_collection.find_one({"email": current_user["email"]})
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    my_appointments = appointments_collection.find({"patient_id": account.get("_id")})

    serialised = []
    async for entry in my_appointments:
        item = dict(entry)
        # Stringify the id fields in place so key order matches the stored doc.
        item.update(
            {
                "_id": str(entry["_id"]),
                "patient_id": str(entry["patient_id"]),
                "doctor_id": str(entry["doctor_id"]),
            }
        )
        serialised.append(item)
    return serialised