| import hashlib |
| import io |
| import json |
| import re |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any |
|
|
| import pandas as pd |
|
|
|
|
# Names of the output sheets, in the order they are listed here. Used to
# build the initial name -> empty DataFrame mapping.
SHEET_ORDER = [
    # General overview and per-RAT band sheets.
    "Summary",
    "Bands_LTE",
    "Bands_UTRA",
    "Bands_GERAN",
    "Bands_NR",
    # Carrier-aggregation and dual-connectivity sheets.
    "CA_Combinations",
    "CA_Assessment",
    "ENDC_MRDC",
    # Feature, assessment and diagnostic sheets.
    "Features",
    "VoLTE_Assessment",
    "Release_Inference",
    "Benchmark_CA_Diff",
    "Parse_Warnings",
]
|
|
# A line that begins (after optional whitespace) with an HH:MM:SS.mmm
# timestamp and later contains "RRC Signaling" (case-insensitive).
_MESSAGE_START_RE = re.compile(
    r"^\s*\d{2}:\d{2}:\d{2}\.\d{3}.*RRC Signaling",
    re.IGNORECASE,
)
# Lower-cased values that are classified as flags rather than text/numbers.
_BOOL_VALUES = {
    "true",
    "false",
    "supported",
    "notsupported",
    "present",
    "absent",
}
# A value made only of 0/1 characters, at least eight of them.
_BITSTRING_RE = re.compile(r"^[01]{8,}$")
|
|
|
|
@dataclass
class Node:
    """A labelled entry of the parsed tree together with its nested entries."""

    # Label text of the entry, stored exactly as it was tokenized.
    name: str
    # Entries nested under this one; an empty list marks a leaf.
    children: list["Node"]
|
|
|
|
def _empty_sheets() -> dict[str, pd.DataFrame]:
    """Return a new mapping holding one distinct empty DataFrame per sheet name.

    Returns:
        A dict keyed by the entries of ``SHEET_ORDER`` (in that order).
    """
    sheets: dict[str, pd.DataFrame] = {}
    for sheet_name in SHEET_ORDER:
        # A separate frame per key so later assignments never alias.
        sheets[sheet_name] = pd.DataFrame()
    return sheets
|
|
|
|
def _normalize_label(label: str) -> str:
    """Canonicalize a label for case- and spacing-insensitive comparison.

    The label is trimmed, internal whitespace runs become one space, a
    trailing `` #<digits>`` suffix is removed, and the result is lower-cased.

    Args:
        label: Raw label text.

    Returns:
        The normalized label.
    """
    single_spaced = re.sub(r"\s+", " ", label.strip())
    without_index = re.sub(r"\s+#\d+$", "", single_spaced)
    return without_index.lower()
|
|
|
|
def _release_sort_key(release: str) -> int:
    """Return the first integer found in *release*, or -1 when there is none.

    Args:
        release: Release label; it is converted with ``str`` before searching.

    Returns:
        The first run of digits as an int, or -1 if no digit is present.
    """
    found = re.search(r"(\d+)", str(release))
    if found is None:
        return -1
    return int(found.group(1))
|
|
|
|
def _safe_decode(content: bytes) -> str:
    """Decode raw bytes to text, trying progressively more permissive encodings.

    UTF-8 is attempted first, then cp1252. Latin-1 is the final fallback
    because it maps every byte value and therefore cannot fail.

    Args:
        content: Raw bytes to decode.

    Returns:
        The decoded text.
    """
    # cp1252 has to be tried before latin-1: latin-1 accepts any byte
    # sequence, so an encoding listed after it would never be reached.
    for encoding in ("utf-8", "cp1252"):
        try:
            return content.decode(encoding)
        except UnicodeDecodeError:
            continue
    return content.decode("latin-1")
|
|
|
|
def _build_ue_id(
    source_name: str, content: str, message_index: int, message_count: int
) -> str:
    """Build an identifier from the source file stem and a content digest.

    Args:
        source_name: File name or path; only its stem is used ("uecap" when
            the stem is empty).
        content: Message text; whitespace runs are collapsed before hashing.
        message_index: Index of the message, embedded as ``_m<index>`` only
            when ``message_count`` is greater than one.
        message_count: Number of messages found in the source.

    Returns:
        ``<stem>[_m<index>]_<first 10 hex chars of the SHA-1 digest>``.
    """
    stem = Path(source_name).stem
    prefix = stem if stem else "uecap"
    canonical = re.sub(r"\s+", " ", content).strip()
    payload = canonical.encode("utf-8", errors="ignore")
    short_hash = hashlib.sha1(payload).hexdigest()[:10]
    index_part = f"_m{message_index}" if message_count > 1 else ""
    return f"{prefix}{index_part}_{short_hash}"
|
|
|
|
def _split_messages(content: str) -> list[tuple[int, str]]:
    """Split text into chunks, each starting at a message-start line.

    Lines before the first message-start line are not part of any chunk.

    Args:
        content: Full text to split.

    Returns:
        ``(1-based position, chunk text)`` tuples. When no start line is
        found, or every chunk is blank, ``[(1, content)]`` is returned.
    """
    all_lines = content.splitlines()
    starts = [
        position
        for position, text in enumerate(all_lines)
        if _MESSAGE_START_RE.search(text)
    ]
    if not starts:
        return [(1, content)]

    # Each chunk runs from one start line up to the next (or the end of input).
    boundaries = starts + [len(all_lines)]
    messages: list[tuple[int, str]] = []
    for number, (begin, stop) in enumerate(zip(boundaries, boundaries[1:]), start=1):
        body = "\n".join(all_lines[begin:stop]).strip()
        if body:
            messages.append((number, body))
    return messages if messages else [(1, content)]
|
|
|
|
def _tokenize(content: str) -> list[str]:
    """Break text into tokens: each brace, and each text run between braces.

    Processing is line by line; tokens are stripped and empty ones dropped,
    so a token never spans a line break.

    Args:
        content: Text to tokenize.

    Returns:
        Tokens in order of appearance.
    """
    tokens: list[str] = []
    for raw_line in content.splitlines():
        # The capturing group keeps "{" and "}" as tokens of their own.
        pieces = (piece.strip() for piece in re.split(r"(\{|\})", raw_line.strip()))
        tokens.extend(piece for piece in pieces if piece)
    return tokens
|
|
|
|
def _parse_entries(tokens: list[str], start: int = 0) -> tuple[list[Node], int]:
    """Recursively turn a token stream into a list of ``Node`` objects.

    A label followed by ``{`` owns the entries up to the matching ``}``. A
    ``{`` with no preceding label has its entries spliced into the current
    level. A ``}`` ends the current level.

    Args:
        tokens: Token list, with braces as separate tokens.
        start: Index of the first token to consume.

    Returns:
        The parsed nodes and the index just past the last consumed token.
    """
    parsed: list[Node] = []
    pos = start
    total = len(tokens)
    while pos < total:
        current = tokens[pos]
        if current == "}":
            # End of this nesting level; the caller resumes after the brace.
            return parsed, pos + 1
        if current == "{":
            # Unlabelled group: flatten its entries into this level.
            anonymous, pos = _parse_entries(tokens, pos + 1)
            parsed += anonymous
            continue

        pos += 1
        if pos < total and tokens[pos] == "{":
            body, pos = _parse_entries(tokens, pos + 1)
        else:
            body = []
        parsed.append(Node(name=current, children=body))
    return parsed, pos
