Preformu / utils /stability_data_extractor.py
Kevinshh's picture
Upload stability_data_extractor.py
11452a6 verified
"""
Stability Data Extractor - LLM-Powered Format-Agnostic Extraction
This module uses LLM to understand and extract stability data from ANY file format.
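The primary path uses no hardcoded patterns - the LLM interprets the data structure dynamically; a regex-based heuristic serves only as a fallback when the LLM is unavailable or returns no batches.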
"""
import json
import re
from typing import Dict, List, Any, Optional
from pathlib import Path
class StabilityDataExtractor:
    """
    LLM-powered stability data extractor.

    Extraction is attempted with an LLM first. When no LLM invoker can be
    loaded, or the LLM yields no batches, a regex-based heuristic scan of the
    text is used as a fallback.
    """

    # Prompt for LLM to extract structured stability data
    EXTRACTION_PROMPT = """你是药物稳定性数据提取专家。请从以下文本中提取稳定性数据。
【数据内容】
{text_content}
【用户分析目标】
{goal}
【任务】
请识别并提取:
1. 批次信息(批次名称/ID)
2. 存储条件(如25°C/60%RH长期, 40°C/75%RH加速等)
3. 时间点(月)
4. 质量指标数值(如杂质含量、含量等)
【输出格式】
请严格按以下JSON格式输出:
```json
{{
"batches": [
{{
"batch_id": "批次ID",
"batch_name": "批次名称",
"conditions": [
{{
"condition_id": "条件描述(如 25C_60RH)",
"condition_type": "longterm|accelerated|stress",
"timepoints": [0, 3, 6, 9],
"cqa_data": [
{{
"cqa_name": "指标名称(如总杂质)",
"values": [0.1, 0.12, 0.15, 0.18]
}}
]
}}
]
}}
],
"specification_limit": 0.5,
"primary_cqa": "主要质量指标名称"
}}
```
如果无法识别数据结构,请返回空的batches数组并在"extraction_notes"字段说明原因。
"""

    # Maximum number of characters of source text placed into the prompt.
    _MAX_PROMPT_CHARS = 8000
    # Number of lines below a time header that are scanned for data rows.
    _ROW_SCAN_WINDOW = 14
    # Number of lines above a time header kept as batch/condition context.
    _CONTEXT_LINES = 10

    # Time headers: 0M, 3M, 6M or 0月, 3月 etc.
    _TIME_HEADER_RE = re.compile(r'(\d+)\s*[MmHhDd月周天]')
    _NUMBER_RE = re.compile(r'(\d+\.?\d*)')

    # (row type, keywords) checked in order; first hit wins.
    _ROW_TYPE_KEYWORDS = (
        ('impurity', ('杂质', 'impurity', '杂')),
        ('assay', ('含量', 'assay', 'content')),
        ('moisture', ('水分', 'moisture', 'water')),
        ('dissolution', ('溶出', 'dissolution')),
    )

    # Row type -> CQA display name; other types fall back to a generic name.
    _CQA_NAMES = {'impurity': "总杂质", 'assay': "含量"}
    _DEFAULT_CQA_NAME = "质量指标"

    # Batch-name patterns in priority order. The separator class accepts both
    # the ASCII colon and the full-width colon (U+FF1A) used in Chinese text.
    _BATCH_NAME_PATTERNS = (
        r'批[次号][:\uff1a\s]*([A-Za-z0-9\-_]+)',
        r'Batch[:\uff1a\s]*([A-Za-z0-9\-_]+)',
        r'([A-Z]{2,3}[-_]\d{4,}[-_]?[A-Z0-9]*)',  # Common batch ID format
        r'([SF][-_]?\d{4}[-_]?\d+)',  # SF-xxxx format
        r'样品[:\uff1a\s]*(.{3,20})',
    )

    # (condition id, condition type, keywords); keywords are matched against
    # the lower-cased context.
    _CONDITION_RULES = (
        ("40C_Accelerated", "accelerated", ('40°c', '40℃', '40c', '加速')),
        ("25C_LongTerm", "longterm", ('25°c', '25℃', '25c', '长期')),
        ("60C_Stress", "stress", ('60°c', '60℃', '60c', '高温')),
        ("30C_Intermediate", "intermediate", ('30°c', '30℃', '30c', '中间')),
    )

    def __init__(self, model_invoker=None):
        """
        Initialize extractor.

        Args:
            model_invoker: LLM invoker instance (lazy-loaded if not provided).
                It is called as ``invoke(system_prompt=..., user_prompt=...,
                temperature=...)``.
        """
        self._model_invoker = model_invoker
        # Initialized empty; not read or written by any method of this class.
        self.extracted_data = {}
        self.metadata = {}

    @property
    def model_invoker(self):
        """Return the LLM invoker, importing and creating it on first use.

        Returns None (after printing a warning) when the invoker cannot be
        imported or constructed; the import is retried on the next access.
        """
        if self._model_invoker is None:
            try:
                from layers.model_invoker import ModelInvoker
                self._model_invoker = ModelInvoker()
            except Exception as e:
                print(f"Warning: Could not load ModelInvoker: {e}")
                self._model_invoker = None
        return self._model_invoker

    def extract_from_text(self, text_content: str, goal: str = "") -> Dict[str, Any]:
        """
        Extract stability data from text, LLM first, heuristics second.

        Args:
            text_content: Raw text from parsed files.
            goal: Analysis goal from user (may be empty or None).

        Returns:
            Dictionary with the keys ``batches``, ``specification_limit``,
            ``primary_cqa``, ``target_timepoints``, ``extraction_method``
            (``"llm"``, ``"heuristic"`` or ``"none"``) and
            ``extraction_notes``, plus any extra keys the LLM returned.
        """
        # Default result structure
        result = {
            "batches": [],
            "specification_limit": 0.5,
            "primary_cqa": "总杂质",
            "target_timepoints": self._extract_target_timepoints(goal),
            "extraction_method": "none",
            "extraction_notes": ""
        }

        if not text_content or len(text_content.strip()) < 50:
            result["extraction_notes"] = "文本内容过短,无法提取数据"
            return result

        # Try LLM extraction first
        llm_result = self._extract_with_llm(text_content, goal)
        if llm_result and llm_result.get("batches"):
            # Skip null fields so an LLM "unknown" does not erase a default.
            result.update(
                {key: value for key, value in llm_result.items() if value is not None}
            )
            result["extraction_method"] = "llm"
            return result

        # Fallback to heuristic extraction
        heuristic_result = self._extract_with_heuristics(text_content, goal)
        if heuristic_result and heuristic_result.get("batches"):
            result.update(heuristic_result)
            result["extraction_method"] = "heuristic"
            return result

        result["extraction_notes"] = "无法识别数据格式,请确保文件包含时间点和数值数据"
        return result

    def _extract_with_llm(self, text_content: str, goal: str) -> Optional[Dict]:
        """Use the LLM to extract structured data.

        Returns the parsed JSON object, or None when no invoker is available,
        the response has no usable content, the payload is not a JSON object,
        or any error occurs (the error is printed, not raised).
        """
        invoker = self.model_invoker
        if not invoker:
            return None

        try:
            # Truncate text to avoid token limits
            truncated_text = text_content[:self._MAX_PROMPT_CHARS]
            if len(text_content) > self._MAX_PROMPT_CHARS:
                truncated_text += "\n... [文本已截断]"

            prompt = self.EXTRACTION_PROMPT.format(
                text_content=truncated_text,
                goal=goal or "分析稳定性数据"
            )
            response = invoker.invoke(
                system_prompt="你是专业的药物稳定性数据提取助手。请从文本中提取结构化的稳定性数据。",
                user_prompt=prompt,
                temperature=0.1
            )

            # Accept either an object exposing ``.content`` or a plain string.
            if response and hasattr(response, 'content'):
                content = response.content
            elif isinstance(response, str):
                content = response
            else:
                return None

            return self._parse_llm_json(content)
        except Exception as e:
            print(f"LLM extraction failed: {e}")
            return None

    @staticmethod
    def _parse_llm_json(content: str) -> Optional[Dict]:
        """Parse the JSON object out of an LLM response.

        A ```json fenced block is preferred; otherwise the whole response is
        used if it starts with ``{``. Returns None when neither form is
        present or the decoded value is not a dict. Invalid JSON raises
        ``json.JSONDecodeError`` for the caller to handle.
        """
        fenced = re.search(r'```json\s*([\s\S]*?)\s*```', content)
        if fenced:
            json_str = fenced.group(1)
        else:
            # Try to find raw JSON
            json_str = content.strip()
            if not json_str.startswith('{'):
                return None

        extracted = json.loads(json_str)
        # Callers use dict methods on the result; reject arrays/scalars.
        return extracted if isinstance(extracted, dict) else None

    def _extract_with_heuristics(self, text_content: str, goal: str) -> Optional[Dict]:
        """
        Fallback heuristic extraction using pattern recognition.

        Returns ``{"batches": [...]}`` with one batch per detected table, or
        None when no table is found. ``goal`` is accepted but not used.
        """
        batches = []
        # Find all tables with time-series data
        tables = self._find_time_series_tables(text_content)
        for i, table in enumerate(tables):
            batch = self._create_batch_from_table(table, i, text_content)
            if batch:
                batches.append(batch)

        if batches:
            return {"batches": batches}
        return None

    def _find_time_series_tables(self, text: str) -> List[Dict]:
        """Find patterns that look like time-series data tables.

        A line with at least two time tokens (e.g. ``0M 3M 6M``) is treated as
        a header. The lines below it are scanned for rows carrying at least as
        many numbers as there are time points, all within 0..200. Scanning
        stops at the next header so that a following table's header and rows
        are not attributed to this one.

        Returns:
            One dict per table with ``times``, ``rows`` (each with ``values``,
            ``type``, ``raw``), ``context`` (the header and up to 10 lines
            above it) and ``line_number`` (header index).
        """
        tables = []
        lines = text.split('\n')

        for i, line in enumerate(lines):
            time_matches = self._TIME_HEADER_RE.findall(line)
            if len(time_matches) < 2:
                continue

            # Found potential time header
            times = [int(t) for t in time_matches]

            # Look for data rows below
            data_rows = []
            for candidate in lines[i + 1:i + 1 + self._ROW_SCAN_WINDOW]:
                if len(self._TIME_HEADER_RE.findall(candidate)) >= 2:
                    # Next table starts here; its rows belong to it, not us.
                    break
                numbers = self._NUMBER_RE.findall(candidate)
                if len(numbers) < len(times):
                    continue
                # Only the first len(times) numbers on the line are used.
                values = [float(n) for n in numbers[:len(times)]]
                # Check if numbers are in plausible range for stability data
                if all(0 <= v <= 200 for v in values):
                    data_rows.append({
                        "values": values,
                        "type": self._identify_row_type(candidate),
                        "raw": candidate
                    })

            if data_rows:
                # Find context for batch/condition identification
                context_start = max(0, i - self._CONTEXT_LINES)
                tables.append({
                    "times": times,
                    "rows": data_rows,
                    "context": '\n'.join(lines[context_start:i + 1]),
                    "line_number": i
                })

        return tables

    def _identify_row_type(self, line: str) -> str:
        """Identify what type of measurement a row represents.

        Returns 'impurity', 'assay', 'moisture', 'dissolution' or 'unknown'
        based on keywords found in the lower-cased line.
        """
        line_lower = line.lower()
        for row_type, keywords in self._ROW_TYPE_KEYWORDS:
            if any(kw in line_lower for kw in keywords):
                return row_type
        return 'unknown'

    def _create_batch_from_table(self, table: Dict, index: int, full_text: str) -> Optional[Dict]:
        """Create a batch structure from extracted table data.

        Returns None when the table has no rows; otherwise a batch dict with a
        single condition holding the table's time points and CQA series.
        """
        context = table.get("context", "")

        # Try to identify batch name from context
        batch_name = self._extract_batch_name(context, full_text, index)
        # Try to identify condition
        condition_info = self._extract_condition(context)

        # Build CQA data
        cqa_list = [
            {
                "cqa_name": self._CQA_NAMES.get(row["type"], self._DEFAULT_CQA_NAME),
                "values": row["values"]
            }
            for row in table.get("rows", [])
        ]
        if not cqa_list:
            return None

        return {
            "batch_id": batch_name.replace(" ", "_"),
            "batch_name": batch_name,
            "batch_type": "target",
            "conditions": [{
                "condition_id": condition_info["id"],
                "condition_type": condition_info["type"],
                "timepoints": table["times"],
                "cqa_data": cqa_list
            }]
        }

    def _extract_batch_name(self, context: str, full_text: str, index: int) -> str:
        """Extract batch name from context using various patterns.

        Patterns are tried in priority order. Within a pattern the match
        closest to the end of the context (i.e. nearest to the table header)
        is preferred, so a table does not pick up the batch ID of an earlier
        table that falls inside the same look-back window. Names shorter than
        3 characters are ignored. ``full_text`` is accepted but not used.

        Returns:
            The matched name, or a numbered placeholder based on ``index``.
        """
        for pattern in self._BATCH_NAME_PATTERNS:
            candidates = re.findall(pattern, context, re.IGNORECASE)
            for raw_name in reversed(candidates):
                name = raw_name.strip()
                if len(name) >= 3:
                    return name

        # Fallback: use numbered batch
        return f"批次{index + 1}"

    def _extract_condition(self, context: str) -> Dict[str, str]:
        """Extract storage condition from context.

        The condition whose keyword occurs closest to the end of the context
        (nearest to the table header) wins, so a condition mentioned for an
        earlier table does not override the one stated for this table.

        Returns:
            Dict with ``id`` and ``type``; ``Unknown_Condition``/``unknown``
            when no keyword is present.
        """
        context_lower = context.lower()
        best_position = -1
        best = {"id": "Unknown_Condition", "type": "unknown"}

        for condition_id, condition_type, keywords in self._CONDITION_RULES:
            position = max(context_lower.rfind(kw) for kw in keywords)
            if position > best_position:
                best_position = position
                best = {"id": condition_id, "type": condition_type}

        return best

    def _extract_target_timepoints(self, goal: Optional[str]) -> List[int]:
        """Extract target prediction timepoints from goal text.

        Returns the sorted, de-duplicated numbers found next to a month marker
        (``月``, ``个月``, ``M``/``m``, ``month(s)``), or ``[24, 36]`` when the
        goal is empty, None, or contains no such number.
        """
        if not goal:
            return [24, 36]

        patterns = [
            r'(\d+)\s*[个]?月',
            r'(\d+)\s*[Mm]',
            r'(\d+)\s*months?'
        ]
        found = set()
        for pattern in patterns:
            found.update(int(m) for m in re.findall(pattern, goal))

        return sorted(found) if found else [24, 36]
# Convenience function for backward compatibility
def extract_stability_data(file_paths: List[str], goal: str) -> Dict[str, Any]:
    """
    Main entry point for data extraction.

    Parses every file with ``parse_file``, concatenates the non-empty results
    (each prefixed by a ``=== File: <path> ===`` banner) and runs the combined
    text through ``StabilityDataExtractor.extract_from_text``.

    Args:
        file_paths: Paths of the files to parse.
        goal: Analysis goal from the user, forwarded to the extractor.

    Returns:
        The extractor's result dictionary.
    """
    from utils.file_parsers import parse_file

    extractor = StabilityDataExtractor()
    sections = []
    for file_path in file_paths:
        # Best effort: a file that fails to parse is reported and skipped.
        try:
            parsed = parse_file(file_path)
            if parsed:
                sections.append(f"\n=== File: {file_path} ===\n{parsed}\n")
        except Exception as e:
            print(f"Error parsing {file_path}: {e}")

    return extractor.extract_from_text("".join(sections), goal)