# Invoice_Digitization_Agent/backend/etl/update_customer_aggregates_sqlite.py
# Author: Dipan04 — commit 8a859a8 "Deploy Invoice Digitization Agent"
import sqlite3
import pandas as pd
from pathlib import Path
from filelock import FileLock
from datetime import datetime
# ``data/`` directory two levels above the directory containing this file.
_DATA_DIR = Path(__file__).parent.parent.parent / "data"
# SQLite database read and written by every aggregate step in this module.
DB_PATH = _DATA_DIR / "invoices.db"
# Lock file held (via FileLock) around each step's database access.
LOCK_PATH = _DATA_DIR / "invoices.db.lock"
def get_most_common(series):
    """Return the most frequent non-null value in ``series``, or None.

    ``Series.mode`` returns its modes in sorted order, so on a tie the
    smallest tied value is returned. NumPy scalars are converted to the
    equivalent plain Python object, because ``sqlite3`` cannot bind types
    such as ``numpy.int64`` or ``numpy.bool_``.

    Args:
        series: pandas Series to inspect.

    Returns:
        The modal value as a plain Python object, or None when ``series`` is
        empty or contains only nulls.
    """
    import numpy as np  # pandas already depends on numpy; used for the scalar check

    modes = series.mode()  # nulls are dropped, so an all-null series gives an empty result
    if modes.empty:
        return None
    value = modes.iloc[0]  # positional access, independent of the result's index labels
    return value.item() if isinstance(value, np.generic) else value
def _int_or_none(value):
    """Return ``value`` as a Python int, or None if it is null (NaN/None).

    ``int(float("nan"))`` raises ValueError. Without this guard, one customer
    whose cleared invoices all lack ``days_to_clear`` would abort the whole
    aggregation run. sqlite3 stores None as SQL NULL.
    """
    return None if pd.isna(value) else int(value)


def _summarise_customer(cust_number, group):
    """Return one ``customer_aggregates`` row as a tuple in INSERT column order.

    ``group`` holds the cleared invoices of ``cust_number``, with the columns
    selected by ``update_customer_aggregates``.
    """
    days = group['days_to_clear']
    amounts = group['total_open_amount']
    n_invoices = len(group)
    return (
        cust_number,
        # NOTE(review): only cleared invoices are loaded, so the invoice count
        # and the cleared count are always equal — confirm whether
        # cust_invoice_count was meant to include still-open invoices.
        n_invoices,
        n_invoices,
        round(days.mean(), 2),
        round(days.median(), 2),
        # pandas' sample std (ddof=1) is NaN for a single value; store 0.0 instead.
        round(days.std(), 2) if n_invoices > 1 else 0.0,
        _int_or_none(days.min()),
        _int_or_none(days.max()),
        round(amounts.mean(), 2),
        round(amounts.sum(), 2),
        round((group['is_overdue'].sum() / n_invoices) * 100, 2),
        get_most_common(group['cust_payment_terms']),
        get_most_common(group['business_code']),
        get_most_common(group['invoice_currency']),
    )


def update_customer_aggregates():
    """Recompute per-customer payment statistics into ``customer_aggregates``.

    Loads every ``invoices_history`` row with a non-NULL ``clear_date``. The
    rows are grouped by ``cust_number``; pandas groupby drops null customer
    numbers. One row per customer is upserted with ``INSERT OR REPLACE`` and
    stamped with ``CURRENT_TIMESTAMP``.

    All database access happens while holding the file lock at ``LOCK_PATH``.
    If there are no cleared invoices, a warning is printed and nothing is
    written. The connection is always closed. If any step raises, nothing is
    committed and the exception propagates.
    """
    print("🔄 Starting customer aggregates computation...")
    with FileLock(str(LOCK_PATH)):
        conn = sqlite3.connect(str(DB_PATH))
        try:
            # Load cleared invoices
            df = pd.read_sql_query(
                """
                SELECT
                    cust_number,
                    days_to_clear,
                    total_open_amount,
                    is_overdue,
                    cust_payment_terms,
                    business_code,
                    invoice_currency
                FROM invoices_history
                WHERE clear_date IS NOT NULL
                """,
                conn,
            )
            if df.empty:
                print("⚠️ No cleared invoices found")
                return

            print(f"📊 Processing {len(df)} cleared invoices...")
            rows = [
                _summarise_customer(cust_number, group)
                for cust_number, group in df.groupby('cust_number')
            ]

            # Upsert into customer_aggregates
            conn.executemany(
                """
                INSERT OR REPLACE INTO customer_aggregates (
                    cust_number, cust_invoice_count, cust_cleared_count,
                    cust_avg_days, cust_median_days, cust_std_days,
                    cust_min_days, cust_max_days,
                    cust_avg_amount, cust_total_amount, cust_pct_overdue,
                    most_common_payment_term, most_common_business_code,
                    most_common_currency, last_computed_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                """,
                rows,
            )
            conn.commit()
            print(f"✅ Updated {len(rows)} customer aggregates")
        finally:
            # close() does not commit, so an error before commit() leaves the table untouched.
            conn.close()