|
|
|
|
def _flatten_tree(
    nodes: list[Node], ue_id: str, source_file: str
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[str]]:
    """Walk the tree depth-first and collect key/value, leaf and path records.

    A node with exactly one child that is itself a leaf yields a key/value
    record (node name as key, child name as value). A node with no children
    yields a leaf record. Every visited node contributes its normalized
    dotted path.

    Args:
        nodes: Root nodes of the parsed tree.
        ue_id: Identifier copied into every record.
        source_file: Source name copied into every record.

    Returns:
        ``(kv_records, leaf_records, normalized_paths)``.
    """
    kv_records: list[dict[str, Any]] = []
    leaf_records: list[dict[str, Any]] = []
    paths: list[str] = []

    def visit(node: Node, parents: list[str]) -> None:
        trail = [*parents, node.name]
        raw_path = ".".join(trail)
        norm_path = ".".join(map(_normalize_label, trail))
        paths.append(norm_path)

        # Fields shared by both record kinds, in the emitted column order.
        common = {
            "ue_id": ue_id,
            "source_file": source_file,
            "path": raw_path,
            "path_normalized": norm_path,
        }
        kids = node.children
        if not kids:
            leaf_records.append({**common, "leaf_value": node.name.strip()})
            return

        if len(kids) == 1 and not kids[0].children:
            kv_records.append(
                {
                    **common,
                    "key": node.name.strip(),
                    "key_normalized": _normalize_label(node.name),
                    "value": kids[0].name.strip(),
                }
            )

        for kid in kids:
            visit(kid, trail)

    for root in nodes:
        visit(root, [])
    return kv_records, leaf_records, paths
|
|
|
|
def _collect_subtree_kv(node: Node) -> list[dict[str, str]]:
    """Collect key/value records from *node* and all of its descendants.

    A record is produced for every node that has exactly one child and whose
    child has no children; the child's name is the value.

    Args:
        node: Root of the subtree to scan.

    Returns:
        Records with ``path``, ``key``, ``key_normalized`` and ``value``,
        in depth-first order. Paths start at *node*.
    """
    collected: list[dict[str, str]] = []

    def visit(current: Node, parents: list[str]) -> None:
        trail = [*parents, current.name]
        kids = current.children
        if len(kids) == 1 and not kids[0].children:
            collected.append(
                {
                    "path": ".".join(trail),
                    "key": current.name.strip(),
                    "key_normalized": _normalize_label(current.name),
                    "value": kids[0].name.strip(),
                }
            )
        for kid in kids:
            visit(kid, trail)

    visit(node, [])
    return collected
|
|
|
|
def _detect_rats(paths: list[str], kv_records: list[dict[str, Any]]) -> list[str]:
    """Infer which radio access technologies are mentioned in the evidence.

    The normalized paths and ``key=value`` pairs are joined into one string
    that is searched for per-RAT marker substrings.

    Args:
        paths: Normalized dotted paths.
        kv_records: Records providing ``key_normalized`` and ``value``.

    Returns:
        A subset of ``["LTE", "WCDMA", "GSM", "NR"]``, in that order.
    """
    evidence = " ".join(
        paths + [f"{rec['key_normalized']}={str(rec['value']).lower()}" for rec in kv_records]
    )
    rats: list[str] = []
    if "eutra" in evidence:
        rats.append("LTE")
    # A plain "utra" substring test also fires on every "eutra" token, which
    # would report WCDMA for any LTE capability. Require that the match is
    # not preceded by "e".
    if re.search(r"(?<!e)utra", evidence) or "wcdma" in evidence:
        rats.append("WCDMA")
    if "geran" in evidence or re.search(r"\bgsm\d*", evidence):
        rats.append("GSM")
    nr_markers = (
        "supportedbandlistnr",
        "featureset",
        "mrdc",
        "endc",
        "nr-rat",
        "bandnr",
        " rat-type.nr",
    )
    if any(marker in evidence for marker in nr_markers):
        rats.append("NR")
    return rats
|
|
|
|
| def _extract_release_explicit(kv_records: list[dict[str, Any]]) -> str | None: |
| for record in kv_records: |
| if record["key_normalized"] == "accessstratumrelease": |
| value = str(record["value"]).strip().lower() |
| if value: |
| return value |
| return None |
|
|
|
|
def _extract_bands_lte(kv_records: list[dict[str, Any]]) -> pd.DataFrame:
    """Build the LTE band table from records whose key contains "bandeutra".

    Only values consisting solely of digits are kept; they are stored as ints.

    Args:
        kv_records: Key/value records.

    Returns:
        DataFrame with columns ``ue_id``, ``source_file``, ``band_lte``,
        ``raw_key``, ``path``, de-duplicated on (ue_id, band_lte, raw_key).
    """
    output_columns = ["ue_id", "source_file", "band_lte", "raw_key", "path"]
    rows: list[dict[str, Any]] = []
    for record in kv_records:
        if "bandeutra" in record["key_normalized"]:
            digits = str(record["value"]).strip()
            if re.fullmatch(r"\d+", digits):
                rows.append(
                    {
                        "ue_id": record["ue_id"],
                        "source_file": record["source_file"],
                        "band_lte": int(digits),
                        "raw_key": record["key"],
                        "path": record["path"],
                    }
                )
    if rows:
        return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_lte", "raw_key"])
    return pd.DataFrame(columns=output_columns)
|
|
|
|
def _extract_bands_utra(kv_records: list[dict[str, Any]]) -> pd.DataFrame:
    """Build the UTRA band table from records whose key mentions a UTRA band.

    A record qualifies when its normalized key contains
    "supportedbandutra-fdd" or "bandutra" and its stripped value is non-empty.

    Args:
        kv_records: Key/value records.

    Returns:
        DataFrame with columns ``ue_id``, ``source_file``, ``band_utra``,
        ``raw_key``, ``path``, de-duplicated on (ue_id, band_utra, raw_key).
    """
    output_columns = ["ue_id", "source_file", "band_utra", "raw_key", "path"]
    markers = ("supportedbandutra-fdd", "bandutra")
    rows: list[dict[str, Any]] = []
    for record in kv_records:
        key_norm = record["key_normalized"]
        band_text = str(record["value"]).strip()
        if band_text and any(marker in key_norm for marker in markers):
            rows.append(
                {
                    "ue_id": record["ue_id"],
                    "source_file": record["source_file"],
                    "band_utra": band_text,
                    "raw_key": record["key"],
                    "path": record["path"],
                }
            )
    if rows:
        return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_utra", "raw_key"])
    return pd.DataFrame(columns=output_columns)
|
|
|
|
def _extract_bands_geran(kv_records: list[dict[str, Any]]) -> pd.DataFrame:
    """Build the GERAN band table from records whose key mentions a GERAN band.

    A record qualifies when its normalized key contains "supportedbandgeran"
    or "bandgeran" and its stripped value is non-empty.

    Args:
        kv_records: Key/value records.

    Returns:
        DataFrame with columns ``ue_id``, ``source_file``, ``band_geran``,
        ``raw_key``, ``path``, de-duplicated on (ue_id, band_geran, raw_key).
    """
    output_columns = ["ue_id", "source_file", "band_geran", "raw_key", "path"]
    markers = ("supportedbandgeran", "bandgeran")
    rows: list[dict[str, Any]] = []
    for record in kv_records:
        key_norm = record["key_normalized"]
        band_text = str(record["value"]).strip()
        if band_text and any(marker in key_norm for marker in markers):
            rows.append(
                {
                    "ue_id": record["ue_id"],
                    "source_file": record["source_file"],
                    "band_geran": band_text,
                    "raw_key": record["key"],
                    "path": record["path"],
                }
            )
    if rows:
        return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_geran", "raw_key"])
    return pd.DataFrame(columns=output_columns)
