Spaces:
Runtime error
Runtime error
| import os | |
| import torch | |
| import gradio as gr | |
| import torchvision | |
| from utils import * | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import torch.optim as optim | |
| from huggingface_hub import Repository, upload_file | |
# --- Training hyper-parameters ---------------------------------------------
n_epochs = 3
batch_size_train = 64
batch_size_test = 1000
learning_rate = 0.01
momentum = 0.5
log_interval = 10  # train() prints a progress line every `log_interval` batches
random_seed = 1

# --- Storage locations -----------------------------------------------------
REPOSITORY_DIR = "data"  # top-level folder used for paths inside the Hub dataset repo
LOCAL_DIR = 'data_local'  # local folder where flagged samples are written first
os.makedirs(LOCAL_DIR, exist_ok=True)
HF_TOKEN = os.getenv("HF_TOKEN")
HF_DATASET = "mnist-adversarial-dataset"

# --- Reproducibility -------------------------------------------------------
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)

# --- Data ------------------------------------------------------------------
# Both splits go through the same preprocessing: convert to a tensor, then
# normalize with mean 0.1307 and std 0.3081.
_mnist_preprocess = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize((0.1307,), (0.3081,)),
])

train_loader = torch.utils.data.DataLoader(
    torchvision.datasets.MNIST(
        'files/', train=True, download=True, transform=_mnist_preprocess
    ),
    batch_size=batch_size_train,
    shuffle=True,
)

test_loader = torch.utils.data.DataLoader(
    torchvision.datasets.MNIST(
        'files/', train=False, download=True, transform=_mnist_preprocess
    ),
    batch_size=batch_size_test,
    shuffle=True,
)
| # Source: https://nextjournal.com/gkoehler/pytorch-mnist | |
class MNIST_Model(nn.Module):
    """Small convolutional classifier producing log-probabilities over 10 classes.

    Architecture: two 5x5 convolutions (1 -> 10 -> 20 channels), each followed
    by 2x2 max-pooling and ReLU, then two fully connected layers (320 -> 50 -> 10).
    For a 1x28x28 input the feature map after the second pooling is 20x4x4,
    which is where the flattened size of 320 comes from.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        """Run a forward pass.

        :param x: batch of images; must flatten to 320 features per sample
            after the two conv/pool stages (e.g. shape ``(N, 1, 28, 28)``).
        :return: tensor of shape ``(N, 10)`` holding log-probabilities per class.
        """
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        # Functional dropout must be told the mode explicitly; it does not
        # follow train()/eval() on its own.
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # Normalize over the class dimension explicitly; omitting `dim` is
        # deprecated and emits a warning on every call.
        return F.log_softmax(x, dim=1)
def train(epochs, network, optimizer):
    """Train ``network`` on the module-level ``train_loader``.

    Every ``log_interval`` batches the current loss is printed and recorded,
    and the model and optimizer state are checkpointed to ``model.pth`` and
    ``optimizer.pth`` in the working directory.

    :param epochs: number of full passes over ``train_loader``.
    :param network: the model to optimize; it is switched to training mode.
    :param optimizer: optimizer already bound to ``network``'s parameters.
    :return: list of the loss values (floats) recorded at each logged step.
    """
    train_losses = []
    network.train()
    num_samples = len(train_loader.dataset)
    num_batches = len(train_loader)

    for epoch in range(epochs):
        for batch_idx, (data, target) in enumerate(train_loader):
            optimizer.zero_grad()
            output = network(data)
            # The model emits log-probabilities, so NLL is the matching loss.
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()

            if batch_idx % log_interval != 0:
                continue

            loss_value = loss.item()
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), num_samples,
                100. * batch_idx / num_batches, loss_value))
            train_losses.append(loss_value)
            # Checkpoint alongside each logged step so the latest state is on disk.
            torch.save(network.state_dict(), 'model.pth')
            torch.save(optimizer.state_dict(), 'optimizer.pth')

    return train_losses
def test():
    """Evaluate the module-level ``network`` on ``test_loader`` and print a summary.

    Prints the average negative log-likelihood per sample and the accuracy.
    Switches ``network`` to eval mode and leaves it there. Returns nothing.
    """
    network.eval()
    test_loss = 0.0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            output = network(data)
            # Sum (not average) per batch so the division below yields a true
            # per-sample mean; `reduction='sum'` replaces the deprecated
            # `size_average=False`.
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)
            # .item() keeps `correct` a plain int so it prints as a number
            # rather than as `tensor(...)`.
            correct += pred.eq(target.view_as(pred)).sum().item()

    num_samples = len(test_loader.dataset)
    test_loss /= num_samples
    print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, num_samples,
        100. * correct / num_samples))
random_seed = 1
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)

network = MNIST_Model()
optimizer = optim.SGD(network.parameters(), lr=learning_rate,
                      momentum=momentum)

# Checkpoint file names; these must match what train() writes.
model_state_dict = 'model.pth'
optimizer_state_dict = 'optimizer.pth'

# Restore previously saved weights / optimizer state when available.
# map_location='cpu' lets a checkpoint saved on a GPU machine load here too.
if os.path.exists(model_state_dict):
    network_state_dict = torch.load(model_state_dict, map_location='cpu')
    network.load_state_dict(network_state_dict)

if os.path.exists(optimizer_state_dict):
    optimizer.load_state_dict(
        torch.load(optimizer_state_dict, map_location='cpu')
    )

# The app serves predictions, so disable dropout by default; train() switches
# the network back to training mode itself.
network.eval()
| # Train | |
| #train(n_epochs,network,optimizer) | |
def image_classifier(inp):
    """
    Classify a drawn digit with the module-level ``network``.

    :param inp: the image to be classified (anything ``ToTensor`` accepts,
        e.g. a single-channel PIL image)
    :return: A dictionary mapping each class index to its confidence,
        inserted in order of decreasing confidence.
    """
    # Apply the same preprocessing as the training data loaders; feeding
    # un-normalized pixels would not match what the model was trained on.
    preprocess = torchvision.transforms.Compose([
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize((0.1307,), (0.3081,)),
    ])
    input_image = preprocess(inp).unsqueeze(0)  # add the batch dimension

    # Dropout must be off for inference, otherwise the same drawing yields
    # different confidences on every call.
    network.eval()
    with torch.no_grad():
        prediction = torch.nn.functional.softmax(network(input_image)[0], dim=0)

    sorted_prediction = torch.sort(prediction, descending=True)
    return dict(zip(sorted_prediction.indices.tolist(),
                    sorted_prediction.values.tolist()))
def flag(input_image, correct_result):
    """Save a misclassified drawing and its correct label, then upload both.

    The image and a ``metadata.jsonl`` file are written to a fresh folder
    under ``LOCAL_DIR`` and uploaded to the ``chrisjay/<HF_DATASET>`` dataset
    repository under ``REPOSITORY_DIR/<unique name>/``.

    :param input_image: the drawn image; must provide ``save(path)`` (a PIL image).
    :param correct_result: the label the user selected as the right answer.
    :return: an HTML snippet describing the outcome.
    :raises Exception: if the image cannot be written to disk.
    """
    # Compare against None explicitly: the label 0 is a valid choice.
    if input_image is None or correct_result is None:
        return '<div> Please draw a digit and choose the correct label before flagging. </div>'

    # NOTE(review): get_unique_name and dump_json come from `utils`; presumably
    # they return a unique folder name and write the dict as JSON. Confirm.
    metadata_name = get_unique_name()
    save_file_dir = os.path.join(LOCAL_DIR, metadata_name)
    os.makedirs(save_file_dir, exist_ok=True)

    image_output_filename = os.path.join(save_file_dir, 'image.png')
    try:
        input_image.save(image_output_filename)
    except Exception as err:
        # Keep the original error attached so the real cause is visible.
        raise Exception("Had issues saving PIL image to file") from err

    json_file_path = os.path.join(save_file_dir, 'metadata.jsonl')
    metadata = {
        'id': metadata_name,
        'file_name': 'image.png',
        'correct_number': correct_result,
    }
    dump_json(metadata, json_file_path)

    # Upload the image first, then its metadata. Repo paths always use forward
    # slashes, so they are built explicitly instead of with os.path.join.
    uploads = (
        (image_output_filename, 'image.png'),
        (json_file_path, 'metadata.jsonl'),
    )
    for local_path, file_name in uploads:
        upload_file(
            path_or_fileobj=local_path,
            path_in_repo=f'{REPOSITORY_DIR}/{metadata_name}/{file_name}',
            repo_id=f'chrisjay/{HF_DATASET}',
            repo_type='dataset',
            token=HF_TOKEN,
        )

    return '<div> Successfully saved to flagged dataset. </div>'
def main():
    """Build the Gradio interface for the MNIST adversarial demo and launch it."""
    TITLE = "# MNIST Adversarial: Try to fool this MNIST model"
    description = """This project is about dynamic adversarial data collection (DADC).
