# Provenance: Hugging Face Space file "app.py" by Darendra, revision 2faddd5
# (verified), 17.2 kB. The hosting UI chrome ("raw / history / blame") that
# leaked into the download has been commented out so the file parses as Python.
# ==============================================================
# KLASIFIKASI EMOSI
# ==============================================================
import os
import math
import torch
import pandas as pd
import numpy as np
import gradio as gr
from pathlib import Path
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from transformers import (
AutoTokenizer,
AutoModel,
AutoConfig,
get_linear_schedule_with_warmup
)
# =========================================================
# CONFIG
# =========================================================
# The eight emotion labels used as multi-label targets throughout
# training and prediction (order matters: it defines tensor column order).
LIST_LABEL = ['anger','anticipation','disgust','fear','joy','sadness','surprise','trust']
# Directory for fine-tuned model checkpoints; created at import time.
FOLDER_MODEL = Path("saved_models")
FOLDER_MODEL.mkdir(exist_ok=True)
# ==============================================================
# File & Utils
# ==============================================================
def read_file_upload(file_obj):
    """Resolve a Gradio upload into a filesystem path string.

    Accepts, in order of preference:
      * a plain path string (returned as-is),
      * a Gradio file object exposing ``.name`` (its path is returned),
      * any readable file-like object (its bytes are spooled to a temp file).

    Returns:
        str: a path to a readable CSV file.

    Raises:
        ValueError: if nothing was uploaded or the object type is unsupported.
    """
    if file_obj is None:
        raise ValueError("File belum diupload.")
    if isinstance(file_obj, str):
        return file_obj
    if hasattr(file_obj, "name"):
        return file_obj.name
    if hasattr(file_obj, "read"):
        # BUGFIX: the original wrote to a hard-coded /tmp path, which does
        # not exist on Windows; tempfile picks a portable location.
        import tempfile
        with tempfile.NamedTemporaryFile(
            mode="wb", suffix=".csv", delete=False
        ) as tmp:
            tmp.write(file_obj.read())
            return tmp.name
    raise ValueError("Tipe file tidak didukung.")
def save_last_model(name):
    """Record *name* as the most recently saved model folder."""
    marker = FOLDER_MODEL / "last_model_name.txt"
    marker.write_text(name)
def load_last_model():
    """Return the last-trained model folder from the marker file, or None."""
    marker = FOLDER_MODEL / "last_model_name.txt"
    if not marker.exists():
        return None
    return marker.read_text().strip()
def get_model_path(model_name):
    """Map a HF model id to a local save folder (slashes are not path-safe)."""
    safe_name = model_name.replace("/", "_")
    return FOLDER_MODEL / safe_name
# ==============================================================
# Data Cleaning
# ==============================================================
def clean_labels(df):
    """Guarantee every label column exists and holds numeric floats.

    Missing label columns are created filled with 0; cells that are not
    parseable as numbers (text, blanks) are coerced to 0.0 so that label
    tensors can be built safely downstream.
    """
    for label in LIST_LABEL:
        if label not in df.columns:
            df[label] = 0
        # Force numeric dtype: unparseable values -> NaN -> 0.0.
        df[label] = pd.to_numeric(df[label], errors='coerce').fillna(0).astype(float)
    return df
def clean_text(df, col="text"):
    """Normalize the text column: newlines become spaces, edges are trimmed.

    Raises:
        KeyError: if *col* is missing from the DataFrame.
    """
    if col not in df.columns:
        raise KeyError(f"CSV harus punya kolom '{col}'")
    as_text = df[col].astype(str)
    df[col] = as_text.str.replace("\n", " ").str.strip()
    return df