|
|
|
|
def _extract_bands_nr(kv_records: list[dict[str, Any]]) -> pd.DataFrame:
    """Build the NR band table from records whose key mentions an NR band.

    A record qualifies when its normalized key contains "bandnr",
    "supportedbandnr" or "supportedbandlistnr". The stripped value is stored
    as-is (no emptiness or numeric check is applied).

    Args:
        kv_records: Key/value records.

    Returns:
        DataFrame with columns ``ue_id``, ``source_file``, ``band_nr``,
        ``raw_key``, ``path``, de-duplicated on (ue_id, band_nr, raw_key).
    """
    output_columns = ["ue_id", "source_file", "band_nr", "raw_key", "path"]
    markers = ("bandnr", "supportedbandnr", "supportedbandlistnr")
    rows: list[dict[str, Any]] = []
    for record in kv_records:
        key_norm = record["key_normalized"]
        band_text = str(record["value"]).strip()
        if any(marker in key_norm for marker in markers):
            rows.append(
                {
                    "ue_id": record["ue_id"],
                    "source_file": record["source_file"],
                    "band_nr": band_text,
                    "raw_key": record["key"],
                    "path": record["path"],
                }
            )
    if rows:
        return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_nr", "raw_key"])
    return pd.DataFrame(columns=output_columns)
|
|
|
|
def _extract_component_from_band_parameters(node: Node) -> dict[str, Any]:
    """Summarize one band-parameters subtree as a component dict.

    For each field the first matching key/value record of the subtree wins:
    ``band_lte`` from a key starting with "bandeutra" with an all-digit
    value, ``band_nr`` from a key containing "bandnr" with a non-empty value,
    and the UL/DL classes from keys containing "ca-bandwidthclassul" /
    "ca-bandwidthclassdl".

    Args:
        node: Root of the band-parameters subtree.

    Returns:
        Dict with ``component_id``, ``band_lte``, ``band_nr``, ``ul_class``
        and ``dl_class``; fields with no matching record are ``None``.
    """
    pairs = _collect_subtree_kv(node)

    def first_class(marker: str) -> str | None:
        for pair in pairs:
            if marker in pair["key_normalized"]:
                return str(pair["value"]).strip().lower()
        return None

    lte_band = None
    for pair in pairs:
        if pair["key_normalized"].startswith("bandeutra") and re.fullmatch(
            r"\d+", pair["value"]
        ):
            lte_band = int(pair["value"])
            break

    nr_band = None
    for pair in pairs:
        nr_text = str(pair["value"]).strip()
        if "bandnr" in pair["key_normalized"] and nr_text:
            nr_band = nr_text
            break

    return {
        "component_id": node.name,
        "band_lte": lte_band,
        "band_nr": nr_band,
        "ul_class": first_class("ca-bandwidthclassul"),
        "dl_class": first_class("ca-bandwidthclassdl"),
    }
|
|
|
|
def _extract_ca_combinations(
    nodes: list[Node], ue_id: str, source_file: str
) -> pd.DataFrame:
    """Collect one row per band-combination node found anywhere in the tree.

    A node whose normalized name starts with "bandcombinationparameters" is a
    combination; its children whose normalized name starts with
    "bandparameters" are its components. The combination scope is the name of
    the nearest ancestor whose normalized name starts with
    "supportedbandcombination" (empty string when there is none).

    Args:
        nodes: Root nodes of the parsed tree.
        ue_id: Identifier copied into every row.
        source_file: Source name copied into every row.

    Returns:
        DataFrame with one row per combination, de-duplicated on the
        identifying columns; an empty frame with the same columns when no
        combination is found.
    """
    columns = [
        "ue_id",
        "source_file",
        "combination_id",
        "combination_scope",
        "lte_bands",
        "nr_bands",
        "ul_classes",
        "dl_classes",
        "component_count",
        "components_json",
    ]
    rows: list[dict[str, Any]] = []

    def class_tokens(components: list[dict[str, Any]], field: str) -> list[str]:
        # Components always carry the class keys, holding None when the class
        # is absent. str(None) is the truthy string "None", so absent classes
        # must be filtered out before converting to text.
        tokens = {
            str(comp.get(field) or "").strip().lower() for comp in components
        }
        tokens.discard("")
        return sorted(tokens)

    def walk(node: Node, ancestors: list[Node]) -> None:
        key = _normalize_label(node.name)
        if key.startswith("bandcombinationparameters"):
            # Nearest enclosing "supportedbandcombination*" ancestor, if any.
            combo_scope = next(
                (
                    ancestor.name
                    for ancestor in reversed(ancestors)
                    if _normalize_label(ancestor.name).startswith("supportedbandcombination")
                ),
                "",
            )
            components = [
                _extract_component_from_band_parameters(child)
                for child in node.children
                if _normalize_label(child.name).startswith("bandparameters")
            ]

            bands_lte = sorted(
                {comp["band_lte"] for comp in components if comp.get("band_lte") is not None}
            )
            bands_nr = sorted(
                {str(comp["band_nr"]) for comp in components if comp.get("band_nr")}
            )
            # Count a component when it carries any band, using the same
            # "is not None" test for LTE as the band list above.
            component_count = sum(
                1
                for comp in components
                if comp.get("band_lte") is not None or comp.get("band_nr")
            )

            rows.append(
                {
                    "ue_id": ue_id,
                    "source_file": source_file,
                    "combination_id": node.name,
                    "combination_scope": combo_scope,
                    "lte_bands": ",".join(f"B{band}" for band in bands_lte),
                    "nr_bands": ",".join(bands_nr),
                    "ul_classes": ",".join(class_tokens(components, "ul_class")),
                    "dl_classes": ",".join(class_tokens(components, "dl_class")),
                    "component_count": component_count,
                    "components_json": json.dumps(components, ensure_ascii=False),
                }
            )

        for child in node.children:
            walk(child, ancestors + [node])

    for root in nodes:
        walk(root, [])

    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows).drop_duplicates(
        subset=[
            "ue_id",
            "combination_id",
            "combination_scope",
            "lte_bands",
            "nr_bands",
            "ul_classes",
            "dl_classes",
        ]
    )
|
|
|
|
def _load_json_file(path: Path) -> dict[str, Any]:
    """Read and parse a UTF-8 JSON file, or return ``{}`` if it does not exist.

    Args:
        path: Location of the JSON file.

    Returns:
        The parsed JSON content, or an empty dict when the path is missing.
    """
    if path.exists():
        raw_text = path.read_text(encoding="utf-8")
        return json.loads(raw_text)
    return {}
