# Qsearch / App / discovery_fabric.py
"""
Discovery Fabric Engine - Multi-Source Research Intelligence
Integrates: OpenAlex, Zenodo, ROR, and PubMed (via Entrez); a Semantic Scholar endpoint is defined for future enrichment
"""
from flask import Blueprint, request, jsonify, render_template
import requests
from Bio import Entrez
import os
from datetime import datetime
import numpy as np
from collections import defaultdict
import time
# Initialize Blueprint
fabric_bp = Blueprint('fabric', __name__)
# Configuration from environment (NCBI asks for a real contact address;
# override the placeholder email in deployment)
Entrez.email = os.getenv('Entrez_email', 'your_email@example.com')
Entrez.api_key = os.getenv('Entrez_api_key', '')
# API Endpoints
OPENALEX_API = "https://api.openalex.org"
ZENODO_API = "https://zenodo.org/api/records"
ROR_API = "https://api.ror.org/organizations"
SEMANTIC_SCHOLAR_API = "https://api.semanticscholar.org/graph/v1"  # reserved for future enrichment; not yet queried
class ResearcherProfile:
"""Unified researcher object across all data sources"""
def __init__(self):
self.global_id = None
self.names = []
self.orcid = None
self.affiliations = []
self.openalex_id = None
self.scholar_id = None
self.zenodo_records = []
self.pubmed_ids = []
# Metrics
self.citations = 0
self.h_index = 0
self.i10_index = 0
self.paper_count = 0
self.dataset_downloads = 0
self.code_repos = 0
# Institution
self.ror_id = None
self.institution_name = None
self.institution_tier = 0.5
# Topic alignment
self.topics = []
self.topic_score = 0.0
# Network
self.network_centrality = 0.0
# CPI++ Score
self.cpi_score = 0.0
def calculate_cpi(self):
"""Calculate Composite Performance Index++"""
# 1. Scholarly Authority (30%)
scholarly_authority = (
np.log1p(self.citations) * 0.5 +
self.h_index * 0.3 +
self.i10_index * 0.2
)
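        # Worked example (illustrative numbers): 1,000 citations, h-index 20
        # and i10-index 30 give log1p(1000)*0.5 + 20*0.3 + 30*0.2 ≈ 15.45, so
        # the 0.30-weighted term alone adds ~4.64 to the final score. The
        # components sit on different scales (tier is 0-1 while authority can
        # exceed 15), so the weights are heuristic rather than a normalized blend.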
# 2. Topical Alignment (25%) - pre-calculated
topical_alignment = self.topic_score
# 3. Practical Impact (20%)
practical_impact = (
np.log1p(self.dataset_downloads) * 0.6 +
np.log1p(self.code_repos) * 0.4
)
# 4. Institutional Power (15%)
institutional_power = self.institution_tier
# 5. Network Centrality (10%)
network_centrality = self.network_centrality
# Weighted sum
self.cpi_score = (
0.30 * scholarly_authority +
0.25 * topical_alignment +
0.20 * practical_impact +
0.15 * institutional_power +
0.10 * network_centrality
)
return self.cpi_score
def to_dict(self):
"""Convert to JSON-serializable dictionary"""
return {
'global_id': self.global_id,
'name': self.names[0] if self.names else 'Unknown',
'orcid': self.orcid,
'institution': self.institution_name,
'citations': self.citations,
'h_index': self.h_index,
'paper_count': self.paper_count,
'datasets': len(self.zenodo_records),
'cpi_score': round(self.cpi_score, 3),
'topics': self.topics[:5],
'openalex_id': self.openalex_id
}
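    # Example serialized profile (values purely illustrative):
    #   {'name': 'Jane Doe', 'citations': 1520, 'h_index': 18,
    #    'cpi_score': 7.412, 'topics': ['CRISPR', ...], ...}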
class DiscoveryFabricEngine:
"""Main intelligence engine for multi-source research discovery"""
def __init__(self):
self.researchers = {}
self.institution_cache = {}
def run(self, query, depth='surface', field='life_sciences', persona='graduate'):
"""
Main execution pipeline
Args:
query: Search query string
depth: 'surface', 'citation', 'raw_data'
field: 'life_sciences', 'physics', 'social_sci', 'engineering'
persona: 'graduate', 'specialist', 'super'
"""
results = {
'query': query,
'timestamp': datetime.now().isoformat(),
'experts': [],
'datasets': [],
'citation_network': {},
'knowledge_gaps': [],
'paper_density': 0,
'field_stats': {}
}
# Phase 1: OpenAlex backbone (200ms target)
print(f"[Fabric] Phase 1: OpenAlex query for '{query}'")
openalex_data = self._query_openalex(query, field, persona)
if not openalex_data:
return results
# Phase 2: Build researcher profiles
print(f"[Fabric] Phase 2: Building researcher profiles")
self._build_researcher_profiles(openalex_data, query)
# Phase 3: Enrich with additional sources
print(f"[Fabric] Phase 3: Enriching with Zenodo, PubMed, ROR")
if depth in ['citation', 'raw_data']:
self._enrich_with_zenodo(query, top_n=30)
if depth == 'raw_data':
self._enrich_with_pubmed(query, top_n=20)
# Phase 4: Institution enrichment via ROR
self._enrich_institutions()
# Phase 5: Calculate CPI++ and rank
        print("[Fabric] Phase 5: Calculating CPI++ scores")
ranked_researchers = self._rank_researchers(persona)
# Phase 6: Build response
results['experts'] = [r.to_dict() for r in ranked_researchers[:50]]
results['paper_density'] = self._calculate_paper_density(openalex_data)
results['field_stats'] = self._calculate_field_stats(openalex_data)
if depth == 'citation':
results['citation_network'] = self._build_citation_network(ranked_researchers[:20])
if depth == 'raw_data':
results['datasets'] = self._aggregate_datasets(ranked_researchers[:30])
results['knowledge_gaps'] = self._identify_knowledge_gaps(openalex_data)
return results
def _query_openalex(self, query, field, persona):
"""Query OpenAlex API"""
# Map field to OpenAlex concept
field_concepts = {
'life_sciences': 'C86803240', # Biology
'physics': 'C121332964', # Physics
'social_sci': 'C144024400', # Social Science
'engineering': 'C127413603' # Engineering
}
concept_id = field_concepts.get(field, 'C86803240')
# Adjust per_page based on persona
per_page = {
'graduate': 100,
'specialist': 50,
'super': 25
}.get(persona, 100)
url = f"{OPENALEX_API}/works"
params = {
'filter': f'concepts.id:{concept_id},default.search:{query}',
            'per-page': per_page,  # OpenAlex expects the hyphenated parameter name
'page': 1,
'sort': 'cited_by_count:desc',
'select': 'id,title,authorships,cited_by_count,publication_year,concepts,topics'
}
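        # The request this builds looks like (query string illustrative):
        #   GET https://api.openalex.org/works?filter=concepts.id:C86803240,default.search:crispr&per-page=100&sort=cited_by_count:desc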
        headers = {'User-Agent': f'QsearchIntelligence/1.0 (mailto:{Entrez.email})'}
try:
response = requests.get(url, params=params, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
return data.get('results', [])
except Exception as e:
print(f"[Fabric] OpenAlex error: {e}")
return []
def _build_researcher_profiles(self, works, query):
"""Build unified researcher profiles from OpenAlex data"""
author_works = defaultdict(list)
# Group works by author
for work in works:
for authorship in work.get('authorships', []):
author = authorship.get('author', {})
author_id = author.get('id', '')
if not author_id:
continue
author_works[author_id].append({
'work': work,
'authorship': authorship
})
# Create profiles
for author_id, works_list in author_works.items():
if author_id in self.researchers:
continue
profile = ResearcherProfile()
profile.global_id = author_id
profile.openalex_id = author_id
# Aggregate from works
first_authorship = works_list[0]['authorship']
author_data = first_authorship.get('author', {})
profile.names.append(author_data.get('display_name', 'Unknown'))
profile.orcid = author_data.get('orcid')
# Get institution from first authorship
institutions = first_authorship.get('institutions', [])
if institutions:
inst = institutions[0]
profile.institution_name = inst.get('display_name')
profile.ror_id = inst.get('ror')
# Aggregate metrics
profile.paper_count = len(works_list)
profile.citations = sum(
w['work'].get('cited_by_count', 0) for w in works_list
)
# Simple h-index calculation
citations_sorted = sorted(
[w['work'].get('cited_by_count', 0) for w in works_list],
reverse=True
)
profile.h_index = self._calculate_h_index(citations_sorted)
profile.i10_index = sum(1 for c in citations_sorted if c >= 10)
# Topic extraction
all_topics = []
for w in works_list:
topics = w['work'].get('topics', [])
for topic in topics[:3]: # Top 3 topics per paper
all_topics.append(topic.get('display_name', ''))
# Count topic frequency
topic_counts = defaultdict(int)
for topic in all_topics:
topic_counts[topic] += 1
profile.topics = [t for t, _ in sorted(
topic_counts.items(), key=lambda x: x[1], reverse=True
)[:5]]
# Calculate topic alignment with query
profile.topic_score = self._calculate_topic_score(profile.topics, query)
self.researchers[author_id] = profile
def _calculate_h_index(self, citations_sorted):
"""Calculate h-index from sorted citation counts"""
h = 0
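        # Example: counts [25, 18, 7, 3, 1] (descending) give h = 3, since
        # exactly three papers each have at least 3 citations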
for i, citations in enumerate(citations_sorted, 1):
if citations >= i:
h = i
else:
break
return h
def _calculate_topic_score(self, topics, query):
"""Simple topic-query alignment score"""
query_terms = set(query.lower().split())
topic_terms = set(' '.join(topics).lower().split())
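        # Example: the query 'crispr gene editing' against topics containing
        # 'gene' and 'editing' overlaps on 2 of 3 query terms: 2/3 * 10 ≈ 6.67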
if not query_terms or not topic_terms:
return 0.0
overlap = len(query_terms & topic_terms)
score = overlap / len(query_terms)
return min(score * 10, 10.0) # Scale to 0-10
def _enrich_with_zenodo(self, query, top_n=30):
"""Enrich top researchers with Zenodo datasets"""
# Get top N researchers by current score
top_researchers = sorted(
self.researchers.values(),
key=lambda r: r.citations,
reverse=True
)[:top_n]
        for researcher in top_researchers:
            # Search by ORCID if available; OpenAlex returns ORCIDs as full
            # URLs, while the query below needs the bare identifier
            if researcher.orcid:
                orcid_id = researcher.orcid.rstrip('/').rsplit('/', 1)[-1]
                zenodo_query = f'orcid:{orcid_id}'
            else:
                # Fall back to name + topic
                name = researcher.names[0] if researcher.names else ''
                zenodo_query = f'{name} {query}'
            try:
                # No 'type' restriction here: filtering to datasets would hide
                # the software records counted below
                params = {
                    'q': zenodo_query,
                    'size': 10
                }
                response = requests.get(ZENODO_API, params=params, timeout=5)
                if response.status_code == 200:
                    data = response.json()
                    hits = data.get('hits', {}).get('hits', [])
                    researcher.zenodo_records = [
                        h for h in hits
                        if h.get('metadata', {}).get('resource_type', {}).get('type', '') == 'dataset'
                    ]
                    researcher.dataset_downloads = sum(
                        h.get('stats', {}).get('downloads', 0)
                        for h in researcher.zenodo_records
                    )
                    # Count software (code repository) records
                    researcher.code_repos = sum(
                        1 for h in hits
                        if 'software' in h.get('metadata', {}).get('resource_type', {}).get('type', '').lower()
                    )
time.sleep(0.1) # Rate limiting
except Exception as e:
print(f"[Fabric] Zenodo error for {researcher.names[0]}: {e}")
continue
def _enrich_with_pubmed(self, query, top_n=20):
"""Enrich with PubMed data via Entrez"""
top_researchers = sorted(
self.researchers.values(),
key=lambda r: r.citations,
reverse=True
)[:top_n]
for researcher in top_researchers:
name = researcher.names[0] if researcher.names else ''
# Search PubMed
search_query = f'{name}[Author] AND {query}'
try:
handle = Entrez.esearch(db="pubmed", term=search_query, retmax=20)
record = Entrez.read(handle)
handle.close()
researcher.pubmed_ids = record.get('IdList', [])
                time.sleep(0.34)  # NCBI allows ~3 requests/sec without an API key (10/sec with one)
except Exception as e:
print(f"[Fabric] PubMed error for {name}: {e}")
continue
def _enrich_institutions(self):
"""Enrich institution data via ROR"""
unique_rors = set()
for researcher in self.researchers.values():
if researcher.ror_id and researcher.ror_id not in self.institution_cache:
unique_rors.add(researcher.ror_id)
# Fetch ROR data
for ror_id in unique_rors:
try:
# Extract ROR ID from URL
ror_short = ror_id.split('/')[-1] if '/' in ror_id else ror_id
url = f"{ROR_API}/{ror_short}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
# Calculate tier based on types and country
tier = self._calculate_institution_tier(data)
self.institution_cache[ror_id] = {
'name': data.get('name'),
'types': data.get('types', []),
'country': data.get('country', {}).get('country_code'),
'tier': tier
}
time.sleep(0.1)
except Exception as e:
print(f"[Fabric] ROR error for {ror_id}: {e}")
continue
# Apply to researchers
for researcher in self.researchers.values():
if researcher.ror_id in self.institution_cache:
inst_data = self.institution_cache[researcher.ror_id]
researcher.institution_tier = inst_data['tier']
    def _calculate_institution_tier(self, ror_data):
        """Calculate institution tier weight"""
        types = ror_data.get('types', [])
        country = ror_data.get('country', {}).get('country_code', '')
        # Tier by organization type; take the highest applicable value so a
        # multi-type organization (e.g. Education + Government) keeps its
        # best tier instead of being overwritten by a later branch
        type_tiers = {
            'Healthcare': 0.90,
            'Education': 0.85,
            'Company': 0.80,
            'Government': 0.75
        }
        tier = max((type_tiers[t] for t in types if t in type_tiers), default=0.5)
        # Boost for major research countries
        major_countries = ['US', 'GB', 'DE', 'FR', 'CA', 'AU', 'JP', 'CH', 'NL', 'SE']
        if country in major_countries:
            tier = min(tier * 1.15, 1.0)
        return tier
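    # Worked example: a US university typed ['Education'] scores
    # 0.85 * 1.15 ≈ 0.98, while an organization with no recognized type in a
    # non-listed country stays at the 0.5 baseline.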
def _rank_researchers(self, persona):
"""Calculate CPI++ and rank researchers"""
# Calculate CPI for all
for researcher in self.researchers.values():
researcher.calculate_cpi()
# Sort by CPI
ranked = sorted(
self.researchers.values(),
key=lambda r: r.cpi_score,
reverse=True
)
# Apply persona filtering
if persona == 'specialist':
# Filter: h-index >= 10, papers >= 5
ranked = [r for r in ranked if r.h_index >= 10 and r.paper_count >= 5]
elif persona == 'super':
# Filter: h-index >= 20, papers >= 10, citations >= 500
ranked = [r for r in ranked if r.h_index >= 20 and r.paper_count >= 10 and r.citations >= 500]
return ranked
def _calculate_paper_density(self, works):
"""Calculate publishing density (papers per month)"""
if not works:
return 0
        # Count recent papers; the >= year-2 cutoff can span up to three
        # calendar years, so dividing by 24 months is an approximation
        recent_count = sum(
            1 for w in works
            if w.get('publication_year', 0) >= datetime.now().year - 2
        )
        density = recent_count / 24  # e.g. 36 recent papers -> 1.5 papers/month
return round(density, 2)
def _calculate_field_stats(self, works):
"""Calculate field statistics"""
stats = {
'total_papers': len(works),
'avg_citations': 0,
'top_concepts': []
}
        if works:
            stats['avg_citations'] = round(sum(
                w.get('cited_by_count', 0) for w in works
            ) / len(works), 1)
# Count concepts
concept_counts = defaultdict(int)
for work in works:
for concept in work.get('concepts', [])[:5]:
name = concept.get('display_name')
if name:
concept_counts[name] += 1
stats['top_concepts'] = [
{'name': name, 'count': count}
for name, count in sorted(
concept_counts.items(), key=lambda x: x[1], reverse=True
)[:10]
]
return stats
def _build_citation_network(self, researchers):
"""Build citation network graph"""
network = {
'nodes': [],
'edges': []
}
for researcher in researchers:
network['nodes'].append({
'id': researcher.global_id,
'name': researcher.names[0] if researcher.names else 'Unknown',
'citations': researcher.citations,
'papers': researcher.paper_count,
'institution': researcher.institution_name
})
# Note: Building actual citation edges requires additional OpenAlex API calls
# For now, we return node structure that can be enhanced client-side
return network
def _aggregate_datasets(self, researchers):
"""Aggregate datasets from top researchers"""
datasets = []
for researcher in researchers:
for record in researcher.zenodo_records[:3]: # Top 3 per researcher
metadata = record.get('metadata', {})
datasets.append({
'title': metadata.get('title', 'Untitled'),
'author': researcher.names[0] if researcher.names else 'Unknown',
'downloads': record.get('stats', {}).get('downloads', 0),
'doi': metadata.get('doi', ''),
                    'description': (metadata.get('description') or '')[:200]  # tolerate a null description
})
# Sort by downloads
datasets.sort(key=lambda d: d['downloads'], reverse=True)
return datasets[:20]
def _identify_knowledge_gaps(self, works):
"""Identify potential knowledge gaps"""
gaps = []
# Look for topics with low paper counts but high citation potential
topic_stats = defaultdict(lambda: {'count': 0, 'citations': 0})
for work in works:
for topic in work.get('topics', [])[:3]:
name = topic.get('display_name')
if name:
topic_stats[name]['count'] += 1
topic_stats[name]['citations'] += work.get('cited_by_count', 0)
# Find topics with low counts but high average citations
for topic, stats in topic_stats.items():
if stats['count'] < 10 and stats['count'] > 0:
avg_citations = stats['citations'] / stats['count']
if avg_citations > 20:
gaps.append({
'topic': topic,
'paper_count': stats['count'],
'avg_citations': round(avg_citations, 1),
'potential': 'HIGH'
})
# Sort by average citations
gaps.sort(key=lambda g: g['avg_citations'], reverse=True)
return gaps[:10]
# Flask Routes
@fabric_bp.route('/discovery')
def discovery_page():
"""Render the discovery UI page"""
return render_template('index.html')
@fabric_bp.route('/discovery/search')
def discovery_search():
"""
Main discovery search endpoint
Query params:
q: search query
        depth: surface|mid|deep (radio buttons; mapped internally to surface|citation|raw_data)
field: life_sciences|physics|social_sci|engineering
mode: graduate|specialist|super (persona)
"""
query = request.args.get('q', '').strip()
if not query:
return jsonify({'error': 'Query parameter required'}), 400
# Get UI parameters
depth_map = {
'surface': 'surface',
'mid': 'citation',
'deep': 'raw_data'
}
depth = request.args.get('depth', 'surface')
depth = depth_map.get(depth, 'surface')
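    # Example (illustrative values):
    #   GET /discovery/search?q=crispr&depth=mid&field=life_sciences&mode=specialist
    # runs the 'citation' pipeline with specialist-level filtering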
field = request.args.get('field', 'life_sciences')
persona = request.args.get('mode', 'graduate')
# Initialize and run engine
engine = DiscoveryFabricEngine()
try:
results = engine.run(query, depth, field, persona)
return jsonify(results)
except Exception as e:
print(f"[Fabric] Error: {e}")
return jsonify({'error': str(e)}), 500
@fabric_bp.route('/discovery/expert/<expert_id>')
def expert_detail(expert_id):
"""Get detailed expert profile"""
# This would query individual expert details
# For now, returns placeholder
return jsonify({
'expert_id': expert_id,
'message': 'Expert detail endpoint - to be implemented'
})
# Register blueprint in your main app.py:
# from discovery_fabric import fabric_bp
# app.register_blueprint(fabric_bp)
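
# Minimal smoke test: a sketch for exercising the engine directly without
# Flask. It assumes live network access to the public APIs above; the query
# string is purely illustrative.
if __name__ == '__main__':
    engine = DiscoveryFabricEngine()
    results = engine.run('CRISPR gene editing', depth='surface',
                         field='life_sciences', persona='graduate')
    print(f"Experts found: {len(results['experts'])}; "
          f"paper density: {results['paper_density']} papers/month")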