# taijichat/tools/agent_tools.py
# (Hugging Face commit 8d66edb: "Add authentication, token quota tracking,
# and comprehensive usage logging")
import pandas as pd
import os
import json
import glob # For os.walk if needed, or can use glob directly
import mimetypes
from datetime import datetime
from functools import lru_cache
import time
from openai import OpenAI # Add OpenAI import for the describe_image function
# --- NEW IMPORTS FOR LITERATURE SEARCH ---
from semanticscholar import SemanticScholar # For Semantic Scholar API
from Bio import Entrez # For PubMed
import arxiv # For ArXiv API
# --- END NEW IMPORTS ---
# --- NEW IMPORTS FOR TEXT FETCHING FROM URLS ---
import requests
from bs4 import BeautifulSoup
import base64 # For encoding image data
import io
from PIL import Image # For image processing
# --- END NEW IMPORTS FOR TEXT FETCHING ---
# --- Define Project Root and WWW Path relative to this file ---
# This file is in taijichat/tools/
# Project root is one level up from 'tools' (i.e., 'taijichat/' directory)
_TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
_PROJECT_ROOT = os.path.abspath(os.path.join(_TOOLS_DIR, ".."))
BASE_WWW_PATH = os.path.join(_PROJECT_ROOT, "www") # This will be an absolute path
UI_TEXTS_FILE = os.path.join(_TOOLS_DIR, "ui_texts.json") # ui_texts.json is in the same directory
TF_PAGERANK_PATH = os.path.join(BASE_WWW_PATH, "tablePagerank")
WAVE_ANALYSIS_PATH = os.path.join(BASE_WWW_PATH, "waveanalysis")
TF_CORR_PATH = os.path.join(BASE_WWW_PATH, "TFcorintextrm")
TF_COMMUNITIES_PATH = os.path.join(BASE_WWW_PATH, "tfcommunities")
# Cache timeout in seconds
CACHE_TIMEOUT = 300 # 5 minutes
# Cache for storing data with timestamps
_data_cache = {}
_cache_timestamps = {}
def _is_cache_valid(cache_key):
"""Check if cached data is still valid based on timeout."""
if cache_key not in _cache_timestamps:
return False
return (time.time() - _cache_timestamps[cache_key]) < CACHE_TIMEOUT
def _get_cached_data(cache_key):
"""Get data from cache if valid."""
if _is_cache_valid(cache_key):
return _data_cache.get(cache_key)
return None
def _set_cached_data(cache_key, data):
"""Store data in cache with current timestamp."""
_data_cache[cache_key] = data
_cache_timestamps[cache_key] = time.time()
# --- Tool Implementations ---
def get_raw_excel_data(file_path: str) -> list:
    """
    Read the Excel file at *file_path* and return its full contents
    (no header interpretation) as a list of row lists, with NaN cells
    replaced by empty strings.

    Results are stored in the module TTL cache so the file is re-read
    at most once per CACHE_TIMEOUT.  Returns [] on any error.
    """
    # FIX: the previous version was additionally wrapped in
    # functools.lru_cache, which never expires.  That made the TTL cache
    # below dead code and meant an edited workbook was never re-read for
    # the lifetime of the process.  The decorator has been removed so
    # CACHE_TIMEOUT is actually honored.
    try:
        cache_key = f"raw_excel_{file_path}"
        cached_data = _get_cached_data(cache_key)
        if cached_data is not None:
            return cached_data
        df = pd.read_excel(file_path, header=None)
        df = df.fillna('')
        result = df.values.tolist()
        _set_cached_data(cache_key, result)
        return result
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return []
    except Exception as e:
        print(f"Error reading Excel file {file_path}: {e}")
        return []
def get_processed_tf_data(dataset_identifier: str) -> list:
    """
    Load a TF-catalog Excel file by its dataset identifier and return it
    transposed, as [headers] + data rows, with NaN cells as ''.

    Results are served from the module TTL cache when fresh.  Returns []
    on any error (unresolvable identifier, missing file, read failure).
    """
    try:
        cache_key = f"processed_tf_{dataset_identifier}"
        hit = _get_cached_data(cache_key)
        if hit is not None:
            return hit
        file_path = get_tf_catalog_dataset_path(dataset_identifier)
        if not file_path:
            print(f"Error: Could not resolve dataset_identifier '{dataset_identifier}' to a file path.")
            return []
        if not os.path.exists(file_path):
            print(f"Error: File not found at resolved path: {file_path}")
            return []
        # The workbook is stored with TFs as columns; transpose so the
        # first transposed row becomes the header row.
        frame = pd.read_excel(file_path).transpose()
        header_row = frame.iloc[0].tolist()
        body = frame[1:]
        body.columns = header_row
        body = body.fillna('')
        table = [header_row] + body.values.tolist()
        _set_cached_data(cache_key, table)
        return table
    except Exception as e:
        print(f"Error processing TF data for {dataset_identifier}: {e}")
        return []
def filter_data_by_column_keywords(dataset: list, keywords: str) -> list:
    """
    Keep only the columns of *dataset* whose header contains any of the
    comma-separated *keywords* (case-insensitive substring match).

    *dataset* must be the actual data — a list of lists whose first row
    is the header row (e.g. the output of get_processed_tf_data) — never
    a dataset name or file path.  Returns the dataset unchanged for blank
    keywords, [headers] when nothing matches, and [] for malformed input.
    """
    valid = (
        isinstance(dataset, list)
        and len(dataset) >= 1
        and isinstance(dataset[0], list)
    )
    if not valid:
        print("Error: Invalid dataset format for filtering. Must be a list of lists with headers.")
        return []
    headers = dataset[0]
    data_rows = dataset[1:] if len(dataset) > 1 else []
    df = pd.DataFrame(data_rows, columns=headers)
    if not keywords or not keywords.strip():
        return dataset
    wanted = [kw.strip().lower() for kw in keywords.split(',')]
    # When only a header row was supplied the DataFrame is empty, so match
    # directly against the raw header list; otherwise match df.columns.
    candidates = headers if (df.empty and not data_rows) else list(df.columns)
    hits = []
    for col in candidates:
        col_lower = str(col).lower()
        if any(kw in col_lower for kw in wanted):
            hits.append(col)
    if not hits:
        return [headers]
    # De-duplicate while keeping the original header order.
    ordered = sorted(set(hits), key=headers.index)
    if df.empty:
        # Only headers matched; keep the (empty) data-row count intact.
        return [ordered] + [[] for _ in data_rows]
    return [ordered] + df[ordered].values.tolist()
