# prernajeet01's picture
# Update app.py
# 255aa7a verified
import gradio as gr
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from datetime import datetime, timedelta
import random
import string
import re
def generate_sample_data(n_records=50, seed=None):
    """Generate sample payment data for demonstration.

    Every 8th record (except the first) is turned into a near-duplicate of
    the record generated just before it: same vendor and description, with
    the amount shifted by up to +/-10. Its date and invoice number stay
    freshly random.

    Args:
        n_records: Number of payment rows to generate. Defaults to 50.
        seed: Optional seed for reproducible output. When ``None`` the
            module-level ``random`` generator is used.

    Returns:
        A pandas DataFrame with the columns ``Vendor``, ``Amount``, ``Date``
        (``YYYY-MM-DD`` strings), ``Invoice`` and ``Description``.
    """
    rng = random if seed is None else random.Random(seed)

    # 'ABC Corp' and 'XYZ Ltd' appear twice, so they are drawn twice as often.
    vendors = ['ABC Corp', 'XYZ Ltd', 'Tech Solutions', 'Global Services', 'ABC Corp', 'XYZ Ltd']
    descriptions = ['Software License', 'Consulting Services', 'Hardware Purchase', 'Monthly Subscription']
    columns = ['Vendor', 'Amount', 'Date', 'Invoice', 'Description']

    data = []
    base_date = datetime.now() - timedelta(days=30)
    for i in range(n_records):
        vendor = rng.choice(vendors)
        amount = round(rng.uniform(100, 5000), 2)
        date = base_date + timedelta(days=rng.randint(0, 30))
        invoice = f"INV-{rng.randint(1000, 9999)}"
        description = rng.choice(descriptions)

        # Create some intentional duplicates
        if i % 8 == 0 and i > 0:  # Every 8th record, create a near-duplicate
            prev_record = data[-1]
            vendor = prev_record['Vendor']
            # Round so the varied amount stays a valid 2-decimal currency value
            amount = round(prev_record['Amount'] + rng.uniform(-10, 10), 2)
            description = prev_record['Description']

        data.append({
            'Vendor': vendor,
            'Amount': amount,
            'Date': date.strftime('%Y-%m-%d'),
            'Invoice': invoice,
            'Description': description,
        })

    # Passing the column list keeps the schema even when n_records is 0.
    return pd.DataFrame(data, columns=columns)
def preprocess_data(df):
    """Build the numeric feature matrix used for K-means clustering.

    Side effect: the ``Date`` column of ``df`` is converted to datetimes
    in place.

    Args:
        df: DataFrame with ``Amount``, ``Date``, ``Vendor`` and
            ``Description`` columns.

    Returns:
        A 2-D array with one row per payment: the standardized amount, the
        standardized number of days since the earliest date, and up to 10
        TF-IDF columns built from the vendor and description text.
    """
    def _standardize(column):
        # Scalers expect a 2-D (n_samples, 1) array, hence the reshape.
        return StandardScaler().fit_transform(column.values.reshape(-1, 1))

    # Amount feature
    scaled_amounts = _standardize(df['Amount'])

    # Date feature: whole days elapsed since the earliest payment
    df['Date'] = pd.to_datetime(df['Date'])
    day_offsets = (df['Date'] - df['Date'].min()).dt.days
    scaled_days = _standardize(day_offsets)

    # Text feature: vendor and description joined into one string per row
    combined_text = df['Vendor'].astype(str) + ' ' + df['Description'].astype(str)
    tfidf = TfidfVectorizer(max_features=10, stop_words='english')
    text_matrix = tfidf.fit_transform(combined_text).toarray()

    # Column order: amount, date, then the text columns
    return np.hstack([scaled_amounts, scaled_days, text_matrix])
def _pair_similarity(row1, row2):
    """Return the weighted 0..1 similarity score for two payment rows."""
    # Values are compared as text so a missing (NaN) cell cannot crash .lower()
    vendor_match = 1 if str(row1['Vendor']).lower() == str(row2['Vendor']).lower() else 0

    # Relative difference against the larger magnitude; using abs() keeps the
    # ratio meaningful for negative amounts, and two zero amounts are identical.
    amount_diff = abs(row1['Amount'] - row2['Amount'])
    largest = max(abs(row1['Amount']), abs(row2['Amount']))
    amount_similarity = max(0, 1 - amount_diff / largest) if largest else 1.0

    # Jaccard overlap of the description words; no words at all means no evidence
    tokens1 = set(str(row1['Description']).lower().split())
    tokens2 = set(str(row2['Description']).lower().split())
    all_tokens = tokens1 | tokens2
    desc_similarity = len(tokens1 & tokens2) / len(all_tokens) if all_tokens else 0.0

    return vendor_match * 0.4 + amount_similarity * 0.4 + desc_similarity * 0.2


