| | import pandas as pd
|
| | import os
|
| | import json
|
| | import glob
|
| | import mimetypes
|
| | from datetime import datetime
|
| | from functools import lru_cache
|
| | import time
|
| | from openai import OpenAI
|
| |
|
| |
|
| | from semanticscholar import SemanticScholar
|
| | from Bio import Entrez
|
| | import arxiv
|
| |
|
| |
|
| |
|
| | import requests
|
| | from bs4 import BeautifulSoup
|
| | import base64
|
| | import io
|
| | from PIL import Image
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | _TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
|
| | _PROJECT_ROOT = os.path.abspath(os.path.join(_TOOLS_DIR, ".."))
|
| | BASE_WWW_PATH = os.path.join(_PROJECT_ROOT, "www")
|
| |
|
| | UI_TEXTS_FILE = os.path.join(_TOOLS_DIR, "ui_texts.json")
|
| |
|
| | TF_PAGERANK_PATH = os.path.join(BASE_WWW_PATH, "tablePagerank")
|
| | WAVE_ANALYSIS_PATH = os.path.join(BASE_WWW_PATH, "waveanalysis")
|
| | TF_CORR_PATH = os.path.join(BASE_WWW_PATH, "TFcorintextrm")
|
| | TF_COMMUNITIES_PATH = os.path.join(BASE_WWW_PATH, "tfcommunities")
|
| |
|
| |
|
| | CACHE_TIMEOUT = 300
|
| |
|
| |
|
| | _data_cache = {}
|
| | _cache_timestamps = {}
|
| |
|
| | def _is_cache_valid(cache_key):
|
| | """Check if cached data is still valid based on timeout."""
|
| | if cache_key not in _cache_timestamps:
|
| | return False
|
| | return (time.time() - _cache_timestamps[cache_key]) < CACHE_TIMEOUT
|
| |
|
| | def _get_cached_data(cache_key):
|
| | """Get data from cache if valid."""
|
| | if _is_cache_valid(cache_key):
|
| | return _data_cache.get(cache_key)
|
| | return None
|
| |
|
| | def _set_cached_data(cache_key, data):
|
| | """Store data in cache with current timestamp."""
|
| | _data_cache[cache_key] = data
|
| | _cache_timestamps[cache_key] = time.time()
|
| |
|
| |
|
| |
|
def get_raw_excel_data(file_path: str) -> list:
    """
    Read an Excel file and return its full contents as a list of rows
    (list of lists), with NaN cells replaced by ''.

    Results are cached for CACHE_TIMEOUT seconds via the module-level TTL
    cache.  The previous @lru_cache decorator has been removed: lru_cache
    has no expiry, so repeat calls were served from it forever, the TTL
    cache below never expired anything, and updated spreadsheets were
    never re-read.

    Returns [] on any read error (missing file, bad format, ...).
    """
    cache_key = f"raw_excel_{file_path}"
    cached = _get_cached_data(cache_key)
    if cached is not None:
        return cached

    try:
        df = pd.read_excel(file_path, header=None)
        result = df.fillna('').values.tolist()

        _set_cached_data(cache_key, result)
        return result
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return []
    except Exception as e:
        print(f"Error reading Excel file {file_path}: {e}")
        return []
|
| |
|
def get_processed_tf_data(dataset_identifier: str) -> list:
    """
    Load a TF catalog spreadsheet by identifier, transpose it so the first
    transposed row becomes the header row, and return headers + data rows
    as a list of lists (NaN cells replaced by '').

    Results go through the module TTL cache.  Returns [] when the
    identifier cannot be resolved, the file is missing, or reading fails.
    """
    try:
        cache_key = f"processed_tf_{dataset_identifier}"
        cached = _get_cached_data(cache_key)
        if cached is not None:
            return cached

        file_path = get_tf_catalog_dataset_path(dataset_identifier)
        if not file_path:
            print(f"Error: Could not resolve dataset_identifier '{dataset_identifier}' to a file path.")
            return []
        if not os.path.exists(file_path):
            print(f"Error: File not found at resolved path: {file_path}")
            return []

        transposed = pd.read_excel(file_path).transpose()
        # First transposed row carries the column labels for the rest.
        header_row = transposed.iloc[0].tolist()
        body = transposed.iloc[1:].copy()
        body.columns = header_row
        body = body.fillna('')
        result = [header_row] + body.values.tolist()

        _set_cached_data(cache_key, result)
        return result
    except Exception as e:
        print(f"Error processing TF data for {dataset_identifier}: {e}")
        return []
|
| |
|
def filter_data_by_column_keywords(dataset: list, keywords: str) -> list:
    """
    Keep only the columns of *dataset* whose header contains any of the
    comma-separated *keywords* (case-insensitive substring match).

    *dataset* must be actual tabular data — a list of lists whose first
    element is the header row (e.g. the output of get_processed_tf_data) —
    never a dataset name or file path.

    Returns the filtered table (headers first), the original dataset when
    *keywords* is blank, just the header row when nothing matches, or []
    for malformed input.
    """
    if not dataset or not isinstance(dataset, list) or len(dataset) < 1 or not isinstance(dataset[0], list):
        print("Error: Invalid dataset format for filtering. Must be a list of lists with headers.")
        return []

    headers = dataset[0]
    data_rows = dataset[1:] if len(dataset) > 1 else []

    df = pd.DataFrame(data_rows, columns=headers)

    if not keywords or not keywords.strip():
        return dataset

    wanted = [kw.strip().lower() for kw in keywords.split(',')]

    # df.columns is exactly `headers`, so a single pass over `headers`
    # covers both the empty-frame and populated cases.
    matching_columns = [
        col for col in headers
        if any(kw in str(col).lower() for kw in wanted)
    ]

    if not matching_columns:
        return [headers]

    # Deduplicate while restoring original column order.
    ordered = sorted(set(matching_columns), key=headers.index)

    if df.empty:
        return [ordered] + [[] for _ in data_rows]

    return [ordered] + df[ordered].values.tolist()
