###### IMPORTS ########
# Imports for app and model creation
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import requests
from typing import Union, List
##########
# Imports for model creation/usage
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from scipy.special import softmax
import numpy as np
import urllib.request
import csv
#################
# LOCAL IMPORTS
from .config import MODEL_SOURCE, ModelSource, EVAL_BATCH_SIZE, EVAL_SAMPLE_SIZE, DATASET_PATH, EVAL_PERIOD_MIN
from .utils import preprocess, load_model_and_tokenizer, load_dataset
##################
# Imports for app monitoring
from prometheus_fastapi_instrumentator import Instrumentator
from prometheus_client import Counter, Gauge
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
import threading
#################

#################
# App creation and Prometheus metrics exposition
app = FastAPI()
Instrumentator().instrument(app).expose(app, endpoint="/metrics", include_in_schema=False)
#################


class SentimentQuery(BaseModel):
    """Request body for /predict: a single text or a batch of texts."""
    input_texts: Union[str, List[str]]


#################
# Retrieve model and tokenizer, either locally or via download
tokenizer, model = load_model_and_tokenizer(MODEL_SOURCE)
model.eval()

##############
# Retrieve the int -> label-name mapping from the model's dataset repo.
# NOTE(review): this network fetch runs at import time; if it fails the app
# will not start — presumably acceptable for this deployment, verify.
mapping_link = "https://raw.githubusercontent.com/cardiffnlp/tweeteval/main/datasets/sentiment/mapping.txt"
with urllib.request.urlopen(mapping_link) as f:
    html = f.read().decode('utf-8').split("\n")
csvreader = csv.reader(html, delimiter='\t')
# Each non-empty row is "<index>\t<label>"; keep the label names in index order.
labels = [row[1] for row in csvreader if len(row) > 1]
#############


@app.get("/")
def read_root():
    """Health-check endpoint."""
    return {"status": "ok", "message": "Sentiment API is running"}


@app.post("/predict")
async def analyze_text(query: SentimentQuery) -> dict:
    """
    Run sentiment analysis on one or more input texts.

    Accepts either a single string or a list of strings and returns, for
    each text, the predicted label plus the softmax score of every class
    (negative / neutral / positive, per the tweeteval mapping order).

    Raises:
        HTTPException(400): if an empty list of texts is submitted
            (the tokenizer cannot encode an empty batch).
    """
    if isinstance(query.input_texts, str):
        input_texts = [query.input_texts]
    else:  # already a List[str]
        input_texts = query.input_texts

    # Guard: an empty batch would otherwise surface as an opaque 500.
    if not input_texts:
        raise HTTPException(status_code=400, detail="input_texts must not be empty")

    encoded_batch = tokenizer(
        [preprocess(t) for t in input_texts],
        padding=True,       # pad to same length
        truncation=True,    # truncate long texts
        return_tensors="pt",
    )
    with torch.no_grad():
        output = model(**encoded_batch)
    logits = output[0].detach().cpu().numpy()
    scores = softmax(logits, axis=-1)
    pred_labels = scores.argmax(axis=-1)

    response_body = []
    for i, text in enumerate(input_texts):
        response_body.append({
            "input_text": text,
            "prediction": labels[pred_labels[i]],
            "scores": {
                "negative": float(scores[i][0]),
                "neutral": float(scores[i][1]),
                "positive": float(scores[i][2]),
            },
        })
    return {
        "status": "successful",
        "response_body": response_body,
    }


# Evaluation metrics on labeled test set
EVAL_ACCURACY = Gauge(
    "model_evaluation_accuracy",
    "Accuracy on latest periodic evaluation of labeled test subset"
)


def evaluate_accuracy(N_SAMPLES: int, BATCH_SIZE: int) -> float:
    """
    Evaluate and return model accuracy on a random subset of the test split.

    Args:
        N_SAMPLES: maximum number of test samples to evaluate.
        BATCH_SIZE: number of samples per forward pass.

    Returns:
        Fraction of correct predictions in [0, 1]; 0.0 for an empty split.
    """
    dataset = load_dataset(DATASET_PATH).shuffle()["test"][:N_SAMPLES]
    texts, gold = dataset["text"], dataset["label"]
    n_total = len(texts)
    if n_total == 0:
        return 0.0

    model.eval()  # loop-invariant: set inference mode once, not per batch
    correct = 0
    # Step-slicing batches avoids the original N_BATCHES+1 loop, which fed
    # an EMPTY list to the tokenizer whenever n_total was an exact multiple
    # of BATCH_SIZE.
    for start in range(0, n_total, BATCH_SIZE):
        batch_texts = texts[start:start + BATCH_SIZE]
        batch_gold = gold[start:start + BATCH_SIZE]
        encoded_batch = tokenizer(
            [preprocess(t) for t in batch_texts],
            padding=True,       # pad to same length
            truncation=True,    # truncate long texts
            return_tensors="pt",
        )
        with torch.no_grad():
            output = model(**encoded_batch)
        logits = output[0].detach().cpu().numpy()
        pred = softmax(logits, axis=-1).argmax(axis=-1)
        correct += int((pred == np.asarray(batch_gold)).sum())

    # Divide by the ACTUAL number of samples: the test split may hold fewer
    # than N_SAMPLES items, and the original division by N_SAMPLES would
    # silently under-report accuracy in that case.
    return correct / n_total
