Spaces:
Sleeping
Sleeping
| ## Adding red box for failure | |
| import os | |
| import gradio as gr | |
| import json | |
| import time | |
| import base64 | |
| from dotenv import load_dotenv | |
| from ClinicalStatusAgent import ClinicalStatusAgent | |
| from TestFindingAgent import TestFindingAgent | |
| from ComorbidityCheckerAgent import ComorbidityCheckerAgent | |
| from HCCDiagnosisListEngine import HCCDiagnosisListEngine | |
| from chartdiagnosischecker import ChartDiagnosisChecker | |
| from MeatValidatorAgent import MEATValidatorAgent | |
| from PatientInfoExtractionEngine import PatientInfoExtractionEngine | |
# Read a local .env file (if any) into the process environment before the
# settings below and the launch code consult it.
load_dotenv()

# Text used for the page heading and the browser tab title.
APP_TITLE: str = "Risk Adjustment (HCC Chart Validation)"
# Default mapping file passed to HCCDiagnosisListEngine by process_pipeline.
CSV_PATH: str = "hcc_mapping.csv"
SAMPLE_PDF: str = "sample_patient_chart.pdf"  # Place a sample PDF in the same folder
| # ---------- JSON to Markdown ---------- | |
| # --- MODIFICATION: Added 'status' parameter to control color scheme --- | |
def json_to_markdown(data, status='success') -> str:
    """Render a validation result dict as an HTML report fragment.

    Args:
        data: Report dict. Keys read here: ``file_name``, ``hcc_code``,
            ``model_version``, ``final_analysis`` (a list of per-diagnosis
            dicts) and the demographic fields ``patient_name``, ``dob``,
            ``age``, ``gender``, ``address``, ``phone`` and
            ``patient_identifier``. Missing keys fall back to placeholders.
        status: ``'error'`` selects the red colour scheme for the header
            card; any other value selects the green one.

    Returns:
        The HTML string. A non-dict ``data``, or any exception raised while
        rendering, yields a short red error ``<div>`` instead of raising.
    """
    # Local import keeps this block self-contained.
    from html import escape

    def esc(value) -> str:
        # Every interpolated value originates from an uploaded chart or user
        # input, so it must never be able to inject markup into the report.
        return escape(str(value), quote=True)

    # NOTE(review): the icon glyphs inside the string literals below look
    # mis-encoded (mojibake). They are reproduced as found - confirm against
    # the original file before changing them.
    try:
        if not isinstance(data, dict):
            return "<div style='color:red; font-weight:bold;'>β οΈ Invalid data format for report.</div>"

        file_name = data.get("file_name", "Unknown Patient")
        hcc_code = data.get("hcc_code", "N/A")
        model_version = data.get("model_version", "N/A")
        analyses = data.get("final_analysis", [])
        # Demographics
        patient_name = data.get("patient_name", "")
        dob = data.get("dob", "")
        age = data.get("age", "")
        gender = data.get("gender", "")
        address = data.get("address", "")
        phone = data.get("phone", "")
        patient_identifier = data.get("patient_identifier", "")

        # Dynamically set colors based on status
        if status == 'error':
            # Red theme. The background was previously "#ffeb ee", which is
            # not a valid CSS colour, so the red tint never rendered.
            border_color, bg_color, header_color = "#f44336", "#ffebee", "#d32f2f"
        else:
            # Green theme
            border_color, bg_color, header_color = "#4CAF50", "#f9fdf9", "#2e7d32"

        # Fragments are collected and joined once at the end.
        parts = [
            "\n"
            f'<div style="border:2px solid {border_color}; padding:15px; border-radius:10px; background:{bg_color};">\n'
            f'<h2 style="color:{header_color};">π HCC Chart Validation Report </h2>\n'
            f"<p><b>π§Ύ File Name:</b> {esc(file_name)}</p>\n"
            f"<p><b>π·οΈ HCC Code:</b> {esc(hcc_code)}</p>\n"
            f"<p><b>βοΈ Model Version:</b> {esc(model_version)}</p>\n"
        ]

        # Add demographics (only the fields that actually have a value)
        demographic_rows = (
            ("Name", patient_name),
            ("Patient Identifier", patient_identifier),
            ("Date of Birth", dob),
            ("Age", age),
            ("Gender", gender),
            ("Address", address),
            ("Phone", phone),
        )
        if any(value for _, value in demographic_rows):
            parts.append("<hr style='border:0;border-top:1px solid #c8e6c9;margin:10px 0;'/>")
            parts.append(f"<h3 style='color:{header_color};'>π€ Patient Demographics</h3>")
            parts.extend(
                f"<p><b>{label}:</b> {esc(value)}</p>"
                for label, value in demographic_rows
                if value
            )
        parts.append("</div><br/>")

        # Render analyses if they exist
        for idx, diag in enumerate(analyses or [], 1):
            reference = esc(diag.get("reference", ""))
            parts.append(
                "\n"
                '<div style="border:1px solid #ccc; padding:12px; border-radius:8px; margin-bottom:12px;">\n'
                f'<h3 style="color:#1565c0;">Diagnosis {idx}. {esc(diag.get("diagnosis", "Unknown Diagnosis"))}</h3>\n'
                f'<p><b>ICD-10:</b> {esc(diag.get("icd10", "N/A"))}</p>\n'
                f'<p><b>Reference:</b> <a href="{reference}" target="_blank">{reference}</a></p>\n'
            )

            explicit_ans = diag.get("answer_explicit", "N/A")
            explicit_rat = diag.get("rationale_explicit", "")
            implicit_ans = diag.get("answer_implicit", "N/A")
            implicit_rat = diag.get("rationale_implicit", "")
            # A None answer must not abort the whole report, so normalise
            # before comparing. Only an explicit "yes" shows the explicit line.
            if str(explicit_ans or "").lower() == "yes":
                parts.append(f"<p><b>Explicit:</b> {esc(explicit_ans)} β {esc(explicit_rat)}</p>")
            else:
                parts.append(f"<p><b>Implicit:</b> {esc(implicit_ans)} β {esc(implicit_rat)}</p>")

            parts.append(
                "\n"
                f'<p><b>Clinical Status:</b> {esc(diag.get("clinical_status", "N/A"))}</p>\n'
                f'<p><b>Status Rationale:</b> {esc(diag.get("status_rationale", ""))}</p>\n'
            )

            if "tests" in diag:
                tests = diag["tests"] or {}
                parts.append("<details><summary><b>π§ͺ Tests & Procedures</b></summary><ul>")
                for key, label in (
                    ("vitals", "Vitals"),
                    ("procedures", "Procedures"),
                    ("lab_test", "Lab Tests"),
                ):
                    if key in tests:
                        parts.append(f"<li><b>{label}:</b><ul>")
                        parts.extend(
                            f"<li>{esc(k)}: {esc(v)}</li>" for k, v in tests[key].items()
                        )
                        parts.append("</ul></li>")
                parts.append("</ul></details>")

            if "meat" in diag:
                parts.append("<details><summary><b>π MEAT Validation</b></summary><ul>")
                for k, v in diag["meat"].items():
                    emoji = "β " if v else "β"
                    parts.append(f"<li>{esc(str(k).capitalize())}: {emoji}</li>")
                parts.append("</ul>")
                parts.append(f"<p><b>MEAT Rationale:</b> {esc(diag.get('meat_rationale', ''))}</p>")
                parts.append("</details>")

            # Only comorbidities flagged as present are listed.
            present_comorbidities = [
                c for c in (diag.get("comorbidities") or []) if c.get("is_present")
            ]
            if present_comorbidities:
                parts.append("<details open><summary><b>π©Ί Comorbidities</b></summary><ul>")
                parts.extend(
                    f"<li>β <b>{esc(c.get('condition'))}</b><br/><i>{esc(c.get('rationale'))}</i></li>"
                    for c in present_comorbidities
                )
                parts.append("</ul></details>")

            parts.append("</div>")

        return "".join(parts)
    except Exception as e:
        # Rendering is best-effort: report the problem in the UI, don't raise.
        return f"<div style='color:red; font-weight:bold;'>β οΈ Error rendering report: {esc(e)}</div>"
| # ---------- Processing Pipeline with Gradio Progress ---------- | |
def process_pipeline(pdf_file, hcc_code, model_version, csv_path=CSV_PATH, output_folder="outputs"):
    """Run the HCC chart-validation pipeline, streaming progress as HTML.

    This is a generator. Every yielded string is a complete snapshot for the
    output component: a progress card while a stage runs, then the rendered
    report (or an error box) once the run ends.

    Args:
        pdf_file: Uploaded chart. Only its ``.name`` attribute is read and
            used as the PDF path. ``None`` yields a prompt and stops.
        hcc_code: HCC code entered by the user; converted with ``str()`` and
            stripped. An empty result yields a prompt and stops.
        model_version: Passed through to the engines/agents and shown in the
            report header.
        csv_path: Mapping file handed to ``HCCDiagnosisListEngine``.
        output_folder: Directory created if it does not exist.

    Yields:
        str: HTML fragments as described above. Exceptions are caught,
        printed, and surfaced as a final red error ``<div>``.
    """
    # NOTE(review): the icon glyphs inside the string literals below look
    # mis-encoded (mojibake). They are reproduced as found - confirm against
    # the original file before changing them.
    try:
        start = time.time()
        step = 0
        total_steps = 8  # Total number of steps in the pipeline

        def log(msg, current_step=0):
            """Wrap *msg* with a segmented progress bar and the elapsed time."""
            elapsed = time.time() - start
            segments = []
            for i in range(1, total_steps + 1):
                if i < current_step:
                    color = "#1e40af"  # finished step
                elif i == current_step:
                    color = "#3b82f6"  # step in progress
                else:
                    color = "#e5e7eb"  # not reached yet
                segments.append(
                    f'<div style="flex-grow: 1; background-color: {color}; border-radius: 3px;"></div>'
                )
            bar_html = (
                '<div style="display: flex; width: 100%; gap: 2px; height: 12px; margin: 8px 0 5px 0;">'
                + "".join(segments)
                + '</div>'
            )
            return f"<div style='padding-top:10px;'>{msg}{bar_html}<small>β³ Elapsed: {elapsed:.1f} sec</small></div>"

        def is_confirmed(entry):
            """True when the chart check answered "yes" explicitly or implicitly."""
            # `or ""` keeps a None answer from raising on .lower().
            return any(
                str(entry.get(key) or "").lower() == "yes"
                for key in ("answer_explicit", "answer_implicit")
            )

        def build_report(final_analysis):
            """Assemble the dict consumed by json_to_markdown."""
            return {
                "file_name": file_name,
                "hcc_code": hcc_code_str,
                "model_version": model_version,
                "final_analysis": final_analysis,
                "patient_name": demographics_info.get("name", ""),
                "dob": demographics_info.get("dob", ""),
                "age": demographics_info.get("age", ""),
                "gender": demographics_info.get("gender", ""),
                "address": demographics_info.get("address", ""),
                "phone": demographics_info.get("phone", ""),
                "patient_identifier": demographics_info.get("patient_identifier", ""),
            }

        # --- Input validation -------------------------------------------
        if pdf_file is None:
            yield log("β οΈ Please upload a patient chart PDF.", 0)
            return
        hcc_code_str = str(hcc_code or "").strip()
        if not hcc_code_str:
            yield log("β οΈ Please enter a valid HCC Code before running validation.", 0)
            return

        # Kept for callers that rely on the folder existing; nothing in this
        # function writes into it.
        os.makedirs(output_folder, exist_ok=True)
        pdf_path = pdf_file.name
        file_name = os.path.splitext(os.path.basename(pdf_path))[0]
        print(f"[PROCESSING] {file_name}")

        # Step 1: Extract Demographics
        step += 1
        yield log(f"π§ Step {step}/{total_steps}: Extracting patient demographics...", step)
        demographics_info = PatientInfoExtractionEngine(pdf_path).run()
        # NOTE(review): this prints patient demographics to stdout - confirm
        # that is acceptable for the deployment's logging policy.
        print(f"[DEMOGRAPHICS] Extracted: {demographics_info}")
        # An engine that returns nothing must not abort the whole run.
        demographics_info = demographics_info or {}
        initial_report_data = build_report([])
        demographics_md = json_to_markdown(initial_report_data)
        yield demographics_md
        time.sleep(0.5)  # brief pause so the demographics card is visible first

        # Step 2: Diagnoses
        step += 1
        yield demographics_md + log(f"π Step {step}/{total_steps}: Extracting possible HCC Diagnoses...", step)
        diagnoses = HCCDiagnosisListEngine(hcc_code_str, model_version, csv_path).run()
        if not diagnoses:
            yield demographics_md + log(f"β No diagnoses found for HCC {hcc_code_str}.", step)
            return

        # Step 3: Chart checking
        step += 1
        yield demographics_md + log(f"π Step {step}/{total_steps}: Checking diagnoses in patient chart...", step)
        all_checked_results = ChartDiagnosisChecker(pdf_path).run(diagnoses)
        confirmed_diagnoses = [d for d in all_checked_results if is_confirmed(d)]

        # Nothing confirmed: re-render the header card in the red theme and
        # append an explicit failure box.
        if not confirmed_diagnoses:
            error_report_md = json_to_markdown(initial_report_data, status='error')
            error_message = f"No confirmed diagnoses found for HCC {hcc_code_str} in the patient chart."
            error_box_html = (
                "\n"
                "<div style='border:1px solid #d32f2f; color: #c62828; background-color: #ffcdd2; padding:12px; border-radius:8px; margin-top:15px;'>\n"
                f"<b>β Validation Failed:</b> {error_message}\n"
                "</div>\n"
            )
            yield error_report_md + error_box_html
            return

        # Step 4: Tests
        step += 1
        yield demographics_md + log(f"π§ͺ Step {step}/{total_steps}: Finding relevant tests...", step)
        diagnoses_with_tests = TestFindingAgent(hcc_code=hcc_code_str, model_version=model_version).run(confirmed_diagnoses)

        # Step 5: Clinical Status
        step += 1
        yield demographics_md + log(f"βοΈ Step {step}/{total_steps}: Determining clinical status...", step)
        diagnoses_with_status = ClinicalStatusAgent().run(diagnoses_with_tests)
        active_diagnoses = [d for d in diagnoses_with_status if d.get("clinical_status") == "ACTIVE"]

        # Step 6: MEAT (only ACTIVE diagnoses are validated)
        step += 1
        if active_diagnoses:
            yield demographics_md + log(f"π Step {step}/{total_steps}: Validating MEAT...", step)
            validated_meat_diagnoses = MEATValidatorAgent().run(active_diagnoses)
        else:
            validated_meat_diagnoses = []
            yield demographics_md + log("βΉοΈ No ACTIVE diagnoses found. Skipping MEAT/Comorbidity.", step)

        # Step 7: Comorbidities (only for diagnoses with any truthy MEAT flag)
        step += 1
        diagnoses_passed_meat = [d for d in validated_meat_diagnoses if any(d.get("meat", {}).values())]
        if diagnoses_passed_meat:
            yield demographics_md + log(f"π€ Step {step}/{total_steps}: Checking comorbidities...", step)
            comorbidity_results = ComorbidityCheckerAgent(pdf_path, hcc_code_str, model_version).run(diagnoses_passed_meat)
        else:
            comorbidity_results = []

        # Step 8: Final Report
        step += 1
        yield demographics_md + log(f"β Step {step}/{total_steps}: Generating final report...", step)

        # Merge results for final output. Later stages overwrite earlier ones
        # on key collisions, in the order status -> MEAT -> comorbidity.
        status_map = {d["diagnosis"]: d for d in diagnoses_with_status}
        meat_map = {d["diagnosis"]: d for d in validated_meat_diagnoses}
        comorbidity_map = {d["diagnosis"]: d for d in comorbidity_results}
        final_analysis = []
        for entry in all_checked_results:
            diag_name = entry["diagnosis"]
            updated_entry = entry.copy()
            for stage_map in (status_map, meat_map, comorbidity_map):
                if diag_name in stage_map:
                    updated_entry.update(stage_map[diag_name])
            final_analysis.append(updated_entry)
        filtered_final_analysis = [e for e in final_analysis if is_confirmed(e)]

        output_data = build_report(filtered_final_analysis)
        elapsed = time.time() - start
        yield json_to_markdown(output_data) + f"<br/><div style='color:green;'>β Completed in {elapsed:.1f} sec</div>"
    except Exception as e:
        # Top-level boundary for the UI: log to stdout and show the error.
        print(f"[ERROR] {e}")
        yield f"<div style='color:red; font-weight:bold;'>β οΈ Error: {e}</div>"
| # ---------- Gradio Theme and Helpers ---------- | |
# Start from Gradio's Soft theme on a blue/slate palette ...
simple_theme = gr.themes.Soft(
    primary_hue=gr.themes.colors.blue,
    secondary_hue=gr.themes.colors.slate,
    neutral_hue=gr.themes.colors.slate,
)
# ... then pin the primary-button and page-background colours explicitly.
simple_theme = simple_theme.set(
    button_primary_background_fill="#1e40af",
    button_primary_background_fill_hover="#1d4ed8",
    button_primary_text_color="white",
    background_fill_primary="white",
    background_fill_secondary="#f8fafc",
)
def load_sample_pdf():
    """Return a small object whose ``.name`` is the bundled sample PDF path.

    The wrapper exposes only the ``.name`` attribute, which is what the rest
    of this file reads from an uploaded file.

    Raises:
        FileNotFoundError: If ``SAMPLE_PDF`` does not exist on disk.
    """
    class PDFWrapper:
        def __init__(self, path):
            self.name = path

    if os.path.exists(SAMPLE_PDF):
        return PDFWrapper(SAMPLE_PDF)
    raise FileNotFoundError(f"Sample PDF not found at {SAMPLE_PDF}")