|
| |
|
| |
|
def get_tf_wave_search_data(tf_search_term: str = None) -> dict:
    """
    Load searchtfwaves.xlsx and return {wave column -> gene list}.

    With no search term every column is returned in full.  With one, only
    columns containing at least one case-insensitive substring match are
    returned, restricted to the matching genes.  Results go through the
    module TTL cache.  Returns {} on error.
    """
    try:
        cache_key = f"wave_search_{tf_search_term}"
        cached = _get_cached_data(cache_key)
        if cached is not None:
            return cached

        file_path = os.path.join(WAVE_ANALYSIS_PATH, "searchtfwaves.xlsx")
        if not os.path.exists(file_path):
            print(f"Error: TF wave search file not found at {file_path}")
            return {}

        df = pd.read_excel(file_path)
        expected_cols = [f"Wave{i}" for i in range(1, 8)]

        # Normalise the first seven columns to Wave1..Wave7 when the sheet
        # uses different header names.
        if not all(col in df.columns for col in expected_cols):
            if len(df.columns) >= 7:
                df = df.rename(columns=dict(zip(df.columns[:7], expected_cols)))[expected_cols]
            else:
                print(f"Warning: {file_path} does not have at least 7 columns to map to Wave1-7.")

        df = df.fillna('')

        if not tf_search_term or not tf_search_term.strip():
            result = {col: df[col].tolist() for col in df.columns}
        else:
            needle = tf_search_term.strip().lower()
            result = {}
            for col in df.columns:
                hits = [gene for gene in df[col] if needle in str(gene).lower()]
                if hits:
                    result[col] = hits

        _set_cached_data(cache_key, result)
        return result
    except Exception as e:
        print(f"Error reading or processing wave search data: {e}")
        return {}
|
| |
|
def get_tf_correlation_data(tf_name: str = None) -> list:
    """
    Load TF-TFcorTRMTEX.xlsx and return it as headers + data rows.

    When *tf_name* is given, only rows whose "TF Name" column (or, if that
    column is absent, the first column) equals it case-insensitively are
    returned.  Results go through the module TTL cache.  Returns [] on
    error.
    """
    try:
        cache_key = f"correlation_{tf_name}"
        cached = _get_cached_data(cache_key)
        if cached is not None:
            return cached

        file_path = os.path.join(TF_CORR_PATH, "TF-TFcorTRMTEX.xlsx")
        if not os.path.exists(file_path):
            print(f"Error: TF correlation data file not found at {file_path}")
            return []

        df = pd.read_excel(file_path).fillna('')
        headers = df.columns.tolist()

        if not tf_name or not tf_name.strip():
            result = [headers] + df.values.tolist()
        else:
            name_col = "TF Name"
            if name_col not in df.columns:
                if len(df.columns) == 0:
                    # No columns at all: nothing to match against.
                    return [headers]
                name_col = df.columns[0]

            mask = df[name_col].astype(str).str.lower() == tf_name.strip().lower()
            subset = df[mask]
            if subset.empty:
                result = [headers]
            else:
                result = [headers] + subset.values.tolist()

        _set_cached_data(cache_key, result)
        return result
    except Exception as e:
        print(f"Error processing correlation data: {e}")
        return []
|
| |
|
def get_tf_correlation_image_path(tf_name: str) -> str:
    """
    Look up the merged-correlation-graph image path for *tf_name* in
    TF-TFcorTRMTEX.xlsx ("TF Merged Graph Path" column, keyed by the
    "TF Name" column or the first column as a fallback).

    The returned path is normalised to start with "www/".  Returns "" when
    the TF, the file, or a required column is missing, or on any error.
    """
    file_path = os.path.join(TF_CORR_PATH, "TF-TFcorTRMTEX.xlsx")
    if not os.path.exists(file_path):
        print(f"Error: TF correlation data file (for image path) not found at {file_path} (Resolved: {os.path.abspath(file_path)})")
        return ""

    image_column_name = "TF Merged Graph Path"
    tf_identifier_column = "TF Name"

    try:
        df = pd.read_excel(file_path)
        if tf_identifier_column not in df.columns:
            if len(df.columns) == 0:
                return ""
            tf_identifier_column = df.columns[0]

        if image_column_name not in df.columns:
            print(f"Error: Image path column '{image_column_name}' not found in {file_path} (Resolved: {os.path.abspath(file_path)})")
            return ""

        wanted = tf_name.strip().lower()
        matches = df[df[tf_identifier_column].astype(str).str.lower() == wanted]
        if matches.empty:
            return ""

        raw_path = matches.iloc[0][image_column_name]
        if not (pd.notna(raw_path) and isinstance(raw_path, str)):
            return ""
        if raw_path.startswith("www/"):
            return str(raw_path)
        return "www/" + raw_path.lstrip('/')
    except FileNotFoundError:
        print(f"Error: File not found at {file_path} (Resolved: {os.path.abspath(file_path)})")
        return ""
    except Exception as e:
        print(f"Error processing {file_path} for TF {tf_name}: {e} (Resolved: {os.path.abspath(file_path)})")
        return ""
|
| |
|
def list_all_tfs_in_correlation_data() -> list:
    """
    Return every unique, non-empty TF name found in the TF-TF correlation
    spreadsheet (keyed by "TF Name" or, failing that, its first column).
    Returns [] on any error.
    """
    file_path = os.path.join(TF_CORR_PATH, "TF-TFcorTRMTEX.xlsx")
    if not os.path.exists(file_path):
        print(f"Error: TF correlation data file (for listing TFs) not found at {file_path} (Resolved: {os.path.abspath(file_path)})")
        return []

    tf_identifier_column = "TF Name"
    try:
        df = pd.read_excel(file_path)
        if tf_identifier_column not in df.columns:
            if df.empty or len(df.columns) == 0:
                return []
            tf_identifier_column = df.columns[0]

        names = df[tf_identifier_column].astype(str).str.strip().unique()
        # Drop blanks and stringified NaN cells.
        return [name for name in names if name and name.lower() != 'nan']
    except FileNotFoundError:
        print(f"Error: File not found at {file_path} (Resolved: {os.path.abspath(file_path)})")
        return []
    except Exception as e:
        print(f"Error processing {file_path}: {e} (Resolved: {os.path.abspath(file_path)})")
        return []
