Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.cluster import KMeans | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from datetime import datetime, timedelta | |
| import random | |
| import string | |
| import re | |
def generate_sample_data():
    """Generate sample payment data for demonstration.

    Builds 50 random payment records dated within the 30 days before now.
    Every 8th record (except the very first) is turned into a near-duplicate
    of the record generated just before it: same vendor and description, an
    amount shifted by up to +/-10, but its own date and invoice number.

    Returns:
        pandas.DataFrame with the columns Vendor, Amount, Date (a
        ``YYYY-MM-DD`` string), Invoice and Description.
    """
    # 'ABC Corp' and 'XYZ Ltd' appear twice, so random.choice draws them
    # twice as often as the other vendors.
    vendors = ['ABC Corp', 'XYZ Ltd', 'Tech Solutions', 'Global Services', 'ABC Corp', 'XYZ Ltd']
    descriptions = ['Software License', 'Consulting Services', 'Hardware Purchase', 'Monthly Subscription']

    data = []
    base_date = datetime.now() - timedelta(days=30)

    for i in range(50):
        vendor = random.choice(vendors)
        amount = round(random.uniform(100, 5000), 2)
        date = base_date + timedelta(days=random.randint(0, 30))
        invoice = f"INV-{random.randint(1000, 9999)}"
        description = random.choice(descriptions)

        # Create some intentional duplicates
        if i % 8 == 0 and i > 0:  # Every 8th record, create a near-duplicate
            prev_record = data[-1]
            vendor = prev_record['Vendor']
            # Round after perturbing so the amount stays a 2-decimal value
            # like every other generated row.
            amount = round(prev_record['Amount'] + random.uniform(-10, 10), 2)
            description = prev_record['Description']

        data.append({
            'Vendor': vendor,
            'Amount': amount,
            'Date': date.strftime('%Y-%m-%d'),
            'Invoice': invoice,
            'Description': description,
        })

    return pd.DataFrame(data)
def preprocess_data(df):
    """Build the numeric feature matrix that K-means clusters on.

    The columns of the result are, in order: the standardized ``Amount``,
    the standardized number of days between each ``Date`` and the earliest
    date in ``df``, and up to 10 TF-IDF columns computed over the text
    "<Vendor> <Description>" (English stop words removed).

    Note: ``df['Date']`` is overwritten in place with its ``pd.to_datetime``
    conversion, so the caller's frame is modified.

    Returns:
        2-D numpy array with one row per row of ``df``.
    """
    # Amount as a standardized column vector
    amount_column = df['Amount'].values.reshape(-1, 1)
    scaled_amounts = StandardScaler().fit_transform(amount_column)

    # Date expressed as whole days since the earliest date, then standardized
    df['Date'] = pd.to_datetime(df['Date'])
    earliest = df['Date'].min()
    day_offsets = (df['Date'] - earliest).dt.days.values.reshape(-1, 1)
    scaled_days = StandardScaler().fit_transform(day_offsets)

    # Vendor and description merged into one text field for TF-IDF
    combined_text = df['Vendor'].astype(str) + ' ' + df['Description'].astype(str)
    tfidf = TfidfVectorizer(max_features=10, stop_words='english')
    text_matrix = tfidf.fit_transform(combined_text).toarray()

    # Stack everything side by side: [amount | days | tf-idf...]
    return np.hstack([scaled_amounts, scaled_days, text_matrix])
def _payment_similarity(row1, row2):
    """Score how alike two payment records are, from 0 to 1.

    Weighted sum of 0.4 * vendor match (case-insensitive equality),
    0.4 * amount closeness and 0.2 * Jaccard overlap of description words.
    """
    # str() keeps missing / non-string cells from raising on .lower(); this
    # mirrors the astype(str) used when the clustering features are built.
    vendor_match = 1 if str(row1['Vendor']).lower() == str(row2['Vendor']).lower() else 0

    # Relative difference against the larger magnitude. abs() keeps the ratio
    # bounded for negative amounts, and the zero guard treats two zero
    # amounts as identical instead of dividing by zero.
    largest = max(abs(row1['Amount']), abs(row2['Amount']))
    if largest == 0:
        amount_similarity = 1.0
    else:
        amount_similarity = max(0, 1 - abs(row1['Amount'] - row2['Amount']) / largest)

    words_1 = set(str(row1['Description']).lower().split())
    words_2 = set(str(row2['Description']).lower().split())
    all_words = words_1 | words_2
    # Two empty descriptions are identical text; also avoids a 0/0 division.
    desc_similarity = len(words_1 & words_2) / len(all_words) if all_words else 1.0

    return vendor_match * 0.4 + amount_similarity * 0.4 + desc_similarity * 0.2


