| import os |
| import torch |
| import pandas as pd |
| import numpy as np |
| import gradio as gr |
| import zipfile |
| import shutil |
| from pathlib import Path |
| from torch import nn |
| from torch.utils.data import DataLoader, TensorDataset |
| from transformers import AutoTokenizer, AutoModel, AutoConfig |
|
|
| |
| |
| |
# The eight emotion labels (Plutchik-style set) the classifier predicts,
# in the fixed order used for label tensors and output dicts.
LIST_LABEL = ['anger','anticipation','disgust','fear','joy','sadness','surprise','trust']
# Destination for models fine-tuned locally on CPU via the training tab.
DIR_TRAINED = Path("saved_models/trained_local")
# Destination for models uploaded as a ZIP (e.g. trained on Colab GPU).
DIR_UPLOADED = Path("saved_models/uploaded_colab")


# Ensure both model directories exist at startup (idempotent).
DIR_TRAINED.mkdir(parents=True, exist_ok=True)
DIR_UPLOADED.mkdir(parents=True, exist_ok=True)


# Pointer file whose contents name the directory of the currently active model.
ACTIVE_MODEL_POINTER = "active_model_path.txt"
|
|
| |
| |
| |
class ModelEmosi(nn.Module):
    """Multi-label emotion classifier: a pretrained transformer encoder
    topped with dropout and a single linear head.

    The head emits raw logits for ``num_labels`` emotions; callers apply
    sigmoid themselves (see predict_text / predict_csv).
    """

    def __init__(self, base_model_name, num_labels=8):
        super().__init__()
        self.config = AutoConfig.from_pretrained(base_model_name)
        self.base = AutoModel.from_pretrained(base_model_name)
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        """Return per-label logits of shape (batch, num_labels)."""
        encoded = self.base(input_ids=input_ids, attention_mask=attention_mask)
        # Prefer the model's pooled output; fall back to the [CLS] token's
        # hidden state for encoders that expose no pooler.
        pooled = getattr(encoded, "pooler_output", None)
        if pooled is None:
            pooled = encoded.last_hidden_state[:, 0, :]
        return self.classifier(self.dropout(pooled))
|
|
| |
| |
| |
def clean_data(df, labels=None):
    """Return a cleaned copy of an emotion dataset.

    Guarantees every label column exists and is numeric (non-numeric or
    missing values become 0.0), and normalizes the ``text`` column by
    collapsing newlines to spaces and stripping whitespace.

    Args:
        df: raw DataFrame as read from CSV.
        labels: label column names to enforce; defaults to LIST_LABEL.

    Returns:
        A new DataFrame — the caller's ``df`` is not mutated.
    """
    if labels is None:
        labels = LIST_LABEL
    # Work on a copy so the caller's frame is left untouched.
    df = df.copy()
    for l in labels:
        if l not in df.columns:
            df[l] = 0
        # errors='coerce' turns junk cells into NaN, which fillna zeroes out.
        df[l] = pd.to_numeric(df[l], errors='coerce').fillna(0).astype(float)
    if "text" in df.columns:
        df["text"] = df["text"].astype(str).str.replace("\n", " ").str.strip()
    return df
|
|
def get_active_model_path():
    """Read the active-model pointer file and return the stored path.

    Returns None when the pointer file is missing or the path it names
    no longer exists on disk.
    """
    if not os.path.exists(ACTIVE_MODEL_POINTER):
        return None
    with open(ACTIVE_MODEL_POINTER, "r") as f:
        candidate = f.read().strip()
    return candidate if os.path.exists(candidate) else None
|
|
def set_active_model_path(path):
    """Persist *path* (str or Path) as the active model location."""
    Path(ACTIVE_MODEL_POINTER).write_text(str(path))
