Spaces:
Sleeping
Sleeping
| import matplotlib | |
| import torch | |
| #matplotlib.rc("font", size=25) | |
| import numpy as np | |
| from scipy import stats | |
| from scipy.optimize import curve_fit | |
| from scipy import asarray as ar, exp | |
def calculate_eff(sd, log_scale=False, pandora=False):
    """Compute the per-bin fraction of showers that have a non-NaN energy.

    Showers are binned by ``sd.reco_showers_E`` into half-open intervals
    ``(low, high]``. For each non-empty bin the efficiency is
    ``(total - missing) / total`` where ``missing`` counts NaN entries of
    ``sd.pred_showers_E`` (or of ``sd.pandora_calibrated_E`` when
    ``pandora`` is true).

    Args:
        sd: table-like object exposing ``reco_showers_E``,
            ``pred_showers_E``, ``true_showers_E`` (and
            ``pandora_calibrated_E`` when ``pandora`` is true) as
            attributes with a ``.values`` array.
        log_scale: use logarithmically spaced bin edges between 0.1 and 80
            instead of linear edges 0, 5, ..., 50.
        pandora: take the NaN count and the bin population from
            ``pandora_calibrated_E``.

    Returns:
        ``(eff, energy_eff)``: efficiencies and the matching bin centres,
        with empty bins omitted from both lists.
    """
    if log_scale:
        edges = np.exp(np.arange(np.log(0.1), np.log(80), 0.3))
    else:
        edges = np.arange(0, 51, 5)

    reco_energy = sd.reco_showers_E.values
    pred_missing = np.isnan(sd.pred_showers_E.values)
    true_energy = sd.true_showers_E.values
    if pandora:
        pandora_energy = sd.pandora_calibrated_E.values
        missing_flags = np.isnan(pandora_energy)
        population = pandora_energy
    else:
        missing_flags = pred_missing
        population = true_energy

    eff = []
    energy_eff = []
    for lower, upper in zip(edges[:-1], edges[1:]):
        in_bin = (reco_energy > lower) & (reco_energy <= upper)
        n_total = len(population[in_bin])
        if n_total <= 0:
            continue
        n_missing = np.sum(missing_flags[in_bin])
        eff.append((n_total - n_missing) / n_total)
        energy_eff.append((upper + lower) / 2)
    return eff, energy_eff
def calculate_fakes(sd, matched, log_scale=False, pandora=False):
    """Compute the fake rate and fake-energy fraction per predicted-energy bin.

    Showers are binned by ``sd.pred_showers_E`` into half-open intervals
    ``(low, high]``. A shower counts as fake when ``sd.pid`` is NaN.

    Args:
        sd: table-like object exposing ``pred_showers_E``,
            ``true_showers_E`` and ``pid`` (and ``pandora_calibrated_pfo``
            when ``pandora`` is true).
        matched: not used by this function.
        log_scale: use logarithmically spaced bin edges between 0.1 and 80.
        pandora: sum the fake energy from ``pandora_calibrated_pfo``
            instead of ``pred_showers_E``.

    Returns:
        ``(fake_rate, energy_fakes, fake_percent_energy)`` for the
        non-empty bins: number of fakes divided by the total number of
        non-NaN true showers, the bin centres, and the fake energy divided
        by the summed true energy of the non-fake showers in the bin.
    """
    if log_scale:
        edges = np.exp(np.arange(np.log(0.1), np.log(80), 0.3))
    else:
        # NOTE(review): linspace gives only 4 wide bins (0, 12.75, ..., 51)
        # whereas the sibling functions use np.arange - confirm intended.
        edges = np.linspace(0, 51, 5)

    # Normalisation: showers whose true energy is not NaN.
    n_true_showers = np.sum(~np.isnan(sd.true_showers_E.values))

    pred_energy = sd.pred_showers_E.values
    fake_energy_source = sd.pandora_calibrated_pfo if pandora else sd.pred_showers_E
    pid_missing = np.isnan(sd.pid)

    fake_rate = []
    energy_fakes = []
    fake_percent_energy = []
    for lower, upper in zip(edges[:-1], edges[1:]):
        in_bin = (pred_energy > lower) & (pred_energy <= upper)
        if len(pred_energy[in_bin]) <= 0:
            continue
        is_fake = pid_missing[in_bin]
        is_matched = ~is_fake
        n_fakes = np.sum(is_fake)
        fake_energy = np.sum(fake_energy_source[in_bin].values[is_fake])
        matched_true_energy = np.sum(sd.true_showers_E.values[in_bin][is_matched])

        fake_rate.append(n_fakes / n_true_showers)
        energy_fakes.append((upper + lower) / 2)
        fake_percent_energy.append(fake_energy / matched_true_energy)
    return fake_rate, energy_fakes, fake_percent_energy
