"""FastAPI Space that retrains the brand classifier when its dataset updates.

A Hugging Face webhook on the dataset repo POSTs to /webhook; the handler
validates the request and kicks off a background gpt2 fine-tuning run whose
result is pushed back to the Hub and rolled onto an inference endpoint.
"""
import os
import requests
import torch
from typing import Optional
from fastapi import FastAPI, Header, HTTPException, BackgroundTasks
import asyncio
from fastapi.responses import FileResponse
from huggingface_hub.hf_api import HfApi
from huggingface_hub import login
import huggingface_hub as hf
from .models import config, WebhookPayload  # import models
import pandas as pd
import numpy as np
import random
from sklearn.preprocessing import LabelEncoder
from transformers import TrainingArguments, Trainer
import datasets
import evaluate
# NOTE(review): presumably a vendored accuracy.py metric module lives next to
# this file — confirm it exists, otherwise this import fails at startup.
import accuracy  # unused here but required import for evaluate
import time
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import subprocess
from datetime import datetime

# Secrets are injected through the Space's environment variables.
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")
HF_ACCESS_TOKEN = os.getenv("HF_ACCESS_TOKEN")

# Local repository path (not on the Hub)
REPOSITORY_PATH = "staging/brand-classifier-stage"
INPUT_DIR = config.input_model
# OUTPUT_DIR = "magellan-ai/brand-classifier-stage" # Model to be updated
OUTPUT_DIR = config.input_model  # output currently points back at the input model
ENDPOINT_ID = "magellan-ai/magellan-brand-recent"  # Endpoint to be updated
DATASET_PATH = config.input_dataset

# Authenticate against the Hub once at import time; everything below
# (clone, push, endpoint calls) relies on this session/token.
print('logging in')
login(token=HF_ACCESS_TOKEN)
print('logged in')

print(f"Is CUDA available: {torch.cuda.is_available()}")  # True
print(
    f"CUDA device: {torch.cuda.get_device_name(torch.cuda.current_device())}")  # Tesla T4
print('setting device')
# Hard-coded CUDA: this Space is expected to run on a GPU instance.
# NOTE(review): no CPU fallback — the get_device_name call above already
# raises on a CPU-only box; confirm that is intended.
device = torch.device("cuda")
print(device)
print('set device')

print('setting up Repository')
# TODO: Change based on whether input dir is stage to output to prod or other way
# Clone the model repo locally so training output can be committed/pushed.
repo = hf.Repository(local_dir=REPOSITORY_PATH,
                     clone_from=INPUT_DIR,
                     token=HF_ACCESS_TOKEN)
repo.git_pull()

# Backbone checkpoint used for the full re-training.
model_nm = "gpt2"  # this is for the full re-training.
# Only one retraining run may be in flight at a time; the queue's single
# slot acts as the mutual-exclusion token.
max_concurrent_requests = 1
request_queue = asyncio.Queue(maxsize=max_concurrent_requests)

app = FastAPI()


@app.get("/")
async def home():
    """Landing page; also dumps `nvidia-smi` output to the logs for GPU debugging."""
    # nvidia-smi
    print('printing nvidia smi')
    print(subprocess.run(["nvidia-smi"], capture_output=True).stdout.decode())
    return FileResponse("home.html")


