"""Classical (HOG / LBP) feature extraction and training-set balancing helpers."""

from __future__ import annotations

from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd
from PIL import Image
from skimage.feature import hog, local_binary_pattern
from tqdm import tqdm

from .augmentations import augment_pil_for_classical
from .preprocessing import image_size_from_config, load_pil_image, pil_to_uint8_array, resize_image
from .utils import get_logger

LOGGER = get_logger(__name__)


def _prepare_gray_image(
    image: str | Path | Image.Image,
    config: dict[str, Any],
    augment_id: int = 0,
) -> np.ndarray:
    """Load *image* in mode ``"L"``, optionally augment it, resize it and return an array.

    Args:
        image: Path to an image file or an already opened PIL image.
        config: Project configuration; ``config["seed"]`` is read only when
            augmentation is requested, and the target size comes from
            ``image_size_from_config(config)``.
        augment_id: ``0`` means "no augmentation"; any positive value is
            forwarded to ``augment_pil_for_classical`` together with the seed.

    Returns:
        The result of ``pil_to_uint8_array`` on the resized image.
    """
    pil = load_pil_image(image, mode="L")
    if augment_id > 0:
        pil = augment_pil_for_classical(pil, augment_id, seed=int(config["seed"]))
    pil = resize_image(pil, image_size_from_config(config))
    return pil_to_uint8_array(pil)


def extract_hog_feature(
    image: str | Path | Image.Image,
    config: dict[str, Any],
    augment_id: int = 0,
) -> np.ndarray:
    """Return the HOG descriptor of *image* as a flat ``float32`` vector.

    Parameters are read from ``config["features"]["hog"]``; missing keys fall
    back to 9 orientations, 16x16 pixels per cell, 2x2 cells per block and
    ``"L2-Hys"`` block normalisation.

    Raises:
        KeyError: If ``config`` has no ``features`` / ``hog`` section.
    """
    gray = _prepare_gray_image(image, config, augment_id=augment_id)
    params = config["features"]["hog"]
    descriptor = hog(
        gray,
        orientations=int(params.get("orientations", 9)),
        pixels_per_cell=tuple(params.get("pixels_per_cell", [16, 16])),
        cells_per_block=tuple(params.get("cells_per_block", [2, 2])),
        block_norm=str(params.get("block_norm", "L2-Hys")),
        transform_sqrt=True,
        feature_vector=True,
    )
    return descriptor.astype(np.float32)


def extract_lbp_feature(
    image: str | Path | Image.Image,
    config: dict[str, Any],
    augment_id: int = 0,
) -> np.ndarray:
    """Return a normalised LBP histogram of *image* as a ``float32`` vector.

    Parameters are read from ``config["features"]["lbp"]``; missing keys fall
    back to ``radius=2``, ``n_points=16`` and ``method="uniform"``.

    Raises:
        KeyError: If ``config`` has no ``features`` / ``lbp`` section.
    """
    gray = _prepare_gray_image(image, config, augment_id=augment_id)
    params = config["features"]["lbp"]
    radius = int(params.get("radius", 2))
    n_points = int(params.get("n_points", 16))
    method = str(params.get("method", "uniform"))

    lbp = local_binary_pattern(gray, P=n_points, R=radius, method=method)

    if method == "uniform":
        n_bins = n_points + 2
    else:
        # NOTE(review): np.histogram leaves out values that fall outside
        # ``range``, so when n_points > 12 any code at or above 4096 (possible
        # e.g. with method="default") is not counted — confirm this is intended.
        n_bins = min(2**n_points, 4096)

    hist, _ = np.histogram(lbp.ravel(), bins=n_bins, range=(0, n_bins), density=False)
    hist = hist.astype(np.float32)
    # The epsilon keeps the division finite when no value landed in any bin.
    hist /= hist.sum() + 1e-8
    return hist


def extract_single_feature(
    image: str | Path | Image.Image,
    feature_type: str,
    config: dict[str, Any],
    augment_id: int = 0,
) -> np.ndarray:
    """Dispatch to the extractor named by *feature_type* (``"hog"`` or ``"lbp"``).

    Raises:
        ValueError: If *feature_type* is neither ``"hog"`` nor ``"lbp"``.
    """
    if feature_type == "hog":
        return extract_hog_feature(image, config, augment_id=augment_id)
    if feature_type == "lbp":
        return extract_lbp_feature(image, config, augment_id=augment_id)
    raise ValueError(f"Unsupported classical feature type: {feature_type}")


