| """ |
| Discovery Fabric Engine - Multi-Source Research Intelligence |
| Integrates: OpenAlex, Zenodo, ROR, PubMed (via Entrez), Google Scholar |
| """ |
|
|
from flask import Blueprint, request, jsonify, render_template
import requests
from Bio import Entrez
import os
from datetime import datetime
import numpy as np
from collections import defaultdict
import time

fabric_bp = Blueprint('fabric', __name__)

# NCBI Entrez configuration: an email address is required, and an API key
# raises the request-rate limit
Entrez.email = os.getenv('Entrez_email', 'your_email@example.com')
Entrez.api_key = os.getenv('Entrez_api_key', '')

# External API base URLs
OPENALEX_API = "https://api.openalex.org"
ZENODO_API = "https://zenodo.org/api/records"
ROR_API = "https://api.ror.org/organizations"
SEMANTIC_SCHOLAR_API = "https://api.semanticscholar.org/graph/v1"  # not yet used


class ResearcherProfile:
    """Unified researcher profile merged across all data sources"""

    def __init__(self):
        # Cross-source identifiers
        self.global_id = None
        self.names = []
        self.orcid = None
        self.affiliations = []
        self.openalex_id = None
        self.scholar_id = None
        self.zenodo_records = []
        self.pubmed_ids = []

        # Scholarly metrics
        self.citations = 0
        self.h_index = 0
        self.i10_index = 0
        self.paper_count = 0
        self.dataset_downloads = 0
        self.code_repos = 0

        # Institutional affiliation
        self.ror_id = None
        self.institution_name = None
        self.institution_tier = 0.5

        # Topical alignment with the search query
        self.topics = []
        self.topic_score = 0.0

        # Co-authorship network position
        self.network_centrality = 0.0

        # Composite Performance Index++
        self.cpi_score = 0.0

    def calculate_cpi(self):
        """Calculate the Composite Performance Index++ (CPI++)"""
        # Scholarly authority: log-damped citations blended with h- and i10-index
        scholarly_authority = (
            np.log1p(self.citations) * 0.5 +
            self.h_index * 0.3 +
            self.i10_index * 0.2
        )

        # Topical alignment with the query (0-10, set during profile building)
        topical_alignment = self.topic_score

        # Practical impact: log-damped dataset downloads and code repositories
        practical_impact = (
            np.log1p(self.dataset_downloads) * 0.6 +
            np.log1p(self.code_repos) * 0.4
        )

        # Institutional weight (0-1, from ROR enrichment)
        institutional_power = self.institution_tier

        # Network centrality (0-1)
        network_centrality = self.network_centrality

        # Weighted composite
        self.cpi_score = (
            0.30 * scholarly_authority +
            0.25 * topical_alignment +
            0.20 * practical_impact +
            0.15 * institutional_power +
            0.10 * network_centrality
        )

        return self.cpi_score
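
    # Worked example (illustrative figures, not from real data): with 1,000
    # citations, h-index 20, and i10-index 30, scholarly_authority is
    # log1p(1000)*0.5 + 20*0.3 + 30*0.2 ≈ 3.45 + 6 + 6 = 15.45, contributing
    # 0.30 * 15.45 ≈ 4.64 to the composite. The components are not normalized
    # to a common range, so citation-driven authority tends to dominate.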

    def to_dict(self):
        """Convert to a JSON-serializable dictionary"""
        return {
            'global_id': self.global_id,
            'name': self.names[0] if self.names else 'Unknown',
            'orcid': self.orcid,
            'institution': self.institution_name,
            'citations': self.citations,
            'h_index': self.h_index,
            'paper_count': self.paper_count,
            'datasets': len(self.zenodo_records),
            'cpi_score': round(self.cpi_score, 3),
            'topics': self.topics[:5],
            'openalex_id': self.openalex_id
        }
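
    # Illustrative output shape (all values invented for the example):
    #   {'global_id': 'https://openalex.org/A123', 'name': 'Jane Doe',
    #    'orcid': None, 'institution': 'Example University', 'citations': 1500,
    #    'h_index': 18, 'paper_count': 42, 'datasets': 3, 'cpi_score': 7.412,
    #    'topics': [...], 'openalex_id': 'https://openalex.org/A123'}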


class DiscoveryFabricEngine:
    """Main intelligence engine for multi-source research discovery"""

    def __init__(self):
        self.researchers = {}         # OpenAlex author ID -> ResearcherProfile
        self.institution_cache = {}   # ROR ID -> institution metadata

    def run(self, query, depth='surface', field='life_sciences', persona='graduate'):
        """
        Main execution pipeline.

        Args:
            query: Search query string
            depth: 'surface', 'citation', or 'raw_data'
            field: 'life_sciences', 'physics', 'social_sci', or 'engineering'
            persona: 'graduate', 'specialist', or 'super'
        """
        results = {
            'query': query,
            'timestamp': datetime.now().isoformat(),
            'experts': [],
            'datasets': [],
            'citation_network': {},
            'knowledge_gaps': [],
            'paper_density': 0,
            'field_stats': {}
        }

        # Phase 1: seed the pipeline with OpenAlex works
        print(f"[Fabric] Phase 1: OpenAlex query for '{query}'")
        openalex_data = self._query_openalex(query, field, persona)

        if not openalex_data:
            return results

        # Phase 2: collapse works into per-author profiles
        print("[Fabric] Phase 2: Building researcher profiles")
        self._build_researcher_profiles(openalex_data, query)

        # Phase 3: enrich profiles; deeper modes pull in more sources
        print("[Fabric] Phase 3: Enriching with Zenodo, PubMed, ROR")
        if depth in ('citation', 'raw_data'):
            self._enrich_with_zenodo(query, top_n=30)

        if depth == 'raw_data':
            self._enrich_with_pubmed(query, top_n=20)

        self._enrich_institutions()

        # Phase 4: score and rank
        print("[Fabric] Phase 4: Calculating CPI++ scores")
        ranked_researchers = self._rank_researchers(persona)

        results['experts'] = [r.to_dict() for r in ranked_researchers[:50]]
        results['paper_density'] = self._calculate_paper_density(openalex_data)
        results['field_stats'] = self._calculate_field_stats(openalex_data)

        if depth == 'citation':
            results['citation_network'] = self._build_citation_network(ranked_researchers[:20])

        if depth == 'raw_data':
            results['datasets'] = self._aggregate_datasets(ranked_researchers[:30])
            results['knowledge_gaps'] = self._identify_knowledge_gaps(openalex_data)

        return results

    def _query_openalex(self, query, field, persona):
        """Query the OpenAlex /works endpoint"""
        # Top-level OpenAlex concept IDs for each supported field
        field_concepts = {
            'life_sciences': 'C86803240',
            'physics': 'C121332964',
            'social_sci': 'C144024400',
            'engineering': 'C127413603'
        }
        concept_id = field_concepts.get(field, 'C86803240')

        # Broader personas cast a wider net
        per_page = {
            'graduate': 100,
            'specialist': 50,
            'super': 25
        }.get(persona, 100)

        url = f"{OPENALEX_API}/works"
        params = {
            'filter': f'concepts.id:{concept_id},default.search:{query}',
            'per_page': per_page,
            'page': 1,
            'sort': 'cited_by_count:desc',
            'select': 'id,title,authorships,cited_by_count,publication_year,concepts,topics'
        }

        # A mailto address in the User-Agent places requests in OpenAlex's
        # "polite pool", which gets better rate limits
        headers = {'User-Agent': f'QsearchIntelligence/1.0 (mailto:{Entrez.email})'}

        try:
            response = requests.get(url, params=params, headers=headers, timeout=10)
            response.raise_for_status()
            data = response.json()
            return data.get('results', [])
        except Exception as e:
            print(f"[Fabric] OpenAlex error: {e}")
            return []
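
    # Illustrative request assembled from the params above (not a captured
    # trace; 'crispr' stands in for a real query):
    #   GET https://api.openalex.org/works?filter=concepts.id:C86803240,default.search:crispr
    #       &per_page=100&page=1&sort=cited_by_count:desc  (plus the select list)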

    def _build_researcher_profiles(self, works, query):
        """Build unified researcher profiles from OpenAlex works"""
        author_works = defaultdict(list)

        # Group retrieved works by author
        for work in works:
            for authorship in work.get('authorships', []):
                author = authorship.get('author', {})
                author_id = author.get('id', '')

                if not author_id:
                    continue

                author_works[author_id].append({
                    'work': work,
                    'authorship': authorship
                })

        # Build one profile per author
        for author_id, works_list in author_works.items():
            if author_id in self.researchers:
                continue

            profile = ResearcherProfile()
            profile.global_id = author_id
            profile.openalex_id = author_id

            # Name and ORCID from the first authorship
            first_authorship = works_list[0]['authorship']
            author_data = first_authorship.get('author', {})

            profile.names.append(author_data.get('display_name', 'Unknown'))
            profile.orcid = author_data.get('orcid')

            # Primary affiliation
            institutions = first_authorship.get('institutions', [])
            if institutions:
                inst = institutions[0]
                profile.institution_name = inst.get('display_name')
                profile.ror_id = inst.get('ror')

            # Counts cover the retrieved works only, so they are lower bounds
            # on the author's true totals
            profile.paper_count = len(works_list)
            profile.citations = sum(
                w['work'].get('cited_by_count', 0) for w in works_list
            )

            # h- and i10-index from per-work citation counts
            citations_sorted = sorted(
                [w['work'].get('cited_by_count', 0) for w in works_list],
                reverse=True
            )
            profile.h_index = self._calculate_h_index(citations_sorted)
            profile.i10_index = sum(1 for c in citations_sorted if c >= 10)

            # Most frequent topics across this author's works
            all_topics = []
            for w in works_list:
                topics = w['work'].get('topics', [])
                for topic in topics[:3]:
                    all_topics.append(topic.get('display_name', ''))

            topic_counts = defaultdict(int)
            for topic in all_topics:
                topic_counts[topic] += 1

            profile.topics = [t for t, _ in sorted(
                topic_counts.items(), key=lambda x: x[1], reverse=True
            )[:5]]

            profile.topic_score = self._calculate_topic_score(profile.topics, query)

            self.researchers[author_id] = profile

    def _calculate_h_index(self, citations_sorted):
        """Calculate the h-index from citation counts sorted in descending order"""
        h = 0
        for i, citations in enumerate(citations_sorted, 1):
            if citations >= i:
                h = i
            else:
                break
        return h
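
    # Sanity check (illustrative): [10, 8, 5, 4, 3] yields h = 4, since four
    # papers have at least 4 citations each but not five with at least 5.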

    def _calculate_topic_score(self, topics, query):
        """Bag-of-words overlap between the query and the author's topics (0-10)"""
        query_terms = set(query.lower().split())
        topic_terms = set(' '.join(topics).lower().split())

        if not query_terms or not topic_terms:
            return 0.0

        overlap = len(query_terms & topic_terms)
        score = overlap / len(query_terms)

        # Scale the 0-1 fraction onto the 0-10 range CPI++ expects
        return min(score * 10, 10.0)
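
    # Illustrative: the query "cancer genomics" against topics that include
    # "Cancer Genomics" covers 2/2 query terms, so the score caps at 10.0.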

    def _enrich_with_zenodo(self, query, top_n=30):
        """Enrich top researchers with Zenodo datasets and software"""
        top_researchers = sorted(
            self.researchers.values(),
            key=lambda r: r.citations,
            reverse=True
        )[:top_n]

        for researcher in top_researchers:
            # Prefer the ORCID when available. OpenAlex returns it as a full
            # URL, so keep only the trailing identifier for the Zenodo query
            if researcher.orcid:
                orcid_id = researcher.orcid.rstrip('/').rsplit('/', 1)[-1]
                zenodo_query = f'orcid:{orcid_id}'
            else:
                # Fall back to a name + topic search
                name = researcher.names[0] if researcher.names else ''
                zenodo_query = f'{name} {query}'

            try:
                # No resource-type filter here: we want datasets *and*
                # software, which the counts below tell apart (a dataset-only
                # filter would make the software count always zero)
                params = {
                    'q': zenodo_query,
                    'size': 10
                }

                response = requests.get(ZENODO_API, params=params, timeout=5)
                if response.status_code == 200:
                    data = response.json()
                    hits = data.get('hits', {}).get('hits', [])

                    researcher.zenodo_records = hits
                    researcher.dataset_downloads = sum(
                        h.get('stats', {}).get('downloads', 0) for h in hits
                    )

                    # Count software records as code repositories
                    researcher.code_repos = sum(
                        1 for h in hits
                        if 'software' in h.get('metadata', {}).get('resource_type', {}).get('type', '').lower()
                    )

                # Light throttling between requests
                time.sleep(0.1)

            except Exception as e:
                print(f"[Fabric] Zenodo error for {researcher.names[0]}: {e}")
                continue

    def _enrich_with_pubmed(self, query, top_n=20):
        """Enrich top researchers with PubMed IDs via Entrez"""
        top_researchers = sorted(
            self.researchers.values(),
            key=lambda r: r.citations,
            reverse=True
        )[:top_n]

        for researcher in top_researchers:
            name = researcher.names[0] if researcher.names else ''

            # Field-tagged author search combined with the topic query
            search_query = f'{name}[Author] AND {query}'

            try:
                handle = Entrez.esearch(db="pubmed", term=search_query, retmax=20)
                record = Entrez.read(handle)
                handle.close()

                researcher.pubmed_ids = record.get('IdList', [])

                # NCBI allows roughly 3 requests/second without an API key
                time.sleep(0.34)

            except Exception as e:
                print(f"[Fabric] PubMed error for {name}: {e}")
                continue

    def _enrich_institutions(self):
        """Enrich institution data via ROR"""
        unique_rors = set()
        for researcher in self.researchers.values():
            if researcher.ror_id and researcher.ror_id not in self.institution_cache:
                unique_rors.add(researcher.ror_id)

        for ror_id in unique_rors:
            try:
                # OpenAlex stores ROR IDs as full URLs; the ROR API expects
                # the bare identifier
                ror_short = ror_id.split('/')[-1] if '/' in ror_id else ror_id
                url = f"{ROR_API}/{ror_short}"

                response = requests.get(url, timeout=5)
                if response.status_code == 200:
                    data = response.json()

                    tier = self._calculate_institution_tier(data)

                    self.institution_cache[ror_id] = {
                        'name': data.get('name'),
                        'types': data.get('types', []),
                        'country': data.get('country', {}).get('country_code'),
                        'tier': tier
                    }

                time.sleep(0.1)

            except Exception as e:
                print(f"[Fabric] ROR error for {ror_id}: {e}")
                continue

        # Push cached tiers back onto the profiles
        for researcher in self.researchers.values():
            if researcher.ror_id in self.institution_cache:
                inst_data = self.institution_cache[researcher.ror_id]
                researcher.institution_tier = inst_data['tier']

    def _calculate_institution_tier(self, ror_data):
        """Calculate an institution tier weight in (0, 1]"""
        types = ror_data.get('types', [])
        country = ror_data.get('country', {}).get('country_code', '')

        # Base weights by organization type. Taking the maximum means an
        # organization with several types gets its strongest weight, rather
        # than whichever branch happened to run last
        type_tiers = {
            'Healthcare': 0.90,
            'Education': 0.85,
            'Company': 0.80,
            'Government': 0.75
        }
        tier = max((type_tiers.get(t, 0.5) for t in types), default=0.5)

        # Boost for countries with large research infrastructures
        major_countries = ['US', 'GB', 'DE', 'FR', 'CA', 'AU', 'JP', 'CH', 'NL', 'SE']
        if country in major_countries:
            tier = min(tier * 1.15, 1.0)

        return tier
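
    # Illustrative: a US university typed 'Education' starts at 0.85; the
    # country boost lifts it to min(0.85 * 1.15, 1.0) ≈ 0.98.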

    def _rank_researchers(self, persona):
        """Calculate CPI++ scores and rank researchers"""
        for researcher in self.researchers.values():
            researcher.calculate_cpi()

        ranked = sorted(
            self.researchers.values(),
            key=lambda r: r.cpi_score,
            reverse=True
        )

        # Persona filters: stricter personas demand stronger track records.
        # The thresholds apply to metrics computed from the retrieved works
        # only, so they are conservative
        if persona == 'specialist':
            ranked = [r for r in ranked if r.h_index >= 10 and r.paper_count >= 5]
        elif persona == 'super':
            ranked = [r for r in ranked if r.h_index >= 20 and r.paper_count >= 10 and r.citations >= 500]

        return ranked

    def _calculate_paper_density(self, works):
        """Approximate publishing density (papers per month) for recent work"""
        if not works:
            return 0

        # Papers whose publication year falls within the last two calendar
        # years, a coarse stand-in for a 24-month window
        recent_count = sum(
            1 for w in works
            if w.get('publication_year', 0) >= datetime.now().year - 2
        )

        density = recent_count / 24
        return round(density, 2)
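
    # Illustrative: 48 qualifying papers over the ~24-month window gives a
    # density of 2.0 papers per month.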

    def _calculate_field_stats(self, works):
        """Calculate field-level statistics over the retrieved works"""
        stats = {
            'total_papers': len(works),
            'avg_citations': 0,
            'top_concepts': []
        }

        if works:
            stats['avg_citations'] = sum(
                w.get('cited_by_count', 0) for w in works
            ) / len(works)

            # Most frequent concepts across the corpus
            concept_counts = defaultdict(int)
            for work in works:
                for concept in work.get('concepts', [])[:5]:
                    name = concept.get('display_name')
                    if name:
                        concept_counts[name] += 1

            stats['top_concepts'] = [
                {'name': name, 'count': count}
                for name, count in sorted(
                    concept_counts.items(), key=lambda x: x[1], reverse=True
                )[:10]
            ]

        return stats

    def _build_citation_network(self, researchers):
        """Build a citation network graph (nodes only in this version)"""
        network = {
            'nodes': [],
            'edges': []
        }

        for researcher in researchers:
            network['nodes'].append({
                'id': researcher.global_id,
                'name': researcher.names[0] if researcher.names else 'Unknown',
                'citations': researcher.citations,
                'papers': researcher.paper_count,
                'institution': researcher.institution_name
            })

        # Edges are left empty: resolving who cites whom requires per-work
        # citation lookups (e.g. OpenAlex's 'cites' filter), which this
        # version does not perform
        return network

    def _aggregate_datasets(self, researchers):
        """Aggregate Zenodo datasets from top researchers"""
        datasets = []

        for researcher in researchers:
            for record in researcher.zenodo_records[:3]:
                metadata = record.get('metadata', {})
                # Skip software records; only datasets belong in this list
                if metadata.get('resource_type', {}).get('type', '') == 'software':
                    continue
                datasets.append({
                    'title': metadata.get('title', 'Untitled'),
                    'author': researcher.names[0] if researcher.names else 'Unknown',
                    'downloads': record.get('stats', {}).get('downloads', 0),
                    'doi': metadata.get('doi', ''),
                    'description': (metadata.get('description') or '')[:200]
                })

        # Most-downloaded first
        datasets.sort(key=lambda d: d['downloads'], reverse=True)

        return datasets[:20]

    def _identify_knowledge_gaps(self, works):
        """Flag under-published topics whose papers are highly cited"""
        gaps = []

        # Per-topic paper and citation tallies
        topic_stats = defaultdict(lambda: {'count': 0, 'citations': 0})

        for work in works:
            for topic in work.get('topics', [])[:3]:
                name = topic.get('display_name')
                if name:
                    topic_stats[name]['count'] += 1
                    topic_stats[name]['citations'] += work.get('cited_by_count', 0)

        # A gap is a topic with few papers (< 10) but high average citations
        # (> 20): strong demand, thin supply
        for topic, stats in topic_stats.items():
            if 0 < stats['count'] < 10:
                avg_citations = stats['citations'] / stats['count']
                if avg_citations > 20:
                    gaps.append({
                        'topic': topic,
                        'paper_count': stats['count'],
                        'avg_citations': round(avg_citations, 1),
                        'potential': 'HIGH'
                    })

        gaps.sort(key=lambda g: g['avg_citations'], reverse=True)

        return gaps[:10]
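
    # Illustrative: a topic with 4 papers averaging 35 citations each clears
    # both thresholds and is flagged as a HIGH-potential gap.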


# Flask routes

@fabric_bp.route('/discovery')
def discovery_page():
    """Render the discovery UI page"""
    return render_template('index.html')


@fabric_bp.route('/discovery/search')
def discovery_search():
    """
    Main discovery search endpoint.

    Query params:
        q: search query
        depth: surface|mid|deep (radio buttons; mapped to the engine's
               surface|citation|raw_data below)
        field: life_sciences|physics|social_sci|engineering
        mode: graduate|specialist|super (persona)
    """
    query = request.args.get('q', '').strip()

    if not query:
        return jsonify({'error': 'Query parameter required'}), 400

    # Map the UI's depth labels onto the engine's internal names
    depth_map = {
        'surface': 'surface',
        'mid': 'citation',
        'deep': 'raw_data'
    }

    depth = request.args.get('depth', 'surface')
    depth = depth_map.get(depth, 'surface')

    field = request.args.get('field', 'life_sciences')
    persona = request.args.get('mode', 'graduate')

    # A fresh engine per request keeps state isolated between searches
    engine = DiscoveryFabricEngine()

    try:
        results = engine.run(query, depth, field, persona)
        return jsonify(results)
    except Exception as e:
        print(f"[Fabric] Error: {e}")
        return jsonify({'error': str(e)}), 500
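
# Example call (the host and port depend on how the app is served):
#   curl "http://localhost:5000/discovery/search?q=crispr&depth=deep&field=life_sciences&mode=specialist"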


@fabric_bp.route('/discovery/expert/<expert_id>')
def expert_detail(expert_id):
    """Get a detailed expert profile (stub; not yet implemented)"""
    return jsonify({
        'expert_id': expert_id,
        'message': 'Expert detail endpoint - to be implemented'
    })
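

# Minimal local-run sketch (an assumption, not part of the deployed app):
# it registers the blueprint on a bare Flask app so the endpoints above can
# be exercised with a browser or curl.
if __name__ == '__main__':
    from flask import Flask

    app = Flask(__name__)
    app.register_blueprint(fabric_bp)
    app.run(debug=True)  # serves on http://127.0.0.1:5000 by default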