import pandas as pd import numpy as np from rapidfuzz import fuzz from sentence_transformers import SentenceTransformer import faiss import os import pickle import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ReconciliationEngine: def __init__(self, threshold=85.0, model_name='all-MiniLM-L6-v2', index_path='vendor_index.faiss'): self.threshold = threshold self.model = SentenceTransformer(model_name) self.index_path = index_path self.vendor_mapping_path = 'vendor_mapping.pkl' self.index = None self.vendor_names = [] self._load_or_create_index() def _load_or_create_index(self): # We need dimension size for the chosen model. MiniLM-L6-v2 is 384 d = self.model.get_sentence_embedding_dimension() if os.path.exists(self.index_path) and os.path.exists(self.vendor_mapping_path): logger.info("Loading existing FAISS index.") self.index = faiss.read_index(self.index_path) with open(self.vendor_mapping_path, 'rb') as f: self.vendor_names = pickle.load(f) else: logger.info("Creating new FAISS index.") self.index = faiss.IndexFlatL2(d) self.vendor_names = [] def _save_index(self): faiss.write_index(self.index, self.index_path) with open(self.vendor_mapping_path, 'wb') as f: pickle.dump(self.vendor_names, f) def learn_vendors(self, vendors): """Adds new vendors to the FAISS index.""" if not hasattr(self, 'embedding_cache'): self.embedding_cache = {} new_vendors = [v for v in set(vendors) if pd.notna(v) and v not in self.vendor_names] if new_vendors: logger.info(f"Learning {len(new_vendors)} new vendors.") embeddings = self.model.encode(new_vendors) self.index.add(np.array(embeddings).astype('float32')) self.vendor_names.extend(new_vendors) # Pre-cache to speed up pair-wise matching later for v, emb in zip(new_vendors, embeddings): self.embedding_cache[v] = emb / np.linalg.norm(emb) self._save_index() def get_embedding(self, vendor): if not hasattr(self, 'embedding_cache'): self.embedding_cache = {} if vendor not in self.embedding_cache: emb = self.model.encode([vendor])[0] self.embedding_cache[vendor] = emb / np.linalg.norm(emb) return self.embedding_cache[vendor] def get_semantic_similarity(self, vendor1, vendor2): if pd.isna(vendor1) or pd.isna(vendor2): return 0.0 emb1_norm = self.get_embedding(vendor1) emb2_norm = self.get_embedding(vendor2) sim = np.dot(emb1_norm, emb2_norm) return max(0.0, sim * 100) def search_similar_vendor(self, query_vendor, top_k=1): if not self.vendor_names or pd.isna(query_vendor): return None, 0.0 query_emb = self.model.encode([query_vendor]).astype('float32') distances, indices = self.index.search(query_emb, top_k) best_idx = indices[0][0] if best_idx != -1: best_match = self.vendor_names[best_idx] # Calculate a normalized score based on L2 distance # For normalized vectors, L2 distance squared is 2 - 2*cos(theta) # This is a rough proxy; let's combine with fuzz for the final score fuzz_score = fuzz.ratio(query_vendor.lower(), best_match.lower()) return best_match, fuzz_score return None, 0.0 def reconcile(self, source_df, target_df, source_key='VendorName', target_key='VendorName', amount_col='Amount'): logger.info("Starting reconciliation process.") # Learn vendors from both datasets self.learn_vendors(source_df[source_key].tolist()) self.learn_vendors(target_df[target_key].tolist()) # Basic exact match on InvoiceID if it exists, otherwise we match on VendorName and Amount if 'InvoiceID' in source_df.columns and 'InvoiceID' in target_df.columns: source_df = source_df.drop_duplicates(subset=['InvoiceID']) target_df = target_df.drop_duplicates(subset=['InvoiceID']) merged = pd.merge(source_df, target_df, on='InvoiceID', how='outer', suffixes=('_books', '_gst')) def determine_status(row): if pd.isna(row.get(f'{amount_col}_books')): return "Missing in Books" if pd.isna(row.get(f'{amount_col}_gst')): return "Missing in GST" b_amt = float(row.get(f'{amount_col}_books', 0)) g_amt = float(row.get(f'{amount_col}_gst', 0)) if abs(b_amt - g_amt) > 0.01: return "Amount Mismatch" b_vendor_val = row.get(f'{source_key}_books') g_vendor_val = row.get(f'{target_key}_gst') b_vendor = str(b_vendor_val) if pd.notna(b_vendor_val) else '' g_vendor = str(g_vendor_val) if pd.notna(g_vendor_val) else '' if b_vendor.lower() == g_vendor.lower() and b_vendor != '': return "Exact Match" fuzz_score = fuzz.ratio(b_vendor.lower(), g_vendor.lower()) if fuzz_score >= self.threshold: return f"Fuzzy Match ({fuzz_score:.1f}%)" sem_score = self.get_semantic_similarity(b_vendor, g_vendor) if sem_score >= self.threshold: return f"Semantic Match ({sem_score:.1f}%)" return "Vendor Mismatch" merged['MatchStatus'] = merged.apply(determine_status, axis=1) return merged else: raise ValueError("InvoiceID column is required for current reconciliation logic.")