"""Utility functions for searching, filtering, and formatting ITU document data.""" import re from datetime import datetime from common.models import split_title def comma_separated_list(elements): """Join elements into a comma-separated string.""" return ", ".join(str(e) for e in elements) def _normalize_work_item_name(name): """Normalize work item name for flexible matching. Handles variations like: - X.1254rev vs X.1254.rev (missing dot before suffix) - XSTR.kdc-qkdn vs XSTR.kdc_QKDN (hyphen vs underscore, case) """ if not name: return "" n = name.lower() # Normalize hyphens and underscores n = n.replace('-', '_') # Remove dots before common suffixes to normalize (X.1254.rev -> X.1254rev) n = re.sub(r'\.(rev|amd|cor|sup)($|[^a-z])', r'\1\2', n) return n def find_td_by_name(table_rows, name): """Find a TableRow whose title contains the given name. Uses normalized matching to handle variations like: - Hyphen vs underscore (XSTR.kdc-qkdn vs XSTR.kdc_QKDN) - Missing dots (X.1254rev vs X.1254.rev) - Case differences Returns (questionName, tableRow) or ("", None). """ # First try exact match for row in table_rows: if name in row.title: question_name = row.questions[0].value if row.questions else "" return (question_name, row) # Try normalized match norm_name = _normalize_work_item_name(name) for row in table_rows: norm_title = _normalize_work_item_name(row.title) if norm_name in norm_title: question_name = row.questions[0].value if row.questions else "" return (question_name, row) return ("", None) def _normalize_number(value): """Normalize a document number by stripping spaces and leading zeros.""" return str(value).strip().lstrip('0') or '0' def find_td_by_number(table_rows, number): """Find a TableRow by its document number. Returns (questionName, tableRow) or ("", None). """ normalized = _normalize_number(number) for row in table_rows: if _normalize_number(row.number.value) == normalized: question_name = row.questions[0].value if row.questions else "" return (question_name, row) return ("", None) def find_question_name_td_and_a5(table_rows, number): """Find a TableRow by number and its associated A.5 justification document. Returns (questionName, tableRow, a5TableRow). """ question_name = "" td = None a5 = None title = None normalized_number = _normalize_number(number) for row in table_rows: if _normalize_number(row.number.value) == normalized_number: question_name = row.questions[0].value if row.questions else "" td = row title = row.title break if title is not None: for row in table_rows: if title in row.title and "A.5" in row.title: a5 = row break return (question_name, td, a5) def compare_stripped(string1, string2): """Compare two strings ignoring all spaces.""" return string1.replace(' ', '') == string2.replace(' ', '') def stripped_starts_with(string1, string2): """Check if string2 starts with string1 (original logic preserved).""" return string2.startswith(string1) def is_new_work_item(string): """Check if a title indicates a new work item proposal. Matches: "Proposal", "NWI", or "new work item" in the title. """ lower = string.lower() return ('proposal' in lower or 'nwi' in lower or ('new' in lower and 'work' in lower and 'item' in lower)) # --- Role extraction helpers --- def get_rapporteurs(question_details): """Get formatted rapporteur names from a Question object.""" rapporteurs = [] for role in question_details.roles: if role.roleName in ("Rapporteur", "Co-rapporteur"): rapporteurs.append( f"{role.firstName} {role.lastName} ({role.company}, {role.address})" ) return rapporteurs def get_associate_rapporteurs(question_details): """Get formatted associate rapporteur names from a Question object.""" return [ f"{role.firstName} {role.lastName} ({role.company}, {role.address})" for role in question_details.roles if role.roleName == "Associate rapporteur" ] def get_chairs(working_party_details): """Get formatted chair names from a WorkingParty object.""" return [ f"{role.firstName} {role.lastName} ({role.company}, {role.address})" for role in working_party_details.roles if role.roleName in ("Chair", "Co-Chair") ] def get_vice_chairs(working_party_details): """Get formatted vice-chair names from a WorkingParty object.""" return [ f"{role.firstName} {role.lastName} ({role.company}, {role.address})" for role in working_party_details.roles if role.roleName == "Vice-chair" ] # --- Document search helpers --- def get_document_title(table_rows, number): """Find the title of a document by its number.""" normalized = _normalize_number(number) for row in table_rows: if _normalize_number(row.number.value) == normalized: return row.title return "" def get_liaison_destination(table_rows, number): """Extract the liaison destination from a document title (text in [to ...]).""" normalized = _normalize_number(number) for row in table_rows: if _normalize_number(row.number.value) == normalized: title = row.title idx1 = title.find('[to') if idx1 >= 0: idx2 = title.find(']', idx1) if idx2 > idx1: return title[idx1 + 1:idx2] return "" def get_meeting_reports(table_rows, question, group): """Find all rapporteur group meeting reports for a given question. Matches titles containing "RGM" or "Rapporteur Group Meeting" for the question. """ reports = [] q_pattern = f"q{question}/{group}".lower() for row in table_rows: title_lower = row.title.lower() # Match "RGM" or "Rapporteur Group Meeting" with the question number if q_pattern in title_lower and ("rgm" in title_lower or "rapporteur group meeting" in title_lower): reports.append(row) return reports # --- Work programme helpers (shared by WP and Question reports) --- def extract_alt_name(work_item_name): """Extract alternate name from 'X.1096 (ex X.bvm)' -> 'X.bvm'.""" idx1 = work_item_name.find('(ex ') if idx1 < 0: return None idx2 = work_item_name.find(')', idx1) if idx2 < 0: return None return work_item_name[idx1 + 4:idx2].strip() def auto_detect_from_work_programme(work_item_details, wp_rows, approval, determination, consent, non_normative): """Auto-detect approval/consent/determination/agreement from work programme status. For each work item with a non-'Under study' status, finds the matching WP TD and adds its number to the appropriate list. Returns a dict mapping TD number -> WorkItem for use in table generation. """ td_to_work_item = {} for wi in work_item_details: status = (wi.status or '').strip() if not status or status.startswith('Under study'): continue if status.startswith('Approved'): target = approval elif status.startswith('Determined'): target = determination elif status.startswith('Consented'): target = consent elif status.startswith('Agreed'): target = non_normative else: continue name = wi.workItem _, td = find_td_by_name(wp_rows, name) if td is None: alt = extract_alt_name(name) if alt: _, td = find_td_by_name(wp_rows, alt) if td is not None: td_num = td.number.value.strip() if td_num not in target: target.append(td_num) td_to_work_item[td_num] = wi return td_to_work_item def extract_new_work_item_info(title, wp_rows): """Extract work item name and title from a new work item contribution title. Returns (work_item_name, text_title). """ # Pattern 1: X.name or XSTR.name followed by quoted title # e.g., "X.f2sp \"FAPI 2.0 security profile\"" or "XSTR.gidi \"Title here\"" m = re.search(r'(X\.[A-Za-z0-9._-]+|XSTR\.[A-Za-z0-9._-]+)\s*[,:]?\s*"([^"]+)"', title) if m: name = m.group(1).rstrip(',') text = m.group(2).strip() return name, text # Pattern 2: X.name: title (without quotes) # e.g., "X.sc-sd: Security capability..." m = re.search(r'(X\.[A-Za-z0-9._-]+|XSTR\.[A-Za-z0-9._-]+)\s*:\s*(.+)$', title) if m: name = m.group(1).rstrip(',') text = m.group(2).strip().strip('"') return name, text # Pattern 3: TR.name with optional title m = re.search(r'(TR\.[A-Za-z0-9._-]+)\s*[,:]?\s*"?([^"]*)"?', title) if m: name = m.group(1).rstrip(',') text = (m.group(2) or "").strip() return name, text return "", "" def detect_outgoing_liaisons(wp_rows): """Auto-detect outgoing liaison TD numbers from WP TDs. Detects titles starting with 'LS/o' (case-insensitive) or containing 'liaison statement' with outgoing indicators. Returns a list of TD number strings. """ outgoing_ls = [] for row in wp_rows: title = row.title or '' title_lower = title.lower().strip() if (title_lower.startswith('ls/o') or ('liaison statement' in title_lower and ('outgoing' in title_lower or '[to ' in title_lower))): val = row.number.value.strip() if val and val not in outgoing_ls: outgoing_ls.append(val) return outgoing_ls def detect_processed_work_items(work_item_details, wp_rows): """Detect under study work items that have a corresponding TD. Cross-references work programme items with "Under study" status against the TD list to find items that were actually processed. Returns a list of tuples: (work_item_name, td_number, td_row). """ processed = [] for wi in work_item_details: status = (wi.status or '').strip() if not status.startswith('Under study'): continue name = wi.workItem if not name: continue _, td = find_td_by_name(wp_rows, name) if td is None: alt = extract_alt_name(name) if alt: _, td = find_td_by_name(wp_rows, alt) if td is not None: processed.append((name, td.number.value.strip(), td)) return processed def parse_timing(timing_str): """Parse work programme timing string to a datetime for comparison. Handles: '2026-06', '2026-12', '2027-Q1', '2027-Q2', etc. Returns datetime or None. """ timing_str = timing_str.strip() quarter_map = {'Q1': '03', 'Q2': '06', 'Q3': '09', 'Q4': '12'} for q, month in quarter_map.items(): if q in timing_str: year = timing_str.split('-')[0] try: return datetime(int(year), int(month), 28) except ValueError: return None try: return datetime.strptime(timing_str, '%Y-%m') except ValueError: return None def print_work_programme_summary(work_items): """Print a summary of scraped work programme items.""" print(f"\n Work Programme: {len(work_items)} item(s)") for wi in work_items: status = wi.status or "?" process = wi.approvalProcess or "?" version = wi.version or "?" name = wi.workItem or "?" if len(name) > 30: name = name[:27] + "..." print(f" {name:<30} Status: {status:<20} Version: {version:<10} Process: {process}")