import pandas as pd import gradio as gr from .visualization import QCVisualizer, AnalysisVisualizer class ComponentHelper: def __init__(self, file_qc_results: pd.DataFrame, dataset_qc_results: pd.DataFrame, analyzed_gating_results: pd.DataFrame, analyzed_gating_results_wCDAResults: pd.DataFrame, config: dict): self.file_qc_results = file_qc_results self.dataset_qc_results = dataset_qc_results self.analyzed_gating_results = analyzed_gating_results self.analyzed_gating_results_wCDAResults = analyzed_gating_results_wCDAResults self.config = config self.choices = {"dataset_filter": list(file_qc_results["Dataset"].unique()), "model_filter": sorted(list(file_qc_results["Instrument model"].unique())), "sop_exp_qc_tab_filter": list(file_qc_results["SOP-Exp"].unique()), "material_filter": list((file_qc_results["SOP-Exp"] + " " + file_qc_results["Material"]).unique()), "issue_filter": config["qc_results_tab"]["issues"], "sample_filter": list(analyzed_gating_results["Sample"].unique()), "sop_exp_analysis_tab_filter": list(analyzed_gating_results["SOP-Exp"].unique()), "compensation_control_filter": list(analyzed_gating_results["Compensation control"].unique()), "gating_control_filter": list(analyzed_gating_results["Gating control"].unique()), "pop_pheno_parent_filter": list(analyzed_gating_results["Population; Phenotype (Parent gate)"].unique()), "analyzed_result_filter": config["gating_results_tab"]["results"], } self.creator = self._Creator(self) self.updater = self._Updater(self) class _Creator: def __init__(self, helper_instance): self.helper = helper_instance self.initialization_values = {} def main_filter_components(self): with gr.Row(): gr.Markdown("Instrument model") clear_all_models_button = gr.Button("Clear All", size="sm") select_all_models_button = gr.Button("Select All", size="sm") model_filter = gr.Dropdown(label='', multiselect=True, choices=self.helper.choices["model_filter"], value=self.helper.choices["model_filter"]) self.initialization_values["model_filter"] = self.helper.choices["model_filter"] with gr.Row(): gr.Markdown("Dataset") clear_all_datasets_button = gr.Button("Clear All", size="sm") select_all_datasets_button = gr.Button( "Select All", size="sm") dataset_filter = gr.Dropdown(label='', multiselect=True, choices=self.helper.choices["dataset_filter"], value=self.helper.choices["dataset_filter"]) self.initialization_values["dataset_filter"] = self.helper.choices["dataset_filter"] main_apply_filters_button = gr.Button(value="Apply filters") return clear_all_models_button, select_all_models_button, model_filter, \ clear_all_datasets_button, select_all_datasets_button, dataset_filter, \ main_apply_filters_button def qc_tab_filter_components(self): with gr.Row(): gr.Markdown("SOP-Exp") clear_all_sop_exps_qc_tab_button = gr.Button( "Clear All", size="sm") select_all_sop_exps_qc_tab_button = gr.Button( "Select All", size="sm") sop_exp_qc_tab_filter = gr.Dropdown(label='', multiselect=True, choices=self.helper.choices["sop_exp_qc_tab_filter"], value=self.helper.choices["sop_exp_qc_tab_filter"]) self.initialization_values["sop_exp_qc_tab_filter"] = self.helper.choices["sop_exp_qc_tab_filter"] with gr.Row(): gr.Markdown("Material") clear_all_materials_button = gr.Button("Clear All", size="sm") select_all_materials_button = gr.Button( "Select All", size="sm") material_filter = gr.Dropdown(label='', multiselect=True, choices=self.helper.choices["material_filter"], value=self.helper.choices["material_filter"]) self.initialization_values["material_filter"] = self.helper.choices["material_filter"] with gr.Row(): gr.Markdown("Issues") clear_all_issues_button = gr.Button("Clear All", size="sm") select_all_issues_button = gr.Button("Select All", size="sm") issue_filter = gr.Dropdown(label='', multiselect=True, choices=self.helper.choices["issue_filter"], value=self.helper.choices["issue_filter"]) self.initialization_values["issue_filter"] = self.helper.choices["issue_filter"] qc_tab_apply_filters_button = gr.Button(value="Apply filters") return clear_all_sop_exps_qc_tab_button, select_all_sop_exps_qc_tab_button, sop_exp_qc_tab_filter, \ clear_all_materials_button, select_all_materials_button, material_filter, \ clear_all_issues_button, select_all_issues_button, issue_filter, \ qc_tab_apply_filters_button def qc_tab_dataset_qc_status_filter_component(self): gr.Markdown("Dataset QC status") qc_tab_dataset_qc_status_filter = gr.CheckboxGroup(label='', choices=[ "Pass QC", "Fail QC"], value=["Pass QC", "Fail QC"]) self.initialization_values["qc_tab_dataset_qc_status_filter"] = [ "Pass QC", "Fail QC"] return qc_tab_dataset_qc_status_filter def file_qc_result_components(self): gr.Markdown("File QC status") file_qc_status_filter = gr.CheckboxGroup(label='', choices=[ "Pass QC", "Fail QC"], value=["Fail QC"]) self.initialization_values["file_qc_status_filter"] = ["Fail QC"] updater = self.helper.updater updater.update_file_qc_table(self.initialization_values["dataset_filter"], self.initialization_values["sop_exp_qc_tab_filter"], self.initialization_values["material_filter"], self.initialization_values["issue_filter"], self.initialization_values["qc_tab_dataset_qc_status_filter"], self.initialization_values["file_qc_status_filter"]) file_qc_table = gr.Dataframe( type="pandas", show_copy_button=True, show_row_numbers=True, value=updater.filtered_file_qc_results) file_qc_table_no_file_msg = gr.Markdown( value="All files are filtered", visible=False) return file_qc_status_filter, file_qc_table, file_qc_table_no_file_msg def dataset_qc_result_components(self): updater = self.helper.updater updater.update_file_qc_table(self.initialization_values["dataset_filter"], self.initialization_values["sop_exp_qc_tab_filter"], self.initialization_values["material_filter"], self.initialization_values["issue_filter"], self.initialization_values["qc_tab_dataset_qc_status_filter"], self.initialization_values["file_qc_status_filter"]) dataset_qc_table = gr.Dataframe( type="pandas", show_copy_button=True, show_row_numbers=True, value=updater.filtered_dataset_qc_results) dataset_qc_table_no_dataset_msg = gr.Markdown( value="All datasets are filtered", visible=False) return dataset_qc_table, dataset_qc_table_no_dataset_msg def qc_result_visual_components(self): updater = self.helper.updater updater.update_qc_fig(self.initialization_values["dataset_filter"], self.initialization_values["sop_exp_qc_tab_filter"], self.initialization_values["material_filter"], self.initialization_values["issue_filter"], self.initialization_values["qc_tab_dataset_qc_status_filter"]) qc_fig = gr.Plot(elem_id="plot_wScrollBar", value=updater.qc_visual) qc_fig_no_file_msg = gr.Markdown( value="All files are filtered", visible=False) return qc_fig, qc_fig_no_file_msg def analysis_tab_filter_components(self): gr.Markdown("Sample") sample_filter = gr.Dropdown(label='', choices=self.helper.choices["sample_filter"], value=self.helper.choices["sample_filter"][0]) self.initialization_values["sample_filter"] = self.helper.choices["sample_filter"][0] gr.Markdown("SOP-Exp") sop_exp_analysis_tab_filter = gr.Dropdown(label='', choices=self.helper.choices["sop_exp_analysis_tab_filter"], value=self.helper.choices["sop_exp_analysis_tab_filter"][0]) self.initialization_values["sop_exp_analysis_tab_filter"] = self.helper.choices["sop_exp_analysis_tab_filter"][0] gr.Markdown("Compensation control") compensation_control_filter = gr.Dropdown(label='', choices=self.helper.choices["compensation_control_filter"], value=self.helper.choices["compensation_control_filter"][0]) self.initialization_values["compensation_control_filter"] = self.helper.choices["compensation_control_filter"][0] gr.Markdown("Gating control") gating_control_filter = gr.Dropdown(label='', choices=self.helper.choices["gating_control_filter"], value=self.helper.choices["gating_control_filter"][0]) self.initialization_values["gating_control_filter"] = self.helper.choices["gating_control_filter"][0] gr.Markdown("Population; Phenotype (Parent gate)") pop_pheno_parent_filter = gr.Dropdown(label='', choices=self.helper.choices["pop_pheno_parent_filter"], value=self.helper.choices["pop_pheno_parent_filter"][3]) self.initialization_values["pop_pheno_parent_filter"] = self.helper.choices["pop_pheno_parent_filter"][3] clear_gating_tab_filters_button = gr.Button( value="Clear selections") analysis_tab_apply_filters_button = gr.Button( value="Apply filters") return sample_filter, sop_exp_analysis_tab_filter, compensation_control_filter, \ gating_control_filter, pop_pheno_parent_filter, clear_gating_tab_filters_button, analysis_tab_apply_filters_button def analyzed_result_filter_component(self): gr.Markdown("Dataset QC status") analysis_tab_dataset_qc_status_filter = gr.CheckboxGroup(label='', choices=[ "Pass QC", "Fail QC"], value=["Pass QC"]) self.initialization_values["analysis_tab_dataset_qc_status_filter"] = [ "Pass QC"] gr.Markdown("Analyzed result") analyzed_result_filter = gr.Dropdown(label='', multiselect=True, choices=self.helper.config["gating_results_tab"]["results"], value=[self.helper.config["gating_results_tab"]["results"][0]]) self.initialization_values["analyzed_result_filter"] = [ self.helper.config["gating_results_tab"]["results"][0]] return analysis_tab_dataset_qc_status_filter, analyzed_result_filter def analysis_barplot_components(self): updater = self.helper.updater updater.update_barplot_fig(self.initialization_values["dataset_filter"], self.initialization_values["analysis_tab_dataset_qc_status_filter"], self.initialization_values["sample_filter"], self.initialization_values["sop_exp_analysis_tab_filter"], self.initialization_values["compensation_control_filter"], self.initialization_values["gating_control_filter"], self.initialization_values["pop_pheno_parent_filter"], self.initialization_values["analyzed_result_filter"]) analysis_barplot_fig = gr.Plot(elem_id="plot_wScrollBar", value=updater.barplot) barplot_not_reportable_msg = gr.Markdown( value="The assigned result type of the experiment is not reportable.", visible=False) return analysis_barplot_fig, barplot_not_reportable_msg def analysis_heatmap_components(self): gr.Markdown("Protocol for multi-exps comparison") compared_protocol_filter = gr.Radio(label='', choices=[ "Compensation control", "Population; Phenotype (Parent gate)"], value="Compensation control") include_CDA_results_checkbox = gr.Checkbox(label="Include available CDA results", value=False) updater = self.helper.updater updater.update_heatmap_fig(self.initialization_values["dataset_filter"], self.initialization_values["analysis_tab_dataset_qc_status_filter"], self.initialization_values["sample_filter"], self.initialization_values["sop_exp_analysis_tab_filter"], self.initialization_values["compensation_control_filter"], self.initialization_values["gating_control_filter"], self.initialization_values["pop_pheno_parent_filter"], self.initialization_values["analyzed_result_filter"], "Compensation control", False) with gr.Row(): analysis_heatmap_exp_info_table = gr.Dataframe( show_copy_button=True, value=updater.exp_info_table) analysis_heatmap_exp_comparison_table = gr.Dataframe( show_copy_button=True, value=updater.exp_comparison_table) analysis_heatmap_fig = gr.Plot(elem_id="plot_wScrollBar", value=updater.heatmap) heatmap_not_reportable_msg = gr.Markdown( value="The assigned result type of all compared experiments are not reportable.", visible=False) return compared_protocol_filter, include_CDA_results_checkbox, analysis_heatmap_exp_info_table, analysis_heatmap_exp_comparison_table, \ analysis_heatmap_fig, heatmap_not_reportable_msg class _Updater: def __init__(self, helper_instance): self.helper = helper_instance def select_all_choices(self, filter_name: str): return gr.update(value=self.helper.choices[filter_name]) def clean_all_choices(self): return gr.update(value=[]) def update_dataset_filter(self, selected_models: list[str], selected_datasets: list[str]): model_dataset_mapping = self.helper.file_qc_results.groupby("Instrument model")["Dataset"].apply( lambda x: sorted(x.unique().tolist())).to_dict() updated_dataset_choices = set() if selected_models: for selected_model in selected_models: if selected_model in model_dataset_mapping.keys(): updated_dataset_choices.update( model_dataset_mapping[selected_model]) updated_dataset_choices = sorted( list(updated_dataset_choices)) self.helper.choices["dataset_filter"] = updated_dataset_choices updated_dataset_values = [ item for item in selected_datasets if item in updated_dataset_choices] return gr.update(choices=updated_dataset_choices, value=updated_dataset_values) def update_material_filter(self, selected_sop_exps: list[str], selected_materials: list[str]): df = self.helper.file_qc_results.copy() df["Material"] = df["SOP-Exp"] + " " + df["Material"] sop_exp_material_mapping = df.groupby("SOP-Exp")["Material"].apply( lambda x: sorted(x.unique().tolist())).to_dict() updated_material_choices = set() if selected_sop_exps: for selected_sop_exp in selected_sop_exps: if selected_sop_exp in sop_exp_material_mapping.keys(): updated_material_choices.update( sop_exp_material_mapping[selected_sop_exp]) updated_material_choices = sorted( list(updated_material_choices)) self.helper.choices["material_filter"] = updated_material_choices updated_material_values = [ item for item in selected_materials if item in updated_material_choices] return gr.update(choices=updated_material_choices, value=updated_material_values) def _filter_dataset_qc_result(self, selected_datasets, selected_dataset_qc_status): return self.helper.dataset_qc_results[(self.helper.dataset_qc_results["Dataset"].isin(selected_datasets)) & (self.helper.dataset_qc_results["QC status"].isin(selected_dataset_qc_status))] def update_file_qc_table(self, selected_datasets: list[str], selected_sop_exps: list[str], selected_materials: list[str], selected_qc_issues: list[str], selected_dataset_qc_status: list[str], selected_file_qc_status: list[str]): self.filtered_dataset_qc_results = self._filter_dataset_qc_result( selected_datasets, selected_dataset_qc_status) selected_materials = [material.split( " ")[1] for material in selected_materials] self.filtered_file_qc_results = self.helper.file_qc_results.loc[(self.helper.file_qc_results["Dataset"].isin(self.filtered_dataset_qc_results["Dataset"])) & (self.helper.file_qc_results["SOP-Exp"].isin(selected_sop_exps)) & (self.helper.file_qc_results["Material"].isin(selected_materials)) & (self.helper.file_qc_results["QC status"].isin( selected_file_qc_status)), self.helper.config["qc_results_tab"]["file_infos"]+selected_qc_issues] if len(self.filtered_file_qc_results) == 0: file_qc_table_update = {"visible": False} file_qc_table_no_file_msg_update = {"visible": True} else: file_qc_table_update = { "value": {"data": self.filtered_file_qc_results.values.tolist(), "headers": self.filtered_file_qc_results.columns.to_list()}, "visible": True} file_qc_table_no_file_msg_update = {"visible": False} if len(self.filtered_dataset_qc_results) == 0: dataset_qc_table_update = {"visible": False} dataset_qc_table_no_dataset_msg_update = { "visible": True} else: dataset_qc_table_update = { "value": {"data": self.filtered_dataset_qc_results.values.tolist(), "headers": self.filtered_dataset_qc_results.columns.to_list()}, "visible": True} dataset_qc_table_no_dataset_msg_update = { "visible": False} return [gr.update(**file_qc_table_update), gr.update(**file_qc_table_no_file_msg_update), gr.update(**dataset_qc_table_update), gr.update(**dataset_qc_table_no_dataset_msg_update)] def update_qc_fig(self, selected_datasets: list[str], selected_sop_exps: list[str], selected_materials: list[str], selected_qc_issues: list[str], selected_dataset_qc_status: list[str]): filtered_dataset_qc_results = self.filtered_dataset_qc_results = self._filter_dataset_qc_result( selected_datasets, selected_dataset_qc_status) selected_materials = [material.split( " ")[1] for material in selected_materials] filtered_file_qc_results = self.helper.file_qc_results.loc[(self.helper.file_qc_results["Dataset"].isin(filtered_dataset_qc_results["Dataset"])) & (self.helper.file_qc_results["SOP-Exp"].isin(selected_sop_exps)) & (self.helper.file_qc_results["Material"].isin( selected_materials)), self.helper.config["qc_results_tab"]["file_infos"]+selected_qc_issues] if len(filtered_file_qc_results) == 0: return [gr.update(visible=False), gr.update(visible=True)] else: self.qc_visual = QCVisualizer.visualize(filtered_dataset_qc_results, self.helper.config["qc_results_tab"]["file_sets"], filtered_file_qc_results, selected_qc_issues) return [gr.update(value=self.qc_visual, visible=True), gr.update(visible=False)] def update_barplot_fig(self, selected_datasets: list[str], selected_dataset_qc_status: list[str], selected_sample, selected_sop_exp, selected_comp, selected_fmo, selected_pop_pheno_parent, selected_results: list[str]): if any([s is None for s in [selected_sample, selected_sop_exp, selected_comp, selected_fmo, selected_pop_pheno_parent]]): return [gr.update(visible=False), gr.update(visible=False)] filtered_dataset_qc_results = self.filtered_dataset_qc_results = self._filter_dataset_qc_result( selected_datasets, selected_dataset_qc_status) filtered_analyzed_gating_results = self.helper.analyzed_gating_results.loc[(self.helper.analyzed_gating_results["Dataset"].isin(filtered_dataset_qc_results["Dataset"])) & (self.helper.analyzed_gating_results["Sample"] == selected_sample) & (self.helper.analyzed_gating_results["SOP-Exp"] == selected_sop_exp) & (self.helper.analyzed_gating_results["Compensation control"] == selected_comp) & (self.helper.analyzed_gating_results["Gating control"] == selected_fmo) & (self.helper.analyzed_gating_results["Population; Phenotype (Parent gate)"] == selected_pop_pheno_parent), self.helper.config["gating_results_tab"]["exp_infos"]+[c for c in self.helper.analyzed_gating_results.columns if any(selected_result in c for selected_result in selected_results)]] # if (filtered_analyzed_gating_results[[f"{selected_result}_mean", f"{selected_result}_std"]] == "Not reportable").all(axis=None): # return [gr.update(visible=False), gr.update(visible=True)] self.barplot = AnalysisVisualizer.visualize_barplot(filtered_analyzed_gating_results, selected_results) return [gr.update(value=self.barplot, visible=True), gr.update(visible=False)] def update_heatmap_fig(self, selected_datasets: list[str], selected_dataset_qc_status: list[str], selected_sample, selected_sop_exp, selected_comp, selected_fmo, selected_pop_pheno_parent, selected_results: list[str], selected_comparison, include_CDA): if include_CDA: analyzed_results = self.helper.analyzed_gating_results_wCDAResults else: analyzed_results = self.helper.analyzed_gating_results if any([s is None for s in [selected_sample, selected_sop_exp, selected_comp, selected_fmo, selected_pop_pheno_parent]]): return [gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)] masks = {"Sample": analyzed_results["Sample"] == selected_sample, "SOP-Exp": analyzed_results["SOP-Exp"] == selected_sop_exp, "Compensation control": analyzed_results["Compensation control"] == selected_comp, "Gating control": analyzed_results["Gating control"] == selected_fmo, "Population; Phenotype (Parent gate)": analyzed_results["Population; Phenotype (Parent gate)"] == selected_pop_pheno_parent} filtering_mask = pd.DataFrame(pd.concat([mask for mask_name, mask in masks.items( ) if mask_name != selected_comparison], axis=1)).all(axis=1) filtered_dataset_qc_results = self.filtered_dataset_qc_results = self._filter_dataset_qc_result( selected_datasets, selected_dataset_qc_status) filtered_analyzed_gating_results = analyzed_results.loc[analyzed_results["Dataset"].isin(filtered_dataset_qc_results["Dataset"]) & filtering_mask, self.helper.config["gating_results_tab"]["exp_infos"]+[c for c in analyzed_results.columns if any(selected_result in c for selected_result in selected_results)]] # if (filtered_analyzed_gating_results[[f"{selected_result}_mean", f"{selected_result}_std"]] == "Not reportable").all(axis=None): # return [gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True)] self.exp_info_table = filtered_analyzed_gating_results[[ e for e in self.helper.config["gating_results_tab"]["exp_infos"] if e not in ["Dataset", "Site (anonymized)", "Instrument model", "Result ID"]]] if selected_comparison == "Population; Phenotype (Parent gate)": shown_exp_comparison = [ "Population", "Phenotype", "Parent gate"] else: shown_exp_comparison = [selected_comparison] self.exp_info_table = pd.DataFrame(self.exp_info_table).drop( columns=shown_exp_comparison) self.exp_info_table = self.exp_info_table.value_counts().reset_index().drop( columns="count").T.reset_index().set_axis(["Protocol", "Content"], axis=1) self.exp_comparison_table = filtered_analyzed_gating_results[["Result ID"]+shown_exp_comparison].value_counts( ).reset_index().drop(columns="count") if selected_comparison == "Population; Phenotype (Parent gate)": filtered_analyzed_gating_results["Result ID"] = filtered_analyzed_gating_results["Result ID"].astype(str) + \ " (" + filtered_analyzed_gating_results["Population"] + ")" else: filtered_analyzed_gating_results["Result ID"] = filtered_analyzed_gating_results["Result ID"].astype(str) + \ " (" + filtered_analyzed_gating_results[selected_comparison] + ")" self.heatmap = AnalysisVisualizer.visualize_heatmap(filtered_analyzed_gating_results, selected_results) return [gr.update(value=self.exp_info_table, visible=True), gr.update(value=self.exp_comparison_table, visible=True), gr.update(value=self.heatmap, visible=True), gr.update(visible=False)] def update_analysis_tab_filters(self, selected_sample, selected_sop_exp, selected_comp, selected_fmo, selected_pop_pheno_parent): masks = {} col_selection_mapping = dict(zip(["Sample", "SOP-Exp", "Compensation control", "Gating control", "Population; Phenotype (Parent gate)"], [selected_sample, selected_sop_exp, selected_comp, selected_fmo, selected_pop_pheno_parent])) for col, selection in col_selection_mapping.items(): if selection is not None: masks[col] = self.helper.analyzed_gating_results[col] == selection if len(masks) == 0: return self.clear_analysis_tab_filters() filtering_mask = pd.DataFrame( pd.concat([mask for mask in masks.values()], axis=1)).all(axis=1) filtered_analyzed_gating_results = self.helper.analyzed_gating_results[ filtering_mask] updated_choices = {} updated_values = {} for col in ["Sample", "SOP-Exp", "Compensation control", "Gating control", "Population; Phenotype (Parent gate)"]: updated_choices[col] = sorted( list(filtered_analyzed_gating_results[col].unique())) updated_values[col] = ( col_selection_mapping[col] if col_selection_mapping[col] in updated_choices[col] else None) return [gr.update(choices=updated_choices[col], value=updated_values[col]) for col in ["Sample", "SOP-Exp", "Compensation control", "Gating control", "Population; Phenotype (Parent gate)"]] def clear_analysis_tab_filters(self): return [gr.update(choices=self.helper.choices["sample_filter"], value=None), gr.update( choices=self.helper.choices["sop_exp_analysis_tab_filter"], value=None), gr.update( choices=self.helper.choices["compensation_control_filter"], value=None), gr.update( choices=self.helper.choices["gating_control_filter"], value=None), gr.update(choices=self.helper.choices["pop_pheno_parent_filter"], value=None)]