| | import asyncio |
| | from datetime import datetime |
| | import traceback |
| | from typing import TypedDict |
| | import gradio as gr |
| | import uuid |
| | import os |
| | import pandas as pd |
| | from tqdm.auto import tqdm |
| | from common import get_db |
| | from config import SheamiConfig |
| | from graph import create_graph |
| | from modules.models import HealthReport, SheamiMilestone, SheamiState, SheamiUser |
| | from pdf_reader import read_pdf |
| | from gradio_modal import Modal |
| | from plot_helper import render_vitals_plot_layout |
| | from report_formatter import render_patient_state |
| | from dotenv import load_dotenv |
| |
|
# Load variables from a .env file; override=True lets them replace values
# that are already present in the process environment.
load_dotenv(override=True)
# Maximum number of files accepted per upload (3 when MAX_FILES is unset).
MAX_FILES = int(os.environ.get("MAX_FILES", 3))
| |
|
| |
|
def _state_value(state, key, default=None):
    """Return ``state[key]``, or ``default`` when the key cannot be read."""
    try:
        return state[key]
    except (KeyError, TypeError):
        return default


async def _record_failed_run(final_state, error):
    """Best-effort: mark the last milestone and the run as failed in the DB."""
    milestones = _state_value(final_state, "milestones") or []
    run_id = _state_value(final_state, "run_id")
    try:
        if milestones:
            last_milestone = milestones[-1]
            last_milestone.status = "failed"
            last_milestone.end_time = datetime.now()
            if run_id is not None:
                await get_db().add_or_update_milestone(
                    run_id=run_id,
                    milestone=last_milestone.step_name,
                    status="failed",
                    end=True,
                )
        if run_id is not None:
            await get_db().update_run_stats(
                run_id=run_id, status="failed", message=f"{error}"
            )
    except Exception:
        # Bookkeeping must never prevent the final UI message from being sent.
        print("Error recording failed run")
        traceback.print_exc()


async def process_reports(user_email: str, patient_id: str, files: list):
    """Run the report-processing graph and stream UI updates.

    This is an async generator: every yielded value is the tuple produced by
    ``construct_process_message``. Progress messages are streamed while the
    graph runs; the last yield carries the download update (or an error flag).

    Args:
        user_email: E-mail of the user running the workflow.
        patient_id: Identifier of the patient the reports belong to.
        files: Uploaded files; each item is either a path string or an object
            exposing the path as ``.name``.

    Failures while reading the PDFs, building the graph or streaming are
    caught, logged, appended to the message buffer and recorded via
    ``get_db()`` when a run id is known; they are not re-raised.
    """
    if not files:
        yield construct_process_message(message="Please upload at least one PDF file.")
        return

    yield construct_process_message(
        message=f"Initiating processing of {len(files)} files ..."
    )
    thread_id = str(uuid.uuid4())

    buffer = ""
    # Starts empty so the failure path works even before any state exists.
    final_state = {}
    try:
        workflow = create_graph(
            user_email=user_email, patient_id=patient_id, thread_id=thread_id
        )

        uploaded_reports = []
        for file in files:
            # Accept both plain path strings and file-like objects with .name.
            file_path = getattr(file, "name", file)
            contents = read_pdf(file_path)
            uploaded_reports.append(
                HealthReport(
                    report_file_name_with_path=file_path,
                    report_file_name=os.path.basename(file_path),
                    report_contents=contents,
                )
            )

        state = SheamiState(
            uploaded_reports=uploaded_reports,
            thread_id=thread_id,
            user_email=user_email,
            patient_id=patient_id,
        )
        final_state = state
        config = {"configurable": {"thread_id": thread_id}, "recursion_limit": 50}

        async for msg_packet in workflow.astream(
            state, config=config, stream_mode="values"
        ):
            final_state = msg_packet

            progress = {
                "current_step": _state_value(
                    msg_packet, "process_desc", "Working on it ..."
                ),
                "units_processed": _state_value(msg_packet, "units_processed", 0),
                "units_total": _state_value(msg_packet, "units_total", 6),
                "overall_units_processed": _state_value(
                    msg_packet, "overall_units_processed", 0
                ),
                "overall_units_total": _state_value(
                    msg_packet, "overall_units_total", 0
                ),
                "milestones": _state_value(msg_packet, "milestones") or [],
            }

            messages = _state_value(msg_packet, "messages")
            if messages:
                # Earlier messages are shown at once; the newest one is
                # streamed character by character.
                all_but_last = messages[:-1]
                last_message = messages[-1]
                buffer = "\n".join(all_but_last)

                yield construct_process_message(message=buffer, **progress)
                buffer += "\n"
                for c in last_message:
                    buffer += c
                    yield construct_process_message(message=buffer, **progress)
                    await asyncio.sleep(0.005)
                await asyncio.sleep(0.1)

        buffer += (
            "\n\n"
            f"✅ Processed <span class='highlighted-text'>{len(files)}</span> reports.\n"
            "Please download the output file from below within 5 min."
        )
    except Exception as e:
        print("Error processing stream", e)
        traceback.print_exc()
        buffer += f"\n\n❌ Error processing reports. {e}"
        await _record_failed_run(final_state, e)

    # The closing message is yielded after the try block rather than from a
    # ``finally`` clause: a generator must not yield while it is being closed
    # or cancelled.
    pdf_path = _state_value(final_state, "pdf_path")
    milestones = _state_value(final_state, "milestones") or []
    reports_output = _state_value(final_state, "standardized_reports")
    trends_output = _state_value(final_state, "trends_json")
    if pdf_path:
        yield construct_process_message(
            message=buffer,
            final_output=gr.update(value=pdf_path, visible=True),
            milestones=milestones,
            reports_output=reports_output,
            trends_output=trends_output,
        )
    else:
        print("Yielding error message")
        yield construct_process_message(
            message=buffer,
            final_output=gr.update(visible=False),
            milestones=milestones,
            reports_output=reports_output,
            trends_output=trends_output,
            error=True,
        )
