|
|
import pandas as pd |
|
|
import gradio as gr |
|
|
from .visualization import QCVisualizer, AnalysisVisualizer |
|
|
|
|
|
|
|
|
class ComponentHelper: |
|
|
def __init__(self, |
|
|
file_qc_results: pd.DataFrame, |
|
|
dataset_qc_results: pd.DataFrame, |
|
|
analyzed_gating_results: pd.DataFrame, |
|
|
analyzed_gating_results_wCDAResults: pd.DataFrame, |
|
|
config: dict): |
|
|
self.file_qc_results = file_qc_results |
|
|
self.dataset_qc_results = dataset_qc_results |
|
|
self.analyzed_gating_results = analyzed_gating_results |
|
|
self.analyzed_gating_results_wCDAResults = analyzed_gating_results_wCDAResults |
|
|
self.config = config |
|
|
|
|
|
self.choices = {"dataset_filter": list(file_qc_results["Dataset"].unique()), |
|
|
"model_filter": sorted(list(file_qc_results["Instrument model"].unique())), |
|
|
"sop_exp_qc_tab_filter": list(file_qc_results["SOP-Exp"].unique()), |
|
|
"material_filter": list((file_qc_results["SOP-Exp"] + " " + file_qc_results["Material"]).unique()), |
|
|
"issue_filter": config["qc_results_tab"]["issues"], |
|
|
"sample_filter": list(analyzed_gating_results["Sample"].unique()), |
|
|
"sop_exp_analysis_tab_filter": list(analyzed_gating_results["SOP-Exp"].unique()), |
|
|
"compensation_control_filter": list(analyzed_gating_results["Compensation control"].unique()), |
|
|
"gating_control_filter": list(analyzed_gating_results["Gating control"].unique()), |
|
|
"pop_pheno_parent_filter": list(analyzed_gating_results["Population; Phenotype (Parent gate)"].unique()), |
|
|
"analyzed_result_filter": config["gating_results_tab"]["results"], |
|
|
} |
|
|
|
|
|
self.creator = self._Creator(self) |
|
|
self.updater = self._Updater(self) |
|
|
|
|
|
class _Creator: |
|
|
def __init__(self, |
|
|
helper_instance): |
|
|
self.helper = helper_instance |
|
|
self.initialization_values = {} |
|
|
|
|
|
def main_filter_components(self): |
|
|
with gr.Row(): |
|
|
gr.Markdown("Instrument model") |
|
|
clear_all_models_button = gr.Button("Clear All", size="sm") |
|
|
select_all_models_button = gr.Button("Select All", size="sm") |
|
|
model_filter = gr.Dropdown(label='', multiselect=True, |
|
|
choices=self.helper.choices["model_filter"], |
|
|
value=self.helper.choices["model_filter"]) |
|
|
self.initialization_values["model_filter"] = self.helper.choices["model_filter"] |
|
|
|
|
|
with gr.Row(): |
|
|
gr.Markdown("Dataset") |
|
|
clear_all_datasets_button = gr.Button("Clear All", size="sm") |
|
|
select_all_datasets_button = gr.Button( |
|
|
"Select All", size="sm") |
|
|
dataset_filter = gr.Dropdown(label='', multiselect=True, |
|
|
choices=self.helper.choices["dataset_filter"], |
|
|
value=self.helper.choices["dataset_filter"]) |
|
|
self.initialization_values["dataset_filter"] = self.helper.choices["dataset_filter"] |
|
|
|
|
|
main_apply_filters_button = gr.Button(value="Apply filters") |
|
|
|
|
|
return clear_all_models_button, select_all_models_button, model_filter, \ |
|
|
clear_all_datasets_button, select_all_datasets_button, dataset_filter, \ |
|
|
main_apply_filters_button |
|
|
|
|
|
def qc_tab_filter_components(self): |
|
|
with gr.Row(): |
|
|
gr.Markdown("SOP-Exp") |
|
|
clear_all_sop_exps_qc_tab_button = gr.Button( |
|
|
"Clear All", size="sm") |
|
|
select_all_sop_exps_qc_tab_button = gr.Button( |
|
|
"Select All", size="sm") |
|
|
sop_exp_qc_tab_filter = gr.Dropdown(label='', multiselect=True, |
|
|
choices=self.helper.choices["sop_exp_qc_tab_filter"], |
|
|
value=self.helper.choices["sop_exp_qc_tab_filter"]) |
|
|
self.initialization_values["sop_exp_qc_tab_filter"] = self.helper.choices["sop_exp_qc_tab_filter"] |
|
|
|
|
|
with gr.Row(): |
|
|
gr.Markdown("Material") |
|
|
clear_all_materials_button = gr.Button("Clear All", size="sm") |
|
|
select_all_materials_button = gr.Button( |
|
|
"Select All", size="sm") |
|
|
material_filter = gr.Dropdown(label='', multiselect=True, |
|
|
choices=self.helper.choices["material_filter"], |
|
|
value=self.helper.choices["material_filter"]) |
|
|
self.initialization_values["material_filter"] = self.helper.choices["material_filter"] |
|
|
|
|
|
with gr.Row(): |
|
|
gr.Markdown("Issues") |
|
|
clear_all_issues_button = gr.Button("Clear All", size="sm") |
|
|
select_all_issues_button = gr.Button("Select All", size="sm") |
|
|
issue_filter = gr.Dropdown(label='', multiselect=True, |
|
|
choices=self.helper.choices["issue_filter"], |
|
|
value=self.helper.choices["issue_filter"]) |
|
|
self.initialization_values["issue_filter"] = self.helper.choices["issue_filter"] |
|
|
|
|
|
qc_tab_apply_filters_button = gr.Button(value="Apply filters") |
|
|
|
|
|
return clear_all_sop_exps_qc_tab_button, select_all_sop_exps_qc_tab_button, sop_exp_qc_tab_filter, \ |
|
|
clear_all_materials_button, select_all_materials_button, material_filter, \ |
|
|
clear_all_issues_button, select_all_issues_button, issue_filter, \ |
|
|
qc_tab_apply_filters_button |
|
|
|
|
|
def qc_tab_dataset_qc_status_filter_component(self): |
|
|
gr.Markdown("Dataset QC status") |
|
|
qc_tab_dataset_qc_status_filter = gr.CheckboxGroup(label='', |
|
|
choices=[ |
|
|
"Pass QC", "Fail QC"], |
|
|
value=["Pass QC", "Fail QC"]) |
|
|
self.initialization_values["qc_tab_dataset_qc_status_filter"] = [ |
|
|
"Pass QC", "Fail QC"] |
|
|
|
|
|
return qc_tab_dataset_qc_status_filter |
|
|
|
|
|
def file_qc_result_components(self): |
|
|
gr.Markdown("File QC status") |
|
|
file_qc_status_filter = gr.CheckboxGroup(label='', |
|
|
choices=[ |
|
|
"Pass QC", "Fail QC"], |
|
|
value=["Fail QC"]) |
|
|
self.initialization_values["file_qc_status_filter"] = ["Fail QC"] |
|
|
|
|
|
updater = self.helper.updater |
|
|
updater.update_file_qc_table(self.initialization_values["dataset_filter"], |
|
|
self.initialization_values["sop_exp_qc_tab_filter"], |
|
|
self.initialization_values["material_filter"], |
|
|
self.initialization_values["issue_filter"], |
|
|
self.initialization_values["qc_tab_dataset_qc_status_filter"], |
|
|
self.initialization_values["file_qc_status_filter"]) |
|
|
file_qc_table = gr.Dataframe( |
|
|
type="pandas", |
|
|
show_copy_button=True, show_row_numbers=True, |
|
|
value=updater.filtered_file_qc_results) |
|
|
|
|
|
file_qc_table_no_file_msg = gr.Markdown( |
|
|
value="All files are filtered", visible=False) |
|
|
|
|
|
return file_qc_status_filter, file_qc_table, file_qc_table_no_file_msg |
|
|
|
|
|
def dataset_qc_result_components(self): |
|
|
updater = self.helper.updater |
|
|
updater.update_file_qc_table(self.initialization_values["dataset_filter"], |
|
|
self.initialization_values["sop_exp_qc_tab_filter"], |
|
|
self.initialization_values["material_filter"], |
|
|
self.initialization_values["issue_filter"], |
|
|
self.initialization_values["qc_tab_dataset_qc_status_filter"], |
|
|
self.initialization_values["file_qc_status_filter"]) |
|
|
dataset_qc_table = gr.Dataframe( |
|
|
type="pandas", |
|
|
show_copy_button=True, show_row_numbers=True, |
|
|
value=updater.filtered_dataset_qc_results) |
|
|
|
|
|
dataset_qc_table_no_dataset_msg = gr.Markdown( |
|
|
value="All datasets are filtered", visible=False) |
|
|
|
|
|
return dataset_qc_table, dataset_qc_table_no_dataset_msg |
|
|
|
|
|
def qc_result_visual_components(self): |
|
|
updater = self.helper.updater |
|
|
updater.update_qc_fig(self.initialization_values["dataset_filter"], |
|
|
self.initialization_values["sop_exp_qc_tab_filter"], |
|
|
self.initialization_values["material_filter"], |
|
|
self.initialization_values["issue_filter"], |
|
|
self.initialization_values["qc_tab_dataset_qc_status_filter"]) |
|
|
qc_fig = gr.Plot(elem_id="plot_wScrollBar", |
|
|
value=updater.qc_visual) |
|
|
qc_fig_no_file_msg = gr.Markdown( |
|
|
value="All files are filtered", visible=False) |
|
|
|
|
|
return qc_fig, qc_fig_no_file_msg |
|
|
|
|
|
def analysis_tab_filter_components(self): |
|
|
gr.Markdown("Sample") |
|
|
sample_filter = gr.Dropdown(label='', |
|
|
choices=self.helper.choices["sample_filter"], |
|
|
value=self.helper.choices["sample_filter"][0]) |
|
|
self.initialization_values["sample_filter"] = self.helper.choices["sample_filter"][0] |
|
|
|
|
|
gr.Markdown("SOP-Exp") |
|
|
sop_exp_analysis_tab_filter = gr.Dropdown(label='', |
|
|
choices=self.helper.choices["sop_exp_analysis_tab_filter"], |
|
|
value=self.helper.choices["sop_exp_analysis_tab_filter"][0]) |
|
|
self.initialization_values["sop_exp_analysis_tab_filter"] = self.helper.choices["sop_exp_analysis_tab_filter"][0] |
|
|
|
|
|
gr.Markdown("Compensation control") |
|
|
compensation_control_filter = gr.Dropdown(label='', |
|
|
choices=self.helper.choices["compensation_control_filter"], |
|
|
value=self.helper.choices["compensation_control_filter"][0]) |
|
|
self.initialization_values["compensation_control_filter"] = self.helper.choices["compensation_control_filter"][0] |
|
|
|
|
|
gr.Markdown("Gating control") |
|
|
gating_control_filter = gr.Dropdown(label='', |
|
|
choices=self.helper.choices["gating_control_filter"], |
|
|
value=self.helper.choices["gating_control_filter"][0]) |
|
|
self.initialization_values["gating_control_filter"] = self.helper.choices["gating_control_filter"][0] |
|
|
|
|
|
gr.Markdown("Population; Phenotype (Parent gate)") |
|
|
pop_pheno_parent_filter = gr.Dropdown(label='', |
|
|
choices=self.helper.choices["pop_pheno_parent_filter"], |
|
|
value=self.helper.choices["pop_pheno_parent_filter"][3]) |
|
|
self.initialization_values["pop_pheno_parent_filter"] = self.helper.choices["pop_pheno_parent_filter"][3] |
|
|
|
|
|
clear_gating_tab_filters_button = gr.Button( |
|
|
value="Clear selections") |
|
|
analysis_tab_apply_filters_button = gr.Button( |
|
|
value="Apply filters") |
|
|
|
|
|
return sample_filter, sop_exp_analysis_tab_filter, compensation_control_filter, \ |
|
|
gating_control_filter, pop_pheno_parent_filter, clear_gating_tab_filters_button, analysis_tab_apply_filters_button |
|
|
|
|
|
def analyzed_result_filter_component(self): |
|
|
gr.Markdown("Dataset QC status") |
|
|
analysis_tab_dataset_qc_status_filter = gr.CheckboxGroup(label='', |
|
|
choices=[ |
|
|
"Pass QC", "Fail QC"], |
|
|
value=["Pass QC"]) |
|
|
self.initialization_values["analysis_tab_dataset_qc_status_filter"] = [ |
|
|
"Pass QC"] |
|
|
|
|
|
gr.Markdown("Analyzed result") |
|
|
analyzed_result_filter = gr.Dropdown(label='', |
|
|
multiselect=True, |
|
|
choices=self.helper.config["gating_results_tab"]["results"], |
|
|
value=[self.helper.config["gating_results_tab"]["results"][0]]) |
|
|
self.initialization_values["analyzed_result_filter"] = [ |
|
|
self.helper.config["gating_results_tab"]["results"][0]] |
|
|
|
|
|
return analysis_tab_dataset_qc_status_filter, analyzed_result_filter |
|
|
|
|
|
def analysis_barplot_components(self): |
|
|
updater = self.helper.updater |
|
|
updater.update_barplot_fig(self.initialization_values["dataset_filter"], |
|
|
self.initialization_values["analysis_tab_dataset_qc_status_filter"], |
|
|
self.initialization_values["sample_filter"], |
|
|
self.initialization_values["sop_exp_analysis_tab_filter"], |
|
|
self.initialization_values["compensation_control_filter"], |
|
|
self.initialization_values["gating_control_filter"], |
|
|
self.initialization_values["pop_pheno_parent_filter"], |
|
|
self.initialization_values["analyzed_result_filter"]) |
|
|
analysis_barplot_fig = gr.Plot(elem_id="plot_wScrollBar", |
|
|
value=updater.barplot) |
|
|
barplot_not_reportable_msg = gr.Markdown( |
|
|
value="The assigned result type of the experiment is not reportable.", visible=False) |
|
|
|
|
|
return analysis_barplot_fig, barplot_not_reportable_msg |
|
|
|
|
|
def analysis_heatmap_components(self): |
|
|
gr.Markdown("Protocol for multi-exps comparison") |
|
|
compared_protocol_filter = gr.Radio(label='', |
|
|
choices=[ |
|
|
"Compensation control", "Population; Phenotype (Parent gate)"], |
|
|
value="Compensation control") |
|
|
|
|
|
include_CDA_results_checkbox = gr.Checkbox(label="Include available CDA results", |
|
|
value=False) |
|
|
|
|
|
updater = self.helper.updater |
|
|
updater.update_heatmap_fig(self.initialization_values["dataset_filter"], |
|
|
self.initialization_values["analysis_tab_dataset_qc_status_filter"], |
|
|
self.initialization_values["sample_filter"], |
|
|
self.initialization_values["sop_exp_analysis_tab_filter"], |
|
|
self.initialization_values["compensation_control_filter"], |
|
|
self.initialization_values["gating_control_filter"], |
|
|
self.initialization_values["pop_pheno_parent_filter"], |
|
|
self.initialization_values["analyzed_result_filter"], |
|
|
"Compensation control", |
|
|
False) |
|
|
with gr.Row(): |
|
|
analysis_heatmap_exp_info_table = gr.Dataframe( |
|
|
show_copy_button=True, value=updater.exp_info_table) |
|
|
analysis_heatmap_exp_comparison_table = gr.Dataframe( |
|
|
show_copy_button=True, value=updater.exp_comparison_table) |
|
|
analysis_heatmap_fig = gr.Plot(elem_id="plot_wScrollBar", |
|
|
value=updater.heatmap) |
|
|
heatmap_not_reportable_msg = gr.Markdown( |
|
|
value="The assigned result type of all compared experiments are not reportable.", visible=False) |
|
|
|
|
|
return compared_protocol_filter, include_CDA_results_checkbox, analysis_heatmap_exp_info_table, analysis_heatmap_exp_comparison_table, \ |
|
|
analysis_heatmap_fig, heatmap_not_reportable_msg |
|
|
|
|
|
class _Updater: |
|
|
def __init__(self, |
|
|
helper_instance): |
|
|
self.helper = helper_instance |
|
|
|
|
|
def select_all_choices(self, filter_name: str): |
|
|
return gr.update(value=self.helper.choices[filter_name]) |
|
|
|
|
|
def clean_all_choices(self): |
|
|
return gr.update(value=[]) |
|
|
|
|
|
def update_dataset_filter(self, selected_models: list[str], selected_datasets: list[str]): |
|
|
model_dataset_mapping = self.helper.file_qc_results.groupby("Instrument model")["Dataset"].apply( |
|
|
lambda x: sorted(x.unique().tolist())).to_dict() |
|
|
updated_dataset_choices = set() |
|
|
if selected_models: |
|
|
for selected_model in selected_models: |
|
|
if selected_model in model_dataset_mapping.keys(): |
|
|
updated_dataset_choices.update( |
|
|
model_dataset_mapping[selected_model]) |
|
|
updated_dataset_choices = sorted( |
|
|
list(updated_dataset_choices)) |
|
|
self.helper.choices["dataset_filter"] = updated_dataset_choices |
|
|
updated_dataset_values = [ |
|
|
item for item in selected_datasets if item in updated_dataset_choices] |
|
|
return gr.update(choices=updated_dataset_choices, |
|
|
value=updated_dataset_values) |
|
|
|
|
|
def update_material_filter(self, selected_sop_exps: list[str], selected_materials: list[str]): |
|
|
df = self.helper.file_qc_results.copy() |
|
|
df["Material"] = df["SOP-Exp"] + " " + df["Material"] |
|
|
sop_exp_material_mapping = df.groupby("SOP-Exp")["Material"].apply( |
|
|
lambda x: sorted(x.unique().tolist())).to_dict() |
|
|
updated_material_choices = set() |
|
|
if selected_sop_exps: |
|
|
for selected_sop_exp in selected_sop_exps: |
|
|
if selected_sop_exp in sop_exp_material_mapping.keys(): |
|
|
updated_material_choices.update( |
|
|
sop_exp_material_mapping[selected_sop_exp]) |
|
|
updated_material_choices = sorted( |
|
|
list(updated_material_choices)) |
|
|
self.helper.choices["material_filter"] = updated_material_choices |
|
|
updated_material_values = [ |
|
|
item for item in selected_materials if item in updated_material_choices] |
|
|
return gr.update(choices=updated_material_choices, |
|
|
value=updated_material_values) |
|
|
|
|
|
def _filter_dataset_qc_result(self, selected_datasets, selected_dataset_qc_status): |
|
|
return self.helper.dataset_qc_results[(self.helper.dataset_qc_results["Dataset"].isin(selected_datasets)) & |
|
|
(self.helper.dataset_qc_results["QC status"].isin(selected_dataset_qc_status))] |
|
|
|
|
|
def update_file_qc_table(self, |
|
|
selected_datasets: list[str], selected_sop_exps: list[str], selected_materials: list[str], |
|
|
selected_qc_issues: list[str], selected_dataset_qc_status: list[str], selected_file_qc_status: list[str]): |
|
|
self.filtered_dataset_qc_results = self._filter_dataset_qc_result( |
|
|
selected_datasets, selected_dataset_qc_status) |
|
|
selected_materials = [material.split( |
|
|
" ")[1] for material in selected_materials] |
|
|
self.filtered_file_qc_results = self.helper.file_qc_results.loc[(self.helper.file_qc_results["Dataset"].isin(self.filtered_dataset_qc_results["Dataset"])) & |
|
|
(self.helper.file_qc_results["SOP-Exp"].isin(selected_sop_exps)) & |
|
|
(self.helper.file_qc_results["Material"].isin(selected_materials)) & |
|
|
(self.helper.file_qc_results["QC status"].isin( |
|
|
selected_file_qc_status)), |
|
|
self.helper.config["qc_results_tab"]["file_infos"]+selected_qc_issues] |
|
|
|
|
|
if len(self.filtered_file_qc_results) == 0: |
|
|
file_qc_table_update = {"visible": False} |
|
|
file_qc_table_no_file_msg_update = {"visible": True} |
|
|
else: |
|
|
file_qc_table_update = { |
|
|
"value": {"data": self.filtered_file_qc_results.values.tolist(), |
|
|
"headers": self.filtered_file_qc_results.columns.to_list()}, |
|
|
"visible": True} |
|
|
file_qc_table_no_file_msg_update = {"visible": False} |
|
|
if len(self.filtered_dataset_qc_results) == 0: |
|
|
dataset_qc_table_update = {"visible": False} |
|
|
dataset_qc_table_no_dataset_msg_update = { |
|
|
"visible": True} |
|
|
else: |
|
|
dataset_qc_table_update = { |
|
|
"value": {"data": self.filtered_dataset_qc_results.values.tolist(), |
|
|
"headers": self.filtered_dataset_qc_results.columns.to_list()}, |
|
|
"visible": True} |
|
|
dataset_qc_table_no_dataset_msg_update = { |
|
|
"visible": False} |
|
|
return [gr.update(**file_qc_table_update), |
|
|
gr.update(**file_qc_table_no_file_msg_update), |
|
|
gr.update(**dataset_qc_table_update), |
|
|
gr.update(**dataset_qc_table_no_dataset_msg_update)] |
|
|
|
|
|
def update_qc_fig(self, |
|
|
selected_datasets: list[str], selected_sop_exps: list[str], selected_materials: list[str], |
|
|
selected_qc_issues: list[str], selected_dataset_qc_status: list[str]): |
|
|
filtered_dataset_qc_results = self.filtered_dataset_qc_results = self._filter_dataset_qc_result( |
|
|
selected_datasets, selected_dataset_qc_status) |
|
|
selected_materials = [material.split( |
|
|
" ")[1] for material in selected_materials] |
|
|
filtered_file_qc_results = self.helper.file_qc_results.loc[(self.helper.file_qc_results["Dataset"].isin(filtered_dataset_qc_results["Dataset"])) & |
|
|
(self.helper.file_qc_results["SOP-Exp"].isin(selected_sop_exps)) & |
|
|
(self.helper.file_qc_results["Material"].isin( |
|
|
selected_materials)), |
|
|
self.helper.config["qc_results_tab"]["file_infos"]+selected_qc_issues] |
|
|
if len(filtered_file_qc_results) == 0: |
|
|
return [gr.update(visible=False), gr.update(visible=True)] |
|
|
else: |
|
|
self.qc_visual = QCVisualizer.visualize(filtered_dataset_qc_results, |
|
|
self.helper.config["qc_results_tab"]["file_sets"], |
|
|
filtered_file_qc_results, |
|
|
selected_qc_issues) |
|
|
return [gr.update(value=self.qc_visual, |
|
|
visible=True), |
|
|
gr.update(visible=False)] |
|
|
|
|
|
def update_barplot_fig(self, |
|
|
selected_datasets: list[str], selected_dataset_qc_status: list[str], |
|
|
selected_sample, selected_sop_exp, selected_comp, |
|
|
selected_fmo, selected_pop_pheno_parent, selected_results: list[str]): |
|
|
if any([s is None for s in [selected_sample, selected_sop_exp, selected_comp, selected_fmo, selected_pop_pheno_parent]]): |
|
|
return [gr.update(visible=False), gr.update(visible=False)] |
|
|
|
|
|
filtered_dataset_qc_results = self.filtered_dataset_qc_results = self._filter_dataset_qc_result( |
|
|
selected_datasets, selected_dataset_qc_status) |
|
|
filtered_analyzed_gating_results = self.helper.analyzed_gating_results.loc[(self.helper.analyzed_gating_results["Dataset"].isin(filtered_dataset_qc_results["Dataset"])) & |
|
|
(self.helper.analyzed_gating_results["Sample"] == selected_sample) & |
|
|
(self.helper.analyzed_gating_results["SOP-Exp"] == selected_sop_exp) & |
|
|
(self.helper.analyzed_gating_results["Compensation control"] == selected_comp) & |
|
|
(self.helper.analyzed_gating_results["Gating control"] == selected_fmo) & |
|
|
(self.helper.analyzed_gating_results["Population; Phenotype (Parent gate)"] == selected_pop_pheno_parent), |
|
|
self.helper.config["gating_results_tab"]["exp_infos"]+[c for c in self.helper.analyzed_gating_results.columns if any(selected_result in c for selected_result in selected_results)]] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.barplot = AnalysisVisualizer.visualize_barplot(filtered_analyzed_gating_results, |
|
|
selected_results) |
|
|
return [gr.update(value=self.barplot, |
|
|
visible=True), |
|
|
gr.update(visible=False)] |
|
|
|
|
|
def update_heatmap_fig(self, |
|
|
selected_datasets: list[str], selected_dataset_qc_status: list[str], |
|
|
selected_sample, selected_sop_exp, selected_comp, |
|
|
selected_fmo, selected_pop_pheno_parent, selected_results: list[str], selected_comparison, |
|
|
include_CDA): |
|
|
if include_CDA: |
|
|
analyzed_results = self.helper.analyzed_gating_results_wCDAResults |
|
|
else: |
|
|
analyzed_results = self.helper.analyzed_gating_results |
|
|
|
|
|
if any([s is None for s in [selected_sample, selected_sop_exp, selected_comp, selected_fmo, selected_pop_pheno_parent]]): |
|
|
return [gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)] |
|
|
masks = {"Sample": analyzed_results["Sample"] == selected_sample, |
|
|
"SOP-Exp": analyzed_results["SOP-Exp"] == selected_sop_exp, |
|
|
"Compensation control": analyzed_results["Compensation control"] == selected_comp, |
|
|
"Gating control": analyzed_results["Gating control"] == selected_fmo, |
|
|
"Population; Phenotype (Parent gate)": analyzed_results["Population; Phenotype (Parent gate)"] == selected_pop_pheno_parent} |
|
|
filtering_mask = pd.DataFrame(pd.concat([mask for mask_name, mask in masks.items( |
|
|
) if mask_name != selected_comparison], axis=1)).all(axis=1) |
|
|
|
|
|
filtered_dataset_qc_results = self.filtered_dataset_qc_results = self._filter_dataset_qc_result( |
|
|
selected_datasets, selected_dataset_qc_status) |
|
|
filtered_analyzed_gating_results = analyzed_results.loc[analyzed_results["Dataset"].isin(filtered_dataset_qc_results["Dataset"]) & |
|
|
filtering_mask, |
|
|
self.helper.config["gating_results_tab"]["exp_infos"]+[c for c in analyzed_results.columns if any(selected_result in c for selected_result in selected_results)]] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.exp_info_table = filtered_analyzed_gating_results[[ |
|
|
e for e in self.helper.config["gating_results_tab"]["exp_infos"] if e not in ["Dataset", "Site (anonymized)", "Instrument model", "Result ID"]]] |
|
|
|
|
|
if selected_comparison == "Population; Phenotype (Parent gate)": |
|
|
shown_exp_comparison = [ |
|
|
"Population", "Phenotype", "Parent gate"] |
|
|
else: |
|
|
shown_exp_comparison = [selected_comparison] |
|
|
|
|
|
self.exp_info_table = pd.DataFrame(self.exp_info_table).drop( |
|
|
columns=shown_exp_comparison) |
|
|
self.exp_info_table = self.exp_info_table.value_counts().reset_index().drop( |
|
|
columns="count").T.reset_index().set_axis(["Protocol", "Content"], axis=1) |
|
|
|
|
|
self.exp_comparison_table = filtered_analyzed_gating_results[["Result ID"]+shown_exp_comparison].value_counts( |
|
|
).reset_index().drop(columns="count") |
|
|
|
|
|
if selected_comparison == "Population; Phenotype (Parent gate)": |
|
|
filtered_analyzed_gating_results["Result ID"] = filtered_analyzed_gating_results["Result ID"].astype(str) + \ |
|
|
" (" + filtered_analyzed_gating_results["Population"] + ")" |
|
|
else: |
|
|
filtered_analyzed_gating_results["Result ID"] = filtered_analyzed_gating_results["Result ID"].astype(str) + \ |
|
|
" (" + filtered_analyzed_gating_results[selected_comparison] + ")" |
|
|
|
|
|
self.heatmap = AnalysisVisualizer.visualize_heatmap(filtered_analyzed_gating_results, |
|
|
selected_results) |
|
|
|
|
|
return [gr.update(value=self.exp_info_table, visible=True), |
|
|
gr.update(value=self.exp_comparison_table, visible=True), |
|
|
gr.update(value=self.heatmap, |
|
|
visible=True), |
|
|
gr.update(visible=False)] |
|
|
|
|
|
def update_analysis_tab_filters(self, selected_sample, selected_sop_exp, selected_comp, selected_fmo, selected_pop_pheno_parent): |
|
|
masks = {} |
|
|
col_selection_mapping = dict(zip(["Sample", "SOP-Exp", "Compensation control", "Gating control", "Population; Phenotype (Parent gate)"], |
|
|
[selected_sample, selected_sop_exp, selected_comp, selected_fmo, selected_pop_pheno_parent])) |
|
|
for col, selection in col_selection_mapping.items(): |
|
|
if selection is not None: |
|
|
masks[col] = self.helper.analyzed_gating_results[col] == selection |
|
|
if len(masks) == 0: |
|
|
return self.clear_analysis_tab_filters() |
|
|
|
|
|
filtering_mask = pd.DataFrame( |
|
|
pd.concat([mask for mask in masks.values()], axis=1)).all(axis=1) |
|
|
filtered_analyzed_gating_results = self.helper.analyzed_gating_results[ |
|
|
filtering_mask] |
|
|
|
|
|
updated_choices = {} |
|
|
updated_values = {} |
|
|
for col in ["Sample", "SOP-Exp", "Compensation control", "Gating control", "Population; Phenotype (Parent gate)"]: |
|
|
updated_choices[col] = sorted( |
|
|
list(filtered_analyzed_gating_results[col].unique())) |
|
|
updated_values[col] = ( |
|
|
col_selection_mapping[col] if col_selection_mapping[col] in updated_choices[col] else None) |
|
|
|
|
|
return [gr.update(choices=updated_choices[col], value=updated_values[col]) |
|
|
for col in ["Sample", "SOP-Exp", "Compensation control", "Gating control", "Population; Phenotype (Parent gate)"]] |
|
|
|
|
|
def clear_analysis_tab_filters(self): |
|
|
return [gr.update(choices=self.helper.choices["sample_filter"], value=None), |
|
|
gr.update( |
|
|
choices=self.helper.choices["sop_exp_analysis_tab_filter"], value=None), |
|
|
gr.update( |
|
|
choices=self.helper.choices["compensation_control_filter"], value=None), |
|
|
gr.update( |
|
|
choices=self.helper.choices["gating_control_filter"], value=None), |
|
|
gr.update(choices=self.helper.choices["pop_pheno_parent_filter"], value=None)] |
|
|
|