# FinalMultiAgent / tools.py
# shahidshaikh's picture
# Update tools.py
# 5ee1aa1 verified
import os, json, shutil, urllib.request, urllib.parse, requests
from xml.etree import ElementTree as ET
from langchain_core.tools import tool
import pandas as pd
import plotly.express as px
import plotly.figure_factory as ff
from sklearn.decomposition import PCA
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import AgglomerativeClustering
# Storage root: prefer /data (persistent bucket mounted on HF Spaces) when it
# exists, otherwise fall back to the directory that contains this module.
if os.path.exists("/data"):
    ROOT_DIR = "/data"
else:
    ROOT_DIR = os.path.dirname(os.path.abspath(__file__))

# Every artefact the tools read or write lives below ROOT_DIR.
CSV_PATH = os.path.join(ROOT_DIR, "papers.csv")
OUT_DIR = os.path.join(ROOT_DIR, "outputs")
PDF_DIR = os.path.join(ROOT_DIR, "pdfs")
SCRATCH_DIR = os.path.join(ROOT_DIR, "scratch")

# Create the working folders at import time so the tools can assume they exist.
for _d in (OUT_DIR, PDF_DIR, SCRATCH_DIR):
    os.makedirs(_d, exist_ok=True)

# Column order of papers.csv.
HEADERS = [
    "Sr No",
    "Title",
    "DOI",
    "Web Link",
    "Authors",
    "Date of Publication",
    "Journal",
    "Abstract",
    "No of Citations",
    "Research Type",
    "Research Type Confidence",
    "Research Type Reason",
    "Findings",
]

# Category name -> indicative keywords, handed to the LLM for theme mapping.
PAJAIS_TAXONOMY = {
    "AI & Machine Learning": [
        "deep learning", "neural networks", "NLP",
        "computer vision", "reinforcement learning", "transformers",
    ],
    "Ethics & Society": [
        "bias", "fairness", "privacy",
        "accountability", "transparency", "misinformation",
    ],
    "Healthcare & Medicine": [
        "clinical", "medical", "health", "diagnosis",
        "patient", "drug", "genomics",
    ],
    "Education & Learning": [
        "e-learning", "pedagogy", "curriculum",
        "students", "teaching", "assessment",
    ],
    "Business & Economics": [
        "market", "finance", "enterprise",
        "productivity", "innovation", "supply chain",
    ],
    "Environment & Sustainability": [
        "climate", "energy", "green",
        "carbon", "sustainability", "smart grid",
    ],
    "Governance & Policy": [
        "regulation", "policy", "government",
        "law", "compliance", "cybersecurity",
    ],
    "Human-Computer Interaction": [
        "usability", "UX", "interface",
        "interaction", "accessibility", "chatbot",
    ],
}
# Module-wide cache for the sentence-embedding model; filled on first use.
_MODEL = None


def _embed():
    """Return the cached embedding model, loading it on the first call.

    The import of ``sentence_transformers`` is deferred to this function.
    Any failure while importing or constructing the model is printed and
    leaves the cache empty, so the function then returns ``None`` and the
    next call tries again.
    """
    global _MODEL
    if _MODEL is not None:
        return _MODEL
    try:
        from sentence_transformers import SentenceTransformer
        _MODEL = SentenceTransformer("all-MiniLM-L6-v2")
    except Exception as e:
        print(f">>> Embedding load failed: {e}")
    return _MODEL