|
|
| |
| |
| |
def run_training_generator(file_obj, sep, epochs, batch_size, lr, progress=gr.Progress()):
    """Fine-tune a multilingual BERT emotion classifier on CPU, streaming logs.

    Generator used as a Gradio click handler: it yields
    (log_text, active_model_label) tuples so the UI updates incrementally.
    On success the encoder/tokenizer are saved in HF format under DIR_TRAINED,
    the classifier head separately as ``classifier_head.pt``, and the result
    is marked as the active model.

    Args:
        file_obj: uploaded CSV (Gradio File object; ``.name`` is the temp path).
        sep: CSV separator string.
        epochs, batch_size, lr: hyperparameters — arrive from gr.Number, hence
            the int()/float() coercions below.
        progress: Gradio progress tracker (the mutable default is Gradio's
            documented idiom, not a bug here).
    """
    yield "β³ Membaca dataset...", None
    try:
        df = pd.read_csv(file_obj.name, sep=sep)
        df = clean_data(df)
    except Exception as e:
        # Surface read/cleaning failures to the UI instead of crashing the app.
        yield f"β Error: {str(e)}", None
        return

    # CPU-only training (no GPU on the Space); multilingual BERT encoder.
    device = "cpu"
    model_name = "bert-base-multilingual-cased"
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    def tokenize_fn(texts):
        # Fixed-length padding keeps every batch tensor the same shape.
        return tokenizer(texts, padding="max_length", truncation=True, max_length=128, return_tensors="pt")

    # NOTE(review): the whole dataset is tokenized in memory up front —
    # acceptable for the small datasets this tab recommends.
    encodings = tokenize_fn(df["text"].tolist())
    labels = torch.tensor(df[LIST_LABEL].values, dtype=torch.float)
    dataset = TensorDataset(encodings["input_ids"], encodings["attention_mask"], labels)
    train_loader = DataLoader(dataset, batch_size=int(batch_size), shuffle=True)

    model = ModelEmosi(model_name)
    model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=float(lr))
    # Multi-label targets -> per-label sigmoid + binary cross-entropy.
    loss_fn = nn.BCEWithLogitsLoss()

    log_text = f"π Mulai Training CPU...\nData: {len(df)} baris\n"
    yield log_text, None

    model.train()
    for ep in range(int(epochs)):
        total_loss = 0
        for step, batch in enumerate(train_loader):
            b_ids, b_mask, b_lbl = batch
            optimizer.zero_grad()
            out = model(b_ids, b_mask)
            loss = loss_fn(out, b_lbl)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

            # Advance the Gradio progress bar every 5 steps.
            if step % 5 == 0:
                progress((ep * len(train_loader) + step) / (int(epochs) * len(train_loader)))

        avg_loss = total_loss / len(train_loader)
        # NOTE(review): leading char appears mojibake'd (likely once an emoji).
        log_text += f"β Epoch {ep+1} | Loss: {avg_loss:.4f}\n"
        yield log_text, None

    # Persist encoder + tokenizer in HF format; the classification head is
    # saved separately and restored by load_model_inference.
    model.base.save_pretrained(DIR_TRAINED)
    tokenizer.save_pretrained(DIR_TRAINED)
    torch.save(model.classifier.state_dict(), DIR_TRAINED / "classifier_head.pt")
    set_active_model_path(DIR_TRAINED)

    yield log_text + "\nπ Selesai & Disimpan!", "Model Lokal (Baru Dilatih)"
