# Page residue (uploader avatar caption): ohmygaugh's picture
# Commit message: Add canonical view section with representative records per community
# Commit: 4560dc0
# streamlit run visualize_splink_networks_from_csv.py
import html
import io
import uuid
from io import BytesIO
from itertools import zip_longest

import jellyfish # For quick string similarity (Levenshtein, Jaro, etc.)
import numpy as np
import pandas as pd
import streamlit as st
from st_link_analysis import st_link_analysis, NodeStyle, EdgeStyle

# Try to import networkx, fall back to manual implementation if not available
try:
    import networkx as nx
    HAS_NETWORKX = True
except ImportError:
    HAS_NETWORKX = False
# ----------------------
# CONFIG
# ----------------------
# Label stamped on every node and every edge of the generated graph.
DEFAULT_NODE_LABEL = "Record"
DEFAULT_REL_TYPE = "SIMILAR"
# Starting value of the sidebar similarity slider.
DEFAULT_THRESHOLD = 0.80
# Cap on how many near-match pairs are shown in the red-lined preview.
MAX_REDLINE_PREVIEW = 10

# Page-level Streamlit settings, applied before any other element is drawn.
_PAGE_OPTIONS = {
    "page_title": "CSV ER & Network Graph",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_OPTIONS)
st.title("Entity Resolution on CSV (Network Graph)")
# ----------------------
# SIDEBAR: CSV UPLOAD
# ----------------------
st.sidebar.header("Upload CSV for Entity Resolution")
# Single-file CSV uploader; the returned object is None until a file is chosen.
uploaded_file = st.sidebar.file_uploader(
    "πŸ“ Choose a CSV file",
    type=["csv"],
    help="Drag and drop your CSV file here or click to browse",
    accept_multiple_files=False,
    key="csv_uploader",
)
# File upload status info: shown only while no file has been selected.
if uploaded_file is None:
    st.sidebar.info("πŸ‘† **Drag & drop** your CSV file above or click to browse")
    st.sidebar.markdown("**Supported formats:** `.csv` files")
    st.sidebar.markdown("**Max size:** 200MB")
# Initialize session state for data.
# Both keys are created on first use so later code can read them
# unconditionally: the loaded DataFrame and the name of its source.
if 'uploaded_data_df' not in st.session_state:
    st.session_state.uploaded_data_df = None
if 'last_uploaded_file' not in st.session_state:
    st.session_state.last_uploaded_file = None
# Enhanced file upload processing with comprehensive error handling
if uploaded_file is not None:
    # Name + size identifies the upload so the same file is not re-parsed
    # on every script run.
    file_id = f"{uploaded_file.name}_{uploaded_file.size}"
    if 'current_file_id' not in st.session_state:
        st.session_state.current_file_id = None

    # Only process if it's a new file
    if st.session_state.current_file_id != file_id:
        st.sidebar.info("πŸ”„ Processing uploaded file...")

        # Ordered read strategies; the first one that parses wins.
        # The last entry decodes as Latin-1, which accepts any byte sequence,
        # so files that are not valid UTF-8 still get a chance to load
        # (the previous final fallback repeated the UTF-8 decode and could
        # never succeed where the earlier attempts had failed).
        read_strategies = [
            ("Direct Read", lambda: pd.read_csv(uploaded_file)),
            ("Bytes Read", lambda: pd.read_csv(BytesIO(uploaded_file.getvalue()))),
            (
                "String Read",
                lambda: pd.read_csv(io.StringIO(uploaded_file.getvalue().decode("utf-8"))),
            ),
            (
                "Latin-1 Fallback",
                lambda: pd.read_csv(io.StringIO(uploaded_file.getvalue().decode("latin-1"))),
            ),
        ]

        df = None
        success_method = None
        for attempt, (method_name, read_csv) in enumerate(read_strategies, start=1):
            try:
                uploaded_file.seek(0)  # Reset file pointer before every attempt
                df = read_csv()
            except Exception as read_error:
                # Best-effort cascade: report the failure and try the next strategy.
                if attempt < len(read_strategies):
                    st.sidebar.warning(f"Method {attempt} failed: {str(read_error)[:50]}...")
                else:
                    st.sidebar.error(f"All methods failed. Last error: {str(read_error)}")
            else:
                success_method = method_name
                break

        # If successful, store the data
        if df is not None:
            st.session_state.uploaded_data_df = df
            st.session_state.last_uploaded_file = uploaded_file.name
            st.session_state.current_file_id = file_id
            st.sidebar.success("βœ… **File loaded successfully!**")
            st.sidebar.success(f"πŸ“ **{uploaded_file.name}** ({len(df)} rows, {len(df.columns)} columns)")
            st.sidebar.info(f"πŸ”§ Used method: {success_method}")
        else:
            # current_file_id is deliberately left unchanged, so the file is
            # retried on the next script run.
            st.sidebar.error("❌ **All reading methods failed!**")
            st.sidebar.info("πŸ’‘ Try the manual processing button below or paste your data")
            st.session_state.uploaded_data_df = None
    else:
        # File already processed
        if st.session_state.uploaded_data_df is not None:
            st.sidebar.success(f"βœ… **File already loaded:** {uploaded_file.name}")
            st.sidebar.info(f"πŸ“Š {len(st.session_state.uploaded_data_df)} rows, {len(st.session_state.uploaded_data_df.columns)} columns")