@tool
def search_academic_source(query: str, source: str) -> str:
    """Search academic sources. Valid sources: 'google_scholar', 'arxiv', 'pubmed', 'hf_papers', 'tavily', 'apify', 'semantic_scholar', 'openalex', 'scopus', 'web_of_science'."""
    # The docstring above is kept verbatim: the @tool decorator exposes it to
    # the agent as the tool description.
    #
    # Routes `query` to the backend whose name appears in `source`
    # (case-insensitive substring match) and returns at most 5000 characters
    # of the backend's textual result. Unknown sources fall back to Tavily web
    # search. Any exception is reported as an "Error searching ..." string
    # instead of being raised.
    http_timeout = 20  # seconds; a stalled HTTP call must not hang the agent
    try:
        src = source.lower()
        quoted = urllib.parse.quote(query)
        # Semantic Scholar must be tested BEFORE the generic "scholar" match
        # below, otherwise 'semantic_scholar' is routed to Google Scholar.
        if "semantic" in src:
            from langchain_community.utilities.semanticscholar import SemanticScholarAPIWrapper
            return SemanticScholarAPIWrapper(top_k_results=5).run(query)[:5000]
        elif "google" in src or "scholar" in src:
            from langchain_community.utilities import SerpAPIWrapper
            return SerpAPIWrapper(params={"engine": "google_scholar"}).run(query)[:5000]
        elif "arxiv" in src:
            from langchain_community.utilities import ArxivAPIWrapper
            return ArxivAPIWrapper(top_k_results=5).run(query)[:5000]
        elif "pubmed" in src:
            from langchain_community.utilities.pubmed import PubMedAPIWrapper
            return PubMedAPIWrapper(top_k_results=5).run(query)[:5000]
        elif "hf_papers" in src or "huggingface" in src:
            res = requests.get(
                f"https://huggingface.co/api/papers/search?q={quoted}",
                timeout=http_timeout,
            ).json()
            return str(res[:5])[:5000] if isinstance(res, list) else "HF Search Error"
        elif "apify" in src:
            from langchain_community.utilities import ApifyWrapper
            apify = ApifyWrapper()
            # NOTE(review): confirm ApifyWrapper really offers run(); if it
            # does not, the AttributeError is reported via the handler below.
            return apify.run("apify/arxiv-scraper", run_input={"searchSubj": "all", "searchQuery": query, "maxResults": 5})[:5000]
        elif "openalex" in src:
            api_key = os.getenv("OPENALEX_API_KEY")
            url = f"https://api.openalex.org/works?search={quoted}&per-page=5"
            headers = {"User-Agent": "AcademicAgent/1.0 (mailto:agent@research-platform.org)"}
            if api_key:
                headers["api_key"] = api_key  # Higher rate limits if key provided
            res = requests.get(url, headers=headers, timeout=http_timeout).json()
            return str(res.get("results", []))[:5000]
        elif "scopus" in src:
            api_key = os.getenv("SCOPUS_API_KEY")
            if not api_key:
                return "Error: SCOPUS_API_KEY missing. Request the user to provide it or switch to 'semantic_scholar' or 'openalex'."
            res = requests.get(
                f"https://api.elsevier.com/content/search/scopus?query={quoted}&count=5",
                headers={"X-ELS-APIKey": api_key, "Accept": "application/json"},
                timeout=http_timeout,
            ).json()
            return str(res)[:5000]
        elif "web_of_science" in src or "wos" in src:
            api_key = os.getenv("WOS_API_KEY")
            if not api_key:
                return "Error: WOS_API_KEY missing. Request the user to provide it or switch to 'semantic_scholar' or 'openalex'."
            res = requests.get(
                f"https://wos-api.clarivate.com/api/wos?databaseId=WOS&usrQuery={quoted}&count=5&firstRecord=1",
                headers={"X-ApiKey": api_key},
                timeout=http_timeout,
            ).json()
            return str(res)[:5000]
        else:
            # Default backend ('tavily' or anything unrecognised).
            from langchain_community.tools.tavily_search import TavilySearchResults
            return str(TavilySearchResults(max_results=5).invoke(f"{query} academic peer reviewed paper"))[:5000]
    except Exception as e:
        return f"Error searching {source}: {str(e)}"
