| from fastapi import FastAPI, Request, Depends, HTTPException, status, File, UploadFile, Form , Header , Body |
| from fastapi.responses import JSONResponse, FileResponse |
| from fastapi.staticfiles import StaticFiles |
| from sqlalchemy import create_engine, text |
| from sqlalchemy.orm import sessionmaker |
| import pymysql |
| import jwt |
| import random |
| import string |
| import datetime |
| import os |
| import uuid |
| from dotenv import load_dotenv |
| from twilio.rest import Client |
| import requests |
| from werkzeug.utils import secure_filename |
| from pydantic import BaseModel |
| from typing import Optional |
| from sqlalchemy.orm import Session |
| from typing import Optional, Dict , List, Any |
|
|
| load_dotenv() |
|
|
| from fastapi.middleware.cors import CORSMiddleware |
|
|
| app = FastAPI() |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
| |
| DB_HOST = os.getenv('host') |
| DB_DATABASE = os.getenv('db') |
| DB_USERNAME = os.getenv('username') |
| DB_PASSWORD = os.getenv('psswd') |
| DATABASE_URL = f"mysql+pymysql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}/{DB_DATABASE}" |
| engine = create_engine(DATABASE_URL, pool_pre_ping=True) |
| SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) |
|
|
| account_sid = os.getenv('acc_sid') |
| auth_token = os.getenv('auth_token') |
| twilio_number = os.getenv('tn') |
|
|
| JWT_SECRET_KEY = 'CkOPcOppyh31sQcisbyOM3RKD4C2G7SzQmuG5LePt9XBarsxgjm0fc7uOECcqoGm' |
| JWT_ALGORITHM = "HS256" |
| |
| UPLOAD_FOLDER = 'uploads' |
| if not os.path.exists(UPLOAD_FOLDER): |
| os.makedirs(UPLOAD_FOLDER) |
|
|
| app.mount("/uploads", StaticFiles(directory=UPLOAD_FOLDER), name="uploads") |
|
|
| BASE_URL = 'https://ani14-wound-app.hf.space' |
| ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'} |
|
|
| |
| def get_db(): |
| db = SessionLocal() |
| try: |
| yield db |
| finally: |
| db.close() |
|
|
| |
| def allowed_file(filename): |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS |
|
|
| def generate_session_id(): |
| return str(uuid.uuid4()) |
|
|
| def generate_license_key(): |
| return ''.join(random.choices(string.ascii_uppercase + string.digits, k=12)) |
|
|
| def generate_patient_id(): |
| with SessionLocal() as session: |
| result = session.execute(text("SELECT MAX(id) FROM patients")).fetchone() |
| last_id = result[0] if result[0] is not None else 0 |
| prefix = "AB" |
| formatted_id = f"{prefix}000{last_id + 1}" |
| return formatted_id |
|
|
| def send_sms(phone, otp): |
| client = Client(account_sid, auth_token) |
| message = client.messages.create( |
| body=f"Your verification code is: {otp}. Don't share this code with anyone; our employees will never ask for the code.", |
| from_=twilio_number, |
| to=phone |
| ) |
|
|
| def generate_otp(): |
| return str(random.randint(1000, 9999)) |
|
|
| def update_otp_in_database(session, phone, otp, expiry_time, updated_at): |
| try: |
| query = text("UPDATE organisations SET otp= :otp, otp_expiry= :expiry_time, updated_at = :updated_at WHERE phone= :phone") |
| session.execute(query, {'otp': otp, 'expiry_time': expiry_time, 'phone': phone, 'updated_at': updated_at}) |
| session.commit() |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| def update_otp_in_med_database(session, phone, otp, expiry_time, updated_at): |
| try: |
| query = text("UPDATE users SET otp= :otp, otp_expiry= :expiry_time, updated_at = :updated_at WHERE phone= :phone") |
| session.execute(query, {'otp': otp, 'expiry_time': expiry_time, 'phone': phone, 'updated_at': updated_at}) |
| session.commit() |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| |
| def verify_jwt_token(request: Request): |
| auth_header = request.headers.get('Authorization') |
| if not auth_header or not auth_header.startswith('Bearer '): |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid Authorization header') |
| token = auth_header.split(' ')[1] |
| try: |
| payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=['HS256']) |
| return payload |
| except jwt.ExpiredSignatureError: |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Token has expired') |
| except jwt.InvalidTokenError: |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') |
|
|
| |
| class SendEmailRequest(BaseModel): |
| name: str |
| email: str |
| c_code: str |
| phone: str |
|
|
| class VerifyLicenseKeyRequest(BaseModel): |
| email: str |
| license_key: str |
|
|
| class CreatePinRequest(BaseModel): |
| license_key: str |
| email: str |
| pin: str |
| name: Optional[str] = None |
| c_code: Optional[str] = None |
| phone: Optional[str] = None |
|
|
| class FetchDataRequest(BaseModel): |
| email: str |
|
|
| class SaveAdditionalDataRequest(BaseModel): |
| department: str |
| location: str |
| email: str |
| latitude: float |
| longitude: float |
|
|
| class AddWoundDetailsRequest(BaseModel): |
| length: float |
| breadth: float |
| depth: float |
| area: float |
| moisture: str |
| wound_location: Optional[str] = None |
| tissue: Optional[str] = None |
| exudate: Optional[str] = None |
| periwound: Optional[str] = None |
| periwound_type: Optional[str] = None |
| patient_id: str |
| type: Optional[str] = None |
| category: Optional[str] = None |
| edge: Optional[str] = None |
| infection: Optional[str] = None |
| last_dressing_date: Optional[str] = None |
|
|
|
|
|
|
|
|
| class AddPatientRequest(BaseModel): |
| name: str |
| dob: str |
| gender: str |
| age: int |
| height: float |
| weight: float |
| email: str |
| doctor: Optional[str] = None |
|
|
|
|
|
|
| class SearchPatientRequest(BaseModel): |
| name: str |
|
|
| class GeneratePrescriptionRequest(BaseModel): |
| patient_id: str |
|
|
| class VerifyPinRequest(BaseModel): |
| email: str |
| pin: str |
|
|
| class SendOtpRequest(BaseModel): |
| phone: str |
|
|
| class UpdateScheduledDateRequest(BaseModel): |
| email: str |
| patient_id: str |
| doctor: str |
| scheduled_date: str |
|
|
| class TotalAppointmentsTillDateRequest(BaseModel): |
| date: str |
|
|
| class TotalAppointmentsTillMonthRequest(BaseModel): |
| year: int |
| month: int |
|
|
| class ChangePinRequest(BaseModel): |
| email: str |
| current_pin: str |
| new_pin: str |
|
|
| class ForgotPinRequest(BaseModel): |
| email: str |
| otp: int |
| new_pin: str |
|
|
| class OrganisationDetailsRequest(BaseModel): |
| email: str |
|
|
| class UpdatePatientDetailsRequest(BaseModel): |
| patient_id: str |
| allergies: Optional[str] = None |
| past_history: Optional[str] = None |
|
|
| class PatientDetailsRequest(BaseModel): |
| patient_id: str |
|
|
| class StoreImageRequest(BaseModel): |
| patient_id: str |
|
|
| class GetImageRequest(BaseModel): |
| patient_id: str |
|
|
| class StoreWoundImageRequest(BaseModel): |
| patient_id: str |
|
|
| class GetWoundImageRequest(BaseModel): |
| patient_id: str |
|
|
| class StoreMedImageRequest(BaseModel): |
| email: str |
|
|
| class GetMedImageRequest(BaseModel): |
| email: str |
|
|
| class StoreOrgImageRequest(BaseModel): |
| email: str |
|
|
| class GetOrgImageRequest(BaseModel): |
| email: str |
|
|
| class SaveNotesRequest(BaseModel): |
| patient_id: str |
| notes: str |
|
|
| class TotalAppointmentsRequest(BaseModel): |
| start_date: str |
| end_date: str |
| doctor: str |
|
|
| class AdminAddPractitionerRequest(BaseModel): |
| name: str |
| email: str |
| c_code: str |
| phone: str |
| org: str |
|
|
| class UpdateOrgProfileRequest(BaseModel): |
| email: str |
| name: Optional[str] = None |
| department: Optional[str] = None |
| about: Optional[str] = None |
| location: Optional[str] = None |
| latitude: Optional[float] = None |
| longitude: Optional[float] = None |
|
|
| class UpdateMedProfileRequest(BaseModel): |
| email: str |
| name: Optional[str] = None |
| department: Optional[str] = None |
| about: Optional[str] = None |
| location: Optional[str] = None |
| latitude: Optional[float] = None |
| longitude: Optional[float] = None |
|
|
| class AddWoundDetailsV2Request(BaseModel): |
| length: float |
| breadth: float |
| depth: float |
| area: float |
| moisture: str |
| wound_location: Optional[str] = None |
| tissue: Optional[str] = None |
| exudate: Optional[str] = None |
| periwound: Optional[str] = None |
| periwound_type: Optional[str] = None |
| patient_id: str |
| type: Optional[str] = None |
| category: Optional[str] = None |
| edge: Optional[str] = None |
| infection: Optional[str] = None |
| last_dressing_date: Optional[str] = None |
|
|
| class WoundProgressTimelineRequest(BaseModel): |
| patient_id: str |
|
|
| class AddPatientV2Request(BaseModel): |
| name: str |
| dob: str |
| gender: str |
| age: int |
| height: float |
| weight: float |
| email: str |
| doctor: str |
| role: str |
|
|
| class SaveNotesV2Request(BaseModel): |
| patient_id: str |
| notes: Optional[str] = None |
| remarks: Optional[str] = None |
|
|
| class AdminAddPractitionerV2Request(BaseModel): |
| name: str |
| email: str |
| c_code: str |
| phone: str |
| org_email: str |
|
|
| class SortPatientsRequest(BaseModel): |
| email: str |
| date: Optional[str] = None |
|
|
| class GetAppointmentCountRequest(BaseModel): |
| email: str |
| date: str |
|
|
| class AddWoundDetailsV3Request(BaseModel): |
| length: float |
| breadth: float |
| depth: float |
| area: float |
| moisture: str |
| wound_location: Optional[str] = None |
| tissue: Optional[str] = None |
| exudate: Optional[str] = None |
| periwound: Optional[str] = None |
| periwound_type: Optional[str] = None |
| patient_id: str |
| type: Optional[str] = None |
| category: Optional[str] = None |
| edge: Optional[str] = None |
| infection: Optional[str] = None |
| last_dressing_date: Optional[str] = None |
|
|
|
|
| |
|
|
| @app.post("/send_email") |
| async def send_email(request_data: SendEmailRequest, db: SessionLocal = Depends(get_db)): |
| name = request_data.name |
| email = request_data.email |
| c_code = request_data.c_code |
| phone = request_data.phone |
|
|
| try: |
| with db as session: |
| query = text("SELECT email, phone FROM organisations WHERE email = :email OR phone = :phone") |
| existing_user = session.execute(query, {'email': email, 'phone': phone}).fetchone() |
|
|
| if existing_user: |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Email or phone already exists. Please login.') |
| else: |
| license_key = generate_license_key() |
|
|
| email_payload = { |
| 'Recipient': email, |
| 'Subject': 'License key for SmartHeal', |
| 'Body': f'Your license key is: {license_key}', |
| 'ApiKey': '6A7339A3-E70B-4A8D-AA23-0264125F4959' |
| } |
|
|
| email_response = requests.post( |
| 'https://api.waysdatalabs.com/api/EmailSender/SendMail', |
| headers={}, |
| data=email_payload |
| ) |
|
|
| if email_response.status_code == 200: |
| return JSONResponse(content={'message': 'License key sent to email successfully', 'license_key': license_key}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail='Failed to send email') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/verify_license_key") |
| async def verify_license_key_endpoint(request_data: VerifyLicenseKeyRequest, db: SessionLocal = Depends(get_db)): |
| email = request_data.email |
| license_key = request_data.license_key |
|
|
| try: |
| with db as session: |
| query = text("SELECT licence_key FROM organisations WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchone() |
| if result and result.licence_key == license_key: |
| token = jwt.encode({'email': email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)}, JWT_SECRET_KEY, algorithm='HS256') |
| return JSONResponse(content={'message': 'License key verified successfully', 'token': token}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid license key') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/create_pin") |
| async def create_pin_endpoint(request_data: CreatePinRequest, db: SessionLocal = Depends(get_db)): |
| license_key = request_data.license_key |
| email = request_data.email |
| pin = request_data.pin |
| name = request_data.name |
| c_code = request_data.c_code |
| phone = request_data.phone |
|
|
| created_at = datetime.datetime.utcnow() |
| updated_at = datetime.datetime.utcnow() |
|
|
| try: |
| with db as session: |
| token = jwt.encode({'email': email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)}, JWT_SECRET_KEY, algorithm='HS256') |
| query = text("INSERT INTO organisations (name, email, c_code, phone, uuid, licence_key, pin, created_at, updated_at) VALUES (:name, :email, :c_code, :phone, :uuid, :license_key, :pin, :created_at, :updated_at)") |
| session.execute(query, { |
| 'name': name, |
| 'email': email, |
| 'c_code': c_code, |
| 'phone': phone, |
| 'uuid': generate_session_id(), |
| 'license_key': license_key, |
| 'pin': pin, |
| 'created_at': created_at, |
| 'updated_at': updated_at |
| }) |
| session.commit() |
| return JSONResponse(content={'message': 'PIN created and data saved successfully', 'token': token}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/fetch_data") |
| async def fetch_name_phone(request_data: FetchDataRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| email = request_data.email |
|
|
| try: |
| with db as session: |
| query = text("SELECT name, phone FROM organisations WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchall() |
| result_dicts = [{'name': row[0], 'phone': row[1]} for row in result] |
| return JSONResponse(content=result_dicts, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/save_additional_data") |
| async def save_department_location(request_data: SaveAdditionalDataRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| department = request_data.department |
| location = request_data.location |
| email = request_data.email |
| latitude = request_data.latitude |
| longitude = request_data.longitude |
|
|
| try: |
| with db as session: |
| query = text("UPDATE organisations SET departments = :department, location = :location, latitude = :latitude, longitude = :longitude WHERE email = :email;") |
| session.execute(query, {'department': department, 'location': location, 'latitude': latitude, 'longitude': longitude, 'email': email}) |
| session.commit() |
| return JSONResponse(content={'message': 'Department, location, latitude, and longitude saved successfully'}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| class WoundDetailsRequest: |
| def __init__( |
| self, |
| length: float = Form(... ), |
| breadth: float = Form(...), |
| depth: float = Form(...), |
| area: float = Form(...), |
| moisture: str = Form(...), |
| wound_location: str = Form(...), |
| tissue: str = Form(...), |
| exudate: str = Form(...), |
| periwound: str = Form(...), |
| periwound_type: str = Form(...), |
| patient_id: str = Form(...), |
| ): |
| self.length = length |
| self.breadth = breadth |
| self.depth = depth |
| self.area = area |
| self.moisture = moisture |
| self.wound_location = wound_location |
| self.tissue = tissue |
| self.exudate = exudate |
| self.periwound = periwound |
| self.periwound_type = periwound_type |
| self.patient_id = patient_id |
|
|
| @app.post("/add_wound_details_v3") |
| async def add_wound_details_v3( |
| |
| db: Session = Depends(get_db), |
| payload: dict = Depends(verify_jwt_token), |
| |
| |
| image: UploadFile = File(...), |
| |
| |
| length: float = Form(...), |
| breadth: float = Form(...), |
| depth: float = Form(...), |
| area: float = Form(...), |
| moisture: str = Form(...), |
| wound_location: str = Form(...), |
| tissue: str = Form(...), |
| exudate: str = Form(...), |
| periwound: str = Form(...), |
| periwound_type: str = Form(...), |
| patient_id: str = Form(...), |
| type: str = Form(...), |
| category: str = Form(...), |
| edge: str = Form(...), |
| infection: str = Form(...), |
| last_dressing_date: str = Form(...), |
| ): |
| """ |
| This endpoint receives wound details and an image, saves the image, |
| and updates the database in two tables: `wounds` and `wound_images`. |
| """ |
| updated_at = datetime.datetime.utcnow() |
|
|
| |
| if not allowed_file(image.filename): |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail='Invalid image file type. Allowed types are png, jpg, jpeg.' |
| ) |
|
|
| |
| image_filename = secure_filename(f"{patient_id}_{int(updated_at.timestamp())}_{image.filename}") |
| |
| |
| patient_folder = os.path.join(UPLOAD_FOLDER, 'wounds', patient_id) |
| os.makedirs(patient_folder, exist_ok=True) |
| image_path = os.path.join(patient_folder, image_filename) |
| |
| |
| with open(image_path, "wb") as buffer: |
| buffer.write(await image.read()) |
|
|
| |
| image_url = f"{BASE_URL}/uploads/wounds/{patient_id}/{image_filename}" |
|
|
| |
| try: |
| with db as session: |
| |
| query_select = text("SELECT area, id, created_at FROM wounds WHERE patient_id = :patient_id") |
| result = session.execute(query_select, {'patient_id': patient_id}).fetchone() |
|
|
| if not result: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Patient not found') |
|
|
| existing_area = result[0] if result[0] is not None else 0.0 |
| wound_id = result[1] |
| created_at = result[2] |
| |
| |
| area_difference = float(area) - float(existing_area) |
| size_variation = 'wound area same' |
| if area_difference > 0: size_variation = 'wound area increased' |
| elif area_difference < 0: size_variation = 'wound area reduced' |
|
|
| |
| query_insert = text(""" |
| INSERT INTO wound_images (depth, width, height, uuid, updated_at, patient_id, size_variation, image, wound_id, created_at, area) |
| VALUES (:depth, :breadth, :length, :uuid, :updated_at, :patient_id, :size_variation, :image_url, :wound_id, :created_at, :area) |
| """) |
| session.execute(query_insert, { |
| 'depth': depth, 'breadth': breadth, 'length': length, |
| 'uuid': generate_session_id(), 'updated_at': updated_at, |
| 'patient_id': patient_id, 'size_variation': size_variation, |
| 'image_url': image_url, 'wound_id': wound_id, |
| 'created_at': created_at, 'area': area |
| }) |
|
|
| |
| query_update = text(""" |
| UPDATE wounds |
| SET height = :length, width = :breadth, depth = :depth, area = :area, |
| moisture = :moisture, position = :wound_location, tissue = :tissue, |
| exudate = :exudate, periwound = :periwound, periwound_type = :periwound_type, |
| type = :type, category = :category, edges = :edge, infection = :infection, |
| last_dressing = :last_dressing_date, image = :image_url, updated_at = :updated_at |
| WHERE patient_id = :patient_id; |
| """) |
| session.execute(query_update, { |
| 'length': length, 'breadth': breadth, 'depth': depth, 'area': area, |
| 'moisture': moisture, 'wound_location': wound_location, 'tissue': tissue, |
| 'exudate': exudate, 'periwound': periwound, 'periwound_type': periwound_type, |
| 'type': type, 'category': category, 'edge': edge, 'infection': infection, |
| 'last_dressing_date': last_dressing_date, 'patient_id': patient_id, |
| 'image_url': image_url, 'updated_at': updated_at |
| }) |
| |
| |
| session.commit() |
| |
| return {'message': 'Wound details and image stored successfully', 'image_url': image_url} |
| |
| except Exception as e: |
| |
| print(f"An error occurred: {e}") |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
|
|
| class EmailRequest(BaseModel): |
| email: str |
|
|
| @app.get("/get_all_patient_details") |
| async def get_all_patient_details( |
| request: Request, |
| authorization: Optional[str] = Header(None), |
| db: Session = Depends(get_db) |
| ): |
| if not authorization or not authorization.startswith("Bearer "): |
| raise HTTPException(status_code=401, detail="Invalid Authorization header") |
|
|
| token = authorization.split(" ")[1] |
|
|
| try: |
| payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM]) |
| except jwt.ExpiredSignatureError: |
| raise HTTPException(status_code=401, detail="Token has expired") |
| except JWTError: |
| raise HTTPException(status_code=401, detail="Invalid token") |
|
|
| try: |
| body = await request.json() |
| email = body.get("email") |
| if not email: |
| raise HTTPException(status_code=400, detail="Missing required field: email") |
|
|
| query = text("SELECT * FROM patients WHERE email = :email") |
| patient_details = db.execute(query, {"email": email}).fetchall() |
|
|
| patient_dicts = [dict(row._mapping) for row in patient_details] |
| return {"patient_details": patient_dicts} |
| except Exception as e: |
| return JSONResponse(status_code=500, content={"error": str(e)}) |
|
|
| from fastapi.exceptions import HTTPException |
| import traceback |
|
|
| @app.post("/add_patient") |
| def add_patient_endpoint( |
| input_data: AddPatientRequest, |
| db: Session = Depends(get_db), |
| jwt_payload: dict = Depends(verify_jwt_token), |
| ): |
| |
| patient_id = generate_patient_id() |
| session_uuid = generate_session_id() |
| now = datetime.datetime.utcnow() |
|
|
| try: |
| |
| |
| patient_params = { |
| "name": input_data.name, |
| "dob": input_data.dob, |
| "gender": input_data.gender, |
| "age": input_data.age, |
| "height": input_data.height, |
| "weight": input_data.weight, |
| "email": input_data.email, |
| "doctor": input_data.doctor, |
| "uuid": session_uuid, |
| "patient_id": patient_id, |
| "created_at": now, |
| "updated_at": now, |
| "scheduled_date": now |
| } |
|
|
| |
| db.execute( |
| text(""" |
| INSERT INTO patients |
| (name, dob, gender, age, height, weight, email, doctor, |
| uuid, patient_id, created_at, updated_at, scheduled_date) |
| VALUES |
| (:name, :dob, :gender, :age, :height, :weight, :email, :doctor, |
| :uuid, :patient_id, :created_at, :updated_at, :scheduled_date) |
| """), patient_params |
| ) |
|
|
| |
| db.execute( |
| text(""" |
| INSERT INTO wounds (uuid, patient_id, created_at, updated_at) |
| VALUES (:uuid, :patient_id, :created_at, :updated_at) |
| """), { |
| "uuid": session_uuid, |
| "patient_id": patient_id, |
| "created_at": now, |
| "updated_at": now |
| } |
| ) |
| |
| |
| db.commit() |
|
|
| except Exception as e: |
| |
| db.rollback() |
| |
| print(f"Error adding patient: {e}") |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail=f"An internal error occurred while adding the patient: {e}" |
| ) |
|
|
| return {"message": "Patient added successfully", "patient_id": patient_id} |
|
|
|
|
|
|
| @app.get("/search_patient") |
| async def search_patient_endpoint(name: str, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| if not name: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Missing required fields') |
|
|
| try: |
| with db as session: |
| query = text("SELECT * FROM patients WHERE name = :name") |
| patients = session.execute(query, {'name': name}).fetchall() |
| |
| if not patients: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='No patients found with this name') |
| |
| patients_dicts = [dict(row._mapping) for row in patients] |
| return JSONResponse(content={'patients': patients_dicts}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/generate_prescription") |
| async def generate_prescription_endpoint(patient_id: str, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| if not patient_id: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Missing required fields') |
|
|
| try: |
| with db as session: |
| patient_query = text("SELECT * FROM patients WHERE patient_id = :patient_id ") |
| patient = session.execute(patient_query, {'patient_id': patient_id}).fetchone() |
| if not patient: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='No patient found with this ID') |
|
|
| wound_query = text("SELECT * FROM patients WHERE patient_id = :patient_id ") |
| wounds = session.execute(wound_query, {'patient_id': patient_id}).fetchall() |
|
|
| wound_category = [] |
| for wound in wounds: |
| area = wound.height * wound.width |
| if area <= 5: |
| dimension = 'Small' |
| elif 5 < area <= 20: |
| dimension = 'Medium' |
| else: |
| dimension = 'Large' |
| wound_category.append((wound.wound_type, dimension)) |
|
|
| medication_details = [] |
| for wound_type, dimension in wound_category: |
| medication_query = text(""" |
| SELECT * FROM WoundMedications |
| WHERE WoundType = :wound_type AND WoundDimensions = :dimension |
| """) |
| medications = session.execute(medication_query, {'wound_type': wound_type, 'dimension': dimension}).fetchall() |
| medication_details.extend(medications) |
|
|
| prescription = { |
| 'patient_details': dict(patient._mapping), |
| 'wound_details': [dict(w._mapping) for w in wounds], |
| 'medication_details': [dict(m._mapping) for m in medication_details] |
| } |
|
|
| return JSONResponse(content={'prescription': prescription}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/verify_pin") |
| async def verify_pin_endpoint(request_data: VerifyPinRequest, db: SessionLocal = Depends(get_db)): |
| email = request_data.email |
| pin = request_data.pin |
|
|
| try: |
| with db as session: |
| query = text("SELECT pin FROM organisations WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchone() |
|
|
| if result: |
| if result.pin == pin: |
| return JSONResponse(content={'message': 'Pin verified successfully'}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid pin') |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/send_otp") |
| async def send_otp_endpoint(request_data: SendOtpRequest, db: SessionLocal = Depends(get_db)): |
| phone = request_data.phone |
|
|
| try: |
| with db as session: |
| query = text("SELECT * FROM organisations WHERE phone = :phone") |
| organisation = session.execute(query, {'phone': phone}).fetchone() |
|
|
| if organisation: |
| phone_with_code = organisation.c_code + organisation.phone |
| otp = "1234" |
| |
|
|
| updated_at = datetime.datetime.utcnow() |
| expiry_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=5) |
| update_otp_in_database(session, phone, otp, expiry_time, updated_at) |
| |
| token = jwt.encode({'email': organisation.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)}, JWT_SECRET_KEY, algorithm='HS256') |
| return JSONResponse(content={'status': 200, 'message': 'OTP Sent on mobile.', 'token': token, 'otp': otp, 'email': organisation.email, 'name': organisation.name, 'pin': organisation.pin}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='OOPS! Phone Does Not Exist!') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/med_send_email") |
| async def med_send_email(request_data: SendEmailRequest, db: SessionLocal = Depends(get_db)): |
| name = request_data.name |
| email = request_data.email |
| c_code = request_data.c_code |
| phone = request_data.phone |
|
|
| try: |
| with db as session: |
| query = text("SELECT email, phone FROM users WHERE email = :email OR phone = :phone") |
| existing_user = session.execute(query, {'email': email, 'phone': phone}).fetchone() |
| if existing_user: |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Email or phone already exists. Please login.') |
| else: |
| license_key = generate_license_key() |
|
|
| email_payload = { |
| 'Recipient': email, |
| 'Subject': 'License key for SmartHeal', |
| 'Body': f'Your license key is: {license_key}', |
| 'ApiKey': '6A7339A3-E70B-4A8D-AA23-0264125F4959' |
| } |
|
|
| email_response = requests.post( |
| 'https://api.waysdatalabs.com/api/EmailSender/SendMail', |
| headers={}, |
| data=email_payload |
| ) |
|
|
| if email_response.status_code == 200: |
| return JSONResponse(content={'message': 'License key sent to email successfully', 'license_key': license_key}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail='Failed to send email') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/med_verify_license_key") |
| async def med_verify_license_key(request_data: VerifyLicenseKeyRequest, db: SessionLocal = Depends(get_db)): |
| email = request_data.email |
| license_key = request_data.license_key |
|
|
| try: |
| with db as session: |
| query = text("SELECT licence_key FROM users WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchone() |
| if result and result.licence_key == license_key: |
| token = jwt.encode({'email': email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)}, JWT_SECRET_KEY, algorithm='HS256') |
| return JSONResponse(content={'message': 'License key verified successfully', 'token': token}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid license key') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/med_create_pin") |
| async def med_create_pin(request_data: CreatePinRequest, db: SessionLocal = Depends(get_db)): |
| license_key = request_data.license_key |
| email = request_data.email |
| pin = request_data.pin |
| name = request_data.name |
| c_code = request_data.c_code |
| phone = request_data.phone |
|
|
| created_at = datetime.datetime.utcnow() |
| updated_at = datetime.datetime.utcnow() |
|
|
| try: |
| with db as session: |
| org = "0" |
| token = jwt.encode({'email': email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)}, JWT_SECRET_KEY, algorithm='HS256') |
| query = text("INSERT INTO users (name, email, c_code, phone, uuid, licence_key, pin, org, created_at, updated_at) VALUES (:name, :email, :c_code, :phone, :uuid, :license_key, :pin, :org, :created_at, :updated_at)") |
| session.execute(query, { |
| 'name': name, |
| 'email': email, |
| 'c_code': c_code, |
| 'phone': phone, |
| 'uuid': generate_session_id(), |
| 'license_key': license_key, |
| 'pin': pin, |
| 'org': org, |
| 'created_at': created_at, |
| 'updated_at': updated_at |
| }) |
| session.commit() |
| return JSONResponse(content={'message': 'PIN created and data saved successfully', 'token': token}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/med_fetch_data") |
| async def med_fetch_name_phone(request_data: FetchDataRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| email = request_data.email |
|
|
| try: |
| with db as session: |
| query = text("SELECT name, phone FROM users WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchall() |
| result_dicts = [{'name': row[0], 'phone': row[1]} for row in result] |
| return JSONResponse(content=result_dicts, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/save_med_data") |
| async def med_save_department_location(request_data: Request, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| data = await request_data.json() |
| speciality = data.get('speciality') |
| location = data.get('location') |
| email = data.get('email') |
| latitude = data.get('latitude') |
| longitude = data.get('longitude') |
| org = data.get('org') |
| designation = data.get('designation') |
|
|
| if not (speciality and location and latitude and longitude and org and email): |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Missing required fields') |
|
|
| try: |
| with db as session: |
| query = text("UPDATE users SET speciality = :speciality, location = :location, latitude = :latitude, longitude = :longitude, org = :org, designation = :designation WHERE email = :email;") |
| session.execute(query, {'speciality': speciality, 'location': location, 'latitude': latitude, 'longitude': longitude, 'org': org, 'designation': designation, 'email': email}) |
| session.commit() |
| return JSONResponse(content={'message': 'Speciality, location, organisation and designation saved successfully'}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("//med_verify_pin") |
| async def med_verify_pin(request_data: VerifyPinRequest, db: SessionLocal = Depends(get_db)): |
| email = request_data.email |
| pin = request_data.pin |
|
|
| try: |
| with db as session: |
| query = text("SELECT pin FROM users WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchone() |
|
|
| if result: |
| if result.pin == pin: |
| return JSONResponse(content={'message': 'Pin verified successfully'}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid pin') |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/med_send_otp") |
| async def med_send_otp(request_data: SendOtpRequest, db: SessionLocal = Depends(get_db)): |
| phone = request_data.phone |
|
|
| try: |
| with db as session: |
| query = text("SELECT * FROM users WHERE phone = :phone") |
| user = session.execute(query, {'phone': phone}).fetchone() |
|
|
| if user: |
| phone_with_code = user.c_code + user.phone |
| otp = "1234" |
| |
|
|
| updated_at = datetime.datetime.utcnow() |
| expiry_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=5) |
| update_otp_in_med_database(session, phone, otp, expiry_time, updated_at) |
| |
| token = jwt.encode({'email': user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)}, JWT_SECRET_KEY, algorithm='HS256') |
| return JSONResponse(content={'status': 200, 'message': 'OTP Sent on mobile.', 'token': token, 'otp': otp, 'email': user.email, 'name': user.name, 'pin': user.pin}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='OOPS! Phone Does Not Exist!') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/update_scheduled_date") |
| async def update_scheduled_date_endpoint(request_data: UpdateScheduledDateRequest, db: SessionLocal = Depends(get_db)): |
| email = request_data.email |
| patient_id = request_data.patient_id |
| doctor = request_data.doctor |
| scheduled_date = request_data.scheduled_date |
|
|
| try: |
| with db as session: |
| query = text("UPDATE patients SET scheduled_date = :scheduled_date WHERE email = :email AND patient_id = :patient_id AND doctor = :doctor") |
| result = session.execute(query, {'scheduled_date': scheduled_date, 'email': email, 'patient_id': patient_id, 'doctor': doctor}) |
| session.commit() |
|
|
| if result.rowcount == 0: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='No matching record found') |
|
|
| return JSONResponse(content={'message': 'Scheduled date updated successfully'}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/total_appointments_till_date") |
| async def total_appointments_till_date_endpoint(date: str, db: SessionLocal = Depends(get_db)): |
| if not date: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Date parameter is required') |
|
|
| try: |
| with db as session: |
| query = text("SELECT COUNT(*) as total_appointments FROM patients WHERE scheduled_date <= :date") |
| result = session.execute(query, {'date': date}).fetchone() |
|
|
| total_appointments = result[0] |
|
|
| return JSONResponse(content={'total_appointments': total_appointments}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/total_appointments_till_month") |
| async def total_appointments_till_month_endpoint(year: int, month: int, db: SessionLocal = Depends(get_db)): |
| if not (year and month): |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Year and month parameters are required') |
|
|
| try: |
| with db as session: |
| query = text(""" |
| SELECT COUNT(*) as total_appointments |
| FROM patients |
| WHERE YEAR(scheduled_date) = :year AND MONTH(scheduled_date) = :month |
| """) |
| result = session.execute(query, {'year': year, 'month': month}).fetchone() |
|
|
| total_appointments = result[0] |
|
|
| return JSONResponse(content={'total_appointments': total_appointments}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/change_pin_org") |
| async def change_pin_org(request_data: ChangePinRequest, db: SessionLocal = Depends(get_db)): |
| email = request_data.email |
| current_pin = request_data.current_pin |
| new_pin = request_data.new_pin |
|
|
| try: |
| with db as session: |
| query = text("SELECT pin FROM organisations WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchone() |
|
|
| if result: |
| if result.pin == current_pin: |
| update_query = text("UPDATE organisations SET pin = :new_pin WHERE email = :email") |
| session.execute(update_query, {'new_pin': new_pin, 'email': email}) |
| session.commit() |
| return JSONResponse(content={'message': 'Pin updated successfully'}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid current pin') |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/forgot_pin_org") |
| async def forgot_pin_org(request_data: ForgotPinRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| email = request_data.email |
| otp = request_data.otp |
| new_pin = request_data.new_pin |
|
|
| try: |
| with db as session: |
| query = text("SELECT otp FROM organisations WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchone() |
|
|
| if result: |
| if result.otp == otp: |
| update_query = text("UPDATE organisations SET pin = :new_pin WHERE email = :email") |
| session.execute(update_query, {'new_pin': new_pin, 'email': email}) |
| session.commit() |
| return JSONResponse(content={'message': 'Pin updated successfully'}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid OTP') |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/organisation_details") |
| async def organisation_details(email: str, db: SessionLocal = Depends(get_db)): |
| if not email: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email parameter is required') |
|
|
| try: |
| with db as session: |
| query = text(""" |
| SELECT o.name, o.departments, o.location, o.latitude, o.longitude, o.about, |
| o.profile_photo_path, COUNT(p.email) as patient_count |
| FROM organisations o |
| LEFT JOIN patients p ON o.email = p.email |
| WHERE o.email = :email |
| """) |
| result = session.execute(query, {'email': email}).fetchone() |
|
|
| if result: |
| response = { |
| 'name': result.name, |
| 'departments': result.departments, |
| 'location': result.location, |
| 'latitude': result.latitude, |
| 'longitude': result.longitude, |
| 'about': result.about, |
| 'profile_photo_path': result.profile_photo_path, |
| 'patient_count': result.patient_count |
| } |
| return JSONResponse(content=response, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Organisation not found') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/change_pin_med") |
| async def change_pin_med(request_data: ChangePinRequest, db: SessionLocal = Depends(get_db)): |
| email = request_data.email |
| current_pin = request_data.current_pin |
| new_pin = request_data.new_pin |
|
|
| try: |
| with db as session: |
| query = text("SELECT pin FROM users WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchone() |
|
|
| if result: |
| if result.pin == current_pin: |
| update_query = text("UPDATE users SET pin = :new_pin WHERE email = :email") |
| session.execute(update_query, {'new_pin': new_pin, 'email': email}) |
| session.commit() |
| return JSONResponse(content={'message': 'Pin updated successfully'}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid current pin') |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/forgot_pin_med") |
| async def forgot_pin_med(request_data: ForgotPinRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| email = request_data.email |
| otp = request_data.otp |
| new_pin = request_data.new_pin |
|
|
| try: |
| with db as session: |
| query = text("SELECT otp FROM users WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchone() |
|
|
| if result: |
| if result.otp == otp: |
| update_query = text("UPDATE users SET pin = :new_pin WHERE email = :email") |
| session.execute(update_query, {'new_pin': new_pin, 'email': email}) |
| session.commit() |
| return JSONResponse(content={'message': 'Pin updated successfully'}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid OTP') |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/med_details") |
| async def med_details(email: str, db: SessionLocal = Depends(get_db)): |
| if not email: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email parameter is required') |
|
|
| try: |
| with db as session: |
| query = text(""" |
| SELECT o.name, o.departments, o.location, o.latitude, o.longitude, o.about, |
| o.profile_photo_path, COUNT(p.email) as patient_count |
| FROM users o |
| LEFT JOIN patients p ON o.email = p.email |
| WHERE o.email = :email |
| """) |
| result = session.execute(query, {'email': email}).fetchone() |
|
|
| if result: |
| response = { |
| 'name': result.name, |
| 'departments': result.departments, |
| 'location': result.location, |
| 'latitude': result.latitude, |
| 'longitude': result.longitude, |
| 'about': result.about, |
| 'profile_photo_path': result.profile_photo_path, |
| 'patient_count': result.patient_count |
| } |
| return JSONResponse(content=response, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Organisation not found') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/update_patient_details") |
| async def update_patient_details(request_data: UpdatePatientDetailsRequest, db: SessionLocal = Depends(get_db)): |
| patient_id = request_data.patient_id |
| allergies = request_data.allergies |
| past_history = request_data.past_history |
|
|
| try: |
| with db as session: |
| query = text(""" |
| UPDATE patients |
| SET allergy = :allergies, illness = :past_history |
| WHERE patient_id = :patient_id |
| """) |
| session.execute(query, { |
| 'allergies': allergies, |
| 'past_history': past_history, |
| 'patient_id': patient_id |
| }) |
| session.commit() |
|
|
| return JSONResponse(content={'message': 'Patient details updated successfully'}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/patient_details") |
| async def get_patient_details( |
| payload: Dict[str, Any] = Body(...), |
| db: Session = Depends(get_db), |
| jwt_payload: dict = Depends(verify_jwt_token) |
| ): |
| patient_id = payload.get('patient_id') |
|
|
| if not patient_id: |
| raise HTTPException( |
| status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, |
| detail="The 'patient_id' field is required in the request body." |
| ) |
|
|
| try: |
| patient_query = text("SELECT * FROM patients WHERE patient_id = :patient_id") |
| patient_result = db.execute(patient_query, {'patient_id': patient_id}).fetchone() |
|
|
| if patient_result: |
| |
| patient_details = dict(patient_result._mapping) |
|
|
| wound_query = text("SELECT * FROM wounds WHERE patient_id = :patient_id") |
| wound_results = db.execute(wound_query, {'patient_id': patient_id}).fetchall() |
|
|
| wound_details_list = [dict(wound._mapping) for wound in wound_results] |
| |
| |
| patient_details['wound_details'] = wound_details_list |
|
|
| |
| |
| |
| |
| |
| for key, value in patient_details.items(): |
| if isinstance(value, (datetime.datetime, datetime.date)): |
| patient_details[key] = value.isoformat() |
| |
| |
| for wound in patient_details['wound_details']: |
| for key, value in wound.items(): |
| if isinstance(value, (datetime.datetime, datetime.date)): |
| wound[key] = value.isoformat() |
| |
| |
| |
| |
| return JSONResponse(content=patient_details, status_code=status.HTTP_200_OK) |
| |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Patient not found') |
|
|
| except Exception as e: |
| print(f"An error occurred while fetching details for patient_id {patient_id}: {e}") |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| |
| @app.post("/store_image") |
| async def store_image(patient_id: str = Form(...), image: UploadFile = File(...), db: SessionLocal = Depends(get_db)): |
| if not allowed_file(image.filename): |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type') |
|
|
| filename = secure_filename(image.filename) |
|
|
| try: |
| patient_folder = os.path.join(UPLOAD_FOLDER, 'patients', patient_id) |
| os.makedirs(patient_folder, exist_ok=True) |
|
|
| image_path = os.path.join(patient_folder, filename) |
| with open(image_path, "wb") as buffer: |
| buffer.write(await image.read()) |
|
|
| image_url = f"{BASE_URL}/uploads/patients/{patient_id}/{filename}" |
|
|
| with db as session: |
| query = text("UPDATE patients SET profile_photo_path = :image_url WHERE patient_id = :patient_id") |
| session.execute(query, {'image_url': image_url, 'patient_id': patient_id}) |
| session.commit() |
|
|
| return JSONResponse(content={ |
| 'message': 'Image stored successfully', |
| 'image_url': image_url |
| }, status_code=status.HTTP_200_OK) |
|
|
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/get_image") |
| async def get_image(patient_id: str, db: SessionLocal = Depends(get_db)): |
| if not patient_id: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Patient ID parameter is required') |
|
|
| try: |
| with db as session: |
| query = text("SELECT profile_photo_path FROM patients WHERE patient_id = :patient_id") |
| result = session.execute(query, {'patient_id': patient_id}).fetchone() |
|
|
| if result and result.profile_photo_path: |
| image_url = result.profile_photo_path |
| filename = image_url.split('/')[-1] |
| |
| file_path = os.path.join(UPLOAD_FOLDER, 'patients', patient_id, filename) |
| if os.path.exists(file_path): |
| return FileResponse(file_path) |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image file not found on server') |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image not found for the given patient ID') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/store_wound_image") |
| async def store_wound_image(patient_id: str = Form(...), image: UploadFile = File(...), db: SessionLocal = Depends(get_db)): |
| if not allowed_file(image.filename): |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type') |
|
|
| filename = secure_filename(image.filename) |
|
|
| try: |
| patient_folder = os.path.join(UPLOAD_FOLDER, 'wounds', patient_id) |
| os.makedirs(patient_folder, exist_ok=True) |
|
|
| image_path = os.path.join(patient_folder, filename) |
| with open(image_path, "wb") as buffer: |
| buffer.write(await image.read()) |
|
|
| image_url = f"{BASE_URL}/uploads/wounds/{patient_id}/{filename}" |
|
|
| with db as session: |
| query = text("UPDATE wounds SET image = :image_url WHERE patient_id = :patient_id") |
| session.execute(query, {'image_url': image_url, 'patient_id': patient_id}) |
| session.commit() |
|
|
| return JSONResponse(content={ |
| 'message': 'Image stored successfully', |
| 'image_url': image_url |
| }, status_code=status.HTTP_200_OK) |
|
|
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/get_wound_image") |
| async def get_wound_image(patient_id: str, db: SessionLocal = Depends(get_db)): |
| if not patient_id: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Patient ID parameter is required') |
|
|
| try: |
| with db as session: |
| query = text("SELECT image FROM wounds WHERE patient_id = :patient_id") |
| result = session.execute(query, {'patient_id': patient_id}).fetchone() |
|
|
| if result and result.image: |
| image_url = result.image |
| filename = image_url.split('/')[-1] |
| |
| file_path = os.path.join(UPLOAD_FOLDER, 'wounds', patient_id, filename) |
| if os.path.exists(file_path): |
| return FileResponse(file_path) |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image file not found on server') |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image not found for the given patient ID') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/store_med_image") |
| async def store_med_image(email: str = Form(...), image: UploadFile = File(...), db: SessionLocal = Depends(get_db)): |
| if not allowed_file(image.filename): |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type') |
|
|
| filename = secure_filename(image.filename) |
|
|
| try: |
| med_practitioner_folder = os.path.join(UPLOAD_FOLDER, 'medical_practitioners', email) |
| os.makedirs(med_practitioner_folder, exist_ok=True) |
|
|
| image_path = os.path.join(med_practitioner_folder, filename) |
| with open(image_path, "wb") as buffer: |
| buffer.write(await image.read()) |
| |
| image_url = f"{BASE_URL}/uploads/medical_practitioners/{email}/{filename}" |
|
|
| with db as session: |
| query = text("UPDATE users SET profile_photo_path = :image_url WHERE email = :email") |
| session.execute(query, {'image_url': image_url, 'email': email}) |
| session.commit() |
|
|
| return JSONResponse(content={'message': 'Image stored and path updated successfully', 'image_url': image_url}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/get_med_image") |
| async def get_med_image(email: str, db: SessionLocal = Depends(get_db)): |
| if not email: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email parameter is required') |
|
|
| try: |
| with db as session: |
| query = text("SELECT profile_photo_path FROM users WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchone() |
|
|
| if result and result.profile_photo_path: |
| image_url = result.profile_photo_path |
| filename = image_url.split('/')[-1] |
| |
| file_path = os.path.join(UPLOAD_FOLDER, 'medical_practitioners', email, filename) |
| if os.path.exists(file_path): |
| return FileResponse(file_path) |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image file not found on server') |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image not found for the given email') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/store_org_image") |
| async def store_org_image(email: str = Form(...), image: UploadFile = File(...), db: SessionLocal = Depends(get_db)): |
| if not allowed_file(image.filename): |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type') |
|
|
| filename = secure_filename(image.filename) |
|
|
| try: |
| org_folder = os.path.join(UPLOAD_FOLDER, 'organisations', email) |
| os.makedirs(org_folder, exist_ok=True) |
|
|
| image_path = os.path.join(org_folder, filename) |
| with open(image_path, "wb") as buffer: |
| buffer.write(await image.read()) |
| |
| image_url = f"{BASE_URL}/uploads/organisations/{email}/{filename}" |
|
|
| with db as session: |
| query = text("UPDATE organisations SET profile_photo_path = :image_url WHERE email = :email") |
| session.execute(query, {'image_url': image_url, 'email': email}) |
| session.commit() |
|
|
| return JSONResponse(content={'message': 'Image stored and path updated successfully', 'image_url': image_url}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/get_org_image") |
| async def get_org_image(email: str, db: SessionLocal = Depends(get_db)): |
| if not email: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email parameter is required') |
|
|
| try: |
| with db as session: |
| query = text("SELECT profile_photo_path FROM organisations WHERE email = :email") |
| result = session.execute(query, {'email': email}).fetchone() |
|
|
| if result and result.profile_photo_path: |
| image_url = result.profile_photo_path |
| filename = image_url.split('/')[-1] |
| |
| file_path = os.path.join(UPLOAD_FOLDER, 'organisations', email, filename) |
| if os.path.exists(file_path): |
| return FileResponse(file_path) |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image file not found on server') |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image not found for the given email') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/save_notes") |
| async def save_notes(request_data: SaveNotesRequest, db: SessionLocal = Depends(get_db)): |
| patient_id = request_data.patient_id |
| notes = request_data.notes |
|
|
| try: |
| with db as session: |
| query = text("UPDATE patients SET notes = :notes WHERE patient_id = :patient_id") |
| session.execute(query, {'notes': notes, 'patient_id': patient_id}) |
| session.commit() |
|
|
| return JSONResponse(content={'message': 'Notes saved successfully'}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/total_appointments") |
| async def total_appointments(request_data: TotalAppointmentsRequest, db: SessionLocal = Depends(get_db)): |
| start_date_str = request_data.start_date |
| end_date_str = request_data.end_date |
| doctor = request_data.doctor |
|
|
| try: |
| start_date = datetime.datetime.strptime(start_date_str, '%Y-%m-%d') |
| end_date = datetime.datetime.strptime(end_date_str, '%Y-%m-%d') |
| |
| with db as session: |
| query = text(""" |
| SELECT DATE(scheduled_date) as date, COUNT(*) as total_appointments |
| FROM patients |
| WHERE scheduled_date BETWEEN :start_date AND :end_date |
| AND doctor = :doctor |
| GROUP BY DATE(scheduled_date) |
| ORDER BY DATE(scheduled_date) |
| """) |
|
|
| results = session.execute(query, { |
| 'start_date': start_date, |
| 'end_date': end_date, |
| 'doctor': doctor |
| }).fetchall() |
|
|
| appointments_by_day = {} |
|
|
| for row in results: |
| date_str = row.date.strftime('%Y-%m-%d') |
| appointments_by_day[date_str] = row.total_appointments |
|
|
| current_date = start_date |
| while current_date <= end_date: |
| date_str = current_date.strftime('%Y-%m-%d') |
| if date_str not in appointments_by_day: |
| appointments_by_day[date_str] = 0 |
| current_date += datetime.timedelta(days=1) |
|
|
| return JSONResponse(content={'appointments_by_day': appointments_by_day}, status_code=status.HTTP_200_OK) |
|
|
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/admin_add_practitioner") |
| async def admin_add_practitioner(request_data: AdminAddPractitionerRequest, db: SessionLocal = Depends(get_db)): |
| name = request_data.name |
| email = request_data.email |
| c_code = request_data.c_code |
| phone = request_data.phone |
| org = request_data.org |
|
|
| try: |
| with db as session: |
| query = text("SELECT email FROM users WHERE email = :email") |
| existing_email = session.execute(query, {'email': email}).fetchone() |
|
|
| query = text("SELECT phone FROM users WHERE phone = :phone") |
| existing_phone = session.execute(query, {'phone': phone}).fetchone() |
|
|
| if existing_email: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email already exists. Please login.') |
| elif existing_phone: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Phone number already exists. Please use another phone number.') |
| else: |
| uuid_val = generate_session_id() |
| license_key = generate_license_key() |
|
|
| query = text("INSERT INTO users (name, email, c_code, phone, uuid, licence_key, org) VALUES (:name, :email, :c_code, :phone, :uuid, :license_key, :org)") |
| session.execute(query, { |
| 'name': name, |
| 'email': email, |
| 'c_code': c_code, |
| 'phone': phone, |
| 'uuid': uuid_val, |
| 'license_key': license_key, |
| 'org': org |
| }) |
| session.commit() |
| return JSONResponse(content={'message': 'Data added successfully'}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/update_org_profile") |
| async def update_org_profile(request_data: UpdateOrgProfileRequest, db: SessionLocal = Depends(get_db)): |
| name = request_data.name |
| department = request_data.department |
| about = request_data.about |
| location = request_data.location |
| latitude = request_data.latitude |
| longitude = request_data.longitude |
| email = request_data.email |
|
|
| if email is None: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email is required') |
|
|
| try: |
| with db as session: |
| query = text("SELECT email FROM organisations WHERE email = :email") |
| existing_org = session.execute(query, {'email': email}).fetchone() |
|
|
| if not existing_org: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Organisation not found') |
|
|
| update_query = text(""" |
| UPDATE organisations |
| SET name = :name, |
| departments = :department, |
| about = :about, |
| location = :location, |
| latitude = :latitude, |
| longitude = :longitude |
| WHERE email = :email |
| """) |
| session.execute(update_query, { |
| 'name': name, |
| 'department': department, |
| 'about': about, |
| 'location': location, |
| 'latitude': latitude, |
| 'longitude': longitude, |
| 'email': email |
| }) |
| session.commit() |
| return JSONResponse(content={'message': 'Organisation profile updated successfully'}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/update_med_profile") |
| async def update_med_profile(request_data: UpdateMedProfileRequest, db: SessionLocal = Depends(get_db)): |
| name = request_data.name |
| department = request_data.department |
| about = request_data.about |
| location = request_data.location |
| latitude = request_data.latitude |
| longitude = request_data.longitude |
| email = request_data.email |
|
|
| if email is None: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email is required') |
|
|
| try: |
| with db as session: |
| query = text("SELECT email FROM users WHERE email = :email") |
| existing_user = session.execute(query, {'email': email}).fetchone() |
|
|
| if not existing_user: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Medical user not found') |
|
|
| update_query = text(""" |
| UPDATE users |
| SET name = :name, |
| departments = :department, |
| about = :about, |
| location = :location, |
| latitude = :latitude, |
| longitude = :longitude |
| WHERE email = :email |
| """) |
| session.execute(update_query, { |
| 'name': name, |
| 'department': department, |
| 'about': about, |
| 'location': location, |
| 'latitude': latitude, |
| 'longitude': longitude, |
| 'email': email |
| }) |
| session.commit() |
| return JSONResponse(content={'message': 'Medical profile updated successfully'}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/add_wound_details_v2") |
| async def add_wound_details_v2(request_data: AddWoundDetailsV2Request = Depends(), image: UploadFile = File(...), db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| length = request_data.length |
| breadth = request_data.breadth |
| depth = request_data.depth |
| area = request_data.area |
| moisture = request_data.moisture |
| wound_location = request_data.wound_location |
| tissue = request_data.tissue |
| exudate = request_data.exudate |
| periwound = request_data.periwound |
| periwound_type = request_data.periwound_type |
| patient_id = request_data.patient_id |
| type = request_data.type |
| category = request_data.category |
| edge = request_data.edge |
| infection = request_data.infection |
| last_dressing_date = request_data.last_dressing_date |
|
|
| updated_at = datetime.datetime.utcnow() |
|
|
| if not allowed_file(image.filename): |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type') |
|
|
| filename = secure_filename(image.filename) |
|
|
| try: |
| with db as session: |
| query = text("SELECT area, id, created_at FROM wounds WHERE patient_id = :patient_id") |
| result = session.execute(query, {'patient_id': patient_id}).fetchone() |
| |
| if not result: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Patient not found') |
|
|
| existing_area = result[0] |
| if not existing_area: |
| existing_area = 0 |
| wound_id = result[1] |
| created_at = result[2] |
| area_difference = float(area) - float(existing_area) |
| if area_difference > 0: |
| size_variation = 'wound area increased' |
| elif area_difference < 0: |
| size_variation = 'wound area reduced' |
| else: |
| size_variation = 'wound area same' |
|
|
| patient_folder = os.path.join(UPLOAD_FOLDER, 'wounds', patient_id) |
| os.makedirs(patient_folder, exist_ok=True) |
| image_path = os.path.join(patient_folder, filename) |
| with open(image_path, "wb") as buffer: |
| buffer.write(await image.read()) |
|
|
| image_url = f"{BASE_URL}/uploads/wounds/{patient_id}/{filename}" |
|
|
| query1 = text(""" |
| INSERT INTO wound_images (depth, width, height, uuid, updated_at, patient_id, size_variation, image, wound_id, created_at, area) |
| VALUES (:depth, :breadth, :length, :uuid, :updated_at, :patient_id, :size_variation, :image_url, :wound_id, :created_at, :area) |
| """) |
| session.execute(query1, { |
| 'depth': depth, 'breadth': breadth, 'length': length, 'uuid': generate_session_id(), |
| 'updated_at': updated_at, 'patient_id': patient_id, 'size_variation': size_variation, |
| 'image_url': image_url, 'wound_id': wound_id, 'created_at': created_at, 'area': area |
| }) |
|
|
| query2 = text(""" |
| UPDATE wounds SET height = :length, width = :breadth, depth = :depth, area = :area, |
| moisture = :moisture, position = :wound_location, tissue = :tissue, exudate = :exudate, |
| periwound = :periwound, periwound_type = :periwound_type, type = :type, category = :category, |
| edges = :edge, infection = :infection, updated_at = :updated_at, last_dressing = :last_dressing_date, |
| image = :image_url |
| WHERE patient_id = :patient_id |
| """) |
| session.execute(query2, { |
| 'length': length, 'breadth': breadth, 'depth': depth, 'area': area, |
| 'moisture': moisture, 'wound_location': wound_location, 'tissue': tissue, |
| 'exudate': exudate, 'periwound': periwound, 'periwound_type': periwound_type, |
| 'type': type, 'category': category, 'edge': edge, 'infection': infection, |
| 'updated_at': updated_at, 'last_dressing_date': last_dressing_date, 'patient_id': patient_id, |
| 'image_url': image_url |
| }) |
|
|
| session.commit() |
| return JSONResponse(content={ |
| 'message': 'Wound details and image stored successfully', |
| 'image_url': image_url |
| }, status_code=status.HTTP_200_OK) |
|
|
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/wound_progress_timeline") |
| async def get_wound_details_v2(patient_id: str, db: SessionLocal = Depends(get_db)): |
| if not patient_id: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='patient_id is required') |
|
|
| try: |
| with db as session: |
| query = text(""" |
| SELECT * FROM wound_images WHERE patient_id = :patient_id ORDER BY updated_at DESC """) |
| results = session.execute(query, {'patient_id': patient_id}).fetchall() |
|
|
| wound_details = [] |
| for row in results: |
| wound_details.append({ |
| 'length': row.height, |
| 'breadth': row.width, |
| 'depth': row.depth, |
| 'size_variation': row.size_variation, |
| 'image': row.image, |
| 'patient_id': row.patient_id, |
| 'area': row.area, |
| 'updated_at': row.updated_at.isoformat() |
| }) |
|
|
| return JSONResponse(content=wound_details, status_code=status.HTTP_200_OK) |
|
|
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/med/forgot/pin/otp") |
| async def med_forgot_pin_otp(request_data: SendOtpRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| phone = request_data.phone |
|
|
| try: |
| with db as session: |
| query = text("SELECT * FROM users WHERE phone = :phone") |
| user = session.execute(query, {'phone': phone}).fetchone() |
|
|
| if user: |
| phone_with_code = user.c_code + user.phone |
| otp = "1234" |
| |
|
|
| updated_at = datetime.datetime.utcnow() |
| expiry_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=5) |
| update_otp_in_med_database(session, phone, otp, expiry_time, updated_at) |
| |
| return JSONResponse(content={'status': 200, 'message': 'OTP Sent on mobile.', 'otp': otp}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='OOPS! Phone Does Not Exist!') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/org/forgot/pin/otp") |
| async def org_forgot_pin_otp(request_data: SendOtpRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| phone = request_data.phone |
|
|
| try: |
| with db as session: |
| query = text("SELECT * FROM organisations WHERE phone = :phone") |
| user = session.execute(query, {'phone': phone}).fetchone() |
|
|
| if user: |
| phone_with_code = user.c_code + user.phone |
| otp = "1234" |
| |
|
|
| updated_at = datetime.datetime.utcnow() |
| expiry_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=5) |
| update_otp_in_database(session, phone, otp, expiry_time, updated_at) |
| |
| return JSONResponse(content={'status': 200, 'message': 'OTP Sent on mobile.', 'otp': otp}, status_code=status.HTTP_200_OK) |
| else: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='OOPS! Phone Does Not Exist!') |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/update_scheduled_date_v2") |
| async def update_scheduled_date_v2(request_data: UpdateScheduledDateRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| email = request_data.email |
| patient_id = request_data.patient_id |
| scheduled_date = request_data.scheduled_date |
|
|
| try: |
| with db as session: |
| query = text("UPDATE patients SET scheduled_date = :scheduled_date WHERE email = :email AND patient_id = :patient_id") |
| result = session.execute(query, {'scheduled_date': scheduled_date, 'email': email, 'patient_id': patient_id}) |
| session.commit() |
|
|
| if result.rowcount == 0: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='No matching record found') |
|
|
| return JSONResponse(content={'message': 'Scheduled date updated successfully'}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.get("/total_appointments_v2") |
| async def total_appointments_v2(start_date: str, end_date: str, email: str, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| try: |
| start_date_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d') |
| end_date_dt = datetime.datetime.strptime(end_date, '%Y-%m-%d') |
| |
| with db as session: |
| query = text(""" |
| SELECT DATE(scheduled_date) as date, COUNT(*) as total_appointments |
| FROM patients |
| WHERE scheduled_date BETWEEN :start_date AND :end_date |
| AND email = :email |
| GROUP BY DATE(scheduled_date) |
| ORDER BY DATE(scheduled_date) |
| """) |
|
|
| results = session.execute(query, { |
| 'start_date': start_date_dt, |
| 'end_date': end_date_dt, |
| 'email': email |
| }).fetchall() |
|
|
| appointments_by_day = {} |
|
|
| for row in results: |
| date_str = row.date.strftime('%Y-%m-%d') |
| appointments_by_day[date_str] = row.total_appointments |
|
|
| current_date = start_date_dt |
| while current_date <= end_date_dt: |
| date_str = current_date.strftime('%Y-%m-%d') |
| if date_str not in appointments_by_day: |
| appointments_by_day[date_str] = 0 |
| current_date += datetime.timedelta(days=1) |
|
|
| return JSONResponse(content={'appointments_by_day': appointments_by_day}, status_code=status.HTTP_200_OK) |
|
|
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/add_patient_v2") |
| async def add_patient_v2(request_data: AddPatientV2Request, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| name = request_data.name |
| dob = request_data.dob |
| gender = request_data.gender |
| age = request_data.age |
| height = request_data.height |
| weight = request_data.weight |
| email = request_data.email |
| doctor = request_data.doctor |
| role = request_data.role |
| patient_id = generate_patient_id() |
|
|
| created_at = datetime.datetime.utcnow() |
| updated_at = datetime.datetime.utcnow() |
| scheduled_date = datetime.datetime.utcnow() |
|
|
| try: |
| with db as session: |
| uuid_val = generate_session_id() |
| if role == "3": |
| user_query = text("SELECT id FROM users WHERE email = :email AND name = :doctor") |
| user_result = session.execute(user_query, {'email': email, 'doctor': doctor}).fetchone() |
| if not user_result: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Doctor not found') |
| doctor_id = user_result.id |
|
|
| pat_query = text("INSERT INTO patients (name, dob, gender, age, height, weight, email, doctor, added_by, uuid, patient_id, created_at, updated_at, scheduled_date) VALUES (:name, :dob, :gender, :age, :height, :weight, :email, :doctor_id, :doctor, :uuid, :patient_id, :created_at, :updated_at, :scheduled_date)") |
| session.execute(pat_query, {'name': name, 'dob': dob, 'gender': gender, 'age': age, 'height': height, 'weight': weight, 'email': email, 'doctor_id': doctor_id, 'doctor': doctor, 'uuid': uuid_val, 'patient_id': patient_id, 'created_at': created_at, 'updated_at': updated_at, 'scheduled_date': scheduled_date}) |
|
|
| wound_query = text("INSERT INTO wounds (uuid, patient_id, created_at, updated_at) VALUES (:uuid, :patient_id, :created_at, :updated_at)") |
| session.execute(wound_query, {'uuid': uuid_val, 'patient_id': patient_id, 'created_at': created_at, 'updated_at': updated_at}) |
| |
| elif role == "5": |
| org_query = text("SELECT id FROM organisations WHERE email = :email AND name = :doctor") |
| org_result = session.execute(org_query, {'email': email, 'doctor': doctor}).fetchone() |
| if not org_result: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Organisation not found') |
| org_id = org_result.id |
|
|
| pat_query = text("INSERT INTO patients (name, dob, gender, age, height, weight, email, org, added_by, uuid, patient_id, created_at, updated_at, scheduled_date) VALUES (:name, :dob, :gender, :age, :height, :weight, :email, :org_id, :doctor, :uuid, :patient_id, :created_at, :updated_at, :scheduled_date)") |
| session.execute(pat_query, {'name': name, 'dob': dob, 'gender': gender, 'age': age, 'height': height, 'weight': weight, 'email': email, 'org_id': org_id, 'doctor': doctor, 'uuid': uuid_val, 'patient_id': patient_id, 'created_at': created_at, 'updated_at': updated_at, 'scheduled_date': scheduled_date}) |
|
|
| wound_query = text("INSERT INTO wounds (uuid, patient_id, created_at, updated_at) VALUES (:uuid, :patient_id, :created_at, :updated_at)") |
| session.execute(wound_query, {'uuid': uuid_val, 'patient_id': patient_id, 'created_at': created_at, 'updated_at': updated_at}) |
|
|
| else: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid role') |
| |
| session.commit() |
| |
| return JSONResponse(content={'message': 'Patient added successfully', 'patient_id': patient_id}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/save_notes_v2") |
| async def save_notes_v2(request_data: SaveNotesV2Request, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| patient_id = request_data.patient_id |
| notes = request_data.notes |
| remarks = request_data.remarks |
|
|
| try: |
| with db as session: |
| query = text("UPDATE patients SET notes = :notes, remarks = :remarks WHERE patient_id = :patient_id") |
| session.execute(query, {'notes': notes, 'remarks': remarks, 'patient_id': patient_id}) |
| session.commit() |
|
|
| return JSONResponse(content={'message': 'Notes saved successfully'}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/admin_add_practitioner_v2") |
| async def admin_add_practitioner_v2(request_data: AdminAddPractitionerV2Request, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| name = request_data.name |
| email = request_data.email |
| c_code = request_data.c_code |
| phone = request_data.phone |
| org_email = request_data.org_email |
| created_at = datetime.datetime.utcnow() |
| updated_at = datetime.datetime.utcnow() |
|
|
| try: |
| with db as session: |
| query = text("SELECT email FROM users WHERE email = :email") |
| existing_email = session.execute(query, {'email': email}).fetchone() |
|
|
| query = text("SELECT phone FROM users WHERE phone = :phone") |
| existing_phone = session.execute(query, {'phone': phone}).fetchone() |
|
|
| if existing_email: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email already exists. Please login.') |
| elif existing_phone: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Phone number already exists. Please use another phone number.') |
| else: |
| org_query = text("SELECT id FROM organisations WHERE email = :org_email") |
| org_result = session.execute(org_query, {'org_email': org_email}).fetchone() |
| if not org_result: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Organisation not found') |
| org_id = org_result.id |
| uuid_val = generate_session_id() |
| license_key = generate_license_key() |
|
|
| query = text("INSERT INTO users (name, email, c_code, phone, uuid, licence_key, org, updated_at, created_at) VALUES (:name, :email, :c_code, :phone, :uuid, :license_key, :org_id, :updated_at, :created_at)") |
| session.execute(query, { |
| 'name': name, |
| 'email': email, |
| 'c_code': c_code, |
| 'phone': phone, |
| 'uuid': uuid_val, |
| 'license_key': license_key, |
| 'org_id': org_id, |
| 'updated_at': updated_at, |
| 'created_at': created_at |
| }) |
| session.commit() |
| return JSONResponse(content={'message': 'Data added successfully'}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/sort_patients") |
| async def sort_patients(request_data: SortPatientsRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| email = request_data.email |
| date_str = request_data.date |
|
|
| try: |
| with db as session: |
| if date_str: |
| appointment_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date() |
| query = text(""" |
| SELECT * FROM patients WHERE scheduled_date = :appointment_date AND email = :email |
| """) |
| results = session.execute(query, {'appointment_date': appointment_date, 'email': email}).fetchall() |
| else: |
| query = text(""" |
| SELECT * FROM patients WHERE email = :email |
| """) |
| results = session.execute(query, {'email': email}).fetchall() |
|
|
| patient_dicts = [dict(row._mapping) for row in results] |
| return JSONResponse(content={'patients': patient_dicts}, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/get_appointment_count") |
| async def get_appointment_count(request_data: GetAppointmentCountRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| email = request_data.email |
| date_str = request_data.date |
|
|
| try: |
| appointment_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date() |
| with db as session: |
| query = text(""" |
| SELECT COUNT(*) as count FROM patients WHERE scheduled_date = :appointment_date AND email = :email |
| """) |
| result = session.execute(query, {'appointment_date': appointment_date, 'email': email}).fetchone() |
|
|
| total_appointments = result.count if result else 0 |
|
|
| return JSONResponse(content={ |
| 'title': f'The total number of appointments booked is {total_appointments}', |
| 'description': f'The appointments are scheduled for {appointment_date}' |
| }, status_code=status.HTTP_200_OK) |
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
| @app.post("/add_wound_detail") |
| async def add_wound_details_v3(request_data: AddWoundDetailsV3Request = Depends(), image: UploadFile = File(..., alias="image"), api_image: UploadFile = File(..., alias="api_image"), db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)): |
| length = request_data.length |
| breadth = request_data.breadth |
| depth = request_data.depth |
| area = request_data.area |
| moisture = request_data.moisture |
| wound_location = request_data.wound_location |
| tissue = request_data.tissue |
| exudate = request_data.exudate |
| periwound = request_data.periwound |
| periwound_type = request_data.periwound_type |
| patient_id = request_data.patient_id |
| type = request_data.type |
| category = request_data.category |
| edge = request_data.edge |
| infection = request_data.infection |
| last_dressing_date = request_data.last_dressing_date |
|
|
| updated_at = datetime.datetime.utcnow() |
|
|
| if not allowed_file(image.filename) or not allowed_file(api_image.filename): |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type for one or both images') |
|
|
| normal_image_filename = secure_filename(image.filename) |
| api_image_filename = secure_filename(api_image.filename) |
|
|
| try: |
| with db as session: |
| query = text("SELECT area, id, created_at FROM wounds WHERE patient_id = :patient_id") |
| result = session.execute(query, {'patient_id': patient_id}).fetchone() |
|
|
| if not result: |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Patient not found') |
|
|
| existing_area = result[0] |
| if not existing_area: |
| existing_area = 0 |
| wound_id = result[1] |
| created_at = result[2] |
| area_difference = float(area) - float(existing_area) |
| if area_difference > 0: |
| size_variation = 'wound area increased' |
| elif area_difference < 0: |
| size_variation = 'wound area reduced' |
| else: |
| size_variation = 'wound area same' |
|
|
| patient_folder = os.path.join(UPLOAD_FOLDER, 'wounds', patient_id) |
| os.makedirs(patient_folder, exist_ok=True) |
| normal_image_path = os.path.join(patient_folder, normal_image_filename) |
| with open(normal_image_path, "wb") as buffer: |
| buffer.write(await image.read()) |
|
|
| api_patient_folder = os.path.join(UPLOAD_FOLDER, 'assessed_wounds', patient_id) |
| os.makedirs(api_patient_folder, exist_ok=True) |
| api_image_path = os.path.join(api_patient_folder, api_image_filename) |
| with open(api_image_path, "wb") as buffer: |
| buffer.write(await api_image.read()) |
|
|
| image_url = f"{BASE_URL}/uploads/wounds/{patient_id}/{normal_image_filename}" |
| api_image_url = f"{BASE_URL}/uploads/assessed_wounds/{patient_id}/{api_image_filename}" |
|
|
| query1 = text(""" |
| INSERT INTO wound_images (depth, width, height, uuid, updated_at, patient_id, size_variation, image, wound_id, created_at, area, photo_assessment) |
| VALUES (:depth, :breadth, :length, :uuid, :updated_at, :patient_id, :size_variation, :image_url, :wound_id, :created_at, :area, :api_image_url) |
| """) |
| session.execute(query1, { |
| 'depth': depth, 'breadth': breadth, 'length': length, 'uuid': generate_session_id(), |
| 'updated_at': updated_at, 'patient_id': patient_id, 'size_variation': size_variation, |
| 'image_url': image_url, 'wound_id': wound_id, 'created_at': created_at, 'area': area, |
| 'api_image_url': api_image_url |
| }) |
|
|
| query2 = text(""" |
| UPDATE wounds SET height = :length, width = :breadth, depth = :depth, area = :area, |
| moisture = :moisture, position = :wound_location, tissue = :tissue, exudate = :exudate, |
| periwound = :periwound, periwound_type = :periwound_type, type = :type, category = :category, |
| edges = :edge, infection = :infection, updated_at = :updated_at, last_dressing = :last_dressing_date, |
| image = :image_url, photo_assessment = :api_image_url |
| WHERE patient_id = :patient_id |
| """) |
| session.execute(query2, { |
| 'length': length, 'breadth': breadth, 'depth': depth, 'area': area, |
| 'moisture': moisture, 'wound_location': wound_location, 'tissue': tissue, |
| 'exudate': exudate, 'periwound': periwound, 'periwound_type': periwound_type, |
| 'type': type, 'category': category, 'edge': edge, 'infection': infection, |
| 'updated_at': updated_at, 'last_dressing_date': last_dressing_date, 'patient_id': patient_id, |
| 'image_url': image_url, 'api_image_url': api_image_url |
| }) |
|
|
| session.commit() |
| return JSONResponse(content={ |
| 'message': 'Wound details and images stored successfully', |
| 'image_url': image_url, |
| 'photo_assessment': api_image_url |
| }, status_code=status.HTTP_200_OK) |
|
|
| except Exception as e: |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
|
|
|
|
|
|
|
|