|
| |
|
def get_tf_community_sheet_data(community_type: str) -> list:
    """
    Return the raw contents of a TF community spreadsheet.

    *community_type* selects the file: "trm" -> trmcommunities.xlsx,
    "texterm" -> texcommunities.xlsx (case-insensitive).  Results go
    through the module TTL cache (on top of get_raw_excel_data's own
    caching).  Returns [] for unknown types, missing files, or errors.
    """
    try:
        cache_key = f"community_{community_type}"
        cached = _get_cached_data(cache_key)
        if cached is not None:
            return cached

        file_names = {"trm": "trmcommunities.xlsx", "texterm": "texcommunities.xlsx"}
        file_name = file_names.get(community_type.lower())
        if file_name is None:
            print(f"Error: Invalid community_type '{community_type}'. Must be 'trm' or 'texterm'.")
            return []

        file_path = os.path.join(TF_COMMUNITIES_PATH, file_name)
        if not os.path.exists(file_path):
            print(f"Error: TF community file not found at {file_path}")
            return []

        result = get_raw_excel_data(file_path)

        _set_cached_data(cache_key, result)
        return result
    except Exception as e:
        print(f"Error processing community data: {e}")
        return []
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Registry of known static image assets: identifier -> relative web path.
# Consumed by get_static_image_path / list_available_cell_state_bubble_plots /
# list_available_wave_analysis_assets.
_STATIC_IMAGE_WEB_PATHS = {
    # General site imagery and institution logos.
    "home_page_diagram": "www/homedesc.png",
    "ucsd_logo": "www/ucsdlogo.png",
    "salk_logo": "www/salklogo.png",
    "unc_logo": "www/unclogo.jpg",
    "modal_cs_description_img": "www/csdescrip.jpeg",
    # TF catalog overview figures.
    "tfcat_overview_img": "www/tfcat/onlycellstates.png",
    "tfcat_multistates_heatmap": "www/tfcat/multistatesheatmap.png",
    # Per-cell-state bubble plots ("*_bubble_plot" suffix is relied on by
    # list_available_cell_state_bubble_plots).
    "naive_bubble_plot": "www/bubbleplots/naivebubble.jpg",
    "te_bubble_plot": "www/bubbleplots/tebubble.jpg",
    "mp_bubble_plot": "www/bubbleplots/mpbubble.jpg",
    "tcm_bubble_plot": "www/bubbleplots/tcmbubble.jpg",
    "tem_bubble_plot": "www/bubbleplots/tembubble.jpg",
    "trm_bubble_plot": "www/bubbleplots/trmbubble.jpg",
    "texprog_bubble_plot": "www/bubbleplots/texprogbubble.jpg",
    "texefflike_bubble_plot": "www/bubbleplots/texintbubble.jpg",
    "texterm_bubble_plot": "www/bubbleplots/textermbubble.jpg",
    # Wave analysis figures; per-wave ids follow the
    # wave<N>_{main,gokegg,ranked_text}_img naming used by
    # list_available_wave_analysis_assets (wave 1 has two ranked-text images,
    # wave 2's GO/KEGG figure is the "_v2" file).
    "wave_analysis_overview_diagram": "www/waveanalysis/tfwaveanal.png",
    "wave1_main_img": "www/waveanalysis/c1.jpg",
    "wave1_gokegg_img": "www/waveanalysis/c1_selected_GO_KEGG.jpg",
    "wave1_ranked_text1_img": "www/waveanalysis/txtJPG/c1_ranked_1.jpg",
    "wave1_ranked_text2_img": "www/waveanalysis/txtJPG/c1_ranked_2.jpg",
    "wave2_main_img": "www/waveanalysis/c2.jpg",
    "wave2_gokegg_img": "www/waveanalysis/c2_selected_GO_KEGG_v2.jpg",
    "wave2_ranked_text_img": "www/waveanalysis/txtJPG/c2_ranked.jpg",
    "wave3_main_img": "www/waveanalysis/c3.jpg",
    "wave3_gokegg_img": "www/waveanalysis/c3_selected_GO_KEGG.jpg",
    "wave3_ranked_text_img": "www/waveanalysis/txtJPG/c3_ranked.jpg",
    "wave4_main_img": "www/waveanalysis/c4.jpg",
    "wave4_gokegg_img": "www/waveanalysis/c4_selected_GO_KEGG.jpg",
    "wave4_ranked_text_img": "www/waveanalysis/txtJPG/c4_ranked.jpg",
    "wave5_main_img": "www/waveanalysis/c5.jpg",
    "wave5_gokegg_img": "www/waveanalysis/c5_selected_GO_KEGG.jpg",
    "wave5_ranked_text_img": "www/waveanalysis/txtJPG/c5_ranked.jpg",
    "wave6_main_img": "www/waveanalysis/c6.jpg",
    "wave6_gokegg_img": "www/waveanalysis/c6_selected_GO_KEGG.jpg",
    "wave6_ranked_text_img": "www/waveanalysis/txtJPG/c6_ranked.jpg",
    "wave7_main_img": "www/waveanalysis/c7.jpg",
    "wave7_gokegg_img": "www/waveanalysis/c7_selected_GO_KEGG.jpg",
    "wave7_ranked_text_img": "www/waveanalysis/txtJPG/c7_ranked.jpg",
    # Network analysis figures.
    "network_correlation_desc_img": "www/networkanalysis/tfcorrdesc.png",
    "network_community_overview_img": "www/networkanalysis/community.jpg",
    "network_trmtex_community_comparison_img": "www/networkanalysis/trmtexcom.png",
    "network_community_pathway_img": "www/networkanalysis/tfcompathway.png",
}
|
| |
|
| |
|
| |
|
# Lazily-populated cache for ui_texts.json; None until the first load.
_loaded_ui_texts = None