|
|
| |
| |
| |
def handle_zip_upload(file_obj):
    """Extract an uploaded model ZIP into DIR_UPLOADED and mark it active.

    Returns (log_message, active_model_label) for the Gradio outputs; the
    label is None on failure so the status textbox is not updated.
    """
    if file_obj is None: return "β Tidak ada file.", None
    try:
        # Start from a clean target directory on every upload.
        if DIR_UPLOADED.exists(): shutil.rmtree(DIR_UPLOADED)
        DIR_UPLOADED.mkdir()

        # NOTE(review): extractall trusts member paths inside the archive
        # ("zip slip") — fine only if uploads are trusted; consider validating
        # member names before extraction.
        with zipfile.ZipFile(file_obj.name, 'r') as zip_ref:
            zip_ref.extractall(DIR_UPLOADED)

        # If the archive wrapped everything in a single top-level folder,
        # flatten it so the model files sit directly in DIR_UPLOADED.
        files_in_dir = list(DIR_UPLOADED.iterdir())
        if len(files_in_dir) == 1 and files_in_dir[0].is_dir():
            subfolder = files_in_dir[0]
            for item in subfolder.iterdir():
                shutil.move(str(item), str(DIR_UPLOADED))
            subfolder.rmdir()

        set_active_model_path(DIR_UPLOADED)
        return f"β Model berhasil dimuat dari ZIP!\nLokasi: {DIR_UPLOADED}", "Model Upload (Dari Colab)"
    except Exception as e:
        return f"β Error unzip: {str(e)}", None
|
|
| |
| |
| |
def load_model_inference():
    """Load the currently active model and tokenizer for CPU inference.

    Resolves the active model directory via the pointer file, rebuilds the
    ModelEmosi wrapper from it, and restores the locally trained classifier
    head when ``classifier_head.pt`` is present.

    Returns:
        (model, tokenizer) with the model in eval mode.

    Raises:
        ValueError: when no active model has been set yet.
    """
    path = get_active_model_path()
    if not path:
        raise ValueError("Belum ada model aktif.")

    path = Path(path)
    tokenizer = AutoTokenizer.from_pretrained(path)
    # ModelEmosi.__init__ loads its own AutoConfig from the same path, so no
    # separate (previously unused) config read is needed here.
    model = ModelEmosi(path)

    # Restore the trained classification head when present; otherwise the
    # freshly initialized head stands in (e.g. for fully uploaded checkpoints).
    head_path = path / "classifier_head.pt"
    if head_path.exists():
        # weights_only=True: the head file is a plain tensor state dict, and
        # this avoids unpickling arbitrary objects from an uploaded artifact.
        state = torch.load(head_path, map_location="cpu", weights_only=True)
        model.classifier.load_state_dict(state)

    model.eval()
    return model, tokenizer
|
|
def predict_text(text):
    """Predict emotion probabilities for a single sentence.

    Returns a {label: probability} dict, None for empty input, or
    {"Error": message} when loading/inference fails.
    """
    if not text:
        return None
    try:
        model, tokenizer = load_model_inference()
        enc = tokenizer(text, return_tensors="pt", truncation=True, padding="max_length", max_length=128)
        with torch.no_grad():
            logits = model(enc["input_ids"], enc["attention_mask"])
        scores = torch.sigmoid(logits).numpy()[0]
        return dict(zip(LIST_LABEL, (float(s) for s in scores)))
    except Exception as e:
        return {"Error": str(e)}
|
|
def predict_csv(file_obj, sep):
    """Run batch emotion prediction over a CSV and summarize the results.

    Args:
        file_obj: uploaded CSV (Gradio File object) or None.
        sep: CSV separator string.

    Returns:
        On success: {"Total Data", "Top 3 Emosi", "Rata-rata"} summary dict.
        On failure: {"Error": message} — including the previously unguarded
        cases of a missing file, a missing ``text`` column, and an empty CSV
        (which used to surface as a cryptic ZeroDivisionError).
    """
    try:
        if file_obj is None:
            return {"Error": "Tidak ada file."}
        df = pd.read_csv(file_obj.name, sep=sep)
        df = clean_data(df)
        # Guard before the averaging step below divides by len(results).
        if "text" not in df.columns or df.empty:
            return {"Error": "Kolom 'text' tidak ditemukan atau CSV kosong."}

        model, tokenizer = load_model_inference()
        results = []
        for txt in df["text"]:
            inputs = tokenizer(txt, return_tensors="pt", truncation=True, padding="max_length", max_length=128)
            with torch.no_grad():
                out = model(inputs["input_ids"], inputs["attention_mask"])
            probs = torch.sigmoid(out).numpy()[0]
            results.append({LIST_LABEL[i]: float(probs[i]) for i in range(len(LIST_LABEL))})

        # Per-label mean across all rows (results is non-empty by the guard).
        avg = {l: sum(r[l] for r in results) / len(results) for l in LIST_LABEL}

        top3 = sorted(avg.items(), key=lambda x: x[1], reverse=True)[:3]
        return {"Total Data": len(results), "Top 3 Emosi": {k: round(v, 4) for k, v in top3}, "Rata-rata": avg}
    except Exception as e:
        return {"Error": str(e)}
