# review-screening-analyzer / file_processor.py
"""
File Processor - Handles citation file parsing and Excel I/O operations.
Optimized for efficient file handling with streaming and chunked processing.
"""
import logging
import os
import re
from typing import Dict, List, Optional, Tuple
import pandas as pd
# Constants
REQUIRED_COLUMNS = ('Title', 'Authors', 'Abstract', 'DOI')
PREVIEW_RECORD_COUNT = 3
PREVIEW_FIELD_LENGTHS = {'DOI': 50, 'Title': 100, 'Authors': 100, 'Abstract': 200}
# Pre-compiled regex patterns
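# Matches RIS end-of-record markers such as "\nER  - ", tolerating variable
# spacing around the hyphen.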
SCOPUS_RECORD_PATTERN = re.compile(r'\nER\s*-\s*')
class FileProcessor:
"""Handles citation file parsing and Excel I/O operations."""
__slots__ = ('data_dir',)
def __init__(self, data_dir: str):
self.data_dir = data_dir
def parse_nbib(self, file_path: str) -> Tuple[Optional[str], str]:
"""Parse PubMed NBIB file to Excel format."""
if not self._validate_file(file_path):
return None, "Invalid file"
try:
records = []
record: Dict[str, str] = {}
authors: List[str] = []
current_field: Optional[str] = None
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
                    # NBIB tags occupy a 4-character field, so every data
                    # prefix ("TI  - ", "PMID- ", ...) is 6 characters wide.
                    if line.startswith('TI  - '):
                        record['Title'] = line[6:].strip()
                        current_field = 'Title'
                    elif line.startswith('AB  - '):
                        record['Abstract'] = line[6:].strip()
                        current_field = 'Abstract'
                    elif line.startswith('AU  - '):
                        authors.append(line[6:].strip())
                        current_field = None
                    elif line.startswith('LID - ') and '[doi]' in line:
                        record['DOI'] = line[6:].replace(' [doi]', '').strip()
current_field = None
elif line.startswith('PMID- '):
if record:
record['Authors'] = '; '.join(authors)
records.append(record)
record = {}
authors = []
current_field = None
                    elif line.startswith('      ') and current_field in ('Abstract', 'Title'):
                        # Continuation lines are indented with six spaces.
                        record[current_field] += ' ' + line.strip()
# Save last record
if record:
record['Authors'] = '; '.join(authors)
records.append(record)
return self._save_records(records, "extracted_data.xlsx")
except Exception as e:
logging.error(f"NBIB parsing error: {e}")
return None, f"Error: {str(e)}"
def parse_wos_ris(self, file_path: str) -> Tuple[Optional[str], str]:
"""Parse Web of Science RIS file to Excel format."""
if not self._validate_file(file_path):
return None, "Invalid file"
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if not content:
return None, "Empty file"
records = []
            for article in content.split("\nER  -"):
if not article.strip():
continue
record: Dict[str, str] = {}
authors: List[str] = []
                for raw_line in article.strip().split('\n'):
                    line = raw_line.strip()
                    if not line:
                        continue
                    if line.startswith('TI  - '):
                        record['Title'] = line[6:].strip()
                    elif line.startswith('AB  - '):
                        record['Abstract'] = line[6:].strip()
                    elif line.startswith('AU  - '):
                        authors.append(line[6:].strip())
                    elif line.startswith('DO  - '):
                        record['DOI'] = line[6:].strip()
                    elif raw_line.startswith('  '):
                        # Continuation line (indented, no tag): extend the
                        # last multi-line field.
                        if 'Abstract' in record:
                            record['Abstract'] += ' ' + line
                        elif 'Title' in record:
                            record['Title'] += ' ' + line
if record:
record['Authors'] = '; '.join(authors)
records.append(record)
return self._save_records(records, "extracted_data.xlsx")
except Exception as e:
logging.error(f"WOS RIS parsing error: {e}")
return None, f"Error: {str(e)}"
def parse_embase_ris(self, file_path: str) -> Tuple[Optional[str], str]:
"""Parse Embase RIS file to Excel format."""
if not self._validate_file(file_path):
return None, "Invalid file"
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if not content:
return None, "Empty file"
records = []
for article in content.split("\n\n"):
if not article.strip():
continue
record: Dict[str, str] = {}
authors: List[str] = []
                for raw_line in article.strip().split('\n'):
                    line = raw_line.strip()
                    if not line:
                        continue
                    if line.startswith('T1  - '):
                        record['Title'] = line[6:].strip()
                    elif line.startswith('N2  - '):
                        record['Abstract'] = line[6:].strip()
                    elif line.startswith('A1  - '):
                        authors.append(line[6:].strip())
                    elif line.startswith('DO  - '):
                        record['DOI'] = line[6:].strip()
                    elif raw_line.startswith('  '):
                        # Continuation line (indented, no tag): extend the
                        # last multi-line field.
                        if 'Abstract' in record:
                            record['Abstract'] += ' ' + line
                        elif 'Title' in record:
                            record['Title'] += ' ' + line
if record:
                    record['Authors'] = '; '.join(authors)
records.append(record)
return self._save_records(records, "extracted_data.xlsx")
except Exception as e:
logging.error(f"Embase RIS parsing error: {e}")
return None, f"Error: {str(e)}"
def parse_scopus_ris(self, file_path: str) -> Tuple[Optional[str], str]:
"""Parse Scopus RIS file to Excel format."""
if not self._validate_file(file_path):
return None, "Invalid file"
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if not content:
return None, "Empty file"
records = []
for article in SCOPUS_RECORD_PATTERN.split(content):
if not article.strip():
continue
record: Dict[str, str] = {}
authors: List[str] = []
                for raw_line in article.strip().split('\n'):
                    line = raw_line.strip()
                    if not line:
                        continue
                    if line.startswith('TI  - '):
                        record['Title'] = line[6:].strip()
                    elif line.startswith('AB  - '):
                        record['Abstract'] = line[6:].strip()
                    elif line.startswith('AU  - '):
                        authors.append(line[6:].strip())
                    elif line.startswith('DO  - '):
                        record['DOI'] = line[6:].strip()
                    elif raw_line.startswith('  '):
                        # Continuation line (indented, no tag): extend the
                        # last multi-line field.
                        if 'Abstract' in record:
                            record['Abstract'] += ' ' + line
                        elif 'Title' in record:
                            record['Title'] += ' ' + line
                if record:
                    record['Authors'] = '; '.join(authors)
                    records.append(record)
return self._save_records(records, "extracted_data.xlsx")
except Exception as e:
logging.error(f"Scopus RIS parsing error: {e}")
return None, f"Error: {str(e)}"
def load_excel(self, file_path: str) -> Optional[pd.DataFrame]:
"""Load Excel file with proper index handling."""
try:
df = pd.read_excel(file_path, index_col=0)
# Ensure proper index setup
if "Index" in df.columns:
df.set_index("Index", inplace=True)
elif df.index.name != "Index":
df.index.name = "Index"
# Normalize index
df.index = df.index.astype(str).str.strip()
# Remove duplicates
if df.index.duplicated().any():
logging.warning(f"Removing duplicate indices in {file_path}")
df = df[~df.index.duplicated(keep='first')]
return df
except Exception as e:
logging.error(f"Excel load error: {e}")
return None
def save_excel(self, df: pd.DataFrame, filename: str) -> str:
"""Save DataFrame to Excel file."""
try:
df = df.copy()
# Handle Index column conflict
if "Index" in df.columns:
# If there's already an Index column, save it as Original_Index to avoid conflict
df = df.rename(columns={"Index": "Original_Index"})
# Ensure proper index
if df.index.name != "Index":
df.index.name = "Index"
df.index = df.index.astype(str)
# Remove duplicates
if df.index.duplicated().any():
logging.warning(f"Removing duplicate indices when saving {filename}")
df = df[~df.index.duplicated(keep='first')]
output_path = os.path.join(self.data_dir, filename)
df.to_excel(output_path, index=True)
return output_path
except Exception as e:
logging.error(f"Excel save error: {e}")
return ""
    def _validate_file(self, file_path: str) -> bool:
        """Check that the path is non-empty and points to an existing file."""
        return bool(file_path and os.path.isfile(file_path))
    def _save_records(self, records: List[Dict[str, str]], filename: str) -> Tuple[Optional[str], str]:
"""Save parsed records to Excel and generate preview."""
if not records:
return None, "No records found"
df = pd.DataFrame(records)
# Ensure all required columns exist
for col in REQUIRED_COLUMNS:
if col not in df.columns:
df[col] = ''
df.index.name = 'Index'
output_path = os.path.join(self.data_dir, filename)
df.to_excel(output_path, index=True)
preview = self._generate_preview(records)
return output_path, preview
def _generate_preview(self, records: List[Dict]) -> str:
"""Generate preview text for parsed records."""
lines = []
        for i, record in enumerate(records[:PREVIEW_RECORD_COUNT], start=1):
            lines.append(f"\nRecord {i}:")
            for field, max_len in PREVIEW_FIELD_LENGTHS.items():
                full_value = record.get(field, '')
                suffix = '...' if len(full_value) > max_len else ''
                lines.append(f"{field}: {full_value[:max_len]}{suffix}")
            lines.append("-" * 80)
lines.append(f"\nTotal records extracted: {len(records)}")
return '\n'.join(lines)
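# Minimal usage sketch. "citations.nbib" and the data_dir are placeholders;
# substitute the actual export file and output directory.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    processor = FileProcessor(data_dir=".")
    output_path, preview = processor.parse_nbib("citations.nbib")
    if output_path:
        print(f"Saved to {output_path}")
        print(preview)
    else:
        # On failure the second element carries the error message.
        print(f"Parsing failed: {preview}")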