"""
Stability Data Extractor - LLM-Powered Format-Agnostic Extraction

This module uses an LLM to understand and extract stability data from ANY
file format. The LLM interprets the data structure dynamically; when the LLM
is unavailable or returns nothing usable, a regex-based heuristic extractor
is used as a fallback.
"""

import json
import re
from typing import Dict, List, Any, Optional
from pathlib import Path


# A "time header" token: an integer followed by a unit letter/character,
# e.g. 0M, 3M, 6M or 0月, 3月. The unit itself is not recorded.
_TIME_HEADER_RE = re.compile(r'(\d+)\s*[MmHhDd月周天]')

# Any integer or decimal number inside a candidate data row.
_NUMBER_RE = re.compile(r'(\d+\.?\d*)')

# A Markdown code fence, with or without the "json" language tag.
_JSON_FENCE_RE = re.compile(r'```(?:json)?\s*([\s\S]*?)\s*```')

# How many lines after a header index are scanned for data rows (exclusive
# upper bound is header index + this value), and how many lines above the
# header are kept as context for batch/condition identification.
_ROW_SCAN_WINDOW = 15
_CONTEXT_LINES = 10

# Batch-name patterns in priority order. The colon class accepts both the
# ASCII colon and the full-width colon (U+FF1A) used in Chinese text.
_BATCH_NAME_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'批[次号][:\uFF1A\s]*([A-Za-z0-9\-_]+)',
        r'Batch[:\uFF1A\s]*([A-Za-z0-9\-_]+)',
        r'([A-Z]{2,3}[-_]\d{4,}[-_]?[A-Z0-9]*)',  # Common batch ID format
        r'([SF][-_]?\d{4}[-_]?\d+)',  # SF-xxxx format
        r'样品[:\uFF1A\s]*(.{3,20})',
    )
)

# Storage-condition keyword groups, checked in this order within one line.
_CONDITION_RULES = (
    (('40°c', '40℃', '40c', '加速'),
     {"id": "40C_Accelerated", "type": "accelerated"}),
    (('25°c', '25℃', '25c', '长期'),
     {"id": "25C_LongTerm", "type": "longterm"}),
    (('60°c', '60℃', '60c', '高温'),
     {"id": "60C_Stress", "type": "stress"}),
    (('30°c', '30℃', '30c', '中间'),
     {"id": "30C_Intermediate", "type": "intermediate"}),
)

# Row-type keyword groups, checked in this order.
_ROW_TYPE_RULES = (
    ('impurity', ('杂质', 'impurity', '杂')),
    ('assay', ('含量', 'assay', 'content')),
    ('moisture', ('水分', 'moisture', 'water')),
    ('dissolution', ('溶出', 'dissolution')),
)

# Display names for the row types that have one; others use "质量指标".
_CQA_NAME_BY_ROW_TYPE = {'impurity': "总杂质", 'assay': "含量"}

# Month expressions in the user's goal. The second pattern requires that the
# unit is not followed by another ASCII letter, so dosage/volume/time units
# such as "500mg", "10 mL" or "5 min" are not mistaken for months.
_TARGET_MONTH_PATTERNS = (
    re.compile(r'(\d+)\s*[个]?月'),
    re.compile(
        r'(\d+)\s*(?:mon(?:th)?s?|mths?|mos?|m)(?![A-Za-z])',
        re.IGNORECASE,
    ),
)


