File size: 8,127 Bytes
1e315b6
 
 
 
 
 
 
 
 
 
 
 
 
b118ecd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e315b6
b118ecd
1e315b6
 
 
 
b118ecd
 
1e315b6
 
b118ecd
1e315b6
b118ecd
 
 
 
 
 
1e315b6
b118ecd
 
1e315b6
b118ecd
 
 
1e315b6
b118ecd
 
 
1e315b6
 
b118ecd
 
 
 
 
1e315b6
b118ecd
 
1e315b6
b118ecd
 
1e315b6
b118ecd
 
 
1e315b6
b118ecd
 
1e315b6
b118ecd
1e315b6
 
b118ecd
1e315b6
b118ecd
 
 
 
 
 
1e315b6
b118ecd
 
1e315b6
b118ecd
 
 
1e315b6
b118ecd
 
1e315b6
 
b118ecd
 
 
 
 
 
 
1e315b6
b118ecd
1e315b6
b118ecd
 
1e315b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import os
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from PIL import Image
import json
import cv2
from sklearn.decomposition import PCA
from open_clip import create_model_from_pretrained, get_tokenizer



import os
from typing import List, Tuple, Union

import torch
import torch.nn.functional as F
import numpy as np
from PIL import Image
import pandas as pd

# --- Model loading --------------------------------------------------------

def load_model(
    model_path: str,
    device: Union[str, torch.device]
) -> Tuple[torch.nn.Module, callable, callable]:
    """
    Load pretrained OmiCLIP (COCA ViT‑L‑14) model, its image preprocess, and tokenizer.
    """
    model, preprocess = create_model_from_pretrained(
        "coca_ViT-L-14", device=device, pretrained=model_path
    )
    tokenizer = get_tokenizer("coca_ViT-L-14")
    model.to(device).eval()
    return model, preprocess, tokenizer

# --- Image encoding -------------------------------------------------------

def encode_images(
    model: torch.nn.Module,
    preprocess: callable,
    image_paths: List[str],
    device: Union[str, torch.device]
) -> torch.Tensor:
    """
    Batch–encode a list of image file paths into L2‑normalized embeddings.
    Returns a tensor of shape (N, D).
    """
    # Load & preprocess all images
    imgs = [preprocess(Image.open(p)) for p in image_paths]
    batch = torch.stack(imgs, dim=0).to(device)           # (N, C, H, W)
    
    with torch.no_grad():
        feats = model.encode_image(batch)                 # (N, D)
    return F.normalize(feats, p=2, dim=-1)                # (N, D)


    # # Loop through each image name in the provided list
    # for img_name in img_list:
    #     # Build the path to the patch image and open it
    #     image_path = os.path.join(data_dir, 'demo_data', 'patch', img_name)
    #     image = Image.open(image_path)

    #     # Encode the image using the model & preprocess; returns shape (1, embedding_dim)
    #     image_features = encode_image(model, preprocess, image)

    #     # Accumulate the feature embeddings in the list
    #     image_embeddings.append(image_features)

    # # Convert the list of embeddings to a NumPy array, then to a PyTorch tensor
    # # Resulting shape will be (N, 1, embedding_dim)
    # image_embeddings = torch.from_numpy(np.array(image_embeddings))

    # # Normalize all embeddings across the feature dimension (L2 normalization)
    # image_embeddings = F.normalize(image_embeddings, p=2, dim=-1)

    # return image_embeddings


# --- Text encoding --------------------------------------------------------

def encode_texts(
    model: torch.nn.Module,
    tokenizer: callable,
    texts: List[str],
    device: Union[str, torch.device]
) -> torch.Tensor:
    """
    Batch–encode a list of strings into L2‑normalized embeddings.
    Returns a tensor of shape (N, D).
    """
    # Tokenizer returns a dict of tensors
    text_inputs = tokenizer(texts)
    
    with torch.no_grad():
        feats = model.encode_text(text_inputs)             # (N, D)
    return F.normalize(feats, p=2, dim=-1)                # (N, D)


def encode_text_df(
    model: torch.nn.Module,
    tokenizer: callable,
    df: pd.DataFrame,
    col_name: str,
    device: Union[str, torch.device]
) -> torch.Tensor:
    """
    Encodes an entire DataFrame column into (N, D) embeddings.
    """
    texts = df[col_name].astype(str).tolist()
    return encode_texts(model, tokenizer, texts, device)




