# Qsearch / App / agentic_rag_system.py
# (Hugging Face Space file header — author: flyfir248,
#  commit aa928dd: "Updated header.html and routes.py")
"""
Agentic AI System for Individual Information Collection and RAG-based Search
Uses Hugging Face Inference API (no local model downloads)
"""
import os
import time
import json
import requests
from typing import List, Dict, Optional, Any
from datetime import datetime
from dataclasses import dataclass, asdict
import hashlib
# Langchain imports
from langchain_huggingface import HuggingFaceEndpoint, HuggingFaceEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
@dataclass
class IndividualProfile:
    """Structured profile for an individual researcher/expert.

    Instances are built by AgenticDataCollector._synthesize_profile from
    OpenAlex author data, optionally enriched with Google Scholar fields.
    """
    id: str  # short OpenAlex author id (tail of the id URL, e.g. "A123456789")
    name: str  # display name as reported by OpenAlex
    affiliation: str  # last known institution name, or "No affiliation"
    h_index: int  # from OpenAlex summary_stats
    total_citations: int  # OpenAlex cited_by_count
    total_papers: int  # OpenAlex works_count
    interests: List[str]  # top OpenAlex concepts (score > 20, max 10)
    biography: str  # generated summary text, not author-written
    recent_work: List[Dict]  # recent papers: title/year/cited_by_count/doi/type/venue
    profile_url: str  # canonical OpenAlex profile URL
    last_updated: str  # ISO-8601 timestamp of when the data was collected
    source: str  # provenance label, e.g. "OpenAlex + Google Scholar"
    metadata: Dict[str, Any]  # extras: orcid, i10_index, institution info, scholar ids
