"""Shiny app for manually verifying compound/organism pairs from phytochemMiner.

The user uploads JSON outputs, steps through every (taxon, compound) pair with
the source text highlighted, answers "Yes"/"No" for each pair, and downloads
the decisions as a CSV. A CSV from an earlier session can be imported so that
pairs that were already decided are skipped.
"""
import json
import tempfile

import pandas as pd
from phytochemMiner import TaxaData
from shiny import App, reactive, render, ui

from helper_functions import highlight_text

# Column order of every saved decision; also the header of an empty download.
RESULT_COLUMNS = [
    "json_file",
    "taxon_name",
    "accepted_name",
    "compound_name",
    "inchikey",
    "decision",
]

# Columns identifying a duplicate decision when the results are exported.
DEDUPLICATION_COLUMNS = ["json_file", "taxon_name", "compound_name", "decision"]

app_ui = ui.page_fluid(
    ui.row(
        ui.column(
            4,
            ui.h2("Upload Files to Begin"),
            ui.input_file(
                "json_files",
                "Upload JSON File Outputs from phytochemMiner",
                multiple=True,
                accept=[".json"],
            ),
            ui.input_file(
                "previous_result_files",
                "Import previously verified results from this tool (Optional)",
                multiple=False,
                accept=[".csv"],
            ),
            ui.input_action_button("process_files", "Process Files", class_="btn btn-primary"),
            ui.hr(),
        ),
    ),
    ui.row(
        ui.column(
            8,
            ui.h2("Highlighted Text Viewer"),
            ui.output_ui(
                "output_text",
                style=(
                    "margin-top: 20px; "
                    "padding: 20px; "
                    "border: 1px solid #ccc; "
                    "background-color: #fdfdfd; "
                    "border-radius: 5px; "
                    "box-shadow: 2px 2px 5px rgba(0, 0, 0, 0.1); "
                    "font-family: Arial, sans-serif; "
                    "font-size: 16px; "
                    "line-height: 1.6; "
                    "overflow-y: auto; "
                    "max-height: 400px;"
                ),
            ),
        ),
        ui.column(
            4,
            ui.div(
                ui.h3("Details Panel"),
                # Inner quotes differ from the outer ones so the f-string also
                # parses on Python versions older than 3.12.
                ui.HTML(f'File: {ui.output_text("file_name_text")}'),
                ui.HTML(
                    f"Is compound: {ui.output_text('compound_text')} found in organism: "
                    f"{ui.output_text('name_text')} according to the text?"
                ),
                # NOTE(review): 'Red' / 'Blue' are emitted as plain text here;
                # presumably they were meant to be coloured spans - confirm.
                ui.p('Parts of possibly relevant organism names are highlighted in ', ui.HTML('Red')),
                ui.p('Parts of possibly relevant compound names are highlighted in ', ui.HTML('Blue')),
                ui.div(
                    ui.input_action_button("submit_button", "Yes", class_="btn btn-primary"),
                    ui.input_action_button(
                        "pass_button", "No", class_="btn btn-secondary", style="margin-left: 10px;"
                    ),
                ),
                ui.p(),
                ui.div(ui.download_button("download_results", "Download Results", class_="btn-success")),
                style="background-color: #f8f9fa; padding: 10px; border: 1px solid #ccc;"
                      "position: sticky; "
                      "top: 20px; "
                      "z-index: 1000; ",
            ),
        ),
    ),
)


def _next_position(taxadata_list, json_index, taxon_index, compound_index):
    """Return the position following the given one, or ``None`` at the end.

    A position is a ``(json_index, taxon_index, compound_index)`` tuple. The
    order is: next compound of the same taxon, then the first compound of the
    next taxon in the same file, then the first taxon of the next file.
    """
    taxadata = taxadata_list[json_index]
    taxon = taxadata.taxa[taxon_index]
    if compound_index + 1 < len(taxon.verified_compounds):
        return json_index, taxon_index, compound_index + 1
    if taxon_index + 1 < len(taxadata.taxa):
        return json_index, taxon_index + 1, 0
    if json_index + 1 < len(taxadata_list):
        return json_index + 1, 0, 0
    return None


