|
|
import os |
|
|
import torch |
|
|
import pandas as pd |
|
|
import gradio as gr |
|
|
import shutil |
|
|
import zipfile |
|
|
from pathlib import Path |
|
|
from torch.utils.data import DataLoader, Dataset |
|
|
from torch.optim import AdamW |
|
|
from transformers import AutoTokenizer, AutoModelForSequenceClassification |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# The eight emotion labels produced by the multi-label classifier (order matters:
# it must match the label-column order used when building training targets).
LIST_LABEL = ['anger','anticipation','disgust','fear','joy','sadness','surprise','trust']

# Working directories: one for user-uploaded model ZIPs, one for models
# fine-tuned in this app.
DIR_UPLOADED = Path("temp_models/uploaded_zip")
DIR_TRAINED = Path("temp_models/trained_cloud")

# Make sure both directories exist before the app starts serving requests.
for _work_dir in (DIR_UPLOADED, DIR_TRAINED):
    _work_dir.mkdir(parents=True, exist_ok=True)

# Path of the model currently selected for inference; None means "use default".
active_model_path = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EmosiDataset(Dataset):
    """Torch dataset pairing cleaned text with its eight-emotion target vector.

    Expects `df` to carry a ``text_clean`` column plus one numeric column per
    label in ``LIST_LABEL`` (as produced by ``clean_data``).
    """

    def __init__(self, df, tokenizer, max_len=128):
        self.df = df
        self.tokenizer = tokenizer
        self.max_len = max_len
        # Targets come from the eight label columns; inputs from the cleaned text.
        self.labels = df[LIST_LABEL].values
        self.texts = df["text_clean"].astype(str).tolist()

    def __len__(self):
        return len(self.df)

    def __getitem__(self, item):
        # Tokenize one sentence, padded/truncated to a fixed length so batches stack.
        encoded = self.tokenizer(
            self.texts[item],
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            return_tensors='pt',
        )
        # flatten() drops the batch dim the tokenizer adds with return_tensors='pt'.
        return {
            'input_ids': encoded['input_ids'].flatten(),
            'attention_mask': encoded['attention_mask'].flatten(),
            'labels': torch.tensor(self.labels[item], dtype=torch.float),
        }