def get_tf_wave_search_data(tf_search_term: str = None) -> dict:
    """
    Load searchtfwaves.xlsx and return {wave column -> gene list}.

    With a search term, only genes containing the term (case-insensitive)
    are returned and waves with no match are omitted.  Columns are
    normalized to Wave1..Wave7 when possible.  Returns {} on any error.
    """
    try:
        cache_key = f"wave_search_{tf_search_term}"
        hit = _get_cached_data(cache_key)
        if hit is not None:
            return hit
        file_path = os.path.join(WAVE_ANALYSIS_PATH, "searchtfwaves.xlsx")
        if not os.path.exists(file_path):
            print(f"Error: TF wave search file not found at {file_path}")
            return {}
        df = pd.read_excel(file_path)
        expected_cols = [f"Wave{i}" for i in range(1, 8)]
        if not all(col in df.columns for col in expected_cols):
            if len(df.columns) >= 7:
                # Assume the first seven columns are the waves; rename and
                # keep only those.
                df = df.rename(columns=dict(zip(df.columns[:7], expected_cols)))[expected_cols]
            else:
                print(f"Warning: {file_path} does not have at least 7 columns to map to Wave1-7.")
        df = df.fillna('')
        if tf_search_term and tf_search_term.strip():
            needle = tf_search_term.strip().lower()
            result = {}
            for col in df.columns:
                matches = [gene for gene in df[col] if needle in str(gene).lower()]
                if matches:
                    result[col] = matches
        else:
            result = {col: df[col].tolist() for col in df.columns}
        _set_cached_data(cache_key, result)
        return result
    except Exception as e:
        print(f"Error reading or processing wave search data: {e}")
        return {}
def get_tf_correlation_data(tf_name: str = None) -> list:
    """
    Load TF-TFcorTRMTEX.xlsx as [headers] + rows.  When *tf_name* is
    given, keep only rows whose "TF Name" column (or the first column,
    as a fallback) equals it case-insensitively.

    Cached via the module TTL cache.  Returns [] on any error.
    """
    try:
        cache_key = f"correlation_{tf_name}"
        hit = _get_cached_data(cache_key)
        if hit is not None:
            return hit
        file_path = os.path.join(TF_CORR_PATH, "TF-TFcorTRMTEX.xlsx")
        if not os.path.exists(file_path):
            print(f"Error: TF correlation data file not found at {file_path}")
            return []
        df = pd.read_excel(file_path).fillna('')
        headers = list(df.columns)
        if tf_name and tf_name.strip():
            name_col = "TF Name"
            if name_col not in df.columns:
                if len(df.columns) == 0:
                    return [headers]
                name_col = df.columns[0]  # fall back to the first column
            wanted = tf_name.strip().lower()
            subset = df[df[name_col].astype(str).str.lower() == wanted]
            result = [headers] + subset.values.tolist() if not subset.empty else [headers]
        else:
            result = [headers] + df.values.tolist()
        _set_cached_data(cache_key, result)
        return result
    except Exception as e:
        print(f"Error processing correlation data: {e}")
        return []
def get_tf_correlation_image_path(tf_name: str) -> str:
    """
    Look up the merged correlation-graph image for *tf_name* in
    TF-TFcorTRMTEX.xlsx and return it as a web path rooted at "www/".
    Returns "" when the file, the image column, or the TF is not found.
    """
    file_path = os.path.join(TF_CORR_PATH, "TF-TFcorTRMTEX.xlsx")
    if not os.path.exists(file_path):
        print(f"Error: TF correlation data file (for image path) not found at {file_path} (Resolved: {os.path.abspath(file_path)})")
        return ""
    image_column_name = "TF Merged Graph Path"
    tf_identifier_column = "TF Name"
    try:
        df = pd.read_excel(file_path)
        if tf_identifier_column not in df.columns:
            if len(df.columns) == 0:
                return ""
            tf_identifier_column = df.columns[0]  # fall back to first column
        if image_column_name not in df.columns:
            print(f"Error: Image path column '{image_column_name}' not found in {file_path} (Resolved: {os.path.abspath(file_path)})")
            return ""
        wanted = tf_name.strip().lower()
        matches = df[df[tf_identifier_column].astype(str).str.lower() == wanted]
        if matches.empty:
            return ""
        value = matches.iloc[0][image_column_name]
        if not (pd.notna(value) and isinstance(value, str)):
            return ""
        # Normalize to a web path under "www/".
        if value.startswith("www/"):
            return str(value)
        return "www/" + value.lstrip('/')
    except FileNotFoundError:
        print(f"Error: File not found at {file_path} (Resolved: {os.path.abspath(file_path)})")
        return ""
    except Exception as e:
        print(f"Error processing {file_path} for TF {tf_name}: {e} (Resolved: {os.path.abspath(file_path)})")
        return ""
def list_all_tfs_in_correlation_data() -> list:
    """
    Return the unique TF names found in the TF-TF correlation workbook,
    whitespace-stripped, with blank and 'nan' entries removed.
    Returns [] when the file is missing or unreadable.
    """
    file_path = os.path.join(TF_CORR_PATH, "TF-TFcorTRMTEX.xlsx")
    if not os.path.exists(file_path):
        print(f"Error: TF correlation data file (for listing TFs) not found at {file_path} (Resolved: {os.path.abspath(file_path)})")
        return []
    tf_identifier_column = "TF Name"
    try:
        df = pd.read_excel(file_path)
        if tf_identifier_column not in df.columns:
            if df.empty or len(df.columns) == 0:
                return []
            tf_identifier_column = df.columns[0]  # fall back to first column
        names = df[tf_identifier_column].astype(str).str.strip().unique().tolist()
        return [name for name in names if name and name.lower() != 'nan']
    except FileNotFoundError:
        print(f"Error: File not found at {file_path} (Resolved: {os.path.abspath(file_path)})")
        return []
    except Exception as e:
        print(f"Error processing {file_path}: {e} (Resolved: {os.path.abspath(file_path)})")
        return []