| |
|
| |
|
def generate_milestones_data(
    num_rows=5,
    headers=None,
):
    """Build the headers and placeholder rows for the milestones table.

    Args:
        num_rows: Number of rows to generate.
        headers: Column headers; when omitted a fresh default list is created
            on every call, so callers may mutate the result safely.

    Returns:
        A ``(headers, data)`` tuple. ``data`` has ``num_rows`` rows; the first
        rows carry the known step names in their first column, every other
        cell is an empty string.
    """
    if headers is None:
        headers = [
            "Step",
            "Status",
            "Start Time",
            "End Time",
            "Duration (s)",
        ]
    steps = [
        "Consume & Standardize Reports",
        "Standardize Test Names",
        "Standardize Measurement Units",
        "Aggregate Trends",
        "Interpret & Plot Trends",
    ]

    data = []
    for i in range(num_rows):
        if i < len(steps):
            # Step name followed by one blank cell per remaining column.
            row = [steps[i]] + ["" for _ in headers[1:]]
        else:
            row = ["" for _ in headers]
        data.append(row)

    return headers, data
| |
|
| |
|
def disable_component():
    """Return a Gradio update that makes the target component non-interactive."""
    locked = gr.update(interactive=False)
    return locked
| |
|
| |
|
def enable_component():
    """Return a Gradio update that makes the target component interactive."""
    unlocked = gr.update(interactive=True)
    return unlocked
| |
|
| |
|
def construct_process_message(
    message: str,
    final_output: str = None,
    current_step: str = None,
    units_processed: int = 0,
    units_total: int = 0,
    overall_units_processed: int = 0,
    overall_units_total: int = 0,
    milestones: list[SheamiMilestone] = None,
    reports_output=None,
    trends_output=None,
    error=False,
):
    """Assemble the tuple of UI updates emitted for one processing step.

    Args:
        message: Log text; newlines are converted to ``<br>``.
        final_output: Value for the download component. A truthy value marks
            this as the closing message of a run.
        current_step, units_processed, units_total, overall_units_processed,
            overall_units_total: Progress information. Accepted for interface
            compatibility; not used when building the result.
        milestones: Milestones to render as table rows (defaults to none).
        reports_output: Passed to ``render_patient_state``.
        trends_output: Passed to ``render_patient_state``.
        error: Whether the closing message should report a failure.

    Returns:
        A tuple of: the HTML log, an interactivity update, a visibility
        update, ``final_output``, a second visibility update, the milestone
        rows, the values returned by ``render_patient_state`` and the final
        status text.
    """
    if milestones is None:
        milestones = []
    finished = bool(final_output)

    message = (message or "").replace("\n", "<br>")
    if finished:
        formatted_message = f"<div class='transparent_div'>{message}</div>"
    else:
        # While work is in progress an animated "dots" indicator is appended.
        formatted_message = f"<div class='transparent_div'>{message}<br><span class='dots'></span></div>"

    final_message = ""
    if finished:
        if error:
            final_message = "❌ There was an error processing your request. Please try after sometime."
        else:
            final_message = "✅ Your health trends report is ready for download!"

    return (
        formatted_message,
        enable_component() if finished else disable_component(),
        show_component() if finished else hide_component(),
        final_output,
        show_component() if finished else hide_component(),
        milestones_to_rows(milestones),
        *render_patient_state(reports_output, trends_output),
        final_message,
    )
