# brand-classifier / src/main.py
# ananaphasia
# correct model
# 9db7f91
import os
import requests
import torch
from typing import Optional
from fastapi import FastAPI, Header, HTTPException, BackgroundTasks
import asyncio
from fastapi.responses import FileResponse
from huggingface_hub.hf_api import HfApi
from huggingface_hub import login
import huggingface_hub as hf
from .models import config, WebhookPayload
# import models
import pandas as pd
import numpy as np
import random
from sklearn.preprocessing import LabelEncoder
from transformers import TrainingArguments, Trainer
import datasets
import evaluate
import accuracy # unused here but required import for evaluate
import time
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import subprocess
from datetime import datetime
# --- Configuration ----------------------------------------------------------
# Both values come from the environment and are None when unset.
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")
HF_ACCESS_TOKEN = os.getenv("HF_ACCESS_TOKEN")
# Local repository path (not on the Hub)
REPOSITORY_PATH = "staging/brand-classifier-stage"
INPUT_DIR = config.input_model
# The re-trained model is pushed back to the same repo it was cloned from
# (formerly the fixed stage repo "magellan-ai/brand-classifier-stage").
OUTPUT_DIR = config.input_model
ENDPOINT_ID = "magellan-ai/magellan-brand-recent"  # Endpoint to be updated
DATASET_PATH = config.input_dataset

print('logging in')
login(token=HF_ACCESS_TOKEN)
print('logged in')

# Report the GPU. Querying the device name raises when CUDA is unavailable, so
# it is guarded to let the module (and the home page) still load on such
# hardware.
print(f"Is CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(
        f"CUDA device: {torch.cuda.get_device_name(torch.cuda.current_device())}")
else:
    print("CUDA device: none")

print('setting device')
device = torch.device("cuda")
print(device)
print('set device')

print('setting up Repository')
# TODO: Change based on whether input dir is stage to output to prod or other way
# Local git checkout of the model repo; schedule_retrain writes the new weights
# here and pushes them with git.
# NOTE(review): huggingface_hub has deprecated `Repository` in favour of the
# HTTP-based HfApi upload methods — consider migrating.
repo = hf.Repository(local_dir=REPOSITORY_PATH,
                     clone_from=INPUT_DIR,
                     token=HF_ACCESS_TOKEN)
repo.git_pull()

model_nm = "gpt2"  # this is for the full re-training.

# A queue of size 1 is used as a "one retrain at a time" gate by /webhook. The
# slot is never released after a retrain; the Space is restarted instead.
max_concurrent_requests = 1
request_queue = asyncio.Queue(maxsize=max_concurrent_requests)

app = FastAPI()
@app.get("/")
async def home():
    """Serve the static landing page, printing ``nvidia-smi`` output first.

    The ``nvidia-smi`` call is diagnostic only: if the binary is missing the
    error is printed and the page is still served.

    Returns:
        A ``FileResponse`` for ``home.html``.
    """
    print('printing nvidia smi')
    # NOTE(review): subprocess.run blocks the event loop while nvidia-smi runs.
    try:
        result = subprocess.run(["nvidia-smi"], capture_output=True)
    except FileNotFoundError as err:
        print(f"nvidia-smi unavailable: {err}")
    else:
        print(result.stdout.decode(errors="replace"))
    return FileResponse("home.html")
@app.post("/webhook")
async def post_webhook(
    payload: WebhookPayload,
    task_queue: BackgroundTasks,
    repo=repo,
    REPOSITORY_PATH: str = REPOSITORY_PATH,
    OUTPUT_DIR: str = OUTPUT_DIR,
    x_webhook_secret: Optional[str] = Header(default=None),
):
    """Receive a Hub webhook and, for a dataset update, schedule a retrain.

    The secret and the event are checked BEFORE the single slot in
    ``request_queue`` is claimed. Otherwise an unauthenticated or irrelevant
    request would occupy the slot (which is never released) and block every
    genuine update with 503.

    NOTE(review): FastAPI exposes ``REPOSITORY_PATH`` and ``OUTPUT_DIR`` as
    optional query parameters, so an authenticated caller can override them —
    confirm this is intended.

    Returns:
        ``{"message": ...}`` once ``schedule_retrain`` is queued as a
        background task.

    Raises:
        HTTPException(401): no ``X-Webhook-Secret`` header.
        HTTPException(403): header does not match ``WEBHOOK_SECRET`` (always,
            when ``WEBHOOK_SECRET`` is unset).
        HTTPException(400): event is not a create/update of a dataset repo.
        HTTPException(503): a retrain already holds the slot.
    """
    import hmac  # constant-time secret comparison

    print('received webhook:')
    if x_webhook_secret is None:
        raise HTTPException(401)
    if WEBHOOK_SECRET is None or not hmac.compare_digest(
            x_webhook_secret.encode("utf-8"), WEBHOOK_SECRET.encode("utf-8")):
        raise HTTPException(403)

    event = payload.event
    # "repo.content" also starts with "repo", so a single prefix test covers both.
    is_dataset_update = (
        event.action in ("update", "create")
        and event.scope.startswith("repo")
        # and payload.repo.name == DATASET_PATH # commented out because no difference between prod and stage
        and payload.repo.type == "dataset"
    )
    if not is_dataset_update:
        # no-op
        print('webhook oh no')
        print(event.action)
        print(event.scope)
        print(payload.repo.name)
        print(payload.repo.type)
        raise HTTPException(400)

    try:
        # Claim the single retrain slot, waiting at most 1 s for it.
        await asyncio.wait_for(request_queue.put(True), timeout=1)
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=503, detail="Previous webhook is being processed. Try again later.") from None

    print('scheduling retrain')
    task_queue.add_task(
        schedule_retrain,
        payload,
        repo,
        REPOSITORY_PATH,
        OUTPUT_DIR,
        HF_ACCESS_TOKEN,
    )
    return {"message": "Webhook request accepted for processing"}