def _previous_decision(previous_results, json_file, taxon_name, compound_name):
    """Look up an earlier decision for one (file, taxon, compound) triple.

    Returns the ``decision`` value of the first matching row of
    ``previous_results`` or ``None`` when no row matches.
    """
    is_match = (
        (previous_results['taxon_name'] == taxon_name)
        & (previous_results['compound_name'] == compound_name)
        & (previous_results['json_file'] == json_file)
    )
    decisions = previous_results.loc[is_match, 'decision']
    if decisions.empty:
        return None
    return decisions.iloc[0]


def _result_record(json_file, taxon, compound, decision):
    """Build the dict stored for one decision (keys match ``RESULT_COLUMNS``)."""
    return {
        "json_file": json_file,
        "taxon_name": taxon.scientific_name,
        "accepted_name": taxon.accepted_name,
        "compound_name": compound,
        "inchikey": taxon.inchi_keys[compound],
        "decision": decision,
    }


def server(input, output, session):
    """Register the reactive state, outputs and event handlers of the app."""
    reactive_current_json_index = reactive.Value(0)
    reactive_current_taxon_index = reactive.Value(0)
    reactive_current_compound_index = reactive.Value(0)
    reactive_TaxaData_annotations = reactive.Value({})
    saved_results = reactive.Value([])
    reactive_finished = reactive.Value(False)
    reactive_previous_results = reactive.Value(None)

    def _set_position(json_index, taxon_index, compound_index):
        """Store a (file, taxon, compound) position in the reactive indices."""
        reactive_current_json_index.set(json_index)
        reactive_current_taxon_index.set(taxon_index)
        reactive_current_compound_index.set(compound_index)

    def _append_result(record):
        """Add one decision to ``saved_results``.

        A new list is set instead of appending in place: setting a reactive
        value to the very object it already holds is not treated as a change.
        """
        saved_results.set([*saved_results.get(), record])

    @reactive.Effect
    @reactive.event(input.process_files)
    def handle_file_uploads():
        """Processes the uploaded files."""
        # Process Annotation Files
        # input_file yields None until something has been uploaded.
        json_files = input.json_files() or []
        uploaded_dicts = {}
        for json_file in json_files:
            with open(json_file['datapath'], "r", encoding="utf-8") as file_:
                json_dict = json.load(file_)
            loaded_taxaData = TaxaData.model_validate(json_dict)
            if len(loaded_taxaData.taxa) == 0:
                print(f'No taxa for {json_file}')
                continue
            if loaded_taxaData.text is None:
                print(f'No text for {json_file}')
                continue

            # Create copy of taxa list with only relevant taxa and compounds
            modified_taxa = []
            for t in loaded_taxaData.taxa:
                if t.accepted_name is not None:
                    t.verified_compounds = list(t.inchi_keys.keys())
                    if len(t.verified_compounds) > 0:
                        modified_taxa.append(t)
            if len(modified_taxa) > 0:
                modified_TaxaData = TaxaData(taxa=modified_taxa)
                modified_TaxaData.text = loaded_taxaData.text
                uploaded_dicts[json_file["name"]] = modified_TaxaData

        # A different set of files invalidates the stored position; start over
        # so the indices cannot point outside the new data. Re-processing the
        # same files keeps the current position.
        if list(uploaded_dicts) != list(reactive_TaxaData_annotations.get()):
            _set_position(0, 0, 0)
            reactive_finished.set(False)
        reactive_TaxaData_annotations.set(uploaded_dicts)

        # NOTE(review): previous results are only consulted when moving to the
        # next item, so the item shown right after processing is never skipped.
        previous_result_files = input.previous_result_files()
        if previous_result_files is not None:
            print(previous_result_files)
            result_data = pd.read_csv(previous_result_files[0]['datapath'])
            reactive_previous_results.set(result_data)

    @output
    @render.ui
    def output_text():
        """Render highlighted text for the uploaded content."""
        annotations = reactive_TaxaData_annotations.get()
        if not annotations:
            return ui.p("No annotation files uploaded. Please upload annotation files to continue.")

        taxadata = list(annotations.values())[reactive_current_json_index.get()]
        text = taxadata.text
        taxon = taxadata.taxa[reactive_current_taxon_index.get()]
        highlighted = highlight_text(text, taxon.scientific_name,
                                     taxon.verified_compounds[reactive_current_compound_index.get()])
        return ui.HTML(highlighted)

    def update_indices():
        """
        Update indices to move to the next compound, taxon, or taxadata object.

        Taxa without compounds are skipped, and so are items that already have
        a decision in the imported previous results (that decision is copied
        into the saved results). When nothing is left the finished flag is set.
        Implemented as a loop so that long runs of already-decided items cannot
        exhaust the recursion limit.
        """
        annotations = reactive_TaxaData_annotations.get()
        if not annotations:
            return

        file_names = list(annotations.keys())
        taxadata_list = list(annotations.values())
        previous_results = reactive_previous_results.get()

        position = (
            reactive_current_json_index.get(),
            reactive_current_taxon_index.get(),
            reactive_current_compound_index.get(),
        )
        # Last position that can actually be displayed; kept when data runs out.
        shown = position

        while True:
            position = _next_position(taxadata_list, *position)
            if position is None:
                # End of all data
                _set_position(*shown)
                print("No more items to process.")
                reactive_finished.set(True)
                return

            json_index, taxon_index, compound_index = position
            taxon = taxadata_list[json_index].taxa[taxon_index]
            # If next taxon doesn't have compounds, move on
            if compound_index >= len(taxon.verified_compounds):
                continue
            shown = position

            if previous_results is None:
                _set_position(*position)
                return

            compound = taxon.verified_compounds[compound_index]
            json_file = file_names[json_index]
            decision = _previous_decision(previous_results, json_file, taxon.scientific_name, compound)
            if decision is None:
                _set_position(*position)
                return

            # Already decided in an earlier session: keep that decision and
            # continue with the following item.
            _append_result(_result_record(json_file, taxon, compound, decision))

    @output
    @render.text
    def name_text():
        """Render current name being displayed."""
        annotations = reactive_TaxaData_annotations.get()
        if annotations:
            taxadata = list(annotations.values())[reactive_current_json_index.get()]
            taxon = taxadata.taxa[reactive_current_taxon_index.get()]
            return taxon.scientific_name
        return "No annotation loaded."

    @output
    @render.text
    def compound_text():
        """Render current compound being highlighted."""
        annotations = reactive_TaxaData_annotations.get()
        if annotations:
            taxadata = list(annotations.values())[reactive_current_json_index.get()]
            taxon = taxadata.taxa[reactive_current_taxon_index.get()]
            try:
                return taxon.verified_compounds[reactive_current_compound_index.get()]
            except IndexError:
                # Stored compound index is not valid for this taxon; advance.
                update_indices()
        return "No annotation loaded."

    @output
    @render.text
    def file_name_text():
        """Render the name of the JSON file currently being verified."""
        annotations = reactive_TaxaData_annotations.get()
        if annotations:
            json_file = list(annotations.keys())[reactive_current_json_index.get()]
            return json_file
        return "No annotation loaded."

    def _record_decision(decision):
        """Save ``decision`` for the item on display and move to the next one.

        Does nothing when every item has been handled or no data is loaded.
        """
        if reactive_finished.get():
            return

        annotations = reactive_TaxaData_annotations.get()
        if not annotations:
            return

        # Extract current context
        current_json_index = reactive_current_json_index.get()
        json_file_name = list(annotations.keys())[current_json_index]
        current_taxadata = annotations[json_file_name]
        current_taxon = current_taxadata.taxa[reactive_current_taxon_index.get()]
        current_compound = current_taxon.verified_compounds[reactive_current_compound_index.get()]

        # Append result
        _append_result(_result_record(json_file_name, current_taxon, current_compound, decision))

        update_indices()

    @reactive.Effect
    @reactive.event(input.submit_button)
    def on_yes_click():
        """
        Handle the user clicking "Yes".
        """
        _record_decision("Yes")

    @reactive.Effect
    @reactive.event(input.pass_button)
    def on_no_click():
        """
        Handle the user clicking "No".
        """
        _record_decision("No")

    @render.download(filename="verification_results.csv")
    def download_results():
        """Yield the saved decisions as CSV text.

        The content is streamed directly, so no temporary file is left behind.
        Without any saved decision a header-only CSV is produced.
        """
        results = saved_results.get()
        if not results:
            print("No results to save.")
            yield pd.DataFrame(columns=RESULT_COLUMNS).to_csv(index=False)
            return

        all_results = pd.DataFrame(results).drop_duplicates(
            subset=DEDUPLICATION_COLUMNS, keep='first')
        yield all_results.to_csv(index=False)


app = App(app_ui, server)