| """ |
| Agentic AI System for Individual Information Collection and RAG-based Search |
| Uses Hugging Face Inference API (no local model downloads) |
| """ |
|
|
| import os |
| import time |
| import json |
| import requests |
| from typing import List, Dict, Optional, Any |
| from datetime import datetime |
| from dataclasses import dataclass, asdict |
| import hashlib |
|
|
| |
| from langchain_huggingface import HuggingFaceEndpoint, HuggingFaceEmbeddings |
| from langchain_core.vectorstores import InMemoryVectorStore |
| from langchain_core.prompts import ChatPromptTemplate |
| from langchain_core.documents import Document |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
|
|
|
|
|
@dataclass
class IndividualProfile:
    """Structured profile for an individual researcher/expert."""
    id: str                    # author id (short form taken from the OpenAlex URL tail)
    name: str                  # display name as reported by OpenAlex
    affiliation: str           # last known institution, or "No affiliation"
    h_index: int               # from OpenAlex summary_stats
    total_citations: int       # OpenAlex cited_by_count
    total_papers: int          # OpenAlex works_count
    interests: List[str]       # top concepts (display names) above the score threshold
    biography: str             # generated summary text, see _generate_biography
    recent_work: List[Dict]    # recent publications: title/year/cited_by_count/doi/type/venue
    profile_url: str           # canonical OpenAlex profile URL
    last_updated: str          # ISO-8601 timestamp of collection
    source: str                # data provenance label, e.g. "OpenAlex + Google Scholar"
    metadata: Dict[str, Any]   # extra fields (orcid, i10_index, institution ids, scholar info)