def schedule_retrain(payload: WebhookPayload,
                     repo: "hf.Repository",
                     REPOSITORY_PATH: str,
                     OUTPUT_DIR: str,
                     HF_ACCESS_TOKEN: Optional[str]
                     ):
    """Re-train the classifier on the latest dataset and redeploy it.

    Queued as a background task by ``post_webhook``. The steps are:

    1. Load and sample ``DATASET_PATH``.
    2. Tokenize it.
    3. Fine-tune ``model_nm``.
    4. Save the model, tokenizer and metrics into ``REPOSITORY_PATH``.
    5. Commit and push that git checkout.
    6. Update the Inference Endpoint.
    7. Open a discussion on the dataset.
    8. Restart the Space.

    Args:
        payload: The webhook that triggered the run (not otherwise used).
        repo: Local git checkout of the model repo, committed and pushed here.
        REPOSITORY_PATH: Directory of that checkout; also the Trainer output dir.
        OUTPUT_DIR: Hub repo id of the model (not used directly here).
        HF_ACCESS_TOKEN: Hub token for the ``HfApi`` client.

    Returns:
        ``{"processed": True}`` once every step has run.
    """
    # Create re-training project
    print("loading dataset")
    dataset = datasets.load_dataset(DATASET_PATH)
    dataset, le = load_data(dataset)
    # Map encoded label ids <-> original brand ids for the saved model config.
    label2id = {str(label): int(idx)
                for label, idx in zip(le.classes_, le.transform(le.classes_))}
    id2label = {idx: label for label, idx in label2id.items()}
    print("loaded dataset")

    print("tokenizing dataset")
    tokenizer, tok_ds = tokenizer_func(dataset)
    print("tokenized dataset")

    print("splitting dataset")
    splits = tok_ds.train_test_split(test_size=0.2, seed=42)
    tok_train_ds = splits['train']
    tok_val_ds = splits['test']
    print("split dataset")

    bs = 16
    RESUME_LAST = False
    epochs = 2 if RESUME_LAST else 8
    lr = 8e-5
    # NOTE(review): transformers rejects tf32=True on pre-Ampere GPUs (the
    # module-level comment mentions a Tesla T4) — confirm the Space hardware.
    torch.backends.cuda.matmul.allow_tf32 = True
    print("setting arguments")
    # NOTE(review): save_strategy="epoch" writes checkpoint-* folders into
    # REPOSITORY_PATH, and git_add('.') below stages them unless the repo's
    # .gitignore excludes them — confirm.
    args = TrainingArguments(REPOSITORY_PATH,
                             learning_rate=lr,
                             warmup_ratio=0.1,
                             lr_scheduler_type='cosine',
                             tf32=True,
                             evaluation_strategy="epoch",
                             per_device_train_batch_size=bs,
                             per_device_eval_batch_size=bs * 2,
                             num_train_epochs=epochs,
                             weight_decay=0.01,
                             report_to='none',
                             gradient_accumulation_steps=2,
                             save_strategy="epoch",
                             gradient_checkpointing=True,
                             optim="adafactor",
                             log_level="debug",)

    print("setting model")
    model = AutoModelForSequenceClassification.from_pretrained(
        model_nm,
        num_labels=len(np.unique(tok_ds['labels'])),
        ignore_mismatched_sizes=True,
        id2label=id2label,
        label2id=label2id)
    # tokenizer_func registered a NEW '[PAD]' token: grow the embeddings for it
    # and tell the model that id is padding. It must be the tokenizer's pad id
    # (what batches are padded with), not EOS. GPT-2 sequence classification
    # uses pad_token_id to locate each sequence's last real token.
    model.resize_token_embeddings(len(tokenizer))
    model.config.pad_token_id = tokenizer.pad_token_id

    trainer = Trainer(model, args,
                      train_dataset=tok_train_ds.shuffle(seed=42),
                      eval_dataset=tok_val_ds.shuffle(seed=42),
                      tokenizer=tokenizer,
                      compute_metrics=compute_metrics)
    print("training model")
    trainer.train()
    print("evaluating model")
    metrics = trainer.evaluate()
    print(metrics)
    eval_accuracy = round(metrics['eval_accuracy'], 2)
    print("evaluated model")

    # Save the model and tokenizer
    tokenizer.save_pretrained(REPOSITORY_PATH)
    model.save_pretrained(REPOSITORY_PATH)
    trainer.save_model(REPOSITORY_PATH)
    # Save evaluation metrics and accuracy
    with open(os.path.join(REPOSITORY_PATH, 'eval_metrics.txt'), 'w', encoding='utf-8') as f:
        f.write(str(metrics))
    with open(os.path.join(REPOSITORY_PATH, 'eval_accuracy.txt'), 'w', encoding='utf-8') as f:
        f.write(str(eval_accuracy))

    # Pushing Repository to the hub
    print("pushing repository")
    dt_string = datetime.now().strftime("%m/%d/%Y %H:%M:%S")
    repo.git_add('.', auto_lfs_track=True)
    repo.git_commit(f're-trained {dt_string}, acc: {eval_accuracy}')
    repo.git_push()
    print("pushed repository")

    # Update the endpoint
    print("updating endpoint")
    hf_api = HfApi(token=HF_ACCESS_TOKEN)
    update_endpoint(hf_api)
    print("updated endpoint")

    # Notify in the community tab
    print("notifying success")
    notify_success()
    print("notified success")

    # The request_queue slot is deliberately NOT released. Restarting the Space
    # (which also clears the cache) resets it, so one retrain runs per process.
    hf_api.restart_space('magellan-ai/brand-classifier')
    return {"processed": True}
