# streamlit run visualize_splink_networks_from_csv.py
import streamlit as st
import pandas as pd
import numpy as np
import jellyfish  # For quick string similarity (Levenshtein, Jaro, etc.)
import io
import uuid
from io import BytesIO
from st_link_analysis import st_link_analysis, NodeStyle, EdgeStyle

# Try to import networkx, fall back to manual implementation if not available
try:
    import networkx as nx
    HAS_NETWORKX = True
except ImportError:
    HAS_NETWORKX = False

# ----------------------
# CONFIG
# ----------------------
DEFAULT_NODE_LABEL = "Record"
DEFAULT_REL_TYPE = "SIMILAR"
DEFAULT_THRESHOLD = 0.80     # default similarity threshold
MAX_REDLINE_PREVIEW = 10     # how many top edges to preview with "red-lining"

st.set_page_config(
    page_title="CSV ER & Network Graph",
    layout="wide",
    initial_sidebar_state="expanded"
)

st.title("Entity Resolution on CSV (Network Graph)")

# ----------------------
# SIDEBAR: CSV UPLOAD
# ----------------------
st.sidebar.header("Upload CSV for Entity Resolution")
uploaded_file = st.sidebar.file_uploader(
    "📁 Choose a CSV file",
    type=["csv"],
    help="Drag and drop your CSV file here or click to browse",
    accept_multiple_files=False,
    key="csv_uploader"
)

# File upload status info
if uploaded_file is None:
    st.sidebar.info("👆 **Drag & drop** your CSV file above or click to browse")
    st.sidebar.markdown("**Supported formats:** `.csv` files")
    st.sidebar.markdown("**Max size:** 200MB")

# Initialize session state for data
if 'uploaded_data_df' not in st.session_state:
    st.session_state.uploaded_data_df = None
if 'last_uploaded_file' not in st.session_state:
    st.session_state.last_uploaded_file = None

# Enhanced file upload processing with comprehensive error handling
if uploaded_file is not None:
    # Check if this is a new file (name+size is a cheap fingerprint)
    file_id = f"{uploaded_file.name}_{uploaded_file.size}"
    if 'current_file_id' not in st.session_state:
        st.session_state.current_file_id = None

    # Only process if it's a new file
    if st.session_state.current_file_id != file_id:
        st.sidebar.info("🔄 Processing uploaded file...")

        # Try multiple methods to read the file, each one a fallback for the last
        df = None
        success_method = None

        # Method 1: Direct pandas read
        try:
            df = pd.read_csv(uploaded_file)
            success_method = "Direct Read"
        except Exception as e1:
            st.sidebar.warning(f"Method 1 failed: {str(e1)[:50]}...")
            # Method 2: Read as bytes
            try:
                uploaded_file.seek(0)  # Reset file pointer
                bytes_data = uploaded_file.getvalue()
                df = pd.read_csv(BytesIO(bytes_data))
                success_method = "Bytes Read"
            except Exception as e2:
                st.sidebar.warning(f"Method 2 failed: {str(e2)[:50]}...")
                # Method 3: Read as string
                try:
                    uploaded_file.seek(0)  # Reset file pointer
                    string_data = uploaded_file.getvalue().decode("utf-8")
                    df = pd.read_csv(io.StringIO(string_data))
                    success_method = "String Read"
                except Exception as e3:
                    st.sidebar.error(f"Method 3 failed: {str(e3)[:50]}...")
                    # Method 4: Force UTF-8 encoding
                    try:
                        uploaded_file.seek(0)
                        raw_data = uploaded_file.read()
                        if isinstance(raw_data, bytes):
                            string_data = raw_data.decode('utf-8')
                        else:
                            string_data = raw_data
                        df = pd.read_csv(io.StringIO(string_data))
                        success_method = "Force UTF-8"
                    except Exception as e4:
                        st.sidebar.error(f"All methods failed. Last error: {str(e4)}")

        # If successful, store the data
        if df is not None:
            st.session_state.uploaded_data_df = df
            st.session_state.last_uploaded_file = uploaded_file.name
            st.session_state.current_file_id = file_id
            st.sidebar.success(f"✅ **File loaded successfully!**")
            st.sidebar.success(f"📁 **{uploaded_file.name}** ({len(df)} rows, {len(df.columns)} columns)")
            st.sidebar.info(f"🔧 Used method: {success_method}")
        else:
            st.sidebar.error("❌ **All reading methods failed!**")
            st.sidebar.info("💡 Try the manual processing button below or paste your data")
            st.session_state.uploaded_data_df = None
    else:
        # File already processed
        if st.session_state.uploaded_data_df is not None:
            st.sidebar.success(f"✅ **File already loaded:** {uploaded_file.name}")
            st.sidebar.info(f"📊 {len(st.session_state.uploaded_data_df)} rows, {len(st.session_state.uploaded_data_df.columns)} columns")

