| import re |
| from typing import Mapping, Optional |
|
|
| import pandas as pd |
|
|
|
|
| def _normalize_profile_name(value: object) -> str: |
| if value is None: |
| return "" |
| return str(value).strip().lower() |
|
|
|
|
| def _normalize_site_key(value: object) -> str: |
| if value is None or pd.isna(value): |
| return "" |
|
|
| s = str(value).strip() |
| if not s: |
| return "" |
|
|
| if re.fullmatch(r"\d+(?:\.0+)?", s): |
| return str(int(float(s))) |
|
|
| return s.upper() |
|
|
|
|
| def _iter_site_keys(*identifiers: object) -> list[str]: |
| keys: list[str] = [] |
|
|
| for identifier in identifiers: |
| normalized = _normalize_site_key(identifier) |
| if normalized and normalized not in keys: |
| keys.append(normalized) |
|
|
| if isinstance(identifier, str): |
| raw = identifier.strip() |
| if not raw: |
| continue |
|
|
| first_token = raw.split("_", 1)[0].strip() |
| normalized_token = _normalize_site_key(first_token) |
| if normalized_token and normalized_token not in keys: |
| keys.append(normalized_token) |
|
|
| return keys |
|
|
|
|
| def parse_band_profile_definitions(profile_text: str) -> dict[str, str]: |
| profiles: dict[str, str] = {} |
|
|
| for line_no, raw_line in enumerate(profile_text.splitlines(), start=1): |
| line = raw_line.strip() |
| if not line or line.startswith("#"): |
| continue |
|
|
| if "=" not in line: |
| raise ValueError( |
| f"Invalid profile definition on line {line_no}: expected 'name = bands'." |
| ) |
|
|
| profile_name_raw, bands_raw = line.split("=", 1) |
| profile_name = _normalize_profile_name(profile_name_raw) |
| bands = str(bands_raw).strip() |
|
|
| if not profile_name or not bands: |
| raise ValueError( |
| f"Invalid profile definition on line {line_no}: name and bands are required." |
| ) |
|
|
| profiles[profile_name] = bands |
|
|
| return profiles |
|
|
|
|
| def read_site_profile_mapping(mapping_file) -> dict[str, str]: |
| if mapping_file is None: |
| return {} |
|
|
| if hasattr(mapping_file, "seek"): |
| mapping_file.seek(0) |
|
|
| df = pd.read_csv(mapping_file) |
| normalized_columns = { |
| str(column).strip().lower().replace(" ", "_"): column for column in df.columns |
| } |
|
|
| site_code_col = normalized_columns.get("site_code") |
| profile_col = normalized_columns.get("profile") |
|
|
| if site_code_col is None or profile_col is None: |
| raise ValueError("Profile mapping CSV must contain columns 'site_code' and 'profile'.") |
|
|
| mapping: dict[str, str] = {} |
| for _, row in df.iterrows(): |
| site_key = _normalize_site_key(row.get(site_code_col)) |
| profile_name = _normalize_profile_name(row.get(profile_col)) |
| if site_key and profile_name: |
| mapping[site_key] = profile_name |
|
|
| return mapping |
|
|
|
|
| def resolve_site_bands( |
| default_bands: str, |
| profile_definitions: Optional[Mapping[str, str]], |
| site_profile_mapping: Optional[Mapping[str, str]], |
| *site_identifiers: object, |
| ) -> str: |
| bands = str(default_bands).strip() |
| if not profile_definitions or not site_profile_mapping: |
| return bands |
|
|
| normalized_profiles = { |
| _normalize_profile_name(profile_name): str(profile_bands).strip() |
| for profile_name, profile_bands in profile_definitions.items() |
| if _normalize_profile_name(profile_name) and str(profile_bands).strip() |
| } |
|
|
| if not normalized_profiles: |
| return bands |
|
|
| for site_key in _iter_site_keys(*site_identifiers): |
| profile_name = site_profile_mapping.get(site_key) |
| if not profile_name: |
| continue |
|
|
| profile_bands = normalized_profiles.get(_normalize_profile_name(profile_name)) |
| if profile_bands: |
| return profile_bands |
|
|
| return bands |
|
|