Spaces:
Sleeping
Sleeping
| import os, json, shutil, urllib.request, urllib.parse, requests | |
| from xml.etree import ElementTree as ET | |
| from langchain_core.tools import tool | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.figure_factory as ff | |
| from sklearn.decomposition import PCA | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from sklearn.cluster import AgglomerativeClustering | |
| # Use /data for persistence if running on HF Spaces with a mounted bucket, otherwise use local directory | |
| ROOT_DIR = "/data" if os.path.exists("/data") else os.path.dirname(os.path.abspath(__file__)) | |
| CSV_PATH = os.path.join(ROOT_DIR, "papers.csv") | |
| OUT_DIR = os.path.join(ROOT_DIR, "outputs") | |
| PDF_DIR = os.path.join(ROOT_DIR, "pdfs") | |
| SCRATCH_DIR = os.path.join(ROOT_DIR, "scratch") | |
| for _d in [OUT_DIR, PDF_DIR, SCRATCH_DIR]: os.makedirs(_d, exist_ok=True) | |
| HEADERS = ["Sr No", "Title", "DOI", "Web Link", "Authors", "Date of Publication", "Journal", "Abstract", "No of Citations", "Research Type", "Research Type Confidence", "Research Type Reason", "Findings"] | |
| PAJAIS_TAXONOMY = { | |
| "AI & Machine Learning": ["deep learning","neural networks","NLP","computer vision","reinforcement learning","transformers"], | |
| "Ethics & Society": ["bias","fairness","privacy","accountability","transparency","misinformation"], | |
| "Healthcare & Medicine": ["clinical","medical","health","diagnosis","patient","drug","genomics"], | |
| "Education & Learning": ["e-learning","pedagogy","curriculum","students","teaching","assessment"], | |
| "Business & Economics": ["market","finance","enterprise","productivity","innovation","supply chain"], | |
| "Environment & Sustainability": ["climate","energy","green","carbon","sustainability","smart grid"], | |
| "Governance & Policy": ["regulation","policy","government","law","compliance","cybersecurity"], | |
| "Human-Computer Interaction": ["usability","UX","interface","interaction","accessibility","chatbot"], | |
| } | |
| _MODEL = None | |
| def _embed(): | |
| global _MODEL | |
| if _MODEL is None: | |
| try: | |
| from sentence_transformers import SentenceTransformer | |
| _MODEL = SentenceTransformer("all-MiniLM-L6-v2") | |
| except Exception as e: print(f">>> Embedding load failed: {e}") | |
| return _MODEL | |
| def search_academic_source(query: str, source: str) -> str: | |
| """Search academic sources. Valid sources: 'google_scholar', 'arxiv', 'pubmed', 'hf_papers', 'tavily', 'apify', 'semantic_scholar', 'openalex', 'scopus', 'web_of_science'.""" | |
| try: | |
| src = source.lower() | |
| if "google" in src or "scholar" in src: | |
| from langchain_community.utilities import SerpAPIWrapper | |
| return SerpAPIWrapper(params={"engine": "google_scholar"}).run(query)[:5000] | |
| elif "arxiv" in src: | |
| from langchain_community.utilities import ArxivAPIWrapper | |
| return ArxivAPIWrapper(top_k_results=5).run(query)[:5000] | |
| elif "pubmed" in src: | |
| from langchain_community.utilities.pubmed import PubMedAPIWrapper | |
| return PubMedAPIWrapper(top_k_results=5).run(query)[:5000] | |
| elif "hf_papers" in src or "huggingface" in src: | |
| res = requests.get(f"https://huggingface.co/api/papers/search?q={urllib.parse.quote(query)}").json() | |
| return str(res[:5])[:5000] if isinstance(res, list) else "HF Search Error" | |
| elif "apify" in src: | |
| from langchain_community.utilities import ApifyWrapper | |
| apify = ApifyWrapper() | |
| return apify.run("apify/arxiv-scraper", run_input={"searchSubj": "all", "searchQuery": query, "maxResults": 5})[:5000] | |
| elif "semantic_scholar" in src: | |
| from langchain_community.utilities.semanticscholar import SemanticScholarAPIWrapper | |
| return SemanticScholarAPIWrapper(top_k_results=5).run(query)[:5000] | |
| elif "openalex" in src: | |
| api_key = os.getenv("OPENALEX_API_KEY") | |
| url = f"https://api.openalex.org/works?search={urllib.parse.quote(query)}&per-page=5" | |
| headers = {"User-Agent": "AcademicAgent/1.0 (mailto:agent@research-platform.org)"} | |
| if api_key: headers["api_key"] = api_key # Higher rate limits if key provided | |
| res = requests.get(url, headers=headers).json() | |
| return str(res.get("results", []))[:5000] | |
| elif "scopus" in src: | |
| api_key = os.getenv("SCOPUS_API_KEY") | |
| if not api_key: return "Error: SCOPUS_API_KEY missing. Request the user to provide it or switch to 'semantic_scholar' or 'openalex'." | |
| res = requests.get(f"https://api.elsevier.com/content/search/scopus?query={urllib.parse.quote(query)}&count=5", headers={"X-ELS-APIKey": api_key, "Accept": "application/json"}).json() | |
| return str(res)[:5000] | |
| elif "web_of_science" in src or "wos" in src: | |
| api_key = os.getenv("WOS_API_KEY") | |
| if not api_key: return "Error: WOS_API_KEY missing. Request the user to provide it or switch to 'semantic_scholar' or 'openalex'." | |
| res = requests.get(f"https://wos-api.clarivate.com/api/wos?databaseId=WOS&usrQuery={urllib.parse.quote(query)}&count=5&firstRecord=1", headers={"X-ApiKey": api_key}).json() | |
| return str(res)[:5000] | |
| else: | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| return str(TavilySearchResults(max_results=5).invoke(f"{query} academic peer reviewed paper"))[:5000] | |
| except Exception as e: return f"Error searching {source}: {str(e)}" | |
| def _download_pdf(url: str, filename: str): | |
| try: | |
| r = requests.get(url, stream=True, timeout=12) | |
| if r.status_code == 200: | |
| if not filename.lower().endswith(".pdf"): filename += ".pdf" | |
| path = os.path.join(PDF_DIR, "".join(x for x in filename if x.isalnum() or x in "._- ")) | |
| with open(path, 'wb') as f: | |
| for chunk in r.iter_content(chunk_size=8192): f.write(chunk) | |
| return True | |
| except: pass | |
| return False | |
| def save_papers(papers_json: str) -> str: | |
| """Persist filtered papers to papers.csv. papers_json: JSON array of paper dicts.""" | |
| MAX_TOTAL_PAPERS = 120 | |
| try: | |
| incoming = json.loads(papers_json) | |
| if not isinstance(incoming, list): incoming = [incoming] | |
| # Robust Key Normalization (Handles LLM returning 'research_type' instead of 'Research Type') | |
| clean_incoming = [] | |
| for p in incoming: | |
| norm_p = {str(k).lower().replace("_", " ").strip(): v for k, v in p.items()} | |
| clean_p = {} | |
| for col in HEADERS: | |
| clean_p[col] = norm_p.get(col.lower().replace("_", " ").strip(), "") | |
| if "year" in norm_p and not clean_p.get("Date of Publication"): | |
| clean_p["Date of Publication"] = norm_p["year"] | |
| clean_incoming.append(clean_p) | |
| incoming = clean_incoming | |
| existing = pd.read_csv(CSV_PATH) if os.path.exists(CSV_PATH) else pd.DataFrame(columns=HEADERS) | |
| # Schema Migration: Ensure new columns exist in old CSVs | |
| for col in ["Research Type", "Research Type Confidence", "Research Type Reason", "Findings"]: | |
| if col not in existing.columns: | |
| existing[col] = "" | |
| existing["Title"] = existing["Title"].fillna("") | |
| seen = set(existing["Title"].astype(str).str.lower().str.strip()) | |
| # Hard Filter for AI and Societal Impact | |
| def is_relevant(p): | |
| txt = (str(p.get("Title", "")) + " " + str(p.get("Abstract", ""))).lower() | |
| ai_words = ["ai ", "artificial intelligence", "machine learning", "deep learning", "nlp", "llm", "chatgpt", "neural network"] | |
| impact_words = ["societ", "ethic", "bias", "fairness", "privacy", "impact", "policy", "govern", "regulation", "human"] | |
| return any(w in txt for w in ai_words) and any(w in txt for w in impact_words) | |
| # Strict Academic Filter: Discard Non-Papers | |
| def is_research_paper(p): | |
| txt = (str(p.get("Title", "")) + " " + str(p.get("Journal", "")) + " " + str(p.get("Abstract", ""))).lower() | |
| banned_words = ["book", "chapter", "news", "editorial", "encyclopedia", "newspaper", "magazine"] | |
| return not any(w in txt.split() for w in banned_words) # split() guarantees strict word matching | |
| valid_items = [p for p in incoming if p.get("Title","").strip().lower() not in seen and is_relevant(p) and is_research_paper(p)] | |
| discarded_count = len([p for p in incoming if p.get("Title","").strip().lower() not in seen and not (is_relevant(p) and is_research_paper(p))]) | |
| new_rows = valid_items | |
| # Enforce MAX_TOTAL_PAPERS = 120 (Prioritize existing CSV papers) | |
| total_current = len(existing) | |
| if total_current >= MAX_TOTAL_PAPERS: | |
| return f"Save skipped: CSV already has {total_current} papers (cap: {MAX_TOTAL_PAPERS})." | |
| allowed_new = MAX_TOTAL_PAPERS - total_current | |
| final_new_rows = new_rows[:allowed_new] | |
| for i, p in enumerate(final_new_rows): p["Sr No"] = total_current + i + 1 | |
| # UPSERT: Update existing rows with new classification data if they were skipped as duplicates | |
| updated_count = 0 | |
| for p in incoming: | |
| title = p.get("Title", "").strip().lower() | |
| if title in seen and p.get("Research Type"): | |
| idx = existing[existing["Title"].str.lower().str.strip() == title].index | |
| if not idx.empty: | |
| existing.loc[idx[0], "Research Type"] = p.get("Research Type", "") | |
| existing.loc[idx[0], "Research Type Confidence"] = p.get("Research Type Confidence", "") | |
| existing.loc[idx[0], "Research Type Reason"] = p.get("Research Type Reason", "") | |
| existing.loc[idx[0], "Findings"] = p.get("Findings", "") | |
| updated_count += 1 | |
| if final_new_rows or updated_count > 0: | |
| if final_new_rows: | |
| existing = pd.concat([existing, pd.DataFrame(final_new_rows)]) | |
| existing.to_csv(CSV_PATH, index=False) | |
| # Auto-download PDFs for new rows | |
| download_count = 0 | |
| for p in final_new_rows: | |
| link = p.get("Web Link", "") | |
| if link and str(link).lower().endswith(".pdf"): | |
| if _download_pdf(link, p.get("Title", "paper")[:50]): | |
| download_count += 1 | |
| enrich_doi.invoke({}) # Auto-fill DOIs for new papers | |
| msg = f"Saved {len(final_new_rows)} papers. Total: {len(existing)}." | |
| if updated_count > 0: msg += f" (Updated {updated_count} existing papers with Research Types)." | |
| if download_count > 0: msg += f" (Archived {download_count} PDFs)." | |
| if discarded_count > 0: msg += f" (Discarded {discarded_count} papers for not aligning with AI & Societal Impact)." | |
| if len(new_rows) > allowed_new: msg += f" (Truncated {len(new_rows) - allowed_new} exceeding cap)." | |
| return msg | |
| return "0 new papers (all duplicates). No existing papers were updated." | |
| except Exception as e: return f"Save error: {e}" | |
| def get_paper_batch(offset: int = 0, limit: int = 20) -> str: | |
| """Read a slice of papers (Sr No, Title, Abstract) from CSV for LLM batch processing.""" | |
| if not os.path.exists(CSV_PATH): return "No papers.csv." | |
| return pd.read_csv(CSV_PATH)[["Sr No","Title","Abstract"]].fillna("").iloc[offset:offset+limit].to_json(orient="records") | |
| def read_pdf_text(filepath: str) -> str: | |
| """Extract raw text from an uploaded PDF for the LLM to parse into paper fields (Title, Authors, Abstract, Year, DOI, Journal). | |
| Use filepath='list' to see all PDFs in the upload folder.""" | |
| if filepath == "list": | |
| files = [f for f in os.listdir(PDF_DIR) if f.lower().endswith(".pdf")] | |
| return "\n".join(files) if files else "No PDFs uploaded yet." | |
| path = os.path.join(PDF_DIR, os.path.basename(filepath)) if not os.path.isabs(filepath) else filepath | |
| if not os.path.exists(path): return f"File not found: {filepath}" | |
| try: | |
| import pdfplumber | |
| with pdfplumber.open(path) as pdf: | |
| return "\n".join(p.extract_text() or "" for p in pdf.pages)[:10000] | |
| except Exception as e: | |
| if "pdfplumber" in str(e) or "ModuleNotFoundError" in str(type(e).__name__): | |
| print(">>> WARNING: 'pdfplumber' module not found. PDF extraction disabled.") | |
| return f"PDF read error: {e}" | |
| def read_word_text(filepath: str) -> str: | |
| """Extract text from a Word (.docx) file for the LLM to parse. | |
| Use filepath='list' to see files in the pdfs/ folder (which handles both PDF/Word uploads).""" | |
| if filepath == "list": | |
| files = [f for f in os.listdir(PDF_DIR) if f.lower().endswith(".docx")] | |
| return "\n".join(files) if files else "No Word docs uploaded." | |
| path = os.path.join(PDF_DIR, os.path.basename(filepath)) if not os.path.isabs(filepath) else filepath | |
| if not os.path.exists(path): return f"File not found: {filepath}" | |
| try: | |
| import docx | |
| doc = docx.Document(path) | |
| return "\n".join([p.text for p in doc.paragraphs])[:10000] | |
| except Exception as e: | |
| if "docx" in str(e) or "ModuleNotFoundError" in str(type(e).__name__): | |
| print(">>> WARNING: 'python-docx' module not found. Word extraction disabled.") | |
| return f"Word read error: {e}" | |
| def import_from_scratch() -> str: | |
| """Check the local scratch/ folder for .csv, .pdf, or .docx files and import them into the analysis system.""" | |
| files = os.listdir(SCRATCH_DIR) | |
| if not files: return "No files in scratch/ folder." | |
| imported = [] | |
| for f in files: | |
| src = os.path.join(SCRATCH_DIR, f) | |
| if f.lower().endswith(".csv"): | |
| df = pd.read_csv(src) | |
| save_papers.invoke({"papers_json": df.to_json(orient="records")}) | |
| imported.append(f) | |
| elif f.lower().endswith((".pdf", ".docx")): | |
| dest = os.path.join(PDF_DIR, f) | |
| shutil.copy(src, dest) | |
| imported.append(f) | |
| return f"Imported from scratch: {', '.join(imported)}" if imported else "No compatible files found." | |
| def enrich_doi() -> str: | |
| """Look up and fill missing DOI values in papers.csv using the CrossRef API (searches by paper title).""" | |
| if not os.path.exists(CSV_PATH): return "No CSV found." | |
| try: | |
| df = pd.read_csv(CSV_PATH); updated = 0 | |
| for i, row in df.iterrows(): | |
| if str(row.get("DOI","")).strip() in ("", "nan"): | |
| try: | |
| q = urllib.parse.quote(str(row.get("Title",""))[:100]) | |
| with urllib.request.urlopen(f"https://api.crossref.org/works?query.title={q}&rows=1&mailto=agent@research.ai", timeout=8) as r: | |
| items = json.loads(r.read()).get("message",{}).get("items",[]) | |
| if items and items[0].get("DOI"): | |
| df.at[i,"DOI"] = items[0]["DOI"]; updated += 1 | |
| except: pass | |
| df.to_csv(CSV_PATH, index=False) | |
| return f"DOI enrichment done. Updated {updated}/{len(df)} records." | |
| except Exception as e: return f"Enrich error: {e}" | |
| def cluster_and_visualize(mode: str, n_clusters: int = 0) -> str: | |
| """Runs AgglomerativeClustering on paper embeddings → saves 3 charts (intertopic, heatmap, dendrogram). | |
| Returns JSON cluster assignments for LLM to label: {"k":N,"assignments":[{"Sr No":1,"Title":"...","cluster":0},...]}""" | |
| if not os.path.exists(CSV_PATH): return "Error: papers.csv missing." | |
| model = _embed() | |
| if model is None: return "Error: Embedding model unavailable." | |
| try: | |
| df = pd.read_csv(CSV_PATH) | |
| texts = df[mode.capitalize()].fillna("").tolist() | |
| embs = model.encode(texts) | |
| k = n_clusters if n_clusters > 1 else max(2, min(8, len(df) // 3)) | |
| labels = AgglomerativeClustering(n_clusters=k).fit_predict(embs) | |
| coords = PCA(n_components=2).fit_transform(embs) | |
| df["x"], df["y"], df["Cluster"] = coords[:,0], coords[:,1], labels.astype(str) | |
| px.scatter(df, x="x", y="y", color="Cluster", hover_data=["Title"], title=f"Intertopic Distance ({mode})").write_html(os.path.join(OUT_DIR, f"{mode}_intertopic.html")) | |
| px.imshow(cosine_similarity(embs), title=f"Similarity Heatmap ({mode})", color_continuous_scale="Viridis").write_html(os.path.join(OUT_DIR, f"{mode}_heatmap.html")) | |
| from scipy.cluster.hierarchy import linkage as _lnk | |
| fig = ff.create_dendrogram(embs, labels=[t[:25] for t in texts], linkagefun=lambda x: _lnk(x, method="ward")) | |
| fig.update_layout(title=f"Hierarchical Dendrogram ({mode})") | |
| # Truncate titles to save LLM tokens (prevents 429 Rate Limits from exceeding context) | |
| assignments = [] | |
| for i in range(len(df)): | |
| full_title = str(df.iloc[i]["Title"]) | |
| trunc_title = full_title[:120] + "..." if len(full_title) > 120 else full_title | |
| assignments.append({"Sr No": int(df.iloc[i]["Sr No"]), "Title": trunc_title, "cluster": int(labels[i])}) | |
| return json.dumps({"k": k, "assignments": assignments}) | |
| except Exception as e: return f"Clustering error: {e}" | |
| def save_output(filename: str, content: str) -> str: | |
| """Save any text/JSON content to the outputs/ folder.""" | |
| try: | |
| with open(os.path.join(OUT_DIR, os.path.basename(filename)), "w", encoding="utf-8") as f: f.write(content) | |
| return f"Saved {filename}." | |
| except Exception as e: return str(e) | |
| def read_output(filename: str) -> str: | |
| """Read a file from outputs/. Use 'list' to list files, 'count' for paper count.""" | |
| if filename == "list": return "\n".join(os.listdir(OUT_DIR)) | |
| if filename == "count": return str(len(pd.read_csv(CSV_PATH))) if os.path.exists(CSV_PATH) else "0" | |
| path = os.path.join(OUT_DIR, os.path.basename(filename)) | |
| return open(path, encoding="utf-8").read()[:8000] if os.path.exists(path) else "File not found." | |
| def get_pajais_taxonomy() -> str: | |
| """Returns the full PAJAIS taxonomy schema (categories + keywords) for the LLM to map themes against.""" | |
| return json.dumps(PAJAIS_TAXONOMY, indent=2) |