| |
|
| |
|
def render_logo():
    """Create the static image component showing ``SheamiConfig.logo_path``."""
    image_options = {
        "type": "filepath",
        "value": SheamiConfig.logo_path,
        "label": "My Logo",
        "show_label": False,
        "show_download_button": False,
        "show_fullscreen_button": False,
        "show_share_button": False,
        "interactive": False,
        "container": False,
    }
    return gr.Image(**image_options)
| |
|
| |
|
def render_logo_small():
    """Create the static image component showing ``SheamiConfig.logo_small_path``."""
    image_options = {
        "type": "filepath",
        "value": SheamiConfig.logo_small_path,
        "label": "My Logo",
        "show_label": False,
        "show_share_button": False,
        "show_download_button": False,
        "show_fullscreen_button": False,
        "interactive": False,
        "container": False,
    }
    return gr.Image(**image_options)
| |
|
def render_banner():
    """Create the static image component showing ``SheamiConfig.banner_path``."""
    image_options = {
        "type": "filepath",
        "value": SheamiConfig.banner_path,
        "label": "Banner",
        "show_label": False,
        "show_share_button": False,
        "show_download_button": False,
        "show_fullscreen_button": False,
        "interactive": False,
        "container": False,
    }
    return gr.Image(**image_options)
| |
|
def toggle_logo_small(logo):
    """Swap the logo between its full-size and small variants.

    Args:
        logo: Current value of the logo image component (its file path), or
            ``None`` when the component holds no image.

    Returns:
        A Gradio update selecting ``SheamiConfig.logo_path`` when the current
        value contains ``"logo-small"``, otherwise
        ``SheamiConfig.logo_small_path``.
    """
    print(logo)
    # An empty component yields None; treat it as "not the small logo"
    # instead of failing on the membership test.
    showing_small = "logo-small" in (logo or "")
    new_logo = SheamiConfig.logo_path if showing_small else SheamiConfig.logo_small_path
    return gr.update(value=new_logo)
| |
|
| |
|
def clear_component():
    """Return a Gradio update that resets the target component's value to ``None``."""
    cleared = gr.update(value=None)
    return cleared
| |
|
| |
|
def hide_component():
    """Return a Gradio update that hides the target component."""
    hidden = gr.update(visible=False)
    return hidden
| |
|
| |
|
def show_component():
    """Return a Gradio update that makes the target component visible."""
    shown = gr.update(visible=True)
    return shown
| |
|
| |
|
def close_side_bar():
    """Return a Gradio update that sets the target's ``open`` flag to ``False``."""
    collapsed = gr.update(open=False)
    return collapsed
| |
|
| |
|
def open_side_bar():
    """Return a Gradio update that sets the target's ``open`` flag to ``True``."""
    expanded = gr.update(open=True)
    return expanded
| |
|
| |
|
def make_status_tab_active():
    """Return a Gradio update selecting the tab with id ``my_status_container``."""
    tab_id = "my_status_container"
    return gr.update(selected=tab_id)
| |
|
| |
|
def make_final_report_tab_active():
    """Return a Gradio update selecting the tab with id ``my_final_report_container``."""
    tab_id = "my_final_report_container"
    return gr.update(selected=tab_id)
| |
|
| |
|
def milestones_to_rows(milestones: list[SheamiMilestone]) -> list[list]:
    """Convert milestones into rows for the milestones table.

    A 10-row placeholder table is generated first; its leading rows are then
    overwritten with the given milestones. Milestones beyond the table size
    are ignored.
    """

    def _clock(moment):
        # Falsy timestamps (e.g. a step that has not started) show as blank.
        return moment.strftime("%H:%M:%S") if moment else ""

    _, table = generate_milestones_data(num_rows=10)

    for slot, milestone in zip(range(len(table)), milestones):
        table[slot] = [
            milestone.step_name,
            milestone.status_icon,
            _clock(milestone.start_time),
            _clock(milestone.end_time),
            f"{milestone.time_taken:.2f}" if milestone.time_taken else "",
        ]

    return table