def get_tf_community_sheet_data(community_type: str) -> list:
    """
    Return the raw contents of the TRM or TEXterm community workbook.

    *community_type* must be 'trm' or 'texterm' (case-insensitive).
    Cached via the module TTL cache.  Returns [] on error or for an
    unknown community type.
    """
    try:
        cache_key = f"community_{community_type}"
        hit = _get_cached_data(cache_key)
        if hit is not None:
            return hit
        workbook_names = {"trm": "trmcommunities.xlsx", "texterm": "texcommunities.xlsx"}
        file_name = workbook_names.get(community_type.lower())
        if file_name is None:
            print(f"Error: Invalid community_type '{community_type}'. Must be 'trm' or 'texterm'.")
            return []
        file_path = os.path.join(TF_COMMUNITIES_PATH, file_name)
        if not os.path.exists(file_path):
            print(f"Error: TF community file not found at {file_path}")
            return []
        result = get_raw_excel_data(file_path)
        _set_cached_data(cache_key, result)
        return result
    except Exception as e:
        print(f"Error processing community data: {e}")
        return []
# --- Mappings for static info tools ---
# STATIC_IMAGE_PATHS stores paths relative to the project root, typically starting with "www/"
# These are for constructing URLs or for components that expect paths relative to the web server root ('www').
# When accessed from tools/agent_tools.py, os.path.join(BASE_WWW_PATH, ...) correctly points to the file system location.
# The get_static_image_path tool should return the "web path" e.g. "www/images/logo.png"
_STATIC_IMAGE_WEB_PATHS = {
    # Home-page diagram, institutional logos, and modal figures.
    "home_page_diagram": "www/homedesc.png",
    "ucsd_logo": "www/ucsdlogo.png",
    "salk_logo": "www/salklogo.png",
    "unc_logo": "www/unclogo.jpg",
    "modal_cs_description_img": "www/csdescrip.jpeg",
    # TF catalog overview figures.
    "tfcat_overview_img": "www/tfcat/onlycellstates.png",
    "tfcat_multistates_heatmap": "www/tfcat/multistatesheatmap.png",
    # Per-cell-state bubble plots.  Keys containing "bubble_plot" are
    # enumerated by list_available_cell_state_bubble_plots().
    "naive_bubble_plot": "www/bubbleplots/naivebubble.jpg",
    "te_bubble_plot": "www/bubbleplots/tebubble.jpg",
    "mp_bubble_plot": "www/bubbleplots/mpbubble.jpg",
    "tcm_bubble_plot": "www/bubbleplots/tcmbubble.jpg",
    "tem_bubble_plot": "www/bubbleplots/tembubble.jpg",
    "trm_bubble_plot": "www/bubbleplots/trmbubble.jpg",
    "texprog_bubble_plot": "www/bubbleplots/texprogbubble.jpg",
    "texefflike_bubble_plot": "www/bubbleplots/texintbubble.jpg",
    "texterm_bubble_plot": "www/bubbleplots/textermbubble.jpg",
    # Wave-analysis figures; consumed by list_available_wave_analysis_assets().
    "wave_analysis_overview_diagram": "www/waveanalysis/tfwaveanal.png",
    "wave1_main_img": "www/waveanalysis/c1.jpg",
    "wave1_gokegg_img": "www/waveanalysis/c1_selected_GO_KEGG.jpg",
    # Wave 1 is the only wave with two ranked-text images.
    "wave1_ranked_text1_img": "www/waveanalysis/txtJPG/c1_ranked_1.jpg",
    "wave1_ranked_text2_img": "www/waveanalysis/txtJPG/c1_ranked_2.jpg",
    "wave2_main_img": "www/waveanalysis/c2.jpg",
    # NOTE: the wave-2 GO/KEGG key points at the *_v2.jpg revision of the file.
    "wave2_gokegg_img": "www/waveanalysis/c2_selected_GO_KEGG_v2.jpg",
    "wave2_ranked_text_img": "www/waveanalysis/txtJPG/c2_ranked.jpg",
    # ... (add all other wave images similarly, ensuring paths start with "www/") ...
    "wave3_main_img": "www/waveanalysis/c3.jpg",
    "wave3_gokegg_img": "www/waveanalysis/c3_selected_GO_KEGG.jpg",
    "wave3_ranked_text_img": "www/waveanalysis/txtJPG/c3_ranked.jpg",
    "wave4_main_img": "www/waveanalysis/c4.jpg",
    "wave4_gokegg_img": "www/waveanalysis/c4_selected_GO_KEGG.jpg",
    "wave4_ranked_text_img": "www/waveanalysis/txtJPG/c4_ranked.jpg",
    "wave5_main_img": "www/waveanalysis/c5.jpg",
    "wave5_gokegg_img": "www/waveanalysis/c5_selected_GO_KEGG.jpg",
    "wave5_ranked_text_img": "www/waveanalysis/txtJPG/c5_ranked.jpg",
    "wave6_main_img": "www/waveanalysis/c6.jpg",
    "wave6_gokegg_img": "www/waveanalysis/c6_selected_GO_KEGG.jpg",
    "wave6_ranked_text_img": "www/waveanalysis/txtJPG/c6_ranked.jpg",
    "wave7_main_img": "www/waveanalysis/c7.jpg",
    "wave7_gokegg_img": "www/waveanalysis/c7_selected_GO_KEGG.jpg",
    "wave7_ranked_text_img": "www/waveanalysis/txtJPG/c7_ranked.jpg",
    # Network-analysis figures.
    "network_correlation_desc_img": "www/networkanalysis/tfcorrdesc.png",
    "network_community_overview_img": "www/networkanalysis/community.jpg",
    "network_trmtex_community_comparison_img": "www/networkanalysis/trmtexcom.png",
    "network_community_pathway_img": "www/networkanalysis/tfcompathway.png",
}
# Lazily-populated cache of the parsed ui_texts.json contents.
_loaded_ui_texts = None
def _load_ui_texts():
    """Load ui_texts.json once and return the parsed dict; {} on failure."""
    global _loaded_ui_texts
    if _loaded_ui_texts is not None:
        return _loaded_ui_texts
    try:
        # UI_TEXTS_FILE sits next to this module in the tools/ directory.
        with open(UI_TEXTS_FILE, 'r', encoding='utf-8') as fh:
            _loaded_ui_texts = json.load(fh)
    except FileNotFoundError:
        print(f"Error: UI texts file not found at {os.path.abspath(UI_TEXTS_FILE)}")
        _loaded_ui_texts = {}
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {UI_TEXTS_FILE}")
        _loaded_ui_texts = {}
    except Exception as e:
        print(f"An unexpected error occurred while loading {UI_TEXTS_FILE}: {e}")
        _loaded_ui_texts = {}
    return _loaded_ui_texts
