import builtins
import math
import os.path
import random
from pathlib import Path

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import librosa as li
import jiwer
from jiwer import wer, cer
from pesq import pesq, NoUtterancesError
from sklearn.neighbors import NearestNeighbors
from sklearn.utils import shuffle
from tqdm import tqdm

# NOTE(review): wildcard import order is preserved from the original file —
# later modules may deliberately shadow names from earlier ones.
from src.data import *
from src.constants import *
from src.models import *
from src.simulation import *
from src.preprocess import *
from src.attacks.offline import *
from src.loss import *
from src.pipelines import *
from src.utils import *

################################################################################
# Evaluate attacks on speaker recognition systems
################################################################################

EVAL_DATASET = "voxceleb"  # "librispeech"
LOOKAHEAD = 5
VOICEBOX_PATH = VOICEBOX_PRETRAINED_PATH
UNIVERSAL_PATH = UNIVERSAL_PRETRAINED_PATH

BATCH_SIZE = 20     # evaluation batch size
N_QUERY = 15        # number of query utterances per speaker
N_CONDITION = 10    # number of conditioning utterances per speaker
N_ENROLL = 20       # number of enrolled utterances per speaker

ADV_ENROLL = False          # evaluate under assumption adversarial audio is enrolled
TARGETS_TRAIN = 'centroid'  # 'random', 'same', 'single', 'median'
TARGETS_TEST = 'centroid'   # 'random', 'same', 'single', 'median'

TRANSFER = True      # evaluate attacks on unseen model
DENOISER = False     # evaluate with unseen denoiser defense applied to queries
SIMULATION = False   # apply noisy channel simulation to all queries in evaluation

COMPUTE_OBJECTIVE_METRICS = True  # PESQ, STOI


def set_random_seed(seed: int = 123):
    """Set random seeds to allow for reproducibility.

    Seeds Python's, PyTorch's, and NumPy's global generators. NumPy must be
    seeded because `sklearn.utils.shuffle` (used when partitioning each
    speaker's audio) draws from NumPy's global RNG.
    """
    random.seed(seed)
    torch.manual_seed(seed)
    np.random.seed(seed)
    if torch.backends.cudnn.is_available():
        torch.backends.cudnn.deterministic = True


def param_count(m: nn.Module, trainable: bool = False):
    """Count the number of (optionally, only trainable) parameters in a model.

    Uses `builtins.sum` because a wildcard import above may shadow `sum`.
    """
    if trainable:
        return builtins.sum(
            [p.shape.numel() for p in m.parameters() if p.requires_grad])
    else:
        return builtins.sum([p.shape.numel() for p in m.parameters()])


def pad_sequence(sequences: list):
    """Right-pad a list of (1, 1, T) tensors with zeros into one (N, 1, T_max)
    batch tensor, preserving dtype and device of the first entry."""
    max_len = max([s.shape[-1] for s in sequences])
    padded = torch.zeros(
        (len(sequences), 1, max_len),
        dtype=sequences[0].dtype,
        device=sequences[0].device)
    for i, s in enumerate(sequences):
        padded[i, :, :s.shape[-1]] = s
    return padded


@torch.no_grad()
def compute_embeddings_batch(audio: list,
                             p: Pipeline,
                             defense: nn.Module = nn.Identity()):
    """Compute speaker embeddings for a list of audio tensors.

    Each utterance is (optionally) passed through a `defense` module before
    the pipeline; results are concatenated on CPU along dim 0.
    """
    assert isinstance(p.model, SpeakerVerificationModel)
    emb = [p(defense(audio[i].to(p.device))).to('cpu')
           for i in range(len(audio))]
    emb = torch.cat(emb, dim=0)
    return emb


@torch.no_grad()
def compute_transcripts_batch(audio: list, p: Pipeline):
    """Compute transcripts for a list of audio tensors.

    The underlying model may return either a single string or a list of
    strings per call; both are flattened into one list of length `len(audio)`.
    """
    assert isinstance(p.model, SpeechRecognitionModel)
    transcripts = []
    for i in range(len(audio)):
        t = p.model.transcribe(audio[i].to(p.device))
        if isinstance(t, str):
            transcripts.append(t)
        elif isinstance(t, list):
            transcripts.extend(t)
    assert len(transcripts) == len(audio), 'Transcript format error'
    return transcripts


