egg-damage-top3-classifier / src /egg_damage /classical_features.py
budijuarto's picture
Upload src/egg_damage/classical_features.py
3abf0cf verified
from __future__ import annotations
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from PIL import Image
from skimage.feature import hog, local_binary_pattern
from tqdm import tqdm
from .augmentations import augment_pil_for_classical
from .preprocessing import image_size_from_config, load_pil_image, pil_to_uint8_array, resize_image
from .utils import get_logger
LOGGER = get_logger(__name__)
def _prepare_gray_image(
image: str | Path | Image.Image,
config: dict[str, Any],
augment_id: int = 0,
) -> np.ndarray:
pil = load_pil_image(image, mode="L")
if augment_id > 0:
pil = augment_pil_for_classical(pil, augment_id, seed=int(config["seed"]))
pil = resize_image(pil, image_size_from_config(config))
return pil_to_uint8_array(pil)
def extract_hog_feature(
image: str | Path | Image.Image,
config: dict[str, Any],
augment_id: int = 0,
) -> np.ndarray:
gray = _prepare_gray_image(image, config, augment_id=augment_id)
params = config["features"]["hog"]
return hog(
gray,
orientations=int(params.get("orientations", 9)),
pixels_per_cell=tuple(params.get("pixels_per_cell", [16, 16])),
cells_per_block=tuple(params.get("cells_per_block", [2, 2])),
block_norm=str(params.get("block_norm", "L2-Hys")),
transform_sqrt=True,
feature_vector=True,
).astype(np.float32)
def extract_lbp_feature(
image: str | Path | Image.Image,
config: dict[str, Any],
augment_id: int = 0,
) -> np.ndarray:
gray = _prepare_gray_image(image, config, augment_id=augment_id)
params = config["features"]["lbp"]
radius = int(params.get("radius", 2))
n_points = int(params.get("n_points", 16))
method = str(params.get("method", "uniform"))
lbp = local_binary_pattern(gray, P=n_points, R=radius, method=method)
if method == "uniform":
n_bins = n_points + 2
else:
n_bins = min(2**n_points, 4096)
hist, _ = np.histogram(lbp.ravel(), bins=n_bins, range=(0, n_bins), density=False)
hist = hist.astype(np.float32)
hist /= hist.sum() + 1e-8
return hist
def extract_single_feature(
image: str | Path | Image.Image,
feature_type: str,
config: dict[str, Any],
augment_id: int = 0,
) -> np.ndarray:
if feature_type == "hog":
return extract_hog_feature(image, config, augment_id=augment_id)
if feature_type == "lbp":
return extract_lbp_feature(image, config, augment_id=augment_id)
raise ValueError(f"Unsupported classical feature type: {feature_type}")
def expand_train_dataframe_for_balance(df: pd.DataFrame, config: dict[str, Any]) -> pd.DataFrame:
if not config.get("balance", {}).get("enabled", True):
return df.assign(augment_id=0, is_augmented=False)
counts = df["label"].value_counts()
if len(counts) < 2:
return df.assign(augment_id=0, is_augmented=False)
ratio = counts.max() / max(counts.min(), 1)
threshold = float(config.get("data", {}).get("imbalance_threshold", 1.2))
if ratio <= threshold:
return df.assign(augment_id=0, is_augmented=False)
rng = np.random.default_rng(int(config["seed"]))
target = int(counts.max())
rows = [df.assign(augment_id=0, is_augmented=False)]
for label, count in counts.items():
needed = target - int(count)
if needed <= 0:
continue
candidates = df[df["label"] == label]
sampled_idx = rng.choice(candidates.index.to_numpy(), size=needed, replace=True)
augmented = candidates.loc[sampled_idx].copy().reset_index(drop=True)
augmented["augment_id"] = np.arange(1, needed + 1)
augmented["is_augmented"] = True
rows.append(augmented)
LOGGER.info("Classical balancing: added %d augmented '%s' training samples.", needed, label)
expanded = pd.concat(rows, ignore_index=True)
max_samples = int(config.get("balance", {}).get("max_augmented_train_samples", 3000) or 0)
if max_samples > 0 and len(expanded) > max_samples:
expanded = (
expanded.groupby("label", group_keys=False)
.sample(n=max_samples // expanded["label"].nunique(), random_state=int(config["seed"]), replace=False)
.reset_index(drop=True)
)
LOGGER.info("Capped augmented classical training data at %d rows.", len(expanded))
return expanded.sample(frac=1.0, random_state=int(config["seed"])).reset_index(drop=True)
def extract_feature_matrix(
df: pd.DataFrame,
feature_type: str,
config: dict[str, Any],
balance_train: bool = False,
) -> tuple[np.ndarray, np.ndarray, pd.DataFrame]:
working = expand_train_dataframe_for_balance(df, config) if balance_train else df.assign(
augment_id=0, is_augmented=False
)
features: list[np.ndarray] = []
labels: list[int] = []
iterator = tqdm(working.itertuples(index=False), total=len(working), desc=f"{feature_type.upper()} features")
for row in iterator:
features.append(
extract_single_feature(
getattr(row, "filepath"),
feature_type,
config,
augment_id=int(getattr(row, "augment_id", 0)),
)
)
labels.append(int(getattr(row, "label_id")))
return np.vstack(features).astype(np.float32), np.asarray(labels, dtype=np.int64), working