# Maps internal UI link IDs (as emitted by front-end links/buttons) to a
# human-readable description of the tab each one navigates to.  Looked up
# by get_internal_navigation_info().
INTERNAL_NAVIGATION_TARGETS = {
    "to_tfcat": "Navigates to the 'TF Catalog > Search TF Scores' tab.",
    "to_tfwave": "Navigates to the 'TF Wave Analysis > Overview' tab.",
    "to_tfnet": "Navigates to the 'TF Network Analysis > Search TF-TF correlation in TRM/TEXterm' tab.",
    # One entry per wave tab, c1..c7.
    "c1_link": "Navigates to the 'TF Wave Analysis > Wave 1' tab.",
    "c2_link": "Navigates to the 'TF Wave Analysis > Wave 2' tab.",
    "c3_link": "Navigates to the 'TF Wave Analysis > Wave 3' tab.",
    "c4_link": "Navigates to the 'TF Wave Analysis > Wave 4' tab.",
    "c5_link": "Navigates to the 'TF Wave Analysis > Wave 5' tab.",
    "c6_link": "Navigates to the 'TF Wave Analysis > Wave 6' tab.",
    "c7_link": "Navigates to the 'TF Wave Analysis > Wave 7' tab.",
}
def get_static_image_path(image_identifier: str) -> str:
    """
    Map a known static-image identifier to its web path (e.g.
    "www/homedesc.png").  Lookup is case-insensitive; unknown
    identifiers yield "".
    """
    key = image_identifier.lower()
    return _STATIC_IMAGE_WEB_PATHS.get(key, "")
def get_ui_descriptive_text(text_identifier: str) -> str:
    """
    Return a predefined descriptive/methodology text from ui_texts.json.

    Identifiers shaped like "wave_<n>_analysis_placeholder_details" are
    served from the generic "wave_x_analysis_placeholder_details"
    template, with "{X}" substituted by the wave number.  Unknown
    identifiers yield "".
    """
    texts = _load_ui_texts()
    key = text_identifier.lower()
    is_wave_placeholder = key.startswith("wave_") and key.endswith("_analysis_placeholder_details")
    if is_wave_placeholder:
        try:
            template_key = "wave_x_analysis_placeholder_details"
            if template_key in texts:
                wave_number = key.split("_")[1]
                return texts[template_key].replace("{X}", wave_number)
            return texts.get(key, "")
        except Exception as e:
            print(f"Error processing placeholder for {text_identifier}: {e}")
            return texts.get(key, "")
    return texts.get(key, "")
def list_available_tf_catalog_datasets() -> list:
    """
    Identifiers of the TF catalog datasets understood by
    get_processed_tf_data (and get_tf_catalog_dataset_path).
    """
    cell_states = [
        "Naive", "TE", "MP", "TCM", "TEM", "TRM",
        "TEXprog", "TEXeff", "TEXterm",
    ]
    return ["Overall_TF_PageRank"] + cell_states
def get_tf_catalog_dataset_path(dataset_identifier: str) -> str:
    """
    Resolve a TF catalog dataset identifier (case-insensitive) to the
    path of its workbook under TF_PAGERANK_PATH; "" when unknown.
    Mostly an internal helper for get_processed_tf_data.
    """
    workbook_names = {
        "overall_tf_pagerank": "Table_TF PageRank Scores for Audrey.xlsx",
        "naive": "Naive.xlsx",
        "te": "TE.xlsx",
        "mp": "MP.xlsx",
        "tcm": "TCM.xlsx",
        "tem": "TEM.xlsx",
        "trm": "TRM.xlsx",
        "texprog": "TEXprog.xlsx",
        "texeff": "TEXeff.xlsx",
        "texterm": "TEXterm.xlsx",
    }
    file_name = workbook_names.get(dataset_identifier.lower())
    return os.path.join(TF_PAGERANK_PATH, file_name) if file_name else ""
def list_available_cell_state_bubble_plots() -> list:
    """
    Identifiers of the per-cell-state bubble-plot images; each can be
    resolved to a web path with get_static_image_path.
    """
    plots = []
    for identifier in _STATIC_IMAGE_WEB_PATHS:
        if "bubble_plot" in identifier:
            plots.append(identifier)
    return plots