@torch.no_grad()
def compute_attack_batch(audio: list, a: TrainableAttack, c: torch.Tensor):
    """Apply an attack's perturbation to each utterance, conditioned on `c`.

    If fewer conditioning embeddings than utterances are given, `c` is
    repeated to match. Returns a list of (1, 1, T) CPU tensors.
    """
    if len(c) < len(audio):
        c = c.repeat(len(audio), 1, 1)
    adv = [a.perturbation(
               audio[i].to(a.pipeline.device),
               y=c[i:i+1].to(a.pipeline.device)
           ).to('cpu').reshape(1, 1, -1)
           for i in range(len(audio))]
    return adv


@torch.no_grad()
def compute_pesq(audio1: list, audio2: list, mode: str = 'wb'):
    """Compute pairwise PESQ scores between two equal-length lists of audio.

    Files for which PESQ detects no utterance are skipped, so the returned
    list may be shorter than the inputs.
    """
    assert len(audio1) == len(audio2)
    scores = []
    for i in range(len(audio1)):
        try:
            scores.append(
                pesq(DataProperties.get('sample_rate'),
                     tensor_to_np(audio1[i]).flatten(),
                     tensor_to_np(audio2[i]).flatten(),
                     mode)
            )
        except NoUtterancesError:
            print("PESQ error, skipping audio file...")
    return scores


@torch.no_grad()
def compute_stoi(audio1: list, audio2: list, extended: bool = False):
    """Compute pairwise STOI scores between two equal-length lists of audio."""
    assert len(audio1) == len(audio2)
    scores = []
    for i in range(len(audio1)):
        scores.append(
            stoi(tensor_to_np(audio1[i]).flatten(),
                 tensor_to_np(audio2[i]).flatten(),
                 DataProperties.get('sample_rate'),
                 extended=extended)
        )
    return scores


@torch.no_grad()
def build_ls_dataset(pipelines: dict):
    """
    Build LibriSpeech evaluation dataset on disk holding:
      * query audio
      * query embeddings
      * conditioning embeddings
      * enrolled embeddings
      * ground-truth query transcripts
    """
    # locate dataset
    data_dir = LIBRISPEECH_DATA_DIR / 'train-clean-360'
    cache_dir = CACHE_DIR / 'ls_wer_eval'
    ensure_dir(cache_dir)
    assert os.path.isdir(data_dir), \
        'LibriSpeech `train-clean-360` subset required for evaluation'

    spkr_dirs = list(data_dir.glob("*/"))
    spkr_dirs = [s_d for s_d in spkr_dirs if os.path.isdir(s_d)]

    # catalog audio and load transcripts
    for spkr_dir in tqdm(spkr_dirs, total=len(spkr_dirs),
                         desc='Building dataset'):

        # identify speaker
        spkr_id = spkr_dir.parts[-1]

        # check whether cached data exists for speaker
        spkr_cache_dir = cache_dir / spkr_id
        if os.path.isdir(spkr_cache_dir):
            continue

        # each recording session has a separate subdirectory
        rec_dirs = list(spkr_dir.glob("*/"))
        rec_dirs = [r_d for r_d in rec_dirs if os.path.isdir(r_d)]

        # for each speaker, process & store necessary (non-adversarial) data
        all_audio = []
        all_transcripts = []

        # for each recording session, extract all audio files and transcripts
        for rec_dir in rec_dirs:
            rec_id = rec_dir.parts[-1]
            trans_fn = rec_dir / f"{spkr_id}-{rec_id}.trans.txt"

            # open transcript file
            with open(trans_fn, "r") as f:
                trans_idx = f.readlines()

            if len(trans_idx) == 0:
                print(f"Error: empty transcript {trans_fn}")
                continue

            for line in trans_idx:
                split_line = line.strip().split(" ")
                audio_fn = rec_dir / f'{split_line[0]}.{LIBRISPEECH_EXT}'
                # word boundaries are stored as '|' for WER computation
                transcript = " ".join(split_line[1:]).replace(" ", "|")

                x, _ = li.load(audio_fn, mono=True, sr=16000)
                all_audio.append(
                    torch.as_tensor(x).reshape(1, 1, -1).float())
                all_transcripts.append(transcript)

        # shuffle audio and transcripts in same random order
        all_audio, all_transcripts = shuffle(all_audio, all_transcripts)

        # divide audio and transcripts
        query_audio = all_audio[:N_QUERY]
        query_transcripts = all_transcripts[:N_QUERY]
        condition_audio = all_audio[N_QUERY:N_QUERY+N_CONDITION]
        enroll_audio = all_audio[N_QUERY+N_CONDITION:][:N_ENROLL]

        # check for sufficient audio in each category
        if len(query_audio) < N_QUERY:
            print(f"Error: insufficient query audio for speaker {spkr_id}")
            continue
        elif len(condition_audio) < N_CONDITION:
            print(f"Error: insufficient conditioning audio for speaker {spkr_id}")
            continue
        elif len(enroll_audio) < N_ENROLL:
            print(f"Error: insufficient enrollment audio for speaker {spkr_id}")
            continue

        # compute and save embeddings
        for p_name, p in pipelines.items():

            # compute and save query embeddings
            query_emb = compute_embeddings_batch(query_audio, p)
            f_query = spkr_cache_dir / p_name / 'query_emb.pt'
            ensure_dir_for_filename(f_query)

            # compute and save conditioning embeddings
            condition_emb = compute_embeddings_batch(condition_audio, p)
            f_condition = spkr_cache_dir / p_name / 'condition_emb.pt'
            ensure_dir_for_filename(f_condition)

            # compute and save enrolled embeddings
            enroll_emb = compute_embeddings_batch(enroll_audio, p)
            f_enroll = spkr_cache_dir / p_name / 'enroll_emb.pt'
            ensure_dir_for_filename(f_enroll)

            torch.save(query_emb, f_query)
            torch.save(condition_emb, f_condition)
            torch.save(enroll_emb, f_enroll)

        # save query audio
        f_audio = spkr_cache_dir / 'query_audio.pt'
        torch.save(query_audio, f_audio)

        # save query transcripts
        f_transcript = spkr_cache_dir / 'query_trans.pt'
        torch.save(query_transcripts, f_transcript)