|
|
|
|
|
def clean_data(df):
    """Normalize a raw dataset in place for training/inference.

    - Ensures every label column in ``LIST_LABEL`` exists (missing ones are
      created filled with 0), converts decimal commas ("0,5") to points, and
      coerces values to float (unparseable values become 0.0).
    - Derives ``text_clean`` (newlines squashed, stripped) from the first
      column whose lowercase name is one of text/kalimat/content/tweet.
      When no such column exists, ``text_clean`` is NOT added — callers must
      check for it before use.

    Returns the same (mutated) dataframe.
    """
    for label in LIST_LABEL:
        if label not in df.columns:
            df[label] = 0
        # Accept "0,5"-style decimals before numeric coercion.
        df[label] = df[label].astype(str).str.replace(',', '.', regex=False)
        df[label] = pd.to_numeric(df[label], errors='coerce').fillna(0).astype(float)

    # Case-insensitive search for a usable text column.  Note: this already
    # matches a literal "text" column, so no separate fallback is needed (the
    # original `elif "text" in df.columns` branch was unreachable).
    col_text = next((c for c in df.columns if c.lower() in ['text', 'kalimat', 'content', 'tweet']), None)
    if col_text:
        df["text_clean"] = df[col_text].astype(str).str.replace("\n", " ").str.strip()
    return df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_zip_upload(file_obj):
    """Extract an uploaded model ZIP and mark it as the active inference model.

    The extraction directory is wiped on every upload so stale files from a
    previous model cannot leak in.  The model root is located by searching the
    extracted tree for ``config.json`` (ZIPs may nest the model in subfolders).

    Returns a ``(log_message, status_label)`` tuple; ``status_label`` is None
    on failure so the UI status banner is left unchanged.
    """
    global active_model_path

    if file_obj is None:
        return "β Tidak ada file.", None
    try:
        # Start from a clean extraction directory on every upload.
        if DIR_UPLOADED.exists():
            shutil.rmtree(DIR_UPLOADED)
        DIR_UPLOADED.mkdir(parents=True, exist_ok=True)

        with zipfile.ZipFile(file_obj.name, 'r') as zip_ref:
            zip_ref.extractall(DIR_UPLOADED)

        # Locate the model root: the folder that actually contains config.json.
        config_path = list(DIR_UPLOADED.rglob("config.json"))
        if not config_path:
            return "β Error: Tidak ditemukan config.json dalam ZIP.", None

        final_model_path = config_path[0].parent
        active_model_path = str(final_model_path)

        # FIX: this f-string was broken across two source lines (a literal
        # newline inside a single-quoted f-string), which is a SyntaxError.
        # Reconstructed on one line; leading glyph presumed to be a garbled
        # emoji — TODO confirm intended symbol.
        return f"β Model ZIP Berhasil Dimuat!\nLokasi: {active_model_path}", "Status: Memakai Model Upload ZIP"
    except Exception as e:
        return f"β Error unzip: {str(e)}", None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def train_model_cloud(file_obj, sep, epochs, batch_size, lr, progress=gr.Progress()):
    """Fine-tune IndoBERT on an uploaded CSV and activate the resulting model.

    Generator: yields ``(log_text, status_label)`` tuples so the Gradio UI can
    stream training progress; ``status_label`` stays None until training
    finishes successfully.  On success the trained model is saved to
    ``DIR_TRAINED`` and set as ``active_model_path``.

    Args:
        file_obj:   uploaded CSV file handle (must expose ``.name``).
        sep:        CSV separator string.
        epochs:     number of training epochs (coerced to int).
        batch_size: DataLoader batch size (coerced to int).
        lr:         AdamW learning rate (coerced to float).
        progress:   Gradio progress tracker (injected by the framework).
    """
    global active_model_path

    yield "β³ Membaca dataset...", None
    if file_obj is None:
        yield "β File CSV belum diupload!", None
        return

    try:
        df = pd.read_csv(file_obj.name, sep=sep)
        df = clean_data(df)
        if "text_clean" not in df.columns:
            yield "β Kolom teks tidak ditemukan.", None
            return

        # Fresh base checkpoint with a multi-label head sized to LIST_LABEL.
        MODEL_NAME = "indobenchmark/indobert-base-p1"
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        model = AutoModelForSequenceClassification.from_pretrained(
            MODEL_NAME, num_labels=len(LIST_LABEL), problem_type="multi_label_classification"
        )

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)

        dataset = EmosiDataset(df, tokenizer)
        loader = DataLoader(dataset, batch_size=int(batch_size), shuffle=True)
        optimizer = AdamW(model.parameters(), lr=float(lr))

        log_text = f"π Mulai Training di {device}...\nData: {len(df)} baris.\n"
        yield log_text, None

        model.train()
        for ep in range(int(epochs)):
            total_loss = 0
            steps = len(loader)
            for i, batch in enumerate(loader):
                optimizer.zero_grad()
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)

                # Passing `labels` makes the model compute the BCE-with-logits
                # loss internally (multi_label_classification problem type).
                outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                # Throttle progress-bar updates to every 5th step.
                if i % 5 == 0:
                    progress((ep * steps + i) / (int(epochs) * steps), desc=f"Ep {ep+1} Loss: {total_loss/(i+1):.4f}")

            avg_loss = total_loss / steps
            # FIX: this f-string was broken across two source lines (literal
            # newline inside a single-quoted f-string = SyntaxError).
            # Reconstructed on one line; leading glyph presumed to be a
            # garbled emoji — TODO confirm intended symbol.
            log_text += f"β Epoch {ep+1}/{epochs} | Loss: {avg_loss:.4f}\n"
            yield log_text, None

        yield log_text + "\nπΎ Menyimpan model...", None
        # Replace any previously trained model wholesale.
        if DIR_TRAINED.exists():
            shutil.rmtree(DIR_TRAINED)
        DIR_TRAINED.mkdir(parents=True, exist_ok=True)

        model.save_pretrained(DIR_TRAINED)
        tokenizer.save_pretrained(DIR_TRAINED)

        active_model_path = str(DIR_TRAINED)
        yield log_text + f"\nπ Selesai! Model training aktif.", "Status: Memakai Model Hasil Training"

    except Exception as e:
        yield f"β Error: {str(e)}", None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_model_inference():
    """Resolve and load the model + tokenizer to use for inference.

    Resolution order:
      1. ``active_model_path`` (an uploaded or freshly trained model), if the
         directory still exists on disk;
      2. a local ``model_default`` directory containing a ``config.json``;
      3. the base IndoBERT checkpoint from the hub with a fresh 8-label head.

    Returns ``(model, tokenizer)``.  ``model.eval()`` is only applied to
    locally loaded checkpoints, matching the original behavior.
    """
    global active_model_path

    def _hub_fallback():
        # Base checkpoint with an untrained 8-label head; last resort when no
        # local model can be loaded.
        return AutoModelForSequenceClassification.from_pretrained("indobenchmark/indobert-base-p1", num_labels=8), \
               AutoTokenizer.from_pretrained("indobenchmark/indobert-base-p1")

    if active_model_path and os.path.exists(active_model_path):
        target_path = active_model_path
    elif os.path.exists("model_default") and os.path.exists("model_default/config.json"):
        target_path = "model_default"
    else:
        return _hub_fallback()

    try:
        tokenizer = AutoTokenizer.from_pretrained(target_path)
        model = AutoModelForSequenceClassification.from_pretrained(target_path)
        model.eval()
        return model, tokenizer
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
        # propagate; any load failure falls back to the base checkpoint.
        return _hub_fallback()