def calculate_response(matched, pandora, log_scale=False):
    """Compute the energy response and its relative spread per energy bin.

    Two passes are made over the same bin edges:

    1. binned in ``reco_showers_E``: ratio ``calibrated_E / reco_showers_E``;
    2. binned in ``true_showers_E``: ratio of the predicted energy
       (``pandora_calibrated_E`` when ``pandora`` is true, otherwise
       ``calibrated_E``) to ``true_showers_E``.

    Only rows with ``pred_showers_E > 0`` are used. For every non-empty bin
    the ratio distribution is summarised with ``obtain_MPV_and_68``.
    For bin indices 5, 6, 10 and 20 the raw per-shower arrays are also kept.

    Args:
        matched: table exposing the columns named above, both by key and
            by attribute.
        pandora: select the Pandora-calibrated energy where noted above.
        log_scale: use logarithmically spaced bin edges between 0.1 and 80
            instead of linear edges 0, 2, ..., 50.

    Returns:
        Tuple ``(mean, variance_om, mean_true_rec, variance_om_true_rec,
        energy_resolutions, energy_resolutions_reco, dic_histograms)``.
        The first two lists and ``energy_resolutions`` (bin centres) come
        from the true-energy pass; ``mean_true_rec``,
        ``variance_om_true_rec`` and ``energy_resolutions_reco`` come from
        the reco-energy pass.
    """
    if log_scale:
        edges = np.exp(np.arange(np.log(0.1), np.log(80), 0.3))
    else:
        edges = np.arange(0, 51, 2)
    histogram_bins = [5, 6, 10, 20]
    n_bins = len(edges) - 1

    def select(column, lower, upper):
        # Rows with lower < column <= upper that also have a positive
        # predicted shower energy.
        above_lower = matched[column] > lower
        below_upper = matched[column] <= upper
        has_prediction = matched["pred_showers_E"] > 0
        return above_lower * below_upper * has_prediction

    mean = []
    variance_om = []
    mean_true_rec = []
    variance_om_true_rec = []
    energy_resolutions = []
    energy_resolutions_reco = []
    dic_histograms = {}

    # Pass 1: response with respect to the reconstructable energy.
    ratio_bins = np.arange(0, 3, 0.001)
    for idx in range(n_bins):
        lower, upper = edges[idx], edges[idx + 1]
        selected = select("reco_showers_E", lower, upper)
        model_e = matched.calibrated_E[selected]
        reco_e = matched.reco_showers_E[selected]
        truth_e = matched.true_showers_E[selected]
        if pandora:
            corrected_e = matched.pandora_calibrated_E[selected]
        else:
            corrected_e = matched.calibrated_E[selected]
        if not np.sum(selected) > 0:
            continue  # empty bin
        ratio_to_reco = model_e / reco_e
        if idx in histogram_bins:
            tag = str(idx)
            dic_histograms[tag + "reco"] = ratio_to_reco
            dic_histograms[tag + "reco_baseline"] = reco_e
            dic_histograms[tag + "pred_corr_e"] = corrected_e
            dic_histograms[tag + "true_baseline"] = truth_e
            dic_histograms[tag + "pred_e"] = model_e
        mpv_reco, spread_reco = obtain_MPV_and_68(ratio_to_reco, ratio_bins)
        mean_true_rec.append(mpv_reco)
        variance_om_true_rec.append(spread_reco)
        energy_resolutions_reco.append((upper + lower) / 2)

    # TODO change the pred_showers_E to the pandora calibrated E and the
    # calibrated E for the model pandora_calibrated_E
    # Pass 2: response with respect to the true energy (coarser ratio grid).
    ratio_bins = np.arange(0, 3, 0.005)
    for idx in range(n_bins):
        lower, upper = edges[idx], edges[idx + 1]
        selected = select("true_showers_E", lower, upper)
        truth_e = matched.true_showers_E[selected]
        reco_e = matched.reco_showers_E[selected]
        if pandora:
            predicted_e = matched.pandora_calibrated_E[selected]
        else:
            predicted_e = matched.calibrated_E[selected]
        if not np.sum(selected) > 0:
            continue  # empty bin
        ratio_to_true = predicted_e / truth_e
        reco_to_true = reco_e / truth_e
        if idx in histogram_bins:
            tag = str(idx)
            dic_histograms[tag + "true"] = ratio_to_true
            dic_histograms[tag + "reco_showers"] = reco_to_true
        mpv_true, spread_true = obtain_MPV_and_68(ratio_to_true, ratio_bins)
        # Debug output: MPV-based summary next to the plain mean and
        # variance-over-mean of the same ratios.
        print(
            "bin i ",
            edges[idx],
            mpv_true,
            spread_true,
            np.mean(ratio_to_true),
            np.var(ratio_to_true) / np.mean(ratio_to_true),
        )
        mean.append(mpv_true)
        variance_om.append(spread_true)
        energy_resolutions.append((upper + lower) / 2)

    return (
        mean,
        variance_om,
        mean_true_rec,
        variance_om_true_rec,
        energy_resolutions,
        energy_resolutions_reco,
        dic_histograms,
    )