@torch.no_grad()
def build_vc_dataset(pipelines: dict):
    """
    Build VoxCeleb evaluation dataset on disk holding:
      * query audio
      * query embeddings
      * conditioning embeddings
      * enrolled embeddings
    """
    # locate dataset
    data_dir = VOXCELEB1_DATA_DIR / 'voxceleb1'
    cache_dir = CACHE_DIR / 'vc_wer_eval'
    ensure_dir(cache_dir)
    assert os.path.isdir(data_dir), \
        'VoxCeleb1 dataset required for evaluation'

    spkr_dirs = list(data_dir.glob("*/"))
    spkr_dirs = [s_d for s_d in spkr_dirs if os.path.isdir(s_d)]

    # catalog audio
    for spkr_dir in tqdm(spkr_dirs, total=len(spkr_dirs),
                         desc='Building dataset'):

        # identify speaker
        spkr_id = spkr_dir.parts[-1]

        # check whether cached data exists for speaker
        spkr_cache_dir = cache_dir / spkr_id
        if os.path.isdir(spkr_cache_dir):
            continue

        # each recording session has a separate subdirectory
        rec_dirs = list(spkr_dir.glob("*/"))
        rec_dirs = [r_d for r_d in rec_dirs if os.path.isdir(r_d)]

        # for each speaker, process & store necessary (non-adversarial) data
        all_audio = []

        # for each recording session, extract all audio files
        for rec_dir in rec_dirs:
            for audio_fn in rec_dir.glob(f"*.{VOXCELEB1_EXT}"):
                x, _ = li.load(audio_fn, mono=True, sr=16000)
                all_audio.append(
                    torch.as_tensor(x).reshape(1, 1, -1).float())

        # shuffle audio in random order
        all_audio = shuffle(all_audio)

        # divide audio
        query_audio = all_audio[:N_QUERY]
        condition_audio = all_audio[N_QUERY:N_QUERY+N_CONDITION]
        enroll_audio = all_audio[N_QUERY+N_CONDITION:][:N_ENROLL]

        # check for sufficient audio in each category
        if len(query_audio) < N_QUERY:
            print(f"Error: insufficient query audio for speaker {spkr_id}")
            continue
        elif len(condition_audio) < N_CONDITION:
            print(f"Error: insufficient conditioning audio for speaker {spkr_id}")
            continue
        elif len(enroll_audio) < N_ENROLL:
            print(f"Error: insufficient enrollment audio for speaker {spkr_id}")
            continue

        # compute and save embeddings
        for p_name, p in pipelines.items():

            # compute and save query embeddings
            query_emb = compute_embeddings_batch(query_audio, p)
            f_query = spkr_cache_dir / p_name / 'query_emb.pt'
            ensure_dir_for_filename(f_query)

            # compute and save conditioning embeddings
            condition_emb = compute_embeddings_batch(condition_audio, p)
            f_condition = spkr_cache_dir / p_name / 'condition_emb.pt'
            ensure_dir_for_filename(f_condition)

            # compute and save enrolled embeddings
            enroll_emb = compute_embeddings_batch(enroll_audio, p)
            f_enroll = spkr_cache_dir / p_name / 'enroll_emb.pt'
            ensure_dir_for_filename(f_enroll)

            torch.save(query_emb, f_query)
            torch.save(condition_emb, f_condition)
            torch.save(enroll_emb, f_enroll)

        # save query audio
        f_audio = spkr_cache_dir / 'query_audio.pt'
        torch.save(query_audio, f_audio)


