| | from __future__ import annotations |
| |
|
| | import glob |
| | import gzip |
| | import json |
| | import logging |
| | import os |
| | from typing import Any, Dict, Optional, Tuple, List |
| |
|
| |
|
def _list_json_files(dir_path: str):
    """Return the paths of entries directly inside *dir_path* ending in ".json".

    The listing is non-recursive and follows ``os.listdir`` order. A path that
    is missing or is not a directory yields an empty list.
    """
    if os.path.isdir(dir_path):
        return [
            os.path.join(dir_path, entry)
            for entry in os.listdir(dir_path)
            if entry.endswith(".json")
        ]
    return []
| |
|
| |
|
def _list_json_and_gz_files(dir_path: str) -> List[str]:
    """Return .json and .json.gz files in directory (non-recursive).

    Entries are returned as ``dir_path`` joined with the entry name, in
    ``os.listdir`` order. A missing or non-directory path gives ``[]``.
    """
    wanted_suffixes = (".json", ".json.gz")
    if os.path.isdir(dir_path):
        return [
            os.path.join(dir_path, entry)
            for entry in os.listdir(dir_path)
            if entry.endswith(wanted_suffixes)
        ]
    return []
| |
|
| |
|
def find_best_metadata_json(
    orfs_flow_dir: str,
    platform: str,
    design: str,
    variant: str,
) -> Optional[str]:
    """
    Locate the most relevant metadata / metrics JSON file of one ORFS run.

    ORFS convention:
      reports/<platform>/<design>/<FLOW_VARIANT>/

    Search order. The first step that yields at least one regular file wins,
    and within that step the most recently modified file is returned:
      1. Recursive name patterns, tried in this order:
         metadata.json, metrics.json, *metadata*.json, *metrics*.json,
         *final*.json, *report*.json, *results*.json.
         Each pattern is also accepted with a trailing ".gz".
      2. Fallback: any .json / .json.gz file directly inside the variant
         directory (non-recursive).

    Args:
        orfs_flow_dir: Directory that contains the "reports" tree.
        platform: Platform path component.
        design: Design path component.
        variant: Flow-variant path component.

    Returns:
        Path of the selected file, or None when the variant directory is
        missing, is not a directory, or holds no matching file.
    """
    base = os.path.join(orfs_flow_dir, "reports", platform, design, variant)

    if not os.path.exists(base):
        logging.debug(f"Reports directory does not exist: {base}")
        return None

    if not os.path.isdir(base):
        logging.warning(f"Reports path exists but is not a directory: {base}")
        return None

    patterns = [
        "**/metadata.json",
        "**/metrics.json",
        "**/*metadata*.json",
        "**/*metrics*.json",
        "**/*final*.json",
        "**/*report*.json",
        "**/*results*.json",
    ]

    # Escape glob metacharacters in the directory part so that a path
    # containing "[", "*" or "?" is matched literally instead of silently
    # matching nothing.
    glob_root = glob.escape(base)

    candidates: List[str] = []
    for pattern in patterns:
        # Keep regular files only: glob also returns directories and dangling
        # symlinks, which cannot be loaded and would break the mtime lookup.
        matches = [
            path
            for suffix in ("", ".gz")
            for path in glob.glob(os.path.join(glob_root, pattern + suffix), recursive=True)
            if os.path.isfile(path)
        ]
        if matches:
            candidates = matches
            logging.debug(f"Found {len(matches)} files matching pattern '{pattern}' under {base}")
            break

    if not candidates:
        candidates = [p for p in _list_json_and_gz_files(base) if os.path.isfile(p)]
        if candidates:
            logging.debug(f"Using fallback: found {len(candidates)} JSON(/gz) files in {base}")

    # No fallback to the parent (<design>) directory: files found there
    # would come from other flow variants.
    if not candidates:
        logging.warning(f"No JSON files found in {base}")
        return None

    # max() returns the first of several equally recent files, which is the
    # same element a stable descending sort would put at index 0.
    selected = max(candidates, key=os.path.getmtime)
    logging.debug(f"Selected metadata file: {selected} (from {len(candidates)} candidates)")
    return selected
