Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import numpy as np | |
| from scipy.sparse import csr_matrix | |
| import joblib | |
| from sklearn.metrics import f1_score | |
| from sklearn.decomposition import TruncatedSVD | |
| from tqdm import tqdm | |
| import tempfile | |
| import os | |
# Streamlit page setup; set_page_config must be the first Streamlit command
# executed in the script, so keep these two calls at the top.
st.set_page_config(page_title="RLF Model Tester", layout="wide")
st.title("Random Label Forest (RLF) - Test Dataset Evaluation")
| # ----------------------- Parsing Function ----------------------- # | |
def parse_rcv1_sparse_file(filename):
    """Parse an RCV1/XMC-style sparse text file into feature/label matrices.

    Expected format: a header line ``num_samples num_features num_labels``,
    then one line per sample: ``l1,l2,... fid:fval fid:fval ...``.

    Fixes over the original: lines with no labels or no features no longer
    raise ValueError, and the matrices are built directly in row-major
    (samples x columns) orientation — the original built the transpose and
    flipped it, which silently returned CSC instead of CSR.

    Args:
        filename: path to the sparse text file.

    Returns:
        (X, Y): X is a (num_samples, num_features) float32 CSR matrix,
        Y is a (num_samples, num_labels) uint8 CSR indicator matrix.
    """
    feat_data, feat_rows, feat_cols = [], [], []
    label_data, label_rows, label_cols = [], [], []
    with open(filename, 'r') as f:
        header = f.readline()
        num_samples, num_features, num_labels = map(int, header.strip().split())
        for row_idx, line in enumerate(f):
            line = line.strip()
            if not line:
                continue
            label_part, _, feature_part = line.partition(' ')
            if ':' in label_part:
                # No labels on this line: the first token is already a
                # fid:fval feature, so treat the whole line as features.
                feature_part = line
                label_part = ''
            if label_part:
                for lid in label_part.split(','):
                    label_rows.append(row_idx)
                    label_cols.append(int(lid))
                    label_data.append(1.0)
            for feat in feature_part.split():
                fid, fval = feat.split(':')
                feat_rows.append(row_idx)
                feat_cols.append(int(fid))
                feat_data.append(float(fval))
    X = csr_matrix((feat_data, (feat_rows, feat_cols)),
                   shape=(num_samples, num_features), dtype=np.float32)
    Y = csr_matrix((label_data, (label_rows, label_cols)),
                   shape=(num_samples, num_labels), dtype=np.uint8)
    return X, Y
| # ----------------------- Prediction Functions ----------------------- # | |
def predict_scores_one_tree(x, tree):
    """Route instance ``x`` through one tree's two-level k-means hierarchy.

    Returns a dict mapping label id -> decision-function score from the
    classifiers stored at the reached leaf, or an empty dict when the leaf
    has no second-level model / no classifiers.
    """
    # Every fitted model here expects a 2-D (1, n_features) input; reshape once.
    xr = x.reshape(1, -1)
    coarse = tree['level1_kmeans'].predict(xr)[0]
    fine_models = tree['level2_kmeans']
    if coarse not in fine_models:
        return {}
    fine = fine_models[coarse].predict(xr)[0]
    model_tree = tree['model_tree']
    leaf_key = (coarse, fine)
    if leaf_key not in model_tree:
        return {}
    leaf = model_tree[leaf_key]
    return {
        lid: clf.decision_function(xr)[0]
        for clf, lid in zip(leaf['classifiers'], leaf['label_ids'])
    }
def predict_ensemble(X_test_reduced, ensemble):
    """Score every row of ``X_test_reduced`` with every tree and average.

    Returns a list with one dict per row mapping label id -> mean score
    across the trees that scored that label; labels no tree scored are
    absent from the dict.
    """
    results = []
    for row in X_test_reduced:
        per_label = {}
        for tree in ensemble:
            for lid, score in predict_scores_one_tree(row, tree).items():
                per_label.setdefault(lid, []).append(score)
        results.append(
            {lid: np.mean(vals) for lid, vals in per_label.items()}
        )
    return results
def get_topk(pred_scores, k):
    """Return, for each instance, the ``k`` label ids with the highest
    scores in descending order; instances with no scores yield []."""
    topk = []
    for scores in pred_scores:
        if not scores:
            topk.append([])
        else:
            ranked = sorted(scores, key=scores.get, reverse=True)
            topk.append(ranked[:k])
    return topk