# =========================================================
# Model Architecture
# =========================================================
class ModelEmosi(nn.Module):
    """Transformer backbone plus a dropout-regularized multi-label head."""

    def __init__(self, base_model_name, num_labels=8):
        super().__init__()
        self.config = AutoConfig.from_pretrained(base_model_name)
        self.base = AutoModel.from_pretrained(base_model_name)
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        out = self.base(input_ids=input_ids, attention_mask=attention_mask)
        # Prefer the backbone's pooled representation when it exists;
        # otherwise fall back to the first-token ([CLS]) hidden state.
        pooled = getattr(out, "pooler_output", None)
        if pooled is None:
            pooled = out.last_hidden_state[:, 0, :]
        return self.classifier(self.dropout(pooled))
# ==============================================================
# Tokenizer & Dataset
# ==============================================================
def tokenize_batch(texts, tokenizer, max_len=128):
    """Tokenize *texts* into fixed-length PyTorch tensors for the model."""
    encode_kwargs = dict(
        padding="max_length",
        truncation=True,
        max_length=max_len,
        return_tensors="pt",
    )
    return tokenizer(texts, **encode_kwargs)
def create_dataset(df, tokenizer, max_len=128):
    """Build a TensorDataset of (input_ids, attention_mask, labels)."""
    enc = tokenize_batch(df["text"].tolist(), tokenizer, max_len)
    # Labels were already coerced to float by clean_labels, so this is safe.
    label_tensor = torch.tensor(df[LIST_LABEL].values, dtype=torch.float)
    return TensorDataset(enc["input_ids"], enc["attention_mask"], label_tensor)
# ==============================================================
# Weights
# ==============================================================
def hitung_pos_weight(df):
    """Per-label pos_weight = negatives/positives, for BCEWithLogitsLoss.

    A label with zero positive examples gets a neutral weight of 1.0
    instead of dividing by zero.
    """
    total = len(df)
    positives = df[LIST_LABEL].sum(axis=0)
    weights = [(total - c) / c if c > 0 else 1.0 for c in positives]
    return torch.tensor(weights, dtype=torch.float)
# ==============================================================
# Save & Load Logic
# ==============================================================
def save_model(model, tokenizer, folder):
    """Persist backbone, tokenizer, and classifier head; mark as last model."""
    target = Path(folder)
    target.mkdir(parents=True, exist_ok=True)
    model.base.save_pretrained(str(target))
    tokenizer.save_pretrained(str(target))
    # The custom classification head is not part of save_pretrained,
    # so its weights are stored separately.
    torch.save(model.classifier.state_dict(), str(target / "classifier_head.pt"))
    save_last_model(str(target))
def load_model(folder):
    """Rebuild a saved ModelEmosi (backbone + head) in eval mode.

    Returns:
        (model, tokenizer, config) tuple ready for inference.
    """
    folder = str(folder)
    config = AutoConfig.from_pretrained(folder)
    tokenizer = AutoTokenizer.from_pretrained(folder)
    model = ModelEmosi(folder)
    # Restore the separately-saved classification head on CPU.
    head_state = torch.load(f"{folder}/classifier_head.pt", map_location="cpu")
    model.classifier.load_state_dict(head_state)
    model.eval()
    return model, tokenizer, config