@torch.no_grad()
def asr_metrics(true: list, hypothesis: list, batch_size: int = 5):
    """
    Compute word and character error rates between two lists of
    corresponding transcripts.

    Error rates are computed per batch and combined as a weighted average
    (weights: words for WER, characters for CER) so the result matches the
    corpus-level rate.
    """
    assert len(true) == len(hypothesis)
    n_batches = math.ceil(len(true) / batch_size)

    # transcripts use '|' as the word delimiter
    transform_wer = jiwer.Compose([
        jiwer.ToLowerCase(),
        jiwer.RemoveWhiteSpace(replace_by_space=True),
        jiwer.RemoveMultipleSpaces(),
        jiwer.ReduceToSingleSentence(word_delimiter="|"),
        jiwer.ReduceToListOfListOfWords(word_delimiter="|"),
    ])

    wer_score = 0.0
    cer_score = 0.0
    wer_n = 0
    cer_n = 0

    for i in range(n_batches):
        batch_true = true[i*batch_size:(i+1)*batch_size]
        batch_hypothesis = hypothesis[i*batch_size:(i+1)*batch_size]

        wer_n_batch = builtins.sum([len(s.split('|')) for s in batch_true])
        cer_n_batch = builtins.sum([len(s) for s in batch_true])

        attack_cer = cer(batch_true, batch_hypothesis)
        attack_wer = wer(batch_true,
                         batch_hypothesis,
                         truth_transform=transform_wer,
                         hypothesis_transform=transform_wer)

        wer_score += wer_n_batch*attack_wer
        cer_score += cer_n_batch*attack_cer
        wer_n += wer_n_batch
        cer_n += cer_n_batch

    wer_score /= wer_n
    cer_score /= cer_n

    return wer_score, cer_score


@torch.no_grad()
def top_k(query: dict, enrolled: dict, k: int):
    """
    Compute portion of queries for which 'correct' ID appears in k-closest
    enrolled entries (cosine distance in embedding space).
    """
    # concatenate query embeddings into single tensor
    query_array = []
    query_ids = []
    for s_l in query.keys():
        query_array.append(query[s_l])
        query_ids.extend([s_l] * len(query[s_l]))
    query_array = torch.cat(query_array, dim=0).squeeze().cpu().numpy()
    query_ids = torch.as_tensor(query_ids).cpu().numpy()

    # concatenate enrolled embeddings into single tensor
    enrolled_array = []
    enrolled_ids = []
    for s_l in enrolled.keys():
        enrolled_array.append(enrolled[s_l])
        enrolled_ids.extend([s_l] * len(enrolled[s_l]))
    enrolled_array = torch.cat(enrolled_array, dim=0).squeeze().cpu().numpy()
    enrolled_ids = torch.as_tensor(enrolled_ids).cpu().numpy()

    # embedding dimension
    assert query_array.shape[-1] == enrolled_array.shape[-1]
    d = query_array.shape[-1]

    # index enrolled embeddings
    knn = NearestNeighbors(n_neighbors=k, metric="cosine").fit(enrolled_array)

    # `I` is a (n_queries, k) array holding the indices of the k-closest
    # enrolled embeddings for each query; `D` is a (n_queries, k) array
    # holding the corresponding embedding-space distances
    D, I = knn.kneighbors(query_array, k, return_distance=True)

    # for each row, see if at least one of the k nearest enrolled indices maps
    # to a speaker ID that matches the query index's speaker id
    targets = np.tile(query_ids.reshape(-1, 1), (1, k))
    predictions = enrolled_ids[I]
    matches = (targets == predictions).sum(axis=-1) > 0

    return np.mean(matches)