class AgenticDataCollector:
    """
    Agentic system that autonomously collects information about individuals
    from multiple academic sources using intelligent crawling strategies
    """

    def __init__(self, hf_token: Optional[str] = None):
        """
        Args:
            hf_token: Hugging Face API token; falls back to the HF_TOKEN env var.
        """
        self.hf_token = hf_token or os.getenv('HF_TOKEN')
        # Shared HTTP session so every request carries the same polite headers.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'AcademicResearchAgent/2.0',
            'Accept': 'application/json'
        })
        # Initialize collection memory (stores what has been collected)
        self.collection_memory = {}

    def collect_individual_data(self, name: str, additional_context: str = "") -> Optional[IndividualProfile]:
        """
        Autonomously collects comprehensive data about an individual.

        Args:
            name: Name of the individual
            additional_context: Additional search context (affiliation, field, etc.)
        Returns:
            IndividualProfile with collected data, or None when nothing was
            found in OpenAlex.
        """
        print(f"πŸ€– Agent: Starting data collection for '{name}'")
        # Serve from memory when the same (name, context) pair was collected
        # less than an hour ago.
        cache_key = self._generate_cache_key(name, additional_context)
        if cache_key in self.collection_memory:
            cached_time = self.collection_memory[cache_key]['timestamp']
            if (datetime.now() - cached_time).total_seconds() < 3600:  # 1 hour cache
                print(f"πŸ“¦ Agent: Using cached data for '{name}'")
                return self.collection_memory[cache_key]['profile']
        # Multi-step collection process
        profile = self._execute_collection_pipeline(name, additional_context)
        if profile:
            self.collection_memory[cache_key] = {
                'profile': profile,
                'timestamp': datetime.now()
            }
        return profile

    def _execute_collection_pipeline(self, name: str, context: str) -> Optional[IndividualProfile]:
        """Execute multi-step data collection pipeline.

        OpenAlex is the authoritative source: if it has no match the pipeline
        aborts; Scholar enrichment and publications are best-effort extras.
        """
        # Step 1: Search OpenAlex
        print(f" πŸ“ Step 1: Searching OpenAlex...")
        openalex_data = self._collect_from_openalex(name, context)
        if not openalex_data:
            print(f" ❌ No data found in OpenAlex")
            return None
        # Step 2: Enrich with Google Scholar (if available)
        print(f" πŸ“ Step 2: Enriching with Google Scholar...")
        scholar_data = self._collect_from_scholar(name, context)
        # Step 3: Get recent publications
        print(f" πŸ“ Step 3: Collecting recent publications...")
        recent_papers = self._collect_recent_publications(openalex_data.get('id'))
        # Step 4: Synthesize profile
        print(f" πŸ“ Step 4: Synthesizing comprehensive profile...")
        profile = self._synthesize_profile(openalex_data, scholar_data, recent_papers)
        print(f" βœ… Collection complete for '{name}'")
        return profile

    def _collect_from_openalex(self, name: str, context: str) -> Optional[Dict]:
        """Collect the best-matching author record from the OpenAlex API.

        Returns the raw author dict, or None on no match / any request error.
        """
        try:
            search_query = f"{name} {context}".strip()
            url = "https://api.openalex.org/authors"
            params = {
                'search': search_query,
                'per_page': 1
            }
            response = self.session.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            results = data.get('results', [])
            if results:
                return results[0]
            return None
        except Exception as e:
            # Best-effort source: log and fall through to "not found".
            print(f" ⚠️ OpenAlex error: {e}")
            return None

    def _collect_from_scholar(self, name: str, context: str) -> Optional[Dict]:
        """Collect data from Google Scholar (via scholarly).

        The scholarly package is imported lazily so the collector still works
        when it is not installed; any failure degrades to None.
        """
        try:
            from scholarly import scholarly
            search_query = scholarly.search_author(name)
            author = next(search_query, None)
            if author:
                return scholarly.fill(author, sections=['basics', 'indices'])
            return None
        except Exception as e:
            print(f" ⚠️ Scholar error: {e}")
            return None

    def _collect_recent_publications(self, author_id: str, limit: int = 10) -> List[Dict]:
        """Collect the most recent publications for an OpenAlex author id.

        Args:
            author_id: full OpenAlex author id URL (may be None/empty).
            limit: maximum number of works to fetch.
        Returns:
            List of simplified paper dicts (possibly empty on any error).
        """
        if not author_id:
            return []
        try:
            url = "https://api.openalex.org/works"
            params = {
                'filter': f'author.id:{author_id}',
                'sort': 'publication_date:desc',
                'per_page': limit
            }
            response = self.session.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            papers = []
            for work in data.get('results', []):
                # OpenAlex uses explicit JSON nulls (not missing keys) for
                # primary_location/source/title/doi, so .get(key, default)
                # alone can still yield None — guard with `or`.
                location = work.get('primary_location') or {}
                venue = (location.get('source') or {}).get('display_name', '')
                papers.append({
                    'title': work.get('title') or '',
                    'year': work.get('publication_year', 0),
                    'cited_by_count': work.get('cited_by_count', 0),
                    'doi': work.get('doi') or '',
                    'type': work.get('type') or '',
                    'venue': venue
                })
            return papers
        except Exception as e:
            print(f" ⚠️ Publications error: {e}")
            return []

    def _synthesize_profile(self, openalex_data: Dict, scholar_data: Optional[Dict],
                            recent_papers: List[Dict]) -> IndividualProfile:
        """Synthesize data from multiple sources into a unified profile."""
        # Extract basic info
        name = openalex_data.get('display_name', 'Unknown')
        # The id is a full URL ("https://openalex.org/A..."); keep the tail.
        # Guard against an explicit null id.
        author_id = (openalex_data.get('id') or '').split('/')[-1]
        # Get affiliation. OpenAlex returns an explicit null (not a missing
        # key) when the institution is unknown, so `.get(..., {})` alone is
        # not enough — guard against None before chaining .get().
        last_inst = openalex_data.get('last_known_institution') or {}
        affiliation = last_inst.get('display_name', 'No affiliation')
        # Get metrics (summary_stats can also be an explicit null)
        summary_stats = openalex_data.get('summary_stats') or {}
        h_index = summary_stats.get('h_index', 0)
        total_citations = openalex_data.get('cited_by_count', 0)
        total_papers = openalex_data.get('works_count', 0)
        # Get interests/concepts: keep only confidently-assigned concepts
        # (OpenAlex scores are 0-100).
        concepts = openalex_data.get('x_concepts', [])
        interests = [c.get('display_name', '') for c in concepts[:10] if c.get('score', 0) > 20]
        # Build biography
        biography = self._generate_biography(name, affiliation, interests, h_index, total_papers)
        # Metadata
        metadata = {
            'orcid': openalex_data.get('orcid') or '',
            'i10_index': summary_stats.get('i10_index', 0),
            'works_api_url': openalex_data.get('works_api_url', ''),
            'institution_id': last_inst.get('id', ''),
            'institution_country': last_inst.get('country_code', ''),
            'scholar_data_available': scholar_data is not None
        }
        if scholar_data:
            metadata['scholar_id'] = scholar_data.get('scholar_id', '')
            metadata['email_domain'] = scholar_data.get('email_domain', '')
        return IndividualProfile(
            id=author_id,
            name=name,
            affiliation=affiliation,
            h_index=h_index,
            total_citations=total_citations,
            total_papers=total_papers,
            interests=interests,
            biography=biography,
            recent_work=recent_papers,
            profile_url=f"https://openalex.org/authors/{author_id}",
            last_updated=datetime.now().isoformat(),
            source='OpenAlex + Google Scholar',
            metadata=metadata
        )

    def _generate_biography(self, name: str, affiliation: str, interests: List[str],
                            h_index: int, total_papers: int) -> str:
        """Generate a structured one-paragraph biography from collected data."""
        bio_parts = [
            f"{name} is a researcher",
            f"affiliated with {affiliation}" if affiliation != "No affiliation" else "with no listed affiliation",
            f"with an h-index of {h_index} and {total_papers} published works."
        ]
        if interests:
            bio_parts.append(f"Research interests include: {', '.join(interests[:5])}.")
        return " ".join(bio_parts)

    def _generate_cache_key(self, name: str, context: str) -> str:
        """Generate a cache key for an individual.

        md5 is acceptable here: the digest is only a cache bucket name,
        not a security token.
        """
        key_string = f"{name}_{context}".lower().strip()
        return hashlib.md5(key_string.encode()).hexdigest()

    def batch_collect(self, names: List[str], context: str = "") -> List[IndividualProfile]:
        """Collect data for multiple individuals, skipping failures.

        Sleeps 1s between individuals to stay polite to the upstream APIs.
        """
        profiles = []
        print(f"πŸš€ Agent: Starting batch collection for {len(names)} individuals")
        for i, name in enumerate(names, 1):
            print(f"\nπŸ“Š Progress: {i}/{len(names)}")
            profile = self.collect_individual_data(name, context)
            if profile:
                profiles.append(profile)
            # Rate limiting
            if i < len(names):
                time.sleep(1)
        print(f"\nβœ… Batch collection complete: {len(profiles)}/{len(names)} profiles collected")
        return profiles