|
|
|
|
|
def predict_text(text):
    """Score one sentence with the active model.

    Returns a ``{label: probability}`` dict over ``LIST_LABEL``, None for
    empty input, or ``{"Error": message}`` on any failure.
    """
    if not text:
        return None
    try:
        model, tokenizer = load_model_inference()
        encoded = tokenizer(text, return_tensors="pt", truncation=True, padding="max_length", max_length=128)
        with torch.no_grad():
            logits = model(**encoded).logits
        # Sigmoid per label: multi-label probabilities, not a softmax.
        scores = torch.sigmoid(logits).numpy()[0]
        return {label: float(scores[idx]) for idx, label in enumerate(LIST_LABEL)}
    except Exception as e:
        return {"Error": str(e)}
|
|
|
|
|
def predict_csv(file_obj, sep):
    """Run batch inference over an uploaded CSV and summarize the emotions.

    Reads the CSV with the given separator (retrying with a comma on parse
    failure), cleans it via ``clean_data``, scores every row with the active
    model, and returns a dict with the row count, the top-3 average emotions
    ("Dominan"), and the full per-label averages ("Detail").  Any failure is
    reported as ``{"Error": message}``.
    """
    try:
        try:
            df = pd.read_csv(file_obj.name, sep=sep)
        except Exception:
            # Separator mismatch is the common failure; retry with a comma.
            # (Narrowed from a bare `except:`.)
            df = pd.read_csv(file_obj.name, sep=",")
        df = clean_data(df)

        model, tokenizer = load_model_inference()
        if "text_clean" not in df.columns:
            return {"Error": "Kolom teks tidak ditemukan"}

        results = []
        for txt in df["text_clean"]:
            inputs = tokenizer(txt, return_tensors="pt", truncation=True, padding="max_length", max_length=128)
            with torch.no_grad():
                out = model(**inputs)
            # Per-label sigmoid probabilities for one row.
            probs = torch.sigmoid(out.logits).numpy()[0]
            results.append({LIST_LABEL[i]: float(probs[i]) for i in range(len(LIST_LABEL))})

        if not results:
            # Previously an empty CSV surfaced as an opaque "division by zero".
            return {"Error": "Dataset kosong (0 baris teks)"}

        # Average each label's probability across all rows.
        avg = {l: 0.0 for l in LIST_LABEL}
        for r in results:
            for l, v in r.items():
                avg[l] += v
        for l in avg:
            avg[l] /= len(results)

        top3 = sorted(avg.items(), key=lambda x: x[1], reverse=True)[:3]
        return {"Info": f"Total {len(results)} data", "Dominan": {k: round(v,4) for k,v in top3}, "Detail": avg}
    except Exception as e:
        return {"Error": str(e)}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI layout.  Two top-level tabs: model configuration (upload a ZIP or
# train from a CSV) and testing (single sentence or batch CSV).  `lbl_status`
# is shared so both configuration actions can report which model is active.
# ---------------------------------------------------------------------------
with gr.Blocks(title="IndoBERT Emotion Cloud") as app:
    gr.Markdown("# βοΈ IndoBERT Emotion Classifier")

    # Read-only banner showing which model currently serves predictions;
    # updated by the upload and training handlers below.
    lbl_status = gr.Textbox(label="Status Model Aktif", value="Default (IndoBERT Base / Uploaded Manual)", interactive=False)

    with gr.Tabs():

        with gr.Tab("βοΈ Konfigurasi Model"):
            with gr.Tabs():

                # Tab 1a: upload a ready-made model as a ZIP archive.
                with gr.Tab("π Unggah Model"):
                    gr.Markdown("Upload file `.zip` berisi model yang sudah dilatih (dari Komputer).")
                    in_zip = gr.File(label="File ZIP Model")
                    btn_upload = gr.Button("Ekstrak & Pakai Model", variant="primary")
                    out_log_upload = gr.Textbox(label="Log Sistem")

                    # Writes to both the local log box and the shared status banner.
                    btn_upload.click(handle_zip_upload, inputs=in_zip, outputs=[out_log_upload, lbl_status])

                # Tab 1b: fine-tune a new model from an uploaded CSV dataset.
                with gr.Tab("ποΈββοΈ Latih Model"):
                    gr.Markdown("Latih model baru menggunakan Dataset CSV sendiri di Cloud.")
                    with gr.Row():
                        in_csv = gr.File(label="Dataset CSV")
                        in_sep = gr.Textbox(label="Separator", value=";")
                    with gr.Row():
                        in_ep = gr.Number(label="Epoch", value=1, precision=0)
                        in_bs = gr.Number(label="Batch Size", value=4, precision=0)
                        in_lr = gr.Number(label="Learning Rate", value=2e-5)
                    btn_train = gr.Button("Mulai Training", variant="stop")
                    out_log_train = gr.Textbox(label="Log Training", lines=5)

                    # train_model_cloud is a generator, so the log box streams live.
                    btn_train.click(train_model_cloud, inputs=[in_csv, in_sep, in_ep, in_bs, in_lr], outputs=[out_log_train, lbl_status])

        with gr.Tab("π§ͺ Testing"):
            gr.Markdown("Uji model yang sedang aktif.")

            with gr.Tabs():
                # Tab 2a: score a single sentence.
                with gr.Tab("π Uji Satu Kalimat"):
                    in_txt = gr.Textbox(label="Masukkan Kalimat", lines=2, placeholder="Contoh: Saya sangat bahagia hari ini...")
                    btn_pred = gr.Button("Prediksi Emosi")
                    out_lbl = gr.Label(label="Hasil Prediksi")
                    btn_pred.click(predict_text, inputs=in_txt, outputs=out_lbl)

                # Tab 2b: batch scoring over a CSV file.
                with gr.Tab("π Uji Batch (CSV)"):
                    in_csv_test = gr.File(label="Upload CSV Test")
                    btn_batch = gr.Button("Analisis Batch")
                    out_json = gr.JSON(label="Hasil Analisis")
                    # NOTE(review): reuses `in_sep` from the training tab as the
                    # separator for batch testing — confirm this cross-tab
                    # dependency is intended.
                    btn_batch.click(predict_csv, inputs=[in_csv_test, in_sep], outputs=out_json)

if __name__ == "__main__":
    app.launch()