def init_attacks():
    """
    Initialize pre-trained speaker recognition pipelines and
    de-identification attacks.
    """
    # channel simulation
    if SIMULATION:
        sim = [
            Offset(length=[-.15, .15]),
            Noise(type='gaussian', snr=[30.0, 50.0]),
            Bandpass(low=[300, 500], high=[3400, 7400]),
            Dropout(rate=0.001)
        ]
    else:
        sim = None

    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    pipelines = {}

    model_resnet = SpeakerVerificationModel(
        model=ResNetSE34V2(nOut=512, encoder_type='ASP'),
        n_segments=1,
        segment_select='lin',
        distance_fn='cosine',
        threshold=0.0
    )
    model_resnet.load_weights(
        MODELS_DIR / 'speaker' / 'resnetse34v2' / 'resnetse34v2.pt')

    model_yvector = SpeakerVerificationModel(
        model=YVector(),
        n_segments=1,
        segment_select='lin',
        distance_fn='cosine',
        threshold=0.0
    )
    model_yvector.load_weights(
        MODELS_DIR / 'speaker' / 'yvector' / 'yvector.pt')

    pipelines['resnet'] = Pipeline(
        simulation=sim,
        preprocessor=Preprocessor(Normalize(method='peak')),
        model=model_resnet,
        device=device
    )

    if TRANSFER:
        pipelines['yvector'] = Pipeline(
            simulation=sim,
            preprocessor=Preprocessor(Normalize(method='peak')),
            model=model_yvector,
            device=device
        )
    else:
        del model_yvector

    # prepare to log attack progress
    writer = Writer(
        root_dir=RUNS_DIR,
        name='evaluate-attacks',
        use_timestamp=True,
        log_iter=300,
        use_tb=True
    )

    attacks = {}

    # use consistent adversarial loss
    adv_loss = SpeakerEmbeddingLoss(
        targeted=False,
        confidence=0.1,
        threshold=0.0
    )

    # use consistent auxiliary loss across attacks; guard device placement so
    # CPU-only hosts do not crash (other device choices in this file are
    # likewise guarded)
    aux_loss = SumLoss().add_loss_function(
        DemucsMRSTFTLoss(), 1.0
    ).add_loss_function(L1Loss(), 1.0).to(device)

    attacks['voicebox'] = VoiceBoxAttack(
        pipeline=pipelines['resnet'],
        adv_loss=adv_loss,
        aux_loss=aux_loss,
        lr=1e-4,
        epochs=1,
        batch_size=BATCH_SIZE,
        voicebox_kwargs={
            'win_length': 256,
            'ppg_encoder_hidden_size': 256,
            'use_phoneme_encoder': True,
            'use_pitch_encoder': True,
            'use_loudness_encoder': True,
            'spec_encoder_lookahead_frames': 0,
            'spec_encoder_type': 'mel',
            'spec_encoder_mlp_depth': 2,
            'bottleneck_lookahead_frames': LOOKAHEAD,
            'ppg_encoder_path': PPG_PRETRAINED_PATH,
            'n_bands': 128,
            'spec_encoder_hidden_size': 512,
            'bottleneck_skip': True,
            'bottleneck_hidden_size': 512,
            'bottleneck_feedforward_size': 512,
            'bottleneck_type': 'lstm',
            'bottleneck_depth': 2,
            'control_eps': 0.5,
            'projection_norm': float('inf'),
            'conditioning_dim': 512
        },
        writer=writer,
        checkpoint_name='voicebox-attack'
    )
    attacks['voicebox'].load(VOICEBOX_PATH)

    attacks['universal'] = AdvPulseAttack(
        pipeline=pipelines['resnet'],
        adv_loss=adv_loss,
        pgd_norm=float('inf'),
        pgd_variant=None,
        scale_grad=None,
        eps=0.08,
        length=2.0,
        align='start',
        lr=1e-4,
        normalize=True,
        loop=True,
        aux_loss=aux_loss,
        epochs=1,
        batch_size=BATCH_SIZE,
        writer=writer,
        checkpoint_name='universal-attack'
    )
    attacks['universal'].load(UNIVERSAL_PATH)

    attacks['kenansville'] = KenansvilleAttack(
        pipeline=pipelines['resnet'],
        batch_size=BATCH_SIZE,
        adv_loss=adv_loss,
        threshold_db_low=4.0,  # fix threshold
        threshold_db_high=4.0,
        win_length=512,
        writer=writer,
        step_size=1.0,
        search='bisection',
        min_success_rate=0.2,
        checkpoint_name='kenansville-attack'
    )

    attacks['noise'] = WhiteNoiseAttack(
        pipeline=pipelines['resnet'],
        adv_loss=adv_loss,
        aux_loss=aux_loss,
        snr_low=-10.0,  # fix threshold
        snr_high=-10.0,
        writer=writer,
        step_size=1,
        search='bisection',
        min_success_rate=0.2,
        checkpoint_name='noise-perturbation'
    )

    return attacks, pipelines, writer


