|
|
import os |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
from PIL import Image |
|
|
import json |
|
|
import cv2 |
|
|
from sklearn.decomposition import PCA |
|
|
from open_clip import create_model_from_pretrained, get_tokenizer |
|
|
|
|
|
|
|
|
|
|
|
def load_model(model_path, device, model_name="coca_ViT-L-14"):
    """
    Loads a pretrained open_clip model together with its preprocessing transform and tokenizer.

    :param model_path: Value forwarded to open_clip as ``pretrained`` when creating the model.
    :param device: Device forwarded to open_clip when creating the model.
    :param model_name: open_clip architecture name used for both the model and the tokenizer.
                       Defaults to "coca_ViT-L-14", the value that used to be hard-coded.
    :return:
        - model: The model returned by ``create_model_from_pretrained``.
        - preprocess: The preprocessing transform returned alongside the model.
        - tokenizer: The tokenizer returned by ``get_tokenizer`` for ``model_name``.
    """
    # A single name feeds both calls so the model and tokenizer cannot drift apart.
    model, preprocess = create_model_from_pretrained(model_name, device=device, pretrained=model_path)
    tokenizer = get_tokenizer(model_name)

    return model, preprocess, tokenizer
|
|
|
|
|
|
|
|
|
|
|
def encode_image(model, preprocess, image):
    """
    Encodes a single image into an L2-normalized embedding.

    :param model: Object exposing ``encode_image``. When it also exposes ``parameters()``
                  (e.g. a torch module), the input is moved to the device of its first parameter.
    :param preprocess: Callable that converts ``image`` into a tensor.
    :param image: Image in whatever form ``preprocess`` accepts.
    :return: Tensor of image features for a batch of one, L2-normalized along the last dimension.
    """
    # Add a leading batch dimension of size one.
    image_input = preprocess(image).unsqueeze(0)

    # Preprocessing yields a tensor on its own device (typically CPU) while the model may
    # have been loaded elsewhere; feeding mismatched devices fails at runtime, so the
    # input follows the model's parameters.
    parameters = getattr(model, "parameters", None)
    first_param = next(iter(parameters()), None) if callable(parameters) else None
    if first_param is not None:
        image_input = image_input.to(first_param.device)

    with torch.no_grad():
        image_features = model.encode_image(image_input)
        image_embeddings = F.normalize(image_features, p=2, dim=-1)

    return image_embeddings
|
|
|
|
|
|
|
|
|
|
|
def encode_image_patches(model, preprocess, data_dir, img_list):
    """
    Encodes every patch image named in ``img_list`` and stacks the resulting embeddings.

    Images are read from ``<data_dir>/demo_data/patch/<img_name>``.

    :param model: Model passed through to ``encode_image``.
    :param preprocess: Preprocessing callable passed through to ``encode_image``.
    :param data_dir: Root directory containing the ``demo_data/patch`` folder.
    :param img_list: Iterable of image file names inside the patch folder.
    :return: CPU tensor holding the per-image embeddings stacked along a new leading dimension
             and L2-normalized along the last dimension. An empty ``img_list`` yields an
             empty tensor.
    """
    patch_dir = os.path.join(data_dir, 'demo_data', 'patch')

    image_embeddings = []
    for img_name in img_list:
        # The context manager closes the underlying file once the image has been encoded.
        with Image.open(os.path.join(patch_dir, img_name)) as image:
            image_features = encode_image(model, preprocess, image)
        # Keep results on the CPU so they can be stacked regardless of the model's device.
        image_embeddings.append(image_features.cpu())

    if not image_embeddings:
        # torch.stack rejects an empty list; return an empty tensor instead of raising.
        return torch.empty(0)

    # Stack directly in torch; a numpy round trip breaks for non-CPU tensors.
    image_embeddings = torch.stack(image_embeddings)
    image_embeddings = F.normalize(image_embeddings, p=2, dim=-1)

    return image_embeddings
|
|
|
|
|
|
|
|
|
|
|
def encode_text(model, tokenizer, text):
    """
    Encodes text into an L2-normalized embedding.

    :param model: Object exposing ``encode_text``. When it also exposes ``parameters()``
                  (e.g. a torch module), the tokens are moved to the device of its first parameter.
    :param tokenizer: Callable that converts ``text`` into the token input expected by the model.
    :param text: Text in whatever form ``tokenizer`` accepts.
    :return: Tensor of text features, L2-normalized along the last dimension.
    """
    text_input = tokenizer(text)

    # The tokenizer output lives on its own device (typically CPU) while the model may
    # have been loaded elsewhere; feeding mismatched devices fails at runtime, so the
    # tokens follow the model's parameters.
    parameters = getattr(model, "parameters", None)
    first_param = next(iter(parameters()), None) if callable(parameters) else None
    if first_param is not None and hasattr(text_input, "to"):
        text_input = text_input.to(first_param.device)

    with torch.no_grad():
        text_features = model.encode_text(text_input)
        text_embeddings = F.normalize(text_features, p=2, dim=-1)

    return text_embeddings
