import streamlit as st
import numpy as np
from collections import defaultdict
from scipy.sparse import csr_matrix
import joblib
from sklearn.metrics import f1_score
from sklearn.decomposition import TruncatedSVD
from tqdm import tqdm
import tempfile
import os

st.set_page_config(page_title="RLF Model Tester", layout="wide")
st.title("Random Label Forest (RLF) - Test Dataset Evaluation")


# ----------------------- Parsing Function ----------------------- #
def parse_rcv1_sparse_file(filename):
    """Parse an RCV1-style sparse text file into feature and label matrices.

    Expected format: a header line "num_samples num_features num_labels",
    then one line per sample of the form
    "lab1,lab2,... fid:val fid:val ...".

    Args:
        filename: path to the text file to parse.

    Returns:
        (X, Y): X is a (num_samples, num_features) float32 CSR matrix of
        feature values; Y is a (num_samples, num_labels) uint8 CSR
        indicator matrix of the true labels.
    """
    feat_data, feat_rows, feat_cols = [], [], []
    label_data, label_rows, label_cols = [], [], []
    with open(filename, 'r') as f:
        header = f.readline()
        num_samples, num_features, num_labels = map(int, header.strip().split())
        for row_idx, line in enumerate(f):
            line = line.strip()
            if not line:  # tolerate blank lines in the file
                continue
            label_part, feature_part = line.split(' ', 1)
            labels = list(map(int, label_part.split(',')))
            for feat in feature_part.strip().split():
                fid, fval = feat.split(':')
                feat_rows.append(row_idx)
                feat_cols.append(int(fid))
                feat_data.append(float(fval))
            for lid in labels:
                label_rows.append(row_idx)
                label_cols.append(lid)
                label_data.append(1.0)
    # Build directly in (samples x features/labels) orientation. The original
    # built the transposed matrices and flipped them afterwards, which is
    # equivalent in content but does extra work and yields CSC matrices.
    X = csr_matrix((feat_data, (feat_rows, feat_cols)),
                   shape=(num_samples, num_features), dtype=np.float32)
    Y = csr_matrix((label_data, (label_rows, label_cols)),
                   shape=(num_samples, num_labels), dtype=np.uint8)
    return X, Y


# ----------------------- Prediction Functions ----------------------- #
def predict_scores_one_tree(x, tree):
    """Score one (already SVD-reduced) instance against a single tree.

    Routes x through the two k-means levels to a leaf node, then runs every
    per-label classifier stored at that leaf.

    Args:
        x: 1-D feature vector in the reduced space.
        tree: dict with keys 'level1_kmeans', 'level2_kmeans', 'model_tree'.

    Returns:
        dict mapping label id -> decision_function score; empty when the
        instance routes to a cluster path with no trained leaf.
    """
    x_row = x.reshape(1, -1)  # reshape once; sklearn predictors want 2-D input
    l1_cluster = tree['level1_kmeans'].predict(x_row)[0]
    level2_dict = tree['level2_kmeans']
    if l1_cluster not in level2_dict:
        return {}
    l2_cluster = level2_dict[l1_cluster].predict(x_row)[0]
    node = tree['model_tree'].get((l1_cluster, l2_cluster))
    if node is None:
        return {}
    return {
        lid: clf.decision_function(x_row)[0]
        for clf, lid in zip(node['classifiers'], node['label_ids'])
    }


def predict_ensemble(X_test_reduced, ensemble):
    """Average per-label scores over all trees for every test instance.

    Args:
        X_test_reduced: (n_samples, dim) array of SVD-reduced features.
        ensemble: iterable of tree dicts (see predict_scores_one_tree).

    Returns:
        list of length n_samples; each entry is a dict mapping label id ->
        mean score across the trees that scored that label.
    """
    all_preds = []
    for x in X_test_reduced:
        instance_scores = defaultdict(list)
        for tree in ensemble:
            for lid, score in predict_scores_one_tree(x, tree).items():
                instance_scores[lid].append(score)
        all_preds.append({lid: np.mean(s) for lid, s in instance_scores.items()})
    return all_preds


