"""Research/Science Domain Plugin

Scores research competency based on:
- Publication record (papers, citations)
- Lab experience and duration
- Research project depth
- Thesis/dissertation summaries
"""
import re
import time
import logging
from typing import Dict, List

from .base_plugin import BaseDomainPlugin, DomainScore
from .plugin_factory import register_plugin

logger = logging.getLogger(__name__)


@register_plugin('research')
class ResearchPlugin(BaseDomainPlugin):
    """Research/Science domain scoring plugin.

    Produces a 0-1 research competency score from four weighted
    sub-features: publications, lab experience, research depth and
    thesis quality. All sub-scores are keyword/regex heuristics over
    the lower-cased evidence text.
    """

    def __init__(self):
        super().__init__()
        # Research indicators: substring heuristics matched against
        # lower-cased evidence text.
        self.publication_venues = [
            'journal', 'conference', 'proceedings', 'ieee', 'acm',
            'springer', 'elsevier', 'nature', 'science', 'arxiv'
        ]
        self.research_methods = [
            'experiment', 'methodology', 'hypothesis', 'literature review',
            'data collection', 'statistical analysis', 'simulation', 'survey'
        ]

    def _get_domain_type(self) -> str:
        return 'research'

    def _get_feature_weights(self) -> Dict[str, float]:
        # Weights sum to 1.0 so the combined score stays within [0, 1].
        return {
            'publication_score': 0.35,
            'lab_experience_score': 0.25,
            'research_depth_score': 0.25,
            'thesis_quality_score': 0.15
        }

    def get_required_fields(self) -> List[str]:
        return ['research_description']

    def get_optional_fields(self) -> List[str]:
        return ['publications_text', 'lab_experience_text', 'thesis_summary']

    def score(self, evidence_data: Dict) -> DomainScore:
        """Calculate the research domain score.

        Args:
            evidence_data: Evidence fields. 'research_description' is
                required; the other text fields are optional and default
                to empty strings (each analyzer returns 0.0 for empty or
                too-short input).

        Returns:
            DomainScore with the capped weighted score, confidence,
            per-feature raw values, and processing time in milliseconds.
        """
        start_time = time.time()

        features = {
            'publication_score': self._analyze_publications(
                evidence_data.get('publications_text', '')),
            'lab_experience_score': self._analyze_lab_experience(
                evidence_data.get('lab_experience_text', '')),
            'research_depth_score': self._analyze_research_depth(
                evidence_data.get('research_description', '')),
            'thesis_quality_score': self._analyze_thesis(
                evidence_data.get('thesis_summary', '')),
        }

        # Weighted combination. NOTE(review): self.feature_weights is
        # presumably populated by the base class from
        # _get_feature_weights() -- confirm in BaseDomainPlugin.
        score = sum(features[k] * self.feature_weights[k] for k in features)

        confidence = self.calculate_confidence(evidence_data)
        processing_time = (time.time() - start_time) * 1000

        return DomainScore(
            domain_type='research',
            score=min(score, 1.0),
            confidence=confidence,
            raw_features=features,
            processing_time_ms=processing_time
        )

    def _analyze_publications(self, publications_text: str) -> float:
        """Analyze the publication record.

        Returns:
            0-1 score combining an estimated publication count
            (up to 0.6 at 5+ publications) with a venue-quality bonus
            (up to 0.4 at 3+ recognized venues).
        """
        if not publications_text or len(publications_text) < 30:
            return 0.0

        text_lower = publications_text.lower()

        # Estimate publication count from common citation formats; take
        # the best single pattern rather than summing, to avoid
        # double-counting the same entry across formats.
        title_patterns = [
            r'"([^"]+)"',   # Quoted titles
            r'\[\d+\]',     # Numbered references
            r'\d{4}\.\s',   # Year format (2023. Title...)
        ]
        pub_count = 0
        for pattern in title_patterns:
            pub_count = max(pub_count, len(re.findall(pattern, publications_text)))

        score = min(pub_count / 5, 0.6)  # 5+ pubs = 0.6

        # Venue quality bonus
        venue_count = sum(1 for venue in self.publication_venues
                          if venue in text_lower)
        score += min(venue_count / 3, 0.4)  # 3+ venues = 0.4

        # Lazy %-args avoid formatting when the level is disabled.
        logger.info("Publication score: %.2f (%d pubs, %d venues)",
                    score, pub_count, venue_count)
        return min(score, 1.0)

    def _analyze_lab_experience(self, lab_text: str) -> float:
        """Analyze laboratory experience.

        Returns:
            0-1 score from the longest stated duration (up to 0.5 at
            12+ months) plus lab-quality keyword hits (up to 0.5).
        """
        if not lab_text or len(lab_text) < 30:
            return 0.0

        text_lower = lab_text.lower()

        # Extract the longest stated duration, normalized to months.
        duration_patterns = [
            (r'(\d+)\s*years?', 12),  # Convert years to months
            (r'(\d+)\s*months?', 1),
        ]
        max_duration = 0
        for pattern, multiplier in duration_patterns:
            for m in re.findall(pattern, text_lower):
                max_duration = max(max_duration, int(m) * multiplier)

        score = min(max_duration / 12, 0.5)  # 12 months = max

        # Lab quality indicators
        quality_keywords = ['research lab', 'professor', 'phd',
                            'equipment', 'experiment', 'protocol']
        quality_count = sum(1 for kw in quality_keywords if kw in text_lower)
        score += min(quality_count / 4, 0.5)

        logger.info("Lab experience: %.2f (%d months)", score, max_duration)
        return min(score, 1.0)

    def _analyze_research_depth(self, research_desc: str) -> float:
        """Analyze research methodology depth.

        Returns:
            0-1 score from method mentions (up to 0.5), technical-term
            hits (up to 0.3), and text length as a depth proxy (up to
            0.2 at 1000+ characters).
        """
        if not research_desc or len(research_desc) < 50:
            return 0.0

        text_lower = research_desc.lower()

        # Research method mentions
        method_count = sum(1 for method in self.research_methods
                           if method in text_lower)
        score = min(method_count / 4, 0.5)

        # Technical depth indicators
        technical_terms = [
            'algorithm', 'model', 'framework', 'dataset', 'validation',
            'baseline', 'benchmark', 'evaluation', 'metrics', 'results'
        ]
        tech_count = sum(1 for term in technical_terms if term in text_lower)
        score += min(tech_count / 5, 0.3)

        # Length as depth proxy
        score += min(len(research_desc) / 1000, 0.2)

        logger.info("Research depth: %.2f", score)
        return min(score, 1.0)

    def _analyze_thesis(self, thesis_text: str) -> float:
        """Analyze thesis/dissertation quality.

        Returns:
            0-1 score from structural section keywords (up to 0.5),
            academic-rigor keywords (up to 0.3), and a length bonus
            (up to 0.2 at 2000+ characters).
        """
        if not thesis_text or len(thesis_text) < 100:
            return 0.0

        text_lower = thesis_text.lower()

        # Thesis structure keywords
        structure_keywords = [
            'abstract', 'introduction', 'methodology', 'results',
            'discussion', 'conclusion', 'references', 'chapter'
        ]
        structure_count = sum(1 for kw in structure_keywords
                              if kw in text_lower)
        score = min(structure_count / 5, 0.5)

        # Academic rigor indicators
        rigor_keywords = [
            'research question', 'objective', 'contribution', 'limitation',
            'future work', 'significance', 'novelty', 'finding'
        ]
        rigor_count = sum(1 for kw in rigor_keywords if kw in text_lower)
        score += min(rigor_count / 4, 0.3)

        # Length bonus
        score += min(len(thesis_text) / 2000, 0.2)

        logger.info("Thesis quality: %.2f", score)
        return min(score, 1.0)