Spaces:
Sleeping
Sleeping
| import sqlite3 | |
| import pandas as pd | |
| from pathlib import Path | |
| from filelock import FileLock | |
| from datetime import datetime | |
# SQLite database holding invoice history and the aggregate tables; it lives
# in the "data" directory three levels above this module.
DB_PATH = Path(__file__).parent.parent.parent / "data" / "invoices.db"

# Lock file stored next to the database; every writer in this module takes
# this lock before opening a connection.
LOCK_PATH = DB_PATH.with_name(DB_PATH.name + ".lock")
def get_most_common(series):
    """Return the most common non-null value of *series*, or None.

    Args:
        series: A pandas Series of arbitrary dtype.

    Returns:
        The mode of the series as a built-in Python object. When several
        values tie, the first one reported by ``Series.mode()`` is used.
        Returns None when the series is empty or holds only missing values.
    """
    if series.empty:
        return None

    # Compute the mode once; Series.mode() drops NaN/None by default, so an
    # all-missing series yields an empty result.
    modes = series.mode()
    if modes.empty:
        return None

    value = modes.iloc[0]
    # NumPy scalars (e.g. numpy.int64) are not bound as native numbers by
    # sqlite3, so hand callers a plain Python int/float/bool instead.
    return value.item() if hasattr(value, "item") else value
def _to_native(value):
    """Return a NumPy scalar as the equivalent built-in Python object.

    Values without an ``item()`` method (str, None, plain int/float) are
    returned unchanged.
    """
    return value.item() if hasattr(value, "item") else value


def _int_or_none(value):
    """Return ``int(value)``, or None when *value* is missing (NaN/None)."""
    return None if pd.isna(value) else int(value)


def _customer_row(cust_number, group):
    """Build the customer_aggregates parameter tuple for one customer."""
    days = group['days_to_clear']
    amounts = group['total_open_amount']
    invoice_count = len(group)

    row = (
        cust_number,
        # NOTE(review): the source query only loads cleared invoices, so
        # cust_invoice_count and cust_cleared_count are always equal here.
        # Confirm whether cust_invoice_count should include open invoices.
        invoice_count,
        invoice_count,
        round(days.mean(), 2),
        round(days.median(), 2),
        # The sample standard deviation is undefined for a single invoice.
        round(days.std(), 2) if invoice_count > 1 else 0.0,
        # min()/max() are NaN when every days_to_clear value is missing;
        # store NULL instead of crashing in int().
        _int_or_none(days.min()),
        _int_or_none(days.max()),
        round(amounts.mean(), 2),
        round(amounts.sum(), 2),
        round((group['is_overdue'].sum() / invoice_count) * 100, 2),
        get_most_common(group['cust_payment_terms']),
        get_most_common(group['business_code']),
        get_most_common(group['invoice_currency']),
    )
    # sqlite3 does not bind NumPy integer scalars as native numbers, so
    # normalise every value to a built-in Python type before inserting.
    return tuple(_to_native(value) for value in row)


def update_customer_aggregates():
    """Compute per-customer statistics and upsert them into customer_aggregates.

    Loads every row of ``invoices_history`` with a non-null ``clear_date``,
    groups the rows by ``cust_number`` and writes one row per customer with
    ``INSERT OR REPLACE``. The whole operation runs under the file lock at
    ``LOCK_PATH``. The connection is closed even when the query, the
    aggregation or the insert raises; in that case nothing is committed.

    Returns:
        None. Progress is reported with ``print``.
    """
    print("π Starting customer aggregates computation...")

    with FileLock(str(LOCK_PATH)):
        conn = sqlite3.connect(str(DB_PATH))
        try:
            # Load cleared invoices
            df = pd.read_sql_query("""
                SELECT
                    cust_number,
                    days_to_clear,
                    total_open_amount,
                    is_overdue,
                    cust_payment_terms,
                    business_code,
                    invoice_currency
                FROM invoices_history
                WHERE clear_date IS NOT NULL
            """, conn)

            if df.empty:
                print("β οΈ No cleared invoices found")
                return

            print(f"π Processing {len(df)} cleared invoices...")

            # Compute aggregates per customer
            rows = [
                _customer_row(cust_number, group)
                for cust_number, group in df.groupby('cust_number')
            ]

            # Upsert into customer_aggregates
            conn.cursor().executemany("""
                INSERT OR REPLACE INTO customer_aggregates (
                    cust_number, cust_invoice_count, cust_cleared_count,
                    cust_avg_days, cust_median_days, cust_std_days,
                    cust_min_days, cust_max_days,
                    cust_avg_amount, cust_total_amount, cust_pct_overdue,
                    most_common_payment_term, most_common_business_code,
                    most_common_currency, last_computed_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            """, rows)
            conn.commit()

            print(f"β Updated {len(rows)} customer aggregates")
        finally:
            # Always release the connection, including on errors and on the
            # early return above.
            conn.close()
