# Source snapshot: 5,460 bytes, revision fdf190d (file-viewer line-number gutter removed).
"""Original code from https://github.com/google/active-learning/blob/master/sampling_methods/kcenter_greedy.py"""
"""Returns points that minimize the maximum distance of any point to a center.
Implements the k-Center-Greedy method in
Ozan Sener and Silvio Savarese. A Geometric Approach to Active Learning for
Convolutional Neural Networks. https://arxiv.org/abs/1708.00489 2017
Distance metric defaults to l2 distance. Features used to calculate distance
are either raw features or if a model has transform method then uses the output
of model.transform(X).
Can be extended to a robust k-centers algorithm that ignores a certain number of
outlier datapoints. The resulting centers are the solution to a mixed integer program.
"""
import numpy as np
from sklearn.metrics import pairwise_distances
import torch
from tqdm import tqdm
import abc
import argparse
class SamplingMethod(object):
    """Base class for batch sampling strategies.

    Stores the data (``X``), labels (``y``) and a seed, offers a helper to
    flatten ``X`` to two dimensions, and routes ``select_batch`` to the
    subclass hook ``select_batch_``.
    """

    # NOTE: this is the Python 2 spelling of a metaclass declaration. Under
    # Python 3 it is an ordinary class attribute, so the ``abstractmethod``
    # markers below are not enforced at instantiation time. Left unchanged so
    # existing subclasses keep working exactly as before.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def __init__(self, X, y, seed, **kwargs):
        """Store the dataset, the labels and the seed on the instance."""
        self.X = X
        self.y = y
        self.seed = seed

    def flatten_X(self):
        """Return ``self.X`` with all trailing dimensions merged into one.

        Returns:
          ``self.X`` itself when it has at most two dimensions, otherwise a
          reshaped array of shape ``(shape[0], prod(shape[1:]))``.
        """
        shape = self.X.shape
        flat_X = self.X
        if len(shape) > 2:
            # np.prod replaces np.product, a deprecated alias that NumPy 2.0
            # removed.
            flat_X = np.reshape(self.X, (shape[0], np.prod(shape[1:])))
        return flat_X

    @abc.abstractmethod
    def select_batch_(self):
        """Subclass hook that performs the actual selection."""
        return

    def select_batch(self, **kwargs):
        """Forward all keyword arguments to ``select_batch_`` and return its result."""
        return self.select_batch_(**kwargs)

    def to_dict(self):
        """Return ``None``; the base class has nothing to serialise."""
        return None
class kCenterGreedy(SamplingMethod):
    """Greedy k-center selection of a diverse batch of datapoints.

    Keeps, for every datapoint, the distance to its nearest selected center
    (``min_distances``) and repeatedly picks the datapoint whose distance is
    largest.
    """

    def __init__(self, X, y, seed, metric="euclidean"):
        """Set up the sampler.

        Args:
          X: data whose first axis indexes datapoints; flattened to two
            dimensions for distance computation.
          y: labels; stored but not used by this class.
          seed: seed for the random choice of the very first center when no
            center is known yet.
          metric: metric name forwarded to ``pairwise_distances``.
        """
        self.X = X
        self.y = y
        self.seed = seed
        self.flat_X = self.flatten_X()
        self.name = "kcenter"
        self.features = self.flat_X
        self.metric = metric
        # Column vector of distances to the nearest center; None until the
        # first center has been registered.
        self.min_distances = None
        self.n_obs = self.X.shape[0]
        self.already_selected = []
        # Private, seeded generator so the initial pick is reproducible and
        # does not touch NumPy's global random state.
        self._rng = np.random.RandomState(seed)

    def update_distances(self, cluster_centers, only_new=True, reset_dist=False):
        """Update min distances given cluster centers.

        Args:
          cluster_centers: indices of cluster centers
          only_new: only calculate distance for newly selected points and update
            min_distances.
          reset_dist: whether to reset min_distances.
        """
        if reset_dist:
            self.min_distances = None
        if only_new:
            known = set(self.already_selected)
            cluster_centers = [d for d in cluster_centers if d not in known]
        if len(cluster_centers) > 0:
            # Update min_distances for all examples given new cluster centers.
            x = self.features[cluster_centers]
            dist = pairwise_distances(self.features, x, metric=self.metric)
            # Collapse over the new centers first so min_distances always stays
            # a single column, even when several centers are added at once.
            nearest = np.min(dist, axis=1).reshape(-1, 1)
            if self.min_distances is None:
                self.min_distances = nearest
            else:
                self.min_distances = np.minimum(self.min_distances, nearest)

    def select_batch_(self, model, already_selected, N, **kwargs):
        """
        Diversity promoting active learning method that greedily forms a batch
        to minimize the maximum distance to a cluster center among all unlabeled
        datapoints.

        Args:
          model: object with a ``transform`` method used to compute features;
            when calling it fails (e.g. ``model`` is None) the flattened X
            is used instead
          already_selected: index of datapoints already selected
          N: batch size

        Returns:
          list of int indices of points selected to minimize distance to
          cluster centers
        """
        try:
            # Assumes that the transform function takes in original data and not
            # flattened data.
            print("Getting transformed features...")
            features = model.transform(self.X)
        except Exception:
            # Best-effort fallback, e.g. model is None or has no transform.
            print("Using flat_X as features.")
            self.update_distances(already_selected, only_new=True, reset_dist=False)
        else:
            self.features = features
            print("Calculating distances...")
            self.update_distances(already_selected, only_new=False, reset_dist=True)

        new_batch = []
        for _ in tqdm(range(N)):
            if self.min_distances is None:
                # No center registered yet: initialize with a randomly
                # selected datapoint.
                ind = int(self._rng.choice(self.n_obs))
            else:
                ind = int(np.argmax(self.min_distances))
            # New examples should not be in already selected since those points
            # should have min_distance of zero to a cluster center.
            assert ind not in already_selected

            self.update_distances([ind], only_new=True, reset_dist=False)
            new_batch.append(ind)

        if self.min_distances is not None:
            print(
                "Maximum distance from cluster centers is %0.2f"
                % float(np.max(self.min_distances))
            )

        self.already_selected = already_selected
        return new_batch
if __name__ == '__main__':
    # Select a diverse subset of the lines [start:end] of the input file using
    # the matching rows of a precomputed embedding matrix, and write the chosen
    # lines to a per-range output file.
    parser = argparse.ArgumentParser()
    parser.add_argument('--start', type=int)
    parser.add_argument("--end", type=int)
    args = parser.parse_args()

    # NOTE(review): torch.load may unpickle arbitrary objects; only point this
    # at embedding files from a trusted source.
    embeddings = torch.load("/home/aiscuser/fhw/embeddings/qwq_ins_embeddings.pt")

    with open("/home/aiscuser/fhw/data/qwq_python_selected.json", "r") as f:
        lines = f.readlines()[args.start:args.end]

    # Never ask for more points than there are lines in the slice; the greedy
    # loop would otherwise start returning duplicate indices.
    selected_nums = min(10000, len(lines))

    kcg = kCenterGreedy(X=embeddings[args.start:args.end], y=None, seed=42)
    batch = kcg.select_batch_(model=None, already_selected=[], N=selected_nums)

    # Open the output only after selection succeeded so a failure does not
    # leave a truncated file behind.
    out_path = f"/home/aiscuser/fhw/data/qwq_python_diverse_{args.start}_{args.end}.json"
    with open(out_path, "w") as fw:
        fw.writelines(lines[idx] for idx in batch)
|