wound-app / main.py
Ani14's picture
Update main.py
35ee4a5 verified
from fastapi import FastAPI, Request, Depends, HTTPException, status, File, UploadFile, Form , Header , Body
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import pymysql
import jwt
import random
import string
import datetime
import os
import uuid
from dotenv import load_dotenv
from twilio.rest import Client
import requests
from werkzeug.utils import secure_filename
from pydantic import BaseModel
from typing import Optional
from sqlalchemy.orm import Session
from typing import Optional, Dict , List, Any
load_dotenv()
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Update with specific origins in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Database credentials
DB_HOST = os.getenv('host')
DB_DATABASE = os.getenv('db')
DB_USERNAME = os.getenv('username')
DB_PASSWORD = os.getenv('psswd')
DATABASE_URL = f"mysql+pymysql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}/{DB_DATABASE}"
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
account_sid = os.getenv('acc_sid')
auth_token = os.getenv('auth_token')
twilio_number = os.getenv('tn')
JWT_SECRET_KEY = 'CkOPcOppyh31sQcisbyOM3RKD4C2G7SzQmuG5LePt9XBarsxgjm0fc7uOECcqoGm'
JWT_ALGORITHM = "HS256"
# Configuration for file uploads
UPLOAD_FOLDER = 'uploads'
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
app.mount("/uploads", StaticFiles(directory=UPLOAD_FOLDER), name="uploads")
BASE_URL = 'https://ani14-wound-app.hf.space' # This might need to be updated based on deployment
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
# Dependency to get DB session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Utility function to check allowed file extensions
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def generate_session_id():
return str(uuid.uuid4())
def generate_license_key():
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=12))
def generate_patient_id():
with SessionLocal() as session:
result = session.execute(text("SELECT MAX(id) FROM patients")).fetchone()
last_id = result[0] if result[0] is not None else 0
prefix = "AB"
formatted_id = f"{prefix}000{last_id + 1}"
return formatted_id
def send_sms(phone, otp):
client = Client(account_sid, auth_token)
message = client.messages.create(
body=f"Your verification code is: {otp}. Don't share this code with anyone; our employees will never ask for the code.",
from_=twilio_number,
to=phone
)
def generate_otp():
return str(random.randint(1000, 9999))
def update_otp_in_database(session, phone, otp, expiry_time, updated_at):
try:
query = text("UPDATE organisations SET otp= :otp, otp_expiry= :expiry_time, updated_at = :updated_at WHERE phone= :phone")
session.execute(query, {'otp': otp, 'expiry_time': expiry_time, 'phone': phone, 'updated_at': updated_at})
session.commit()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
def update_otp_in_med_database(session, phone, otp, expiry_time, updated_at):
try:
query = text("UPDATE users SET otp= :otp, otp_expiry= :expiry_time, updated_at = :updated_at WHERE phone= :phone")
session.execute(query, {'otp': otp, 'expiry_time': expiry_time, 'phone': phone, 'updated_at': updated_at})
session.commit()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# JWT Authentication Dependency
def verify_jwt_token(request: Request):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid Authorization header')
token = auth_header.split(' ')[1]
try:
payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=['HS256'])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Token has expired')
except jwt.InvalidTokenError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token')
# Pydantic Models for Request Body Validation
class SendEmailRequest(BaseModel):
name: str
email: str
c_code: str
phone: str
class VerifyLicenseKeyRequest(BaseModel):
email: str
license_key: str
class CreatePinRequest(BaseModel):
license_key: str
email: str
pin: str
name: Optional[str] = None
c_code: Optional[str] = None
phone: Optional[str] = None
class FetchDataRequest(BaseModel):
email: str
class SaveAdditionalDataRequest(BaseModel):
department: str
location: str
email: str
latitude: float
longitude: float
class AddWoundDetailsRequest(BaseModel):
length: float
breadth: float
depth: float
area: float
moisture: str
wound_location: Optional[str] = None
tissue: Optional[str] = None
exudate: Optional[str] = None
periwound: Optional[str] = None
periwound_type: Optional[str] = None
patient_id: str
type: Optional[str] = None
category: Optional[str] = None
edge: Optional[str] = None
infection: Optional[str] = None
last_dressing_date: Optional[str] = None
class AddPatientRequest(BaseModel):
name: str
dob: str
gender: str
age: int
height: float
weight: float
email: str
doctor: Optional[str] = None # Make it optional
class SearchPatientRequest(BaseModel):
name: str
class GeneratePrescriptionRequest(BaseModel):
patient_id: str
class VerifyPinRequest(BaseModel):
email: str
pin: str
class SendOtpRequest(BaseModel):
phone: str
class UpdateScheduledDateRequest(BaseModel):
email: str
patient_id: str
doctor: str
scheduled_date: str
class TotalAppointmentsTillDateRequest(BaseModel):
date: str
class TotalAppointmentsTillMonthRequest(BaseModel):
year: int
month: int
class ChangePinRequest(BaseModel):
email: str
current_pin: str
new_pin: str
class ForgotPinRequest(BaseModel):
email: str
otp: int
new_pin: str
class OrganisationDetailsRequest(BaseModel):
email: str
class UpdatePatientDetailsRequest(BaseModel):
patient_id: str
allergies: Optional[str] = None
past_history: Optional[str] = None
class PatientDetailsRequest(BaseModel):
patient_id: str
class StoreImageRequest(BaseModel):
patient_id: str
class GetImageRequest(BaseModel):
patient_id: str
class StoreWoundImageRequest(BaseModel):
patient_id: str
class GetWoundImageRequest(BaseModel):
patient_id: str
class StoreMedImageRequest(BaseModel):
email: str
class GetMedImageRequest(BaseModel):
email: str
class StoreOrgImageRequest(BaseModel):
email: str
class GetOrgImageRequest(BaseModel):
email: str
class SaveNotesRequest(BaseModel):
patient_id: str
notes: str
class TotalAppointmentsRequest(BaseModel):
start_date: str
end_date: str
doctor: str
class AdminAddPractitionerRequest(BaseModel):
name: str
email: str
c_code: str
phone: str
org: str
class UpdateOrgProfileRequest(BaseModel):
email: str
name: Optional[str] = None
department: Optional[str] = None
about: Optional[str] = None
location: Optional[str] = None
latitude: Optional[float] = None
longitude: Optional[float] = None
class UpdateMedProfileRequest(BaseModel):
email: str
name: Optional[str] = None
department: Optional[str] = None
about: Optional[str] = None
location: Optional[str] = None
latitude: Optional[float] = None
longitude: Optional[float] = None
class AddWoundDetailsV2Request(BaseModel):
length: float
breadth: float
depth: float
area: float
moisture: str
wound_location: Optional[str] = None
tissue: Optional[str] = None
exudate: Optional[str] = None
periwound: Optional[str] = None
periwound_type: Optional[str] = None
patient_id: str
type: Optional[str] = None
category: Optional[str] = None
edge: Optional[str] = None
infection: Optional[str] = None
last_dressing_date: Optional[str] = None
class WoundProgressTimelineRequest(BaseModel):
patient_id: str
class AddPatientV2Request(BaseModel):
name: str
dob: str
gender: str
age: int
height: float
weight: float
email: str
doctor: str
role: str
class SaveNotesV2Request(BaseModel):
patient_id: str
notes: Optional[str] = None
remarks: Optional[str] = None
class AdminAddPractitionerV2Request(BaseModel):
name: str
email: str
c_code: str
phone: str
org_email: str
class SortPatientsRequest(BaseModel):
email: str
date: Optional[str] = None
class GetAppointmentCountRequest(BaseModel):
email: str
date: str
class AddWoundDetailsV3Request(BaseModel):
length: float
breadth: float
depth: float
area: float
moisture: str
wound_location: Optional[str] = None
tissue: Optional[str] = None
exudate: Optional[str] = None
periwound: Optional[str] = None
periwound_type: Optional[str] = None
patient_id: str
type: Optional[str] = None
category: Optional[str] = None
edge: Optional[str] = None
infection: Optional[str] = None
last_dressing_date: Optional[str] = None
# Endpoints
@app.post("/send_email")
async def send_email(request_data: SendEmailRequest, db: SessionLocal = Depends(get_db)):
name = request_data.name
email = request_data.email
c_code = request_data.c_code
phone = request_data.phone
try:
with db as session:
query = text("SELECT email, phone FROM organisations WHERE email = :email OR phone = :phone")
existing_user = session.execute(query, {'email': email, 'phone': phone}).fetchone()
if existing_user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Email or phone already exists. Please login.')
else:
license_key = generate_license_key()
email_payload = {
'Recipient': email,
'Subject': 'License key for SmartHeal',
'Body': f'Your license key is: {license_key}',
'ApiKey': '6A7339A3-E70B-4A8D-AA23-0264125F4959'
}
email_response = requests.post(
'https://api.waysdatalabs.com/api/EmailSender/SendMail',
headers={},
data=email_payload
)
if email_response.status_code == 200:
return JSONResponse(content={'message': 'License key sent to email successfully', 'license_key': license_key}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail='Failed to send email')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/verify_license_key")
async def verify_license_key_endpoint(request_data: VerifyLicenseKeyRequest, db: SessionLocal = Depends(get_db)):
email = request_data.email
license_key = request_data.license_key
try:
with db as session:
query = text("SELECT licence_key FROM organisations WHERE email = :email")
result = session.execute(query, {'email': email}).fetchone()
if result and result.licence_key == license_key:
token = jwt.encode({'email': email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)}, JWT_SECRET_KEY, algorithm='HS256')
return JSONResponse(content={'message': 'License key verified successfully', 'token': token}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid license key')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/create_pin")
async def create_pin_endpoint(request_data: CreatePinRequest, db: SessionLocal = Depends(get_db)):
license_key = request_data.license_key
email = request_data.email
pin = request_data.pin
name = request_data.name
c_code = request_data.c_code
phone = request_data.phone
created_at = datetime.datetime.utcnow()
updated_at = datetime.datetime.utcnow()
try:
with db as session:
token = jwt.encode({'email': email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)}, JWT_SECRET_KEY, algorithm='HS256')
query = text("INSERT INTO organisations (name, email, c_code, phone, uuid, licence_key, pin, created_at, updated_at) VALUES (:name, :email, :c_code, :phone, :uuid, :license_key, :pin, :created_at, :updated_at)")
session.execute(query, {
'name': name,
'email': email,
'c_code': c_code,
'phone': phone,
'uuid': generate_session_id(),
'license_key': license_key,
'pin': pin,
'created_at': created_at,
'updated_at': updated_at
})
session.commit()
return JSONResponse(content={'message': 'PIN created and data saved successfully', 'token': token}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/fetch_data")
async def fetch_name_phone(request_data: FetchDataRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
email = request_data.email
try:
with db as session:
query = text("SELECT name, phone FROM organisations WHERE email = :email")
result = session.execute(query, {'email': email}).fetchall()
result_dicts = [{'name': row[0], 'phone': row[1]} for row in result]
return JSONResponse(content=result_dicts, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/save_additional_data")
async def save_department_location(request_data: SaveAdditionalDataRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
department = request_data.department
location = request_data.location
email = request_data.email
latitude = request_data.latitude
longitude = request_data.longitude
try:
with db as session:
query = text("UPDATE organisations SET departments = :department, location = :location, latitude = :latitude, longitude = :longitude WHERE email = :email;")
session.execute(query, {'department': department, 'location': location, 'latitude': latitude, 'longitude': longitude, 'email': email})
session.commit()
return JSONResponse(content={'message': 'Department, location, latitude, and longitude saved successfully'}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
class WoundDetailsRequest:
def __init__(
self,
length: float = Form(... ),
breadth: float = Form(...),
depth: float = Form(...),
area: float = Form(...),
moisture: str = Form(...),
wound_location: str = Form(...),
tissue: str = Form(...),
exudate: str = Form(...),
periwound: str = Form(...),
periwound_type: str = Form(...),
patient_id: str = Form(...),
):
self.length = length
self.breadth = breadth
self.depth = depth
self.area = area
self.moisture = moisture
self.wound_location = wound_location
self.tissue = tissue
self.exudate = exudate
self.periwound = periwound
self.periwound_type = periwound_type
self.patient_id = patient_id
@app.post("/add_wound_details_v3")
async def add_wound_details_v3(
# Use Depends() to get your database session and verify the JWT token
db: Session = Depends(get_db),
payload: dict = Depends(verify_jwt_token),
# Use File() to specify that 'image' is an uploaded file
image: UploadFile = File(...),
# Use Form() for all text fields because the request is multipart/form-data
length: float = Form(...),
breadth: float = Form(...),
depth: float = Form(...),
area: float = Form(...),
moisture: str = Form(...),
wound_location: str = Form(...),
tissue: str = Form(...),
exudate: str = Form(...),
periwound: str = Form(...),
periwound_type: str = Form(...),
patient_id: str = Form(...),
type: str = Form(...),
category: str = Form(...),
edge: str = Form(...),
infection: str = Form(...),
last_dressing_date: str = Form(...),
):
"""
This endpoint receives wound details and an image, saves the image,
and updates the database in two tables: `wounds` and `wound_images`.
"""
updated_at = datetime.datetime.utcnow()
# 1. Validate and save the uploaded image
if not allowed_file(image.filename):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Invalid image file type. Allowed types are png, jpg, jpeg.'
)
# Create a secure, unique filename to prevent conflicts
image_filename = secure_filename(f"{patient_id}_{int(updated_at.timestamp())}_{image.filename}")
# Define the path to save the image
patient_folder = os.path.join(UPLOAD_FOLDER, 'wounds', patient_id)
os.makedirs(patient_folder, exist_ok=True) # Create the directory if it doesn't exist
image_path = os.path.join(patient_folder, image_filename)
# Write the file to the server
with open(image_path, "wb") as buffer:
buffer.write(await image.read())
# Construct the public URL for the saved image
image_url = f"{BASE_URL}/uploads/wounds/{patient_id}/{image_filename}"
# 2. Perform database operations
try:
with db as session:
# First, get the current state of the wound for history tracking
query_select = text("SELECT area, id, created_at FROM wounds WHERE patient_id = :patient_id")
result = session.execute(query_select, {'patient_id': patient_id}).fetchone()
if not result:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Patient not found')
existing_area = result[0] if result[0] is not None else 0.0
wound_id = result[1]
created_at = result[2]
# Determine the size variation for the history record
area_difference = float(area) - float(existing_area)
size_variation = 'wound area same'
if area_difference > 0: size_variation = 'wound area increased'
elif area_difference < 0: size_variation = 'wound area reduced'
# INSERT a new record into the history table (`wound_images`)
query_insert = text("""
INSERT INTO wound_images (depth, width, height, uuid, updated_at, patient_id, size_variation, image, wound_id, created_at, area)
VALUES (:depth, :breadth, :length, :uuid, :updated_at, :patient_id, :size_variation, :image_url, :wound_id, :created_at, :area)
""")
session.execute(query_insert, {
'depth': depth, 'breadth': breadth, 'length': length,
'uuid': generate_session_id(), 'updated_at': updated_at,
'patient_id': patient_id, 'size_variation': size_variation,
'image_url': image_url, 'wound_id': wound_id,
'created_at': created_at, 'area': area
})
# UPDATE the main `wounds` table with the latest data
query_update = text("""
UPDATE wounds
SET height = :length, width = :breadth, depth = :depth, area = :area,
moisture = :moisture, position = :wound_location, tissue = :tissue,
exudate = :exudate, periwound = :periwound, periwound_type = :periwound_type,
type = :type, category = :category, edges = :edge, infection = :infection,
last_dressing = :last_dressing_date, image = :image_url, updated_at = :updated_at
WHERE patient_id = :patient_id;
""")
session.execute(query_update, {
'length': length, 'breadth': breadth, 'depth': depth, 'area': area,
'moisture': moisture, 'wound_location': wound_location, 'tissue': tissue,
'exudate': exudate, 'periwound': periwound, 'periwound_type': periwound_type,
'type': type, 'category': category, 'edge': edge, 'infection': infection,
'last_dressing_date': last_dressing_date, 'patient_id': patient_id,
'image_url': image_url, 'updated_at': updated_at
})
# Commit both changes to the database
session.commit()
return {'message': 'Wound details and image stored successfully', 'image_url': image_url}
except Exception as e:
# If anything goes wrong, return a server error
print(f"An error occurred: {e}") # Log the error for debugging
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
class EmailRequest(BaseModel):
email: str
@app.get("/get_all_patient_details")
async def get_all_patient_details(
request: Request,
authorization: Optional[str] = Header(None),
db: Session = Depends(get_db)
):
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid Authorization header")
token = authorization.split(" ")[1]
try:
payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token has expired")
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
try:
body = await request.json()
email = body.get("email")
if not email:
raise HTTPException(status_code=400, detail="Missing required field: email")
query = text("SELECT * FROM patients WHERE email = :email")
patient_details = db.execute(query, {"email": email}).fetchall()
patient_dicts = [dict(row._mapping) for row in patient_details]
return {"patient_details": patient_dicts}
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
from fastapi.exceptions import HTTPException
import traceback
@app.post("/add_patient")
def add_patient_endpoint(
input_data: AddPatientRequest,
db: Session = Depends(get_db),
jwt_payload: dict = Depends(verify_jwt_token),
):
# Generate IDs and timestamps
patient_id = generate_patient_id()
session_uuid = generate_session_id() # Renamed to avoid conflict with uuid library
now = datetime.datetime.utcnow()
try:
# The dictionary of parameters to be inserted into the database.
# The keys here MUST match the placeholders (e.g., :name, :doctor) in the SQL query.
patient_params = {
"name": input_data.name,
"dob": input_data.dob,
"gender": input_data.gender,
"age": input_data.age,
"height": input_data.height,
"weight": input_data.weight,
"email": input_data.email,
"doctor": input_data.doctor, # <-- This was the missing line that caused the error.
"uuid": session_uuid,
"patient_id": patient_id,
"created_at": now,
"updated_at": now,
"scheduled_date": now
}
# First INSERT statement for the 'patients' table
db.execute(
text("""
INSERT INTO patients
(name, dob, gender, age, height, weight, email, doctor,
uuid, patient_id, created_at, updated_at, scheduled_date)
VALUES
(:name, :dob, :gender, :age, :height, :weight, :email, :doctor,
:uuid, :patient_id, :created_at, :updated_at, :scheduled_date)
"""), patient_params
)
# Second INSERT statement for the 'wounds' table
db.execute(
text("""
INSERT INTO wounds (uuid, patient_id, created_at, updated_at)
VALUES (:uuid, :patient_id, :created_at, :updated_at)
"""), {
"uuid": session_uuid,
"patient_id": patient_id,
"created_at": now,
"updated_at": now
}
)
# If both inserts are successful, commit the transaction
db.commit()
except Exception as e:
# If any error occurs, roll back the entire transaction
db.rollback()
# Log the detailed error for debugging and raise a generic server error
print(f"Error adding patient: {e}") # Optional: for server-side logging
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An internal error occurred while adding the patient: {e}"
)
return {"message": "Patient added successfully", "patient_id": patient_id}
@app.get("/search_patient")
async def search_patient_endpoint(name: str, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
if not name:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Missing required fields')
try:
with db as session:
query = text("SELECT * FROM patients WHERE name = :name")
patients = session.execute(query, {'name': name}).fetchall()
if not patients:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='No patients found with this name')
patients_dicts = [dict(row._mapping) for row in patients]
return JSONResponse(content={'patients': patients_dicts}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/generate_prescription")
async def generate_prescription_endpoint(patient_id: str, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
if not patient_id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Missing required fields')
try:
with db as session:
patient_query = text("SELECT * FROM patients WHERE patient_id = :patient_id ")
patient = session.execute(patient_query, {'patient_id': patient_id}).fetchone()
if not patient:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='No patient found with this ID')
wound_query = text("SELECT * FROM patients WHERE patient_id = :patient_id ")
wounds = session.execute(wound_query, {'patient_id': patient_id}).fetchall()
wound_category = []
for wound in wounds:
area = wound.height * wound.width
if area <= 5:
dimension = 'Small'
elif 5 < area <= 20:
dimension = 'Medium'
else:
dimension = 'Large'
wound_category.append((wound.wound_type, dimension))
medication_details = []
for wound_type, dimension in wound_category:
medication_query = text("""
SELECT * FROM WoundMedications
WHERE WoundType = :wound_type AND WoundDimensions = :dimension
""")
medications = session.execute(medication_query, {'wound_type': wound_type, 'dimension': dimension}).fetchall()
medication_details.extend(medications)
prescription = {
'patient_details': dict(patient._mapping),
'wound_details': [dict(w._mapping) for w in wounds],
'medication_details': [dict(m._mapping) for m in medication_details]
}
return JSONResponse(content={'prescription': prescription}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/verify_pin")
async def verify_pin_endpoint(request_data: VerifyPinRequest, db: SessionLocal = Depends(get_db)):
email = request_data.email
pin = request_data.pin
try:
with db as session:
query = text("SELECT pin FROM organisations WHERE email = :email")
result = session.execute(query, {'email': email}).fetchone()
if result:
if result.pin == pin:
return JSONResponse(content={'message': 'Pin verified successfully'}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid pin')
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/send_otp")
async def send_otp_endpoint(request_data: SendOtpRequest, db: SessionLocal = Depends(get_db)):
phone = request_data.phone
try:
with db as session:
query = text("SELECT * FROM organisations WHERE phone = :phone")
organisation = session.execute(query, {'phone': phone}).fetchone()
if organisation:
phone_with_code = organisation.c_code + organisation.phone
otp = "1234" # generate_otp()
# send_sms(phone_with_code, otp)
updated_at = datetime.datetime.utcnow()
expiry_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
update_otp_in_database(session, phone, otp, expiry_time, updated_at)
token = jwt.encode({'email': organisation.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)}, JWT_SECRET_KEY, algorithm='HS256')
return JSONResponse(content={'status': 200, 'message': 'OTP Sent on mobile.', 'token': token, 'otp': otp, 'email': organisation.email, 'name': organisation.name, 'pin': organisation.pin}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='OOPS! Phone Does Not Exist!')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/med_send_email")
async def med_send_email(request_data: SendEmailRequest, db: SessionLocal = Depends(get_db)):
name = request_data.name
email = request_data.email
c_code = request_data.c_code
phone = request_data.phone
try:
with db as session:
query = text("SELECT email, phone FROM users WHERE email = :email OR phone = :phone")
existing_user = session.execute(query, {'email': email, 'phone': phone}).fetchone()
if existing_user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Email or phone already exists. Please login.')
else:
license_key = generate_license_key()
email_payload = {
'Recipient': email,
'Subject': 'License key for SmartHeal',
'Body': f'Your license key is: {license_key}',
'ApiKey': '6A7339A3-E70B-4A8D-AA23-0264125F4959'
}
email_response = requests.post(
'https://api.waysdatalabs.com/api/EmailSender/SendMail',
headers={},
data=email_payload
)
if email_response.status_code == 200:
return JSONResponse(content={'message': 'License key sent to email successfully', 'license_key': license_key}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail='Failed to send email')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/med_verify_license_key")
async def med_verify_license_key(request_data: VerifyLicenseKeyRequest, db: SessionLocal = Depends(get_db)):
email = request_data.email
license_key = request_data.license_key
try:
with db as session:
query = text("SELECT licence_key FROM users WHERE email = :email")
result = session.execute(query, {'email': email}).fetchone()
if result and result.licence_key == license_key:
token = jwt.encode({'email': email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)}, JWT_SECRET_KEY, algorithm='HS256')
return JSONResponse(content={'message': 'License key verified successfully', 'token': token}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid license key')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/med_create_pin")
async def med_create_pin(request_data: CreatePinRequest, db: SessionLocal = Depends(get_db)):
license_key = request_data.license_key
email = request_data.email
pin = request_data.pin
name = request_data.name
c_code = request_data.c_code
phone = request_data.phone
created_at = datetime.datetime.utcnow()
updated_at = datetime.datetime.utcnow()
try:
with db as session:
org = "0"
token = jwt.encode({'email': email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)}, JWT_SECRET_KEY, algorithm='HS256')
query = text("INSERT INTO users (name, email, c_code, phone, uuid, licence_key, pin, org, created_at, updated_at) VALUES (:name, :email, :c_code, :phone, :uuid, :license_key, :pin, :org, :created_at, :updated_at)")
session.execute(query, {
'name': name,
'email': email,
'c_code': c_code,
'phone': phone,
'uuid': generate_session_id(),
'license_key': license_key,
'pin': pin,
'org': org,
'created_at': created_at,
'updated_at': updated_at
})
session.commit()
return JSONResponse(content={'message': 'PIN created and data saved successfully', 'token': token}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/med_fetch_data")
async def med_fetch_name_phone(request_data: FetchDataRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
email = request_data.email
try:
with db as session:
query = text("SELECT name, phone FROM users WHERE email = :email")
result = session.execute(query, {'email': email}).fetchall()
result_dicts = [{'name': row[0], 'phone': row[1]} for row in result]
return JSONResponse(content=result_dicts, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/save_med_data")
async def med_save_department_location(request_data: Request, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
data = await request_data.json()
speciality = data.get('speciality')
location = data.get('location')
email = data.get('email')
latitude = data.get('latitude')
longitude = data.get('longitude')
org = data.get('org')
designation = data.get('designation')
if not (speciality and location and latitude and longitude and org and email):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Missing required fields')
try:
with db as session:
query = text("UPDATE users SET speciality = :speciality, location = :location, latitude = :latitude, longitude = :longitude, org = :org, designation = :designation WHERE email = :email;")
session.execute(query, {'speciality': speciality, 'location': location, 'latitude': latitude, 'longitude': longitude, 'org': org, 'designation': designation, 'email': email})
session.commit()
return JSONResponse(content={'message': 'Speciality, location, organisation and designation saved successfully'}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("//med_verify_pin")
async def med_verify_pin(request_data: VerifyPinRequest, db: SessionLocal = Depends(get_db)):
email = request_data.email
pin = request_data.pin
try:
with db as session:
query = text("SELECT pin FROM users WHERE email = :email")
result = session.execute(query, {'email': email}).fetchone()
if result:
if result.pin == pin:
return JSONResponse(content={'message': 'Pin verified successfully'}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid pin')
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/med_send_otp")
async def med_send_otp(request_data: SendOtpRequest, db: SessionLocal = Depends(get_db)):
phone = request_data.phone
try:
with db as session:
query = text("SELECT * FROM users WHERE phone = :phone")
user = session.execute(query, {'phone': phone}).fetchone()
if user:
phone_with_code = user.c_code + user.phone
otp = "1234" # generate_otp()
# send_sms(phone_with_code, otp)
updated_at = datetime.datetime.utcnow()
expiry_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
update_otp_in_med_database(session, phone, otp, expiry_time, updated_at)
token = jwt.encode({'email': user.email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)}, JWT_SECRET_KEY, algorithm='HS256')
return JSONResponse(content={'status': 200, 'message': 'OTP Sent on mobile.', 'token': token, 'otp': otp, 'email': user.email, 'name': user.name, 'pin': user.pin}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='OOPS! Phone Does Not Exist!')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/update_scheduled_date")
async def update_scheduled_date_endpoint(request_data: UpdateScheduledDateRequest, db: SessionLocal = Depends(get_db)):
email = request_data.email
patient_id = request_data.patient_id
doctor = request_data.doctor
scheduled_date = request_data.scheduled_date
try:
with db as session:
query = text("UPDATE patients SET scheduled_date = :scheduled_date WHERE email = :email AND patient_id = :patient_id AND doctor = :doctor")
result = session.execute(query, {'scheduled_date': scheduled_date, 'email': email, 'patient_id': patient_id, 'doctor': doctor})
session.commit()
if result.rowcount == 0:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='No matching record found')
return JSONResponse(content={'message': 'Scheduled date updated successfully'}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/total_appointments_till_date")
async def total_appointments_till_date_endpoint(date: str, db: SessionLocal = Depends(get_db)):
if not date:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Date parameter is required')
try:
with db as session:
query = text("SELECT COUNT(*) as total_appointments FROM patients WHERE scheduled_date <= :date")
result = session.execute(query, {'date': date}).fetchone()
total_appointments = result[0]
return JSONResponse(content={'total_appointments': total_appointments}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/total_appointments_till_month")
async def total_appointments_till_month_endpoint(year: int, month: int, db: SessionLocal = Depends(get_db)):
if not (year and month):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Year and month parameters are required')
try:
with db as session:
query = text("""
SELECT COUNT(*) as total_appointments
FROM patients
WHERE YEAR(scheduled_date) = :year AND MONTH(scheduled_date) = :month
""")
result = session.execute(query, {'year': year, 'month': month}).fetchone()
total_appointments = result[0]
return JSONResponse(content={'total_appointments': total_appointments}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/change_pin_org")
async def change_pin_org(request_data: ChangePinRequest, db: SessionLocal = Depends(get_db)):
email = request_data.email
current_pin = request_data.current_pin
new_pin = request_data.new_pin
try:
with db as session:
query = text("SELECT pin FROM organisations WHERE email = :email")
result = session.execute(query, {'email': email}).fetchone()
if result:
if result.pin == current_pin:
update_query = text("UPDATE organisations SET pin = :new_pin WHERE email = :email")
session.execute(update_query, {'new_pin': new_pin, 'email': email})
session.commit()
return JSONResponse(content={'message': 'Pin updated successfully'}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid current pin')
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/forgot_pin_org")
async def forgot_pin_org(request_data: ForgotPinRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
email = request_data.email
otp = request_data.otp
new_pin = request_data.new_pin
try:
with db as session:
query = text("SELECT otp FROM organisations WHERE email = :email")
result = session.execute(query, {'email': email}).fetchone()
if result:
if result.otp == otp:
update_query = text("UPDATE organisations SET pin = :new_pin WHERE email = :email")
session.execute(update_query, {'new_pin': new_pin, 'email': email})
session.commit()
return JSONResponse(content={'message': 'Pin updated successfully'}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid OTP')
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/organisation_details")
async def organisation_details(email: str, db: SessionLocal = Depends(get_db)):
if not email:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email parameter is required')
try:
with db as session:
query = text("""
SELECT o.name, o.departments, o.location, o.latitude, o.longitude, o.about,
o.profile_photo_path, COUNT(p.email) as patient_count
FROM organisations o
LEFT JOIN patients p ON o.email = p.email
WHERE o.email = :email
""")
result = session.execute(query, {'email': email}).fetchone()
if result:
response = {
'name': result.name,
'departments': result.departments,
'location': result.location,
'latitude': result.latitude,
'longitude': result.longitude,
'about': result.about,
'profile_photo_path': result.profile_photo_path,
'patient_count': result.patient_count
}
return JSONResponse(content=response, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Organisation not found')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/change_pin_med")
async def change_pin_med(request_data: ChangePinRequest, db: SessionLocal = Depends(get_db)):
email = request_data.email
current_pin = request_data.current_pin
new_pin = request_data.new_pin
try:
with db as session:
query = text("SELECT pin FROM users WHERE email = :email")
result = session.execute(query, {'email': email}).fetchone()
if result:
if result.pin == current_pin:
update_query = text("UPDATE users SET pin = :new_pin WHERE email = :email")
session.execute(update_query, {'new_pin': new_pin, 'email': email})
session.commit()
return JSONResponse(content={'message': 'Pin updated successfully'}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid current pin')
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/forgot_pin_med")
async def forgot_pin_med(request_data: ForgotPinRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
email = request_data.email
otp = request_data.otp
new_pin = request_data.new_pin
try:
with db as session:
query = text("SELECT otp FROM users WHERE email = :email")
result = session.execute(query, {'email': email}).fetchone()
if result:
if result.otp == otp:
update_query = text("UPDATE users SET pin = :new_pin WHERE email = :email")
session.execute(update_query, {'new_pin': new_pin, 'email': email})
session.commit()
return JSONResponse(content={'message': 'Pin updated successfully'}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid OTP')
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Email not found')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/med_details")
async def med_details(email: str, db: SessionLocal = Depends(get_db)):
if not email:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email parameter is required')
try:
with db as session:
query = text("""
SELECT o.name, o.departments, o.location, o.latitude, o.longitude, o.about,
o.profile_photo_path, COUNT(p.email) as patient_count
FROM users o
LEFT JOIN patients p ON o.email = p.email
WHERE o.email = :email
""")
result = session.execute(query, {'email': email}).fetchone()
if result:
response = {
'name': result.name,
'departments': result.departments,
'location': result.location,
'latitude': result.latitude,
'longitude': result.longitude,
'about': result.about,
'profile_photo_path': result.profile_photo_path,
'patient_count': result.patient_count
}
return JSONResponse(content=response, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Organisation not found')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/update_patient_details")
async def update_patient_details(request_data: UpdatePatientDetailsRequest, db: SessionLocal = Depends(get_db)):
patient_id = request_data.patient_id
allergies = request_data.allergies
past_history = request_data.past_history
try:
with db as session:
query = text("""
UPDATE patients
SET allergy = :allergies, illness = :past_history
WHERE patient_id = :patient_id
""")
session.execute(query, {
'allergies': allergies,
'past_history': past_history,
'patient_id': patient_id
})
session.commit()
return JSONResponse(content={'message': 'Patient details updated successfully'}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/patient_details")
async def get_patient_details(
payload: Dict[str, Any] = Body(...),
db: Session = Depends(get_db),
jwt_payload: dict = Depends(verify_jwt_token)
):
patient_id = payload.get('patient_id')
if not patient_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="The 'patient_id' field is required in the request body."
)
try:
patient_query = text("SELECT * FROM patients WHERE patient_id = :patient_id")
patient_result = db.execute(patient_query, {'patient_id': patient_id}).fetchone()
if patient_result:
# Convert the patient result (a SQLAlchemy Row) to a mutable dictionary
patient_details = dict(patient_result._mapping)
wound_query = text("SELECT * FROM wounds WHERE patient_id = :patient_id")
wound_results = db.execute(wound_query, {'patient_id': patient_id}).fetchall()
wound_details_list = [dict(wound._mapping) for wound in wound_results]
# Add the wound details to the main patient dictionary
patient_details['wound_details'] = wound_details_list
# ==================================================================
# START: FIX FOR JSON SERIALIZATION ERROR
# ==================================================================
# This loop iterates through the final dictionary and converts any
# date or datetime objects into ISO 8601 formatted strings.
for key, value in patient_details.items():
if isinstance(value, (datetime.datetime, datetime.date)):
patient_details[key] = value.isoformat()
# Also, do the same for the wound details
for wound in patient_details['wound_details']:
for key, value in wound.items():
if isinstance(value, (datetime.datetime, datetime.date)):
wound[key] = value.isoformat()
# ==================================================================
# END: FIX FOR JSON SERIALIZATION ERROR
# ==================================================================
return JSONResponse(content=patient_details, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Patient not found')
except Exception as e:
print(f"An error occurred while fetching details for patient_id {patient_id}: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/store_image")
async def store_image(patient_id: str = Form(...), image: UploadFile = File(...), db: SessionLocal = Depends(get_db)):
if not allowed_file(image.filename):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type')
filename = secure_filename(image.filename)
try:
patient_folder = os.path.join(UPLOAD_FOLDER, 'patients', patient_id)
os.makedirs(patient_folder, exist_ok=True)
image_path = os.path.join(patient_folder, filename)
with open(image_path, "wb") as buffer:
buffer.write(await image.read())
image_url = f"{BASE_URL}/uploads/patients/{patient_id}/{filename}"
with db as session:
query = text("UPDATE patients SET profile_photo_path = :image_url WHERE patient_id = :patient_id")
session.execute(query, {'image_url': image_url, 'patient_id': patient_id})
session.commit()
return JSONResponse(content={
'message': 'Image stored successfully',
'image_url': image_url
}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/get_image")
async def get_image(patient_id: str, db: SessionLocal = Depends(get_db)):
if not patient_id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Patient ID parameter is required')
try:
with db as session:
query = text("SELECT profile_photo_path FROM patients WHERE patient_id = :patient_id")
result = session.execute(query, {'patient_id': patient_id}).fetchone()
if result and result.profile_photo_path:
image_url = result.profile_photo_path
filename = image_url.split('/')[-1]
file_path = os.path.join(UPLOAD_FOLDER, 'patients', patient_id, filename)
if os.path.exists(file_path):
return FileResponse(file_path)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image file not found on server')
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image not found for the given patient ID')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/store_wound_image")
async def store_wound_image(patient_id: str = Form(...), image: UploadFile = File(...), db: SessionLocal = Depends(get_db)):
if not allowed_file(image.filename):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type')
filename = secure_filename(image.filename)
try:
patient_folder = os.path.join(UPLOAD_FOLDER, 'wounds', patient_id)
os.makedirs(patient_folder, exist_ok=True)
image_path = os.path.join(patient_folder, filename)
with open(image_path, "wb") as buffer:
buffer.write(await image.read())
image_url = f"{BASE_URL}/uploads/wounds/{patient_id}/{filename}"
with db as session:
query = text("UPDATE wounds SET image = :image_url WHERE patient_id = :patient_id")
session.execute(query, {'image_url': image_url, 'patient_id': patient_id})
session.commit()
return JSONResponse(content={
'message': 'Image stored successfully',
'image_url': image_url
}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/get_wound_image")
async def get_wound_image(patient_id: str, db: SessionLocal = Depends(get_db)):
if not patient_id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Patient ID parameter is required')
try:
with db as session:
query = text("SELECT image FROM wounds WHERE patient_id = :patient_id")
result = session.execute(query, {'patient_id': patient_id}).fetchone()
if result and result.image:
image_url = result.image
filename = image_url.split('/')[-1]
file_path = os.path.join(UPLOAD_FOLDER, 'wounds', patient_id, filename)
if os.path.exists(file_path):
return FileResponse(file_path)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image file not found on server')
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image not found for the given patient ID')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/store_med_image")
async def store_med_image(email: str = Form(...), image: UploadFile = File(...), db: SessionLocal = Depends(get_db)):
if not allowed_file(image.filename):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type')
filename = secure_filename(image.filename)
try:
med_practitioner_folder = os.path.join(UPLOAD_FOLDER, 'medical_practitioners', email)
os.makedirs(med_practitioner_folder, exist_ok=True)
image_path = os.path.join(med_practitioner_folder, filename)
with open(image_path, "wb") as buffer:
buffer.write(await image.read())
image_url = f"{BASE_URL}/uploads/medical_practitioners/{email}/{filename}"
with db as session:
query = text("UPDATE users SET profile_photo_path = :image_url WHERE email = :email")
session.execute(query, {'image_url': image_url, 'email': email})
session.commit()
return JSONResponse(content={'message': 'Image stored and path updated successfully', 'image_url': image_url}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/get_med_image")
async def get_med_image(email: str, db: SessionLocal = Depends(get_db)):
if not email:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email parameter is required')
try:
with db as session:
query = text("SELECT profile_photo_path FROM users WHERE email = :email")
result = session.execute(query, {'email': email}).fetchone()
if result and result.profile_photo_path:
image_url = result.profile_photo_path
filename = image_url.split('/')[-1]
file_path = os.path.join(UPLOAD_FOLDER, 'medical_practitioners', email, filename)
if os.path.exists(file_path):
return FileResponse(file_path)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image file not found on server')
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image not found for the given email')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/store_org_image")
async def store_org_image(email: str = Form(...), image: UploadFile = File(...), db: SessionLocal = Depends(get_db)):
if not allowed_file(image.filename):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type')
filename = secure_filename(image.filename)
try:
org_folder = os.path.join(UPLOAD_FOLDER, 'organisations', email)
os.makedirs(org_folder, exist_ok=True)
image_path = os.path.join(org_folder, filename)
with open(image_path, "wb") as buffer:
buffer.write(await image.read())
image_url = f"{BASE_URL}/uploads/organisations/{email}/{filename}"
with db as session:
query = text("UPDATE organisations SET profile_photo_path = :image_url WHERE email = :email")
session.execute(query, {'image_url': image_url, 'email': email})
session.commit()
return JSONResponse(content={'message': 'Image stored and path updated successfully', 'image_url': image_url}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/get_org_image")
async def get_org_image(email: str, db: SessionLocal = Depends(get_db)):
if not email:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email parameter is required')
try:
with db as session:
query = text("SELECT profile_photo_path FROM organisations WHERE email = :email")
result = session.execute(query, {'email': email}).fetchone()
if result and result.profile_photo_path:
image_url = result.profile_photo_path
filename = image_url.split('/')[-1]
file_path = os.path.join(UPLOAD_FOLDER, 'organisations', email, filename)
if os.path.exists(file_path):
return FileResponse(file_path)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image file not found on server')
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Image not found for the given email')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/save_notes")
async def save_notes(request_data: SaveNotesRequest, db: SessionLocal = Depends(get_db)):
patient_id = request_data.patient_id
notes = request_data.notes
try:
with db as session:
query = text("UPDATE patients SET notes = :notes WHERE patient_id = :patient_id")
session.execute(query, {'notes': notes, 'patient_id': patient_id})
session.commit()
return JSONResponse(content={'message': 'Notes saved successfully'}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/total_appointments")
async def total_appointments(request_data: TotalAppointmentsRequest, db: SessionLocal = Depends(get_db)):
start_date_str = request_data.start_date
end_date_str = request_data.end_date
doctor = request_data.doctor
try:
start_date = datetime.datetime.strptime(start_date_str, '%Y-%m-%d')
end_date = datetime.datetime.strptime(end_date_str, '%Y-%m-%d')
with db as session:
query = text("""
SELECT DATE(scheduled_date) as date, COUNT(*) as total_appointments
FROM patients
WHERE scheduled_date BETWEEN :start_date AND :end_date
AND doctor = :doctor
GROUP BY DATE(scheduled_date)
ORDER BY DATE(scheduled_date)
""")
results = session.execute(query, {
'start_date': start_date,
'end_date': end_date,
'doctor': doctor
}).fetchall()
appointments_by_day = {}
for row in results:
date_str = row.date.strftime('%Y-%m-%d')
appointments_by_day[date_str] = row.total_appointments
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime('%Y-%m-%d')
if date_str not in appointments_by_day:
appointments_by_day[date_str] = 0
current_date += datetime.timedelta(days=1)
return JSONResponse(content={'appointments_by_day': appointments_by_day}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/admin_add_practitioner")
async def admin_add_practitioner(request_data: AdminAddPractitionerRequest, db: SessionLocal = Depends(get_db)):
name = request_data.name
email = request_data.email
c_code = request_data.c_code
phone = request_data.phone
org = request_data.org
try:
with db as session:
query = text("SELECT email FROM users WHERE email = :email")
existing_email = session.execute(query, {'email': email}).fetchone()
query = text("SELECT phone FROM users WHERE phone = :phone")
existing_phone = session.execute(query, {'phone': phone}).fetchone()
if existing_email:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email already exists. Please login.')
elif existing_phone:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Phone number already exists. Please use another phone number.')
else:
uuid_val = generate_session_id()
license_key = generate_license_key()
query = text("INSERT INTO users (name, email, c_code, phone, uuid, licence_key, org) VALUES (:name, :email, :c_code, :phone, :uuid, :license_key, :org)")
session.execute(query, {
'name': name,
'email': email,
'c_code': c_code,
'phone': phone,
'uuid': uuid_val,
'license_key': license_key,
'org': org
})
session.commit()
return JSONResponse(content={'message': 'Data added successfully'}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/update_org_profile")
async def update_org_profile(request_data: UpdateOrgProfileRequest, db: SessionLocal = Depends(get_db)):
name = request_data.name
department = request_data.department
about = request_data.about
location = request_data.location
latitude = request_data.latitude
longitude = request_data.longitude
email = request_data.email
if email is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email is required')
try:
with db as session:
query = text("SELECT email FROM organisations WHERE email = :email")
existing_org = session.execute(query, {'email': email}).fetchone()
if not existing_org:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Organisation not found')
update_query = text("""
UPDATE organisations
SET name = :name,
departments = :department,
about = :about,
location = :location,
latitude = :latitude,
longitude = :longitude
WHERE email = :email
""")
session.execute(update_query, {
'name': name,
'department': department,
'about': about,
'location': location,
'latitude': latitude,
'longitude': longitude,
'email': email
})
session.commit()
return JSONResponse(content={'message': 'Organisation profile updated successfully'}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/update_med_profile")
async def update_med_profile(request_data: UpdateMedProfileRequest, db: SessionLocal = Depends(get_db)):
name = request_data.name
department = request_data.department
about = request_data.about
location = request_data.location
latitude = request_data.latitude
longitude = request_data.longitude
email = request_data.email
if email is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email is required')
try:
with db as session:
query = text("SELECT email FROM users WHERE email = :email")
existing_user = session.execute(query, {'email': email}).fetchone()
if not existing_user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Medical user not found')
update_query = text("""
UPDATE users
SET name = :name,
departments = :department,
about = :about,
location = :location,
latitude = :latitude,
longitude = :longitude
WHERE email = :email
""")
session.execute(update_query, {
'name': name,
'department': department,
'about': about,
'location': location,
'latitude': latitude,
'longitude': longitude,
'email': email
})
session.commit()
return JSONResponse(content={'message': 'Medical profile updated successfully'}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/add_wound_details_v2")
async def add_wound_details_v2(request_data: AddWoundDetailsV2Request = Depends(), image: UploadFile = File(...), db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
length = request_data.length
breadth = request_data.breadth
depth = request_data.depth
area = request_data.area
moisture = request_data.moisture
wound_location = request_data.wound_location
tissue = request_data.tissue
exudate = request_data.exudate
periwound = request_data.periwound
periwound_type = request_data.periwound_type
patient_id = request_data.patient_id
type = request_data.type
category = request_data.category
edge = request_data.edge
infection = request_data.infection
last_dressing_date = request_data.last_dressing_date
updated_at = datetime.datetime.utcnow()
if not allowed_file(image.filename):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type')
filename = secure_filename(image.filename)
try:
with db as session:
query = text("SELECT area, id, created_at FROM wounds WHERE patient_id = :patient_id")
result = session.execute(query, {'patient_id': patient_id}).fetchone()
if not result:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Patient not found')
existing_area = result[0]
if not existing_area:
existing_area = 0
wound_id = result[1]
created_at = result[2]
area_difference = float(area) - float(existing_area)
if area_difference > 0:
size_variation = 'wound area increased'
elif area_difference < 0:
size_variation = 'wound area reduced'
else:
size_variation = 'wound area same'
patient_folder = os.path.join(UPLOAD_FOLDER, 'wounds', patient_id)
os.makedirs(patient_folder, exist_ok=True)
image_path = os.path.join(patient_folder, filename)
with open(image_path, "wb") as buffer:
buffer.write(await image.read())
image_url = f"{BASE_URL}/uploads/wounds/{patient_id}/{filename}"
query1 = text("""
INSERT INTO wound_images (depth, width, height, uuid, updated_at, patient_id, size_variation, image, wound_id, created_at, area)
VALUES (:depth, :breadth, :length, :uuid, :updated_at, :patient_id, :size_variation, :image_url, :wound_id, :created_at, :area)
""")
session.execute(query1, {
'depth': depth, 'breadth': breadth, 'length': length, 'uuid': generate_session_id(),
'updated_at': updated_at, 'patient_id': patient_id, 'size_variation': size_variation,
'image_url': image_url, 'wound_id': wound_id, 'created_at': created_at, 'area': area
})
query2 = text("""
UPDATE wounds SET height = :length, width = :breadth, depth = :depth, area = :area,
moisture = :moisture, position = :wound_location, tissue = :tissue, exudate = :exudate,
periwound = :periwound, periwound_type = :periwound_type, type = :type, category = :category,
edges = :edge, infection = :infection, updated_at = :updated_at, last_dressing = :last_dressing_date,
image = :image_url
WHERE patient_id = :patient_id
""")
session.execute(query2, {
'length': length, 'breadth': breadth, 'depth': depth, 'area': area,
'moisture': moisture, 'wound_location': wound_location, 'tissue': tissue,
'exudate': exudate, 'periwound': periwound, 'periwound_type': periwound_type,
'type': type, 'category': category, 'edge': edge, 'infection': infection,
'updated_at': updated_at, 'last_dressing_date': last_dressing_date, 'patient_id': patient_id,
'image_url': image_url
})
session.commit()
return JSONResponse(content={
'message': 'Wound details and image stored successfully',
'image_url': image_url
}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/wound_progress_timeline")
async def get_wound_details_v2(patient_id: str, db: SessionLocal = Depends(get_db)):
if not patient_id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='patient_id is required')
try:
with db as session:
query = text("""
SELECT * FROM wound_images WHERE patient_id = :patient_id ORDER BY updated_at DESC """)
results = session.execute(query, {'patient_id': patient_id}).fetchall()
wound_details = []
for row in results:
wound_details.append({
'length': row.height,
'breadth': row.width,
'depth': row.depth,
'size_variation': row.size_variation,
'image': row.image,
'patient_id': row.patient_id,
'area': row.area,
'updated_at': row.updated_at.isoformat()
})
return JSONResponse(content=wound_details, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/med/forgot/pin/otp")
async def med_forgot_pin_otp(request_data: SendOtpRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
phone = request_data.phone
try:
with db as session:
query = text("SELECT * FROM users WHERE phone = :phone")
user = session.execute(query, {'phone': phone}).fetchone()
if user:
phone_with_code = user.c_code + user.phone
otp = "1234" # generate_otp()
# send_sms(phone_with_code, otp)
updated_at = datetime.datetime.utcnow()
expiry_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
update_otp_in_med_database(session, phone, otp, expiry_time, updated_at)
return JSONResponse(content={'status': 200, 'message': 'OTP Sent on mobile.', 'otp': otp}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='OOPS! Phone Does Not Exist!')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/org/forgot/pin/otp")
async def org_forgot_pin_otp(request_data: SendOtpRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
phone = request_data.phone
try:
with db as session:
query = text("SELECT * FROM organisations WHERE phone = :phone")
user = session.execute(query, {'phone': phone}).fetchone()
if user:
phone_with_code = user.c_code + user.phone
otp = "1234" # generate_otp()
# send_sms(phone_with_code, otp)
updated_at = datetime.datetime.utcnow()
expiry_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
update_otp_in_database(session, phone, otp, expiry_time, updated_at)
return JSONResponse(content={'status': 200, 'message': 'OTP Sent on mobile.', 'otp': otp}, status_code=status.HTTP_200_OK)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='OOPS! Phone Does Not Exist!')
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/update_scheduled_date_v2")
async def update_scheduled_date_v2(request_data: UpdateScheduledDateRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
email = request_data.email
patient_id = request_data.patient_id
scheduled_date = request_data.scheduled_date
try:
with db as session:
query = text("UPDATE patients SET scheduled_date = :scheduled_date WHERE email = :email AND patient_id = :patient_id")
result = session.execute(query, {'scheduled_date': scheduled_date, 'email': email, 'patient_id': patient_id})
session.commit()
if result.rowcount == 0:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='No matching record found')
return JSONResponse(content={'message': 'Scheduled date updated successfully'}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.get("/total_appointments_v2")
async def total_appointments_v2(start_date: str, end_date: str, email: str, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
try:
start_date_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
end_date_dt = datetime.datetime.strptime(end_date, '%Y-%m-%d')
with db as session:
query = text("""
SELECT DATE(scheduled_date) as date, COUNT(*) as total_appointments
FROM patients
WHERE scheduled_date BETWEEN :start_date AND :end_date
AND email = :email
GROUP BY DATE(scheduled_date)
ORDER BY DATE(scheduled_date)
""")
results = session.execute(query, {
'start_date': start_date_dt,
'end_date': end_date_dt,
'email': email
}).fetchall()
appointments_by_day = {}
for row in results:
date_str = row.date.strftime('%Y-%m-%d')
appointments_by_day[date_str] = row.total_appointments
current_date = start_date_dt
while current_date <= end_date_dt:
date_str = current_date.strftime('%Y-%m-%d')
if date_str not in appointments_by_day:
appointments_by_day[date_str] = 0
current_date += datetime.timedelta(days=1)
return JSONResponse(content={'appointments_by_day': appointments_by_day}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/add_patient_v2")
async def add_patient_v2(request_data: AddPatientV2Request, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
name = request_data.name
dob = request_data.dob
gender = request_data.gender
age = request_data.age
height = request_data.height
weight = request_data.weight
email = request_data.email
doctor = request_data.doctor
role = request_data.role
patient_id = generate_patient_id()
created_at = datetime.datetime.utcnow()
updated_at = datetime.datetime.utcnow()
scheduled_date = datetime.datetime.utcnow()
try:
with db as session:
uuid_val = generate_session_id()
if role == "3":
user_query = text("SELECT id FROM users WHERE email = :email AND name = :doctor")
user_result = session.execute(user_query, {'email': email, 'doctor': doctor}).fetchone()
if not user_result:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Doctor not found')
doctor_id = user_result.id
pat_query = text("INSERT INTO patients (name, dob, gender, age, height, weight, email, doctor, added_by, uuid, patient_id, created_at, updated_at, scheduled_date) VALUES (:name, :dob, :gender, :age, :height, :weight, :email, :doctor_id, :doctor, :uuid, :patient_id, :created_at, :updated_at, :scheduled_date)")
session.execute(pat_query, {'name': name, 'dob': dob, 'gender': gender, 'age': age, 'height': height, 'weight': weight, 'email': email, 'doctor_id': doctor_id, 'doctor': doctor, 'uuid': uuid_val, 'patient_id': patient_id, 'created_at': created_at, 'updated_at': updated_at, 'scheduled_date': scheduled_date})
wound_query = text("INSERT INTO wounds (uuid, patient_id, created_at, updated_at) VALUES (:uuid, :patient_id, :created_at, :updated_at)")
session.execute(wound_query, {'uuid': uuid_val, 'patient_id': patient_id, 'created_at': created_at, 'updated_at': updated_at})
elif role == "5":
org_query = text("SELECT id FROM organisations WHERE email = :email AND name = :doctor")
org_result = session.execute(org_query, {'email': email, 'doctor': doctor}).fetchone()
if not org_result:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Organisation not found')
org_id = org_result.id
pat_query = text("INSERT INTO patients (name, dob, gender, age, height, weight, email, org, added_by, uuid, patient_id, created_at, updated_at, scheduled_date) VALUES (:name, :dob, :gender, :age, :height, :weight, :email, :org_id, :doctor, :uuid, :patient_id, :created_at, :updated_at, :scheduled_date)")
session.execute(pat_query, {'name': name, 'dob': dob, 'gender': gender, 'age': age, 'height': height, 'weight': weight, 'email': email, 'org_id': org_id, 'doctor': doctor, 'uuid': uuid_val, 'patient_id': patient_id, 'created_at': created_at, 'updated_at': updated_at, 'scheduled_date': scheduled_date})
wound_query = text("INSERT INTO wounds (uuid, patient_id, created_at, updated_at) VALUES (:uuid, :patient_id, :created_at, :updated_at)")
session.execute(wound_query, {'uuid': uuid_val, 'patient_id': patient_id, 'created_at': created_at, 'updated_at': updated_at})
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid role')
session.commit()
return JSONResponse(content={'message': 'Patient added successfully', 'patient_id': patient_id}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/save_notes_v2")
async def save_notes_v2(request_data: SaveNotesV2Request, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
patient_id = request_data.patient_id
notes = request_data.notes
remarks = request_data.remarks
try:
with db as session:
query = text("UPDATE patients SET notes = :notes, remarks = :remarks WHERE patient_id = :patient_id")
session.execute(query, {'notes': notes, 'remarks': remarks, 'patient_id': patient_id})
session.commit()
return JSONResponse(content={'message': 'Notes saved successfully'}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/admin_add_practitioner_v2")
async def admin_add_practitioner_v2(request_data: AdminAddPractitionerV2Request, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
name = request_data.name
email = request_data.email
c_code = request_data.c_code
phone = request_data.phone
org_email = request_data.org_email
created_at = datetime.datetime.utcnow()
updated_at = datetime.datetime.utcnow()
try:
with db as session:
query = text("SELECT email FROM users WHERE email = :email")
existing_email = session.execute(query, {'email': email}).fetchone()
query = text("SELECT phone FROM users WHERE phone = :phone")
existing_phone = session.execute(query, {'phone': phone}).fetchone()
if existing_email:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email already exists. Please login.')
elif existing_phone:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Phone number already exists. Please use another phone number.')
else:
org_query = text("SELECT id FROM organisations WHERE email = :org_email")
org_result = session.execute(org_query, {'org_email': org_email}).fetchone()
if not org_result:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Organisation not found')
org_id = org_result.id
uuid_val = generate_session_id()
license_key = generate_license_key()
query = text("INSERT INTO users (name, email, c_code, phone, uuid, licence_key, org, updated_at, created_at) VALUES (:name, :email, :c_code, :phone, :uuid, :license_key, :org_id, :updated_at, :created_at)")
session.execute(query, {
'name': name,
'email': email,
'c_code': c_code,
'phone': phone,
'uuid': uuid_val,
'license_key': license_key,
'org_id': org_id,
'updated_at': updated_at,
'created_at': created_at
})
session.commit()
return JSONResponse(content={'message': 'Data added successfully'}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/sort_patients")
async def sort_patients(request_data: SortPatientsRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
email = request_data.email
date_str = request_data.date
try:
with db as session:
if date_str:
appointment_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date()
query = text("""
SELECT * FROM patients WHERE scheduled_date = :appointment_date AND email = :email
""")
results = session.execute(query, {'appointment_date': appointment_date, 'email': email}).fetchall()
else:
query = text("""
SELECT * FROM patients WHERE email = :email
""")
results = session.execute(query, {'email': email}).fetchall()
patient_dicts = [dict(row._mapping) for row in results]
return JSONResponse(content={'patients': patient_dicts}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/get_appointment_count")
async def get_appointment_count(request_data: GetAppointmentCountRequest, db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
email = request_data.email
date_str = request_data.date
try:
appointment_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date()
with db as session:
query = text("""
SELECT COUNT(*) as count FROM patients WHERE scheduled_date = :appointment_date AND email = :email
""")
result = session.execute(query, {'appointment_date': appointment_date, 'email': email}).fetchone()
total_appointments = result.count if result else 0
return JSONResponse(content={
'title': f'The total number of appointments booked is {total_appointments}',
'description': f'The appointments are scheduled for {appointment_date}'
}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.post("/add_wound_detail")
async def add_wound_details_v3(request_data: AddWoundDetailsV3Request = Depends(), image: UploadFile = File(..., alias="image"), api_image: UploadFile = File(..., alias="api_image"), db: SessionLocal = Depends(get_db), payload: dict = Depends(verify_jwt_token)):
length = request_data.length
breadth = request_data.breadth
depth = request_data.depth
area = request_data.area
moisture = request_data.moisture
wound_location = request_data.wound_location
tissue = request_data.tissue
exudate = request_data.exudate
periwound = request_data.periwound
periwound_type = request_data.periwound_type
patient_id = request_data.patient_id
type = request_data.type
category = request_data.category
edge = request_data.edge
infection = request_data.infection
last_dressing_date = request_data.last_dressing_date
updated_at = datetime.datetime.utcnow()
if not allowed_file(image.filename) or not allowed_file(api_image.filename):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid file type for one or both images')
normal_image_filename = secure_filename(image.filename)
api_image_filename = secure_filename(api_image.filename)
try:
with db as session:
query = text("SELECT area, id, created_at FROM wounds WHERE patient_id = :patient_id")
result = session.execute(query, {'patient_id': patient_id}).fetchone()
if not result:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Patient not found')
existing_area = result[0]
if not existing_area:
existing_area = 0
wound_id = result[1]
created_at = result[2]
area_difference = float(area) - float(existing_area)
if area_difference > 0:
size_variation = 'wound area increased'
elif area_difference < 0:
size_variation = 'wound area reduced'
else:
size_variation = 'wound area same'
patient_folder = os.path.join(UPLOAD_FOLDER, 'wounds', patient_id)
os.makedirs(patient_folder, exist_ok=True)
normal_image_path = os.path.join(patient_folder, normal_image_filename)
with open(normal_image_path, "wb") as buffer:
buffer.write(await image.read())
api_patient_folder = os.path.join(UPLOAD_FOLDER, 'assessed_wounds', patient_id)
os.makedirs(api_patient_folder, exist_ok=True)
api_image_path = os.path.join(api_patient_folder, api_image_filename)
with open(api_image_path, "wb") as buffer:
buffer.write(await api_image.read())
image_url = f"{BASE_URL}/uploads/wounds/{patient_id}/{normal_image_filename}"
api_image_url = f"{BASE_URL}/uploads/assessed_wounds/{patient_id}/{api_image_filename}"
query1 = text("""
INSERT INTO wound_images (depth, width, height, uuid, updated_at, patient_id, size_variation, image, wound_id, created_at, area, photo_assessment)
VALUES (:depth, :breadth, :length, :uuid, :updated_at, :patient_id, :size_variation, :image_url, :wound_id, :created_at, :area, :api_image_url)
""")
session.execute(query1, {
'depth': depth, 'breadth': breadth, 'length': length, 'uuid': generate_session_id(),
'updated_at': updated_at, 'patient_id': patient_id, 'size_variation': size_variation,
'image_url': image_url, 'wound_id': wound_id, 'created_at': created_at, 'area': area,
'api_image_url': api_image_url
})
query2 = text("""
UPDATE wounds SET height = :length, width = :breadth, depth = :depth, area = :area,
moisture = :moisture, position = :wound_location, tissue = :tissue, exudate = :exudate,
periwound = :periwound, periwound_type = :periwound_type, type = :type, category = :category,
edges = :edge, infection = :infection, updated_at = :updated_at, last_dressing = :last_dressing_date,
image = :image_url, photo_assessment = :api_image_url
WHERE patient_id = :patient_id
""")
session.execute(query2, {
'length': length, 'breadth': breadth, 'depth': depth, 'area': area,
'moisture': moisture, 'wound_location': wound_location, 'tissue': tissue,
'exudate': exudate, 'periwound': periwound, 'periwound_type': periwound_type,
'type': type, 'category': category, 'edge': edge, 'infection': infection,
'updated_at': updated_at, 'last_dressing_date': last_dressing_date, 'patient_id': patient_id,
'image_url': image_url, 'api_image_url': api_image_url
})
session.commit()
return JSONResponse(content={
'message': 'Wound details and images stored successfully',
'image_url': image_url,
'photo_assessment': api_image_url
}, status_code=status.HTTP_200_OK)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))