""" Output Normalization Layer. This module is responsible for normalizing LLM outputs into structured, consistent data suitable for report generation. It handles: 1. Terminology standardization (CN/EN) 2. Tone normalization (removing casual language) 3. Uncertainty annotation 4. Structure extraction from natural language Design Philosophy: - The LLM output is treated as "raw material" that needs refinement - Normalization ensures consistency regardless of model variations - Forbidden phrases are filtered to maintain professional tone """ import re from typing import Optional, List, Dict, Any from datetime import datetime import json from schemas.canonical_schema import ( AnalysisResult, ReactiveGroup, PhysicochemicalProperties, ExcipientProfile, InteractionMechanism, FormulationStrategy, BilingualText, RiskLevel, ConfidenceLevel, PropertyType, ImpurityProfile, ) from config.settings import settings class OutputNormalizer: """ Normalizes LLM outputs into structured report-ready data. This class takes raw text from LLM responses and transforms it into the AnalysisResult schema, ensuring consistency and filtering inappropriate content. """ def __init__(self): """Initialize the normalizer with configuration.""" self.terminology_map = settings.output.terminology_map self.forbidden_phrases = settings.output.forbidden_phrases self.risk_level_keywords = { RiskLevel.NONE: ["无风险", "no risk", "不具备", "不存在"], RiskLevel.LOW: ["低风险", "low risk", "轻微", "可控"], RiskLevel.MEDIUM: ["中等", "需关注", "medium", "moderate", "关注"], RiskLevel.HIGH: ["高风险", "high risk", "显著", "严重"], } def normalize( self, api_name: str, api_smiles: Optional[str], excipient_name: str, raw_outputs: Dict[str, str], ) -> AnalysisResult: """ Normalize all raw LLM outputs into a structured AnalysisResult. Args: api_name: Name of the API api_smiles: SMILES notation of the API excipient_name: Name of the primary excipient raw_outputs: Dictionary of raw outputs by dimension Returns: AnalysisResult: Fully structured and normalized result """ # Generate report metadata report_id = self._generate_report_id() date = datetime.now().strftime("%Y-%m-%d") # Normalize each dimension reactive_groups = self._extract_reactive_groups( raw_outputs.get("api_structure", "") ) physicochemical = self._extract_physicochemical( raw_outputs.get("api_structure", "") ) excipient_profile = self._extract_excipient_profile( raw_outputs.get("excipient_analysis", ""), excipient_name ) interactions = self._extract_interactions( raw_outputs.get("compatibility", "") ) strategies = self._extract_formulation_strategies( raw_outputs.get("compatibility", "") + raw_outputs.get("synthesis", "") ) # Extract uncertainty information assumptions, limitations = self._extract_uncertainties( raw_outputs.get("synthesis", "") ) return AnalysisResult( report_id=report_id, date=date, api_name=api_name, api_smiles=api_smiles, excipient_name=excipient_name, reactive_groups=reactive_groups, physicochemical=physicochemical, excipient_profile=excipient_profile, interactions=interactions, formulation_strategies=strategies, assumptions=assumptions, limitations=limitations, data_sources=["药典通用知识", "ICH指南原则", "结构活性关系分析"], ) def normalize_text(self, text: str) -> str: """ Normalize a piece of text by applying terminology mapping and removing forbidden phrases. Args: text: Raw text to normalize Returns: Normalized text """ result = text # Apply terminology standardization for original, replacement in self.terminology_map.items(): result = result.replace(original, f"{original} ({replacement})") # Remove forbidden phrases for phrase in self.forbidden_phrases: result = re.sub( rf'\b{re.escape(phrase)}\b', '', result, flags=re.IGNORECASE ) # Clean up extra whitespace result = re.sub(r'\s+', ' ', result).strip() return result def _generate_report_id(self) -> str: """Generate a unique report ID.""" prefix = settings.report.id_prefix timestamp = datetime.now().strftime("%Y-%m%d") # Simple counter - in production would use proper sequence import random seq = f"X{random.randint(10, 99)}" return f"{prefix}-{timestamp}-{seq}" def _extract_reactive_groups(self, text: str) -> List[ReactiveGroup]: """Extract reactive group information from API analysis.""" groups = [] # Pattern-based extraction for common group types group_patterns = [ (r"伯胺[基团]*\s*\(Primary\s*Amine\)", "伯胺基团", "Primary Amine", PropertyType.BASIC), (r"仲胺[基团]*\s*\(Secondary\s*Amine\)", "仲胺基团", "Secondary Amine", PropertyType.BASIC), (r"硫醚[基团]*\s*\(Thioether\)", "硫醚基团", "Thioether", PropertyType.NEUTRAL), (r"酚羟基\s*\(Phenolic\s*Hydroxyl\)", "酚羟基", "Phenolic Hydroxyl", PropertyType.ACIDIC), (r"羧基\s*\(Carboxyl\)", "羧基", "Carboxyl Group", PropertyType.ACIDIC), (r"芳香杂环\s*\(Aromatic\s*Heterocycle", "芳香杂环", "Aromatic Heterocycle", PropertyType.BASIC), ] for pattern, cn_name, en_name, prop_type in group_patterns: if re.search(pattern, text, re.IGNORECASE): # Extract potential reactions for this group reactions = self._extract_reactions_for_group(cn_name, text) groups.append(ReactiveGroup( name=BilingualText(cn=cn_name, en=en_name), property_type=prop_type, potential_reactions=reactions, )) # If no groups found, create a default entry if not groups: groups.append(ReactiveGroup( name=BilingualText(cn="待分析", en="Pending Analysis"), property_type=PropertyType.NEUTRAL, potential_reactions=[], )) return groups def _extract_reactions_for_group( self, group_name: str, text: str ) -> List[BilingualText]: """Extract potential reactions associated with a functional group.""" reactions = [] # Common reaction mappings reaction_map = { "伯胺": [ ("美拉德反应", "Maillard Reaction"), ("氧化脱氨", "Oxidative Deamination"), ("席夫碱形成", "Schiff Base Formation"), ], "硫醚": [ ("氧化成亚砜", "Oxidation to Sulfoxide"), ("氧化成砜", "Oxidation to Sulfone"), ], "酚羟基": [ ("氧化", "Oxidation"), ("光氧化", "Photooxidation"), ], } for key, rxn_list in reaction_map.items(): if key in group_name: for cn, en in rxn_list: reactions.append(BilingualText(cn=cn, en=en)) return reactions def _extract_physicochemical(self, text: str) -> Optional[PhysicochemicalProperties]: """Extract physicochemical properties from analysis.""" # Default values - would be parsed from text in full implementation acidity = BilingualText(cn="碱性", en="Basic") # Try to extract LogP logp_match = re.search(r'LogP[:\s~约]*([0-9.]+)', text) logp = float(logp_match.group(1)) if logp_match else None # Try to extract H-bond donors/acceptors hbd_match = re.search(r'氢键供体[:\s]*(\d+)', text) hba_match = re.search(r'氢键受体[:\s]*(\d+)', text) return PhysicochemicalProperties( acidity_basicity=acidity, logp=logp, h_bond_donors=int(hbd_match.group(1)) if hbd_match else None, h_bond_acceptors=int(hba_match.group(1)) if hba_match else None, risk_profile="含多个碱性氮原子,硫醚对氧化敏感" if "硫醚" in text else None, ) def _extract_excipient_profile( self, text: str, excipient_name: str ) -> Optional[ExcipientProfile]: """Extract excipient profile from analysis.""" # Extract formula if present formula_match = re.search(r'化学式[:\s]*([A-Za-z0-9₀-₉]+)', text) formula = formula_match.group(1) if formula_match else None # Extract key properties key_properties = [] if "直接压片" in text: key_properties.append("适合直接压片工艺") if "低吸湿性" in text or "<1%" in text: key_properties.append("低吸湿性(<1% at 90% RH)") if "pH" in text: key_properties.append("微环境pH约为6.5-7.5") # Try to extract impurity info impurity_profile = None if "Fe" in text or "金属离子" in text: impurity_profile = ImpurityProfile( fe_ppm=10.0, # Typical value mn_ppm=1.0, # Typical value ) return ExcipientProfile( name=BilingualText( cn=excipient_name, en=self._translate_excipient_name(excipient_name) ), formula=formula, key_properties=key_properties, impurity_profile=impurity_profile, microenvironment="弱碱性环境" if "碱性" in text else None, compatibility_notes=self.normalize_text(text[:200]) if text else None, ) def _translate_excipient_name(self, cn_name: str) -> str: """Translate common excipient names to English.""" translations = { "无水磷酸氢钙": "DCP Anhydrous", "磷酸氢钙": "Dibasic Calcium Phosphate", "乳糖": "Lactose", "微晶纤维素": "Microcrystalline Cellulose", "硬脂酸镁": "Magnesium Stearate", "淀粉": "Starch", "甘露醇": "Mannitol", } return translations.get(cn_name, cn_name) def _extract_interactions(self, text: str) -> List[InteractionMechanism]: """Extract interaction mechanisms from compatibility analysis.""" interactions = [] # Define interaction types to look for interaction_types = [ ("美拉德反应", "Maillard Reaction"), ("氧化反应", "Oxidation"), ("酸碱反应", "Acid-Base Interaction"), ("水解反应", "Hydrolysis"), ("吸附作用", "Adsorption"), ] for cn_name, en_name in interaction_types: # Check if this interaction type is discussed if cn_name in text: # Determine risk level from context risk = self._determine_risk_level(text, cn_name) # Extract mechanism analysis mechanism = self._extract_mechanism_for_type(text, cn_name) # Extract expert notes expert_notes = self._extract_expert_notes(text, cn_name) interactions.append(InteractionMechanism( reaction_type=BilingualText(cn=cn_name, en=en_name), risk_level=risk, mechanism_analysis=mechanism, expert_notes=expert_notes, confidence=ConfidenceLevel.MEDIUM, )) return interactions def _determine_risk_level(self, text: str, reaction_type: str) -> RiskLevel: """Determine risk level for a specific reaction type from text.""" # Find the section about this reaction type # Simple heuristic: look for risk keywords near the reaction type mention # Search window around the reaction type idx = text.find(reaction_type) if idx == -1: return RiskLevel.MEDIUM window = text[max(0, idx-50):min(len(text), idx+200)] for level, keywords in self.risk_level_keywords.items(): for keyword in keywords: if keyword in window.lower(): return level return RiskLevel.MEDIUM def _extract_mechanism_for_type(self, text: str, reaction_type: str) -> str: """Extract mechanism analysis for a specific reaction type.""" # Look for mechanism description after the reaction type patterns = [ rf'{reaction_type}.*?机制分析[:\s]*([^#]+?)(?=\n\n|\n###|\n##|$)', rf'{reaction_type}.*?(?:因为|由于|主要是)([^。]+。)', ] for pattern in patterns: match = re.search(pattern, text, re.DOTALL) if match: return self.normalize_text(match.group(1)[:300]) return "请参阅详细分析报告" def _extract_expert_notes(self, text: str, reaction_type: str) -> str: """Extract expert commentary for a reaction type.""" patterns = [ rf'{reaction_type}.*?专家点评[:\s]*([^#\n]+)', rf'{reaction_type}.*?建议[:\s]*([^#\n]+)', ] for pattern in patterns: match = re.search(pattern, text, re.DOTALL) if match: return self.normalize_text(match.group(1)[:200]) return "需结合实验数据进一步评估" def _extract_formulation_strategies(self, text: str) -> List[FormulationStrategy]: """Extract formulation strategy recommendations.""" strategies = [] # Look for numbered recommendations pattern = r'(\d+)\.\s*\*\*([^*]+)\*\*[:\s]*([^\n]+)' matches = re.findall(pattern, text) for _, title, description in matches[:5]: # Limit to 5 strategies strategies.append(FormulationStrategy( title=self.normalize_text(title.strip()), description=self.normalize_text(description.strip()), )) # Add default strategies if none found if not strategies: strategies = [ FormulationStrategy( title="辅料选择优化", description="建议选用低金属离子规格辅料", ), FormulationStrategy( title="稳定剂考虑", description="根据风险评估结果考虑添加适当稳定剂", ), ] return strategies def _extract_uncertainties(self, text: str) -> tuple: """Extract assumptions and limitations from synthesis.""" assumptions = [] limitations = [] # Extract assumptions assumption_pattern = r'假设[:\s]*([^。\n]+)' for match in re.finditer(assumption_pattern, text): assumptions.append(match.group(1).strip()) # Extract limitations limitation_pattern = r'局限[:\s]*([^。\n]+)' for match in re.finditer(limitation_pattern, text): limitations.append(match.group(1).strip()) # Default values if none found if not assumptions: assumptions = [ "分析基于SMILES结构推断", "假设正常制剂工艺条件", ] if not limitations: limitations = [ "具体批次数据需COA确认", "相容性结论需稳定性试验验证", ] return assumptions, limitations