|
|
|
|
def _load_ca_rules(ca_rules_path: str | None = None) -> dict[str, Any]:
    """Load the CA rule settings, filling in defaults for missing keys.

    Args:
        ca_rules_path: Optional path to a rules JSON file. When falsy, the
            file ``data/uecap_ca_rules.json`` one directory above this
            module's directory is used.

    Returns:
        Dict with ``allowed_classes`` (lower-cased strings, default "a".."f")
        and ``prefer_class_order`` (lower-cased, default "dl_then_ul").
    """
    bundled = Path(__file__).resolve().parents[1] / "data" / "uecap_ca_rules.json"
    rules_file = bundled if not ca_rules_path else Path(ca_rules_path)
    raw = _load_json_file(rules_file)

    class_list = raw.get("allowed_classes", ["a", "b", "c", "d", "e", "f"])
    order = raw.get("prefer_class_order", "dl_then_ul")
    return {
        "allowed_classes": [str(entry).lower() for entry in class_list],
        "prefer_class_order": str(order).lower(),
    }
|
|
|
|
def _format_nr_band(raw_band: str) -> str:
    """Format an NR band label in upper case, prefixing bare numbers with "N".

    Args:
        raw_band: Band text; converted with ``str`` and stripped first.

    Returns:
        ``"N<digits>"`` for an all-digit input, otherwise the upper-cased
        text (which is the empty string for blank input).
    """
    band = str(raw_band).strip().lower()
    if re.fullmatch(r"\d+", band):
        return f"N{band}"
    # Covers blank input, already-prefixed "n.." values and anything else.
    return band.upper()
|
|
|
|
| def _build_ca_assessment_df(ca_df: pd.DataFrame, ca_rules_path: str | None = None) -> pd.DataFrame: |
| columns = [ |
| "ue_id", |
| "source_file", |
| "combination_id", |
| "combination_scope", |
| "combo_norm", |
| "component_count", |
| "dl_class_set", |
| "ul_class_set", |
| "combo_type", |
| "consistency_status", |
| "remarks", |
| ] |
| if ca_df is None or ca_df.empty: |
| return pd.DataFrame(columns=columns) |
|
|
| rules = _load_ca_rules(ca_rules_path=ca_rules_path) |
| allowed_classes = set(rules["allowed_classes"]) |
| prefer_dl = rules["prefer_class_order"] == "dl_then_ul" |
|
|
| rows: list[dict[str, Any]] = [] |
| for _, combo in ca_df.iterrows(): |
| components = json.loads(str(combo.get("components_json") or "[]")) |
| parts: list[str] = [] |
| missing_class_count = 0 |
| unknown_class_count = 0 |
| has_lte = False |
| has_nr = False |
| dl_set: set[str] = set() |
| ul_set: set[str] = set() |
|
|
| for comp in components: |
| band_label = None |
| band_lte = comp.get("band_lte") |
| band_nr = comp.get("band_nr") |
| dl_class = str(comp.get("dl_class") or "").strip().lower() |
| ul_class = str(comp.get("ul_class") or "").strip().lower() |
|
|
| if band_lte is not None: |
| band_label = f"B{band_lte}" |
| has_lte = True |
| elif band_nr: |
| band_label = _format_nr_band(str(band_nr)) |
| has_nr = True |
|
|
| if not band_label: |
| continue |
|
|
| dl_set.update({dl_class.upper()} if dl_class else set()) |
| ul_set.update({ul_class.upper()} if ul_class else set()) |
|
|
| selected_class = dl_class if prefer_dl else ul_class |
| if not selected_class: |
| selected_class = ul_class if prefer_dl else dl_class |
|
|
| class_suffix = "" |
| if selected_class: |
| if selected_class in allowed_classes: |
| class_suffix = selected_class.upper() |
| else: |
| unknown_class_count += 1 |
| else: |
| missing_class_count += 1 |
|
|
| parts.append(f"{band_label}{class_suffix}") |
|
|
| component_count = len(parts) |
| if component_count == 0: |
| status = "invalid" |
| combo_type = "Unknown" |
| elif has_lte and has_nr: |
| combo_type = "MR-DC/EN-DC candidate" |
| status = ( |
| "valid" |
| if missing_class_count == 0 and unknown_class_count == 0 |
| else "partially_valid" |
| ) |
| elif has_lte and component_count > 1: |
| combo_type = "LTE CA" |
| status = ( |
| "valid" |
| if missing_class_count == 0 and unknown_class_count == 0 |
| else "partially_valid" |
| ) |
| elif has_nr and component_count > 1: |
| combo_type = "NR CA" |
| status = ( |
| "valid" |
| if missing_class_count == 0 and unknown_class_count == 0 |
| else "partially_valid" |
| ) |
| else: |
| combo_type = "Single/Unknown" |
| status = "partially_valid" |
|
|
| remarks_parts: list[str] = [] |
| if missing_class_count: |
| remarks_parts.append(f"{missing_class_count} component(s) without bandwidth class.") |
| if unknown_class_count: |
| remarks_parts.append(f"{unknown_class_count} component(s) with unknown class token.") |
| if component_count == 0: |
| remarks_parts.append("No valid CA component extracted.") |
|
|
| rows.append( |
| { |
| "ue_id": combo.get("ue_id"), |
| "source_file": combo.get("source_file"), |
| "combination_id": combo.get("combination_id"), |
| "combination_scope": combo.get("combination_scope"), |
| "combo_norm": "+".join(parts), |
| "component_count": component_count, |
| "dl_class_set": ",".join(sorted(dl_set)), |
| "ul_class_set": ",".join(sorted(ul_set)), |
| "combo_type": combo_type, |
| "consistency_status": status, |
| "remarks": " ".join(remarks_parts), |
| } |
| ) |
|
|
| return pd.DataFrame(rows, columns=columns) |
|
|
|
|
def _extract_endc_mrdc(
    kv_records: list[dict[str, Any]], ca_assessment_df: pd.DataFrame, ue_id: str, source_file: str
) -> pd.DataFrame:
    """Collect dual-connectivity related items from records and assessed combos.

    Key/value records whose normalized path contains one of the marker
    substrings become ``path_feature`` rows; assessed combinations typed
    "MR-DC/EN-DC candidate" become ``combo`` rows.

    Args:
        kv_records: Key/value records.
        ca_assessment_df: Assessed combinations; may be ``None`` or empty.
        ue_id: Identifier used for the ``combo`` rows.
        source_file: Source name used for the ``combo`` rows.

    Returns:
        De-duplicated DataFrame with columns ``ue_id``, ``source_file``,
        ``item_type``, ``item_key``, ``item_value``, ``path``.
    """
    output_columns = ["ue_id", "source_file", "item_type", "item_key", "item_value", "path"]
    path_markers = (
        "mrdc",
        "endc",
        "featureset",
        "supportedbandlistnr",
        "supportedbandcombinationnr",
    )

    rows: list[dict[str, Any]] = [
        {
            "ue_id": record["ue_id"],
            "source_file": record["source_file"],
            "item_type": "path_feature",
            "item_key": record["key"],
            "item_value": str(record["value"]),
            "path": record["path"],
        }
        for record in kv_records
        if any(marker in record["path_normalized"] for marker in path_markers)
    ]

    has_combos = ca_assessment_df is not None and not ca_assessment_df.empty
    if has_combos:
        for _, combo in ca_assessment_df.iterrows():
            if str(combo.get("combo_type")) != "MR-DC/EN-DC candidate":
                continue
            rows.append(
                {
                    "ue_id": ue_id,
                    "source_file": source_file,
                    "item_type": "combo",
                    "item_key": str(combo.get("combination_id", "")),
                    "item_value": str(combo.get("combo_norm", "")),
                    "path": str(combo.get("combination_scope", "")),
                }
            )

    if rows:
        return pd.DataFrame(rows).drop_duplicates()
    return pd.DataFrame(columns=output_columns)
