Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import json | |
| from datetime import datetime, date | |
| from uuid import uuid4 | |
| import logging | |
| import pandas as pd | |
| import os | |
| from urllib.parse import quote, unquote | |
| import time | |
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    filename='app_log.txt',
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Load environment variables.
# Credentials are read from the environment ONLY. There are deliberately no
# fallback values: secrets must never live in source control, and a fallback
# would make the completeness check below impossible to fail.
# NOTE: any credential that was ever committed as a default in this file must
# be treated as compromised and rotated.
SALESFORCE_USERNAME = os.getenv("SF_USERNAME")
SALESFORCE_PASSWORD = os.getenv("SF_PASSWORD")
SALESFORCE_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
SALESFORCE_INSTANCE_URL = os.getenv("SF_INSTANCE_URL")
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN")
TWILIO_PHONE = os.getenv("TWILIO_PHONE")

# Map each environment variable name to the value that was read, so a failed
# start-up can report exactly which settings are absent (names only, never values).
_REQUIRED_ENV = {
    "SF_USERNAME": SALESFORCE_USERNAME,
    "SF_PASSWORD": SALESFORCE_PASSWORD,
    "SF_SECURITY_TOKEN": SALESFORCE_SECURITY_TOKEN,
    "SF_INSTANCE_URL": SALESFORCE_INSTANCE_URL,
    "TWILIO_ACCOUNT_SID": TWILIO_ACCOUNT_SID,
    "TWILIO_AUTH_TOKEN": TWILIO_AUTH_TOKEN,
    "TWILIO_PHONE": TWILIO_PHONE,
}
required_vars = list(_REQUIRED_ENV.values())
if not all(required_vars):
    _missing_env = [name for name, value in _REQUIRED_ENV.items() if not value]
    logging.error("Missing environment variables: %s", ", ".join(_missing_env))
    raise ValueError("Missing environment variables. Configure API tokens in Hugging Face Spaces secrets.")
# Salesforce Authentication
def get_salesforce_access_token():
    """Log in to Salesforce through the SOAP partner endpoint.

    The password sent is the account password immediately followed by the
    security token, as built from SALESFORCE_PASSWORD and
    SALESFORCE_SECURITY_TOKEN.

    Returns:
        dict | None: ``{"access_token": <session id>, "instance_url": <host>}``
        on success, or ``None`` when the request fails or the response does
        not contain a session (the reason is written to the log).
    """
    from xml.sax.saxutils import escape, unescape

    def _tag_text(document, tag):
        """Return the unescaped text of the first ``<tag>`` element in *document*."""
        opening, closing = f"<{tag}>", f"</{tag}>"
        _, found, rest = document.partition(opening)
        if not found or closing not in rest:
            raise ValueError(f"Salesforce login response has no <{tag}> element")
        return unescape(rest.partition(closing)[0])

    try:
        login_url = "https://login.salesforce.com/services/Soap/u/60.0"
        # Credentials are XML-escaped: a raw '&', '<' or '>' in the password
        # would otherwise produce a malformed envelope and a failed login.
        username = escape(SALESFORCE_USERNAME)
        password = escape(f"{SALESFORCE_PASSWORD}{SALESFORCE_SECURITY_TOKEN}")
        payload = f"""<?xml version="1.0" encoding="utf-8" ?>
<env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">
  <env:Body>
    <n1:login xmlns:n1="urn:partner.soap.sforce.com">
      <n1:username>{username}</n1:username>
      <n1:password>{password}</n1:password>
    </n1:login>
  </env:Body>
</env:Envelope>"""
        headers = {"Content-Type": "text/xml; charset=UTF-8", "SOAPAction": "login"}
        # Send bytes so the body really is UTF-8, matching the XML declaration.
        response = requests.post(login_url, data=payload.encode("utf-8"), headers=headers, timeout=10)
        response.raise_for_status()
        response_text = response.text
        session_id = _tag_text(response_text, "sessionId")
        server_url = _tag_text(response_text, "serverUrl")
        # serverUrl points at the SOAP service; keep only the scheme and host part.
        instance_url = server_url.split("/services")[0]
        return {"access_token": session_id, "instance_url": instance_url}
    except Exception as e:
        # Start-up code treats None as "authentication failed", so every
        # failure mode is reported the same way after being logged.
        logging.error(f"Salesforce authentication failed: {str(e)}")
        return None