class IntelligentRAGSystem:
    """
    RAG system optimized for searching individual profiles
    Uses HuggingFace API for embeddings and inference (no local models)
    """

    def __init__(self, hf_token: Optional[str] = None):
        """Set up embeddings, vector store, optional LLM, and text splitter."""
        self.hf_token = hf_token or os.getenv('HF_TOKEN')
        print("πŸ”§ Initializing RAG system...")
        # Lightweight sentence-transformer embeddings on CPU.
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={'device': 'cpu'}
        )
        self.vector_store = InMemoryVectorStore(self.embeddings)
        # The remote LLM is optional: without a token, generation is disabled
        # and only retrieval-style search is available.
        self.llm = None
        if self.hf_token:
            self.llm = HuggingFaceEndpoint(
                repo_id="meta-llama/Meta-Llama-3-8B-Instruct",
                huggingfacehub_api_token=self.hf_token,
                temperature=0.2,
                max_new_tokens=512
            )
        else:
            print("⚠️ Warning: No HF_TOKEN provided, LLM generation disabled")
        # Chunker used when indexing profile text.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            separators=["\n\n", "\n", ". ", " ", ""]
        )
        print("βœ… RAG system initialized")

    def index_profiles(self, profiles: List[IndividualProfile]):
        """Index individual profiles into the vector store."""
        print(f"πŸ“š Indexing {len(profiles)} profiles...")
        # One Document per text chunk; every chunk carries the profile's
        # identifying metadata so search results can be de-duplicated later.
        documents = [
            Document(
                page_content=chunk,
                metadata={
                    'id': profile.id,
                    'name': profile.name,
                    'affiliation': profile.affiliation,
                    'h_index': profile.h_index,
                    'total_citations': profile.total_citations,
                    'profile_url': profile.profile_url,
                    'source': profile.source
                }
            )
            for profile in profiles
            for chunk in self.text_splitter.split_text(self._profile_to_text(profile))
        ]
        self.vector_store.add_documents(documents)
        print(f"βœ… Indexed {len(documents)} document chunks from {len(profiles)} profiles")

    def search(self, query: str, k: int = 5) -> List[Dict]:
        """Search for relevant profiles, de-duplicated by profile id."""
        print(f"πŸ” Searching for: '{query}'")
        # Over-fetch chunks (k*3) so several chunks of one profile still
        # leave enough distinct profiles after aggregation.
        hits = self.vector_store.similarity_search(query, k=k * 3)
        aggregated: Dict[str, Dict] = {}
        for hit in hits:
            meta = hit.metadata
            entry = aggregated.setdefault(meta['id'], {
                'name': meta['name'],
                'affiliation': meta['affiliation'],
                'h_index': meta['h_index'],
                'total_citations': meta['total_citations'],
                'profile_url': meta['profile_url'],
                'source': meta['source'],
                'relevance_score': 0,
                'matched_content': []
            })
            entry['matched_content'].append(hit.page_content)
            entry['relevance_score'] += 1
        # Rank by number of matching chunks; break ties on h-index.
        ranked = sorted(
            aggregated.values(),
            key=lambda entry: (entry['relevance_score'], entry['h_index']),
            reverse=True
        )[:k]
        print(f"βœ… Found {len(ranked)} relevant profiles")
        return ranked

    def synthesize_answer(self, query: str, k: int = 5) -> Dict[str, Any]:
        """Generate a synthesized answer using RAG.

        Returns a dict with 'answer' and 'sources'; when generation ran,
        'context_used' holds the number of profiles given to the LLM.
        """
        if not self.llm:
            return {
                'answer': "LLM not available. Please provide HF_TOKEN.",
                'sources': []
            }
        print(f"🧠 Synthesizing answer for: '{query}'")
        sources = self.search(query, k=k)
        if not sources:
            return {
                'answer': "No relevant researchers found for this query.",
                'sources': []
            }
        context = self._build_context(sources)
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a research assistant specializing in academic profiles.
Synthesize information about researchers based on the provided context.
Be specific, cite names, and focus on their expertise and contributions."""),
            ("user", """Query: {query}