# Manual processing button as backup
if uploaded_file is not None and st.session_state.uploaded_data_df is None:
    if st.sidebar.button("🔄 Process Uploaded File"):
        try:
            uploaded_file.seek(0)  # pointer may have been consumed by earlier read attempts
            df = pd.read_csv(uploaded_file)
            st.session_state.uploaded_data_df = df
            st.session_state.last_uploaded_file = uploaded_file.name
            st.sidebar.success("✅ File processed manually!")
            st.rerun()
        except Exception as e:
            st.sidebar.error(f"❌ Error: {str(e)}")

# Show current data status
if st.session_state.uploaded_data_df is not None:
    st.sidebar.write(f"**Current Data:** {len(st.session_state.uploaded_data_df)} rows loaded")
else:
    st.sidebar.write("**Current Data:** None")

# Clear data button
if st.sidebar.button("🗑️ Clear All Data"):
    st.session_state.uploaded_data_df = None
    st.session_state.last_uploaded_file = None
    st.sidebar.success("Data cleared!")

# Alternative: Paste CSV data directly
st.sidebar.markdown("---")
st.sidebar.markdown("**Alternative: Paste CSV data:**")
csv_text = st.sidebar.text_area(
    "Paste your CSV data here:",
    height=100,
    placeholder="first_name,last_name,email_address,phone_number\nJohn,Smith,john@email.com,555-0123\nJane,Doe,jane@email.com,555-0456"
)
if st.sidebar.button("📋 Process Pasted Data") and csv_text.strip():
    try:
        # Convert text to CSV using StringIO
        from io import StringIO
        df = pd.read_csv(StringIO(csv_text))
        st.session_state.uploaded_data_df = df
        st.session_state.last_uploaded_file = "pasted_data"
        st.sidebar.success(f"✅ **Pasted data loaded!** ({len(df)} rows, {len(df.columns)} columns)")
    except Exception as e:
        st.sidebar.error(f"❌ **Error parsing CSV:** {str(e)}")
        st.sidebar.info("💡 Make sure your data is in proper CSV format with headers")

# Generate sample data option
st.sidebar.markdown("---")
st.sidebar.markdown("**Or use sample data:**")
if st.sidebar.button("Use Sample Data"):
    # Create simple sample data for testing
    st.session_state.uploaded_data_df = pd.DataFrame({
        'first_name': ['John', 'Jon', 'Jane', 'Jain', 'Mike', 'Michael'],
        'last_name': ['Smith', 'Smith', 'Doe', 'Doe', 'Johnson', 'Johnson'],
        'email_address': ['john.smith@email.com', 'j.smith@gmail.com',
                          'jane.doe@company.com', 'jdoe@company.com',
                          'mike.j@work.com', 'michael.johnson@work.com'],
        'phone_number': ['555-0123', '555-0123', '555-0456',
                         '(555) 456-0000', '555-0789', '5550789']
    })
    st.session_state.last_uploaded_file = "sample_data"
    st.sidebar.success("Sample data loaded!")

similarity_threshold = st.sidebar.slider(
    "Similarity Threshold",
    min_value=0.0,
    max_value=1.0,
    value=DEFAULT_THRESHOLD,
    step=0.01
)

# Choose which columns to compare
st.sidebar.header("Similarity Columns")
# The user can list (or guess) which columns in the CSV are relevant for measuring similarity
# We'll default to common ones from 'create_mock_data_csv.py': first_name, last_name, email_address, phone_number
default_cols = "first_name,last_name,email_address,phone_number"
similarity_cols_raw = st.sidebar.text_input(
    "Columns to compare (comma-separated):",
    value=default_cols
)
similarity_cols = [c.strip() for c in similarity_cols_raw.split(",") if c.strip()]

# If the user wants to see red-lining differences
show_redlining = st.sidebar.checkbox("Show red-lined differences for top pairs", value=True)

