| import os
|
| import requests
|
| import torch
|
| from typing import Optional
|
|
|
| from fastapi import FastAPI, Header, HTTPException, BackgroundTasks
|
| import asyncio
|
| from fastapi.responses import FileResponse
|
| from huggingface_hub.hf_api import HfApi
|
| from huggingface_hub import login
|
| import huggingface_hub as hf
|
|
|
| from .models import config, WebhookPayload
|
|
|
|
|
| import pandas as pd
|
| import numpy as np
|
| import random
|
|
|
| from sklearn.preprocessing import LabelEncoder
|
| from transformers import TrainingArguments, Trainer
|
|
|
| import datasets
|
| import evaluate
|
| import accuracy
|
|
|
| import time
|
|
|
| from transformers import AutoModelForSequenceClassification, AutoTokenizer
|
|
|
| import subprocess
|
|
|
| from datetime import datetime
|
|
|
# --- Environment & configuration -------------------------------------------
# Secrets come from the environment; either may be None if unset.
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")
HF_ACCESS_TOKEN = os.getenv("HF_ACCESS_TOKEN")

# Local working copy into which the model repo is cloned and from which the
# retrained model is pushed back.
REPOSITORY_PATH = "staging/brand-classifier-stage"
INPUT_DIR = config.input_model

# NOTE(review): OUTPUT_DIR equals config.input_model, same as INPUT_DIR —
# looks like it may have been meant to point at a separate output repo; confirm.
OUTPUT_DIR = config.input_model
ENDPOINT_ID = "magellan-ai/magellan-brand-recent"
DATASET_PATH = config.input_dataset

# Authenticate against the Hugging Face Hub for repo/endpoint operations.
print('logging in')
login(token=HF_ACCESS_TOKEN)
print('logged in')

# Report GPU status at startup.
# NOTE(review): get_device_name() raises when CUDA is unavailable, as does
# the hard-coded torch.device("cuda") below — this service assumes a GPU host.
print(f"Is CUDA available: {torch.cuda.is_available()}")

print(
    f"CUDA device: {torch.cuda.get_device_name(torch.cuda.current_device())}")

print('setting device')
device = torch.device("cuda")
print(device)
print('set device')

# Clone (or reuse) the model repository locally and fast-forward it.
# NOTE(review): hf.Repository is deprecated in recent huggingface_hub
# releases in favour of the HTTP-based HfApi upload methods.
print('setting up Repository')

repo = hf.Repository(local_dir=REPOSITORY_PATH,
                     clone_from=INPUT_DIR,
                     token=HF_ACCESS_TOKEN)
repo.git_pull()

# Base checkpoint that gets fine-tuned on every retrain.
model_nm = "gpt2"

# Single-slot queue used as a mutex: only one webhook/retrain at a time.
max_concurrent_requests = 1
request_queue = asyncio.Queue(maxsize=max_concurrent_requests)

app = FastAPI()
|
|
|
|
|
@app.get("/")
async def home():
    """Serve the landing page and dump GPU status to the logs."""
    # Side-channel diagnostics: nvidia-smi output goes to stdout only.
    print('printing nvidia smi')
    smi = subprocess.run(["nvidia-smi"], capture_output=True)
    print(smi.stdout.decode())

    return FileResponse("home.html")