| |
|
| |
|
def load_json(path: str) -> Dict[str, Any]:
    """
    Load a JSON document from *path*, reading gzip files transparently.

    Paths ending in ".gz" are opened with ``gzip``; all others are opened as
    plain text. Both are decoded as UTF-8. If the strict parse fails because
    the content is malformed JSON or not valid UTF-8, a lenient second pass
    re-reads the file ignoring undecodable bytes and parses only the span
    from the first "{" to the last "}".

    Args:
        path: File to read.

    Returns:
        The parsed JSON value.

    Raises:
        FileNotFoundError: *path* does not exist.
        json.JSONDecodeError: Neither the strict nor the lenient parse
            succeeded.
        UnicodeDecodeError: The file is not valid UTF-8 and contains no
            "{...}" span to salvage.
        Exception: Any other read error is logged and re-raised unchanged.
    """
    # One opener for both passes; the builtin open() accepts "rt" as well.
    opener = gzip.open if path.endswith(".gz") else open
    try:
        with opener(path, "rt", encoding="utf-8") as f:
            return json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        logging.warning(f"Failed to parse JSON from {path}: {e}. Trying lenient read.")
        try:
            with opener(path, "rt", encoding="utf-8", errors="ignore") as f:
                txt = f.read()
            start = txt.find("{")
            end = txt.rfind("}")
            if start == -1 or end <= start:
                # Nothing to salvage: surface the original error instead of
                # falling through and returning None.
                raise e
            return json.loads(txt[start:end + 1])
        except Exception as e2:
            logging.error(f"Lenient parse failed for {path}: {e2}")
            raise
    except FileNotFoundError:
        logging.error(f"JSON file not found: {path}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error loading JSON from {path}: {e}")
        raise
| |
|
| |
|
def flatten_metrics(obj: Any, prefix: str = "") -> Dict[str, Any]:
    """
    Collapse nested dicts into a single-level dict.

    Each output key is the path to a leaf, with path components joined by
    '__'. Any value that is not a dict (lists included) is treated as a leaf
    and stored unchanged. A non-dict *obj* is stored under *prefix* itself.
    """
    # Leaf: nothing to descend into.
    if not isinstance(obj, dict):
        return {prefix: obj}

    flat: Dict[str, Any] = {}
    for name, value in obj.items():
        path = str(name) if not prefix else f"{prefix}__{name}"
        flat.update(flatten_metrics(value, path))
    return flat
| |
|
| |
|
def coerce_float(x: Any) -> Optional[float]:
    """Convert *x* to a float, or return None when that is not possible.

    ints and floats are converted directly; strings are parsed with
    ``float()`` and give None if parsing fails. Every other value, including
    None itself, gives None.
    """
    if isinstance(x, (int, float)):
        return float(x)
    if not isinstance(x, str):
        return None
    try:
        return float(x)
    except ValueError:
        return None
| |
|
| |
|
def pick_first(metrics_flat: Dict[str, Any], keys: list[str]) -> Optional[float]:
    """
    Return the first numeric value found under any of *keys*.

    Two passes are made over *keys*, in the order given:
      1. Exact key match.
      2. Case-insensitive key match.
    A key only counts when ``coerce_float`` can convert its value. An exact
    match for a later key therefore wins over a case-insensitive match for an
    earlier one.

    Args:
        metrics_flat: Flat mapping of metric name to raw value.
        keys: Candidate metric names, most preferred first.

    Returns:
        The converted value, or None when no candidate yields a number.
    """
    for k in keys:
        if k in metrics_flat:
            v = coerce_float(metrics_flat[k])
            if v is not None:
                return v

    # Keep every original spelling per lowercase name, so a non-numeric value
    # under one casing cannot hide a usable value under another.
    by_lower: Dict[str, List[str]] = {}
    for actual in metrics_flat:
        by_lower.setdefault(actual.lower(), []).append(actual)

    for k in keys:
        # Last-inserted spelling first: when that one is numeric the result is
        # the same as with a plain last-wins lowercase map.
        for actual in reversed(by_lower.get(k.lower(), [])):
            v = coerce_float(metrics_flat[actual])
            if v is not None:
                return v
    return None
| |
|