# Hugging Face Spaces page residue ("Spaces: Sleeping") — status header from the
# hosting page, not part of the application code.
import os

import streamlit as st
import pandas as pd
from wordcloud import WordCloud
import matplotlib.pyplot as plt

# Seed the Streamlit session state with every key the app later reads, so a
# first render never hits a missing attribute before the user selects anything.
for _state_key in ("selected_token", "selected_task", "selected_layer"):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = None
def get_available_tasks():
    """Return the task names available under src/codebert.

    A task is any sub-directory of the base path. Returns an empty list
    (instead of raising) when the base directory does not exist, so the UI
    can still render on an incomplete checkout.
    """
    base_path = os.path.join("src", "codebert")
    try:
        entries = os.listdir(base_path)
    except FileNotFoundError:
        # Missing data directory: show no tasks rather than crash the app.
        return []
    return [d for d in entries if os.path.isdir(os.path.join(base_path, d))]
def get_available_layers(task):
    """Return the sorted layer numbers available for *task*.

    Layers are entries named ``layer<N>`` inside the task directory; entries
    whose suffix is not an integer are skipped. Returns an empty list
    (instead of raising) when the task directory does not exist, consistent
    with get_available_tasks().
    """
    task_path = os.path.join("src", "codebert", task)
    try:
        entries = os.listdir(task_path)
    except FileNotFoundError:
        return []
    layers = []
    for item in entries:
        if item.startswith("layer"):
            try:
                # replace() mirrors the original parsing: every "layer"
                # substring is stripped before the int conversion.
                layers.append(int(item.replace("layer", "")))
            except ValueError:
                continue
    return sorted(layers)
def load_predictions(task, layer):
    """Read the tab-separated predictions file for *task*/*layer*.

    Returns a DataFrame augmented with a 'predicted_cluster' column (the
    'Top 1' cluster as a string) and a human-readable 'display_text' column,
    or None when the file is missing or unreadable.
    """
    predictions_path = os.path.join(
        "src", "codebert", task, f"layer{layer}", f"predictions_layer_{layer}.csv"
    )
    if not os.path.exists(predictions_path):
        return None
    try:
        # The file is tab-delimited despite the .csv extension.
        df = pd.read_csv(predictions_path, delimiter='\t')
        # Tokens may parse as numbers; force strings for display and search.
        df['Token'] = df['Token'].astype(str)
        df['predicted_cluster'] = df['Top 1'].astype(str)
        # One display string per token occurrence.
        df['display_text'] = [
            f"{tok} (line {line}, pos {pos}, cluster {cl})"
            for tok, line, pos, cl in zip(
                df['Token'], df['line_idx'], df['position_idx'], df['predicted_cluster']
            )
        ]
        return df
    except Exception as e:
        st.error(f"Error loading predictions: {str(e)}")
        return None
def load_clusters(task, layer):
    """Parse clusters-350.txt into {cluster_id: [occurrence dicts]}.

    Each useful line carries five '|||'-separated fields:
    token ||| occurrence ||| line ||| column ||| cluster_id.
    Blank and malformed lines are silently skipped. Returns None when the
    file is missing or cannot be read at all.
    """
    clusters_path = os.path.join(
        "src", "codebert", task, f"layer{layer}", "clusters-350.txt"
    )
    if not os.path.exists(clusters_path):
        return None
    clusters = {}
    try:
        with open(clusters_path, 'r', encoding='utf-8') as f:
            for raw in f:
                raw = raw.strip()
                if not raw:  # ignore blank lines
                    continue
                try:
                    fields = [part.strip() for part in raw.split('|||')]
                    if len(fields) != 5:
                        continue
                    token, _occurrence, line_num, col_num, cluster_id = fields
                    # Drop any trailing single-pipe junk after the id.
                    cluster_id = cluster_id.split('|')[0].strip()
                    if not cluster_id.isdigit():
                        continue
                    # Canonical form: no leading zeros.
                    cluster_id = str(int(cluster_id))
                    clusters.setdefault(cluster_id, []).append({
                        'token': token,
                        'line_num': int(line_num),
                        'col_num': int(col_num),
                    })
                except Exception:
                    # Any per-line failure just skips that line.
                    continue
    except Exception as e:
        st.error(f"Error loading clusters: {str(e)}")
        return None
    return clusters
