|
|
import os |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
from PIL import Image |
|
|
import json |
|
|
import cv2 |
|
|
from sklearn.decomposition import PCA |
|
|
from open_clip import create_model_from_pretrained, get_tokenizer |
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
from typing import List, Tuple, Union |
|
|
|
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
|
|
|
def load_model( |
|
|
model_path: str, |
|
|
device: Union[str, torch.device] |
|
|
) -> Tuple[torch.nn.Module, callable, callable]: |
|
|
""" |
|
|
Load pretrained OmiCLIP (COCA ViT‑L‑14) model, its image preprocess, and tokenizer. |
|
|
""" |
|
|
model, preprocess = create_model_from_pretrained( |
|
|
"coca_ViT-L-14", device=device, pretrained=model_path |
|
|
) |
|
|
tokenizer = get_tokenizer("coca_ViT-L-14") |
|
|
model.to(device).eval() |
|
|
return model, preprocess, tokenizer |
|
|
|
|
|
|
|
|
|
|
|
def encode_images( |
|
|
model: torch.nn.Module, |
|
|
preprocess: callable, |
|
|
image_paths: List[str], |
|
|
device: Union[str, torch.device] |
|
|
) -> torch.Tensor: |
|
|
""" |
|
|
Batch–encode a list of image file paths into L2‑normalized embeddings. |
|
|
Returns a tensor of shape (N, D). |
|
|
""" |
|
|
|
|
|
imgs = [preprocess(Image.open(p)) for p in image_paths] |
|
|
batch = torch.stack(imgs, dim=0).to(device) |
|
|
|
|
|
with torch.no_grad(): |
|
|
feats = model.encode_image(batch) |
|
|
return F.normalize(feats, p=2, dim=-1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def encode_texts( |
|
|
model: torch.nn.Module, |
|
|
tokenizer: callable, |
|
|
texts: List[str], |
|
|
device: Union[str, torch.device] |
|
|
) -> torch.Tensor: |
|
|
""" |
|
|
Batch–encode a list of strings into L2‑normalized embeddings. |
|
|
Returns a tensor of shape (N, D). |
|
|
""" |
|
|
|
|
|
text_inputs = tokenizer(texts) |
|
|
|
|
|
with torch.no_grad(): |
|
|
feats = model.encode_text(text_inputs) |
|
|
return F.normalize(feats, p=2, dim=-1) |
|
|
|
|
|
|
|
|
def encode_text_df( |
|
|
model: torch.nn.Module, |
|
|
tokenizer: callable, |
|
|
df: pd.DataFrame, |
|
|
col_name: str, |
|
|
device: Union[str, torch.device] |
|
|
) -> torch.Tensor: |
|
|
""" |
|
|
Encodes an entire DataFrame column into (N, D) embeddings. |
|
|
""" |
|
|
texts = df[col_name].astype(str).tolist() |
|
|
return encode_texts(model, tokenizer, texts, device) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_pca_by_fit(tar_features, src_features): |
|
|
""" |
|
|
Applies PCA to target features and transforms both target and source features using the fitted PCA model. |
|
|
Combines the PCA-transformed features from both target and source datasets and returns the combined data |
|
|
along with batch labels indicating the origin of each sample. |
|
|
|
|
|
:param tar_features: Numpy array of target features (samples by features). |
|
|
:param src_features: Numpy array of source features (samples by features). |
|
|
:return: |
|
|
- pca_comb_features: A numpy array containing PCA-transformed target and source features combined. |
|
|
- pca_comb_features_batch: A numpy array of batch labels indicating which samples are from target (0) and source (1). |
|
|
""" |
|
|
|
|
|
pca = PCA(n_components=3) |
|
|
|
|
|
|
|
|
pca_fit_tar = pca.fit(tar_features.T) |
|
|
|
|
|
|
|
|
pca_tar = pca_fit_tar.transform(tar_features.T) |
|
|
pca_src = pca_fit_tar.transform(src_features.T) |
|
|
|
|
|
|
|
|
pca_comb_features = np.concatenate((pca_tar, pca_src)) |
|
|
|
|
|
|
|
|
pca_comb_features_batch = np.array([0] * len(pca_tar) + [1] * len(pca_src)) |
|
|
|
|
|
return pca_comb_features, pca_comb_features_batch |
|
|
|
|
|
|
|
|
|
|
|
def cap_quantile(weight, cap_max=None, cap_min=None): |
|
|
""" |
|
|
Caps the values in the 'weight' array based on the specified quantile thresholds for maximum and minimum values. |
|
|
If the quantile thresholds are provided, the function will replace values above or below these thresholds |
|
|
with the corresponding quantile values. |
|
|
|
|
|
:param weight: Numpy array of weights to be capped. |
|
|
:param cap_max: Quantile threshold for the maximum cap. Values above this quantile will be capped. |
|
|
If None, no maximum capping will be applied. |
|
|
:param cap_min: Quantile threshold for the minimum cap. Values below this quantile will be capped. |
|
|
If None, no minimum capping will be applied. |
|
|
:return: Numpy array with the values capped at the specified quantiles. |
|
|
""" |
|
|
|
|
|
|
|
|
if cap_max is not None: |
|
|
cap_max = np.quantile(weight, cap_max) |
|
|
|
|
|
|
|
|
if cap_min is not None: |
|
|
cap_min = np.quantile(weight, cap_min) |
|
|
|
|
|
|
|
|
weight = np.minimum(weight, cap_max) |
|
|
|
|
|
|
|
|
weight = np.maximum(weight, cap_min) |
|
|
|
|
|
return weight |
|
|
|
|
|
|
|
|
|
|
|
def read_polygons(file_path, slide_id): |
|
|
""" |
|
|
Reads polygon data from a JSON file for a specific slide ID, extracting coordinates, colors, and thickness. |
|
|
|
|
|
:param file_path: Path to the JSON file containing polygon configurations. |
|
|
:param slide_id: Identifier for the specific slide whose polygon data is to be extracted. |
|
|
:return: |
|
|
- polygons: A list of numpy arrays, where each array contains the coordinates of a polygon. |
|
|
- polygon_colors: A list of color values corresponding to each polygon. |
|
|
- polygon_thickness: A list of thickness values for each polygon's border. |
|
|
""" |
|
|
|
|
|
|
|
|
with open(file_path, 'r') as f: |
|
|
polygons_configs = json.load(f) |
|
|
|
|
|
|
|
|
if slide_id not in polygons_configs: |
|
|
return None, None, None |
|
|
|
|
|
|
|
|
polygons = [np.array(poly['coords']) for poly in polygons_configs[slide_id]] |
|
|
polygon_colors = [poly['color'] for poly in polygons_configs[slide_id]] |
|
|
polygon_thickness = [poly['thickness'] for poly in polygons_configs[slide_id]] |
|
|
|
|
|
|
|
|
return polygons, polygon_colors, polygon_thickness |
|
|
|
|
|
|
|
|
|