File size: 9,379 Bytes
906e061 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
"""
Basic Conformal Implementation for Factuality Assessment
This module implements a basic conformal prediction method for assessing
the factuality of generated text by filtering claims based on conformity scores.
"""
import numpy as np
from typing import List, Tuple, Optional, Callable
class BasicConformal:
def __init__(
self,
score_function: Callable,
random_state: Optional[int] = None
):
self.score_function = score_function
self.random_state = random_state
self.calibration_scores = None
self.threshold = None
self._rng = np.random.default_rng(random_state)
self._tie_gamma_keep: float = 1.0
def fit_on_calib(self, calib_data: List, alpha: float = 0.1) -> 'BasicConformal':
if not 0 < alpha < 1:
raise ValueError("alpha must be between 0 and 1")
raw_scores = self.score_function(calib_data)
per_sample_scores: List[List[float]] = []
if len(raw_scores) == len(calib_data) and hasattr(raw_scores[0], "__iter__") and not isinstance(raw_scores[0], (str, bytes)):
for i, sample in enumerate(calib_data):
if 'atomic_facts' in sample:
s_i = np.asarray(list(raw_scores[i]), dtype=float)
else:
s_i = np.asarray([float(raw_scores[i])], dtype=float)
s_i = np.where(np.isnan(s_i), -np.inf, s_i)
per_sample_scores.append(s_i.tolist())
else:
if len(raw_scores) != len(calib_data):
raise ValueError("score_function must return one score per sample or a per-claim score list per sample")
for i, sample in enumerate(calib_data):
if 'atomic_facts' in sample and len(sample['atomic_facts']) > 0:
s_i = np.asarray([float(raw_scores[i])] * len(sample['atomic_facts']), dtype=float)
else:
s_i = np.asarray([float(raw_scores[i])], dtype=float)
s_i = np.where(np.isnan(s_i), -np.inf, s_i)
per_sample_scores.append(s_i.tolist())
S_values: List[float] = []
for sample, scores_i in zip(calib_data, per_sample_scores):
if 'atomic_facts' in sample and len(sample['atomic_facts']) > 0:
false_scores = [s for s, fact in zip(scores_i, sample['atomic_facts']) if not fact.get('is_supported', False)]
if len(false_scores) == 0:
S_values.append(float('-inf'))
else:
vals = np.asarray(false_scores, dtype=float)
S_values.append(float(np.nanmax(vals)) if vals.size > 0 else float('-inf'))
else:
vals = np.asarray(scores_i, dtype=float)
if vals.size == 0:
S_values.append(float('-inf'))
else:
S_values.append(float(np.nanmax(vals)))
self.calibration_scores = np.array(S_values, dtype=float)
n = len(self.calibration_scores)
if n == 0:
raise ValueError("No calibration samples available to compute threshold")
quantile = 1 - alpha
try:
self.threshold = np.quantile(self.calibration_scores, quantile, method='higher')
except TypeError:
self.threshold = np.quantile(self.calibration_scores, quantile)
sorted_scores = np.sort(self.calibration_scores)
k = int(np.ceil((1.0 - alpha) * (n + 1))) - 1
k = min(max(k, 0), n - 1)
t_star = float(sorted_scores[k])
n_lt = int(np.sum(self.calibration_scores < t_star))
n_eq = int(np.sum(np.isclose(self.calibration_scores, t_star)))
if n_eq <= 0:
gamma_standard = 0.0
else:
gamma_standard = ((1.0 - alpha) * (n + 1) - n_lt) / n_eq
gamma_standard = float(np.clip(gamma_standard, 0.0, 1.0))
self._tie_gamma_keep = 1.0 - gamma_standard
return self
def predict(self, data: List) -> Tuple[List, List]:
if self.threshold is None:
raise ValueError("Model must be fitted before prediction")
raw_scores = self.score_function(data)
per_sample_scores: List[List[float]] = []
if len(raw_scores) == len(data) and hasattr(raw_scores[0], "__iter__") and not isinstance(raw_scores[0], (str, bytes)):
for i, sample in enumerate(data):
if 'atomic_facts' in sample:
s_i = np.asarray(list(raw_scores[i]), dtype=float)
else:
s_i = np.asarray([float(raw_scores[i])], dtype=float)
s_i = np.where(np.isnan(s_i), -np.inf, s_i)
per_sample_scores.append(s_i.tolist())
else:
if len(raw_scores) != len(data):
raise ValueError("score_function must return one score per sample or per-claim score lists per sample")
for i, sample in enumerate(data):
if 'atomic_facts' in sample and len(sample['atomic_facts']) > 0:
s_i = np.asarray([float(raw_scores[i])] * len(sample['atomic_facts']), dtype=float)
else:
s_i = np.asarray([float(raw_scores[i])], dtype=float)
s_i = np.where(np.isnan(s_i), -np.inf, s_i)
per_sample_scores.append(s_i.tolist())
filtered_data: List = []
retention_rates: List[float] = []
for sample, scores_i in zip(data, per_sample_scores):
if 'atomic_facts' in sample and len(sample['atomic_facts']) > 0:
filtered_claims = []
for claim, s in zip(sample['atomic_facts'], scores_i):
if s > self.threshold:
filtered_claims.append(claim)
elif np.isclose(s, self.threshold):
if self._rng.uniform() < self._tie_gamma_keep:
filtered_claims.append(claim)
sample = dict(sample)
sample['filtered_claims'] = filtered_claims
retention_rate = len(filtered_claims) / len(sample['atomic_facts'])
elif 'atomic_facts' in sample and len(sample['atomic_facts']) == 0:
sample = dict(sample)
sample['filtered_claims'] = []
retention_rate = 0.0
else:
sample = dict(sample)
if len(scores_i) == 0:
sample['is_retained'] = False
retention_rate = 0.0
else:
s = float(scores_i[0])
sample['is_retained'] = (s > self.threshold) or (np.isclose(s, self.threshold) and self._rng.uniform() < self._tie_gamma_keep)
retention_rate = 1.0 if sample['is_retained'] else 0.0
filtered_data.append(sample)
retention_rates.append(retention_rate)
return filtered_data, retention_rates
def get_coverage(self, data: List) -> float:
if self.threshold is None:
raise ValueError("Model must be fitted before computing coverage")
raw_scores = self.score_function(data)
per_sample_scores: List[List[float]] = []
if len(raw_scores) == len(data) and hasattr(raw_scores[0], "__iter__") and not isinstance(raw_scores[0], (str, bytes)):
for i, sample in enumerate(data):
if 'atomic_facts' in sample:
s_i = np.asarray(list(raw_scores[i]), dtype=float)
else:
s_i = np.asarray([float(raw_scores[i])], dtype=float)
s_i = np.where(np.isnan(s_i), -np.inf, s_i)
per_sample_scores.append(s_i.tolist())
else:
if len(raw_scores) != len(data):
raise ValueError("score_function must return one score per sample or per-claim score lists per sample")
for i, sample in enumerate(data):
if 'atomic_facts' in sample and len(sample['atomic_facts']) > 0:
s_i = np.asarray([float(raw_scores[i])] * len(sample['atomic_facts']), dtype=float)
else:
s_i = np.asarray([float(raw_scores[i])], dtype=float)
s_i = np.where(np.isnan(s_i), -np.inf, s_i)
per_sample_scores.append(s_i.tolist())
indicators = []
for sample, scores_i in zip(data, per_sample_scores):
if 'atomic_facts' in sample and len(sample['atomic_facts']) > 0:
false_scores = [s for s, fact in zip(scores_i, sample['atomic_facts']) if not fact.get('is_supported', False)]
if len(false_scores) == 0:
indicators.append(1.0)
else:
vals = np.asarray(false_scores, dtype=float)
max_false = float(np.nanmax(vals)) if vals.size > 0 else float('-inf')
indicators.append(1.0 if max_false <= self.threshold else 0.0)
else:
vals = np.asarray(scores_i, dtype=float)
if vals.size == 0:
indicators.append(1.0)
else:
indicators.append(1.0 if float(np.nanmax(vals)) <= self.threshold else 0.0)
return float(np.mean(indicators))
def get_threshold(self) -> float:
return self.threshold
|