|
|
|
|
def _extract_features(kv_records: list[dict[str, Any]]) -> pd.DataFrame:
    """Build the feature table from key/value records that are not band data.

    Records are skipped when their normalized key is "accessstratumrelease",
    starts with one of the band/bandwidth-class prefixes, or when the value
    is empty. Each kept value is classified as ``flag``, ``bitstring``,
    ``number`` or ``text``.

    Args:
        kv_records: Key/value records.

    Returns:
        De-duplicated DataFrame with columns ``ue_id``, ``source_file``,
        ``feature_name``, ``feature_value``, ``value_type``, ``path``.
    """
    output_columns = [
        "ue_id",
        "source_file",
        "feature_name",
        "feature_value",
        "value_type",
        "path",
    ]
    skipped_prefixes = (
        "bandeutra",
        "supportedbandeutra",
        "supportedbandutra",
        "supportedbandgeran",
        "supportedbandnr",
        "ca-bandwidthclass",
    )
    skipped_keys = {"accessstratumrelease"}

    rows: list[dict[str, Any]] = []
    for record in kv_records:
        key_norm = record["key_normalized"]
        text = str(record["value"]).strip()

        is_skipped_key = key_norm in skipped_keys or key_norm.startswith(skipped_prefixes)
        if is_skipped_key or not text:
            continue

        # Order matters: an all-0/1 value of 8+ chars is a bitstring, not a number.
        if text.lower() in _BOOL_VALUES:
            kind = "flag"
        elif _BITSTRING_RE.fullmatch(text):
            kind = "bitstring"
        elif re.fullmatch(r"\d+", text):
            kind = "number"
        else:
            kind = "text"

        rows.append(
            {
                "ue_id": record["ue_id"],
                "source_file": record["source_file"],
                "feature_name": record["key"],
                "feature_value": text,
                "value_type": kind,
                "path": record["path"],
            }
        )

    if rows:
        return pd.DataFrame(rows).drop_duplicates()
    return pd.DataFrame(columns=output_columns)
|
|
|
|
def _extract_categories(kv_records: list[dict[str, Any]]) -> dict[str, str]:
    """Map each raw key containing "ue-category" to its value as a string.

    Args:
        kv_records: Key/value records.

    Returns:
        Dict keyed by the raw ``key``; a later record with the same key
        overwrites the earlier value.
    """
    return {
        record["key"]: str(record["value"])
        for record in kv_records
        if "ue-category" in record["key_normalized"]
    }
|
|
|
|
def _load_release_rules(rules_path: str | None = None) -> list[dict[str, Any]]:
    """Load the list of release-inference rules from a JSON file.

    Args:
        rules_path: Optional path to a rules JSON file. When falsy, the file
            ``data/uecap_release_rules.json`` one directory above this
            module's directory is used.

    Returns:
        The value under the ``rules`` key, or ``[]`` when it is absent.
    """
    bundled = Path(__file__).resolve().parents[1] / "data" / "uecap_release_rules.json"
    rules_file = bundled if not rules_path else Path(rules_path)
    loaded = _load_json_file(rules_file)
    return loaded.get("rules", [])
|
|
|
|
def infer_release(extracted: dict[str, Any], rules_path: str | None = None) -> dict[str, Any]:
    """Determine the release from an explicit value and/or rule-based scoring.

    Each rule whose patterns match any evidence string (case-insensitively)
    adds its weight to the score of its release. The highest score wins,
    with ties broken by the larger release number. An explicit release, if
    present, overrides the inferred one and sets the confidence to 1.0.

    Args:
        extracted: Dict with optional ``explicit_release`` and ``evidence``.
        rules_path: Optional path forwarded to ``_load_release_rules``.

    Returns:
        Dict with ``explicit_release``, ``inferred_release``,
        ``final_release`` ("unknown" when neither is available),
        ``confidence`` (rounded to 4 places) and ``triggered_rules``.
    """
    explicit = extracted.get("explicit_release")
    evidence = [text for text in map(str, extracted.get("evidence", [])) if text.strip()]
    rule_set = _load_release_rules(rules_path=rules_path)

    def pattern_hits(pattern: str) -> bool:
        matcher = re.compile(pattern, re.I)
        return any(matcher.search(text) for text in evidence)

    fired: list[dict[str, Any]] = []
    tally: dict[str, float] = {}

    for rule in rule_set:
        release = str(rule.get("release", "")).strip().lower()
        patterns = [str(entry) for entry in rule.get("patterns", []) if str(entry).strip()]
        weight = float(rule.get("weight", 1.0))
        rule_id = str(rule.get("rule_id", "rule")).strip()
        description = str(rule.get("description", "")).strip()

        hits = [pattern for pattern in patterns if pattern_hits(pattern)]
        if not (hits and release):
            continue

        tally[release] = tally.get(release, 0.0) + weight
        fired.append(
            {
                "rule_id": rule_id,
                "release": release,
                "weight": weight,
                "matched_patterns": ", ".join(hits),
                "description": description,
            }
        )

    inferred = None
    confidence = 0.0
    if tally:
        # max() keeps the first of equal candidates, like a stable descending sort.
        inferred, best_score = max(
            tally.items(),
            key=lambda entry: (entry[1], _release_sort_key(entry[0])),
        )
        total = sum(tally.values())
        confidence = float(best_score / total) if total else 0.0

    if explicit:
        confidence = 1.0

    return {
        "explicit_release": explicit,
        "inferred_release": inferred,
        "final_release": explicit or inferred or "unknown",
        "confidence": round(confidence, 4),
        "triggered_rules": fired,
    }
|
|
|
|
def _load_volte_rules(volte_rules_path: str | None = None) -> dict[str, Any]:
    """Load the VoLTE assessment settings, filling in defaults for missing keys.

    Args:
        volte_rules_path: Optional path to a rules JSON file. When falsy, the
            file ``data/uecap_volte_rules.json`` one directory above this
            module's directory is used.

    Returns:
        Dict with ``thresholds``, ``missing_critical_penalty``,
        ``hard_negative_penalty``, ``hard_negative_patterns``, ``rules`` and
        ``critical_signals``.
    """
    bundled = Path(__file__).resolve().parents[1] / "data" / "uecap_volte_rules.json"
    rules_file = bundled if not volte_rules_path else Path(volte_rules_path)
    raw = _load_json_file(rules_file)

    default_thresholds = {"supported": 70, "likely": 40}
    default_negatives = [
        r"voiceoverps.*notsupported",
        r"voice-over-ps.*notsupported",
        r"srvcc.*notsupported",
    ]
    default_signals = [
        {"name": "ims", "patterns": [r"\bims\b"]},
        {"name": "voice_over_ps", "patterns": [r"voiceoverps", r"voice-over-ps"]},
        {"name": "srvcc", "patterns": [r"\bsrvcc\b"]},
    ]
    return {
        "thresholds": raw.get("thresholds", default_thresholds),
        "missing_critical_penalty": int(raw.get("missing_critical_penalty", 20)),
        "hard_negative_penalty": int(raw.get("hard_negative_penalty", 40)),
        "hard_negative_patterns": raw.get("hard_negative_patterns", default_negatives),
        "rules": raw.get("rules", []),
        "critical_signals": raw.get("critical_signals", default_signals),
    }