def get_sigma_gaussian(e_over_reco, bins_per_binned_E):
    """Fit a Gaussian to the histogram of ``e_over_reco``.

    The data are histogrammed (normalised as a density) on
    ``bins_per_binned_E`` and a Gaussian ``C * exp(-(x - mu)^2 / (2 s^2))``
    is fitted to the bin centres, seeded with the histogram-weighted mean
    and standard deviation.

    Args:
        e_over_reco: 1-D array-like of ratio values.
        bins_per_binned_E: bin edges passed to ``np.histogram``.

    Returns:
        A 4-tuple ``(mean, sigma / mean, mean_error, sigma_error / mean)``.
        Every path returns four values:

        * no entries inside the binning range: ``(0, 0, 0, 0)``;
        * the fit fails: the moment estimates with placeholder errors of
          ``0.001``;
        * otherwise: the fitted parameters and the errors taken from the
          diagonal of the fit covariance matrix.
    """

    def gaus(X, C, X_mean, sigma):
        # Gaussian PDF shape with free normalisation C.
        return C * np.exp(-((X - X_mean) ** 2) / (2 * sigma**2))

    # With density=True an empty histogram is 0/0 = NaN in every bin;
    # silence that warning and detect the case explicitly below.
    with np.errstate(divide="ignore", invalid="ignore"):
        hist, bin_edges = np.histogram(
            e_over_reco, bins=bins_per_binned_E, density=True
        )
    x_hist = (bin_edges[1:] + bin_edges[:-1]) / 2
    y_hist = hist

    # Nothing landed inside the bins: there is no distribution to fit.
    if not np.any(np.nan_to_num(y_hist) > 0):
        return 0, 0, 0, 0

    total = np.sum(y_hist)
    mean = np.sum(x_hist * y_hist) / total
    # Standard deviation (square root of the weighted variance), so that it
    # is a valid initial guess / fallback for the Gaussian sigma.
    sigma = np.sqrt(np.sum(y_hist * (x_hist - mean) ** 2) / total)

    try:
        param_optimised, param_covariance_matrix = curve_fit(
            gaus, x_hist, y_hist, p0=[np.max(y_hist), mean, sigma], maxfev=10000
        )
    except Exception:
        # Best effort: fall back to the moment estimates when the fit
        # cannot be performed (non-convergence, non-finite residuals, ...).
        print("Error! Using this")
        return mean, sigma / mean, 0.001, 0.001  # dummy errors temporarily

    # The fit occasionally lands on negative parameters; replace them with
    # the moment estimates.
    if param_optimised[2] < 0:
        param_optimised[2] = sigma
    if param_optimised[1] < 0:
        param_optimised[1] = mean

    errors = np.sqrt(np.diag(param_covariance_matrix))
    return (
        param_optimised[1],
        param_optimised[2] / param_optimised[1],
        errors[1],
        errors[2] / param_optimised[1],
    )