def get_pca_by_fit(tar_features, src_features):
    """
    Applies PCA to target features and transforms both target and source features using the fitted PCA model.
    Combines the PCA-transformed features from both target and source datasets and returns the combined data 
    along with batch labels indicating the origin of each sample.

    :param tar_features: Numpy array of target features (samples by features).
    :param src_features: Numpy array of source features (samples by features).
    :return: 
        - pca_comb_features: A numpy array containing PCA-transformed target and source features combined.
        - pca_comb_features_batch: A numpy array of batch labels indicating which samples are from target (0) and source (1).
    """

    pca = PCA(n_components=3)
    
    # Fit the PCA model on the target features (transposed to fit on features)
    pca_fit_tar = pca.fit(tar_features.T)
    
    # Transform the target and source features using the fitted PCA model
    pca_tar = pca_fit_tar.transform(tar_features.T)  # Transform target features
    pca_src = pca_fit_tar.transform(src_features.T)  # Transform source features using the same PCA fit
    
    # Combine the PCA-transformed target and source features
    pca_comb_features = np.concatenate((pca_tar, pca_src))
    
    # Create a batch label array: 0 for target features, 1 for source features
    pca_comb_features_batch = np.array([0] * len(pca_tar) + [1] * len(pca_src))

    return pca_comb_features, pca_comb_features_batch



def cap_quantile(weight, cap_max=None, cap_min=None):
    """
    Caps the values in the 'weight' array based on the specified quantile thresholds for maximum and minimum values.
    If the quantile thresholds are provided, the function will replace values above or below these thresholds 
    with the corresponding quantile values.

    :param weight: Numpy array of weights to be capped.
    :param cap_max: Quantile threshold for the maximum cap. Values above this quantile will be capped. 
                    If None, no maximum capping will be applied.
    :param cap_min: Quantile threshold for the minimum cap. Values below this quantile will be capped. 
                    If None, no minimum capping will be applied.
    :return: Numpy array with the values capped at the specified quantiles.
    """
    
    # If a maximum cap is specified, calculate the value at the specified cap_max quantile
    if cap_max is not None:
        cap_max = np.quantile(weight, cap_max)  # Get the value at the cap_max quantile
    
    # If a minimum cap is specified, calculate the value at the specified cap_min quantile
    if cap_min is not None:
        cap_min = np.quantile(weight, cap_min)  # Get the value at the cap_min quantile
    
    # Cap the values in 'weight' array to not exceed the maximum cap (cap_max)
    weight = np.minimum(weight, cap_max)
    
    # Cap the values in 'weight' array to not go below the minimum cap (cap_min)
    weight = np.maximum(weight, cap_min)
    
    return weight



def read_polygons(file_path, slide_id):
    """
    Reads polygon data from a JSON file for a specific slide ID, extracting coordinates, colors, and thickness.

    :param file_path: Path to the JSON file containing polygon configurations.
    :param slide_id: Identifier for the specific slide whose polygon data is to be extracted.
    :return: 
        - polygons: A list of numpy arrays, where each array contains the coordinates of a polygon.
        - polygon_colors: A list of color values corresponding to each polygon.
        - polygon_thickness: A list of thickness values for each polygon's border.
    """

    # Open the JSON file and load the polygon configurations into a Python dictionary
    with open(file_path, 'r') as f:
        polygons_configs = json.load(f)

    # Check if the given slide_id exists in the polygon configurations
    if slide_id not in polygons_configs:
        return None, None, None  # If slide_id is not found, return None for all outputs

    # Extract the polygon coordinates, colors, and thicknesses for the given slide_id
    polygons = [np.array(poly['coords']) for poly in polygons_configs[slide_id]]  # Convert polygon coordinates to numpy arrays
    polygon_colors = [poly['color'] for poly in polygons_configs[slide_id]]  # Extract the color for each polygon
    polygon_thickness = [poly['thickness'] for poly in polygons_configs[slide_id]]  # Extract the thickness for each polygon

    # Return the polygons, their colors, and their thicknesses
    return polygons, polygon_colors, polygon_thickness