| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
| import pandas as pd |
| from PIL import Image |
| from skimage.feature import hog, local_binary_pattern |
| from tqdm import tqdm |
|
|
| from .augmentations import augment_pil_for_classical |
| from .preprocessing import image_size_from_config, load_pil_image, pil_to_uint8_array, resize_image |
| from .utils import get_logger |
|
|
|
|
| LOGGER = get_logger(__name__) |
|
|
|
|
| def _prepare_gray_image( |
| image: str | Path | Image.Image, |
| config: dict[str, Any], |
| augment_id: int = 0, |
| ) -> np.ndarray: |
| pil = load_pil_image(image, mode="L") |
| if augment_id > 0: |
| pil = augment_pil_for_classical(pil, augment_id, seed=int(config["seed"])) |
| pil = resize_image(pil, image_size_from_config(config)) |
| return pil_to_uint8_array(pil) |
|
|
|
|
| def extract_hog_feature( |
| image: str | Path | Image.Image, |
| config: dict[str, Any], |
| augment_id: int = 0, |
| ) -> np.ndarray: |
| gray = _prepare_gray_image(image, config, augment_id=augment_id) |
| params = config["features"]["hog"] |
| return hog( |
| gray, |
| orientations=int(params.get("orientations", 9)), |
| pixels_per_cell=tuple(params.get("pixels_per_cell", [16, 16])), |
| cells_per_block=tuple(params.get("cells_per_block", [2, 2])), |
| block_norm=str(params.get("block_norm", "L2-Hys")), |
| transform_sqrt=True, |
| feature_vector=True, |
| ).astype(np.float32) |
|
|
|
|
| def extract_lbp_feature( |
| image: str | Path | Image.Image, |
| config: dict[str, Any], |
| augment_id: int = 0, |
| ) -> np.ndarray: |
| gray = _prepare_gray_image(image, config, augment_id=augment_id) |
| params = config["features"]["lbp"] |
| radius = int(params.get("radius", 2)) |
| n_points = int(params.get("n_points", 16)) |
| method = str(params.get("method", "uniform")) |
| lbp = local_binary_pattern(gray, P=n_points, R=radius, method=method) |
| if method == "uniform": |
| n_bins = n_points + 2 |
| else: |
| n_bins = min(2**n_points, 4096) |
| hist, _ = np.histogram(lbp.ravel(), bins=n_bins, range=(0, n_bins), density=False) |
| hist = hist.astype(np.float32) |
| hist /= hist.sum() + 1e-8 |
| return hist |
|
|
|
|
| def extract_single_feature( |
| image: str | Path | Image.Image, |
| feature_type: str, |
| config: dict[str, Any], |
| augment_id: int = 0, |
| ) -> np.ndarray: |
| if feature_type == "hog": |
| return extract_hog_feature(image, config, augment_id=augment_id) |
| if feature_type == "lbp": |
| return extract_lbp_feature(image, config, augment_id=augment_id) |
| raise ValueError(f"Unsupported classical feature type: {feature_type}") |
|
|
|
|
| def expand_train_dataframe_for_balance(df: pd.DataFrame, config: dict[str, Any]) -> pd.DataFrame: |
| if not config.get("balance", {}).get("enabled", True): |
| return df.assign(augment_id=0, is_augmented=False) |
| counts = df["label"].value_counts() |
| if len(counts) < 2: |
| return df.assign(augment_id=0, is_augmented=False) |
| ratio = counts.max() / max(counts.min(), 1) |
| threshold = float(config.get("data", {}).get("imbalance_threshold", 1.2)) |
| if ratio <= threshold: |
| return df.assign(augment_id=0, is_augmented=False) |
|
|
| rng = np.random.default_rng(int(config["seed"])) |
| target = int(counts.max()) |
| rows = [df.assign(augment_id=0, is_augmented=False)] |
| for label, count in counts.items(): |
| needed = target - int(count) |
| if needed <= 0: |
| continue |
| candidates = df[df["label"] == label] |
| sampled_idx = rng.choice(candidates.index.to_numpy(), size=needed, replace=True) |
| augmented = candidates.loc[sampled_idx].copy().reset_index(drop=True) |
| augmented["augment_id"] = np.arange(1, needed + 1) |
| augmented["is_augmented"] = True |
| rows.append(augmented) |
| LOGGER.info("Classical balancing: added %d augmented '%s' training samples.", needed, label) |
| expanded = pd.concat(rows, ignore_index=True) |
| max_samples = int(config.get("balance", {}).get("max_augmented_train_samples", 3000) or 0) |
| if max_samples > 0 and len(expanded) > max_samples: |
| expanded = ( |
| expanded.groupby("label", group_keys=False) |
| .sample(n=max_samples // expanded["label"].nunique(), random_state=int(config["seed"]), replace=False) |
| .reset_index(drop=True) |
| ) |
| LOGGER.info("Capped augmented classical training data at %d rows.", len(expanded)) |
| return expanded.sample(frac=1.0, random_state=int(config["seed"])).reset_index(drop=True) |
|
|
|
|
| def extract_feature_matrix( |
| df: pd.DataFrame, |
| feature_type: str, |
| config: dict[str, Any], |
| balance_train: bool = False, |
| ) -> tuple[np.ndarray, np.ndarray, pd.DataFrame]: |
| working = expand_train_dataframe_for_balance(df, config) if balance_train else df.assign( |
| augment_id=0, is_augmented=False |
| ) |
| features: list[np.ndarray] = [] |
| labels: list[int] = [] |
| iterator = tqdm(working.itertuples(index=False), total=len(working), desc=f"{feature_type.upper()} features") |
| for row in iterator: |
| features.append( |
| extract_single_feature( |
| getattr(row, "filepath"), |
| feature_type, |
| config, |
| augment_id=int(getattr(row, "augment_id", 0)), |
| ) |
| ) |
| labels.append(int(getattr(row, "label_id"))) |
| return np.vstack(features).astype(np.float32), np.asarray(labels, dtype=np.int64), working |
|
|
|
|