# Manual processing button as backup: offered only when a file is selected
# but no DataFrame is currently loaded.
if uploaded_file is not None and st.session_state.uploaded_data_df is None:
    if st.sidebar.button("πŸ”„ Process Uploaded File"):
        try:
            # An earlier read attempt may have left the stream at its end.
            uploaded_file.seek(0)
            df = pd.read_csv(uploaded_file)
        except Exception as e:
            st.sidebar.error(f"❌ Error: {str(e)}")
        else:
            st.session_state.uploaded_data_df = df
            st.session_state.last_uploaded_file = uploaded_file.name
            # Record the same name+size fingerprint the automatic path uses,
            # so the rerun below does not re-process this file and overwrite
            # the DataFrame that was just stored.
            st.session_state.current_file_id = f"{uploaded_file.name}_{uploaded_file.size}"
            st.sidebar.success("βœ… File processed manually!")
            # Kept outside the try so the rerun request can never be
            # intercepted by the error handler above.
            st.rerun()
# Show current data status
if st.session_state.uploaded_data_df is not None:
    st.sidebar.write(f"**Current Data:** {len(st.session_state.uploaded_data_df)} rows loaded")
else:
    st.sidebar.write("**Current Data:** None")

# Clear data button: drops the loaded DataFrame and its source name.
# The status line above was already rendered for this run, so it reflects
# the state before the click.
if st.sidebar.button("πŸ—‘οΈ Clear All Data"):
    st.session_state.uploaded_data_df = None
    st.session_state.last_uploaded_file = None
    st.sidebar.success("Data cleared!")
# Alternative: Paste CSV data directly
st.sidebar.markdown("---")
st.sidebar.markdown("**Alternative: Paste CSV data:**")
csv_text = st.sidebar.text_area(
    "Paste your CSV data here:",
    height=100,
    placeholder="first_name,last_name,email_address,phone_number\nJohn,Smith,john@email.com,555-0123\nJane,Doe,jane@email.com,555-0456",
)
# Parse only when the button was clicked and the text area is not blank.
if st.sidebar.button("πŸ“‹ Process Pasted Data") and csv_text.strip():
    try:
        # Wrap the pasted text in a file-like object for pandas.
        df = pd.read_csv(io.StringIO(csv_text))
    except Exception as e:
        st.sidebar.error(f"❌ **Error parsing CSV:** {str(e)}")
        st.sidebar.info("πŸ’‘ Make sure your data is in proper CSV format with headers")
    else:
        st.session_state.uploaded_data_df = df
        st.session_state.last_uploaded_file = "pasted_data"
        st.sidebar.success(f"βœ… **Pasted data loaded!** ({len(df)} rows, {len(df.columns)} columns)")