def detect_duplicates(df, n_clusters=5):
    """Detect potential duplicate payments using K-means clustering.

    Rows are clustered on the features from ``preprocess_data``; every pair
    of rows that lands in the same cluster is then scored, and pairs scoring
    above 0.5 are reported.

    Args:
        df: Payment records with the columns Vendor, Amount, Date, Invoice
            and Description. ``preprocess_data`` converts ``df['Date']`` to
            datetimes in place.
        n_clusters: Requested number of clusters; coerced to an int and
            clamped to the range 1..len(df).

    Returns:
        ``(pairs, message)``. ``pairs`` is a DataFrame with one row per
        flagged pair, sorted by ``Similarity_Score`` (a percentage)
        descending. It is an empty DataFrame when there are fewer than two
        rows, when nothing is flagged, or when the analysis raised (the
        error text is then carried in ``message``).
    """
    # Function-scope import: itertools is not among the module-level imports.
    from itertools import combinations

    if len(df) < 2:
        # Return an empty result rather than echoing the input, which would
        # otherwise be displayed as if it were a table of duplicate pairs.
        return pd.DataFrame(), "Not enough data for analysis"

    try:
        # Preprocess data
        features = preprocess_data(df)

        # Apply K-means clustering
        n_clusters = max(1, min(int(n_clusters), len(df)))
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        clusters = kmeans.fit_predict(features)

        # Add cluster information to dataframe
        df_result = df.copy()
        df_result['Cluster'] = clusters

        # Find potential duplicates (clusters with multiple entries)
        duplicate_pairs = []
        cluster_counts = pd.Series(clusters).value_counts()

        for cluster_id in cluster_counts[cluster_counts > 1].index:
            cluster_data = df_result[df_result['Cluster'] == cluster_id]

            # combinations() visits each unordered pair exactly once, by
            # position, so rows that share an index label are still compared.
            for (i, row1), (j, row2) in combinations(cluster_data.iterrows(), 2):
                similarity_score = _payment_similarity(row1, row2)

                if similarity_score > 0.5:  # Threshold for potential duplicates
                    duplicate_pairs.append({
                        'Index_1': i,
                        'Index_2': j,
                        'Vendor_1': row1['Vendor'],
                        'Vendor_2': row2['Vendor'],
                        'Amount_1': row1['Amount'],
                        'Amount_2': row2['Amount'],
                        'Date_1': row1['Date'].strftime('%Y-%m-%d') if hasattr(row1['Date'], 'strftime') else row1['Date'],
                        'Date_2': row2['Date'].strftime('%Y-%m-%d') if hasattr(row2['Date'], 'strftime') else row2['Date'],
                        'Invoice_1': row1['Invoice'],
                        'Invoice_2': row2['Invoice'],
                        'Description_1': row1['Description'],
                        'Description_2': row2['Description'],
                        'Similarity_Score': round(similarity_score * 100, 2),
                        'Cluster': cluster_id,
                    })

        if not duplicate_pairs:
            return pd.DataFrame(), "No potential duplicates found"

        duplicate_df = pd.DataFrame(duplicate_pairs)
        duplicate_df = duplicate_df.sort_values('Similarity_Score', ascending=False)
        return duplicate_df, f"Found {len(duplicate_pairs)} potential duplicate pairs"

    except Exception as e:
        # UI boundary: report the failure as a status message instead of
        # letting the exception escape into the interface.
        return pd.DataFrame(), f"Error in analysis: {str(e)}"
def analyze_payments(file_input, n_clusters):
    """Load an uploaded payment file and run duplicate detection on it.

    Args:
        file_input: The upload. Either a path (``str`` or ``os.PathLike``),
            which is what a ``gr.File`` configured with ``type="filepath"``
            passes to its handlers, or an object exposing the path through a
            ``.name`` attribute. ``None`` means nothing was uploaded.
        n_clusters: Number of K-means clusters, forwarded to
            ``detect_duplicates``.

    Returns:
        ``(result_df, message)``. ``result_df`` is an empty DataFrame
        whenever the file is missing, has an unsupported extension, lacks a
        required column, or could not be processed; ``message`` says why.
    """
    # Function-scope import: os is not among the module-level imports.
    import os

    try:
        if file_input is None:
            return pd.DataFrame(), "Please upload a file or load sample data"

        # Resolve the upload to a filesystem path. Reading `.name`
        # unconditionally fails for plain string paths.
        if isinstance(file_input, (str, os.PathLike)):
            file_path = os.fspath(file_input)
        else:
            file_path = getattr(file_input, 'name', None)
        if not isinstance(file_path, str):
            return pd.DataFrame(), "Please upload a CSV or Excel file"

        # Read the uploaded file (extension match is case-insensitive)
        lowered_path = file_path.lower()
        if lowered_path.endswith('.csv'):
            df = pd.read_csv(file_path)
        elif lowered_path.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(file_path)
        else:
            return pd.DataFrame(), "Please upload a CSV or Excel file"

        # Check required columns
        required_columns = ['Vendor', 'Amount', 'Date', 'Invoice', 'Description']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            return pd.DataFrame(), f"Missing required columns: {', '.join(missing_columns)}"

        # Perform duplicate detection
        result_df, message = detect_duplicates(df, n_clusters)
        return result_df, message

    except Exception as e:
        # UI boundary: surface read/parse failures as a status message.
        return pd.DataFrame(), f"Error processing file: {str(e)}"