# ==============================================================
# TRAINING
# ==============================================================
def jalankan_training(
    df,
    progress_bar=None,  # optional Gradio Progress object
    model_name="bert-base-multilingual-cased",
    epochs=3,
    batch_size=8,
    lr=2e-5,
    max_len=128,
    weight_decay=0.01,
    warmup_ratio=0.1,
    patience=2,
    freeze_layers=6,
    device=None
):
    """Fine-tune a transformer backbone for multi-label emotion tagging.

    Implemented as a generator so the UI can stream progress: every
    ``yield`` is a ``(log_message, history)`` tuple where ``history`` is
    ``None`` until the final yield, which carries
    ``{"train_loss": [...], "val_loss": [...]}``.

    Args:
        df: DataFrame already passed through clean_labels/clean_text.
        progress_bar: optional Gradio Progress callback for the UI bar.
        model_name: Hugging Face model id used as the backbone.
        epochs, batch_size, lr, max_len, weight_decay, warmup_ratio:
            standard fine-tuning hyperparameters.
        patience: epochs without val-loss improvement before early stop.
        freeze_layers: number of bottom encoder layers left frozen
            (embeddings are always frozen).
        device: "cuda"/"cpu"; autodetected when None.
    """
    yield "Mempersiapkan dataset dan tokenizer...", None
    device = device or ("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    df = df.reset_index(drop=True)
    full_dataset = create_dataset(df, tokenizer, max_len)
    idx = list(range(len(full_dataset)))
    train_idx, val_idx = train_test_split(idx, test_size=0.15, random_state=42)

    def get_subset(ds, indices):
        # Materialize the selected rows as stacked tensors.
        return TensorDataset(
            torch.stack([ds[i][0] for i in indices]),
            torch.stack([ds[i][1] for i in indices]),
            torch.stack([ds[i][2] for i in indices]),
        )

    train_ds = get_subset(full_dataset, train_idx)
    val_ds = get_subset(full_dataset, val_idx)
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size)
    model = ModelEmosi(model_name)
    model.to(device)
    # Freeze embeddings plus the lowest `freeze_layers` encoder layers
    # (intended to speed up fine-tuning on small datasets).
    for name, param in model.base.named_parameters():
        if name.startswith("embeddings."):
            param.requires_grad = False
        elif name.startswith("encoder.layer"):
            # BUGFIX: was a bare `except:` that would also swallow
            # KeyboardInterrupt/SystemExit; only parse failures of the
            # layer index are expected here.
            try:
                layer_num = int(name.split(".")[2])
                if layer_num < freeze_layers:
                    param.requires_grad = False
            except (IndexError, ValueError):
                pass
    # Weighted loss compensates for label imbalance in the dataset.
    pos_weight = hitung_pos_weight(df).to(device)
    loss_fn = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
    optimizer = torch.optim.AdamW(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=lr,
        weight_decay=weight_decay
    )
    total_steps = len(train_loader) * epochs
    warmup_steps = int(warmup_ratio * total_steps)
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=warmup_steps,
        num_training_steps=total_steps
    )
    best_val_loss = float("inf")
    no_improve = 0
    history = {"train_loss": [], "val_loss": []}
    save_path = str(get_model_path(model_name))
    yield f"Mulai Training di device: {device}\nTotal Steps: {total_steps}", None
    for ep in range(1, epochs + 1):
        # Update the Gradio progress bar, if the caller supplied one.
        if progress_bar:
            progress_bar(float(ep) / epochs, desc=f"Epoch {ep}/{epochs}")
        model.train()
        total_train_loss = 0
        for input_ids, mask, labels in train_loader:
            input_ids = input_ids.to(device)
            mask = mask.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            logits = model(input_ids, mask)
            loss = loss_fn(logits, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()
            # Weight by batch size so the epoch average is per-sample.
            total_train_loss += loss.item() * input_ids.size(0)
        avg_train_loss = total_train_loss / len(train_loader.dataset)
        history["train_loss"].append(avg_train_loss)
        # Validation pass (no gradients).
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for input_ids, mask, labels in val_loader:
                input_ids = input_ids.to(device)
                mask = mask.to(device)
                labels = labels.to(device)
                logits = model(input_ids, mask)
                loss = loss_fn(logits, labels)
                total_val_loss += loss.item() * input_ids.size(0)
        avg_val_loss = total_val_loss / len(val_loader.dataset)
        history["val_loss"].append(avg_val_loss)
        log_msg = f"✅ Epoch {ep} | Train Loss={avg_train_loss:.4f} | Val Loss={avg_val_loss:.4f}"
        if avg_val_loss < best_val_loss:
            # Checkpoint only when validation loss improves.
            best_val_loss = avg_val_loss
            no_improve = 0
            save_model(model, tokenizer, save_path)
            log_msg += " --> (Model Saved 💾)"
        else:
            no_improve += 1
            log_msg += f" --> (No Improve: {no_improve}/{patience})"
        yield log_msg, None
        if no_improve >= patience:
            yield "⛔ Early stopping triggered.", None
            break
    yield "Training Selesai! 🎉", history
# ==============================================================
# PREDICTION
# ==============================================================
def predict_satu(text, folder=None):
    """Predict emotion probabilities for one sentence.

    Uses the last-trained model when *folder* is None; returns a dict
    mapping each label to a sigmoid probability, or an error dict when
    no model has been trained yet.
    """
    folder = folder or load_last_model()
    if folder is None:
        return {"Error": "Belum ada model yang dilatih."}
    model, tokenizer, _ = load_model(folder)
    enc = tokenizer(
        text,
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt"
    )
    with torch.no_grad():
        logits = model(enc["input_ids"], enc["attention_mask"])
    probs = torch.sigmoid(logits).numpy()[0]
    return dict(zip(LIST_LABEL, map(float, probs)))
def predict_batch(text_list, folder=None, batch_size=32):
    """Predict emotion probabilities for a list of texts in mini-batches.

    Returns a list of {label: probability} dicts, one per input text;
    an empty list if no trained model is available.
    """
    folder = folder or load_last_model()
    if folder is None:
        return []
    model, tokenizer, _ = load_model(folder)
    all_preds = []
    for start in range(0, len(text_list), batch_size):
        chunk = text_list[start:start + batch_size]
        enc = tokenizer(
            chunk,
            padding="max_length",
            truncation=True,
            max_length=128,
            return_tensors="pt"
        )
        with torch.no_grad():
            logits = model(enc["input_ids"], enc["attention_mask"])
        probs = torch.sigmoid(logits).numpy()
        for row in probs:
            all_preds.append(dict(zip(LIST_LABEL, map(float, row))))
    return all_preds
def summarize_result(preds):
    """Aggregate per-text predictions into an average distribution + top 3.

    Returns a dict with the sample count, the mean probability per label,
    and the three highest-scoring labels.
    """
    if not preds:
        return {"Info": "Tidak ada hasil."}
    n = len(preds)
    totals = {label: 0.0 for label in LIST_LABEL}
    for pred in preds:
        for label, score in pred.items():
            totals[label] += score
    avg = {label: total / n for label, total in totals.items()}
    top3 = sorted(avg.items(), key=lambda kv: kv[1], reverse=True)[:3]
    top3_fmt = [{"label": label, "score": float(score)} for label, score in top3]
    return {
        "jumlah_data": n,
        "distribusi_rata2": avg,
        "top_3": top3_fmt
    }
# ==============================================================
# GRADIO UI
# ==============================================================
def wrapper_training(file_obj, sep, model_name, epoch, batch, lr,
                     max_len, wd, warmup, pat, freeze,
                     progress=gr.Progress()):
    """Gradio handler: load + clean the CSV, then stream training logs.

    Re-yields each message from the jalankan_training generator with the
    log accumulated so far; the history JSON stays None until the final
    yield.
    """
    csv_path = read_file_upload(file_obj)
    df = pd.read_csv(csv_path, sep=sep)
    df = clean_text(clean_labels(df))
    log_lines = []
    for log_msg, history_result in jalankan_training(
        df=df,
        progress_bar=progress,
        model_name=model_name,
        epochs=int(epoch),
        batch_size=int(batch),
        lr=float(lr),
        max_len=int(max_len),
        weight_decay=float(wd),
        warmup_ratio=float(warmup),
        patience=int(pat),
        freeze_layers=int(freeze)
    ):
        log_lines.append(log_msg)
        accumulated = "\n".join(log_lines) + "\n"
        # history_result is only non-None on the generator's final yield.
        yield accumulated, history_result
def wrapper_predict_satu(text):
    """Gradio handler for the single-sentence tab; delegates to predict_satu."""
    return predict_satu(text)
def wrapper_predict_dataset(file_obj, sep, batch_size):
    """Gradio handler: bulk-predict an uploaded CSV and summarize results."""
    csv_path = read_file_upload(file_obj)
    frame = pd.read_csv(csv_path, sep=sep)
    frame = clean_text(clean_labels(frame))
    predictions = predict_batch(frame["text"].tolist(), batch_size=int(batch_size))
    return summarize_result(predictions)
# ==============================================================
# INTERFACE
# ==============================================================
# NOTE(review): the UI is built and the server launched at import time —
# these are deliberate module-level side effects for a Gradio Space.
with gr.Blocks() as app:
    gr.Markdown("## Emotion Classifier — IndoBERT / Multilingual")
    # --- Tab 1: fine-tuning on an uploaded CSV ---
    with gr.Tab("Menu Training"):
        gr.Markdown("Upload dataset CSV untuk fine-tuning model.")
        in_file = gr.File(label="Upload File CSV")
        in_sep = gr.Textbox(label="Delimiter (Pemisah)", value=";")
        # NOTE(review): "indobert-base-p1" lacks an org prefix; the Hub id is
        # usually "indobenchmark/indobert-base-p1" — confirm it resolves.
        in_model = gr.Dropdown(
            label="Base Model",
            choices=["bert-base-multilingual-cased", "indobert-base-p1"],
            value="bert-base-multilingual-cased"
        )
        with gr.Row():
            in_epoch = gr.Number(label="Epochs", value=3)
            in_batch = gr.Number(label="Batch Size", value=8)
            in_lr = gr.Number(label="Learning Rate", value=2e-5)
        with gr.Row():
            in_len = gr.Number(label="Max Length", value=128)
            in_pat = gr.Number(label="Patience (Early Stop)", value=2)
            in_freeze = gr.Number(label="Freeze Layers", value=6)
        # Advanced hyperparameters: hidden from the UI but still wired
        # into the click handler below.
        in_wd = gr.Number(label="Weight Decay", value=0.01, visible=False)
        in_warmup = gr.Number(label="Warmup Ratio", value=0.1, visible=False)
        btn_train = gr.Button("Mulai Training", variant="primary")
        # Two outputs: a streaming text log and the final history JSON.
        with gr.Row():
            out_log = gr.Textbox(label="Log Latihan (Real-time)", lines=10, interactive=False)
            out_result = gr.JSON(label="Hasil Akhir (History)")
        btn_train.click(
            wrapper_training,
            inputs=[in_file, in_sep, in_model, in_epoch, in_batch,
                    in_lr, in_len, in_wd, in_warmup, in_pat, in_freeze],
            outputs=[out_log, out_result]
        )
    # --- Tab 2: single-sentence prediction ---
    with gr.Tab("Tes Satu Kalimat"):
        in_text = gr.Textbox(label="Input Teks", placeholder="Contoh: Aku senang sekali hari ini...")
        btn_satu = gr.Button("Prediksi")
        out_satu = gr.Label(label="Confidence Score")
        btn_satu.click(wrapper_predict_satu, inputs=[in_text], outputs=out_satu)
    # --- Tab 3: bulk prediction over an uploaded CSV ---
    with gr.Tab("Tes Satu File"):
        gr.Markdown("Upload file CSV baru untuk prediksi massal.")
        in_file_test = gr.File(label="Upload CSV")
        in_sep_test = gr.Textbox(label="Delimiter", value=";")
        in_bs_test = gr.Number(label="Batch Size", value=32)
        btn_test = gr.Button("Run Prediction")
        out_test = gr.JSON(label="Summary")
        btn_test.click(
            wrapper_predict_dataset,
            inputs=[in_file_test, in_sep_test, in_bs_test],
            outputs=out_test
        )
app.launch()