def _load_ui_texts():
    """Load and memoise the UI text snippets from ui_texts.json (in tools/).

    On any failure the cache is set to {} so the load is not retried.
    """
    global _loaded_ui_texts
    if _loaded_ui_texts is not None:
        return _loaded_ui_texts

    try:
        with open(UI_TEXTS_FILE, 'r', encoding='utf-8') as f:
            _loaded_ui_texts = json.load(f)
    except FileNotFoundError:
        print(f"Error: UI texts file not found at {os.path.abspath(UI_TEXTS_FILE)}")
        _loaded_ui_texts = {}
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {UI_TEXTS_FILE}")
        _loaded_ui_texts = {}
    except Exception as e:
        print(f"An unexpected error occurred while loading {UI_TEXTS_FILE}: {e}")
        _loaded_ui_texts = {}
    return _loaded_ui_texts
|
| |
|
# Maps internal UI link ids (lowercase) to a human-readable description of the
# tab each link opens; consumed by get_internal_navigation_info.
INTERNAL_NAVIGATION_TARGETS = {
    "to_tfcat": "Navigates to the 'TF Catalog > Search TF Scores' tab.",
    "to_tfwave": "Navigates to the 'TF Wave Analysis > Overview' tab.",
    "to_tfnet": "Navigates to the 'TF Network Analysis > Search TF-TF correlation in TRM/TEXterm' tab.",
    # cN_link ids jump straight to the corresponding wave tab.
    "c1_link": "Navigates to the 'TF Wave Analysis > Wave 1' tab.",
    "c2_link": "Navigates to the 'TF Wave Analysis > Wave 2' tab.",
    "c3_link": "Navigates to the 'TF Wave Analysis > Wave 3' tab.",
    "c4_link": "Navigates to the 'TF Wave Analysis > Wave 4' tab.",
    "c5_link": "Navigates to the 'TF Wave Analysis > Wave 5' tab.",
    "c6_link": "Navigates to the 'TF Wave Analysis > Wave 6' tab.",
    "c7_link": "Navigates to the 'TF Wave Analysis > Wave 7' tab.",
}
|
| |
|
def get_static_image_path(image_identifier: str) -> str:
    """
    Map a known static-image identifier (case-insensitive) to its relative
    web path, e.g. "www/images/logo.png".  Unknown identifiers yield "".
    """
    key = image_identifier.lower()
    return _STATIC_IMAGE_WEB_PATHS.get(key, "")
|
| |
|
def get_ui_descriptive_text(text_identifier: str) -> str:
    """
    Fetch a predefined descriptive/methodology text from ui_texts.json by
    (case-insensitive) identifier.

    Identifiers shaped like "wave_<N>_analysis_placeholder_details" are
    served from the generic "wave_x_analysis_placeholder_details" template
    with {X} replaced by N.  Unknown identifiers yield "".
    """
    texts = _load_ui_texts()
    key = text_identifier.lower()

    if key.startswith("wave_") and key.endswith("_analysis_placeholder_details"):
        generic_wave_key = "wave_x_analysis_placeholder_details"
        try:
            if generic_wave_key in texts:
                wave_number = key.split("_")[1]
                return texts[generic_wave_key].replace("{X}", wave_number)
            return texts.get(key, "")
        except Exception as e:
            print(f"Error processing placeholder for {text_identifier}: {e}")
            return texts.get(key, "")

    return texts.get(key, "")
|
| |
|
def list_available_tf_catalog_datasets() -> list:
    """
    Identifiers of the TF catalog datasets understood by
    get_processed_tf_data: the overall PageRank table plus one table per
    cell state.
    """
    cell_states = [
        "Naive", "TE", "MP", "TCM", "TEM", "TRM",
        "TEXprog", "TEXeff", "TEXterm",
    ]
    return ["Overall_TF_PageRank"] + cell_states
|
| |
|
def get_tf_catalog_dataset_path(dataset_identifier: str) -> str:
    """
    Resolve a TF catalog dataset identifier (case-insensitive) to its
    spreadsheet path under TF_PAGERANK_PATH (i.e. ../www/tablePagerank).
    Mainly for internal use by get_processed_tf_data.  Unknown
    identifiers yield "".
    """
    file_names = {
        "overall_tf_pagerank": "Table_TF PageRank Scores for Audrey.xlsx",
        "naive": "Naive.xlsx",
        "te": "TE.xlsx",
        "mp": "MP.xlsx",
        "tcm": "TCM.xlsx",
        "tem": "TEM.xlsx",
        "trm": "TRM.xlsx",
        "texprog": "TEXprog.xlsx",
        "texeff": "TEXeff.xlsx",
        "texterm": "TEXterm.xlsx",
    }
    file_name = file_names.get(dataset_identifier.lower())
    return os.path.join(TF_PAGERANK_PATH, file_name) if file_name else ""
|
| |
|
| |
|
def list_available_cell_state_bubble_plots() -> list:
    """
    Identifiers of the per-cell-state bubble plot images; pass one to
    get_static_image_path to obtain its web path.
    """
    return [name for name in _STATIC_IMAGE_WEB_PATHS if "bubble_plot" in name]