def _format_date(value):
    """Format date-like values as YYYY-MM-DD; pass anything else through."""
    return value.strftime('%Y-%m-%d') if hasattr(value, 'strftime') else value


def detect_duplicates(df, n_clusters=5, similarity_threshold=0.5):
    """Detect potential duplicate payments using K-means clustering.

    Payments are clustered on the features from ``preprocess_data``; every
    pair of rows sharing a cluster is then scored on vendor match (40%),
    amount closeness (40%) and description word overlap (20%).

    Args:
        df: DataFrame with ``Vendor``, ``Amount``, ``Date``, ``Invoice`` and
            ``Description`` columns.
        n_clusters: Requested number of clusters; clamped to the range
            ``1..len(df)``.
        similarity_threshold: Pairs scoring strictly above this value (on the
            0..1 scale) are reported. Defaults to 0.5.

    Returns:
        A ``(DataFrame, message)`` tuple. The DataFrame has one row per
        reported pair, sorted by ``Similarity_Score`` (a percentage)
        descending; it is empty when nothing is found, when there are fewer
        than two rows, or when the analysis fails. Errors are reported in
        the message instead of being raised.
    """
    from itertools import combinations

    if len(df) < 2:
        # Return an empty result like every other branch, not the raw input
        return pd.DataFrame(), "Not enough data for analysis"

    try:
        # Preprocess data
        features = preprocess_data(df)

        # Apply K-means clustering; cast to int because UI sliders may pass floats
        n_clusters = max(1, min(int(n_clusters), len(df)))
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        clusters = kmeans.fit_predict(features)

        # Add cluster information to dataframe
        df_result = df.copy()
        df_result['Cluster'] = clusters

        # Find potential duplicates (clusters with multiple entries)
        duplicate_pairs = []
        cluster_counts = pd.Series(clusters).value_counts()
        for cluster_id in cluster_counts[cluster_counts > 1].index:
            members = list(df_result[df_result['Cluster'] == cluster_id].iterrows())

            # combinations() yields each unordered pair exactly once
            for (i, row1), (j, row2) in combinations(members, 2):
                similarity_score = _pair_similarity(row1, row2)
                if similarity_score > similarity_threshold:
                    duplicate_pairs.append({
                        'Index_1': i,
                        'Index_2': j,
                        'Vendor_1': row1['Vendor'],
                        'Vendor_2': row2['Vendor'],
                        'Amount_1': row1['Amount'],
                        'Amount_2': row2['Amount'],
                        'Date_1': _format_date(row1['Date']),
                        'Date_2': _format_date(row2['Date']),
                        'Invoice_1': row1['Invoice'],
                        'Invoice_2': row2['Invoice'],
                        'Description_1': row1['Description'],
                        'Description_2': row2['Description'],
                        'Similarity_Score': round(similarity_score * 100, 2),
                        'Cluster': cluster_id,
                    })

        if not duplicate_pairs:
            return pd.DataFrame(), "No potential duplicates found"

        duplicate_df = pd.DataFrame(duplicate_pairs)
        duplicate_df = duplicate_df.sort_values('Similarity_Score', ascending=False)
        return duplicate_df, f"Found {len(duplicate_pairs)} potential duplicate pairs"

    except Exception as e:
        # UI boundary: surface the failure as a status message instead of raising
        return pd.DataFrame(), f"Error in analysis: {str(e)}"
def analyze_payments(file_input, n_clusters):
    """Load an uploaded payment file and run duplicate detection on it.

    Args:
        file_input: Either a path string or an object whose ``name``
            attribute holds the path of the uploaded file. ``None`` means
            nothing was uploaded.
        n_clusters: Number of clusters passed on to ``detect_duplicates``.

    Returns:
        A ``(DataFrame, message)`` tuple. The DataFrame is empty whenever the
        file is missing, has an unsupported extension, lacks required
        columns, or cannot be processed; the message explains why.
    """
    try:
        if file_input is None:
            return pd.DataFrame(), "Please upload a file or load sample data"

        # The upload component in this app uses type="filepath", which hands
        # over a plain string; file-like wrappers expose the path as .name.
        file_path = file_input if isinstance(file_input, str) else file_input.name

        # Read the uploaded file (extension check is case-insensitive)
        lowered_path = file_path.lower()
        if lowered_path.endswith('.csv'):
            df = pd.read_csv(file_path)
        elif lowered_path.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(file_path)
        else:
            return pd.DataFrame(), "Please upload a CSV or Excel file"

        # Check required columns
        required_columns = ['Vendor', 'Amount', 'Date', 'Invoice', 'Description']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            return pd.DataFrame(), f"Missing required columns: {', '.join(missing_columns)}"

        # Perform duplicate detection
        result_df, message = detect_duplicates(df, n_clusters)
        return result_df, message

    except Exception as e:
        # UI boundary: report the failure as a status message instead of raising
        return pd.DataFrame(), f"Error processing file: {str(e)}"