def load_dev_sentences(task, layer):
    """Return the raw lines of dev.in (test-set sentences).

    Prefers the per-layer copy, falls back to the task-level file, and
    returns an empty list when neither can be read.
    """
    task_dir = os.path.join("src", "codebert", task)
    per_layer = os.path.join(task_dir, f"layer{layer}", "dev.in")
    # Fall back to the task-level file only when the per-layer copy is absent.
    dev_path = per_layer if os.path.exists(per_layer) else os.path.join(task_dir, "dev.in")
    try:
        with open(dev_path, 'r', encoding='utf-8') as f:
            return list(f)
    except Exception:
        return []
def load_train_sentences(task, layer):
    """Return the raw lines of input.in (training-set sentences).

    Prefers the per-layer copy, falls back to the task-level file, and
    returns an empty list when neither can be read.
    """
    task_dir = os.path.join("src", "codebert", task)
    per_layer = os.path.join(task_dir, f"layer{layer}", "input.in")
    # Fall back to the task-level file only when the per-layer copy is absent.
    train_path = per_layer if os.path.exists(per_layer) else os.path.join(task_dir, "input.in")
    try:
        with open(train_path, 'r', encoding='utf-8') as f:
            return list(f)
    except Exception:
        return []
def is_cls_token(token):
    """Return True for '[CLS]' markers, including numbered ones like '[CLS]0'."""
    # Slice comparison is equivalent to startswith('[CLS]') for strings.
    return token[:5] == '[CLS]'
def create_wordcloud(tokens_with_freq):
    """Render a WordCloud over the given tokens.

    The incoming frequencies are deliberately flattened to 1 so every token
    is drawn at the same size. Returns None for empty input or on any
    rendering error (which is surfaced via st.error).
    """
    if not tokens_with_freq:
        return None
    try:
        # Uniform weight of 1 per token => uniform font size.
        flat_freqs = dict.fromkeys(tokens_with_freq, 1)
        return WordCloud(
            width=800,
            height=400,
            background_color='#f9f9f9',  # near-white backdrop
            prefer_horizontal=1,         # horizontal text only
            relative_scaling=0,          # decouple size from frequency
            min_font_size=35,            # keep text readable
            max_font_size=150,
        ).generate_from_frequencies(flat_freqs)
    except Exception as e:
        st.error(f"Error creating wordcloud: {str(e)}")
        return None
def load_explanation_words(task, layer):
    """Load per-occurrence labels keyed by (token, line_idx, position_idx).

    Reads the tab-separated explanation_words file for the layer. On any
    failure the error is surfaced via st.error and {} is returned.
    """
    file_path = os.path.join(
        "src", "codebert", task, f"layer{layer}", f"explanation_words_layer{layer}.csv"
    )
    try:
        df = pd.read_csv(file_path, sep='\t')
        return {
            (row['token'], row['line_idx'], row['position_idx']): row['labels']
            for _, row in df.iterrows()
        }
    except Exception as e:
        st.error(f"Error loading explanation words: {str(e)}")
        return {}
