| |
| |
| |
| import os |
| import math |
| import torch |
| import pandas as pd |
| import numpy as np |
| import gradio as gr |
| from pathlib import Path |
| from torch import nn |
| from torch.utils.data import DataLoader, TensorDataset |
| from sklearn.model_selection import train_test_split |
| from transformers import ( |
| AutoTokenizer, |
| AutoModel, |
| AutoConfig, |
| get_linear_schedule_with_warmup |
| ) |
|
|
| |
| |
| |
# The eight emotion labels (Plutchik's basic emotions); the order fixes the
# column order of the classifier's output logits.
LIST_LABEL = ['anger','anticipation','disgust','fear','joy','sadness','surprise','trust']


# All fine-tuned checkpoints and the "last model" marker live under this folder.
FOLDER_MODEL = Path("saved_models")
FOLDER_MODEL.mkdir(exist_ok=True)
|
|
| |
| |
| |
def read_file_upload(file_obj):
    """Resolve a Gradio upload object to a readable filesystem path.

    Accepts a plain path string, an object exposing ``.name`` (the usual
    Gradio file wrapper), or a file-like object with ``.read()`` whose
    bytes are spilled to a fresh temporary file.

    Returns:
        str: path to the uploaded file on disk.

    Raises:
        ValueError: if nothing was uploaded or the object type is unsupported.
    """
    if file_obj is None:
        raise ValueError("File belum diupload.")

    if isinstance(file_obj, str):
        return file_obj

    if hasattr(file_obj, "name"):
        return file_obj.name

    if hasattr(file_obj, "read"):
        # Use the OS temp directory via tempfile instead of a hard-coded
        # "/tmp" + random-int name: portable (Windows has no /tmp) and
        # collision-free by construction.
        import tempfile
        fd, temp_path = tempfile.mkstemp(suffix=".csv")
        with os.fdopen(fd, "wb") as f:
            f.write(file_obj.read())
        return temp_path

    raise ValueError("Tipe file tidak didukung.")
|
|
def save_last_model(name):
    """Persist the name/path of the most recently saved model to a marker file."""
    marker = FOLDER_MODEL / "last_model_name.txt"
    marker.write_text(name)
|
|
def load_last_model():
    """Return the stored last-model name, or None when nothing was saved yet."""
    marker = FOLDER_MODEL / "last_model_name.txt"
    if not marker.exists():
        return None
    return marker.read_text().strip()
|
|
def get_model_path(model_name):
    """Map a HF model name to a local save folder (slashes are not path-safe)."""
    safe_name = model_name.replace("/", "_")
    return FOLDER_MODEL / safe_name
|
|
| |
| |
| |
def clean_labels(df):
    """Ensure every label column in LIST_LABEL exists and holds numeric floats.

    Missing columns are created filled with 0; non-numeric entries and NaNs
    are coerced to 0.0 so downstream tensor conversion never sees objects.
    """
    for label in LIST_LABEL:
        if label not in df.columns:
            df[label] = 0
        # errors='coerce' turns any stray strings into NaN, then fill with 0.
        numeric = pd.to_numeric(df[label], errors='coerce')
        df[label] = numeric.fillna(0).astype(float)
    return df
|
|
def clean_text(df, col="text"):
    """Normalise the text column: collapse newlines and runs of whitespace, strip ends.

    Args:
        df: DataFrame containing the text column.
        col: name of the column to clean (default "text").

    Returns:
        The same DataFrame, with ``df[col]`` cleaned in place.

    Raises:
        KeyError: if ``col`` is not present in ``df``.
    """
    if col not in df.columns:
        raise KeyError(f"CSV harus punya kolom '{col}'")
    # \s+ covers \n and additionally collapses repeated spaces/tabs, which the
    # docstring promised but the old replace("\n", " ") did not deliver.
    df[col] = df[col].astype(str).str.replace(r"\s+", " ", regex=True).str.strip()
    return df
