from __future__ import annotations

import glob
import gzip
import json
import logging
import os
from typing import Any, Dict, Optional, Tuple, List

# Glob patterns (relative to the reports directory) tried in order of
# preference; the first pattern that yields at least one regular file wins.
_METADATA_PATTERNS: Tuple[str, ...] = (
    "**/metadata.json",
    "**/metrics.json",
    "**/*metadata*.json",
    "**/*metrics*.json",
    "**/*final*.json",
    "**/*report*.json",
    "**/*results*.json",
)


def _list_files_with_suffixes(dir_path: str, suffixes: Tuple[str, ...]) -> List[str]:
    """Return regular files directly inside ``dir_path`` ending in one of ``suffixes``.

    The listing is non-recursive and sorted by file name so that results are
    deterministic. A missing or non-directory ``dir_path`` yields ``[]``.
    """
    if not os.path.isdir(dir_path):
        return []
    found: List[str] = []
    for name in sorted(os.listdir(dir_path)):
        if not name.endswith(suffixes):
            continue
        full = os.path.join(dir_path, name)
        # Skip directories / broken symlinks that merely look like JSON files.
        if os.path.isfile(full):
            found.append(full)
    return found


def _list_json_files(dir_path: str) -> List[str]:
    """Return .json files in directory (non-recursive)."""
    return _list_files_with_suffixes(dir_path, (".json",))


def _list_json_and_gz_files(dir_path: str) -> List[str]:
    """Return .json and .json.gz files in directory (non-recursive)."""
    return _list_files_with_suffixes(dir_path, (".json", ".json.gz"))


def _newest_file(paths: List[str]) -> Optional[str]:
    """Return the most recently modified path, or None if none can be stat'ed.

    Paths whose modification time cannot be read (e.g. removed between
    discovery and inspection) are ignored. On equal timestamps the earliest
    entry in ``paths`` wins.
    """
    newest_path: Optional[str] = None
    newest_mtime: Optional[float] = None
    for path in paths:
        try:
            mtime = os.path.getmtime(path)
        except OSError:
            continue
        if newest_mtime is None or mtime > newest_mtime:
            newest_path, newest_mtime = path, mtime
    return newest_path


def find_best_metadata_json(
    orfs_flow_dir: str,
    platform: str,
    design: str,
    variant: str,
) -> Optional[str]:
    """Locate the most likely metadata/metrics JSON for one flow run.

    The search root is ``{orfs_flow_dir}/reports/{platform}/{design}/{variant}``.

    Search order:
      1. Recursive glob patterns, most specific first (``metadata.json``,
         ``metrics.json``, ``*metadata*.json``, ``*metrics*.json``,
         ``*final*.json``, ``*report*.json``, ``*results*.json``). Only the
         first pattern that matches any regular file is used.
      2. Fallback: any ``.json`` / ``.json.gz`` file directly in the root
         (non-recursive).

    Among the candidates of the winning step, the most recently modified
    file is returned.

    Returns:
        Path of the selected file, or None when the reports directory is
        missing, is not a directory, or contains no usable JSON file.
    """
    base = os.path.join(orfs_flow_dir, "reports", platform, design, variant)

    if not os.path.exists(base):
        logging.debug(f"Reports directory does not exist: {base}")
        return None
    if not os.path.isdir(base):
        logging.warning(f"Reports path exists but is not a directory: {base}")
        return None

    # Escape the directory part so glob metacharacters in the path itself
    # (e.g. a design called "core[0]") are matched literally.
    escaped_base = glob.escape(base)

    candidates: List[str] = []
    for pattern in _METADATA_PATTERNS:
        matches = [
            match
            for match in sorted(glob.glob(os.path.join(escaped_base, pattern), recursive=True))
            if os.path.isfile(match)
        ]
        if matches:
            candidates = matches
            logging.debug(f"Found {len(matches)} files matching pattern '{pattern}' under {base}")
            break  # Prefer the first matching pattern set

    # Fallback: any .json or .json.gz file in the dir (non-recursive)
    if not candidates:
        candidates = _list_json_and_gz_files(base)
        if candidates:
            logging.debug(f"Using fallback: found {len(candidates)} JSON(/gz) files in {base}")

    # NOTE: the search is deliberately not widened to the parent directory;
    # doing so could silently return metrics that belong to another variant.
    selected = _newest_file(candidates)
    if selected is None:
        logging.warning(f"No JSON files found in {base}")
        return None

    logging.debug(f"Selected metadata file: {selected} (from {len(candidates)} candidates)")
    return selected