# Data and Graph placeholders (overwritten once ER runs)
df = None
elements = {"nodes": [], "edges": []}


# ----------------------
# UTILITY FUNCTIONS
# ----------------------
def jaro_winkler_score(str1, str2):
    """Simple wrapper around jellyfish.jaro_winkler for string similarity.

    None inputs are treated as empty strings.
    """
    return jellyfish.jaro_winkler_similarity(str1 or "", str2 or "")


def overall_similarity(row1, row2, cols):
    """
    Compute an average similarity across the provided columns.
    You could weight them or do more sophisticated logic.

    Columns where either value is empty are skipped entirely; if no column
    yields a score, 0.0 is returned.
    """
    scores = []
    for col in cols:
        val1 = str(row1.get(col, "")).lower()
        val2 = str(row2.get(col, "")).lower()
        if val1 == "" or val2 == "":
            # If one is empty, skip or treat as partial
            continue
        sim = jaro_winkler_score(val1, val2)
        scores.append(sim)
    if not scores:
        return 0.0
    return sum(scores) / len(scores)


def redline_text(str1, str2):
    """
    A simplistic "red-lining" of differences:
    mismatched characters are wrapped in a red <span> so they render
    highlighted when displayed with unsafe_allow_html=True.
    """
    # Character-by-character compare: matching chars are kept plain,
    # mismatches are colored red. In practice, you might use a proper
    # diff algorithm for better results.
    out = []
    max_len = max(len(str1), len(str2))
    for i in range(max_len):
        c1 = str1[i] if i < len(str1) else ""
        c2 = str2[i] if i < len(str2) else ""
        if c1 == c2:
            out.append(c1)  # same char
        else:
            # highlight mismatch in red ('_' marks a missing char in str1)
            out.append(f"<span style='color:red'>{c1 or '_'}</span>")
    # If str2 is longer, we won't show it in the same line for now.
    # You can adapt to show side-by-side. We'll keep it simple.
    return "".join(out)


def find_connected_components_manual(nodes, edges):
    """
    Manual implementation of connected components finding.
    Fallback when NetworkX is not available.

    Uses an iterative DFS so large components cannot blow the
    Python recursion limit.
    """
    # Build adjacency list
    adj_list = {node: set() for node in nodes}
    for edge in edges:
        source = edge["data"]["source"]
        target = edge["data"]["target"]
        adj_list[source].add(target)
        adj_list[target].add(source)

    visited = set()
    components = []

    for node in nodes:
        if node in visited:
            continue
        # Iterative depth-first search from this unvisited node
        component = set()
        stack = [node]
        while stack:
            current = stack.pop()
            if current in visited:
                continue
            visited.add(current)
            component.add(current)
            stack.extend(adj_list[current] - visited)
        if component:  # Only add non-empty components
            components.append(component)

    return components


