"""ETL step: recompute aggregate tables from cleared invoices.

Reads rows of ``invoices_history`` whose ``clear_date`` is set and upserts
summary statistics into ``customer_aggregates``,
``payment_terms_aggregates`` and ``business_code_aggregates``.  Every update
runs while holding the file lock at ``LOCK_PATH``.
"""
import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path

import pandas as pd
from filelock import FileLock

DB_PATH = Path(__file__).parent.parent.parent / "data" / "invoices.db"
LOCK_PATH = Path(__file__).parent.parent.parent / "data" / "invoices.db.lock"


def get_most_common(series):
    """Return the mode (most common value) of *series*.

    Returns ``None`` when the series is empty or ``Series.mode()`` yields no
    value.  When several values tie, the first entry of ``Series.mode()`` is
    returned.
    """
    if series.empty:
        return None
    modes = series.mode()  # computed once instead of twice
    return modes[0] if not modes.empty else None


def _to_sql_value(value):
    """Convert a pandas/NumPy scalar into a value sqlite3 can bind.

    Missing values (``None``, NaN, ``pd.NA``, ``NaT``) become ``None`` so
    they are stored as NULL.  Objects exposing ``.item()`` (NumPy scalars)
    are unboxed to plain Python scalars, because sqlite3 ships no adapter
    for NumPy integer types.
    """
    if value is None or pd.isna(value):
        return None
    to_python = getattr(value, "item", None)
    return to_python() if callable(to_python) else value


def _int_or_none(value):
    """Return ``int(value)``, or ``None`` when *value* is missing (NaN)."""
    return None if pd.isna(value) else int(value)


def update_customer_aggregates():
    """Compute per-customer statistics and upsert them.

    Loads every cleared invoice, groups by ``cust_number`` and writes one row
    per customer into ``customer_aggregates`` using ``INSERT OR REPLACE``.
    Prints a warning and returns without writing when no cleared invoices
    exist.

    The connection is always closed, and the batch of upserts is committed
    as a whole or rolled back if any statement fails.
    """
    print("🔄 Starting customer aggregates computation...")

    with FileLock(str(LOCK_PATH)), closing(sqlite3.connect(str(DB_PATH))) as conn:
        # Load cleared invoices
        df = pd.read_sql_query(
            """
            SELECT
                cust_number,
                days_to_clear,
                total_open_amount,
                is_overdue,
                cust_payment_terms,
                business_code,
                invoice_currency
            FROM invoices_history
            WHERE clear_date IS NOT NULL
            """,
            conn,
        )

        if df.empty:
            print("⚠️ No cleared invoices found")
            return

        print(f"📊 Processing {len(df)} cleared invoices...")

        # Compute aggregates per customer; each tuple follows the column
        # order of the INSERT statement below.
        rows = []
        for cust_number, group in df.groupby('cust_number'):
            days = group['days_to_clear']
            amounts = group['total_open_amount']
            invoice_count = len(group)
            values = (
                cust_number,
                invoice_count,  # cust_invoice_count
                invoice_count,  # cust_cleared_count (only cleared rows are loaded)
                round(days.mean(), 2),
                round(days.median(), 2),
                # Sample std is undefined for a single invoice; store 0.0.
                round(days.std(), 2) if invoice_count > 1 else 0.0,
                # min/max are NaN when every days_to_clear is NULL; int(NaN)
                # would raise, so store NULL instead.
                _int_or_none(days.min()),
                _int_or_none(days.max()),
                round(amounts.mean(), 2),
                round(amounts.sum(), 2),
                round((group['is_overdue'].sum() / invoice_count) * 100, 2),
                get_most_common(group['cust_payment_terms']),
                get_most_common(group['business_code']),
                get_most_common(group['invoice_currency']),
            )
            rows.append(tuple(_to_sql_value(v) for v in values))

        # Upsert into customer_aggregates.  `with conn` commits on success
        # and rolls back if any row fails.
        with conn:
            conn.executemany(
                """
                INSERT OR REPLACE INTO customer_aggregates (
                    cust_number,
                    cust_invoice_count,
                    cust_cleared_count,
                    cust_avg_days,
                    cust_median_days,
                    cust_std_days,
                    cust_min_days,
                    cust_max_days,
                    cust_avg_amount,
                    cust_total_amount,
                    cust_pct_overdue,
                    most_common_payment_term,
                    most_common_business_code,
                    most_common_currency,
                    last_computed_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                """,
                rows,
            )

        print(f"✅ Updated {len(rows)} customer aggregates")


