|
|
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.linear_model import LogisticRegressionCV
from typing import List, Optional

from conformal import compute_conformity_scores
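# `compute_conformity_scores` is assumed (from its call in `fit_model` below)
# to take the validation dataset and a list of per-example probability arrays
# and return one conformity score per example; its actual implementation
# lives in the local `conformal` module.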
|
|
|
|
|
def fit_model(
    features: np.ndarray,
    labels: np.ndarray,
    config: dict,
    dataset_train: Optional[List] = None,
    eval_dict: Optional[dict] = None,
):
    """Fit the probability model named in `config.model.prob.name`.

    Supports a cross-validated sklearn logistic regression and a torch
    logistic regression trained with the conformal loss below.
    """
|
|
    # `config` is accessed with attribute syntax (`config.model.prob.name`),
    # so it is expected to be an attribute-style config (e.g. OmegaConf)
    # rather than a plain dict.
    name = config.model.prob.name
    if name == "logistic":
        model = LogisticRegressionCV()
        model.fit(X=features, y=labels)
        return model
    elif name == "XGBoost":
        raise NotImplementedError("XGBoost is not implemented yet.")
    elif name == "torch":
        model = LogisticRegression(features.shape[1])
        # lr=1 is unusually large for Adam; kept from the original setup.
        optimizer = optim.Adam(model.parameters(), lr=1)
        # Only the model parameters need gradients, not the inputs.
        x = torch.tensor(features, dtype=torch.float32)

        for i in range(500):
            optimizer.zero_grad()
            probs = model(x)
            loss, avg_train = get_conformal_loss(
                probs, labels, dataset_train, config.conformal.alpha
            )

            # Every 100 iterations: calibrate a conformal threshold on the
            # validation split and report retention on the test split.
            if i % 100 == 0:
                with torch.no_grad():
                    probs_valid = model(
                        torch.tensor(eval_dict['X_valid'], dtype=torch.float32)
                    ).numpy()
                probs_split = np.array_split(probs_valid, eval_dict['splits_valid'])
                threshold = np.quantile(
                    compute_conformity_scores(eval_dict['dataset_valid'], probs_split),
                    1 - config.conformal.alpha,
                )
                with torch.no_grad():
                    probs_test = model(
                        torch.tensor(eval_dict['X_test'], dtype=torch.float32)
                    ).numpy()
                probs_split = np.array_split(probs_test, eval_dict['splits_test'])
                avg = 0.0
                for prob in probs_split:
                    avg += np.mean(prob > threshold)
                print(f"Average % of train claims retained: {avg_train}")
                print(f"Average % of test claims retained: {avg / len(probs_split)}")
                print(f"Loss at iteration {i}: {loss.item()}")

            loss.backward()
            optimizer.step()
        return model
    else:
        raise ValueError(f"{name} not available.")
|
|
|
|
|
|
|
|
def get_conformal_loss(probs, labels, dataset_train, alpha):
    """Differentiable surrogate for the conformal retention objective.

    Splits the flat claim probabilities back into per-example groups, scores
    each example by the highest probability assigned to one of its false
    claims, calibrates a quantile threshold on a random subset of examples,
    and penalizes (via a smooth sigmoid) held-out claims that fall below it.
    """
    # Boundaries between examples: cumulative counts of atomic facts.
    claim_splits = torch.tensor(
        np.cumsum([len(dat['atomic_facts']) for dat in dataset_train])[:-1]
    )
    claim_probs = torch.tensor_split(probs, claim_splits)
    # 1 - labels marks the false claims; cast to bool so it is used as a
    # mask rather than as integer indices.
    claim_labels = np.array_split((1 - labels).astype(bool), claim_splits.numpy())

    # Conformity score per example: max probability among its false claims.
    scores = []
    for c_prob, c_label in zip(claim_probs, claim_labels):
        scores.append(c_prob[torch.from_numpy(c_label)].max())

    # Randomly reserve 25 examples for threshold calibration; the rest
    # contribute to the loss.
    random_indices = np.random.permutation(len(scores))
    threshold_indices = random_indices[:25]
    loss_indices = set(random_indices[25:])

    threshold_scores = [scores[i] for i in threshold_indices]
    threshold = torch.quantile(torch.stack(threshold_scores), 1 - alpha)

    loss = 0
    avg = 0
    for idx, c_prob in enumerate(claim_probs):
        if idx in loss_indices:
            # Smooth relaxation of the indicator 1[c_prob <= threshold]:
            # claims below the threshold contribute loss, pushing the model
            # to place claim probabilities above it.
            loss += torch.sigmoid(threshold - c_prob).mean()
            avg += (c_prob > threshold).float().mean()
    if np.isnan(loss.item()):
        raise ValueError(claim_probs[0])
    return loss, avg / len(loss_indices)
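# Illustrative-only sketch of the smooth surrogate above: sigmoid(t - p) is
# near 1 for claim probabilities p well below the threshold t and near 0 well
# above it, so the loss approximates the fraction of claims filtered out
# while remaining differentiable in p.
#
#   >>> t = torch.tensor(0.5)
#   >>> torch.sigmoid(t - torch.tensor([0.1, 0.5, 0.9]))
#   tensor([0.5987, 0.5000, 0.4013])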
|
|
|
|
|
class LogisticRegression(nn.Module):
    """Logistic regression as a torch module: one linear layer plus a sigmoid."""

    def __init__(self, n_features):
        super().__init__()
        self.linear = nn.Linear(n_features, 1)

    def forward(self, x):
        # torch.sigmoid replaces the deprecated F.sigmoid.
        return torch.sigmoid(self.linear(x))
|
|
|
|
|
|
|
|
|
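# A minimal, hypothetical usage sketch (the config schema is assumed from the
# attribute accesses above, not taken from the original project): fit the
# sklearn path on synthetic data. The "torch" path additionally requires
# `dataset_train` and a populated `eval_dict`.
if __name__ == '__main__':
    from types import SimpleNamespace

    rng = np.random.default_rng(0)
    features = rng.normal(size=(200, 8))
    labels = rng.integers(0, 2, size=200)

    config = SimpleNamespace(
        model=SimpleNamespace(prob=SimpleNamespace(name='logistic')),
        conformal=SimpleNamespace(alpha=0.1),
    )
    model = fit_model(features, labels, config)
    print(model.predict_proba(features[:5]))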