def precision_at_k(preds_topk, Y_true, k):
    """Compute precision@k over a batch of top-k predictions.

    Args:
        preds_topk: list of per-instance predicted label-id lists (each of
            length <= k).
        Y_true: sparse (n_samples, n_labels) indicator matrix; row i's
            nonzero column indices are instance i's true labels.
        k: cutoff; the denominator is len(preds_topk) * k even when an
            instance predicted fewer than k labels (standard P@k).

    Returns:
        Fraction of the n_samples*k prediction slots that hit a true label.
        Returns 0.0 for empty input (the original raised ZeroDivisionError).
    """
    total = len(preds_topk)
    if total == 0 or k <= 0:
        return 0.0
    hits = 0
    for i, pred_labels in enumerate(preds_topk):
        true_labels = set(Y_true[i].nonzero()[1])
        hits += len(set(pred_labels) & true_labels)
    return hits / (total * k)
def evaluate_rforest(ensemble, X_test, Y_test, dim=300, svd=None):
    """Evaluate the RLF ensemble on a test set.

    Args:
        ensemble: list of tree dicts consumed by predict_scores_one_tree.
        X_test: sparse (n_samples, n_features) feature matrix.
        Y_test: sparse (n_samples, n_labels) indicator matrix.
        dim: number of SVD components when fitting a fresh projection.
        svd: optional *fitted* TruncatedSVD from training time; when given,
            X_test is projected with it. When None, a new SVD is fitted on
            the test set itself — NOTE(review): that projection is not
            aligned with the space the ensemble's k-means/classifiers were
            trained in, so scores are likely meaningless. Persist the
            training-time SVD alongside the model and pass it here.

    Returns:
        (precision@1, precision@3, macro-F1) as floats.
    """
    if svd is None:
        svd = TruncatedSVD(n_components=dim, random_state=42)
        X_test_reduced = svd.fit_transform(X_test)
    else:
        X_test_reduced = svd.transform(X_test)
    pred_scores = predict_ensemble(X_test_reduced, ensemble)
    p1 = precision_at_k(get_topk(pred_scores, k=1), Y_test, k=1)
    p3 = precision_at_k(get_topk(pred_scores, k=3), Y_test, k=3)
    # Binarize the top-5 predictions per instance for a macro-averaged F1
    # over all labels; empty prediction lists leave the row all-zero.
    y_true = Y_test.toarray()
    y_pred_bin = np.zeros_like(y_true)
    for i, pred in enumerate(get_topk(pred_scores, k=5)):
        y_pred_bin[i, pred] = 1
    macro_f1 = f1_score(y_true, y_pred_bin, average='macro', zero_division=0)
    return p1, p3, macro_f1
| # ----------------------- Load Pretrained Model ----------------------- # | |
@st.cache_resource
def load_model():
    """Load the pretrained RLF ensemble from disk.

    Decorated with st.cache_resource so the (potentially large) pickle is
    deserialized once per server process instead of on every Streamlit
    rerun, as the original did.
    """
    return joblib.load("random_label_forest_model.pkl")

ensemble_model = load_model()
# ----------------------- User Interface ----------------------- #
uploaded_file = st.file_uploader("Upload Test Dataset (.txt in RCV1 format)", type=["txt"])
if uploaded_file:
    if st.button("Test Model"):
        st.info("Parsing dataset and evaluating. Please wait...")
        # The parser needs a real filename, so spill the upload to a temp
        # file. Create it only when the button is pressed and remove it
        # after parsing — the original wrote a delete=False temp file on
        # every rerun and never deleted it (temp-file leak).
        with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp:
            tmp.write(uploaded_file.getvalue())
            test_file_path = tmp.name
        try:
            X_test, Y_test = parse_rcv1_sparse_file(test_file_path)
        finally:
            os.remove(test_file_path)
        p1, p3, macro_f1 = evaluate_rforest(ensemble_model, X_test, Y_test)
        st.success("Evaluation Completed!")
        st.metric("Precision@1", f"{p1:.4f}")
        st.metric("Precision@3", f"{p3:.4f}")
        st.metric("Macro F1 Score", f"{macro_f1:.4f}")