|
| |
|
| |
|
def list_available_wave_analysis_assets(wave_number: int) -> dict:
    """
    Return the static-image identifiers available for one TF wave (1-7):

        main_image_id         - the wave's main figure
        gokegg_image_id       - its GO/KEGG enrichment figure
        ranked_text_image_ids - ranked-text figures (wave 1 has two)

    Identifiers resolve via get_static_image_path.  Entries whose image is
    not registered in _STATIC_IMAGE_WEB_PATHS are dropped; out-of-range
    wave numbers yield {}.

    NOTE: the old wave-2 special case looked for a "wave2_gokegg_img_v2"
    key that is never present (the v2 image file is already registered
    under "wave2_gokegg_img"), and its elif branch was a no-op `pass`;
    both were dead code and have been removed.
    """
    if not 1 <= wave_number <= 7:
        return {}

    assets = {
        "main_image_id": f"wave{wave_number}_main_img",
        "gokegg_image_id": f"wave{wave_number}_gokegg_img",
    }

    if wave_number == 1:
        ranked_ids = ["wave1_ranked_text1_img", "wave1_ranked_text2_img"]
    else:
        ranked_ids = [f"wave{wave_number}_ranked_text_img"]

    # Keep only identifiers that are actually registered.
    for slot in ("main_image_id", "gokegg_image_id"):
        if assets[slot] not in _STATIC_IMAGE_WEB_PATHS:
            assets[slot] = None

    ranked_ids = [rid for rid in ranked_ids if rid in _STATIC_IMAGE_WEB_PATHS]
    if ranked_ids:
        assets["ranked_text_image_ids"] = ranked_ids

    return {key: value for key, value in assets.items() if value is not None}
|
| |
|
| |
|
def get_internal_navigation_info(link_id: str) -> str:
    """
    Describe where an internal UI link id (case-insensitive) navigates.
    Unknown ids yield a fixed fallback message.
    """
    fallback = "Navigation target not defined for this link ID."
    return INTERNAL_NAVIGATION_TARGETS.get(link_id.lower(), fallback)
|
| |
|
def get_biorxiv_paper_url() -> str:
    """Canonical DOI link for the project's main bioRxiv preprint."""
    paper_doi_url = "https://doi.org/10.1101/2023.01.03.522354"
    return paper_doi_url
|
| |
|
| |
|
def discover_excel_files_and_schemas(base_scan_directory_name: str = "www") -> dict:
    """
    Walk *base_scan_directory_name* (relative to the project root) and build
    a schema entry for every .xlsx file found:

        {relative/path.xlsx: {file_path, table_identifier, columns, sheets,
                              last_modified, file_size_bytes, error}}

    Columns come from the header row of the first sheet.  Files that fail
    to parse still get an entry, with "error" set.  Returns {} when the
    base directory does not exist.
    """
    discovered_schema = {}
    scan_root_abs = os.path.join(_PROJECT_ROOT, base_scan_directory_name)

    if not os.path.isdir(scan_root_abs):
        print(f"Error: Base directory for schema discovery not found: {scan_root_abs}")
        return {}

    for dirpath, _, filenames in os.walk(scan_root_abs):
        for filename in filenames:
            # Skip non-Excel files and Office lock files ("~$...").
            if not filename.endswith(".xlsx") or filename.startswith("~"):
                continue

            file_abs_path = os.path.join(dirpath, filename)
            # Normalise Windows separators.  The old code replaced a
            # two-backslash sequence, which os.path.relpath never produces,
            # so Windows paths were left unnormalised; a single backslash
            # matches list_all_files_in_www_directory.
            file_rel_path = os.path.relpath(file_abs_path, _PROJECT_ROOT).replace("\\", "/")
            table_identifier = os.path.splitext(filename)[0].replace("-", "_").replace(" ", "_")

            try:
                xls = pd.ExcelFile(file_abs_path)
                if not xls.sheet_names:
                    print(f"[Schema Discovery] Warning: No sheets found in {file_abs_path}")
                    columns = []
                else:
                    # nrows=0 reads the header row only.
                    df_header = pd.read_excel(xls, sheet_name=xls.sheet_names[0], nrows=0)
                    columns = [str(col) for col in df_header.columns.tolist()]

                discovered_schema[file_rel_path] = {
                    "file_path": file_rel_path,
                    "table_identifier": table_identifier,
                    "columns": columns,
                    "sheets": list(xls.sheet_names),
                    # File mtime, not scan time: the old datetime.now() made
                    # "last_modified" meaningless.
                    "last_modified": datetime.fromtimestamp(os.path.getmtime(file_abs_path)).isoformat(),
                    "file_size_bytes": os.path.getsize(file_abs_path),
                    "error": None,
                }
            except Exception as e:
                print(f"[Schema Discovery] Error reading or processing headers for {file_abs_path}: {e}")
                discovered_schema[file_rel_path] = {
                    "file_path": file_rel_path,
                    "table_identifier": table_identifier,
                    "columns": [],
                    "sheets": [],
                    "error": str(e),
                }

    if not discovered_schema:
        print(f"[Schema Discovery] No Excel files found in {scan_root_abs}")

    return discovered_schema