|
|
| |
| |
| |
class ModelEmosi(nn.Module):
    """Transformer backbone + dropout + linear head producing multi-label emotion logits."""

    def __init__(self, base_model_name, num_labels=8):
        super().__init__()
        self.config = AutoConfig.from_pretrained(base_model_name)
        self.base = AutoModel.from_pretrained(base_model_name)
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        outputs = self.base(input_ids=input_ids, attention_mask=attention_mask)
        # Prefer the backbone's pooled representation when available;
        # otherwise fall back to the raw [CLS] token hidden state.
        pooled = getattr(outputs, "pooler_output", None)
        if pooled is None:
            pooled = outputs.last_hidden_state[:, 0, :]
        return self.classifier(self.dropout(pooled))
|
|
| |
| |
| |
def tokenize_batch(texts, tokenizer, max_len=128):
    """Tokenize a list of strings into fixed-length PyTorch tensor batches."""
    encoded = tokenizer(
        texts,
        padding="max_length",
        truncation=True,
        max_length=max_len,
        return_tensors="pt",
    )
    return encoded
|
|
def create_dataset(df, tokenizer, max_len=128):
    """Build a TensorDataset of (input_ids, attention_mask, float label matrix)."""
    enc = tokenize_batch(df["text"].tolist(), tokenizer, max_len)
    # Multi-label targets must be float for BCEWithLogitsLoss.
    labels = torch.tensor(df[LIST_LABEL].values, dtype=torch.float)
    return TensorDataset(enc["input_ids"], enc["attention_mask"], labels)
|
|
| |
| |
| |
def hitung_pos_weight(df):
    """Per-label pos_weight for BCEWithLogitsLoss: negatives / positives.

    Labels with zero positive examples get weight 1.0 to avoid dividing by zero.
    """
    total = len(df)
    counts = df[LIST_LABEL].sum(axis=0)
    weights = [(total - c) / c if c > 0 else 1.0 for c in counts]
    return torch.tensor(weights, dtype=torch.float)
|
|
| |
| |
| |
def save_model(model, tokenizer, folder):
    """Save the backbone + tokenizer (HF format) plus the classifier head weights."""
    folder_path = Path(folder)
    folder_path.mkdir(parents=True, exist_ok=True)
    model.base.save_pretrained(folder)
    tokenizer.save_pretrained(folder)
    # The custom classification head is not part of the HF checkpoint format,
    # so its state dict is stored separately.
    head_file = folder_path / "classifier_head.pt"
    torch.save(model.classifier.state_dict(), str(head_file))
    save_last_model(str(folder))
|
|
def load_model(folder):
    """Rebuild the full model (backbone + classifier head) from a save folder.

    Returns:
        (model, tokenizer, config) with the model in eval mode on CPU.
    """
    folder = str(folder)
    config = AutoConfig.from_pretrained(folder)
    tokenizer = AutoTokenizer.from_pretrained(folder)

    model = ModelEmosi(folder)
    head_path = Path(folder) / "classifier_head.pt"
    head_state = torch.load(str(head_path), map_location="cpu")
    model.classifier.load_state_dict(head_state)
    model.eval()
    return model, tokenizer, config