def update_payment_terms_aggregates():
    """Recompute per-payment-term clearing statistics into ``payment_terms_aggregates``.

    Uses ``invoices_history`` rows with a non-NULL ``clear_date`` and a
    non-NULL ``cust_payment_terms``. For each payment term it upserts three
    values with ``INSERT OR REPLACE``, stamped with ``CURRENT_TIMESTAMP``:
    - the mean ``days_to_clear``, rounded to 2 dp;
    - the median ``days_to_clear``, rounded to 2 dp;
    - the number of non-null ``days_to_clear`` values (pandas ``count``).

    All database access happens while holding the file lock at ``LOCK_PATH``.
    If there is no matching data, a warning is printed and nothing is written.
    The connection is always closed. If any step raises, nothing is committed
    and the exception propagates.
    """
    print("🔄 Computing payment terms aggregates...")
    with FileLock(str(LOCK_PATH)):
        conn = sqlite3.connect(str(DB_PATH))
        try:
            df = pd.read_sql_query(
                """
                SELECT cust_payment_terms, days_to_clear
                FROM invoices_history
                WHERE clear_date IS NOT NULL AND cust_payment_terms IS NOT NULL
                """,
                conn,
            )
            if df.empty:
                print("⚠️ No data for payment terms")
                return

            agg = (
                df.groupby('cust_payment_terms')['days_to_clear']
                .agg(['mean', 'median', 'count'])
                .reset_index()
            )
            # itertuples keeps each column's own dtype. iterrows would coerce a
            # row to one common dtype, turning a numeric payment-term key into a float.
            rows = [
                (terms, round(avg_days, 2), round(median_days, 2), int(n_invoices))
                for terms, avg_days, median_days, n_invoices
                in agg.itertuples(index=False, name=None)
            ]
            conn.executemany(
                """
                INSERT OR REPLACE INTO payment_terms_aggregates (
                    cust_payment_terms, payment_terms_avg_days, payment_terms_median_days,
                    payment_terms_count, last_computed_at
                ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                """,
                rows,
            )
            conn.commit()
            print(f"✅ Updated {len(agg)} payment terms aggregates")
        finally:
            # close() does not commit, so an error before commit() leaves the table untouched.
            conn.close()
def update_business_code_aggregates():
    """Recompute per-business-code clearing statistics into ``business_code_aggregates``.

    Uses ``invoices_history`` rows with a non-NULL ``clear_date`` and a
    non-NULL ``business_code``. For each business code it upserts three values
    with ``INSERT OR REPLACE``, stamped with ``CURRENT_TIMESTAMP``:
    - the mean ``days_to_clear``, rounded to 2 dp;
    - the median ``days_to_clear``, rounded to 2 dp;
    - the number of non-null ``days_to_clear`` values (pandas ``count``).

    All database access happens while holding the file lock at ``LOCK_PATH``.
    If there is no matching data, a warning is printed and nothing is written.
    The connection is always closed. If any step raises, nothing is committed
    and the exception propagates.
    """
    print("🔄 Computing business code aggregates...")
    with FileLock(str(LOCK_PATH)):
        conn = sqlite3.connect(str(DB_PATH))
        try:
            df = pd.read_sql_query(
                """
                SELECT business_code, days_to_clear
                FROM invoices_history
                WHERE clear_date IS NOT NULL AND business_code IS NOT NULL
                """,
                conn,
            )
            if df.empty:
                print("⚠️ No data for business codes")
                return

            agg = (
                df.groupby('business_code')['days_to_clear']
                .agg(['mean', 'median', 'count'])
                .reset_index()
            )
            # itertuples keeps each column's own dtype. iterrows would coerce a
            # row to one common dtype, turning a numeric business-code key into a float.
            rows = [
                (code, round(avg_days, 2), round(median_days, 2), int(n_invoices))
                for code, avg_days, median_days, n_invoices
                in agg.itertuples(index=False, name=None)
            ]
            conn.executemany(
                """
                INSERT OR REPLACE INTO business_code_aggregates (
                    business_code, business_avg_days, business_median_days,
                    business_count, last_computed_at
                ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                """,
                rows,
            )
            conn.commit()
            print(f"✅ Updated {len(agg)} business code aggregates")
        finally:
            # close() does not commit, so an error before commit() leaves the table untouched.
            conn.close()
if __name__ == "__main__":
    # Script entry point: refresh the customer, payment-terms and business-code
    # aggregate tables in that order. Nothing here catches exceptions, so a
    # failure in one step stops the remaining steps.
    print("=" * 60)
    print("🚀 ETL: Updating Aggregates")
    print("=" * 60)
    update_customer_aggregates()
    update_payment_terms_aggregates()
    update_business_code_aggregates()
    print("\n✅ All aggregates updated successfully!")