# db_query/queries/process_ue_capability.py
# DavMelchi's picture
# feat: add 3gpp ue capability parser with volte assessment
# 7b6d659
import hashlib
import io
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import pandas as pd
# Names of every output sheet. The merge step and the Excel writer both
# iterate this list, so its order is the order of sheets in the workbook.
SHEET_ORDER = [
    "Summary",
    "Bands_LTE",
    "Bands_UTRA",
    "Bands_GERAN",
    "Bands_NR",
    "CA_Combinations",
    "CA_Assessment",
    "ENDC_MRDC",
    "Features",
    "VoLTE_Assessment",
    "Release_Inference",
    "Benchmark_CA_Diff",
    "Parse_Warnings",
]
# A line that begins (after optional whitespace) with an HH:MM:SS.mmm
# timestamp and later contains "RRC Signaling" (case-insensitive). Used to
# locate the first line of each message inside a multi-message log.
_MESSAGE_START_RE = re.compile(r"^\s*\d{2}:\d{2}:\d{2}\.\d{3}.*RRC Signaling", re.I)
# Lower-cased values that make a feature row a "flag" rather than text/number.
_BOOL_VALUES = {"true", "false", "supported", "notsupported", "present", "absent"}
# A value consisting solely of 0/1 characters, at least eight of them.
_BITSTRING_RE = re.compile(r"^[01]{8,}$")
@dataclass
class Node:
    """One entry of the parsed brace tree: a label and its nested entries."""

    # Raw token text of the entry, exactly as emitted by the tokenizer.
    name: str
    # Entries parsed from the brace group that follows the label; empty for a leaf.
    children: list["Node"]
def _empty_sheets() -> dict[str, pd.DataFrame]:
    """Return a new mapping from every sheet name to its own empty DataFrame."""
    blank: dict[str, pd.DataFrame] = {}
    for sheet_name in SHEET_ORDER:
        blank[sheet_name] = pd.DataFrame()
    return blank