|
|
|
|
|
|
|
|
|
|
|
def encode_text_df(model, tokenizer, df, col_name):
    """
    Encodes the text stored in one DataFrame column, row by row, and stacks the embeddings.

    :param model: Model passed through to ``encode_text``.
    :param tokenizer: Tokenizer passed through to ``encode_text``.
    :param df: DataFrame holding the text to encode.
    :param col_name: Name of the column in ``df`` that contains the text.
    :return: CPU tensor holding the per-row embeddings stacked along a new leading dimension
             (in row order) and L2-normalized along the last dimension. An empty ``df``
             yields an empty tensor.
    """
    text_embeddings = []

    # Walk the column values in row order. The previous per-row lookup
    # ``df[df.index == idx][col_name][0]`` treated 0 as an index *label* on
    # integer-indexed frames, raising KeyError for every row whose label is not 0.
    for text in df[col_name]:
        text_features = encode_text(model, tokenizer, text)
        # Keep results on the CPU so they can be stacked regardless of the model's device.
        text_embeddings.append(text_features.cpu())

    if not text_embeddings:
        # torch.stack rejects an empty list; return an empty tensor instead of raising.
        return torch.empty(0)

    # Stack directly in torch; a numpy round trip breaks for non-CPU tensors.
    text_embeddings = torch.stack(text_embeddings)
    text_embeddings = F.normalize(text_embeddings, p=2, dim=-1)

    return text_embeddings
|
|
|
|
|
|
|
|
|
|
|
def get_pca_by_fit(tar_features, src_features, n_components=3):
    """
    Fits PCA on the target features and projects both target and source features with it.

    Both inputs are transposed before being handed to PCA, so each *column* of an input is
    treated as one observation and each row as one variable. Consequently both inputs must
    have the same number of rows. The projected target and source observations are
    concatenated (target first) and returned with batch labels marking their origin.

    :param tar_features: 2-D numpy array of target features; columns are the observations.
    :param src_features: 2-D numpy array of source features; columns are the observations.
    :param n_components: Number of principal components to keep. Defaults to 3, the value
                         that used to be hard-coded.
    :return:
        - pca_comb_features: A numpy array containing PCA-transformed target and source features combined.
        - pca_comb_features_batch: A numpy array of batch labels indicating which samples are from target (0) and source (1).
    """
    pca = PCA(n_components=n_components)

    # The projection is learned from the target data only; the source data is merely transformed.
    pca.fit(tar_features.T)
    pca_tar = pca.transform(tar_features.T)
    pca_src = pca.transform(src_features.T)

    pca_comb_features = np.concatenate((pca_tar, pca_src))

    # One label per projected row: 0 for target rows, 1 for source rows.
    pca_comb_features_batch = np.array([0] * len(pca_tar) + [1] * len(pca_src))

    return pca_comb_features, pca_comb_features_batch
|
|
|
|
|
|
|
|
|
|
|
def cap_quantile(weight, cap_max=None, cap_min=None):
    """
    Caps the values in the 'weight' array based on the specified quantile thresholds for maximum and minimum values.
    Values above the ``cap_max`` quantile are replaced by that quantile's value, and values below
    the ``cap_min`` quantile are replaced by that quantile's value.

    :param weight: Numpy array of weights to be capped.
    :param cap_max: Quantile threshold for the maximum cap. Values above this quantile will be capped.
                    If None, no maximum capping will be applied.
    :param cap_min: Quantile threshold for the minimum cap. Values below this quantile will be capped.
                    If None, no minimum capping will be applied.
    :return: Numpy array with the values capped at the specified quantiles. When neither cap is
             given, ``weight`` is returned unchanged.
    """
    # Resolve both thresholds against the uncapped input so that applying one cap
    # cannot shift the quantile used for the other.
    upper = None if cap_max is None else np.quantile(weight, cap_max)
    lower = None if cap_min is None else np.quantile(weight, cap_min)

    # Apply only the caps that were requested; passing None to np.minimum/np.maximum raises.
    if upper is not None:
        weight = np.minimum(weight, upper)

    if lower is not None:
        weight = np.maximum(weight, lower)

    return weight
|
|
|
|
|
|
|
|
|
|
|
def read_polygons(file_path, slide_id):
    """
    Loads the polygon annotations of one slide from a JSON configuration file.

    The file is expected to map slide IDs to lists of polygon entries, each providing
    'coords', 'color' and 'thickness'.

    :param file_path: Path to the JSON file containing polygon configurations.
    :param slide_id: Key of the slide whose polygon entries should be returned.
    :return:
        - polygons: List of numpy arrays, one per polygon, built from each entry's 'coords'.
        - polygon_colors: List with each entry's 'color' value.
        - polygon_thickness: List with each entry's 'thickness' value.
        All three are None when ``slide_id`` is not present in the file.
    """
    with open(file_path, 'r') as config_file:
        configs_by_slide = json.load(config_file)

    if slide_id in configs_by_slide:
        entries = configs_by_slide[slide_id]

        # Each attribute is gathered in its own pass over the entries.
        coords_arrays = []
        for entry in entries:
            coords_arrays.append(np.array(entry['coords']))

        colors = []
        for entry in entries:
            colors.append(entry['color'])

        thicknesses = []
        for entry in entries:
            thicknesses.append(entry['thickness'])

        return coords_arrays, colors, thicknesses

    # Unknown slide: signal absence with a triple of None values.
    return None, None, None
|
|
|
|
|
|
|
|
|