def _download_pdf(url: str, filename: str) -> bool:
    """Download `url` into PDF_DIR under a sanitised version of `filename`.

    Best-effort helper: returns True when the server answered 200 and the
    body was written completely, False otherwise. Network and file-system
    errors are logged and reported as False rather than raised.

    Args:
        url: Address of the PDF to fetch.
        filename: Desired file name; ".pdf" is appended when missing and every
            character other than alphanumerics and "._- " is dropped.
    """
    if not filename.lower().endswith(".pdf"):
        filename += ".pdf"
    safe_name = "".join(ch for ch in filename if ch.isalnum() or ch in "._- ")
    if safe_name.lower() == ".pdf":
        # Nothing usable survived sanitising; avoid a hidden file named ".pdf".
        safe_name = "paper.pdf"
    path = os.path.join(PDF_DIR, safe_name)
    started_writing = False
    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(url, stream=True, timeout=12) as r:
            if r.status_code != 200:
                return False
            with open(path, "wb") as f:
                started_writing = True
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return True
    except (requests.RequestException, OSError) as e:
        print(f">>> PDF download failed for {url}: {e}")
        if started_writing:
            # Do not leave a truncated PDF behind for later readers.
            try:
                os.remove(path)
            except OSError:
                pass  # file already gone or not removable; nothing more to do
        return False
def _normalize_paper(raw: dict) -> dict:
    """Map one loosely-keyed paper dict onto the HEADERS schema."""
    # Keys are matched case-insensitively with "_" treated as a space, so an
    # LLM returning 'research_type' still fills 'Research Type'.
    lowered = {str(k).lower().replace("_", " ").strip(): v for k, v in raw.items()}
    paper = {}
    for col in HEADERS:
        value = lowered.get(col.lower().replace("_", " ").strip(), "")
        # JSON null (e.g. an empty CSV cell round-tripped through to_json)
        # becomes "" so later string handling cannot hit None.
        paper[col] = "" if value is None else value
    if "year" in lowered and not paper.get("Date of Publication"):
        year = lowered["year"]
        paper["Date of Publication"] = "" if year is None else year
    paper["Title"] = str(paper["Title"]).strip()
    return paper


def _title_key(paper: dict) -> str:
    """Return the case/whitespace-insensitive key used to detect duplicates."""
    return str(paper.get("Title", "")).strip().lower()


def _is_relevant(paper: dict) -> bool:
    """Hard filter: Title+Abstract must mention both an AI term and an impact term."""
    txt = (str(paper.get("Title", "")) + " " + str(paper.get("Abstract", ""))).lower()
    ai_words = ["ai ", "artificial intelligence", "machine learning", "deep learning", "nlp", "llm", "chatgpt", "neural network"]
    impact_words = ["societ", "ethic", "bias", "fairness", "privacy", "impact", "policy", "govern", "regulation", "human"]
    return any(w in txt for w in ai_words) and any(w in txt for w in impact_words)


def _is_research_paper(paper: dict) -> bool:
    """Strict academic filter: reject entries whose text contains a banned word."""
    txt = (str(paper.get("Title", "")) + " " + str(paper.get("Journal", "")) + " " + str(paper.get("Abstract", ""))).lower()
    banned_words = ["book", "chapter", "news", "editorial", "encyclopedia", "newspaper", "magazine"]
    words = set(txt.split())  # whole-token matching only
    return not any(w in words for w in banned_words)


