File size: 13,546 Bytes
52c4067 3dee2c6 52c4067 0bf1a1c 52c4067 3dee2c6 0bf1a1c 3dee2c6 52c4067 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 | import os
import pathlib
import re
import logging
import requests
from typing import List, Dict, Optional
from label_studio_ml.model import LabelStudioMLBase
from label_studio_ml.response import ModelResponse
from transformers import pipeline, Pipeline
from itertools import groupby
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer, AutoTokenizer
from transformers import DataCollatorForTokenClassification
from datasets import Dataset, ClassLabel, Value, Sequence, Features
from functools import partial
# Logger named after this module.
logger = logging.getLogger(__name__)

# Shared NER pipeline used by predict(); assigned by reload_model().
_model: Optional[Pipeline] = None

# Model locations, each overridable through an environment variable of the same name.
MODEL_DIR = os.environ.get('MODEL_DIR', './results')
BASELINE_MODEL_NAME = os.environ.get('BASELINE_MODEL_NAME', 'dslim/bert-base-NER')
FINETUNED_MODEL_NAME = os.environ.get('FINETUNED_MODEL_NAME', 'finetuned_model')
def reload_model():
    """(Re)load the module-level NER pipeline into ``_model``.

    The fine-tuned checkpoint at ``MODEL_DIR/FINETUNED_MODEL_NAME`` is tried
    first. If loading it raises any ``Exception``, the baseline model
    ``BASELINE_MODEL_NAME`` is loaded instead. An error raised while loading
    the baseline model propagates to the caller, leaving ``_model`` as ``None``.
    """
    global _model
    # Reset first: if both loads below fail, ``_model`` stays None.
    _model = None
    chk_path = str(pathlib.Path(MODEL_DIR) / FINETUNED_MODEL_NAME)
    try:
        logger.info(f"Loading finetuned model from {chk_path}")
        _model = pipeline("ner", model=chk_path, tokenizer=chk_path)
    except Exception as exc:
        # ``Exception`` instead of a bare ``except`` so that KeyboardInterrupt
        # and SystemExit are not swallowed; the reason is logged for debugging.
        logger.info(f"Finetuned model could not be loaded from {chk_path}: {exc!r}")
        # if finetuned model is not available, use the baseline model with the original labels
        logger.info(f"Loading baseline model {BASELINE_MODEL_NAME}")
        _model = pipeline("ner", model=BASELINE_MODEL_NAME, tokenizer=BASELINE_MODEL_NAME)