def update_payment_terms_aggregates():
    """Compute days-to-clear statistics per payment term and upsert them.

    Reads ``cust_payment_terms`` and ``days_to_clear`` from the rows of
    ``invoices_history`` where both ``clear_date`` and ``cust_payment_terms``
    are non-null, computes the mean, median and count of ``days_to_clear``
    for each payment term, and writes one row per term into
    ``payment_terms_aggregates`` with ``INSERT OR REPLACE``. The work runs
    under the file lock at ``LOCK_PATH``. The connection is closed even when
    a step raises; in that case nothing is committed.

    Returns:
        None. Progress is reported with ``print``.
    """
    print("π Computing payment terms aggregates...")

    with FileLock(str(LOCK_PATH)):
        conn = sqlite3.connect(str(DB_PATH))
        try:
            df = pd.read_sql_query("""
                SELECT cust_payment_terms, days_to_clear
                FROM invoices_history
                WHERE clear_date IS NOT NULL AND cust_payment_terms IS NOT NULL
            """, conn)

            if df.empty:
                print("β οΈ No data for payment terms")
                return

            agg = (
                df.groupby('cust_payment_terms')['days_to_clear']
                .agg(['mean', 'median', 'count'])
                .reset_index()
            )
            agg.columns = [
                'cust_payment_terms',
                'payment_terms_avg_days',
                'payment_terms_median_days',
                'payment_terms_count',
            ]

            rows = [
                (
                    row['cust_payment_terms'],
                    round(row['payment_terms_avg_days'], 2),
                    round(row['payment_terms_median_days'], 2),
                    int(row['payment_terms_count']),
                )
                for _, row in agg.iterrows()
            ]

            conn.cursor().executemany("""
                INSERT OR REPLACE INTO payment_terms_aggregates (
                    cust_payment_terms, payment_terms_avg_days, payment_terms_median_days,
                    payment_terms_count, last_computed_at
                ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
            """, rows)
            conn.commit()

            print(f"β Updated {len(agg)} payment terms aggregates")
        finally:
            # Always release the connection, including on errors and on the
            # early return above.
            conn.close()
def update_business_code_aggregates():
    """Compute days-to-clear statistics per business code and upsert them.

    Reads ``business_code`` and ``days_to_clear`` from the rows of
    ``invoices_history`` where both ``clear_date`` and ``business_code`` are
    non-null, computes the mean, median and count of ``days_to_clear`` for
    each business code, and writes one row per code into
    ``business_code_aggregates`` with ``INSERT OR REPLACE``. The work runs
    under the file lock at ``LOCK_PATH``. The connection is closed even when
    a step raises; in that case nothing is committed.

    Returns:
        None. Progress is reported with ``print``.
    """
    print("π Computing business code aggregates...")

    with FileLock(str(LOCK_PATH)):
        conn = sqlite3.connect(str(DB_PATH))
        try:
            df = pd.read_sql_query("""
                SELECT business_code, days_to_clear
                FROM invoices_history
                WHERE clear_date IS NOT NULL AND business_code IS NOT NULL
            """, conn)

            if df.empty:
                print("β οΈ No data for business codes")
                return

            agg = (
                df.groupby('business_code')['days_to_clear']
                .agg(['mean', 'median', 'count'])
                .reset_index()
            )
            agg.columns = [
                'business_code',
                'business_avg_days',
                'business_median_days',
                'business_count',
            ]

            rows = [
                (
                    row['business_code'],
                    round(row['business_avg_days'], 2),
                    round(row['business_median_days'], 2),
                    int(row['business_count']),
                )
                for _, row in agg.iterrows()
            ]

            conn.cursor().executemany("""
                INSERT OR REPLACE INTO business_code_aggregates (
                    business_code, business_avg_days, business_median_days,
                    business_count, last_computed_at
                ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
            """, rows)
            conn.commit()

            print(f"β Updated {len(agg)} business code aggregates")
        finally:
            # Always release the connection, including on errors and on the
            # early return above.
            conn.close()
if __name__ == "__main__":
    # Script entry point: refresh every aggregate table in a fixed order,
    # framed by a banner so the run is easy to spot in logs.
    banner = "=" * 60
    print(banner)
    print("π ETL: Updating Aggregates")
    print(banner)

    refresh_steps = (
        update_customer_aggregates,
        update_payment_terms_aggregates,
        update_business_code_aggregates,
    )
    for refresh in refresh_steps:
        refresh()

    print("\nβ All aggregates updated successfully!")