@app.post("/webhook")
async def post_webhook(
    payload: WebhookPayload,
    task_queue: BackgroundTasks,
    x_webhook_secret: Optional[str] = Header(default=None),
):
    """Hub webhook receiver: validates the secret/payload, schedules a retrain.

    Fixes vs. the previous version:
    * ``repo``/``REPOSITORY_PATH``/``OUTPUT_DIR`` were declared as endpoint
      parameters with defaults; FastAPI exposed the str-annotated ones as
      caller-overridable query parameters. They are module-level
      configuration and are now referenced directly.
    * Rejected requests (401/403/400) never released the queue slot, so a
      single bad request made every later webhook 503 forever. The slot is
      now released on rejection.
    * Removed the unreachable ``return {"processed": False}`` that followed
      ``raise HTTPException(400)``.
    """
    try:
        print('received webhook:')
        # Reserve the single processing slot; if it is already taken a
        # retraining run is in progress and we answer 503 below.
        await asyncio.wait_for(request_queue.put(True), timeout=1)
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=503,
            detail="Previous webhook is being processed. Try again later.")

    try:
        if x_webhook_secret is None:
            raise HTTPException(401)
        if x_webhook_secret != WEBHOOK_SECRET:
            raise HTTPException(403)
        if not (
            (payload.event.action == "update" or payload.event.action == "create")
            # NOTE(review): the "repo.content" test is subsumed by the broader
            # "repo" prefix test; kept for parity with the original logic.
            and (payload.event.scope.startswith("repo.content")
                 or payload.event.scope.startswith("repo"))
            # and payload.repo.name == DATASET_PATH  # commented out because no difference between prod and stage
            and payload.repo.type == "dataset"
        ):
            # no-op
            print('webhook oh no')
            print(payload.event.action)
            print(payload.event.scope)
            print(payload.repo.name)
            print(payload.repo.type)
            raise HTTPException(400)
    except HTTPException:
        # Bug fix: free the slot on rejected requests so later webhooks
        # are not starved.
        request_queue.get_nowait()
        request_queue.task_done()
        raise

    print('scheduling retrain')
    task_queue.add_task(
        schedule_retrain,
        payload,
        repo,
        REPOSITORY_PATH,
        OUTPUT_DIR,
        HF_ACCESS_TOKEN,
    )
    return {"message": "Webhook request accepted for processing"}