def load_data(dataset, max_per_brand=100):
    """Build the labelled training set from the raw dataset's ``train`` split.

    The split is de-duplicated on ``simhash``. It is then limited to the
    ``max_per_brand`` most recent rows (by ``downloaded_at``) per brand. Each
    ``brand_id`` is encoded into an integer ``labels`` column.

    Args:
        dataset: Mapping whose ``'train'`` entry ``pd.DataFrame`` accepts (e.g.
            a ``datasets.DatasetDict``). It must provide ``simhash``,
            ``brand_id``, ``brand_name`` and ``downloaded_at`` columns.
        max_per_brand: Rows kept per brand (default 100, the previous
            hard-coded value).

    Returns:
        ``(ds, label_encoder)``: a ``datasets.Dataset`` of the sampled rows, and
        the fitted ``LabelEncoder`` mapping ``labels`` back to ``brand_id``.
    """
    df = pd.DataFrame(dataset['train'])
    # Drop duplicate rows based on the simhash column.
    df = df.drop_duplicates(['simhash'])
    # Within each brand_id, newest rows first.
    df = df.sort_values(['brand_id', 'downloaded_at'], ascending=[True, False])
    # 1-based position of each row within its brand_name, in the sorted order.
    # NOTE(review): rows are sorted by brand_id but ranked per brand_name. This
    # means "most recent per brand" only if brand_id and brand_name map 1:1.
    df['rank'] = df.groupby('brand_name').cumcount() + 1
    # .copy() yields an independent frame, so the drop and the 'labels'
    # assignment below do not hit pandas chained-assignment warnings.
    df = df[df['rank'] <= max_per_brand].copy()
    df = df.drop(columns='rank')

    # Encode brand ids as contiguous integer class labels.
    label_encoder = LabelEncoder()
    df['labels'] = label_encoder.fit_transform(df['brand_id'])
    ds = datasets.Dataset.from_pandas(df)
    return ds, label_encoder
def tokenizer_func(ds):
    """Load the ``model_nm`` tokenizer, add a ``[PAD]`` token, and tokenize *ds*.

    The GPT-2 tokenizer has no padding token by default, so one is registered
    as a new special token. ``tok_func`` is then mapped over *ds* in batches.

    Returns:
        ``(tokenizer, tokenized_dataset)``.
    """
    pad_spec = {'pad_token': '[PAD]'}
    tokenizer = AutoTokenizer.from_pretrained(model_nm)
    tokenizer.add_special_tokens(pad_spec)
    return tokenizer, ds.map(
        tok_func,
        batched=True,
        fn_kwargs=dict(tokenizer=tokenizer),
    )