# ----------------------
# LOAD CSV & PROCESS
# ----------------------
# Use the unified session state data
if st.session_state.uploaded_data_df is not None:
    st.markdown("### Preview of Data")
    df = st.session_state.uploaded_data_df
    st.dataframe(df.head(1000))
    st.info(f"📊 Dataset contains {len(df)} rows and {len(df.columns)} columns")

    # Provide a "Run Entity Resolution" button
    if st.button("Run Entity Resolution"):
        # STEP 1: Generate nodes
        # We'll create one node per row, storing all row data as properties.
        # NOTE: node IDs are POSITIONAL indices so they always match the
        # positional i/j used when building edges below, even if the
        # dataframe has a non-default index.
        nodes = []
        for pos, (_, row) in enumerate(df.iterrows()):
            node_data = row.to_dict()
            node_data["id"] = str(pos)  # positional row number as unique ID
            node_data["label"] = DEFAULT_NODE_LABEL
            # We'll store "name" as a short label for the node,
            # e.g. first_name + last_name, falling back to the row number.
            first_name = row.get("first_name", "")
            last_name = row.get("last_name", "")
            short_label = f"{first_name} {last_name}".strip()
            if not short_label:
                short_label = f"Row-{pos}"
            node_data["name"] = short_label
            nodes.append({"data": node_data})

        # STEP 2: Pairwise similarity for edges
        # We'll do a naive all-pairs approach. For large data, you'd do blocking.
        edges = []
        for i in range(len(df)):
            for j in range(i + 1, len(df)):
                # iloc: positional access, consistent with the node IDs above
                sim = overall_similarity(df.iloc[i], df.iloc[j], similarity_cols)
                if sim >= similarity_threshold:
                    edge_data = {
                        "id": f"edge_{i}_{j}",
                        "source": str(i),
                        "target": str(j),
                        "label": DEFAULT_REL_TYPE,
                        "similarity": round(sim, 3)
                    }
                    edges.append({"data": edge_data})

        elements = {"nodes": nodes, "edges": edges}
        st.success("Entity Resolution complete! Network graph built.")

        # ------------
        # Visualization
        st.markdown("### Network Graph")
        node_labels = set(node["data"]["label"] for node in elements["nodes"])
        rel_labels = set(edge["data"]["label"] for edge in elements["edges"])

        # Basic styling
        default_colors = ["#2A629A", "#FF7F3E", "#C0C0C0", "#008000", "#800080"]
        node_styles = []
        for i, label in enumerate(sorted(node_labels)):
            color = default_colors[i % len(default_colors)]
            node_styles.append(NodeStyle(label=label, color=color, caption="name"))
        edge_styles = []
        for rel in sorted(rel_labels):
            edge_styles.append(EdgeStyle(rel, caption="similarity", directed=False))

        st_link_analysis(
            elements,
            layout="cose",
            node_styles=node_styles,
            edge_styles=edge_styles
        )

        # ------------
        # Community Detection & CSV Export
        st.markdown("### Community Detection Results")

        # Find connected components (communities)
        if HAS_NETWORKX:
            # Use NetworkX if available
            G = nx.Graph()
            for node in elements["nodes"]:
                G.add_node(node["data"]["id"])
            for edge in elements["edges"]:
                G.add_edge(edge["data"]["source"], edge["data"]["target"])
            communities = list(nx.connected_components(G))
        else:
            # Use manual implementation as fallback
            st.info("NetworkX not found. Using manual connected components algorithm. Install NetworkX for better performance: `pip install networkx`")
            node_ids = [node["data"]["id"] for node in elements["nodes"]]
            communities = find_connected_components_manual(node_ids, elements["edges"])

        # Create a mapping from node_id to community_id
        node_to_community = {}
        community_uuids = {}
        for i, community in enumerate(communities):
            community_uuid = str(uuid.uuid4())
            community_uuids[i] = community_uuid
            for node_id in community:
                node_to_community[node_id] = community_uuid

        # Add community IDs to the original dataframe.
        # Lookup is by POSITION to match the node IDs built above; the
        # uuid4 fallback only fires if a row somehow has no component.
        df_with_communities = df.copy()
        df_with_communities['community_id'] = [
            node_to_community.get(str(pos), str(uuid.uuid4()))
            for pos in range(len(df_with_communities))
        ]

        st.write(f"**Found {len(communities)} communities:**")
        for i, community in enumerate(communities):
            st.write(f"- Community {i+1}: {len(community)} records (UUID: {community_uuids[i]})")

        # Show the results dataframe
        st.markdown("#### Results with Community IDs")
        st.dataframe(df_with_communities)

        # ------------
        # Canonical View Section
        st.markdown("#### Canonical View (Representative Records by Community)")

        # Create canonical view - one representative record per community
        canonical_records = []
        for i, community in enumerate(communities):
            community_uuid = community_uuids[i]
            # Get all records in this community
            community_rows = df_with_communities[df_with_communities['community_id'] == community_uuid]
            if len(community_rows) > 0:
                # Use the first record as the representative, but could use most complete, most recent, etc.
                representative = community_rows.iloc[0].copy()
                # Add community metadata
                representative['community_size'] = len(community_rows)
                representative['community_members'] = f"{len(community_rows)} records"
                # Create a summary of variations if there are multiple records
                if len(community_rows) > 1:
                    variations = []
                    for col in similarity_cols:
                        if col in community_rows.columns:
                            unique_vals = community_rows[col].dropna().unique()
                            if len(unique_vals) > 1:
                                variations.append(f"{col}: {len(unique_vals)} variants")
                    representative['variations'] = "; ".join(variations) if variations else "No variations"
                else:
                    representative['variations'] = "Single record"
                canonical_records.append(representative)

        if canonical_records:
            canonical_df = pd.DataFrame(canonical_records)

            # Select and reorder columns for canonical view
            display_cols = []
            # Always include community info first
            display_cols.extend(['community_id', 'community_members', 'variations'])
            # Add similarity columns if they exist
            for col in similarity_cols:
                if col in canonical_df.columns:
                    display_cols.append(col)
            # Add other important columns (avoiding duplicates)
            for col in canonical_df.columns:
                if col not in display_cols and col not in ['community_size']:
                    display_cols.append(col)
            # Filter to only existing columns
            display_cols = [col for col in display_cols if col in canonical_df.columns]
            canonical_display = canonical_df[display_cols]

            st.dataframe(canonical_display)
            st.info(f"📋 Showing {len(canonical_display)} canonical records representing {len(communities)} communities")

            # Export canonical view option
            canonical_csv_buffer = io.StringIO()
            canonical_display.to_csv(canonical_csv_buffer, index=False)
            canonical_csv_data = canonical_csv_buffer.getvalue()
            st.download_button(
                label="📥 Download Canonical View as CSV",
                data=canonical_csv_data,
                file_name="canonical_entity_resolution.csv",
                mime="text/csv",
                key="canonical_download"
            )
        else:
            st.warning("No canonical records to display")

        # CSV Export option
        st.markdown("#### Export Results")
        csv_buffer = io.StringIO()
        df_with_communities.to_csv(csv_buffer, index=False)
        csv_data = csv_buffer.getvalue()
        st.download_button(
            label="📥 Download Results as CSV",
            data=csv_data,
            file_name="entity_resolution_results.csv",
            mime="text/csv"
        )

        # ------------
        # Red-lining (moved to bottom as lower priority)
        if show_redlining and len(edges) > 0:
            st.markdown("### Top Similar Pairs (Red-Lined Differences)")
            # Filter out exact matches (similarity == 1.0)
            filtered_edges = [
                edge for edge in edges
                if edge["data"]["similarity"] < 1.0
            ]
            # Sort by highest similarity (closest matches first)
            sorted_edges = sorted(filtered_edges, key=lambda e: e["data"]["similarity"], reverse=True)
            top_edges = sorted_edges[:MAX_REDLINE_PREVIEW]

            if not top_edges:
                st.info("No slightly different pairs found; all matches are exact or none meet the threshold.")
            else:
                for edge_item in top_edges:
                    s_idx = int(edge_item["data"]["source"])
                    t_idx = int(edge_item["data"]["target"])
                    sim_val = edge_item["data"]["similarity"]
                    st.markdown(f"**Pair:** Row {s_idx} ↔ Row {t_idx}, **similarity**={sim_val}")
                    # Highlight differences in selected columns
                    mismatch_cols = []
                    for col in similarity_cols:
                        # iloc: edge endpoints are positional row numbers
                        val1 = str(df.iloc[s_idx][col])
                        val2 = str(df.iloc[t_idx][col])
                        if val1.lower() != val2.lower():
                            mismatch_cols.append((col, val1, val2))
                    if mismatch_cols:
                        st.write("Differences in the following columns:")
                        for col_name, str1, str2 in mismatch_cols:
                            redlined = redline_text(str1, str2)
                            st.markdown(f"  **{col_name}:** {redlined}", unsafe_allow_html=True)
                    else:
                        st.write("No differences in the compared columns.")
                    st.markdown("---")

    # ------------
    # Enterprise Scale Note
    st.markdown("---")
    st.markdown("### 📈 Enterprise Scale Solutions")
    if not HAS_NETWORKX:
        st.warning("""
        **Missing NetworkX Dependency**
        For better performance, install NetworkX:
        ```bash
        pip install networkx
        ```
        """)
    st.info("""
    **Need help with larger scale deployments?**
    If you need to persist UUIDs from run to run, handle larger datasets, or require more sophisticated entity resolution capabilities, you may need an enterprise-scale solution. Consider:
    - **Database Integration**: Store community IDs in a persistent database
    - **Incremental Processing**: Handle new data without re-processing everything
    - **Advanced Blocking**: Use more sophisticated blocking strategies for large datasets
    - **Distributed Computing**: Scale across multiple machines for very large datasets
    - **Custom ML Models**: Train domain-specific models for better accuracy
    Contact **Eastridge Analytics** for guidance on enterprise implementations.
    """)
else:
    st.info("Please upload a CSV file in the sidebar to begin.")