|
|
|
|
def assess_volte_support(
    extracted: dict[str, Any], volte_rules_path: str | None = None
) -> dict[str, Any]:
    """Score the evidence against the VoLTE rules and derive a status.

    Matching rules add their weight to the score. A penalty is subtracted
    when no rule of category ``explicit_positive`` matched, and another when
    a hard-negative pattern matched. The score is clamped to 0..100 and
    mapped to "Supported", "Likely", "Unknown" or "Not indicated"; a
    hard-negative match turns "Supported" into "Unknown".

    Args:
        extracted: Dict with an optional ``evidence`` list.
        volte_rules_path: Optional path forwarded to ``_load_volte_rules``.

    Returns:
        Dict with ``volte_status``, ``volte_score``, ``confidence``,
        ``explicit_evidence``, ``implicit_evidence``, ``missing_signals``,
        ``notes`` and ``matched_rules``.
    """
    rules = _load_volte_rules(volte_rules_path=volte_rules_path)
    thresholds = rules["thresholds"]
    evidence = [
        text.lower() for text in map(str, extracted.get("evidence", [])) if text.strip()
    ]

    def seen(pattern: str) -> bool:
        return any(re.search(pattern, text, re.I) for text in evidence)

    def usable_patterns(raw_patterns: Any) -> list[str]:
        return [str(entry) for entry in raw_patterns if str(entry).strip()]

    matched_rules: list[dict[str, Any]] = []
    score = 0.0
    explicit_positive_found = False

    for rule in rules["rules"]:
        hits = [p for p in usable_patterns(rule.get("patterns", [])) if seen(p)]
        if not hits:
            continue

        category = str(rule.get("category", "implicit_positive")).strip().lower()
        weight = float(rule.get("weight", 0))
        score += weight
        explicit_positive_found = explicit_positive_found or category == "explicit_positive"
        matched_rules.append(
            {
                "rule_id": str(rule.get("rule_id", "rule")),
                "category": category,
                "weight": weight,
                "matched_patterns": ", ".join(hits),
                "description": str(rule.get("description", "")).strip(),
            }
        )

    missing_signals: list[str] = [
        str(signal.get("name", "signal"))
        for signal in rules["critical_signals"]
        if not any(seen(p) for p in usable_patterns(signal.get("patterns", [])))
    ]

    hard_negative_matches: list[str] = [
        p for p in usable_patterns(rules.get("hard_negative_patterns", [])) if seen(p)
    ]

    if not explicit_positive_found:
        score -= float(rules["missing_critical_penalty"])
    if hard_negative_matches:
        score -= float(rules.get("hard_negative_penalty", 40))

    score = max(0.0, min(100.0, score))
    confidence = round(score / 100.0, 4)

    supported_threshold = float(thresholds.get("supported", 70))
    likely_threshold = float(thresholds.get("likely", 40))

    if explicit_positive_found and score >= supported_threshold:
        # An explicit negative indicator blocks the "Supported" verdict.
        status = "Unknown" if hard_negative_matches else "Supported"
    elif score >= likely_threshold:
        status = "Likely"
    elif score > 0:
        status = "Unknown"
    else:
        status = "Not indicated"

    by_category: dict[str, list[str]] = {"explicit_positive": [], "implicit_positive": []}
    for entry in matched_rules:
        if entry["category"] in by_category:
            by_category[entry["category"]].append(entry["rule_id"])

    notes: list[str] = []
    if not explicit_positive_found:
        notes.append("No explicit IMS/VoPS/SRVCC indicator found in this capability text.")
    if missing_signals:
        notes.append(f"Missing critical signals: {', '.join(missing_signals)}.")
    if hard_negative_matches:
        notes.append("Explicit negative VoLTE indicator found (notsupported). Supported verdict is blocked.")

    return {
        "volte_status": status,
        "volte_score": round(score, 2),
        "confidence": confidence,
        "explicit_evidence": ", ".join(by_category["explicit_positive"]),
        "implicit_evidence": ", ".join(by_category["implicit_positive"]),
        "missing_signals": ", ".join(missing_signals),
        "notes": " ".join(notes),
        "matched_rules": matched_rules,
    }
|
|
|
|
| def _build_volte_assessment_df( |
| ue_id: str, |
| source_file: str, |
| evidence: list[str], |
| volte_rules_path: str | None = None, |
| enabled: bool = True, |
| ) -> pd.DataFrame: |
| columns = [ |
| "ue_id", |
| "source_file", |
| "volte_status", |
| "volte_score", |
| "confidence", |
| "explicit_evidence", |
| "implicit_evidence", |
| "missing_signals", |
| "notes", |
| ] |
| if not enabled: |
| return pd.DataFrame(columns=columns) |
|
|
| result = assess_volte_support({"evidence": evidence}, volte_rules_path=volte_rules_path) |
| return pd.DataFrame( |
| [ |
| { |
| "ue_id": ue_id, |
| "source_file": source_file, |
| "volte_status": result.get("volte_status"), |
| "volte_score": result.get("volte_score"), |
| "confidence": result.get("confidence"), |
| "explicit_evidence": result.get("explicit_evidence"), |
| "implicit_evidence": result.get("implicit_evidence"), |
| "missing_signals": result.get("missing_signals"), |
| "notes": result.get("notes"), |
| } |
| ] |
| ) |
|
|
|
|
def _normalize_combo_for_compare(combo: str) -> str:
    """Reduce a combination label to an order-independent canonical form.

    The label is upper-cased, a leading "CA" prefix (with following
    ``_ : - whitespace`` characters) is removed, spaces and underscores are
    dropped, "-" is treated as "+", and the components are sorted.

    Args:
        combo: Combination label; converted with ``str`` first.

    Returns:
        Sorted components joined by "+", or "" when nothing remains.
    """
    label = str(combo).strip().upper()
    if not label:
        return ""
    label = re.sub(r"^CA[_:\-\s]*", "", label)
    for unwanted in (" ", "_"):
        label = label.replace(unwanted, "")
    label = re.sub(r"\++", "+", label.replace("-", "+")).strip("+")
    # After collapsing and trimming "+", every split part is non-empty,
    # and an empty label yields "" again.
    pieces = [piece for piece in label.split("+") if piece]
    return "+".join(sorted(pieces))