def _normalize_label(label: str) -> str:
text = re.sub(r"\s+", " ", label.strip())
text = re.sub(r"\s+#\d+$", "", text)
return text.lower()
def _release_sort_key(release: str) -> int:
match = re.search(r"(\d+)", str(release))
return int(match.group(1)) if match else -1
def _safe_decode(content: bytes) -> str:
for encoding in ("utf-8", "latin-1", "cp1252"):
try:
return content.decode(encoding)
except UnicodeDecodeError:
continue
return content.decode("utf-8", errors="ignore")
def _build_ue_id(
source_name: str, content: str, message_index: int, message_count: int
) -> str:
base = Path(source_name).stem or "uecap"
normalized = re.sub(r"\s+", " ", content).strip().encode("utf-8", errors="ignore")
digest = hashlib.sha1(normalized).hexdigest()[:10]
if message_count > 1:
return f"{base}_m{message_index}_{digest}"
return f"{base}_{digest}"
def _split_messages(content: str) -> list[tuple[int, str]]:
lines = content.splitlines()
indices = [idx for idx, line in enumerate(lines) if _MESSAGE_START_RE.search(line)]
if not indices:
return [(1, content)]
indices.append(len(lines))
parts: list[tuple[int, str]] = []
for i in range(len(indices) - 1):
start, end = indices[i], indices[i + 1]
chunk = "\n".join(lines[start:end]).strip()
if chunk:
parts.append((i + 1, chunk))
return parts or [(1, content)]
def _tokenize(content: str) -> list[str]:
tokens: list[str] = []
for raw_line in content.splitlines():
line = raw_line.strip()
if not line:
continue
parts = re.split(r"(\{|\})", line)
for part in parts:
part = part.strip()
if part:
tokens.append(part)
return tokens
def _parse_entries(tokens: list[str], start: int = 0) -> tuple[list[Node], int]:
entries: list[Node] = []
i = start
while i < len(tokens):
token = tokens[i]
if token == "}":
return entries, i + 1
if token == "{":
nested, i = _parse_entries(tokens, i + 1)
entries.extend(nested)
continue
label = token
i += 1
children: list[Node] = []
if i < len(tokens) and tokens[i] == "{":
children, i = _parse_entries(tokens, i + 1)
entries.append(Node(name=label, children=children))
return entries, i
def _flatten_tree(
nodes: list[Node], ue_id: str, source_file: str
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[str]]:
kv_records: list[dict[str, Any]] = []
leaf_records: list[dict[str, Any]] = []
paths: list[str] = []
def walk(node: Node, ancestors: list[str]) -> None:
current_path = ancestors + [node.name]
normalized_path = ".".join(_normalize_label(item) for item in current_path)
paths.append(normalized_path)
if not node.children:
leaf_records.append(
{
"ue_id": ue_id,
"source_file": source_file,
"path": ".".join(current_path),
"path_normalized": normalized_path,
"leaf_value": node.name.strip(),
}
)
return
if len(node.children) == 1 and not node.children[0].children:
value = node.children[0].name.strip()
kv_records.append(
{
"ue_id": ue_id,
"source_file": source_file,
"path": ".".join(current_path),
"path_normalized": normalized_path,
"key": node.name.strip(),
"key_normalized": _normalize_label(node.name),
"value": value,
}
)
for child in node.children:
walk(child, current_path)
for node in nodes:
walk(node, [])
return kv_records, leaf_records, paths
def _collect_subtree_kv(node: Node) -> list[dict[str, str]]:
records: list[dict[str, str]] = []
def walk(current: Node, ancestors: list[str]) -> None:
current_path = ancestors + [current.name]
if len(current.children) == 1 and not current.children[0].children:
records.append(
{
"path": ".".join(current_path),
"key": current.name.strip(),
"key_normalized": _normalize_label(current.name),
"value": current.children[0].name.strip(),
}
)
for child in current.children:
walk(child, current_path)
walk(node, [])
return records
def _detect_rats(paths: list[str], kv_records: list[dict[str, Any]]) -> list[str]:
evidence = " ".join(
paths + [f"{rec['key_normalized']}={str(rec['value']).lower()}" for rec in kv_records]
)
rats: list[str] = []
if "eutra" in evidence:
rats.append("LTE")
if "utra" in evidence or "wcdma" in evidence:
rats.append("WCDMA")
if "geran" in evidence or re.search(r"\bgsm\d*", evidence):
rats.append("GSM")
if any(
marker in evidence
for marker in [
"supportedbandlistnr",
"featureset",
"mrdc",
"endc",
"nr-rat",
"bandnr",
" rat-type.nr",
]
):
rats.append("NR")
return rats
def _extract_release_explicit(kv_records: list[dict[str, Any]]) -> str | None:
for record in kv_records:
if record["key_normalized"] == "accessstratumrelease":
value = str(record["value"]).strip().lower()
if value:
return value
return None
def _extract_bands_lte(kv_records: list[dict[str, Any]]) -> pd.DataFrame:
rows: list[dict[str, Any]] = []
for record in kv_records:
key = record["key_normalized"]
if "bandeutra" not in key:
continue
value = str(record["value"]).strip()
if not re.fullmatch(r"\d+", value):
continue
rows.append(
{
"ue_id": record["ue_id"],
"source_file": record["source_file"],
"band_lte": int(value),
"raw_key": record["key"],
"path": record["path"],
}
)
if not rows:
return pd.DataFrame(
columns=["ue_id", "source_file", "band_lte", "raw_key", "path"]
)
return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_lte", "raw_key"])
def _extract_bands_utra(kv_records: list[dict[str, Any]]) -> pd.DataFrame:
rows: list[dict[str, Any]] = []
for record in kv_records:
key = record["key_normalized"]
value = str(record["value"]).strip()
if "supportedbandutra-fdd" not in key and "bandutra" not in key:
continue
if not value:
continue
rows.append(
{
"ue_id": record["ue_id"],
"source_file": record["source_file"],
"band_utra": value,
"raw_key": record["key"],
"path": record["path"],
}
)
if not rows:
return pd.DataFrame(
columns=["ue_id", "source_file", "band_utra", "raw_key", "path"]
)
return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_utra", "raw_key"])
def _extract_bands_geran(kv_records: list[dict[str, Any]]) -> pd.DataFrame:
rows: list[dict[str, Any]] = []
for record in kv_records:
key = record["key_normalized"]
value = str(record["value"]).strip()
if "supportedbandgeran" not in key and "bandgeran" not in key:
continue
if not value:
continue
rows.append(
{
"ue_id": record["ue_id"],
"source_file": record["source_file"],
"band_geran": value,
"raw_key": record["key"],
"path": record["path"],
}
)
if not rows:
return pd.DataFrame(
columns=["ue_id", "source_file", "band_geran", "raw_key", "path"]
)
return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_geran", "raw_key"])
def _extract_bands_nr(kv_records: list[dict[str, Any]]) -> pd.DataFrame:
rows: list[dict[str, Any]] = []
for record in kv_records:
key = record["key_normalized"]
value = str(record["value"]).strip()
if (
"bandnr" not in key
and "supportedbandnr" not in key
and "supportedbandlistnr" not in key
):
continue
rows.append(
{
"ue_id": record["ue_id"],
"source_file": record["source_file"],
"band_nr": value,
"raw_key": record["key"],
"path": record["path"],
}
)
if not rows:
return pd.DataFrame(
columns=["ue_id", "source_file", "band_nr", "raw_key", "path"]
)
return pd.DataFrame(rows).drop_duplicates(subset=["ue_id", "band_nr", "raw_key"])
def _extract_component_from_band_parameters(node: Node) -> dict[str, Any]:
    """Summarise one ``bandParameters`` subtree as a component dict.

    For each field the first matching key/value pair in the subtree wins;
    a field with no match is ``None``.

    Returns:
        A dict with ``component_id`` (the node label), ``band_lte`` (int),
        ``band_nr`` (text), and lower-cased ``ul_class`` / ``dl_class``.
    """
    pairs = _collect_subtree_kv(node)

    def first_value(matches, convert):
        """Return ``convert(value)`` of the first pair accepted by *matches*."""
        for pair in pairs:
            if matches(pair):
                return convert(pair["value"])
        return None

    def trimmed(value):
        return str(value).strip()

    def trimmed_lower(value):
        return str(value).strip().lower()

    band_lte = first_value(
        lambda pair: pair["key_normalized"].startswith("bandeutra")
        and re.fullmatch(r"\d+", pair["value"]),
        int,
    )
    band_nr = first_value(
        lambda pair: "bandnr" in pair["key_normalized"] and str(pair["value"]).strip(),
        trimmed,
    )
    ul_class = first_value(
        lambda pair: "ca-bandwidthclassul" in pair["key_normalized"], trimmed_lower
    )
    dl_class = first_value(
        lambda pair: "ca-bandwidthclassdl" in pair["key_normalized"], trimmed_lower
    )
    return {
        "component_id": node.name,
        "band_lte": band_lte,
        "band_nr": band_nr,
        "ul_class": ul_class,
        "dl_class": dl_class,
    }
