# ReconAI / reconciliation.py
# ACA050's picture
# Upload 14 files
# 64e5ee2 verified
import pandas as pd
import numpy as np
from rapidfuzz import fuzz
from sentence_transformers import SentenceTransformer
import faiss
import os
import pickle
import logging
# Configure the root logger at import time with level INFO, then create the
# module-level logger used by ReconciliationEngine below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ReconciliationEngine:
    """Reconcile two invoice tables and keep a persistent vendor-name index.

    Vendor names seen during reconciliation are embedded with a
    sentence-transformers model and stored in a FAISS ``IndexFlatL2``; the
    index and the parallel list of vendor names are persisted to
    ``index_path`` and ``vendor_mapping_path``.

    Args:
        threshold: Minimum score (0-100) for a fuzzy or semantic vendor match.
        model_name: Name passed to ``SentenceTransformer``.
        index_path: File the FAISS index is read from / written to.
        vendor_mapping_path: Pickle file holding the vendor names, in the
            same order as the vectors in the index.
    """

    def __init__(self, threshold=85.0, model_name='all-MiniLM-L6-v2',
                 index_path='vendor_index.faiss',
                 vendor_mapping_path='vendor_mapping.pkl'):
        self.threshold = threshold
        self.model = SentenceTransformer(model_name)
        self.index_path = index_path
        self.vendor_mapping_path = vendor_mapping_path
        self.index = None
        self.vendor_names = []
        # vendor name -> unit-length embedding, filled lazily by get_embedding
        # and eagerly by learn_vendors.
        self.embedding_cache = {}
        self._load_or_create_index()

    @staticmethod
    def _normalize(vector):
        """Return ``vector`` scaled to unit length (unchanged if its norm is 0)."""
        norm = np.linalg.norm(vector)
        return vector / norm if norm > 0 else vector

    @staticmethod
    def _merged_column(name, suffix, other_columns):
        """Return the label column ``name`` carries after ``pd.merge``.

        pandas appends a suffix only to column names present in both frames,
        so a column unique to one side keeps its original name.
        """
        return f"{name}{suffix}" if name in other_columns else name

    def _load_or_create_index(self):
        """Load the persisted index and vendor list, or start with empty ones.

        The persisted pair is used only when the index dimension matches the
        model's embedding dimension and the index holds exactly one vector
        per stored vendor name; otherwise a fresh index is created.
        """
        dimension = self.model.get_sentence_embedding_dimension()
        if os.path.exists(self.index_path) and os.path.exists(self.vendor_mapping_path):
            logger.info("Loading existing FAISS index.")
            loaded_index = faiss.read_index(self.index_path)
            # SECURITY: pickle.load can execute arbitrary code. Only point
            # vendor_mapping_path at files written by this class.
            with open(self.vendor_mapping_path, 'rb') as f:
                loaded_names = pickle.load(f)
            if loaded_index.d == dimension and loaded_index.ntotal == len(loaded_names):
                self.index = loaded_index
                self.vendor_names = loaded_names
                return
            logger.warning(
                "Persisted FAISS index is inconsistent with the model or the "
                "vendor mapping; creating a new index."
            )
        else:
            logger.info("Creating new FAISS index.")
        self.index = faiss.IndexFlatL2(dimension)
        self.vendor_names = []

    def _save_index(self):
        """Write the FAISS index and the vendor-name list to disk."""
        faiss.write_index(self.index, self.index_path)
        with open(self.vendor_mapping_path, 'wb') as f:
            pickle.dump(self.vendor_names, f)

    def learn_vendors(self, vendors):
        """Add vendors not yet in the FAISS index, then persist the index.

        Missing values and names already known are skipped. New names are
        added in first-seen order so the index layout is reproducible.
        """
        known = set(self.vendor_names)
        new_vendors = [
            v for v in dict.fromkeys(vendors)
            if pd.notna(v) and v not in known
        ]
        if not new_vendors:
            return
        logger.info("Learning %d new vendors.", len(new_vendors))
        embeddings = self.model.encode(new_vendors)
        self.index.add(np.ascontiguousarray(embeddings, dtype='float32'))
        self.vendor_names.extend(new_vendors)
        # Pre-cache normalized embeddings to speed up pair-wise matching later.
        for vendor, emb in zip(new_vendors, embeddings):
            self.embedding_cache[vendor] = self._normalize(emb)
        self._save_index()

    def get_embedding(self, vendor):
        """Return the cached unit-length embedding of ``vendor``, encoding it on a miss."""
        if vendor not in self.embedding_cache:
            emb = self.model.encode([vendor])[0]
            self.embedding_cache[vendor] = self._normalize(emb)
        return self.embedding_cache[vendor]

    def get_semantic_similarity(self, vendor1, vendor2):
        """Return the cosine similarity of two vendor names as a float in [0, 100].

        Returns 0.0 when either name is missing; negative similarities are
        clamped to 0.0.
        """
        if pd.isna(vendor1) or pd.isna(vendor2):
            return 0.0
        similarity = float(np.dot(self.get_embedding(vendor1), self.get_embedding(vendor2)))
        return max(0.0, similarity * 100)

    def search_similar_vendor(self, query_vendor, top_k=1):
        """Find the known vendor nearest to ``query_vendor`` in the index.

        Returns:
            ``(best_match, score)`` where ``score`` is the rapidfuzz ratio
            between the lower-cased query and the lower-cased match, or
            ``(None, 0.0)`` when nothing is indexed, the query is missing, or
            the index returns no neighbour. Only the nearest neighbour is
            returned regardless of ``top_k``.
        """
        if not self.vendor_names or pd.isna(query_vendor):
            return None, 0.0
        query_emb = self.model.encode([query_vendor]).astype('float32')
        # Ask for at least one neighbour so indices[0][0] always exists.
        _, indices = self.index.search(query_emb, max(1, top_k))
        best_idx = int(indices[0][0])
        # FAISS reports -1 when it has no neighbour to return.
        if 0 <= best_idx < len(self.vendor_names):
            best_match = self.vendor_names[best_idx]
            # The embedding search only selects the candidate; the reported
            # score is the character-level fuzz ratio against that candidate.
            fuzz_score = fuzz.ratio(query_vendor.lower(), best_match.lower())
            return best_match, fuzz_score
        return None, 0.0

    def _match_status(self, books_amount, gst_amount, books_vendor, gst_vendor):
        """Classify one merged invoice row and return its status string."""
        if pd.isna(books_amount):
            return "Missing in Books"
        if pd.isna(gst_amount):
            return "Missing in GST"
        if abs(float(books_amount) - float(gst_amount)) > 0.01:
            return "Amount Mismatch"
        b_vendor = str(books_vendor) if pd.notna(books_vendor) else ''
        g_vendor = str(gst_vendor) if pd.notna(gst_vendor) else ''
        # A missing name cannot be verified; do not score empty strings,
        # which could otherwise be reported as a match.
        if not b_vendor or not g_vendor:
            return "Vendor Mismatch"
        if b_vendor.lower() == g_vendor.lower():
            return "Exact Match"
        fuzz_score = fuzz.ratio(b_vendor.lower(), g_vendor.lower())
        if fuzz_score >= self.threshold:
            return f"Fuzzy Match ({fuzz_score:.1f}%)"
        sem_score = self.get_semantic_similarity(b_vendor, g_vendor)
        if sem_score >= self.threshold:
            return f"Semantic Match ({sem_score:.1f}%)"
        return "Vendor Mismatch"

    def reconcile(self, source_df, target_df, source_key='VendorName', target_key='VendorName', amount_col='Amount'):
        """Outer-join two invoice tables on ``InvoiceID`` and label each row.

        Args:
            source_df: Books-side invoices; its columns get the ``_books``
                suffix where they overlap with ``target_df``.
            target_df: GST-side invoices; overlapping columns get ``_gst``.
            source_key: Vendor-name column in ``source_df``.
            target_key: Vendor-name column in ``target_df``.
            amount_col: Amount column, looked up as ``{amount_col}_books``
                and ``{amount_col}_gst`` in the merged frame.

        Returns:
            The merged DataFrame with an added ``MatchStatus`` column.

        Raises:
            ValueError: If either frame lacks an ``InvoiceID`` column.
        """
        # Validate first so invalid input never mutates or persists the index.
        if 'InvoiceID' not in source_df.columns or 'InvoiceID' not in target_df.columns:
            raise ValueError("InvoiceID column is required for current reconciliation logic.")

        logger.info("Starting reconciliation process.")
        # Learn vendors from both datasets.
        self.learn_vendors(source_df[source_key].tolist())
        self.learn_vendors(target_df[target_key].tolist())

        source_df = source_df.drop_duplicates(subset=['InvoiceID'])
        target_df = target_df.drop_duplicates(subset=['InvoiceID'])
        merged = pd.merge(source_df, target_df, on='InvoiceID', how='outer', suffixes=('_books', '_gst'))

        books_amount_col = f'{amount_col}_books'
        gst_amount_col = f'{amount_col}_gst'
        # Vendor columns are suffixed only when the same name exists on both
        # sides, which is not the case when source_key != target_key.
        books_vendor_col = self._merged_column(source_key, '_books', target_df.columns)
        gst_vendor_col = self._merged_column(target_key, '_gst', source_df.columns)

        merged['MatchStatus'] = merged.apply(
            lambda row: self._match_status(
                row.get(books_amount_col),
                row.get(gst_amount_col),
                row.get(books_vendor_col),
                row.get(gst_vendor_col),
            ),
            axis=1,
        )
        return merged