import pandas as pd import os import json import glob # For os.walk if needed, or can use glob directly import mimetypes from datetime import datetime from functools import lru_cache import time from openai import OpenAI # Add OpenAI import for the describe_image function # --- NEW IMPORTS FOR LITERATURE SEARCH --- from semanticscholar import SemanticScholar # For Semantic Scholar API from Bio import Entrez # For PubMed import arxiv # For ArXiv API # --- END NEW IMPORTS --- # --- NEW IMPORTS FOR TEXT FETCHING FROM URLS --- import requests from bs4 import BeautifulSoup import base64 # For encoding image data import io from PIL import Image # For image processing # --- END NEW IMPORTS FOR TEXT FETCHING --- # --- Define Project Root and WWW Path relative to this file --- # This file is in taijichat/tools/ # Project root is one level up from 'tools' (i.e., 'taijichat/' directory) _TOOLS_DIR = os.path.dirname(os.path.abspath(__file__)) _PROJECT_ROOT = os.path.abspath(os.path.join(_TOOLS_DIR, "..")) BASE_WWW_PATH = os.path.join(_PROJECT_ROOT, "www") # This will be an absolute path UI_TEXTS_FILE = os.path.join(_TOOLS_DIR, "ui_texts.json") # ui_texts.json is in the same directory TF_PAGERANK_PATH = os.path.join(BASE_WWW_PATH, "tablePagerank") WAVE_ANALYSIS_PATH = os.path.join(BASE_WWW_PATH, "waveanalysis") TF_CORR_PATH = os.path.join(BASE_WWW_PATH, "TFcorintextrm") TF_COMMUNITIES_PATH = os.path.join(BASE_WWW_PATH, "tfcommunities") # Cache timeout in seconds CACHE_TIMEOUT = 300 # 5 minutes # Cache for storing data with timestamps _data_cache = {} _cache_timestamps = {} def _is_cache_valid(cache_key): """Check if cached data is still valid based on timeout.""" if cache_key not in _cache_timestamps: return False return (time.time() - _cache_timestamps[cache_key]) < CACHE_TIMEOUT def _get_cached_data(cache_key): """Get data from cache if valid.""" if _is_cache_valid(cache_key): return _data_cache.get(cache_key) return None def _set_cached_data(cache_key, data): """Store data in cache with current timestamp.""" _data_cache[cache_key] = data _cache_timestamps[cache_key] = time.time() # --- Tool Implementations --- @lru_cache(maxsize=32) def get_raw_excel_data(file_path: str) -> list: """ Reads a specified Excel file and returns its raw content as a list of lists. Uses caching to improve performance. """ try: # Check cache first cache_key = f"raw_excel_{file_path}" cached_data = _get_cached_data(cache_key) if cached_data is not None: return cached_data # If not in cache, read file df = pd.read_excel(file_path, header=None) df = df.fillna('') result = df.values.tolist() # Store in cache _set_cached_data(cache_key, result) return result except FileNotFoundError: print(f"Error: File not found at {file_path}") return [] except Exception as e: print(f"Error reading Excel file {file_path}: {e}") return [] def get_processed_tf_data(dataset_identifier: str) -> list: """ Reads and processes a TF-related Excel file identified by its dataset_identifier. Uses caching and timeout handling. """ try: # Check cache first cache_key = f"processed_tf_{dataset_identifier}" cached_data = _get_cached_data(cache_key) if cached_data is not None: return cached_data file_path = get_tf_catalog_dataset_path(dataset_identifier) if not file_path: print(f"Error: Could not resolve dataset_identifier '{dataset_identifier}' to a file path.") return [] if not os.path.exists(file_path): print(f"Error: File not found at resolved path: {file_path}") return [] df = pd.read_excel(file_path) df_transposed = df.transpose() new_headers = df_transposed.iloc[0].tolist() df_processed = df_transposed[1:] df_processed.columns = new_headers df_processed = df_processed.fillna('') result = [new_headers] + df_processed.values.tolist() # Store in cache _set_cached_data(cache_key, result) return result except Exception as e: print(f"Error processing TF data for {dataset_identifier}: {e}") return [] def filter_data_by_column_keywords(dataset: list, keywords: str) -> list: """ Filters a dataset (list of lists, first list is headers) by keywords in column names. The 'dataset' input MUST be the actual data (e.g. output from get_processed_tf_data), not a dataset name or file path. """ if not dataset or not isinstance(dataset, list) or len(dataset) < 1 or not isinstance(dataset[0], list): print("Error: Invalid dataset format for filtering. Must be a list of lists with headers.") return [] headers = dataset[0] # Handle empty dataset (only headers) if len(dataset) == 1: data_rows = [] else: data_rows = dataset[1:] df = pd.DataFrame(data_rows, columns=headers) if not keywords or not keywords.strip(): return dataset keyword_list = [k.strip().lower() for k in keywords.split(',')] matching_columns = [] if df.empty and not data_rows: # Only headers were passed, and they are the only columns for header_col in headers: # Check against original headers directly for keyword in keyword_list: if keyword in str(header_col).lower(): matching_columns.append(header_col) break else: # DataFrame has columns (either from data_rows or just headers if data_rows was empty but df was formed) for header_col in df.columns: for keyword in keyword_list: if keyword in str(header_col).lower(): matching_columns.append(header_col) break if not matching_columns: return [headers] unique_matching_headers = sorted(list(set(matching_columns)), key=lambda x: headers.index(x)) if df.empty: # If the original df was empty (only headers passed, or data was empty) # We've already found the matching headers, just return them return [unique_matching_headers] + [[] for _ in data_rows] # Preserve number of empty data rows if any filtered_df = df[unique_matching_headers] return [unique_matching_headers] + filtered_df.values.tolist() def get_tf_wave_search_data(tf_search_term: str = None) -> dict: """ Reads searchtfwaves.xlsx and filters by TF if a search term is provided. Uses caching for better performance. """ try: # Check cache first cache_key = f"wave_search_{tf_search_term}" cached_data = _get_cached_data(cache_key) if cached_data is not None: return cached_data file_path = os.path.join(WAVE_ANALYSIS_PATH, "searchtfwaves.xlsx") if not os.path.exists(file_path): print(f"Error: TF wave search file not found at {file_path}") return {} df = pd.read_excel(file_path) expected_cols = [f"Wave{i}" for i in range(1, 8)] if not all(col in df.columns for col in expected_cols): if len(df.columns) >= 7: rename_map = {df.columns[i]: expected_cols[i] for i in range(7)} df.rename(columns=rename_map, inplace=True) df = df[expected_cols] else: print(f"Warning: {file_path} does not have at least 7 columns to map to Wave1-7.") df = df.fillna('') if not tf_search_term or not tf_search_term.strip(): result = {col: df[col].tolist() for col in df.columns if col in df} else: result = {} search_term_lower = tf_search_term.strip().lower() for col in df.columns: if col not in df: continue matching_genes = [gene for gene in df[col] if search_term_lower in str(gene).lower()] if matching_genes: result[col] = matching_genes # Store in cache _set_cached_data(cache_key, result) return result except Exception as e: print(f"Error reading or processing wave search data: {e}") return {} def get_tf_correlation_data(tf_name: str = None) -> list: """ Reads TF-TFcorTRMTEX.xlsx and filters by tf_name if provided. Uses caching for better performance. """ try: # Check cache first cache_key = f"correlation_{tf_name}" cached_data = _get_cached_data(cache_key) if cached_data is not None: return cached_data file_path = os.path.join(TF_CORR_PATH, "TF-TFcorTRMTEX.xlsx") if not os.path.exists(file_path): print(f"Error: TF correlation data file not found at {file_path}") return [] df = pd.read_excel(file_path) df = df.fillna('') headers = df.columns.tolist() if not tf_name or not tf_name.strip(): result = [headers] + df.values.tolist() else: tf_name_col = "TF Name" if tf_name_col not in df.columns: if len(df.columns) > 0: tf_name_col = df.columns[0] else: return [headers] filtered_df = df[df[tf_name_col].astype(str).str.lower() == tf_name.strip().lower()] result = [headers] + filtered_df.values.tolist() if not filtered_df.empty else [headers] # Store in cache _set_cached_data(cache_key, result) return result except Exception as e: print(f"Error processing correlation data: {e}") return [] def get_tf_correlation_image_path(tf_name: str) -> str: """ Gets the image path for a TF from the correlation data. """ file_path = os.path.join(TF_CORR_PATH, "TF-TFcorTRMTEX.xlsx") if not os.path.exists(file_path): print(f"Error: TF correlation data file (for image path) not found at {file_path} (Resolved: {os.path.abspath(file_path)})") return "" image_column_name = "TF Merged Graph Path" tf_identifier_column = "TF Name" try: df = pd.read_excel(file_path) if tf_identifier_column not in df.columns: if len(df.columns) > 0: tf_identifier_column = df.columns[0] else: return "" if image_column_name not in df.columns: print(f"Error: Image path column '{image_column_name}' not found in {file_path} (Resolved: {os.path.abspath(file_path)})") return "" row = df[df[tf_identifier_column].astype(str).str.lower() == tf_name.strip().lower()] if not row.empty: image_path_val = row.iloc[0][image_column_name] if pd.notna(image_path_val) and isinstance(image_path_val, str): if not image_path_val.startswith("www/"): return "www/" + image_path_val.lstrip('/') return str(image_path_val) else: return "" else: return "" except FileNotFoundError: print(f"Error: File not found at {file_path} (Resolved: {os.path.abspath(file_path)})") return "" except Exception as e: print(f"Error processing {file_path} for TF {tf_name}: {e} (Resolved: {os.path.abspath(file_path)})") return "" def list_all_tfs_in_correlation_data() -> list: """ Lists all unique TFs from the TF-TF correlation data file. """ file_path = os.path.join(TF_CORR_PATH, "TF-TFcorTRMTEX.xlsx") if not os.path.exists(file_path): print(f"Error: TF correlation data file (for listing TFs) not found at {file_path} (Resolved: {os.path.abspath(file_path)})") return [] tf_identifier_column = "TF Name" try: df = pd.read_excel(file_path) if tf_identifier_column not in df.columns: if not df.empty and len(df.columns) > 0: tf_identifier_column = df.columns[0] else: return [] tf_list = df[tf_identifier_column].astype(str).str.strip().unique().tolist() return [tf for tf in tf_list if tf and tf.lower() != 'nan'] except FileNotFoundError: print(f"Error: File not found at {file_path} (Resolved: {os.path.abspath(file_path)})") return [] except Exception as e: print(f"Error processing {file_path}: {e} (Resolved: {os.path.abspath(file_path)})") return [] def get_tf_community_sheet_data(community_type: str) -> list: """ Reads data from a specific TF community Excel file (trm or texterm). Uses caching for better performance. """ try: # Check cache first cache_key = f"community_{community_type}" cached_data = _get_cached_data(cache_key) if cached_data is not None: return cached_data if community_type.lower() == "trm": file_name = "trmcommunities.xlsx" elif community_type.lower() == "texterm": file_name = "texcommunities.xlsx" else: print(f"Error: Invalid community_type '{community_type}'. Must be 'trm' or 'texterm'.") return [] file_path = os.path.join(TF_COMMUNITIES_PATH, file_name) if not os.path.exists(file_path): print(f"Error: TF community file not found at {file_path}") return [] result = get_raw_excel_data(file_path) # Store in cache _set_cached_data(cache_key, result) return result except Exception as e: print(f"Error processing community data: {e}") return [] # --- Mappings for static info tools --- # STATIC_IMAGE_PATHS stores paths relative to the project root, typically starting with "www/" # These are for constructing URLs or for components that expect paths relative to the web server root ('www'). # When accessed from tools/agent_tools.py, os.path.join(BASE_WWW_PATH, ...) correctly points to the file system location. # The get_static_image_path tool should return the "web path" e.g. "www/images/logo.png" _STATIC_IMAGE_WEB_PATHS = { "home_page_diagram": "www/homedesc.png", "ucsd_logo": "www/ucsdlogo.png", "salk_logo": "www/salklogo.png", "unc_logo": "www/unclogo.jpg", "modal_cs_description_img": "www/csdescrip.jpeg", "tfcat_overview_img": "www/tfcat/onlycellstates.png", "tfcat_multistates_heatmap": "www/tfcat/multistatesheatmap.png", "naive_bubble_plot": "www/bubbleplots/naivebubble.jpg", "te_bubble_plot": "www/bubbleplots/tebubble.jpg", "mp_bubble_plot": "www/bubbleplots/mpbubble.jpg", "tcm_bubble_plot": "www/bubbleplots/tcmbubble.jpg", "tem_bubble_plot": "www/bubbleplots/tembubble.jpg", "trm_bubble_plot": "www/bubbleplots/trmbubble.jpg", "texprog_bubble_plot": "www/bubbleplots/texprogbubble.jpg", "texefflike_bubble_plot": "www/bubbleplots/texintbubble.jpg", "texterm_bubble_plot": "www/bubbleplots/textermbubble.jpg", "wave_analysis_overview_diagram": "www/waveanalysis/tfwaveanal.png", "wave1_main_img": "www/waveanalysis/c1.jpg", "wave1_gokegg_img": "www/waveanalysis/c1_selected_GO_KEGG.jpg", "wave1_ranked_text1_img": "www/waveanalysis/txtJPG/c1_ranked_1.jpg", "wave1_ranked_text2_img": "www/waveanalysis/txtJPG/c1_ranked_2.jpg", "wave2_main_img": "www/waveanalysis/c2.jpg", "wave2_gokegg_img": "www/waveanalysis/c2_selected_GO_KEGG_v2.jpg", "wave2_ranked_text_img": "www/waveanalysis/txtJPG/c2_ranked.jpg", # ... (add all other wave images similarly, ensuring paths start with "www/") ... "wave3_main_img": "www/waveanalysis/c3.jpg", "wave3_gokegg_img": "www/waveanalysis/c3_selected_GO_KEGG.jpg", "wave3_ranked_text_img": "www/waveanalysis/txtJPG/c3_ranked.jpg", "wave4_main_img": "www/waveanalysis/c4.jpg", "wave4_gokegg_img": "www/waveanalysis/c4_selected_GO_KEGG.jpg", "wave4_ranked_text_img": "www/waveanalysis/txtJPG/c4_ranked.jpg", "wave5_main_img": "www/waveanalysis/c5.jpg", "wave5_gokegg_img": "www/waveanalysis/c5_selected_GO_KEGG.jpg", "wave5_ranked_text_img": "www/waveanalysis/txtJPG/c5_ranked.jpg", "wave6_main_img": "www/waveanalysis/c6.jpg", "wave6_gokegg_img": "www/waveanalysis/c6_selected_GO_KEGG.jpg", "wave6_ranked_text_img": "www/waveanalysis/txtJPG/c6_ranked.jpg", "wave7_main_img": "www/waveanalysis/c7.jpg", "wave7_gokegg_img": "www/waveanalysis/c7_selected_GO_KEGG.jpg", "wave7_ranked_text_img": "www/waveanalysis/txtJPG/c7_ranked.jpg", "network_correlation_desc_img": "www/networkanalysis/tfcorrdesc.png", "network_community_overview_img": "www/networkanalysis/community.jpg", "network_trmtex_community_comparison_img": "www/networkanalysis/trmtexcom.png", "network_community_pathway_img": "www/networkanalysis/tfcompathway.png", } # Global variable to cache loaded UI texts _loaded_ui_texts = None def _load_ui_texts(): """Helper function to load UI texts from JSON file (now in tools/ directory).""" global _loaded_ui_texts if _loaded_ui_texts is None: try: # UI_TEXTS_FILE is already defined as "./ui_texts.json" or "ui_texts.json" # so it's relative to this agent_tools.py file in the tools/ directory. with open(UI_TEXTS_FILE, 'r', encoding='utf-8') as f: _loaded_ui_texts = json.load(f) except FileNotFoundError: print(f"Error: UI texts file not found at {os.path.abspath(UI_TEXTS_FILE)}") _loaded_ui_texts = {} except json.JSONDecodeError: print(f"Error: Could not decode JSON from {UI_TEXTS_FILE}") _loaded_ui_texts = {} except Exception as e: print(f"An unexpected error occurred while loading {UI_TEXTS_FILE}: {e}") _loaded_ui_texts = {} return _loaded_ui_texts INTERNAL_NAVIGATION_TARGETS = { "to_tfcat": "Navigates to the 'TF Catalog > Search TF Scores' tab.", "to_tfwave": "Navigates to the 'TF Wave Analysis > Overview' tab.", "to_tfnet": "Navigates to the 'TF Network Analysis > Search TF-TF correlation in TRM/TEXterm' tab.", "c1_link": "Navigates to the 'TF Wave Analysis > Wave 1' tab.", "c2_link": "Navigates to the 'TF Wave Analysis > Wave 2' tab.", "c3_link": "Navigates to the 'TF Wave Analysis > Wave 3' tab.", "c4_link": "Navigates to the 'TF Wave Analysis > Wave 4' tab.", "c5_link": "Navigates to the 'TF Wave Analysis > Wave 5' tab.", "c6_link": "Navigates to the 'TF Wave Analysis > Wave 6' tab.", "c7_link": "Navigates to the 'TF Wave Analysis > Wave 7' tab.", } def get_static_image_path(image_identifier: str) -> str: """ Returns the predefined relative web path (e.g., "www/images/logo.png") for a known static image asset. """ return _STATIC_IMAGE_WEB_PATHS.get(image_identifier.lower(), "") def get_ui_descriptive_text(text_identifier: str) -> str: """ Retrieves predefined descriptive text or methodology explanations from ui_texts.json. """ texts = _load_ui_texts() processed_text_identifier = text_identifier.lower() if processed_text_identifier.startswith("wave_") and processed_text_identifier.endswith("_analysis_placeholder_details"): try: generic_wave_key = "wave_x_analysis_placeholder_details" if generic_wave_key in texts: wave_num_str = processed_text_identifier.split("_")[1] return texts[generic_wave_key].replace("{X}", wave_num_str) else: return texts.get(processed_text_identifier, "") except Exception as e: print(f"Error processing placeholder for {text_identifier}: {e}") return texts.get(processed_text_identifier, "") return texts.get(processed_text_identifier, "") def list_available_tf_catalog_datasets() -> list: """ Returns a list of identifiers for available TF catalog datasets. These identifiers are used with get_processed_tf_data. """ return [ "Overall_TF_PageRank", "Naive", "TE", "MP", "TCM", "TEM", "TRM", "TEXprog", "TEXeff", "TEXterm" ] def get_tf_catalog_dataset_path(dataset_identifier: str) -> str: """ Helper to get the actual file system path for a TF catalog dataset identifier. Paths are constructed relative to BASE_WWW_PATH (which is ../www from this file's location). This function is mostly for internal use by get_processed_tf_data. """ mapping = { "overall_tf_pagerank": os.path.join(TF_PAGERANK_PATH, "Table_TF PageRank Scores for Audrey.xlsx"), "naive": os.path.join(TF_PAGERANK_PATH, "Naive.xlsx"), "te": os.path.join(TF_PAGERANK_PATH, "TE.xlsx"), "mp": os.path.join(TF_PAGERANK_PATH, "MP.xlsx"), "tcm": os.path.join(TF_PAGERANK_PATH, "TCM.xlsx"), "tem": os.path.join(TF_PAGERANK_PATH, "TEM.xlsx"), "trm": os.path.join(TF_PAGERANK_PATH, "TRM.xlsx"), "texprog": os.path.join(TF_PAGERANK_PATH, "TEXprog.xlsx"), "texeff": os.path.join(TF_PAGERANK_PATH, "TEXeff.xlsx"), "texterm": os.path.join(TF_PAGERANK_PATH, "TEXterm.xlsx"), } return mapping.get(dataset_identifier.lower(), "") def list_available_cell_state_bubble_plots() -> list: """ Returns a list of identifiers for available cell-state specific bubble plot images. These identifiers can be used with get_static_image_path. """ return [key for key in _STATIC_IMAGE_WEB_PATHS if "bubble_plot" in key] def list_available_wave_analysis_assets(wave_number: int) -> dict: """ Returns a structured list of available asset identifiers for a TF wave. Identifiers can be used with get_static_image_path. """ if not 1 <= wave_number <= 7: return {} assets = { "main_image_id": f"wave{wave_number}_main_img", "gokegg_image_id": f"wave{wave_number}_gokegg_img" } # Adjust for wave2 gokegg specific key if STATIC_IMAGE_WEB_PATHS uses the v2 name in its key if wave_number == 2 and "wave2_gokegg_img_v2" in _STATIC_IMAGE_WEB_PATHS: # Example if key was specific assets["gokegg_image_id"] = "wave2_gokegg_img_v2" elif wave_number == 2 and _STATIC_IMAGE_WEB_PATHS.get(f"wave{wave_number}_gokegg_img","").endswith("_v2.jpg"): pass # The key wave2_gokegg_img already points to the v2 file. if wave_number == 1: assets["ranked_text_image_ids"] = [ f"wave{wave_number}_ranked_text1_img", f"wave{wave_number}_ranked_text2_img" ] else: # Waves 2-7 have one ranked text image assets["ranked_text_image_ids"] = [f"wave{wave_number}_ranked_text_img"] # Verify these identifiers exist in _STATIC_IMAGE_WEB_PATHS if assets["main_image_id"] not in _STATIC_IMAGE_WEB_PATHS: assets["main_image_id"] = None if assets["gokegg_image_id"] not in _STATIC_IMAGE_WEB_PATHS: assets["gokegg_image_id"] = None valid_ranked_ids = [] for r_id in assets.get("ranked_text_image_ids", []): if r_id in _STATIC_IMAGE_WEB_PATHS: valid_ranked_ids.append(r_id) assets["ranked_text_image_ids"] = valid_ranked_ids if not assets["ranked_text_image_ids"]: del assets["ranked_text_image_ids"] return {k:v for k,v in assets.items() if v is not None} def get_internal_navigation_info(link_id: str) -> str: """ Provides information about where an internal UI link is intended to navigate. """ return INTERNAL_NAVIGATION_TARGETS.get(link_id.lower(), "Navigation target not defined for this link ID.") def get_biorxiv_paper_url() -> str: """ Returns the URL for the main bioRxiv paper. """ return "https://doi.org/10.1101/2023.01.03.522354" # --- New Tool for Schema Discovery --- def discover_excel_files_and_schemas(base_scan_directory_name: str = "www") -> dict: """ Discovers Excel files (.xlsx) within a specified base directory (relative to project root), extracts their column headers, and returns a schema dictionary. Example base_scan_directory_name: "www" """ discovered_schema = {} # _PROJECT_ROOT should be defined earlier in the file as the absolute path to the project root. # Example: _PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) scan_root_abs = os.path.join(_PROJECT_ROOT, base_scan_directory_name) if not os.path.isdir(scan_root_abs): print(f"Error: Base directory for schema discovery not found: {scan_root_abs}") return {} for dirpath, _, filenames in os.walk(scan_root_abs): for filename in filenames: if filename.endswith(".xlsx") and not filename.startswith("~"): # Ignore temp Excel files file_abs_path = os.path.join(dirpath, filename) # Make file_path relative to project root for the schema key and value file_rel_path = os.path.relpath(file_abs_path, _PROJECT_ROOT).replace("\\\\", "/") table_identifier = os.path.splitext(filename)[0].replace("-", "_").replace(" ", "_") try: # Read only the header row to get column names (efficient) # Assuming data is on the first sheet by default. xls = pd.ExcelFile(file_abs_path) if not xls.sheet_names: # Check if there are any sheets print(f"[Schema Discovery] Warning: No sheets found in {file_abs_path}") columns = [] else: # Use the first sheet name first_sheet_name = xls.sheet_names[0] df_header = pd.read_excel(xls, sheet_name=first_sheet_name, nrows=0) # Read 0 rows to just get headers columns = [str(col) for col in df_header.columns.tolist()] # Ensure columns are strings discovered_schema[file_rel_path] = { "file_path": file_rel_path, "table_identifier": table_identifier, "columns": columns, "sheets": xls.sheet_names if xls.sheet_names else [], # Store all sheet names "last_modified": datetime.now().isoformat(), "file_size_bytes": os.path.getsize(file_abs_path), "error": None } except Exception as e: print(f"[Schema Discovery] Error reading or processing headers for {file_abs_path}: {e}") discovered_schema[file_rel_path] = { "file_path": file_rel_path, "table_identifier": table_identifier, "columns": [], "sheets": [], "error": str(e) } if not discovered_schema: print(f"[Schema Discovery] No Excel files found in {scan_root_abs}") return discovered_schema # --- New Tool for Listing All Files in WWW --- def list_all_files_in_www_directory() -> list: """ Scans the entire BASE_WWW_PATH directory (and its subdirectories) and returns a list of dictionaries, each representing a file with its relative path from the project root, detected MIME type (best guess), and size in bytes. Excludes common hidden/system files like .DS_Store. """ file_manifest = [] # _PROJECT_ROOT and BASE_WWW_PATH should be defined at the top of the file # BASE_WWW_PATH is already an absolute path to the www directory if not os.path.isdir(BASE_WWW_PATH): print(f"Error: WWW directory for file listing not found: {BASE_WWW_PATH}") return [] # Common hidden/system files/folders to ignore ignore_list = [".DS_Store", "Thumbs.db"] ignore_prefixes = ["._"] # For macOS resource fork files for dirpath, dirnames, filenames in os.walk(BASE_WWW_PATH): # Optionally filter dirnames to prevent descending into certain hidden folders if needed # For now, walking all non-hidden listed above. for filename in filenames: if filename in ignore_list or any(filename.startswith(p) for p in ignore_prefixes): continue file_abs_path = os.path.join(dirpath, filename) # Make file_path relative to project root for the manifest file_rel_path_from_project_root = os.path.relpath(file_abs_path, _PROJECT_ROOT).replace("\\", "/") try: file_size = os.path.getsize(file_abs_path) # Guess MIME type # Ensure mimetypes is imported at the top of the file if not mimetypes.inited: mimetypes.init() # Initialize mimetypes if not already done mime_type, _ = mimetypes.guess_type(file_abs_path) if mime_type is None: # Basic fallback based on extension for common types if mimetypes fails ext = os.path.splitext(filename)[1].lower() if ext == ".txt" or ext == ".md": mime_type = "text/plain" elif ext == ".csv": mime_type = "text/csv" elif ext == ".json": mime_type = "application/json" else: mime_type = "application/octet-stream" # Generic binary file_manifest.append({ "path": file_rel_path_from_project_root, "type": mime_type, "size": file_size, "last_modified": datetime.now().isoformat(), "error": None }) except FileNotFoundError: # Should not happen if os.walk found it, but as a safeguard print(f"[File Manifest] Warning: File {file_abs_path} found by os.walk but then not accessible for size/type.") continue except Exception as e: print(f"[File Manifest] Error processing file {file_abs_path}: {e}") # Optionally add an error entry to the manifest for this file file_manifest.append({ "path": file_rel_path_from_project_root, "type": "unknown/error", "size": 0, "last_modified": datetime.now().isoformat(), "error": str(e) }) return file_manifest # --- START: Literature Search Tool Implementation --- def _normalize_authors(authors_data, source="Unknown"): """Helper to normalize author lists from different APIs.""" if not authors_data: return ["N/A"] if source == "SemanticScholar": # List of dicts with 'name' key return [author.get('name', "N/A") for author in authors_data] if source == "PubMed": # List of strings return authors_data if source == "ArXiv": # List of arxiv.Result.Author objects return [author.name for author in authors_data] return [str(a) for a in authors_data] # Generic fallback def _search_semanticscholar_internal(query: str, max_results: int = 2) -> list[dict]: papers = [] # print(f"[Tool:_search_semanticscholar_internal] Querying Semantic Scholar for: '{query}' (max: {max_results})") # COMMENTED OUT try: s2 = SemanticScholar(timeout=15) # Corrected: 'doi' is not a direct field for search_paper, 'externalIds' should be used. results = s2.search_paper(query, limit=max_results, fields=['title', 'authors', 'year', 'abstract', 'url', 'venue', 'externalIds']) if results and results.items: for item in results.items: doi_val = item.externalIds.get('DOI') if item.externalIds else None papers.append({ "title": getattr(item, 'title', "N/A"), "authors": _normalize_authors(getattr(item, 'authors', []), "SemanticScholar"), "year": getattr(item, 'year', "N/A"), "abstract": getattr(item, 'abstract', "N/A")[:500] + "..." if getattr(item, 'abstract', None) else "N/A", "doi": doi_val, # Use the extracted DOI "url": getattr(item, 'url', "N/A"), "venue": getattr(item, 'venue', "N/A"), "source_api": "Semantic Scholar" }) except Exception as e: # This print goes to stderr if run directly, but might still be captured by a simple exec context. # For agent integration, actual errors should be raised or returned structured. # For now, we'll assume ManagerAgent's error handling for the overall tool call is preferred. # Let's comment this out for now to ensure no stdout interference. # print(f"[Tool:_search_semanticscholar_internal] Error: {e}", file=sys.stderr) pass # Allow the function to return an empty list on error. return papers def _search_pubmed_internal(query: str, max_results: int = 2) -> list[dict]: papers = [] # print(f"[Tool:_search_pubmed_internal] Querying PubMed for: '{query}' (max: {max_results})") # COMMENTED OUT try: handle = Entrez.esearch(db="pubmed", term=query, retmax=str(max_results), sort="relevance") record = Entrez.read(handle) handle.close() ids = record["IdList"] if not ids: return papers handle = Entrez.efetch(db="pubmed", id=ids, rettype="medline", retmode="xml") records = Entrez.read(handle) # This is MedlineParser.parse, returns a generator usually handle.close() for pubmed_article in records.get('PubmedArticle', []): # records is a list of dicts if multiple ids article = pubmed_article.get('MedlineCitation', {}).get('Article', {}) title = article.get('ArticleTitle', "N/A") abstract_text_list = article.get('Abstract', {}).get('AbstractText', []) abstract = " ".join(abstract_text_list)[:500] + "..." if abstract_text_list else "N/A" year = article.get('Journal', {}).get('JournalIssue', {}).get('PubDate', {}).get('Year', "N/A") authors_list = [] author_info_list = article.get('AuthorList', []) for auth in author_info_list: if auth.get('LastName') and auth.get('ForeName'): authors_list.append(f"{auth.get('ForeName')} {auth.get('LastName')}") elif auth.get('CollectiveName'): authors_list.append(auth.get('CollectiveName')) doi = None article_ids = pubmed_article.get('PubmedData', {}).get('ArticleIdList', []) for aid in article_ids: if aid.attributes.get('IdType') == 'doi': doi = str(aid) # The content of the tag is the DOI break pmid = pubmed_article.get('MedlineCitation', {}).get('PMID', None) url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" if pmid else "N/A" venue = article.get('Journal', {}).get('Title', "N/A") papers.append({ "title": title, "authors": _normalize_authors(authors_list, "PubMed"), "year": year, "abstract": abstract, "doi": doi, "url": url, "venue": venue, "source_api": "PubMed" }) if len(papers) >= max_results: # Ensure we don't exceed due to structure of efetch break except Exception as e: # print(f"[Tool:_search_pubmed_internal] Error: {e}", file=sys.stderr) # COMMENTED OUT pass return papers def _search_arxiv_internal(query: str, max_results: int = 2) -> list[dict]: papers = [] # print(f"[Tool:_search_arxiv_internal] Querying ArXiv for: '{query}' (max: {max_results})") # COMMENTED OUT try: search = arxiv.Search( query = query, max_results = max_results, sort_by = arxiv.SortCriterion.Relevance ) results = list(arxiv.Client().results(search)) # Convert generator to list for result in results: papers.append({ "title": getattr(result, 'title', "N/A"), "authors": _normalize_authors(getattr(result, 'authors', []), "ArXiv"), "year": getattr(result, 'published').year if getattr(result, 'published', None) else "N/A", "abstract": getattr(result, 'summary', "N/A").replace('\n', ' ')[:500] + "...", # ArXiv abstracts can have newlines "doi": getattr(result, 'doi', None), "url": getattr(result, 'entry_id', "N/A"), # entry_id is the ArXiv URL like http://arxiv.org/abs/xxxx.xxxxx "venue": "ArXiv", # ArXiv is the venue "source_api": "ArXiv" }) except Exception as e: # print(f"[Tool:_search_arxiv_internal] Error: {e}", file=sys.stderr) # COMMENTED OUT pass return papers def multi_source_literature_search(queries: list[str], max_results_per_query_per_source: int = 1, max_total_unique_papers: int = 10) -> list[dict]: # print(f"[Tool:multi_source_literature_search] Received {len(queries)} queries. Max results per source/query: {max_results_per_query_per_source}, Max total unique papers: {max_total_unique_papers}") # COMMENTED OUT unique_papers_found_so_far = [] processed_dois = set() processed_titles_authors = set() for query_idx, query_str in enumerate(queries): if len(unique_papers_found_so_far) >= max_total_unique_papers: # print(f" Max total unique papers ({max_total_unique_papers}) reached. Stopping query processing early.") # COMMENTED OUT break # print(f" Processing query {query_idx+1}/{len(queries)}: '{query_str}'") # COMMENTED OUT current_query_results_from_all_sources = [] # Semantic Scholar if len(unique_papers_found_so_far) < max_total_unique_papers: s2_results = _search_semanticscholar_internal(query_str, max_results_per_query_per_source) current_query_results_from_all_sources.extend(s2_results) # PubMed if len(unique_papers_found_so_far) < max_total_unique_papers: pubmed_results = _search_pubmed_internal(query_str, max_results_per_query_per_source) current_query_results_from_all_sources.extend(pubmed_results) # ArXiv if len(unique_papers_found_so_far) < max_total_unique_papers: arxiv_results = _search_arxiv_internal(query_str, max_results_per_query_per_source) current_query_results_from_all_sources.extend(arxiv_results) # De-duplicate results from current_query_results_from_all_sources and add to unique_papers_found_so_far for paper in current_query_results_from_all_sources: if len(unique_papers_found_so_far) >= max_total_unique_papers: break is_new_paper = False doi = paper.get("doi") if doi and doi != "N/A": normalized_doi = doi.lower().strip() if normalized_doi not in processed_dois: processed_dois.add(normalized_doi) is_new_paper = True else: # Fallback to title + first author title = paper.get("title", "").lower().strip() first_author_list = paper.get("authors", []) first_author = first_author_list[0].lower().strip() if first_author_list and first_author_list[0] != "N/A" else "" title_author_key = f"{title}|{first_author}" if title and first_author and title_author_key not in processed_titles_authors: processed_titles_authors.add(title_author_key) is_new_paper = True elif title and not first_author and title not in processed_titles_authors: processed_titles_authors.add(title) is_new_paper = True if is_new_paper: unique_papers_found_so_far.append(paper) if len(unique_papers_found_so_far) >= max_total_unique_papers: # print(f" Max total unique papers ({max_total_unique_papers}) reached after processing query {query_idx+1}.") # COMMENTED OUT break final_results = unique_papers_found_so_far[:max_total_unique_papers] # print(f"[Tool:multi_source_literature_search] Total unique papers found (capped at {max_total_unique_papers}): {len(final_results)}") # COMMENTED OUT return final_results # --- END: Literature Search Tool Implementation --- # --- START: Text Fetching from URLs Tool Implementation --- def fetch_text_from_urls(paper_info_list: list[dict], max_chars_per_paper: int = 15000) -> list[dict]: # print(f"[Tool:fetch_text_from_urls] Attempting to fetch text for {len(paper_info_list)} papers.") # COMMENTED OUT updated_paper_info_list = [] headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } for paper in paper_info_list: url = paper.get("url") retrieved_text = None source_api = paper.get("source_api", "Unknown") # Get source for potential specific handling if not url or not isinstance(url, str) or not url.startswith("http"): retrieved_text = "Error: Invalid or missing URL." paper["retrieved_text_content"] = retrieved_text updated_paper_info_list.append(paper) # print(f" Skipping paper '{paper.get('title', 'N/A')}' due to invalid URL: {url}") # COMMENTED OUT continue # print(f" Fetching text for: '{paper.get('title', 'N/A')}' from {url[:70]}...") # COMMENTED OUT try: response = requests.get(url, headers=headers, timeout=20, allow_redirects=True) response.raise_for_status() # Raise an exception for HTTP errors soup = BeautifulSoup(response.content, 'html.parser') # Basic text extraction - attempt to find common article body tags or just get all text # This will need refinement for specific site structures (e.g., arXiv, PubMed Central) # For now, a general approach: body_content = soup.find('body') if body_content: # Remove script and style tags for script_or_style in body_content(["script", "style"]): script_or_style.decompose() # Try to get main article content if common tags exist main_article_tags = ['article', 'main', '.main-content', '.article-body', '.abstract'] # Add more specific selectors extracted_elements = [] for tag_selector in main_article_tags: elements = body_content.select(tag_selector) if elements: for el in elements: extracted_elements.append(el.get_text(separator=" ", strip=True)) break # Found a primary content block, assume this is good enough if extracted_elements: retrieved_text = " ".join(extracted_elements) else: retrieved_text = body_content.get_text(separator=" ", strip=True) else: retrieved_text = "Error: Could not find body content in HTML." if retrieved_text and not retrieved_text.startswith("Error:"): retrieved_text = retrieved_text[:max_chars_per_paper] if len(retrieved_text) == max_chars_per_paper: retrieved_text += "... (truncated)" # print(f" Successfully extracted ~{len(retrieved_text)} chars.") # COMMENTED OUT elif not retrieved_text: retrieved_text = "Error: No text could be extracted." except requests.exceptions.RequestException as e: retrieved_text = f"Error fetching URL: {str(e)}" # print(f" Error fetching URL {url}: {e}") # COMMENTED OUT except Exception as e: retrieved_text = f"Error processing HTML: {str(e)}" # print(f" Error processing HTML for {url}: {e}") # COMMENTED OUT paper["retrieved_text_content"] = retrieved_text updated_paper_info_list.append(paper) # Optional: add a small delay between requests if fetching from many URLs # time.sleep(0.25) # print(f"[Tool:fetch_text_from_urls] Finished fetching text for {len(updated_paper_info_list)} papers.") # COMMENTED OUT return updated_paper_info_list # --- END: Text Fetching from URLs Tool Implementation --- # Example of how GenerationAgent would call this tool: # Assume 'list_of_papers_from_search' is the output from multi_source_literature_search # print(json.dumps({'intermediate_data_for_llm': fetch_text_from_urls(paper_info_list=list_of_papers_from_search, max_chars_per_paper=10000)})) def describe_image(file_id: str, api_key: str = None) -> str: """ Process an uploaded image or document file and return a description using OpenAI's Vision model. Args: file_id (str): OpenAI file ID (starts with "file-") api_key (str, optional): OpenAI API key. If not provided, will try to use OPENAI_API_KEY environment variable. Returns: str: A detailed description of the image or document content """ try: # First check if OpenAI module is available try: from openai import OpenAI except ImportError: return "Error: The OpenAI module is not installed. Please install it with 'pip install openai'." if not file_id or not isinstance(file_id, str): return "Error: Invalid file ID format." # Determine the image_url based on file_id format image_url_to_use = {} if file_id.startswith("data:image/"): image_url_to_use = {"url": file_id} elif file_id.startswith("file-"): image_url_to_use = {"url": file_id} # Current behavior, might work for actual images else: # Try to prefix with "file-" if it's a raw ID without it. # This is a common case if GA forgets to add it from user input. if len(file_id) > 10 and not file_id.startswith("file-") and " " not in file_id: # basic check for a raw ID print(f"describe_image: received potentially raw file ID '{file_id}', prefixing with 'file-'.") image_url_to_use = {"url": f"file-{file_id}"} else: return f"Error: Invalid file_id format. Must be an OpenAI file ID (file-...) or a data URL (data:image/...). Received: {file_id}" # Create an OpenAI client with explicit API key if provided if api_key: client = OpenAI(api_key=api_key) else: # Try to use environment variable if not os.environ.get('OPENAI_API_KEY'): return "Error: OpenAI API key not found. Please provide an API key or set the OPENAI_API_KEY environment variable." client = OpenAI() # Prepare the message with the image messages = [ { "role": "user", "content": [ {"type": "text", "text": "Please describe this image in detail. If it contains text, transcribe important parts. If it's a scientific figure, explain what it shows. If it's a chart or graph, describe the data visualization and key insights."}, {"type": "image_url", "image_url": image_url_to_use} ] } ] # Call the Vision API response = client.chat.completions.create( model="gpt-4o", # Model with vision capabilities messages=messages, max_tokens=1000, temperature=0.2 # Lower temperature for more accurate descriptions ) # Capture token usage if hasattr(response, 'usage') and response.usage: usage_info = { 'prompt_tokens': response.usage.prompt_tokens, 'completion_tokens': response.usage.completion_tokens, 'total_tokens': response.usage.total_tokens } # Store usage in global collector if available (set by ExecutorAgent) import builtins if hasattr(builtins, '__agent_usage_collector__'): builtins.__agent_usage_collector__.append(usage_info) # Extract the description from the response description = response.choices[0].message.content return description except Exception as e: # Return a detailed error message for debugging error_message = f"Error processing image: {str(e)}" print(error_message) # Log the error for server-side debugging return error_message if __name__ == '__main__': # Test basic Excel schema discovery # print("Testing Excel Schema Discovery:") # schemas = discover_excel_files_and_schemas(base_scan_directory_name="www") # print(json.dumps(schemas, indent=2)) # print("\n") # Test WWW file manifest # print("Testing WWW File Manifest:") # manifest = list_all_files_in_www_directory(base_directory_name="www") # print(json.dumps(manifest, indent=2)) # print("\n") # --- Test Literature Search --- print("Testing Multi-Source Literature Search Tool:") test_queries_lit = [ "novel targets for CAR-T cell therapy in solid tumors", "role of microbiota in cancer immunotherapy response", "epigenetic regulation of T cell exhaustion" ] # To see output like GenerationAgent expects: # search_results_for_llm = {"intermediate_data_for_llm": multi_source_literature_search(queries=test_queries_lit, max_results_per_query_per_source=1)} # print(json.dumps(search_results_for_llm, indent=2)) # Simpler print for direct tool test: results = multi_source_literature_search(queries=test_queries_lit, max_results_per_query_per_source=1, max_total_unique_papers=2) # Fetch 2 papers for testing text fetch print(f"Found {len(results)} unique papers for text fetching test:") # print(json.dumps(results, indent=2)) if results: print("\nTesting Text Fetching from URLs Tool:") # To see output like GenerationAgent expects for LLM: # fetched_text_data_for_llm = {"intermediate_data_for_llm": fetch_text_from_urls(paper_info_list=results, max_chars_per_paper=5000)} # print(json.dumps(fetched_text_data_for_llm, indent=2)) # Simpler print for direct tool test: results_with_text = fetch_text_from_urls(paper_info_list=results, max_chars_per_paper=5000) print(f"Processed {len(results_with_text)} papers for text content:") for i, paper in enumerate(results_with_text): print(f"--- Paper {i+1} ---") print(f" Title: {paper.get('title')}") print(f" URL: {paper.get('url')}") text_content = paper.get('retrieved_text_content', 'Not found') print(f" Retrieved Text (first 200 chars): {text_content[:200]}...") print("\n") # No __main__ block here, this is a module of tools.