"""
Advanced Named Entity Recognition Service for Gapura AI

Extracts specific entities from irregularity reports using regular
expressions.
"""

import os
import logging
import re
from typing import List, Dict, Any, Optional, Tuple
from collections import Counter

logger = logging.getLogger(__name__)


class AdvancedNER:
    """
    Advanced Named Entity Recognition for airport irregularity reports.

    Entity Types:
    - DAMAGE_TYPE: dented, torn, wet, broken, damaged
    - QUANTITY: 5 pcs, 2 bags, 10 kg
    - LOCATION: gate, warehouse, aircraft, apron
    - TIME: STD, ETA, delay duration
    - FLIGHT_INFO: flight number, route
    - AIRLINE: airline names
    - PERSONNEL: PIC, staff names
    - EQUIPMENT: ULD, forklift, belt loader
    - CARGO_TYPE: DG, perishable, fragile

    Each ``*_PATTERNS`` table maps an entity type to a list of
    ``(regex, label)`` pairs. Every regex is applied with
    ``re.IGNORECASE``; parts wrapped in ``(?-i:...)`` opt out and stay
    case-sensitive.
    """

    DAMAGE_PATTERNS = {
        "DAMAGE_TYPE": [
            (r"\b(dented|dent|dents)\b", "dented"),
            (r"\b(torn|tear|tears|robek)\b", "torn"),
            (r"\b(wet|basah|lembab|moisture)\b", "wet"),
            (r"\b(broken|break|broke|pecah)\b", "broken"),
            (r"\b(damaged|damage|kerusakan|rusak)\b", "damaged"),
            (r"\b(crushed|crush|penyet)\b", "crushed"),
            (r"\b(scratched|scratch|gores|tergores)\b", "scratched"),
            (r"\b(missing|lost|hilang|kehabisan)\b", "missing"),
            (r"\b(soiled|dirty|kotor|stain)\b", "soiled"),
            (r"\b(pilfered|pencurian|stolen)\b", "pilfered"),
        ],
    }

    # Every unit ends with \b so a unit is never matched as the prefix of a
    # longer word ("2 jaminan" is not "2 jam", "5 baggage" is not "5 bag"),
    # and plural forms are spelled out so the whole word is captured.
    # "bags" and "baggage" are kept disjoint so one mention yields one entity.
    QUANTITY_PATTERNS = {
        "QUANTITY": [
            (r"(\d+)\s*(pcs?|pieces?)\b", "pieces"),
            (r"(\d+)\s*(bags?|bagasi)\b", "bags"),
            (r"(\d+)\s*(kgs?|kilograms?)\b", "kg"),
            (r"(\d+)\s*(ulds?|containers?)\b", "uld"),
            (r"(\d+)\s*(pallets?)\b", "pallets"),
            (r"(\d+)\s*(passengers?|pax|penumpang)\b", "passengers"),
            (r"(\d+)\s*(baggage)\b", "baggage"),
            (r"(\d+)\s*(hours?|jam)\b", "hours"),
            (r"(\d+)\s*(minutes?|menit)\b", "minutes"),
        ],
    }

    LOCATION_PATTERNS = {
        "LOCATION": [
            (r"\b(gate\s*[A-Z0-9]+|gate\s+\d+)", "gate"),
            (r"\b(warehouse|gudang|WH)\b", "warehouse"),
            (r"\b(aircraft|pesawat|plane)\b", "aircraft"),
            # Longer alternative first; otherwise "apron" always wins and
            # "apron area" can never be captured in full.
            (r"\b(apron area|apron)\b", "apron"),
            (r"\b(terminal\s*[0-9]?|T[0-9])\b", "terminal"),
            (r"\b(baggage\s*(area|claim|hall)?)\b", "baggage_area"),
            (r"\b(check[- ]?in)\b", "check_in"),
            (r"\b(boarding\s*(gate|area)?)\b", "boarding"),
            (r"\b(cargo\s*(area|terminal|warehouse)?)\b", "cargo_area"),
            (r"\b(ramp|lapangan)\b", "ramp"),
        ],
    }

    TIME_PATTERNS = {
        "TIME": [
            (r"\b(STD|ETD|ATD)[:\s]*(\d{1,2}[:\.]?\d{2})", "std"),
            (r"\b(ETA|STA|ATA)[:\s]*(\d{1,2}[:\.]?\d{2})", "eta"),
            (
                r"\b(delay|terlambat)\s*(\d+)\s*(hours?|jam|minutes?|menit)",
                "delay_duration",
            ),
            (r"\b(\d{1,2}[:\.]\d{2})\s*(LT|UTC|WIB|WITA|WIT)", "time_with_zone"),
            (r"\b(arrival|keberangkatan)\b", "arrival_departure"),
        ],
    }

    # The code-like parts are wrapped in (?-i:...) so they stay
    # case-sensitive even though extract() applies re.IGNORECASE. Without
    # this, ordinary prose such as "at 10", "pre-set" or "sub" is reported
    # as a flight number, route or airport code. The keyword-anchored
    # "flight XX 123" form remains case-insensitive.
    FLIGHT_PATTERNS = {
        "FLIGHT_INFO": [
            (r"\b(?-i:([A-Z]{2}))\s*(\d{2,4})\b", "flight_number"),
            (r"\b(flight\s*[A-Z]{2}\s*\d{2,4})\b", "flight"),
            (r"\b(?-i:([A-Z]{3})[-/]([A-Z]{3}))\b", "route"),
            (
                r"\b(?-i:(CGK|DPS|SUB|UPG|KNO|YIA|BKS|MDC|TKG|PKU))\b",
                "airport_code",
            ),
        ],
    }

    EQUIPMENT_PATTERNS = {
        "EQUIPMENT": [
            (r"\b(ULD|Unit Load Device)\b", "uld"),
            (r"\b(forklift|fork lift)\b", "forklift"),
            (r"\b(belt\s*loader|beltloader)\b", "belt_loader"),
            (r"\b(pallet\s*loader|palletloader)\b", "pallet_loader"),
            (r"\b(tow\s*tractor|towtractor)\b", "tow_tractor"),
            (r"\b(stairs|air stairs|airstairs)\b", "stairs"),
            (r"\b(container|kontainer)\b", "container"),
            (r"\b(pallet)\b", "pallet"),
            (r"\b(chute)\b", "chute"),
            (r"\b(conveyor)\b", "conveyor"),
        ],
    }

    CARGO_PATTERNS = {
        "CARGO_TYPE": [
            (
                r"\b(DG|Dangerous Goods|dangerous\s*goods|barang\s*berbahaya)\b",
                "dangerous_goods",
            ),
            (r"\b(perishable|fresh|frozen|segar)\b", "perishable"),
            (r"\b(fragile|breakable|mudah\s*pecah)\b", "fragile"),
            (r"\b(live\s*animals?|AVI|hewan)\b", "live_animals"),
            (r"\b(valuable|berharga)\b", "valuable"),
            (r"\b(pharmaceutical|pharma|farmasi)\b", "pharmaceutical"),
            (r"\b(human\s*remains|AHU)\b", "human_remains"),
            (r"\b(mail|pos)\b", "mail"),
        ],
    }

    PERSONNEL_PATTERNS = {
        "PERSONNEL": [
            (r"\b(PIC|Person In Charge)\b", "pic"),
            (r"\b(Load\s*Master|LM)\b", "load_master"),
            (r"\b(Ground\s*Handler|GH)\b", "ground_handler"),
            (r"\b(Supervisor|Spv)\b", "supervisor"),
            (r"\b(Staff|Petugas|Officer)\b", "staff"),
            (r"\b(Crew|Cabin\s*Crew)\b", "crew"),
            (r"\b(Engineer|Teknisi)\b", "engineer"),
        ],
    }

    # Entity type -> (summary key, how the entity is rendered in the summary):
    #   "label"           append the normalized label once (deduplicated)
    #   "text"            append the matched text
    #   "text_with_label" append "<matched text> (<label>)"
    _SUMMARY_FIELDS = {
        "DAMAGE_TYPE": ("damages", "label"),
        "QUANTITY": ("quantities", "text_with_label"),
        "LOCATION": ("locations", "label"),
        "TIME": ("times", "text"),
        "FLIGHT_INFO": ("flight_info", "text_with_label"),
        "EQUIPMENT": ("equipment", "label"),
        "CARGO_TYPE": ("cargo_types", "label"),
        "PERSONNEL": ("personnel", "label"),
    }

    def __init__(self):
        # Merge the per-category tables into one {entity_type: patterns} map.
        self.all_patterns = {
            **self.DAMAGE_PATTERNS,
            **self.QUANTITY_PATTERNS,
            **self.LOCATION_PATTERNS,
            **self.TIME_PATTERNS,
            **self.FLIGHT_PATTERNS,
            **self.EQUIPMENT_PATTERNS,
            **self.CARGO_PATTERNS,
            **self.PERSONNEL_PATTERNS,
        }

    def extract(self, text: str) -> Dict[str, List[Dict[str, Any]]]:
        """
        Extract all entities from text.

        Args:
            text: Report text. Empty or non-string input yields no entities.

        Returns:
            Dict with entity types as keys and list of entities as values.
            Each entity is a dict with ``text``, ``label``, ``start``,
            ``end`` and ``confidence``. Within one entity type, entities are
            deduplicated on (lower-cased matched text, label); the first
            occurrence is kept. Entity types without matches are omitted.
        """
        # Non-string values (e.g. NaN from a tabular source) would make the
        # regex engine raise TypeError; treat them like missing text.
        if not text or not isinstance(text, str):
            return {}

        entities: Dict[str, List[Dict[str, Any]]] = {}

        # Patterns are read from self.all_patterns on every call so that
        # changes made to that mapping after construction take effect.
        for entity_type, patterns in self.all_patterns.items():
            seen = set()
            unique: List[Dict[str, Any]] = []
            for pattern, label in patterns:
                for match in re.finditer(pattern, text, re.IGNORECASE):
                    matched = match.group(0)
                    key = (matched.lower(), label)
                    if key in seen:
                        continue
                    seen.add(key)
                    unique.append(
                        {
                            "text": matched,
                            "label": label,
                            "start": match.start(),
                            "end": match.end(),
                            "confidence": 0.85,
                        }
                    )
            if unique:
                entities[entity_type] = unique

        return entities

    def _summarize(
        self, entities: Dict[str, List[Dict[str, Any]]]
    ) -> Dict[str, Any]:
        """Build the summary dict from an already-computed extract() result."""
        summary: Dict[str, Any] = {
            "damages": [],
            "quantities": [],
            "locations": [],
            "times": [],
            "flight_info": [],
            "equipment": [],
            "cargo_types": [],
            "personnel": [],
            "entity_count": 0,
        }

        for entity_type, entity_list in entities.items():
            field = self._SUMMARY_FIELDS.get(entity_type)
            for entity in entity_list:
                # Every entity is counted, including types with no summary
                # field.
                summary["entity_count"] += 1
                if field is None:
                    continue

                key, mode = field
                label = entity["label"]
                text_val = entity["text"]
                if mode == "label":
                    if label not in summary[key]:
                        summary[key].append(label)
                elif mode == "text":
                    summary[key].append(text_val)
                else:
                    summary[key].append(f"{text_val} ({label})")

        return summary

    @staticmethod
    def _record_text(record: Dict, fields: Tuple[str, ...]) -> str:
        """Join the given record fields with spaces; None counts as empty."""
        values = (record.get(name) for name in fields)
        return " ".join("" if value is None else str(value) for value in values)

    def extract_summary(self, text: str) -> Dict[str, Any]:
        """
        Extract entity summary from text.

        Args:
            text: Report text

        Returns:
            Summary dict with extracted entities organized by type, plus
            ``entity_count`` with the total number of extracted entities.
        """
        return self._summarize(self.extract(text))

    def extract_batch(self, records: List[Dict]) -> List[Dict[str, Any]]:
        """
        Extract entities from multiple records.

        For each record the ``Report``, ``Root_Caused`` and ``Action_Taken``
        fields are joined with spaces and processed as one text.

        Args:
            records: Record dicts; missing or None fields count as empty.

        Returns:
            One ``{"entities": ..., "summary": ...}`` dict per record, in
            input order.
        """
        results = []
        for record in records:
            text = self._record_text(
                record, ("Report", "Root_Caused", "Action_Taken")
            )
            # Run the regex pass once and derive the summary from its result.
            entities = self.extract(text)
            results.append(
                {
                    "entities": entities,
                    "summary": self._summarize(entities),
                }
            )
        return results

    def get_damage_statistics(self, records: List[Dict]) -> Dict[str, int]:
        """
        Get statistics on damage types.

        Counts DAMAGE_TYPE entities found in each record's ``Report`` field,
        keyed by label and ordered from most to least frequent.
        """
        damage_counts: Counter = Counter()
        for record in records:
            text = self._record_text(record, ("Report",))
            for entity in self.extract(text).get("DAMAGE_TYPE", []):
                damage_counts[entity["label"]] += 1
        return dict(damage_counts.most_common())

    def get_equipment_statistics(self, records: List[Dict]) -> Dict[str, int]:
        """
        Get statistics on equipment mentioned.

        Counts EQUIPMENT entities found in each record's ``Report`` and
        ``Root_Caused`` fields, keyed by label and ordered from most to
        least frequent.
        """
        equipment_counts: Counter = Counter()
        for record in records:
            text = self._record_text(record, ("Report", "Root_Caused"))
            for entity in self.extract(text).get("EQUIPMENT", []):
                equipment_counts[entity["label"]] += 1
        return dict(equipment_counts.most_common())


_advanced_ner: Optional[AdvancedNER] = None


def get_advanced_ner() -> AdvancedNER:
    """Get singleton NER instance, creating it on first use."""
    global _advanced_ner
    if _advanced_ner is None:
        _advanced_ner = AdvancedNER()
    return _advanced_ner