|
|
| |
| |
| |
def jalankan_training(
    df,
    progress_bar=None,
    model_name="bert-base-multilingual-cased",
    epochs=3,
    batch_size=8,
    lr=2e-5,
    max_len=128,
    weight_decay=0.01,
    warmup_ratio=0.1,
    patience=2,
    freeze_layers=6,
    device=None
):
    """
    Fine-tune the multi-label emotion classifier on `df`.

    Implemented as a generator (yield) so log lines can be streamed to the
    UI while training runs. Every yield is a (log_message, history) tuple;
    `history` is None until the final yield, which carries the train/val
    loss curves as {"train_loss": [...], "val_loss": [...]}.

    Args:
        df: DataFrame with a "text" column and the LIST_LABEL columns.
        progress_bar: optional Gradio progress callback.
        model_name: HF hub name of the backbone to fine-tune.
        epochs, batch_size, lr, max_len, weight_decay, warmup_ratio:
            standard fine-tuning hyper-parameters.
        patience: epochs without val-loss improvement before early stopping.
        freeze_layers: number of bottom encoder layers (plus embeddings)
            kept frozen during training.
        device: torch device string; auto-detected when None.
    """
    yield "Mempersiapkan dataset dan tokenizer...", None

    device = device or ("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    df = df.reset_index(drop=True)
    full_dataset = create_dataset(df, tokenizer, max_len)

    # 85/15 train/validation split over row indices; fixed seed for repeatability.
    idx = list(range(len(full_dataset)))
    train_idx, val_idx = train_test_split(idx, test_size=0.15, random_state=42)

    def get_subset(ds, indices):
        # Fancy-index the backing tensors in one vectorised gather instead of
        # stacking items one-by-one in a Python loop (the old per-item
        # torch.stack version did O(n) Python-level work per tensor).
        sel = torch.tensor(indices, dtype=torch.long)
        return TensorDataset(*(t[sel] for t in ds.tensors))

    train_ds = get_subset(full_dataset, train_idx)
    val_ds = get_subset(full_dataset, val_idx)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size)

    model = ModelEmosi(model_name)
    model.to(device)

    # Freeze embeddings and the bottom `freeze_layers` encoder layers to speed
    # up training and limit overfitting on small datasets.
    for name, param in model.base.named_parameters():
        if name.startswith("embeddings."):
            param.requires_grad = False
        elif name.startswith("encoder.layer"):
            try:
                layer_num = int(name.split(".")[2])
                if layer_num < freeze_layers:
                    param.requires_grad = False
            except (IndexError, ValueError):
                # Name does not follow "encoder.layer.<n>...." — leave it
                # trainable. (Was a bare `except`, which also swallowed
                # KeyboardInterrupt/SystemExit.)
                pass

    # Class-imbalance correction for the multi-label BCE objective.
    pos_weight = hitung_pos_weight(df).to(device)
    loss_fn = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

    # Only optimize the parameters left trainable after freezing.
    optimizer = torch.optim.AdamW(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=lr,
        weight_decay=weight_decay
    )

    total_steps = len(train_loader) * epochs
    warmup_steps = int(warmup_ratio * total_steps)

    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=warmup_steps,
        num_training_steps=total_steps
    )

    best_val_loss = float("inf")
    no_improve = 0
    history = {"train_loss": [], "val_loss": []}
    save_path = str(get_model_path(model_name))

    yield f"Mulai Training di device: {device}\nTotal Steps: {total_steps}", None

    for ep in range(1, epochs+1):
        if progress_bar:
            progress_bar(float(ep)/epochs, desc=f"Epoch {ep}/{epochs}")

        # ---- training pass ----
        model.train()
        total_train_loss = 0

        for input_ids, mask, labels in train_loader:
            input_ids = input_ids.to(device)
            mask = mask.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            logits = model(input_ids, mask)
            loss = loss_fn(logits, labels)

            loss.backward()
            optimizer.step()
            scheduler.step()

            # Weight by batch size so the epoch average is per-sample.
            total_train_loss += loss.item() * input_ids.size(0)

        avg_train_loss = total_train_loss / len(train_loader.dataset)
        history["train_loss"].append(avg_train_loss)

        # ---- validation pass ----
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for input_ids, mask, labels in val_loader:
                input_ids = input_ids.to(device)
                mask = mask.to(device)
                labels = labels.to(device)
                logits = model(input_ids, mask)
                loss = loss_fn(logits, labels)
                total_val_loss += loss.item() * input_ids.size(0)

        avg_val_loss = total_val_loss / len(val_loader.dataset)
        history["val_loss"].append(avg_val_loss)

        log_msg = f"✅ Epoch {ep} | Train Loss={avg_train_loss:.4f} | Val Loss={avg_val_loss:.4f}"

        # Checkpoint only when validation loss improves (best-so-far model).
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            no_improve = 0
            save_model(model, tokenizer, save_path)
            log_msg += " --> (Model Saved 💾)"
        else:
            no_improve += 1
            log_msg += f" --> (No Improve: {no_improve}/{patience})"

        yield log_msg, None

        if no_improve >= patience:
            yield "⛔ Early stopping triggered.", None
            break

    yield "Training Selesai! 🎉", history
