# FCT/services/domain_plugins/research_plugin.py
# Author: Parthnuwal7 — "Adding analytical content" (commit 3d015cd)
"""Research/Science Domain Plugin
Scores research competency based on:
- Publication record (papers, citations)
- Lab experience and duration
- Research project depth
- Thesis/dissertation summaries
"""
import re
import time
import logging
from typing import Dict, List
from .base_plugin import BaseDomainPlugin, DomainScore
from .plugin_factory import register_plugin
logger = logging.getLogger(__name__)
@register_plugin('research')
class ResearchPlugin(BaseDomainPlugin):
    """Research/Science domain scoring plugin.

    Produces a weighted DomainScore from four text-derived features:
    publication record (0.35), lab experience (0.25), research depth
    (0.25) and thesis quality (0.15).  Every feature analyzer returns a
    value in [0, 1]; the weighted sum is clamped to 1.0.
    """

    # Compiled once at class-creation time so the scoring path does not
    # pay a regex-compile/cache lookup per call.
    _TITLE_PATTERNS = (
        re.compile(r'"([^"]+)"'),   # Quoted titles
        re.compile(r'\[\d+\]'),     # Numbered references, e.g. [3]
        re.compile(r'\d{4}\.\s'),   # Year format (2023. Title...)
    )
    # (pattern, months-per-unit); decimals accepted so "1.5 years" parses
    # as 18 months instead of matching the bare "5" as 5 years.
    _DURATION_PATTERNS = (
        (re.compile(r'(\d+(?:\.\d+)?)\s*years?'), 12.0),
        (re.compile(r'(\d+(?:\.\d+)?)\s*months?'), 1.0),
    )

    def __init__(self):
        super().__init__()
        # Venue keywords signalling recognised publication outlets.
        self.publication_venues = [
            'journal', 'conference', 'proceedings', 'ieee', 'acm',
            'springer', 'elsevier', 'nature', 'science', 'arxiv'
        ]
        # Methodology vocabulary used to gauge research sophistication.
        self.research_methods = [
            'experiment', 'methodology', 'hypothesis', 'literature review',
            'data collection', 'statistical analysis', 'simulation', 'survey'
        ]

    def _get_domain_type(self) -> str:
        return 'research'

    def _get_feature_weights(self) -> Dict[str, float]:
        # Weights must sum to 1.0 and match the keys built in score().
        return {
            'publication_score': 0.35,
            'lab_experience_score': 0.25,
            'research_depth_score': 0.25,
            'thesis_quality_score': 0.15
        }

    def get_required_fields(self) -> List[str]:
        return ['research_description']

    def get_optional_fields(self) -> List[str]:
        return ['publications_text', 'lab_experience_text', 'thesis_summary']

    @staticmethod
    def _capped_ratio(value: float, full_marks_at: float, cap: float) -> float:
        """Linear sub-score that reaches exactly `cap` once `value` hits
        `full_marks_at`.

        Fixes the original `min(value / N, cap)` form, which saturated at
        N * cap items instead of the N the comments documented (e.g.
        "5+ pubs = 0.6" actually capped at 3 pubs, "12 months = max"
        capped at 6 months).
        """
        return min(value / full_marks_at, 1.0) * cap

    def score(self, evidence_data: Dict) -> DomainScore:
        """Calculate the research domain score.

        Args:
            evidence_data: Dict with the required/optional text fields
                declared by get_required_fields()/get_optional_fields().
                Missing fields default to '' and score 0.0.

        Returns:
            DomainScore with the clamped weighted score, confidence,
            per-feature raw values, and processing time in milliseconds.
        """
        start_time = time.time()
        features = {}

        # Publication analysis
        publications = evidence_data.get('publications_text', '')
        features['publication_score'] = self._analyze_publications(publications)

        # Lab experience
        lab_exp = evidence_data.get('lab_experience_text', '')
        features['lab_experience_score'] = self._analyze_lab_experience(lab_exp)

        # Research depth from main description
        research_desc = evidence_data.get('research_description', '')
        features['research_depth_score'] = self._analyze_research_depth(research_desc)

        # Thesis quality
        thesis = evidence_data.get('thesis_summary', '')
        features['thesis_quality_score'] = self._analyze_thesis(thesis)

        # Weighted sum; feature keys mirror _get_feature_weights().
        score = sum(features[k] * self.feature_weights[k] for k in features)

        # Confidence comes from the base plugin (coverage of evidence).
        confidence = self.calculate_confidence(evidence_data)

        processing_time = (time.time() - start_time) * 1000

        return DomainScore(
            domain_type='research',
            score=min(score, 1.0),
            confidence=confidence,
            raw_features=features,
            processing_time_ms=processing_time
        )

    def _analyze_publications(self, publications_text: str) -> float:
        """Analyze publication record.

        Returns:
            0-1 score: up to 0.6 from publication count (full marks at
            5 publications) plus up to 0.4 from venue quality (full
            marks at 3 distinct venue keywords).
        """
        # Too short to contain a real publication list.
        if not publications_text or len(publications_text) < 30:
            return 0.0

        text_lower = publications_text.lower()

        # Count publication mentions; patterns overlap on typical
        # reference lists, so take the max rather than the sum.
        pub_count = 0
        for pattern in self._TITLE_PATTERNS:
            pub_count = max(pub_count, len(pattern.findall(publications_text)))

        score = self._capped_ratio(pub_count, 5, 0.6)    # 5+ pubs = 0.6

        # Venue quality bonus
        venue_count = sum(1 for venue in self.publication_venues if venue in text_lower)
        score += self._capped_ratio(venue_count, 3, 0.4)  # 3+ venues = 0.4

        logger.info("Publication score: %.2f (%d pubs, %d venues)",
                    score, pub_count, venue_count)
        return min(score, 1.0)

    def _analyze_lab_experience(self, lab_text: str) -> float:
        """Analyze laboratory experience.

        Returns:
            0-1 score: up to 0.5 from duration (full marks at 12 months)
            plus up to 0.5 from quality keywords (full marks at 4).
        """
        if not lab_text or len(lab_text) < 30:
            return 0.0

        text_lower = lab_text.lower()

        # Longest stated duration, normalised to months.
        max_duration = 0.0
        for pattern, months_per_unit in self._DURATION_PATTERNS:
            for value in pattern.findall(text_lower):
                max_duration = max(max_duration, float(value) * months_per_unit)

        # Duration score (12 months = max)
        score = self._capped_ratio(max_duration, 12, 0.5)

        # Lab quality indicators
        quality_keywords = ['research lab', 'professor', 'phd', 'equipment', 'experiment', 'protocol']
        quality_count = sum(1 for kw in quality_keywords if kw in text_lower)
        score += self._capped_ratio(quality_count, 4, 0.5)

        logger.info("Lab experience: %.2f (%s months)", score, max_duration)
        return min(score, 1.0)

    def _analyze_research_depth(self, research_desc: str) -> float:
        """Analyze research methodology depth.

        Returns:
            0-1 score: up to 0.5 from method mentions (full marks at 4),
            up to 0.3 from technical terms (full marks at 5), up to 0.2
            from description length (full marks at 1000 chars).
        """
        if not research_desc or len(research_desc) < 50:
            return 0.0

        text_lower = research_desc.lower()

        # Research method mentions
        method_count = sum(1 for method in self.research_methods if method in text_lower)
        score = self._capped_ratio(method_count, 4, 0.5)

        # Technical depth indicators
        technical_terms = [
            'algorithm', 'model', 'framework', 'dataset', 'validation',
            'baseline', 'benchmark', 'evaluation', 'metrics', 'results'
        ]
        tech_count = sum(1 for term in technical_terms if term in text_lower)
        score += self._capped_ratio(tech_count, 5, 0.3)

        # Length as a rough proxy for depth of description.
        score += self._capped_ratio(len(research_desc), 1000, 0.2)

        logger.info("Research depth: %.2f", score)
        return min(score, 1.0)

    def _analyze_thesis(self, thesis_text: str) -> float:
        """Analyze thesis/dissertation quality.

        Returns:
            0-1 score: up to 0.5 from structure keywords (full marks at
            5), up to 0.3 from rigor keywords (full marks at 4), up to
            0.2 from summary length (full marks at 2000 chars).
        """
        if not thesis_text or len(thesis_text) < 100:
            return 0.0

        text_lower = thesis_text.lower()

        # Thesis structure keywords
        structure_keywords = [
            'abstract', 'introduction', 'methodology', 'results',
            'discussion', 'conclusion', 'references', 'chapter'
        ]
        structure_count = sum(1 for kw in structure_keywords if kw in text_lower)
        score = self._capped_ratio(structure_count, 5, 0.5)

        # Academic rigor indicators
        rigor_keywords = [
            'research question', 'objective', 'contribution', 'limitation',
            'future work', 'significance', 'novelty', 'finding'
        ]
        rigor_count = sum(1 for kw in rigor_keywords if kw in text_lower)
        score += self._capped_ratio(rigor_count, 4, 0.3)

        # Length bonus
        score += self._capped_ratio(len(thesis_text), 2000, 0.2)

        logger.info("Thesis quality: %.2f", score)
        return min(score, 1.0)