# File size: 5,410 Bytes
# 3abf0cf
from __future__ import annotations
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from PIL import Image
from skimage.feature import hog, local_binary_pattern
from tqdm import tqdm
from .augmentations import augment_pil_for_classical
from .preprocessing import image_size_from_config, load_pil_image, pil_to_uint8_array, resize_image
from .utils import get_logger
# Module-level logger shared by the functions below; created by the project's
# get_logger helper, which is passed this module's name.
LOGGER = get_logger(__name__)
def _prepare_gray_image(
    image: str | Path | Image.Image,
    config: dict[str, Any],
    augment_id: int = 0,
) -> np.ndarray:
    """Load an image in mode ``"L"``, optionally augment it, resize it, and convert it.

    Args:
        image: Path or PIL image handed to ``load_pil_image``.
        config: Project configuration. ``config["seed"]`` is read only when an
            augmentation is applied; the target size is taken from
            ``image_size_from_config(config)``.
        augment_id: Values ``<= 0`` skip augmentation. A positive value is
            forwarded to ``augment_pil_for_classical`` together with the seed.

    Returns:
        The output of ``pil_to_uint8_array`` for the resized image.
    """
    grayscale = load_pil_image(image, mode="L")

    needs_augmentation = augment_id > 0
    if needs_augmentation:
        seed = int(config["seed"])
        grayscale = augment_pil_for_classical(grayscale, augment_id, seed=seed)

    target_size = image_size_from_config(config)
    resized = resize_image(grayscale, target_size)
    return pil_to_uint8_array(resized)
def extract_hog_feature(
    image: str | Path | Image.Image,
    config: dict[str, Any],
    augment_id: int = 0,
) -> np.ndarray:
    """Compute a HOG descriptor for one image as a ``float32`` vector.

    Args:
        image: Path or PIL image, prepared through ``_prepare_gray_image``.
        config: Project configuration. HOG settings are read from
            ``config["features"]["hog"]``; missing keys fall back to
            9 orientations, 16x16 pixels per cell, 2x2 cells per block and
            ``"L2-Hys"`` block normalisation.
        augment_id: Forwarded unchanged to ``_prepare_gray_image``.

    Returns:
        The flattened HOG feature vector cast to ``np.float32``.
    """
    pixels = _prepare_gray_image(image, config, augment_id=augment_id)
    hog_cfg = config["features"]["hog"]

    # Collect the keyword arguments first so the skimage call stays short.
    hog_kwargs = {
        "orientations": int(hog_cfg.get("orientations", 9)),
        "pixels_per_cell": tuple(hog_cfg.get("pixels_per_cell", [16, 16])),
        "cells_per_block": tuple(hog_cfg.get("cells_per_block", [2, 2])),
        "block_norm": str(hog_cfg.get("block_norm", "L2-Hys")),
        "transform_sqrt": True,
        "feature_vector": True,
    }
    descriptor = hog(pixels, **hog_kwargs)
    return descriptor.astype(np.float32)
def extract_lbp_feature(
    image: str | Path | Image.Image,
    config: dict[str, Any],
    augment_id: int = 0,
) -> np.ndarray:
    """Compute a normalised Local Binary Pattern histogram for one image.

    Args:
        image: Path or PIL image, prepared through ``_prepare_gray_image``.
        config: Project configuration. LBP settings are read from
            ``config["features"]["lbp"]``; missing keys fall back to
            ``radius=2``, ``n_points=16`` and ``method="uniform"``.
        augment_id: Forwarded unchanged to ``_prepare_gray_image``.

    Returns:
        A ``float32`` histogram whose entries sum to (approximately) one.
        Its length is ``n_points + 2`` for the ``"uniform"`` method and
        ``min(2**n_points, 4096)`` for every other method.
    """
    gray = _prepare_gray_image(image, config, augment_id=augment_id)
    params = config["features"]["lbp"]
    radius = int(params.get("radius", 2))
    n_points = int(params.get("n_points", 16))
    method = str(params.get("method", "uniform"))
    lbp = local_binary_pattern(gray, P=n_points, R=radius, method=method)

    if method == "uniform":
        n_bins = n_points + 2
        upper = n_bins
    else:
        # The bin count stays capped so the feature length is bounded.
        n_bins = min(2**n_points, 4096)
        # "default" and "ror" emit binary codes up to 2**n_points - 1. The
        # histogram range must cover all of them even when the bin count is
        # capped: np.histogram ignores values outside ``range``, so a range
        # of (0, 4096) with n_points > 12 would silently drop most pixels.
        # Codes are grouped into wider bins instead.
        # NOTE(review): "nri_uniform" and "var" keep the previous range;
        # confirm whether those methods need dedicated binning.
        upper = 2**n_points if method in ("default", "ror") else n_bins

    hist, _ = np.histogram(lbp.ravel(), bins=n_bins, range=(0, upper), density=False)
    hist = hist.astype(np.float32)
    # The epsilon guards against division by zero for an all-empty histogram.
    hist /= hist.sum() + 1e-8
    return hist