|
|
| |
| |
| |
def predict_satu(text, folder=None):
    """Predict per-label emotion probabilities for one text with the saved model.

    Falls back to the last trained model when no folder is given; returns an
    error dict when no model has been trained yet.
    """
    folder = folder or load_last_model()
    if folder is None:
        return {"Error": "Belum ada model yang dilatih."}

    model, tokenizer, _ = load_model(folder)

    encoded = tokenizer(
        text,
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt",
    )

    with torch.no_grad():
        logits = model(encoded["input_ids"], encoded["attention_mask"])
    probs = torch.sigmoid(logits)[0].numpy()

    return {label: float(score) for label, score in zip(LIST_LABEL, probs)}
|
|
def predict_batch(text_list, folder=None, batch_size=32):
    """Predict emotion probabilities for many texts, processed in mini-batches.

    Returns a list of {label: probability} dicts, one per input text;
    an empty list when no trained model is available.
    """
    folder = folder or load_last_model()
    if folder is None:
        return []

    model, tokenizer, _ = load_model(folder)
    results = []

    for start in range(0, len(text_list), batch_size):
        chunk = text_list[start:start + batch_size]
        encoded = tokenizer(
            chunk,
            padding="max_length",
            truncation=True,
            max_length=128,
            return_tensors="pt",
        )
        with torch.no_grad():
            logits = model(encoded["input_ids"], encoded["attention_mask"])
        probs = torch.sigmoid(logits).numpy()

        results.extend(
            {label: float(row[j]) for j, label in enumerate(LIST_LABEL)}
            for row in probs
        )

    return results
|
|
def summarize_result(preds):
    """Aggregate per-text prediction dicts into a dataset-level summary.

    Args:
        preds: list of {label: probability} dicts as produced by predict_batch.

    Returns:
        dict with the sample count ("jumlah_data"), the mean probability per
        label ("distribusi_rata2"), and up to three labels with the highest
        mean probability ("top_3"). An informational dict is returned when
        ``preds`` is empty.
    """
    if not preds:
        return {"Info": "Tidak ada hasil."}

    n = len(preds)

    # Derive the label set from the predictions themselves rather than the
    # global LIST_LABEL, so the summary works for any label vocabulary (the
    # old version raised KeyError on any key outside LIST_LABEL).
    avg = {}
    for p in preds:
        for label, score in p.items():
            avg[label] = avg.get(label, 0.0) + score

    for label in avg:
        avg[label] /= n

    top3 = sorted(avg.items(), key=lambda kv: kv[1], reverse=True)[:3]
    top3_fmt = [{"label": l, "score": float(s)} for l, s in top3]

    return {
        "jumlah_data": n,
        "distribusi_rata2": avg,
        "top_3": top3_fmt
    }
|
|
| |
| |
| |
def wrapper_training(file_obj, sep, model_name, epoch, batch, lr,
                     max_len, wd, warmup, pat, freeze,
                     progress=gr.Progress()):
    """Gradio handler: load and clean the uploaded CSV, then stream training
    logs (and, on the final step, the loss history) back to the UI."""
    csv_path = read_file_upload(file_obj)
    df = pd.read_csv(csv_path, sep=sep)
    df = clean_text(clean_labels(df))

    log_lines = []

    for log_msg, history_result in jalankan_training(
        df=df,
        progress_bar=progress,
        model_name=model_name,
        epochs=int(epoch),
        batch_size=int(batch),
        lr=float(lr),
        max_len=int(max_len),
        weight_decay=float(wd),
        warmup_ratio=float(warmup),
        patience=int(pat),
        freeze_layers=int(freeze)
    ):
        log_lines.append(log_msg + "\n")
        # history_result is None until training finishes; forward it as-is.
        yield "".join(log_lines), history_result
|
|
def wrapper_predict_satu(text):
    """Gradio handler: single-sentence prediction (thin passthrough to predict_satu)."""
    return predict_satu(text)