|
|
|
|
|
@app.post("/webhook")
async def post_webhook(
    payload: WebhookPayload,
    task_queue: BackgroundTasks,
    repo=repo,
    REPOSITORY_PATH: str = REPOSITORY_PATH,
    OUTPUT_DIR: str = OUTPUT_DIR,
    x_webhook_secret: Optional[str] = Header(default=None),
):
    """Validate an incoming Hub webhook and schedule a background retrain.

    Responses:
        503 -- a previous webhook still holds the single queue slot.
        401/403 -- missing / wrong ``x-webhook-secret`` header.
        400 -- event is not a dataset create/update under a ``repo`` scope.
        200 -- retrain accepted and scheduled as a background task.
    """
    try:
        print('received webhook:')
        # Single-slot queue acts as a mutex: a second webhook arriving while
        # one is in flight times out here and becomes a 503 below.
        await asyncio.wait_for(request_queue.put(True), timeout=1)
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=503, detail="Previous webhook is being processed. Try again later.")

    try:
        if x_webhook_secret is None:
            raise HTTPException(401)
        if x_webhook_secret != WEBHOOK_SECRET:
            raise HTTPException(403)
        # Only dataset create/update events trigger a retrain.
        # (startswith("repo") already covers "repo.content", which the
        # original checked redundantly.)
        if not (
            payload.event.action in ("update", "create")
            and payload.event.scope.startswith("repo")
            and payload.repo.type == "dataset"
        ):
            print('webhook oh no')
            print(payload.event.action)
            print(payload.event.scope)
            print(payload.repo.name)
            print(payload.repo.type)

            # BUG FIX: the original had an unreachable
            # `return {"processed": False}` after this raise; removed.
            raise HTTPException(400)
    except HTTPException:
        # BUG FIX: release the queue slot when rejecting the request,
        # otherwise one bad webhook makes every later one 503 forever.
        request_queue.get_nowait()
        raise

    print('scheduling retrain')
    # NOTE(review): the slot taken above is never released after a successful
    # retrain either — confirm whether schedule_retrain should drain the queue.
    task_queue.add_task(
        schedule_retrain,
        payload,
        repo,
        REPOSITORY_PATH,
        OUTPUT_DIR,
        HF_ACCESS_TOKEN,
    )

    return {"message": "Webhook request accepted for processing"}
|
|
|
|
|
def schedule_retrain(payload: WebhookPayload,
                     repo,
                     REPOSITORY_PATH: str,
                     OUTPUT_DIR: str,
                     HF_ACCESS_TOKEN: str
                     ):
    """Retrain the brand classifier and publish the result.

    Runs as a FastAPI background task: loads the dataset, fine-tunes the
    gpt2 checkpoint, writes metrics and weights into the local repo clone,
    pushes it, bounces the serving endpoint, posts a notification, and
    restarts the demo Space.

    BUG FIX: the original annotated every parameter with the runtime
    *values* in scope at definition time (e.g. ``repo: repo``), which are
    meaningless as types; replaced with real annotations.
    """

    print("loading dataset")
    dataset = datasets.load_dataset(DATASET_PATH)
    dataset, le = load_data(dataset)

    # Stable mapping between brand labels and integer class ids, baked into
    # the model config so the serving side can decode predictions.
    label2id = {str(label): int(id)
                for label, id in zip(le.classes_, le.transform(le.classes_))}
    id2label = {int(id): str(label) for label, id in label2id.items()}
    print("loaded dataset")

    print("tokenizing dataset")
    tokenizer, tok_ds = tokenizer_func(dataset)
    print("tokenized dataset")

    print("splitting dataset")
    tok_train_ds = tok_ds.train_test_split(test_size=0.2, seed=42)
    tok_val_ds = tok_train_ds['test']
    tok_train_ds = tok_train_ds['train']
    print("split dataset")

    bs = 16
    # Set True for a short top-up run instead of a full retrain.
    RESUME_LAST = False
    if RESUME_LAST:
        epochs = 2
    else:
        epochs = 8
    lr = 8e-5

    # TF32 matmuls: large speedup on Ampere+ GPUs at negligible accuracy cost.
    torch.backends.cuda.matmul.allow_tf32 = True

    print("setting arguments")
    args = TrainingArguments(REPOSITORY_PATH,
                             learning_rate=lr,
                             warmup_ratio=0.1,
                             lr_scheduler_type='cosine',
                             tf32=True,
                             evaluation_strategy="epoch",
                             per_device_train_batch_size=bs,
                             per_device_eval_batch_size=bs*2,
                             num_train_epochs=epochs,
                             weight_decay=0.01,
                             report_to='none',
                             gradient_accumulation_steps=2,
                             save_strategy="epoch",
                             gradient_checkpointing=True,
                             optim="adafactor",
                             log_level="debug",)

    print("setting model")
    model = AutoModelForSequenceClassification.from_pretrained(model_nm, num_labels=len(np.unique(tok_ds['labels'])), ignore_mismatched_sizes=True,
                                                               id2label=id2label, label2id=label2id)
    # The tokenizer gained a [PAD] token; grow the embedding table to match.
    model.resize_token_embeddings(len(tokenizer))
    model.config.pad_token_id = model.config.eos_token_id
    trainer = Trainer(model, args, train_dataset=tok_train_ds.shuffle(seed=42), eval_dataset=tok_val_ds.shuffle(seed=42),
                      tokenizer=tokenizer, compute_metrics=compute_metrics)

    print("training model")
    trainer.train()

    print("evaluating model")
    # Renamed from `eval`, which shadowed the builtin.
    eval_metrics = trainer.evaluate()
    print(eval_metrics)
    eval_accuracy = round(eval_metrics['eval_accuracy'], 2)
    print("evaluated model")

    # Persist tokenizer + weights + metrics into the repo working copy.
    tokenizer.save_pretrained(REPOSITORY_PATH)
    model.save_pretrained(REPOSITORY_PATH)
    trainer.save_model(REPOSITORY_PATH)

    with open(os.path.join(REPOSITORY_PATH, 'eval_metrics.txt'), 'w') as f:
        f.write(str(eval_metrics))
    with open(os.path.join(REPOSITORY_PATH, 'eval_accuracy.txt'), 'w') as f:
        f.write(str(eval_accuracy))

    print("pushing repository")

    now = datetime.now()
    dt_string = now.strftime("%m/%d/%Y %H:%M:%S")
    repo.git_add('.', auto_lfs_track=True)
    repo.git_commit(f're-trained {dt_string}, acc: {eval_accuracy}')
    repo.git_push()
    print("pushed repository")

    print("updating endpoint")
    hf_api = HfApi(token=HF_ACCESS_TOKEN)
    update_endpoint(hf_api)
    print("updated endpoint")

    print("notifying success")
    notify_success()
    print("notified success")

    # Restart the public demo Space so it serves the freshly pushed model.
    hf_api.restart_space('magellan-ai/brand-classifier')

    # NOTE(review): the request_queue slot taken by the webhook handler is
    # never released here, so a second retrain can never be scheduled without
    # restarting the service — confirm whether that is intentional.
    return {"processed": True}
