# app.py — Emotion Classifier (Hugging Face Space, revision d0cc31c)
# ==============================================================
# EMOTION CLASSIFIER
# ==============================================================
import os
import math
import torch
import pandas as pd
import numpy as np
import gradio as gr
import matplotlib.pyplot as plt
from pathlib import Path
from torch import nn
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from transformers import (
AutoTokenizer,
AutoModel,
AutoConfig,
get_linear_schedule_with_warmup
)
# =========================================================
# CONFIG
# =========================================================
# Fixed multi-label emotion vocabulary; column names in the training CSV.
LABELS = ['anger','anticipation','disgust','fear','joy','sadness','surprise','trust']
# Bidirectional label <-> index lookup tables.
LABEL2ID = {label: index for index, label in enumerate(LABELS)}
ID2LABEL = dict(enumerate(LABELS))
# All fine-tuned checkpoints are stored under this folder.
SAVED_ROOT = Path("saved_models")
SAVED_ROOT.mkdir(exist_ok=True)
# ==============================================================
# Simpan dan Muat Data
# ==============================================================
def read_uploaded_file(uploaded):
    """Resolve a Gradio upload payload to a filesystem path.

    Accepts a plain path string, an object exposing ``.name`` (the usual
    ``gr.File`` payload), or a file-like object exposing ``.read()`` whose
    bytes are persisted to a temporary file.

    Returns:
        str: path to a readable file on disk.

    Raises:
        ValueError: if ``uploaded`` is None or of an unsupported type.
    """
    if uploaded is None:
        raise ValueError("No file provided")
    if isinstance(uploaded, str):
        return uploaded
    # gr.File hands over a tempfile wrapper whose .name is the on-disk path.
    if hasattr(uploaded, "name"):
        return uploaded.name
    if hasattr(uploaded, "read"):
        # Use tempfile for a collision-free, portable temp path instead of
        # a hand-rolled random name under a hard-coded /tmp.
        import tempfile
        fd, tmp_path = tempfile.mkstemp(suffix=".csv")
        with os.fdopen(fd, "wb") as f:
            f.write(uploaded.read())
        return tmp_path
    raise ValueError("Unsupported uploaded file type")
def save_last_model_name(name):
    """Persist the most recently trained model folder name to disk."""
    marker = SAVED_ROOT / "last_model.txt"
    marker.write_text(name)
def load_last_model_name():
    """Return the last saved model folder name, or None if none recorded."""
    marker = SAVED_ROOT / "last_model.txt"
    if not marker.exists():
        return None
    return marker.read_text().strip()
def model_folder(model_name):
    """Map a (possibly hub-namespaced) model name to a local save folder."""
    safe_name = model_name.replace("/", "_")
    return SAVED_ROOT / safe_name
# ==============================================================
# Pembersihan Data
# ==============================================================
def clean_labels(df):
    """Ensure every emotion label column exists; missing ones default to 0.

    Mutates and returns ``df``.
    """
    missing = [label for label in LABELS if label not in df.columns]
    for label in missing:
        df[label] = 0
    return df
def clean_text(df, col="text"):
    """Normalize a text column: stringify, flatten newlines, trim whitespace.

    Mutates and returns ``df``.

    Raises:
        KeyError: if ``col`` is not a column of ``df``.
    """
    if col not in df.columns:
        raise KeyError(f"CSV must contain a column '{col}'")
    flattened = df[col].astype(str).str.replace("\n", " ")
    df[col] = flattened.str.strip()
    return df
# =========================================================
# Model AI
# =========================================================
class EmotionModel(nn.Module):
    """Transformer backbone + dropout + linear multi-label classification head."""

    def __init__(self, base_model_name, num_labels=8):
        super().__init__()
        self.config = AutoConfig.from_pretrained(base_model_name)
        self.base = AutoModel.from_pretrained(base_model_name)
        self.drop = nn.Dropout(0.3)
        self.clf = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, ids, mask):
        """Return raw (pre-sigmoid) logits, one score per label."""
        outputs = self.base(input_ids=ids, attention_mask=mask)
        pooled = getattr(outputs, "pooler_output", None)
        if pooled is None:
            # Backbone has no pooler: fall back to the first ([CLS]) token.
            pooled = outputs.last_hidden_state[:, 0, :]
        return self.clf(self.drop(pooled))
# ==============================================================
# Tokenisasi Dataset
# ==============================================================
def tokenize_batch(texts, tokenizer, max_len=128):
    """Tokenize a list of strings to fixed-length PyTorch tensors."""
    encode_kwargs = dict(
        padding="max_length",
        truncation=True,
        max_length=max_len,
        return_tensors="pt",
    )
    return tokenizer(texts, **encode_kwargs)