def pdf_to_iframe(file):
    """Build an inline ``<iframe>`` preview for an uploaded PDF.

    Args:
        file: Object whose ``.name`` is the path of the PDF, or ``None``.

    Returns:
        An ``<iframe>`` embedding the file as a base64 ``data:`` URI, an
        orange notice when ``file`` is ``None``, or a red message carrying
        the exception text if the file could not be read or encoded.
    """
    if file is None:
        return "<p style='color:orange;'>No PDF uploaded.</p>"

    try:
        with open(file.name, "rb") as handle:
            raw = handle.read()
        payload = base64.b64encode(raw).decode("utf-8")
    except Exception as e:
        return f"<p style='color:red;'>Failed to display PDF: {e}</p>"

    return f'<iframe src="data:application/pdf;base64,{payload}" width="100%" height="600px" style="border:1px solid #ccc;"></iframe>'
def clear_outputs():
    """Return the idle-state ``(report_html, preview_html)`` placeholder pair."""
    preview_placeholder = "<p>Upload a PDF to preview</p>"
    report_placeholder = (
        "<div style='border:2px solid #1e40af; border-radius:12px; padding:15px; background-color:#f0f9ff;'>"
        "π Upload a PDF and click <b>Run Validation</b> to start."
        "</div>"
    )
    return report_placeholder, preview_placeholder
| # ---------- Gradio UI ---------- | |
# Page layout: header, an input row, a preview/report row, and an example
# loader. Components are created in display order.
with gr.Blocks(theme=simple_theme, title=APP_TITLE) as interface:
    # The idle placeholders come from clear_outputs() so the initial state
    # and the state restored when the upload is cleared cannot drift apart.
    _idle_report_html, _idle_preview_html = clear_outputs()

    # NOTE(review): the glyph before the title looks mis-encoded (mojibake);
    # reproduced as found.
    gr.HTML(f"""
<h1 style='text-align:center;color:#1e40af;'> π© {APP_TITLE}</h1>
<p style='text-align:center;color:#64748b;'>
Upload a chart, set HCC + model version, and validate MEAT criteria.
</p>
""")

    # Inputs and the run button share one row.
    with gr.Row():
        pdf_upload = gr.File(label="Upload Patient Chart (PDF)", file_types=[".pdf"], scale=1)
        hcc_code = gr.Textbox(label="HCC Code (e.g., 12)", placeholder="Enter HCC code", scale=1)
        model_version = gr.Dropdown(choices=["V24", "V28"], label="Model Version", value="V24", scale=1)
        run_btn = gr.Button("π Run Validation", variant="primary", scale=1)

    # Left: PDF preview. Right: streamed validation report.
    with gr.Row():
        with gr.Column(scale=2):
            pdf_preview = gr.HTML(label="π PDF Preview", value=_idle_preview_html)
        with gr.Column(scale=2):
            output_md = gr.Markdown(
                label="Validation Report",
                value=_idle_report_html,
            )

    # Refresh the preview on upload; reset both panes when it is cleared.
    pdf_upload.change(fn=pdf_to_iframe, inputs=pdf_upload, outputs=pdf_preview)
    pdf_upload.clear(fn=clear_outputs, inputs=[], outputs=[output_md, pdf_preview])

    # process_pipeline is a generator, so the report pane updates per yield.
    run_btn.click(
        fn=process_pipeline,
        inputs=[pdf_upload, hcc_code, model_version],
        outputs=[output_md],
    )

    with gr.Row():
        with gr.Column(scale=1):
            gr.Examples(
                examples=[[SAMPLE_PDF, "12", "V24"]],
                inputs=[pdf_upload, hcc_code, model_version],
                label="Click to load an example",
                cache_examples=False
            )
        # Empty spacer column so the examples take one third of the row.
        with gr.Column(scale=2):
            pass
if __name__ == "__main__":
    # Listen on all interfaces; the port is taken from $PORT when set,
    # otherwise 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    queued_app = interface.queue()
    queued_app.launch(server_name="0.0.0.0", server_port=listen_port)
