import hashlib import io import json import re from dataclasses import dataclass from pathlib import Path from typing import Any import pandas as pd SHEET_ORDER = [ "Summary", "Bands_LTE", "Bands_UTRA", "Bands_GERAN", "Bands_NR", "CA_Combinations", "CA_Assessment", "ENDC_MRDC", "Features", "VoLTE_Assessment", "Release_Inference", "Benchmark_CA_Diff", "Parse_Warnings", ] _MESSAGE_START_RE = re.compile(r"^\s*\d{2}:\d{2}:\d{2}\.\d{3}.*RRC Signaling", re.I) _BOOL_VALUES = {"true", "false", "supported", "notsupported", "present", "absent"} _BITSTRING_RE = re.compile(r"^[01]{8,}$") @dataclass class Node: name: str children: list["Node"] def _empty_sheets() -> dict[str, pd.DataFrame]: return {name: pd.DataFrame() for name in SHEET_ORDER} def _normalize_label(label: str) -> str: text = re.sub(r"\s+", " ", label.strip()) text = re.sub(r"\s+#\d+$", "", text) return text.lower() def _release_sort_key(release: str) -> int: match = re.search(r"(\d+)", str(release)) return int(match.group(1)) if match else -1 def _safe_decode(content: bytes) -> str: for encoding in ("utf-8", "latin-1", "cp1252"): try: return content.decode(encoding) except UnicodeDecodeError: continue return content.decode("utf-8", errors="ignore") def _build_ue_id( source_name: str, content: str, message_index: int, message_count: int ) -> str: base = Path(source_name).stem or "uecap" normalized = re.sub(r"\s+", " ", content).strip().encode("utf-8", errors="ignore") digest = hashlib.sha1(normalized).hexdigest()[:10] if message_count > 1: return f"{base}_m{message_index}_{digest}" return f"{base}_{digest}" def _split_messages(content: str) -> list[tuple[int, str]]: lines = content.splitlines() indices = [idx for idx, line in enumerate(lines) if _MESSAGE_START_RE.search(line)] if not indices: return [(1, content)] indices.append(len(lines)) parts: list[tuple[int, str]] = [] for i in range(len(indices) - 1): start, end = indices[i], indices[i + 1] chunk = "\n".join(lines[start:end]).strip() if chunk: parts.append((i + 1, chunk)) return parts or [(1, content)] def _tokenize(content: str) -> list[str]: tokens: list[str] = [] for raw_line in content.splitlines(): line = raw_line.strip() if not line: continue parts = re.split(r"(\{|\})", line) for part in parts: part = part.strip() if part: tokens.append(part) return tokens def _parse_entries(tokens: list[str], start: int = 0) -> tuple[list[Node], int]: entries: list[Node] = [] i = start while i < len(tokens): token = tokens[i] if token == "}": return entries, i + 1 if token == "{": nested, i = _parse_entries(tokens, i + 1) entries.extend(nested) continue label = token i += 1 children: list[Node] = [] if i < len(tokens) and tokens[i] == "{": children, i = _parse_entries(tokens, i + 1) entries.append(Node(name=label, children=children)) return entries, i def _flatten_tree( nodes: list[Node], ue_id: str, source_file: str ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[str]]: kv_records: list[dict[str, Any]] = [] leaf_records: list[dict[str, Any]] = [] paths: list[str] = [] def walk(node: Node, ancestors: list[str]) -> None: current_path = ancestors + [node.name] normalized_path = ".".join(_normalize_label(item) for item in current_path) paths.append(normalized_path) if not node.children: leaf_records.append( { "ue_id": ue_id, "source_file": source_file, "path": ".".join(current_path), "path_normalized": normalized_path, "leaf_value": node.name.strip(), } ) return if len(node.children) == 1 and not node.children[0].children: value = node.children[0].name.strip() kv_records.append( { "ue_id": ue_id, "source_file": source_file, "path": ".".join(current_path), "path_normalized": normalized_path, "key": node.name.strip(), "key_normalized": _normalize_label(node.name), "value": value, } ) for child in node.children: walk(child, current_path) for node in nodes: walk(node, []) return kv_records, leaf_records, paths def _collect_subtree_kv(node: Node) -> list[dict[str, str]]: records: list[dict[str, str]] = [] def walk(current: Node, ancestors: list[str]) -> None: current_path = ancestors + [current.name] if len(current.children) == 1 and not current.children[0].children: records.append( { "path": ".".join(current_path), "key": current.name.strip(), "key_normalized": _normalize_label(current.name), "value": current.children[0].name.strip(), } ) for child in current.children: walk(child, current_path) walk(node, []) return records def _detect_rats(paths: list[str], kv_records: list[dict[str, Any]]) -> list[str]: evidence = " ".join( paths + [f"{rec['key_normalized']}={str(rec['value']).lower()}" for rec in kv_records] ) rats: list[str] = [] if "eutra" in evidence: rats.append("LTE") if "utra" in evidence or "wcdma" in evidence: rats.append("WCDMA") if "geran" in evidence or re.search(r"\bgsm\d*", evidence): rats.append("GSM") if any( marker in evidence for marker in [ "supportedbandlistnr", "featureset", "mrdc", "endc", "nr-rat", "bandnr", " rat-type.nr", ] ): rats.append("NR") return rats def _extract_release_explicit(kv_records: list[dict[str, Any]]) -> str | None: for record in kv_records: if record["key_normalized"] == "accessstratumrelease": value = str(record["value"]).strip().lower() if value: return value return None def _extract_bands_lte(kv_records: list[dict[str, Any]]) -> pd.DataFrame: rows: list[dict[str, Any]] = [] for record in kv_records: key = record["key_normalized"] if "bandeutra" not in key: continue value = str(record["value"]).strip() if not re.fullmatch(r"\d+", value): continue rows.append( { "ue_id": record["ue_id"], "source_file": record["source_file"], "band_lte": int(value), "raw_key": record["key"], "path": record["path"], } ) if not rows: return pd.DataFrame( columns=["ue_id", "source_file", "band_lte", "raw_key", "path"] ) return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_lte", "raw_key"]) def _extract_bands_utra(kv_records: list[dict[str, Any]]) -> pd.DataFrame: rows: list[dict[str, Any]] = [] for record in kv_records: key = record["key_normalized"] value = str(record["value"]).strip() if "supportedbandutra-fdd" not in key and "bandutra" not in key: continue if not value: continue rows.append( { "ue_id": record["ue_id"], "source_file": record["source_file"], "band_utra": value, "raw_key": record["key"], "path": record["path"], } ) if not rows: return pd.DataFrame( columns=["ue_id", "source_file", "band_utra", "raw_key", "path"] ) return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_utra", "raw_key"]) def _extract_bands_geran(kv_records: list[dict[str, Any]]) -> pd.DataFrame: rows: list[dict[str, Any]] = [] for record in kv_records: key = record["key_normalized"] value = str(record["value"]).strip() if "supportedbandgeran" not in key and "bandgeran" not in key: continue if not value: continue rows.append( { "ue_id": record["ue_id"], "source_file": record["source_file"], "band_geran": value, "raw_key": record["key"], "path": record["path"], } ) if not rows: return pd.DataFrame( columns=["ue_id", "source_file", "band_geran", "raw_key", "path"] ) return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_geran", "raw_key"]) def _extract_bands_nr(kv_records: list[dict[str, Any]]) -> pd.DataFrame: rows: list[dict[str, Any]] = [] for record in kv_records: key = record["key_normalized"] value = str(record["value"]).strip() if ( "bandnr" not in key and "supportedbandnr" not in key and "supportedbandlistnr" not in key ): continue rows.append( { "ue_id": record["ue_id"], "source_file": record["source_file"], "band_nr": value, "raw_key": record["key"], "path": record["path"], } ) if not rows: return pd.DataFrame( columns=["ue_id", "source_file", "band_nr", "raw_key", "path"] ) return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_nr", "raw_key"]) def _extract_component_from_band_parameters(node: Node) -> dict[str, Any]: kv = _collect_subtree_kv(node) band_lte = next( ( int(record["value"]) for record in kv if record["key_normalized"].startswith("bandeutra") and re.fullmatch(r"\d+", record["value"]) ), None, ) band_nr = next( ( str(record["value"]).strip() for record in kv if "bandnr" in record["key_normalized"] and str(record["value"]).strip() ), None, ) ul_class = next( ( str(record["value"]).strip().lower() for record in kv if "ca-bandwidthclassul" in record["key_normalized"] ), None, ) dl_class = next( ( str(record["value"]).strip().lower() for record in kv if "ca-bandwidthclassdl" in record["key_normalized"] ), None, ) return { "component_id": node.name, "band_lte": band_lte, "band_nr": band_nr, "ul_class": ul_class, "dl_class": dl_class, } def _extract_ca_combinations( nodes: list[Node], ue_id: str, source_file: str ) -> pd.DataFrame: rows: list[dict[str, Any]] = [] def walk(node: Node, ancestors: list[Node]) -> None: key = _normalize_label(node.name) if key.startswith("bandcombinationparameters"): ancestor_names = [_normalize_label(item.name) for item in ancestors] combo_scope = next( ( ancestors[idx].name for idx in range(len(ancestors) - 1, -1, -1) if ancestor_names[idx].startswith("supportedbandcombination") ), "", ) components: list[dict[str, Any]] = [] for child in node.children: if _normalize_label(child.name).startswith("bandparameters"): components.append(_extract_component_from_band_parameters(child)) bands_lte = sorted( {comp["band_lte"] for comp in components if comp.get("band_lte") is not None} ) bands_nr = sorted( {str(comp["band_nr"]) for comp in components if comp.get("band_nr")} ) ul_classes = sorted( { str(comp["ul_class"]).lower() for comp in components if str(comp.get("ul_class", "")).strip() } ) dl_classes = sorted( { str(comp["dl_class"]).lower() for comp in components if str(comp.get("dl_class", "")).strip() } ) component_count = len( [comp for comp in components if comp.get("band_lte") or comp.get("band_nr")] ) rows.append( { "ue_id": ue_id, "source_file": source_file, "combination_id": node.name, "combination_scope": combo_scope, "lte_bands": ",".join(f"B{band}" for band in bands_lte), "nr_bands": ",".join(bands_nr), "ul_classes": ",".join(ul_classes), "dl_classes": ",".join(dl_classes), "component_count": component_count, "components_json": json.dumps(components, ensure_ascii=False), } ) for child in node.children: walk(child, ancestors + [node]) for root in nodes: walk(root, []) if not rows: return pd.DataFrame( columns=[ "ue_id", "source_file", "combination_id", "combination_scope", "lte_bands", "nr_bands", "ul_classes", "dl_classes", "component_count", "components_json", ] ) return pd.DataFrame(rows).drop_duplicates( subset=[ "ue_id", "combination_id", "combination_scope", "lte_bands", "nr_bands", "ul_classes", "dl_classes", ] ) def _load_json_file(path: Path) -> dict[str, Any]: if not path.exists(): return {} return json.loads(path.read_text(encoding="utf-8")) def _load_ca_rules(ca_rules_path: str | None = None) -> dict[str, Any]: default_path = Path(__file__).resolve().parents[1] / "data" / "uecap_ca_rules.json" target = Path(ca_rules_path) if ca_rules_path else default_path data = _load_json_file(target) return { "allowed_classes": [ str(c).lower() for c in data.get("allowed_classes", ["a", "b", "c", "d", "e", "f"]) ], "prefer_class_order": str(data.get("prefer_class_order", "dl_then_ul")).lower(), } def _format_nr_band(raw_band: str) -> str: text = str(raw_band).strip().lower() if not text: return text if text.startswith("n"): return text.upper() if re.fullmatch(r"\d+", text): return f"N{text}" return text.upper() def _build_ca_assessment_df(ca_df: pd.DataFrame, ca_rules_path: str | None = None) -> pd.DataFrame: columns = [ "ue_id", "source_file", "combination_id", "combination_scope", "combo_norm", "component_count", "dl_class_set", "ul_class_set", "combo_type", "consistency_status", "remarks", ] if ca_df is None or ca_df.empty: return pd.DataFrame(columns=columns) rules = _load_ca_rules(ca_rules_path=ca_rules_path) allowed_classes = set(rules["allowed_classes"]) prefer_dl = rules["prefer_class_order"] == "dl_then_ul" rows: list[dict[str, Any]] = [] for _, combo in ca_df.iterrows(): components = json.loads(str(combo.get("components_json") or "[]")) parts: list[str] = [] missing_class_count = 0 unknown_class_count = 0 has_lte = False has_nr = False dl_set: set[str] = set() ul_set: set[str] = set() for comp in components: band_label = None band_lte = comp.get("band_lte") band_nr = comp.get("band_nr") dl_class = str(comp.get("dl_class") or "").strip().lower() ul_class = str(comp.get("ul_class") or "").strip().lower() if band_lte is not None: band_label = f"B{band_lte}" has_lte = True elif band_nr: band_label = _format_nr_band(str(band_nr)) has_nr = True if not band_label: continue dl_set.update({dl_class.upper()} if dl_class else set()) ul_set.update({ul_class.upper()} if ul_class else set()) selected_class = dl_class if prefer_dl else ul_class if not selected_class: selected_class = ul_class if prefer_dl else dl_class class_suffix = "" if selected_class: if selected_class in allowed_classes: class_suffix = selected_class.upper() else: unknown_class_count += 1 else: missing_class_count += 1 parts.append(f"{band_label}{class_suffix}") component_count = len(parts) if component_count == 0: status = "invalid" combo_type = "Unknown" elif has_lte and has_nr: combo_type = "MR-DC/EN-DC candidate" status = ( "valid" if missing_class_count == 0 and unknown_class_count == 0 else "partially_valid" ) elif has_lte and component_count > 1: combo_type = "LTE CA" status = ( "valid" if missing_class_count == 0 and unknown_class_count == 0 else "partially_valid" ) elif has_nr and component_count > 1: combo_type = "NR CA" status = ( "valid" if missing_class_count == 0 and unknown_class_count == 0 else "partially_valid" ) else: combo_type = "Single/Unknown" status = "partially_valid" remarks_parts: list[str] = [] if missing_class_count: remarks_parts.append(f"{missing_class_count} component(s) without bandwidth class.") if unknown_class_count: remarks_parts.append(f"{unknown_class_count} component(s) with unknown class token.") if component_count == 0: remarks_parts.append("No valid CA component extracted.") rows.append( { "ue_id": combo.get("ue_id"), "source_file": combo.get("source_file"), "combination_id": combo.get("combination_id"), "combination_scope": combo.get("combination_scope"), "combo_norm": "+".join(parts), "component_count": component_count, "dl_class_set": ",".join(sorted(dl_set)), "ul_class_set": ",".join(sorted(ul_set)), "combo_type": combo_type, "consistency_status": status, "remarks": " ".join(remarks_parts), } ) return pd.DataFrame(rows, columns=columns) def _extract_endc_mrdc( kv_records: list[dict[str, Any]], ca_assessment_df: pd.DataFrame, ue_id: str, source_file: str ) -> pd.DataFrame: rows: list[dict[str, Any]] = [] for record in kv_records: path = record["path_normalized"] if any( marker in path for marker in [ "mrdc", "endc", "featureset", "supportedbandlistnr", "supportedbandcombinationnr", ] ): rows.append( { "ue_id": record["ue_id"], "source_file": record["source_file"], "item_type": "path_feature", "item_key": record["key"], "item_value": str(record["value"]), "path": record["path"], } ) if ca_assessment_df is not None and not ca_assessment_df.empty: for _, combo in ca_assessment_df.iterrows(): if str(combo.get("combo_type")) == "MR-DC/EN-DC candidate": rows.append( { "ue_id": ue_id, "source_file": source_file, "item_type": "combo", "item_key": str(combo.get("combination_id", "")), "item_value": str(combo.get("combo_norm", "")), "path": str(combo.get("combination_scope", "")), } ) if not rows: return pd.DataFrame( columns=["ue_id", "source_file", "item_type", "item_key", "item_value", "path"] ) return pd.DataFrame(rows).drop_duplicates() def _extract_features(kv_records: list[dict[str, Any]]) -> pd.DataFrame: rows: list[dict[str, Any]] = [] excluded_prefixes = ( "bandeutra", "supportedbandeutra", "supportedbandutra", "supportedbandgeran", "supportedbandnr", "ca-bandwidthclass", ) excluded_exact = {"accessstratumrelease"} for record in kv_records: key_norm = record["key_normalized"] value = str(record["value"]).strip() value_norm = value.lower() if key_norm in excluded_exact or key_norm.startswith(excluded_prefixes): continue if not value: continue if value_norm in _BOOL_VALUES: value_type = "flag" elif _BITSTRING_RE.fullmatch(value): value_type = "bitstring" elif re.fullmatch(r"\d+", value): value_type = "number" else: value_type = "text" rows.append( { "ue_id": record["ue_id"], "source_file": record["source_file"], "feature_name": record["key"], "feature_value": value, "value_type": value_type, "path": record["path"], } ) if not rows: return pd.DataFrame( columns=[ "ue_id", "source_file", "feature_name", "feature_value", "value_type", "path", ] ) return pd.DataFrame(rows).drop_duplicates() def _extract_categories(kv_records: list[dict[str, Any]]) -> dict[str, str]: categories: dict[str, str] = {} for record in kv_records: key_norm = record["key_normalized"] if "ue-category" in key_norm: categories[record["key"]] = str(record["value"]) return categories def _load_release_rules(rules_path: str | None = None) -> list[dict[str, Any]]: default_path = Path(__file__).resolve().parents[1] / "data" / "uecap_release_rules.json" target = Path(rules_path) if rules_path else default_path data = _load_json_file(target) return data.get("rules", []) def infer_release(extracted: dict[str, Any], rules_path: str | None = None) -> dict[str, Any]: explicit_release = extracted.get("explicit_release") evidence_strings = [ str(item) for item in extracted.get("evidence", []) if str(item).strip() ] rules = _load_release_rules(rules_path=rules_path) triggered: list[dict[str, Any]] = [] scores: dict[str, float] = {} for rule in rules: release = str(rule.get("release", "")).strip().lower() patterns = [str(pattern) for pattern in rule.get("patterns", []) if str(pattern).strip()] weight = float(rule.get("weight", 1.0)) rule_id = str(rule.get("rule_id", "rule")).strip() description = str(rule.get("description", "")).strip() matched_patterns: list[str] = [] for pattern in patterns: regex = re.compile(pattern, re.I) if any(regex.search(item) for item in evidence_strings): matched_patterns.append(pattern) if not matched_patterns or not release: continue scores[release] = scores.get(release, 0.0) + weight triggered.append( { "rule_id": rule_id, "release": release, "weight": weight, "matched_patterns": ", ".join(matched_patterns), "description": description, } ) inferred_release = None confidence = 0.0 if scores: sorted_scores = sorted( scores.items(), key=lambda item: (item[1], _release_sort_key(item[0])), reverse=True, ) inferred_release = sorted_scores[0][0] total = sum(scores.values()) confidence = float(sorted_scores[0][1] / total) if total else 0.0 final_release = explicit_release or inferred_release or "unknown" if explicit_release: confidence = 1.0 return { "explicit_release": explicit_release, "inferred_release": inferred_release, "final_release": final_release, "confidence": round(confidence, 4), "triggered_rules": triggered, } def _load_volte_rules(volte_rules_path: str | None = None) -> dict[str, Any]: default_path = Path(__file__).resolve().parents[1] / "data" / "uecap_volte_rules.json" target = Path(volte_rules_path) if volte_rules_path else default_path data = _load_json_file(target) return { "thresholds": data.get("thresholds", {"supported": 70, "likely": 40}), "missing_critical_penalty": int(data.get("missing_critical_penalty", 20)), "hard_negative_penalty": int(data.get("hard_negative_penalty", 40)), "hard_negative_patterns": data.get( "hard_negative_patterns", [r"voiceoverps.*notsupported", r"voice-over-ps.*notsupported", r"srvcc.*notsupported"], ), "rules": data.get("rules", []), "critical_signals": data.get( "critical_signals", [ {"name": "ims", "patterns": [r"\bims\b"]}, {"name": "voice_over_ps", "patterns": [r"voiceoverps", r"voice-over-ps"]}, {"name": "srvcc", "patterns": [r"\bsrvcc\b"]}, ], ), } def assess_volte_support( extracted: dict[str, Any], volte_rules_path: str | None = None ) -> dict[str, Any]: rules = _load_volte_rules(volte_rules_path=volte_rules_path) thresholds = rules["thresholds"] evidence = [str(item).lower() for item in extracted.get("evidence", []) if str(item).strip()] matched_rules: list[dict[str, Any]] = [] score = 0.0 explicit_positive_found = False for rule in rules["rules"]: patterns = [str(pattern) for pattern in rule.get("patterns", []) if str(pattern).strip()] if not patterns: continue matched = [] for pattern in patterns: if any(re.search(pattern, item, re.I) for item in evidence): matched.append(pattern) if not matched: continue category = str(rule.get("category", "implicit_positive")).strip().lower() weight = float(rule.get("weight", 0)) score += weight if category == "explicit_positive": explicit_positive_found = True matched_rules.append( { "rule_id": str(rule.get("rule_id", "rule")), "category": category, "weight": weight, "matched_patterns": ", ".join(matched), "description": str(rule.get("description", "")).strip(), } ) missing_signals: list[str] = [] for signal in rules["critical_signals"]: name = str(signal.get("name", "signal")) patterns = [str(pattern) for pattern in signal.get("patterns", []) if str(pattern).strip()] if not any( re.search(pattern, item, re.I) for pattern in patterns for item in evidence ): missing_signals.append(name) hard_negative_patterns = [ str(pattern) for pattern in rules.get("hard_negative_patterns", []) if str(pattern).strip() ] hard_negative_matches: list[str] = [] for pattern in hard_negative_patterns: if any(re.search(pattern, item, re.I) for item in evidence): hard_negative_matches.append(pattern) if not explicit_positive_found: score -= float(rules["missing_critical_penalty"]) if hard_negative_matches: score -= float(rules.get("hard_negative_penalty", 40)) score = max(0.0, min(100.0, score)) confidence = round(score / 100.0, 4) supported_threshold = float(thresholds.get("supported", 70)) likely_threshold = float(thresholds.get("likely", 40)) if explicit_positive_found and score >= supported_threshold: status = "Supported" elif score >= likely_threshold: status = "Likely" elif score > 0: status = "Unknown" else: status = "Not indicated" # Strict mode: explicit negative indicator blocks "Supported". if hard_negative_matches and status == "Supported": status = "Unknown" explicit_evidence = [ rule["rule_id"] for rule in matched_rules if rule["category"] == "explicit_positive" ] implicit_evidence = [ rule["rule_id"] for rule in matched_rules if rule["category"] == "implicit_positive" ] notes: list[str] = [] if not explicit_positive_found: notes.append("No explicit IMS/VoPS/SRVCC indicator found in this capability text.") if missing_signals: notes.append(f"Missing critical signals: {', '.join(missing_signals)}.") if hard_negative_matches: notes.append("Explicit negative VoLTE indicator found (notsupported). Supported verdict is blocked.") return { "volte_status": status, "volte_score": round(score, 2), "confidence": confidence, "explicit_evidence": ", ".join(explicit_evidence), "implicit_evidence": ", ".join(implicit_evidence), "missing_signals": ", ".join(missing_signals), "notes": " ".join(notes), "matched_rules": matched_rules, } def _build_volte_assessment_df( ue_id: str, source_file: str, evidence: list[str], volte_rules_path: str | None = None, enabled: bool = True, ) -> pd.DataFrame: columns = [ "ue_id", "source_file", "volte_status", "volte_score", "confidence", "explicit_evidence", "implicit_evidence", "missing_signals", "notes", ] if not enabled: return pd.DataFrame(columns=columns) result = assess_volte_support({"evidence": evidence}, volte_rules_path=volte_rules_path) return pd.DataFrame( [ { "ue_id": ue_id, "source_file": source_file, "volte_status": result.get("volte_status"), "volte_score": result.get("volte_score"), "confidence": result.get("confidence"), "explicit_evidence": result.get("explicit_evidence"), "implicit_evidence": result.get("implicit_evidence"), "missing_signals": result.get("missing_signals"), "notes": result.get("notes"), } ] ) def _normalize_combo_for_compare(combo: str) -> str: text = str(combo).strip().upper() if not text: return "" text = re.sub(r"^CA[_:\-\s]*", "", text) text = text.replace(" ", "").replace("_", "") text = text.replace("-", "+") text = re.sub(r"\++", "+", text).strip("+") if not text: return "" components = [part for part in text.split("+") if part] if not components: return "" return "+".join(sorted(components)) def _build_benchmark_ca_diff( ue_id: str, source_file: str, ca_assessment_df: pd.DataFrame, benchmark_combos: list[str] | None = None, ) -> pd.DataFrame: columns = ["ue_id", "source_file", "combo_norm", "status"] if not benchmark_combos: return pd.DataFrame(columns=columns) parser_set = { _normalize_combo_for_compare(combo) for combo in ca_assessment_df.get("combo_norm", pd.Series(dtype=str)).tolist() if _normalize_combo_for_compare(combo) } benchmark_set = {_normalize_combo_for_compare(combo) for combo in benchmark_combos} benchmark_set = {combo for combo in benchmark_set if combo} rows: list[dict[str, Any]] = [] for combo in sorted(parser_set & benchmark_set): rows.append( { "ue_id": ue_id, "source_file": source_file, "combo_norm": combo, "status": "exact_match", } ) for combo in sorted(parser_set - benchmark_set): rows.append( { "ue_id": ue_id, "source_file": source_file, "combo_norm": combo, "status": "missing_in_benchmark", } ) for combo in sorted(benchmark_set - parser_set): rows.append( { "ue_id": ue_id, "source_file": source_file, "combo_norm": combo, "status": "missing_in_log", } ) if not rows: return pd.DataFrame(columns=columns) return pd.DataFrame(rows, columns=columns) def _build_release_inference_df( ue_id: str, source_file: str, release_result: dict[str, Any] ) -> pd.DataFrame: triggered = release_result.get("triggered_rules", []) if not triggered: return pd.DataFrame( [ { "ue_id": ue_id, "source_file": source_file, "explicit_release": release_result.get("explicit_release"), "inferred_release": release_result.get("inferred_release"), "final_release": release_result.get("final_release"), "confidence": release_result.get("confidence"), "rule_id": None, "rule_release": None, "rule_weight": None, "matched_patterns": None, "rule_description": None, } ] ) rows: list[dict[str, Any]] = [] for rule in triggered: rows.append( { "ue_id": ue_id, "source_file": source_file, "explicit_release": release_result.get("explicit_release"), "inferred_release": release_result.get("inferred_release"), "final_release": release_result.get("final_release"), "confidence": release_result.get("confidence"), "rule_id": rule.get("rule_id"), "rule_release": rule.get("release"), "rule_weight": rule.get("weight"), "matched_patterns": rule.get("matched_patterns"), "rule_description": rule.get("description"), } ) return pd.DataFrame(rows) def _parse_single_message( content: str, source_name: str, message_index: int, message_count: int, rules_path: str | None = None, ca_rules_path: str | None = None, volte_rules_path: str | None = None, enable_volte_assessment: bool = True, benchmark_combos: list[str] | None = None, ) -> dict[str, pd.DataFrame]: sheets = _empty_sheets() warnings: list[dict[str, Any]] = [] tokens = _tokenize(content) ue_id = _build_ue_id( source_name, content, message_index=message_index, message_count=message_count ) if not tokens: warnings.append( { "ue_id": ue_id, "source_file": source_name, "severity": "error", "message": "No tokens found in input text.", } ) sheets["Parse_Warnings"] = pd.DataFrame(warnings) return sheets open_count = sum(1 for token in tokens if token == "{") close_count = sum(1 for token in tokens if token == "}") if open_count != close_count: warnings.append( { "ue_id": ue_id, "source_file": source_name, "severity": "warning", "message": f"Brace count mismatch: open={open_count}, close={close_count}. Parsing will continue.", } ) nodes, _ = _parse_entries(tokens) kv_records, leaf_records, normalized_paths = _flatten_tree( nodes, ue_id=ue_id, source_file=source_name ) rats = _detect_rats(normalized_paths, kv_records) explicit_release = _extract_release_explicit(kv_records) categories = _extract_categories(kv_records) bands_lte = _extract_bands_lte(kv_records) bands_utra = _extract_bands_utra(kv_records) bands_geran = _extract_bands_geran(kv_records) bands_nr = _extract_bands_nr(kv_records) ca_df = _extract_ca_combinations(nodes, ue_id=ue_id, source_file=source_name) ca_assessment_df = _build_ca_assessment_df(ca_df, ca_rules_path=ca_rules_path) endc_df = _extract_endc_mrdc( kv_records, ca_assessment_df=ca_assessment_df, ue_id=ue_id, source_file=source_name ) features_df = _extract_features(kv_records) evidence = ( normalized_paths + [row["leaf_value"] for row in leaf_records] + [f"{row['key_normalized']}={str(row['value']).lower()}" for row in kv_records] ) release_result = infer_release( {"explicit_release": explicit_release, "evidence": evidence}, rules_path=rules_path, ) release_df = _build_release_inference_df( ue_id=ue_id, source_file=source_name, release_result=release_result ) volte_df = _build_volte_assessment_df( ue_id=ue_id, source_file=source_name, evidence=evidence, volte_rules_path=volte_rules_path, enabled=enable_volte_assessment, ) benchmark_df = _build_benchmark_ca_diff( ue_id=ue_id, source_file=source_name, ca_assessment_df=ca_assessment_df, benchmark_combos=benchmark_combos, ) if bands_lte.empty: warnings.append( { "ue_id": ue_id, "source_file": source_name, "severity": "warning", "message": "No LTE bands detected in this message.", } ) if not explicit_release and not release_result.get("inferred_release"): warnings.append( { "ue_id": ue_id, "source_file": source_name, "severity": "warning", "message": "Release could not be inferred from current rules.", } ) volte_status = None volte_score = None if not volte_df.empty: volte_status = volte_df.iloc[0]["volte_status"] volte_score = volte_df.iloc[0]["volte_score"] benchmark_mismatch_count = int( len( benchmark_df[ benchmark_df["status"].isin(["missing_in_benchmark", "missing_in_log"]) ].index ) ) if not benchmark_df.empty else 0 normalized_combos = ( ca_assessment_df["combo_norm"].astype(str).str.strip() if not ca_assessment_df.empty else pd.Series(dtype=str) ) normalized_combos = normalized_combos[normalized_combos != ""] summary_row = { "ue_id": ue_id, "source_file": source_name, "message_index": message_index, "rats_detected": ",".join(rats), "release_explicit": release_result.get("explicit_release"), "release_inferred": release_result.get("inferred_release"), "release_final": release_result.get("final_release"), "release_confidence": release_result.get("confidence"), "volte_status": volte_status, "volte_score": volte_score, "lte_band_count": int(bands_lte["band_lte"].nunique()) if not bands_lte.empty else 0, "utra_band_count": int(bands_utra["band_utra"].nunique()) if not bands_utra.empty else 0, "geran_band_count": int(bands_geran["band_geran"].nunique()) if not bands_geran.empty else 0, "nr_band_count": int(bands_nr["band_nr"].nunique()) if not bands_nr.empty else 0, "ca_combination_count": int(len(ca_df.index)), "ca_combo_normalized_count": int(normalized_combos.nunique()) if not ca_assessment_df.empty else 0, "endc_mrdc_item_count": int(len(endc_df.index)), "feature_count": int(len(features_df.index)), "benchmark_mismatch_count": benchmark_mismatch_count, "warning_count": int(len(warnings)), "ue_categories": ", ".join(f"{key}={value}" for key, value in categories.items()), "parser_profile": "decoded_tree_txt", } sheets["Summary"] = pd.DataFrame([summary_row]) sheets["Bands_LTE"] = bands_lte sheets["Bands_UTRA"] = bands_utra sheets["Bands_GERAN"] = bands_geran sheets["Bands_NR"] = bands_nr sheets["CA_Combinations"] = ca_df sheets["CA_Assessment"] = ca_assessment_df sheets["ENDC_MRDC"] = endc_df sheets["Features"] = features_df sheets["VoLTE_Assessment"] = volte_df sheets["Release_Inference"] = release_df sheets["Benchmark_CA_Diff"] = benchmark_df sheets["Parse_Warnings"] = pd.DataFrame(warnings) return sheets def _merge_sheet_dicts(sheet_sets: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]: merged = _empty_sheets() for sheet_name in SHEET_ORDER: dfs = [ sheets[sheet_name] for sheets in sheet_sets if sheet_name in sheets and not sheets[sheet_name].empty ] merged[sheet_name] = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame() return merged def parse_uecap_text( content: str, source_name: str, rules_path: str | None = None, ca_rules_path: str | None = None, volte_rules_path: str | None = None, enable_volte_assessment: bool = True, benchmark_combos: list[str] | None = None, ) -> dict[str, pd.DataFrame]: messages = _split_messages(content) parsed_messages: list[dict[str, pd.DataFrame]] = [] message_count = len(messages) for message_index, message_text in messages: parsed_messages.append( _parse_single_message( content=message_text, source_name=source_name, message_index=message_index, message_count=message_count, rules_path=rules_path, ca_rules_path=ca_rules_path, volte_rules_path=volte_rules_path, enable_volte_assessment=enable_volte_assessment, benchmark_combos=benchmark_combos, ) ) return _merge_sheet_dicts(parsed_messages) def parse_uecap_files( files: list[tuple[str, bytes]], rules_path: str | None = None, ca_rules_path: str | None = None, volte_rules_path: str | None = None, enable_volte_assessment: bool = True, benchmark_combos: list[str] | None = None, ) -> dict[str, pd.DataFrame]: parsed_files: list[dict[str, pd.DataFrame]] = [] for source_name, content in files: text = _safe_decode(content) parsed_files.append( parse_uecap_text( text, source_name=source_name, rules_path=rules_path, ca_rules_path=ca_rules_path, volte_rules_path=volte_rules_path, enable_volte_assessment=enable_volte_assessment, benchmark_combos=benchmark_combos, ) ) return _merge_sheet_dicts(parsed_files) def to_excel_bytes(sheets: dict[str, pd.DataFrame]) -> bytes: buffer = io.BytesIO() with pd.ExcelWriter(buffer, engine="xlsxwriter") as writer: for sheet_name in SHEET_ORDER: df = sheets.get(sheet_name, pd.DataFrame()) if df is None or df.empty: pd.DataFrame({"info": ["No data extracted for this sheet."]}).to_excel( writer, index=False, sheet_name=sheet_name[:31] ) else: df.to_excel(writer, index=False, sheet_name=sheet_name[:31]) return buffer.getvalue()