def obtain_MPV_and_68(data_for_hist, bins_per_binned_E, epsilon=0.0001):
    """Return the most probable value and the relative 68% half-width.

    The data are histogrammed as a density on ``bins_per_binned_E``; the
    most probable value (MPV) is the centre of the highest bin, and the
    half-width comes from ``get_std68`` evaluated on the same histogram.

    Args:
        data_for_hist: 1-D array-like of values.
        bins_per_binned_E: bin edges passed to ``np.histogram``.
        epsilon: tolerance forwarded to ``get_std68``.

    Returns:
        ``(MPV, std68 / MPV)``.
    """
    density, edges = np.histogram(
        data_for_hist, bins=bins_per_binned_E, density=True
    )
    peak = np.argmax(density)
    most_probable = (edges[peak] + edges[peak + 1]) / 2
    # get_std68 also returns the interval bounds; only the half-width is used.
    half_width = get_std68(density, edges, epsilon=epsilon)[0]
    return most_probable, half_width / most_probable
def get_std68(theHist, bin_edges, percentage=0.683, epsilon=0.01):
    """Find the narrowest bin-centre interval holding ``percentage`` of the area.

    The histogram is integrated cumulatively (height times bin width). For
    every pair of bin centres ``(i, j)`` with ``j >= i`` the area between
    them is the difference of the cumulative integrals; among the pairs
    whose area is within ``epsilon`` of ``percentage`` the one with the
    smallest width is selected (the first such pair on ties).

    The scan over ``j`` is vectorised per starting bin ``i``. It selects the
    same interval as an exhaustive double loop, without running millions of
    Python-level iterations on fine binnings.

    Args:
        theHist: histogram heights, one per bin.
        bin_edges: increasing bin edges (``len(theHist) + 1`` values).
        percentage: target fraction of the integral.
        epsilon: absolute tolerance on the matched fraction.

    Returns:
        ``(half_width, low, high)``. When no pair matches, the defaults
        ``low = 0.2`` and ``high = 1.0`` are returned (half-width 0.4).
    """
    edges = np.asarray(bin_edges, dtype=float)
    n_bins = max(len(edges) - 1, 0)
    heights = np.asarray(theHist, dtype=float)[:n_bins]

    centers = (edges[1:] + edges[:-1]) / 2
    # Integral of the histogram up to and including each bin.
    cumulative = np.cumsum(heights * (edges[1:] - edges[:-1]))

    low = 0.2
    high = 1.0
    width = 100
    with np.errstate(invalid="ignore"):
        for i in range(n_bins):
            covered = cumulative[i:] - cumulative[i]
            hits = np.flatnonzero(np.abs(covered - percentage) < epsilon)
            if hits.size == 0:
                continue
            # Centres increase with j, so the first hit is the narrowest
            # matching interval that starts at bin i.
            j = i + hits[0]
            wx = centers[j] - centers[i]
            if wx < width:
                low = centers[i]
                high = centers[j]
                width = wx
    return 0.5 * (high - low), low, high