def _update_days_to_clear_stats(*, select_sql, upsert_sql, key_column, label, empty_label):
    """Upsert mean/median/count of ``days_to_clear`` grouped by *key_column*.

    Shared implementation for the payment-terms and business-code tables.

    Args:
        select_sql: Query returning *key_column* and ``days_to_clear``.
        upsert_sql: Statement with four ``?`` placeholders, in the order
            key, average, median, count.
        key_column: Column of the query result to group by.
        label: Name used in the progress and success messages.
        empty_label: Name used in the "no data" warning.
    """
    print(f"🔄 Computing {label} aggregates...")

    with FileLock(str(LOCK_PATH)), closing(sqlite3.connect(str(DB_PATH))) as conn:
        df = pd.read_sql_query(select_sql, conn)

        if df.empty:
            print(f"⚠️ No data for {empty_label}")
            return

        stats = (
            df.groupby(key_column)['days_to_clear']
            .agg(['mean', 'median', 'count'])
            .reset_index()
        )

        # itertuples yields plain Python scalars, which sqlite3 binds directly.
        rows = [
            (_to_sql_value(key), round(mean, 2), round(median, 2), int(count))
            for key, mean, median, count in stats.itertuples(index=False, name=None)
        ]

        # Commit all rows together, or roll back if any statement fails.
        with conn:
            conn.executemany(upsert_sql, rows)

        print(f"✅ Updated {len(rows)} {label} aggregates")


def update_payment_terms_aggregates():
    """Compute ``days_to_clear`` statistics per payment term and upsert them.

    Uses cleared invoices with a non-NULL ``cust_payment_terms`` and writes
    to ``payment_terms_aggregates``.
    """
    _update_days_to_clear_stats(
        select_sql="""
            SELECT cust_payment_terms, days_to_clear
            FROM invoices_history
            WHERE clear_date IS NOT NULL AND cust_payment_terms IS NOT NULL
        """,
        upsert_sql="""
            INSERT OR REPLACE INTO payment_terms_aggregates (
                cust_payment_terms,
                payment_terms_avg_days,
                payment_terms_median_days,
                payment_terms_count,
                last_computed_at
            ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
        """,
        key_column='cust_payment_terms',
        label="payment terms",
        empty_label="payment terms",
    )


def update_business_code_aggregates():
    """Compute ``days_to_clear`` statistics per business code and upsert them.

    Uses cleared invoices with a non-NULL ``business_code`` and writes to
    ``business_code_aggregates``.
    """
    _update_days_to_clear_stats(
        select_sql="""
            SELECT business_code, days_to_clear
            FROM invoices_history
            WHERE clear_date IS NOT NULL AND business_code IS NOT NULL
        """,
        upsert_sql="""
            INSERT OR REPLACE INTO business_code_aggregates (
                business_code,
                business_avg_days,
                business_median_days,
                business_count,
                last_computed_at
            ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
        """,
        key_column='business_code',
        label="business code",
        empty_label="business codes",
    )


def main():
    """Run all three aggregate updates in sequence."""
    print("=" * 60)
    print("🚀 ETL: Updating Aggregates")
    print("=" * 60)

    update_customer_aggregates()
    update_payment_terms_aggregates()
    update_business_code_aggregates()

    print("\n✅ All aggregates updated successfully!")


if __name__ == "__main__":
    main()