Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import logging | |
| import re | |
| from pathlib import Path | |
| from tqdm import tqdm | |
| import sys | |
| # Add the project root to the path so we can import from src | |
| project_root = Path(__file__).parent.parent | |
| sys.path.append(str(project_root)) | |
| from src.llm.ollama_client import call_ollama, is_ollama_running | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| DATA_DIR = project_root / "dataset-pipeline" / "data" / "extracted" / "extracted" | |
| PROCESSED_DIR = project_root / "data" / "processed" | |
| METADATA_FILE = PROCESSED_DIR / "report_metadata.json" | |
| REQUIRED_KEYS = [ | |
| "report_type", "date", "location", "aircraft_type", "operator", | |
| "flight_type", "fatalities", "probable_cause", "contributing_factors", | |
| "safety_issues", "recommendations", "key_findings" | |
| ] | |
| # ───────────────────────────────────────────── | |
| # REGEX PRE-EXTRACTION for structured fields | |
| # ───────────────────────────────────────────── | |
| def regex_extract_date(content: str) -> str | None: | |
| """Extract accident date using multiple regex patterns.""" | |
| # Pattern 1: Full date like "August 6, 1997" or "July 17, 1996" or "March 3, 1991" | |
| months = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)' | |
| # Check title area first (first 20 lines) | |
| title_area = '\n'.join(content.split('\n')[:20]) | |
| m = re.search(rf'({months}\s+\d{{1,2}},?\s+\d{{4}})', title_area) | |
| if m: | |
| return m.group(1).strip() | |
| # Check History of Flight section opening | |
| hof_match = re.search(r'History of (?:the )?Flight\s*\n+(.*?)(?:\n#|\Z)', content, re.IGNORECASE | re.DOTALL) | |
| if hof_match: | |
| first_para = hof_match.group(1)[:500] | |
| m = re.search(rf'[Oo]n\s+({months}\s+\d{{1,2}},?\s+\d{{4}})', first_para) | |
| if m: | |
| return m.group(1).strip() | |
| # Pattern 2: ISO-style date "2000-12-24" | |
| m = re.search(r'(\d{4}-\d{2}-\d{2})', title_area) | |
| if m: | |
| return m.group(1) | |
| # Pattern 3: In the abstract or citation line | |
| citation_area = '\n'.join(content.split('\n')[:35]) | |
| m = re.search(rf'({months}\s+\d{{1,2}},?\s+\d{{4}})', citation_area) | |
| if m: | |
| return m.group(1).strip() | |
| return None | |
| def regex_extract_fatalities(content: str) -> int | None: | |
| """Extract fatalities from the Injuries to Persons table.""" | |
| # Look for the injury table which has a "Fatal" row with a Total column | |
| # Pattern: | Fatal | ... | <number> | or | <number> | ... | Fatal | | |
| lines = content.split('\n') | |
| for i, line in enumerate(lines): | |
| if 'Fatal' in line and '|' in line: | |
| cells = [c.strip() for c in line.split('|') if c.strip()] | |
| # The Total is typically the last numeric column | |
| for cell in reversed(cells): | |
| try: | |
| val = int(cell) | |
| if val >= 0: | |
| return val | |
| except ValueError: | |
| continue | |
| # Fallback: search for "All X people on board were killed" | |
| m = re.search(r'[Aa]ll\s+(\d+)\s+(?:people|persons|occupants)\s+(?:on board\s+)?(?:were|was)\s+killed', content) | |
| if m: | |
| return int(m.group(1)) | |
| # Fallback: "X passengers aboard were fatally injured" | |
| m = re.search(r'(\d+)\s+passengers?\s+aboard\s+were\s+fatally\s+injured', content) | |
| if m: | |
| return int(m.group(1)) | |
| return None | |
| def regex_extract_from_title(content: str) -> dict: | |
| """Extract aircraft type, operator, location, date from the title/header lines.""" | |
| lines = content.split('\n') | |
| title_area = '\n'.join(lines[:25]) | |
| result = {} | |
| # Common aircraft types | |
| aircraft_pattern = r'(Boeing\s+\d{3}[A-Za-z0-9-]*|Airbus\s+A\d{3}[A-Za-z0-9-]*|McDonnell\s+Douglas\s+[A-Z]+[-]?\d+[A-Za-z0-9-]*|MD-\d+[A-Za-z0-9-]*|Cessna\s+\d+[A-Za-z]*|Beech(?:craft)?\s+\w+|Embraer\s+\w+|Bombardier\s+\w+|de\s+Havilland\s+\w+|ATR[-\s]?\d+)' | |
| m = re.search(aircraft_pattern, title_area, re.IGNORECASE) | |
| if m: | |
| result['aircraft_type'] = m.group(1).strip() | |
| # Try to get aircraft registration (N-number) | |
| reg_match = re.search(r'[,\s](N\d+[A-Z]*)[,\s]', title_area) | |
| if reg_match and 'aircraft_type' in result: | |
| result['aircraft_type'] = result['aircraft_type'] + ', ' + reg_match.group(1) | |
| return result | |
| def regex_extract_operator(content: str) -> str | None: | |
| """Extract airline/operator from title and early text.""" | |
| lines = content.split('\n') | |
| title_area = '\n'.join(lines[:25]) | |
| # Common airline patterns in NTSB titles | |
| # "Federal Express, Inc." "Trans World Airlines" "United Airlines" "American Airlines" | |
| # "Korean Air" "Southwest Airlines" "Delta Air Lines" | |
| airlines = [ | |
| r'((?:United|American|Delta|Southwest|Continental|Northwest|USAir|US\s*Airways|Eastern|TWA|Trans\s+World\s+Airlines|' | |
| r'Federal\s+Express|FedEx|Korean\s+Air|Alaska\s+Airlines|JetBlue|Spirit|Frontier|Allegiant|' | |
| r'Hawaiian|Aloha|Piedmont|Braniff|Pan\s+American|Comair|Atlantic\s+Southeast|' | |
| r'Air\s+Midwest|Executive\s+Airlines|Colgan\s+Air|Pinnacle|Mesa|SkyWest|' | |
| r'Emery\s+Worldwide|Fine\s+Air|Air\s+Sunshine|Casino\s+Express|' | |
| r'ValuJet|ATA|AirTran|Midwest\s+Express|Atlas\s+Air)(?:\s*[\w\s,.]*))', | |
| ] | |
| for pattern in airlines: | |
| m = re.search(pattern, title_area, re.IGNORECASE) | |
| if m: | |
| # Clean up: take just the airline name, not trailing words | |
| name = m.group(1).strip() | |
| # Trim after common suffixes | |
| for suffix in [', Inc.', ' Inc.', ', LLC', ' Airlines', ' Air Lines', ' Air', ' Worldwide']: | |
| idx = name.lower().find(suffix.lower()) | |
| if idx > 0: | |
| name = name[:idx + len(suffix)] | |
| break | |
| return name.strip(' ,.') | |
| return None | |
| def regex_extract_location(content: str) -> str | None: | |
| """Extract accident location from title and History of Flight.""" | |
| lines = content.split('\n') | |
| # Title area typically has location after aircraft registration | |
| title_area = '\n'.join(lines[:25]) | |
| # Pattern: "Near <City>, <State>" or just "<City>, <State/Country>" | |
| m = re.search(r'[Nn]ear\s+([A-Z][a-zA-Z\s]+,\s+[A-Z][a-zA-Z\s]+)', title_area) | |
| if m: | |
| return 'Near ' + m.group(1).strip() | |
| # US state patterns in title | |
| us_states = ( | |
| r'Alabama|Alaska|Arizona|Arkansas|California|Colorado|Connecticut|Delaware|Florida|Georgia|' | |
| r'Hawaii|Idaho|Illinois|Indiana|Iowa|Kansas|Kentucky|Louisiana|Maine|Maryland|Massachusetts|' | |
| r'Michigan|Minnesota|Mississippi|Missouri|Montana|Nebraska|Nevada|New\s+Hampshire|New\s+Jersey|' | |
| r'New\s+Mexico|New\s+York|North\s+Carolina|North\s+Dakota|Ohio|Oklahoma|Oregon|Pennsylvania|' | |
| r'Rhode\s+Island|South\s+Carolina|South\s+Dakota|Tennessee|Texas|Utah|Vermont|Virginia|' | |
| r'Washington|West\s+Virginia|Wisconsin|Wyoming|Guam|Puerto\s+Rico|Colombia' | |
| ) | |
| # Look for "<City>, <State>" in title | |
| m = re.search(rf'([A-Z][a-zA-Z\s]+),\s+({us_states})', title_area) | |
| if m: | |
| return f"{m.group(1).strip()}, {m.group(2).strip()}" | |
| return None | |
| def regex_extract_report_type(filename: str) -> str: | |
| """Determine report type from filename prefix.""" | |
| stem = Path(filename).stem.upper() | |
| if stem.startswith('AAR'): | |
| return 'AAR' # Aircraft Accident Report | |
| elif stem.startswith('AIR'): | |
| return 'AIR' # Aircraft Incident Report / Safety Recommendation | |
| elif stem.startswith('ASR'): | |
| return 'ASR' # Aviation Special Report | |
| elif stem.startswith('MAR'): | |
| return 'MAR' # Marine Accident Report | |
| elif stem.startswith('RAR'): | |
| return 'RAR' # Railroad Accident Report | |
| return 'Unknown' | |
| # ───────────────────────────────────────────── | |
| # LLM EXTRACTION for unstructured fields | |
| # ───────────────────────────────────────────── | |
| def extract_llm_sections(file_path: Path) -> str: | |
| """Extract targeted sections for LLM to analyze (analytical/narrative content only).""" | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| lines = content.split('\n') | |
| header = '\n'.join(lines[:35]) | |
| # Sections for the LLM — only analytical/narrative ones | |
| target_keywords = ['executive summary', 'synopsis', 'abstract', 'probable cause', | |
| 'conclusion', 'finding', 'recommendation', 'history of flight', | |
| 'history of the flight'] | |
| skip_sections = [ | |
| 'airplane information', 'aircraft information', 'personnel information', | |
| 'meteorological', 'wreckage', 'fire', 'survival', 'tests and research', | |
| 'medical', 'communications', 'aids to navigation', 'airport information', | |
| 'flight recorders', 'other damage', 'abbreviation', 'organizational', | |
| 'appendix', 'figure', 'table of', 'contents', 'damage to', | |
| ] | |
| sections = [] | |
| capture = False | |
| buf = [] | |
| line_count = 0 | |
| max_lines = 40 | |
| for line in lines[35:]: | |
| clean = line.strip().lower() | |
| if clean.startswith('#'): | |
| if capture and buf: | |
| sections.append('\n'.join(buf)) | |
| buf = [] | |
| line_count = 0 | |
| capture = False | |
| if any(kw in clean for kw in skip_sections): | |
| capture = False | |
| continue | |
| if any(kw in clean for kw in target_keywords): | |
| buf = [line] | |
| line_count = 0 | |
| max_lines = 50 if 'history' in clean else 35 | |
| capture = True | |
| else: | |
| capture = False | |
| elif capture: | |
| line_count += 1 | |
| if line_count <= max_lines: | |
| buf.append(line) | |
| if capture and buf: | |
| sections.append('\n'.join(buf)) | |
| combined = header + '\n\n---KEY SECTIONS---\n\n' + '\n\n'.join(sections) | |
| words = combined.split() | |
| if len(words) > 5000: | |
| words = words[:5000] | |
| return ' '.join(words) | |
| SYSTEM_PROMPT = """You extract metadata from NTSB aviation accident/incident reports. | |
| Output ONLY a valid JSON object with exactly these keys. Do not add extra keys. | |
| For list fields, provide 1-5 concise items. Use null for unknown values.""" | |
| def extract_with_llm(text: str) -> dict: | |
| """Use LLM for the fields that need interpretation.""" | |
| prompt = f"""Read this NTSB report and extract the following information into JSON. | |
| REPORT TEXT: | |
| {text} | |
| Complete this JSON (fill in values, use null if not found, use [] for empty lists): | |
| {{ | |
| "flight_type": "___", | |
| "probable_cause": "___", | |
| "contributing_factors": ["___"], | |
| "safety_issues": ["___"], | |
| "recommendations": ["___"], | |
| "key_findings": ["___"] | |
| }}""" | |
| response = call_ollama( | |
| prompt=prompt, | |
| system_prompt=SYSTEM_PROMPT, | |
| model="qwen2.5:32b", | |
| temperature=0.0, | |
| max_tokens=2000, | |
| json_mode=True, | |
| timeout=180 | |
| ) | |
| try: | |
| return json.loads(response) | |
| except json.JSONDecodeError: | |
| clean = response.strip() | |
| clean = re.sub(r'^```json?\s*', '', clean) | |
| clean = re.sub(r'\s*```$', '', clean) | |
| return json.loads(clean) | |
| # ───────────────────────────────────────────── | |
| # KEY NORMALIZATION (safety net for LLM output) | |
| # ───────────────────────────────────────────── | |
| LLM_KEY_ALIASES = { | |
| "flight_type": ["operation_type", "flight_operation", "type_of_flight", "operation"], | |
| "probable_cause": ["cause_of_accident", "cause", "probable_cause_determination", "accident_cause"], | |
| "contributing_factors": ["contributing_causes", "factors"], | |
| "safety_issues": ["safety_concerns", "issues"], | |
| "recommendations": ["safety_recommendations"], | |
| "key_findings": ["findings", "main_findings"], | |
| } | |
| def normalize_llm_keys(metadata: dict) -> dict: | |
| """Map alternative key names from LLM to expected schema.""" | |
| normalized = {} | |
| for expected_key, aliases in LLM_KEY_ALIASES.items(): | |
| if expected_key in metadata: | |
| normalized[expected_key] = metadata[expected_key] | |
| else: | |
| for alias in aliases: | |
| if alias in metadata: | |
| normalized[expected_key] = metadata[alias] | |
| break | |
| return normalized | |
| def ensure_schema(metadata: dict) -> dict: | |
| """Ensure all required keys exist with appropriate defaults.""" | |
| list_keys = {'contributing_factors', 'safety_issues', 'recommendations', 'key_findings'} | |
| for key in REQUIRED_KEYS: | |
| if key not in metadata: | |
| metadata[key] = [] if key in list_keys else None | |
| return metadata | |
| # ───────────────────────────────────────────── | |
| # MAIN PIPELINE | |
| # ───────────────────────────────────────────── | |
| def process_metadata(): | |
| if not is_ollama_running(): | |
| logger.error("Ollama is not running. Please start Ollama before extracting metadata.") | |
| return | |
| PROCESSED_DIR.mkdir(parents=True, exist_ok=True) | |
| # Load existing progress | |
| existing_metadata = {} | |
| if METADATA_FILE.exists(): | |
| with open(METADATA_FILE, 'r', encoding='utf-8') as f: | |
| existing_data = json.load(f) | |
| if isinstance(existing_data, list): | |
| for item in existing_data: | |
| rid = item.get("report_id") | |
| if rid: | |
| existing_metadata[rid] = item | |
| md_files = sorted(DATA_DIR.glob("*.md")) | |
| logger.info(f"Found {len(md_files)} markdown files.") | |
| results = list(existing_metadata.values()) | |
| if results: | |
| logger.info(f"Resuming: {len(results)} reports already processed.") | |
| for md_file in tqdm(md_files, desc="Extracting metadata"): | |
| report_id = md_file.stem | |
| if report_id in existing_metadata: | |
| continue | |
| try: | |
| # Read full file content | |
| with open(md_file, 'r', encoding='utf-8') as f: | |
| full_content = f.read() | |
| # ── STEP 1: Regex extraction for structured fields ── | |
| date = regex_extract_date(full_content) | |
| fatalities = regex_extract_fatalities(full_content) | |
| title_info = regex_extract_from_title(full_content) | |
| operator = regex_extract_operator(full_content) | |
| location = regex_extract_location(full_content) | |
| report_type = regex_extract_report_type(md_file.name) | |
| logger.info(f"[REGEX] {report_id}: date={date}, fatalities={fatalities}, " | |
| f"aircraft={title_info.get('aircraft_type')}, operator={operator}, " | |
| f"location={location}") | |
| # ── STEP 2: LLM extraction for analytical fields ── | |
| llm_text = extract_llm_sections(md_file) | |
| llm_data = extract_with_llm(llm_text) | |
| llm_data = normalize_llm_keys(llm_data) | |
| # ── STEP 3: Merge regex + LLM results (regex takes priority) ── | |
| metadata = { | |
| "report_id": report_id, | |
| "report_type": report_type, | |
| "date": date, | |
| "location": location or llm_data.get("location"), | |
| "aircraft_type": title_info.get("aircraft_type") or llm_data.get("aircraft_type"), | |
| "operator": operator or llm_data.get("operator"), | |
| "flight_type": llm_data.get("flight_type"), | |
| "fatalities": fatalities, | |
| "probable_cause": llm_data.get("probable_cause"), | |
| "contributing_factors": llm_data.get("contributing_factors", []), | |
| "safety_issues": llm_data.get("safety_issues", []), | |
| "recommendations": llm_data.get("recommendations", []), | |
| "key_findings": llm_data.get("key_findings", []), | |
| } | |
| metadata = ensure_schema(metadata) | |
| results.append(metadata) | |
| existing_metadata[report_id] = metadata | |
| # Save after every report | |
| with open(METADATA_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(results, f, indent=2, ensure_ascii=False) | |
| except Exception as e: | |
| logger.error(f"Failed to process {report_id}: {str(e)}") | |
| import traceback | |
| traceback.print_exc() | |
| logger.info(f"Metadata extraction complete. {len(results)} reports processed.") | |
| logger.info(f"Results saved to {METADATA_FILE}") | |
| if __name__ == "__main__": | |
| process_metadata() | |