@tool
def save_papers(papers_json: str) -> str:
    """Persist filtered papers to papers.csv. papers_json: JSON array of paper dicts."""
    # The docstring above is kept verbatim: the @tool decorator exposes it to
    # the agent as the tool description.
    #
    # Flow: normalise keys -> drop duplicates and off-topic/non-paper entries
    # -> append new rows up to the cap -> upsert classification fields of rows
    # that already exist -> write the CSV -> download direct PDF links ->
    # run DOI enrichment. Returns a human-readable summary; any exception is
    # reported as "Save error: ...".
    MAX_TOTAL_PAPERS = 120
    classification_cols = ["Research Type", "Research Type Confidence", "Research Type Reason", "Findings"]
    try:
        incoming = json.loads(papers_json)
        if not isinstance(incoming, list):
            incoming = [incoming]
        # Entries that are not JSON objects cannot be papers; skip them
        # instead of failing the whole batch.
        incoming = [_normalize_paper(p) for p in incoming if isinstance(p, dict)]

        existing = pd.read_csv(CSV_PATH) if os.path.exists(CSV_PATH) else pd.DataFrame(columns=HEADERS)
        # Schema Migration: ensure new columns exist in old CSVs. They are
        # forced to object dtype because an all-empty column is read back as
        # float NaN, which does not accept string assignment cleanly.
        for col in classification_cols:
            if col not in existing.columns:
                existing[col] = ""
            existing[col] = existing[col].astype(object)
        existing["Title"] = existing["Title"].fillna("")
        existing_keys = existing["Title"].astype(str).str.lower().str.strip()
        stored_titles = set(existing_keys)

        # Select new rows; also de-duplicate inside the incoming batch itself.
        new_rows = []
        discarded_count = 0
        batch_titles = set()
        for p in incoming:
            key = _title_key(p)
            if key in stored_titles or key in batch_titles:
                continue
            if _is_relevant(p) and _is_research_paper(p):
                batch_titles.add(key)
                new_rows.append(p)
            else:
                discarded_count += 1

        # Enforce MAX_TOTAL_PAPERS (existing CSV papers take priority).
        total_current = len(existing)
        allowed_new = max(0, MAX_TOTAL_PAPERS - total_current)
        final_new_rows = new_rows[:allowed_new]
        for i, p in enumerate(final_new_rows, start=1):
            p["Sr No"] = total_current + i

        # UPSERT: refresh classification data of papers already stored. This
        # runs even when the cap is reached, so a full CSV can still be
        # classified.
        updated_count = 0
        for p in incoming:
            if not p.get("Research Type"):
                continue
            key = _title_key(p)
            if key not in stored_titles:
                continue
            matches = existing.index[existing_keys == key]
            if len(matches) == 0:
                continue
            for col in classification_cols:
                existing.loc[matches[0], col] = p.get(col, "")
            updated_count += 1

        if not final_new_rows and updated_count == 0:
            if total_current >= MAX_TOTAL_PAPERS:
                return f"Save skipped: CSV already has {total_current} papers (cap: {MAX_TOTAL_PAPERS})."
            return "0 new papers (all duplicates). No existing papers were updated."

        if final_new_rows:
            existing = pd.concat([existing, pd.DataFrame(final_new_rows)], ignore_index=True)
        existing.to_csv(CSV_PATH, index=False)

        # Auto-download PDFs for new rows whose link points straight at a PDF.
        download_count = 0
        for p in final_new_rows:
            link = str(p.get("Web Link", "") or "")
            if link.lower().endswith(".pdf"):
                if _download_pdf(link, str(p.get("Title", ""))[:50] or "paper"):
                    download_count += 1
        enrich_doi.invoke({})  # Auto-fill DOIs for new papers

        msg = f"Saved {len(final_new_rows)} papers. Total: {len(existing)}."
        if updated_count > 0:
            msg += f" (Updated {updated_count} existing papers with Research Types)."
        if download_count > 0:
            msg += f" (Archived {download_count} PDFs)."
        if discarded_count > 0:
            msg += f" (Discarded {discarded_count} papers for not aligning with AI & Societal Impact)."
        if len(new_rows) > allowed_new:
            msg += f" (Truncated {len(new_rows) - allowed_new} exceeding cap)."
        return msg
    except Exception as e:
        return f"Save error: {e}"