|
|
|
|
class AgenticDataCollector:
    """
    Agentic system that autonomously collects information about individuals
    from multiple academic sources using intelligent crawling strategies.
    """

    # Cached profiles are considered fresh for this many seconds.
    CACHE_TTL_SECONDS = 3600

    def __init__(self, hf_token: Optional[str] = None):
        """
        Args:
            hf_token: Hugging Face API token; defaults to the HF_TOKEN env var.
        """
        self.hf_token = hf_token or os.getenv('HF_TOKEN')
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'AcademicResearchAgent/2.0',
            'Accept': 'application/json'
        })
        # In-memory cache: cache_key -> {'profile': IndividualProfile, 'timestamp': datetime}
        self.collection_memory: Dict[str, Dict[str, Any]] = {}

    def collect_individual_data(self, name: str, additional_context: str = "") -> Optional[IndividualProfile]:
        """
        Autonomously collect comprehensive data about an individual.

        Args:
            name: Name of the individual.
            additional_context: Additional search context (affiliation, field, etc.)

        Returns:
            IndividualProfile with collected data, or None if nothing was found.
        """
        print(f"Agent: Starting data collection for '{name}'")

        # Serve from cache when a fresh (< TTL) entry exists.
        cache_key = self._generate_cache_key(name, additional_context)
        cached = self.collection_memory.get(cache_key)
        if cached is not None:
            age = (datetime.now() - cached['timestamp']).total_seconds()
            if age < self.CACHE_TTL_SECONDS:
                print(f"Agent: Using cached data for '{name}'")
                return cached['profile']

        profile = self._execute_collection_pipeline(name, additional_context)

        # Only successful collections are cached.
        if profile:
            self.collection_memory[cache_key] = {
                'profile': profile,
                'timestamp': datetime.now()
            }

        return profile

    def _execute_collection_pipeline(self, name: str, context: str) -> Optional[IndividualProfile]:
        """Execute the multi-step data collection pipeline for one individual."""
        # Step 1: OpenAlex is the primary (required) source.
        print("  Step 1: Searching OpenAlex...")
        openalex_data = self._collect_from_openalex(name, context)

        if not openalex_data:
            print("  No data found in OpenAlex")
            return None

        # Step 2: Google Scholar enrichment is best-effort (may be None).
        print("  Step 2: Enriching with Google Scholar...")
        scholar_data = self._collect_from_scholar(name, context)

        # Step 3: recent publications from the OpenAlex works endpoint.
        print("  Step 3: Collecting recent publications...")
        recent_papers = self._collect_recent_publications(openalex_data.get('id'))

        # Step 4: merge everything into one profile.
        print("  Step 4: Synthesizing comprehensive profile...")
        profile = self._synthesize_profile(openalex_data, scholar_data, recent_papers)

        print(f"  Collection complete for '{name}'")
        return profile

    def _collect_from_openalex(self, name: str, context: str) -> Optional[Dict]:
        """Return the best-matching OpenAlex author record, or None."""
        try:
            search_query = f"{name} {context}".strip()
            url = "https://api.openalex.org/authors"
            params = {
                'search': search_query,
                'per_page': 1
            }

            response = self.session.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            results = data.get('results', [])
            if results:
                return results[0]
            return None

        except Exception as e:
            # Best-effort source: log and fall through to "not found".
            print(f"  OpenAlex error: {e}")
            return None

    def _collect_from_scholar(self, name: str, context: str) -> Optional[Dict]:
        """Return Google Scholar data (via scholarly), or None on any failure."""
        try:
            # Imported lazily so the collector works without scholarly installed.
            from scholarly import scholarly

            search_query = scholarly.search_author(name)
            author = next(search_query, None)

            if author:
                return scholarly.fill(author, sections=['basics', 'indices'])
            return None

        except Exception as e:
            # Scholar enrichment is optional; never let it break the pipeline.
            print(f"  Scholar error: {e}")
            return None

    def _collect_recent_publications(self, author_id: Optional[str], limit: int = 10) -> List[Dict]:
        """Return up to `limit` most recent publications for an OpenAlex author id."""
        if not author_id:
            return []

        try:
            url = "https://api.openalex.org/works"
            params = {
                'filter': f'author.id:{author_id}',
                'sort': 'publication_date:desc',
                'per_page': limit
            }

            response = self.session.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            papers = []
            for work in data.get('results', []):
                # OpenAlex may return null for primary_location/source;
                # `or {}` keeps the .get chain from raising AttributeError.
                primary_location = work.get('primary_location') or {}
                source = primary_location.get('source') or {}
                papers.append({
                    'title': work.get('title', ''),
                    'year': work.get('publication_year', 0),
                    'cited_by_count': work.get('cited_by_count', 0),
                    'doi': work.get('doi', ''),
                    'type': work.get('type', ''),
                    'venue': source.get('display_name', '')
                })

            return papers

        except Exception as e:
            print(f"  Publications error: {e}")
            return []

    def _synthesize_profile(self, openalex_data: Dict, scholar_data: Optional[Dict],
                            recent_papers: List[Dict]) -> IndividualProfile:
        """Synthesize data from multiple sources into a unified IndividualProfile."""
        name = openalex_data.get('display_name', 'Unknown')
        # OpenAlex ids are URLs; keep only the trailing short id.
        author_id = (openalex_data.get('id') or '').split('/')[-1]

        # last_known_institution is null for many authors; `or {}` guards the .get.
        last_inst = openalex_data.get('last_known_institution') or {}
        affiliation = last_inst.get('display_name', 'No affiliation')

        summary_stats = openalex_data.get('summary_stats', {})
        h_index = summary_stats.get('h_index', 0)
        total_citations = openalex_data.get('cited_by_count', 0)
        total_papers = openalex_data.get('works_count', 0)

        # Keep only strongly-associated concepts (score > 20) as interests.
        concepts = openalex_data.get('x_concepts', [])
        interests = [c.get('display_name', '') for c in concepts[:10] if c.get('score', 0) > 20]

        biography = self._generate_biography(name, affiliation, interests, h_index, total_papers)

        metadata = {
            'orcid': openalex_data.get('orcid', ''),
            'i10_index': summary_stats.get('i10_index', 0),
            'works_api_url': openalex_data.get('works_api_url', ''),
            'institution_id': last_inst.get('id', ''),
            'institution_country': last_inst.get('country_code', ''),
            'scholar_data_available': scholar_data is not None
        }

        if scholar_data:
            metadata['scholar_id'] = scholar_data.get('scholar_id', '')
            metadata['email_domain'] = scholar_data.get('email_domain', '')

        return IndividualProfile(
            id=author_id,
            name=name,
            affiliation=affiliation,
            h_index=h_index,
            total_citations=total_citations,
            total_papers=total_papers,
            interests=interests,
            biography=biography,
            recent_work=recent_papers,
            profile_url=f"https://openalex.org/authors/{author_id}",
            last_updated=datetime.now().isoformat(),
            source='OpenAlex + Google Scholar',
            metadata=metadata
        )

    def _generate_biography(self, name: str, affiliation: str, interests: List[str],
                            h_index: int, total_papers: int) -> str:
        """Generate a short structured biography sentence from collected data."""
        bio_parts = [
            f"{name} is a researcher",
            f"affiliated with {affiliation}" if affiliation != "No affiliation" else "with no listed affiliation",
            f"with an h-index of {h_index} and {total_papers} published works."
        ]

        if interests:
            bio_parts.append(f"Research interests include: {', '.join(interests[:5])}.")

        return " ".join(bio_parts)

    def _generate_cache_key(self, name: str, context: str) -> str:
        """Return a stable cache key for (name, context); md5 is fine here (non-security)."""
        key_string = f"{name}_{context}".lower().strip()
        return hashlib.md5(key_string.encode()).hexdigest()

    def batch_collect(self, names: List[str], context: str = "") -> List[IndividualProfile]:
        """Collect data for multiple individuals, rate-limited to ~1 request/second."""
        profiles = []

        print(f"Agent: Starting batch collection for {len(names)} individuals")

        for i, name in enumerate(names, 1):
            print(f"\nProgress: {i}/{len(names)}")
            profile = self.collect_individual_data(name, context)

            if profile:
                profiles.append(profile)

            # Be polite to upstream APIs between individuals.
            if i < len(names):
                time.sleep(1)

        print(f"\nBatch collection complete: {len(profiles)}/{len(names)} profiles collected")
        return profiles