def schedule_retrain(payload: WebhookPayload,
                     repo,
                     REPOSITORY_PATH: str,
                     OUTPUT_DIR: str,
                     HF_ACCESS_TOKEN: str):
    """Full retraining pipeline, run as a FastAPI background task.

    Loads the dataset, fine-tunes a gpt2 sequence classifier, saves and
    pushes the result, rolls the inference endpoint, and posts a discussion
    note on the dataset repo.

    (The previous signature annotated parameters with runtime *values*,
    e.g. ``repo: repo`` — replaced with real type hints.)
    """
    # --- Data ---------------------------------------------------------
    print("loading dataset")
    dataset = datasets.load_dataset(DATASET_PATH)
    dataset, le = load_data(dataset)
    # Build the id<->label maps Hugging Face models expect.
    # (loop vars renamed: `id` shadowed the builtin)
    label2id = {str(label): int(label_id)
                for label, label_id in zip(le.classes_, le.transform(le.classes_))}
    id2label = {int(label_id): str(label) for label, label_id in label2id.items()}
    print("loaded dataset")

    print("tokenizing dataset")
    tokenizer, tok_ds = tokenizer_func(dataset)
    print("tokenized dataset")

    print("splitting dataset")
    tok_train_ds = tok_ds.train_test_split(test_size=0.2, seed=42)
    tok_val_ds = tok_train_ds['test']
    tok_train_ds = tok_train_ds['train']
    print("split dataset")

    # --- Training configuration --------------------------------------
    bs = 16
    RESUME_LAST = False
    if RESUME_LAST:
        epochs = 2
    else:
        epochs = 8
    lr = 8e-5
    torch.backends.cuda.matmul.allow_tf32 = True
    print("setting arguments")
    args = TrainingArguments(
        REPOSITORY_PATH,
        learning_rate=lr,
        warmup_ratio=0.1,
        lr_scheduler_type='cosine',
        tf32=True,
        evaluation_strategy="epoch",
        per_device_train_batch_size=bs,
        per_device_eval_batch_size=bs * 2,
        num_train_epochs=epochs,
        weight_decay=0.01,
        report_to='none',
        gradient_accumulation_steps=2,
        save_strategy="epoch",
        gradient_checkpointing=True,
        optim="adafactor",
        log_level="debug",
    )
    # args.set_push_to_hub(OUTPUT_DIR, token=HF_ACCESS_TOKEN, private_repo=True)

    print("setting model")
    model = AutoModelForSequenceClassification.from_pretrained(
        model_nm,
        num_labels=len(np.unique(tok_ds['labels'])),
        ignore_mismatched_sizes=True,
        id2label=id2label,
        label2id=label2id)
    # gpt2 ships without a pad token; tokenizer_func added one, so the
    # embedding matrix must grow to match.
    model.resize_token_embeddings(len(tokenizer))
    model.config.pad_token_id = model.config.eos_token_id

    trainer = Trainer(model, args,
                      train_dataset=tok_train_ds.shuffle(seed=42),
                      eval_dataset=tok_val_ds.shuffle(seed=42),
                      tokenizer=tokenizer,
                      compute_metrics=compute_metrics)
    print("training model")
    trainer.train()

    # --- Evaluation ---------------------------------------------------
    print("evaluating model")
    eval_results = trainer.evaluate()  # renamed from `eval` (shadowed builtin)
    print(eval_results)
    eval_accuracy = round(eval_results['eval_accuracy'], 2)
    print("evaluated model")

    # --- Persist model + metrics --------------------------------------
    tokenizer.save_pretrained(REPOSITORY_PATH)
    model.save_pretrained(REPOSITORY_PATH)
    trainer.save_model(REPOSITORY_PATH)
    with open(os.path.join(REPOSITORY_PATH, 'eval_metrics.txt'), 'w') as f:
        f.write(str(eval_results))
    with open(os.path.join(REPOSITORY_PATH, 'eval_accuracy.txt'), 'w') as f:
        f.write(str(eval_accuracy))

    # --- Push to the Hub ----------------------------------------------
    print("pushing repository")
    now = datetime.now()
    dt_string = now.strftime("%m/%d/%Y %H:%M:%S")
    repo.git_add('.', auto_lfs_track=True)
    repo.git_commit(f're-trained {dt_string}, acc: {eval_accuracy}')
    repo.git_push()
    print("pushed repository")

    # --- Roll the inference endpoint ----------------------------------
    print("updating endpoint")
    hf_api = HfApi(token=HF_ACCESS_TOKEN)
    update_endpoint(hf_api)
    print("updated endpoint")

    # Notify in the community tab
    print("notifying success")
    notify_success()
    print("notified success")

    # The queue slot is deliberately NOT released: only one retrain per
    # Space lifetime is wanted, and the restart below clears everything.
    # request_queue.task_done()

    # Restart space to clear cache
    hf_api.restart_space('magellan-ai/brand-classifier')
    return {"processed": True}


def load_data(dataset):
    """Deduplicate, recency-rank and label-encode the raw dataset.

    Returns a `datasets.Dataset` with an integer ``labels`` column plus the
    fitted `LabelEncoder` (needed to build id2label/label2id).
    """
    df = pd.DataFrame(dataset['train'])
    # Drop duplicates based on episode copy ID column
    df.drop_duplicates(['simhash'], inplace=True)
    # Most recent rows first within each brand.
    df.sort_values(['brand_id', 'downloaded_at'],
                   ascending=[True, False], inplace=True)
    # 'rank' = how recently this row was seen for its brand.
    # NOTE(review): ranks are grouped by brand_name while the sort above is
    # by brand_id — confirm the two columns are 1:1, otherwise the
    # "most recent 100" window uses the wrong grouping.
    df['rank'] = df.groupby('brand_name').cumcount() + 1
    # Keep only the most recent 100 occurrences per brand. `.copy()` fixes
    # the SettingWithCopyWarning the in-place drop below triggered on a view.
    sampled_df = df[df['rank'] <= 100].copy()
    sampled_df.drop('rank', axis=1, inplace=True)
    # reducing sample
    df = sampled_df
    # Vestigial brand sub-sampling: `brands[:]` selects every brand, so the
    # shuffle + filter is currently a no-op (kept for easy re-enabling).
    brands = df['brand_name'].unique()
    random.shuffle(brands)
    df = df[df['brand_name'].isin(brands[:])]
    # Integer-encode brand_id as the training label.
    label_encoder = LabelEncoder()
    df['labels'] = label_encoder.fit_transform(df['brand_id'])
    # df['labels'] = df['brand_id']
    ds = datasets.Dataset.from_pandas(df)
    return ds, label_encoder


