"""Streamlit app for exploring per-token cluster predictions of a CodeBERT probe.

Expects a directory layout of src/codebert/<task>/layer<N>/ containing
predictions, cluster assignments, and the train/dev sentence files.
"""

import streamlit as st
import pandas as pd
import os
from wordcloud import WordCloud
import matplotlib.pyplot as plt

# Initialize session state if needed
if 'selected_token' not in st.session_state:
    st.session_state.selected_token = None
if 'selected_task' not in st.session_state:
    st.session_state.selected_task = None
if 'selected_layer' not in st.session_state:
    st.session_state.selected_layer = None


def get_available_tasks():
    """Get list of available tasks based on directory structure.

    Returns:
        list[str]: subdirectory names under src/codebert, or [] if the
        base directory does not exist (previously this raised).
    """
    base_path = os.path.join("src", "codebert")
    # Guard: os.listdir raises FileNotFoundError on a missing directory.
    if not os.path.isdir(base_path):
        return []
    return [d for d in os.listdir(base_path)
            if os.path.isdir(os.path.join(base_path, d))]


def get_available_layers(task):
    """Get list of available layers for a task.

    Scans src/codebert/<task> for directories named ``layer<N>`` and
    returns the sorted layer numbers. Non-numeric suffixes are skipped.
    """
    task_path = os.path.join("src", "codebert", task)
    layers = []
    for item in os.listdir(task_path):
        if item.startswith("layer"):
            try:
                layer_num = int(item.replace("layer", ""))
                layers.append(layer_num)
            except ValueError:
                # e.g. "layer_old" or similar — not a numbered layer dir.
                continue
    return sorted(layers)


def load_predictions(task, layer):
    """Load predictions from CSV file.

    Reads the tab-delimited predictions file for the given task/layer and
    adds two derived columns:
      - ``predicted_cluster``: the "Top 1" cluster id as a string
      - ``display_text``: human-readable label for the selectbox

    Returns:
        pandas.DataFrame or None if the file is missing or unreadable.
    """
    predictions_path = os.path.join(
        "src", "codebert", task, f"layer{layer}",
        f"predictions_layer_{layer}.csv"
    )
    if os.path.exists(predictions_path):
        try:
            # Read CSV with tab delimiter
            df = pd.read_csv(predictions_path, delimiter='\t')
            # Convert Token column to string to handle numeric tokens
            df['Token'] = df['Token'].astype(str)
            # Get the primary predicted cluster (Top 1)
            df['predicted_cluster'] = df['Top 1'].astype(str)
            # Create display strings for each token occurrence
            df['display_text'] = df.apply(
                lambda row: f"{row['Token']} (line {row['line_idx']}, pos {row['position_idx']}, cluster {row['predicted_cluster']})",
                axis=1
            )
            return df
        except Exception as e:
            st.error(f"Error loading predictions: {str(e)}")
            return None
    return None


