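"""Multi-label emotion classifier (eight emotion labels) with a Gradio UI.

Fine-tunes a Hugging Face transformer backbone with a small classification
head, supports CSV-based training and prediction, and persists the best
checkpoint under `saved_models/`.
"""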

import os

import torch
import pandas as pd
import numpy as np
import gradio as gr

from pathlib import Path
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from transformers import (
    AutoTokenizer,
    AutoModel,
    AutoConfig,
    get_linear_schedule_with_warmup,
)


# Emotion label set for multi-label classification.
LABELS = ['anger', 'anticipation', 'disgust', 'fear', 'joy', 'sadness', 'surprise', 'trust']
LABEL2ID = {l: i for i, l in enumerate(LABELS)}
ID2LABEL = {i: l for i, l in enumerate(LABELS)}

# Root folder for saved checkpoints.
SAVED_ROOT = Path("saved_models")
SAVED_ROOT.mkdir(exist_ok=True)


# ---------- File helpers ----------

def read_uploaded_file(uploaded):
    """Return a filesystem path for a Gradio upload (path string, NamedString, or file-like)."""
    if uploaded is None:
        raise ValueError("No file provided")

    # Gradio may hand back a plain path string...
    if isinstance(uploaded, str):
        return uploaded

    # ...or an object exposing the temp-file path via `.name`...
    if hasattr(uploaded, "name"):
        return uploaded.name

    # ...or a raw file-like object, which we spool to /tmp.
    if hasattr(uploaded, "read"):
        tmp = Path("/tmp") / f"uploaded_{np.random.randint(int(1e9))}.csv"
        with open(tmp, "wb") as f:
            f.write(uploaded.read())
        return str(tmp)

    raise ValueError("Unsupported uploaded file type")


def save_last_model_name(name):
    """Record the most recently saved model folder."""
    (SAVED_ROOT / "last_model.txt").write_text(name)


def load_last_model_name():
    """Return the last saved model folder, or None if nothing has been trained yet."""
    p = SAVED_ROOT / "last_model.txt"
    if p.exists():
        return p.read_text().strip()
    return None


def model_folder(model_name):
    """Map a Hub model id to a filesystem-safe local folder."""
    return SAVED_ROOT / model_name.replace("/", "_")


# ---------- Data cleaning ----------

def clean_labels(df):
    """Ensure every label column exists; missing labels default to 0."""
    for l in LABELS:
        if l not in df.columns:
            df[l] = 0
    return df


def clean_text(df, col="text"):
    """Normalize the text column: stringify, flatten newlines, strip whitespace."""
    if col not in df.columns:
        raise KeyError(f"CSV must contain a column '{col}'")
    df[col] = df[col].astype(str).str.replace("\n", " ").str.strip()
    return df


# ---------- Model ----------

class EmotionModel(nn.Module):
    """Shared transformer backbone + dropout + linear multi-label head."""

    def __init__(self, base_model_name, num_labels=8):
        super().__init__()
        self.config = AutoConfig.from_pretrained(base_model_name)
        self.base = AutoModel.from_pretrained(base_model_name)
        self.drop = nn.Dropout(0.3)
        self.clf = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, ids, mask):
        out = self.base(input_ids=ids, attention_mask=mask)

        # Use the pooled output when the backbone provides one;
        # otherwise fall back to the [CLS] token of the last hidden state.
        if hasattr(out, "pooler_output") and out.pooler_output is not None:
            x = out.pooler_output
        else:
            x = out.last_hidden_state[:, 0, :]

        x = self.drop(x)
        return self.clf(x)
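
# Shape sketch (hypothetical, not part of the app flow): a batch of two
# sequences padded to length 128 yields logits of shape (2, 8), one per label.
# model = EmotionModel("bert-base-multilingual-cased")
# tok = AutoTokenizer.from_pretrained("bert-base-multilingual-cased")
# enc = tok(["so happy", "so angry"], padding="max_length", truncation=True,
#           max_length=128, return_tensors="pt")
# logits = model(enc["input_ids"], enc["attention_mask"])  # torch.Size([2, 8])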


# ---------- Tokenization / datasets ----------

def tokenize_batch(texts, tokenizer, max_len=128):
    return tokenizer(
        texts,
        padding="max_length",
        truncation=True,
        max_length=max_len,
        return_tensors="pt"
    )


def build_tensor_dataset(df, tokenizer, max_len=128):
    """Encode all texts up front and pair them with float label vectors."""
    enc = tokenize_batch(df["text"].tolist(), tokenizer, max_len)
    labels = torch.tensor(df[LABELS].values, dtype=torch.float)
    return TensorDataset(
        enc["input_ids"],
        enc["attention_mask"],
        labels
    )


# ---------- Class imbalance ----------

def compute_pos_weight(df):
    """Per-label pos_weight for BCEWithLogitsLoss: #negatives / #positives."""
    counts = df[LABELS].sum(axis=0)
    N = len(df)
    pw = []
    for c in counts:
        pw.append((N - c) / c if c > 0 else 1.0)
    return torch.tensor(pw, dtype=torch.float)
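
# Worked example: with 100 rows of which 20 are positive for `joy`,
# pos_weight[joy] = (100 - 20) / 20 = 4.0, so positive `joy` examples weigh
# 4x in the loss; labels with no positives fall back to 1.0.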


# ---------- Persistence ----------

def save_model(model, tokenizer, folder):
    os.makedirs(folder, exist_ok=True)

    # Backbone and tokenizer use the standard HF serialization.
    model.base.save_pretrained(folder)
    tokenizer.save_pretrained(folder)

    # The classifier head is a plain state dict stored alongside them.
    torch.save(model.clf.state_dict(), str(Path(folder) / "classifier.pt"))

    save_last_model_name(str(folder))


def load_model(folder):
    folder = str(folder)
    config = AutoConfig.from_pretrained(folder)
    tokenizer = AutoTokenizer.from_pretrained(folder)

    # Rebuild the wrapper from the saved backbone, then restore the head weights.
    model = EmotionModel(folder)
    state = torch.load(f"{folder}/classifier.pt", map_location="cpu")
    model.clf.load_state_dict(state)
    model.eval()

    return model, tokenizer, config
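
# Round-trip sketch: the folder recorded by save_model() can be reloaded
# directly, e.g. `model, tok, cfg = load_model(load_last_model_name())`.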


# ---------- Training ----------

def train_model(
    df,
    model_name="bert-base-multilingual-cased",
    epochs=3,
    batch_size=8,
    lr=2e-5,
    max_len=128,
    weight_decay=0.01,
    warmup_ratio=0.1,
    patience=2,
    freeze_layers=6,
    device=None,
):
    device = device or ("cuda" if torch.cuda.is_available() else "cpu")

    tokenizer = AutoTokenizer.from_pretrained(model_name)

    df = df.reset_index(drop=True)
    dataset = build_tensor_dataset(df, tokenizer, max_len)

    idx = list(range(len(dataset)))
    train_idx, val_idx = train_test_split(idx, test_size=0.15, random_state=42)

    def subset(ds, idxs):
        return TensorDataset(
            torch.stack([ds[i][0] for i in idxs]),
            torch.stack([ds[i][1] for i in idxs]),
            torch.stack([ds[i][2] for i in idxs]),
        )

    train_ds = subset(dataset, train_idx)
    val_ds = subset(dataset, val_idx)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size)

    model = EmotionModel(model_name)
    model.to(device)

    # Freeze the embeddings and the first `freeze_layers` encoder layers;
    # only the remaining layers and the classifier head are trained.
    for name, param in model.base.named_parameters():
        if name.startswith("embeddings."):
            param.requires_grad = False
        elif name.startswith("encoder.layer"):
            try:
                layer_num = int(name.split(".")[2])
                if layer_num < freeze_layers:
                    param.requires_grad = False
            except (IndexError, ValueError):
                pass

    pos_weight = compute_pos_weight(df).to(device)
    loss_fn = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

    optimizer = torch.optim.AdamW(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=lr,
        weight_decay=weight_decay,
    )

    total_steps = len(train_loader) * epochs
    warmup_steps = int(warmup_ratio * total_steps)

    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=warmup_steps,
        num_training_steps=total_steps,
    )

    best_val = float("inf")
    no_improve = 0

    history = {"train_loss": [], "val_loss": []}

    save_path = str(model_folder(model_name))

    for ep in range(1, epochs + 1):
        model.train()
        t_loss = 0.0

        for input_ids, attn, labels in train_loader:
            input_ids = input_ids.to(device)
            attn = attn.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            logits = model(input_ids, attn)
            loss = loss_fn(logits, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()

            t_loss += loss.item() * input_ids.size(0)

        train_loss = t_loss / len(train_loader.dataset)
        history["train_loss"].append(train_loss)

        # Validation pass.
        model.eval()
        v_loss = 0.0
        with torch.no_grad():
            for input_ids, attn, labels in val_loader:
                input_ids = input_ids.to(device)
                attn = attn.to(device)
                labels = labels.to(device)
                logits = model(input_ids, attn)
                loss = loss_fn(logits, labels)
                v_loss += loss.item() * input_ids.size(0)

        val_loss = v_loss / len(val_loader.dataset)
        history["val_loss"].append(val_loss)

        print(f"Epoch {ep} | Train={train_loss:.4f} | Val={val_loss:.4f}")

        # Early stopping on validation loss; checkpoint only improvements.
        if val_loss < best_val:
            best_val = val_loss
            no_improve = 0
            save_model(model, tokenizer, save_path)
            print(f"Saved best model to {save_path}")
        else:
            no_improve += 1
            if no_improve >= patience:
                print("Early stopping.")
                break

    return model, tokenizer, history
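
# Typical invocation (hypothetical file name): the CSV needs a `text` column
# plus 0/1 columns for the eight labels.
# df = clean_text(clean_labels(pd.read_csv("train.csv")))
# model, tok, hist = train_model(df, epochs=3, batch_size=8)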


# ---------- Inference ----------

def predict_single(text, folder=None):
    folder = folder or load_last_model_name()
    if folder is None:
        raise ValueError("No saved model found; train one first.")
    model, tokenizer, cfg = load_model(folder)

    encoded = tokenizer(
        text,
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt"
    )

    with torch.no_grad():
        out = model(encoded["input_ids"], encoded["attention_mask"])
        probs = torch.sigmoid(out).numpy()[0]

    return {LABELS[i]: float(probs[i]) for i in range(len(LABELS))}


def predict_batch(texts, folder=None, batch_size=32, max_len=128):
    folder = folder or load_last_model_name()
    if folder is None:
        raise ValueError("No saved model found; train one first.")
    model, tokenizer, cfg = load_model(folder)

    preds = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        enc = tokenizer(
            batch,
            padding="max_length",
            truncation=True,
            max_length=max_len,
            return_tensors="pt"
        )

        with torch.no_grad():
            out = model(enc["input_ids"], enc["attention_mask"])
            probs = torch.sigmoid(out).numpy()

        for p in probs:
            preds.append({LABELS[j]: float(p[j]) for j in range(len(LABELS))})

    return preds


def summarize_preds(preds):
    """Average the per-text probabilities and report the top-3 labels."""
    n = len(preds)
    if n == 0:
        return {"n": 0, "avg_distribution": {}, "top3": []}

    avg = {l: 0.0 for l in LABELS}
    for p in preds:
        for l, v in p.items():
            avg[l] += v
    for l in avg:
        avg[l] /= n

    top3 = sorted(avg.items(), key=lambda x: x[1], reverse=True)[:3]
    top3 = [{"label": l, "score": float(s)} for l, s in top3]

    return {"n": n, "avg_distribution": avg, "top3": top3}


# ---------- Gradio wrappers ----------

def wrapper_train(file_obj, sep, model_name, epochs, batch_size, lr,
                  max_len, weight_decay, warmup_ratio, patience, freeze_layers):
    csv = read_uploaded_file(file_obj)
    df = pd.read_csv(csv, sep=sep)
    df = clean_labels(df)
    df = clean_text(df)

    _, _, history = train_model(
        df=df,
        model_name=model_name,
        epochs=int(epochs),
        batch_size=int(batch_size),
        lr=float(lr),
        max_len=int(max_len),
        weight_decay=float(weight_decay),
        warmup_ratio=float(warmup_ratio),
        patience=int(patience),
        freeze_layers=int(freeze_layers)
    )

    return {
        "message": "Training finished.",
        "history": history,
        "model_name": model_name
    }


def wrapper_single(text):
    return predict_single(text)


def wrapper_dataset(file_obj, sep, max_len, batch_size):
    csv = read_uploaded_file(file_obj)
    df = pd.read_csv(csv, sep=sep)
    df = clean_labels(df)
    df = clean_text(df)

    preds = predict_batch(df["text"].tolist(), batch_size=int(batch_size),
                          max_len=int(max_len))
    return summarize_preds(preds)


# ---------- Gradio UI ----------

with gr.Blocks() as app:
    gr.Markdown("## Emotion Classifier — Dava (Final Version)")

    with gr.Tab("Training"):
        file_in = gr.File(label="Upload Training CSV")
        sep_in = gr.Textbox(label="Delimiter", value=",")
        model_name_in = gr.Dropdown(
            label="Backbone Model",
            choices=["bert-base-multilingual-cased", "indobenchmark/indobert-base-p1"],
            value="bert-base-multilingual-cased"
        )
        epochs_in = gr.Number(label="Epochs", value=3)
        bs_in = gr.Number(label="Batch Size", value=8)
        lr_in = gr.Number(label="Learning Rate", value=2e-5)
        maxlen_in = gr.Number(label="Max Length", value=128)
        wd_in = gr.Number(label="Weight Decay", value=0.01)
        warmup_in = gr.Number(label="Warmup Ratio", value=0.1)
        patience_in = gr.Number(label="Patience", value=2)
        freeze_in = gr.Number(label="Freeze Layers", value=6)

        btn_train = gr.Button("Start Training")
        out_train = gr.JSON(label="Train Result")

        btn_train.click(
            wrapper_train,
            inputs=[file_in, sep_in, model_name_in, epochs_in, bs_in,
                    lr_in, maxlen_in, wd_in, warmup_in, patience_in, freeze_in],
            outputs=out_train
        )

    with gr.Tab("Single Prediction"):
        text_in = gr.Textbox(label="Text")
        btn_single = gr.Button("Predict")
        out_single = gr.JSON(label="Emotion Scores")
        btn_single.click(wrapper_single, inputs=[text_in], outputs=out_single)

    with gr.Tab("Dataset Prediction"):
        file_test = gr.File(label="Upload CSV")
        sep_test = gr.Textbox(label="Delimiter", value=",")
        maxlen_test = gr.Number(label="Max Length", value=128)
        bs_test = gr.Number(label="Batch Size", value=32)

        btn_test = gr.Button("Run Prediction")
        out_test = gr.JSON(label="Summary Result")

        btn_test.click(
            wrapper_dataset,
            inputs=[file_test, sep_test, maxlen_test, bs_test],
            outputs=out_test
        )

app.launch()