class StabilityDataExtractor:
    """
    LLM-powered stability data extractor.

    Handles ANY format by using an LLM to understand the data structure.
    Falls back to heuristic extraction if the LLM is unavailable or its
    answer contains no batches.
    """

    # Prompt for LLM to extract structured stability data. Literal braces of
    # the JSON template are doubled because the text goes through str.format.
    EXTRACTION_PROMPT = """你是药物稳定性数据提取专家。请从以下文本中提取稳定性数据。

【数据内容】
{text_content}

【用户分析目标】
{goal}

【任务】
请识别并提取:
1. 批次信息(批次名称/ID)
2. 存储条件(如25°C/60%RH长期, 40°C/75%RH加速等)
3. 时间点(月)
4. 质量指标数值(如杂质含量、含量等)

【输出格式】
请严格按以下JSON格式输出:
```json
{{
    "batches": [
        {{
            "batch_id": "批次ID",
            "batch_name": "批次名称",
            "conditions": [
                {{
                    "condition_id": "条件描述(如 25C_60RH)",
                    "condition_type": "longterm|accelerated|stress",
                    "timepoints": [0, 3, 6, 9],
                    "cqa_data": [
                        {{
                            "cqa_name": "指标名称(如总杂质)",
                            "values": [0.1, 0.12, 0.15, 0.18]
                        }}
                    ]
                }}
            ]
        }}
    ],
    "specification_limit": 0.5,
    "primary_cqa": "主要质量指标名称"
}}
```

如果无法识别数据结构,请返回空的batches数组并在"extraction_notes"字段说明原因。
"""

    def __init__(self, model_invoker=None):
        """
        Initialize extractor.

        Args:
            model_invoker: LLM invoker instance (lazy-loaded if not provided).
                It must expose ``invoke(system_prompt=..., user_prompt=...,
                temperature=...)`` returning either a string or an object
                with a ``content`` attribute.
        """
        self._model_invoker = model_invoker
        self.extracted_data = {}
        self.metadata = {}

    @property
    def model_invoker(self):
        """Lazy-load model invoker; returns None when it cannot be loaded."""
        if self._model_invoker is None:
            try:
                from layers.model_invoker import ModelInvoker
                self._model_invoker = ModelInvoker()
            except Exception as e:
                # Best effort: without an invoker the heuristic path is used.
                print(f"Warning: Could not load ModelInvoker: {e}")
                self._model_invoker = None
        return self._model_invoker

    def extract_from_text(self, text_content: str, goal: str = "") -> Dict[str, Any]:
        """
        Extract stability data from text, LLM first and heuristics second.

        Args:
            text_content: Raw text from parsed files
            goal: Analysis goal from user

        Returns:
            Structured data dictionary. Always contains "batches",
            "specification_limit", "primary_cqa", "target_timepoints",
            "extraction_method" ("llm", "heuristic" or "none") and
            "extraction_notes"; extracted keys override the defaults.
        """
        goal = goal or ""

        # Default result structure
        result = {
            "batches": [],
            "specification_limit": 0.5,
            "primary_cqa": "总杂质",
            "target_timepoints": self._extract_target_timepoints(goal),
            "extraction_method": "none",
            "extraction_notes": ""
        }

        if not text_content or len(text_content.strip()) < 50:
            result["extraction_notes"] = "文本内容过短,无法提取数据"
            return result

        # Try LLM extraction first, then fall back to heuristic extraction.
        attempts = (
            ("llm", self._extract_with_llm),
            ("heuristic", self._extract_with_heuristics),
        )
        for method, extract in attempts:
            extracted = extract(text_content, goal)
            if extracted and extracted.get("batches"):
                # Skip explicit nulls so that a JSON `null` from the LLM
                # cannot wipe out a default such as specification_limit.
                result.update(
                    {key: value for key, value in extracted.items()
                     if value is not None}
                )
                result["extraction_method"] = method
                return result

        result["extraction_notes"] = "无法识别数据格式,请确保文件包含时间点和数值数据"
        return result

    def _extract_with_llm(self, text_content: str, goal: str) -> Optional[Dict]:
        """
        Use LLM to extract structured data.

        Returns:
            The JSON object found in the LLM reply, or None when no invoker
            is available, the call fails, or the reply holds no JSON object.
        """
        invoker = self.model_invoker
        if not invoker:
            return None

        try:
            # Truncate text to avoid token limits
            max_chars = 8000
            truncated_text = text_content[:max_chars]
            if len(text_content) > max_chars:
                truncated_text += "\n... [文本已截断]"

            prompt = self.EXTRACTION_PROMPT.format(
                text_content=truncated_text,
                goal=goal or "分析稳定性数据"
            )

            response = invoker.invoke(
                system_prompt="你是专业的药物稳定性数据提取助手。请从文本中提取结构化的稳定性数据。",
                user_prompt=prompt,
                temperature=0.1
            )

            if response and hasattr(response, 'content'):
                content = response.content
            elif isinstance(response, str):
                content = response
            else:
                return None

            if not isinstance(content, str):
                return None

            extracted = self._parse_json_object(content)
            if extracted is None:
                print("LLM extraction failed: no JSON object in response")
            return extracted

        except Exception as e:
            # Deliberately broad: any failure of the external LLM call must
            # degrade to the heuristic path rather than abort extraction.
            print(f"LLM extraction failed: {e}")
            return None

    @staticmethod
    def _parse_json_object(content: str) -> Optional[Dict[str, Any]]:
        """
        Return the first JSON object that can be parsed out of *content*.

        Candidates, in order: the body of a Markdown code fence (tagged
        ``json`` or untagged), then the span from the first ``{`` to the
        last ``}``. Payloads that parse to something other than a dict
        (e.g. a list) are rejected, because callers use ``.get`` on it.
        """
        candidates = []
        fence = _JSON_FENCE_RE.search(content)
        if fence:
            candidates.append(fence.group(1))
        start, end = content.find('{'), content.rfind('}')
        if 0 <= start < end:
            candidates.append(content[start:end + 1])

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            if isinstance(parsed, dict):
                return parsed
        return None

    def _extract_with_heuristics(self, text_content: str, goal: str) -> Optional[Dict]:
        """
        Fallback heuristic extraction using pattern recognition.

        Each detected time-series table becomes one batch entry. ``goal`` is
        accepted for signature parity with the LLM path and is not used.

        Returns:
            {"batches": [...]} or None when no table yields a batch.
        """
        # Find all tables with time-series data
        tables = self._find_time_series_tables(text_content)

        batches = []
        for i, table in enumerate(tables):
            batch = self._create_batch_from_table(table, i, text_content)
            if batch:
                batches.append(batch)

        return {"batches": batches} if batches else None

    def _find_time_series_tables(self, text: str) -> List[Dict]:
        """
        Find patterns that look like time-series data tables.

        A line with at least two time tokens (0M, 3M, 6M or 0月, 3月 etc.) is
        a header. The lines below it are scanned for rows holding at least
        as many numbers as the header has time points; the first numbers of
        such a row are taken as its values when all lie within 0..200.

        Returns:
            One dict per table with "times", "rows", "context" (the header
            and up to 10 lines above it) and "line_number" (header index).

        NOTE(review): the unit after each time value (M/H/D/月/周/天) is not
        recorded, so all time points are stored as bare integers.
        """
        tables = []
        lines = text.split('\n')

        for i, line in enumerate(lines):
            time_matches = _TIME_HEADER_RE.findall(line)
            if len(time_matches) < 2:
                continue

            # Found potential time header
            times = [int(t) for t in time_matches]

            # Look for data rows below
            data_rows = []
            for j in range(i + 1, min(i + _ROW_SCAN_WINDOW, len(lines))):
                candidate = lines[j]

                # The next header starts another table; stopping here keeps
                # that table's header and rows out of this one.
                if len(_TIME_HEADER_RE.findall(candidate)) >= 2:
                    break

                numbers = _NUMBER_RE.findall(candidate)
                if len(numbers) < len(times):
                    continue

                # Check if numbers are in plausible range for stability data
                values = [float(n) for n in numbers[:len(times)]]
                if all(0 <= v <= 200 for v in values):  # Reasonable range
                    data_rows.append({
                        "values": values,
                        "type": self._identify_row_type(candidate),
                        "raw": candidate
                    })

            if data_rows:
                # Find context for batch/condition identification
                context_start = max(0, i - _CONTEXT_LINES)
                context = '\n'.join(lines[context_start:i + 1])

                tables.append({
                    "times": times,
                    "rows": data_rows,
                    "context": context,
                    "line_number": i
                })

        return tables

    def _identify_row_type(self, line: str) -> str:
        """
        Identify what type of measurement a row represents.

        Returns:
            'impurity', 'assay', 'moisture', 'dissolution' or 'unknown',
            chosen by the first keyword group found in the lower-cased line.
        """
        line_lower = line.lower()
        for row_type, keywords in _ROW_TYPE_RULES:
            if any(kw in line_lower for kw in keywords):
                return row_type
        return 'unknown'

    def _create_batch_from_table(self, table: Dict, index: int, full_text: str) -> Optional[Dict]:
        """
        Create a batch structure from extracted table data.

        Args:
            table: One entry produced by _find_time_series_tables
            index: Position of the table, used for the fallback batch name
            full_text: Whole input text, forwarded to _extract_batch_name

        Returns:
            Batch dict with a single condition, or None if the table has no
            rows.
        """
        context = table.get("context", "")

        # Try to identify batch name from context
        batch_name = self._extract_batch_name(context, full_text, index)

        # Try to identify condition
        condition_info = self._extract_condition(context)

        # Build CQA data
        cqa_list = [
            {
                "cqa_name": _CQA_NAME_BY_ROW_TYPE.get(row["type"], "质量指标"),
                "values": row["values"]
            }
            for row in table.get("rows", [])
        ]

        if not cqa_list:
            return None

        return {
            "batch_id": batch_name.replace(" ", "_"),
            "batch_name": batch_name,
            "batch_type": "target",
            "conditions": [{
                "condition_id": condition_info["id"],
                "condition_type": condition_info["type"],
                "timepoints": table["times"],
                "cqa_data": cqa_list
            }]
        }

    def _extract_batch_name(self, context: str, full_text: str, index: int) -> str:
        """
        Extract batch name from context using various patterns.

        Patterns are tried in priority order. Within one pattern the match
        closest to the table (the last one in the context) wins, so a table
        that follows another one closely does not inherit the earlier
        table's batch ID. Names shorter than 3 characters are ignored.

        Args:
            context: Header line plus the lines above it
            full_text: Whole input text (currently not consulted)
            index: Zero-based table index for the fallback name

        Returns:
            The matched name, or "批次<index + 1>" when nothing matches.
        """
        context = context or ""
        for pattern in _BATCH_NAME_PATTERNS:
            for match in reversed(list(pattern.finditer(context))):
                name = match.group(1).strip()
                if len(name) >= 3:
                    return name

        # Fallback: use numbered batch
        return f"批次{index + 1}"

    def _extract_condition(self, context: str) -> Dict[str, str]:
        """
        Extract storage condition from context.

        Lines are examined from the one nearest the table upwards, and the
        first line that mentions any known condition decides. Inside a
        single line the groups are checked in the order accelerated,
        long-term, stress, intermediate.

        Returns:
            {"id": ..., "type": ...}; "Unknown_Condition"/"unknown" when no
            keyword is present.
        """
        for line in reversed((context or "").lower().split('\n')):
            for keywords, condition in _CONDITION_RULES:
                if any(kw in line for kw in keywords):
                    return dict(condition)

        return {"id": "Unknown_Condition", "type": "unknown"}

    def _extract_target_timepoints(self, goal: str) -> List[int]:
        """
        Extract target prediction timepoints from goal text.

        Recognizes "24个月" / "24月" and "24M" / "24 months" style
        expressions. Units that merely start with "m" (mg, mL, min, ...) are
        not treated as months.

        Returns:
            Sorted unique month values; [24, 36] when none are found.
        """
        goal = goal or ""
        found = set()
        for pattern in _TARGET_MONTH_PATTERNS:
            found.update(int(m) for m in pattern.findall(goal))

        return sorted(found) if found else [24, 36]


# Convenience function for backward compatibility
def extract_stability_data(file_paths: List[str], goal: str) -> Dict[str, Any]:
    """
    Main entry point for data extraction.

    Parses every file with ``utils.file_parsers.parse_file``, concatenates
    the non-empty results under a "=== File: <path> ===" banner and runs
    StabilityDataExtractor.extract_from_text on the combined text. A file
    that fails to parse is reported on stdout and skipped.
    """
    from utils.file_parsers import parse_file

    extractor = StabilityDataExtractor()

    sections = []
    for path in file_paths:
        try:
            content = parse_file(path)
        except Exception as e:
            print(f"Error parsing {path}: {e}")
            continue
        if content:
            sections.append(f"\n=== File: {path} ===\n{content}\n")

    return extractor.extract_from_text("".join(sections), goal)