def load_sample():
    """Write freshly generated sample payments to a CSV and return its path.

    The file is written without the index column to ``sample_data.csv``,
    relative to the current working directory, replacing any earlier copy.
    """
    output_path = "sample_data.csv"
    # Persist to disk so the sample can be handed to the file component
    generate_sample_data().to_csv(output_path, index=False)
    return output_path
# Create Gradio interface
# Layout: a header banner, then one row with two columns. The left column
# (scale=1) holds the upload, action buttons and parameters; the right column
# (scale=2) holds the status text and the results table. Components render in
# the order they are created inside each context manager.
# NOTE(review): several headings below start with mis-encoded characters
# (e.g. "π", "π€", "βοΈ"), presumably emoji damaged by an encoding
# round-trip. They are left verbatim here; restore them from the original
# source if available.
with gr.Blocks(theme=gr.themes.Soft(), title="Vendor Duplicate Analyzer") as app:
    # Header
    gr.HTML("""
    <div style="text-align: center; padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 10px; margin-bottom: 20px;">
        <h1 style="color: white; margin: 0; font-size: 2.5em;">π Vendor Duplicate Analyzer</h1>
        <p style="color: white; margin: 10px 0 0 0; font-size: 1.2em;">Using K-means Clustering for Duplicate Detection</p>
    </div>
    """)

    with gr.Row():
        with gr.Column(scale=1):
            gr.HTML("<h3>π€ Upload Data</h3>")

            # type="filepath": handlers wired to this component receive the
            # upload as a path string rather than a file object.
            file_input = gr.File(
                label="Upload Payment CSV/Excel",
                file_types=[".csv", ".xlsx", ".xls"],
                type="filepath"
            )

            gr.HTML("""
            <div style="margin: 15px 0; padding: 15px; background-color: #f8f9fa; border-radius: 8px; border-left: 4px solid #17a2b8;">
                <strong>π CSV Format Required</strong><br>
                Your CSV must have columns: <strong>Vendor, Amount, Date, Invoice, Description</strong>
            </div>
            """)

            with gr.Row():
                sample_btn = gr.Button("π Load Sample Data", variant="secondary")
                analyze_btn = gr.Button("π Analyze with K-means", variant="primary")

            gr.HTML("<h3>βοΈ Parameters</h3>")

            # Cluster count handed to the analysis; 2..10 in steps of 1.
            n_clusters = gr.Slider(
                minimum=2,
                maximum=10,
                value=5,
                step=1,
                label="Number of Clusters",
                info="K-means will group similar payments into this many clusters"
            )

            gr.HTML("""
            <div style="margin-top: 20px; padding: 15px; background-color: #e8f4f8; border-radius: 8px;">
                <strong>π€ How K-means Works Here</strong>
                <ol style="margin: 10px 0 0 20px;">
                    <li>Extracts numerical features from payment records</li>
                    <li>Groups similar payments into clusters</li>
                    <li>Compares payments within each cluster</li>
                    <li>Calculates similarity scores for potential duplicates</li>
                </ol>
            </div>
            """)

        with gr.Column(scale=2):
            gr.HTML("<h3>π Results</h3>")

            status_message = gr.Textbox(
                label="Analysis Status",
                interactive=False,
                placeholder="Upload data and click 'Analyze' to begin..."
            )

            # NOTE(review): column_widths lists 9 entries, but the pair table
            # built by detect_duplicates has 14 columns -- confirm how the
            # installed Gradio version treats the mismatch.
            results_table = gr.Dataframe(
                label="Potential Duplicate Pairs Found by K-means",
                interactive=False,
                wrap=True,
                column_widths=["10%", "15%", "15%", "10%", "10%", "10%", "10%", "10%", "10%"]
            )

    # Event handlers
    def handle_sample_load():
        """Generate the sample CSV and return (file path, status text)."""
        sample_file = load_sample()
        return sample_file, "Sample data loaded successfully! Click 'Analyze' to detect duplicates."

    def handle_analysis(file_input, n_clusters):
        """Run the analysis and return (status text, results frame).

        The order is swapped relative to analyze_payments' return value so
        that it lines up with outputs=[status_message, results_table].
        """
        if file_input is None:
            return "Please upload a file first!", pd.DataFrame()
        result_df, message = analyze_payments(file_input, n_clusters)
        return message, result_df

    # "Load Sample Data" fills the file component and the status box.
    sample_btn.click(
        fn=handle_sample_load,
        outputs=[file_input, status_message]
    )

    # "Analyze" reads the file and slider, then fills status and table.
    analyze_btn.click(
        fn=handle_analysis,
        inputs=[file_input, n_clusters],
        outputs=[status_message, results_table]
    )

# Launch the app
# Only when run as a script; share=True asks Gradio for a public share link.
if __name__ == "__main__":
    app.launch(share=True)