|
|
|
|
|
def load_data(dataset):
    """Prepare the training DataFrame and encode brand labels.

    Dedupes pages by simhash, keeps at most the 100 most recently downloaded
    pages per brand, and label-encodes ``brand_id`` into a ``labels`` column.

    Args:
        dataset: a datasets.DatasetDict-like mapping with a 'train' split.

    Returns:
        (ds, label_encoder): the prepared ``datasets.Dataset`` and the fitted
        ``LabelEncoder`` (needed later to build label2id/id2label).
    """
    df = pd.DataFrame(dataset['train'])

    # Near-duplicate pages share a simhash; keep only one of each.
    df.drop_duplicates(['simhash'], inplace=True)

    # Newest first within each brand so cumcount() ranks by recency.
    # NOTE(review): sort groups by brand_id but rank groups by brand_name —
    # fine if the two are 1:1; confirm.
    df.sort_values(['brand_id', 'downloaded_at'], ascending=[True, False],
                   inplace=True)

    df['rank'] = df.groupby('brand_name').cumcount() + 1

    # Cap at the 100 most recent pages per brand.
    # BUG FIX: .copy() — the boolean filter yields a slice, and the in-place
    # drop/assignment below would otherwise raise SettingWithCopyWarning and
    # behave ambiguously under pandas copy-on-write.
    sampled_df = df[df['rank'] <= 100].copy()

    sampled_df.drop('rank', axis=1, inplace=True)

    df = sampled_df
    brands = df['brand_name'].unique()
    random.shuffle(brands)
    # NOTE(review): brands[:] selects *all* brands, so this shuffle/filter is
    # currently a no-op subsampling hook (shrink the slice to subsample).
    df = df[df['brand_name'].isin(brands[:])].copy()

    label_encoder = LabelEncoder()

    df['labels'] = label_encoder.fit_transform(df['brand_id'])

    ds = datasets.Dataset.from_pandas(df)
    return ds, label_encoder