def build_tensor_dataset(df, tokenizer, max_len=128):
    """Encode a DataFrame's 'text' + 8 label columns into a TensorDataset
    of (input_ids, attention_mask, float labels)."""
    encoded = tokenize_batch(df["text"].tolist(), tokenizer, max_len)
    targets = torch.tensor(df[LABELS].values, dtype=torch.float)
    return TensorDataset(encoded["input_ids"], encoded["attention_mask"], targets)
# ==============================================================
# Bobot
# ==============================================================
def compute_pos_weight(df):
    """Per-label positive-class weights (neg/pos ratio) for BCEWithLogitsLoss.

    A label with zero positive examples falls back to a weight of 1.0.
    """
    total = len(df)
    positives = df[LABELS].sum(axis=0)
    weights = [
        (total - count) / count if count > 0 else 1.0
        for count in positives
    ]
    return torch.tensor(weights, dtype=torch.float)
# ==============================================================
# Simpan dan Muat Model
# ==============================================================
def save_model(model, tokenizer, folder):
    """Persist backbone (HF format), tokenizer, and classifier head to folder."""
    target = Path(folder)
    os.makedirs(target, exist_ok=True)
    # Backbone and tokenizer use the standard Hugging Face serialization.
    model.base.save_pretrained(folder)
    tokenizer.save_pretrained(folder)
    # The custom linear head is stored separately as a plain state dict.
    torch.save(model.clf.state_dict(), str(target / "classifier.pt"))
    # Remember this folder as the default for later predictions.
    save_last_model_name(str(folder))
def load_model(folder):
    """Rebuild an EmotionModel saved by ``save_model``.

    Returns:
        tuple: (model in eval mode on CPU, tokenizer, config).
    """
    folder = str(folder)
    config = AutoConfig.from_pretrained(folder)
    tokenizer = AutoTokenizer.from_pretrained(folder)
    # Backbone weights load via from_pretrained inside EmotionModel;
    # the linear head is restored from its separate state dict.
    model = EmotionModel(folder)
    head_state = torch.load(f"{folder}/classifier.pt", map_location="cpu")
    model.clf.load_state_dict(head_state)
    model.eval()
    return model, tokenizer, config