@tool
def get_paper_batch(offset: int = 0, limit: int = 20) -> str:
    """Read a slice of papers (Sr No, Title, Abstract) from CSV for LLM batch processing."""
    # The docstring above is kept verbatim: the @tool decorator exposes it to
    # the agent as the tool description.
    #
    # Returns a JSON array (orient="records") with rows offset..offset+limit
    # of papers.csv, "No papers.csv." when the file is absent, or a
    # "Batch read error: ..." string when the CSV cannot be read or lacks the
    # expected columns (the sibling tools also report errors as strings).
    if not os.path.exists(CSV_PATH):
        return "No papers.csv."
    try:
        # Negative values would turn into from-the-end slices; clamp them.
        start = max(0, int(offset))
        count = max(0, int(limit))
        frame = pd.read_csv(CSV_PATH)[["Sr No", "Title", "Abstract"]].fillna("")
        return frame.iloc[start:start + count].to_json(orient="records")
    except Exception as e:
        return f"Batch read error: {e}"
@tool
def read_pdf_text(filepath: str) -> str:
    """Extract raw text from an uploaded PDF for the LLM to parse into paper fields (Title, Authors, Abstract, Year, DOI, Journal).
    Use filepath='list' to see all PDFs in the upload folder."""
    # The docstring above is kept verbatim: the @tool decorator exposes it to
    # the agent as the tool description.
    #
    # Relative paths are resolved inside PDF_DIR by base name only; absolute
    # paths are used as given. At most `max_chars` characters are returned.
    max_chars = 10000
    if filepath == "list":
        files = [f for f in os.listdir(PDF_DIR) if f.lower().endswith(".pdf")]
        return "\n".join(files) if files else "No PDFs uploaded yet."
    if os.path.isabs(filepath):
        path = filepath
    else:
        path = os.path.join(PDF_DIR, os.path.basename(filepath))
    if not os.path.exists(path):
        return f"File not found: {filepath}"
    # Import separately so that only a genuinely missing dependency triggers
    # the "module not found" warning (not any error that mentions the name).
    try:
        import pdfplumber
    except ImportError as e:
        print(">>> WARNING: 'pdfplumber' module not found. PDF extraction disabled.")
        return f"PDF read error: {e}"
    try:
        pages = []
        collected = 0
        with pdfplumber.open(path) as pdf:
            for page in pdf.pages:
                text = page.extract_text() or ""
                pages.append(text)
                collected += len(text) + 1  # +1 for the joining newline
                if collected > max_chars:
                    # Enough text for the truncated result; skip later pages.
                    break
        return "\n".join(pages)[:max_chars]
    except Exception as e:
        return f"PDF read error: {e}"
@tool
def read_word_text(filepath: str) -> str:
    """Extract text from a Word (.docx) file for the LLM to parse.
    Use filepath='list' to see files in the pdfs/ folder (which handles both PDF/Word uploads)."""
    # The docstring above is kept verbatim: the @tool decorator exposes it to
    # the agent as the tool description.
    #
    # Relative paths are resolved inside PDF_DIR by base name only; absolute
    # paths are used as given. At most 10000 characters of paragraph text are
    # returned.
    if filepath == "list":
        files = [f for f in os.listdir(PDF_DIR) if f.lower().endswith(".docx")]
        return "\n".join(files) if files else "No Word docs uploaded."
    if os.path.isabs(filepath):
        path = filepath
    else:
        path = os.path.join(PDF_DIR, os.path.basename(filepath))
    if not os.path.exists(path):
        return f"File not found: {filepath}"
    # Import separately: matching "docx" in the error text would also fire for
    # ordinary read errors whose message merely contains the .docx file name.
    try:
        import docx
    except ImportError as e:
        print(">>> WARNING: 'python-docx' module not found. Word extraction disabled.")
        return f"Word read error: {e}"
    try:
        doc = docx.Document(path)
        return "\n".join(p.text for p in doc.paragraphs)[:10000]
    except Exception as e:
        return f"Word read error: {e}"