| # ## small samples | |
| # import os | |
| # import gradio as gr | |
| # import json | |
| # import time | |
| # import base64 | |
| # from dotenv import load_dotenv | |
| # from ClinicalStatusAgent import ClinicalStatusAgent | |
| # from TestFindingAgent import TestFindingAgent | |
| # from ComorbidityCheckerAgent import ComorbidityCheckerAgent | |
| # from HCCDiagnosisListEngine import HCCDiagnosisListEngine | |
| # from chartdiagnosischecker import ChartDiagnosisChecker | |
| # from MeatValidatorAgent import MEATValidatorAgent | |
| # from PatientInfoExtractionEngine import PatientInfoExtractionEngine | |
| # load_dotenv() | |
| # APP_TITLE = "Risk Adjustment (HCC Chart Validation)" | |
| # CSV_PATH = "hcc_mapping.csv" | |
| # SAMPLE_PDF = "sample_patient_chart.pdf" # Place a sample PDF in the same folder | |
| # # ---------- JSON to Markdown ---------- | |
| # def json_to_markdown(data) -> str: | |
| # try: | |
| # if isinstance(data, dict): | |
| # file_name = data.get("file_name", "Unknown Patient") | |
| # hcc_code = data.get("hcc_code", "N/A") | |
| # model_version = data.get("model_version", "N/A") | |
| # analyses = data.get("final_analysis", []) | |
| # # Demographics | |
| # patient_name = data.get("patient_name", "") | |
| # dob = data.get("dob", "") | |
| # age = data.get("age", "") | |
| # gender = data.get("gender", "") | |
| # address = data.get("address", "") | |
| # phone = data.get("phone", "") | |
| # patient_identifier = data.get("patient_identifier", "") | |
| # else: | |
| # return "<div style='color:red; font-weight:bold;'>β οΈ Invalid data format for report.</div>" | |
| # md = f""" | |
| # <div style="border:2px solid #4CAF50; padding:15px; border-radius:10px; background:#f9fdf9;"> | |
| # <h2 style="color:#2e7d32;">π HCC Chart Validation Report </h2> | |
| # <p><b>π§Ύ File Name:</b> {file_name}</p> | |
| # <p><b>π·οΈ HCC Code:</b> {hcc_code}</p> | |
| # <p><b>βοΈ Model Version:</b> {model_version}</p> | |
| # """ | |
| # # Add demographics | |
| # if any([patient_name, dob, age, gender, address, phone, patient_identifier]): | |
| # md += "<hr style='border:0;border-top:1px solid #c8e6c9;margin:10px 0;'/>" | |
| # md += "<h3 style='color:#1b5e20;'>π€ Patient Demographics</h3>" | |
| # if patient_name: md += f"<p><b>Name:</b> {patient_name}</p>" | |
| # if patient_identifier: md += f"<p><b>Patient Identifier:</b> {patient_identifier}</p>" | |
| # if dob: md += f"<p><b>Date of Birth:</b> {dob}</p>" | |
| # if age: md += f"<p><b>Age:</b> {age}</p>" | |
| # if gender: md += f"<p><b>Gender:</b> {gender}</p>" | |
| # if address: md += f"<p><b>Address:</b> {address}</p>" | |
| # if phone: md += f"<p><b>Phone:</b> {phone}</p>" | |
| # md += "</div><br/>" | |
| # # Render analyses if they exist | |
| # if analyses: | |
| # for idx, diag in enumerate(analyses, 1): | |
| # md += f""" | |
| # <div style="border:1px solid #ccc; padding:12px; border-radius:8px; margin-bottom:12px;"> | |
| # <h3 style="color:#1565c0;">Diagnosis {idx}. {diag.get("diagnosis", "Unknown Diagnosis")}</h3> | |
| # <p><b>ICD-10:</b> {diag.get("icd10", "N/A")}</p> | |
| # <p><b>Reference:</b> <a href="{diag.get("reference","")}" target="_blank">{diag.get("reference","")}</a></p> | |
| # """ | |
| # explicit_ans = diag.get("answer_explicit", "N/A") | |
| # explicit_rat = diag.get("rationale_explicit", "") | |
| # implicit_ans = diag.get("answer_implicit", "N/A") | |
| # implicit_rat = diag.get("rationale_implicit", "") | |
| # if explicit_ans.lower() == "yes": | |
| # md += f"<p><b>Explicit:</b> {explicit_ans} β {explicit_rat}</p>" | |
| # else: | |
| # md += f"<p><b>Implicit:</b> {implicit_ans} β {implicit_rat}</p>" | |
| # md += f""" | |
| # <p><b>Clinical Status:</b> {diag.get("clinical_status","N/A")}</p> | |
| # <p><b>Status Rationale:</b> {diag.get("status_rationale","")}</p> | |
| # """ | |
| # if "tests" in diag: | |
| # md += "<details><summary><b>π§ͺ Tests & Procedures</b></summary><ul>" | |
| # tests = diag["tests"] | |
| # if "vitals" in tests: | |
| # md += "<li><b>Vitals:</b><ul>" | |
| # for k, v in tests["vitals"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "procedures" in tests: | |
| # md += "<li><b>Procedures:</b><ul>" | |
| # for k, v in tests["procedures"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "lab_test" in tests: | |
| # md += "<li><b>Lab Tests:</b><ul>" | |
| # for k, v in tests["lab_test"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # md += "</ul></details>" | |
| # if "meat" in diag: | |
| # md += "<details><summary><b>π MEAT Validation</b></summary><ul>" | |
| # for k, v in diag["meat"].items(): | |
| # emoji = "β " if v else "β" | |
| # md += f"<li>{k.capitalize()}: {emoji}</li>" | |
| # md += "</ul>" | |
| # md += f"<p><b>MEAT Rationale:</b> {diag.get('meat_rationale','')}</p>" | |
| # md += "</details>" | |
| # if "comorbidities" in diag and diag["comorbidities"]: | |
| # present_comorbidities = [c for c in diag["comorbidities"] if c.get("is_present")] | |
| # if present_comorbidities: | |
| # md += "<details open><summary><b>π©Ί Comorbidities</b></summary><ul>" | |
| # for c in present_comorbidities: | |
| # md += f"<li>β <b>{c.get('condition')}</b><br/><i>{c.get('rationale')}</i></li>" | |
| # md += "</ul></details>" | |
| # md += "</div>" | |
| # return md | |
| # except Exception as e: | |
| # return f"<div style='color:red; font-weight:bold;'>β οΈ Error rendering report: {e}</div>" | |
| # # ---------- Processing Pipeline with Gradio Progress ---------- | |
| # def process_pipeline(pdf_file, hcc_code, model_version, csv_path=CSV_PATH, output_folder="outputs"): | |
| # try: | |
| # start = time.time() | |
| # step = 0 | |
| # total_steps = 8 # Total number of steps in the pipeline | |
| # def log(msg, current_step=0): | |
| # elapsed = time.time() - start | |
| # bar_html = '<div style="display: flex; width: 100%; gap: 2px; height: 12px; margin: 8px 0 5px 0;">' | |
| # for i in range(1, total_steps + 1): | |
| # if i < current_step: color = "#1e40af" | |
| # elif i == current_step: color = "#3b82f6" | |
| # else: color = "#e5e7eb" | |
| # bar_html += f'<div style="flex-grow: 1; background-color: {color}; border-radius: 3px;"></div>' | |
| # bar_html += '</div>' | |
| # return f"<div style='padding-top:10px;'>{msg}{bar_html}<small>β³ Elapsed: {elapsed:.1f} sec</small></div>" | |
| # if pdf_file is None: | |
| # yield log("β οΈ Please upload a patient chart PDF.", 0) | |
| # return | |
| # hcc_code_str = str(hcc_code or "").strip() | |
| # if not hcc_code_str: | |
| # yield log("β οΈ Please enter a valid HCC Code before running validation.", 0) | |
| # return | |
| # os.makedirs(output_folder, exist_ok=True) | |
| # pdf_path = pdf_file.name | |
| # file_name = os.path.splitext(os.path.basename(pdf_path))[0] | |
| # print(f"[PROCESSING] {file_name}") | |
| # # Step 1: Extract Demographics | |
| # step += 1 | |
| # initial_progress_msg = log(f"π§ Step {step}/{total_steps}: Extracting patient demographics...", step) | |
| # yield initial_progress_msg | |
| # demographics_engine = PatientInfoExtractionEngine(pdf_path) | |
| # demographics_info = demographics_engine.run() | |
| # print(f"[DEMOGRAPHICS] Extracted: {demographics_info}") | |
| # initial_report_data = { | |
| # "file_name": file_name, | |
| # "hcc_code": hcc_code_str, | |
| # "model_version": model_version, | |
| # "final_analysis": [], | |
| # "patient_name": demographics_info.get("name", ""), | |
| # "dob": demographics_info.get("dob", ""), | |
| # "age": demographics_info.get("age", ""), | |
| # "gender": demographics_info.get("gender", ""), | |
| # "address": demographics_info.get("address", ""), | |
| # "phone": demographics_info.get("phone", ""), | |
| # "patient_identifier": demographics_info.get("patient_identifier", "") | |
| # } | |
| # demographics_md = json_to_markdown(initial_report_data) | |
| # yield demographics_md | |
| # time.sleep(0.5) | |
| # # Step 2: Diagnoses | |
| # step += 1 | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Extracting possible HCC Diagnoses...", step) | |
| # diagnoses = HCCDiagnosisListEngine(hcc_code_str, model_version, csv_path).run() | |
| # if not diagnoses: | |
| # yield demographics_md + log(f"β No diagnoses found for HCC {hcc_code_str}.", step) | |
| # return | |
| # # Step 3: Chart checking | |
| # step += 1 | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Checking diagnoses in patient chart...", step) | |
| # all_checked_results = ChartDiagnosisChecker(pdf_path).run(diagnoses) | |
| # confirmed_diagnoses = [d for d in all_checked_results if d.get("answer_explicit", "").lower() == "yes" or d.get("answer_implicit", "").lower() == "yes"] | |
| # if not confirmed_diagnoses: | |
| # yield demographics_md + log(f"β No confirmed diagnoses for HCC {hcc_code_str} in {file_name}.", step) | |
| # return | |
| # # Step 4: Tests | |
| # step += 1 | |
| # yield demographics_md + log(f"π§ͺ Step {step}/{total_steps}: Finding relevant tests...", step) | |
| # diagnoses_with_tests = TestFindingAgent(hcc_code=hcc_code_str, model_version=model_version).run(confirmed_diagnoses) | |
| # # Step 5: Clinical Status | |
| # step += 1 | |
| # yield demographics_md + log(f"βοΈ Step {step}/{total_steps}: Determining clinical status...", step) | |
| # diagnoses_with_status = ClinicalStatusAgent().run(diagnoses_with_tests) | |
| # active_diagnoses = [d for d in diagnoses_with_status if d.get("clinical_status") == "ACTIVE"] | |
| # # Step 6: MEAT | |
| # step += 1 | |
| # if active_diagnoses: | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Validating MEAT...", step) | |
| # validated_meat_diagnoses = MEATValidatorAgent().run(active_diagnoses) | |
| # else: | |
| # validated_meat_diagnoses = [] | |
| # yield demographics_md + log("βΉοΈ No ACTIVE diagnoses found. Skipping MEAT/Comorbidity.", step) | |
| # # Step 7: Comorbidities | |
| # step += 1 | |
| # diagnoses_passed_meat = [d for d in validated_meat_diagnoses if any(d.get("meat", {}).values())] | |
| # if diagnoses_passed_meat: | |
| # yield demographics_md + log(f"π€ Step {step}/{total_steps}: Checking comorbidities...", step) | |
| # comorbidity_results = ComorbidityCheckerAgent(pdf_path, hcc_code_str, model_version).run(diagnoses_passed_meat) | |
| # else: | |
| # comorbidity_results = [] | |
| # # Step 8: Final Report | |
| # step += 1 | |
| # yield demographics_md + log(f"β Step {step}/{total_steps}: Generating final report...", step) | |
| # # Merge results for final output | |
| # status_map = {d["diagnosis"]: d for d in diagnoses_with_status} | |
| # meat_map = {d["diagnosis"]: d for d in validated_meat_diagnoses} | |
| # comorbidity_map = {d["diagnosis"]: d for d in comorbidity_results} | |
| # final_analysis = [] | |
| # for entry in all_checked_results: | |
| # diag_name = entry["diagnosis"] | |
| # updated_entry = entry.copy() | |
| # if diag_name in status_map: updated_entry.update(status_map[diag_name]) | |
| # if diag_name in meat_map: updated_entry.update(meat_map[diag_name]) | |
| # if diag_name in comorbidity_map: updated_entry.update(comorbidity_map[diag_name]) | |
| # final_analysis.append(updated_entry) | |
| # filtered_final_analysis = [e for e in final_analysis if e.get("answer_explicit", "").lower() == "yes" or e.get("answer_implicit", "").lower() == "yes"] | |
| # output_data = { | |
| # "file_name": file_name, | |
| # "hcc_code": hcc_code_str, | |
| # "model_version": model_version, | |
| # "final_analysis": filtered_final_analysis, | |
| # "patient_name": demographics_info.get("name", ""), | |
| # "dob": demographics_info.get("dob", ""), | |
| # "age": demographics_info.get("age", ""), | |
| # "gender": demographics_info.get("gender", ""), | |
| # "address": demographics_info.get("address", ""), | |
| # "phone": demographics_info.get("phone", ""), | |
| # "patient_identifier": demographics_info.get("patient_identifier", "") | |
| # } | |
| # elapsed = time.time() - start | |
| # yield json_to_markdown(output_data) + f"<br/><div style='color:green;'>β Completed in {elapsed:.1f} sec</div>" | |
| # except Exception as e: | |
| # print(f"[ERROR] {e}") | |
| # yield f"<div style='color:red; font-weight:bold;'>β οΈ Error: {e}</div>" | |
| # # ---------- Gradio Theme and Helpers ---------- | |
| # simple_theme = gr.themes.Soft( | |
| # primary_hue=gr.themes.colors.blue, | |
| # secondary_hue=gr.themes.colors.slate, | |
| # neutral_hue=gr.themes.colors.slate, | |
| # ).set( | |
| # button_primary_background_fill="#1e40af", | |
| # button_primary_background_fill_hover="#1d4ed8", | |
| # button_primary_text_color="white", | |
| # background_fill_primary="white", | |
| # background_fill_secondary="#f8fafc", | |
| # ) | |
| # def load_sample_pdf(): | |
| # if not os.path.exists(SAMPLE_PDF): | |
| # raise FileNotFoundError(f"Sample PDF not found at {SAMPLE_PDF}") | |
| # class PDFWrapper: | |
| # def __init__(self, path): | |
| # self.name = path | |
| # return PDFWrapper(SAMPLE_PDF) | |
| # def pdf_to_iframe(file): | |
| # if file is None: return "<p style='color:orange;'>No PDF uploaded.</p>" | |
| # try: | |
| # with open(file.name, "rb") as f: pdf_bytes = f.read() | |
| # encoded = base64.b64encode(pdf_bytes).decode("utf-8") | |
| # return f'<iframe src="data:application/pdf;base64,{encoded}" width="100%" height="600px" style="border:1px solid #ccc;"></iframe>' | |
| # except Exception as e: | |
| # return f"<p style='color:red;'>Failed to display PDF: {e}</p>" | |
| # def clear_outputs(): | |
| # initial_md = "<div style='border:2px solid #1e40af; border-radius:12px; padding:15px; background-color:#f0f9ff;'>π Upload a PDF and click <b>Run Validation</b> to start.</div>" | |
| # initial_preview = "<p>Upload a PDF to preview</p>" | |
| # return initial_md, initial_preview | |
| # # ---------- Gradio UI ---------- | |
| # with gr.Blocks(theme=simple_theme, title=APP_TITLE) as interface: | |
| # gr.HTML(f""" | |
| # <h1 style='text-align:center;color:#1e40af;'> π© {APP_TITLE}</h1> | |
| # <p style='text-align:center;color:#64748b;'> | |
| # Upload a chart, set HCC + model version, and validate MEAT criteria. | |
| # </p> | |
| # """) | |
| # with gr.Row(): | |
| # pdf_upload = gr.File(label="Upload Patient Chart (PDF)", file_types=[".pdf"], scale=1) | |
| # hcc_code = gr.Textbox(label="HCC Code (e.g., 12)", placeholder="Enter HCC code", scale=1) | |
| # model_version = gr.Dropdown(choices=["V24", "V28"], label="Model Version", value="V24", scale=1) | |
| # run_btn = gr.Button("π Run Validation", variant="primary", scale=1) | |
| # with gr.Row(): | |
| # with gr.Column(scale=2): | |
| # pdf_preview = gr.HTML(label="π PDF Preview", value="<p>Upload a PDF to preview</p>") | |
| # with gr.Column(scale=2): | |
| # output_md = gr.Markdown( | |
| # label="Validation Report", | |
| # value="<div style='border:2px solid #1e40af; border-radius:12px; padding:15px; background-color:#f0f9ff;'>π Upload a PDF and click <b>Run Validation</b> to start.</div>", | |
| # ) | |
| # pdf_upload.change(fn=pdf_to_iframe, inputs=pdf_upload, outputs=pdf_preview) | |
| # pdf_upload.clear(fn=clear_outputs, inputs=[], outputs=[output_md, pdf_preview]) | |
| # run_btn.click( | |
| # fn=process_pipeline, | |
| # inputs=[pdf_upload, hcc_code, model_version], | |
| # outputs=[output_md], | |
| # ) | |
| # # --- MODIFICATION: Adjusted column scaling to make the Examples section smaller --- | |
| # with gr.Row(): | |
| # with gr.Column(scale=1): # This column will be smaller | |
| # gr.Examples( | |
| # examples=[[SAMPLE_PDF, "12", "V24"]], | |
| # inputs=[pdf_upload, hcc_code, model_version], | |
| # label="Click to load an example", | |
| # cache_examples=False | |
| # ) | |
| # with gr.Column(scale=2): # This empty column will take up the remaining space | |
| # pass | |
| # # --- END OF MODIFICATION --- | |
| # if __name__ == "__main__": | |
| # interface.queue().launch( | |
| # server_name="0.0.0.0", | |
| # server_port=int(os.environ.get("PORT", 7860)) | |
| # ) | |
| # import os | |
| # import gradio as gr | |
| # import json | |
| # import time | |
| # import base64 | |
| # from dotenv import load_dotenv | |
| # from ClinicalStatusAgent import ClinicalStatusAgent | |
| # from TestFindingAgent import TestFindingAgent | |
| # from ComorbidityCheckerAgent import ComorbidityCheckerAgent | |
| # from HCCDiagnosisListEngine import HCCDiagnosisListEngine | |
| # from chartdiagnosischecker import ChartDiagnosisChecker | |
| # from MeatValidatorAgent import MEATValidatorAgent | |
| # from PatientInfoExtractionEngine import PatientInfoExtractionEngine | |
| # load_dotenv() | |
| # APP_TITLE = "Risk Adjustment (HCC Chart Validation)" | |
| # CSV_PATH = "hcc_mapping.csv" | |
| # SAMPLE_PDF = "sample_patient_chart.pdf" # Place a sample PDF in the same folder | |
| # # ---------- JSON to Markdown ---------- | |
| # def json_to_markdown(data) -> str: | |
| # try: | |
| # if isinstance(data, dict): | |
| # file_name = data.get("file_name", "Unknown Patient") | |
| # hcc_code = data.get("hcc_code", "N/A") | |
| # model_version = data.get("model_version", "N/A") | |
| # analyses = data.get("final_analysis", []) | |
| # # Demographics | |
| # patient_name = data.get("patient_name", "") | |
| # dob = data.get("dob", "") | |
| # age = data.get("age", "") | |
| # gender = data.get("gender", "") | |
| # address = data.get("address", "") | |
| # phone = data.get("phone", "") | |
| # patient_identifier = data.get("patient_identifier", "") | |
| # else: | |
| # return "<div style='color:red; font-weight:bold;'>⚠️ Invalid data format for report.</div>" | |
| # md = f""" | |
| # <div style="border:2px solid #4CAF50; padding:15px; border-radius:10px; background:#f9fdf9;"> | |
| # <h2 style="color:#2e7d32;">π HCC Chart Validation Report </h2> | |
| # <p><b>🧾 File Name:</b> {file_name}</p> | |
| # <p><b>🏷️ HCC Code:</b> {hcc_code}</p> | |
| # <p><b>βοΈ Model Version:</b> {model_version}</p> | |
| # """ | |
| # # Add demographics | |
| # if any([patient_name, dob, age, gender, address, phone, patient_identifier]): | |
| # md += "<hr style='border:0;border-top:1px solid #c8e6c9;margin:10px 0;'/>" | |
| # md += "<h3 style='color:#1b5e20;'>π€ Patient Demographics</h3>" | |
| # if patient_name: md += f"<p><b>Name:</b> {patient_name}</p>" | |
| # if patient_identifier: md += f"<p><b>Patient Identifier:</b> {patient_identifier}</p>" | |
| # if dob: md += f"<p><b>Date of Birth:</b> {dob}</p>" | |
| # if age: md += f"<p><b>Age:</b> {age}</p>" | |
| # if gender: md += f"<p><b>Gender:</b> {gender}</p>" | |
| # if address: md += f"<p><b>Address:</b> {address}</p>" | |
| # if phone: md += f"<p><b>Phone:</b> {phone}</p>" | |
| # md += "</div><br/>" | |
| # # Render analyses if they exist | |
| # if analyses: | |
| # for idx, diag in enumerate(analyses, 1): | |
| # md += f""" | |
| # <div style="border:1px solid #ccc; padding:12px; border-radius:8px; margin-bottom:12px;"> | |
| # <h3 style="color:#1565c0;">Diagnosis {idx}. {diag.get("diagnosis", "Unknown Diagnosis")}</h3> | |
| # <p><b>ICD-10:</b> {diag.get("icd10", "N/A")}</p> | |
| # <p><b>Reference:</b> <a href="{diag.get("reference","")}" target="_blank">{diag.get("reference","")}</a></p> | |
| # """ | |
| # explicit_ans = diag.get("answer_explicit", "N/A") | |
| # explicit_rat = diag.get("rationale_explicit", "") | |
| # implicit_ans = diag.get("answer_implicit", "N/A") | |
| # implicit_rat = diag.get("rationale_implicit", "") | |
| # if explicit_ans.lower() == "yes": | |
| # md += f"<p><b>Explicit:</b> {explicit_ans} β {explicit_rat}</p>" | |
| # else: | |
| # md += f"<p><b>Implicit:</b> {implicit_ans} β {implicit_rat}</p>" | |
| # md += f""" | |
| # <p><b>Clinical Status:</b> {diag.get("clinical_status","N/A")}</p> | |
| # <p><b>Status Rationale:</b> {diag.get("status_rationale","")}</p> | |
| # """ | |
| # if "tests" in diag: | |
| # md += "<details><summary><b>🧪 Tests & Procedures</b></summary><ul>" | |
| # tests = diag["tests"] | |
| # if "vitals" in tests: | |
| # md += "<li><b>Vitals:</b><ul>" | |
| # for k, v in tests["vitals"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "procedures" in tests: | |
| # md += "<li><b>Procedures:</b><ul>" | |
| # for k, v in tests["procedures"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "lab_test" in tests: | |
| # md += "<li><b>Lab Tests:</b><ul>" | |
| # for k, v in tests["lab_test"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # md += "</ul></details>" | |
| # if "meat" in diag: | |
| # md += "<details><summary><b>π MEAT Validation</b></summary><ul>" | |
| # for k, v in diag["meat"].items(): | |
| # emoji = "β " if v else "β" | |
| # md += f"<li>{k.capitalize()}: {emoji}</li>" | |
| # md += "</ul>" | |
| # md += f"<p><b>MEAT Rationale:</b> {diag.get('meat_rationale','')}</p>" | |
| # md += "</details>" | |
| # if "comorbidities" in diag and diag["comorbidities"]: | |
| # present_comorbidities = [c for c in diag["comorbidities"] if c.get("is_present")] | |
| # if present_comorbidities: | |
| # md += "<details open><summary><b>🩺 Comorbidities</b></summary><ul>" | |
| # for c in present_comorbidities: | |
| # md += f"<li>β <b>{c.get('condition')}</b><br/><i>{c.get('rationale')}</i></li>" | |
| # md += "</ul></details>" | |
| # md += "</div>" | |
| # return md | |
| # except Exception as e: | |
| # return f"<div style='color:red; font-weight:bold;'>⚠️ Error rendering report: {e}</div>" | |
| # # ---------- Processing Pipeline with Gradio Progress ---------- | |
| # def process_pipeline(pdf_file, hcc_code, model_version, csv_path=CSV_PATH, output_folder="outputs"): | |
| # try: | |
| # start = time.time() | |
| # step = 0 | |
| # total_steps = 8 # Total number of steps in the pipeline | |
| # def log(msg, current_step=0): | |
| # elapsed = time.time() - start | |
| # bar_html = '<div style="display: flex; width: 100%; gap: 2px; height: 12px; margin: 8px 0 5px 0;">' | |
| # for i in range(1, total_steps + 1): | |
| # if i < current_step: color = "#1e40af" | |
| # elif i == current_step: color = "#3b82f6" | |
| # else: color = "#e5e7eb" | |
| # bar_html += f'<div style="flex-grow: 1; background-color: {color}; border-radius: 3px;"></div>' | |
| # bar_html += '</div>' | |
| # return f"<div style='padding-top:10px;'>{msg}{bar_html}<small>⏳ Elapsed: {elapsed:.1f} sec</small></div>" | |
| # if pdf_file is None: | |
| # yield log("⚠️ Please upload a patient chart PDF.", 0) | |
| # return | |
| # hcc_code_str = str(hcc_code or "").strip() | |
| # if not hcc_code_str: | |
| # yield log("⚠️ Please enter a valid HCC Code before running validation.", 0) | |
| # return | |
| # os.makedirs(output_folder, exist_ok=True) | |
| # pdf_path = pdf_file.name | |
| # file_name = os.path.splitext(os.path.basename(pdf_path))[0] | |
| # print(f"[PROCESSING] {file_name}") | |
| # # Step 1: Extract Demographics | |
| # step += 1 | |
| # initial_progress_msg = log(f"🧠 Step {step}/{total_steps}: Extracting patient demographics...", step) | |
| # yield initial_progress_msg | |
| # demographics_engine = PatientInfoExtractionEngine(pdf_path) | |
| # demographics_info = demographics_engine.run() | |
| # print(f"[DEMOGRAPHICS] Extracted: {demographics_info}") | |
| # # --- MODIFICATION: Using the elaborated method for demographics --- | |
| # initial_report_data = { | |
| # "file_name": file_name, | |
| # "hcc_code": hcc_code_str, | |
| # "model_version": model_version, | |
| # "final_analysis": [], | |
| # "patient_name": demographics_info.get("name", ""), | |
| # "dob": demographics_info.get("dob", ""), | |
| # "age": demographics_info.get("age", ""), | |
| # "gender": demographics_info.get("gender", ""), | |
| # "address": demographics_info.get("address", ""), | |
| # "phone": demographics_info.get("phone", ""), | |
| # "patient_identifier": demographics_info.get("patient_identifier", "") | |
| # } | |
| # # --- END OF MODIFICATION --- | |
| # demographics_md = json_to_markdown(initial_report_data) | |
| # yield demographics_md | |
| # time.sleep(0.5) | |
| # # Step 2: Diagnoses | |
| # step += 1 | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Extracting possible HCC Diagnoses...", step) | |
| # diagnoses = HCCDiagnosisListEngine(hcc_code_str, model_version, csv_path).run() | |
| # if not diagnoses: | |
| # yield demographics_md + log(f"β No diagnoses found for HCC {hcc_code_str}.", step) | |
| # return | |
| # # Step 3: Chart checking | |
| # step += 1 | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Checking diagnoses in patient chart...", step) | |
| # all_checked_results = ChartDiagnosisChecker(pdf_path).run(diagnoses) | |
| # confirmed_diagnoses = [d for d in all_checked_results if d.get("answer_explicit", "").lower() == "yes" or d.get("answer_implicit", "").lower() == "yes"] | |
| # if not confirmed_diagnoses: | |
| # yield demographics_md + log(f"β No confirmed diagnoses for HCC {hcc_code_str} in {file_name}.", step) | |
| # return | |
| # # Step 4: Tests | |
| # step += 1 | |
| # yield demographics_md + log(f"🧪 Step {step}/{total_steps}: Finding relevant tests...", step) | |
| # diagnoses_with_tests = TestFindingAgent(hcc_code=hcc_code_str, model_version=model_version).run(confirmed_diagnoses) | |
| # # Step 5: Clinical Status | |
| # step += 1 | |
| # yield demographics_md + log(f"βοΈ Step {step}/{total_steps}: Determining clinical status...", step) | |
| # diagnoses_with_status = ClinicalStatusAgent().run(diagnoses_with_tests) | |
| # active_diagnoses = [d for d in diagnoses_with_status if d.get("clinical_status") == "ACTIVE"] | |
| # # Step 6: MEAT | |
| # step += 1 | |
| # if active_diagnoses: | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Validating MEAT...", step) | |
| # validated_meat_diagnoses = MEATValidatorAgent().run(active_diagnoses) | |
| # else: | |
| # validated_meat_diagnoses = [] | |
| # yield demographics_md + log("ℹ️ No ACTIVE diagnoses found. Skipping MEAT/Comorbidity.", step) | |
| # # Step 7: Comorbidities | |
| # step += 1 | |
| # diagnoses_passed_meat = [d for d in validated_meat_diagnoses if any(d.get("meat", {}).values())] | |
| # if diagnoses_passed_meat: | |
| # yield demographics_md + log(f"π€ Step {step}/{total_steps}: Checking comorbidities...", step) | |
| # comorbidity_results = ComorbidityCheckerAgent(pdf_path, hcc_code_str, model_version).run(diagnoses_passed_meat) | |
| # else: | |
| # comorbidity_results = [] | |
| # # Step 8: Final Report | |
| # step += 1 | |
| # yield demographics_md + log(f"β Step {step}/{total_steps}: Generating final report...", step) | |
| # # Merge results for final output | |
| # status_map = {d["diagnosis"]: d for d in diagnoses_with_status} | |
| # meat_map = {d["diagnosis"]: d for d in validated_meat_diagnoses} | |
| # comorbidity_map = {d["diagnosis"]: d for d in comorbidity_results} | |
| # final_analysis = [] | |
| # for entry in all_checked_results: | |
| # diag_name = entry["diagnosis"] | |
| # updated_entry = entry.copy() | |
| # if diag_name in status_map: updated_entry.update(status_map[diag_name]) | |
| # if diag_name in meat_map: updated_entry.update(meat_map[diag_name]) | |
| # if diag_name in comorbidity_map: updated_entry.update(comorbidity_map[diag_name]) | |
| # final_analysis.append(updated_entry) | |
| # filtered_final_analysis = [e for e in final_analysis if e.get("answer_explicit", "").lower() == "yes" or e.get("answer_implicit", "").lower() == "yes"] | |
| # # --- MODIFICATION: Using the elaborated method for demographics --- | |
| # output_data = { | |
| # "file_name": file_name, | |
| # "hcc_code": hcc_code_str, | |
| # "model_version": model_version, | |
| # "final_analysis": filtered_final_analysis, | |
| # "patient_name": demographics_info.get("name", ""), | |
| # "dob": demographics_info.get("dob", ""), | |
| # "age": demographics_info.get("age", ""), | |
| # "gender": demographics_info.get("gender", ""), | |
| # "address": demographics_info.get("address", ""), | |
| # "phone": demographics_info.get("phone", ""), | |
| # "patient_identifier": demographics_info.get("patient_identifier", "") | |
| # } | |
| # # --- END OF MODIFICATION --- | |
| # elapsed = time.time() - start | |
| # yield json_to_markdown(output_data) + f"<br/><div style='color:green;'>β Completed in {elapsed:.1f} sec</div>" | |
| # except Exception as e: | |
| # print(f"[ERROR] {e}") | |
| # yield f"<div style='color:red; font-weight:bold;'>⚠️ Error: {e}</div>" | |
| # # ---------- Gradio Theme and Helpers ---------- | |
| # simple_theme = gr.themes.Soft( | |
| # primary_hue=gr.themes.colors.blue, | |
| # secondary_hue=gr.themes.colors.slate, | |
| # neutral_hue=gr.themes.colors.slate, | |
| # ).set( | |
| # button_primary_background_fill="#1e40af", | |
| # button_primary_background_fill_hover="#1d4ed8", | |
| # button_primary_text_color="white", | |
| # background_fill_primary="white", | |
| # background_fill_secondary="#f8fafc", | |
| # ) | |
| # def load_sample_pdf(): | |
| # if not os.path.exists(SAMPLE_PDF): | |
| # raise FileNotFoundError(f"Sample PDF not found at {SAMPLE_PDF}") | |
| # class PDFWrapper: | |
| # def __init__(self, path): | |
| # self.name = path | |
| # return PDFWrapper(SAMPLE_PDF) | |
| # def pdf_to_iframe(file): | |
| # if file is None: return "<p style='color:orange;'>No PDF uploaded.</p>" | |
| # try: | |
| # with open(file.name, "rb") as f: pdf_bytes = f.read() | |
| # encoded = base64.b64encode(pdf_bytes).decode("utf-8") | |
| # return f'<iframe src="data:application/pdf;base64,{encoded}" width="100%" height="600px" style="border:1px solid #ccc;"></iframe>' | |
| # except Exception as e: | |
| # return f"<p style='color:red;'>Failed to display PDF: {e}</p>" | |
| # def clear_outputs(): | |
| # initial_md = "<div style='border:2px solid #1e40af; border-radius:12px; padding:15px; background-color:#f0f9ff;'>π Upload a PDF and click <b>Run Validation</b> to start.</div>" | |
| # initial_preview = "<p>Upload a PDF to preview</p>" | |
| # return initial_md, initial_preview | |
| # # ---------- Gradio UI ---------- | |
| # with gr.Blocks(theme=simple_theme, title=APP_TITLE) as interface: | |
| # gr.HTML(f""" | |
| # <h1 style='text-align:center;color:#1e40af;'> π© {APP_TITLE}</h1> | |
| # <p style='text-align:center;color:#64748b;'> | |
| # Upload a chart, set HCC + model version, and validate MEAT criteria. | |
| # </p> | |
| # """) | |
| # with gr.Row(): | |
| # pdf_upload = gr.File(label="Upload Patient Chart (PDF)", file_types=[".pdf"], scale=1) | |
| # hcc_code = gr.Textbox(label="HCC Code (e.g., 12)", placeholder="Enter HCC code", scale=1) | |
| # model_version = gr.Dropdown(choices=["V24", "V28"], label="Model Version", value="V24", scale=1) | |
| # run_btn = gr.Button("π Run Validation", variant="primary", scale=1) | |
| # with gr.Row(): | |
| # with gr.Column(scale=2): | |
| # pdf_preview = gr.HTML(label="π PDF Preview", value="<p>Upload a PDF to preview</p>") | |
| # with gr.Column(scale=2): | |
| # output_md = gr.Markdown( | |
| # label="Validation Report", | |
| # value="<div style='border:2px solid #1e40af; border-radius:12px; padding:15px; background-color:#f0f9ff;'>π Upload a PDF and click <b>Run Validation</b> to start.</div>", | |
| # ) | |
| # pdf_upload.change(fn=pdf_to_iframe, inputs=pdf_upload, outputs=pdf_preview) | |
| # pdf_upload.clear(fn=clear_outputs, inputs=[], outputs=[output_md, pdf_preview]) | |
| # run_btn.click( | |
| # fn=process_pipeline, | |
| # inputs=[pdf_upload, hcc_code, model_version], | |
| # outputs=[output_md], | |
| # ) | |
| # gr.Examples( | |
| # examples=[[SAMPLE_PDF, "12", "V24"]], | |
| # inputs=[pdf_upload, hcc_code, model_version], | |
| # cache_examples=False | |
| # ) | |
| # if __name__ == "__main__": | |
| # interface.queue().launch( | |
| # server_name="0.0.0.0", | |
| # server_port=int(os.environ.get("PORT", 7860)) | |
| # ) | |
| # import os | |
| # import gradio as gr | |
| # import json | |
| # import time | |
| # import base64 | |
| # from dotenv import load_dotenv | |
| # from ClinicalStatusAgent import ClinicalStatusAgent | |
| # from TestFindingAgent import TestFindingAgent | |
| # from ComorbidityCheckerAgent import ComorbidityCheckerAgent | |
| # from HCCDiagnosisListEngine import HCCDiagnosisListEngine | |
| # from chartdiagnosischecker import ChartDiagnosisChecker | |
| # from MeatValidatorAgent import MEATValidatorAgent | |
| # from PatientInfoExtractionEngine import PatientInfoExtractionEngine | |
| # load_dotenv() | |
| # APP_TITLE = "Risk Adjustment (HCC Chart Validation)" | |
| # CSV_PATH = "hcc_mapping.csv" | |
| # SAMPLE_PDF = "sample_patient_chart.pdf" # Place a sample PDF in the same folder | |
| # # ---------- JSON to Markdown ---------- | |
| # def json_to_markdown(data) -> str: | |
| # try: | |
| # if isinstance(data, dict): | |
| # file_name = data.get("file_name", "Unknown Patient") | |
| # hcc_code = data.get("hcc_code", "N/A") | |
| # model_version = data.get("model_version", "N/A") | |
| # analyses = data.get("final_analysis", []) | |
| # # Demographics | |
| # patient_name = data.get("patient_name", "") | |
| # dob = data.get("dob", "") | |
| # age = data.get("age", "") | |
| # gender = data.get("gender", "") | |
| # address = data.get("address", "") | |
| # phone = data.get("phone", "") | |
| # patient_identifier = data.get("patient_identifier", "") | |
| # else: | |
| # return "<div style='color:red; font-weight:bold;'>⚠️ Invalid data format for report.</div>" | |
| # md = f""" | |
| # <div style="border:2px solid #4CAF50; padding:15px; border-radius:10px; background:#f9fdf9;"> | |
| # <h2 style="color:#2e7d32;">π HCC Chart Validation Report </h2> | |
| # <p><b>🧾 File Name:</b> {file_name}</p> | |
| # <p><b>🏷️ HCC Code:</b> {hcc_code}</p> | |
| # <p><b>βοΈ Model Version:</b> {model_version}</p> | |
| # """ | |
| # # Add demographics | |
| # if any([patient_name, dob, age, gender, address, phone, patient_identifier]): | |
| # md += "<hr style='border:0;border-top:1px solid #c8e6c9;margin:10px 0;'/>" | |
| # md += "<h3 style='color:#1b5e20;'>π€ Patient Demographics</h3>" | |
| # if patient_name: md += f"<p><b>Name:</b> {patient_name}</p>" | |
| # if patient_identifier: md += f"<p><b>Patient Identifier:</b> {patient_identifier}</p>" | |
| # if dob: md += f"<p><b>Date of Birth:</b> {dob}</p>" | |
| # if age: md += f"<p><b>Age:</b> {age}</p>" | |
| # if gender: md += f"<p><b>Gender:</b> {gender}</p>" | |
| # if address: md += f"<p><b>Address:</b> {address}</p>" | |
| # if phone: md += f"<p><b>Phone:</b> {phone}</p>" | |
| # md += "</div><br/>" | |
| # # Render analyses if they exist | |
| # if analyses: | |
| # for idx, diag in enumerate(analyses, 1): | |
| # md += f""" | |
| # <div style="border:1px solid #ccc; padding:12px; border-radius:8px; margin-bottom:12px;"> | |
| # <h3 style="color:#1565c0;">Diagnosis {idx}. {diag.get("diagnosis", "Unknown Diagnosis")}</h3> | |
| # <p><b>ICD-10:</b> {diag.get("icd10", "N/A")}</p> | |
| # <p><b>Reference:</b> <a href="{diag.get("reference","")}" target="_blank">{diag.get("reference","")}</a></p> | |
| # """ | |
| # explicit_ans = diag.get("answer_explicit", "N/A") | |
| # explicit_rat = diag.get("rationale_explicit", "") | |
| # implicit_ans = diag.get("answer_implicit", "N/A") | |
| # implicit_rat = diag.get("rationale_implicit", "") | |
| # if explicit_ans.lower() == "yes": | |
| # md += f"<p><b>Explicit:</b> {explicit_ans} β {explicit_rat}</p>" | |
| # else: | |
| # md += f"<p><b>Implicit:</b> {implicit_ans} β {implicit_rat}</p>" | |
| # md += f""" | |
| # <p><b>Clinical Status:</b> {diag.get("clinical_status","N/A")}</p> | |
| # <p><b>Status Rationale:</b> {diag.get("status_rationale","")}</p> | |
| # """ | |
| # if "tests" in diag: | |
| # md += "<details><summary><b>🧪 Tests & Procedures</b></summary><ul>" | |
| # tests = diag["tests"] | |
| # if "vitals" in tests: | |
| # md += "<li><b>Vitals:</b><ul>" | |
| # for k, v in tests["vitals"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "procedures" in tests: | |
| # md += "<li><b>Procedures:</b><ul>" | |
| # for k, v in tests["procedures"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "lab_test" in tests: | |
| # md += "<li><b>Lab Tests:</b><ul>" | |
| # for k, v in tests["lab_test"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # md += "</ul></details>" | |
| # if "meat" in diag: | |
| # md += "<details><summary><b>π MEAT Validation</b></summary><ul>" | |
| # for k, v in diag["meat"].items(): | |
| # emoji = "β " if v else "β" | |
| # md += f"<li>{k.capitalize()}: {emoji}</li>" | |
| # md += "</ul>" | |
| # md += f"<p><b>MEAT Rationale:</b> {diag.get('meat_rationale','')}</p>" | |
| # md += "</details>" | |
| # # --- MODIFICATION: Filter to only show present comorbidities --- | |
| # if "comorbidities" in diag and diag["comorbidities"]: | |
| # present_comorbidities = [c for c in diag["comorbidities"] if c.get("is_present")] | |
| # if present_comorbidities: | |
| # md += "<details open><summary><b>🩺 Comorbidities</b></summary><ul>" | |
| # for c in present_comorbidities: | |
| # md += f"<li>β <b>{c.get('condition')}</b><br/><i>{c.get('rationale')}</i></li>" | |
| # md += "</ul></details>" | |
| # # --- END OF MODIFICATION --- | |
| # md += "</div>" | |
| # return md | |
| # except Exception as e: | |
| # return f"<div style='color:red; font-weight:bold;'>⚠️ Error rendering report: {e}</div>" | |
| # # ---------- Processing Pipeline with Gradio Progress ---------- | |
| # def process_pipeline(pdf_file, hcc_code, model_version, csv_path=CSV_PATH, output_folder="outputs"): | |
| # try: | |
| # start = time.time() | |
| # step = 0 | |
| # total_steps = 8 # Total number of steps in the pipeline | |
| # def log(msg, current_step=0): | |
| # elapsed = time.time() - start | |
| # bar_html = '<div style="display: flex; width: 100%; gap: 2px; height: 12px; margin: 8px 0 5px 0;">' | |
| # for i in range(1, total_steps + 1): | |
| # if i < current_step: color = "#1e40af" | |
| # elif i == current_step: color = "#3b82f6" | |
| # else: color = "#e5e7eb" | |
| # bar_html += f'<div style="flex-grow: 1; background-color: {color}; border-radius: 3px;"></div>' | |
| # bar_html += '</div>' | |
| # return f"<div style='padding-top:10px;'>{msg}{bar_html}<small>⏳ Elapsed: {elapsed:.1f} sec</small></div>" | |
| # if pdf_file is None: | |
| # yield log("⚠️ Please upload a patient chart PDF.", 0) | |
| # return | |
| # hcc_code_str = str(hcc_code or "").strip() | |
| # if not hcc_code_str: | |
| # yield log("⚠️ Please enter a valid HCC Code before running validation.", 0) | |
| # return | |
| # os.makedirs(output_folder, exist_ok=True) | |
| # pdf_path = pdf_file.name | |
| # file_name = os.path.splitext(os.path.basename(pdf_path))[0] | |
| # print(f"[PROCESSING] {file_name}") | |
| # # Step 1: Extract Demographics | |
| # step += 1 | |
| # initial_progress_msg = log(f"🧠 Step {step}/{total_steps}: Extracting patient demographics...", step) | |
| # yield initial_progress_msg | |
| # demographics_engine = PatientInfoExtractionEngine(pdf_path) | |
| # demographics_info = demographics_engine.run() | |
| # print(f"[DEMOGRAPHICS] Extracted: {demographics_info}") | |
| # initial_report_data = { | |
| # "file_name": file_name, | |
| # "hcc_code": hcc_code_str, | |
| # "model_version": model_version, | |
| # "final_analysis": [], | |
| # **demographics_info | |
| # } | |
| # demographics_md = json_to_markdown(initial_report_data) | |
| # yield demographics_md | |
| # time.sleep(0.5) | |
| # # Step 2: Diagnoses | |
| # step += 1 | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Extracting possible HCC Diagnoses...", step) | |
| # diagnoses = HCCDiagnosisListEngine(hcc_code_str, model_version, csv_path).run() | |
| # if not diagnoses: | |
| # yield demographics_md + log(f"β No diagnoses found for HCC {hcc_code_str}.", step) | |
| # return | |
| # # Step 3: Chart checking | |
| # step += 1 | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Checking diagnoses in patient chart...", step) | |
| # all_checked_results = ChartDiagnosisChecker(pdf_path).run(diagnoses) | |
| # confirmed_diagnoses = [d for d in all_checked_results if d.get("answer_explicit", "").lower() == "yes" or d.get("answer_implicit", "").lower() == "yes"] | |
| # if not confirmed_diagnoses: | |
| # yield demographics_md + log(f"β No confirmed diagnoses for HCC {hcc_code_str} in {file_name}.", step) | |
| # return | |
| # # Step 4: Tests | |
| # step += 1 | |
| # yield demographics_md + log(f"🧪 Step {step}/{total_steps}: Finding relevant tests...", step) | |
| # diagnoses_with_tests = TestFindingAgent(hcc_code=hcc_code_str, model_version=model_version).run(confirmed_diagnoses) | |
| # # Step 5: Clinical Status | |
| # step += 1 | |
| # yield demographics_md + log(f"βοΈ Step {step}/{total_steps}: Determining clinical status...", step) | |
| # diagnoses_with_status = ClinicalStatusAgent().run(diagnoses_with_tests) | |
| # active_diagnoses = [d for d in diagnoses_with_status if d.get("clinical_status") == "ACTIVE"] | |
| # # Step 6: MEAT | |
| # step += 1 | |
| # if active_diagnoses: | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Validating MEAT...", step) | |
| # validated_meat_diagnoses = MEATValidatorAgent().run(active_diagnoses) | |
| # else: | |
| # validated_meat_diagnoses = [] | |
| # yield demographics_md + log("ℹ️ No ACTIVE diagnoses found. Skipping MEAT/Comorbidity.", step) | |
| # # Step 7: Comorbidities | |
| # step += 1 | |
| # diagnoses_passed_meat = [d for d in validated_meat_diagnoses if any(d.get("meat", {}).values())] | |
| # if diagnoses_passed_meat: | |
| # yield demographics_md + log(f"π€ Step {step}/{total_steps}: Checking comorbidities...", step) | |
| # comorbidity_results = ComorbidityCheckerAgent(pdf_path, hcc_code_str, model_version).run(diagnoses_passed_meat) | |
| # else: | |
| # comorbidity_results = [] | |
| # # Step 8: Final Report | |
| # step += 1 | |
| # yield demographics_md + log(f"β Step {step}/{total_steps}: Generating final report...", step) | |
| # # Merge results for final output | |
| # status_map = {d["diagnosis"]: d for d in diagnoses_with_status} | |
| # meat_map = {d["diagnosis"]: d for d in validated_meat_diagnoses} | |
| # comorbidity_map = {d["diagnosis"]: d for d in comorbidity_results} | |
| # final_analysis = [] | |
| # for entry in all_checked_results: | |
| # diag_name = entry["diagnosis"] | |
| # updated_entry = entry.copy() | |
| # if diag_name in status_map: updated_entry.update(status_map[diag_name]) | |
| # if diag_name in meat_map: updated_entry.update(meat_map[diag_name]) | |
| # if diag_name in comorbidity_map: updated_entry.update(comorbidity_map[diag_name]) | |
| # final_analysis.append(updated_entry) | |
| # filtered_final_analysis = [e for e in final_analysis if e.get("answer_explicit", "").lower() == "yes" or e.get("answer_implicit", "").lower() == "yes"] | |
| # output_data = { | |
| # "file_name": file_name, | |
| # "hcc_code": hcc_code_str, | |
| # "model_version": model_version, | |
| # "final_analysis": filtered_final_analysis, | |
| # **demographics_info | |
| # } | |
| # elapsed = time.time() - start | |
| # yield json_to_markdown(output_data) + f"<br/><div style='color:green;'>β Completed in {elapsed:.1f} sec</div>" | |
| # except Exception as e: | |
| # print(f"[ERROR] {e}") | |
| # yield f"<div style='color:red; font-weight:bold;'>⚠️ Error: {e}</div>" | |
| # # ---------- Gradio Theme and Helpers ---------- | |
| # simple_theme = gr.themes.Soft( | |
| # primary_hue=gr.themes.colors.blue, | |
| # secondary_hue=gr.themes.colors.slate, | |
| # neutral_hue=gr.themes.colors.slate, | |
| # ).set( | |
| # button_primary_background_fill="#1e40af", | |
| # button_primary_background_fill_hover="#1d4ed8", | |
| # button_primary_text_color="white", | |
| # background_fill_primary="white", | |
| # background_fill_secondary="#f8fafc", | |
| # ) | |
| # def load_sample_pdf(): | |
| # if not os.path.exists(SAMPLE_PDF): | |
| # raise FileNotFoundError(f"Sample PDF not found at {SAMPLE_PDF}") | |
| # class PDFWrapper: | |
| # def __init__(self, path): | |
| # self.name = path | |
| # return PDFWrapper(SAMPLE_PDF) | |
| # def pdf_to_iframe(file): | |
| # if file is None: return "<p style='color:orange;'>No PDF uploaded.</p>" | |
| # try: | |
| # with open(file.name, "rb") as f: pdf_bytes = f.read() | |
| # encoded = base64.b64encode(pdf_bytes).decode("utf-8") | |
| # return f'<iframe src="data:application/pdf;base64,{encoded}" width="100%" height="600px" style="border:1px solid #ccc;"></iframe>' | |
| # except Exception as e: | |
| # return f"<p style='color:red;'>Failed to display PDF: {e}</p>" | |
| # # ---------- Gradio UI ---------- | |
| # with gr.Blocks(theme=simple_theme, title=APP_TITLE) as interface: | |
| # gr.HTML(f""" | |
| # <h1 style='text-align:center;color:#1e40af;'> π© {APP_TITLE}</h1> | |
| # <p style='text-align:center;color:#64748b;'> | |
| # Upload a chart, set HCC + model version, and validate MEAT criteria. | |
| # </p> | |
| # """) | |
| # with gr.Row(): | |
| # pdf_upload = gr.File(label="Upload Patient Chart (PDF)", file_types=[".pdf"], scale=1) | |
| # hcc_code = gr.Textbox(label="HCC Code (e.g., 12)", placeholder="Enter HCC code", scale=1) | |
| # model_version = gr.Dropdown(choices=["V24", "V28"], label="Model Version", value="V24", scale=1) | |
| # run_btn = gr.Button("π Run Validation", variant="primary", scale=1) | |
| # with gr.Row(): | |
| # with gr.Column(scale=2): | |
| # pdf_preview = gr.HTML(label="π PDF Preview", value="<p>Upload a PDF to preview</p>") | |
| # with gr.Column(scale=2): | |
| # output_md = gr.Markdown( | |
| # label="Validation Report", | |
| # value="<div style='border:2px solid #1e40af; border-radius:12px; padding:15px; background-color:#f0f9ff;'>π Upload a PDF and click <b>Run Validation</b> to start.</div>", | |
| # ) | |
| # pdf_upload.change(fn=pdf_to_iframe, inputs=pdf_upload, outputs=pdf_preview) | |
| # run_btn.click( | |
| # fn=process_pipeline, | |
| # inputs=[pdf_upload, hcc_code, model_version], | |
| # outputs=[output_md], | |
| # ) | |
| # gr.Examples( | |
| # examples=[[SAMPLE_PDF]], | |
| # inputs=[pdf_upload], | |
| # outputs=[output_md], | |
| # fn=lambda x: process_pipeline(load_sample_pdf(), hcc_code="12", model_version="V24"), | |
| # cache_examples=False | |
| # ) | |
| # if __name__ == "__main__": | |
| # interface.queue().launch( | |
| # server_name="0.0.0.0", | |
| # server_port=int(os.environ.get("PORT", 7860)) | |
| # ) | |
| # import os | |
| # import gradio as gr | |
| # import json | |
| # import time | |
| # import base64 | |
| # from dotenv import load_dotenv | |
| # from ClinicalStatusAgent import ClinicalStatusAgent | |
| # from TestFindingAgent import TestFindingAgent | |
| # from ComorbidityCheckerAgent import ComorbidityCheckerAgent | |
| # from HCCDiagnosisListEngine import HCCDiagnosisListEngine | |
| # from chartdiagnosischecker import ChartDiagnosisChecker | |
| # from MeatValidatorAgent import MEATValidatorAgent | |
| # from PatientInfoExtractionEngine import PatientInfoExtractionEngine | |
| # load_dotenv() | |
| # APP_TITLE = "Risk Adjustment (HCC Chart Validation)" | |
| # CSV_PATH = "hcc_mapping.csv" | |
| # SAMPLE_PDF = "sample_patient_chart.pdf" # Place a sample PDF in the same folder | |
| # # ---------- JSON to Markdown ---------- | |
| # def json_to_markdown(data) -> str: | |
| # try: | |
| # if isinstance(data, dict): | |
| # file_name = data.get("file_name", "Unknown Patient") | |
| # hcc_code = data.get("hcc_code", "N/A") | |
| # model_version = data.get("model_version", "N/A") | |
| # analyses = data.get("final_analysis", []) | |
| # # Demographics | |
| # patient_name = data.get("patient_name", "") | |
| # dob = data.get("dob", "") | |
| # age = data.get("age", "") | |
| # gender = data.get("gender", "") | |
| # address = data.get("address", "") | |
| # phone = data.get("phone", "") | |
| # patient_identifier = data.get("patient_identifier", "") | |
| # else: | |
| # return "<div style='color:red; font-weight:bold;'>⚠️ Invalid data format for report.</div>" | |
| # md = f""" | |
| # <div style="border:2px solid #4CAF50; padding:15px; border-radius:10px; background:#f9fdf9;"> | |
| # <h2 style="color:#2e7d32;">π HCC Chart Validation Report </h2> | |
| # <p><b>π§Ύ File Name:</b> {file_name}</p> | |
| # <p><b>π·οΈ HCC Code:</b> {hcc_code}</p> | |
| # <p><b>βοΈ Model Version:</b> {model_version}</p> | |
| # """ | |
| # # Add demographics | |
| # if any([patient_name, dob, age, gender, address, phone, patient_identifier]): | |
| # md += "<hr style='border:0;border-top:1px solid #c8e6c9;margin:10px 0;'/>" | |
| # md += "<h3 style='color:#1b5e20;'>π€ Patient Demographics</h3>" | |
| # if patient_name: md += f"<p><b>Name:</b> {patient_name}</p>" | |
| # if patient_identifier: md += f"<p><b>Patient Identifier:</b> {patient_identifier}</p>" | |
| # if dob: md += f"<p><b>Date of Birth:</b> {dob}</p>" | |
| # if age: md += f"<p><b>Age:</b> {age}</p>" | |
| # if gender: md += f"<p><b>Gender:</b> {gender}</p>" | |
| # if address: md += f"<p><b>Address:</b> {address}</p>" | |
| # if phone: md += f"<p><b>Phone:</b> {phone}</p>" | |
| # md += "</div><br/>" | |
| # # Render analyses if they exist | |
| # if analyses: | |
| # for idx, diag in enumerate(analyses, 1): | |
| # md += f""" | |
| # <div style="border:1px solid #ccc; padding:12px; border-radius:8px; margin-bottom:12px;"> | |
| # <h3 style="color:#1565c0;">{idx}. {diag.get("diagnosis", "Unknown Diagnosis")}</h3> | |
| # <p><b>ICD-10:</b> {diag.get("icd10", "N/A")}</p> | |
| # <p><b>Reference:</b> <a href="{diag.get("reference","")}" target="_blank">{diag.get("reference","")}</a></p> | |
| # """ | |
| # explicit_ans = diag.get("answer_explicit", "N/A") | |
| # explicit_rat = diag.get("rationale_explicit", "") | |
| # implicit_ans = diag.get("answer_implicit", "N/A") | |
| # implicit_rat = diag.get("rationale_implicit", "") | |
| # if explicit_ans.lower() == "yes": | |
| # md += f"<p><b>Explicit:</b> {explicit_ans} β {explicit_rat}</p>" | |
| # else: | |
| # md += f"<p><b>Implicit:</b> {implicit_ans} β {implicit_rat}</p>" | |
| # md += f""" | |
| # <p><b>Clinical Status:</b> {diag.get("clinical_status","N/A")}</p> | |
| # <p><b>Status Rationale:</b> {diag.get("status_rationale","")}</p> | |
| # """ | |
| # if "tests" in diag: | |
| # md += "<details><summary><b>π§ͺ Tests & Procedures</b></summary><ul>" | |
| # tests = diag["tests"] | |
| # if "vitals" in tests: | |
| # md += "<li><b>Vitals:</b><ul>" | |
| # for k, v in tests["vitals"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "procedures" in tests: | |
| # md += "<li><b>Procedures:</b><ul>" | |
| # for k, v in tests["procedures"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "lab_test" in tests: | |
| # md += "<li><b>Lab Tests:</b><ul>" | |
| # for k, v in tests["lab_test"].items(): md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # md += "</ul></details>" | |
| # if "meat" in diag: | |
| # md += "<details><summary><b>π MEAT Validation</b></summary><ul>" | |
| # for k, v in diag["meat"].items(): | |
| # emoji = "✅" if v else "❌" | |
| # md += f"<li>{k.capitalize()}: {emoji}</li>" | |
| # md += "</ul>" | |
| # md += f"<p><b>MEAT Rationale:</b> {diag.get('meat_rationale','')}</p>" | |
| # md += "</details>" | |
| # if "comorbidities" in diag and diag["comorbidities"]: | |
| # md += "<details><summary><b>π©Ί Comorbidities</b></summary><ul>" | |
| # for c in diag["comorbidities"]: | |
| # emoji = "✅" if c.get("is_present") else "❌" | |
| # md += f"<li>{emoji} <b>{c.get('condition')}</b><br/><i>{c.get('rationale')}</i></li>" | |
| # md += "</ul></details>" | |
| # md += "</div>" | |
| # return md | |
| # except Exception as e: | |
| # return f"<div style='color:red; font-weight:bold;'>⚠️ Error rendering report: {e}</div>" | |
| # # ---------- Processing Pipeline with Gradio Progress ---------- | |
| # def process_pipeline(pdf_file, hcc_code, model_version, csv_path=CSV_PATH, output_folder="outputs", progress=gr.Progress()): | |
| # try: | |
| # start = time.time() | |
| # step = 0 | |
| # total_steps = 8 # Total number of steps in the pipeline | |
| # def log(msg, current_step=0): | |
| # elapsed = time.time() - start | |
| # bar_html = '<div style="display: flex; width: 100%; gap: 2px; height: 12px; margin: 8px 0 5px 0;">' | |
| # for i in range(1, total_steps + 1): | |
| # if i < current_step: color = "#1e40af" | |
| # elif i == current_step: color = "#3b82f6" | |
| # else: color = "#e5e7eb" | |
| # bar_html += f'<div style="flex-grow: 1; background-color: {color}; border-radius: 3px;"></div>' | |
| # bar_html += '</div>' | |
| # return f"{msg}{bar_html}<small>⏳ Elapsed: {elapsed:.1f} sec</small>" | |
| # if pdf_file is None: | |
| # yield log("⚠️ Please upload a patient chart PDF.", 0) | |
| # return | |
| # hcc_code_str = str(hcc_code or "").strip() | |
| # if not hcc_code_str: | |
| # yield log("⚠️ Please enter a valid HCC Code before running validation.", 0) | |
| # return | |
| # os.makedirs(output_folder, exist_ok=True) | |
| # pdf_path = pdf_file.name | |
| # file_name = os.path.splitext(os.path.basename(pdf_path))[0] | |
| # print(f"[PROCESSING] {file_name}") | |
| # # --- MODIFICATION: Extract and show demographics first --- | |
| # # Step 1: Extract Demographics | |
| # step += 1 | |
| # progress((step, total_steps), desc="Extracting demographics") | |
| # yield log(f"π§ Step {step}/{total_steps}: Extracting patient demographics...", step) | |
| # demographics_engine = PatientInfoExtractionEngine(pdf_path) | |
| # demographics_info = demographics_engine.run() | |
| # print(f"[DEMOGRAPHICS] Extracted: {demographics_info}") | |
| # # Create and yield the initial report with only demographics | |
| # initial_report_data = { | |
| # "file_name": file_name, | |
| # "hcc_code": hcc_code_str, | |
| # "model_version": model_version, | |
| # "final_analysis": [], | |
| # **demographics_info | |
| # } | |
| # demographics_md = json_to_markdown(initial_report_data) | |
| # yield demographics_md | |
| # time.sleep(1) # Pause for a moment to show the demographics | |
| # # --- END OF MODIFICATION --- | |
| # # Subsequent steps will append progress below the initial demographic display | |
| # # Step 2: Diagnoses | |
| # step += 1 | |
| # progress((step, total_steps), desc="Extracting diagnoses") | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Extracting possible HCC Diagnoses...", step) | |
| # diagnoses = HCCDiagnosisListEngine(hcc_code_str, model_version, csv_path).run() | |
| # if not diagnoses: | |
| # yield demographics_md + log(f"β No diagnoses found for HCC {hcc_code_str}.", step) | |
| # return | |
| # # Step 3: Chart checking | |
| # step += 1 | |
| # progress((step, total_steps), desc="Checking chart") | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Checking diagnoses in patient chart...", step) | |
| # all_checked_results = ChartDiagnosisChecker(pdf_path).run(diagnoses) | |
| # confirmed_diagnoses = [d for d in all_checked_results if d.get("answer_explicit", "").lower() == "yes" or d.get("answer_implicit", "").lower() == "yes"] | |
| # if not confirmed_diagnoses: | |
| # yield demographics_md + log(f"β No confirmed diagnoses for HCC {hcc_code_str} in {file_name}.", step) | |
| # return | |
| # # Step 4: Tests | |
| # step += 1 | |
| # progress((step, total_steps), desc="Finding tests") | |
| # yield demographics_md + log(f"π§ͺ Step {step}/{total_steps}: Finding relevant tests...", step) | |
| # diagnoses_with_tests = TestFindingAgent(hcc_code=hcc_code_str, model_version=model_version).run(confirmed_diagnoses) | |
| # # Step 5: Clinical Status | |
| # step += 1 | |
| # progress((step, total_steps), desc="Determining clinical status") | |
| # yield demographics_md + log(f"βοΈ Step {step}/{total_steps}: Determining clinical status...", step) | |
| # diagnoses_with_status = ClinicalStatusAgent().run(diagnoses_with_tests) | |
| # active_diagnoses = [d for d in diagnoses_with_status if d.get("clinical_status") == "ACTIVE"] | |
| # # Step 6: MEAT | |
| # step += 1 | |
| # progress((step, total_steps), desc="Validating MEAT") | |
| # if active_diagnoses: | |
| # yield demographics_md + log(f"π Step {step}/{total_steps}: Validating MEAT...", step) | |
| # validated_meat_diagnoses = MEATValidatorAgent().run(active_diagnoses) | |
| # else: | |
| # validated_meat_diagnoses = [] | |
| # yield demographics_md + log("ℹ️ No ACTIVE diagnoses found. Skipping MEAT/Comorbidity.", step) | |
| # # Step 7: Comorbidities | |
| # step += 1 | |
| # progress((step, total_steps), desc="Checking comorbidities") | |
| # diagnoses_passed_meat = [d for d in validated_meat_diagnoses if any(d.get("meat", {}).values())] | |
| # if diagnoses_passed_meat: | |
| # yield demographics_md + log(f"π€ Step {step}/{total_steps}: Checking comorbidities...", step) | |
| # comorbidity_results = ComorbidityCheckerAgent(pdf_path, hcc_code_str, model_version).run(diagnoses_passed_meat) | |
| # else: | |
| # comorbidity_results = [] | |
| # # Step 8: Final Report | |
| # step += 1 | |
| # progress((step, total_steps), desc="Generating report") | |
| # yield demographics_md + log(f"β Step {step}/{total_steps}: Generating final report...", step) | |
| # # Merge results for final output | |
| # status_map = {d["diagnosis"]: d for d in diagnoses_with_status} | |
| # meat_map = {d["diagnosis"]: d for d in validated_meat_diagnoses} | |
| # comorbidity_map = {d["diagnosis"]: d for d in comorbidity_results} | |
| # final_analysis = [] | |
| # for entry in all_checked_results: | |
| # diag_name = entry["diagnosis"] | |
| # updated_entry = entry.copy() | |
| # if diag_name in status_map: updated_entry.update(status_map[diag_name]) | |
| # if diag_name in meat_map: updated_entry.update(meat_map[diag_name]) | |
| # if diag_name in comorbidity_map: updated_entry.update(comorbidity_map[diag_name]) | |
| # final_analysis.append(updated_entry) | |
| # filtered_final_analysis = [e for e in final_analysis if e.get("answer_explicit", "").lower() == "yes" or e.get("answer_implicit", "").lower() == "yes"] | |
| # output_data = { | |
| # "file_name": file_name, | |
| # "hcc_code": hcc_code_str, | |
| # "model_version": model_version, | |
| # "final_analysis": filtered_final_analysis, | |
| # **demographics_info | |
| # } | |
| # elapsed = time.time() - start | |
| # yield json_to_markdown(output_data) + f"<br/><div style='color:green;'>β Completed in {elapsed:.1f} sec</div>" | |
| # except Exception as e: | |
| # print(f"[ERROR] {e}") | |
| # yield f"<div style='color:red; font-weight:bold;'>⚠️ Error: {e}</div>" | |
| # # ---------- Gradio Theme and Helpers ---------- | |
| # simple_theme = gr.themes.Soft( | |
| # primary_hue=gr.themes.colors.blue, | |
| # secondary_hue=gr.themes.colors.slate, | |
| # neutral_hue=gr.themes.colors.slate, | |
| # ).set( | |
| # button_primary_background_fill="#1e40af", | |
| # button_primary_background_fill_hover="#1d4ed8", | |
| # button_primary_text_color="white", | |
| # background_fill_primary="white", | |
| # background_fill_secondary="#f8fafc", | |
| # ) | |
| # def load_sample_pdf(): | |
| # if not os.path.exists(SAMPLE_PDF): | |
| # raise FileNotFoundError(f"Sample PDF not found at {SAMPLE_PDF}") | |
| # class PDFWrapper: | |
| # def __init__(self, path): | |
| # self.name = path | |
| # return PDFWrapper(SAMPLE_PDF) | |
| # def pdf_to_iframe(file): | |
| # if file is None: return "<p style='color:orange;'>No PDF uploaded.</p>" | |
| # try: | |
| # with open(file.name, "rb") as f: pdf_bytes = f.read() | |
| # encoded = base64.b64encode(pdf_bytes).decode("utf-8") | |
| # return f'<iframe src="data:application/pdf;base64,{encoded}" width="100%" height="600px" style="border:1px solid #ccc;"></iframe>' | |
| # except Exception as e: | |
| # return f"<p style='color:red;'>Failed to display PDF: {e}</p>" | |
| # # ---------- Gradio UI ---------- | |
| # with gr.Blocks(theme=simple_theme, title=APP_TITLE) as interface: | |
| # gr.HTML(f""" | |
| # <h1 style='text-align:center;color:#1e40af;'> π© {APP_TITLE}</h1> | |
| # <p style='text-align:center;color:#64748b;'> | |
| # Upload a chart, set HCC + model version, and validate MEAT criteria. | |
| # </p> | |
| # """) | |
| # with gr.Row(): | |
| # pdf_upload = gr.File(label="Upload Patient Chart (PDF)", file_types=[".pdf"], scale=1) | |
| # hcc_code = gr.Textbox(label="HCC Code (e.g., 12)", placeholder="Enter HCC code", scale=1) | |
| # model_version = gr.Dropdown(choices=["V24", "V28"], label="Model Version", value="V24", scale=1) | |
| # run_btn = gr.Button("π Run Validation", variant="primary", scale=1) | |
| # with gr.Row(): | |
| # with gr.Column(scale=2): | |
| # pdf_preview = gr.HTML(label="π PDF Preview", value="<p>Upload a PDF to preview</p>") | |
| # with gr.Column(scale=2): | |
| # output_md = gr.Markdown( | |
| # label="Validation Report", | |
| # value="<div style='border:2px solid #1e40af; border-radius:12px; padding:15px; background-color:#f0f9ff;'>π Upload a PDF and click <b>Run Validation</b> to start.</div>", | |
| # ) | |
| # pdf_upload.change(fn=pdf_to_iframe, inputs=pdf_upload, outputs=pdf_preview) | |
| # run_btn.click( | |
| # fn=process_pipeline, | |
| # inputs=[pdf_upload, hcc_code, model_version], | |
| # outputs=[output_md], | |
| # ) | |
| # gr.Examples( | |
| # examples=[[SAMPLE_PDF]], | |
| # inputs=[pdf_upload], | |
| # outputs=[output_md], | |
| # fn=lambda x: process_pipeline(load_sample_pdf(), hcc_code="12", model_version="V24"), | |
| # cache_examples=False | |
| # ) | |
| # if __name__ == "__main__": | |
| # interface.queue().launch( | |
| # server_name="0.0.0.0", | |
| # server_port=int(os.environ.get("PORT", 7860)) | |
| # ) | |
| # import os | |
| # import gradio as gr | |
| # import json | |
| # import time | |
| # import base64 | |
| # from dotenv import load_dotenv | |
| # from ClinicalStatusAgent import ClinicalStatusAgent | |
| # from TestFindingAgent import TestFindingAgent | |
| # from ComorbidityCheckerAgent import ComorbidityCheckerAgent | |
| # from HCCDiagnosisListEngine import HCCDiagnosisListEngine | |
| # from chartdiagnosischecker import ChartDiagnosisChecker | |
| # from MeatValidatorAgent import MEATValidatorAgent | |
| # from PatientInfoExtractionEngine import PatientInfoExtractionEngine | |
| # load_dotenv() | |
| # APP_TITLE = "Risk Adjustment (HCC Chart Validation)" | |
| # CSV_PATH = "hcc_mapping.csv" | |
| # SAMPLE_PDF = "sample_patient_chart.pdf" # Place a sample PDF in the same folder | |
| # # ---------- JSON to Markdown ---------- | |
| # def json_to_markdown(data) -> str: | |
| # try: | |
| # if isinstance(data, dict) and "final_analysis" in data: | |
| # file_name = data.get("file_name", "Unknown Patient") | |
| # hcc_code = data.get("hcc_code", "N/A") | |
| # model_version = data.get("model_version", "N/A") | |
| # analyses = data.get("final_analysis", []) | |
| # # Demographics | |
| # patient_name = data.get("patient_name", "") | |
| # dob = data.get("dob", "") | |
| # age = data.get("age", "") | |
| # gender = data.get("gender", "") | |
| # address = data.get("address", "") | |
| # phone = data.get("phone", "") | |
| # patient_identifier = data.get("patient_identifier", "") | |
| # elif isinstance(data, list): | |
| # file_name = "N/A" | |
| # hcc_code = "N/A" | |
| # model_version = "N/A" | |
| # analyses = data | |
| # patient_name = dob = age = gender = address = phone = patient_identifier = "" | |
| # else: | |
| # return "<div style='color:red; font-weight:bold;'>⚠️ Invalid data format for report.</div>" | |
| # md = f""" | |
| # <div style="border:2px solid #4CAF50; padding:15px; border-radius:10px; background:#f9fdf9;"> | |
| # <h2 style="color:#2e7d32;">π HCC Chart Validation Report </h2> | |
| # <p><b>π§Ύ Patient ID:</b> {patient_id}</p> | |
| # <p><b>π·οΈ HCC Code:</b> {hcc_code}</p> | |
| # <p><b>βοΈ Model Version:</b> {model_version}</p> | |
| # """ | |
| # # Add demographics | |
| # if any([patient_name, dob, age, gender, address, phone, patient_identifier]): | |
| # md += "<hr style='border:0;border-top:1px solid #c8e6c9;margin:10px 0;'/>" | |
| # md += "<h3 style='color:#1b5e20;'>π€ Patient Demographics</h3>" | |
| # if patient_name: md += f"<p><b>Name:</b> {patient_name}</p>" | |
| # if patient_identifier: md += f"<p><b>Patient Identifier:</b> {patient_identifier}</p>" | |
| # if dob: md += f"<p><b>Date of Birth:</b> {dob}</p>" | |
| # if age: md += f"<p><b>Age:</b> {age}</p>" | |
| # if gender: md += f"<p><b>Gender:</b> {gender}</p>" | |
| # if address: md += f"<p><b>Address:</b> {address}</p>" | |
| # if phone: md += f"<p><b>Phone:</b> {phone}</p>" | |
| # md += "</div><br/>" | |
| # # Render analyses | |
| # for idx, diag in enumerate(analyses, 1): | |
| # md += f""" | |
| # <div style="border:1px solid #ccc; padding:12px; border-radius:8px; margin-bottom:12px;"> | |
| # <h3 style="color:#1565c0;">{idx}. {diag.get("diagnosis", "Unknown Diagnosis")}</h3> | |
| # <p><b>ICD-10:</b> {diag.get("icd10", "N/A")}</p> | |
| # <p><b>Reference:</b> <a href="{diag.get("reference","")}" target="_blank">{diag.get("reference","")}</a></p> | |
| # """ | |
| # explicit_ans = diag.get("answer_explicit", "N/A") | |
| # explicit_rat = diag.get("rationale_explicit", "") | |
| # implicit_ans = diag.get("answer_implicit", "N/A") | |
| # implicit_rat = diag.get("rationale_implicit", "") | |
| # if explicit_ans.lower() == "yes": | |
| # md += f"<p><b>Explicit:</b> {explicit_ans} β {explicit_rat}</p>" | |
| # else: | |
| # md += f"<p><b>Implicit:</b> {implicit_ans} β {implicit_rat}</p>" | |
| # md += f""" | |
| # <p><b>Clinical Status:</b> {diag.get("clinical_status","N/A")}</p> | |
| # <p><b>Status Rationale:</b> {diag.get("status_rationale","")}</p> | |
| # """ | |
| # if "tests" in diag: | |
| # md += "<details><summary><b>π§ͺ Tests & Procedures</b></summary><ul>" | |
| # tests = diag["tests"] | |
| # if "vitals" in tests: | |
| # md += "<li><b>Vitals:</b><ul>" | |
| # for k, v in tests["vitals"].items(): | |
| # md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "procedures" in tests: | |
| # md += "<li><b>Procedures:</b><ul>" | |
| # for k, v in tests["procedures"].items(): | |
| # md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "lab_test" in tests: | |
| # md += "<li><b>Lab Tests:</b><ul>" | |
| # for k, v in tests["lab_test"].items(): | |
| # md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # md += "</ul></details>" | |
| # if "meat" in diag: | |
| # md += "<details><summary><b>π MEAT Validation</b></summary><ul>" | |
| # for k, v in diag["meat"].items(): | |
| # emoji = "✅" if v else "❌" | |
| # md += f"<li>{k.capitalize()}: {emoji}</li>" | |
| # md += "</ul>" | |
| # md += f"<p><b>MEAT Rationale:</b> {diag.get('meat_rationale','')}</p>" | |
| # md += "</details>" | |
| # if "comorbidities" in diag and diag["comorbidities"]: | |
| # md += "<details><summary><b>π©Ί Comorbidities</b></summary><ul>" | |
| # for c in diag["comorbidities"]: | |
| # emoji = "✅" if c.get("is_present") else "❌" | |
| # md += f"<li>{emoji} <b>{c.get('condition')}</b><br/><i>{c.get('rationale')}</i></li>" | |
| # md += "</ul></details>" | |
| # md += "</div>" | |
| # return md | |
| # except Exception as e: | |
| # return f"<div style='color:red; font-weight:bold;'>⚠️ Error rendering report: {e}</div>" | |
| # # ---------- Processing Pipeline with Gradio Progress ---------- | |
| # def process_pipeline(pdf_file, hcc_code, model_version, csv_path=CSV_PATH, output_folder="outputs", progress=gr.Progress()): | |
| # try: | |
| # start = time.time() | |
| # step = 0 | |
| # total_steps = 8 # Total number of steps in the pipeline | |
| # def log(msg, current_step=0): | |
| # elapsed = time.time() - start | |
| # # Generate the segmented progress bar HTML | |
| # bar_html = '<div style="display: flex; width: 100%; gap: 2px; height: 12px; margin: 8px 0 5px 0;">' # Added top margin | |
| # for i in range(1, total_steps + 1): | |
| # if i < current_step: | |
| # color = "#1e40af" # Completed (Dark Blue) | |
| # elif i == current_step: | |
| # color = "#3b82f6" # In Progress (Lighter Blue) | |
| # else: | |
| # color = "#e5e7eb" # Pending (Grey) | |
| # bar_html += f'<div style="flex-grow: 1; background-color: {color}; border-radius: 3px;"></div>' | |
| # bar_html += '</div>' | |
| # # --- MODIFIED: Swapped order of msg and bar_html --- | |
| # return f"{msg}{bar_html}<small>⏳ Elapsed: {elapsed:.1f} sec</small>" | |
| # # --- END OF MODIFICATION --- | |
| # if pdf_file is None: | |
| # yield log("⚠️ Please upload a patient chart PDF.", 0) | |
| # return | |
| # hcc_code_str = str(hcc_code or "").strip() | |
| # if not hcc_code_str: | |
| # yield log("⚠️ Please enter a valid HCC Code before running validation.", 0) | |
| # return | |
| # os.makedirs(output_folder, exist_ok=True) | |
| # pdf_path = pdf_file.name | |
| # file_name = os.path.splitext(os.path.basename(pdf_path))[0] | |
| # print(f"[PROCESSING] {file_name}") | |
| # # Step 1: Extract Demographics | |
| # step += 1 | |
| # progress((step, total_steps), desc="Extracting demographics") | |
| # yield log(f"π§ Step {step}/{total_steps}: Extracting patient demographics...", step) | |
| # demographics_engine = PatientInfoExtractionEngine(pdf_path) | |
| # demographics_info = demographics_engine.run() | |
| # print(f"[DEMOGRAPHICS] Extracted: {demographics_info}") | |
| # # Step 2: Diagnoses | |
| # step += 1 | |
| # progress((step, total_steps), desc="Extracting diagnoses") | |
| # yield log(f"π Step {step}/{total_steps}: Extracting possible HCC Diagnoses...", step) | |
| # diagnoses = HCCDiagnosisListEngine(hcc_code_str, model_version, csv_path).run() | |
| # if not diagnoses: | |
| # yield log(f"β No diagnoses found for HCC {hcc_code_str}.", step) | |
| # return | |
| # # Step 3: Chart checking | |
| # step += 1 | |
| # progress((step, total_steps), desc="Checking chart") | |
| # yield log(f"π Step {step}/{total_steps}: Checking diagnoses in patient chart...", step) | |
| # all_checked_results = ChartDiagnosisChecker(pdf_path).run(diagnoses) | |
| # confirmed_diagnoses = [ | |
| # d for d in all_checked_results | |
| # if d.get("answer_explicit", "").lower() == "yes" | |
| # or d.get("answer_implicit", "").lower() == "yes" | |
| # ] | |
| # if not confirmed_diagnoses: | |
| # yield log(f"β No confirmed diagnoses for HCC {hcc_code_str} in {patient_name}.", step) | |
| # return | |
| # # Step 4: Tests | |
| # step += 1 | |
| # progress((step, total_steps), desc="Finding tests") | |
| # yield log(f"π§ͺ Step {step}/{total_steps}: Finding relevant tests...", step) | |
| # diagnoses_with_tests = TestFindingAgent(hcc_code=hcc_code_str, model_version=model_version).run(confirmed_diagnoses) | |
| # # Step 5: Clinical Status | |
| # step += 1 | |
| # progress((step, total_steps), desc="Determining clinical status") | |
| # yield log(f"βοΈ Step {step}/{total_steps}: Determining clinical status...", step) | |
| # diagnoses_with_status = ClinicalStatusAgent().run(diagnoses_with_tests) | |
| # active_diagnoses = [d for d in diagnoses_with_status if d.get("clinical_status") == "ACTIVE"] | |
| # # Step 6: MEAT | |
| # step += 1 | |
| # progress((step, total_steps), desc="Validating MEAT") | |
| # if active_diagnoses: | |
| # yield log(f"π Step {step}/{total_steps}: Validating MEAT...", step) | |
| # validated_meat_diagnoses = MEATValidatorAgent().run(active_diagnoses) | |
| # else: | |
| # validated_meat_diagnoses = [] | |
| # yield log("ℹ️ No ACTIVE diagnoses found. Skipping MEAT/Comorbidity.", step) | |
| # # Step 7: Comorbidities | |
| # step += 1 | |
| # progress((step, total_steps), desc="Checking comorbidities") | |
| # diagnoses_passed_meat = [d for d in validated_meat_diagnoses if any(d.get("meat", {}).values())] | |
| # if diagnoses_passed_meat: | |
| # yield log(f"π€ Step {step}/{total_steps}: Checking comorbidities...", step) | |
| # comorbidity_results = ComorbidityCheckerAgent(pdf_path, hcc_code_str, model_version).run(diagnoses_passed_meat) | |
| # else: | |
| # comorbidity_results = [] | |
| # # Step 8: Final Report | |
| # step += 1 | |
| # progress((step, total_steps), desc="Generating report") | |
| # yield log(f"β Step {step}/{total_steps}: Generating final report...", step) | |
| # # Merge results | |
| # status_map = {d["diagnosis"]: d for d in diagnoses_with_status} | |
| # meat_map = {d["diagnosis"]: d for d in validated_meat_diagnoses} | |
| # comorbidity_map = {d["diagnosis"]: d for d in comorbidity_results} | |
| # final_analysis = [] | |
| # for entry in all_checked_results: | |
| # diag_name = entry["diagnosis"] | |
| # updated_entry = entry.copy() | |
| # if diag_name in status_map: | |
| # updated_entry.update(status_map[diag_name]) | |
| # if diag_name in meat_map: | |
| # updated_entry.update(meat_map[diag_name]) | |
| # if diag_name in comorbidity_map: | |
| # updated_entry.update(comorbidity_map[diag_name]) | |
| # final_analysis.append(updated_entry) | |
| # filtered_final_analysis = [ | |
| # e for e in final_analysis | |
| # if e.get("answer_explicit", "").lower() == "yes" | |
| # or e.get("answer_implicit", "").lower() == "yes" | |
| # ] | |
| # # Include demographics in output | |
| # output_data = { | |
| # "file_name": file_name, | |
| # "hcc_code": hcc_code_str, | |
| # "model_version": model_version, | |
| # "patient_name": demographics_info.get("name", ""), | |
| # "dob": demographics_info.get("dob", ""), | |
| # "age": demographics_info.get("age", ""), | |
| # "gender": demographics_info.get("gender", ""), | |
| # "address": demographics_info.get("address", ""), | |
| # "phone": demographics_info.get("phone", ""), | |
| # "patient_identifier": demographics_info.get("patient_identifier", ""), | |
| # "final_analysis": filtered_final_analysis | |
| # } | |
| # elapsed = time.time() - start | |
| # yield json_to_markdown(output_data) + f"<br/><div style='color:green;'>β Completed in {elapsed:.1f} sec</div>" | |
| # except Exception as e: | |
| # print(f"[ERROR] {e}") | |
| # yield f"<div style='color:red; font-weight:bold;'>⚠️ Error: {e}</div>" | |
| # # ---------- Gradio Theme ---------- | |
| # simple_theme = gr.themes.Soft( | |
| # primary_hue=gr.themes.colors.blue, | |
| # secondary_hue=gr.themes.colors.slate, | |
| # neutral_hue=gr.themes.colors.slate, | |
| # ).set( | |
| # button_primary_background_fill="#1e40af", | |
| # button_primary_background_fill_hover="#1d4ed8", | |
| # button_primary_text_color="white", | |
| # background_fill_primary="white", | |
| # background_fill_secondary="#f8fafc", | |
| # ) | |
| # # ---------- Helper for Sample PDF ---------- | |
| # def load_sample_pdf(): | |
| # if not os.path.exists(SAMPLE_PDF): | |
| # raise FileNotFoundError(f"Sample PDF not found at {SAMPLE_PDF}") | |
| # class PDFWrapper: | |
| # def __init__(self, path): | |
| # self.name = path | |
| # return PDFWrapper(SAMPLE_PDF) | |
| # # ---------- Helper for PDF Preview ---------- | |
| # def pdf_to_iframe(file): | |
| # if file is None: | |
| # return "<p style='color:orange;'>No PDF uploaded.</p>" | |
| # try: | |
| # with open(file.name, "rb") as f: | |
| # pdf_bytes = f.read() | |
| # encoded = base64.b64encode(pdf_bytes).decode("utf-8") | |
| # return f""" | |
| # <iframe | |
| # src="data:application/pdf;base64,{encoded}" | |
| # width="100%" height="600px" | |
| # style="border:1px solid #ccc;" | |
| # ></iframe> | |
| # """ | |
| # except Exception as e: | |
| # return f"<p style='color:red;'>Failed to display PDF: {e}</p>" | |
| # # ---------- Gradio UI ---------- | |
| # with gr.Blocks(theme=simple_theme, title=APP_TITLE) as interface: | |
| # gr.HTML(f""" | |
| # <h1 style='text-align:center;color:#1e40af;'> π© {APP_TITLE}</h1> | |
| # <p style='text-align:center;color:#64748b;'> | |
| # Upload a chart, set HCC + model version, and validate MEAT criteria. | |
| # </p> | |
| # """) | |
| # with gr.Row(): | |
| # pdf_upload = gr.File(label="Upload Patient Chart (PDF)", file_types=[".pdf"], scale=1) | |
| # hcc_code = gr.Textbox(label="HCC Code (e.g., 12)", placeholder="Enter HCC code", scale=1) | |
| # model_version = gr.Dropdown(choices=["V24", "V28"], label="Model Version", value="V24", scale=1) | |
| # run_btn = gr.Button("π Run Validation", variant="primary", scale=1) | |
| # with gr.Row(): | |
| # with gr.Column(scale=2): | |
| # pdf_preview = gr.HTML(label="π PDF Preview", value="<p>Upload a PDF to preview</p>") | |
| # with gr.Column(scale=2): | |
| # output_md = gr.Markdown( | |
| # label="Validation Report", | |
| # value="<div style='border:2px solid #1e40af; border-radius:12px; padding:15px; background-color:#f0f9ff;'>π Upload a PDF and click <b>Run Validation</b> to start.</div>", | |
| # ) | |
| # pdf_upload.change(fn=pdf_to_iframe, inputs=pdf_upload, outputs=pdf_preview) | |
| # run_btn.click( | |
| # fn=process_pipeline, | |
| # inputs=[pdf_upload, hcc_code, model_version], | |
| # outputs=[output_md], | |
| # ) | |
| # gr.Examples( | |
| # examples=[[SAMPLE_PDF]], | |
| # inputs=[pdf_upload], | |
| # outputs=[output_md], | |
| # fn=lambda x: process_pipeline(load_sample_pdf(), hcc_code="12", model_version="V24"), | |
| # cache_examples=False | |
| # ) | |
| # if __name__ == "__main__": | |
| # interface.queue().launch( | |
| # server_name="0.0.0.0", | |
| # server_port=int(os.environ.get("PORT", 7860)) | |
| # ) | |
| # import os | |
| # import gradio as gr | |
| # import json | |
| # import time | |
| # import base64 | |
| # from dotenv import load_dotenv | |
| # from ClinicalStatusAgent import ClinicalStatusAgent | |
| # from TestFindingAgent import TestFindingAgent | |
| # from ComorbidityCheckerAgent import ComorbidityCheckerAgent | |
| # from HCCDiagnosisListEngine import HCCDiagnosisListEngine | |
| # from chartdiagnosischecker import ChartDiagnosisChecker | |
| # from MeatValidatorAgent import MEATValidatorAgent | |
| # from PatientInfoExtractionEngine import PatientInfoExtractionEngine | |
| # load_dotenv() | |
| # APP_TITLE = "Risk Adjustment (HCC Chart Validation)" | |
| # CSV_PATH = "hcc_mapping.csv" | |
| # SAMPLE_PDF = "sample_patient_chart.pdf" # Place a sample PDF in the same folder | |
| # # ---------- JSON to Markdown ---------- | |
| # def json_to_markdown(data) -> str: | |
| # try: | |
| # if isinstance(data, dict) and "final_analysis" in data: | |
| # patient_id = data.get("patient_id", "Unknown Patient") | |
| # hcc_code = data.get("hcc_code", "N/A") | |
| # model_version = data.get("model_version", "N/A") | |
| # analyses = data.get("final_analysis", []) | |
| # # Demographics | |
| # patient_name = data.get("patient_name", "") | |
| # dob = data.get("dob", "") | |
| # age = data.get("age", "") | |
| # gender = data.get("gender", "") | |
| # address = data.get("address", "") | |
| # phone = data.get("phone", "") | |
| # patient_identifier = data.get("patient_identifier", "") | |
| # elif isinstance(data, list): | |
| # patient_id = "N/A" | |
| # hcc_code = "N/A" | |
| # model_version = "N/A" | |
| # analyses = data | |
| # patient_name = dob = age = gender = address = phone = patient_identifier = "" | |
| # else: | |
| # return "<div style='color:red; font-weight:bold;'>⚠️ Invalid data format for report.</div>" | |
| # md = f""" | |
| # <div style="border:2px solid #4CAF50; padding:15px; border-radius:10px; background:#f9fdf9;"> | |
| # <h2 style="color:#2e7d32;">π HCC Chart Validation Report </h2> | |
| # <p><b>π§Ύ Patient ID:</b> {patient_id}</p> | |
| # <p><b>π·οΈ HCC Code:</b> {hcc_code}</p> | |
| # <p><b>βοΈ Model Version:</b> {model_version}</p> | |
| # """ | |
| # # Add demographics | |
| # if any([patient_name, dob, age, gender, address, phone, patient_identifier]): | |
| # md += "<hr style='border:0;border-top:1px solid #c8e6c9;margin:10px 0;'/>" | |
| # md += "<h3 style='color:#1b5e20;'>👤 Patient Demographics</h3>" | |
| # if patient_name: md += f"<p><b>Name:</b> {patient_name}</p>" | |
| # if patient_identifier: md += f"<p><b>Patient Identifier:</b> {patient_identifier}</p>" | |
| # if dob: md += f"<p><b>Date of Birth:</b> {dob}</p>" | |
| # if age: md += f"<p><b>Age:</b> {age}</p>" | |
| # if gender: md += f"<p><b>Gender:</b> {gender}</p>" | |
| # if address: md += f"<p><b>Address:</b> {address}</p>" | |
| # if phone: md += f"<p><b>Phone:</b> {phone}</p>" | |
| # md += "</div><br/>" | |
| # # Render analyses | |
| # for idx, diag in enumerate(analyses, 1): | |
| # md += f""" | |
| # <div style="border:1px solid #ccc; padding:12px; border-radius:8px; margin-bottom:12px;"> | |
| # <h3 style="color:#1565c0;">{idx}. {diag.get("diagnosis", "Unknown Diagnosis")}</h3> | |
| # <p><b>ICD-10:</b> {diag.get("icd10", "N/A")}</p> | |
| # <p><b>Reference:</b> <a href="{diag.get("reference","")}" target="_blank">{diag.get("reference","")}</a></p> | |
| # """ | |
| # explicit_ans = diag.get("answer_explicit", "N/A") | |
| # explicit_rat = diag.get("rationale_explicit", "") | |
| # implicit_ans = diag.get("answer_implicit", "N/A") | |
| # implicit_rat = diag.get("rationale_implicit", "") | |
| # if explicit_ans.lower() == "yes": | |
| # md += f"<p><b>Explicit:</b> {explicit_ans} β {explicit_rat}</p>" | |
| # else: | |
| # md += f"<p><b>Implicit:</b> {implicit_ans} β {implicit_rat}</p>" | |
| # md += f""" | |
| # <p><b>Clinical Status:</b> {diag.get("clinical_status","N/A")}</p> | |
| # <p><b>Status Rationale:</b> {diag.get("status_rationale","")}</p> | |
| # """ | |
| # if "tests" in diag: | |
| # md += "<details><summary><b>🧪 Tests & Procedures</b></summary><ul>" | |
| # tests = diag["tests"] | |
| # if "vitals" in tests: | |
| # md += "<li><b>Vitals:</b><ul>" | |
| # for k, v in tests["vitals"].items(): | |
| # md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "procedures" in tests: | |
| # md += "<li><b>Procedures:</b><ul>" | |
| # for k, v in tests["procedures"].items(): | |
| # md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "lab_test" in tests: | |
| # md += "<li><b>Lab Tests:</b><ul>" | |
| # for k, v in tests["lab_test"].items(): | |
| # md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # md += "</ul></details>" | |
| # if "meat" in diag: | |
| # md += "<details><summary><b>π MEAT Validation</b></summary><ul>" | |
| # for k, v in diag["meat"].items(): | |
| # emoji = "✅" if v else "❌" | |
| # md += f"<li>{k.capitalize()}: {emoji}</li>" | |
| # md += "</ul>" | |
| # md += f"<p><b>MEAT Rationale:</b> {diag.get('meat_rationale','')}</p>" | |
| # md += "</details>" | |
| # if "comorbidities" in diag and diag["comorbidities"]: | |
| # md += "<details><summary><b>🩺 Comorbidities</b></summary><ul>" | |
| # for c in diag["comorbidities"]: | |
| # emoji = "✅" if c.get("is_present") else "❌" | |
| # md += f"<li>{emoji} <b>{c.get('condition')}</b><br/><i>{c.get('rationale')}</i></li>" | |
| # md += "</ul></details>" | |
| # md += "</div>" | |
| # return md | |
| # except Exception as e: | |
| # return f"<div style='color:red; font-weight:bold;'>⚠️ Error rendering report: {e}</div>" | |
| # # ---------- Processing Pipeline with Gradio Progress ---------- | |
| # def process_pipeline(pdf_file, hcc_code, model_version, csv_path=CSV_PATH, output_folder="outputs", progress=gr.Progress()): | |
| # try: | |
| # start = time.time() | |
| # step = 0 | |
| # total_steps = 8 # Step 0 added for demographics | |
| # def log(msg, step_progress=None): | |
| # elapsed = time.time() - start | |
| # progress_html = "" | |
| # if step_progress is not None: | |
| # progress_html = f""" | |
| # <div style="width:100%; background:#eee; border-radius:6px; margin:5px 0;"> | |
| # <div style="width:{step_progress*100}%; background:#1e40af; height:12px; border-radius:6px;"></div> | |
| # </div> | |
| # """ | |
| # return f"{progress_html}{msg}<br/><small>⏳ Elapsed: {elapsed:.1f} sec</small>" | |
| # if pdf_file is None: | |
| # yield log("⚠️ Please upload a patient chart PDF.", 0) | |
| # return | |
| # hcc_code_str = str(hcc_code or "").strip() | |
| # if not hcc_code_str: | |
| # yield log("⚠️ Please enter a valid HCC Code before running validation.", 0) | |
| # return | |
| # os.makedirs(output_folder, exist_ok=True) | |
| # pdf_path = pdf_file.name | |
| # patient_name = os.path.splitext(os.path.basename(pdf_path))[0] | |
| # print(f"[PROCESSING] {patient_name}") | |
| # # Step 0: Extract Demographics | |
| # step += 1 | |
| # progress((step, total_steps), desc="Extracting demographics") | |
| # yield log(f"π§ Step {step}/{total_steps}: Extracting patient demographics...", step/total_steps) | |
| # demographics_engine = PatientInfoExtractionEngine(pdf_path) | |
| # demographics_info = demographics_engine.run() | |
| # print(f"[DEMOGRAPHICS] Extracted: {demographics_info}") | |
| # # Step 1: Diagnoses | |
| # step += 1 | |
| # progress((step, total_steps), desc="Extracting diagnoses") | |
| # yield log(f"π Step {step}/{total_steps}: Extracting possible HCC Diagnoses...", step/total_steps) | |
| # diagnoses = HCCDiagnosisListEngine(hcc_code_str, model_version, csv_path).run() | |
| # if not diagnoses: | |
| # yield log(f"❌ No diagnoses found for HCC {hcc_code_str}.", step/total_steps) | |
| # return | |
| # # Step 2: Chart checking | |
| # step += 1 | |
| # progress((step, total_steps), desc="Checking chart") | |
| # yield log(f"π Step {step}/{total_steps}: Checking diagnoses in patient chart...", step/total_steps) | |
| # all_checked_results = ChartDiagnosisChecker(pdf_path).run(diagnoses) | |
| # confirmed_diagnoses = [ | |
| # d for d in all_checked_results | |
| # if d.get("answer_explicit", "").lower() == "yes" | |
| # or d.get("answer_implicit", "").lower() == "yes" | |
| # ] | |
| # if not confirmed_diagnoses: | |
| # yield log(f"❌ No confirmed diagnoses for HCC {hcc_code_str} in {patient_name}.", step/total_steps) | |
| # return | |
| # # Step 3: Tests | |
| # step += 1 | |
| # progress((step, total_steps), desc="Finding tests") | |
| # yield log(f"π§ͺ Step {step}/{total_steps}: Finding relevant tests...", step/total_steps) | |
| # diagnoses_with_tests = TestFindingAgent(hcc_code=hcc_code_str, model_version=model_version).run(confirmed_diagnoses) | |
| # # Step 4: Clinical Status | |
| # step += 1 | |
| # progress((step, total_steps), desc="Determining clinical status") | |
| # yield log(f"βοΈ Step {step}/{total_steps}: Determining clinical status...", step/total_steps) | |
| # diagnoses_with_status = ClinicalStatusAgent().run(diagnoses_with_tests) | |
| # active_diagnoses = [d for d in diagnoses_with_status if d.get("clinical_status") == "ACTIVE"] | |
| # # Step 5: MEAT | |
| # step += 1 | |
| # progress((step, total_steps), desc="Validating MEAT") | |
| # if active_diagnoses: | |
| # yield log(f"π Step {step}/{total_steps}: Validating MEAT...", step/total_steps) | |
| # validated_meat_diagnoses = MEATValidatorAgent().run(active_diagnoses) | |
| # else: | |
| # validated_meat_diagnoses = [] | |
| # yield log("ℹ️ No ACTIVE diagnoses found. Skipping MEAT/Comorbidity.", step/total_steps) | |
| # # Step 6: Comorbidities | |
| # step += 1 | |
| # progress((step, total_steps), desc="Checking comorbidities") | |
| # diagnoses_passed_meat = [d for d in validated_meat_diagnoses if any(d.get("meat", {}).values())] | |
| # if diagnoses_passed_meat: | |
| # yield log(f"π€ Step {step}/{total_steps}: Checking comorbidities...", step/total_steps) | |
| # comorbidity_results = ComorbidityCheckerAgent(pdf_path, hcc_code_str, model_version).run(diagnoses_passed_meat) | |
| # else: | |
| # comorbidity_results = [] | |
| # # Step 7: Final Report | |
| # step += 1 | |
| # progress((step, total_steps), desc="Generating report") | |
| # yield log(f"β Step {step}/{total_steps}: Generating final report...", step/total_steps) | |
| # # Merge results | |
| # status_map = {d["diagnosis"]: d for d in diagnoses_with_status} | |
| # meat_map = {d["diagnosis"]: d for d in validated_meat_diagnoses} | |
| # comorbidity_map = {d["diagnosis"]: d for d in comorbidity_results} | |
| # final_analysis = [] | |
| # for entry in all_checked_results: | |
| # diag_name = entry["diagnosis"] | |
| # updated_entry = entry.copy() | |
| # if diag_name in status_map: | |
| # updated_entry.update(status_map[diag_name]) | |
| # if diag_name in meat_map: | |
| # updated_entry.update(meat_map[diag_name]) | |
| # if diag_name in comorbidity_map: | |
| # updated_entry.update(comorbidity_map[diag_name]) | |
| # final_analysis.append(updated_entry) | |
| # filtered_final_analysis = [ | |
| # e for e in final_analysis | |
| # if e.get("answer_explicit", "").lower() == "yes" | |
| # or e.get("answer_implicit", "").lower() == "yes" | |
| # ] | |
| # # Include demographics in output | |
| # output_data = { | |
| # "patient_id": patient_name, | |
| # "hcc_code": hcc_code_str, | |
| # "model_version": model_version, | |
| # "patient_name": demographics_info.get("name", ""), | |
| # "dob": demographics_info.get("dob", ""), | |
| # "age": demographics_info.get("age", ""), | |
| # "gender": demographics_info.get("gender", ""), | |
| # "address": demographics_info.get("address", ""), | |
| # "phone": demographics_info.get("phone", ""), | |
| # "patient_identifier": demographics_info.get("patient_identifier", ""), | |
| # "final_analysis": filtered_final_analysis | |
| # } | |
| # elapsed = time.time() - start | |
| # yield json_to_markdown(output_data) + f"<br/><div style='color:green;'>✅ Completed in {elapsed:.1f} sec</div>" | |
| # except Exception as e: | |
| # print(f"[ERROR] {e}") | |
| # yield f"<div style='color:red; font-weight:bold;'>⚠️ Error: {e}</div>" | |
| # # ---------- Gradio Theme ---------- | |
| # simple_theme = gr.themes.Soft( | |
| # primary_hue=gr.themes.colors.blue, | |
| # secondary_hue=gr.themes.colors.slate, | |
| # neutral_hue=gr.themes.colors.slate, | |
| # ).set( | |
| # button_primary_background_fill="#1e40af", | |
| # button_primary_background_fill_hover="#1d4ed8", | |
| # button_primary_text_color="white", | |
| # background_fill_primary="white", | |
| # background_fill_secondary="#f8fafc", | |
| # ) | |
| # # ---------- Helper for Sample PDF ---------- | |
| # def load_sample_pdf(): | |
| # if not os.path.exists(SAMPLE_PDF): | |
| # raise FileNotFoundError(f"Sample PDF not found at {SAMPLE_PDF}") | |
| # class PDFWrapper: | |
| # def __init__(self, path): | |
| # self.name = path | |
| # return PDFWrapper(SAMPLE_PDF) | |
| # # ---------- Helper for PDF Preview ---------- | |
| # def pdf_to_iframe(file): | |
| # if file is None: | |
| # return "<p style='color:orange;'>No PDF uploaded.</p>" | |
| # try: | |
| # with open(file.name, "rb") as f: | |
| # pdf_bytes = f.read() | |
| # encoded = base64.b64encode(pdf_bytes).decode("utf-8") | |
| # return f""" | |
| # <iframe | |
| # src="data:application/pdf;base64,{encoded}" | |
| # width="100%" height="600px" | |
| # style="border:1px solid #ccc;" | |
| # ></iframe> | |
| # """ | |
| # except Exception as e: | |
| # return f"<p style='color:red;'>Failed to display PDF: {e}</p>" | |
| # # ---------- Gradio UI ---------- | |
| # with gr.Blocks(theme=simple_theme, title=APP_TITLE) as interface: | |
| # gr.HTML(f""" | |
| # <h1 style='text-align:center;color:#1e40af;'> π© {APP_TITLE}</h1> | |
| # <p style='text-align:center;color:#64748b;'> | |
| # Upload a chart, set HCC + model version, and validate MEAT criteria. | |
| # </p> | |
| # """) | |
| # with gr.Row(): | |
| # pdf_upload = gr.File(label="Upload Patient Chart (PDF)", file_types=[".pdf"], scale=1) | |
| # hcc_code = gr.Textbox(label="HCC Code (e.g., 12)", placeholder="Enter HCC code", scale=1) | |
| # model_version = gr.Dropdown(choices=["V24", "V28"], label="Model Version", value="V24", scale=1) | |
| # run_btn = gr.Button("π Run Validation", variant="primary", scale=1) | |
| # with gr.Row(): | |
| # with gr.Column(scale=2): | |
| # pdf_preview = gr.HTML(label="π PDF Preview", value="<p>Upload a PDF to preview</p>") | |
| # with gr.Column(scale=2): | |
| # output_md = gr.Markdown( | |
| # label="Validation Report", | |
| # value="<div style='border:2px solid #1e40af; border-radius:12px; padding:15px; background-color:#f0f9ff;'>π Upload a PDF and click <b>Run Validation</b> to start.</div>", | |
| # ) | |
| # pdf_upload.change(fn=pdf_to_iframe, inputs=pdf_upload, outputs=pdf_preview) | |
| # run_btn.click( | |
| # fn=process_pipeline, | |
| # inputs=[pdf_upload, hcc_code, model_version], | |
| # outputs=[output_md], | |
| # ) | |
| # gr.Examples( | |
| # examples=[[SAMPLE_PDF]], | |
| # inputs=[pdf_upload], | |
| # outputs=[output_md], | |
| # fn=lambda x: process_pipeline(load_sample_pdf(), hcc_code="12", model_version="V24"), | |
| # cache_examples=False | |
| # ) | |
| # if __name__ == "__main__": | |
| # interface.queue().launch( | |
| # server_name="0.0.0.0", | |
| # server_port=int(os.environ.get("PORT", 7860)) | |
| # ) | |
| # import os | |
| # import gradio as gr | |
| # import json | |
| # import time | |
| # import base64 | |
| # from dotenv import load_dotenv | |
| # from ClinicalStatusAgent import ClinicalStatusAgent | |
| # from TestFindingAgent import TestFindingAgent | |
| # from ComorbidityCheckerAgent import ComorbidityCheckerAgent | |
| # from HCCDiagnosisListEngine import HCCDiagnosisListEngine | |
| # from chartdiagnosischecker import ChartDiagnosisChecker | |
| # from MeatValidatorAgent import MEATValidatorAgent | |
| # from PatientInfoExtractionEngine import PatientInfoExtractionEngine | |
| # from typing import Optional | |
| # load_dotenv() | |
| # APP_TITLE = "Risk Adjustment (HCC Chart Validation)" | |
| # CSV_PATH = "hcc_mapping.csv" | |
| # SAMPLE_PDF = "sample_patient_chart.pdf" # Place a sample PDF in the same folder | |
| # # ---------- JSON to Markdown ---------- | |
| # def json_to_markdown(data) -> str: | |
| # try: | |
| # if isinstance(data, dict) and "final_analysis" in data: | |
| # patient_id = data.get("patient_id", "Unknown Patient") | |
| # hcc_code = data.get("hcc_code", "N/A") | |
| # model_version = data.get("model_version", "N/A") | |
| # analyses = data.get("final_analysis", []) | |
| # elif isinstance(data, list): | |
| # patient_id = "N/A" | |
| # hcc_code = "N/A" | |
| # model_version = "N/A" | |
| # analyses = data | |
| # else: | |
| # return "<div style='color:red; font-weight:bold;'>⚠️ Invalid data format for report.</div>" | |
| # md = f""" | |
| # <div style="border:2px solid #4CAF50; padding:15px; border-radius:10px; background:#f9fdf9;"> | |
| # <h2 style="color:#2e7d32;">π HCC Chart Validation Report </h2> | |
| # <p><b>🧾 Patient ID:</b> {patient_id}</p> | |
| # <p><b>🏷️ HCC Code:</b> {hcc_code}</p> | |
| # <p><b>⚙️ Model Version:</b> {model_version}</p> | |
| # </div> | |
| # <br/> | |
| # """ | |
| # for idx, diag in enumerate(analyses, 1): | |
| # md += f""" | |
| # <div style="border:1px solid #ccc; padding:12px; border-radius:8px; margin-bottom:12px;"> | |
| # <h3 style="color:#1565c0;">{idx}. {diag.get("diagnosis", "Unknown Diagnosis")}</h3> | |
| # <p><b>ICD-10:</b> {diag.get("icd10", "N/A")}</p> | |
| # <p><b>Reference:</b> <a href="{diag.get("reference","")}" target="_blank">{diag.get("reference","")}</a></p> | |
| # """ | |
| # explicit_ans = diag.get("answer_explicit", "N/A") | |
| # explicit_rat = diag.get("rationale_explicit", "") | |
| # implicit_ans = diag.get("answer_implicit", "N/A") | |
| # implicit_rat = diag.get("rationale_implicit", "") | |
| # if explicit_ans.lower() == "yes": | |
| # md += f"<p><b>Explicit:</b> {explicit_ans} β {explicit_rat}</p>" | |
| # else: | |
| # md += f"<p><b>Implicit:</b> {implicit_ans} β {implicit_rat}</p>" | |
| # md += f""" | |
| # <p><b>Clinical Status:</b> {diag.get("clinical_status","N/A")}</p> | |
| # <p><b>Status Rationale:</b> {diag.get("status_rationale","")}</p> | |
| # """ | |
| # if "tests" in diag: | |
| # md += "<details><summary><b>🧪 Tests & Procedures</b></summary><ul>" | |
| # tests = diag["tests"] | |
| # if "vitals" in tests: | |
| # md += "<li><b>Vitals:</b><ul>" | |
| # for k, v in tests["vitals"].items(): | |
| # md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "procedures" in tests: | |
| # md += "<li><b>Procedures:</b><ul>" | |
| # for k, v in tests["procedures"].items(): | |
| # md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # if "lab_test" in tests: | |
| # md += "<li><b>Lab Tests:</b><ul>" | |
| # for k, v in tests["lab_test"].items(): | |
| # md += f"<li>{k}: {v}</li>" | |
| # md += "</ul></li>" | |
| # md += "</ul></details>" | |
| # if "meat" in diag: | |
| # md += "<details><summary><b>π MEAT Validation</b></summary><ul>" | |
| # for k, v in diag["meat"].items(): | |
| # emoji = "✅" if v else "❌" | |
| # md += f"<li>{k.capitalize()}: {emoji}</li>" | |
| # md += "</ul>" | |
| # md += f"<p><b>MEAT Rationale:</b> {diag.get('meat_rationale','')}</p>" | |
| # md += "</details>" | |
| # if "comorbidities" in diag and diag["comorbidities"]: | |
| # md += "<details><summary><b>🩺 Comorbidities</b></summary><ul>" | |
| # for c in diag["comorbidities"]: | |
| # emoji = "✅" if c.get("is_present") else "❌" | |
| # md += f"<li>{emoji} <b>{c.get('condition')}</b><br/><i>{c.get('rationale')}</i></li>" | |
| # md += "</ul></details>" | |
| # md += "</div>" | |
| # return md | |
| # except Exception as e: | |
| # return f"<div style='color:red; font-weight:bold;'>⚠️ Error rendering report: {e}</div>" | |
| # # ---------- Processing Pipeline with Gradio Progress ---------- | |
| # def process_pipeline(pdf_file, hcc_code, model_version, csv_path=CSV_PATH, output_folder="outputs", progress=gr.Progress()): | |
| # try: | |
| # start = time.time() | |
| # step = 0 | |
| # total_steps = 7 | |
| # def log(msg, step_progress=None): | |
| # elapsed = time.time() - start | |
| # progress_html = "" | |
| # if step_progress is not None: | |
| # progress_html = f""" | |
| # <div style="width:100%; background:#eee; border-radius:6px; margin:5px 0;"> | |
| # <div style="width:{step_progress*100}%; background:#1e40af; height:12px; border-radius:6px;"></div> | |
| # </div> | |
| # """ | |
| # return f"{progress_html}{msg}<br/><small>⏳ Elapsed: {elapsed:.1f} sec</small>" | |
| # if pdf_file is None: | |
| # yield log("⚠️ Please upload a patient chart PDF.", 0) | |
| # return | |
| # hcc_code_str = str(hcc_code or "").strip() | |
| # if not hcc_code_str: | |
| # yield log("⚠️ Please enter a valid HCC Code before running validation.", 0) | |
| # return | |
| # os.makedirs(output_folder, exist_ok=True) | |
| # pdf_path = pdf_file.name | |
| # patient_name = os.path.splitext(os.path.basename(pdf_path))[0] | |
| # print(f"[PROCESSING] {patient_name}") | |
| # # Step 1: Diagnoses | |
| # step += 1 | |
| # progress((step, total_steps), desc="Extracting diagnoses") | |
| # yield log(f"π Step {step}/{total_steps}: Extracting possible HCC Diagnoses...", step/total_steps) | |
| # diagnoses = HCCDiagnosisListEngine(hcc_code_str, model_version, csv_path).run() | |
| # if not diagnoses: | |
| # yield log(f"❌ No diagnoses found for HCC {hcc_code_str}.", step/total_steps) | |
| # return | |
| # # Step 2: Chart checking | |
| # step += 1 | |
| # progress((step, total_steps), desc="Checking chart") | |
| # yield log(f"π Step {step}/{total_steps}: Checking diagnoses in patient chart...", step/total_steps) | |
| # all_checked_results = ChartDiagnosisChecker(pdf_path).run(diagnoses) | |
| # confirmed_diagnoses = [ | |
| # d for d in all_checked_results | |
| # if d.get("answer_explicit", "").lower() == "yes" | |
| # or d.get("answer_implicit", "").lower() == "yes" | |
| # ] | |
| # if not confirmed_diagnoses: | |
| # yield log(f"❌ No confirmed diagnoses for HCC {hcc_code_str} in {patient_name}.", step/total_steps) | |
| # return | |
| # # Step 3: Tests | |
| # step += 1 | |
| # progress((step, total_steps), desc="Finding tests") | |
| # yield log(f"π§ͺ Step {step}/{total_steps}: Finding relevant tests...", step/total_steps) | |
| # diagnoses_with_tests = TestFindingAgent(hcc_code=hcc_code_str, model_version=model_version).run(confirmed_diagnoses) | |
| # # Step 4: Clinical Status | |
| # step += 1 | |
| # progress((step, total_steps), desc="Determining clinical status") | |
| # yield log(f"βοΈ Step {step}/{total_steps}: Determining clinical status...", step/total_steps) | |
| # diagnoses_with_status = ClinicalStatusAgent().run(diagnoses_with_tests) | |
| # active_diagnoses = [d for d in diagnoses_with_status if d.get("clinical_status") == "ACTIVE"] | |
| # # Step 5: MEAT | |
| # step += 1 | |
| # progress((step, total_steps), desc="Validating MEAT") | |
| # if active_diagnoses: | |
| # yield log(f"π Step {step}/{total_steps}: Validating MEAT...", step/total_steps) | |
| # validated_meat_diagnoses = MEATValidatorAgent().run(active_diagnoses) | |
| # else: | |
| # validated_meat_diagnoses = [] | |
| # yield log("ℹ️ No ACTIVE diagnoses found. Skipping MEAT/Comorbidity.", step/total_steps) | |
| # # Step 6: Comorbidities | |
| # step += 1 | |
| # progress((step, total_steps), desc="Checking comorbidities") | |
| # diagnoses_passed_meat = [d for d in validated_meat_diagnoses if any(d.get("meat", {}).values())] | |
| # if diagnoses_passed_meat: | |
| # yield log(f"π€ Step {step}/{total_steps}: Checking comorbidities...", step/total_steps) | |
| # comorbidity_results = ComorbidityCheckerAgent(pdf_path, hcc_code_str, model_version).run(diagnoses_passed_meat) | |
| # else: | |
| # comorbidity_results = [] | |
| # # Step 7: Final Report | |
| # step += 1 | |
| # progress((step, total_steps), desc="Generating report") | |
| # yield log(f"β Step {step}/{total_steps}: Generating final report...", step/total_steps) | |
| # # Merge results | |
| # status_map = {d["diagnosis"]: d for d in diagnoses_with_status} | |
| # meat_map = {d["diagnosis"]: d for d in validated_meat_diagnoses} | |
| # comorbidity_map = {d["diagnosis"]: d for d in comorbidity_results} | |
| # final_analysis = [] | |
| # for entry in all_checked_results: | |
| # diag_name = entry["diagnosis"] | |
| # updated_entry = entry.copy() | |
| # if diag_name in status_map: | |
| # updated_entry.update(status_map[diag_name]) | |
| # if diag_name in meat_map: | |
| # updated_entry.update(meat_map[diag_name]) | |
| # if diag_name in comorbidity_map: | |
| # updated_entry.update(comorbidity_map[diag_name]) | |
| # final_analysis.append(updated_entry) | |
| # filtered_final_analysis = [ | |
| # e for e in final_analysis | |
| # if e.get("answer_explicit", "").lower() == "yes" | |
| # or e.get("answer_implicit", "").lower() == "yes" | |
| # ] | |
| # output_data = { | |
| # "patient_id": patient_name, | |
| # "hcc_code": hcc_code_str, | |
| # "model_version": model_version, | |
| # "final_analysis": filtered_final_analysis | |
| # } | |
| # elapsed = time.time() - start | |
| # yield json_to_markdown(output_data) + f"<br/><div style='color:green;'>✅ Completed in {elapsed:.1f} sec</div>" | |
| # except Exception as e: | |
| # print(f"[ERROR] {e}") | |
| # yield f"<div style='color:red; font-weight:bold;'>⚠️ Error: {e}</div>" | |
| # # ---------- Gradio Theme ---------- | |
| # simple_theme = gr.themes.Soft( | |
| # primary_hue=gr.themes.colors.blue, | |
| # secondary_hue=gr.themes.colors.slate, | |
| # neutral_hue=gr.themes.colors.slate, | |
| # ).set( | |
| # button_primary_background_fill="#1e40af", | |
| # button_primary_background_fill_hover="#1d4ed8", | |
| # button_primary_text_color="white", | |
| # background_fill_primary="white", | |
| # background_fill_secondary="#f8fafc", | |
| # ) | |
| # # ---------- Helper for Sample PDF ---------- | |
| # def load_sample_pdf(): | |
| # if not os.path.exists(SAMPLE_PDF): | |
| # raise FileNotFoundError(f"Sample PDF not found at {SAMPLE_PDF}") | |
| # class PDFWrapper: | |
| # def __init__(self, path): | |
| # self.name = path | |
| # return PDFWrapper(SAMPLE_PDF) | |
| # # ---------- Helper for PDF Preview ---------- | |
| # def pdf_to_iframe(file): | |
| # if file is None: | |
| # return "<p style='color:orange;'>No PDF uploaded.</p>" | |
| # try: | |
| # with open(file.name, "rb") as f: | |
| # pdf_bytes = f.read() | |
| # encoded = base64.b64encode(pdf_bytes).decode("utf-8") | |
| # return f""" | |
| # <iframe | |
| # src="data:application/pdf;base64,{encoded}" | |
| # width="100%" height="600px" | |
| # style="border:1px solid #ccc;" | |
| # ></iframe> | |
| # """ | |
| # except Exception as e: | |
| # return f"<p style='color:red;'>Failed to display PDF: {e}</p>" | |
| # # ---------- Gradio UI ---------- | |
| # with gr.Blocks(theme=simple_theme, title=APP_TITLE) as interface: | |
| # gr.HTML(f""" | |
| # <h1 style='text-align:center;color:#1e40af;'> π© {APP_TITLE}</h1> | |
| # <p style='text-align:center;color:#64748b;'> | |
| # Upload a chart, set HCC + model version, and validate MEAT criteria. | |
| # </p> | |
| # """) | |
| # with gr.Row(): | |
| # pdf_upload = gr.File(label="Upload Patient Chart (PDF)", file_types=[".pdf"], scale=1) | |
| # hcc_code = gr.Textbox(label="HCC Code (e.g., 12)", placeholder="Enter HCC code", scale=1) | |
| # model_version = gr.Dropdown(choices=["V24", "V28"], label="Model Version", value="V24", scale=1) | |
| # run_btn = gr.Button("π Run Validation", variant="primary", scale=1) | |
| # with gr.Row(): | |
| # with gr.Column(scale=2): | |
| # pdf_preview = gr.HTML(label="π PDF Preview", value="<p>Upload a PDF to preview</p>") | |
| # with gr.Column(scale=2): | |
| # output_md = gr.Markdown( | |
| # label="Validation Report", | |
| # value="<div style='border:2px solid #1e40af; border-radius:12px; padding:15px; background-color:#f0f9ff;'>π Upload a PDF and click <b>Run Validation</b> to start.</div>", | |
| # ) | |
| # # Connect PDF upload to preview | |
| # pdf_upload.change(fn=pdf_to_iframe, inputs=pdf_upload, outputs=pdf_preview) | |
| # # Connect run button with progress | |
| # run_btn.click( | |
| # fn=process_pipeline, | |
| # inputs=[pdf_upload, hcc_code, model_version], | |
| # outputs=[output_md], | |
| # ) | |
| # gr.Examples( | |
| # examples=[[SAMPLE_PDF]], | |
| # inputs=[pdf_upload], | |
| # outputs=[output_md], | |
| # fn=lambda x: process_pipeline(load_sample_pdf(), hcc_code="12", model_version="V24"), | |
| # cache_examples=False | |
| # ) | |
| # if __name__ == "__main__": | |
| # interface.queue().launch( | |
| # server_name="0.0.0.0", | |
| # server_port=int(os.environ.get("PORT", 7860)) | |
| # ) | |