|
|
| |
| |
| |
# ---- Gradio UI ----
# Two top-level tabs: model management (upload a pretrained ZIP or train on
# CPU) and testing (single-sentence or batch CSV prediction). The shared
# status textbox is updated by whichever action activates a model.
with gr.Blocks(title="Emotion AI Manager") as app:
    gr.Markdown("# π AI Emotion Classifier System")

    # Read-only status line; handlers return its new value as a second output
    # (or None to leave it unchanged).
    lbl_active_model = gr.Textbox(label="Status Model Aktif", value="Belum ada model yang dipilih.", interactive=False)


    with gr.Tab("βοΈ Pelatihan & Model"):
        with gr.Tabs():

            # Preferred path: upload a ZIP trained elsewhere (e.g. Colab GPU).
            with gr.Tab("π Upload Pretrained (Recommended)"):
                gr.Markdown("Gunakan model hasil training GPU (Colab) agar cepat.")
                in_zip = gr.File(label="Upload File .zip Model", file_types=[".zip"])
                btn_upload = gr.Button("Ekstrak & Aktifkan Model", variant="primary")
                out_log_upload = gr.Textbox(label="Log Sistem")

                btn_upload.click(handle_zip_upload, inputs=in_zip, outputs=[out_log_upload, lbl_active_model])

            # Fallback path: slow CPU fine-tuning directly on the Space.
            with gr.Tab("ποΈββοΈ Latihan Manual (CPU)"):
                gr.Markdown("β οΈ Lambat di Hugging Face Space. Gunakan data kecil saja.")
                with gr.Row():
                    in_csv = gr.File(label="Dataset CSV")
                    in_sep = gr.Textbox(label="Separator", value=";")
                with gr.Row():
                    in_ep = gr.Number(label="Epoch", value=1)
                    in_bs = gr.Number(label="Batch", value=4)
                    in_lr = gr.Number(label="LR", value=2e-5)

                btn_train = gr.Button("Mulai Latihan")
                out_log_train = gr.Textbox(label="Log Training", lines=6)

                # run_training_generator is a generator, so the log streams.
                btn_train.click(run_training_generator, inputs=[in_csv, in_sep, in_ep, in_bs, in_lr], outputs=[out_log_train, lbl_active_model])


    with gr.Tab("π§ͺ Pengujian (Testing)"):
        with gr.Tabs():

            # Single-sentence prediction with per-label confidence scores.
            with gr.Tab("π Uji Tunggal (Teks)"):
                in_txt = gr.Textbox(label="Masukkan Kalimat", placeholder="Saya merasa...")
                btn_pred_txt = gr.Button("Prediksi Emosi")
                out_lbl = gr.Label(label="Confidence Score")

                btn_pred_txt.click(predict_text, inputs=in_txt, outputs=out_lbl)

            # Whole-CSV prediction with an aggregate JSON summary.
            with gr.Tab("π Uji Batch (CSV)"):
                in_csv_test = gr.File(label="Upload CSV Test")
                in_sep_test = gr.Textbox(label="Separator", value=";")
                btn_pred_csv = gr.Button("Analisis Batch")
                out_json = gr.JSON(label="Hasil Analisis")

                btn_pred_csv.click(predict_csv, inputs=[in_csv_test, in_sep_test], outputs=out_json)


# queue() is required for the streaming (generator) training handler.
app.queue().launch()