@tool
def import_from_scratch() -> str:
    """Check the local scratch/ folder for .csv, .pdf, or .docx files and import them into the analysis system."""
    # The docstring above is kept verbatim: the @tool decorator exposes it to
    # the agent as the tool description.
    #
    # .csv files are fed through save_papers; .pdf/.docx files are copied into
    # PDF_DIR. Each file is handled independently so one unreadable file does
    # not abort the rest; failures are listed in the returned summary.
    files = os.listdir(SCRATCH_DIR)
    if not files:
        return "No files in scratch/ folder."
    imported = []
    failed = []
    for f in files:
        src = os.path.join(SCRATCH_DIR, f)
        lowered = f.lower()
        try:
            if lowered.endswith(".csv"):
                df = pd.read_csv(src)
                result = save_papers.invoke({"papers_json": df.to_json(orient="records")})
                # save_papers reports its own failures as a "Save error" string.
                if str(result).startswith("Save error"):
                    failed.append(f"{f} ({result})")
                else:
                    imported.append(f)
            elif lowered.endswith((".pdf", ".docx")):
                shutil.copy(src, os.path.join(PDF_DIR, f))
                imported.append(f)
        except Exception as e:
            failed.append(f"{f} ({e})")
    if not imported and not failed:
        return "No compatible files found."
    msg = f"Imported from scratch: {', '.join(imported)}" if imported else "No files imported."
    if failed:
        msg += f" Failed: {'; '.join(failed)}"
    return msg
@tool
def enrich_doi() -> str:
    """Look up and fill missing DOI values in papers.csv using the CrossRef API (searches by paper title)."""
    # The docstring above is kept verbatim: the @tool decorator exposes it to
    # the agent as the tool description.
    #
    # For every row whose DOI is empty, CrossRef is queried with the first 100
    # characters of the title and the top hit's DOI is stored. Lookups are
    # best-effort: a failed request is logged and the row is left unchanged.
    if not os.path.exists(CSV_PATH):
        return "No CSV found."
    try:
        df = pd.read_csv(CSV_PATH)
        if "DOI" not in df.columns:
            df["DOI"] = ""
        # An all-empty DOI column is read as float NaN; make it hold strings.
        df["DOI"] = df["DOI"].astype(object)
        updated = 0
        for i, row in df.iterrows():
            if str(row.get("DOI", "")).strip() not in ("", "nan"):
                continue
            title = str(row.get("Title", "")).strip()
            if title in ("", "nan"):
                continue  # nothing to search for
            try:
                q = urllib.parse.quote(title[:100])
                with urllib.request.urlopen(f"https://api.crossref.org/works?query.title={q}&rows=1&mailto=agent@research.ai", timeout=8) as r:
                    items = json.loads(r.read()).get("message", {}).get("items", [])
                doi = items[0].get("DOI") if items else None
            except Exception as e:
                # Keep going: one unreachable lookup must not stop the others.
                print(f">>> DOI lookup failed for '{title[:60]}': {e}")
                continue
            if doi:
                df.at[i, "DOI"] = doi
                updated += 1
        if updated:
            df.to_csv(CSV_PATH, index=False)
        return f"DOI enrichment done. Updated {updated}/{len(df)} records."
    except Exception as e:
        return f"Enrich error: {e}"