The basic idea is to do data collection by collecting “adversarial data”, the kind of data that is difficult for a model to predict correctly.
This kind of data is presumably the most valuable for a model, so this can be helpful in low-resource settings where data is hard to collect and label.
### What to do:
- Draw a number from 0-9.
- Click `Submit` and see the model's prediciton.
- If the model misclassifies it, Flag that example.
- This will add your (adversarial) example to a dataset on which the model will be trained later.
"""
    MODEL_IS_WRONG = """
> Did the model get it wrong? Choose the correct prediction below and flag it.
When you flag it, the instance is saved to our dataset and the model is trained on it.
"""
    digit_choices = list(range(10))

    with gr.Blocks() as demo:
        gr.Markdown(TITLE)
        with gr.Tabs():
            gr.Markdown(description)
            with gr.TabItem('MNIST'):
                # Drawing canvas and the model's ranked predictions, side by side.
                with gr.Row():
                    canvas = gr.inputs.Image(
                        source="canvas",
                        shape=(28, 28),
                        invert_colors=True,
                        image_mode="L",
                        type="pil",
                    )
                    prediction_label = gr.outputs.Label(num_top_classes=10)
                submit_btn = gr.Button("Submit")

                # Controls for reporting a wrong prediction.
                gr.Markdown(MODEL_IS_WRONG)
                correct_digit = gr.Dropdown(
                    choices=digit_choices,
                    type='value',
                    default=None,
                    label="What was the correct prediction?",
                )
                flag_button = gr.Button("Flag")
                flag_status = gr.outputs.HTML()

                # Wire the buttons to their callbacks.
                submit_btn.click(
                    image_classifier,
                    inputs=[canvas],
                    outputs=[prediction_label],
                )
                flag_button.click(
                    flag,
                    inputs=[canvas, correct_digit],
                    outputs=[flag_status],
                )

    demo.launch()


if __name__ == "__main__":
    main()