def expand_train_dataframe_for_balance(df: pd.DataFrame, config: dict[str, Any]) -> pd.DataFrame:
    """Oversample minority classes by adding rows flagged for augmentation.

    The returned frame always carries two extra columns: ``augment_id`` (``0``
    for original rows, ``1..needed`` for added rows of a label) and
    ``is_augmented``.

    Balancing is skipped, and *df* is returned with only those two columns
    added, when ``config["balance"]["enabled"]`` is falsy, when fewer than two
    labels are present, or when the max/min class-count ratio does not exceed
    ``config["data"]["imbalance_threshold"]`` (default ``1.2``).

    Otherwise every label is topped up to the size of the largest class by
    sampling its rows with replacement. If the result exceeds
    ``config["balance"]["max_augmented_train_samples"]`` (default ``3000``,
    ``0``/``None`` disables the cap) an equal number of rows per label is
    drawn without replacement. The balanced frame is shuffled before return.

    Args:
        df: Training rows; must contain a ``label`` column.
        config: Project configuration; ``config["seed"]`` is required once
            balancing actually runs.

    Returns:
        A new dataframe; *df* itself is not modified.
    """
    unaugmented = df.assign(augment_id=0, is_augmented=False)

    balance_cfg = config.get("balance", {})
    if not balance_cfg.get("enabled", True):
        return unaugmented

    counts = df["label"].value_counts()
    if len(counts) < 2:
        return unaugmented

    ratio = counts.max() / max(counts.min(), 1)
    threshold = float(config.get("data", {}).get("imbalance_threshold", 1.2))
    if ratio <= threshold:
        return unaugmented

    seed = int(config["seed"])
    rng = np.random.default_rng(seed)
    target = int(counts.max())
    rows = [unaugmented]

    for label, count in counts.items():
        needed = target - int(count)
        if needed <= 0:
            continue

        candidates = df[df["label"] == label]
        if candidates.empty:
            # value_counts() can report zero-count labels (e.g. unused
            # categories of a categorical column); there is nothing to sample.
            LOGGER.warning("Classical balancing: no rows for label '%s'; skipping.", label)
            continue

        # Sample positions rather than index labels so that exactly `needed`
        # rows are selected even when df carries duplicate index labels.
        positions = rng.choice(len(candidates), size=needed, replace=True)
        augmented = candidates.iloc[positions].copy().reset_index(drop=True)
        augmented["augment_id"] = np.arange(1, needed + 1)
        augmented["is_augmented"] = True
        rows.append(augmented)
        LOGGER.info("Classical balancing: added %d augmented '%s' training samples.", needed, label)

    expanded = pd.concat(rows, ignore_index=True)

    max_samples = int(balance_cfg.get("max_augmented_train_samples", 3000) or 0)
    if max_samples > 0 and len(expanded) > max_samples:
        per_label = max_samples // expanded["label"].nunique()
        expanded = (
            expanded.groupby("label", group_keys=False)
            .sample(n=per_label, random_state=seed, replace=False)
            .reset_index(drop=True)
        )
        LOGGER.info("Capped augmented classical training data at %d rows.", len(expanded))

    return expanded.sample(frac=1.0, random_state=seed).reset_index(drop=True)


def extract_feature_matrix(
    df: pd.DataFrame,
    feature_type: str,
    config: dict[str, Any],
    balance_train: bool = False,
) -> tuple[np.ndarray, np.ndarray, pd.DataFrame]:
    """Extract one feature vector per row of *df* and stack them into a matrix.

    Args:
        df: Rows with at least ``filepath`` and ``label_id`` columns (plus
            ``label`` when *balance_train* is true).
        feature_type: ``"hog"`` or ``"lbp"``.
        config: Project configuration forwarded to the extractors.
        balance_train: When true, the rows are first expanded with
            ``expand_train_dataframe_for_balance``.

    Returns:
        ``(features, labels, working)`` where ``features`` is a ``float32``
        matrix with one row per sample, ``labels`` is an ``int64`` vector of
        ``label_id`` values, and ``working`` is the dataframe that was actually
        iterated (including ``augment_id`` / ``is_augmented`` columns).

    Raises:
        ValueError: If there are no rows to extract features from, or if
            *feature_type* is unsupported.
    """
    if balance_train:
        working = expand_train_dataframe_for_balance(df, config)
    else:
        working = df.assign(augment_id=0, is_augmented=False)

    features: list[np.ndarray] = []
    labels: list[int] = []
    iterator = tqdm(
        working.itertuples(index=False),
        total=len(working),
        desc=f"{feature_type.upper()} features",
    )
    for row in iterator:
        features.append(
            extract_single_feature(
                row.filepath,
                feature_type,
                config,
                augment_id=int(getattr(row, "augment_id", 0)),
            )
        )
        labels.append(int(row.label_id))

    if not features:
        # np.vstack([]) would fail with a message unrelated to the real cause.
        raise ValueError("Cannot extract features from an empty dataframe.")

    return np.vstack(features).astype(np.float32), np.asarray(labels, dtype=np.int64), working