@tool
def cluster_and_visualize(mode: str, n_clusters: int = 0) -> str:
    """Runs AgglomerativeClustering on paper embeddings → saves 3 charts (intertopic, heatmap, dendrogram).
    Returns JSON cluster assignments for LLM to label: {"k":N,"assignments":[{"Sr No":1,"Title":"...","cluster":0},...]}"""
    # The docstring above is kept verbatim: the @tool decorator exposes it to
    # the agent as the tool description.
    #
    # `mode` names the CSV column to embed (it is capitalised, e.g. "title" ->
    # "Title"). `n_clusters` <= 1 means "choose automatically": len(df) // 3
    # clamped to the range 2..8. Charts are written to OUT_DIR as
    # "<mode>_intertopic.html", "<mode>_heatmap.html", "<mode>_dendrogram.html".
    if not os.path.exists(CSV_PATH):
        return "Error: papers.csv missing."
    model = _embed()
    if model is None:
        return "Error: Embedding model unavailable."
    try:
        df = pd.read_csv(CSV_PATH)
        if len(df) < 2:
            return "Error: at least 2 papers are required for clustering."
        texts = df[mode.capitalize()].fillna("").astype(str).tolist()
        embs = model.encode(texts)
        k = n_clusters if n_clusters > 1 else max(2, min(8, len(df) // 3))
        k = min(k, len(df))  # cannot form more clusters than there are papers
        labels = AgglomerativeClustering(n_clusters=k).fit_predict(embs)
        coords = PCA(n_components=2).fit_transform(embs)
        df["x"], df["y"], df["Cluster"] = coords[:, 0], coords[:, 1], labels.astype(str)
        px.scatter(df, x="x", y="y", color="Cluster", hover_data=["Title"], title=f"Intertopic Distance ({mode})").write_html(os.path.join(OUT_DIR, f"{mode}_intertopic.html"))
        px.imshow(cosine_similarity(embs), title=f"Similarity Heatmap ({mode})", color_continuous_scale="Viridis").write_html(os.path.join(OUT_DIR, f"{mode}_heatmap.html"))
        from scipy.cluster.hierarchy import linkage as _lnk
        fig = ff.create_dendrogram(embs, labels=[t[:25] for t in texts], linkagefun=lambda x: _lnk(x, method="ward"))
        fig.update_layout(title=f"Hierarchical Dendrogram ({mode})")
        # The dendrogram used to be built but never written to disk.
        fig.write_html(os.path.join(OUT_DIR, f"{mode}_dendrogram.html"))
        # Truncate titles to save LLM tokens (prevents 429 Rate Limits from exceeding context)
        assignments = []
        for sr_no, title, label in zip(df["Sr No"], df["Title"], labels):
            full_title = str(title)
            trunc_title = full_title[:120] + "..." if len(full_title) > 120 else full_title
            assignments.append({"Sr No": int(sr_no), "Title": trunc_title, "cluster": int(label)})
        return json.dumps({"k": k, "assignments": assignments})
    except Exception as e:
        return f"Clustering error: {e}"
@tool
def save_output(filename: str, content: str) -> str:
    """Save any text/JSON content to the outputs/ folder."""
    # The docstring above is kept verbatim: the @tool decorator exposes it to
    # the agent as the tool description.
    #
    # Only the base name of `filename` is used, so the file always lands
    # directly inside OUT_DIR. On failure the exception text is returned.
    try:
        target = os.path.join(OUT_DIR, os.path.basename(filename))
        with open(target, "w", encoding="utf-8") as handle:
            handle.write(content)
    except Exception as err:
        return str(err)
    else:
        return f"Saved {filename}."
@tool
def read_output(filename: str) -> str:
    """Read a file from outputs/. Use 'list' to list files, 'count' for paper count."""
    # The docstring above is kept verbatim: the @tool decorator exposes it to
    # the agent as the tool description.
    #
    # 'list'  -> newline-separated names of the entries in OUT_DIR
    # 'count' -> number of rows in papers.csv ("0" when the CSV is absent)
    # other   -> first 8000 characters of OUT_DIR/<base name of filename>
    if filename == "list":
        return "\n".join(os.listdir(OUT_DIR))
    if filename == "count":
        return str(len(pd.read_csv(CSV_PATH))) if os.path.exists(CSV_PATH) else "0"
    path = os.path.join(OUT_DIR, os.path.basename(filename))
    if not os.path.exists(path):
        return "File not found."
    # Context manager closes the handle; read(8000) avoids loading a large
    # file in full just to discard everything past the first 8000 characters.
    with open(path, encoding="utf-8") as f:
        return f.read(8000)
@tool
def get_pajais_taxonomy() -> str:
    """Returns the full PAJAIS taxonomy schema (categories + keywords) for the LLM to map themes against."""
    # The docstring above is kept verbatim: the @tool decorator exposes it to
    # the agent as the tool description.
    # Serialise the module-level PAJAIS_TAXONOMY mapping with 2-space indent.
    schema_json = json.dumps(PAJAIS_TAXONOMY, indent=2)
    return schema_json