@torch.no_grad()
def evaluate_attack(attack: TrainableAttack,
                    speaker_pipeline: Pipeline,
                    asr_pipeline: Pipeline):
    """Evaluate a single attack against one speaker pipeline, reporting
    objective quality (PESQ/STOI), WER/CER (LibriSpeech only), embedding
    distances, and top-k identification accuracy."""

    if DENOISER:
        from src.models.denoiser.demucs import load_demucs
        defense = load_demucs('dns_48').to(
            'cuda' if torch.cuda.is_available() else 'cpu')
        defense.eval()
    else:
        defense = nn.Identity()

    # prepare for GPU inference
    if torch.cuda.is_available():
        attack.pipeline.set_device('cuda')
        speaker_pipeline.set_device('cuda')
        asr_pipeline.set_device('cuda')
        attack.perturbation.to('cuda')

    # locate dataset
    if EVAL_DATASET == "librispeech":
        cache_dir = CACHE_DIR / 'ls_wer_eval'
    else:
        cache_dir = CACHE_DIR / 'vc_wer_eval'
    assert os.path.isdir(cache_dir), \
        'Dataset must be built/cached before evaluation'

    # prepare for PESQ/STOI calculations
    all_pesq_scores = []
    all_stoi_scores = []

    # prepare for WER/CER computations
    all_query_transcripts = []
    all_pred_query_transcripts = []
    all_adv_query_transcripts = []

    # prepare for accuracy computations
    all_query_emb = {}
    all_adv_query_emb = {}
    all_enroll_emb = {}
    all_enroll_emb_centroid = {}

    spkr_dirs = list(cache_dir.glob("*/"))
    spkr_dirs = [s_d for s_d in spkr_dirs if os.path.isdir(s_d)]

    for spkr_dir in tqdm(spkr_dirs, total=len(spkr_dirs),
                         desc='Running evaluation'):

        # identify speaker
        spkr_id = spkr_dir.parts[-1]

        # use integer IDs (VoxCeleb directories are named e.g. 'id10001')
        if EVAL_DATASET != "librispeech":
            spkr_id = spkr_id.split("id")[-1]

        # identify speaker recognition model
        if isinstance(speaker_pipeline.model.model, ResNetSE34V2):
            model_name = 'resnet'
        elif isinstance(speaker_pipeline.model.model, YVector):
            model_name = 'yvector'
        else:
            raise ValueError('Invalid speaker recognition model')

        # load clean embeddings; conditioning embeddings always come from the
        # 'resnet' model, which the attacks were trained against
        query_emb = torch.load(spkr_dir / model_name / 'query_emb.pt')
        condition_emb = torch.load(spkr_dir / 'resnet' / 'condition_emb.pt')
        enroll_emb = torch.load(spkr_dir / model_name / 'enroll_emb.pt')

        # load clean audio
        query_audio = torch.load(spkr_dir / 'query_audio.pt')

        # if defense in use, re-compute query embeddings through the defense
        if DENOISER:
            query_emb = compute_embeddings_batch(
                query_audio,
                speaker_pipeline,
                defense=defense
            )

        # load clean transcript
        if EVAL_DATASET == "librispeech":
            query_transcripts = torch.load(spkr_dir / 'query_trans.pt')
        else:
            query_transcripts = None

        # compute conditioning embedding centroid
        condition_centroid = condition_emb.mean(dim=(0, 1), keepdim=True)

        # compute enrolled embedding centroid
        enroll_centroid = enroll_emb.mean(dim=(0, 1), keepdim=True)

        # compute adversarial query audio
        adv_query_audio = compute_attack_batch(
            query_audio, attack, condition_centroid)

        # compute adversarial query embeddings; optionally, pass through
        # unseen denoiser defense
        adv_query_emb = compute_embeddings_batch(
            adv_query_audio,
            speaker_pipeline,
            defense=defense
        )

        if EVAL_DATASET == "librispeech":
            # compute clean predicted transcripts
            pred_query_transcripts = compute_transcripts_batch(
                query_audio, asr_pipeline
            )

            # compute adversarial transcripts
            adv_query_transcripts = compute_transcripts_batch(
                adv_query_audio, asr_pipeline
            )

        # compute objective quality metric scores
        if COMPUTE_OBJECTIVE_METRICS:
            pesq_scores = compute_pesq(query_audio, adv_query_audio)
            stoi_scores = compute_stoi(query_audio, adv_query_audio)
        else:
            pesq_scores = np.zeros(len(query_audio))
            stoi_scores = np.zeros(len(query_audio))

        # store all objective quality metric scores
        all_pesq_scores.extend(pesq_scores)
        all_stoi_scores.extend(stoi_scores)

        # store all unit-normalized clean, adversarial, and enrolled centroid
        # embeddings
        all_query_emb[int(spkr_id)] = F.normalize(
            query_emb.clone(), dim=-1)
        all_adv_query_emb[int(spkr_id)] = F.normalize(
            adv_query_emb.clone(), dim=-1)
        all_enroll_emb[int(spkr_id)] = F.normalize(
            enroll_emb.clone(), dim=-1)
        all_enroll_emb_centroid[int(spkr_id)] = F.normalize(
            enroll_centroid.clone(), dim=-1)

        # store all transcripts
        if EVAL_DATASET == "librispeech":
            all_query_transcripts.extend(query_transcripts)
            all_pred_query_transcripts.extend(pred_query_transcripts)
            all_adv_query_transcripts.extend(adv_query_transcripts)

    # free GPU memory for similarity search
    attack.pipeline.set_device('cpu')
    speaker_pipeline.set_device('cpu')
    asr_pipeline.set_device('cpu')
    attack.perturbation.to('cpu')
    torch.cuda.empty_cache()

    # compute and display final objective quality metrics
    print(f"PESQ (mean/std): {np.mean(all_pesq_scores)}/{np.std(all_pesq_scores)}")
    print(f"STOI (mean/std): {np.mean(all_stoi_scores)}/{np.std(all_stoi_scores)}")

    if EVAL_DATASET == "librispeech":
        # compute and display final WER/CER metrics; locals renamed so they
        # do not shadow the `jiwer.wer` / `jiwer.cer` imports
        wer_score, cer_score = asr_metrics(
            all_query_transcripts, all_adv_query_transcripts)
        print(f"Adversarial WER / CER: {wer_score} / {cer_score}")
        wer_score, cer_score = asr_metrics(
            all_query_transcripts, all_pred_query_transcripts)
        print(f"Clean WER / CER: {wer_score} / {cer_score}")
    else:
        wer_score, cer_score = None, None

    # release large intermediates before the similarity computations
    del (wer_score, cer_score, all_pesq_scores, all_stoi_scores,
         all_query_transcripts, all_adv_query_transcripts,
         all_pred_query_transcripts)

    # embedding-space cosine distance calculations
    cos_dist_fn = EmbeddingDistance(distance_fn='cosine')

    # mean clean-to-adversarial query embedding distance
    total_query_dist = 0.0
    n = 0
    for spkr_id in all_query_emb.keys():
        dist = cos_dist_fn(all_query_emb[spkr_id],
                           all_adv_query_emb[spkr_id]).mean()
        total_query_dist += len(all_query_emb[spkr_id]) * dist.item()
        n += len(all_query_emb[spkr_id])
    mean_query_dist = total_query_dist / n
    print(f"\n\t\tMean cosine distance between clean and adversarial query "
          f"embeddings: {mean_query_dist:0.4f}")

    # mean adversarial-query-to-enrolled-centroid embedding distance
    total_centroid_dist = 0.0
    n = 0
    for spkr_id in all_query_emb.keys():
        n_queries = len(all_adv_query_emb[spkr_id])
        dist = 0.0
        for i in range(n_queries):
            dist += cos_dist_fn(all_enroll_emb_centroid[spkr_id],
                                all_adv_query_emb[spkr_id][i:i+1]).item()
        total_centroid_dist += dist
        n += n_queries
    mean_centroid_dist = total_centroid_dist / n
    print(f"\t\tMean cosine distance between clean enrolled centroids and "
          f"adversarial query embeddings: {mean_centroid_dist:0.4f}")

    # top-1 accuracy for clean queries (closest embedding)
    top_1_clean_single = top_k(all_query_emb, all_enroll_emb, k=1)

    # top-1 accuracy for clean queries (centroid embedding)
    top_1_clean_centroid = top_k(all_query_emb, all_enroll_emb_centroid, k=1)

    # top-10 accuracy for clean queries (closest embedding)
    top_10_clean_single = top_k(all_query_emb, all_enroll_emb, k=10)

    # top-10 accuracy for clean queries (centroid embedding)
    top_10_clean_centroid = top_k(all_query_emb, all_enroll_emb_centroid, k=10)

    # top-1 accuracy for adversarial queries (closest embedding)
    top_1_adv_single = top_k(all_adv_query_emb, all_enroll_emb, k=1)

    # top-1 accuracy for adversarial queries (centroid embedding)
    top_1_adv_centroid = top_k(all_adv_query_emb, all_enroll_emb_centroid, k=1)

    # top-10 accuracy for adversarial queries (closest embedding)
    top_10_adv_single = top_k(all_adv_query_emb, all_enroll_emb, k=10)

    # top-10 accuracy for adversarial queries (centroid embedding)
    top_10_adv_centroid = top_k(all_adv_query_emb, all_enroll_emb_centroid, k=10)

    print(f"\n\t\tTop-1 accuracy (clean embedding / nearest enrolled embedding) {top_1_clean_single:0.4f}",
          f"\n\t\tTop-1 accuracy (clean embedding / nearest enrolled centroid) {top_1_clean_centroid:0.4f}",
          f"\n\t\tTop-10 accuracy (clean embedding / nearest enrolled embedding) {top_10_clean_single:0.4f}",
          f"\n\t\tTop-10 accuracy (clean embedding / nearest enrolled centroid) {top_10_clean_centroid:0.4f}",
          f"\n\t\tTop-1 accuracy (adversarial embedding / nearest enrolled embedding) {top_1_adv_single:0.4f}",
          f"\n\t\tTop-1 accuracy (adversarial embedding / nearest enrolled centroid) {top_1_adv_centroid:0.4f}",
          f"\n\t\tTop-10 accuracy (adversarial embedding / nearest enrolled embedding) {top_10_adv_single:0.4f}",
          f"\n\t\tTop-10 accuracy (adversarial embedding / nearest enrolled centroid) {top_10_adv_centroid:0.4f}"
          )


