import os
import shutil
import zipfile
from pathlib import Path

import gradio as gr
import pandas as pd
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# =========================================================
# 1. CONFIGURATION & GLOBALS
# =========================================================
# Multi-label emotion classes (order fixed; label columns and model head
# both follow this order).
LIST_LABEL = ['anger', 'anticipation', 'disgust', 'fear',
              'joy', 'sadness', 'surprise', 'trust']

# Scratch directories for models arriving via ZIP upload or produced by
# in-app training.
DIR_UPLOADED = Path("temp_models/uploaded_zip")
DIR_TRAINED = Path("temp_models/trained_cloud")
DIR_UPLOADED.mkdir(parents=True, exist_ok=True)
DIR_TRAINED.mkdir(parents=True, exist_ok=True)

# Path of the model currently used for inference; None means "fall back
# to the defaults resolved in load_model_inference()".
active_model_path = None


# =========================================================
# 2. HELPERS & DATASET
# =========================================================
class EmosiDataset(Dataset):
    """Torch dataset wrapping a cleaned DataFrame for multi-label training.

    Expects ``df`` to contain a ``text_clean`` column plus one numeric
    column per label in ``LIST_LABEL`` (both produced by ``clean_data``).
    """

    def __init__(self, df, tokenizer, max_len=128):
        self.df = df
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.labels = df[LIST_LABEL].values
        self.texts = df["text_clean"].astype(str).tolist()

    def __len__(self):
        return len(self.df)

    def __getitem__(self, item):
        text = self.texts[item]
        inputs = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            return_tensors='pt'
        )
        return {
            'input_ids': inputs['input_ids'].flatten(),
            'attention_mask': inputs['attention_mask'].flatten(),
            # Float targets: required by the BCEWithLogitsLoss that
            # problem_type="multi_label_classification" selects.
            'labels': torch.tensor(self.labels[item], dtype=torch.float)
        }


def clean_data(df):
    """Normalize label columns to floats and derive a ``text_clean`` column.

    - Missing label columns are created and filled with 0.
    - Decimal commas ("0,5") become dots before numeric parsing;
      unparseable values become 0.
    - The text column is auto-detected among common names
      (case-insensitive); if none is found, ``text_clean`` is absent and
      callers must check for it.
    """
    for label in LIST_LABEL:
        if label not in df.columns:
            df[label] = 0
        df[label] = df[label].astype(str).str.replace(',', '.', regex=False)
        df[label] = pd.to_numeric(df[label], errors='coerce').fillna(0).astype(float)

    # Case-insensitive detection already covers a column named 'text', so
    # no separate fallback branch is needed.
    col_text = next(
        (c for c in df.columns if c.lower() in ['text', 'kalimat', 'content', 'tweet']),
        None
    )
    if col_text:
        df["text_clean"] = df[col_text].astype(str).str.replace("\n", " ").str.strip()
    return df


# =========================================================
# 3. ZIP UPLOAD
# =========================================================
def handle_zip_upload(file_obj):
    """Extract an uploaded model ZIP and make it the active model.

    Returns a ``(log message, status label)`` tuple for the Gradio outputs;
    the status element is ``None`` on failure so the label is unchanged.
    """
    global active_model_path
    if file_obj is None:
        return "❌ Tidak ada file.", None
    try:
        # Start from a clean extraction directory every time.
        if DIR_UPLOADED.exists():
            shutil.rmtree(DIR_UPLOADED)
        DIR_UPLOADED.mkdir(parents=True, exist_ok=True)

        # NOTE(review): extractall() trusts archive member names; a crafted
        # ZIP could write outside DIR_UPLOADED (zip-slip). Fine for trusted
        # uploads, but worth hardening if the app is exposed publicly.
        with zipfile.ZipFile(file_obj.name, 'r') as zip_ref:
            zip_ref.extractall(DIR_UPLOADED)

        # The folder containing config.json is the model root (the archive
        # may nest the model inside subdirectories).
        config_path = list(DIR_UPLOADED.rglob("config.json"))
        if not config_path:
            return "❌ Error: Tidak ditemukan config.json dalam ZIP.", None

        active_model_path = str(config_path[0].parent)
        return (f"✅ Model ZIP Berhasil Dimuat!\nLokasi: {active_model_path}",
                "Status: Memakai Model Upload ZIP")
    except Exception as e:
        return f"❌ Error unzip: {str(e)}", None