|
|
|
|
class IntelligentRAGSystem:
    """
    RAG system optimized for searching individual profiles.
    Uses the Hugging Face API for embeddings and inference (no local models).
    """

    def __init__(self, hf_token: Optional[str] = None):
        """
        Args:
            hf_token: Hugging Face API token; defaults to the HF_TOKEN env var.
                Without a token, answer synthesis (LLM) is disabled.
        """
        self.hf_token = hf_token or os.getenv('HF_TOKEN')

        print("Initializing RAG system...")
        # Lightweight sentence-transformer for document embeddings (CPU only).
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={'device': 'cpu'}
        )

        # Profiles are indexed into an in-memory vector store (not persisted).
        self.vector_store = InMemoryVectorStore(self.embeddings)

        # The LLM endpoint is optional: only constructed when a token exists.
        if self.hf_token:
            self.llm = HuggingFaceEndpoint(
                repo_id="meta-llama/Meta-Llama-3-8B-Instruct",
                huggingfacehub_api_token=self.hf_token,
                temperature=0.2,
                max_new_tokens=512
            )
        else:
            self.llm = None
            print("Warning: No HF_TOKEN provided, LLM generation disabled")

        # Chunking keeps each indexed piece small enough for precise retrieval.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            separators=["\n\n", "\n", ". ", " ", ""]
        )

        print("RAG system initialized")

    def index_profiles(self, profiles: List[IndividualProfile]) -> None:
        """Index individual profiles into the vector store."""
        print(f"Indexing {len(profiles)} profiles...")

        documents = []
        for profile in profiles:
            profile_text = self._profile_to_text(profile)

            # Every chunk carries the full profile metadata so that search
            # results can be grouped back into per-profile hits.
            for chunk in self.text_splitter.split_text(profile_text):
                documents.append(Document(
                    page_content=chunk,
                    metadata={
                        'id': profile.id,
                        'name': profile.name,
                        'affiliation': profile.affiliation,
                        'h_index': profile.h_index,
                        'total_citations': profile.total_citations,
                        'profile_url': profile.profile_url,
                        'source': profile.source
                    }
                ))

        self.vector_store.add_documents(documents)

        print(f"Indexed {len(documents)} document chunks from {len(profiles)} profiles")

    def search(self, query: str, k: int = 5) -> List[Dict]:
        """
        Search for relevant profiles.

        Over-fetches chunks (k * 3), then groups them by profile id so that a
        profile matched by several chunks ranks higher; ties break on h-index.
        """
        print(f"Searching for: '{query}'")

        results = self.vector_store.similarity_search(query, k=k * 3)

        # Group chunk hits by profile id.
        profile_data: Dict[str, Dict[str, Any]] = {}
        for doc in results:
            profile_id = doc.metadata['id']

            if profile_id not in profile_data:
                profile_data[profile_id] = {
                    'name': doc.metadata['name'],
                    'affiliation': doc.metadata['affiliation'],
                    'h_index': doc.metadata['h_index'],
                    'total_citations': doc.metadata['total_citations'],
                    'profile_url': doc.metadata['profile_url'],
                    'source': doc.metadata['source'],
                    'relevance_score': 0,
                    'matched_content': []
                }

            profile_data[profile_id]['matched_content'].append(doc.page_content)
            profile_data[profile_id]['relevance_score'] += 1

        # Rank by (chunk hit count, h-index) descending, keep the top k.
        sorted_profiles = sorted(
            profile_data.values(),
            key=lambda x: (x['relevance_score'], x['h_index']),
            reverse=True
        )[:k]

        print(f"Found {len(sorted_profiles)} relevant profiles")
        return sorted_profiles

    def synthesize_answer(self, query: str, k: int = 5) -> Dict[str, Any]:
        """Generate a synthesized answer using RAG; requires the LLM to be configured."""
        if not self.llm:
            return {
                'answer': "LLM not available. Please provide HF_TOKEN.",
                'sources': []
            }

        print(f"Synthesizing answer for: '{query}'")

        relevant_profiles = self.search(query, k=k)

        if not relevant_profiles:
            return {
                'answer': "No relevant researchers found for this query.",
                'sources': []
            }

        context = self._build_context(relevant_profiles)

        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a research assistant specializing in academic profiles.
