|
|
|
|
|
""" |
|
|
Streamlit App for Government Complaint Classification |
|
|
Author: Based on XLM-RoBERTa implementation by Farrikh Alzami |
|
|
""" |
|
|
|
|
|
import html
import io
import os
import time
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
import streamlit as st

from utils.model_loader import ModelLoader
from utils.text_preprocessor import TextPreprocessor
from utils.visualization import Visualizer
|
|
|
|
|
|
|
|
# Browser-tab title/icon plus a wide layout with the sidebar initially expanded.
_PAGE_SETTINGS = {
    "page_title": "Government Complaint Classifier",
    "page_icon": "ποΈ",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)
|
|
|
|
|
|
|
|
# Orange-themed styling for the header banner, metric cards, prediction card
# and progress bar; injected once as a raw <style> element.
_CUSTOM_CSS = """
<style>
    .main-header {
        background: linear-gradient(90deg, #FF6B35 0%, #F7931E 100%);
        padding: 1rem;
        border-radius: 10px;
        margin-bottom: 2rem;
        text-align: center;
        color: white;
        box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
    }

    .metric-container {
        background: linear-gradient(135deg, #FFF5E6 0%, #FFE5CC 100%);
        padding: 1rem;
        border-radius: 10px;
        border-left: 4px solid #FF6B35;
        margin: 0.5rem 0;
    }

    .prediction-container {
        background: linear-gradient(135deg, #FFF9F5 0%, #FFEDE6 100%);
        padding: 1.5rem;
        border-radius: 15px;
        border: 2px solid #FFB366;
        margin: 1rem 0;
        box-shadow: 0 2px 4px rgba(0, 0, 0, 0.05);
    }

    .stProgress > div > div > div > div {
        background-color: #FF6B35;
    }

    div[data-testid="metric-container"] {
        background-color: #FFF5E6;
        border: 1px solid #FFD4A3;
        padding: 1rem;
        border-radius: 10px;
        box-shadow: 0 2px 4px rgba(255, 107, 53, 0.1);
    }
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
|
|
|
|
|
class StreamlitApp:
    """Streamlit front end for the government complaint classifier.

    Holds a model loader, a text preprocessor and a visualizer, keeps the
    per-session UI state in ``st.session_state`` and renders the sidebar plus
    the "Single Text", "Batch Processing" and "About" tabs.
    """

    # Display labels for the selectable model variants, keyed by model type.
    _MODEL_OPTIONS = {
        'cross_entropy': 'π― Cross Entropy Loss',
        'focal_loss': 'π₯ Focal Loss',
    }

    # ModelLoader attributes that are set to None to drop a loaded model.
    _LOADER_STATE_ATTRS = (
        'model',
        'tokenizer',
        'label_mappings',
        'classifier_pipeline',
        'current_model_type',
    )

    # Session-state keys starting with one of these are removed by a full reset.
    _RESETTABLE_KEY_PREFIXES = ('model_', 'predictions_', 'last_', 'current_', 'batch_')

    def __init__(self):
        """Create the helper components and seed any missing session-state keys."""
        self.model_loader = ModelLoader()
        self.text_preprocessor = TextPreprocessor()
        self.visualizer = Visualizer()
        self._init_session_state()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _init_session_state():
        """Set every session-state key this app reads, unless it already exists."""
        # Built per call so the mutable default list is never shared.
        defaults = {
            'model_type': 'cross_entropy',
            'model_loaded': False,
            'predictions_history': [],
            'last_analyzed_text': "",
            'current_results': None,
            'batch_results': None,
        }
        for key, value in defaults.items():
            if key not in st.session_state:
                st.session_state[key] = value

    @staticmethod
    def _model_title(model_type: str) -> str:
        """Return a human-readable title, e.g. ``'focal_loss'`` -> ``'Focal Loss'``."""
        return model_type.replace('_', ' ').title()

    @staticmethod
    def _top_predictions(all_predictions: Dict, k: int) -> List:
        """Return the ``k`` highest-scoring ``(category, score)`` pairs, best first."""
        return sorted(all_predictions.items(), key=lambda item: item[1], reverse=True)[:k]

    @staticmethod
    def _error_result(category: str, original_text: str, cleaned_text: str) -> Dict:
        """Build a fresh placeholder result for a failed prediction.

        The dict carries every key the display code reads, so a failure can be
        rendered and exported like a normal result.
        """
        return {
            'predicted_category': category,
            'confidence': 0.0,
            'predicted_id': -1,
            'all_predictions': {'Error': 1.0},
            'processing_time': 0.0,
            'original_text': original_text,
            'cleaned_text': cleaned_text,
        }

    def _is_model_ready(self) -> bool:
        """Return True when the loader holds a pipeline for the selected model type."""
        loader = self.model_loader
        # getattr keeps this safe even if the loader has not defined the attributes yet.
        return (
            getattr(loader, 'classifier_pipeline', None) is not None
            and getattr(loader, 'current_model_type', None) == st.session_state.model_type
        )

    def _reset_model_loader(self):
        """Drop any loaded model by clearing the loader's state attributes."""
        for attr in self._LOADER_STATE_ATTRS:
            setattr(self.model_loader, attr, None)

    def _ensure_model_loaded(self, spinner_message: str):
        """Load the selected model unless it is already loaded and current.

        Any exception raised by the loader propagates to the caller.
        """
        if st.session_state.model_loaded and self._is_model_ready():
            return
        with st.spinner(spinner_message):
            # Start from a clean loader so no state of a previous model lingers.
            self._reset_model_loader()
            self.model_loader.load_model(st.session_state.model_type)
            st.session_state.model_loaded = True

    def _render_model_status_banner(self):
        """Show whether the selected model is ready or will load on first use."""
        title = self._model_title(st.session_state.model_type)
        if self._is_model_ready():
            st.success(f"π― Current Model: **{title} - READY**")
        else:
            st.info(f"β³ Current Model: **{title} - Will load on first use**")

    # ------------------------------------------------------------------
    # Header and sidebar
    # ------------------------------------------------------------------

    def render_header(self):
        """Render application header"""
        st.markdown("""
        <div class="main-header">
            <h1>ποΈ Government Complaint Classifier</h1>
            <p>Klasifikasi Otomatis Keluhan Masyarakat menggunakan XLM-RoBERTa</p>
        </div>
        """, unsafe_allow_html=True)

    def render_sidebar(self):
        """Render sidebar with model selection"""
        with st.sidebar:
            st.header("βοΈ Model Configuration")
            self._render_model_selector()
            st.markdown("---")
            self._render_model_files_status()
            st.markdown("---")
            self._render_model_information()
            st.markdown("---")
            self._render_reset_section()
            st.markdown("---")
            self._render_recent_predictions()

    def _render_model_selector(self):
        """Render the model radio button and rerun when the selection changes."""
        selected_model = st.radio(
            "Pilih Model:",
            options=list(self._MODEL_OPTIONS.keys()),
            format_func=lambda x: self._MODEL_OPTIONS[x],
            index=0 if st.session_state.model_type == 'cross_entropy' else 1
        )

        if selected_model != st.session_state.model_type:
            # Mark the model as not loaded so the next prediction reloads it.
            st.session_state.model_type = selected_model
            st.session_state.model_loaded = False
            st.rerun()

    def _render_model_files_status(self):
        """List each known model type as available, currently loaded, or missing."""
        st.subheader("π Model Files Status")
        available_models = self.model_loader.get_available_models()
        model_ready = self._is_model_ready()

        for model_type in ['cross_entropy', 'focal_loss']:
            title = self._model_title(model_type)
            if model_type not in available_models:
                st.error(f"β {title}")
            elif model_ready and model_type == st.session_state.model_type:
                st.success(f"✅ {title} (Currently Loaded)")
            else:
                st.success(f"✅ {title}")

        if not available_models:
            st.warning("β οΈ No models found! Please check model directory.")
            st.info("""
            Expected structure:
            ```
            models/
            βββ cross_entropy/
            β βββ model.safetensors
            β βββ config.json
            β βββ ...
            βββ focal_loss/
                βββ model.safetensors
                βββ config.json
                βββ ...
            ```
            """)

    def _render_model_information(self):
        """Show loader-reported details when loaded, static details otherwise."""
        st.subheader("π Model Information")

        if self._is_model_ready():
            model_info = self.model_loader.get_model_info()
            st.success(f"**Status:** ✅ {model_info['status']}")
            st.info(f"**Current Model:** {self._model_title(model_info['model_type'])}")
            st.info(f"**Device:** {model_info['device']}")
            st.info(f"**Categories:** {model_info['num_labels']}")

            with st.expander("π Model Details"):
                st.write(f"**Model Size:** {model_info['model_size']}")
                st.write("**Available Categories:**")
                categories = model_info.get('categories', [])
                if categories:
                    # Only the first ten are listed; the rest are summarised.
                    st.write(", ".join(map(str, categories[:10])))
                    if len(categories) > 10:
                        st.write(f"... and {len(categories) - 10} more categories")
                else:
                    st.write("Categories not available")
        else:
            st.info(f"""
            **Current Model:** {self._MODEL_OPTIONS[st.session_state.model_type]}

            **Architecture:** XLM-RoBERTa Base

            **Max Length:** 256 tokens

            **Languages:** Multilingual (ID, EN, etc.)

            **Status:** β³ Not loaded (will load on first use)
            """)

        if not st.session_state.model_loaded:
            st.info("π‘ Model will be loaded automatically when you analyze text.")

    def _render_reset_section(self):
        """Render the button that wipes session state, loader state and cache."""
        st.subheader("π Reset Application")
        if not st.button("π§Ή Clear All & Reset Models", use_container_width=True, type="secondary"):
            return

        # list(...) because keys are deleted while iterating.
        for key in list(st.session_state.keys()):
            if key.startswith(self._RESETTABLE_KEY_PREFIXES):
                del st.session_state[key]
        self._init_session_state()

        self._reset_model_loader()
        st.cache_resource.clear()
        st.success("✅ Application reset complete!")
        st.rerun()

    def _render_recent_predictions(self):
        """Show the three most recent predictions, newest first."""
        history = st.session_state.predictions_history
        if not history:
            return

        st.subheader("π Recent Predictions")
        total = len(history)
        # Newest first, so label N really belongs to the N-th prediction made.
        for offset, pred in enumerate(reversed(history[-3:])):
            with st.expander(f"Prediction {total - offset}"):
                st.write(f"**Text:** {pred['text'][:100]}...")
                st.write(f"**Category:** {pred['category']}")
                st.write(f"**Confidence:** {pred['confidence']:.2%}")

    # ------------------------------------------------------------------
    # Prediction
    # ------------------------------------------------------------------

    def predict_single_text(self, text: str) -> Dict:
        """Predict single text with timing

        Args:
            text: Raw text entered by the user.

        Returns:
            The loader's prediction dict extended with ``processing_time``,
            ``original_text`` and ``cleaned_text``. If loading or predicting
            fails, an error placeholder with the same keys is returned and the
            failure is reported in the UI.
        """
        start_time = time.time()
        cleaned_text = self.text_preprocessor.clean_text(text)

        try:
            self._ensure_model_loaded("Loading model...")
        except Exception as e:
            st.error(f"Failed to load model: {str(e)}")
            return self._error_result('Error: Model Loading Failed', text, cleaned_text)

        try:
            result = self.model_loader.predict(cleaned_text)
        except Exception as e:
            st.error(f"Failed to make prediction: {str(e)}")
            return self._error_result('Error: Prediction Failed', text, cleaned_text)

        result['processing_time'] = time.time() - start_time
        result['original_text'] = text
        result['cleaned_text'] = cleaned_text
        return result

    def predict_batch_texts(self, texts: List[str]) -> List[Dict]:
        """Predict batch of texts

        Args:
            texts: Raw texts to classify, in order.

        Returns:
            One result dict per input text, in the same order. Texts that fail
            (or all texts, if the model cannot be loaded) get an independent
            error placeholder carrying the same keys as a normal result.
        """
        try:
            self._ensure_model_loaded("Loading model for batch processing...")
        except Exception as e:
            st.error(f"Failed to load model for batch processing: {str(e)}")
            # One independent dict per text, including the text keys that
            # display_batch_results reads. No cleaning was performed.
            return [
                self._error_result('Error: Model Loading Failed', text, "")
                for text in texts
            ]

        results = []
        progress_bar = st.progress(0)
        total = len(texts)

        for i, text in enumerate(texts):
            cleaned_text = ""
            try:
                cleaned_text = self.text_preprocessor.clean_text(text)
                result = self.model_loader.predict(cleaned_text)
                result['original_text'] = text
                result['cleaned_text'] = cleaned_text
            except Exception as e:
                st.warning(f"Failed to process text {i+1}: {str(e)}")
                # Reuse whatever cleaning produced; never call clean_text again
                # here, since it may be the call that failed.
                result = self._error_result('Error: Prediction Failed', text, cleaned_text)

            results.append(result)
            progress_bar.progress((i + 1) / total)

        return results

    # ------------------------------------------------------------------
    # Single-text tab
    # ------------------------------------------------------------------

    def render_single_text_tab(self):
        """Render single text analysis tab"""
        st.header("π Single Text Analysis")
        self._render_model_status_banner()

        user_text = st.text_area(
            "Masukkan teks keluhan masyarakat:",
            height=150,
            placeholder="Contoh: Saya ingin melaporkan jalan rusak di daerah saya yang sudah lama tidak diperbaiki...",
            key="main_text_input"
        )

        _, analyze_col, clear_col, _ = st.columns([2, 1, 1, 2])
        with analyze_col:
            analyze_button = st.button(
                "π Analyze Text",
                type="primary",
                use_container_width=True
            )

        with clear_col:
            clear_button = st.button(
                "π§Ή Clear",
                type="secondary",
                use_container_width=True,
                help="Clear results and reset model state"
            )

        if clear_button:
            # Reset everything in one place before rerunning; nothing after
            # st.rerun() executes, so the shown results must be cleared here.
            st.session_state.model_loaded = False
            st.session_state.predictions_history = []
            st.session_state.last_analyzed_text = ""
            st.session_state.current_results = None

            self._reset_model_loader()
            st.cache_resource.clear()
            st.success("✅ Cleared all states and model cache!")
            st.rerun()

        stripped_text = user_text.strip()
        text_changed = stripped_text != st.session_state.last_analyzed_text

        if analyze_button:
            if not stripped_text:
                st.warning("β οΈ Please enter some text to analyze!")
                return

            try:
                with st.spinner("Analyzing text..."):
                    result = self.predict_single_text(user_text)

                st.session_state.predictions_history.append({
                    'text': user_text,
                    'category': result['predicted_category'],
                    'confidence': result['confidence']
                })
                st.session_state.last_analyzed_text = stripped_text
                st.session_state.current_results = result

                self.display_single_prediction_results(result)
            except Exception as e:
                st.error(f"β Error during analysis: {str(e)}")
                st.info("π‘ Try clicking the 'Clear' button to reset the model state.")
            return

        if st.session_state.current_results:
            if text_changed:
                st.info("βοΈ Text has been modified. Click 'Analyze Text' to get new predictions or 'Clear' to reset.")
            else:
                st.info("π Showing previous analysis results. Click 'Analyze Text' to update or 'Clear' to reset.")
                self.display_single_prediction_results(st.session_state.current_results)

    def display_single_prediction_results(self, result: Dict):
        """Display single prediction results

        Args:
            result: Prediction dict as returned by ``predict_single_text``.
        """
        st.markdown("## π Analysis Results")

        # Escaped because the label is interpolated into raw HTML.
        category_html = html.escape(str(result['predicted_category']))
        st.markdown(f"""
        <div class="prediction-container">
            <h3>π― Predicted Category</h3>
            <h2 style="color: #FF6B35; margin: 0;">{category_html}</h2>
        </div>
        """, unsafe_allow_html=True)

        confidence_col, time_col, length_col = st.columns(3)

        with confidence_col:
            st.metric(
                label="π― Confidence Score",
                value=f"{result['confidence']:.2%}",
                delta="Top prediction"
            )

        with time_col:
            st.metric(
                label="β±οΈ Processing Time",
                value=f"{result['processing_time']:.3f}s",
                delta="Real-time"
            )

        with length_col:
            st.metric(
                label="π Text Length",
                value=f"{len(result['cleaned_text'])} chars",
                delta="After cleaning"
            )

        st.markdown("### π Confidence Scores by Category")
        fig = self.visualizer.plot_confidence_scores(result['all_predictions'])
        st.plotly_chart(fig, use_container_width=True)

        st.markdown("### π Top 5 Predictions")
        top_predictions = self._top_predictions(result['all_predictions'], 5)

        df_top = pd.DataFrame([
            {
                'Rank': rank,
                'Category': category,
                'Confidence': f"{confidence:.2%}",
                'Confidence_Score': confidence
            }
            for rank, (category, confidence) in enumerate(top_predictions, 1)
        ])

        styled_df = df_top.style.format({
            'Confidence_Score': '{:.4f}'
        }).hide(['Confidence_Score'], axis=1).background_gradient(
            subset=['Confidence_Score'],
            cmap='Oranges'
        )

        st.dataframe(styled_df, use_container_width=True)

        with st.expander("π§ Preprocessing Details"):
            original_col, cleaned_col = st.columns(2)

            with original_col:
                st.markdown("**Original Text:**")
                st.text_area(
                    "Original Text",
                    value=result['original_text'],
                    height=100,
                    disabled=True,
                    key="original_text_display",
                    label_visibility="collapsed"
                )

            with cleaned_col:
                st.markdown("**Cleaned Text:**")
                st.text_area(
                    "Cleaned Text",
                    value=result['cleaned_text'],
                    height=100,
                    disabled=True,
                    key="cleaned_text_display",
                    label_visibility="collapsed"
                )

    # ------------------------------------------------------------------
    # Batch tab
    # ------------------------------------------------------------------

    def render_batch_processing_tab(self):
        """Render batch processing tab"""
        st.header("π Batch Processing")
        self._render_model_status_banner()

        st.markdown("### π Upload CSV File")
        uploaded_file = st.file_uploader(
            "Choose a CSV file containing texts to classify",
            type=['csv'],
            help="CSV should have a column named 'text' containing the texts to classify"
        )

        if uploaded_file is None:
            st.markdown("### π Expected CSV Format")
            example_df = pd.DataFrame({
                'id': [1, 2, 3],
                'text': [
                    'Jalan di depan rumah saya rusak parah',
                    'Pelayanan di kantor kelurahan lambat',
                    'Lingkungan sekitar kotor dan tidak terawat'
                ]
            })
            st.dataframe(example_df)
            return

        # Only the read itself is reported as a CSV error; later failures get
        # their own message below.
        try:
            df = pd.read_csv(uploaded_file)
        except Exception as e:
            st.error(f"Error reading CSV file: {str(e)}")
            return

        try:
            self._render_batch_workflow(df)
        except Exception as e:
            st.error(f"β Error during batch processing: {str(e)}")
            st.info("π‘ Try clicking the 'Clear Batch' button to reset the model state.")

    def _render_batch_workflow(self, df: pd.DataFrame):
        """Preview the uploaded data and handle the process / clear buttons."""
        st.markdown("### π Data Preview")
        st.dataframe(df.head(10))

        text_columns = df.columns.tolist()
        selected_column = st.selectbox(
            "Select the text column to classify:",
            options=text_columns,
            index=0 if 'text' not in text_columns else text_columns.index('text')
        )

        _, process_col, clear_col, _ = st.columns([2, 1, 1, 2])
        with process_col:
            process_button = st.button(
                "π Process Batch",
                type="primary",
                use_container_width=True
            )

        with clear_col:
            clear_batch_button = st.button(
                "π§Ή Clear Batch",
                type="secondary",
                use_container_width=True,
                help="Clear batch results and reset model"
            )

        if clear_batch_button:
            st.session_state.batch_results = None
            st.session_state.model_loaded = False

            self._reset_model_loader()
            st.cache_resource.clear()
            st.success("✅ Cleared batch results and model cache!")
            st.rerun()

        if process_button:
            texts = df[selected_column].astype(str).tolist()

            st.markdown("### β‘ Processing Batch...")
            start_time = time.time()
            results = self.predict_batch_texts(texts)
            total_time = time.time() - start_time

            # Kept in session state so the results survive later reruns.
            st.session_state.batch_results = {
                'original_df': df,
                'results': results,
                'selected_column': selected_column,
                'total_time': total_time
            }

            self.display_batch_results(df, results, selected_column, total_time)
        elif st.session_state.batch_results:
            st.info("π Showing previous batch results. Upload new file to process again or click 'Clear Batch' to reset.")
            batch_data = st.session_state.batch_results
            self.display_batch_results(
                batch_data['original_df'],
                batch_data['results'],
                batch_data['selected_column'],
                batch_data['total_time']
            )

    def display_batch_results(self, original_df: pd.DataFrame, results: List[Dict],
                              text_column: str, total_time: float):
        """Display batch processing results

        Args:
            original_df: The uploaded data, one row per classified text.
            results: One prediction dict per row of ``original_df``.
            text_column: Name of the column that was classified.
            total_time: Wall-clock seconds spent on the batch.
        """
        st.markdown("## π Batch Processing Results")

        total_texts = len(results)
        # Guard the two degenerate cases: an empty batch and a zero elapsed time.
        avg_confidence = float(np.mean([r['confidence'] for r in results])) if results else 0.0
        texts_per_second = total_texts / total_time if total_time > 0 else 0.0

        total_col, confidence_col, time_col, speed_col = st.columns(4)

        with total_col:
            st.metric("π Total Texts", total_texts)

        with confidence_col:
            st.metric("π― Avg Confidence", f"{avg_confidence:.2%}")

        with time_col:
            st.metric("β±οΈ Total Time", f"{total_time:.2f}s")

        with speed_col:
            st.metric("π Speed", f"{texts_per_second:.1f} texts/sec")

        results_df = original_df.copy()
        results_df['predicted_category'] = [r['predicted_category'] for r in results]
        results_df['confidence'] = [r['confidence'] for r in results]
        results_df['cleaned_text'] = [r.get('cleaned_text', "") for r in results]

        st.markdown("### π Category Distribution")
        category_counts = results_df['predicted_category'].value_counts()
        fig = self.visualizer.plot_category_distribution(category_counts)
        st.plotly_chart(fig, use_container_width=True)

        st.markdown("### π Detailed Results")
        display_df = results_df[[text_column, 'predicted_category', 'confidence']].copy()
        display_df['confidence'] = display_df['confidence'].apply(lambda x: f"{x:.2%}")

        st.dataframe(display_df, use_container_width=True)

        st.markdown("### πΎ Download Results")

        # One export row per text: the original columns, the prediction and the
        # three best-scoring categories.
        excel_data = []
        for i, result in enumerate(results):
            row = original_df.iloc[i].to_dict()
            row['predicted_category'] = result['predicted_category']
            row['confidence'] = result['confidence']
            row['cleaned_text'] = result.get('cleaned_text', "")

            top_3 = self._top_predictions(result['all_predictions'], 3)
            for j, (cat, conf) in enumerate(top_3, 1):
                row[f'top_{j}_category'] = cat
                row[f'top_{j}_confidence'] = conf

            excel_data.append(row)

        excel_df = pd.DataFrame(excel_data)

        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            excel_df.to_excel(writer, sheet_name='Results', index=False)

            summary_df = pd.DataFrame([
                ['Total Texts Processed', total_texts],
                ['Average Confidence', f"{avg_confidence:.2%}"],
                ['Processing Time', f"{total_time:.2f} seconds"],
                ['Model Used', self._model_title(st.session_state.model_type)],
                ['Processing Speed', f"{texts_per_second:.1f} texts/second"]
            ], columns=['Metric', 'Value'])

            summary_df.to_excel(writer, sheet_name='Summary', index=False)

        _, download_col, _ = st.columns([2, 1, 2])
        with download_col:
            st.download_button(
                label="π₯ Download Excel Report",
                data=output.getvalue(),
                file_name=f"complaint_classification_results_{st.session_state.model_type}.xlsx",
                mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                use_container_width=True
            )

    # ------------------------------------------------------------------
    # About tab and entry point
    # ------------------------------------------------------------------

    def render_about_tab(self):
        """Render about/help tab"""
        st.header("βΉοΈ About This Application")

        st.markdown("""
        ### π― Purpose
        This application automatically classifies government complaints using state-of-the-art
        XLM-RoBERTa transformer models. It supports both Cross Entropy and Focal Loss variants
        for handling imbalanced datasets.

        ### π§ Technical Details
        - **Model Architecture:** XLM-RoBERTa Base (Multi-lingual)
        - **Framework:** Hugging Face Transformers + PyTorch
        - **Preprocessing:** HTML cleaning, emoji removal, text normalization
        - **Maximum Input Length:** 256 tokens
        - **Languages Supported:** Indonesian, English, and more

        ### π Model Comparison
        - **Cross Entropy Loss:** Traditional classification loss with class weights
        - **Focal Loss:** Specialized for imbalanced datasets, focuses on hard examples

        ### π Usage Guide

        #### Single Text Analysis:
        1. Select your preferred model from the sidebar
        2. Enter text in the textarea
        3. Click "Analyze Text"
        4. View predictions and confidence scores

        #### Batch Processing:
        1. Prepare a CSV file with text data
        2. Upload the file in the Batch Processing tab
        3. Select the text column to classify
        4. Click "Process Batch"
        5. Download results as Excel file

        ### π CSV Format for Batch Processing
        Your CSV should contain at least one column with text data:
        ```
        id,text,other_columns...
        1,"Jalan rusak perlu diperbaiki",metadata
        2,"Pelayanan lambat di kantor",metadata
        ```

        ### β οΈ Limitations
        - Maximum text length: 256 tokens (approximately 200-300 words)
        - Model performance depends on training data quality
        - Processing time varies with text length and batch size

        ### π¨βπ» Credits
        Based on research implementation by Farrikh Alzami using XLM-RoBERTa for
        government complaint classification with focal loss optimization.
        """)

    def run(self):
        """Main application runner"""
        self.render_header()
        self.render_sidebar()

        single_tab, batch_tab, about_tab = st.tabs(["π Single Text", "π Batch Processing", "βΉοΈ About"])

        with single_tab:
            self.render_single_text_tab()

        with batch_tab:
            self.render_batch_processing_tab()

        with about_tab:
            self.render_about_tab()
|
|
|
|
|
def main():
    """Build the app and render it, reporting any unexpected failure in the UI."""
    try:
        StreamlitApp().run()
    except Exception as exc:
        st.error(f"Application error: {exc}")
        st.info("Please ensure all model files are properly placed in the models/ directory.")


if __name__ == "__main__":
    main()