| """
|
| Stability Data Extractor - LLM-Powered Format-Agnostic Extraction
|
|
|
| This module uses LLM to understand and extract stability data from ANY file format.
|
| No hardcoded patterns - the LLM interprets the data structure dynamically.
|
| """
|
|
|
| import json
|
| import re
|
| from typing import Dict, List, Any, Optional
|
| from pathlib import Path
|
|
|
|
|
class StabilityDataExtractor:
    """LLM-powered, format-agnostic stability data extractor.

    The extractor first asks an LLM to interpret the raw text and return
    structured JSON describing batches, storage conditions, timepoints and
    CQA values. If the LLM is unavailable or its answer is unusable, it
    falls back to regex-based heuristics that look for time-series tables.
    """

    # Runtime prompt sent to the LLM — the Chinese text is data consumed by
    # the model and is kept verbatim. Double braces escape str.format().
    EXTRACTION_PROMPT = """你是药物稳定性数据提取专家。请从以下文本中提取稳定性数据。

【数据内容】
{text_content}

【用户分析目标】
{goal}

【任务】
请识别并提取:
1. 批次信息(批次名称/ID)
2. 存储条件(如25°C/60%RH长期, 40°C/75%RH加速等)
3. 时间点(月)
4. 质量指标数值(如杂质含量、含量等)

【输出格式】
请严格按以下JSON格式输出:
```json
{{
  "batches": [
    {{
      "batch_id": "批次ID",
      "batch_name": "批次名称",
      "conditions": [
        {{
          "condition_id": "条件描述(如 25C_60RH)",
          "condition_type": "longterm|accelerated|stress",
          "timepoints": [0, 3, 6, 9],
          "cqa_data": [
            {{
              "cqa_name": "指标名称(如总杂质)",
              "values": [0.1, 0.12, 0.15, 0.18]
            }}
          ]
        }}
      ]
    }}
  ],
  "specification_limit": 0.5,
  "primary_cqa": "主要质量指标名称"
}}
```

如果无法识别数据结构,请返回空的batches数组并在"extraction_notes"字段说明原因。
"""

    def __init__(self, model_invoker=None):
        """Initialize extractor.

        Args:
            model_invoker: LLM invoker instance (lazy-loaded if not provided).
        """
        self._model_invoker = model_invoker
        # Remember a failed lazy import so we do not retry (and re-print the
        # warning) on every subsequent property access.
        self._invoker_load_failed = False
        self.extracted_data: Dict[str, Any] = {}
        self.metadata: Dict[str, Any] = {}

    @property
    def model_invoker(self):
        """Lazily load and cache the ModelInvoker; return None if unavailable."""
        if self._model_invoker is None and not self._invoker_load_failed:
            try:
                from layers.model_invoker import ModelInvoker
                self._model_invoker = ModelInvoker()
            except Exception as e:
                # Best-effort: extraction still works via heuristics.
                print(f"Warning: Could not load ModelInvoker: {e}")
                self._invoker_load_failed = True
        return self._model_invoker

    def extract_from_text(self, text_content: str, goal: str = "") -> Dict[str, Any]:
        """Extract stability data from text, preferring the LLM over heuristics.

        Args:
            text_content: Raw text from parsed files.
            goal: Analysis goal from the user (used to derive target timepoints).

        Returns:
            Dict with keys "batches", "specification_limit", "primary_cqa",
            "target_timepoints", "extraction_method" and "extraction_notes".
        """
        result = {
            "batches": [],
            "specification_limit": 0.5,
            "primary_cqa": "总杂质",
            "target_timepoints": self._extract_target_timepoints(goal),
            "extraction_method": "none",
            "extraction_notes": ""
        }

        # Too little text to contain a meaningful data table.
        if not text_content or len(text_content.strip()) < 50:
            result["extraction_notes"] = "文本内容过短,无法提取数据"
            return result

        llm_result = self._extract_with_llm(text_content, goal)
        if llm_result and llm_result.get("batches"):
            result.update(llm_result)
            result["extraction_method"] = "llm"
            return result

        heuristic_result = self._extract_with_heuristics(text_content, goal)
        if heuristic_result and heuristic_result.get("batches"):
            result.update(heuristic_result)
            result["extraction_method"] = "heuristic"
            return result

        result["extraction_notes"] = "无法识别数据格式,请确保文件包含时间点和数值数据"
        return result

    def _extract_with_llm(self, text_content: str, goal: str) -> Optional[Dict]:
        """Use the LLM to extract structured data; return None on any failure."""
        if not self.model_invoker:
            return None

        try:
            # Keep the prompt within a safe context size.
            max_chars = 8000
            truncated_text = text_content[:max_chars]
            if len(text_content) > max_chars:
                truncated_text += "\n... [文本已截断]"

            prompt = self.EXTRACTION_PROMPT.format(
                text_content=truncated_text,
                goal=goal or "分析稳定性数据"
            )

            response = self.model_invoker.invoke(
                system_prompt="你是专业的药物稳定性数据提取助手。请从文本中提取结构化的稳定性数据。",
                user_prompt=prompt,
                temperature=0.1
            )

            # Accept either a response object with .content or a plain string.
            if response and hasattr(response, 'content'):
                content = response.content
            elif isinstance(response, str):
                content = response
            else:
                return None

            # Prefer a fenced ```json block; otherwise require bare JSON.
            json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
            if json_match:
                json_str = json_match.group(1)
            else:
                json_str = content.strip()
                if not json_str.startswith('{'):
                    return None

            return json.loads(json_str)

        except Exception as e:
            # Deliberate catch-all: any LLM/parse failure falls back to heuristics.
            print(f"LLM extraction failed: {e}")
            return None

    def _extract_with_heuristics(self, text_content: str, goal: str) -> Optional[Dict]:
        """Fallback pattern-based extraction.

        Finds time-series tables in the text and converts each one into a
        batch structure. Returns None when nothing recognizable is found.
        """
        batches = []
        tables = self._find_time_series_tables(text_content)
        for i, table in enumerate(tables):
            batch = self._create_batch_from_table(table, i, text_content)
            if batch:
                batches.append(batch)
        if batches:
            return {"batches": batches}
        return None

    def _find_time_series_tables(self, text: str) -> List[Dict]:
        """Find line patterns that look like time-series data tables.

        A header line with at least two timepoint tokens (e.g. "0M 3M 6M")
        starts a candidate table; the following lines are scanned for numeric
        rows carrying at least as many values as there are timepoints.
        """
        tables = []
        lines = text.split('\n')

        # Timepoint token: digits followed by a unit (M/H/D, 月/周/天).
        # Compiled once instead of per line.
        time_pattern = re.compile(r'(\d+)\s*[MmHhDd月周天]')
        number_pattern = re.compile(r'(\d+\.?\d*)')

        for i, line in enumerate(lines):
            time_matches = time_pattern.findall(line)
            if len(time_matches) < 2:
                continue
            times = [int(t) for t in time_matches]

            # Collect plausible numeric rows from the next few lines.
            data_rows = []
            for j in range(i + 1, min(i + 15, len(lines))):
                numbers = number_pattern.findall(lines[j])
                if len(numbers) < len(times):
                    continue
                try:
                    values = [float(n) for n in numbers[:len(times)]]
                except ValueError:
                    # Narrowed from a bare except: only a failed float
                    # conversion should skip the row.
                    continue
                # Plausibility filter: contents/percentages stay within 0-200.
                if all(0 <= v <= 200 for v in values):
                    data_rows.append({
                        "values": values,
                        "type": self._identify_row_type(lines[j]),
                        "raw": lines[j]
                    })

            if data_rows:
                # Keep preceding lines as context for batch/condition lookup.
                context_start = max(0, i - 10)
                context = '\n'.join(lines[context_start:i + 1])
                tables.append({
                    "times": times,
                    "rows": data_rows,
                    "context": context,
                    "line_number": i
                })

        return tables

    def _identify_row_type(self, line: str) -> str:
        """Classify a data row by measurement keywords found in it."""
        line_lower = line.lower()
        if any(kw in line_lower for kw in ['杂质', 'impurity', '杂']):
            return 'impurity'
        if any(kw in line_lower for kw in ['含量', 'assay', 'content']):
            return 'assay'
        if any(kw in line_lower for kw in ['水分', 'moisture', 'water']):
            return 'moisture'
        if any(kw in line_lower for kw in ['溶出', 'dissolution']):
            return 'dissolution'
        return 'unknown'

    def _create_batch_from_table(self, table: Dict, index: int, full_text: str) -> Optional[Dict]:
        """Convert one detected table into a batch dict; None if no CQA rows."""
        context = table.get("context", "")
        batch_name = self._extract_batch_name(context, full_text, index)
        condition_info = self._extract_condition(context)

        cqa_list = []
        for row in table.get("rows", []):
            if row["type"] == "impurity":
                cqa_name = "总杂质"
            elif row["type"] == "assay":
                cqa_name = "含量"
            else:
                cqa_name = "质量指标"
            cqa_list.append({
                "cqa_name": cqa_name,
                "values": row["values"]
            })

        if not cqa_list:
            return None

        return {
            "batch_id": batch_name.replace(" ", "_"),
            "batch_name": batch_name,
            "batch_type": "target",
            "conditions": [{
                "condition_id": condition_info["id"],
                "condition_type": condition_info["type"],
                "timepoints": table["times"],
                "cqa_data": cqa_list
            }]
        }

    def _extract_batch_name(self, context: str, full_text: str, index: int) -> str:
        """Extract a batch name from the table context; fall back to 批次N."""
        patterns = [
            r'批[次号][::\s]*([A-Za-z0-9\-_]+)',
            r'Batch[::\s]*([A-Za-z0-9\-_]+)',
            r'([A-Z]{2,3}[-_]\d{4,}[-_]?[A-Z0-9]*)',
            r'([SF][-_]?\d{4}[-_]?\d+)',
            r'样品[::\s]*(.{3,20})',
        ]

        for pattern in patterns:
            match = re.search(pattern, context, re.IGNORECASE)
            if match:
                name = match.group(1).strip()
                # Very short matches are likely noise; require >= 3 chars.
                if len(name) >= 3:
                    return name

        # No recognizable identifier: use a positional placeholder name.
        return f"批次{index + 1}"

    def _extract_condition(self, context: str) -> Dict[str, str]:
        """Infer the storage condition (temperature / study type) from context."""
        context_lower = context.lower()

        # NOTE(review): substring match, so e.g. "140c" would also hit "40c".
        if any(kw in context_lower for kw in ['40°c', '40℃', '40c', '加速']):
            return {"id": "40C_Accelerated", "type": "accelerated"}
        if any(kw in context_lower for kw in ['25°c', '25℃', '25c', '长期']):
            return {"id": "25C_LongTerm", "type": "longterm"}
        if any(kw in context_lower for kw in ['60°c', '60℃', '60c', '高温']):
            return {"id": "60C_Stress", "type": "stress"}
        if any(kw in context_lower for kw in ['30°c', '30℃', '30c', '中间']):
            return {"id": "30C_Intermediate", "type": "intermediate"}

        return {"id": "Unknown_Condition", "type": "unknown"}

    def _extract_target_timepoints(self, goal: str) -> List[int]:
        """Parse target prediction timepoints (months) out of the goal text."""
        timepoints: List[int] = []

        patterns = [
            r'(\d+)\s*[个]?月',
            r'(\d+)\s*[Mm]',
            r'(\d+)\s*months?'
        ]

        for pattern in patterns:
            matches = re.findall(pattern, goal)
            timepoints.extend(int(m) for m in matches)

        # De-duplicate and sort ascending.
        timepoints = sorted(set(timepoints))

        if not timepoints:
            # Default shelf-life horizons when the goal names none.
            timepoints = [24, 36]

        return timepoints
|
|
|
|
|
|
|
def extract_stability_data(file_paths: List[str], goal: str) -> Dict[str, Any]:
    """Main entry point for data extraction.

    Parses each input file, concatenates the recovered text, and runs
    StabilityDataExtractor over the combined content.

    Args:
        file_paths: Paths of the files to parse.
        goal: Free-text analysis goal from the user.

    Returns:
        Structured extraction result (see StabilityDataExtractor.extract_from_text).
    """
    from utils.file_parsers import parse_file

    extractor = StabilityDataExtractor()
    # Collect sections and join once — avoids quadratic `+=` accumulation.
    sections = []

    for path in file_paths:
        try:
            content = parse_file(path)
        except Exception as e:
            # Best-effort: skip unreadable files but keep processing the rest.
            print(f"Error parsing {path}: {e}")
            continue
        if content:
            sections.append(f"\n=== File: {path} ===\n{content}\n")

    return extractor.extract_from_text("".join(sections), goal)
|
|
|