# Load a pipeline at import time so the module-level `_model` is set before predict() uses it.
reload_model()
class HuggingFaceNER(LabelStudioMLBase):
    """Label Studio ML backend for named-entity recognition with Hugging Face models.

    ``predict`` runs the module-level ``_model`` pipeline over the task texts
    and converts its token-level output into Label Studio ``labels`` regions.
    ``fit`` downloads the annotated tasks of a project, fine-tunes
    ``BASELINE_MODEL_NAME`` on them, saves the result under
    ``MODEL_DIR/FINETUNED_MODEL_NAME`` and reloads the pipeline.
    """

    # Label Studio connection settings, read from the environment when the class is created.
    LABEL_STUDIO_HOST = os.getenv('LABEL_STUDIO_HOST', 'http://localhost:8080')
    LABEL_STUDIO_API_KEY = os.getenv('LABEL_STUDIO_API_KEY')
    # Webhook-triggered training only runs when the number of annotated tasks
    # is a multiple of this value (see fit()).
    START_TRAINING_EACH_N_UPDATES = int(os.getenv('START_TRAINING_EACH_N_UPDATES', 10))
    # Hyper-parameters passed to TrainingArguments in fit().
    LEARNING_RATE = float(os.getenv('LEARNING_RATE', 1e-3))
    NUM_TRAIN_EPOCHS = int(os.getenv('NUM_TRAIN_EPOCHS', 10))
    WEIGHT_DECAY = float(os.getenv('WEIGHT_DECAY', 0.01))

    # Matches the leading "B-"/"I-" marker of a BIO tag such as "B-ORG".
    _BIO_PREFIX = re.compile(r'^[BI]-')
    # Seconds a fetched access token is reused before it is refreshed.
    _ACCESS_TOKEN_TTL_SECONDS = 240
    # Page size requested from the Label Studio tasks endpoint.
    _TASKS_PAGE_SIZE = 200

    def get_labels(self):
        """Return the labels of the first ``Labels`` tag that targets a ``Text`` tag."""
        li = self.label_interface
        from_name, _, _ = li.get_first_tag_occurence('Labels', 'Text')
        return li.get_tag(from_name).labels

    def setup(self):
        """Configure any parameters of your model here
        """
        self.set("model_version", f'{self.__class__.__name__}-v0.0.1')

    def predict(self, tasks: List[Dict], context: Optional[Dict] = None, **kwargs) -> ModelResponse:
        """Run the NER pipeline on ``tasks`` and return one prediction per task.

        :param tasks: [Label Studio tasks in JSON format](https://labelstud.io/guide/task_format.html)
        :param context: [Label Studio context in JSON format](https://labelstud.io/guide/ml_create#Implement-prediction-logic)
        :return model_response
            ModelResponse(predictions=predictions) with
            predictions: [Predictions array in JSON format](https://labelstud.io/guide/export.html#Label-Studio-JSON-format-of-annotated-tasks)

        The returned list has exactly one entry per input task, in task order.
        A task in which no entity was found gets a prediction with an empty
        ``result`` and score ``0.0`` instead of being dropped, so predictions
        can be matched to tasks by position.
        """
        model_version = self.get('model_version')
        if not tasks:
            return ModelResponse(predictions=[], model_version=model_version)

        li = self.label_interface
        from_name, to_name, value = li.get_first_tag_occurence('Labels', 'Text')
        texts = [self.preload_task_data(task, task['data'][value]) for task in tasks]

        # run predictions
        model_predictions = _model(texts)

        strip_bio = self._BIO_PREFIX.sub
        predictions = []
        for prediction in model_predictions:
            # prediction returned in the format: [{'entity': 'B-ORG', 'score': 0.999, 'index': 1, 'start': 0, 'end': 7, 'word': 'Google'}, ...]
            # we need to group them by 'B-' and 'I-' prefixes to form entities
            results = []
            for label, group in groupby(prediction, key=lambda tok: strip_bio('', tok['entity'])):
                entities = list(group)
                # The region score is the mean of its token scores.
                score = float(sum(entity['score'] for entity in entities) / len(entities))
                results.append({
                    'from_name': from_name,
                    'to_name': to_name,
                    'type': 'labels',
                    'value': {
                        'start': entities[0]['start'],
                        'end': entities[-1]['end'],
                        'labels': [label],
                    },
                    'score': score,
                })
            # The task score is the mean of its region scores (0.0 when there are none).
            task_score = sum(r['score'] for r in results) / len(results) if results else 0.0
            predictions.append({
                'result': results,
                'score': task_score,
                'model_version': model_version,
            })
        return ModelResponse(predictions=predictions, model_version=model_version)

    def _get_access_token(self):
        """Return a Label Studio access token, refreshing it when the cached one has expired.

        ``LABEL_STUDIO_API_KEY`` is sent as the refresh token to
        ``/api/token/refresh/``; the returned access token is cached on the
        class for ``_ACCESS_TOKEN_TTL_SECONDS``.

        :raises requests.HTTPError: if the refresh endpoint answers with an error status.
        """
        # (HF Spaces patch) Cache the short-lived access token on the class to
        # avoid hammering /api/token/refresh/ which is rate-limited (~5/min).
        # LS access tokens last ~5 min; cache for 4 to leave headroom.
        import time  # local import: `time` is not imported at module level

        now = time.time()
        cls = type(self)
        cached = getattr(cls, "_access_token_cache", None)
        if cached and cached["expires_at"] > now:
            return cached["token"]

        r = requests.post(
            f"{self.LABEL_STUDIO_HOST}/api/token/refresh/",
            json={"refresh": self.LABEL_STUDIO_API_KEY},
            timeout=30,
        )
        r.raise_for_status()
        token = r.json()["access"]
        cls._access_token_cache = {
            "token": token,
            "expires_at": now + self._ACCESS_TOKEN_TTL_SECONDS,
        }
        return token

    def _get_tasks(self, project_id):
        """Download the tasks of ``project_id`` that have at least one annotation.

        :param project_id: Label Studio project id used as the ``project`` query parameter.
        :return: list of task dicts whose ``annotations`` field is non-empty.
        :raises requests.HTTPError: if a request answers with an error status.
        """
        # (HF Spaces patch) Modern LS instances disable legacy-token auth, which
        # breaks the upstream `label_studio_sdk.Client(host, api_key)` path.
        # LABEL_STUDIO_API_KEY is treated as a Personal Access Token (refresh
        # token); exchange (with caching) for a short-lived access token and
        # call the LS REST API directly. Filters to tasks with at least one
        # annotation (legacy `project.get_labeled_tasks()` semantics).
        access = self._get_access_token()
        headers = {"Authorization": f"Bearer {access}"}
        first_url = (
            f"{self.LABEL_STUDIO_HOST}/api/tasks/"
            f"?project={project_id}&page_size={self._TASKS_PAGE_SIZE}&fields=all"
        )

        annotated: List[Dict] = []
        fetched = 0       # tasks seen so far, annotated or not
        page_number = 1
        url = first_url
        while url:
            r = requests.get(url, headers=headers, timeout=60)
            r.raise_for_status()
            payload = r.json()
            if isinstance(payload, list):
                page, total, url = payload, None, None
            else:
                page = payload.get("tasks", [])
                total = payload.get("total")
                url = payload.get("next")
            annotated.extend(t for t in page if t.get("annotations"))
            fetched += len(page)
            # If the response has no `next` link but reports a `total` larger
            # than what was fetched so far, request the following page by
            # number. An empty page always stops the loop.
            if not url and page and isinstance(total, int) and fetched < total:
                page_number += 1
                url = f"{first_url}&page={page_number}"
        return annotated

    def tokenize_and_align_labels(self, examples, tokenizer):
        """Tokenize pre-split words and align the word-level tags with the resulting tokens.

        From example https://huggingface.co/docs/transformers/en/tasks/token_classification#preprocess

        :param examples: batch with ``tokens`` (lists of words) and ``ner_tags`` (lists of tag ids).
        :param tokenizer: tokenizer called with ``is_split_into_words=True``; its output must provide ``word_ids``.
        :return: the tokenizer output with an added ``labels`` entry, where special
            tokens and non-first sub-tokens of a word are labelled ``-100``.
        """
        tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)
        labels = []
        for i, label in enumerate(examples["ner_tags"]):
            word_ids = tokenized_inputs.word_ids(batch_index=i)  # Map tokens to their respective word.
            previous_word_idx = None
            label_ids = []
            for word_idx in word_ids:
                if word_idx is None:
                    # Set the special tokens to -100.
                    label_ids.append(-100)
                elif word_idx != previous_word_idx:
                    # Only label the first token of a given word.
                    label_ids.append(label[word_idx])
                else:
                    label_ids.append(-100)
                previous_word_idx = word_idx
            labels.append(label_ids)
        tokenized_inputs["labels"] = labels
        return tokenized_inputs

    @staticmethod
    def _extract_project_id(data):
        """Return the project id from a webhook payload, or ``None`` if it is absent.

        The project is taken from ``data['annotation']['project']`` when present,
        otherwise from ``data['project']``; it may be a dict with an ``id`` key
        or the id itself.
        """
        annotation = data.get('annotation') or {}
        project = annotation.get('project') or data.get('project')
        if isinstance(project, dict):
            return project.get('id')
        return project

    @staticmethod
    def _label_id(label_to_id, label):
        """Return the id of ``label``, registering it with the next free id if it is new."""
        return label_to_id.setdefault(label, len(label_to_id))

    def _annotation_to_example(self, task, annotation, value, tokenizer, label_to_id, no_label):
        """Convert one annotation into a ``{'id', 'tokens', 'ner_tags'}`` training example.

        New ``B-``/``I-`` labels are registered in ``label_to_id`` as a side effect.
        Returns ``None`` when the annotation has no labelled spans or yields no tokens.
        """
        results = annotation.get('result')
        if not results:
            return None

        # Keep only results that describe a labelled span; other result kinds
        # have no value/labels/start/end and cannot be turned into tags.
        spans = []
        for result in results:
            region = result.get('value') or {}
            if region.get('labels') and 'start' in region and 'end' in region:
                spans.append({
                    'label': region['labels'][0],
                    'start': region['start'],
                    'end': region['end'],
                })
        if not spans:
            return None
        spans.sort(key=lambda span: span['start'])

        text = self.preload_task_data(task, task['data'][value])

        # Cover the whole text: unlabelled chunks between, before and after the
        # labelled spans are added with the `no_label` tag.
        chunks = []
        last_end = 0
        for span in spans:
            if last_end < span['start']:
                chunks.append({'label': no_label, 'start': last_end, 'end': span['start']})
            chunks.append(span)
            last_end = span['end']
        if last_end < len(text):
            chunks.append({'label': no_label, 'start': last_end, 'end': len(text)})

        # now tokenize chunks separately and add them to the dataset
        item = {'id': task['id'], 'tokens': [], 'ner_tags': []}
        for chunk in chunks:
            tokens = tokenizer.tokenize(text[chunk['start']:chunk['end']])
            if not tokens:
                # A chunk without tokens must not contribute a tag, otherwise
                # `tokens` and `ner_tags` would differ in length.
                continue
            item['tokens'].extend(tokens)
            if chunk['label'] == no_label:
                item['ner_tags'].extend([label_to_id[no_label]] * len(tokens))
                continue
            # First token of an entity gets the B- tag, the remaining ones the I- tag.
            item['ner_tags'].append(self._label_id(label_to_id, 'B-' + chunk['label']))
            if len(tokens) > 1:
                inside_id = self._label_id(label_to_id, 'I-' + chunk['label'])
                item['ner_tags'].extend([inside_id] * (len(tokens) - 1))

        return item if item['tokens'] else None

    def fit(self, event, data, **kwargs):
        """Download dataset from Label Studio and prepare data for training in BERT

        :param event: webhook event name; only ``ANNOTATION_CREATED``,
            ``ANNOTATION_UPDATED`` and ``START_TRAINING`` are handled.
        :param data: webhook payload; the project id is read from it.

        Training is skipped (the method returns ``None`` without training) when
        the event is unsupported, the project id is missing, there is no usable
        training data, or - for events other than ``START_TRAINING`` - the number
        of annotated tasks is not a multiple of ``START_TRAINING_EACH_N_UPDATES``.
        On success the model is saved to ``MODEL_DIR/FINETUNED_MODEL_NAME`` and
        the module-level pipeline is reloaded.
        """
        if event not in ('ANNOTATION_CREATED', 'ANNOTATION_UPDATED', 'START_TRAINING'):
            logger.info(f"Skip training: event {event} is not supported")
            return

        project_id = self._extract_project_id(data)
        # If project_id is still None, log and safely exit
        if project_id is None:
            logger.error(f"Cannot find project_id in webhook payload: {data}")
            return

        tasks = self._get_tasks(project_id)
        if not tasks:
            # 0 is a multiple of every N, so without this guard training would
            # start on an empty dataset.
            logger.info("Skip training: no annotated tasks found")
            return
        if len(tasks) % self.START_TRAINING_EACH_N_UPDATES != 0 and event != 'START_TRAINING':
            logger.info(f"Skip training: {len(tasks)} tasks are not multiple of {self.START_TRAINING_EACH_N_UPDATES}")
            return

        # we need to convert Label Studio NER annotations to huggingface NER format in datasets
        # for example:
        # {'id': '0',
        # 'ner_tags': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 8, 8, 0, 7, 0, 0, 0, 0, 0, 0, 0, 0],
        # 'tokens': ['@paulwalk', 'It', "'s", 'the', 'view', 'from', 'where', 'I', "'m", 'living', 'for', 'two', 'weeks', '.', 'Empire', 'State', 'Building', '=', 'ESB', '.', 'Pretty', 'bad', 'storm', 'here', 'last', 'evening', '.']
        # }
        from_name, to_name, value = self.label_interface.get_first_tag_occurence('Labels', 'Text')
        tokenizer = AutoTokenizer.from_pretrained(BASELINE_MODEL_NAME)

        no_label = 'O'
        label_to_id = {no_label: 0}
        ds_raw = []
        for task in tasks:
            for annotation in task['annotations']:
                item = self._annotation_to_example(
                    task, annotation, value, tokenizer, label_to_id, no_label)
                if item is not None:
                    ds_raw.append(item)
        if not ds_raw:
            logger.info("Skip training: annotations contain no labeled spans")
            return
        logger.debug(f"Dataset: {ds_raw}")

        # convert to huggingface dataset
        # Define the features of your dataset
        features = Features({
            'id': Value('string'),
            'tokens': Sequence(Value('string')),
            'ner_tags': Sequence(ClassLabel(names=list(label_to_id.keys())))
        })
        hf_dataset = Dataset.from_list(ds_raw, features=features)
        tokenized_dataset = hf_dataset.map(
            partial(self.tokenize_and_align_labels, tokenizer=tokenizer), batched=True)
        logger.debug(f"HF Dataset: {tokenized_dataset}")

        data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

        id_to_label = {i: label for label, i in label_to_id.items()}
        logger.debug(f"Labels: {id_to_label}")

        model = AutoModelForTokenClassification.from_pretrained(
            BASELINE_MODEL_NAME, num_labels=len(id_to_label),
            id2label=id_to_label, label2id=label_to_id)
        logger.debug(f"Model: {model}")

        chk_path = str(pathlib.Path(MODEL_DIR) / FINETUNED_MODEL_NAME)
        training_args = TrainingArguments(
            output_dir=chk_path,
            learning_rate=self.LEARNING_RATE,
            per_device_train_batch_size=8,
            num_train_epochs=self.NUM_TRAIN_EPOCHS,
            weight_decay=self.WEIGHT_DECAY,
            evaluation_strategy="no",
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_dataset,
            tokenizer=tokenizer,
            data_collator=data_collator,
        )
        trainer.train()

        logger.info(f"Model is trained and saved as {chk_path}")
        trainer.save_model(chk_path)

        # reload model
        # TODO: this is not thread-safe, should be done with critical section
        reload_model()
|