@torch.no_grad()
def evaluate_attacks(attacks: dict,
                     speaker_pipelines: dict,
                     asr_pipeline: Pipeline):
    """Evaluate every attack against every speaker pipeline."""
    for attack_name, attack in attacks.items():
        for sp_name, sp in speaker_pipelines.items():
            print(f'Evaluating {attack_name} against model {sp_name} '
                  f'{"with" if DENOISER else "without"} denoiser defense')
            evaluate_attack(attack, sp, asr_pipeline)


def main():

    # initial random seed (keep dataset order consistent)
    set_random_seed(0)

    # initialize pipelines
    attacks, pipelines, writer = init_attacks()

    # ensure that necessary data is cached
    if EVAL_DATASET == "librispeech":
        build_ls_dataset(pipelines)
    else:
        build_vc_dataset(pipelines)

    # initialize ASR model
    asr_model = SpeechRecognitionModel(
        model=Wav2Vec2(),
    )
    asr_pipeline = Pipeline(
        model=asr_model,
        preprocessor=Preprocessor(Normalize(method='peak')),
        device='cuda' if torch.cuda.is_available() else 'cpu'
    )

    writer.log_cuda_memory()

    evaluate_attacks(attacks, pipelines, asr_pipeline)


if __name__ == "__main__":
    main()