Synthesize information about researchers based on the provided context.
Be specific, cite names, and focus on their expertise and contributions."""),
            ("user", """Query: {query}

Context about relevant researchers:
{context}

Please provide a comprehensive answer about these researchers and their relevance to the query.
Focus on their expertise, key contributions, and why they are relevant.""")
        ])

        formatted_prompt = prompt.format(query=query, context=context)
        answer = self.llm.invoke(formatted_prompt)

        print("Answer generated")

        return {
            'answer': answer,
            'sources': relevant_profiles,
            'context_used': len(relevant_profiles)
        }

    def _profile_to_text(self, profile: IndividualProfile) -> str:
        """Convert a profile into the flat text that gets chunked and embedded."""
        sections = [
            f"Name: {profile.name}",
            f"Affiliation: {profile.affiliation}",
            f"Biography: {profile.biography}",
            f"Research Interests: {', '.join(profile.interests)}",
            f"H-Index: {profile.h_index}",
            f"Total Citations: {profile.total_citations}",
            f"Total Papers: {profile.total_papers}"
        ]

        if profile.recent_work:
            sections.append("Recent Publications:")
            # Only the 5 most recent papers are embedded to bound text size.
            for paper in profile.recent_work[:5]:
                sections.append(f"  - {paper.get('title', '')} ({paper.get('year', '')})")

        return "\n".join(sections)

    def _build_context(self, profiles: List[Dict]) -> str:
        """Build the LLM context string from ranked search-result dicts."""
        context_parts = []

        for i, profile in enumerate(profiles, 1):
            context_parts.append(f"\n{i}. {profile['name']} ({profile['affiliation']})")
            context_parts.append(f" H-Index: {profile['h_index']}, Citations: {profile['total_citations']}")
            # Only the first matched chunk, truncated, goes into the prompt.
            context_parts.append(f" Relevant content: {profile['matched_content'][0][:200]}...")

        return "\n".join(context_parts)

    def get_statistics(self) -> Dict[str, Any]:
        """Return static configuration/statistics about the indexed data."""
        return {
            'vector_store_type': 'InMemoryVectorStore',
            'embedding_model': 'sentence-transformers/all-MiniLM-L6-v2',
            'llm_model': 'meta-llama/Meta-Llama-3-8B-Instruct' if self.llm else 'None',
            'status': 'active'
        }
|
|
|
|
class AgenticRAGOrchestrator:
    """
    High-level orchestrator that combines data collection and RAG search.
    """

    def __init__(self, hf_token: Optional[str] = None):
        """
        Args:
            hf_token: Hugging Face API token, forwarded to both subsystems.
        """
        self.collector = AgenticDataCollector(hf_token)
        self.rag_system = IntelligentRAGSystem(hf_token)
        # All profiles indexed so far (accumulates across discover_and_index calls).
        self.indexed_profiles: List[IndividualProfile] = []

    def discover_and_index(self, query: str, max_profiles: int = 20) -> Dict[str, Any]:
        """
        Autonomous discovery: search for individuals, collect data, and index.

        Args:
            query: Search query (field, topic, institution).
            max_profiles: Maximum number of profiles to collect.

        Returns:
            Statistics about the discovery process.
        """
        print(f"\n{'=' * 60}")
        print("AGENTIC DISCOVERY INITIATED")
        print(f"Query: {query}")
        print(f"Target: {max_profiles} profiles")
        print(f"{'=' * 60}\n")

        start_time = time.time()

        # Phase 1: find candidate names.
        print("Phase 1: Discovery")
        discovered_names = self._discover_individuals(query, max_profiles)

        if not discovered_names:
            return {
                'success': False,
                'message': 'No individuals discovered',
                'profiles_collected': 0
            }

        # Phase 2: collect a full profile per name.
        print("\nPhase 2: Data Collection")
        profiles = self.collector.batch_collect(discovered_names, query)

        # Phase 3: embed and index the profiles for search.
        print("\nPhase 3: Indexing")
        self.rag_system.index_profiles(profiles)
        self.indexed_profiles.extend(profiles)

        elapsed_time = time.time() - start_time

        print(f"\n{'=' * 60}")
        print("DISCOVERY COMPLETE")
        print(f"Time elapsed: {elapsed_time:.2f}s")
        print(f"Profiles collected: {len(profiles)}")
        print(f"{'=' * 60}\n")

        return {
            'success': True,
            'profiles_collected': len(profiles),
            'profiles_indexed': len(self.indexed_profiles),
            'elapsed_time': elapsed_time,
            'query': query
        }

    def _discover_individuals(self, query: str, limit: int) -> List[str]:
        """Discover individual names from OpenAlex, most-cited first."""
        try:
            url = "https://api.openalex.org/authors"
            params = {
                'search': query,
                'per_page': limit,
                'sort': 'cited_by_count:desc'
            }

            # Reuse the collector's session so the polite User-Agent/Accept
            # headers apply to discovery requests as well.
            response = self.collector.session.get(url, params=params, timeout=15)
            response.raise_for_status()
            data = response.json()

            # .get() guards against sparse author records missing display_name.
            names = [
                author.get('display_name')
                for author in data.get('results', [])
                if author.get('display_name')
            ]
            print(f"  Discovered {len(names)} individuals")
            return names

        except Exception as e:
            print(f"  Discovery error: {e}")
            return []

    def search(self, query: str, k: int = 5) -> Dict[str, Any]:
        """Search the indexed profiles; errors out if nothing is indexed yet."""
        if not self.indexed_profiles:
            return {
                'error': 'No profiles indexed yet. Run discover_and_index first.',
                'results': []
            }

        results = self.rag_system.search(query, k=k)

        return {
            'query': query,
            'results': results,
            'total_indexed': len(self.indexed_profiles)
        }

    def ask(self, question: str, k: int = 5) -> Dict[str, Any]:
        """Ask a question and get a synthesized answer from the RAG system."""
        if not self.indexed_profiles:
            return {
                'error': 'No profiles indexed yet. Run discover_and_index first.',
                'answer': '',
                'sources': []
            }

        return self.rag_system.synthesize_answer(question, k=k)

    def get_all_profiles(self) -> List[IndividualProfile]:
        """Return all indexed profiles."""
        return self.indexed_profiles

    def export_profiles(self, filepath: str) -> None:
        """Export indexed profiles to a JSON file at `filepath`."""
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(
                [asdict(p) for p in self.indexed_profiles],
                f,
                indent=2
            )
        print(f"Exported {len(self.indexed_profiles)} profiles to {filepath}")
|
|
|
|
| |
def main() -> None:
    """Demo: discover, index, search, and question-answer over researcher profiles."""
    orchestrator = AgenticRAGOrchestrator()

    # Phase 1: autonomous discovery + indexing (network access required).
    result = orchestrator.discover_and_index("machine learning", max_profiles=15)
    print(f"\nDiscovery Result: {result}")

    # Phase 2: semantic search over the indexed profiles.
    search_results = orchestrator.search("neural networks experts", k=5)
    print("\nSearch Results:")
    for i, profile in enumerate(search_results['results'], 1):
        print(f"{i}. {profile['name']} - {profile['affiliation']}")

    # Phase 3: RAG-synthesized answer (requires HF_TOKEN for the LLM).
    answer = orchestrator.ask("Who are the leading researchers in deep learning?", k=5)
    print("\nAnswer:")
    print(answer['answer'])


if __name__ == "__main__":
    main()