def tok_func(x, tokenizer, max_length=1024):
    """Tokenize one batch of examples' ``plain_text`` field.

    Args:
        x: Batch mapping from ``Dataset.map(batched=True)``;
            ``x['plain_text']`` is passed straight to the tokenizer.
        tokenizer: Callable HF tokenizer.
        max_length: Longest token sequence kept; longer texts are truncated.
            Defaults to 1024 (GPT-2's context size).

    Returns:
        Whatever the tokenizer returns (its encoding, e.g. ``input_ids``).
    """
    # truncation=True makes explicit the longest-first truncation that HF
    # otherwise falls back to (with a warning) when only max_length is given.
    return tokenizer(x['plain_text'], max_length=max_length, truncation=True,
                     return_overflowing_tokens=False)
def compute_metrics(eval_pred):
    """Return classification accuracy for a Trainer evaluation pass.

    Args:
        eval_pred: ``(predictions, labels)`` pair from ``Trainer``. Presumably
            ``predictions`` are logits of shape (n_examples, n_labels) —
            argmax is taken over axis 1.

    Returns:
        ``{"accuracy": float}``: the fraction of argmax predictions equal to
        ``labels``. The Trainer reports it as ``eval_accuracy``.
    """
    predictions, labels = eval_pred
    # Some models return extra outputs next to the logits; logits come first.
    if isinstance(predictions, tuple):
        predictions = predictions[0]
    predicted = np.argmax(predictions, axis=1)
    # Same value as evaluate's "accuracy" metric, but computed locally so each
    # evaluation does not re-load the metric module.
    return {"accuracy": float(np.mean(predicted == np.asarray(labels)))}
def update_endpoint(hf_api, timeout=60):
    """Point the Inference Endpoint ``ENDPOINT_ID`` at the newest ``OUTPUT_DIR`` commit.

    The endpoint is paused, its model updated to the latest revision of
    ``OUTPUT_DIR``, then resumed, all through the Inference Endpoints REST API.
    Responses are printed, not checked, so a failing step does not abort the
    caller.

    Args:
        hf_api: Authenticated ``HfApi`` used to look up the latest commit.
        timeout: Seconds to wait for each HTTP request (``requests`` has no
            default timeout and could otherwise hang forever).
    """
    # list_repo_commits lists the most recent commit first.
    revision = hf_api.list_repo_commits(OUTPUT_DIR)[0].commit_id

    base_url = f'https://api.endpoints.huggingface.cloud/v2/endpoint/{ENDPOINT_ID}'
    auth_headers = {
        'Authorization': 'Bearer ' + HF_ACCESS_TOKEN,
        'accept': 'application/json',
    }
    form_headers = {**auth_headers, 'content-type': 'application/x-www-form-urlencoded'}
    json_headers = {**auth_headers, 'Content-Type': 'application/json'}

    # Pause
    response = requests.post(f'{base_url}/pause', headers=form_headers, timeout=timeout)
    print('pause response:', response)
    time.sleep(10)

    # Update. The revision looked up above must actually be sent; with an empty
    # body the endpoint is never told about the new commit.
    json_data = {'model': {'repository': OUTPUT_DIR, 'revision': revision}}
    response = requests.put(base_url, headers=json_headers, json=json_data, timeout=timeout)
    print('update response:', response)
    time.sleep(10)

    # Resume
    response = requests.post(f'{base_url}/resume', headers=form_headers, timeout=timeout)
    print('resume response:', response)
def notify_success():
    """Open a discussion on the dataset repo announcing the completed retrain.

    Returns:
        The discussion object returned by ``HfApi.create_discussion``.
    """
    message = NOTIFICATION_TEMPLATE.format(
        input_model=config.input_model,
        input_dataset=DATASET_PATH,
    )
    # Only called after training, the push and the endpoint update have run,
    # so the title says "completed", matching NOTIFICATION_TEMPLATE's body.
    return HfApi(token=HF_ACCESS_TOKEN).create_discussion(
        repo_id=DATASET_PATH,
        repo_type="dataset",
        title="✨ Retraining completed!",
        description=message,
        token=HF_ACCESS_TOKEN,
    )
# Markdown body of the dataset discussion posted by notify_success();
# {input_dataset} and {input_model} are filled in with str.format.
NOTIFICATION_TEMPLATE = """\
🌸 Hello there!
Following an update of [{input_dataset}](https://huggingface.co/datasets/{input_dataset}), an automatic re-training of [{input_model}](https://huggingface.co/{input_model}) has been scheduled and completed!
(This is an automated message)
"""