# Authenticate once at import time; the base URL and headers built here are
# reused for every Salesforce REST call made by this module.
sf_auth = get_salesforce_access_token()
if not sf_auth:
    logging.error("Failed to authenticate with Salesforce.")
    raise ValueError("Salesforce authentication failed.")

# REST root for API version 60.0 on the instance that issued the session.
SALESFORCE_BASE_URL = "".join((sf_auth["instance_url"], "/services/data/v60.0/"))
SALESFORCE_HEADERS = dict(
    [
        ("Authorization", "Bearer {}".format(sf_auth["access_token"])),
        ("Content-Type", "application/json"),
    ]
)
# Salesforce Client
class SalesforceClient:
    """Thin wrapper over the Salesforce REST API for creating, updating and querying records.

    Every method is best-effort: failures are logged and reported through the
    return value (``None`` / ``False`` / ``[]``) rather than raised.
    """

    # Seconds to wait for Salesforce before giving up. Without a timeout a
    # stalled connection would block the calling UI handler or the scheduler
    # thread indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self, base_url, headers):
        """Store the REST base URL (ending in ``/``) and the headers sent with each request."""
        self.base_url = base_url
        self.headers = headers

    def create_record(self, object_name, data):
        """Create a record of *object_name* from the field dict *data*.

        Returns:
            The new record id, or ``None`` if the request failed.
        """
        try:
            response = requests.post(
                f"{self.base_url}sobjects/{object_name}/",
                headers=self.headers,
                json=data,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()["id"]
        except Exception as e:
            logging.error(f"Create record failed: {str(e)}")
            return None

    def update_record(self, object_name, record_id, data):
        """Patch the fields in *data* onto the record *record_id* of *object_name*.

        Returns:
            ``True`` on success, ``False`` if the request failed.
        """
        try:
            response = requests.patch(
                f"{self.base_url}sobjects/{object_name}/{record_id}",
                headers=self.headers,
                json=data,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return True
        except Exception as e:
            logging.error(f"Update record failed: {str(e)}")
            return False

    def query_records(self, query):
        """Run a SOQL *query* and return all matching records.

        Large result sets are delivered in pages; each page that carries a
        ``nextRecordsUrl`` is followed so the caller receives the complete
        result rather than only the first batch.

        Returns:
            list: the records, or ``[]`` if any request failed.
        """
        from urllib.parse import urljoin

        try:
            records = []
            url = f"{self.base_url}query/"
            params = {"q": query}  # requests handles the URL encoding
            while url:
                response = requests.get(
                    url, headers=self.headers, params=params, timeout=self.REQUEST_TIMEOUT
                )
                response.raise_for_status()
                page = response.json()
                records.extend(page["records"])
                # nextRecordsUrl is an absolute path on the same host and
                # already identifies the query, so no parameters are re-sent.
                next_path = page.get("nextRecordsUrl")
                url = urljoin(self.base_url, next_path) if next_path else None
                params = None
            return records
        except Exception as e:
            logging.error(f"Query records failed: {str(e)}")
            return []
# Twilio Client
class TwilioClient:
    """Sends SMS or WhatsApp messages through the Twilio Messages endpoint."""

    def __init__(self, account_sid, auth_token, from_number):
        """Remember the sender number and the credentials used for HTTP basic auth."""
        self.from_number = from_number
        self.auth = (account_sid, auth_token)
        self.api_url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"

    def send_message(self, to_number, body, method="sms"):
        """Send *body* to *to_number*; return True on success, False otherwise.

        Numbers that do not start with "+" are rejected without contacting
        Twilio. With ``method="whatsapp"`` both sender and recipient are
        prefixed with ``whatsapp:``. Any error is logged and reported as False.
        """
        try:
            if not to_number.startswith("+"):
                return False
            channel_prefix = "whatsapp:" if method == "whatsapp" else ""
            recipient = f"{channel_prefix}{to_number}"
            sender = f"{channel_prefix}{self.from_number}"
            form = {"From": sender, "To": recipient, "Body": body}
            requests.post(self.api_url, auth=self.auth, data=form, timeout=10).raise_for_status()
        except Exception as exc:
            logging.error(f"Send message failed: {str(exc)}")
            return False
        return True
# Initialize Clients
# Module-wide singletons shared by the UI callbacks and the follow-up scheduler.
sf_client = SalesforceClient(base_url=SALESFORCE_BASE_URL, headers=SALESFORCE_HEADERS)
twilio_client = TwilioClient(
    account_sid=TWILIO_ACCOUNT_SID,
    auth_token=TWILIO_AUTH_TOKEN,
    from_number=TWILIO_PHONE,
)
# Patient Info and State Management
def get_patient_info():
    """Read the patient id and name from the ``QUERY_STRING`` environment variable.

    ``QUERY_STRING`` may be a bare query string
    (``patientId=...&patientName=...``) or a full URL containing one.
    Values are percent-decoded. Only exact parameter names match, so a
    parameter such as ``xpatientId`` is ignored.

    Returns:
        tuple[str, str]: ``(patient_id, patient_name)``; a missing parameter
        yields an empty string.
    """
    from urllib.parse import parse_qs, urlsplit

    raw = os.environ.get("QUERY_STRING", "")
    # urlsplit() only fills .query when a '?' is present; a bare query string
    # would be parsed as a path, so it is used as-is.
    query = urlsplit(raw).query if "?" in raw else raw
    params = parse_qs(query, keep_blank_values=True)
    patient_id = params.get("patientId", [""])[0]
    patient_name = params.get("patientName", [""])[0]
    return patient_id, patient_name
# NOTE(review): the patient identity is resolved once, when this module is
# imported, from the process-wide QUERY_STRING environment variable. It is not
# re-read per browser session — confirm this matches how the app is deployed.
patient_id, patient_name = get_patient_info()
# These State objects are created before any gr.Blocks context is open; the
# interface section further down assigns the same two names again inside its
# Blocks context.
patient_id_state = gr.State(value=patient_id)
patient_name_state = gr.State(value=patient_name)
# Follow-Up Scheduler
def _send_due_follow_ups():
    """Send every follow-up plan scheduled for today and mark it as sent.

    A plan is sent only when its patient record exists and has given consent.
    Each delivered message is recorded in MessageLog__c and the plan's status
    is set to "Sent".

    Returns:
        int: number of messages delivered in this pass.
    """
    from datetime import timezone

    current_date = date.today().isoformat()
    # Id must be selected: it is needed below to update the plan's status.
    # NOTE(review): if ScheduledDate__c is a Date field (not Text), SOQL expects
    # the date literal unquoted — confirm the field type.
    plans = sf_client.query_records(
        "SELECT Id, Patient__c, MessageContent__c FROM FollowUpPlan__c "
        f"WHERE ScheduledDate__c = '{current_date}' AND Status__c = 'Scheduled'"
    )
    sent_count = 0
    for plan in plans:
        patient = sf_client.query_records(
            f"SELECT Phone__c, ConsentGiven__c FROM Patient__c WHERE Id = '{plan['Patient__c']}'"
        )
        if not patient or not patient[0].get("ConsentGiven__c"):
            continue
        # An empty field comes back as an explicit null, so fall back with
        # `or` rather than relying on a .get() default.
        message = plan.get("MessageContent__c") or "Hi, this is your daily follow-up."
        if not twilio_client.send_message(patient[0].get("Phone__c") or "", message):
            continue
        sf_client.create_record("MessageLog__c", {
            "Patient__c": plan["Patient__c"],
            "MessageText__c": message,
            "Direction__c": "Outbound",
            # Explicit UTC timestamp in ISO 8601 form.
            "Timestamp__c": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        })
        sf_client.update_record("FollowUpPlan__c", plan["Id"], {"Status__c": "Sent"})
        sent_count += 1
    return sent_count


def follow_up_scheduler():
    """Run the follow-up pass once a day, forever.

    Intended to run in a background thread. An error in one pass is logged
    and does not stop later passes.
    """
    while True:
        try:
            _send_due_follow_ups()
        except Exception:
            # Keep the thread alive; a single bad record must not end all
            # future follow-ups.
            logging.exception("Follow-up scheduler pass failed")
        time.sleep(86400)  # Run daily
# Update Patient Profile
def update_patient_profile(patient_id, height, weight, emergency_name, emergency_number, emergency_relationship, address, dob):
    """Write the supplied profile values to the patient's record.

    Height and weight are stored as floats whenever they are neither None nor
    an empty string; the remaining values are stored only when truthy.

    Returns:
        str: a user-facing status message.
    """
    if not patient_id:
        return "Please log in to update your profile."

    changes = {}
    numeric_inputs = (("Height__c", height), ("Weight__c", weight))
    for field_name, raw_value in numeric_inputs:
        if raw_value is not None and raw_value != "":
            changes[field_name] = float(raw_value)

    text_inputs = (
        ("EmergencyContactName__c", emergency_name),
        ("EmergencyContactNumber__c", emergency_number),
        ("EmergencyContactRelationship__c", emergency_relationship),
        ("Address__c", address),
        ("DateOfBirth__c", dob),
    )
    for field_name, text_value in text_inputs:
        if text_value:
            changes[field_name] = text_value

    if not changes:
        return "No changes made."

    saved = sf_client.update_record("Patient__c", patient_id, changes)
    if saved:
        return "Profile updated successfully!"
    return "Failed to update profile."
# Schedule Appointment
def schedule_appointment(patient_id, doctor_name, appointment_type, preferred_date, time_slot, reason, special_requests):
    """Create an Appointment__c record for the patient.

    Patient, doctor, appointment type, date and time slot are mandatory, and
    the doctor must match the Name of an existing Doctor__c record.

    Returns:
        str: a user-facing status message.
    """
    mandatory = [patient_id, doctor_name, appointment_type, preferred_date, time_slot]
    if not all(mandatory):
        return "All fields are required."

    doctor_records = sf_client.query_records("SELECT Name FROM Doctor__c")
    known_doctors = [record["Name"] for record in doctor_records]
    if doctor_name not in known_doctors:
        return "Invalid doctor selected."

    # Time slots arrive as HH:MM; seconds are appended for the stored value.
    starts_at = f"{preferred_date} {time_slot}:00"
    created_id = sf_client.create_record(
        "Appointment__c",
        {
            "Patient__c": patient_id,
            "Doctor__c": doctor_name,
            "AppointmentType__c": appointment_type,
            "DateTime__c": starts_at,
            "Status__c": "Scheduled",
            "Reason__c": reason,
            "SpecialRequests__c": special_requests,
        },
    )
    if not created_id:
        return "Failed to schedule appointment."
    return f"Appointment scheduled with {doctor_name} on {preferred_date} at {time_slot}:00!"
# Get Next Appointment
def get_next_appointment(patient_id):
    """Return the patient's earliest appointment with status "Scheduled".

    Returns:
        dict | None: the record (DateTime__c, Doctor__c), or ``None`` when no
        patient id is given or nothing is scheduled.
    """
    if not patient_id:
        return None
    # The id originates from the request URL, so escape it before embedding
    # it in a SOQL string literal (backslash first, then the quote).
    safe_id = str(patient_id).replace("\\", "\\\\").replace("'", "\\'")
    next_appt = sf_client.query_records(
        "SELECT DateTime__c, Doctor__c FROM Appointment__c "
        f"WHERE Patient__c = '{safe_id}' AND Status__c = 'Scheduled' "
        "ORDER BY DateTime__c ASC LIMIT 1"
    )
    return next_appt[0] if next_appt else None
# Get Pending Fields
def get_pending_fields(patient_id):
    """List the profile fields that are still empty on the patient's record.

    Returns:
        list[str]: API names of the unfilled fields. Empty when no patient id
        is given, and also when the record cannot be loaded (unknown id or
        failed query), because nothing can be determined in that case.
    """
    if not patient_id:
        return []
    required_fields = ["Height__c", "Weight__c", "EmergencyContactName__c", "EmergencyContactNumber__c", "EmergencyContactRelationship__c", "Address__c", "DateOfBirth__c"]
    # The id originates from the request URL, so escape it before embedding
    # it in a SOQL string literal (backslash first, then the quote).
    safe_id = str(patient_id).replace("\\", "\\\\").replace("'", "\\'")
    records = sf_client.query_records(
        f"SELECT {', '.join(required_fields)} FROM Patient__c WHERE Id = '{safe_id}'"
    )
    if not records:
        # query_records returns [] on failure or no match; indexing it would
        # raise IndexError.
        return []
    patient_data = records[0]
    return [field for field in required_fields if not patient_data.get(field)]
# Get All Appointments
def get_all_appointments(patient_id):
    """Return all of the patient's appointments, newest first.

    Returns:
        list[dict]: appointment records (DateTime__c, Doctor__c,
        AppointmentType__c, Status__c, Reason__c); empty when no patient id is
        given or nothing is found.
    """
    if not patient_id:
        return []
    # The id originates from the request URL, so escape it before embedding
    # it in a SOQL string literal (backslash first, then the quote).
    safe_id = str(patient_id).replace("\\", "\\\\").replace("'", "\\'")
    appointments = sf_client.query_records(
        "SELECT DateTime__c, Doctor__c, AppointmentType__c, Status__c, Reason__c "
        f"FROM Appointment__c WHERE Patient__c = '{safe_id}' ORDER BY DateTime__c DESC"
    )
    return appointments if appointments else []
# Gradio Interface
def _soql_quote(value):
    """Escape *value* for use inside a single-quoted SOQL string literal."""
    return str(value).replace("\\", "\\\\").replace("'", "\\'")


def _first_record(records):
    """Return the first record of a query result, or an empty dict when there is none."""
    return records[0] if records else {}


def _split_date_time(value):
    """Split a stored date-time string into ``(date, HH:MM)``.

    Accepts either a space or a ``T`` between the date and the time; missing
    parts come back as empty strings.
    """
    text = str(value or "").replace("T", " ", 1)
    date_part, _, time_part = text.partition(" ")
    return date_part, time_part[:5]


with gr.Blocks(theme=gr.themes.Soft(), title="HealthPortal - Patient Care") as demo:
    gr.Markdown("# HealthPortal - Patient Care System")
    patient_id_state = gr.State(value=patient_id)
    patient_name_state = gr.State(value=patient_name)
    # Strip a trailing slash so the page links below never contain "//apex".
    portal_base_url = SALESFORCE_INSTANCE_URL.rstrip("/")
    # The patient id originates from the request URL; escape it once for every
    # SOQL query built in this section.
    safe_patient_id = _soql_quote(patient_id_state.value or "")
    profile_query = (
        "SELECT Height__c, Weight__c, EmergencyContactName__c, EmergencyContactNumber__c, "
        "EmergencyContactRelationship__c, Address__c, DateOfBirth__c "
        f"FROM Patient__c WHERE Id = '{safe_patient_id}'"
    )

    with gr.Tabs():
        with gr.Tab("Home"):
            if not patient_id_state.value:
                gr.Markdown("**You are not logged in.** Please log in or register to access your personalized dashboard.")
                with gr.Row():
                    gr.Button("Login", link=f"{portal_base_url}/apex/PatientLoginForm")
                    gr.Button("Register", link=f"{portal_base_url}/apex/PatientRegistrationForm")
                # Note: Login and register links unchanged; relies on iframe/Salesforce flow
            else:
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### New Appointment")
                        next_appt = get_next_appointment(patient_id_state.value)
                        if next_appt:
                            appt_date, appt_time = _split_date_time(next_appt.get("DateTime__c"))
                            gr.Markdown(f"**Date:** {appt_date}\n**Time:** {appt_time}\n**Doctor:** {next_appt.get('Doctor__c')}")
                        else:
                            gr.Markdown("No upcoming appointments.")
                    with gr.Column(scale=1):
                        gr.Markdown("### Pending Forms")
                        pending_fields = get_pending_fields(patient_id_state.value)
                        if pending_fields:
                            gr.Markdown(f"**Unfilled Fields:** {', '.join(pending_fields)}")
                            with gr.Row():
                                details_button = gr.Button("View Details")
                        else:
                            gr.Markdown("No pending forms.")
                    with gr.Column(scale=1):
                        gr.Markdown("### Health Score")
                        score_row = _first_record(sf_client.query_records(
                            f"SELECT AVG(RiskScore__c) FROM SymptomLog__c WHERE Patient__c = '{safe_patient_id}'"
                        ))
                        # The aggregate is null when the patient has no symptom
                        # logs, and the row is missing when the query fails;
                        # both cases are shown as a score of 0.
                        health_score = score_row.get("expr0") or 0
                        gr.Markdown(f"**Score:** {int(health_score)}/100\n{'Excellent' if health_score >= 80 else 'Monitor'}")

                if pending_fields:
                    def show_details_page():
                        """Reveal the profile-details form."""
                        return gr.update(visible=True)

                    details_page = gr.Column(visible=False)
                    with details_page:
                        gr.Markdown("## Update Patient Details")
                        patient_data = _first_record(sf_client.query_records(profile_query))
                        height_input = gr.Number(label="Height (cm)", value=patient_data.get("Height__c", 0), interactive=True)
                        weight_input = gr.Number(label="Weight (kg)", value=patient_data.get("Weight__c", 0), interactive=True)
                        emergency_name_input = gr.Textbox(label="Emergency Contact Name", value=patient_data.get("EmergencyContactName__c", ""))
                        emergency_number_input = gr.Textbox(label="Emergency Contact Number", value=patient_data.get("EmergencyContactNumber__c", ""))
                        emergency_rel_input = gr.Textbox(label="Emergency Contact Relationship", value=patient_data.get("EmergencyContactRelationship__c", ""))
                        address_input = gr.Textbox(label="Address", value=patient_data.get("Address__c", ""))
                        dob_input = gr.Textbox(label="Date of Birth (YYYY-MM-DD)", value=patient_data.get("DateOfBirth__c", ""))
                        update_button = gr.Button("Save")
                        back_button = gr.Button("Back")
                        output = gr.Textbox(label="Result")
                        update_button.click(
                            fn=update_patient_profile,
                            inputs=[patient_id_state, height_input, weight_input, emergency_name_input, emergency_number_input, emergency_rel_input, address_input, dob_input],
                            outputs=output
                        )
                        back_button.click(
                            fn=lambda: gr.update(visible=False),
                            outputs=details_page
                        )
                    details_button.click(
                        fn=show_details_page,
                        outputs=details_page
                    )

        with gr.Tab("Appointments"):
            if not patient_id_state.value:
                gr.Markdown("Please log in to manage appointments.")
            else:
                gr.Markdown("## Appointment History")
                appointments = get_all_appointments(patient_id_state.value)
                if appointments:
                    for appt in appointments:
                        appt_date, appt_time = _split_date_time(appt.get("DateTime__c"))
                        gr.Markdown(f"**{appt.get('AppointmentType__c')} with {appt.get('Doctor__c')}\nDate:** {appt_date}\n**Time:** {appt_time}\n**Status:** {appt.get('Status__c')}\n**Reason:** {appt.get('Reason__c') or 'N/A'}")
                else:
                    gr.Markdown("You have not booked any appointments.")
                with gr.Row():
                    with gr.Column(scale=1, min_width=100):
                        schedule_button = gr.Button("Schedule Appointment", variant="primary")
                    with gr.Column(scale=3):
                        pass

                def show_schedule_page():
                    """Reveal the appointment-scheduling form."""
                    return gr.update(visible=True)

                schedule_page = gr.Column(visible=False)
                with schedule_page:
                    gr.Markdown("## Schedule New Appointment")
                    doctors = [doc["Name"] for doc in sf_client.query_records("SELECT Name FROM Doctor__c")]
                    doctor_input = gr.Dropdown(doctors, label="Select Provider", value=doctors[0] if doctors else None)
                    appointment_type_input = gr.Dropdown(["Consultation", "Follow-up", "Procedure"], label="Appointment Type", value="Consultation")
                    # Gradio has no `Date` component, so the date is entered as
                    # text, the same way the date of birth is on the profile form.
                    date_input = gr.Textbox(label="Preferred Date (YYYY-MM-DD)", value=date.today().isoformat())
                    time_slots = ["09:00", "10:00", "11:00", "14:00", "15:00", "16:00"]  # Mock, to be replaced with real availability
                    time_input = gr.Dropdown(time_slots, label="Available Time Slots", value=time_slots[0])
                    reason_input = gr.Textbox(label="Reason for Visit")
                    special_input = gr.Textbox(label="Special Instructions")
                    schedule_submit = gr.Button("Schedule")
                    output = gr.Textbox(label="Result")
                    schedule_submit.click(
                        fn=schedule_appointment,
                        inputs=[patient_id_state, doctor_input, appointment_type_input, date_input, time_input, reason_input, special_input],
                        outputs=output
                    )
                schedule_button.click(
                    fn=show_schedule_page,
                    outputs=schedule_page
                )
# Launch with Scheduler
if __name__ == "__main__":
    import threading

    # Listen on every interface at port 7860.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}

    # Daemon thread: the daily follow-up loop must not keep the process alive
    # once the web server has stopped.
    scheduler_thread = threading.Thread(target=follow_up_scheduler, daemon=True)
    scheduler_thread.start()

    demo.launch(**launch_options)