| |
|
| |
|
def handle_file_input_change(files):
    """React to a change of the file selection.

    Returns:
        A pair of (visibility update, status text): hidden with an error text
        when nothing is selected or more than ``MAX_FILES`` files are chosen,
        visible with a confirmation text otherwise.
    """
    if not files:
        return hide_component(), "❌ No files selected."
    if len(files) > MAX_FILES:
        return (
            hide_component(),
            f"❌ Maximum of {MAX_FILES} files can be uploaded at a time.",
        )
    return show_component(), f"✅ {len(files)} selected."
| |
|
| |
|
def get_css():
    """Return the custom CSS for the app as a single string.

    The stylesheet covers the card-style radio group (``.pro-radio``), the
    highlighted text span, the patient card, the logged-in-user field, the
    animated ``.dots`` indicator and the terminal-like log area.
    """
    return """
/* Container spacing */
.pro-radio .wrap {
display: flex;
flex-direction: column;
gap: 8px;
}

/* Hide the default radio dot */
.pro-radio input[type="radio"] {
display: none !important;
}

/* Base card look */
.pro-radio label {
display: block;
background: #fafafa;
color: #222;
font-family: "Inter", sans-serif;
font-size: 15px;
font-weight: 500;
padding: 12px 16px;
border-radius: 8px;
border: 1px solid #ddd;
cursor: pointer;
transition: all 0.2s ease;
outline: none !important;
box-shadow: none !important;
}

/* Hover state */
.pro-radio label:hover {
background: #f0f0f0;
border-color: #bbb;
}

/* Selected card */
.pro-radio input[type="radio"]:checked + span {
background: #e6f0ff; /* light blue background */
border: 1px solid #0066cc; /* blue border */
border-radius: 8px;
font-weight: 600;
color: #0066cc;
display: block;
padding: 12px 16px;
outline: none !important;
box-shadow: none !important;
}

/* Kill any weird inner focus box */
.pro-radio span {
outline: none !important;
box-shadow: none !important;
border: none !important;
}

/* Remove Gradio's green selected background on the LABEL itself */
.pro-radio label:has(input[type="radio"]:checked),
.pro-radio label[aria-checked="true"],
.pro-radio label[data-selected="true"],
.pro-radio .selected,
.pro-radio [data-selected="true"] {
background: #fafafa !important; /* or transparent */
box-shadow: none !important;
border-color: #ddd !important;
}

/* Keep your selected look on the SPAN only (no inner blue box) */
.pro-radio input[type="radio"]:checked + span {
background: #e6f0ff;
border: 1px solid #0066cc;
border-radius: 8px;
display: block;
padding: 12px 16px;
color: #0066cc;
font-weight: 600;
outline: none !important;
box-shadow: none !important;
}

/* Hide native dot + any focus rings */
.pro-radio input[type="radio"] { display: none !important; }
.pro-radio label,
.pro-radio label:focus,
.pro-radio label:focus-within,
.pro-radio input[type="radio"]:focus + span {
outline: none !important;
box-shadow: none !important;
}


.highlighted-text {
color: #FFD700; /* bright gold to stand out */
font-weight: bold; /* makes it pop */
font-family: "Courier New", monospace; /* subtle variation */
background-color: #222; /* faint background contrast */
padding: 0 3px; /* like a tag highlight */
border-radius: 3px; /* smooth corners */
}
#patient-card{
border: 1px solid rgba(0,0,0,0.06);
background: #fafafa;
border-radius: 10px;
padding: 10px;
box-sizing: border-box;
gap: 12px;
}
#logged_in_user {
text-align : center
}
#logged_in_user input textarea {
font-weight : bold;
color : #00FF00;
text-align : center !important;
}
#add_patient_modal {
width : 400px;
}
.dots {
display: inline-block;
min-width: 1.5em; /* enough space for 3 dots */
text-align: left;
color: #00FF00;
}

.dots::after {
content: " .";
min-width : 100px;
animation: dots 1.5s steps(3, end) infinite;
}

@keyframes dots {
0% { content: " "; }
33% { content: " ."; }
66% { content: " .."; }
100% { content: " ..."; }
}

div.transparent_div {
color: #00FF00; /* classic terminal green */
background-color: #111111; /* softer black background */
font-family: monospace; /* console-like font */
font-size: 14px;
line-height: 1.4;
border: none; /* clean console feel */
outline: none;
resize: none;
padding: 8px;
min-height: 300px;
}

#transparent_textbox input,
#transparent_textbox textarea {
color: #00FF00; /* classic terminal green */
background-color: #111111; /* softer black background */
font-family: monospace; /* console-like font */
font-size: 14px;
line-height: 1.4;
border: none; /* clean console feel */
outline: none;
resize: none;
padding: 8px;
}

#transparent_textbox textarea {
overflow-y: auto; /* keep scroll if logs overflow */
}
#centered_col {
display: flex;
justify-content: center; /* center horizontally */
align-items: center; /* center vertically */
height: 100px; /* or depending on your desired height */
}
.text-center {
text-align : center
}
"""
| |
|
| |
|
def get_app_title():
    """Return the application's display title."""
    title = "SHEAMI"
    return title