|
|
def wrapper_predict_dataset(file_obj, sep, batch_size):
    """Gradio handler: batch-predict an uploaded CSV and return the summary JSON."""
    path = read_file_upload(file_obj)
    frame = pd.read_csv(path, sep=sep)
    frame = clean_labels(frame)
    frame = clean_text(frame)
    predictions = predict_batch(frame["text"].tolist(), batch_size=int(batch_size))
    return summarize_result(predictions)
|
|
| |
| |
| |
# ---------------------------------------------------------------------------
# Gradio UI wiring: three tabs — fine-tuning, single-sentence test, batch
# prediction. Module-level script; running the file launches the web server.
# ---------------------------------------------------------------------------
with gr.Blocks() as app:
    gr.Markdown("## Emotion Classifier — IndoBERT / Multilingual")

    # --- Tab 1: upload a labelled CSV and fine-tune the backbone ---
    with gr.Tab("Menu Training"):
        gr.Markdown("Upload dataset CSV untuk fine-tuning model.")
        in_file = gr.File(label="Upload File CSV")
        in_sep = gr.Textbox(label="Delimiter (Pemisah)", value=";")

        # NOTE(review): "indobert-base-p1" lacks a hub namespace; the hub id
        # is likely "indobenchmark/indobert-base-p1" — confirm before use.
        in_model = gr.Dropdown(
            label="Base Model",
            choices=["bert-base-multilingual-cased", "indobert-base-p1"],
            value="bert-base-multilingual-cased"
        )

        with gr.Row():
            in_epoch = gr.Number(label="Epochs", value=3)
            in_batch = gr.Number(label="Batch Size", value=8)
            in_lr = gr.Number(label="Learning Rate", value=2e-5)

        with gr.Row():
            in_len = gr.Number(label="Max Length", value=128)
            in_pat = gr.Number(label="Patience (Early Stop)", value=2)
            in_freeze = gr.Number(label="Freeze Layers", value=6)

        # Hidden hyper-parameters: still passed to training, but kept at
        # their defaults and not shown in the UI.
        in_wd = gr.Number(label="Weight Decay", value=0.01, visible=False)
        in_warmup = gr.Number(label="Warmup Ratio", value=0.1, visible=False)

        btn_train = gr.Button("Mulai Training", variant="primary")

        with gr.Row():
            out_log = gr.Textbox(label="Log Latihan (Real-time)", lines=10, interactive=False)
            out_result = gr.JSON(label="Hasil Akhir (History)")

        # wrapper_training is a generator, so the log textbox updates live.
        btn_train.click(
            wrapper_training,
            inputs=[in_file, in_sep, in_model, in_epoch, in_batch,
                    in_lr, in_len, in_wd, in_warmup, in_pat, in_freeze],
            outputs=[out_log, out_result]
        )

    # --- Tab 2: ad-hoc single-sentence prediction ---
    with gr.Tab("Tes Satu Kalimat"):
        in_text = gr.Textbox(label="Input Teks", placeholder="Contoh: Aku senang sekali hari ini...")
        btn_satu = gr.Button("Prediksi")
        out_satu = gr.Label(label="Confidence Score")

        btn_satu.click(wrapper_predict_satu, inputs=[in_text], outputs=out_satu)

    # --- Tab 3: bulk prediction on an uploaded CSV, summarised as JSON ---
    with gr.Tab("Tes Satu File"):
        gr.Markdown("Upload file CSV baru untuk prediksi massal.")
        in_file_test = gr.File(label="Upload CSV")
        in_sep_test = gr.Textbox(label="Delimiter", value=";")
        in_bs_test = gr.Number(label="Batch Size", value=32)

        btn_test = gr.Button("Run Prediction")
        out_test = gr.JSON(label="Summary")

        btn_test.click(
            wrapper_predict_dataset,
            inputs=[in_file_test, in_sep_test, in_bs_test],
            outputs=out_test
        )

# Launch the Gradio server (blocks until shut down).
app.launch()