def tokenizer_func(ds):
    """Build the gpt2 tokenizer (with an added [PAD] token) and tokenize `ds`."""
    tokenizer = AutoTokenizer.from_pretrained(model_nm)
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    tok_ds = ds.map(tok_func, batched=True, fn_kwargs={'tokenizer': tokenizer})
    return tokenizer, tok_ds


def tok_func(x, tokenizer):
    """Tokenize the ``plain_text`` column of one (batched) example dict."""
    # NOTE(review): without truncation=True, `max_length` alone does not
    # truncate — sequences longer than gpt2's 1024-token window would fail
    # at training time. Confirm inputs are bounded upstream.
    encoding = tokenizer(x['plain_text'], max_length=1024,
                         return_overflowing_tokens=False)
    return encoding


def compute_metrics(eval_pred):
    """Accuracy over argmax class predictions, via the `evaluate` hub metric."""
    scoring = evaluate.load("accuracy")
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)
    return scoring.compute(predictions=predictions, references=labels)


def update_endpoint(hf_api):
    """Pause -> update -> resume the inference endpoint so it reloads the model.

    NOTE(review): `revision` is fetched but never sent; the empty-body PUT
    presumably makes the endpoint re-resolve the latest revision — confirm
    against the Inference Endpoints API.
    """
    # Get most recent revision
    revisions = hf_api.list_repo_commits(OUTPUT_DIR)
    revision = revisions[0].commit_id
    # Pause
    headers = {
        'Authorization': 'Bearer ' + HF_ACCESS_TOKEN,
        'accept': 'application/json',
        'content-type': 'application/x-www-form-urlencoded',
    }
    response = requests.post(
        f'https://api.endpoints.huggingface.cloud/v2/endpoint/{ENDPOINT_ID}/pause',
        headers=headers,
    )
    print('pause response:', response)
    time.sleep(10)
    # Update
    headers = {
        'Authorization': 'Bearer ' + HF_ACCESS_TOKEN,
        'accept': 'application/json',
        'Content-Type': 'application/json',
    }
    json_data = {}
    response = requests.put(
        f'https://api.endpoints.huggingface.cloud/v2/endpoint/{ENDPOINT_ID}',
        headers=headers,
        json=json_data,
    )
    print('update response:', response)
    time.sleep(10)
    # Resume
    headers = {
        'Authorization': 'Bearer ' + HF_ACCESS_TOKEN,
        'accept': 'application/json',
        'content-type': 'application/x-www-form-urlencoded',
    }
    response = requests.post(
        f'https://api.endpoints.huggingface.cloud/v2/endpoint/{ENDPOINT_ID}/resume',
        headers=headers,
    )
    print('resume response:', response)


def notify_success():
    """Open a discussion on the dataset repo announcing the retrain.

    NOTE(review): this runs after training finishes, although the title says
    "started" (the body text correctly says "scheduled and completed").
    """
    message = NOTIFICATION_TEMPLATE.format(
        input_model=config.input_model,
        input_dataset=DATASET_PATH,
    )
    return HfApi(token=HF_ACCESS_TOKEN).create_discussion(
        repo_id=DATASET_PATH,
        repo_type="dataset",
        title="✨ Retraining started!",
        description=message,
        token=HF_ACCESS_TOKEN,
    )


NOTIFICATION_TEMPLATE = """\
🌸 Hello there!

Following an update of [{input_dataset}](https://huggingface.co/datasets/{input_dataset}),
an automatic re-training of [{input_model}](https://huggingface.co/{input_model}) has been scheduled and completed!

(This is an automated message)
"""