| |
|
| |
|
def get_app_theme():
    """Return a new instance of the Gradio ``Ocean`` theme used by the app."""
    theme = gr.themes.Ocean()
    return theme
| |
|
| |
|
def get_gradio_block(
    container,
    user_email_state,
    patient_id_state,
    fn_callback,
    fn_callback_inputs=[],
    fn_callback_outputs=[],
):
    """Build the report-upload UI inside ``container`` and wire its events.

    Args:
        container: Gradio layout context the components are created in.
        user_email_state: Component passed as the first input of
            ``process_reports``.
        patient_id_state: Component passed as the second input of
            ``process_reports``.
        fn_callback: Function run as the last step of the "Process Reports"
            event chain.
        fn_callback_inputs: Input components for ``fn_callback``.
        fn_callback_outputs: Output components for ``fn_callback``.

    The function has no return statement; it only creates components and
    registers event handlers.

    NOTE(review): the list defaults are shared between calls; they are only
    read here, but confirm no caller relies on mutating them.
    NOTE(review): the nesting of the layout below was reconstructed from a
    copy of the file without indentation — confirm against the original.
    """
    with container:
        my_logo = render_logo()
        with gr.Row(equal_height=False):
            # Upload form: file picker, status text and the run button.
            with gr.Column() as inputs_container:
                file_input = gr.File(
                    file_types=[".pdf"],
                    type="filepath",
                    file_count="multiple",
                    label="Upload your Lab Reports (PDF)",
                )
                file_upload_status = gr.Markdown()
                with gr.Row():
                    # Empty columns on both sides of the button.
                    gr.Column()
                    run_btn = gr.Button(
                        "Process Reports", variant="primary", visible=False, scale=0
                    )

                    gr.Column()
            # Output area: hidden until processing starts.
            with gr.Tabs(
                visible=False, selected="my_status_container"
            ) as output_container:
                with gr.Tab(
                    "Report Download", id="my_final_report_container"
                ) as final_report_container:
                    final_message = gr.Markdown()
                    with gr.Row(equal_height=False):
                        pdf_download = gr.DownloadButton(
                            label="Download 📊",
                            scale=0,
                        )
                        upload_more_reports_btn = gr.Button(
                            "Upload more", variant="primary", scale=0
                        )
                        gr.Column()

                    with gr.Accordion(
                        "Standardized Reports", open=False, visible=False
                    ):
                        reports_output = gr.HTML()

                    with gr.Accordion("Trends", open=False, visible=False):
                        trends_output = gr.Code(language="json")

                with gr.Tab(
                    "Run Statistics", id="my_status_container"
                ) as status_container:
                    with gr.Row(equal_height=True):
                        (headers, empty_data) = generate_milestones_data()

                        milestone_df = gr.DataFrame(
                            value=empty_data,
                            headers=headers,
                            datatype=["str", "str", "str", "str", "str"],
                            interactive=False,
                            row_count=5,
                        )

                        with gr.Column():
                            logs_textbox = gr.HTML(
                                value="<div class='transparent_div'>Processing request <span class='dots'></span></div>",
                                label="Logs",
                                container=False,
                                elem_id="transparent_textbox",
                            )

        # Show or hide the run button depending on the current selection.
        file_input.change(
            handle_file_input_change,
            inputs=[file_input],
            outputs=[run_btn, file_upload_status],
        )
        # Run chain: swap the logo, replace the form with the output tabs,
        # stream process_reports into the outputs, switch to the report tab,
        # clear the file input and finally invoke the caller's callback.
        run_btn.click(toggle_logo_small, inputs=[my_logo], outputs=[my_logo]).then(
            hide_component, outputs=[inputs_container]
        ).then(show_component, outputs=[output_container]).then(
            show_component, outputs=[logs_textbox]
        ).then(
            show_component, outputs=[status_container]
        ).then(
            make_status_tab_active, outputs=[output_container]
        ).then(
            process_reports,
            inputs=[user_email_state, patient_id_state, file_input],
            # These receive the values of the tuple yielded by process_reports.
            outputs=[
                logs_textbox,
                run_btn,
                file_input,
                pdf_download,
                final_report_container,
                milestone_df,
                reports_output,
                trends_output,
                final_message,
            ],
            queue=True,
        ).then(
            make_final_report_tab_active, outputs=[output_container]
        ).then(
            clear_component, outputs=[file_input]
        ).then(
            fn_callback, outputs=fn_callback_outputs, inputs=fn_callback_inputs
        )

        # "Upload more": return to the form, swap the logo back, clear the log.
        upload_more_reports_btn.click(hide_component, outputs=[output_container]).then(
            show_component, outputs=[inputs_container]
        ).then(toggle_logo_small, inputs=[my_logo], outputs=[my_logo]).then(
            clear_component, outputs=[logs_textbox]
        )