def _open_text(path: str, errors: Optional[str] = None):
    """Open ``path`` for UTF-8 text reading, transparently handling ``.gz``."""
    if path.endswith(".gz"):
        return gzip.open(path, "rt", encoding="utf-8", errors=errors)
    return open(path, "r", encoding="utf-8", errors=errors)


def _lenient_load(path: str) -> Optional[Dict[str, Any]]:
    """Best-effort recovery of a JSON object embedded in surrounding noise.

    Reads the file ignoring undecodable bytes and parses the span from the
    first ``{`` to the last ``}``. Returns None (after logging) on failure.
    """
    try:
        with _open_text(path, errors="ignore") as f:
            text = f.read()
        start = text.find("{")
        end = text.rfind("}")
        if start != -1 and end > start:
            return json.loads(text[start:end + 1])
        logging.error(f"Lenient parse failed for {path}: no JSON object found")
    except Exception as e2:  # best-effort path: log and let the caller re-raise
        logging.error(f"Lenient parse failed for {path}: {e2}")
    return None


def load_json(path: str) -> Dict[str, Any]:
    """Load a JSON (or gzip-compressed JSON) file.

    If strict parsing fails, a lenient second attempt extracts the outermost
    ``{...}`` span from the file contents and parses that instead.

    Raises:
        json.JSONDecodeError: strict parsing failed and the lenient attempt
            could not recover an object (the original error is re-raised).
        FileNotFoundError: ``path`` does not exist.
        Exception: any other error while opening/reading is logged and
            propagated unchanged.
    """
    try:
        with _open_text(path) as f:
            return json.load(f)
    except json.JSONDecodeError as e:
        logging.warning(f"Failed to parse JSON from {path}: {e}. Trying lenient read.")
        recovered = _lenient_load(path)
        if recovered is None:
            # Never fall through with an implicit None: surface the original error.
            raise
        return recovered
    except FileNotFoundError:
        logging.error(f"JSON file not found: {path}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error loading JSON from {path}: {e}")
        raise


def flatten_metrics(obj: Any, prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into a single dict keyed by '__'-joined paths.

    Non-dict values are kept as leaves under their accumulated path; a
    non-dict ``obj`` is returned as ``{prefix: obj}``. Empty dicts contribute
    no entries.
    """
    if not isinstance(obj, dict):
        return {prefix: obj}

    flat: Dict[str, Any] = {}
    for key, value in obj.items():
        path = f"{prefix}__{key}" if prefix else str(key)
        flat.update(flatten_metrics(value, path))
    return flat


def coerce_float(x: Any) -> Optional[float]:
    """Convert ints, floats and numeric strings to float; anything else to None.

    Note that ``bool`` is a subclass of ``int`` and therefore converts to
    1.0 / 0.0. Values that cannot be represented (non-numeric strings,
    integers too large for a float) yield None instead of raising.
    """
    if isinstance(x, (int, float, str)):
        try:
            return float(x)
        except (ValueError, OverflowError):
            return None
    return None


def pick_first(metrics_flat: Dict[str, Any], keys: list[str]) -> Optional[float]:
    """Return the first float-coercible value found under any of ``keys``.

    ``keys`` is a priority list. Exact key matches are tried for every key
    first; only if none yields a number is a case-insensitive lookup
    attempted, again in priority order. Returns None when nothing matches.
    """
    for key in keys:
        if key in metrics_flat:
            value = coerce_float(metrics_flat[key])
            if value is not None:
                return value

    # Case-insensitive pass. Several stored keys may share one lowercase
    # spelling; remember all of them so a usable value is not hidden by an
    # unusable one.
    by_lower: Dict[str, List[str]] = {}
    for name in metrics_flat:
        if isinstance(name, str):
            by_lower.setdefault(name.lower(), []).append(name)

    for key in keys:
        # Most recently inserted spelling is tried first.
        for name in reversed(by_lower.get(key.lower(), [])):
            value = coerce_float(metrics_flat[name])
            if value is not None:
                return value
    return None