# ==============================================================
# Pelatihan
# ==============================================================
def train_model(
    df,
    model_name="bert-base-multilingual-cased",
    epochs=3,
    batch_size=8,
    lr=2e-5,
    max_len=128,
    weight_decay=0.01,
    warmup_ratio=0.1,
    patience=2,
    freeze_layers=6,
    device=None
):
    """Fine-tune an EmotionModel on a multi-label emotion DataFrame.

    Args:
        df: DataFrame with a 'text' column and the 8 LABELS columns (0/1).
        model_name: HF backbone checkpoint name.
        epochs, batch_size, lr, max_len, weight_decay, warmup_ratio:
            standard fine-tuning hyperparameters.
        patience: early-stopping patience, in epochs without val-loss improvement.
        freeze_layers: number of lowest encoder layers (plus embeddings) frozen.
        device: 'cuda'/'cpu'; auto-detected when None.

    Returns:
        (model, tokenizer, history) where history holds per-epoch
        'train_loss' and 'val_loss' lists. NOTE(review): the returned model
        carries the *last* epoch's weights; the best checkpoint is only
        saved to disk via save_model — confirm that is intended.
    """
    device = device or ("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    df = df.reset_index(drop=True)
    dataset = build_tensor_dataset(df, tokenizer, max_len)
    idx = list(range(len(dataset)))
    train_idx, val_idx = train_test_split(idx, test_size=0.15, random_state=42)

    def subset(ds, idxs):
        # Index the underlying tensors directly instead of stacking
        # per-sample slices: identical result, far less copying.
        sel = torch.tensor(idxs, dtype=torch.long)
        return TensorDataset(*(t[sel] for t in ds.tensors))

    train_ds = subset(dataset, train_idx)
    val_ds = subset(dataset, val_idx)
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size)
    model = EmotionModel(model_name)
    model.to(device)

    # Freeze embeddings and the lowest `freeze_layers` encoder layers.
    for name, param in model.base.named_parameters():
        if name.startswith("embeddings."):
            param.requires_grad = False
        elif name.startswith("encoder.layer"):
            try:
                layer_num = int(name.split(".")[2])
                if layer_num < freeze_layers:
                    param.requires_grad = False
            except (IndexError, ValueError):
                # Name doesn't follow encoder.layer.<n>.*; leave it trainable.
                # (Narrowed from a bare `except:` that also swallowed
                # KeyboardInterrupt/SystemExit.)
                pass

    # Class-imbalance weighting for the multi-label BCE loss.
    pos_weight = compute_pos_weight(df).to(device)
    loss_fn = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
    # Only optimize unfrozen parameters.
    optimizer = torch.optim.AdamW(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=lr,
        weight_decay=weight_decay
    )
    total_steps = len(train_loader) * epochs
    warmup_steps = int(warmup_ratio * total_steps)
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=warmup_steps,
        num_training_steps=total_steps
    )

    best_val = float("inf")
    no_improve = 0
    history = {"train_loss": [], "val_loss": []}
    save_path = str(model_folder(model_name))
    for ep in range(1, epochs + 1):
        # ---- training pass ----
        model.train()
        t_loss = 0
        for input_ids, attn, labels in train_loader:
            input_ids = input_ids.to(device)
            attn = attn.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            logits = model(input_ids, attn)
            loss = loss_fn(logits, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()
            # Weight by batch size so the epoch average is per-sample.
            t_loss += loss.item() * input_ids.size(0)
        train_loss = t_loss / len(train_loader.dataset)
        history["train_loss"].append(train_loss)

        # ---- validation pass ----
        model.eval()
        v_loss = 0
        with torch.no_grad():
            for input_ids, attn, labels in val_loader:
                input_ids = input_ids.to(device)
                attn = attn.to(device)
                labels = labels.to(device)
                logits = model(input_ids, attn)
                loss = loss_fn(logits, labels)
                v_loss += loss.item() * input_ids.size(0)
        val_loss = v_loss / len(val_loader.dataset)
        history["val_loss"].append(val_loss)
        print(f"Epoch {ep} | Train={train_loss:.4f} | Val={val_loss:.4f}")

        # Early stopping on validation loss; checkpoint only improvements.
        if val_loss < best_val:
            best_val = val_loss
            no_improve = 0
            save_model(model, tokenizer, save_path)
            print(f"Saved best model to {save_path}")
        else:
            no_improve += 1
            if no_improve >= patience:
                print("Early stopping.")
                break
    return model, tokenizer, history
# ==============================================================
# Uji
# ==============================================================
def predict_single(text, folder=None):
    """Score one text; returns {label: probability} over the 8 emotions.

    Falls back to the most recently trained model when ``folder`` is None.
    """
    folder = folder or load_last_model_name()
    model, tokenizer, _ = load_model(folder)
    encoded = tokenizer(
        text,
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt"
    )
    with torch.no_grad():
        logits = model(encoded["input_ids"], encoded["attention_mask"])
    scores = torch.sigmoid(logits).numpy()[0]
    return dict(zip(LABELS, map(float, scores)))
def predict_batch(texts, folder=None, batch_size=32, max_len=128):
    """Score a list of texts in mini-batches.

    Args:
        texts: list of input strings.
        folder: saved-model folder; defaults to the last trained model.
        batch_size: number of texts tokenized/scored per forward pass.
        max_len: tokenizer truncation/padding length. Previously hard-coded
            to 128; kept as the default so existing callers are unaffected.

    Returns:
        list of {label: probability} dicts, one per input text.
    """
    folder = folder or load_last_model_name()
    model, tokenizer, _ = load_model(folder)
    preds = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        enc = tokenizer(
            batch,
            padding="max_length",
            truncation=True,
            max_length=max_len,
            return_tensors="pt"
        )
        with torch.no_grad():
            out = model(enc["input_ids"], enc["attention_mask"])
            probs = torch.sigmoid(out).numpy()
        for p in probs:
            preds.append({LABELS[j]: float(p[j]) for j in range(len(LABELS))})
    return preds
def summarize_preds(preds):
    """Aggregate per-text prediction dicts into an average distribution.

    Args:
        preds: list of {label: probability} dicts from predict_batch.

    Returns:
        dict with "n" (count), "avg_distribution" ({label: mean prob}),
        and "top3" ([{"label", "score"}] by descending mean). An empty
        input yields an all-zero distribution and an empty top3
        (previously raised ZeroDivisionError).
    """
    n = len(preds)
    avg = {label: 0.0 for label in LABELS}
    if n == 0:
        return {"n": 0, "avg_distribution": avg, "top3": []}
    for p in preds:
        for label, value in p.items():
            avg[label] += value
    for label in avg:
        avg[label] /= n
    ranked = sorted(avg.items(), key=lambda item: item[1], reverse=True)
    top3 = [{"label": label, "score": float(score)} for label, score in ranked[:3]]
    return {"n": n, "avg_distribution": avg, "top3": top3}
# ==============================================================
# GRADIO GUI
# ==============================================================
def wrapper_train(file_obj, sep, model_name, epochs, batch_size, lr,
                  max_len, weight_decay, warmup_ratio, patience, freeze_layers):
    """Gradio callback: load + clean the uploaded CSV, train, report history.

    Numeric widget values arrive as floats, so each is coerced to the type
    train_model expects.
    """
    csv_path = read_uploaded_file(file_obj)
    frame = pd.read_csv(csv_path, sep=sep)
    frame = clean_text(clean_labels(frame))
    _, _, history = train_model(
        df=frame,
        model_name=model_name,
        epochs=int(epochs),
        batch_size=int(batch_size),
        lr=float(lr),
        max_len=int(max_len),
        weight_decay=float(weight_decay),
        warmup_ratio=float(warmup_ratio),
        patience=int(patience),
        freeze_layers=int(freeze_layers)
    )
    return {
        "message": "Training finished.",
        "history": history,
        "model_name": model_name
    }
def wrapper_single(text):
    """Gradio callback: score one text with the most recently trained model."""
    scores = predict_single(text)
    return scores
def wrapper_dataset(file_obj, sep, max_len, batch_size):
    """Gradio callback: batch-predict an uploaded CSV and return a summary.

    NOTE(review): ``max_len`` is accepted from the UI but never forwarded —
    predict_batch tokenizes at its own fixed length. Confirm whether it
    should be wired through.
    """
    csv_path = read_uploaded_file(file_obj)
    frame = pd.read_csv(csv_path, sep=sep)
    frame = clean_text(clean_labels(frame))
    preds = predict_batch(frame["text"].tolist(), batch_size=int(batch_size))
    return summarize_preds(preds)
# ==============================================================
# Menjalankan GRADIO
# ==============================================================
# Gradio UI: three tabs (train / single predict / dataset predict).
# Component creation order defines the on-screen layout.
with gr.Blocks() as app:
    gr.Markdown("## Emotion Classifier — Dava (Final Version)")
    # --- Tab 1: fine-tune a backbone on an uploaded, labeled CSV ---
    with gr.Tab("Training"):
        file_in = gr.File(label="Upload Training CSV")
        sep_in = gr.Textbox(label="Delimiter", value=",")
        model_name_in = gr.Dropdown(
            label="Backbone Model",
            # NOTE(review): "indobert-base-p1" lacks a hub namespace
            # (presumably "indobenchmark/indobert-base-p1") — confirm the
            # checkpoint id resolves before shipping.
            choices=["bert-base-multilingual-cased", "indobert-base-p1"],
            value="bert-base-multilingual-cased"
        )
        epochs_in = gr.Number(label="Epochs", value=3)
        bs_in = gr.Number(label="Batch Size", value=8)
        lr_in = gr.Number(label="Learning Rate", value=2e-5)
        maxlen_in = gr.Number(label="Max Length", value=128)
        wd_in = gr.Number(label="Weight Decay", value=0.01)
        warmup_in = gr.Number(label="Warmup Ratio", value=0.1)
        patience_in = gr.Number(label="Patience", value=2)
        freeze_in = gr.Number(label="Freeze Layers", value=6)
        btn_train = gr.Button("Start Training")
        out_train = gr.JSON(label="Train Result")
        # Input list order must match wrapper_train's parameter order.
        btn_train.click(
            wrapper_train,
            inputs=[file_in, sep_in, model_name_in, epochs_in, bs_in,
                    lr_in, maxlen_in, wd_in, warmup_in, patience_in, freeze_in],
            outputs=out_train
        )
    # --- Tab 2: score a single text with the last trained model ---
    with gr.Tab("Single Prediction"):
        text_in = gr.Textbox(label="Text")
        btn_single = gr.Button("Predict")
        out_single = gr.JSON(label="Emotion Scores")
        btn_single.click(wrapper_single, inputs=[text_in], outputs=out_single)
    # --- Tab 3: batch-predict an uploaded CSV and summarize ---
    with gr.Tab("Dataset Prediction"):
        file_test = gr.File(label="Upload CSV")
        sep_test = gr.Textbox(label="Delimiter", value=",")
        # NOTE(review): this Max Length value is passed to wrapper_dataset
        # but not used downstream — see wrapper_dataset.
        maxlen_test = gr.Number(label="Max Length", value=128)
        bs_test = gr.Number(label="Batch Size", value=32)
        btn_test = gr.Button("Run Prediction")
        out_test = gr.JSON(label="Summary Result")
        btn_test.click(
            wrapper_dataset,
            inputs=[file_test, sep_test, maxlen_test, bs_test],
            outputs=out_test
        )
# Launch the app (blocking) when the script runs.
app.launch()