| |
|
| |
|
def build(user_email, patient_id):
    """Create a standalone Blocks app containing the report-upload UI.

    The user e-mail and patient id are stored in ``gr.State`` components and
    handed to ``get_gradio_block`` together with a no-op callback.
    """

    def _no_callback():
        return None

    blocks_options = {
        "theme": get_app_theme(),
        "title": get_app_title(),
        "css": get_css(),
    }
    with gr.Blocks(**blocks_options) as sheami_app:
        email_holder = gr.State(user_email)
        patient_holder = gr.State(patient_id)

        get_gradio_block(
            gr.Column(),
            user_email_state=email_holder,
            patient_id_state=patient_holder,
            fn_callback=_no_callback,
        )

    return sheami_app
| |
|
| |
|
def render_selected_patient_actions():
    """Render the selected-patient info text and its action buttons.

    All four buttons are created small, with ``scale=0`` and non-interactive.

    Returns:
        A tuple of (info markdown, delete button, edit button, upload button,
        add-vitals button).
    """

    def _inactive_button(caption, variant):
        return gr.Button(
            caption,
            size="sm",
            scale=0,
            variant=variant,
            interactive=False,
        )

    with gr.Column(scale=4):
        selected_patient_info = gr.Markdown("⚠ No patient selected")
        with gr.Row():
            delete_patient_btn = _inactive_button("❌ Delete", "stop")
            edit_patient_btn = _inactive_button("✏️ Edit", "huggingface")
            upload_reports_btn = _inactive_button("⬆️ Upload", "huggingface")
            add_vitals_btn = _inactive_button("🩺 Add Vitals", "huggingface")
    return (
        selected_patient_info,
        delete_patient_btn,
        edit_patient_btn,
        upload_reports_btn,
        add_vitals_btn,
    )
| |
|
| |
|
def render_top_menu_bar(logged_in_user: SheamiUser = None):
    """Render the top menu bar row.

    Args:
        logged_in_user: The current user, or ``None``. When given, the user's
            picture, name and e-mail components are created (all with
            ``visible=False``); otherwise an empty text component is created.

    Returns:
        A tuple of (logo container column, e-mail text component).

    NOTE(review): the nesting of the layout below was reconstructed from a
    copy of the file without indentation — confirm against the original.
    """
    # `menu_bar` and `sheami_logo` are assigned but not used afterwards.
    with gr.Row(elem_classes="menu-bar") as menu_bar:
        with gr.Column(scale=2, visible=False) as sheami_logo_container:
            sheami_logo = render_logo()
        gr.Column()
        with gr.Row(scale=2):
            if logged_in_user:
                gr.Image(
                    value=logged_in_user.picture_url,
                    scale=0,
                    container=False,
                    show_download_button=False,
                    show_fullscreen_button=False,
                    show_share_button=False,
                    height=30,
                    visible=False,
                )
                with gr.Column(
                    scale=4,
                ):
                    gr.Markdown(
                        value=logged_in_user.name,
                        elem_classes="text-center",
                        visible=False,
                    )
                    email_in = gr.Text(
                        label="👤 You are logged in as",
                        placeholder="doctor1@sheami.com",
                        value=logged_in_user.email,
                        interactive=False,
                        elem_id="logged_in_user",
                        text_align="left",
                        show_label=False,
                        container=False,
                        elem_classes="text-center",
                        visible=False,
                    )
            else:
                # No user: still create the component so the return is uniform.
                email_in = gr.Text()
    return (
        sheami_logo_container,
        email_in,
    )