def main():
    """Streamlit entry point.

    Flow: pick a task and layer, search/select a token occurrence from the
    predictions, then inspect it — its predicted cluster, optional label,
    original test-set context, a cluster word cloud, and similar training-set
    contexts from the same cluster.
    """
    st.title("Token Analysis")
    # Task and Layer Selection
    col1, col2 = st.columns(2)
    with col1:
        available_tasks = get_available_tasks()
        selected_task = st.selectbox(
            "Select Task",
            available_tasks,
            key='task_selector'
        )
    with col2:
        # Layers depend on the chosen task, so only offer them once a task exists.
        if selected_task:
            available_layers = get_available_layers(selected_task)
            selected_layer = st.selectbox(
                "Select Layer",
                available_layers,
                key='layer_selector'
            )
        else:
            selected_layer = None
    # Only proceed if both task and layer are selected
    if selected_task and selected_layer is not None:
        predictions_df = load_predictions(selected_task, selected_layer)
        clusters = load_clusters(selected_task, selected_layer)
        dev_sentences = load_dev_sentences(selected_task, selected_layer)  # Test set sentences
        train_sentences = load_train_sentences(selected_task, selected_layer)  # Training set sentences
        token_labels = load_explanation_words(selected_task, selected_layer)  # Load token labels
        if predictions_df is not None and clusters is not None:
            # Token selection with search
            search_token = st.text_input("Search tokens", key='token_search')
            # Filter display options based on search (case-insensitive substring match)
            filtered_df = predictions_df
            if search_token:
                filtered_df = predictions_df[predictions_df['Token'].str.contains(search_token, case=False, na=False)]
            # Display token selection
            selected_token_display = st.selectbox(
                "Select a token occurrence",
                filtered_df['display_text'].tolist(),
                key='token_selector'
            )
            if selected_token_display:
                # Get the selected row from the dataframe
                # (display_text strings encode line/pos/cluster, so they identify one occurrence)
                selected_row = filtered_df[filtered_df['display_text'] == selected_token_display].iloc[0]
                token = selected_row['Token']
                line_idx = selected_row['line_idx']
                position_idx = selected_row['position_idx']
                # Get the label for the selected token
                token_key = (token, line_idx, position_idx)
                # Display token information
                st.header(f"Token: {token}")
                # NOTE(review): the leading "๐" below looks like a mojibake'd emoji — confirm intended glyph
                st.write(f"๐ Line: {selected_row['line_idx']}, Position: {selected_row['position_idx']}")
                st.metric("Predicted Cluster", selected_row['predicted_cluster'])
                if token_key in token_labels:
                    st.write(f"**Label:** {token_labels[token_key]}")
                # Show original context from dev.in (test set)
                if dev_sentences and selected_row['line_idx'] < len(dev_sentences):
                    st.subheader("Original Context (from test set)")
                    st.code(dev_sentences[selected_row['line_idx']].strip())
                # Show wordcloud for the cluster (from training set)
                if clusters and selected_row['predicted_cluster'] in clusters:
                    token_frequencies = {}
                    # NOTE(review): this loop rebinds `token` (the selected token above);
                    # harmless today because `token` is not read again afterwards, but fragile.
                    for token_info in clusters[selected_row['predicted_cluster']]:
                        token = token_info['token']
                        token_frequencies[token] = 1  # Set all frequencies to 1 for uniform size
                    if token_frequencies:
                        st.subheader("Cluster Word Cloud (from training set)")
                        wordcloud = create_wordcloud(token_frequencies)
                        if wordcloud:
                            plt.figure(figsize=(10, 5))
                            plt.imshow(wordcloud, interpolation='bilinear')
                            plt.axis('off')
                            st.pyplot(plt)
                # Show similar contexts from the cluster (from training set)
                # NOTE(review): the "๐" in the expander label is likely a garbled emoji — confirm intended glyph
                with st.expander(f"๐ View Similar Contexts (from training set, Cluster {selected_row['predicted_cluster']})"):
                    if clusters and selected_row['predicted_cluster'] in clusters:
                        shown_contexts = set()
                        for token_info in clusters[selected_row['predicted_cluster']]:
                            line_num = token_info['line_num']
                            # Only show contexts whose training line actually exists; dedupe repeats.
                            if line_num >= 0 and line_num < len(train_sentences):
                                context = train_sentences[line_num].strip()
                                if context not in shown_contexts:
                                    st.code(context)
                                    shown_contexts.add(context)
                        if not shown_contexts:
                            st.info("No similar contexts found in this cluster from the training set.")
# Script entry point (Streamlit executes this file top-to-bottom as __main__).
if __name__ == "__main__":
    main()