Context about relevant researchers:
{context}
Please provide a comprehensive answer about these researchers and their relevance to the query.
Focus on their expertise, key contributions, and why they are relevant.""")
        ])
        answer = self.llm.invoke(prompt.format(query=query, context=context))
        print("βœ… Answer generated")
        return {
            'answer': answer,
            'sources': sources,
            'context_used': len(sources)
        }

    def _profile_to_text(self, profile: IndividualProfile) -> str:
        """Convert a profile to searchable text (one field per line)."""
        lines = [
            f"Name: {profile.name}",
            f"Affiliation: {profile.affiliation}",
            f"Biography: {profile.biography}",
            f"Research Interests: {', '.join(profile.interests)}",
            f"H-Index: {profile.h_index}",
            f"Total Citations: {profile.total_citations}",
            f"Total Papers: {profile.total_papers}",
        ]
        if profile.recent_work:
            lines.append("Recent Publications:")
            lines.extend(
                f"  - {paper.get('title', '')} ({paper.get('year', '')})"
                for paper in profile.recent_work[:5]
            )
        return "\n".join(lines)

    def _build_context(self, profiles: List[Dict]) -> str:
        """Build an LLM context string from ranked profile dicts."""
        parts = []
        for rank, prof in enumerate(profiles, 1):
            parts.append(f"\n{rank}. {prof['name']} ({prof['affiliation']})")
            parts.append(f"  H-Index: {prof['h_index']}, Citations: {prof['total_citations']}")
            # Only the first (best) matched chunk is shown, truncated to 200 chars.
            parts.append(f"  Relevant content: {prof['matched_content'][0][:200]}...")
        return "\n".join(parts)

    def get_statistics(self) -> Dict[str, Any]:
        """Get statistics about the indexed data.

        Note: InMemoryVectorStore doesn't expose a document count directly,
        so this reports configuration rather than corpus size.
        """
        llm_name = 'meta-llama/Meta-Llama-3-8B-Instruct' if self.llm else 'None'
        return {
            'vector_store_type': 'InMemoryVectorStore',
            'embedding_model': 'sentence-transformers/all-MiniLM-L6-v2',
            'llm_model': llm_name,
            'status': 'active'
        }
class AgenticRAGOrchestrator:
    """
    High-level orchestrator that combines data collection and RAG search
    """

    def __init__(self, hf_token: Optional[str] = None):
        """
        Args:
            hf_token: Hugging Face API token, shared by collector and RAG system.
        """
        self.collector = AgenticDataCollector(hf_token)
        self.rag_system = IntelligentRAGSystem(hf_token)
        # Running list of every profile indexed across all discovery runs.
        self.indexed_profiles = []

    def discover_and_index(self, query: str, max_profiles: int = 20) -> Dict[str, Any]:
        """
        Autonomous discovery: search for individuals, collect data, and index.

        Args:
            query: Search query (field, topic, institution)
            max_profiles: Maximum number of profiles to collect
        Returns:
            Statistics about the discovery process
        """
        print(f"\n{'=' * 60}")
        print(f"πŸš€ AGENTIC DISCOVERY INITIATED")
        print(f"Query: {query}")
        print(f"Target: {max_profiles} profiles")
        print(f"{'=' * 60}\n")
        start_time = time.time()
        # Step 1: Discover individuals
        print("πŸ“‘ Phase 1: Discovery")
        discovered_names = self._discover_individuals(query, max_profiles)
        if not discovered_names:
            return {
                'success': False,
                'message': 'No individuals discovered',
                'profiles_collected': 0
            }
        # Step 2: Collect detailed data
        print(f"\nπŸ“₯ Phase 2: Data Collection")
        profiles = self.collector.batch_collect(discovered_names, query)
        # Step 3: Index into RAG system
        print(f"\nπŸ“š Phase 3: Indexing")
        # Skip the vector-store call entirely when every collection failed;
        # indexing an empty batch is pointless.
        if profiles:
            self.rag_system.index_profiles(profiles)
            self.indexed_profiles.extend(profiles)
        elapsed_time = time.time() - start_time
        print(f"\n{'=' * 60}")
        print(f"βœ… DISCOVERY COMPLETE")
        print(f"Time elapsed: {elapsed_time:.2f}s")
        print(f"Profiles collected: {len(profiles)}")
        print(f"{'=' * 60}\n")
        return {
            'success': True,
            'profiles_collected': len(profiles),
            'profiles_indexed': len(self.indexed_profiles),
            'elapsed_time': elapsed_time,
            'query': query
        }

    def _discover_individuals(self, query: str, limit: int) -> List[str]:
        """Discover individual names from OpenAlex, most-cited first.

        Returns an empty list on any request error.
        """
        try:
            url = "https://api.openalex.org/authors"
            params = {
                'search': query,
                'per_page': limit,
                'sort': 'cited_by_count:desc'
            }
            # Reuse the collector's session so this request carries the same
            # polite User-Agent header as the rest of the pipeline (a bare
            # requests.get would omit it).
            response = self.collector.session.get(url, params=params, timeout=15)
            response.raise_for_status()
            data = response.json()
            # display_name can be missing or null in rare records; skip those
            # instead of raising KeyError.
            names = [author.get('display_name') for author in data.get('results', [])]
            names = [name for name in names if name]
            print(f" βœ… Discovered {len(names)} individuals")
            return names
        except Exception as e:
            print(f" ❌ Discovery error: {e}")
            return []

    def search(self, query: str, k: int = 5) -> Dict[str, Any]:
        """Search the indexed profiles; errors out if nothing is indexed yet."""
        if not self.indexed_profiles:
            return {
                'error': 'No profiles indexed yet. Run discover_and_index first.',
                'results': []
            }
        results = self.rag_system.search(query, k=k)
        return {
            'query': query,
            'results': results,
            'total_indexed': len(self.indexed_profiles)
        }

    def ask(self, question: str, k: int = 5) -> Dict[str, Any]:
        """Ask a question and get a synthesized answer (RAG generation)."""
        if not self.indexed_profiles:
            return {
                'error': 'No profiles indexed yet. Run discover_and_index first.',
                'answer': '',
                'sources': []
            }
        return self.rag_system.synthesize_answer(question, k=k)

    def get_all_profiles(self) -> List[IndividualProfile]:
        """Get all indexed profiles."""
        return self.indexed_profiles

    def export_profiles(self, filepath: str):
        """Export indexed profiles to a JSON file.

        Args:
            filepath: Destination path; written as UTF-8.
        """
        # Explicit encoding so the output does not depend on the platform's
        # default locale encoding.
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(
                [asdict(p) for p in self.indexed_profiles],
                f,
                indent=2
            )
        print(f"βœ… Exported {len(self.indexed_profiles)} profiles to {filepath}")
# Example usage
if __name__ == "__main__":
    # Wire up the full pipeline (collector + RAG system) with default credentials.
    agent = AgenticRAGOrchestrator()

    # Populate the index with experts discovered for a topic.
    discovery = agent.discover_and_index("machine learning", max_profiles=15)
    print(f"\nπŸ“Š Discovery Result: {discovery}")

    # Plain retrieval over the indexed profiles.
    found = agent.search("neural networks experts", k=5)
    print(f"\nπŸ” Search Results:")
    for rank, hit in enumerate(found['results'], 1):
        print(f"{rank}. {hit['name']} - {hit['affiliation']}")

    # Full RAG question answering over the same index.
    response = agent.ask("Who are the leading researchers in deep learning?", k=5)
    print(f"\nπŸ’¬ Answer:")
    print(response['answer'])