|
|
import sqlite3 |
|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
import time |
|
|
from datetime import datetime, timedelta |
|
|
import random |
|
|
import string |
|
|
import os |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_db_connection(): |
|
|
"""Create a database connection with proper error handling""" |
|
|
try: |
|
|
conn = sqlite3.connect("hospital.db", check_same_thread=False) |
|
|
conn.row_factory = sqlite3.Row |
|
|
return conn |
|
|
except sqlite3.Error as e: |
|
|
print(f"Database connection error: {e}") |
|
|
return None |
|
|
|
|
|
conn = get_db_connection() |
|
|
cursor = conn.cursor() |
|
|
|
|
|
|
|
|
cursor.execute('''CREATE TABLE IF NOT EXISTS doctors ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
name TEXT NOT NULL, |
|
|
specialty TEXT, |
|
|
avg_consultation_time INTEGER DEFAULT 15, -- in minutes |
|
|
available BOOLEAN DEFAULT 1 |
|
|
)''') |
|
|
|
|
|
cursor.execute('''CREATE TABLE IF NOT EXISTS patients ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
name TEXT NOT NULL, |
|
|
phone TEXT NOT NULL, |
|
|
email TEXT, |
|
|
symptoms TEXT, |
|
|
priority INTEGER DEFAULT 3, -- 1=emergency, 2=urgent, 3=normal |
|
|
doctor_id INTEGER, |
|
|
queue_number INTEGER, |
|
|
token TEXT UNIQUE, |
|
|
registration_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
|
|
estimated_start_time TIMESTAMP, |
|
|
status TEXT DEFAULT 'Waiting', |
|
|
completed_time TIMESTAMP, |
|
|
notes TEXT, |
|
|
FOREIGN KEY (doctor_id) REFERENCES doctors(id) |
|
|
)''') |
|
|
|
|
|
cursor.execute('''CREATE TABLE IF NOT EXISTS appointments ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
patient_name TEXT NOT NULL, |
|
|
phone TEXT NOT NULL, |
|
|
email TEXT, |
|
|
doctor_id INTEGER, |
|
|
appointment_date TEXT, |
|
|
appointment_time TEXT, |
|
|
reason TEXT, |
|
|
status TEXT DEFAULT 'Scheduled', |
|
|
FOREIGN KEY (doctor_id) REFERENCES doctors(id) |
|
|
)''') |
|
|
|
|
|
cursor.execute('''CREATE TABLE IF NOT EXISTS notifications ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
patient_id INTEGER, |
|
|
message TEXT, |
|
|
sent_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
|
|
FOREIGN KEY (patient_id) REFERENCES patients(id) |
|
|
)''') |
|
|
|
|
|
conn.commit() |
|
|
|
|
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM doctors") |
|
|
if cursor.fetchone()[0] == 0: |
|
|
doctors = [ |
|
|
("Dr. Smith", "General Medicine", 15, 1), |
|
|
("Dr. Lee", "Pediatrics", 20, 1), |
|
|
("Dr. Patel", "Cardiology", 25, 1), |
|
|
("Dr. Johnson", "Orthopedics", 20, 1), |
|
|
("Dr. Garcia", "Dermatology", 15, 1) |
|
|
] |
|
|
cursor.executemany("INSERT INTO doctors (name, specialty, avg_consultation_time, available) VALUES (?, ?, ?, ?)", doctors) |
|
|
conn.commit() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_token(): |
|
|
"""Generate a unique alphanumeric token for patients""" |
|
|
token = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6)) |
|
|
return token |
|
|
|
|
|
def calculate_wait_time(doctor_id, queue_position): |
|
|
"""Calculate estimated wait time based on doctor's avg consultation time and queue position""" |
|
|
cursor.execute("SELECT avg_consultation_time FROM doctors WHERE id=?", (doctor_id,)) |
|
|
avg_time = cursor.fetchone()[0] |
|
|
|
|
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM patients WHERE doctor_id=? AND status='In Consultation'", (doctor_id,)) |
|
|
in_consultation = cursor.fetchone()[0] |
|
|
|
|
|
|
|
|
wait_minutes = (queue_position - 1 + in_consultation) * avg_time |
|
|
|
|
|
|
|
|
variation = random.uniform(0.8, 1.2) |
|
|
wait_minutes = int(wait_minutes * variation) |
|
|
|
|
|
return wait_minutes |
|
|
|
|
|
def format_wait_time(minutes): |
|
|
"""Format wait time into hours and minutes""" |
|
|
if minutes < 60: |
|
|
return f"{minutes} minutes" |
|
|
hours = minutes // 60 |
|
|
mins = minutes % 60 |
|
|
return f"{hours} hour{'s' if hours > 1 else ''} {mins} minutes" |
|
|
|
|
|
def update_all_wait_times(): |
|
|
"""Update estimated wait times for all waiting patients""" |
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT p.id, p.doctor_id, p.queue_number |
|
|
FROM patients p |
|
|
WHERE p.status='Waiting' |
|
|
ORDER BY p.doctor_id, p.queue_number |
|
|
""") |
|
|
waiting_patients = cursor.fetchall() |
|
|
|
|
|
|
|
|
for doctor_id in set([p[1] for p in waiting_patients]): |
|
|
|
|
|
doctor_patients = [p for p in waiting_patients if p[1] == doctor_id] |
|
|
|
|
|
|
|
|
for patient in doctor_patients: |
|
|
wait_minutes = calculate_wait_time(doctor_id, patient[2]) |
|
|
estimated_time = datetime.now() + timedelta(minutes=wait_minutes) |
|
|
cursor.execute( |
|
|
"UPDATE patients SET estimated_start_time=? WHERE id=?", |
|
|
(estimated_time.strftime('%Y-%m-%d %H:%M:%S'), patient[0]) |
|
|
) |
|
|
|
|
|
conn.commit() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def register_patient(name, phone, email, symptoms, priority, doctor_name): |
|
|
"""Register a new patient with improved data validation and wait time estimation""" |
|
|
|
|
|
if not name or not phone or not doctor_name: |
|
|
return "Error: Name, phone number, and doctor selection are required." |
|
|
|
|
|
try: |
|
|
|
|
|
cursor.execute("SELECT id, available FROM doctors WHERE name=?", (doctor_name,)) |
|
|
doctor_result = cursor.fetchone() |
|
|
|
|
|
if not doctor_result: |
|
|
return f"Error: Doctor {doctor_name} not found." |
|
|
|
|
|
doctor_id, is_available = doctor_result |
|
|
|
|
|
if not is_available: |
|
|
return f"Sorry, {doctor_name} is currently not available. Please select another doctor." |
|
|
|
|
|
|
|
|
cursor.execute("SELECT id FROM patients WHERE phone=? AND status IN ('Waiting', 'In Consultation')", (phone,)) |
|
|
if cursor.fetchone(): |
|
|
return "This phone number is already registered in the active queue." |
|
|
|
|
|
|
|
|
cursor.execute("SELECT MAX(queue_number) FROM patients WHERE doctor_id=?", (doctor_id,)) |
|
|
last_queue = cursor.fetchone()[0] |
|
|
next_queue = 1 if last_queue is None else last_queue + 1 |
|
|
|
|
|
|
|
|
token = generate_token() |
|
|
while True: |
|
|
|
|
|
cursor.execute("SELECT id FROM patients WHERE token=?", (token,)) |
|
|
if not cursor.fetchone(): |
|
|
break |
|
|
token = generate_token() |
|
|
|
|
|
|
|
|
wait_minutes = calculate_wait_time(doctor_id, next_queue) |
|
|
estimated_time = datetime.now() + timedelta(minutes=wait_minutes) |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
INSERT INTO patients |
|
|
(name, phone, email, symptoms, priority, doctor_id, queue_number, token, estimated_start_time, status) |
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
|
|
""", (name, phone, email, symptoms, priority, doctor_id, next_queue, token, |
|
|
estimated_time.strftime('%Y-%m-%d %H:%M:%S'), 'Waiting')) |
|
|
conn.commit() |
|
|
|
|
|
|
|
|
patient_id = cursor.lastrowid |
|
|
|
|
|
|
|
|
notification_msg = f"Welcome {name}! You are registered with {doctor_name}. Your token is {token} and queue number is {next_queue}." |
|
|
cursor.execute("INSERT INTO notifications (patient_id, message) VALUES (?, ?)", |
|
|
(patient_id, notification_msg)) |
|
|
conn.commit() |
|
|
|
|
|
|
|
|
update_all_wait_times() |
|
|
|
|
|
|
|
|
wait_time_str = format_wait_time(wait_minutes) |
|
|
return f"Registered successfully!\n\nYour token: {token}\nQueue number: {next_queue}\nDoctor: {doctor_name}\nEstimated wait time: {wait_time_str}" |
|
|
|
|
|
except Exception as e: |
|
|
conn.rollback() |
|
|
return f"An error occurred: {str(e)}" |
|
|
|
|
|
def check_status(phone_or_token): |
|
|
"""Check patient status by phone or token""" |
|
|
if not phone_or_token: |
|
|
return "Please enter a phone number or token." |
|
|
|
|
|
try: |
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT |
|
|
p.id, p.name, p.token, p.queue_number, p.status, p.estimated_start_time, |
|
|
d.name as doctor_name, d.specialty |
|
|
FROM patients p |
|
|
JOIN doctors d ON p.doctor_id = d.id |
|
|
WHERE p.token=? AND p.status IN ('Waiting', 'In Consultation') |
|
|
LIMIT 1 |
|
|
""", (phone_or_token,)) |
|
|
|
|
|
result = cursor.fetchone() |
|
|
|
|
|
|
|
|
if not result: |
|
|
cursor.execute(""" |
|
|
SELECT |
|
|
p.id, p.name, p.token, p.queue_number, p.status, p.estimated_start_time, |
|
|
d.name as doctor_name, d.specialty |
|
|
FROM patients p |
|
|
JOIN doctors d ON p.doctor_id = d.id |
|
|
WHERE p.phone=? AND p.status IN ('Waiting', 'In Consultation') |
|
|
ORDER BY p.id DESC LIMIT 1 |
|
|
""", (phone_or_token,)) |
|
|
result = cursor.fetchone() |
|
|
|
|
|
if not result: |
|
|
return "No active registration found. If you've already completed your visit or haven't registered yet, please check with reception." |
|
|
|
|
|
|
|
|
estimated_time = datetime.strptime(result[5], '%Y-%m-%d %H:%M:%S') if result[5] else None |
|
|
now = datetime.now() |
|
|
|
|
|
if estimated_time and estimated_time > now: |
|
|
time_diff = estimated_time - now |
|
|
wait_minutes = int(time_diff.total_seconds() / 60) |
|
|
wait_str = format_wait_time(wait_minutes) |
|
|
else: |
|
|
wait_str = "You should be called soon" |
|
|
|
|
|
status_info = f""" |
|
|
Patient: {result[1]} |
|
|
Token: {result[2]} |
|
|
Queue #: {result[3]} |
|
|
Status: {result[4]} |
|
|
Doctor: {result[6]} ({result[7]}) |
|
|
""" |
|
|
|
|
|
if result[4] == 'Waiting': |
|
|
status_info += f"Estimated wait: {wait_str}" |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT message FROM notifications |
|
|
WHERE patient_id=? |
|
|
ORDER BY sent_time DESC LIMIT 1 |
|
|
""", (result[0],)) |
|
|
|
|
|
notification = cursor.fetchone() |
|
|
if notification: |
|
|
status_info += f"\n\nNotification: {notification[0]}" |
|
|
|
|
|
return status_info |
|
|
|
|
|
except Exception as e: |
|
|
return f"Error checking status: {str(e)}" |
|
|
|
|
|
def get_doctor_queue(doctor_name): |
|
|
"""Get current queue for a specific doctor""" |
|
|
try: |
|
|
cursor.execute("SELECT id FROM doctors WHERE name=?", (doctor_name,)) |
|
|
doctor_id = cursor.fetchone()[0] |
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT |
|
|
id, name, queue_number, priority, |
|
|
strftime('%H:%M', registration_time) as reg_time, |
|
|
status, phone, token |
|
|
FROM patients |
|
|
WHERE doctor_id=? AND status IN ('Waiting', 'In Consultation') |
|
|
ORDER BY |
|
|
CASE status |
|
|
WHEN 'In Consultation' THEN 0 |
|
|
ELSE 1 |
|
|
END, |
|
|
priority, queue_number |
|
|
""", (doctor_id,)) |
|
|
|
|
|
rows = cursor.fetchall() |
|
|
|
|
|
|
|
|
result = [] |
|
|
for row in rows: |
|
|
|
|
|
priority_text = {1: "Emergency", 2: "Urgent", 3: "Normal"}.get(row[3], "Normal") |
|
|
|
|
|
result.append([row[0], row[1], row[2], priority_text, row[4], row[5], row[6], row[7]]) |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error getting doctor queue: {e}") |
|
|
return [] |
|
|
|
|
|
def get_doctor_availability(): |
|
|
"""Get the list of available doctors with their specialties""" |
|
|
cursor.execute(""" |
|
|
SELECT id, name, specialty, available |
|
|
FROM doctors |
|
|
ORDER BY name |
|
|
""") |
|
|
doctors = cursor.fetchall() |
|
|
|
|
|
|
|
|
result = [] |
|
|
for doc in doctors: |
|
|
status = "Available" if doc[3] else "Unavailable" |
|
|
result.append([doc[0], doc[1], doc[2], status]) |
|
|
|
|
|
return result |
|
|
|
|
|
def toggle_doctor_availability(doctor_id): |
|
|
"""Toggle a doctor's availability status""" |
|
|
try: |
|
|
if not doctor_id: |
|
|
return "Please select a doctor." |
|
|
|
|
|
cursor.execute("SELECT available FROM doctors WHERE id=?", (doctor_id,)) |
|
|
current_status = cursor.fetchone()[0] |
|
|
|
|
|
|
|
|
new_status = 0 if current_status else 1 |
|
|
cursor.execute("UPDATE doctors SET available=? WHERE id=?", (new_status, doctor_id)) |
|
|
conn.commit() |
|
|
|
|
|
status_text = "available" if new_status else "unavailable" |
|
|
return f"Doctor status updated to {status_text}" |
|
|
|
|
|
except Exception as e: |
|
|
conn.rollback() |
|
|
return f"Error updating status: {str(e)}" |
|
|
|
|
|
def call_next(doctor_name): |
|
|
"""Call the next patient in queue with notifications""" |
|
|
try: |
|
|
cursor.execute("SELECT id FROM doctors WHERE name=?", (doctor_name,)) |
|
|
doctor_id = cursor.fetchone()[0] |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT id, name |
|
|
FROM patients |
|
|
WHERE doctor_id=? AND status='In Consultation' |
|
|
LIMIT 1 |
|
|
""", (doctor_id,)) |
|
|
current_patient = cursor.fetchone() |
|
|
|
|
|
if current_patient: |
|
|
return f"{current_patient[1]} is currently in consultation. Please complete their visit before calling the next patient." |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT id, name, token, queue_number, phone |
|
|
FROM patients |
|
|
WHERE doctor_id=? AND status='Waiting' |
|
|
ORDER BY priority, queue_number ASC |
|
|
LIMIT 1 |
|
|
""", (doctor_id,)) |
|
|
|
|
|
patient = cursor.fetchone() |
|
|
|
|
|
if not patient: |
|
|
return "No patients waiting." |
|
|
|
|
|
|
|
|
cursor.execute("UPDATE patients SET status='In Consultation' WHERE id=?", (patient[0],)) |
|
|
|
|
|
|
|
|
notification_msg = f"It's your turn! Please proceed to {doctor_name}'s room immediately." |
|
|
cursor.execute("INSERT INTO notifications (patient_id, message) VALUES (?, ?)", |
|
|
(patient[0], notification_msg)) |
|
|
|
|
|
conn.commit() |
|
|
|
|
|
|
|
|
update_all_wait_times() |
|
|
|
|
|
|
|
|
return f"Now calling: {patient[1]} (Token: {patient[2]}, Queue #: {patient[3]})" |
|
|
|
|
|
except Exception as e: |
|
|
conn.rollback() |
|
|
return f"Error calling next patient: {str(e)}" |
|
|
|
|
|
def complete_patient(patient_id, notes=""): |
|
|
"""Mark patient consultation as completed with optional notes""" |
|
|
try: |
|
|
if not patient_id: |
|
|
return "Please select a patient to mark as completed." |
|
|
|
|
|
|
|
|
cursor.execute( |
|
|
"UPDATE patients SET status='Completed', completed_time=?, notes=? WHERE id=?", |
|
|
(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), notes, patient_id) |
|
|
) |
|
|
|
|
|
|
|
|
cursor.execute("SELECT name, token FROM patients WHERE id=?", (patient_id,)) |
|
|
patient = cursor.fetchone() |
|
|
|
|
|
conn.commit() |
|
|
|
|
|
|
|
|
update_all_wait_times() |
|
|
|
|
|
return f"Patient {patient[0]} (Token: {patient[1]}) marked as completed." |
|
|
|
|
|
except Exception as e: |
|
|
conn.rollback() |
|
|
return f"Error completing patient visit: {str(e)}" |
|
|
|
|
|
def schedule_appointment(name, phone, email, doctor_name, date, time, reason): |
|
|
"""Schedule a future appointment""" |
|
|
try: |
|
|
|
|
|
if not name or not phone or not doctor_name or not date or not time: |
|
|
return "Error: Name, phone, doctor, date and time are required fields." |
|
|
|
|
|
|
|
|
cursor.execute("SELECT id FROM doctors WHERE name=?", (doctor_name,)) |
|
|
doctor_result = cursor.fetchone() |
|
|
|
|
|
if not doctor_result: |
|
|
return f"Error: Doctor {doctor_name} not found." |
|
|
|
|
|
doctor_id = doctor_result[0] |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT COUNT(*) FROM appointments |
|
|
WHERE doctor_id=? AND appointment_date=? AND appointment_time=? AND status='Scheduled' |
|
|
""", (doctor_id, date, time)) |
|
|
|
|
|
if cursor.fetchone()[0] > 0: |
|
|
return f"Sorry, {doctor_name} is already booked at {time} on {date}. Please select another time." |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
INSERT INTO appointments |
|
|
(patient_name, phone, email, doctor_id, appointment_date, appointment_time, reason, status) |
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, 'Scheduled') |
|
|
""", (name, phone, email, doctor_id, date, time, reason)) |
|
|
|
|
|
conn.commit() |
|
|
|
|
|
return f"Appointment scheduled successfully for {name} with {doctor_name} on {date} at {time}." |
|
|
|
|
|
except Exception as e: |
|
|
conn.rollback() |
|
|
return f"Error scheduling appointment: {str(e)}" |
|
|
|
|
|
def get_appointments(date=None, doctor_name=None): |
|
|
"""Get list of appointments by date and/or doctor""" |
|
|
try: |
|
|
query = """ |
|
|
SELECT |
|
|
a.id, a.patient_name, a.phone, a.appointment_time, |
|
|
d.name as doctor_name, a.reason, a.status |
|
|
FROM appointments a |
|
|
JOIN doctors d ON a.doctor_id = d.id |
|
|
WHERE 1=1 |
|
|
""" |
|
|
params = [] |
|
|
|
|
|
if date: |
|
|
query += " AND a.appointment_date = ?" |
|
|
params.append(date) |
|
|
|
|
|
if doctor_name and doctor_name != "All Doctors": |
|
|
query += " AND d.name = ?" |
|
|
params.append(doctor_name) |
|
|
|
|
|
query += " ORDER BY a.appointment_time" |
|
|
|
|
|
cursor.execute(query, params) |
|
|
appointments = cursor.fetchall() |
|
|
|
|
|
|
|
|
result = [] |
|
|
for appt in appointments: |
|
|
result.append([appt[0], appt[1], appt[2], appt[3], appt[4], appt[5], appt[6]]) |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error getting appointments: {e}") |
|
|
return [] |
|
|
|
|
|
def get_daily_stats(): |
|
|
"""Get daily statistics for the dashboard""" |
|
|
try: |
|
|
today = datetime.now().strftime('%Y-%m-%d') |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT COUNT(*) FROM patients |
|
|
WHERE date(registration_time) = ? |
|
|
""", (today,)) |
|
|
total_patients = cursor.fetchone()[0] |
|
|
|
|
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM patients WHERE status='Waiting'") |
|
|
waiting_patients = cursor.fetchone()[0] |
|
|
|
|
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM patients WHERE status='In Consultation'") |
|
|
in_consultation = cursor.fetchone()[0] |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT COUNT(*) FROM patients |
|
|
WHERE status='Completed' AND date(completed_time) = ? |
|
|
""", (today,)) |
|
|
completed_today = cursor.fetchone()[0] |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT AVG( |
|
|
(julianday(completed_time) - julianday(registration_time)) * 24 * 60 |
|
|
) FROM patients |
|
|
WHERE status='Completed' AND date(completed_time) = ? |
|
|
""", (today,)) |
|
|
avg_wait_time = cursor.fetchone()[0] |
|
|
avg_wait_formatted = f"{int(avg_wait_time)} minutes" if avg_wait_time else "N/A" |
|
|
|
|
|
return { |
|
|
"total_patients": total_patients, |
|
|
"waiting": waiting_patients, |
|
|
"in_consultation": in_consultation, |
|
|
"completed": completed_today, |
|
|
"avg_wait_time": avg_wait_formatted |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error getting stats: {e}") |
|
|
return { |
|
|
"total_patients": 0, |
|
|
"waiting": 0, |
|
|
"in_consultation": 0, |
|
|
"completed": 0, |
|
|
"avg_wait_time": "N/A" |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_doctor_names(): |
|
|
"""Get list of available doctor names""" |
|
|
cursor.execute("SELECT name FROM doctors WHERE available=1 ORDER BY name") |
|
|
doctors = [row[0] for row in cursor.fetchall()] |
|
|
return doctors |
|
|
|
|
|
def get_all_doctor_names(): |
|
|
"""Get list of all doctor names regardless of availability""" |
|
|
cursor.execute("SELECT name FROM doctors ORDER BY name") |
|
|
doctors = [row[0] for row in cursor.fetchall()] |
|
|
return doctors |
|
|
|
|
|
|
|
|
theme = gr.themes.Soft( |
|
|
primary_hue="blue", |
|
|
secondary_hue="indigo", |
|
|
) |
|
|
|
|
|
with gr.Blocks(theme=theme, title="Hospital Queue Management System") as demo: |
|
|
gr.Markdown("# 🏥 Hospital Queue Management System") |
|
|
|
|
|
|
|
|
with gr.Tab("📊 Dashboard"): |
|
|
gr.Markdown("### Today's Statistics") |
|
|
|
|
|
with gr.Row(): |
|
|
total_count = gr.Textbox(label="Total Patients Today") |
|
|
waiting_count = gr.Textbox(label="Currently Waiting") |
|
|
consulting_count = gr.Textbox(label="In Consultation") |
|
|
completed_count = gr.Textbox(label="Completed Today") |
|
|
avg_wait = gr.Textbox(label="Average Wait Time") |
|
|
|
|
|
refresh_stats_btn = gr.Button("Refresh Statistics") |
|
|
|
|
|
def update_dashboard(): |
|
|
stats = get_daily_stats() |
|
|
return [ |
|
|
stats["total_patients"], |
|
|
stats["waiting"], |
|
|
stats["in_consultation"], |
|
|
stats["completed"], |
|
|
stats["avg_wait_time"] |
|
|
] |
|
|
|
|
|
refresh_stats_btn.click( |
|
|
fn=update_dashboard, |
|
|
inputs=[], |
|
|
outputs=[total_count, waiting_count, consulting_count, completed_count, avg_wait] |
|
|
) |
|
|
|
|
|
|
|
|
demo.load( |
|
|
fn=update_dashboard, |
|
|
inputs=[], |
|
|
outputs=[total_count, waiting_count, consulting_count, completed_count, avg_wait] |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Tab("➕ Register Patient"): |
|
|
gr.Markdown("### New Patient Registration") |
|
|
|
|
|
with gr.Row(): |
|
|
name = gr.Textbox(label="Patient Name*", placeholder="Enter full name") |
|
|
phone = gr.Textbox(label="Phone Number*", placeholder="Enter phone number") |
|
|
|
|
|
with gr.Row(): |
|
|
email = gr.Textbox(label="Email (Optional)", placeholder="Enter email address") |
|
|
priority = gr.Dropdown( |
|
|
choices=[ |
|
|
{"label": "Normal", "value": 3}, |
|
|
{"label": "Urgent", "value": 2}, |
|
|
{"label": "Emergency", "value": 1} |
|
|
], |
|
|
label="Priority", |
|
|
value=3 |
|
|
) |
|
|
|
|
|
symptoms = gr.Textbox( |
|
|
label="Symptoms/Reason for Visit", |
|
|
placeholder="Briefly describe the symptoms or reason for visit", |
|
|
lines=3 |
|
|
) |
|
|
|
|
|
|
|
|
doctor = gr.Dropdown( |
|
|
choices=get_doctor_names(), |
|
|
label="Select Doctor*", |
|
|
info="Only shows available doctors" |
|
|
) |
|
|
|
|
|
register_btn = gr.Button("Register Patient", variant="primary") |
|
|
register_output = gr.Textbox(label="Registration Details", lines=6) |
|
|
|
|
|
register_btn.click( |
|
|
fn=register_patient, |
|
|
inputs=[name, phone, email, symptoms, priority, doctor], |
|
|
outputs=register_output |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Tab("🔍 Check My Status"): |
|
|
gr.Markdown("### Patient Status Lookup") |
|
|
|
|
|
phone_lookup = gr.Textbox( |
|
|
label="Enter Your Phone Number or Token", |
|
|
placeholder="Enter phone number or 6-digit token", |
|
|
info="You can use either your phone number or token to check your status" |
|
|
) |
|
|
|
|
|
check_btn = gr.Button("Check Status", variant="primary") |
|
|
status_output = gr.Textbox(label="Your Status", lines=8) |
|
|
|
|
|
check_btn.click(fn=check_status, inputs=phone_lookup, outputs=status_output) |
|
|
|
|
|
|
|
|
with gr.Tab("🩺 Doctor Panel"): |
|
|
gr.Markdown("### Manage Patient Queue") |
|
|
|
|
|
with gr.Row(): |
|
|
|
|
|
doc_select = gr.Dropdown( |
|
|
choices=get_all_doctor_names(), |
|
|
label="Select Doctor", |
|
|
info="Select doctor to view their queue" |
|
|
) |
|
|
|
|
|
with gr.Column(): |
|
|
call_btn = gr.Button("Call Next Patient", variant="primary") |
|
|
call_output = gr.Textbox(label="Now Calling", lines=2) |
|
|
|
|
|
queue_table = gr.Dataframe( |
|
|
headers=["ID", "Name", "Queue #", "Priority", "Reg. Time", "Status", "Phone", "Token"], |
|
|
datatype=["number", "str", "number", "str", "str", "str", "str", "str"], |
|
|
interactive=False, |
|
|
label="Current Queue" |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
selected_patient_id = gr.Number(label="Patient ID", precision=0) |
|
|
patient_notes = gr.Textbox(label="Consultation Notes", lines=2) |
|
|
complete_btn = gr.Button("Complete Visit") |
|
|
complete_output = gr.Textbox(label="Result") |
|
|
|
|
|
refresh_btn = gr.Button("Refresh Queue") |
|
|
|
|
|
|
|
|
call_btn.click(fn=call_next, inputs=doc_select, outputs=call_output) |
|
|
|
|
|
complete_btn.click( |
|
|
fn=complete_patient, |
|
|
inputs=[selected_patient_id, patient_notes], |
|
|
outputs=complete_output |
|
|
) |
|
|
|
|
|
refresh_btn.click( |
|
|
fn=lambda d: get_doctor_queue(d), |
|
|
inputs=doc_select, |
|
|
outputs=queue_table |
|
|
) |
|
|
|
|
|
|
|
|
doc_select.change( |
|
|
fn=lambda d: get_doctor_queue(d), |
|
|
inputs=doc_select, |
|
|
outputs=queue_table |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Tab("📅 Schedule Appointment"): |
|
|
gr.Markdown("### Schedule a Future Appointment") |
|
|
|
|
|
with gr.Row(): |
|
|
appt_name = gr.Textbox(label="Patient Name*", placeholder="Enter full name") |
|
|
appt_phone = gr.Textbox(label="Phone Number*", placeholder="Enter phone number") |
|
|
|
|
|
with gr.Row(): |
|
|
appt_email = gr.Textbox(label="Email (Optional)", placeholder="Enter email address") |
|
|
|
|
|
appt_doctor = gr.Dropdown( |
|
|
choices=get_all_doctor_names(), |
|
|
label="Select Doctor*" |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
appt_date = gr.Textbox( |
|
|
label="Appointment Date*", |
|
|
placeholder="YYYY-MM-DD", |
|
|
info="Enter date in YYYY-MM-DD format" |
|
|
) |
|
|
appt_time = gr.Dropdown( |
|
|
choices=[ |
|
|
"09:00 AM", "09:30 AM", "10:00 AM", "10:30 AM", "11:00 AM", "11:30 AM", |
|
|
"01:00 PM", "01:30 PM", "02:00 PM", "02:30 PM", "03:00 PM", "03:30 PM", |
|
|
"04:00 PM", "04:30 PM" |
|
|
], |
|
|
label="Appointment Time*" |
|
|
) |
|
|
|
|
|
appt_reason = gr.Textbox( |
|
|
label="Reason for Appointment", |
|
|
placeholder="Briefly describe the reason for appointment", |
|
|
lines=3 |
|
|
) |
|
|
|
|
|
schedule_btn = gr.Button("Schedule Appointment", variant="primary") |
|
|
schedule_output = gr.Textbox(label="Appointment Details") |
|
|
|
|
|
schedule_btn.click( |
|
|
fn=schedule_appointment, |
|
|
inputs=[appt_name, appt_phone, appt_email, appt_doctor, appt_date, appt_time, appt_reason], |
|
|
outputs=schedule_output |
|
|
) |
|
|
|
|
|
gr.Markdown("### View Appointments") |
|
|
|
|
|
with gr.Row(): |
|
|
view_date = gr.Textbox( |
|
|
label="Date (YYYY-MM-DD)", |
|
|
placeholder="Leave empty for all dates", |
|
|
value=datetime.now().strftime('%Y-%m-%d') |
|
|
) |
|
|
view_doctor = gr.Dropdown( |
|
|
choices=["All Doctors"] + get_all_doctor_names(), |
|
|
label="Doctor", |
|
|
value="All Doctors" |
|
|
) |
|
|
view_btn = gr.Button("View Appointments") |
|
|
|
|
|
appointments_table = gr.Dataframe( |
|
|
headers=["ID", "Patient Name", "Phone", "Time", "Doctor", "Reason", "Status"], |
|
|
interactive=False, |
|
|
label="Appointments" |
|
|
) |
|
|
|
|
|
view_btn.click( |
|
|
fn=get_appointments, |
|
|
inputs=[view_date, view_doctor], |
|
|
outputs=appointments_table |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Tab("⚙️ Admin Panel"): |
|
|
gr.Markdown("### Doctor Availability Management") |
|
|
|
|
|
doctors_table = gr.Dataframe( |
|
|
headers=["ID", "Name", "Specialty", "Status"], |
|
|
interactive=False, |
|
|
label="Doctors" |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
doctor_id = gr.Number(label="Doctor ID", precision=0) |
|
|
toggle_btn = gr.Button("Toggle Availability") |
|
|
toggle_output = gr.Textbox(label="Result") |
|
|
|
|
|
refresh_doctors_btn = gr.Button("Refresh Doctor List") |
|
|
|
|
|
|
|
|
toggle_btn.click( |
|
|
fn=toggle_doctor_availability, |
|
|
inputs=doctor_id, |
|
|
outputs=toggle_output |
|
|
) |
|
|
|
|
|
refresh_doctors_btn.click( |
|
|
fn=get_doctor_availability, |
|
|
inputs=[], |
|
|
outputs=doctors_table |
|
|
) |
|
|
|
|
|
|
|
|
demo.load( |
|
|
fn=get_doctor_availability, |
|
|
inputs=[], |
|
|
outputs=doctors_table |
|
|
) |
|
|
|
|
|
|
|
|
gr.Markdown("### System Maintenance") |
|
|
|
|
|
with gr.Row(): |
|
|
backup_btn = gr.Button("Backup Database") |
|
|
backup_output = gr.Textbox(label="Backup Status") |
|
|
|
|
|
def backup_database(): |
|
|
"""Create a backup of the database""" |
|
|
try: |
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
backup_file = f"hospital_backup_{timestamp}.db" |
|
|
|
|
|
|
|
|
backup_conn = sqlite3.connect(backup_file) |
|
|
|
|
|
|
|
|
conn.backup(backup_conn) |
|
|
backup_conn.close() |
|
|
|
|
|
return f"Backup created successfully: {backup_file}" |
|
|
except Exception as e: |
|
|
return f"Backup failed: {str(e)}" |
|
|
|
|
|
backup_btn.click(fn=backup_database, inputs=[], outputs=backup_output) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.launch(share=False) |