|
| |
|
| |
|
def list_all_files_in_www_directory() -> list:
    """
    Recursively scan BASE_WWW_PATH and return one manifest entry per file:

        {"path": path relative to the project root (forward slashes),
         "type": best-guess MIME type,
         "size": size in bytes,
         "last_modified": file mtime as ISO-8601,
         "error": None, or the failure message for unreadable files}

    Hidden/system files (.DS_Store, Thumbs.db, AppleDouble "._*") are
    skipped.  Returns [] when BASE_WWW_PATH does not exist.
    """
    file_manifest = []

    if not os.path.isdir(BASE_WWW_PATH):
        print(f"Error: WWW directory for file listing not found: {BASE_WWW_PATH}")
        return []

    ignore_list = [".DS_Store", "Thumbs.db"]
    ignore_prefixes = ["._"]

    # Fallback MIME types for extensions mimetypes may not know.
    fallback_mime = {
        ".txt": "text/plain",
        ".md": "text/plain",
        ".csv": "text/csv",
        ".json": "application/json",
    }

    # Initialise the MIME registry once, outside the walk.
    if not mimetypes.inited:
        mimetypes.init()

    for dirpath, dirnames, filenames in os.walk(BASE_WWW_PATH):
        for filename in filenames:
            if filename in ignore_list or any(filename.startswith(p) for p in ignore_prefixes):
                continue

            file_abs_path = os.path.join(dirpath, filename)
            file_rel_path_from_project_root = os.path.relpath(file_abs_path, _PROJECT_ROOT).replace("\\", "/")

            try:
                file_size = os.path.getsize(file_abs_path)
                # File mtime, not scan time: the old datetime.now() made
                # "last_modified" meaningless.
                modified_iso = datetime.fromtimestamp(os.path.getmtime(file_abs_path)).isoformat()

                mime_type, _ = mimetypes.guess_type(file_abs_path)
                if mime_type is None:
                    ext = os.path.splitext(filename)[1].lower()
                    mime_type = fallback_mime.get(ext, "application/octet-stream")

                file_manifest.append({
                    "path": file_rel_path_from_project_root,
                    "type": mime_type,
                    "size": file_size,
                    "last_modified": modified_iso,
                    "error": None,
                })
            except FileNotFoundError:
                # Raced with a concurrent delete between walk and stat.
                print(f"[File Manifest] Warning: File {file_abs_path} found by os.walk but then not accessible for size/type.")
                continue
            except Exception as e:
                print(f"[File Manifest] Error processing file {file_abs_path}: {e}")
                file_manifest.append({
                    "path": file_rel_path_from_project_root,
                    "type": "unknown/error",
                    "size": 0,
                    "last_modified": datetime.now().isoformat(),
                    "error": str(e),
                })

    return file_manifest
|
| |
|
| |
|
| |
|
| | def _normalize_authors(authors_data, source="Unknown"):
|
| | """Helper to normalize author lists from different APIs."""
|
| | if not authors_data:
|
| | return ["N/A"]
|
| | if source == "SemanticScholar":
|
| | return [author.get('name', "N/A") for author in authors_data]
|
| | if source == "PubMed":
|
| | return authors_data
|
| | if source == "ArXiv":
|
| | return [author.name for author in authors_data]
|
| | return [str(a) for a in authors_data]
|
| |
|
def _search_semanticscholar_internal(query: str, max_results: int = 2) -> list[dict]:
    """Search Semantic Scholar for *query* and return up to *max_results*
    papers as normalised dicts (title/authors/year/abstract/doi/url/venue/
    source_api).  Abstracts are truncated to 500 characters.  Best-effort:
    any API/network failure yields the papers collected so far (usually []).
    """
    papers = []
    try:
        s2 = SemanticScholar(timeout=15)
        # NOTE(review): `results.items` is assumed to be the current page of
        # result objects exposed by the semanticscholar client — confirm
        # against the installed library version.
        results = s2.search_paper(query, limit=max_results, fields=['title', 'authors', 'year', 'abstract', 'url', 'venue', 'externalIds'])
        if results and results.items:
            for item in results.items:
                # externalIds may be absent; the DOI is used downstream for dedup.
                doi_val = item.externalIds.get('DOI') if item.externalIds else None
                papers.append({
                    "title": getattr(item, 'title', "N/A"),
                    "authors": _normalize_authors(getattr(item, 'authors', []), "SemanticScholar"),
                    "year": getattr(item, 'year', "N/A"),
                    # The ternary binds over the whole concatenation:
                    # truncated abstract when present, otherwise "N/A".
                    "abstract": getattr(item, 'abstract', "N/A")[:500] + "..." if getattr(item, 'abstract', None) else "N/A",
                    "doi": doi_val,
                    "url": getattr(item, 'url', "N/A"),
                    "venue": getattr(item, 'venue', "N/A"),
                    "source_api": "Semantic Scholar"
                })
    except Exception as e:
        # Deliberate best-effort: a failed source must not crash the caller.
        pass
    return papers
|
| |
|
def _search_pubmed_internal(query: str, max_results: int = 2) -> list[dict]:
    """Search PubMed (via Biopython's Entrez) for *query* and return up to
    *max_results* papers as normalised dicts.  Abstracts are truncated to
    500 characters.  Best-effort: any API/parsing failure yields the
    papers collected so far (usually []).

    NOTE(review): Entrez.email is not set anywhere visible in this module —
    NCBI requests it; confirm it is configured elsewhere in the project.
    """
    papers = []
    try:
        # Step 1: esearch returns matching PMIDs, most relevant first.
        handle = Entrez.esearch(db="pubmed", term=query, retmax=str(max_results), sort="relevance")
        record = Entrez.read(handle)
        handle.close()
        ids = record["IdList"]
        if not ids:
            return papers

        # Step 2: efetch pulls the full MEDLINE records for those PMIDs as XML.
        handle = Entrez.efetch(db="pubmed", id=ids, rettype="medline", retmode="xml")
        records = Entrez.read(handle)
        handle.close()

        for pubmed_article in records.get('PubmedArticle', []):
            article = pubmed_article.get('MedlineCitation', {}).get('Article', {})
            title = article.get('ArticleTitle', "N/A")
            # AbstractText is a list of (possibly labelled) sections.
            abstract_text_list = article.get('Abstract', {}).get('AbstractText', [])
            abstract = " ".join(abstract_text_list)[:500] + "..." if abstract_text_list else "N/A"
            year = article.get('Journal', {}).get('JournalIssue', {}).get('PubDate', {}).get('Year', "N/A")
            authors_list = []
            author_info_list = article.get('AuthorList', [])
            for auth in author_info_list:
                if auth.get('LastName') and auth.get('ForeName'):
                    authors_list.append(f"{auth.get('ForeName')} {auth.get('LastName')}")
                elif auth.get('CollectiveName'):
                    # Consortia/working groups carry a collective name instead.
                    authors_list.append(auth.get('CollectiveName'))

            # Pull the DOI (if any) from the ArticleIdList for downstream dedup.
            doi = None
            article_ids = pubmed_article.get('PubmedData', {}).get('ArticleIdList', [])
            for aid in article_ids:
                if aid.attributes.get('IdType') == 'doi':
                    doi = str(aid)
                    break

            pmid = pubmed_article.get('MedlineCitation', {}).get('PMID', None)
            url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" if pmid else "N/A"
            venue = article.get('Journal', {}).get('Title', "N/A")

            papers.append({
                "title": title,
                "authors": _normalize_authors(authors_list, "PubMed"),
                "year": year,
                "abstract": abstract,
                "doi": doi,
                "url": url,
                "venue": venue,
                "source_api": "PubMed"
            })
            if len(papers) >= max_results:
                break

    except Exception as e:
        # Deliberate best-effort: a failed source must not crash the caller.
        pass
    return papers
