| import pandas as pd |
| import numpy as np |
| from rapidfuzz import fuzz |
| from sentence_transformers import SentenceTransformer |
| import faiss |
| import os |
| import pickle |
| import logging |
|
|
# Configure the root logger once at import time so INFO-level records are emitted.
logging.basicConfig(
    level=logging.INFO,
)

# Module-scoped logger used by everything defined in this file.
logger = logging.getLogger(__name__)
|
|
class ReconciliationEngine:
    """Reconcile two invoice tables on ``InvoiceID`` and grade vendor-name agreement.

    The engine keeps a FAISS L2 index of sentence embeddings for every vendor
    name it has seen, plus a parallel list of the names themselves.  Both are
    persisted to disk (``index_path`` / ``vendor_mapping_path``) whenever new
    vendors are learned, and reloaded on construction when both files exist.
    """

    def __init__(self, threshold=85.0, model_name='all-MiniLM-L6-v2',
                 index_path='vendor_index.faiss',
                 vendor_mapping_path='vendor_mapping.pkl'):
        """Load the embedding model and the persisted vendor index, if any.

        Args:
            threshold: Minimum score (0-100) for a fuzzy or semantic match.
            model_name: Name passed to ``SentenceTransformer``.
            index_path: File used to persist the FAISS index.
            vendor_mapping_path: File used to persist the pickled vendor-name
                list.  Defaults to the previously hard-coded location so
                existing callers are unaffected.
        """
        self.threshold = threshold
        self.model = SentenceTransformer(model_name)
        self.index_path = index_path
        self.vendor_mapping_path = vendor_mapping_path
        self.index = None
        self.vendor_names = []
        # vendor name -> unit-length embedding, used by get_semantic_similarity.
        self.embedding_cache = {}
        self._load_or_create_index()

    @staticmethod
    def _normalize(vec):
        """Return ``vec`` scaled to unit length; a zero vector is returned unchanged."""
        norm = np.linalg.norm(vec)
        # Dividing by a zero norm would poison every later similarity with NaN.
        return vec / norm if norm > 0 else vec

    def _load_or_create_index(self):
        """Load the persisted index and name list, or start an empty index.

        A persisted index is only adopted when its dimensionality matches the
        current model and it holds exactly one vector per persisted name;
        otherwise positions returned by a search could not be mapped back to
        names, so a fresh index is created instead.
        """
        dim = self.model.get_sentence_embedding_dimension()

        if os.path.exists(self.index_path) and os.path.exists(self.vendor_mapping_path):
            logger.info("Loading existing FAISS index.")
            index = faiss.read_index(self.index_path)
            # SECURITY: pickle.load can execute arbitrary code. Only point
            # vendor_mapping_path at files this application wrote itself.
            with open(self.vendor_mapping_path, 'rb') as f:
                names = pickle.load(f)

            if index.d == dim and index.ntotal == len(names):
                self.index = index
                self.vendor_names = list(names)
                return
            logger.warning(
                "Persisted index (dim=%s, vectors=%s) does not match the model "
                "(dim=%s) or the vendor list (%s names); ignoring it.",
                index.d, index.ntotal, dim, len(names),
            )

        logger.info("Creating new FAISS index.")
        self.index = faiss.IndexFlatL2(dim)
        self.vendor_names = []

    def _save_index(self):
        """Write the FAISS index and the vendor-name list to their files."""
        faiss.write_index(self.index, self.index_path)
        with open(self.vendor_mapping_path, 'wb') as f:
            pickle.dump(self.vendor_names, f)

    def learn_vendors(self, vendors):
        """Adds new vendors to the FAISS index.

        Null entries and names already indexed are skipped.  When at least one
        new name is added, the index and name list are saved to disk.

        Args:
            vendors: Iterable of hashable vendor names (may contain nulls and
                duplicates).
        """
        known = set(self.vendor_names)  # O(1) membership instead of scanning the list
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # index layout is reproducible from run to run.
        new_vendors = [
            v for v in dict.fromkeys(vendors)
            if pd.notna(v) and v not in known
        ]
        if not new_vendors:
            return

        logger.info(f"Learning {len(new_vendors)} new vendors.")
        embeddings = np.asarray(self.model.encode(new_vendors), dtype='float32')
        self.index.add(embeddings)
        self.vendor_names.extend(new_vendors)

        # Reuse the embeddings just computed so later similarity checks do not
        # have to encode these names again.
        for name, emb in zip(new_vendors, embeddings):
            self.embedding_cache[name] = self._normalize(emb)

        self._save_index()

    def get_embedding(self, vendor):
        """Return the cached unit-length embedding for ``vendor``, encoding it on a miss."""
        if vendor not in self.embedding_cache:
            emb = self.model.encode([vendor])[0]
            self.embedding_cache[vendor] = self._normalize(emb)
        return self.embedding_cache[vendor]

    def get_semantic_similarity(self, vendor1, vendor2):
        """Return the cosine similarity of two vendor names on a 0-100 scale.

        Negative similarities are clamped to 0.0, and 0.0 is returned when
        either name is null.
        """
        if pd.isna(vendor1) or pd.isna(vendor2):
            return 0.0
        # Both vectors are unit length, so the dot product is the cosine.
        sim = np.dot(self.get_embedding(vendor1), self.get_embedding(vendor2))
        return float(max(0.0, sim * 100))

    def search_similar_vendor(self, query_vendor, top_k=1):
        """Find the indexed vendor whose embedding is nearest to ``query_vendor``.

        Args:
            query_vendor: Name to look up.
            top_k: Number of neighbours requested from the index; only the
                nearest one is returned.

        Returns:
            ``(best_match, fuzz_score)`` where ``fuzz_score`` is the
            case-insensitive ``fuzz.ratio`` between the query and the match,
            or ``(None, 0.0)`` when nothing is indexed, the query is null, or
            the index returns no usable hit.
        """
        if not self.vendor_names or pd.isna(query_vendor):
            return None, 0.0

        query = str(query_vendor)
        query_emb = np.asarray(self.model.encode([query]), dtype='float32')
        _, indices = self.index.search(query_emb, top_k)

        best_idx = int(indices[0][0])
        # -1 is the "no result" marker; the upper bound guards against an
        # index that holds more vectors than there are known names.
        if best_idx < 0 or best_idx >= len(self.vendor_names):
            return None, 0.0

        best_match = self.vendor_names[best_idx]
        fuzz_score = fuzz.ratio(query.lower(), str(best_match).lower())
        return best_match, fuzz_score

    def _determine_status(self, row, books_amount_col, gst_amount_col,
                          books_vendor_col, gst_vendor_col):
        """Classify one merged row; the checks run from cheapest to most expensive."""
        books_amount = row.get(books_amount_col)
        if pd.isna(books_amount):
            return "Missing in Books"
        gst_amount = row.get(gst_amount_col)
        if pd.isna(gst_amount):
            return "Missing in GST"

        if abs(float(books_amount) - float(gst_amount)) > 0.01:
            return "Amount Mismatch"

        books_raw = row.get(books_vendor_col)
        gst_raw = row.get(gst_vendor_col)
        books_vendor = str(books_raw) if pd.notna(books_raw) else ''
        gst_vendor = str(gst_raw) if pd.notna(gst_raw) else ''

        # A missing name cannot match anything. Without this guard two empty
        # names would be scored against each other by the fuzzy matcher and
        # an empty string would be sent to the embedding model.
        if not books_vendor or not gst_vendor:
            return "Vendor Mismatch"

        if books_vendor.lower() == gst_vendor.lower():
            return "Exact Match"

        fuzz_score = fuzz.ratio(books_vendor.lower(), gst_vendor.lower())
        if fuzz_score >= self.threshold:
            return f"Fuzzy Match ({fuzz_score:.1f}%)"

        sem_score = self.get_semantic_similarity(books_vendor, gst_vendor)
        if sem_score >= self.threshold:
            return f"Semantic Match ({sem_score:.1f}%)"

        return "Vendor Mismatch"

    def reconcile(self, source_df, target_df, source_key='VendorName', target_key='VendorName', amount_col='Amount'):
        """Outer-join two invoice tables on ``InvoiceID`` and label every row.

        Rows with a duplicated ``InvoiceID`` are dropped (first occurrence
        kept) in each table before joining.  Overlapping columns receive the
        suffixes ``_books`` (source) and ``_gst`` (target).

        Args:
            source_df: Source table; must contain ``InvoiceID`` and ``source_key``.
            target_df: Target table; must contain ``InvoiceID`` and ``target_key``.
            source_key: Vendor-name column in ``source_df``.
            target_key: Vendor-name column in ``target_df``.
            amount_col: Amount column, read as ``{amount_col}_books`` and
                ``{amount_col}_gst`` after the join.

        Returns:
            The merged DataFrame with an added ``MatchStatus`` column holding
            one of: "Missing in Books", "Missing in GST", "Amount Mismatch",
            "Exact Match", "Fuzzy Match (..%)", "Semantic Match (..%)",
            "Vendor Mismatch".

        Raises:
            ValueError: If either table lacks an ``InvoiceID`` column.
        """
        logger.info("Starting reconciliation process.")

        # Validate before learning vendors so a bad call neither encodes
        # anything nor rewrites the persisted index.
        if 'InvoiceID' not in source_df.columns or 'InvoiceID' not in target_df.columns:
            raise ValueError("InvoiceID column is required for current reconciliation logic.")

        self.learn_vendors(source_df[source_key].tolist())
        self.learn_vendors(target_df[target_key].tolist())

        source_df = source_df.drop_duplicates(subset=['InvoiceID'])
        target_df = target_df.drop_duplicates(subset=['InvoiceID'])
        merged = pd.merge(source_df, target_df, on='InvoiceID', how='outer', suffixes=('_books', '_gst'))

        # pandas only suffixes columns present in BOTH frames, so a vendor
        # column that exists on one side only keeps its plain name.
        shared = (set(source_df.columns) & set(target_df.columns)) - {'InvoiceID'}
        books_vendor_col = f'{source_key}_books' if source_key in shared else source_key
        gst_vendor_col = f'{target_key}_gst' if target_key in shared else target_key

        # Built as a list rather than DataFrame.apply(axis=1): on an empty
        # frame apply returns a DataFrame, which cannot be assigned to a
        # single column.
        merged['MatchStatus'] = [
            self._determine_status(
                row,
                f'{amount_col}_books',
                f'{amount_col}_gst',
                books_vendor_col,
                gst_vendor_col,
            )
            for _, row in merged.iterrows()
        ]
        return merged
|
|