|
|
|
|
| def _build_benchmark_ca_diff( |
| ue_id: str, |
| source_file: str, |
| ca_assessment_df: pd.DataFrame, |
| benchmark_combos: list[str] | None = None, |
| ) -> pd.DataFrame: |
| columns = ["ue_id", "source_file", "combo_norm", "status"] |
| if not benchmark_combos: |
| return pd.DataFrame(columns=columns) |
|
|
| parser_set = { |
| _normalize_combo_for_compare(combo) |
| for combo in ca_assessment_df.get("combo_norm", pd.Series(dtype=str)).tolist() |
| if _normalize_combo_for_compare(combo) |
| } |
| benchmark_set = {_normalize_combo_for_compare(combo) for combo in benchmark_combos} |
| benchmark_set = {combo for combo in benchmark_set if combo} |
|
|
| rows: list[dict[str, Any]] = [] |
| for combo in sorted(parser_set & benchmark_set): |
| rows.append( |
| { |
| "ue_id": ue_id, |
| "source_file": source_file, |
| "combo_norm": combo, |
| "status": "exact_match", |
| } |
| ) |
| for combo in sorted(parser_set - benchmark_set): |
| rows.append( |
| { |
| "ue_id": ue_id, |
| "source_file": source_file, |
| "combo_norm": combo, |
| "status": "missing_in_benchmark", |
| } |
| ) |
| for combo in sorted(benchmark_set - parser_set): |
| rows.append( |
| { |
| "ue_id": ue_id, |
| "source_file": source_file, |
| "combo_norm": combo, |
| "status": "missing_in_log", |
| } |
| ) |
| if not rows: |
| return pd.DataFrame(columns=columns) |
| return pd.DataFrame(rows, columns=columns) |
|
|
|
|
def _build_release_inference_df(
    ue_id: str, source_file: str, release_result: dict[str, Any]
) -> pd.DataFrame:
    """Tabulate a release-inference result, one row per triggered rule.

    Args:
        ue_id: Identifier stored in every row.
        source_file: Source name stored in every row.
        release_result: Result dict with the release fields and an optional
            ``triggered_rules`` list.

    Returns:
        DataFrame repeating the overall result on every row next to the rule
        details. When no rule was triggered, a single row with empty rule
        fields is returned.
    """
    overall = {
        "ue_id": ue_id,
        "source_file": source_file,
        "explicit_release": release_result.get("explicit_release"),
        "inferred_release": release_result.get("inferred_release"),
        "final_release": release_result.get("final_release"),
        "confidence": release_result.get("confidence"),
    }
    # With no triggered rule, one placeholder makes every rule field None.
    triggered = release_result.get("triggered_rules", []) or [{}]
    rows: list[dict[str, Any]] = [
        {
            **overall,
            "rule_id": rule.get("rule_id"),
            "rule_release": rule.get("release"),
            "rule_weight": rule.get("weight"),
            "matched_patterns": rule.get("matched_patterns"),
            "rule_description": rule.get("description"),
        }
        for rule in triggered
    ]
    return pd.DataFrame(rows)
|
|
|
|
def _parse_single_message(
    content: str,
    source_name: str,
    message_index: int,
    message_count: int,
    rules_path: str | None = None,
    ca_rules_path: str | None = None,
    volte_rules_path: str | None = None,
    enable_volte_assessment: bool = True,
    benchmark_combos: list[str] | None = None,
) -> dict[str, pd.DataFrame]:
    """Parse the text of one message into the per-sheet DataFrames.

    The text is tokenized, parsed into a node tree and flattened; the
    band, CA, EN-DC/MR-DC, feature, release, VoLTE and benchmark helpers
    are then run on the result and a one-row summary is assembled.

    Args:
        content: Text of the single message to parse.
        source_name: Name of the originating file; stored as
            ``source_file`` and used when building the UE id.
        message_index: Index of this message, stored in the summary and
            forwarded to the UE id builder.
        message_count: Total number of messages from the same source,
            forwarded to the UE id builder.
        rules_path: Forwarded to ``infer_release``.
        ca_rules_path: Forwarded to the CA assessment builder.
        volte_rules_path: Forwarded to the VoLTE assessment builder.
        enable_volte_assessment: Forwarded to the VoLTE assessment
            builder as ``enabled``.
        benchmark_combos: Forwarded to the benchmark CA diff builder.

    Returns:
        A dict keyed by sheet name. When the tokenizer yields nothing,
        only ``Parse_Warnings`` is filled (with a single error row) and
        every other sheet keeps its initial value from ``_empty_sheets``.
    """
    result = _empty_sheets()
    issues: list[dict[str, Any]] = []

    token_list = _tokenize(content)
    ue_id = _build_ue_id(
        source_name,
        content,
        message_index=message_index,
        message_count=message_count,
    )

    def record_issue(severity: str, message: str) -> None:
        # Every warning row carries the same identifying columns.
        issues.append(
            {
                "ue_id": ue_id,
                "source_file": source_name,
                "severity": severity,
                "message": message,
            }
        )

    def unique_count(frame: pd.DataFrame, column: str) -> int:
        # Distinct values in `column`; an empty frame counts as zero
        # without touching the column at all.
        if frame.empty:
            return 0
        return int(frame[column].nunique())

    # Nothing to parse: report the error and return early.
    if not token_list:
        record_issue("error", "No tokens found in input text.")
        result["Parse_Warnings"] = pd.DataFrame(issues)
        return result

    # Unbalanced braces are reported but do not stop parsing.
    n_open = sum(token == "{" for token in token_list)
    n_close = sum(token == "}" for token in token_list)
    if n_open != n_close:
        record_issue(
            "warning",
            f"Brace count mismatch: open={n_open}, close={n_close}. Parsing will continue.",
        )

    nodes, _ = _parse_entries(token_list)
    kv_records, leaf_records, normalized_paths = _flatten_tree(
        nodes, ue_id=ue_id, source_file=source_name
    )
    rats = _detect_rats(normalized_paths, kv_records)
    explicit_release = _extract_release_explicit(kv_records)

    categories = _extract_categories(kv_records)
    bands_lte = _extract_bands_lte(kv_records)
    bands_utra = _extract_bands_utra(kv_records)
    bands_geran = _extract_bands_geran(kv_records)
    bands_nr = _extract_bands_nr(kv_records)
    ca_df = _extract_ca_combinations(nodes, ue_id=ue_id, source_file=source_name)
    ca_assessment_df = _build_ca_assessment_df(ca_df, ca_rules_path=ca_rules_path)
    endc_df = _extract_endc_mrdc(
        kv_records,
        ca_assessment_df=ca_assessment_df,
        ue_id=ue_id,
        source_file=source_name,
    )
    features_df = _extract_features(kv_records)

    # Evidence for the rule-based assessments: normalized paths, leaf
    # values, and lower-cased "key=value" pairs, in that order.
    leaf_values = [row["leaf_value"] for row in leaf_records]
    key_value_pairs = [
        f"{row['key_normalized']}={str(row['value']).lower()}" for row in kv_records
    ]
    evidence = normalized_paths + leaf_values + key_value_pairs

    release_result = infer_release(
        {"explicit_release": explicit_release, "evidence": evidence},
        rules_path=rules_path,
    )
    release_df = _build_release_inference_df(
        ue_id=ue_id, source_file=source_name, release_result=release_result
    )
    volte_df = _build_volte_assessment_df(
        ue_id=ue_id,
        source_file=source_name,
        evidence=evidence,
        volte_rules_path=volte_rules_path,
        enabled=enable_volte_assessment,
    )
    benchmark_df = _build_benchmark_ca_diff(
        ue_id=ue_id,
        source_file=source_name,
        ca_assessment_df=ca_assessment_df,
        benchmark_combos=benchmark_combos,
    )

    if bands_lte.empty:
        record_issue("warning", "No LTE bands detected in this message.")
    if not (explicit_release or release_result.get("inferred_release")):
        record_issue("warning", "Release could not be inferred from current rules.")

    # The VoLTE verdict is read from the first assessment row, if any.
    if volte_df.empty:
        volte_status = None
        volte_score = None
    else:
        first_volte_row = volte_df.iloc[0]
        volte_status = first_volte_row["volte_status"]
        volte_score = first_volte_row["volte_score"]

    # Benchmark rows flagged as missing on either side count as mismatches.
    if benchmark_df.empty:
        benchmark_mismatch_count = 0
    else:
        mismatch_mask = benchmark_df["status"].isin(
            ["missing_in_benchmark", "missing_in_log"]
        )
        benchmark_mismatch_count = len(benchmark_df[mismatch_mask].index)

    # Distinct non-blank normalized combos (compared as stripped strings).
    if ca_assessment_df.empty:
        normalized_combo_count = 0
    else:
        combo_text = ca_assessment_df["combo_norm"].astype(str).str.strip()
        normalized_combo_count = int(combo_text[combo_text != ""].nunique())

    summary_row = {
        "ue_id": ue_id,
        "source_file": source_name,
        "message_index": message_index,
        "rats_detected": ",".join(rats),
        "release_explicit": release_result.get("explicit_release"),
        "release_inferred": release_result.get("inferred_release"),
        "release_final": release_result.get("final_release"),
        "release_confidence": release_result.get("confidence"),
        "volte_status": volte_status,
        "volte_score": volte_score,
        "lte_band_count": unique_count(bands_lte, "band_lte"),
        "utra_band_count": unique_count(bands_utra, "band_utra"),
        "geran_band_count": unique_count(bands_geran, "band_geran"),
        "nr_band_count": unique_count(bands_nr, "band_nr"),
        "ca_combination_count": len(ca_df.index),
        "ca_combo_normalized_count": normalized_combo_count,
        "endc_mrdc_item_count": len(endc_df.index),
        "feature_count": len(features_df.index),
        "benchmark_mismatch_count": benchmark_mismatch_count,
        "warning_count": len(issues),
        "ue_categories": ", ".join(
            f"{key}={value}" for key, value in categories.items()
        ),
        "parser_profile": "decoded_tree_txt",
    }

    result.update(
        {
            "Summary": pd.DataFrame([summary_row]),
            "Bands_LTE": bands_lte,
            "Bands_UTRA": bands_utra,
            "Bands_GERAN": bands_geran,
            "Bands_NR": bands_nr,
            "CA_Combinations": ca_df,
            "CA_Assessment": ca_assessment_df,
            "ENDC_MRDC": endc_df,
            "Features": features_df,
            "VoLTE_Assessment": volte_df,
            "Release_Inference": release_df,
            "Benchmark_CA_Diff": benchmark_df,
            "Parse_Warnings": pd.DataFrame(issues),
        }
    )
    return result
