Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, HTTPException, Depends, Body | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from models.schemas import SignupForm, TokenResponse, PatientCreate, DoctorCreate, AppointmentCreate | |
| from db.mongo import users_collection, patients_collection, appointments_collection | |
| from core.security import hash_password, verify_password, create_access_token, get_current_user | |
| from datetime import datetime | |
| from bson import ObjectId | |
| from bson.errors import InvalidId | |
| from typing import Optional | |
| from pydantic import BaseModel | |
| router = APIRouter() | |
| # --- SIGNUP --- | |
| async def signup(data: SignupForm): | |
| if data.role != "patient": | |
| raise HTTPException(status_code=403, detail="Only patients can sign up via this route") | |
| email = data.email.lower().strip() | |
| existing = await users_collection.find_one({"email": email}) | |
| if existing: | |
| raise HTTPException(status_code=409, detail="Email already exists") | |
| hashed_pw = hash_password(data.password) | |
| user_doc = { | |
| "email": email, | |
| "full_name": data.full_name.strip(), | |
| "password": hashed_pw, | |
| "role": "patient", | |
| "created_at": datetime.utcnow() | |
| } | |
| await users_collection.insert_one(user_doc) | |
| return {"success": True, "message": "Patient account created"} | |
| # --- ADMIN: Create doctor account --- | |
| async def create_doctor(data: DoctorCreate): | |
| existing = await users_collection.find_one({"email": data.email}) | |
| if existing: | |
| raise HTTPException(status_code=409, detail="Email already exists") | |
| hashed_pw = hash_password(data.password) | |
| user_doc = { | |
| "matricule": data.matricule, | |
| "email": data.email.lower().strip(), | |
| "full_name": data.full_name.strip(), | |
| "specialty": data.specialty, | |
| "password": hashed_pw, | |
| "role": "doctor", | |
| "created_at": datetime.utcnow() | |
| } | |
| await users_collection.insert_one(user_doc) | |
| return {"success": True, "message": "Doctor account created"} | |
| # --- LOGIN --- | |
| async def login(form_data: OAuth2PasswordRequestForm = Depends()): | |
| email = form_data.username.lower().strip() | |
| user = await users_collection.find_one({"email": email}) | |
| if not user or not verify_password(form_data.password, user["password"]): | |
| raise HTTPException(status_code=401, detail="Invalid credentials") | |
| access_token = create_access_token(data={"sub": user["email"]}) | |
| return {"access_token": access_token, "token_type": "bearer"} | |
| # --- GET CURRENT USER --- | |
| async def get_me(current_user: dict = Depends(get_current_user)): | |
| return { | |
| "email": current_user["email"], | |
| "full_name": current_user.get("full_name", ""), | |
| "role": current_user.get("role", "unknown"), | |
| "created_at": current_user.get("created_at", "") | |
| } | |
| # --- ADD NEW PATIENT --- | |
| async def add_patient(data: PatientCreate, current_user: dict = Depends(get_current_user)): | |
| if current_user.get("role") != "doctor": | |
| raise HTTPException(status_code=403, detail="Only doctors can add patients") | |
| patient_doc = { | |
| **data.dict(), | |
| "date_of_birth": datetime.combine(data.date_of_birth, datetime.min.time()), | |
| "contact": data.contact.dict() if data.contact else {}, | |
| "created_by": current_user["email"], | |
| "created_at": datetime.utcnow() | |
| } | |
| result = await patients_collection.insert_one(patient_doc) | |
| return {"id": str(result.inserted_id), "message": "Patient created successfully"} | |
| # --- GET ALL PATIENTS --- | |
| async def list_patients(current_user: dict = Depends(get_current_user)): | |
| if current_user.get("role") != "doctor": | |
| raise HTTPException(status_code=403, detail="Only doctors can view patients") | |
| patients_cursor = patients_collection.find({"created_by": current_user["email"]}) | |
| patients = [] | |
| async for patient in patients_cursor: | |
| patients.append({ | |
| "id": str(patient["_id"]), | |
| "full_name": patient.get("full_name", ""), | |
| "date_of_birth": patient.get("date_of_birth"), | |
| "gender": patient.get("gender", ""), | |
| "notes": patient.get("notes", "") | |
| }) | |
| return patients | |
| # --- COUNT PATIENTS --- | |
| async def count_patients(current_user: dict = Depends(get_current_user)): | |
| if current_user.get("role") != "doctor": | |
| raise HTTPException(status_code=403, detail="Only doctors can count patients") | |
| count = await patients_collection.count_documents({"created_by": current_user["email"]}) | |
| return {"count": count} | |
| # ========================= | |
| # APPOINTMENT ROUTES | |
| # ========================= | |
| # --- CREATE APPOINTMENT (doctor only) --- | |
| async def create_appointment(data: AppointmentCreate, current_user: dict = Depends(get_current_user)): | |
| if current_user.get("role") != "doctor": | |
| raise HTTPException(status_code=403, detail="Only doctors can create appointments") | |
| appointment_doc = { | |
| "patient_id": ObjectId(data.patient_id), | |
| "doctor_id": ObjectId(data.doctor_id), | |
| "date": data.date, | |
| "time": data.time, | |
| "reason": data.reason, | |
| "created_by": current_user["email"], | |
| "created_at": datetime.utcnow() | |
| } | |
| await appointments_collection.insert_one(appointment_doc) | |
| return {"message": "Appointment created successfully"} | |
| # --- LIST DOCTOR'S APPOINTMENTS --- | |
| async def list_doctor_appointments(current_user: dict = Depends(get_current_user)): | |
| if current_user.get("role") != "doctor": | |
| raise HTTPException(status_code=403, detail="Only doctors can view this") | |
| cursor = appointments_collection.find({"doctor_id": {"$exists": True}}) | |
| return [{**a, "_id": str(a["_id"]), "patient_id": str(a["patient_id"]), "doctor_id": str(a["doctor_id"])} async for a in cursor] | |
| # --- LIST PATIENT'S APPOINTMENTS --- | |
| async def list_my_appointments(current_user: dict = Depends(get_current_user)): | |
| if current_user.get("role") != "patient": | |
| raise HTTPException(status_code=403, detail="Only patients can view their appointments") | |
| user = await users_collection.find_one({"email": current_user["email"]}) | |
| if not user: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| patient_id = user.get("_id") | |
| cursor = appointments_collection.find({"patient_id": patient_id}) | |
| return [{**a, "_id": str(a["_id"]), "patient_id": str(a["patient_id"]), "doctor_id": str(a["doctor_id"])} async for a in cursor] | |