# Preformu/layers/output_normalizer.py
"""
Output Normalization Layer.
This module is responsible for normalizing LLM outputs into structured,
consistent data suitable for report generation. It handles:
1. Terminology standardization (CN/EN)
2. Tone normalization (removing casual language)
3. Uncertainty annotation
4. Structure extraction from natural language
Design Philosophy:
- The LLM output is treated as "raw material" that needs refinement
- Normalization ensures consistency regardless of model variations
- Forbidden phrases are filtered to maintain professional tone
"""
import json
import random
import re
from datetime import datetime
from typing import Any, Dict, List, Optional

from config.settings import settings
from schemas.canonical_schema import (
    AnalysisResult,
    BilingualText,
    ConfidenceLevel,
    ExcipientProfile,
    FormulationStrategy,
    ImpurityProfile,
    InteractionMechanism,
    PhysicochemicalProperties,
    PropertyType,
    ReactiveGroup,
    RiskLevel,
)
class OutputNormalizer:
"""
Normalizes LLM outputs into structured report-ready data.
This class takes raw text from LLM responses and transforms it into
the AnalysisResult schema, ensuring consistency and filtering
inappropriate content.
"""
def __init__(self):
"""Initialize the normalizer with configuration."""
self.terminology_map = settings.output.terminology_map
self.forbidden_phrases = settings.output.forbidden_phrases
self.risk_level_keywords = {
RiskLevel.NONE: ["无风险", "no risk", "不具备", "不存在"],
RiskLevel.LOW: ["低风险", "low risk", "轻微", "可控"],
RiskLevel.MEDIUM: ["中等", "需关注", "medium", "moderate", "关注"],
RiskLevel.HIGH: ["高风险", "high risk", "显著", "严重"],
}
def normalize(
self,
api_name: str,
api_smiles: Optional[str],
excipient_name: str,
raw_outputs: Dict[str, str],
) -> AnalysisResult:
"""
Normalize all raw LLM outputs into a structured AnalysisResult.
Args:
api_name: Name of the API
api_smiles: SMILES notation of the API
excipient_name: Name of the primary excipient
raw_outputs: Dictionary of raw outputs by dimension
Returns:
AnalysisResult: Fully structured and normalized result
"""
# Generate report metadata
report_id = self._generate_report_id()
date = datetime.now().strftime("%Y-%m-%d")
# Normalize each dimension
reactive_groups = self._extract_reactive_groups(
raw_outputs.get("api_structure", "")
)
physicochemical = self._extract_physicochemical(
raw_outputs.get("api_structure", "")
)
excipient_profile = self._extract_excipient_profile(
raw_outputs.get("excipient_analysis", ""),
excipient_name
)
interactions = self._extract_interactions(
raw_outputs.get("compatibility", "")
)
strategies = self._extract_formulation_strategies(
raw_outputs.get("compatibility", "") +
raw_outputs.get("synthesis", "")
)
# Extract uncertainty information
assumptions, limitations = self._extract_uncertainties(
raw_outputs.get("synthesis", "")
)
return AnalysisResult(
report_id=report_id,
date=date,
api_name=api_name,
api_smiles=api_smiles,
excipient_name=excipient_name,
reactive_groups=reactive_groups,
physicochemical=physicochemical,
excipient_profile=excipient_profile,
interactions=interactions,
formulation_strategies=strategies,
assumptions=assumptions,
limitations=limitations,
data_sources=["药典通用知识", "ICH指南原则", "结构活性关系分析"],
)
def normalize_text(self, text: str) -> str:
"""
Normalize a piece of text by applying terminology mapping
and removing forbidden phrases.
Args:
text: Raw text to normalize
Returns:
Normalized text
"""
result = text
# Apply terminology standardization
for original, replacement in self.terminology_map.items():
result = result.replace(original, f"{original} ({replacement})")
# Remove forbidden phrases
for phrase in self.forbidden_phrases:
result = re.sub(
rf'\b{re.escape(phrase)}\b',
'',
result,
flags=re.IGNORECASE
)
# Clean up extra whitespace
result = re.sub(r'\s+', ' ', result).strip()
return result
def _generate_report_id(self) -> str:
"""Generate a unique report ID."""
prefix = settings.report.id_prefix
timestamp = datetime.now().strftime("%Y-%m%d")
# Simple counter - in production would use proper sequence
import random
seq = f"X{random.randint(10, 99)}"
return f"{prefix}-{timestamp}-{seq}"
def _extract_reactive_groups(self, text: str) -> List[ReactiveGroup]:
"""Extract reactive group information from API analysis."""
groups = []
# Pattern-based extraction for common group types
group_patterns = [
(r"伯胺[基团]*\s*\(Primary\s*Amine\)", "伯胺基团", "Primary Amine", PropertyType.BASIC),
(r"仲胺[基团]*\s*\(Secondary\s*Amine\)", "仲胺基团", "Secondary Amine", PropertyType.BASIC),
(r"硫醚[基团]*\s*\(Thioether\)", "硫醚基团", "Thioether", PropertyType.NEUTRAL),
(r"酚羟基\s*\(Phenolic\s*Hydroxyl\)", "酚羟基", "Phenolic Hydroxyl", PropertyType.ACIDIC),
(r"羧基\s*\(Carboxyl\)", "羧基", "Carboxyl Group", PropertyType.ACIDIC),
(r"芳香杂环\s*\(Aromatic\s*Heterocycle", "芳香杂环", "Aromatic Heterocycle", PropertyType.BASIC),
]
for pattern, cn_name, en_name, prop_type in group_patterns:
if re.search(pattern, text, re.IGNORECASE):
# Extract potential reactions for this group
reactions = self._extract_reactions_for_group(cn_name, text)
groups.append(ReactiveGroup(
name=BilingualText(cn=cn_name, en=en_name),
property_type=prop_type,
potential_reactions=reactions,
))
# If no groups found, create a default entry
if not groups:
groups.append(ReactiveGroup(
name=BilingualText(cn="待分析", en="Pending Analysis"),
property_type=PropertyType.NEUTRAL,
potential_reactions=[],
))
return groups
def _extract_reactions_for_group(
self,
group_name: str,
text: str
) -> List[BilingualText]:
"""Extract potential reactions associated with a functional group."""
reactions = []
# Common reaction mappings
reaction_map = {
"伯胺": [
("美拉德反应", "Maillard Reaction"),
("氧化脱氨", "Oxidative Deamination"),
("席夫碱形成", "Schiff Base Formation"),
],
"硫醚": [
("氧化成亚砜", "Oxidation to Sulfoxide"),
("氧化成砜", "Oxidation to Sulfone"),
],
"酚羟基": [
("氧化", "Oxidation"),
("光氧化", "Photooxidation"),
],
}
for key, rxn_list in reaction_map.items():
if key in group_name:
for cn, en in rxn_list:
reactions.append(BilingualText(cn=cn, en=en))
return reactions
def _extract_physicochemical(self, text: str) -> Optional[PhysicochemicalProperties]:
"""Extract physicochemical properties from analysis."""
# Default values - would be parsed from text in full implementation
acidity = BilingualText(cn="碱性", en="Basic")
# Try to extract LogP
logp_match = re.search(r'LogP[:\s~约]*([0-9.]+)', text)
logp = float(logp_match.group(1)) if logp_match else None
# Try to extract H-bond donors/acceptors
hbd_match = re.search(r'氢键供体[:\s]*(\d+)', text)
hba_match = re.search(r'氢键受体[:\s]*(\d+)', text)
return PhysicochemicalProperties(
acidity_basicity=acidity,
logp=logp,
h_bond_donors=int(hbd_match.group(1)) if hbd_match else None,
h_bond_acceptors=int(hba_match.group(1)) if hba_match else None,
risk_profile="含多个碱性氮原子,硫醚对氧化敏感" if "硫醚" in text else None,
)
def _extract_excipient_profile(
self,
text: str,
excipient_name: str
) -> Optional[ExcipientProfile]:
"""Extract excipient profile from analysis."""
# Extract formula if present
formula_match = re.search(r'化学式[:\s]*([A-Za-z0-9₀-₉]+)', text)
formula = formula_match.group(1) if formula_match else None
# Extract key properties
key_properties = []
if "直接压片" in text:
key_properties.append("适合直接压片工艺")
if "低吸湿性" in text or "<1%" in text:
key_properties.append("低吸湿性(<1% at 90% RH)")
if "pH" in text:
key_properties.append("微环境pH约为6.5-7.5")
# Try to extract impurity info
impurity_profile = None
if "Fe" in text or "金属离子" in text:
impurity_profile = ImpurityProfile(
fe_ppm=10.0, # Typical value
mn_ppm=1.0, # Typical value
)
return ExcipientProfile(
name=BilingualText(
cn=excipient_name,
en=self._translate_excipient_name(excipient_name)
),
formula=formula,
key_properties=key_properties,
impurity_profile=impurity_profile,
microenvironment="弱碱性环境" if "碱性" in text else None,
compatibility_notes=self.normalize_text(text[:200]) if text else None,
)
def _translate_excipient_name(self, cn_name: str) -> str:
"""Translate common excipient names to English."""
translations = {
"无水磷酸氢钙": "DCP Anhydrous",
"磷酸氢钙": "Dibasic Calcium Phosphate",
"乳糖": "Lactose",
"微晶纤维素": "Microcrystalline Cellulose",
"硬脂酸镁": "Magnesium Stearate",
"淀粉": "Starch",
"甘露醇": "Mannitol",
}
return translations.get(cn_name, cn_name)
def _extract_interactions(self, text: str) -> List[InteractionMechanism]:
"""Extract interaction mechanisms from compatibility analysis."""
interactions = []
# Define interaction types to look for
interaction_types = [
("美拉德反应", "Maillard Reaction"),
("氧化反应", "Oxidation"),
("酸碱反应", "Acid-Base Interaction"),
("水解反应", "Hydrolysis"),
("吸附作用", "Adsorption"),
]
for cn_name, en_name in interaction_types:
# Check if this interaction type is discussed
if cn_name in text:
# Determine risk level from context
risk = self._determine_risk_level(text, cn_name)
# Extract mechanism analysis
mechanism = self._extract_mechanism_for_type(text, cn_name)
# Extract expert notes
expert_notes = self._extract_expert_notes(text, cn_name)
interactions.append(InteractionMechanism(
reaction_type=BilingualText(cn=cn_name, en=en_name),
risk_level=risk,
mechanism_analysis=mechanism,
expert_notes=expert_notes,
confidence=ConfidenceLevel.MEDIUM,
))
return interactions
def _determine_risk_level(self, text: str, reaction_type: str) -> RiskLevel:
"""Determine risk level for a specific reaction type from text."""
# Find the section about this reaction type
# Simple heuristic: look for risk keywords near the reaction type mention
# Search window around the reaction type
idx = text.find(reaction_type)
if idx == -1:
return RiskLevel.MEDIUM
window = text[max(0, idx-50):min(len(text), idx+200)]
for level, keywords in self.risk_level_keywords.items():
for keyword in keywords:
if keyword in window.lower():
return level
return RiskLevel.MEDIUM
def _extract_mechanism_for_type(self, text: str, reaction_type: str) -> str:
"""Extract mechanism analysis for a specific reaction type."""
# Look for mechanism description after the reaction type
patterns = [
rf'{reaction_type}.*?机制分析[:\s]*([^#]+?)(?=\n\n|\n###|\n##|$)',
rf'{reaction_type}.*?(?:因为|由于|主要是)([^。]+。)',
]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
return self.normalize_text(match.group(1)[:300])
return "请参阅详细分析报告"
def _extract_expert_notes(self, text: str, reaction_type: str) -> str:
"""Extract expert commentary for a reaction type."""
patterns = [
rf'{reaction_type}.*?专家点评[:\s]*([^#\n]+)',
rf'{reaction_type}.*?建议[:\s]*([^#\n]+)',
]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
return self.normalize_text(match.group(1)[:200])
return "需结合实验数据进一步评估"
def _extract_formulation_strategies(self, text: str) -> List[FormulationStrategy]:
"""Extract formulation strategy recommendations."""
strategies = []
# Look for numbered recommendations
pattern = r'(\d+)\.\s*\*\*([^*]+)\*\*[:\s]*([^\n]+)'
matches = re.findall(pattern, text)
for _, title, description in matches[:5]: # Limit to 5 strategies
strategies.append(FormulationStrategy(
title=self.normalize_text(title.strip()),
description=self.normalize_text(description.strip()),
))
# Add default strategies if none found
if not strategies:
strategies = [
FormulationStrategy(
title="辅料选择优化",
description="建议选用低金属离子规格辅料",
),
FormulationStrategy(
title="稳定剂考虑",
description="根据风险评估结果考虑添加适当稳定剂",
),
]
return strategies
def _extract_uncertainties(self, text: str) -> tuple:
"""Extract assumptions and limitations from synthesis."""
assumptions = []
limitations = []
# Extract assumptions
assumption_pattern = r'假设[:\s]*([^。\n]+)'
for match in re.finditer(assumption_pattern, text):
assumptions.append(match.group(1).strip())
# Extract limitations
limitation_pattern = r'局限[:\s]*([^。\n]+)'
for match in re.finditer(limitation_pattern, text):
limitations.append(match.group(1).strip())
# Default values if none found
if not assumptions:
assumptions = [
"分析基于SMILES结构推断",
"假设正常制剂工艺条件",
]
if not limitations:
limitations = [
"具体批次数据需COA确认",
"相容性结论需稳定性试验验证",
]
return assumptions, limitations