def list_available_wave_analysis_assets(wave_number: int) -> dict:
    """
    Return the static-image identifiers available for a TF wave (1-7).

    Keys (each present only when the identifier actually exists in
    _STATIC_IMAGE_WEB_PATHS):
      - "main_image_id": the wave's main figure
      - "gokegg_image_id": the GO/KEGG enrichment figure
      - "ranked_text_image_ids": ranked-text figures (wave 1 has two,
        waves 2-7 have one)

    Returns {} for wave numbers outside 1-7.
    """
    if not 1 <= wave_number <= 7:
        return {}
    assets = {
        "main_image_id": f"wave{wave_number}_main_img",
        "gokegg_image_id": f"wave{wave_number}_gokegg_img",
    }
    # FIX: removed a no-op wave-2 special case — the "wave2_gokegg_img"
    # table entry already points at the *_v2.jpg file, so no key
    # adjustment is needed here.
    if wave_number == 1:
        assets["ranked_text_image_ids"] = [
            "wave1_ranked_text1_img",
            "wave1_ranked_text2_img",
        ]
    else:  # Waves 2-7 have a single ranked-text image.
        assets["ranked_text_image_ids"] = [f"wave{wave_number}_ranked_text_img"]
    # Drop any identifier that is not actually registered.
    if assets["main_image_id"] not in _STATIC_IMAGE_WEB_PATHS:
        assets["main_image_id"] = None
    if assets["gokegg_image_id"] not in _STATIC_IMAGE_WEB_PATHS:
        assets["gokegg_image_id"] = None
    ranked = [r for r in assets["ranked_text_image_ids"] if r in _STATIC_IMAGE_WEB_PATHS]
    if ranked:
        assets["ranked_text_image_ids"] = ranked
    else:
        del assets["ranked_text_image_ids"]
    return {key: value for key, value in assets.items() if value is not None}
def get_internal_navigation_info(link_id: str) -> str:
    """
    Describe where an internal UI link navigates.  Lookup is
    case-insensitive; unknown link IDs get a fixed fallback message.
    """
    target = INTERNAL_NAVIGATION_TARGETS.get(link_id.lower())
    if target is None:
        return "Navigation target not defined for this link ID."
    return target
def get_biorxiv_paper_url() -> str:
    """Return the DOI URL of the project's main bioRxiv paper."""
    paper_doi_url = "https://doi.org/10.1101/2023.01.03.522354"
    return paper_doi_url
# --- New Tool for Schema Discovery ---
def discover_excel_files_and_schemas(base_scan_directory_name: str = "www") -> dict:
    """
    Walk <project root>/<base_scan_directory_name> for .xlsx workbooks
    and return {relative file path: schema dict}.

    Each schema dict holds: "file_path" (relative to project root, forward
    slashes), "table_identifier" (filename sanitized for use as an
    identifier), "columns" (header strings of the first sheet), "sheets"
    (all sheet names), "last_modified" (file mtime, ISO format),
    "file_size_bytes", and "error" (None on success, message otherwise).
    Returns {} when the base directory does not exist.
    """
    discovered_schema = {}
    scan_root_abs = os.path.join(_PROJECT_ROOT, base_scan_directory_name)
    if not os.path.isdir(scan_root_abs):
        print(f"Error: Base directory for schema discovery not found: {scan_root_abs}")
        return {}
    for dirpath, _, filenames in os.walk(scan_root_abs):
        for filename in filenames:
            # Skip non-Excel files and Excel's "~$..." lock/temp files.
            if not filename.endswith(".xlsx") or filename.startswith("~"):
                continue
            file_abs_path = os.path.join(dirpath, filename)
            # FIX: normalize Windows separators with a single backslash —
            # the old replace("\\\\", "/") looked for a doubled backslash,
            # which os.path.relpath never produces, so paths stayed
            # un-normalized (compare list_all_files_in_www_directory).
            file_rel_path = os.path.relpath(file_abs_path, _PROJECT_ROOT).replace("\\", "/")
            table_identifier = os.path.splitext(filename)[0].replace("-", "_").replace(" ", "_")
            try:
                # Open the workbook once; read zero data rows to get headers
                # of the first sheet cheaply.
                xls = pd.ExcelFile(file_abs_path)
                if not xls.sheet_names:
                    print(f"[Schema Discovery] Warning: No sheets found in {file_abs_path}")
                    columns = []
                else:
                    df_header = pd.read_excel(xls, sheet_name=xls.sheet_names[0], nrows=0)
                    columns = [str(col) for col in df_header.columns.tolist()]
                discovered_schema[file_rel_path] = {
                    "file_path": file_rel_path,
                    "table_identifier": table_identifier,
                    "columns": columns,
                    "sheets": xls.sheet_names if xls.sheet_names else [],
                    # FIX: report the file's actual mtime; the old code
                    # stored the scan time under this key.
                    "last_modified": datetime.fromtimestamp(os.path.getmtime(file_abs_path)).isoformat(),
                    "file_size_bytes": os.path.getsize(file_abs_path),
                    "error": None,
                }
            except Exception as e:
                print(f"[Schema Discovery] Error reading or processing headers for {file_abs_path}: {e}")
                discovered_schema[file_rel_path] = {
                    "file_path": file_rel_path,
                    "table_identifier": table_identifier,
                    "columns": [],
                    "sheets": [],
                    "error": str(e),
                }
    if not discovered_schema:
        print(f"[Schema Discovery] No Excel files found in {scan_root_abs}")
    return discovered_schema
# --- New Tool for Listing All Files in WWW ---
def list_all_files_in_www_directory() -> list:
    """
    Walk BASE_WWW_PATH and return one manifest entry per file:
    {"path": path relative to the project root (forward slashes),
     "type": best-guess MIME type, "size": size in bytes,
     "last_modified": file mtime (ISO format), "error": None or message}.

    Common hidden/system files (.DS_Store, Thumbs.db, macOS "._*"
    resource forks) are skipped.  Returns [] when the www directory
    does not exist.
    """
    file_manifest = []
    if not os.path.isdir(BASE_WWW_PATH):
        print(f"Error: WWW directory for file listing not found: {BASE_WWW_PATH}")
        return []
    ignore_names = {".DS_Store", "Thumbs.db"}
    ignore_prefixes = ("._",)  # macOS resource-fork files
    # Fallback MIME types for extensions mimetypes may not resolve.
    fallback_types = {
        ".txt": "text/plain",
        ".md": "text/plain",
        ".csv": "text/csv",
        ".json": "application/json",
    }
    if not mimetypes.inited:
        mimetypes.init()  # one-time init, hoisted out of the walk loop
    for dirpath, dirnames, filenames in os.walk(BASE_WWW_PATH):
        for filename in filenames:
            if filename in ignore_names or filename.startswith(ignore_prefixes):
                continue
            file_abs_path = os.path.join(dirpath, filename)
            file_rel_path_from_project_root = os.path.relpath(file_abs_path, _PROJECT_ROOT).replace("\\", "/")
            try:
                file_size = os.path.getsize(file_abs_path)
                mime_type, _ = mimetypes.guess_type(file_abs_path)
                if mime_type is None:
                    ext = os.path.splitext(filename)[1].lower()
                    mime_type = fallback_types.get(ext, "application/octet-stream")
                file_manifest.append({
                    "path": file_rel_path_from_project_root,
                    "type": mime_type,
                    "size": file_size,
                    # FIX: report the file's actual mtime; the old code
                    # stored the time the scan ran under this key.
                    "last_modified": datetime.fromtimestamp(os.path.getmtime(file_abs_path)).isoformat(),
                    "error": None,
                })
            except FileNotFoundError:
                # Raced with deletion between os.walk and the stat calls.
                print(f"[File Manifest] Warning: File {file_abs_path} found by os.walk but then not accessible for size/type.")
                continue
            except Exception as e:
                print(f"[File Manifest] Error processing file {file_abs_path}: {e}")
                file_manifest.append({
                    "path": file_rel_path_from_project_root,
                    "type": "unknown/error",
                    "size": 0,
                    "last_modified": datetime.now().isoformat(),
                    "error": str(e),
                })
    return file_manifest