# Generate sample data option
st.sidebar.markdown("---")
st.sidebar.markdown("**Or use sample data:**")
if st.sidebar.button("Use Sample Data"):
    # Six hand-written rows forming three near-duplicate pairs, for testing.
    st.session_state.uploaded_data_df = pd.DataFrame({
        'first_name': ['John', 'Jon', 'Jane', 'Jain', 'Mike', 'Michael'],
        'last_name': ['Smith', 'Smith', 'Doe', 'Doe', 'Johnson', 'Johnson'],
        'email_address': ['john.smith@email.com', 'j.smith@gmail.com', 'jane.doe@company.com', 'jdoe@company.com', 'mike.j@work.com', 'michael.johnson@work.com'],
        'phone_number': ['555-0123', '555-0123', '555-0456', '(555) 456-0000', '555-0789', '5550789'],
    })
    st.session_state.last_uploaded_file = "sample_data"
    st.sidebar.success("Sample data loaded!")
# Minimum averaged score a pair of rows needs before an edge is created.
similarity_threshold = st.sidebar.slider(
    "Similarity Threshold", min_value=0.0, max_value=1.0, value=DEFAULT_THRESHOLD, step=0.01
)

# Column selection: a free-text, comma-separated list of CSV column names.
# The pre-filled value lists the columns written by 'create_mock_data_csv.py'.
st.sidebar.header("Similarity Columns")
default_cols = "first_name,last_name,email_address,phone_number"
similarity_cols_raw = st.sidebar.text_input("Columns to compare (comma-separated):", value=default_cols)
# Trim each entry and drop the blanks left by stray commas.
similarity_cols = [name for name in map(str.strip, similarity_cols_raw.split(",")) if name]

# Toggle for the character-level difference preview of the closest pairs.
show_redlining = st.sidebar.checkbox("Show red-lined differences for top pairs", value=True)

# Placeholders, replaced further down once data is loaded and resolved.
df = None
elements = {"nodes": [], "edges": []}
# ----------------------
# UTILITY FUNCTIONS
# ----------------------
def jaro_winkler_score(str1, str2):
    """Return the Jaro-Winkler similarity of two strings via jellyfish.

    Falsy arguments (``None`` or ``""``) are replaced by the empty string
    before being handed to ``jellyfish.jaro_winkler_similarity``.
    """
    return jellyfish.jaro_winkler_similarity(str1 or "", str2 or "")
def _comparable_text(value):
    """Return *value* as lower-cased text, or "" when the cell is missing."""
    # pandas marks absent cells with NaN/None/NA; str() would turn those into
    # the literal words "nan"/"none", which then compare as perfect matches.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    return str(value).lower()


def overall_similarity(row1, row2, cols):
    """Average the per-column string similarity of two records.

    Args:
        row1: First record; anything with a dict-style ``.get(key, default)``.
        row2: Second record, same kind of object as ``row1``.
        cols: Names of the columns to compare.

    Returns:
        The unweighted mean of ``jaro_winkler_score`` over every column where
        both records hold a non-empty, non-missing value, compared
        case-insensitively. ``0.0`` when no column is comparable.
    """
    scores = []
    for col in cols:
        val1 = _comparable_text(row1.get(col, ""))
        val2 = _comparable_text(row2.get(col, ""))
        if not val1 or not val2:
            # Nothing to compare on at least one side: leave the column out
            # of the average instead of counting it as a match or a mismatch.
            continue
        scores.append(jaro_winkler_score(val1, val2))
    if not scores:
        return 0.0
    return sum(scores) / len(scores)
def redline_text(str1, str2):
    """Render *str1* as an HTML fragment with its differences from *str2* in red.

    The comparison is positional, character by character; it is not a real
    diff algorithm. Characters of ``str1`` that equal the character at the
    same position in ``str2`` are emitted as-is; differing ones are wrapped
    in a red ``<span>``. Where ``str2`` is longer than ``str1`` a red ``_``
    marks each position ``str1`` lacks. The extra characters of ``str2``
    themselves are not shown.

    Every character taken from the input is HTML-escaped, because the result
    is meant to be rendered as raw HTML and the text comes from user data.

    Args:
        str1: The text to display.
        str2: The text to compare against.

    Returns:
        The HTML fragment as a single string ("" when both inputs are empty).
    """
    out = []
    for c1, c2 in zip_longest(str1, str2, fillvalue=""):
        if c1 == c2:
            out.append(html.escape(c1))  # same char
        else:
            # highlight mismatch; "_" stands in for a position str1 lacks
            shown = html.escape(c1) if c1 else "_"
            out.append(f"<span style='color:red'>{shown}</span>")
    return "".join(out)
