import re
import random
import pandas as pd
import json
import io
import os
import numpy as np
import chardet
import textdistance
from rapidfuzz import fuzz as rapidfuzz_fuzz
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from flask import Flask, request, render_template, send_file, redirect, url_for, flash, jsonify
from wordcloud import WordCloud

# Set HF_HOME before importing sentence_transformers so the Hugging Face cache
# location takes effect; it is resolved at import time.
os.environ["HF_HOME"] = os.path.expanduser("~/.cache")

# --- SBERT import ---
from sentence_transformers import SentenceTransformer

app = Flask(__name__)
# Global variables
latest_results_df = None
original_df1 = None
original_df2 = None
app.secret_key = '1cdddf3025ba915f2f32baf15d00a79fe63a8dce49935c2f'

# File to store persistent feedback mapping
FEEDBACK_FILE = "feedback_mapping.json"
#########################################
# Persistent Feedback Storage Functions
#########################################
def load_feedback_mapping():
    """Load feedback mapping from FEEDBACK_FILE if it exists; otherwise, return an empty dict."""
    if os.path.exists(FEEDBACK_FILE):
        with open(FEEDBACK_FILE, "r") as f:
            try:
                return json.load(f)
            except Exception:
                return {}
    else:
        return {}

def save_feedback_mapping(mapping):
    """Save the feedback mapping dictionary to FEEDBACK_FILE."""
    with open(FEEDBACK_FILE, "w") as f:
        json.dump(mapping, f, indent=4)

def update_feedback_mapping(invoice1, invoice2):
    """Update the mapping with a new entry and persist it to file."""
    mapping = load_feedback_mapping()
    mapping[invoice1] = invoice2
    save_feedback_mapping(mapping)
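# Illustrative shape of feedback_mapping.json (example values, not from the source):
# each key is an invoice number from file 1, each value the user-selected match from
# file 2, e.g. {"INV-00123": "123"}.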
#########################################
# SBERT Initialization and Helper Function
#########################################
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

def generate_embeddings(df, column_name):
    sentences = df[column_name].tolist()
    embeddings = model.encode(sentences, normalize_embeddings=True)
    return embeddings
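# Hedged usage sketch (illustrative values): with all-MiniLM-L6-v2 this returns an
# (n_rows, 384) float array; normalize_embeddings=True makes each row unit-length,
# so dot products between rows are cosine similarities.
#   embs = generate_embeddings(pd.DataFrame({"InvoiceNumber": ["INV001", "INV002"]}), "InvoiceNumber")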
#########################################
# Invoice Matching Functions (Part 1)
#########################################
def remove_year_patterns(s):
    if pd.isna(s):
        return ""
    s = str(s)
    s = re.sub(r'\(?\b(?:19|20)?\d{2,4}\s*[-/]\s*(?:19|20)?\d{2,4}\b\)?', '', s)
    s = re.sub(r'[,;]\s*\b(?:19|20)?\d{2,4}\b', '', s)
    s = re.sub(r'\b(?:19|20)?\d{2,4}\b[,;]', '', s)
    s = re.sub(r'\b(19|20)\d{2}\b', '', s)
    return s.strip()
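# Illustrative examples (hand-traced against the regexes above; treat as a sketch):
#   remove_year_patterns("INV/2023-24")  -> "INV/"    (year range stripped)
#   remove_year_patterns("Invoice 2021") -> "Invoice" (standalone 4-digit year stripped)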
def remove_leading_and_adjacent_zeros(s):
    s = re.sub(r'\b0+(?=\d)', '', s)
    s = re.sub(r'0(?=[A-Za-z])', '', s)
    return s
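# Hand-traced examples (illustrative only):
#   remove_leading_and_adjacent_zeros("007A9")   -> "7A9"    (leading zeros before a digit)
#   remove_leading_and_adjacent_zeros("INV0A12") -> "INVA12" (zero dropped before a letter)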
def remove_prefix_dash(s):
    return re.sub(r'^[A-Za-z0-9]+[-]', '', s)

def normalize_for_comparison(s):
    if pd.isna(s):
        return ""
    s = str(s).lower().strip()
    s = re.sub(r'[\s\-\_,/]+', '', s)
    s = re.sub(r'(?<=\d)o|o(?=\d)', '0', s)
    return s
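# Illustrative trace: separators are squashed and an 'o' adjacent to a digit is
# treated as a mistyped zero, e.g. normalize_for_comparison("IN - O042") -> "in0042".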
def extract_invoice_parts(invoice):
    cleaned = re.sub(r'[^a-zA-Z0-9]', '', invoice)
    match = re.match(r'^([a-zA-Z]*)(\d+)([a-zA-Z]*)$', cleaned)
    if match:
        prefix = match.group(1) or ""
        numeric_core = match.group(2)
        suffix = match.group(3) or ""
        return prefix, numeric_core, suffix
    return None, None, None
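# Illustrative examples (hand-traced, not from the source):
#   extract_invoice_parts("PJ-00123/X") -> ("PJ", "00123", "X")
#   extract_invoice_parts("2024")       -> ("", "2024", "")
#   extract_invoice_parts("ABC")        -> (None, None, None)  # no numeric core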
def robust_preprocess_invoice(invoice):
    if pd.isna(invoice):
        return ""
    invoice = str(invoice)
    invoice = remove_year_patterns(invoice)
    invoice = invoice.lower()
    # Drop any "bill no." / "bill #" label. Because the label is removed here,
    # the bill_match search below rarely fires and processing usually falls
    # through to the segment heuristic.
    invoice = re.sub(r'bill\s*(?:no\.?|#)\s*:?', '', invoice, flags=re.IGNORECASE)
    bill_match = re.search(r'bill\s*(?:no\.?|#)\s*:?\s*([0-9a-zA-Z]+)', invoice, flags=re.IGNORECASE)
    if bill_match:
        best_seg = bill_match.group(1)
    else:
        # Pick the '-'/'/'-separated segment containing the most digits.
        segments = re.split(r'[-/]', invoice)
        segments = [seg.strip() for seg in segments if seg.strip()]
        best_seg = max(segments, key=lambda seg: len(re.findall(r'\d', seg))) if segments else invoice
    best_seg = best_seg.replace("_", "")
    KNOWN_INVOICE_VARIANTS = [
        "inv", "invoice", "invoce", "in", "inve", "salesrefno",
        "ompl", "insc", "indbo", "kolbo", "thn", "invoiceno", "sales"
    ]
    for variant in KNOWN_INVOICE_VARIANTS:
        best_seg = re.sub(r'^' + variant, '', best_seg, flags=re.IGNORECASE)
        best_seg = re.sub(variant + r'$', '', best_seg, flags=re.IGNORECASE)
    best_seg = re.sub(r'[\s\-\_,/]+', '', best_seg)
    best_seg = remove_leading_and_adjacent_zeros(best_seg)
    prefix, core, suffix = extract_invoice_parts(best_seg)
    if prefix is None:
        return best_seg
    if core:
        try:
            core = str(int(core))
        except Exception:
            core = core.lstrip("0") or "0"
    return prefix + core + suffix
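# Hand-traced examples of the full pipeline (illustrative, not from the source):
#   robust_preprocess_invoice("Bill No: INV-00123") -> "123"
#   robust_preprocess_invoice("INV00045A")          -> "45a"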
def extract_numeric_core(invoice):
    numbers = re.findall(r'\d+', invoice)
    return max(numbers, key=len) if numbers else ""

def determine_invoice_type(invoice):
    p, core, s = extract_invoice_parts(invoice)
    if p is None:
        return "other"
    if p == "" and s == "":
        return "core_only"
    if p != "" and s == "":
        return "prefix_only"
    if p == "" and s != "":
        return "suffix_only"
    if p != "" and s != "":
        return "both"
    return "other"
def check_boost_condition(s1, s2):
    n1 = robust_preprocess_invoice(s1)
    n2 = robust_preprocess_invoice(s2)
    p1, core1, sfx1 = extract_invoice_parts(n1)
    p2, core2, sfx2 = extract_invoice_parts(n2)
    if p1 is None or p2 is None or core1 != core2:
        return False
    type1 = determine_invoice_type(n1)
    type2 = determine_invoice_type(n2)
    if (type1 == "core_only" and type2 in {"prefix_only", "suffix_only"}) or \
       (type2 == "core_only" and type1 in {"prefix_only", "suffix_only"}):
        return True
    if (p1 and not p2) or (p2 and not p1):
        return True
    if (sfx1 and not sfx2) or (sfx2 and not sfx1):
        return True
    if p1 and sfx2 and rapidfuzz_fuzz.ratio(p1, sfx2) > 90:
        return True
    if p2 and sfx1 and rapidfuzz_fuzz.ratio(p2, sfx1) > 90:
        return True
    return False
# Note: levenshtein_sim and rapidfuzz_sim are currently aliases of the same
# rapidfuzz ratio function.
def levenshtein_sim(s1, s2):
    return rapidfuzz_fuzz.ratio(s1, s2)

def jaro_winkler_sim(s1, s2):
    return textdistance.jaro_winkler.normalized_similarity(s1, s2) * 100

def rapidfuzz_sim(s1, s2):
    return rapidfuzz_fuzz.ratio(s1, s2)

def fuzzbuzz_sim(s1, s2):
    return rapidfuzz_fuzz.token_set_ratio(s1, s2)

def hamming_sim(s1, s2):
    if not s1 and not s2:
        return 100
    max_len = max(len(s1), len(s2))
    match_count = sum(ch1 == ch2 for ch1, ch2 in zip(s1, s2))
    return (match_count / max_len) * 100

def jaccard_sim(s1, s2):
    set1, set2 = set(s1), set(s2)
    if not set1 and not set2:
        return 100
    return (len(set1.intersection(set2)) / len(set1.union(set2))) * 100
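# Hand-traced examples (illustrative):
#   hamming_sim("ABC1", "ABD1") -> 75.0   (3 of 4 aligned characters match)
#   jaccard_sim("abc", "cab")   -> 100.0  (character sets ignore order)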
def cosine_sim(s1, s2):
    if not s1.strip() or not s2.strip():
        return 0.0
    vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4))
    try:
        tfidf = vectorizer.fit_transform([s1, s2])
        if tfidf.shape[1] == 0:
            return 0.0
        cos_sim = cosine_similarity(tfidf[0:1], tfidf[1:2])[0][0]
        return cos_sim * 100
    except ValueError:
        return 0.0
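# Behavioral note (sketch): with character 2-4-grams, single-character inputs yield
# an empty vocabulary; fit_transform raises ValueError and the function returns 0.0.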
def custom_trailing_match(s1, s2):
    s1 = str(s1)
    s2 = str(s2)
    s1_lower = s1.lower()
    # Only applies to invoices beginning with "p" or "jp".
    if not (s1_lower.startswith("p") or s1_lower.startswith("jp")):
        return False
    digits = re.sub(r'\D', '', s1)
    if len(digits) <= 2:
        modified = digits
    else:
        # Drop zeros from the interior digits, keeping the first and last digit.
        middle = digits[1:-1].replace("0", "")
        modified = digits[0] + middle + digits[-1]
    return modified.endswith(s2)
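# Hand-traced example (illustrative): custom_trailing_match("P10023", "123") -> True,
# since the digits "10023" collapse to "123" once interior zeros are dropped.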
def combined_similarity(s1, s2):
    if s1.strip().lower() == s2.strip().lower():
        return 100
    s1_proc = robust_preprocess_invoice(s1)
    s2_proc = robust_preprocess_invoice(s2)
    if custom_trailing_match(s1_proc, s2_proc):
        return 95
    scores = [
        levenshtein_sim(s1_proc, s2_proc),
        jaro_winkler_sim(s1_proc, s2_proc),
        rapidfuzz_sim(s1_proc, s2_proc),
        fuzzbuzz_sim(s1_proc, s2_proc),
        hamming_sim(s1_proc, s2_proc),
        jaccard_sim(s1_proc, s2_proc),
        cosine_sim(s1_proc, s2_proc)
    ]
    avg_score = sum(scores) / len(scores)
    # Boost: same numeric core but one side carries an extra prefix/suffix.
    p1, core1, sfx1 = extract_invoice_parts(s1_proc)
    p2, core2, sfx2 = extract_invoice_parts(s2_proc)
    if core1 and core2 and core1 == core2:
        if (p1 and not p2) or (p2 and not p1) or (sfx1 and not sfx2) or (sfx2 and not sfx1) or (p1 and sfx2) or (p2 and sfx1):
            avg_score = max(avg_score, 90)

    def extract_numeric(s):
        numbers = re.findall(r'\d+', s)
        return max(numbers, key=len) if numbers else ""

    # Penalty: halve the score when the longest numeric runs disagree.
    num1 = extract_numeric(s1_proc)
    num2 = extract_numeric(s2_proc)
    try:
        if int(num1) != int(num2):
            avg_score *= 0.5
    except Exception:
        if num1 != num2:
            avg_score *= 0.5
    # Reserve exact 100 for verbatim matches; perfect fuzzy scores are jittered down.
    if avg_score >= 100:
        avg_score = random.uniform(90, 99)
    return avg_score
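# Illustrative: verbatim matches short-circuit before any fuzzy scoring, e.g.
#   combined_similarity("INV-1", " inv-1 ") -> 100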
def generate_review_status(score):
    return "No Review Needed" if score > 50 else "Needs Review"

def generate_recommendation(score):
    if score == 100:
        return "Exact Match"
    if score >= 50:
        return "Partial Match"
    else:
        return "Unmatched"
def generate_reason(inv1, inv2, score):
    inv1 = str(inv1)
    inv2 = str(inv2)
    if custom_trailing_match(inv1, inv2):
        return "Custom trailing-match pattern detected."
    if inv1.lower() == inv2.lower():
        return "Exact match of invoice numbers."
    p1, core1, sfx1 = extract_invoice_parts(normalize_for_comparison(inv1))
    p2, core2, sfx2 = extract_invoice_parts(normalize_for_comparison(inv2))
    if core1 is not None and core2 is not None:
        if core1 != core2:
            return "Numeric core does not match."
        if len(core1) != len(core2) and core1.lstrip("0") == core2.lstrip("0"):
            return "Numeric padding mismatch (leading zeros removed)."
        if p1 and p2 and p1 != p2:
            return "Different prefixes found, affecting similarity."
        if sfx1 and sfx2 and sfx1 != sfx2:
            return "Different suffixes detected, leading to mismatch."
        if p1 and not p2:
            return "Partial matching: one invoice has a prefix while the other does not."
        if sfx1 and not sfx2:
            return "Partial matching: one invoice has a suffix while the other does not."
    if score >= 50:
        if p1 and sfx2 and rapidfuzz_fuzz.ratio(p1, sfx2) > 90:
            return "Prefix in one invoice matches suffix in the other."
        if any(sep in inv1 or sep in inv2 for sep in [" ", "-", "_"]):
            return "Strong match; only minor formatting variations."
        if inv1 in inv2 or inv2 in inv1:
            return "One invoice is fully contained in the other."
        return "Invoices match with minimal differences."
    if any(sep in inv1 or sep in inv2 for sep in [" ", "-", "_"]):
        return "Formatting issue due to spaces or separators."
    if rapidfuzz_fuzz.ratio(inv1, inv2) > 70:
        return "Minor spelling variation detected."
    if set(inv1) == set(inv2):
        return "Character positions swapped."
    if abs(len(inv1) - len(inv2)) <= 2:
        return "Possible OCR error or scanning issue."
    if any(ch.isdigit() for ch in inv1) and any(ch.isdigit() for ch in inv2) and core1 == core2:
        return "Identical numbers but extra text in one invoice."
    if any(sep in inv1 for sep in ["-", "/"]) or any(sep in inv2 for sep in ["-", "/"]):
        return "Different separator conventions used."
    if any(ch in inv1 for ch in ["#", "$", "&"]) or any(ch in inv2 for ch in ["#", "$", "&"]):
        return "Special characters found in one invoice but not the other."
    if len(set(inv1)) < len(inv1) or len(set(inv2)) < len(inv2):
        return "Duplicate characters found in one invoice."
    if len(inv1) > 10 or len(inv2) > 10:
        return "One invoice is significantly longer than the other."
    return "Significant structural difference; invoices do not match."
# -----------------------------
# Updated process_invoices Function with Feedback Override
# -----------------------------
def process_invoices(df1, df2):
    """
    For each invoice in df1, check whether a user-corrected (feedback) invoice exists.
    If so, use that corrected invoice to recalculate the match with the normal scoring
    functions. Invoices without feedback are processed normally.
    """
    df1["InvoiceNumber"] = df1["InvoiceNumber"].str.strip()
    df2["InvoiceNumber"] = df2["InvoiceNumber"].str.strip()
    # Load the feedback mapping from the persistent file.
    feedback_mapping = load_feedback_mapping()
    results = []
    for idx1, row1 in df1.iterrows():
        inv1 = row1['InvoiceNumber']
        if inv1 in feedback_mapping:
            # Use the user-selected corrected invoice.
            corrected_invoice = feedback_mapping[inv1]
            # Recalculate the similarity score, with a flat boost so that
            # feedback-confirmed pairs surface as strong matches (note: the
            # boosted score is not capped at 100).
            score = combined_similarity(inv1, corrected_invoice) + 60
            best_match = {
                "invoice_number1": inv1,
                "invoice_number2": corrected_invoice,
                "similarity_score": round(score, 2),
                "manual_review_status": generate_review_status(score),
                "recommendation": generate_recommendation(score),
                "reason": generate_reason(inv1, corrected_invoice, score),
                "comments": "",
                "editable": False
            }
        else:
            # Brute-force search for the best-scoring match in df2.
            best_match = None
            best_score = -1
            for idx2, row2 in df2.iterrows():
                score = combined_similarity(inv1, row2['InvoiceNumber'])
                if score > best_score:
                    best_score = score
                    best_match = {
                        "invoice_number1": inv1,
                        "invoice_number2": row2['InvoiceNumber'],
                        "similarity_score": round(score - 2, 2),
                        "manual_review_status": generate_review_status(score),
                        "recommendation": generate_recommendation(score),
                        "reason": generate_reason(inv1, row2['InvoiceNumber'], score),
                        "comments": "",
                        "editable": score <= 60
                    }
        results.append(best_match)
    df_final = pd.DataFrame(results)
    return df_final
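# Hedged usage sketch (hypothetical frames; both need an 'InvoiceNumber' column):
#   process_invoices(pd.DataFrame({"InvoiceNumber": ["INV-00123"]}),
#                    pd.DataFrame({"InvoiceNumber": ["123"]}))
# Cost grows as O(len(df1) * len(df2)) combined_similarity calls.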
#########################################
# SBERT Exact Match Filtering
#########################################
def sbert_exact_match_filtering(df1, df2):
    df1_embeddings = generate_embeddings(df1, 'InvoiceNumber')
    df2_embeddings = generate_embeddings(df2, 'InvoiceNumber')
    cosine_similarities = cosine_similarity(df1_embeddings, df2_embeddings)
    tolerance = 1e-8
    exact_match_indices = np.where(np.isclose(cosine_similarities, 1.0, atol=tolerance))
    df_matches = pd.DataFrame({
        'df1_index': exact_match_indices[0],
        'df2_index': exact_match_indices[1]
    })
    df_exact = pd.DataFrame({
        'InvoiceNumber_1': df_matches['df1_index'].apply(lambda idx: df1.iloc[idx]['InvoiceNumber']),
        'InvoiceNumber_2': df_matches['df2_index'].apply(lambda idx: df2.iloc[idx]['InvoiceNumber'])
    })
    matched_values_df1 = df_exact['InvoiceNumber_1'].unique()
    matched_values_df2 = df_exact['InvoiceNumber_2'].unique()
    df1_filtered = df1[~df1['InvoiceNumber'].isin(matched_values_df1)].reset_index(drop=True)
    df2_filtered = df2[~df2['InvoiceNumber'].isin(matched_values_df2)].reset_index(drop=True)
    df_exact['similarity_score'] = 100
    df_exact['manual_review_status'] = 'No Review Needed'
    df_exact['recommendation'] = 'Exact Match'
    df_exact['reason'] = 'Exact match via SBERT embeddings.'
    df_exact['comments'] = ''
    return df_exact, df1_filtered, df2_filtered
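# Behavioral sketch: the embeddings are unit-normalized, so a cosine of ~1.0 (within
# atol=1e-8) effectively means the two invoice strings look identical to the model;
# e.g. "INV-100" appearing in both files pairs up here and is removed from the
# fuzzy-matching pool before process_invoices runs.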
#########################################
# Functions to Generate Summary Statistics
#########################################
def get_stats(df):
    """Aggregate summary statistics from the latest_results_df."""
    stats = {}
    stats['total_rows'] = len(df)
    stats['total_exact_match'] = int((df['recommendation'] == 'Exact Match').sum())
    stats['total_partial_match'] = int((df['recommendation'] == 'Partial Match').sum())
    stats['total_unmatched'] = int((df['recommendation'] == 'Unmatched').sum())
    stats['total_no_review_needed'] = int((df['manual_review_status'] == 'No Review Needed').sum())
    stats['total_needs_review'] = int((df['manual_review_status'] == 'Needs Review').sum())
    stats['similarity_scores'] = df['similarity_score'].tolist()
    stats['average_similarity'] = float(df['similarity_score'].mean())
    stats['min_similarity'] = float(df['similarity_score'].min())
    stats['max_similarity'] = float(df['similarity_score'].max())
    return stats
def generate_stats_excel_bytes(stats):
    """Generate an Excel bytes stream from the stats dictionary."""
    df_stats = pd.DataFrame(list(stats.items()), columns=["Metric", "Value"])
    # xlsxwriter cannot write list values (e.g. stats['similarity_scores']) into a
    # single cell, so serialize non-scalar values to JSON strings first.
    df_stats["Value"] = df_stats["Value"].apply(
        lambda v: json.dumps(v) if isinstance(v, (list, dict)) else v
    )
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
        df_stats.to_excel(writer, index=False, sheet_name='Summary Stats')
    output.seek(0)
    return output

def generate_stats_json_bytes(stats):
    """Generate a JSON bytes stream from the stats dictionary."""
    json_bytes = io.BytesIO(json.dumps(stats, indent=4).encode('utf-8'))
    return json_bytes
#########################################
# Flask Routes
#########################################
# NOTE: the URL paths in the route decorators below are assumed (the source did not
# include them); only the 'index' endpoint name is fixed by the url_for('index')
# redirects elsewhere in this file.
@app.route("/", methods=["GET", "POST"])
def index():
    global latest_results_df, original_df1, original_df2
    results = None
    unique_values = []  # Unique invoice numbers from dataset2 for the select box
    if request.method == "POST":
        file1 = request.files.get("file1")
        file2 = request.files.get("file2")
        if not file1 or not file2:
            flash("Please upload both files.")
            return redirect(request.url)
        ext1 = file1.filename.split(".")[-1].lower()
        ext2 = file2.filename.split(".")[-1].lower()
        try:
            if ext1 == "csv":
                file1_bytes = file1.read()
                encoding_info = chardet.detect(file1_bytes)
                # chardet may report encoding=None; fall back to utf-8.
                encoding = encoding_info.get("encoding") or "utf-8"
                file1_text = file1_bytes.decode(encoding, errors="replace")
                df1 = pd.read_csv(io.StringIO(file1_text))
            elif ext1 in ["xls", "xlsx"]:
                file1.seek(0)
                df1 = pd.read_excel(file1)
            else:
                flash("File 1 format not supported.")
                return redirect(request.url)
            if ext2 == "csv":
                file2_bytes = file2.read()
                encoding_info = chardet.detect(file2_bytes)
                encoding = encoding_info.get("encoding") or "utf-8"
                file2_text = file2_bytes.decode(encoding, errors="replace")
                df2 = pd.read_csv(io.StringIO(file2_text))
            elif ext2 in ["xls", "xlsx"]:
                file2.seek(0)
                df2 = pd.read_excel(file2)
            else:
                flash("File 2 format not supported.")
                return redirect(request.url)
        except Exception as e:
            flash("Error reading files: " + str(e))
            return redirect(request.url)
        file1.seek(0)
        file2.seek(0)
        df1["InvoiceNumber"] = df1["InvoiceNumber"].astype(str)
        df2["InvoiceNumber"] = df2["InvoiceNumber"].astype(str)
        original_df1 = df1.copy()
        original_df2 = df2.copy()
        # Prepare the unique invoice numbers from dataset2 for the edit select box.
        unique_values = sorted(df2["InvoiceNumber"].unique().tolist())
        # Run SBERT exact match filtering.
        df_exact, df1_filtered, df2_filtered = sbert_exact_match_filtering(df1, df2)
        # Run robust invoice matching on remaining invoices (with feedback override).
        df_final_matches = process_invoices(df1_filtered, df2_filtered)
        # Rename exact match columns for consistency.
        df_exact = df_exact.rename(columns={
            'InvoiceNumber_1': 'invoice_number1',
            'InvoiceNumber_2': 'invoice_number2'
        })
        # Concatenate exact matches with robust matches.
        df_concatenated = pd.concat([df_exact, df_final_matches], ignore_index=True)
        # Shuffle the rows randomly before storing and displaying.
        latest_results_df = df_concatenated.sample(frac=1).reset_index(drop=True)
        results = latest_results_df.to_dict(orient="records")
    return render_template("index.html", results=results, unique_values=unique_values)
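# Hedged usage sketch (assumes the "/" route above; filenames are hypothetical):
#   curl -F "file1=@invoices_a.csv" -F "file2=@invoices_b.csv" http://localhost:5000/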
@app.route("/save_updates", methods=["POST"])
def save_updates():
    global latest_results_df
    try:
        updated_data = request.get_json()
        updated_df = pd.DataFrame(updated_data)
        latest_results_df = updated_df.copy()
        return jsonify({"status": "success"}), 200
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500

@app.route("/save_feedback", methods=["POST"])
def save_feedback():
    try:
        feedback_data = request.get_json()
        invoice1 = feedback_data.get('invoice_number1')
        selected_invoice2 = feedback_data.get('selected_invoice2')
        # If a new invoice is selected, update the persistent feedback mapping.
        if selected_invoice2:
            update_feedback_mapping(invoice1, selected_invoice2)
            message = "Feedback saved. Please re-run to train model on updates."
        else:
            message = "No new invoice selected; no changes made."
        return jsonify({"status": "success", "message": message}), 200
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
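# Illustrative request body for save_feedback (field names come from the handler
# above; the values are made up):
#   {"invoice_number1": "INV-00123", "selected_invoice2": "123"}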
def generate_csv_bytes(df):
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    csv_buffer.seek(0)
    return io.BytesIO(csv_buffer.getvalue().encode())
def generate_excel_bytes(df):
    df = df.replace([np.inf, -np.inf], np.nan).fillna("")
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
        workbook = writer.book
        worksheet = workbook.add_worksheet("Report")
        # The 'reason' column is rendered two Excel columns wide; map each dataframe
        # column to its starting Excel column index.
        excel_col_mapping = {}
        excel_index = 0
        for col in df.columns:
            if col.lower() == 'reason':
                excel_col_mapping[col] = excel_index
                excel_index += 2
            else:
                excel_col_mapping[col] = excel_index
                excel_index += 1
        total_excel_columns = excel_index
        title_format = workbook.add_format({
            'bold': True,
            'bg_color': '#FFFF00',
            'font_color': 'black',
            'align': 'center',
            'valign': 'vcenter',
            'font_size': 16
        })
        header_format = workbook.add_format({
            'bold': True,
            'bg_color': '#FFFF00',
            'font_color': 'black',
            'border': 1,
            'align': 'center',
            'valign': 'vcenter'
        })
        data_cell_format = workbook.add_format({
            'border': 1,
            'align': 'left',
            'valign': 'vcenter',
            'text_wrap': True
        })
        worksheet.merge_range(0, 0, 0, total_excel_columns - 1,
                              "Intelligent Partial Invoice Matching - Excel Report",
                              title_format)
        start_data_row = 2
        for col in df.columns:
            col_index = excel_col_mapping[col]
            if col.lower() == 'reason':
                worksheet.merge_range(start_data_row, col_index, start_data_row, col_index + 1,
                                      col, header_format)
                worksheet.set_column(col_index, col_index + 1, 40)
            else:
                worksheet.write(start_data_row, col_index, col, header_format)
                worksheet.set_column(col_index, col_index, 20)
        for i, row in enumerate(df.itertuples(index=False, name=None)):
            for col_name, cell in zip(df.columns, row):
                col_index = excel_col_mapping[col_name]
                if col_name.lower() == 'reason':
                    worksheet.merge_range(start_data_row + 1 + i, col_index,
                                          start_data_row + 1 + i, col_index + 1,
                                          cell, data_cell_format)
                else:
                    worksheet.write(start_data_row + 1 + i, col_index, cell, data_cell_format)
        last_data_row = start_data_row + 1 + len(df)
        stats_card_row = last_data_row + 3
        try:
            total_invoices = len(df)
            avg_score = float(df['similarity_score'].astype(float).mean())
            max_score = float(df['similarity_score'].astype(float).max())
            min_score = float(df['similarity_score'].astype(float).min())
        except Exception:
            total_invoices = avg_score = max_score = min_score = 0
        left_card = [
            ["Total Invoices", total_invoices],
            ["Average Similarity", round(avg_score, 2)]
        ]
        right_card = [
            ["Max Similarity", round(max_score, 2)],
            ["Min Similarity", round(min_score, 2)]
        ]
        for i, item in enumerate(left_card):
            worksheet.write(stats_card_row + i, 0, item[0], header_format)
            worksheet.write(stats_card_row + i, 1, item[1], data_cell_format)
        for i, item in enumerate(right_card):
            worksheet.write(stats_card_row + i, 3, item[0], header_format)
            worksheet.write(stats_card_row + i, 4, item[1], data_cell_format)
        chart_start_row = stats_card_row + 5
        chart_col = 3
        # Pie chart of recommendation counts.
        recommendation_categories = ["Unmatched", "Exact Match", "Partial Match"]
        recommendation_counts = [int(df[df['recommendation'] == cat].shape[0]) for cat in recommendation_categories]
        rec_table_row = chart_start_row
        worksheet.write(rec_table_row, 0, "Recommendation", header_format)
        worksheet.write(rec_table_row, 1, "Count", header_format)
        for i, (cat, cnt) in enumerate(zip(recommendation_categories, recommendation_counts)):
            worksheet.write(rec_table_row + 1 + i, 0, cat, data_cell_format)
            worksheet.write(rec_table_row + 1 + i, 1, cnt, data_cell_format)
        rec_pie_chart = workbook.add_chart({'type': 'pie'})
        rec_pie_chart.add_series({
            'name': 'Recommendation Distribution',
            'categories': ['Report', rec_table_row + 1, 0, rec_table_row + len(recommendation_categories), 0],
            'values': ['Report', rec_table_row + 1, 1, rec_table_row + len(recommendation_categories), 1],
        })
        rec_pie_chart.set_title({'name': 'Recommendation Distribution'})
        worksheet.insert_chart(chart_start_row, chart_col, rec_pie_chart, {'x_scale': 1.0, 'y_scale': 1.0})
        chart_start_row += 17
        # Histogram of similarity scores.
        if 'similarity_score' in df.columns:
            scores = pd.to_numeric(df['similarity_score'], errors='coerce').dropna()
            bins = list(range(1, 102, 10))
            counts, bin_edges = np.histogram(scores, bins=bins)
            bin_labels = [f"{bins[i]}-{bins[i + 1] - 1}" for i in range(len(bins) - 1)]
            hist_table_row = chart_start_row - 3
            worksheet.write(hist_table_row, 0, "Score Range", header_format)
            worksheet.write(hist_table_row, 1, "Count", header_format)
            for i, (label, cnt) in enumerate(zip(bin_labels, counts)):
                worksheet.write(hist_table_row + 1 + i, 0, label, data_cell_format)
                worksheet.write(hist_table_row + 1 + i, 1, cnt, data_cell_format)
            hist_chart = workbook.add_chart({'type': 'column'})
            hist_chart.add_series({
                'name': 'Similarity Score Distribution',
                'categories': ['Report', hist_table_row + 1, 0, hist_table_row + len(bin_labels), 0],
                'values': ['Report', hist_table_row + 1, 1, hist_table_row + len(bin_labels), 1],
            })
            hist_chart.set_title({'name': 'Histogram of Similarity Scores'})
            hist_chart.set_x_axis({'name': 'Score Range'})
            hist_chart.set_y_axis({'name': 'Count'})
            worksheet.insert_chart(chart_start_row, chart_col, hist_chart, {'x_scale': 1.2, 'y_scale': 1.2})
            chart_start_row += 20
        # Word cloud rendered from the 'reason' column.
        if 'reason' in df.columns:
            worksheet.write(chart_start_row - 2, chart_col, "Wordcloud for Reasons", header_format)
            text = " ".join(df['reason'].astype(str).tolist())
            wc = WordCloud(width=400, height=200, background_color='white').generate(text)
            imgdata = io.BytesIO()
            wc.to_image().save(imgdata, format='PNG')
            imgdata.seek(0)
            worksheet.insert_image(chart_start_row, chart_col, 'wordcloud.png',
                                   {'image_data': imgdata, 'x_scale': 1.0, 'y_scale': 1.0})
            chart_start_row += 25
        else:
            chart_start_row += 10
        try:
            sim_index = excel_col_mapping.get('similarity_score', 0)
        except Exception:
            sim_index = 0
        # Line chart of similarity scores over entries.
        line_chart = workbook.add_chart({'type': 'line'})
        line_chart.add_series({
            'name': 'Similarity Score Trend',
            'categories': ['Report', start_data_row + 1, 0, last_data_row - 1, 0],
            'values': ['Report', start_data_row + 1, sim_index, last_data_row - 1, sim_index],
        })
        line_chart.set_title({'name': 'Similarity Score Over Entries'})
        worksheet.insert_chart(chart_start_row, chart_col, line_chart, {'x_scale': 1.5, 'y_scale': 1.5})
        chart_start_row += 30
        # Horizontal bar chart of reason counts.
        if 'reason' in df.columns:
            reasons = df['reason'].value_counts().reset_index()
            reasons.columns = ['Reason', 'Count']
            hbar_table_row = chart_start_row
            worksheet.write(hbar_table_row, 0, "Reason", header_format)
            worksheet.write(hbar_table_row, 1, "Count", header_format)
            for idx, row in reasons.iterrows():
                worksheet.write(hbar_table_row + 1 + idx, 0, row['Reason'], data_cell_format)
                worksheet.write(hbar_table_row + 1 + idx, 1, row['Count'], data_cell_format)
            hbar_chart = workbook.add_chart({'type': 'bar'})
            hbar_chart.add_series({
                'name': 'Reasons Distribution',
                'categories': ['Report', hbar_table_row + 1, 0, hbar_table_row + len(reasons), 0],
                'values': ['Report', hbar_table_row + 1, 1, hbar_table_row + len(reasons), 1],
            })
            hbar_chart.set_title({'name': 'Reasons Distribution'})
            worksheet.insert_chart(chart_start_row, chart_col, hbar_chart, {'x_scale': 1.5, 'y_scale': 1.5})
            chart_start_row += 30
    output.seek(0)
    return output
@app.route("/download_csv")
def download_csv():
    global latest_results_df, original_df1, original_df2
    if latest_results_df is None:
        flash("No data available.")
        return redirect(url_for('index'))
    # Case must match generate_recommendation's "Unmatched" label.
    allowed_recs = {"Partial Match", "Unmatched", "Exact Match"}
    filtered_matches = latest_results_df[latest_results_df['recommendation'].isin(allowed_recs)]
    keys_df = filtered_matches[['invoice_number1', 'invoice_number2']].copy()
    df1_merged = pd.merge(
        keys_df,
        original_df1,
        left_on='invoice_number1',
        right_on='InvoiceNumber',
        how='left'
    )
    df1_merged.rename(columns={'InvoiceNumber': 'InvoiceNumber_1'}, inplace=True)
    df2_merged = pd.merge(
        keys_df,
        original_df2,
        left_on='invoice_number2',
        right_on='InvoiceNumber',
        how='left'
    )
    df2_merged.rename(columns={'InvoiceNumber': 'InvoiceNumber_2'}, inplace=True)
    final_df = pd.DataFrame({
        'InvoiceNumber_1': df1_merged['InvoiceNumber_1'],
        'InvoiceNumber_2': df2_merged['InvoiceNumber_2']
    })
    for col in final_df.select_dtypes(include=['object']).columns:
        final_df[col] = final_df[col].str.strip()
    final_df.reset_index(drop=True, inplace=True)
    return send_file(
        generate_csv_bytes(final_df),
        mimetype='text/csv',
        download_name='final_merged_invoices.csv',
        as_attachment=True
    )
@app.route("/download_excel")
def download_excel():
    global latest_results_df
    if latest_results_df is None:
        flash("No data available.")
        return redirect(url_for('index'))
    df = latest_results_df.copy()
    for col in ["editable", "comments"]:
        if col in df.columns:
            df.drop(columns=[col], inplace=True)
    return send_file(
        generate_excel_bytes(df),
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        download_name='matched_invoices.xlsx',
        as_attachment=True
    )
# New endpoint: Download summary statistics as Excel
@app.route("/download_stats_excel")
def download_stats_excel():
    global latest_results_df
    if latest_results_df is None:
        flash("No data available for stats.")
        return redirect(url_for('index'))
    stats = get_stats(latest_results_df)
    return send_file(
        generate_stats_excel_bytes(stats),
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        download_name='invoice_matching_stats.xlsx',
        as_attachment=True
    )
# New endpoint: Download summary statistics as JSON
@app.route("/download_stats_json")
def download_stats_json():
    global latest_results_df
    if latest_results_df is None:
        flash("No data available for stats.")
        return redirect(url_for('index'))
    stats = get_stats(latest_results_df)
    return send_file(
        generate_stats_json_bytes(stats),
        mimetype='application/json',
        download_name='invoice_matching_stats.json',
        as_attachment=True
    )
if __name__ == "__main__":
    app.run(debug=True)