|
| |
|
def _search_arxiv_internal(query: str, max_results: int = 2) -> list[dict]:
    """Search ArXiv for *query* and return up to *max_results* papers as
    normalised dicts.  Abstracts are flattened to one line and truncated
    to 500 characters.  Best-effort: any failure yields the papers
    collected so far (usually []).
    """
    papers = []
    try:
        search = arxiv.Search(
            query=query,
            max_results=max_results,
            sort_by=arxiv.SortCriterion.Relevance,
        )
        for hit in list(arxiv.Client().results(search)):
            published = getattr(hit, 'published', None)
            one_line_summary = getattr(hit, 'summary', "N/A").replace('\n', ' ')
            papers.append({
                "title": getattr(hit, 'title', "N/A"),
                "authors": _normalize_authors(getattr(hit, 'authors', []), "ArXiv"),
                "year": published.year if published else "N/A",
                "abstract": one_line_summary[:500] + "...",
                "doi": getattr(hit, 'doi', None),
                "url": getattr(hit, 'entry_id', "N/A"),
                "venue": "ArXiv",
                "source_api": "ArXiv"
            })
    except Exception:
        # Deliberate best-effort: a failed source must not crash the caller.
        pass
    return papers
|
| |
|
def multi_source_literature_search(queries: list[str], max_results_per_query_per_source: int = 1, max_total_unique_papers: int = 10) -> list[dict]:
    """Run each query against Semantic Scholar, PubMed and ArXiv and merge
    the results into one deduplicated list of paper dicts.

    Dedup key is the normalised DOI when one is present; otherwise
    "title|first author", or the bare title when no author is known.
    Collection stops as soon as *max_total_unique_papers* unique papers
    have been gathered.
    """
    unique_papers_found_so_far = []
    processed_dois = set()
    # Holds both "title|author" keys and bare-title keys; they cannot
    # collide because the combined form always contains '|'.
    processed_titles_authors = set()

    for query_idx, query_str in enumerate(queries):
        if len(unique_papers_found_so_far) >= max_total_unique_papers:
            break

        current_query_results_from_all_sources = []

        # Each source is skipped once the overall budget is already met
        # (the count is checked against unique papers, pre-dedup of the
        # current batch).
        if len(unique_papers_found_so_far) < max_total_unique_papers:
            s2_results = _search_semanticscholar_internal(query_str, max_results_per_query_per_source)
            current_query_results_from_all_sources.extend(s2_results)

        if len(unique_papers_found_so_far) < max_total_unique_papers:
            pubmed_results = _search_pubmed_internal(query_str, max_results_per_query_per_source)
            current_query_results_from_all_sources.extend(pubmed_results)

        if len(unique_papers_found_so_far) < max_total_unique_papers:
            arxiv_results = _search_arxiv_internal(query_str, max_results_per_query_per_source)
            current_query_results_from_all_sources.extend(arxiv_results)

        for paper in current_query_results_from_all_sources:
            if len(unique_papers_found_so_far) >= max_total_unique_papers:
                break

            is_new_paper = False
            doi = paper.get("doi")
            if doi and doi != "N/A":
                normalized_doi = doi.lower().strip()
                if normalized_doi not in processed_dois:
                    processed_dois.add(normalized_doi)
                    is_new_paper = True
            else:
                # No usable DOI: fall back to title + first author
                # (or title alone when no author is known).
                title = paper.get("title", "").lower().strip()
                first_author_list = paper.get("authors", [])
                first_author = first_author_list[0].lower().strip() if first_author_list and first_author_list[0] != "N/A" else ""
                title_author_key = f"{title}|{first_author}"
                if title and first_author and title_author_key not in processed_titles_authors:
                    processed_titles_authors.add(title_author_key)
                    is_new_paper = True
                elif title and not first_author and title not in processed_titles_authors:
                    processed_titles_authors.add(title)
                    is_new_paper = True

            if is_new_paper:
                unique_papers_found_so_far.append(paper)

        if len(unique_papers_found_so_far) >= max_total_unique_papers:
            break

    final_results = unique_papers_found_so_far[:max_total_unique_papers]

    return final_results