# =========================================================
# 4. CLOUD TRAINING
# =========================================================
def train_model_cloud(file_obj, sep, epochs, batch_size, lr, progress=gr.Progress()):
    """Fine-tune IndoBERT on an uploaded CSV and activate the result.

    Generator: yields ``(log text, status label)`` tuples so Gradio can
    stream progress into the UI while training runs.
    """
    global active_model_path
    yield "⏳ Membaca dataset...", None
    if file_obj is None:
        yield "❌ File CSV belum diupload!", None
        return
    try:
        df = pd.read_csv(file_obj.name, sep=sep)
        df = clean_data(df)
        if "text_clean" not in df.columns:
            yield "❌ Kolom teks tidak ditemukan.", None
            return
        if df.empty:
            # Guard: an empty dataset would otherwise crash with a
            # ZeroDivisionError when averaging the epoch loss.
            yield "❌ Dataset kosong.", None
            return

        MODEL_NAME = "indobenchmark/indobert-base-p1"
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        model = AutoModelForSequenceClassification.from_pretrained(
            MODEL_NAME,
            num_labels=len(LIST_LABEL),
            problem_type="multi_label_classification"
        )
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)

        dataset = EmosiDataset(df, tokenizer)
        loader = DataLoader(dataset, batch_size=int(batch_size), shuffle=True)
        optimizer = AdamW(model.parameters(), lr=float(lr))

        log_text = f"🚀 Mulai Training di {device}...\nData: {len(df)} baris.\n"
        yield log_text, None

        model.train()
        for ep in range(int(epochs)):
            total_loss = 0
            steps = len(loader)
            for i, batch in enumerate(loader):
                optimizer.zero_grad()
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)

                outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

                # Throttle progress-bar updates to every 5th step.
                if i % 5 == 0:
                    progress(
                        (ep * steps + i) / (int(epochs) * steps),
                        desc=f"Ep {ep+1} Loss: {total_loss/(i+1):.4f}"
                    )
            avg_loss = total_loss / steps
            log_text += f"✅ Epoch {ep+1}/{epochs} | Loss: {avg_loss:.4f}\n"
            yield log_text, None

        # Persist the fine-tuned model into a fresh folder and activate it.
        yield log_text + "\n💾 Menyimpan model...", None
        if DIR_TRAINED.exists():
            shutil.rmtree(DIR_TRAINED)
        DIR_TRAINED.mkdir(parents=True, exist_ok=True)
        model.save_pretrained(DIR_TRAINED)
        tokenizer.save_pretrained(DIR_TRAINED)
        active_model_path = str(DIR_TRAINED)
        yield log_text + f"\n🎉 Selesai! Model training aktif.", "Status: Memakai Model Hasil Training"
    except Exception as e:
        yield f"❌ Error: {str(e)}", None
# =========================================================
# 5. LOAD & PREDICTION
# =========================================================
def load_model_inference():
    """Resolve and load the model/tokenizer to use for inference.

    Priority: (1) the globally active model (uploaded or just trained),
    (2) a manually placed ``model_default`` folder, (3) the base IndoBERT
    checkpoint downloaded from the Hub.
    """
    global active_model_path

    def _base_model():
        # Shared fallback: untuned base checkpoint with an 8-label head.
        return (AutoModelForSequenceClassification.from_pretrained(
                    "indobenchmark/indobert-base-p1", num_labels=8),
                AutoTokenizer.from_pretrained("indobenchmark/indobert-base-p1"))

    if active_model_path and os.path.exists(active_model_path):
        target_path = active_model_path
    elif os.path.exists("model_default") and os.path.exists("model_default/config.json"):
        target_path = "model_default"
    else:
        return _base_model()

    try:
        tokenizer = AutoTokenizer.from_pretrained(target_path)
        model = AutoModelForSequenceClassification.from_pretrained(target_path)
        model.eval()
        return model, tokenizer
    except Exception:
        # Was a bare `except:`; narrowed so KeyboardInterrupt/SystemExit
        # are not swallowed. Any load failure falls back to the base model.
        return _base_model()


def predict_text(text):
    """Predict per-label emotion probabilities for one sentence.

    Returns a ``{label: probability}`` dict for ``gr.Label``, ``None`` for
    empty input, or ``{"Error": msg}`` on failure.
    """
    if not text:
        return None
    try:
        model, tokenizer = load_model_inference()
        inputs = tokenizer(text, return_tensors="pt", truncation=True,
                           padding="max_length", max_length=128)
        with torch.no_grad():
            out = model(**inputs)
        # Sigmoid, not softmax: labels are independent in this
        # multi-label setup.
        probs = torch.sigmoid(out.logits).numpy()[0]
        return {LIST_LABEL[i]: float(probs[i]) for i in range(len(LIST_LABEL))}
    except Exception as e:
        return {"Error": str(e)}