def find_connected_components_manual(nodes, edges):
    """Find the connected components of an undirected graph.

    Fallback used when NetworkX is not available.

    Args:
        nodes: Iterable of hashable node ids.
        edges: Iterable of dicts shaped ``{"data": {"source": id, "target": id}}``.

    Returns:
        A list of sets of node ids, one set per component, ordered by the
        first appearance of any member in ``nodes``.

    Raises:
        KeyError: If an edge refers to an id that is not in ``nodes``.
    """
    # Build adjacency list
    adjacency = {node: set() for node in nodes}
    for edge in edges:
        source = edge["data"]["source"]
        target = edge["data"]["target"]
        adjacency[source].add(target)
        adjacency[target].add(source)

    visited = set()
    components = []
    for start in adjacency:
        if start in visited:
            continue
        # Depth-first walk with an explicit stack: a recursive walk would hit
        # Python's recursion limit on components with long chains of nodes.
        component = set()
        stack = [start]
        visited.add(start)
        while stack:
            node = stack.pop()
            component.add(node)
            for neighbor in adjacency[node]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    stack.append(neighbor)
        components.append(component)
    return components
# ----------------------
# LOAD CSV & PROCESS
# ----------------------
# Use the unified session state data
if st.session_state.uploaded_data_df is not None:
    st.markdown("### Preview of Data")
    df = st.session_state.uploaded_data_df
    st.dataframe(df.head(1000))
    st.info(f"πŸ“Š Dataset contains {len(df)} rows and {len(df.columns)} columns")

    # Provide a "Run Entity Resolution" button. Everything in this branch is
    # computed and rendered only on a script run where the button reports True.
    if st.button("Run Entity Resolution"):
        # STEP 1: Generate nodes
        # One node per row, storing all row data as properties. The row
        # Series are also kept in a list so the pairwise loop in STEP 2 does
        # not have to re-slice the DataFrame for every pair.
        nodes = []
        rows = []
        for idx, row in df.iterrows():
            rows.append(row)
            node_data = row.to_dict()
            node_data["id"] = str(idx)  # use row index as unique ID
            node_data["label"] = DEFAULT_NODE_LABEL
            # "name" is the short caption drawn on the node: first and last
            # name when present. Missing (NaN) parts are skipped so they do
            # not appear as the text "nan".
            name_parts = [
                str(part)
                for part in (row.get("first_name", ""), row.get("last_name", ""))
                if pd.notna(part)
            ]
            short_label = " ".join(name_parts).strip()
            if not short_label:
                short_label = f"Row-{idx}"
            node_data["name"] = short_label
            nodes.append({"data": node_data})

        # STEP 2: Pairwise similarity for edges
        # Naive all-pairs approach. For large data, you'd do blocking.
        # Edge endpoints are row positions; they match the node ids above as
        # long as df carries the default 0..n-1 index.
        edges = []
        row_count = len(rows)
        for i in range(row_count):
            for j in range(i + 1, row_count):
                sim = overall_similarity(rows[i], rows[j], similarity_cols)
                if sim >= similarity_threshold:
                    edge_data = {
                        "id": f"edge_{i}_{j}",
                        "source": str(i),
                        "target": str(j),
                        "label": DEFAULT_REL_TYPE,
                        "similarity": round(sim, 3),
                    }
                    edges.append({"data": edge_data})

        elements = {"nodes": nodes, "edges": edges}
        st.success("Entity Resolution complete! Network graph built.")

        # ------------
        # Visualization
        st.markdown("### Network Graph")
        node_labels = {node["data"]["label"] for node in elements["nodes"]}
        rel_labels = {edge["data"]["label"] for edge in elements["edges"]}

        # Basic styling: one colour per node label, cycling through the palette.
        default_colors = ["#2A629A", "#FF7F3E", "#C0C0C0", "#008000", "#800080"]
        node_styles = [
            NodeStyle(label=label, color=default_colors[i % len(default_colors)], caption="name")
            for i, label in enumerate(sorted(node_labels))
        ]
        edge_styles = [
            EdgeStyle(rel, caption="similarity", directed=False)
            for rel in sorted(rel_labels)
        ]
        st_link_analysis(
            elements,
            layout="cose",
            node_styles=node_styles,
            edge_styles=edge_styles
        )

        # ------------
        # Community Detection & CSV Export
        st.markdown("### Community Detection Results")

        # Find connected components (communities)
        if HAS_NETWORKX:
            # Use NetworkX if available
            G = nx.Graph()
            for node in elements["nodes"]:
                G.add_node(node["data"]["id"])
            for edge in elements["edges"]:
                G.add_edge(edge["data"]["source"], edge["data"]["target"])
            communities = list(nx.connected_components(G))
        else:
            # Use manual implementation as fallback
            st.info("NetworkX not found. Using manual connected components algorithm. Install NetworkX for better performance: `pip install networkx`")
            node_ids = [node["data"]["id"] for node in elements["nodes"]]
            communities = find_connected_components_manual(node_ids, elements["edges"])

        # Create a mapping from node_id to community_id.
        # Each community gets a fresh random UUID on every run.
        node_to_community = {}
        community_uuids = {}
        for i, community in enumerate(communities):
            community_uuid = str(uuid.uuid4())
            community_uuids[i] = community_uuid
            for node_id in community:
                node_to_community[node_id] = community_uuid

        # Add community IDs to a copy of the original dataframe; a row with
        # no community entry falls back to its own fresh UUID.
        df_with_communities = df.copy()
        df_with_communities['community_id'] = [
            node_to_community.get(str(idx), str(uuid.uuid4()))
            for idx in df_with_communities.index
        ]

        st.write(f"**Found {len(communities)} communities:**")
        for i, community in enumerate(communities):
            st.write(f"- Community {i+1}: {len(community)} records (UUID: {community_uuids[i]})")

        # Show the results dataframe
        st.markdown("#### Results with Community IDs")
        st.dataframe(df_with_communities)

        # ------------
        # Canonical View Section
        st.markdown("#### Canonical View (Representative Records by Community)")

        # Create canonical view - one representative record per community
        canonical_records = []
        for i, community in enumerate(communities):
            community_uuid = community_uuids[i]
            # Get all records in this community
            community_rows = df_with_communities[df_with_communities['community_id'] == community_uuid]
            if len(community_rows) > 0:
                # Use the first record as the representative, but could use
                # most complete, most recent, etc.
                representative = community_rows.iloc[0].copy()
                # Add community metadata
                representative['community_size'] = len(community_rows)
                representative['community_members'] = f"{len(community_rows)} records"
                # Create a summary of variations if there are multiple records
                if len(community_rows) > 1:
                    variations = []
                    for col in similarity_cols:
                        if col in community_rows.columns:
                            unique_vals = community_rows[col].dropna().unique()
                            if len(unique_vals) > 1:
                                variations.append(f"{col}: {len(unique_vals)} variants")
                    representative['variations'] = "; ".join(variations) if variations else "No variations"
                else:
                    representative['variations'] = "Single record"
                canonical_records.append(representative)

        if canonical_records:
            canonical_df = pd.DataFrame(canonical_records)

            # Select and reorder columns for canonical view.
            # Always include community info first.
            display_cols = ['community_id', 'community_members', 'variations']
            # Add similarity columns if they exist. The membership check keeps
            # a column from being listed twice when the sidebar input repeats
            # a name or names one of the community columns.
            for col in similarity_cols:
                if col in canonical_df.columns and col not in display_cols:
                    display_cols.append(col)
            # Add other important columns (avoiding duplicates)
            for col in canonical_df.columns:
                if col not in display_cols and col not in ['community_size']:
                    display_cols.append(col)
            # Filter to only existing columns
            display_cols = [col for col in display_cols if col in canonical_df.columns]

            canonical_display = canonical_df[display_cols]
            st.dataframe(canonical_display)
            st.info(f"πŸ“‹ Showing {len(canonical_display)} canonical records representing {len(communities)} communities")

            # Export canonical view option
            canonical_csv_buffer = io.StringIO()
            canonical_display.to_csv(canonical_csv_buffer, index=False)
            canonical_csv_data = canonical_csv_buffer.getvalue()
            st.download_button(
                label="πŸ“₯ Download Canonical View as CSV",
                data=canonical_csv_data,
                file_name="canonical_entity_resolution.csv",
                mime="text/csv",
                key="canonical_download"
            )
        else:
            st.warning("No canonical records to display")

        # CSV Export option
        st.markdown("#### Export Results")
        csv_buffer = io.StringIO()
        df_with_communities.to_csv(csv_buffer, index=False)
        csv_data = csv_buffer.getvalue()
        st.download_button(
            label="πŸ“₯ Download Results as CSV",
            data=csv_data,
            file_name="entity_resolution_results.csv",
            mime="text/csv"
        )

        # ------------
        # Red-lining (moved to bottom as lower priority)
        if show_redlining and len(edges) > 0:
            st.markdown("### Top Similar Pairs (Red-Lined Differences)")
            # Filter out exact matches (similarity == 1.0)
            filtered_edges = [
                edge for edge in edges if edge["data"]["similarity"] < 1.0
            ]
            # Sort by highest similarity (closest matches first)
            sorted_edges = sorted(filtered_edges, key=lambda e: e["data"]["similarity"], reverse=True)
            top_edges = sorted_edges[:MAX_REDLINE_PREVIEW]

            if not top_edges:
                st.info("No slightly different pairs found; all matches are exact or none meet the threshold.")
            else:
                for edge_item in top_edges:
                    s_idx = int(edge_item["data"]["source"])
                    t_idx = int(edge_item["data"]["target"])
                    sim_val = edge_item["data"]["similarity"]
                    st.markdown(f"**Pair:** Row {s_idx} ↔ Row {t_idx}, **similarity**={sim_val}")

                    # Highlight differences in selected columns
                    mismatch_cols = []
                    for col in similarity_cols:
                        if col not in df.columns:
                            # Named in the sidebar but absent from this CSV;
                            # the similarity score ignored it as well.
                            continue
                        val1 = str(df.loc[s_idx, col])
                        val2 = str(df.loc[t_idx, col])
                        if val1.lower() != val2.lower():
                            mismatch_cols.append((col, val1, val2))

                    if mismatch_cols:
                        st.write("Differences in the following columns:")
                        for col_name, str1, str2 in mismatch_cols:
                            redlined = redline_text(str1, str2)
                            st.markdown(f"&nbsp;&nbsp;**{col_name}:** {redlined}", unsafe_allow_html=True)
                    else:
                        st.write("No differences in the compared columns.")
                    st.markdown("---")

        # ------------
        # Enterprise Scale Note
        st.markdown("---")
        st.markdown("### πŸ“ˆ Enterprise Scale Solutions")
        if not HAS_NETWORKX:
            st.warning("""
**Missing NetworkX Dependency**
For better performance, install NetworkX:
```bash
pip install networkx
```
""")
        st.info("""
**Need help with larger scale deployments?**
If you need to persist UUIDs from run to run, handle larger datasets, or require more sophisticated
entity resolution capabilities, you may need an enterprise-scale solution. Consider:
- **Database Integration**: Store community IDs in a persistent database
- **Incremental Processing**: Handle new data without re-processing everything
- **Advanced Blocking**: Use more sophisticated blocking strategies for large datasets
- **Distributed Computing**: Scale across multiple machines for very large datasets
- **Custom ML Models**: Train domain-specific models for better accuracy
Contact **Eastridge Analytics** for guidance on enterprise implementations.
""")
else:
    st.info("Please upload a CSV file in the sidebar to begin.")