def load_sample():
    """Write freshly generated sample payments to disk and return the path.

    The data is saved as ``sample_data.csv`` relative to the current working
    directory, overwriting any existing file of that name.
    """
    output_path = "sample_data.csv"
    generate_sample_data().to_csv(output_path, index=False)
    return output_path
# Create Gradio interface: controls in the left column, results in the right
with gr.Blocks(theme=gr.themes.Soft(), title="Vendor Duplicate Analyzer") as app:
    # Header
    gr.HTML("""
    <div style="text-align: center; padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 10px; margin-bottom: 20px;">
    <h1 style="color: white; margin: 0; font-size: 2.5em;">🔍 Vendor Duplicate Analyzer</h1>
    <p style="color: white; margin: 10px 0 0 0; font-size: 1.2em;">Using K-means Clustering for Duplicate Detection</p>
    </div>
    """)

    with gr.Row():
        with gr.Column(scale=1):
            gr.HTML("<h3>📤 Upload Data</h3>")
            # type="filepath" means handlers receive the upload as a path string
            file_input = gr.File(
                label="Upload Payment CSV/Excel",
                file_types=[".csv", ".xlsx", ".xls"],
                type="filepath"
            )
            gr.HTML("""
            <div style="margin: 15px 0; padding: 15px; background-color: #f8f9fa; border-radius: 8px; border-left: 4px solid #17a2b8;">
            <strong>📋 CSV Format Required</strong><br>
            Your CSV must have columns: <strong>Vendor, Amount, Date, Invoice, Description</strong>
            </div>
            """)
            with gr.Row():
                sample_btn = gr.Button("📊 Load Sample Data", variant="secondary")
                analyze_btn = gr.Button("🔍 Analyze with K-means", variant="primary")

            gr.HTML("<h3>⚙️ Parameters</h3>")
            n_clusters = gr.Slider(
                minimum=2,
                maximum=10,
                value=5,
                step=1,
                label="Number of Clusters",
                info="K-means will group similar payments into this many clusters"
            )
            gr.HTML("""
            <div style="margin-top: 20px; padding: 15px; background-color: #e8f4f8; border-radius: 8px;">
            <strong>🤖 How K-means Works Here</strong>
            <ol style="margin: 10px 0 0 20px;">
            <li>Extracts numerical features from payment records</li>
            <li>Groups similar payments into clusters</li>
            <li>Compares payments within each cluster</li>
            <li>Calculates similarity scores for potential duplicates</li>
            </ol>
            </div>
            """)

        with gr.Column(scale=2):
            gr.HTML("<h3>📊 Results</h3>")
            status_message = gr.Textbox(
                label="Analysis Status",
                interactive=False,
                placeholder="Upload data and click 'Analyze' to begin..."
            )
            results_table = gr.Dataframe(
                label="Potential Duplicate Pairs Found by K-means",
                interactive=False,
                wrap=True,
                column_widths=["10%", "15%", "15%", "10%", "10%", "10%", "10%", "10%", "10%"]
            )

    # Event handlers
    def handle_sample_load():
        """Generate the sample CSV and place it in the upload slot."""
        sample_file = load_sample()
        return sample_file, "Sample data loaded successfully! Click 'Analyze' to detect duplicates."

    def handle_analysis(uploaded_file, cluster_count):
        """Run the analysis and return (status message, results table)."""
        if uploaded_file is None:
            return "Please upload a file first!", pd.DataFrame()
        # Cast defensively: the slider value may be delivered as a float
        result_df, message = analyze_payments(uploaded_file, int(cluster_count))
        return message, result_df

    sample_btn.click(
        fn=handle_sample_load,
        outputs=[file_input, status_message]
    )
    analyze_btn.click(
        fn=handle_analysis,
        inputs=[file_input, n_clusters],
        outputs=[status_message, results_table]
    )

# Launch the app
if __name__ == "__main__":
    app.launch(share=True)