# --- START: Literature Search Tool Implementation ---
def _normalize_authors(authors_data, source="Unknown"):
"""Helper to normalize author lists from different APIs."""
if not authors_data:
return ["N/A"]
if source == "SemanticScholar": # List of dicts with 'name' key
return [author.get('name', "N/A") for author in authors_data]
if source == "PubMed": # List of strings
return authors_data
if source == "ArXiv": # List of arxiv.Result.Author objects
return [author.name for author in authors_data]
return [str(a) for a in authors_data] # Generic fallback
def _search_semanticscholar_internal(query: str, max_results: int = 2) -> list[dict]:
papers = []
# print(f"[Tool:_search_semanticscholar_internal] Querying Semantic Scholar for: '{query}' (max: {max_results})") # COMMENTED OUT
try:
s2 = SemanticScholar(timeout=15)
# Corrected: 'doi' is not a direct field for search_paper, 'externalIds' should be used.
results = s2.search_paper(query, limit=max_results, fields=['title', 'authors', 'year', 'abstract', 'url', 'venue', 'externalIds'])
if results and results.items:
for item in results.items:
doi_val = item.externalIds.get('DOI') if item.externalIds else None
papers.append({
"title": getattr(item, 'title', "N/A"),
"authors": _normalize_authors(getattr(item, 'authors', []), "SemanticScholar"),
"year": getattr(item, 'year', "N/A"),
"abstract": getattr(item, 'abstract', "N/A")[:500] + "..." if getattr(item, 'abstract', None) else "N/A",
"doi": doi_val, # Use the extracted DOI
"url": getattr(item, 'url', "N/A"),
"venue": getattr(item, 'venue', "N/A"),
"source_api": "Semantic Scholar"
})
except Exception as e:
# This print goes to stderr if run directly, but might still be captured by a simple exec context.
# For agent integration, actual errors should be raised or returned structured.
# For now, we'll assume ManagerAgent's error handling for the overall tool call is preferred.
# Let's comment this out for now to ensure no stdout interference.
# print(f"[Tool:_search_semanticscholar_internal] Error: {e}", file=sys.stderr)
pass # Allow the function to return an empty list on error.
return papers
def _search_pubmed_internal(query: str, max_results: int = 2) -> list[dict]:
    """
    Query PubMed (via Biopython's Entrez) for *query* and return up to
    *max_results* normalized paper dicts with keys title/authors/year/
    abstract/doi/url/venue/source_api.

    All errors are swallowed and yield [] so that one failing source does
    not abort a multi-source literature search.
    NOTE(review): Entrez.email is not set anywhere visible in this file —
    NCBI asks clients to identify themselves; confirm the caller
    configures it.
    """
    papers = []
    # print(f"[Tool:_search_pubmed_internal] Querying PubMed for: '{query}' (max: {max_results})") # COMMENTED OUT
    try:
        # Step 1: esearch returns the PMIDs matching the query, by relevance.
        handle = Entrez.esearch(db="pubmed", term=query, retmax=str(max_results), sort="relevance")
        record = Entrez.read(handle)
        handle.close()
        ids = record["IdList"]
        if not ids:
            return papers
        # Step 2: efetch retrieves the full MEDLINE records for those PMIDs.
        handle = Entrez.efetch(db="pubmed", id=ids, rettype="medline", retmode="xml")
        records = Entrez.read(handle) # This is MedlineParser.parse, returns a generator usually
        handle.close()
        for pubmed_article in records.get('PubmedArticle', []): # records is a list of dicts if multiple ids
            # Navigate the nested MEDLINE structure defensively with .get().
            article = pubmed_article.get('MedlineCitation', {}).get('Article', {})
            title = article.get('ArticleTitle', "N/A")
            abstract_text_list = article.get('Abstract', {}).get('AbstractText', [])
            # Abstracts arrive as a list of section strings; join and truncate.
            abstract = " ".join(abstract_text_list)[:500] + "..." if abstract_text_list else "N/A"
            year = article.get('Journal', {}).get('JournalIssue', {}).get('PubDate', {}).get('Year', "N/A")
            authors_list = []
            author_info_list = article.get('AuthorList', [])
            for auth in author_info_list:
                # Prefer "ForeName LastName"; fall back to group authorship.
                if auth.get('LastName') and auth.get('ForeName'):
                    authors_list.append(f"{auth.get('ForeName')} {auth.get('LastName')}")
                elif auth.get('CollectiveName'):
                    authors_list.append(auth.get('CollectiveName'))
            # The DOI lives in PubmedData's ArticleIdList, tagged IdType="doi".
            doi = None
            article_ids = pubmed_article.get('PubmedData', {}).get('ArticleIdList', [])
            for aid in article_ids:
                if aid.attributes.get('IdType') == 'doi':
                    doi = str(aid) # The content of the tag is the DOI
                    break
            pmid = pubmed_article.get('MedlineCitation', {}).get('PMID', None)
            url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" if pmid else "N/A"
            venue = article.get('Journal', {}).get('Title', "N/A")
            papers.append({
                "title": title,
                "authors": _normalize_authors(authors_list, "PubMed"),
                "year": year,
                "abstract": abstract,
                "doi": doi,
                "url": url,
                "venue": venue,
                "source_api": "PubMed"
            })
            if len(papers) >= max_results: # Ensure we don't exceed due to structure of efetch
                break
    except Exception as e:
        # print(f"[Tool:_search_pubmed_internal] Error: {e}", file=sys.stderr) # COMMENTED OUT
        # Deliberately silent: an empty list means "no results from this source".
        pass
    return papers