def _extract_ca_combinations(
nodes: list[Node], ue_id: str, source_file: str
) -> pd.DataFrame:
rows: list[dict[str, Any]] = []
def walk(node: Node, ancestors: list[Node]) -> None:
key = _normalize_label(node.name)
if key.startswith("bandcombinationparameters"):
ancestor_names = [_normalize_label(item.name) for item in ancestors]
combo_scope = next(
(
ancestors[idx].name
for idx in range(len(ancestors) - 1, -1, -1)
if ancestor_names[idx].startswith("supportedbandcombination")
),
"",
)
components: list[dict[str, Any]] = []
for child in node.children:
if _normalize_label(child.name).startswith("bandparameters"):
components.append(_extract_component_from_band_parameters(child))
bands_lte = sorted(
{comp["band_lte"] for comp in components if comp.get("band_lte") is not None}
)
bands_nr = sorted(
{str(comp["band_nr"]) for comp in components if comp.get("band_nr")}
)
ul_classes = sorted(
{
str(comp["ul_class"]).lower()
for comp in components
if str(comp.get("ul_class", "")).strip()
}
)
dl_classes = sorted(
{
str(comp["dl_class"]).lower()
for comp in components
if str(comp.get("dl_class", "")).strip()
}
)
component_count = len(
[comp for comp in components if comp.get("band_lte") or comp.get("band_nr")]
)
rows.append(
{
"ue_id": ue_id,
"source_file": source_file,
"combination_id": node.name,
"combination_scope": combo_scope,
"lte_bands": ",".join(f"B{band}" for band in bands_lte),
"nr_bands": ",".join(bands_nr),
"ul_classes": ",".join(ul_classes),
"dl_classes": ",".join(dl_classes),
"component_count": component_count,
"components_json": json.dumps(components, ensure_ascii=False),
}
)
for child in node.children:
walk(child, ancestors + [node])
for root in nodes:
walk(root, [])
if not rows:
return pd.DataFrame(
columns=[
"ue_id",
"source_file",
"combination_id",
"combination_scope",
"lte_bands",
"nr_bands",
"ul_classes",
"dl_classes",
"component_count",
"components_json",
]
)
return pd.DataFrame(rows).drop_duplicates(
subset=[
"ue_id",
"combination_id",
"combination_scope",
"lte_bands",
"nr_bands",
"ul_classes",
"dl_classes",
]
)
def _load_json_file(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
return json.loads(path.read_text(encoding="utf-8"))
def _load_ca_rules(ca_rules_path: str | None = None) -> dict[str, Any]:
    """Load the CA assessment rules, applying defaults for missing keys.

    Reads *ca_rules_path* when given, otherwise ``data/uecap_ca_rules.json``
    one directory above this module's directory.

    Returns:
        ``allowed_classes`` (lower-cased list, default ``a``-``f``) and
        ``prefer_class_order`` (lower-cased, default ``"dl_then_ul"``).
    """
    bundled = Path(__file__).resolve().parents[1] / "data" / "uecap_ca_rules.json"
    config = _load_json_file(Path(ca_rules_path) if ca_rules_path else bundled)
    raw_classes = config.get("allowed_classes", ["a", "b", "c", "d", "e", "f"])
    order = config.get("prefer_class_order", "dl_then_ul")
    return {
        "allowed_classes": [str(item).lower() for item in raw_classes],
        "prefer_class_order": str(order).lower(),
    }
def _format_nr_band(raw_band: str) -> str:
text = str(raw_band).strip().lower()
if not text:
return text
if text.startswith("n"):
return text.upper()
if re.fullmatch(r"\d+", text):
return f"N{text}"
return text.upper()
def _build_ca_assessment_df(ca_df: pd.DataFrame, ca_rules_path: str | None = None) -> pd.DataFrame:
columns = [
"ue_id",
"source_file",
"combination_id",
"combination_scope",
"combo_norm",
"component_count",
"dl_class_set",
"ul_class_set",
"combo_type",
"consistency_status",
"remarks",
]
if ca_df is None or ca_df.empty:
return pd.DataFrame(columns=columns)
rules = _load_ca_rules(ca_rules_path=ca_rules_path)
allowed_classes = set(rules["allowed_classes"])
prefer_dl = rules["prefer_class_order"] == "dl_then_ul"
rows: list[dict[str, Any]] = []
for _, combo in ca_df.iterrows():
components = json.loads(str(combo.get("components_json") or "[]"))
parts: list[str] = []
missing_class_count = 0
unknown_class_count = 0
has_lte = False
has_nr = False
dl_set: set[str] = set()
ul_set: set[str] = set()
for comp in components:
band_label = None
band_lte = comp.get("band_lte")
band_nr = comp.get("band_nr")
dl_class = str(comp.get("dl_class") or "").strip().lower()
ul_class = str(comp.get("ul_class") or "").strip().lower()
if band_lte is not None:
band_label = f"B{band_lte}"
has_lte = True
elif band_nr:
band_label = _format_nr_band(str(band_nr))
has_nr = True
if not band_label:
continue
dl_set.update({dl_class.upper()} if dl_class else set())
ul_set.update({ul_class.upper()} if ul_class else set())
selected_class = dl_class if prefer_dl else ul_class
if not selected_class:
selected_class = ul_class if prefer_dl else dl_class
class_suffix = ""
if selected_class:
if selected_class in allowed_classes:
class_suffix = selected_class.upper()
else:
unknown_class_count += 1
else:
missing_class_count += 1
parts.append(f"{band_label}{class_suffix}")
component_count = len(parts)
if component_count == 0:
status = "invalid"
combo_type = "Unknown"
elif has_lte and has_nr:
combo_type = "MR-DC/EN-DC candidate"
status = (
"valid"
if missing_class_count == 0 and unknown_class_count == 0
else "partially_valid"
)
elif has_lte and component_count > 1:
combo_type = "LTE CA"
status = (
"valid"
if missing_class_count == 0 and unknown_class_count == 0
else "partially_valid"
)
elif has_nr and component_count > 1:
combo_type = "NR CA"
status = (
"valid"
if missing_class_count == 0 and unknown_class_count == 0
else "partially_valid"
)
else:
combo_type = "Single/Unknown"
status = "partially_valid"
remarks_parts: list[str] = []
if missing_class_count:
remarks_parts.append(f"{missing_class_count} component(s) without bandwidth class.")
if unknown_class_count:
remarks_parts.append(f"{unknown_class_count} component(s) with unknown class token.")
if component_count == 0:
remarks_parts.append("No valid CA component extracted.")
rows.append(
{
"ue_id": combo.get("ue_id"),
"source_file": combo.get("source_file"),
"combination_id": combo.get("combination_id"),
"combination_scope": combo.get("combination_scope"),
"combo_norm": "+".join(parts),
"component_count": component_count,
"dl_class_set": ",".join(sorted(dl_set)),
"ul_class_set": ",".join(sorted(ul_set)),
"combo_type": combo_type,
"consistency_status": status,
"remarks": " ".join(remarks_parts),
}
)
return pd.DataFrame(rows, columns=columns)
def _extract_endc_mrdc(
kv_records: list[dict[str, Any]], ca_assessment_df: pd.DataFrame, ue_id: str, source_file: str
) -> pd.DataFrame:
rows: list[dict[str, Any]] = []
for record in kv_records:
path = record["path_normalized"]
if any(
marker in path
for marker in [
"mrdc",
"endc",
"featureset",
"supportedbandlistnr",
"supportedbandcombinationnr",
]
):
rows.append(
{
"ue_id": record["ue_id"],
"source_file": record["source_file"],
"item_type": "path_feature",
"item_key": record["key"],
"item_value": str(record["value"]),
"path": record["path"],
}
)
if ca_assessment_df is not None and not ca_assessment_df.empty:
for _, combo in ca_assessment_df.iterrows():
if str(combo.get("combo_type")) == "MR-DC/EN-DC candidate":
rows.append(
{
"ue_id": ue_id,
"source_file": source_file,
"item_type": "combo",
"item_key": str(combo.get("combination_id", "")),
"item_value": str(combo.get("combo_norm", "")),
"path": str(combo.get("combination_scope", "")),
}
)
if not rows:
return pd.DataFrame(
columns=["ue_id", "source_file", "item_type", "item_key", "item_value", "path"]
)
return pd.DataFrame(rows).drop_duplicates()
def _extract_features(kv_records: list[dict[str, Any]]) -> pd.DataFrame:
rows: list[dict[str, Any]] = []
excluded_prefixes = (
"bandeutra",
"supportedbandeutra",
"supportedbandutra",
"supportedbandgeran",
"supportedbandnr",
"ca-bandwidthclass",
)
excluded_exact = {"accessstratumrelease"}
for record in kv_records:
key_norm = record["key_normalized"]
value = str(record["value"]).strip()
value_norm = value.lower()
if key_norm in excluded_exact or key_norm.startswith(excluded_prefixes):
continue
if not value:
continue
if value_norm in _BOOL_VALUES:
value_type = "flag"
elif _BITSTRING_RE.fullmatch(value):
value_type = "bitstring"
elif re.fullmatch(r"\d+", value):
value_type = "number"
else:
value_type = "text"
rows.append(
{
"ue_id": record["ue_id"],
"source_file": record["source_file"],
"feature_name": record["key"],
"feature_value": value,
"value_type": value_type,
"path": record["path"],
}
)
if not rows:
return pd.DataFrame(
columns=[
"ue_id",
"source_file",
"feature_name",
"feature_value",
"value_type",
"path",
]
)
return pd.DataFrame(rows).drop_duplicates()
def _extract_categories(kv_records: list[dict[str, Any]]) -> dict[str, str]:
categories: dict[str, str] = {}
for record in kv_records:
key_norm = record["key_normalized"]
if "ue-category" in key_norm:
categories[record["key"]] = str(record["value"])
return categories
def _load_release_rules(rules_path: str | None = None) -> list[dict[str, Any]]:
    """Return the ``rules`` list of the release-inference rule file.

    Reads *rules_path* when given, otherwise ``data/uecap_release_rules.json``
    one directory above this module's directory. A file without a ``rules``
    key yields an empty list.
    """
    bundled = Path(__file__).resolve().parents[1] / "data" / "uecap_release_rules.json"
    chosen = Path(rules_path) if rules_path else bundled
    return _load_json_file(chosen).get("rules", [])
def infer_release(extracted: dict[str, Any], rules_path: str | None = None) -> dict[str, Any]:
    """Infer the 3GPP release from evidence strings using weighted rules.

    A rule fires when any of its patterns matches (case-insensitively) any
    evidence string; its weight is added to the score of its release. The
    highest-scoring release wins, ties going to the higher release number.
    An explicit release in *extracted* overrides the inference and forces
    the confidence to 1.0.

    Args:
        extracted: Mapping with optional ``explicit_release`` and ``evidence``.
        rules_path: Optional path to an alternative rule file.

    Returns:
        A dict with ``explicit_release``, ``inferred_release``,
        ``final_release``, ``confidence`` and ``triggered_rules``.
    """
    explicit_release = extracted.get("explicit_release")
    evidence_strings = [
        str(item) for item in extracted.get("evidence", []) if str(item).strip()
    ]

    triggered: list[dict[str, Any]] = []
    scores: dict[str, float] = {}
    for rule in _load_release_rules(rules_path=rules_path):
        release = str(rule.get("release", "")).strip().lower()
        patterns = [str(pattern) for pattern in rule.get("patterns", []) if str(pattern).strip()]
        weight = float(rule.get("weight", 1.0))
        rule_id = str(rule.get("rule_id", "rule")).strip()
        description = str(rule.get("description", "")).strip()

        matched_patterns: list[str] = []
        for pattern in patterns:
            compiled = re.compile(pattern, re.I)
            if any(compiled.search(text) for text in evidence_strings):
                matched_patterns.append(pattern)

        if matched_patterns and release:
            scores[release] = scores.get(release, 0.0) + weight
            triggered.append(
                {
                    "rule_id": rule_id,
                    "release": release,
                    "weight": weight,
                    "matched_patterns": ", ".join(matched_patterns),
                    "description": description,
                }
            )

    inferred_release = None
    confidence = 0.0
    if scores:
        inferred_release, top_score = max(
            scores.items(),
            key=lambda entry: (entry[1], _release_sort_key(entry[0])),
        )
        total = sum(scores.values())
        confidence = float(top_score / total) if total else 0.0

    if explicit_release:
        confidence = 1.0
    return {
        "explicit_release": explicit_release,
        "inferred_release": inferred_release,
        "final_release": explicit_release or inferred_release or "unknown",
        "confidence": round(confidence, 4),
        "triggered_rules": triggered,
    }
def _load_volte_rules(volte_rules_path: str | None = None) -> dict[str, Any]:
    """Load the VoLTE assessment rules, applying defaults for missing keys.

    Reads *volte_rules_path* when given, otherwise
    ``data/uecap_volte_rules.json`` one directory above this module's
    directory.

    Returns:
        A dict with ``thresholds``, ``missing_critical_penalty``,
        ``hard_negative_penalty``, ``hard_negative_patterns``, ``rules`` and
        ``critical_signals``.
    """
    bundled = Path(__file__).resolve().parents[1] / "data" / "uecap_volte_rules.json"
    config = _load_json_file(Path(volte_rules_path) if volte_rules_path else bundled)

    default_thresholds = {"supported": 70, "likely": 40}
    default_negatives = [
        r"voiceoverps.*notsupported",
        r"voice-over-ps.*notsupported",
        r"srvcc.*notsupported",
    ]
    default_signals = [
        {"name": "ims", "patterns": [r"\bims\b"]},
        {"name": "voice_over_ps", "patterns": [r"voiceoverps", r"voice-over-ps"]},
        {"name": "srvcc", "patterns": [r"\bsrvcc\b"]},
    ]
    return {
        "thresholds": config.get("thresholds", default_thresholds),
        "missing_critical_penalty": int(config.get("missing_critical_penalty", 20)),
        "hard_negative_penalty": int(config.get("hard_negative_penalty", 40)),
        "hard_negative_patterns": config.get("hard_negative_patterns", default_negatives),
        "rules": config.get("rules", []),
        "critical_signals": config.get("critical_signals", default_signals),
    }
def assess_volte_support(
    extracted: dict[str, Any], volte_rules_path: str | None = None
) -> dict[str, Any]:
    """Score VoLTE support from evidence strings using the VoLTE rule file.

    Every rule with at least one matching pattern adds its weight to the
    score. A penalty is subtracted when no ``explicit_positive`` rule
    matched, and another when a hard-negative pattern matched. The score is
    clamped to 0-100 and mapped to a status: ``Supported`` (needs an explicit
    positive and the supported threshold), ``Likely``, ``Unknown`` or
    ``Not indicated``. A hard-negative match downgrades ``Supported`` to
    ``Unknown``.

    Args:
        extracted: Mapping with an optional ``evidence`` list.
        volte_rules_path: Optional path to an alternative rule file.

    Returns:
        A dict with status, score, confidence, evidence rule ids, missing
        critical signals, notes and the matched rules.
    """
    rules = _load_volte_rules(volte_rules_path=volte_rules_path)
    thresholds = rules["thresholds"]
    evidence = [str(item).lower() for item in extracted.get("evidence", []) if str(item).strip()]

    def hits(pattern: str) -> bool:
        """Tell whether *pattern* matches any evidence string."""
        return any(re.search(pattern, item, re.I) for item in evidence)

    def clean(raw_patterns) -> list[str]:
        """Stringify patterns and drop blank ones."""
        return [str(pattern) for pattern in raw_patterns if str(pattern).strip()]

    matched_rules: list[dict[str, Any]] = []
    score = 0.0
    explicit_positive_found = False
    for rule in rules["rules"]:
        patterns = clean(rule.get("patterns", []))
        if not patterns:
            continue
        matched = [pattern for pattern in patterns if hits(pattern)]
        if not matched:
            continue
        category = str(rule.get("category", "implicit_positive")).strip().lower()
        weight = float(rule.get("weight", 0))
        score += weight
        explicit_positive_found = explicit_positive_found or category == "explicit_positive"
        matched_rules.append(
            {
                "rule_id": str(rule.get("rule_id", "rule")),
                "category": category,
                "weight": weight,
                "matched_patterns": ", ".join(matched),
                "description": str(rule.get("description", "")).strip(),
            }
        )

    missing_signals: list[str] = []
    for signal in rules["critical_signals"]:
        name = str(signal.get("name", "signal"))
        signal_patterns = clean(signal.get("patterns", []))
        if not any(hits(pattern) for pattern in signal_patterns):
            missing_signals.append(name)

    hard_negative_matches = [
        pattern for pattern in clean(rules.get("hard_negative_patterns", [])) if hits(pattern)
    ]

    if not explicit_positive_found:
        score -= float(rules["missing_critical_penalty"])
    if hard_negative_matches:
        score -= float(rules.get("hard_negative_penalty", 40))
    score = max(0.0, min(100.0, score))
    confidence = round(score / 100.0, 4)

    supported_threshold = float(thresholds.get("supported", 70))
    likely_threshold = float(thresholds.get("likely", 40))
    if explicit_positive_found and score >= supported_threshold:
        status = "Supported"
    elif score >= likely_threshold:
        status = "Likely"
    elif score > 0:
        status = "Unknown"
    else:
        status = "Not indicated"
    # Strict mode: explicit negative indicator blocks "Supported".
    if hard_negative_matches and status == "Supported":
        status = "Unknown"

    def rule_ids(category: str) -> list[str]:
        return [entry["rule_id"] for entry in matched_rules if entry["category"] == category]

    notes: list[str] = []
    if not explicit_positive_found:
        notes.append("No explicit IMS/VoPS/SRVCC indicator found in this capability text.")
    if missing_signals:
        notes.append(f"Missing critical signals: {', '.join(missing_signals)}.")
    if hard_negative_matches:
        notes.append("Explicit negative VoLTE indicator found (notsupported). Supported verdict is blocked.")

    return {
        "volte_status": status,
        "volte_score": round(score, 2),
        "confidence": confidence,
        "explicit_evidence": ", ".join(rule_ids("explicit_positive")),
        "implicit_evidence": ", ".join(rule_ids("implicit_positive")),
        "missing_signals": ", ".join(missing_signals),
        "notes": " ".join(notes),
        "matched_rules": matched_rules,
    }
def _build_volte_assessment_df(
ue_id: str,
source_file: str,
evidence: list[str],
volte_rules_path: str | None = None,
enabled: bool = True,
) -> pd.DataFrame:
columns = [
"ue_id",
"source_file",
"volte_status",
"volte_score",
"confidence",
"explicit_evidence",
"implicit_evidence",
"missing_signals",
"notes",
]
if not enabled:
return pd.DataFrame(columns=columns)
result = assess_volte_support({"evidence": evidence}, volte_rules_path=volte_rules_path)
return pd.DataFrame(
[
{
"ue_id": ue_id,
"source_file": source_file,
"volte_status": result.get("volte_status"),
"volte_score": result.get("volte_score"),
"confidence": result.get("confidence"),
"explicit_evidence": result.get("explicit_evidence"),
"implicit_evidence": result.get("implicit_evidence"),
"missing_signals": result.get("missing_signals"),
"notes": result.get("notes"),
}
]
)
def _normalize_combo_for_compare(combo: str) -> str:
text = str(combo).strip().upper()
if not text:
return ""
text = re.sub(r"^CA[_:\-\s]*", "", text)
text = text.replace(" ", "").replace("_", "")
text = text.replace("-", "+")
text = re.sub(r"\++", "+", text).strip("+")
if not text:
return ""
components = [part for part in text.split("+") if part]
if not components:
return ""
return "+".join(sorted(components))
def _build_benchmark_ca_diff(
ue_id: str,
source_file: str,
ca_assessment_df: pd.DataFrame,
benchmark_combos: list[str] | None = None,
) -> pd.DataFrame:
columns = ["ue_id", "source_file", "combo_norm", "status"]
if not benchmark_combos:
return pd.DataFrame(columns=columns)
parser_set = {
_normalize_combo_for_compare(combo)
for combo in ca_assessment_df.get("combo_norm", pd.Series(dtype=str)).tolist()
if _normalize_combo_for_compare(combo)
}
benchmark_set = {_normalize_combo_for_compare(combo) for combo in benchmark_combos}
benchmark_set = {combo for combo in benchmark_set if combo}
rows: list[dict[str, Any]] = []
for combo in sorted(parser_set & benchmark_set):
rows.append(
{
"ue_id": ue_id,
"source_file": source_file,
"combo_norm": combo,
"status": "exact_match",
}
)
for combo in sorted(parser_set - benchmark_set):
rows.append(
{
"ue_id": ue_id,
"source_file": source_file,
"combo_norm": combo,
"status": "missing_in_benchmark",
}
)
for combo in sorted(benchmark_set - parser_set):
rows.append(
{
"ue_id": ue_id,
"source_file": source_file,
"combo_norm": combo,
"status": "missing_in_log",
}
)
if not rows:
return pd.DataFrame(columns=columns)
return pd.DataFrame(rows, columns=columns)
def _build_release_inference_df(
ue_id: str, source_file: str, release_result: dict[str, Any]
) -> pd.DataFrame:
triggered = release_result.get("triggered_rules", [])
if not triggered:
return pd.DataFrame(
[
{
"ue_id": ue_id,
"source_file": source_file,
"explicit_release": release_result.get("explicit_release"),
"inferred_release": release_result.get("inferred_release"),
"final_release": release_result.get("final_release"),
"confidence": release_result.get("confidence"),
"rule_id": None,
"rule_release": None,
"rule_weight": None,
"matched_patterns": None,
"rule_description": None,
}
]
)
rows: list[dict[str, Any]] = []
for rule in triggered:
rows.append(
{
"ue_id": ue_id,
"source_file": source_file,
"explicit_release": release_result.get("explicit_release"),
"inferred_release": release_result.get("inferred_release"),
"final_release": release_result.get("final_release"),
"confidence": release_result.get("confidence"),
"rule_id": rule.get("rule_id"),
"rule_release": rule.get("release"),
"rule_weight": rule.get("weight"),
"matched_patterns": rule.get("matched_patterns"),
"rule_description": rule.get("description"),
}
)
return pd.DataFrame(rows)
def _parse_single_message(
    content: str,
    source_name: str,
    message_index: int,
    message_count: int,
    rules_path: str | None = None,
    ca_rules_path: str | None = None,
    volte_rules_path: str | None = None,
    enable_volte_assessment: bool = True,
    benchmark_combos: list[str] | None = None,
) -> dict[str, pd.DataFrame]:
    """Parse one capability message into the full set of output sheets.

    The text is tokenized, parsed into a tree and flattened; bands, CA
    combinations, EN-DC/MR-DC items, features, release inference, VoLTE
    assessment and the benchmark diff are derived from that, and a one-row
    summary is added. Problems are reported in ``Parse_Warnings``. When the
    text yields no tokens, only ``Parse_Warnings`` is filled.
    """
    sheets = _empty_sheets()
    warnings: list[dict[str, Any]] = []
    tokens = _tokenize(content)
    ue_id = _build_ue_id(
        source_name, content, message_index=message_index, message_count=message_count
    )

    def add_warning(severity: str, message: str) -> None:
        warnings.append(
            {
                "ue_id": ue_id,
                "source_file": source_name,
                "severity": severity,
                "message": message,
            }
        )

    def distinct(frame: pd.DataFrame, column: str) -> int:
        """Count distinct values of *column*, treating an empty frame as 0."""
        return 0 if frame.empty else int(frame[column].nunique())

    if not tokens:
        add_warning("error", "No tokens found in input text.")
        sheets["Parse_Warnings"] = pd.DataFrame(warnings)
        return sheets

    open_count = tokens.count("{")
    close_count = tokens.count("}")
    if open_count != close_count:
        add_warning(
            "warning",
            f"Brace count mismatch: open={open_count}, close={close_count}. Parsing will continue.",
        )

    nodes, _ = _parse_entries(tokens)
    kv_records, leaf_records, normalized_paths = _flatten_tree(
        nodes, ue_id=ue_id, source_file=source_name
    )

    rats = _detect_rats(normalized_paths, kv_records)
    explicit_release = _extract_release_explicit(kv_records)
    categories = _extract_categories(kv_records)
    bands_lte = _extract_bands_lte(kv_records)
    bands_utra = _extract_bands_utra(kv_records)
    bands_geran = _extract_bands_geran(kv_records)
    bands_nr = _extract_bands_nr(kv_records)
    ca_df = _extract_ca_combinations(nodes, ue_id=ue_id, source_file=source_name)
    ca_assessment_df = _build_ca_assessment_df(ca_df, ca_rules_path=ca_rules_path)
    endc_df = _extract_endc_mrdc(
        kv_records, ca_assessment_df=ca_assessment_df, ue_id=ue_id, source_file=source_name
    )
    features_df = _extract_features(kv_records)

    # Everything the rule engines may match on: paths, leaf labels, key=value.
    evidence = (
        normalized_paths
        + [row["leaf_value"] for row in leaf_records]
        + [f"{row['key_normalized']}={str(row['value']).lower()}" for row in kv_records]
    )
    release_result = infer_release(
        {"explicit_release": explicit_release, "evidence": evidence},
        rules_path=rules_path,
    )
    release_df = _build_release_inference_df(
        ue_id=ue_id, source_file=source_name, release_result=release_result
    )
    volte_df = _build_volte_assessment_df(
        ue_id=ue_id,
        source_file=source_name,
        evidence=evidence,
        volte_rules_path=volte_rules_path,
        enabled=enable_volte_assessment,
    )
    benchmark_df = _build_benchmark_ca_diff(
        ue_id=ue_id,
        source_file=source_name,
        ca_assessment_df=ca_assessment_df,
        benchmark_combos=benchmark_combos,
    )

    if bands_lte.empty:
        add_warning("warning", "No LTE bands detected in this message.")
    if not explicit_release and not release_result.get("inferred_release"):
        add_warning("warning", "Release could not be inferred from current rules.")

    if volte_df.empty:
        volte_status = None
        volte_score = None
    else:
        first_volte = volte_df.iloc[0]
        volte_status = first_volte["volte_status"]
        volte_score = first_volte["volte_score"]

    if benchmark_df.empty:
        benchmark_mismatch_count = 0
    else:
        mismatch_mask = benchmark_df["status"].isin(
            ["missing_in_benchmark", "missing_in_log"]
        )
        benchmark_mismatch_count = int(mismatch_mask.sum())

    if ca_assessment_df.empty:
        normalized_combos = pd.Series(dtype=str)
    else:
        normalized_combos = ca_assessment_df["combo_norm"].astype(str).str.strip()
    normalized_combos = normalized_combos[normalized_combos != ""]

    summary_row = {
        "ue_id": ue_id,
        "source_file": source_name,
        "message_index": message_index,
        "rats_detected": ",".join(rats),
        "release_explicit": release_result.get("explicit_release"),
        "release_inferred": release_result.get("inferred_release"),
        "release_final": release_result.get("final_release"),
        "release_confidence": release_result.get("confidence"),
        "volte_status": volte_status,
        "volte_score": volte_score,
        "lte_band_count": distinct(bands_lte, "band_lte"),
        "utra_band_count": distinct(bands_utra, "band_utra"),
        "geran_band_count": distinct(bands_geran, "band_geran"),
        "nr_band_count": distinct(bands_nr, "band_nr"),
        "ca_combination_count": int(len(ca_df.index)),
        "ca_combo_normalized_count": (
            0 if ca_assessment_df.empty else int(normalized_combos.nunique())
        ),
        "endc_mrdc_item_count": int(len(endc_df.index)),
        "feature_count": int(len(features_df.index)),
        "benchmark_mismatch_count": benchmark_mismatch_count,
        "warning_count": int(len(warnings)),
        "ue_categories": ", ".join(f"{key}={value}" for key, value in categories.items()),
        "parser_profile": "decoded_tree_txt",
    }

    sheets.update(
        {
            "Summary": pd.DataFrame([summary_row]),
            "Bands_LTE": bands_lte,
            "Bands_UTRA": bands_utra,
            "Bands_GERAN": bands_geran,
            "Bands_NR": bands_nr,
            "CA_Combinations": ca_df,
            "CA_Assessment": ca_assessment_df,
            "ENDC_MRDC": endc_df,
            "Features": features_df,
            "VoLTE_Assessment": volte_df,
            "Release_Inference": release_df,
            "Benchmark_CA_Diff": benchmark_df,
            "Parse_Warnings": pd.DataFrame(warnings),
        }
    )
    return sheets
def _merge_sheet_dicts(sheet_sets: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]:
    """Concatenate several sheet dicts into one, sheet by sheet.

    Empty frames are left out of the concatenation; a sheet with no
    non-empty frame at all stays an empty DataFrame. Row indexes are reset.
    """
    merged = _empty_sheets()
    for sheet_name in SHEET_ORDER:
        frames: list[pd.DataFrame] = []
        for sheets in sheet_sets:
            if sheet_name in sheets and not sheets[sheet_name].empty:
                frames.append(sheets[sheet_name])
        if frames:
            merged[sheet_name] = pd.concat(frames, ignore_index=True)
        else:
            merged[sheet_name] = pd.DataFrame()
    return merged
def parse_uecap_text(
    content: str,
    source_name: str,
    rules_path: str | None = None,
    ca_rules_path: str | None = None,
    volte_rules_path: str | None = None,
    enable_volte_assessment: bool = True,
    benchmark_combos: list[str] | None = None,
) -> dict[str, pd.DataFrame]:
    """Parse decoded capability text that may hold several messages.

    The text is split into messages, each message is parsed on its own, and
    the per-message sheets are merged into one sheet dict.
    """
    messages = _split_messages(content)
    total = len(messages)
    shared_options = {
        "rules_path": rules_path,
        "ca_rules_path": ca_rules_path,
        "volte_rules_path": volte_rules_path,
        "enable_volte_assessment": enable_volte_assessment,
        "benchmark_combos": benchmark_combos,
    }
    parsed_messages: list[dict[str, pd.DataFrame]] = [
        _parse_single_message(
            content=message_text,
            source_name=source_name,
            message_index=message_index,
            message_count=total,
            **shared_options,
        )
        for message_index, message_text in messages
    ]
    return _merge_sheet_dicts(parsed_messages)
def parse_uecap_files(
    files: list[tuple[str, bytes]],
    rules_path: str | None = None,
    ca_rules_path: str | None = None,
    volte_rules_path: str | None = None,
    enable_volte_assessment: bool = True,
    benchmark_combos: list[str] | None = None,
) -> dict[str, pd.DataFrame]:
    """Parse several ``(name, raw bytes)`` files and merge their sheets.

    Each file is decoded to text, parsed with the same options, and the
    resulting sheet dicts are merged into one.
    """
    shared_options = {
        "rules_path": rules_path,
        "ca_rules_path": ca_rules_path,
        "volte_rules_path": volte_rules_path,
        "enable_volte_assessment": enable_volte_assessment,
        "benchmark_combos": benchmark_combos,
    }
    parsed_files: list[dict[str, pd.DataFrame]] = []
    for source_name, raw_bytes in files:
        decoded = _safe_decode(raw_bytes)
        parsed_files.append(
            parse_uecap_text(decoded, source_name=source_name, **shared_options)
        )
    return _merge_sheet_dicts(parsed_files)
def to_excel_bytes(sheets: dict[str, pd.DataFrame]) -> bytes:
    """Serialise the sheets to an in-memory ``.xlsx`` workbook.

    Sheets are written in ``SHEET_ORDER`` with the ``xlsxwriter`` engine. A
    missing or empty sheet is replaced by a one-cell placeholder. Sheet names
    are cut to 31 characters.
    """
    buffer = io.BytesIO()
    with pd.ExcelWriter(buffer, engine="xlsxwriter") as writer:
        for sheet_name in SHEET_ORDER:
            frame = sheets.get(sheet_name, pd.DataFrame())
            has_data = frame is not None and not frame.empty
            if has_data:
                output = frame
            else:
                output = pd.DataFrame({"info": ["No data extracted for this sheet."]})
            output.to_excel(writer, index=False, sheet_name=sheet_name[:31])
    return buffer.getvalue()