|
|
|
|
def _merge_sheet_dicts(sheet_sets: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]:
    """Concatenate several per-sheet dicts into one dict of DataFrames.

    For every name in ``SHEET_ORDER`` the non-empty frames found in
    ``sheet_sets`` are stacked in input order with a fresh index. A sheet
    with no non-empty contribution becomes an empty DataFrame.

    Args:
        sheet_sets: Sheet dicts to merge; a dict may omit sheet names.

    Returns:
        A dict holding one merged DataFrame per name in ``SHEET_ORDER``.
    """
    combined = _empty_sheets()
    for name in SHEET_ORDER:
        frames = []
        for sheet_map in sheet_sets:
            # Skip absent and empty sheets so they add no rows.
            if name in sheet_map and not sheet_map[name].empty:
                frames.append(sheet_map[name])
        if frames:
            combined[name] = pd.concat(frames, ignore_index=True)
        else:
            combined[name] = pd.DataFrame()
    return combined
|
|
|
|
def parse_uecap_text(
    content: str,
    source_name: str,
    rules_path: str | None = None,
    ca_rules_path: str | None = None,
    volte_rules_path: str | None = None,
    enable_volte_assessment: bool = True,
    benchmark_combos: list[str] | None = None,
) -> dict[str, pd.DataFrame]:
    """Parse decoded text that may hold several messages.

    The text is split with ``_split_messages``, each piece is parsed by
    ``_parse_single_message``, and the per-message sheets are merged.

    Args:
        content: Full decoded text of one source.
        source_name: Name of the source, forwarded to every message parse.
        rules_path: Forwarded to the per-message parser.
        ca_rules_path: Forwarded to the per-message parser.
        volte_rules_path: Forwarded to the per-message parser.
        enable_volte_assessment: Forwarded to the per-message parser.
        benchmark_combos: Forwarded to the per-message parser.

    Returns:
        A dict of merged DataFrames keyed by sheet name.
    """
    split_messages = _split_messages(content)

    # Options that are identical for every message of this source.
    shared_options = {
        "source_name": source_name,
        "message_count": len(split_messages),
        "rules_path": rules_path,
        "ca_rules_path": ca_rules_path,
        "volte_rules_path": volte_rules_path,
        "enable_volte_assessment": enable_volte_assessment,
        "benchmark_combos": benchmark_combos,
    }

    per_message = [
        _parse_single_message(content=text, message_index=index, **shared_options)
        for index, text in split_messages
    ]
    return _merge_sheet_dicts(per_message)
|
|
|
|
def parse_uecap_files(
    files: list[tuple[str, bytes]],
    rules_path: str | None = None,
    ca_rules_path: str | None = None,
    volte_rules_path: str | None = None,
    enable_volte_assessment: bool = True,
    benchmark_combos: list[str] | None = None,
) -> dict[str, pd.DataFrame]:
    """Parse several raw files and merge their sheets.

    Each file's bytes are decoded with ``_safe_decode``, parsed with
    ``parse_uecap_text``, and the per-file results are merged.

    Args:
        files: ``(source_name, raw_bytes)`` pairs, processed in order.
        rules_path: Forwarded to ``parse_uecap_text``.
        ca_rules_path: Forwarded to ``parse_uecap_text``.
        volte_rules_path: Forwarded to ``parse_uecap_text``.
        enable_volte_assessment: Forwarded to ``parse_uecap_text``.
        benchmark_combos: Forwarded to ``parse_uecap_text``.

    Returns:
        A dict of merged DataFrames keyed by sheet name.
    """
    # Options that are identical for every file.
    shared_options = {
        "rules_path": rules_path,
        "ca_rules_path": ca_rules_path,
        "volte_rules_path": volte_rules_path,
        "enable_volte_assessment": enable_volte_assessment,
        "benchmark_combos": benchmark_combos,
    }

    per_file = [
        parse_uecap_text(_safe_decode(raw_bytes), source_name=name, **shared_options)
        for name, raw_bytes in files
    ]
    return _merge_sheet_dicts(per_file)
|
|
|
|
def to_excel_bytes(sheets: dict[str, pd.DataFrame]) -> bytes:
    """Serialize the sheets into an in-memory Excel workbook.

    One worksheet is written per name in ``SHEET_ORDER``, in that order,
    using the ``xlsxwriter`` engine. A sheet that is missing, ``None`` or
    empty is written as a one-cell placeholder instead.

    Args:
        sheets: DataFrames keyed by sheet name.

    Returns:
        The workbook contents as bytes.
    """
    placeholder = pd.DataFrame({"info": ["No data extracted for this sheet."]})
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine="xlsxwriter") as writer:
        for name in SHEET_ORDER:
            frame = sheets.get(name)
            if frame is None or frame.empty:
                frame = placeholder
            # Sheet titles are truncated to 31 characters.
            frame.to_excel(writer, index=False, sheet_name=name[:31])
    # Read the buffer only after the writer context has closed.
    return output.getvalue()
|
|