|
| |
|
| |
|
| |
|
| |
|
| |
|
def fetch_text_from_urls(paper_info_list: list[dict], max_chars_per_paper: int = 15000) -> list[dict]:
    """
    Fetch each paper's URL and attach extracted page text to the paper dict.

    Every dict in paper_info_list is mutated in place: a
    "retrieved_text_content" key is added holding either the extracted text
    (possibly truncated) or an "Error: ..." message.  The same dicts are
    returned in a new list, in input order.

    Args:
        paper_info_list: Paper metadata dicts; each should carry an http(s) "url".
        max_chars_per_paper: Hard cap on extracted text length per paper.

    Returns:
        The list of paper dicts, each with "retrieved_text_content" set.
    """
    updated_paper_info_list = []
    # Browser-like User-Agent: many publisher sites reject the default
    # python-requests agent.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }

    for paper in paper_info_list:
        url = paper.get("url")
        retrieved_text = None

        # Reject missing/non-string/non-http URLs before touching the network.
        if not url or not isinstance(url, str) or not url.startswith("http"):
            paper["retrieved_text_content"] = "Error: Invalid or missing URL."
            updated_paper_info_list.append(paper)
            continue

        try:
            response = requests.get(url, headers=headers, timeout=20, allow_redirects=True)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            body_content = soup.find('body')
            if body_content:
                # Strip script/style nodes so get_text() yields prose only.
                for script_or_style in body_content(["script", "style"]):
                    script_or_style.decompose()

                # Prefer article-like containers; the first selector that
                # matches anything wins.  Fall back to the whole body.
                main_article_tags = ['article', 'main', '.main-content', '.article-body', '.abstract']
                extracted_elements = []
                for tag_selector in main_article_tags:
                    elements = body_content.select(tag_selector)
                    if elements:
                        for el in elements:
                            extracted_elements.append(el.get_text(separator=" ", strip=True))
                        break

                if extracted_elements:
                    retrieved_text = " ".join(extracted_elements)
                else:
                    retrieved_text = body_content.get_text(separator=" ", strip=True)
            else:
                retrieved_text = "Error: Could not find body content in HTML."

            if retrieved_text and not retrieved_text.startswith("Error:"):
                # Fix: mark as truncated only when characters were actually
                # dropped.  The previous check (slice, then compare sliced
                # length to the cap) also fired on text of exactly
                # max_chars_per_paper characters.
                if len(retrieved_text) > max_chars_per_paper:
                    retrieved_text = retrieved_text[:max_chars_per_paper] + "... (truncated)"
            elif not retrieved_text:
                retrieved_text = "Error: No text could be extracted."

        except requests.exceptions.RequestException as e:
            retrieved_text = f"Error fetching URL: {str(e)}"
        except Exception as e:
            retrieved_text = f"Error processing HTML: {str(e)}"

        paper["retrieved_text_content"] = retrieved_text
        updated_paper_info_list.append(paper)

    return updated_paper_info_list
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def describe_image(file_id: str, api_key: str = None) -> str:
    """
    Describe an uploaded image or document using OpenAI's vision model.

    Args:
        file_id (str): An OpenAI file ID ("file-...") or a data URL
            ("data:image/..."); bare-looking IDs are prefixed with "file-".
        api_key (str, optional): OpenAI API key; falls back to the
            OPENAI_API_KEY environment variable when omitted.

    Returns:
        str: The model's description, or a string starting with "Error:"
        when something went wrong.
    """
    try:
        # Import locally so a missing package degrades to an error string
        # instead of crashing the caller.
        try:
            from openai import OpenAI
        except ImportError:
            return "Error: The OpenAI module is not installed. Please install it with 'pip install openai'."

        if not file_id or not isinstance(file_id, str):
            return "Error: Invalid file ID format."

        # Work out the image_url payload for the vision request.
        if file_id.startswith("data:image/") or file_id.startswith("file-"):
            image_payload = {"url": file_id}
        elif len(file_id) > 10 and " " not in file_id:
            # Looks like a raw file ID that lost its "file-" prefix.
            print(f"describe_image: received potentially raw file ID '{file_id}', prefixing with 'file-'.")
            image_payload = {"url": f"file-{file_id}"}
        else:
            return f"Error: Invalid file_id format. Must be an OpenAI file ID (file-...) or a data URL (data:image/...). Received: {file_id}"

        # Build the client: explicit key wins; otherwise require the env var.
        if api_key:
            client = OpenAI(api_key=api_key)
        elif os.environ.get('OPENAI_API_KEY'):
            client = OpenAI()
        else:
            return "Error: OpenAI API key not found. Please provide an API key or set the OPENAI_API_KEY environment variable."

        request_messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Please describe this image in detail. If it contains text, transcribe important parts. If it's a scientific figure, explain what it shows. If it's a chart or graph, describe the data visualization and key insights."},
                    {"type": "image_url", "image_url": image_payload}
                ]
            }
        ]

        response = client.chat.completions.create(
            model="gpt-4o",
            messages=request_messages,
            max_tokens=1000,
            temperature=0.2
        )

        # Report token usage to the agent-level collector when one is wired in.
        if hasattr(response, 'usage') and response.usage:
            usage_info = {
                'prompt_tokens': response.usage.prompt_tokens,
                'completion_tokens': response.usage.completion_tokens,
                'total_tokens': response.usage.total_tokens
            }
            import builtins
            if hasattr(builtins, '__agent_usage_collector__'):
                builtins.__agent_usage_collector__.append(usage_info)

        return response.choices[0].message.content

    except Exception as e:
        error_message = f"Error processing image: {str(e)}"
        print(error_message)
        return error_message
|
| |
|
if __name__ == '__main__':
    # Ad-hoc smoke test: run the multi-source literature search, then try to
    # pull page text for the hits.  Requires network access and the optional
    # search backends (semanticscholar, Bio.Entrez, arxiv) to be usable.
    print("Testing Multi-Source Literature Search Tool:")
    test_queries_lit = [
        "novel targets for CAR-T cell therapy in solid tumors",
        "role of microbiota in cancer immunotherapy response",
        "epigenetic regulation of T cell exhaustion"
    ]

    results = multi_source_literature_search(queries=test_queries_lit, max_results_per_query_per_source=1, max_total_unique_papers=2)
    print(f"Found {len(results)} unique papers for text fetching test:")

    if results:
        print("\nTesting Text Fetching from URLs Tool:")
        results_with_text = fetch_text_from_urls(paper_info_list=results, max_chars_per_paper=5000)
        print(f"Processed {len(results_with_text)} papers for text content:")
        for paper_no, paper in enumerate(results_with_text, start=1):
            print(f"--- Paper {paper_no} ---")
            print(f" Title: {paper.get('title')}")
            print(f" URL: {paper.get('url')}")
            text_content = paper.get('retrieved_text_content', 'Not found')
            print(f" Retrieved Text (first 200 chars): {text_content[:200]}...")
            print("\n")
| | |