feat: redesigned the interface and optimized the performance of the analysis implementation (#11)
f4ed71d
unverified
"""
File Processor - Handles citation file parsing and Excel I/O operations.
Optimized for efficient file handling with streaming and chunked processing.
"""
| import logging | |
| import os | |
| import re | |
| from typing import Dict, List, Optional, Tuple | |
| import pandas as pd | |
# --- Module constants -------------------------------------------------------

# Columns that every saved record sheet is guaranteed to contain.
REQUIRED_COLUMNS = (
    'Title',
    'Authors',
    'Abstract',
    'DOI',
)

# How many leading records are rendered in the textual preview.
PREVIEW_RECORD_COUNT = 3

# Per-field character limit applied in the preview; iteration order of this
# mapping is the order in which the fields are printed.
PREVIEW_FIELD_LENGTHS = {
    'DOI': 50,
    'Title': 100,
    'Authors': 100,
    'Abstract': 200,
}

# Compiled once at import time: matches a newline followed by an "ER -"
# marker, together with any whitespace around the hyphen and after it.
SCOPUS_RECORD_PATTERN = re.compile(r'\nER\s*-\s*')
class FileProcessor:
    """Parse citation exports and read/write the Excel files derived from them.

    Supported inputs are PubMed NBIB files and the RIS variants handled by
    ``parse_wos_ris``, ``parse_embase_ris`` and ``parse_scopus_ris``.  Each
    parser extracts the Title, Authors, Abstract and DOI of every record and
    writes them to ``extracted_data.xlsx`` inside ``data_dir``.

    All ``parse_*`` methods return ``(output_path, preview_text)`` on success
    and ``(None, message)`` on failure; they never raise.
    """

    __slots__ = ('data_dir',)

    # One tagged line: a 2-4 character upper-case tag, optional padding, a
    # hyphen, then either end of line or one blank followed by the value.
    # Accepts "TI  - x", "TI - x", "PMID- 1", "ER  - " and "ER  -", but not
    # prose such as "ER-positive ..." (the hyphen must be followed by a blank
    # or by the end of the line).
    _TAG_LINE = re.compile(r'^([A-Z][A-Z0-9]{1,3})[ \t]*-(?:[ \t](.*))?$')

    # Tag -> output column, per input format.
    _NBIB_TAGS = {'TI': 'Title', 'AB': 'Abstract', 'AU': 'Authors', 'LID': 'DOI'}
    _RIS_TAGS = {'TI': 'Title', 'AB': 'Abstract', 'AU': 'Authors', 'DO': 'DOI'}
    _EMBASE_TAGS = {'T1': 'Title', 'N2': 'Abstract', 'A1': 'Authors', 'DO': 'DOI'}

    def __init__(self, data_dir: str):
        """Remember *data_dir*, the directory every output file is written to."""
        self.data_dir = data_dir

    # ------------------------------------------------------------------
    # Public parsers
    # ------------------------------------------------------------------
    def parse_nbib(self, file_path: str) -> Tuple[Optional[str], str]:
        """Parse a PubMed NBIB file and save the records as Excel.

        A ``PMID`` line starts a new record.  Only ``LID`` values carrying the
        ``[doi]`` marker are used as DOI.  The file is streamed line by line.

        Returns:
            ``(output_path, preview)`` on success, ``(None, "Invalid file")``
            when *file_path* is not an existing file, ``(None, "No records
            found")`` when nothing was extracted, or ``(None, "Error: ...")``
            when reading, parsing or saving raised.
        """
        if not self._validate_file(file_path):
            return None, "Invalid file"
        try:
            # utf-8-sig: a leading BOM must not end up glued to the first tag.
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                records = self._parse_tagged_lines(
                    f,
                    self._NBIB_TAGS,
                    boundary_tags=('PMID',),
                    doi_marker='[doi]',
                )
            return self._save_records(records, "extracted_data.xlsx")
        except Exception as e:  # boundary: callers expect a tuple, not a raise
            logging.exception("NBIB parsing error: %s", e)
            return None, f"Error: {e}"

    def parse_wos_ris(self, file_path: str) -> Tuple[Optional[str], str]:
        """Parse a Web of Science RIS file (TI/AB/AU/DO tags) to Excel.

        Records are closed by an ``ER`` line.  Return values are described on
        ``_parse_ris_file``.
        """
        return self._parse_ris_file(file_path, self._RIS_TAGS, "WOS RIS")

    def parse_embase_ris(self, file_path: str) -> Tuple[Optional[str], str]:
        """Parse an Embase RIS file (T1/N2/A1/DO tags) to Excel.

        Records are closed by a blank line or by an ``ER`` line.  Return
        values are described on ``_parse_ris_file``.
        """
        return self._parse_ris_file(
            file_path, self._EMBASE_TAGS, "Embase RIS", blank_line_ends_record=True
        )

    def parse_scopus_ris(self, file_path: str) -> Tuple[Optional[str], str]:
        """Parse a Scopus RIS file (TI/AB/AU/DO tags) to Excel.

        Records are closed by an ``ER`` line.  Chunks without any recognised
        field are skipped instead of producing a blank row.  Return values are
        described on ``_parse_ris_file``.
        """
        return self._parse_ris_file(file_path, self._RIS_TAGS, "Scopus RIS")

    # ------------------------------------------------------------------
    # Excel I/O
    # ------------------------------------------------------------------
    def load_excel(self, file_path: str) -> Optional[pd.DataFrame]:
        """Load an Excel sheet and normalise its index.

        The first column is read as index unless the sheet has an ``Index``
        column, which then takes precedence.  Index labels are converted to
        stripped strings, the index is named ``Index`` and rows with a
        repeated label are dropped (first occurrence wins, with a warning).

        Returns:
            The DataFrame, or ``None`` if anything went wrong (logged).
        """
        try:
            df = pd.read_excel(file_path, index_col=0)
            if "Index" in df.columns:
                df = df.set_index("Index")

            df.index = df.index.astype(str).str.strip()
            df.index.name = "Index"

            duplicate_mask = df.index.duplicated(keep='first')
            if duplicate_mask.any():
                logging.warning(f"Removing duplicate indices in {file_path}")
                df = df[~duplicate_mask]
            return df
        except Exception as e:  # boundary: callers test for None
            logging.exception("Excel load error: %s", e)
            return None

    def save_excel(self, df: pd.DataFrame, filename: str) -> str:
        """Write *df* to ``data_dir/filename`` with a string index named ``Index``.

        The caller's frame is not modified.  An existing ``Index`` column is
        renamed to ``Original_Index`` so it cannot clash with the index
        column, and rows with a repeated index label are dropped (first
        occurrence wins, with a warning).

        Returns:
            The path written, or ``""`` if saving failed (logged).
        """
        try:
            df = df.copy()
            if "Index" in df.columns:
                df = df.rename(columns={"Index": "Original_Index"})

            df.index = df.index.astype(str)
            df.index.name = "Index"

            duplicate_mask = df.index.duplicated(keep='first')
            if duplicate_mask.any():
                logging.warning(f"Removing duplicate indices when saving {filename}")
                df = df[~duplicate_mask]

            output_path = os.path.join(self.data_dir, filename)
            df.to_excel(output_path, index=True)
            return output_path
        except Exception as e:  # boundary: callers test for ""
            logging.exception("Excel save error: %s", e)
            return ""

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------
    def _validate_file(self, file_path: str) -> bool:
        """Return True when *file_path* is non-empty and names an existing file.

        Directories are rejected so the parsers answer "Invalid file" instead
        of failing later inside ``open``.
        """
        return bool(file_path and os.path.isfile(file_path))

    def _parse_ris_file(
        self,
        file_path: str,
        tag_fields: Dict[str, str],
        source_name: str,
        blank_line_ends_record: bool = False,
    ) -> Tuple[Optional[str], str]:
        """Shared implementation of the three RIS parsers.

        Args:
            file_path: RIS file to read (whole file is read into memory).
            tag_fields: mapping of RIS tag to output column.
            source_name: label used in the error log message.
            blank_line_ends_record: also close a record on every blank line.

        Returns:
            ``(output_path, preview)`` on success; ``(None, "Invalid file")``,
            ``(None, "Empty file")``, ``(None, "No records found")`` or
            ``(None, "Error: ...")`` otherwise.
        """
        if not self._validate_file(file_path):
            return None, "Invalid file"
        try:
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                content = f.read()
            if not content:
                return None, "Empty file"

            records = self._parse_tagged_lines(
                content.split('\n'),
                tag_fields,
                boundary_tags=('ER',),
                blank_line_ends_record=blank_line_ends_record,
            )
            return self._save_records(records, "extracted_data.xlsx")
        except Exception as e:  # boundary: callers expect a tuple, not a raise
            logging.exception("%s parsing error: %s", source_name, e)
            return None, f"Error: {e}"

    def _parse_tagged_lines(
        self,
        lines,
        tag_fields: Dict[str, str],
        boundary_tags: Tuple[str, ...] = (),
        blank_line_ends_record: bool = False,
        doi_marker: Optional[str] = None,
    ) -> List[Dict[str, str]]:
        """Turn ``TAG - value`` lines into a list of record dicts.

        Args:
            lines: any iterable of text lines (a list or an open file);
                trailing line breaks are ignored.
            tag_fields: mapping of tag to ``'Title'``, ``'Abstract'``,
                ``'Authors'`` or ``'DOI'``; other tags are skipped.
            boundary_tags: tags that close the record collected so far
                (``PMID`` for NBIB, ``ER`` for RIS).
            blank_line_ends_record: close the current record on blank lines.
            doi_marker: when given, a DOI value is accepted only if it
                contains this marker, which is then removed from the value.

        Returns:
            One dict per record.  ``'Authors'`` is always present (names
            joined with ``'; '``); the other keys appear only when the tag
            was found.  A record is kept when it has at least one field or
            one author.
        """
        records: List[Dict[str, str]] = []
        record: Dict[str, str] = {}
        authors: List[str] = []
        # Field that an indented continuation line would extend, if any.
        open_field: Optional[str] = None

        for raw_line in lines:
            line = raw_line.rstrip('\r\n')
            text = line.strip()

            if not text:
                open_field = None
                if blank_line_ends_record:
                    self._store_record(records, record, authors)
                    record, authors = {}, []
                continue

            # Continuation lines are recognised on the *unstripped* line.
            if line[0] in ' \t':
                if open_field is not None:
                    record[open_field] += ' ' + text
                continue

            match = self._TAG_LINE.match(line)
            if match is None:
                continue

            tag = match.group(1)
            value = (match.group(2) or '').strip()
            # Any tagged line ends the previous field, recognised or not, so
            # the wrapped lines of a skipped tag cannot leak into the
            # Title/Abstract that preceded it.
            open_field = None

            if tag in boundary_tags:
                self._store_record(records, record, authors)
                record, authors = {}, []
                continue

            field = tag_fields.get(tag)
            if field is None:
                continue
            if field == 'Authors':
                authors.append(value)
            elif field == 'DOI':
                if doi_marker is None:
                    record['DOI'] = value
                elif doi_marker in value:
                    record['DOI'] = value.replace(' ' + doi_marker, '').strip()
            else:
                record[field] = value
                open_field = field

        # The last record has no boundary line after it in NBIB files.
        self._store_record(records, record, authors)
        return records

    @staticmethod
    def _store_record(
        records: List[Dict[str, str]], record: Dict[str, str], authors: List[str]
    ) -> None:
        """Append *record* (with its joined authors) to *records* unless empty."""
        if record or authors:
            record['Authors'] = '; '.join(authors)
            records.append(record)

    def _save_records(self, records: List[Dict], filename: str) -> Tuple[Optional[str], str]:
        """Write *records* to ``data_dir/filename`` and build the preview text.

        Columns listed in ``REQUIRED_COLUMNS`` that no record provided are
        added as empty columns.  The index column is named ``Index``.

        Returns:
            ``(output_path, preview)``, or ``(None, "No records found")`` for
            an empty list.  Errors from the Excel writer propagate.
        """
        if not records:
            return None, "No records found"

        df = pd.DataFrame(records)
        for column in REQUIRED_COLUMNS:
            if column not in df.columns:
                df[column] = ''
        df.index.name = 'Index'

        output_path = os.path.join(self.data_dir, filename)
        df.to_excel(output_path, index=True)
        return output_path, self._generate_preview(records)

    def _generate_preview(self, records: List[Dict]) -> str:
        """Render the first ``PREVIEW_RECORD_COUNT`` records as plain text.

        Each field in ``PREVIEW_FIELD_LENGTHS`` is cut to its limit and gets a
        ``...`` suffix when it was cut.  Record numbers start at 0.  The text
        ends with the total number of records.
        """
        lines: List[str] = []
        for number, record in enumerate(records[:PREVIEW_RECORD_COUNT]):
            lines.append(f"\nRecord {number}:")
            for field, max_len in PREVIEW_FIELD_LENGTHS.items():
                text = record.get(field, '')
                suffix = '...' if len(text) > max_len else ''
                lines.append(f"{field}: {text[:max_len]}{suffix}")
            lines.append("-" * 80)
        lines.append(f"\nTotal records extracted: {len(records)}")
        return '\n'.join(lines)