Instructions to use nraptisss/tmf921-intent-training with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- PEFT
How to use nraptisss/tmf921-intent-training with PEFT:
Task type is invalid.
- Notebooks
- Google Colab
- Kaggle
| #!/usr/bin/env python3 | |
| """Re-score existing generation predictions with normalized JSON metrics. | |
| This does NOT regenerate model outputs. It reads eval/<split>/predictions.json and writes: | |
| - eval/<split>/normalized_metrics.json | |
| - eval/all_normalized_metrics.json | |
| Normalization removes volatile/generated fields (ids, hrefs, descriptions, timestamps, schema links), | |
| sorts object lists deterministically, and computes normalized exact match + normalized field F1. | |
| Use this alongside raw metrics: raw metrics show deterministic reproduction; normalized metrics better | |
| estimate structural/semantic config agreement. | |
| """ | |
| import argparse | |
| import json | |
| import re | |
| from collections import defaultdict | |
| from pathlib import Path | |
| from typing import Any, Dict, List | |
| from tmf921_train.utils import aggregate_metrics, canonical_json, field_f1, parse_json, write_json | |
| VOLATILE_KEY_EXACT = { | |
| "id", "uuid", "href", "name", "description", "displayName", "label", | |
| "@schemaLocation", "schemaLocation", "version", "revision", | |
| "createdAt", "updatedAt", "modifiedAt", "lastModified", "timestamp", | |
| "creationDate", "lastUpdate", "requestedStartDate", "requestedCompletionDate", | |
| "startTime", "endTime", "validFrom", "validTo", "validFor", | |
| "correlationId", "requestId", "transactionId", "reservationId", | |
| } | |
| VOLATILE_KEY_FRAGMENTS = ["href", "schema", "timestamp", "uuid", "correlation", "transaction"] | |
| PROTECTED_KEYS = {"sst", "sd", "sliceType", "slice_type", "latency", "reliability", "dl", "ul", "maxUEs", "maxNumberOfUEs"} | |
| ID_LIKE_RE = re.compile(r"\b(?:intent|slice|policy|booking|cell|me|gnb|nsi|nssi|req|report|monitor|assurance)[-_][A-Za-z0-9._:-]+", re.IGNORECASE) | |
| HEX_RE = re.compile(r"\b[0-9a-f]{8,}\b", re.IGNORECASE) | |
| ISO_TIME_RE = re.compile(r"\b\d{4}-\d{2}-\d{2}[T ][0-9:.+-Z]*\b") | |
| def is_volatile_key(key: str) -> bool: | |
| if key in PROTECTED_KEYS: | |
| return False | |
| if key in VOLATILE_KEY_EXACT: | |
| return True | |
| lk = key.lower() | |
| if lk in {k.lower() for k in VOLATILE_KEY_EXACT}: | |
| return True | |
| return any(fragment in lk for fragment in VOLATILE_KEY_FRAGMENTS) | |
| def normalize_string(s: str) -> str: | |
| s = ISO_TIME_RE.sub("<TIME>", s) | |
| s = ID_LIKE_RE.sub("<ID>", s) | |
| s = HEX_RE.sub("<HEX>", s) | |
| return s.strip() | |
| def normalize_json(obj: Any) -> Any: | |
| if isinstance(obj, dict): | |
| out = {} | |
| for k, v in obj.items(): | |
| if is_volatile_key(str(k)): | |
| continue | |
| nv = normalize_json(v) | |
| if nv == {} or nv == [] or nv is None: | |
| continue | |
| out[str(k)] = nv | |
| return dict(sorted(out.items(), key=lambda kv: kv[0])) | |
| if isinstance(obj, list): | |
| items = [normalize_json(x) for x in obj] | |
| items = [x for x in items if x not in ({}, [], None)] | |
| return sorted(items, key=lambda x: canonical_json(x)) | |
| if isinstance(obj, str): | |
| return normalize_string(obj) | |
| return obj | |
| def keyset_f1(pred_obj: Any, gold_obj: Any) -> Dict[str, float]: | |
| from tmf921_train.utils import flatten_json | |
| pred_keys = set(flatten_json(pred_obj).keys()) | |
| gold_keys = set(flatten_json(gold_obj).keys()) | |
| tp = len(pred_keys & gold_keys) | |
| fp = len(pred_keys - gold_keys) | |
| fn = len(gold_keys - pred_keys) | |
| p = tp / (tp + fp) if tp + fp else 1.0 | |
| r = tp / (tp + fn) if tp + fn else 1.0 | |
| f = 2 * p * r / (p + r) if p + r else 0.0 | |
| return {"norm_key_precision": p, "norm_key_recall": r, "norm_key_f1": f, "norm_key_tp": tp, "norm_key_fp": fp, "norm_key_fn": fn} | |
| def score_row(row: Dict[str, Any]) -> Dict[str, Any]: | |
| pred_obj, pred_err = parse_json(row.get("prediction", "")) | |
| gold_obj, gold_err = parse_json(row.get("gold", "")) | |
| out = dict(row) | |
| out.pop("prediction", None) | |
| out.pop("gold", None) | |
| out["norm_parse_json"] = pred_obj is not None | |
| out["norm_gold_parse_json"] = gold_obj is not None | |
| out["norm_exact_match"] = False | |
| if pred_obj is None or gold_obj is None: | |
| out.update({ | |
| "norm_field_precision": 0.0, "norm_field_recall": 0.0, "norm_field_f1": 0.0, | |
| "norm_key_precision": 0.0, "norm_key_recall": 0.0, "norm_key_f1": 0.0, | |
| }) | |
| return out | |
| pred_norm = normalize_json(pred_obj) | |
| gold_norm = normalize_json(gold_obj) | |
| out["norm_exact_match"] = canonical_json(pred_norm) == canonical_json(gold_norm) | |
| ff = field_f1(pred_norm, gold_norm) | |
| out.update({"norm_" + k: v for k, v in ff.items()}) | |
| out.update(keyset_f1(pred_norm, gold_norm)) | |
| return out | |
| def summarize(rows: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| summary = aggregate_metrics(rows) | |
| for key in ["target_layer", "slice_type", "lifecycle_operation"]: | |
| groups = defaultdict(list) | |
| for r in rows: | |
| groups[str(r.get(key))].append(r) | |
| summary[f"by_{key}"] = {g: aggregate_metrics(v) for g, v in sorted(groups.items())} | |
| return summary | |
| def main(): | |
| ap = argparse.ArgumentParser() | |
| ap.add_argument("--eval_dir", required=True, help="Directory containing split/predictions.json files, e.g. runs/.../eval_merged") | |
| ap.add_argument("--splits", nargs="*", default=None, help="Optional split names. Defaults to all subdirs with predictions.json") | |
| args = ap.parse_args() | |
| eval_dir = Path(args.eval_dir) | |
| if args.splits: | |
| splits = args.splits | |
| else: | |
| splits = sorted([p.name for p in eval_dir.iterdir() if p.is_dir() and (p / "predictions.json").exists()]) | |
| all_metrics = {} | |
| for split in splits: | |
| pred_path = eval_dir / split / "predictions.json" | |
| if not pred_path.exists(): | |
| print(f"Skipping {split}: no {pred_path}") | |
| continue | |
| raw_rows = json.loads(pred_path.read_text()) | |
| scored = [score_row(r) for r in raw_rows] | |
| split_dir = eval_dir / split | |
| write_json(split_dir / "normalized_predictions_scored.json", scored) | |
| metrics = summarize(scored) | |
| write_json(split_dir / "normalized_metrics.json", metrics) | |
| all_metrics[split] = metrics | |
| print(f"{split}: norm_exact={metrics.get('norm_exact_match', 0):.4f} norm_field_f1={metrics.get('norm_field_f1', 0):.4f} norm_key_f1={metrics.get('norm_key_f1', 0):.4f}") | |
| write_json(eval_dir / "all_normalized_metrics.json", all_metrics) | |
| print(f"Wrote {eval_dir / 'all_normalized_metrics.json'}") | |
| if __name__ == "__main__": | |
| main() | |