def get_topk(pred_scores, k):
    """Return, per instance, the ids of its k highest-scoring labels.

    Args:
        pred_scores: list of {label_id: score} dicts (one per instance).
        k: number of labels to keep.

    Returns:
        list of lists of label ids, highest score first; empty list for an
        instance with no scores at all.
    """
    top_labels = []
    for scores in pred_scores:
        if not scores:
            top_labels.append([])
            continue
        ranked = sorted(scores, key=scores.get, reverse=True)
        top_labels.append(ranked[:k])
    return top_labels


def precision_at_k(preds_topk, Y_true, k):
    """Mean precision@k over all instances.

    Args:
        preds_topk: list of per-instance predicted label-id lists.
        Y_true: sparse (n_samples, n_labels) ground-truth indicator matrix.
        k: the cutoff used when producing preds_topk.

    Returns:
        Fraction of the k predicted slots that hit a true label, averaged
        over instances; 0.0 for an empty prediction list.
    """
    total = len(preds_topk)
    if total == 0:  # guard: original raised ZeroDivisionError on empty input
        return 0.0
    hits = 0
    for i, pred_labels in enumerate(preds_topk):
        true_labels = set(Y_true[i].nonzero()[1])
        hits += len(set(pred_labels) & true_labels)
    return hits / (total * k)


def evaluate_rforest(ensemble, X_test, Y_test, dim=300):
    """Evaluate the ensemble on a test set.

    NOTE(review): TruncatedSVD is fit on the *test* features here, so the
    reduced space generally differs from the projection the trees were
    trained in. For faithful scores the SVD fitted at training time should
    be persisted with the model and re-used (transform only) — confirm
    against the training pipeline before trusting these metrics.

    Args:
        ensemble: list of tree dicts.
        X_test: sparse (n_samples, n_features) feature matrix.
        Y_test: sparse (n_samples, n_labels) indicator matrix.
        dim: SVD target dimensionality (must match training).

    Returns:
        (precision@1, precision@3, macro-F1-at-top-5) as floats.
    """
    svd = TruncatedSVD(n_components=dim, random_state=42)
    X_test_reduced = svd.fit_transform(X_test)
    pred_scores = predict_ensemble(X_test_reduced, ensemble)
    p1 = precision_at_k(get_topk(pred_scores, k=1), Y_test, k=1)
    p3 = precision_at_k(get_topk(pred_scores, k=3), Y_test, k=3)
    # Binarize top-5 predictions to score a macro-averaged F1 over labels;
    # zero_division=0 silences labels never predicted / never present.
    y_true = Y_test.toarray()
    y_pred_bin = np.zeros_like(y_true)
    for i, pred in enumerate(get_topk(pred_scores, k=5)):
        y_pred_bin[i, pred] = 1
    macro_f1 = f1_score(y_true, y_pred_bin, average='macro', zero_division=0)
    return p1, p3, macro_f1


# ----------------------- Load Pretrained Model ----------------------- #
@st.cache_resource
def load_model():
    """Load the pickled ensemble once per session (cached by Streamlit)."""
    return joblib.load("random_label_forest_model.pkl")


ensemble_model = load_model()

# ----------------------- User Interface ----------------------- #
uploaded_file = st.file_uploader("Upload Test Dataset (.txt in RCV1 format)", type=["txt"])
if uploaded_file:
    if st.button("Test Model"):
        st.info("Parsing dataset and evaluating. Please wait...")
        # Write the upload to a named temp file only when evaluation is
        # actually requested. delete=False is required so the file can be
        # reopened by name (notably on Windows), so we must remove it
        # ourselves — the original leaked one temp file per script rerun.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp:
            tmp.write(uploaded_file.getvalue())
            test_file_path = tmp.name
        try:
            X_test, Y_test = parse_rcv1_sparse_file(test_file_path)
        finally:
            os.remove(test_file_path)
        p1, p3, macro_f1 = evaluate_rforest(ensemble_model, X_test, Y_test)
        st.success("Evaluation Completed!")
        st.metric("Precision@1", f"{p1:.4f}")
        st.metric("Precision@3", f"{p3:.4f}")
        st.metric("Macro F1 Score", f"{macro_f1:.4f}")