| |
|
| |
|
| | |
| | |
| | |
async def save_vitals_readings(
    patient_id,
    reading_date: datetime,
    height,
    weight,
    bp_sys,
    bp_dia,
    glucose,
    pbs,
    spo2,
    custom_name,
    custom_value,
    custom_unit,
    created_by_user="some_user",
):
    """Store the submitted vitals and return refreshed vitals views.

    Only truthy inputs are stored. Blood pressure needs both values and is
    stored as ``"sys/dia"``; the custom reading needs both a name and a value.

    Returns:
        Without a patient id: a warning text and an empty list. Otherwise: a
        confirmation text, the flattened vitals history, followed by the
        unpacked latest-vitals cards and the unpacked vitals plots.

    NOTE(review): the early return has a different length than the normal
    return — confirm the bound Gradio outputs tolerate that.
    """
    if not patient_id:
        return "⚠️ Please select a patient from the sidebar.", []

    # (include?, name, value, unit) — listed in the order they are stored.
    candidates = (
        (height, "Height", height, "cm"),
        (weight, "Weight", weight, "kg"),
        (bp_sys and bp_dia, "BP", f"{bp_sys}/{bp_dia}", "mmHg"),
        (glucose, "Fasting Glucose", glucose, "mg/dL"),
        (pbs, "PBS", pbs, "mg/dL"),
        (spo2, "SpO₂", spo2, "%"),
        (custom_name and custom_value, custom_name, custom_value, custom_unit),
    )
    readings = [
        {"name": label, "value": amount, "unit": unit}
        for provided, label, amount, unit in candidates
        if provided
    ]

    await get_db().save_readings_to_db(
        patient_id, reading_date, readings, created_by_user
    )

    # Re-read everything so the UI reflects the newly stored readings.
    history = await get_db().get_vitals_by_patient(patient_id)
    latest_cards = await render_latest_vitals_card_layout(patient_id)
    plots = await render_vitals_plot_layout(patient_id)

    confirmation = f"✅ Saved for {patient_id} on {reading_date}"
    return (
        confirmation,
        flatten_vitals(history),
        *latest_cards,
        *plots,
    )
| |
|
| |
|
def flatten_vitals(docs):
    """Flatten vitals documents into a one-row-per-reading DataFrame.

    Each document must have a ``"date"`` entry (``datetime`` values are
    formatted as ``YYYY-MM-DD``, anything else is kept as is) and may have a
    ``"readings"`` list of dicts.

    Returns:
        A DataFrame with the columns date, name, value, unit and status.
        Missing values are replaced with ``"-"``; with no readings at all an
        empty frame with those columns is returned.
    """
    columns = ["date", "name", "value", "unit", "status"]
    records = []
    for entry in docs:
        raw_date = entry["date"]
        if isinstance(raw_date, datetime):
            day = raw_date.strftime("%Y-%m-%d")
        else:
            day = raw_date
        records.extend(
            {
                "date": day,
                "name": item.get("name", ""),
                "value": item.get("value", ""),
                "unit": item.get("unit", ""),
                "status": item.get("status", "pending AI analysis"),
            }
            for item in entry.get("readings", [])
        )

    frame = pd.DataFrame(records)
    if frame.empty:
        return pd.DataFrame(columns=columns)
    return frame.fillna("-")