def extract_single_feature(
    image: str | Path | Image.Image,
    feature_type: str,
    config: dict[str, Any],
    augment_id: int = 0,
) -> np.ndarray:
    """Dispatch to the HOG or LBP extractor named by *feature_type*.

    Args:
        image: Path or PIL image forwarded to the selected extractor.
        feature_type: Either ``"hog"`` or ``"lbp"``.
        config: Project configuration forwarded to the selected extractor.
        augment_id: Forwarded unchanged to the selected extractor.

    Returns:
        The feature vector produced by the selected extractor.

    Raises:
        ValueError: If *feature_type* is neither ``"hog"`` nor ``"lbp"``.
    """
    if feature_type not in ("hog", "lbp"):
        raise ValueError(f"Unsupported classical feature type: {feature_type}")

    # Only the chosen extractor name is looked up.
    extractor = extract_hog_feature if feature_type == "hog" else extract_lbp_feature
    return extractor(image, config, augment_id=augment_id)
def expand_train_dataframe_for_balance(df: pd.DataFrame, config: dict[str, Any]) -> pd.DataFrame:
    """Oversample minority classes so every label matches the largest class.

    Each returned row carries two extra columns: ``augment_id`` (``0`` for
    original rows, ``1..needed`` for the oversampled copies of a label) and
    ``is_augmented``.

    The frame is returned with only those two columns added, in its original
    order, when any of the following holds: balancing is disabled via
    ``config["balance"]["enabled"]``; fewer than two labels are present; the
    max/min class-count ratio does not exceed
    ``config["data"]["imbalance_threshold"]`` (default ``1.2``).

    Args:
        df: Training rows; must contain a ``label`` column.
        config: Project configuration. ``config["seed"]`` is required once
            balancing actually runs. ``config["balance"]
            ["max_augmented_train_samples"]`` (default ``3000``; ``0`` or
            ``None`` disables it) caps the size of the expanded frame.

    Returns:
        The expanded, shuffled frame with a fresh ``RangeIndex``, or the
        annotated input frame when no balancing is performed.
    """
    baseline = df.assign(augment_id=0, is_augmented=False)
    if not config.get("balance", {}).get("enabled", True):
        return baseline

    counts = df["label"].value_counts()
    if len(counts) < 2:
        return baseline

    ratio = counts.max() / max(counts.min(), 1)
    threshold = float(config.get("data", {}).get("imbalance_threshold", 1.2))
    if ratio <= threshold:
        return baseline

    seed = int(config["seed"])
    rng = np.random.default_rng(seed)
    target = int(counts.max())
    rows = [baseline]
    for label, count in counts.items():
        needed = target - int(count)
        if needed <= 0:
            continue
        candidates = df[df["label"] == label]
        # Sample row *positions*, not index labels. With a non-unique index
        # (e.g. frames concatenated without ignore_index) a label-based
        # ``.loc`` lookup returns every row sharing the label, so more than
        # ``needed`` rows come back and the augment_id assignment fails.
        positions = rng.choice(len(candidates), size=needed, replace=True)
        augmented = (
            candidates.iloc[positions]
            .reset_index(drop=True)
            .assign(augment_id=np.arange(1, needed + 1), is_augmented=True)
        )
        rows.append(augmented)
        LOGGER.info("Classical balancing: added %d augmented '%s' training samples.", needed, label)

    expanded = pd.concat(rows, ignore_index=True)

    max_samples = int(config.get("balance", {}).get("max_augmented_train_samples", 3000) or 0)
    if max_samples > 0 and len(expanded) > max_samples:
        # The cap is split evenly across labels; original and augmented rows
        # are drawn from the same pool.
        per_label = max_samples // expanded["label"].nunique()
        expanded = (
            expanded.groupby("label", group_keys=False)
            .sample(n=per_label, random_state=seed, replace=False)
            .reset_index(drop=True)
        )
        LOGGER.info("Capped augmented classical training data at %d rows.", len(expanded))

    # Shuffle so augmented rows are not grouped at the end of the frame.
    return expanded.sample(frac=1.0, random_state=seed).reset_index(drop=True)
def extract_feature_matrix(
    df: pd.DataFrame,
    feature_type: str,
    config: dict[str, Any],
    balance_train: bool = False,
) -> tuple[np.ndarray, np.ndarray, pd.DataFrame]:
    """Extract one feature vector per row of *df* and stack them into a matrix.

    Args:
        df: Rows to process; the ``filepath`` and ``label_id`` columns are read.
        feature_type: Feature name forwarded to ``extract_single_feature``.
        config: Project configuration forwarded to the extractors.
        balance_train: When true, the frame is first expanded through
            ``expand_train_dataframe_for_balance``. Otherwise every row is
            tagged with ``augment_id=0`` and ``is_augmented=False``.

    Returns:
        A tuple ``(features, labels, working)``: the stacked ``float32``
        feature matrix, the ``int64`` label-id array, and the frame that was
        actually iterated (including the ``augment_id`` / ``is_augmented``
        columns).

    Raises:
        ValueError: If there are no rows to process.
    """
    working = (
        expand_train_dataframe_for_balance(df, config)
        if balance_train
        else df.assign(augment_id=0, is_augmented=False)
    )
    # Fail early with a clear message: np.vstack on an empty list raises an
    # opaque "need at least one array to concatenate" ValueError.
    if working.empty:
        raise ValueError(f"Cannot extract {feature_type} features from an empty dataframe.")

    features: list[np.ndarray] = []
    labels: list[int] = []
    iterator = tqdm(working.itertuples(index=False), total=len(working), desc=f"{feature_type.upper()} features")
    for row in iterator:
        features.append(
            extract_single_feature(
                row.filepath,
                feature_type,
                config,
                augment_id=int(row.augment_id),
            )
        )
        labels.append(int(row.label_id))
    return np.vstack(features).astype(np.float32), np.asarray(labels, dtype=np.int64), working
|