def calculate_purity_containment(matched, log_scale=False):
    """Compute the mean and variance of two energy ratios per reco-energy bin.

    The ratios are ``e_pred_and_truth / reco_showers_E`` (stored under the
    ``fce_*`` outputs) and ``e_pred_and_truth / pred_showers_E`` (stored
    under the ``purity_*`` outputs). Rows are binned by ``reco_showers_E``
    into ``(low, high]`` and only rows with ``pred_showers_E > 0`` are used.

    Args:
        matched: table exposing ``e_pred_and_truth``, ``reco_showers_E`` and
            ``pred_showers_E`` by key.
        log_scale: use logarithmically spaced bin edges between 0.1 and 80
            instead of linear edges 0, 2, ..., 50.

    Returns:
        ``(fce_energy, fce_var_energy, energy_ms, purity_energy,
        purity_var_energy)`` where ``energy_ms`` holds the bin centres;
        empty bins are omitted from every list.
    """
    if log_scale:
        edges = np.exp(np.arange(np.log(0.1), np.log(80), 0.3))
    else:
        edges = np.arange(0, 51, 2)

    containment = matched["e_pred_and_truth"] / matched["reco_showers_E"]
    purity = matched["e_pred_and_truth"] / matched["pred_showers_E"]

    fce_energy, fce_var_energy = [], []
    purity_energy, purity_var_energy = [], []
    energy_ms = []
    for lower, upper in zip(edges[:-1], edges[1:]):
        above_lower = matched["reco_showers_E"] > lower
        below_upper = matched["reco_showers_E"] <= upper
        has_prediction = matched["pred_showers_E"] > 0
        selected = above_lower * below_upper * has_prediction
        if not np.sum(selected) > 0:
            continue  # empty bin
        fce_energy.append(np.mean(containment[selected]))
        fce_var_energy.append(np.var(containment[selected]))
        energy_ms.append((upper + lower) / 2)
        purity_energy.append(np.mean(purity[selected]))
        purity_var_energy.append(np.var(purity[selected]))
    return (
        fce_energy,
        fce_var_energy,
        energy_ms,
        purity_energy,
        purity_var_energy,
    )
def obtain_metrics(sd, matched, pandora=False, log_scale=False):
    """Run every metric calculation and collect the results in one dict.

    Args:
        sd: table of showers passed to ``calculate_eff`` and
            ``calculate_fakes``.
        matched: table of matched showers passed to ``calculate_fakes``,
            ``calculate_response`` and ``calculate_purity_containment``.
        pandora: forwarded to ``calculate_response``.
        log_scale: forwarded to every calculation to select logarithmic
            energy bins.

    Returns:
        Dict mapping metric names to the lists (and the histogram dict)
        produced by the individual calculations.
    """
    # NOTE(review): `pandora` is forwarded only to calculate_response; the
    # efficiency and fake-rate calls run with their default pandora=False.
    # Confirm this is intended.
    eff, energy_eff = calculate_eff(sd, log_scale)
    # calculate_fakes returns three values; unpacking only two raised
    # ValueError on every call.
    fake_rate, energy_fakes, fake_percent_energy = calculate_fakes(
        sd, matched, log_scale
    )
    (
        mean,
        variance_om,
        mean_true_rec,
        variance_om_true_rec,
        energy_resolutions,
        energy_resolutions_reco,
        dic_histograms,
    ) = calculate_response(matched, pandora, log_scale)
    (
        fce_energy,
        fce_var_energy,
        energy_ms,
        purity_energy,
        purity_var_energy,
    ) = calculate_purity_containment(matched, log_scale)
    metrics = {
        "energy_eff": energy_eff,
        "eff": eff,
        "energy_fakes": energy_fakes,
        "fake_rate": fake_rate,
        "fake_percent_energy": fake_percent_energy,
        "mean": mean,
        "variance_om": variance_om,
        "mean_true_rec": mean_true_rec,
        "variance_om_true_rec": variance_om_true_rec,
        "fce_energy": fce_energy,
        "fce_var_energy": fce_var_energy,
        "energy_ms": energy_ms,
        "purity_energy": purity_energy,
        "purity_var_energy": purity_var_energy,
        "energy_resolutions": energy_resolutions,
        "energy_resolutions_reco": energy_resolutions_reco,
        "dic_histograms": dic_histograms,
    }
    return metrics