|
|
|
|
|
def tokenizer_func(ds):
    """Build the gpt2 tokenizer (with a PAD token added) and tokenize *ds*.

    Returns:
        (tokenizer, tokenized_dataset)
    """
    tok = AutoTokenizer.from_pretrained(model_nm)
    # gpt2 ships without a padding token; add one so batched padding works.
    tok.add_special_tokens({'pad_token': '[PAD]'})
    tokenized = ds.map(tok_func, batched=True, fn_kwargs={'tokenizer': tok})

    return tok, tokenized
|
|
|
|
|
def tok_func(x, tokenizer):
    """Tokenize one batch of examples for ``datasets.Dataset.map``.

    Args:
        x: batch mapping with a 'plain_text' column.
        tokenizer: a HF tokenizer (or compatible callable).

    Returns:
        The tokenizer's encoding for the batch.
    """
    # BUG FIX: max_length alone does not truncate — without truncation=True,
    # sequences longer than gpt2's 1024-token context would reach the model
    # unclipped and break the forward pass.
    encoding = tokenizer(x['plain_text'], max_length=1024,
                         truncation=True,
                         return_overflowing_tokens=False)
    return encoding
|
|
|
|
|
def compute_metrics(eval_pred):
    """Accuracy metric hook for the Trainer: argmax logits, compare to labels."""
    logits, labels = eval_pred
    metric = evaluate.load("accuracy")
    preds = np.argmax(logits, axis=1)
    return metric.compute(predictions=preds, references=labels)
|
|
|
|
|
def update_endpoint(hf_api):
    """Pause, update, and resume the inference endpoint via the HF endpoints API.

    The 10-second sleeps between calls give the control plane time to apply
    each state transition before the next one is requested.
    """
    # NOTE(review): `revision` is fetched but never sent to the API —
    # presumably the endpoint tracks the repo's default branch; confirm.
    revisions = hf_api.list_repo_commits(OUTPUT_DIR)
    revision = revisions[0].commit_id

    endpoint_url = f'https://api.endpoints.huggingface.cloud/v2/endpoint/{ENDPOINT_ID}'
    form_headers = {
        'Authorization': 'Bearer ' + HF_ACCESS_TOKEN,
        'accept': 'application/json',
        'content-type': 'application/x-www-form-urlencoded',
    }
    json_headers = {
        'Authorization': 'Bearer ' + HF_ACCESS_TOKEN,
        'accept': 'application/json',
        'Content-Type': 'application/json',
    }

    pause_resp = requests.post(f'{endpoint_url}/pause', headers=form_headers)
    print('pause response:', pause_resp)
    time.sleep(10)

    # NOTE(review): empty JSON body — presumably the PUT just nudges the
    # endpoint to re-read its configuration; confirm against the API docs.
    update_resp = requests.put(endpoint_url, headers=json_headers, json={})
    print('update response:', update_resp)
    time.sleep(10)

    resume_resp = requests.post(f'{endpoint_url}/resume', headers=form_headers)
    print('resume response:', resume_resp)
|
|
|
|
|
def notify_success():
    """Open a discussion on the dataset repo announcing the retrain.

    Returns the created discussion object from the Hub API.
    """
    body = NOTIFICATION_TEMPLATE.format(
        input_model=config.input_model,
        input_dataset=DATASET_PATH,
    )
    api = HfApi(token=HF_ACCESS_TOKEN)
    return api.create_discussion(
        repo_id=DATASET_PATH,
        repo_type="dataset",
        title="✨ Retraining started!",
        description=body,
        token=HF_ACCESS_TOKEN,
    )
|
|
|
|
|
# Markdown body for the discussion opened on the dataset repo after a retrain;
# {input_dataset} / {input_model} placeholders are filled via str.format.
NOTIFICATION_TEMPLATE = """\
🌸 Hello there!

Following an update of [{input_dataset}](https://huggingface.co/datasets/{input_dataset}), an automatic re-training of [{input_model}](https://huggingface.co/{input_model}) has been scheduled and completed!

(This is an automated message)
"""
|
|
|