| |
|
| |
|
async def render_latest_vitals_card_layout(patient_id: str):
    """Build exactly 20 Label cards from a patient's latest vital readings.

    The first 20 stored readings are kept and shown sorted by name. If fewer
    than 20 are available — including when no vitals record is returned at
    all — the list is padded with placeholder cards showing ``"-"``.

    Args:
        patient_id: Identifier of the patient whose latest vitals are fetched.

    Returns:
        list[gr.Label]: 20 Label components.
    """
    card_count = 20
    vitals = await get_db().get_latest_vitals_by_patient(patient_id)
    # Treat a missing record or a missing/None readings entry as "no readings".
    readings = (vitals or {}).get("readings") or []

    cards = []
    for reading in sorted(readings[:card_count], key=lambda x: x["name"]):
        # A reading may be stored without a unit (e.g. a custom reading);
        # show only the value then, rather than failing or printing "None".
        unit = reading.get("unit") or ""
        cards.append(
            gr.Label(
                value=f"{reading['value']}{unit}",
                label=reading["name"],
                visible=True,
            )
        )

    # Pad so the number of returned components is always the same.
    while len(cards) < card_count:
        cards.append(gr.Label(value="-", label="", visible=True))

    return cards
| |
|
| |
|
def empty_state_component(
    message: str,
    title: str = "No data available",
    icon: str = "ℹ️",
):
    """Create an HTML component showing a centered "empty state" card.

    Args:
        message: Explanatory text shown at the bottom of the card.
        title: Heading of the card.
        icon: Text (typically an emoji) shown above the heading.
    """
    markup = f"""
<div style="
display: flex;
align-items: center;
justify-content: center;
height: 100%;
text-align: center;
">
<div style="
padding: 20px;
border-radius: 16px;
background-color: #f9fafb;
color: #4b5563;
box-shadow: 0 2px 6px rgba(0,0,0,0.05);
max-width: 400px;
">
<div style="font-size: 28px; margin-bottom: 8px;">{icon}</div>
<div style="font-size: 18px; font-weight: 600; margin-bottom: 4px;">
{title}
</div>
<div style="font-size: 14px;">
{message}
</div>
</div>
</div>
"""
    return gr.HTML(markup, label=None)
| |
|
| |
|
def hide_tabs_if_no_patient_selected(patient_id):
    """Return two visibility updates depending on whether a patient is set.

    With a patient id the first target is shown and the second hidden;
    without one it is the other way round.
    """
    if not patient_id:
        return hide_component(), show_component()
    return show_component(), hide_component()
| |
|
| |
|
def show_no_data_found_if_none(data):
    """Return two visibility updates depending on whether ``data`` is falsy.

    For falsy data the first target is shown and the second hidden;
    otherwise the first is hidden and the second shown.
    """
    if data:
        return hide_component(), show_component()
    return show_component(), hide_component()
| |
|
def show_no_data_found_if_dataframe_empty(data: pd.DataFrame):
    """Return two visibility updates depending on whether ``data`` has content.

    The frame counts as empty when ``data.empty`` is true, when every row is
    entirely NA, or when every column is entirely NA. In that case the first
    target is shown and the second hidden; otherwise the reverse.
    """
    print("rows:", len(data), "cols:", len(data.columns), "empty:", data.empty)

    nothing_to_show = (
        data.empty
        or data.dropna(how="all").shape[0] == 0
        or data.dropna(axis=1, how="all").shape[1] == 0
    )
    if nothing_to_show:
        return show_component(), hide_component()
    return hide_component(), show_component()
| |
|
def render_about_markdowns():
    """Render the "How It Works" text, the disclaimer and the consent line.

    Returns:
        The column that contains all of the created Markdown components.
    """
    how_it_works = """
- 🤖 Upload your lab reports (**PDF only**) to unlock **AI-powered insights** on test results and **personalized vitals**.
- 🫀 Vitals such as **height, weight, BMI, and other demographics** are factored in to give you **contextualized, patient-specific insights**.
- 📊 When multiple reports are available, **Sheami™** highlights **trends over time** in both lab tests and vitals.
- 💾 All uploaded reports and generated insights stay securely in your workspace for you to **review, download, or remove** at any time.
"""
    disclaimer = """
> ⚠️ **Disclaimer**
> This application is intended solely for informational and educational purposes.
> It is **not** a substitute for professional medical advice, diagnosis, or treatment.
> Always seek the guidance of a qualified healthcare provider with any questions
> you may have regarding a medical condition. Never disregard professional advice
> or delay seeking it because of information provided by this app.
"""
    with gr.Column() as group:
        gr.Markdown("# 🧪 How It Works")
        gr.Markdown(how_it_works, show_copy_button=False)
        gr.Markdown("---")
        gr.Markdown(disclaimer, show_copy_button=False)
        gr.Markdown("---")
        gr.Markdown("By clicking **Proceed**, you agree to these terms.")
    return group
| |
|