def load_clusters(task, layer):
    """Load cluster data from clusters file.

    Parses ``clusters-350.txt`` where each line has five ``|||``-separated
    fields: token ||| occurrence ||| line ||| column ||| cluster_id.

    Returns:
        dict[str, list[dict]] mapping normalized cluster id -> list of
        {'token', 'line_num', 'col_num'} entries, or None on error /
        missing file.
    """
    clusters_path = os.path.join(
        "src", "codebert", task, f"layer{layer}", "clusters-350.txt"
    )
    if not os.path.exists(clusters_path):
        return None
    clusters = {}
    try:
        with open(clusters_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    # Skip empty lines
                    continue
                try:
                    # Split on ||| and get the parts
                    parts = [p.strip() for p in line.split('|||')]
                    if len(parts) == 5:
                        token, occurrence, line_num, col_num, cluster_id = parts
                        # Clean up cluster_id (remove any trailing pipes)
                        cluster_id = cluster_id.split('|')[0].strip()
                        if not cluster_id.isdigit():
                            # Skip if cluster_id is not a valid number
                            continue
                        # Normalize cluster ID (e.g. "007" -> "7")
                        cluster_id = str(int(cluster_id))
                        # Store in clusters dict
                        if cluster_id not in clusters:
                            clusters[cluster_id] = []
                        clusters[cluster_id].append({
                            'token': token,
                            'line_num': int(line_num),
                            'col_num': int(col_num)
                        })
                except Exception:
                    # Best-effort parse: skip malformed lines silently.
                    continue
    except Exception as e:
        st.error(f"Error loading clusters: {str(e)}")
        return None
    return clusters


def load_dev_sentences(task, layer):
    """Load sentences from dev.in file.

    Prefers the layer-specific dev.in; falls back to the task-level one.
    Returns a list of raw lines (possibly empty on any read error).
    """
    dev_path = os.path.join("src", "codebert", task, f"layer{layer}", "dev.in")
    if not os.path.exists(dev_path):
        dev_path = os.path.join("src", "codebert", task, "dev.in")
    try:
        with open(dev_path, 'r', encoding='utf-8') as f:
            return f.readlines()
    except Exception:
        # Best-effort: missing/unreadable file means no contexts to show.
        return []


def load_train_sentences(task, layer):
    """Load sentences from input.in (training set) file.

    Prefers the layer-specific input.in; falls back to the task-level one.
    Returns a list of raw lines (possibly empty on any read error).
    """
    train_path = os.path.join(
        "src", "codebert", task, f"layer{layer}", "input.in"
    )
    if not os.path.exists(train_path):
        train_path = os.path.join("src", "codebert", task, "input.in")
    try:
        with open(train_path, 'r', encoding='utf-8') as f:
            return f.readlines()
    except Exception:
        return []


def is_cls_token(token):
    """Check if token is a CLS token (including numbered ones like [CLS]0)."""
    return token.startswith('[CLS]')


def create_wordcloud(tokens_with_freq):
    """Create wordcloud from tokens with their frequencies.

    All frequencies are deliberately flattened to 1 so every token renders
    at a uniform size; the input dict's values are ignored.

    Returns:
        WordCloud instance, or None if input is empty or generation fails.
    """
    if not tokens_with_freq:
        return None
    try:
        # Set all frequencies to 1 to make all words the same size
        uniform_frequencies = {token: 1 for token in tokens_with_freq.keys()}
        wordcloud = WordCloud(
            width=800,
            height=400,
            background_color='#f9f9f9',  # Very light gray, almost white
            prefer_horizontal=1,         # All text horizontal
            relative_scaling=0,          # This ensures uniform sizing
            min_font_size=35,            # Ensure text is readable
            max_font_size=150,           # Cap so sizes stay near-uniform
        ).generate_from_frequencies(uniform_frequencies)
        return wordcloud
    except Exception as e:
        st.error(f"Error creating wordcloud: {str(e)}")
        return None


def load_explanation_words(task, layer):
    """Load explanation words file with labels.

    Returns:
        dict mapping (token, line_idx, position_idx) -> label string;
        empty dict on any error.
    """
    file_path = os.path.join(
        "src", "codebert", task, f"layer{layer}",
        f"explanation_words_layer{layer}.csv"
    )
    try:
        df = pd.read_csv(file_path, sep='\t')
        # Create a dictionary mapping (token, line_idx, position_idx) to label
        token_to_label = {}
        for _, row in df.iterrows():
            key = (row['token'], row['line_idx'], row['position_idx'])
            token_to_label[key] = row['labels']
        return token_to_label
    except Exception as e:
        st.error(f"Error loading explanation words: {str(e)}")
        return {}


def main():
    """Render the token-analysis UI: pick a task/layer, search a token,
    then show its cluster, wordcloud, and similar training contexts."""
    st.title("Token Analysis")

    # Task and Layer Selection
    col1, col2 = st.columns(2)
    with col1:
        available_tasks = get_available_tasks()
        selected_task = st.selectbox(
            "Select Task",
            available_tasks,
            key='task_selector'
        )
    with col2:
        if selected_task:
            available_layers = get_available_layers(selected_task)
            selected_layer = st.selectbox(
                "Select Layer",
                available_layers,
                key='layer_selector'
            )
        else:
            selected_layer = None

    # Only proceed if both task and layer are selected
    if selected_task and selected_layer is not None:
        predictions_df = load_predictions(selected_task, selected_layer)
        clusters = load_clusters(selected_task, selected_layer)
        dev_sentences = load_dev_sentences(selected_task, selected_layer)      # Test set sentences
        train_sentences = load_train_sentences(selected_task, selected_layer)  # Training set sentences
        token_labels = load_explanation_words(selected_task, selected_layer)   # Load token labels

        if predictions_df is not None and clusters is not None:
            # Token selection with search
            search_token = st.text_input("Search tokens", key='token_search')

            # Filter display options based on search
            filtered_df = predictions_df
            if search_token:
                filtered_df = predictions_df[
                    predictions_df['Token'].str.contains(
                        search_token, case=False, na=False
                    )
                ]

            # Display token selection
            selected_token_display = st.selectbox(
                "Select a token occurrence",
                filtered_df['display_text'].tolist(),
                key='token_selector'
            )

            if selected_token_display:
                # Get the selected row from the dataframe
                selected_row = filtered_df[
                    filtered_df['display_text'] == selected_token_display
                ].iloc[0]
                token = selected_row['Token']
                line_idx = selected_row['line_idx']
                position_idx = selected_row['position_idx']

                # Get the label for the selected token
                token_key = (token, line_idx, position_idx)

                # Display token information
                st.header(f"Token: {token}")
                st.write(f"📍 Line: {selected_row['line_idx']}, Position: {selected_row['position_idx']}")
                st.metric("Predicted Cluster", selected_row['predicted_cluster'])
                if token_key in token_labels:
                    st.write(f"**Label:** {token_labels[token_key]}")

                # Show original context from dev.in (test set)
                if dev_sentences and selected_row['line_idx'] < len(dev_sentences):
                    st.subheader("Original Context (from test set)")
                    st.code(dev_sentences[selected_row['line_idx']].strip())

                # Show wordcloud for the cluster (from training set)
                if clusters and selected_row['predicted_cluster'] in clusters:
                    # BUGFIX: the original loop reused the name `token`,
                    # clobbering the selected token above; use a comprehension
                    # with a distinct name instead.
                    token_frequencies = {
                        info['token']: 1  # uniform frequency for uniform size
                        for info in clusters[selected_row['predicted_cluster']]
                    }
                    if token_frequencies:
                        st.subheader("Cluster Word Cloud (from training set)")
                        wordcloud = create_wordcloud(token_frequencies)
                        if wordcloud:
                            # Use an explicit figure (st.pyplot(plt) is
                            # deprecated) and close it to avoid leaking
                            # figures across reruns.
                            fig, ax = plt.subplots(figsize=(10, 5))
                            ax.imshow(wordcloud, interpolation='bilinear')
                            ax.axis('off')
                            st.pyplot(fig)
                            plt.close(fig)

                # Show similar contexts from the cluster (from training set)
                with st.expander(f"👀 View Similar Contexts (from training set, Cluster {selected_row['predicted_cluster']})"):
                    if clusters and selected_row['predicted_cluster'] in clusters:
                        shown_contexts = set()
                        for token_info in clusters[selected_row['predicted_cluster']]:
                            line_num = token_info['line_num']
                            if line_num >= 0 and line_num < len(train_sentences):
                                context = train_sentences[line_num].strip()
                                # De-duplicate identical contexts.
                                if context not in shown_contexts:
                                    st.code(context)
                                    shown_contexts.add(context)
                        if not shown_contexts:
                            st.info("No similar contexts found in this cluster from the training set.")


if __name__ == "__main__":
    main()