import re
import random
import pandas as pd
import json
from rapidfuzz import fuzz as rapidfuzz_fuzz
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from flask import Flask, request, render_template, send_file, redirect, url_for, flash, jsonify
import io
import os
import numpy as np
from wordcloud import WordCloud
import textdistance
import chardet
# Point the Hugging Face cache at a writable location *before* importing
# sentence_transformers; huggingface_hub reads HF_HOME at import time, so
# setting it afterwards has no effect.
os.environ["HF_HOME"] = os.path.expanduser("~/.cache")
# --- SBERT import ---
from sentence_transformers import SentenceTransformer
app = Flask(__name__)
# Global variables
latest_results_df = None
original_df1 = None
original_df2 = None
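# NOTE: hard-coded secret key is acceptable for a demo; load it from an
# environment variable in production deployments.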
app.secret_key = '1cdddf3025ba915f2f32baf15d00a79fe63a8dce49935c2f'
# File to store persistent feedback mapping
FEEDBACK_FILE = "feedback_mapping.json"
#########################################
# Persistent Feedback Storage Functions
#########################################
def load_feedback_mapping():
"""Load feedback mapping from FEEDBACK_FILE if it exists; otherwise, return an empty dict."""
if os.path.exists(FEEDBACK_FILE):
with open(FEEDBACK_FILE, "r") as f:
try:
return json.load(f)
except Exception:
return {}
else:
return {}
def save_feedback_mapping(mapping):
"""Save the feedback mapping dictionary to FEEDBACK_FILE."""
with open(FEEDBACK_FILE, "w") as f:
json.dump(mapping, f, indent=4)
def update_feedback_mapping(invoice1, invoice2):
"""Update the mapping with a new entry and persist it to file."""
mapping = load_feedback_mapping()
mapping[invoice1] = invoice2
save_feedback_mapping(mapping)
#########################################
# SBERT Initialization and Helper Function
#########################################
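# all-MiniLM-L6-v2 is a small, fast sentence encoder; with normalized
# embeddings, cosine similarity reduces to a dot product.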
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
def generate_embeddings(df, column_name):
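    """Encode a DataFrame column of strings as L2-normalized SBERT embeddings."""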
sentences = df[column_name].tolist()
embeddings = model.encode(sentences, normalize_embeddings=True)
return embeddings
#########################################
# Invoice Matching Functions (Part 1)
#########################################
def remove_year_patterns(s):
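    """Strip year and fiscal-year fragments such as "(2023-24)", "2022/23", or ", 2021"."""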
if pd.isna(s):
return ""
s = str(s)
s = re.sub(r'\(?\b(?:19|20)?\d{2,4}\s*[-/]\s*(?:19|20)?\d{2,4}\b\)?', '', s)
s = re.sub(r'[,;]\s*\b(?:19|20)?\d{2,4}\b', '', s)
s = re.sub(r'\b(?:19|20)?\d{2,4}\b[,;]', '', s)
s = re.sub(r'\b(19|20)\d{2}\b', '', s)
return s.strip()
def remove_leading_and_adjacent_zeros(s):
s = re.sub(r'\b0+(?=\d)', '', s)
s = re.sub(r'0(?=[A-Za-z])', '', s)
return s
def remove_prefix_dash(s):
    # Currently unused helper: strips a leading alphanumeric token plus dash.
    return re.sub(r'^[A-Za-z0-9]+[-]', '', s)
def normalize_for_comparison(s):
if pd.isna(s):
return ""
s = str(s).lower().strip()
s = re.sub(r'[\s\-\_,/]+', '', s)
    s = re.sub(r'(?<=\d)o|o(?=\d)', '0', s)  # common OCR confusion: letter 'o' next to a digit -> zero
return s
def extract_invoice_parts(invoice):
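    """Split an invoice into (alpha prefix, digit core, alpha suffix), or (None, None, None) if it is not of that shape."""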
cleaned = re.sub(r'[^a-zA-Z0-9]', '', invoice)
match = re.match(r'^([a-zA-Z]*)(\d+)([a-zA-Z]*)$', cleaned)
if match:
prefix = match.group(1) or ""
numeric_core = match.group(2)
suffix = match.group(3) or ""
return prefix, numeric_core, suffix
return None, None, None
def robust_preprocess_invoice(invoice):
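    """Normalize a raw invoice string: drop year fragments and 'bill no' markers, keep the most digit-heavy segment, strip known invoice-word prefixes/suffixes, and remove zero padding."""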
if pd.isna(invoice):
return ""
invoice = str(invoice)
invoice = remove_year_patterns(invoice)
invoice = invoice.lower()
    # Capture any explicit "bill no: XXX" reference *before* stripping the
    # marker text; searching after the sub() below would never find a match.
    bill_match = re.search(r'bill\s*(?:no\.?|#)\s*:?\s*([0-9a-zA-Z]+)', invoice, flags=re.IGNORECASE)
    invoice = re.sub(r'bill\s*(?:no\.?|#)\s*:?', '', invoice, flags=re.IGNORECASE)
if bill_match:
best_seg = bill_match.group(1)
else:
segments = re.split(r'[-/]', invoice)
segments = [seg.strip() for seg in segments if seg.strip()]
best_seg = max(segments, key=lambda seg: len(re.findall(r'\d', seg))) if segments else invoice
best_seg = best_seg.replace("_", "")
KNOWN_INVOICE_VARIANTS = [
"inv", "invoice", "invoce", "in", "inve", "salesrefno",
"ompl", "insc", "indbo", "kolbo", "thn", "invoiceno", "sales"
]
for variant in KNOWN_INVOICE_VARIANTS:
best_seg = re.sub(r'^' + variant, '', best_seg, flags=re.IGNORECASE)
best_seg = re.sub(variant + r'$', '', best_seg, flags=re.IGNORECASE)
best_seg = re.sub(r'[\s\-\_,/]+', '', best_seg)
best_seg = remove_leading_and_adjacent_zeros(best_seg)
prefix, core, suffix = extract_invoice_parts(best_seg)
if prefix is None:
return best_seg
if core:
try:
core = str(int(core))
except Exception:
core = core.lstrip("0") or "0"
return prefix + core + suffix
def extract_numeric_core(invoice):
numbers = re.findall(r'\d+', invoice)
return max(numbers, key=len) if numbers else ""
def determine_invoice_type(invoice):
p, core, s = extract_invoice_parts(invoice)
if p is None:
return "other"
if p == "" and s == "":
return "core_only"
if p != "" and s == "":
return "prefix_only"
if p == "" and s != "":
return "suffix_only"
if p != "" and s != "":
return "both"
return "other"
def check_boost_condition(s1, s2):
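    """True when two invoices share a numeric core and differ only by an optional prefix/suffix. (Currently unused; combined_similarity inlines an equivalent check.)"""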
n1 = robust_preprocess_invoice(s1)
n2 = robust_preprocess_invoice(s2)
p1, core1, sfx1 = extract_invoice_parts(n1)
p2, core2, sfx2 = extract_invoice_parts(n2)
if p1 is None or p2 is None or core1 != core2:
return False
type1 = determine_invoice_type(n1)
type2 = determine_invoice_type(n2)
if (type1 == "core_only" and type2 in {"prefix_only", "suffix_only"}) or \
(type2 == "core_only" and type1 in {"prefix_only", "suffix_only"}):
return True
if (p1 and not p2) or (p2 and not p1):
return True
if (sfx1 and not sfx2) or (sfx2 and not sfx1):
return True
if p1 and sfx2 and rapidfuzz_fuzz.ratio(p1, sfx2) > 90:
return True
if p2 and sfx1 and rapidfuzz_fuzz.ratio(p2, sfx1) > 90:
return True
return False
def levenshtein_sim(s1, s2):
    # rapidfuzz's ratio is a normalized Levenshtein similarity in [0, 100].
    return rapidfuzz_fuzz.ratio(s1, s2)
def jaro_winkler_sim(s1, s2):
    return textdistance.jaro_winkler.normalized_similarity(s1, s2) * 100
def rapidfuzz_sim(s1, s2):
    # NOTE: identical to levenshtein_sim, so the edit-distance signal is
    # effectively weighted twice in combined_similarity's average.
    return rapidfuzz_fuzz.ratio(s1, s2)
def fuzzbuzz_sim(s1, s2):
    # Token-set ratio: order-insensitive comparison of token sets.
    return rapidfuzz_fuzz.token_set_ratio(s1, s2)
def hamming_sim(s1, s2):
if not s1 and not s2:
return 100
max_len = max(len(s1), len(s2))
match_count = sum(ch1 == ch2 for ch1, ch2 in zip(s1, s2))
return (match_count / max_len) * 100
def jaccard_sim(s1, s2):
set1, set2 = set(s1), set(s2)
if not set1 and not set2:
return 100
return (len(set1.intersection(set2)) / len(set1.union(set2))) * 100
def cosine_sim(s1, s2):
if not s1.strip() or not s2.strip():
return 0.0
vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4))
try:
tfidf = vectorizer.fit_transform([s1, s2])
if tfidf.shape[1] == 0:
return 0.0
cos_sim = cosine_similarity(tfidf[0:1], tfidf[1:2])[0][0]
return cos_sim * 100
except ValueError:
return 0.0
def custom_trailing_match(s1, s2):
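    """Heuristic for 'p'/'jp'-prefixed invoices: drop interior zeros from the digit run of s1 and test whether the result ends with s2."""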
s1 = str(s1)
s2 = str(s2)
s1_lower = s1.lower()
if not (s1_lower.startswith("p") or s1_lower.startswith("jp")):
return False
digits = re.sub(r'\D', '', s1)
if len(digits) <= 2:
modified = digits
else:
middle = digits[1:-1].replace("0", "")
modified = digits[0] + middle + digits[-1]
return modified.endswith(s2)
def combined_similarity(s1, s2):
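    """Ensemble similarity in [0, 100]: mean of seven string metrics, plus rule-based boosts (shared numeric core) and penalties (differing numeric cores)."""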
if s1.strip().lower() == s2.strip().lower():
return 100
s1_proc = robust_preprocess_invoice(s1)
s2_proc = robust_preprocess_invoice(s2)
if custom_trailing_match(s1_proc, s2_proc):
return 95
scores = [
levenshtein_sim(s1_proc, s2_proc),
jaro_winkler_sim(s1_proc, s2_proc),
rapidfuzz_sim(s1_proc, s2_proc),
fuzzbuzz_sim(s1_proc, s2_proc),
hamming_sim(s1_proc, s2_proc),
jaccard_sim(s1_proc, s2_proc),
cosine_sim(s1_proc, s2_proc)
]
avg_score = sum(scores) / len(scores)
p1, core1, sfx1 = extract_invoice_parts(s1_proc)
p2, core2, sfx2 = extract_invoice_parts(s2_proc)
if core1 and core2 and core1 == core2:
if (p1 and not p2) or (p2 and not p1) or (sfx1 and not sfx2) or (sfx2 and not sfx1) or (p1 and sfx2) or (p2 and sfx1):
avg_score = max(avg_score, 90)
    # Reuse the module-level extract_numeric_core helper instead of
    # redefining the same logic inline.
    num1 = extract_numeric_core(s1_proc)
    num2 = extract_numeric_core(s2_proc)
try:
if int(num1) != int(num2):
avg_score *= 0.5
except Exception:
if num1 != num2:
avg_score *= 0.5
    # Only the verbatim-equality early return above reports a perfect 100;
    # fuzzy scores that hit the ceiling are nudged just below it.
    if avg_score >= 100:
        avg_score = random.uniform(90, 99)
return avg_score
def generate_review_status(score):
return "No Review Needed" if score > 50 else "Needs Review"
def generate_recommendation(score):
    if score == 100:
        return "Exact Match"
    if score >= 50:
        return "Partial Match"
    return "Unmatched"
def generate_reason(inv1, inv2, score):
inv1 = str(inv1)
inv2 = str(inv2)
if custom_trailing_match(inv1, inv2):
return "Custom trailing-match pattern detected."
if inv1.lower() == inv2.lower():
return "Exact match of invoice numbers."
p1, core1, sfx1 = extract_invoice_parts(normalize_for_comparison(inv1))
p2, core2, sfx2 = extract_invoice_parts(normalize_for_comparison(inv2))
    if core1 is not None and core2 is not None:
        if core1 != core2:
            # Report padding-only differences ("0123" vs "123") with the more
            # specific reason before the generic core-mismatch return.
            if core1.lstrip("0") == core2.lstrip("0"):
                return "Numeric padding mismatch (leading zeros removed)."
            return "Numeric core does not match."
if p1 and p2 and p1 != p2:
return "Different prefixes found, affecting similarity."
if sfx1 and sfx2 and sfx1 != sfx2:
return "Different suffixes detected, leading to mismatch."
if p1 and not p2:
return "Partial matching: one invoice has a prefix while the other does not."
if sfx1 and not sfx2:
return "Partial matching: one invoice has a suffix while the other does not."
    if score >= 50:
if p1 and sfx2 and rapidfuzz_fuzz.ratio(p1, sfx2) > 90:
return "Prefix in one invoice matches suffix in the other."
if any(sep in inv1 or sep in inv2 for sep in [" ", "-", "_"]):
return "Strong match; only minor formatting variations."
if inv1 in inv2 or inv2 in inv1:
return "One invoice is fully contained in the other."
return "Invoices match with minimal differences."
if any(sep in inv1 or sep in inv2 for sep in [" ", "-", "_"]):
return "Formatting issue due to spaces or separators."
if rapidfuzz_fuzz.ratio(inv1, inv2) > 70:
return "Minor spelling variation detected."
if set(inv1) == set(inv2):
return "Character positions swapped."
if abs(len(inv1) - len(inv2)) <= 2:
return "Possible OCR error or scanning issue."
    if any(ch.isdigit() for ch in inv1) and any(ch.isdigit() for ch in inv2) and core1 is not None and core1 == core2:
        return "Identical numbers but extra text in one invoice."
if any(sep in inv1 for sep in ["-", "/"]) or any(sep in inv2 for sep in ["-", "/"]):
return "Different separator conventions used."
if any(ch in inv1 for ch in ["#", "$", "&"]) or any(ch in inv2 for ch in ["#", "$", "&"]):
return "Special characters found in one invoice but not the other."
if len(set(inv1)) < len(inv1) or len(set(inv2)) < len(inv2):
return "Duplicate characters found in one invoice."
if len(inv1) > 10 or len(inv2) > 10:
return "One invoice is significantly longer than the other."
return "Significant structural difference; invoices do not match."
# -----------------------------
# Updated process_invoices Function with Feedback Override
# -----------------------------
def process_invoices(df1, df2):
"""
For each invoice in df1, check if a user-corrected (feedback) invoice exists.
If so, use that corrected invoice to recalculate the match using the normal scoring functions.
Invoices without feedback are processed normally.
"""
df1["InvoiceNumber"] = df1["InvoiceNumber"].str.strip()
df2["InvoiceNumber"] = df2["InvoiceNumber"].str.strip()
# Load the feedback mapping from the persistent file.
feedback_mapping = load_feedback_mapping()
results = []
for idx1, row1 in df1.iterrows():
inv1 = row1['InvoiceNumber']
if inv1 in feedback_mapping:
# Use the user-selected corrected invoice
corrected_invoice = feedback_mapping[inv1]
            # Recalculate with the corrected value; the +60 feedback boost is
            # capped so the score still reads as a percentage.
            score = min(100.0, combined_similarity(inv1, corrected_invoice) + 60)
best_match = {
"invoice_number1": inv1,
"invoice_number2": corrected_invoice,
"similarity_score": round(score, 2),
"manual_review_status": generate_review_status(score),
"recommendation": generate_recommendation(score),
"reason": generate_reason(inv1, corrected_invoice, score),
"comments": "",
"editable": False
}
else:
best_match = None
best_score = -1
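            # Brute-force search: O(len(df1) * len(df2)) combined_similarity calls.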
for idx2, row2 in df2.iterrows():
score = combined_similarity(inv1, row2['InvoiceNumber'])
if score > best_score:
best_score = score
best_match = {
"invoice_number1": inv1,
"invoice_number2": row2['InvoiceNumber'],
"similarity_score": round(score - 2, 2),
"manual_review_status": generate_review_status(score),
"recommendation": generate_recommendation(score),
"reason": generate_reason(inv1, row2['InvoiceNumber'], score),
"comments": "",
"editable": score <= 60
}
results.append(best_match)
df_final = pd.DataFrame(results)
return df_final
#########################################
# SBERT Exact Match Filtering
#########################################
def sbert_exact_match_filtering(df1, df2):
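    """Pair invoices whose SBERT embeddings match with cosine similarity ~1.0, label them exact matches, and drop them from both frames before fuzzy matching."""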
df1_embeddings = generate_embeddings(df1, 'InvoiceNumber')
df2_embeddings = generate_embeddings(df2, 'InvoiceNumber')
cosine_similarities = cosine_similarity(df1_embeddings, df2_embeddings)
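    # With L2-normalized embeddings, a cosine of ~1.0 (within tolerance)
    # effectively flags identical or indistinguishable strings.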
tolerance = 1e-8
exact_match_indices = np.where(np.isclose(cosine_similarities, 1.0, atol=tolerance))
df_matches = pd.DataFrame({
'df1_index': exact_match_indices[0],
'df2_index': exact_match_indices[1]
})
df_exact = pd.DataFrame({
'InvoiceNumber_1': df_matches['df1_index'].apply(lambda idx: df1.iloc[idx]['InvoiceNumber']),
'InvoiceNumber_2': df_matches['df2_index'].apply(lambda idx: df2.iloc[idx]['InvoiceNumber'])
})
matched_values_df1 = df_exact['InvoiceNumber_1'].unique()
matched_values_df2 = df_exact['InvoiceNumber_2'].unique()
df1_filtered = df1[~df1['InvoiceNumber'].isin(matched_values_df1)].reset_index(drop=True)
df2_filtered = df2[~df2['InvoiceNumber'].isin(matched_values_df2)].reset_index(drop=True)
df_exact['similarity_score'] = 100
df_exact['manual_review_status'] = 'No Review Needed'
df_exact['recommendation'] = 'Exact Match'
df_exact['reason'] = 'Exact match via SBERT embeddings.'
df_exact['comments'] = ''
return df_exact, df1_filtered, df2_filtered
#########################################
# Functions to Generate Summary Statistics
#########################################
def get_stats(df):
"""Aggregate summary statistics from the latest_results_df."""
stats = {}
stats['total_rows'] = len(df)
stats['total_exact_match'] = int((df['recommendation'] == 'Exact Match').sum())
stats['total_partial_match'] = int((df['recommendation'] == 'Partial Match').sum())
stats['total_unmatched'] = int((df['recommendation'] == 'Unmatched').sum())
stats['total_no_review_needed'] = int((df['manual_review_status'] == 'No Review Needed').sum())
stats['total_needs_review'] = int((df['manual_review_status'] == 'Needs Review').sum())
stats['similarity_scores'] = df['similarity_score'].tolist()
stats['average_similarity'] = float(df['similarity_score'].mean())
stats['min_similarity'] = float(df['similarity_score'].min())
stats['max_similarity'] = float(df['similarity_score'].max())
return stats
def generate_stats_excel_bytes(stats):
"""Generate an Excel bytes stream from the stats dictionary."""
df_stats = pd.DataFrame(list(stats.items()), columns=["Metric", "Value"])
output = io.BytesIO()
with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
df_stats.to_excel(writer, index=False, sheet_name='Summary Stats')
output.seek(0)
return output
def generate_stats_json_bytes(stats):
"""Generate a JSON bytes stream from the stats dictionary."""
json_bytes = io.BytesIO(json.dumps(stats, indent=4).encode('utf-8'))
return json_bytes
#########################################
# Flask Routes
#########################################
@app.route("/", methods=["GET", "POST"])
def index():
global latest_results_df, original_df1, original_df2
results = None
unique_values = [] # Unique invoice numbers from dataset2 for the select box
if request.method == "POST":
file1 = request.files.get("file1")
file2 = request.files.get("file2")
if not file1 or not file2:
flash("Please upload both files.")
return redirect(request.url)
ext1 = file1.filename.split(".")[-1].lower()
ext2 = file2.filename.split(".")[-1].lower()
try:
if ext1 == "csv":
file1_bytes = file1.read()
encoding_info = chardet.detect(file1_bytes)
                encoding = encoding_info.get("encoding") or "utf-8"  # chardet may report None
file1_text = file1_bytes.decode(encoding, errors="replace")
df1 = pd.read_csv(io.StringIO(file1_text))
elif ext1 in ["xls", "xlsx"]:
file1.seek(0)
df1 = pd.read_excel(file1)
else:
flash("File 1 format not supported.")
return redirect(request.url)
if ext2 == "csv":
file2_bytes = file2.read()
encoding_info = chardet.detect(file2_bytes)
                encoding = encoding_info.get("encoding") or "utf-8"  # chardet may report None
file2_text = file2_bytes.decode(encoding, errors="replace")
df2 = pd.read_csv(io.StringIO(file2_text))
elif ext2 in ["xls", "xlsx"]:
file2.seek(0)
df2 = pd.read_excel(file2)
else:
flash("File 2 format not supported.")
return redirect(request.url)
except Exception as e:
flash("Error reading files: " + str(e))
return redirect(request.url)
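        # Defensive check: without an "InvoiceNumber" column the astype()
        # calls below raise an unhandled KeyError.
        for label, df in (("File 1", df1), ("File 2", df2)):
            if "InvoiceNumber" not in df.columns:
                flash(f"{label} must contain an 'InvoiceNumber' column.")
                return redirect(request.url)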
df1["InvoiceNumber"] = df1["InvoiceNumber"].astype(str)
df2["InvoiceNumber"] = df2["InvoiceNumber"].astype(str)
original_df1 = df1.copy()
original_df2 = df2.copy()
# Prepare the unique invoice numbers from dataset2 for the edit select box.
unique_values = sorted(df2["InvoiceNumber"].unique().tolist())
# Run SBERT exact match filtering.
df_exact, df1_filtered, df2_filtered = sbert_exact_match_filtering(df1, df2)
# Run robust invoice matching on remaining invoices (with feedback override).
df_final_matches = process_invoices(df1_filtered, df2_filtered)
# Rename exact match columns for consistency.
df_exact = df_exact.rename(columns={
'InvoiceNumber_1': 'invoice_number1',
'InvoiceNumber_2': 'invoice_number2'
})
# Concatenate exact matches with robust matches.
df_concatenated = pd.concat([df_exact, df_final_matches], ignore_index=True)
# Shuffle the rows randomly before storing and displaying
latest_results_df = df_concatenated.sample(frac=1).reset_index(drop=True)
results = latest_results_df.to_dict(orient="records")
return render_template("index.html", results=results, unique_values=unique_values)
@app.route("/save_updates", methods=["POST"])
def save_updates():
global latest_results_df
try:
updated_data = request.get_json()
updated_df = pd.DataFrame(updated_data)
latest_results_df = updated_df.copy()
return jsonify({"status": "success"}), 200
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
@app.route("/save_feedback", methods=["POST"])
def save_feedback():
try:
feedback_data = request.get_json()
invoice1 = feedback_data.get('invoice_number1')
selected_invoice2 = feedback_data.get('selected_invoice2')
# If a new invoice is selected, update the persistent feedback mapping.
if selected_invoice2:
update_feedback_mapping(invoice1, selected_invoice2)
message = "Feedback saved. Please re-run to train model on updates."
else:
message = "No new invoice selected; no changes made."
return jsonify({"status": "success", "message": message}), 200
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
def generate_csv_bytes(df):
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
csv_buffer.seek(0)
return io.BytesIO(csv_buffer.getvalue().encode())
def generate_excel_bytes(df):
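    """Render the results frame as a styled Excel report: data table, summary cards, pie/column/line/bar charts, and a word cloud of reasons."""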
df = df.replace([np.inf, -np.inf], np.nan).fillna("")
output = io.BytesIO()
with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
workbook = writer.book
worksheet = workbook.add_worksheet("Report")
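        # Lay out columns so that "reason" spans two Excel columns (it holds
        # long text); every other DataFrame column occupies one.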
excel_col_mapping = {}
excel_index = 0
for col in df.columns:
if col.lower() == 'reason':
excel_col_mapping[col] = excel_index
excel_index += 2
else:
excel_col_mapping[col] = excel_index
excel_index += 1
total_excel_columns = excel_index
title_format = workbook.add_format({
'bold': True,
'bg_color': '#FFFF00',
'font_color': 'black',
'align': 'center',
'valign': 'vcenter',
'font_size': 16
})
header_format = workbook.add_format({
'bold': True,
'bg_color': '#FFFF00',
'font_color': 'black',
'border': 1,
'align': 'center',
'valign': 'vcenter'
})
data_cell_format = workbook.add_format({
'border': 1,
'align': 'left',
'valign': 'vcenter',
'text_wrap': True
})
worksheet.merge_range(0, 0, 0, total_excel_columns - 1,
"Intelligent Partial Invoice Matching - Excel Report",
title_format)
start_data_row = 2
for col in df.columns:
col_index = excel_col_mapping[col]
if col.lower() == 'reason':
worksheet.merge_range(start_data_row, col_index, start_data_row, col_index + 1,
col, header_format)
worksheet.set_column(col_index, col_index + 1, 40)
else:
worksheet.write(start_data_row, col_index, col, header_format)
worksheet.set_column(col_index, col_index, 20)
for i, row in enumerate(df.itertuples(index=False, name=None)):
for col_name, cell in zip(df.columns, row):
col_index = excel_col_mapping[col_name]
if col_name.lower() == 'reason':
worksheet.merge_range(start_data_row + 1 + i, col_index,
start_data_row + 1 + i, col_index + 1,
cell, data_cell_format)
else:
worksheet.write(start_data_row + 1 + i, col_index, cell, data_cell_format)
last_data_row = start_data_row + 1 + len(df)
stats_card_row = last_data_row + 3
try:
total_invoices = len(df)
avg_score = float(df['similarity_score'].astype(float).mean())
max_score = float(df['similarity_score'].astype(float).max())
min_score = float(df['similarity_score'].astype(float).min())
except Exception:
total_invoices = avg_score = max_score = min_score = 0
left_card = [
["Total Invoices", total_invoices],
["Average Similarity", round(avg_score, 2)]
]
right_card = [
["Max Similarity", round(max_score, 2)],
["Min Similarity", round(min_score, 2)]
]
for i, item in enumerate(left_card):
worksheet.write(stats_card_row + i, 0, item[0], header_format)
worksheet.write(stats_card_row + i, 1, item[1], data_cell_format)
for i, item in enumerate(right_card):
worksheet.write(stats_card_row + i, 3, item[0], header_format)
worksheet.write(stats_card_row + i, 4, item[1], data_cell_format)
chart_start_row = stats_card_row + 5
chart_col = 3
recommendation_categories = ["Unmatched", "Exact Match", "Partial Match"]
recommendation_counts = [int(df[df['recommendation'] == cat].shape[0]) for cat in recommendation_categories]
rec_table_row = chart_start_row
worksheet.write(rec_table_row, 0, "Recommendation", header_format)
worksheet.write(rec_table_row, 1, "Count", header_format)
for i, (cat, cnt) in enumerate(zip(recommendation_categories, recommendation_counts)):
worksheet.write(rec_table_row + 1 + i, 0, cat, data_cell_format)
worksheet.write(rec_table_row + 1 + i, 1, cnt, data_cell_format)
rec_pie_chart = workbook.add_chart({'type': 'pie'})
rec_pie_chart.add_series({
'name': 'Recommendation Distribution',
'categories': ['Report', rec_table_row + 1, 0, rec_table_row + len(recommendation_categories), 0],
'values': ['Report', rec_table_row + 1, 1, rec_table_row + len(recommendation_categories), 1],
})
rec_pie_chart.set_title({'name': 'Recommendation Distribution'})
worksheet.insert_chart(chart_start_row, chart_col, rec_pie_chart, {'x_scale': 1.0, 'y_scale': 1.0})
chart_start_row += 17
if 'similarity_score' in df.columns:
scores = pd.to_numeric(df['similarity_score'], errors='coerce').dropna()
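            # 10-point buckets labeled "1-10" ... "91-100"; np.histogram drops
            # scores below the first bin edge (i.e. below 1).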
bins = list(range(1, 102, 10))
counts, bin_edges = np.histogram(scores, bins=bins)
bin_labels = [f"{bins[i]}-{bins[i + 1] - 1}" for i in range(len(bins) - 1)]
hist_table_row = chart_start_row - 3
worksheet.write(hist_table_row, 0, "Score Range", header_format)
worksheet.write(hist_table_row, 1, "Count", header_format)
for i, (label, cnt) in enumerate(zip(bin_labels, counts)):
worksheet.write(hist_table_row + 1 + i, 0, label, data_cell_format)
worksheet.write(hist_table_row + 1 + i, 1, cnt, data_cell_format)
hist_chart = workbook.add_chart({'type': 'column'})
hist_chart.add_series({
'name': 'Similarity Score Distribution',
'categories': ['Report', hist_table_row + 1, 0, hist_table_row + len(bin_labels), 0],
'values': ['Report', hist_table_row + 1, 1, hist_table_row + len(bin_labels), 1],
})
hist_chart.set_title({'name': 'Histogram of Similarity Scores'})
hist_chart.set_x_axis({'name': 'Score Range'})
hist_chart.set_y_axis({'name': 'Count'})
worksheet.insert_chart(chart_start_row, chart_col, hist_chart, {'x_scale': 1.2, 'y_scale': 1.2})
chart_start_row += 20
if 'reason' in df.columns:
worksheet.write(chart_start_row - 2, chart_col, "Wordcloud for Reasons", header_format)
text = " ".join(df['reason'].astype(str).tolist())
wc = WordCloud(width=400, height=200, background_color='white').generate(text)
imgdata = io.BytesIO()
wc.to_image().save(imgdata, format='PNG')
imgdata.seek(0)
worksheet.insert_image(chart_start_row, chart_col, 'wordcloud.png',
{'image_data': imgdata, 'x_scale': 1.0, 'y_scale': 1.0})
chart_start_row += 25
else:
chart_start_row += 10
        sim_index = excel_col_mapping.get('similarity_score', 0)  # dict.get never raises; no try/except needed
line_chart = workbook.add_chart({'type': 'line'})
line_chart.add_series({
'name': 'Similarity Score Trend',
'categories': ['Report', start_data_row + 1, 0, last_data_row - 1, 0],
'values': ['Report', start_data_row + 1, sim_index, last_data_row - 1, sim_index],
})
line_chart.set_title({'name': 'Similarity Score Over Entries'})
worksheet.insert_chart(chart_start_row, chart_col, line_chart, {'x_scale': 1.5, 'y_scale': 1.5})
chart_start_row += 30
if 'reason' in df.columns:
reasons = df['reason'].value_counts().reset_index()
reasons.columns = ['Reason', 'Count']
hbar_table_row = chart_start_row
worksheet.write(hbar_table_row, 0, "Reason", header_format)
worksheet.write(hbar_table_row, 1, "Count", header_format)
for idx, row in reasons.iterrows():
worksheet.write(hbar_table_row + 1 + idx, 0, row['Reason'], data_cell_format)
worksheet.write(hbar_table_row + 1 + idx, 1, row['Count'], data_cell_format)
hbar_chart = workbook.add_chart({'type': 'bar'})
hbar_chart.add_series({
'name': 'Reasons Distribution',
'categories': ['Report', hbar_table_row + 1, 0, hbar_table_row + len(reasons), 0],
'values': ['Report', hbar_table_row + 1, 1, hbar_table_row + len(reasons), 1],
})
hbar_chart.set_title({'name': 'Reasons Distribution'})
worksheet.insert_chart(chart_start_row, chart_col, hbar_chart, {'x_scale': 1.5, 'y_scale': 1.5})
chart_start_row += 30
output.seek(0)
return output
@app.route("/download_csv")
def download_csv():
global latest_results_df, original_df1, original_df2
if latest_results_df is None:
flash("No data available.")
return redirect(url_for('index'))
    # Must match the strings produced by generate_recommendation().
    allowed_recs = {"Partial Match", "Unmatched", "Exact Match"}
filtered_matches = latest_results_df[latest_results_df['recommendation'].isin(allowed_recs)]
keys_df = filtered_matches[['invoice_number1', 'invoice_number2']].copy()
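    # NOTE: assumes InvoiceNumber is unique within each original file;
    # duplicate keys would fan out the two merges below and misalign the rows
    # paired positionally in final_df.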
df1_merged = pd.merge(
keys_df,
original_df1,
left_on='invoice_number1',
right_on='InvoiceNumber',
how='left'
)
df1_merged.rename(columns={'InvoiceNumber': 'InvoiceNumber_1'}, inplace=True)
df2_merged = pd.merge(
keys_df,
original_df2,
left_on='invoice_number2',
right_on='InvoiceNumber',
how='left'
)
df2_merged.rename(columns={'InvoiceNumber': 'InvoiceNumber_2'}, inplace=True)
final_df = pd.DataFrame({
'InvoiceNumber_1': df1_merged['InvoiceNumber_1'],
'InvoiceNumber_2': df2_merged['InvoiceNumber_2']
})
for col in final_df.select_dtypes(include=['object']).columns:
final_df[col] = final_df[col].str.strip()
final_df.reset_index(drop=True, inplace=True)
return send_file(
generate_csv_bytes(final_df),
mimetype='text/csv',
download_name='final_merged_invoices.csv',
as_attachment=True
)
@app.route("/download_excel")
def download_excel():
global latest_results_df
if latest_results_df is None:
flash("No data available.")
return redirect(url_for('index'))
df = latest_results_df.copy()
for col in ["editable", "comments"]:
if col in df.columns:
df.drop(columns=[col], inplace=True)
return send_file(
generate_excel_bytes(df),
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
download_name='matched_invoices.xlsx',
as_attachment=True
)
# New endpoint: Download summary statistics as Excel
@app.route("/download_stats_excel")
def download_stats_excel():
global latest_results_df
if latest_results_df is None:
flash("No data available for stats.")
return redirect(url_for('index'))
stats = get_stats(latest_results_df)
return send_file(
generate_stats_excel_bytes(stats),
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
download_name='invoice_matching_stats.xlsx',
as_attachment=True
)
# New endpoint: Download summary statistics as JSON
@app.route("/download_stats_json")
def download_stats_json():
global latest_results_df
if latest_results_df is None:
flash("No data available for stats.")
return redirect(url_for('index'))
stats = get_stats(latest_results_df)
return send_file(
generate_stats_json_bytes(stats),
mimetype='application/json',
download_name='invoice_matching_stats.json',
as_attachment=True
)
if __name__ == "__main__":
app.run(debug=True)