| | import torch |
| | import numpy as np |
| | import torch.nn.functional as F |
| | from deeprobust.graph.defense import RGCN |
| | from deeprobust.graph.utils import * |
| | from deeprobust.graph.data import Dataset, PrePtbDataset |
| | import argparse |
| | import os |
| | import csv |
| |
|
| | parser = argparse.ArgumentParser() |
| | parser.add_argument('--seed', type=int, default=15, help='Random seed.') |
| | parser.add_argument('--dataset', type=str, default='Flickr', choices=['cora', 'cora_ml', 'citeseer', 'polblogs', 'pubmed', 'Flickr'], help='dataset') |
| | parser.add_argument('--ptb_rate', type=float, default=0.25, help='pertubation rate') |
| | parser.add_argument('--ptb_type', type=str, default='minmax', choices=['clean', 'meta', 'dice', 'minmax', 'pgd', 'random'], help='attack type') |
| | parser.add_argument('--hidden', type=int, default=16, help='Number of hidden units.') |
| | parser.add_argument('--dropout', type=float, default=0.5, help='Dropout rate (1 - keep probability).') |
| | parser.add_argument('--gpu', type=int, default=0, help='GPU device ID (default: 0)') |
| |
|
| |
|
| | args = parser.parse_args() |
| | args.cuda = torch.cuda.is_available() |
| | print('cuda: %s' % args.cuda) |
| | device = torch.device(f"cuda:{args.gpu}" if torch.cuda.is_available() else "cpu") |
| |
|
| | |
| | np.random.seed(args.seed) |
| | if args.cuda: |
| | torch.cuda.manual_seed(args.seed) |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | data = Dataset(root='/tmp/', name=args.dataset) |
| | adj, features, labels = data.adj, data.features, data.labels |
| | idx_train, idx_val, idx_test = data.idx_train, data.idx_val, data.idx_test |
| |
|
| | |
| |
|
| |
|
| | |
| | ptb_path = f"../attacked_adj/{args.dataset}/{args.ptb_type}_{args.dataset}_{args.ptb_rate}.pt" |
| | perturbed_adj = torch.load(ptb_path) |
| | perturbed_adj = perturbed_adj |
| |
|
| | def test_rgcn(adj): |
| | ''' test on GCN ''' |
| |
|
| | |
| | gcn = RGCN(nnodes=adj.shape[0], nfeat=features.shape[1], nclass=labels.max()+1, |
| | nhid=args.hidden, device=device) |
| | gcn = gcn.to(device) |
| | gcn.fit(features, adj, labels, idx_train, idx_val, train_iters=200, verbose=True) |
| | gcn.eval() |
| | acc = gcn.test(idx_test) |
| | return acc |
| |
|
| |
|
| | def main(): |
| | |
| | acc = test_rgcn(perturbed_adj) |
| | |
| | csv_dir = "../result" |
| | os.makedirs(csv_dir, exist_ok=True) |
| |
|
| | csv_filename = os.path.join(csv_dir, f"RGCN_{args.dataset}_{args.ptb_type}_{args.ptb_rate}.csv") |
| | row = [f"{args.dataset} ", f" {args.ptb_type} ", f" {args.ptb_rate} ", f" {acc}"] |
| |
|
| | try: |
| | file_exists = os.path.isfile(csv_filename) |
| | with open(csv_filename, 'a', newline='') as csvfile: |
| | writer = csv.writer(csvfile) |
| | if not file_exists: |
| | writer.writerow(["dataset ", "ptb_type ", "ptb_rate ", "accuracy"]) |
| | writer.writerow(row) |
| | except Exception as e: |
| | print(f"[Error] Failed to write CSV: {e}") |
| | |
| | |
| | |
| |
|
| | if __name__ == '__main__': |
| | main() |