def _search_arxiv_internal(query: str, max_results: int = 2) -> list[dict]:
papers = []
# print(f"[Tool:_search_arxiv_internal] Querying ArXiv for: '{query}' (max: {max_results})") # COMMENTED OUT
try:
search = arxiv.Search(
query = query,
max_results = max_results,
sort_by = arxiv.SortCriterion.Relevance
)
results = list(arxiv.Client().results(search)) # Convert generator to list
for result in results:
papers.append({
"title": getattr(result, 'title', "N/A"),
"authors": _normalize_authors(getattr(result, 'authors', []), "ArXiv"),
"year": getattr(result, 'published').year if getattr(result, 'published', None) else "N/A",
"abstract": getattr(result, 'summary', "N/A").replace('\n', ' ')[:500] + "...", # ArXiv abstracts can have newlines
"doi": getattr(result, 'doi', None),
"url": getattr(result, 'entry_id', "N/A"), # entry_id is the ArXiv URL like http://arxiv.org/abs/xxxx.xxxxx
"venue": "ArXiv", # ArXiv is the venue
"source_api": "ArXiv"
})
except Exception as e:
# print(f"[Tool:_search_arxiv_internal] Error: {e}", file=sys.stderr) # COMMENTED OUT
pass
return papers
def multi_source_literature_search(queries: list[str], max_results_per_query_per_source: int = 1, max_total_unique_papers: int = 10) -> list[dict]:
    """Run each query against Semantic Scholar, PubMed and ArXiv and merge results.

    Papers are de-duplicated primarily by DOI (case-insensitive) and, when no
    DOI is available, by a "title|first-author" key (or bare title when the
    author is unknown). Processing stops as soon as
    ``max_total_unique_papers`` unique papers have been collected.

    Args:
        queries: Search strings to issue, in order.
        max_results_per_query_per_source: Cap passed to each source per query.
        max_total_unique_papers: Hard cap on the merged, de-duplicated output.

    Returns:
        A list of at most ``max_total_unique_papers`` unique paper dicts.
    """
    collected: list[dict] = []
    seen_dois: set[str] = set()
    seen_title_keys: set[str] = set()

    def _is_unseen(paper: dict) -> bool:
        """Record the paper's dedup key; return True only on first sighting."""
        doi = paper.get("doi")
        if doi and doi != "N/A":
            key = doi.lower().strip()
            if key in seen_dois:
                return False
            seen_dois.add(key)
            return True
        # No usable DOI: fall back to title (+ first author when available).
        title = paper.get("title", "").lower().strip()
        authors = paper.get("authors", [])
        first_author = authors[0].lower().strip() if authors and authors[0] != "N/A" else ""
        if title and first_author:
            key = f"{title}|{first_author}"
        elif title:
            key = title
        else:
            # Neither DOI nor title: cannot identify the paper, drop it.
            return False
        if key in seen_title_keys:
            return False
        seen_title_keys.add(key)
        return True

    for query_str in queries:
        if len(collected) >= max_total_unique_papers:
            break
        batch: list[dict] = []
        batch.extend(_search_semanticscholar_internal(query_str, max_results_per_query_per_source))
        batch.extend(_search_pubmed_internal(query_str, max_results_per_query_per_source))
        batch.extend(_search_arxiv_internal(query_str, max_results_per_query_per_source))
        for paper in batch:
            if len(collected) >= max_total_unique_papers:
                break
            if _is_unseen(paper):
                collected.append(paper)

    return collected[:max_total_unique_papers]
# --- END: Literature Search Tool Implementation ---
# --- START: Text Fetching from URLs Tool Implementation ---
def fetch_text_from_urls(paper_info_list: list[dict], max_chars_per_paper: int = 15000) -> list[dict]:
    """Download each paper's URL and attach extracted page text to its dict.

    Each input dict is mutated in place: a 'retrieved_text_content' key is
    added holding either the extracted text (capped at ``max_chars_per_paper``
    characters) or a string starting with "Error:" describing the failure.

    Args:
        paper_info_list: Paper dicts as produced by the literature search;
            only the 'url' key is consulted here.
        max_chars_per_paper: Truncation limit for extracted text.

    Returns:
        The same dicts, in order, each with 'retrieved_text_content' set.
    """
    updated_paper_info_list = []
    # A browser-like User-Agent: many publisher sites reject default clients.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    for paper in paper_info_list:
        url = paper.get("url")
        # Guard clause: no request for missing/non-http URLs.
        if not url or not isinstance(url, str) or not url.startswith("http"):
            paper["retrieved_text_content"] = "Error: Invalid or missing URL."
            updated_paper_info_list.append(paper)
            continue
        retrieved_text = None
        try:
            response = requests.get(url, headers=headers, timeout=20, allow_redirects=True)
            response.raise_for_status()  # Raise for HTTP error statuses
            soup = BeautifulSoup(response.content, 'html.parser')
            body_content = soup.find('body')
            if body_content:
                # Strip script/style noise before text extraction.
                for script_or_style in body_content(["script", "style"]):
                    script_or_style.decompose()
                # Prefer common article-body containers; fall back to whole body.
                # These selectors will need refinement for specific sites
                # (e.g. arXiv, PubMed Central).
                main_article_tags = ['article', 'main', '.main-content', '.article-body', '.abstract']
                extracted_elements = []
                for tag_selector in main_article_tags:
                    elements = body_content.select(tag_selector)
                    if elements:
                        extracted_elements = [el.get_text(separator=" ", strip=True) for el in elements]
                        break  # First matching selector is assumed good enough
                if extracted_elements:
                    retrieved_text = " ".join(extracted_elements)
                else:
                    retrieved_text = body_content.get_text(separator=" ", strip=True)
            else:
                retrieved_text = "Error: Could not find body content in HTML."
            if retrieved_text and not retrieved_text.startswith("Error:"):
                # Bug fix: check length BEFORE slicing so text that is exactly
                # max_chars_per_paper long is not falsely marked as truncated.
                if len(retrieved_text) > max_chars_per_paper:
                    retrieved_text = retrieved_text[:max_chars_per_paper] + "... (truncated)"
            elif not retrieved_text:
                retrieved_text = "Error: No text could be extracted."
        except requests.exceptions.RequestException as e:
            retrieved_text = f"Error fetching URL: {str(e)}"
        except Exception as e:
            retrieved_text = f"Error processing HTML: {str(e)}"
        paper["retrieved_text_content"] = retrieved_text
        updated_paper_info_list.append(paper)
        # Optional: add a small delay between requests if fetching from many URLs
        # time.sleep(0.25)
    return updated_paper_info_list