##################
# Sentiment distribution over an unlabelled set
SENTIMENT_BATCH_FRACTION = Gauge(
    "sentiment_batch_fraction",
    "Fraction of predictions in the latest monitored batch, by label (0..1).",
    ["label"]
)


def evaluate_sentiment_distribution(N_SAMPLES: int, BATCH_SIZE: int) -> np.ndarray:
    """
    Evaluate and return the predicted-sentiment distribution over a random
    subset of the test dataset.

    Args:
        N_SAMPLES: maximum number of test samples to evaluate.
        BATCH_SIZE: number of samples per forward pass.

    Returns:
        Array of per-class fractions (one entry per label, tweeteval order),
        summing to 1.0; all zeros for an empty split.
    """
    dataset = load_dataset(DATASET_PATH).shuffle()["test"][:N_SAMPLES]
    texts = dataset["text"]
    n_total = len(texts)
    counts = np.zeros(len(labels), dtype=float)
    if n_total == 0:
        return counts

    model.eval()
    # Step-slicing batches avoids the original N_BATCHES+1 loop, which fed
    # an EMPTY list to the tokenizer whenever n_total was an exact multiple
    # of BATCH_SIZE.
    for start in range(0, n_total, BATCH_SIZE):
        batch_texts = texts[start:start + BATCH_SIZE]
        encoded_batch = tokenizer(
            [preprocess(t) for t in batch_texts],
            padding=True,       # pad to same length
            truncation=True,    # truncate long texts
            return_tensors="pt",
        )
        with torch.no_grad():
            output = model(**encoded_batch)
        logits = output[0].detach().cpu().numpy()
        pred = softmax(logits, axis=-1).argmax(axis=-1)
        # bincount with minlength keeps a slot for EVERY class. The original
        # np.unique(..., return_counts=True)[1] returned a shorter array when
        # a batch contained no prediction of some class, so the accumulation
        # raised a shape-mismatch ValueError (or misattributed counts).
        counts += np.bincount(pred, minlength=len(labels))

    # Normalize by the actual number of evaluated samples, not N_SAMPLES,
    # so the fractions still sum to 1 when the split is smaller.
    return counts / n_total


##################
# Scheduler for the periodic metric-collection jobs
scheduler = BackgroundScheduler(daemon=True)
# Lock so scheduled evaluations never use the model concurrently with each other
_model_lock = threading.Lock()
############


def _run_eval_and_send_data():
    """Periodic job: compute accuracy on a labeled subset and export the gauge."""
    with _model_lock:
        acc = evaluate_accuracy(EVAL_SAMPLE_SIZE, EVAL_BATCH_SIZE)
    EVAL_ACCURACY.set(acc)


def _run_sentiment_distr_and_send_data():
    """Periodic job: compute the predicted-label distribution and export it per label."""
    with _model_lock:
        fractions = evaluate_sentiment_distribution(EVAL_SAMPLE_SIZE, EVAL_BATCH_SIZE)
    for i, label in enumerate(labels):
        SENTIMENT_BATCH_FRACTION.labels(label=label).set(fractions[i])


@app.on_event("startup")
def _start_scheduler():
    """Register both evaluation jobs (once shortly after startup, then on an interval) and start the scheduler."""
    # run once soon after startup
    scheduler.add_job(_run_eval_and_send_data, next_run_time=datetime.now() + timedelta(seconds=2))
    # then every EVAL_PERIOD_MIN minutes
    scheduler.add_job(_run_eval_and_send_data, "interval", minutes=EVAL_PERIOD_MIN)
    # same pattern for the sentiment-distribution job
    scheduler.add_job(_run_sentiment_distr_and_send_data, next_run_time=datetime.now() + timedelta(seconds=2))
    scheduler.add_job(_run_sentiment_distr_and_send_data, "interval", minutes=EVAL_PERIOD_MIN)
    scheduler.start()


@app.on_event("shutdown")
def _stop_scheduler():
    """Stop the background scheduler without waiting for in-flight jobs."""
    scheduler.shutdown(wait=False)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)