def predict_csv(file_obj, sep):
    """Run inference over every row of an uploaded CSV.

    Returns a summary dict: row count, top-3 dominant emotions, and the
    average probability per label — or ``{"Error": msg}`` on failure.
    """
    try:
        try:
            df = pd.read_csv(file_obj.name, sep=sep)
        except Exception:
            # Retry with a comma separator when the configured one fails.
            df = pd.read_csv(file_obj.name, sep=",")
        df = clean_data(df)
        model, tokenizer = load_model_inference()
        if "text_clean" not in df.columns:
            return {"Error": "Kolom teks tidak ditemukan"}

        results = []
        for txt in df["text_clean"]:
            inputs = tokenizer(txt, return_tensors="pt", truncation=True,
                               padding="max_length", max_length=128)
            with torch.no_grad():
                out = model(**inputs)
            probs = torch.sigmoid(out.logits).numpy()[0]
            results.append({LIST_LABEL[i]: float(probs[i])
                            for i in range(len(LIST_LABEL))})

        if not results:
            # Guard: averaging below would divide by zero on an empty CSV.
            return {"Error": "Dataset kosong"}

        avg = {label: 0.0 for label in LIST_LABEL}
        for row in results:
            for label, value in row.items():
                avg[label] += value
        for label in avg:
            avg[label] /= len(results)

        top3 = sorted(avg.items(), key=lambda x: x[1], reverse=True)[:3]
        return {"Info": f"Total {len(results)} data",
                "Dominan": {k: round(v, 4) for k, v in top3},
                "Detail": avg}
    except Exception as e:
        return {"Error": str(e)}


# =========================================================
# 6. GRADIO UI
# =========================================================
with gr.Blocks(title="IndoBERT Emotion Cloud") as app:
    gr.Markdown("# ☁️ IndoBERT Emotion Classifier")

    # Global status label shared by every tab.
    lbl_status = gr.Textbox(label="Status Model Aktif",
                            value="Default (IndoBERT Base / Uploaded Manual)",
                            interactive=False)

    with gr.Tabs():
        # === TAB 1: MODEL CONFIGURATION ===
        with gr.Tab("⚙️ Konfigurasi Model"):
            with gr.Tabs():
                # --- Sub tab 1: upload a ready-made model ---
                with gr.Tab("📂 Unggah Model"):
                    gr.Markdown("Upload file `.zip` berisi model yang sudah dilatih (dari Komputer).")
                    in_zip = gr.File(label="File ZIP Model")
                    btn_upload = gr.Button("Ekstrak & Pakai Model", variant="primary")
                    out_log_upload = gr.Textbox(label="Log Sistem")
                    btn_upload.click(handle_zip_upload, inputs=in_zip,
                                     outputs=[out_log_upload, lbl_status])

                # --- Sub tab 2: train in the cloud ---
                with gr.Tab("🏋️‍♀️ Latih Model"):
                    gr.Markdown("Latih model baru menggunakan Dataset CSV sendiri di Cloud.")
                    with gr.Row():
                        in_csv = gr.File(label="Dataset CSV")
                        in_sep = gr.Textbox(label="Separator", value=";")
                    with gr.Row():
                        in_ep = gr.Number(label="Epoch", value=1, precision=0)
                        in_bs = gr.Number(label="Batch Size", value=4, precision=0)
                        in_lr = gr.Number(label="Learning Rate", value=2e-5)
                    btn_train = gr.Button("Mulai Training", variant="stop")
                    out_log_train = gr.Textbox(label="Log Training", lines=5)
                    btn_train.click(train_model_cloud,
                                    inputs=[in_csv, in_sep, in_ep, in_bs, in_lr],
                                    outputs=[out_log_train, lbl_status])

        # === TAB 2: TESTING ===
        with gr.Tab("🧪 Testing"):
            gr.Markdown("Uji model yang sedang aktif.")
            with gr.Tabs():
                with gr.Tab("📝 Uji Satu Kalimat"):
                    in_txt = gr.Textbox(label="Masukkan Kalimat", lines=2,
                                        placeholder="Contoh: Saya sangat bahagia hari ini...")
                    btn_pred = gr.Button("Prediksi Emosi")
                    out_lbl = gr.Label(label="Hasil Prediksi")
                    btn_pred.click(predict_text, inputs=in_txt, outputs=out_lbl)

                with gr.Tab("📊 Uji Batch (CSV)"):
                    in_csv_test = gr.File(label="Upload CSV Test")
                    btn_batch = gr.Button("Analisis Batch")
                    out_json = gr.JSON(label="Hasil Analisis")
                    # NOTE(review): reuses `in_sep` from the training tab as
                    # the separator for the test CSV — confirm this is the
                    # intended UX.
                    btn_batch.click(predict_csv, inputs=[in_csv_test, in_sep],
                                    outputs=out_json)


if __name__ == "__main__":
    app.launch()