# --- END: Text Fetching from URLs Tool Implementation ---
# Example of how GenerationAgent would call this tool:
# Assume 'list_of_papers_from_search' is the output from multi_source_literature_search
# print(json.dumps({'intermediate_data_for_llm': fetch_text_from_urls(paper_info_list=list_of_papers_from_search, max_chars_per_paper=10000)}))
def describe_image(file_id: str, api_key: str = None) -> str:
    """Describe an uploaded image via OpenAI's gpt-4o vision model.

    Args:
        file_id: Either an OpenAI file ID ("file-..."), a data URL
            ("data:image/..."), or a raw ID that will be prefixed with
            "file-" as a best-effort recovery.
        api_key: OpenAI API key; falls back to the OPENAI_API_KEY
            environment variable when omitted.

    Returns:
        The model's textual description, or a string starting with "Error"
        describing what went wrong (this function never raises).
    """
    try:
        # Import locally so a missing package degrades to an error string
        # instead of breaking module import.
        try:
            from openai import OpenAI
        except ImportError:
            return "Error: The OpenAI module is not installed. Please install it with 'pip install openai'."
        if not file_id or not isinstance(file_id, str):
            return "Error: Invalid file ID format."
        # Determine the image_url payload based on the file_id format.
        if file_id.startswith("data:image/"):
            image_url_to_use = {"url": file_id}
        elif file_id.startswith("file-"):
            image_url_to_use = {"url": file_id}  # Might work for actual images
        elif len(file_id) > 10 and " " not in file_id:
            # Looks like a raw OpenAI file ID whose "file-" prefix was dropped
            # (a common case when the caller forwards raw user input).
            # NOTE: the redundant startswith("file-") re-check was removed here;
            # this branch is only reached when both prefix checks above failed.
            print(f"describe_image: received potentially raw file ID '{file_id}', prefixing with 'file-'.")
            image_url_to_use = {"url": f"file-{file_id}"}
        else:
            return f"Error: Invalid file_id format. Must be an OpenAI file ID (file-...) or a data URL (data:image/...). Received: {file_id}"
        # Create an OpenAI client with explicit API key if provided.
        if api_key:
            client = OpenAI(api_key=api_key)
        else:
            if not os.environ.get('OPENAI_API_KEY'):
                return "Error: OpenAI API key not found. Please provide an API key or set the OPENAI_API_KEY environment variable."
            client = OpenAI()
        # Prepare the vision prompt with the image attached.
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Please describe this image in detail. If it contains text, transcribe important parts. If it's a scientific figure, explain what it shows. If it's a chart or graph, describe the data visualization and key insights."},
                    {"type": "image_url", "image_url": image_url_to_use}
                ]
            }
        ]
        response = client.chat.completions.create(
            model="gpt-4o",  # Model with vision capabilities
            messages=messages,
            max_tokens=1000,
            temperature=0.2  # Lower temperature for more accurate descriptions
        )
        # Record token usage in the global collector (set by ExecutorAgent),
        # if one is installed on builtins.
        if hasattr(response, 'usage') and response.usage:
            usage_info = {
                'prompt_tokens': response.usage.prompt_tokens,
                'completion_tokens': response.usage.completion_tokens,
                'total_tokens': response.usage.total_tokens
            }
            import builtins
            if hasattr(builtins, '__agent_usage_collector__'):
                builtins.__agent_usage_collector__.append(usage_info)
        return response.choices[0].message.content
    except Exception as e:
        # Return (not raise) a detailed error message so callers always get a string.
        error_message = f"Error processing image: {str(e)}"
        print(error_message)  # Log for server-side debugging
        return error_message
if __name__ == '__main__':
    # Manual smoke test: run the multi-source literature search, then feed
    # its results through the URL text fetcher and print a short summary.
    print("Testing Multi-Source Literature Search Tool:")
    sample_queries = [
        "novel targets for CAR-T cell therapy in solid tumors",
        "role of microbiota in cancer immunotherapy response",
        "epigenetic regulation of T cell exhaustion",
    ]
    # Fetch only 2 papers so the text-fetch step stays quick.
    results = multi_source_literature_search(
        queries=sample_queries,
        max_results_per_query_per_source=1,
        max_total_unique_papers=2,
    )
    print(f"Found {len(results)} unique papers for text fetching test:")
    if results:
        print("\nTesting Text Fetching from URLs Tool:")
        results_with_text = fetch_text_from_urls(paper_info_list=results, max_chars_per_paper=5000)
        print(f"Processed {len(results_with_text)} papers for text content:")
        for i, paper in enumerate(results_with_text):
            print(f"--- Paper {i+1} ---")
            print(f" Title: {paper.get('title')}")
            print(f" URL: {paper.get('url')}")
            text_content = paper.get('retrieved_text_content', 'Not found')
            print(f" Retrieved Text (first 200 chars): {text_content[:200]}...")
            print("\n")
# End of module. The __main__ block above provides a quick manual smoke test for the tool functions.