File size: 6,980 Bytes
943deb6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class Analytics:
def __init__(self, db_manager):
self.db = db_manager
def get_sales_summary(self):
"""Get comprehensive sales summary statistics"""
try:
sales_df = self.db.get_dataframe('sales')
payments_df = self.db.get_dataframe('payments')
if sales_df.empty:
return {
'total_sales': 0,
'total_payments': 0,
'pending_amount': 0,
'total_transactions': 0,
'avg_sale_value': 0
}
total_sales = sales_df['total_amount'].sum()
total_payments = payments_df['amount'].sum() if not payments_df.empty else 0
pending_amount = total_sales - total_payments
return {
'total_sales': total_sales,
'total_payments': total_payments,
'pending_amount': pending_amount,
'total_transactions': len(sales_df),
'avg_sale_value': sales_df['total_amount'].mean()
}
except Exception as e:
return {
'total_sales': 0,
'total_payments': 0,
'pending_amount': 0,
'total_transactions': 0,
'avg_sale_value': 0
}
def get_customer_analysis(self):
"""Analyze customer data"""
try:
customers_df = self.db.get_dataframe('customers')
sales_df = self.db.get_dataframe('sales')
if customers_df.empty:
return {
'total_customers': 0,
'village_distribution': {},
'top_customers': {}
}
# Customer distribution by village
village_stats = customers_df['village'].value_counts().head(10)
# Top customers by spending
if not sales_df.empty:
customer_sales = sales_df.groupby('customer_id')['total_amount'].sum()
top_customers = customer_sales.nlargest(10)
else:
top_customers = pd.Series(dtype=float)
return {
'total_customers': len(customers_df),
'village_distribution': village_stats.to_dict(),
'top_customers': top_customers.to_dict()
}
except Exception as e:
return {
'total_customers': 0,
'village_distribution': {},
'top_customers': {}
}
def get_payment_analysis(self):
"""Analyze payment data"""
try:
pending_payments = self.db.get_pending_payments()
payments_df = self.db.get_dataframe('payments')
if pending_payments.empty:
return {
'total_pending': 0,
'customer_pending': {},
'payment_methods': {}
}
# Group by customer
customer_pending = pending_payments.groupby('customer_id')['pending_amount'].sum()
# Payment method distribution
if not payments_df.empty:
payment_methods = payments_df['payment_method'].value_counts()
else:
payment_methods = pd.Series(dtype=object)
return {
'total_pending': pending_payments['pending_amount'].sum(),
'customer_pending': customer_pending.to_dict(),
'payment_methods': payment_methods.to_dict()
}
except Exception as e:
return {
'total_pending': 0,
'customer_pending': {},
'payment_methods': {}
}
def get_demo_conversion_rates(self):
"""Calculate demo conversion rates"""
try:
demos_df = self.db.get_demo_conversions()
if demos_df.empty:
return {
'total_demos': 0,
'converted_demos': 0,
'conversion_rate': 0
}
total_demos = len(demos_df)
converted_demos = len(demos_df[demos_df['conversion_status'] == 'Converted'])
conversion_rate = (converted_demos / total_demos) * 100 if total_demos > 0 else 0
return {
'total_demos': total_demos,
'converted_demos': converted_demos,
'conversion_rate': conversion_rate
}
except Exception as e:
return {
'total_demos': 0,
'converted_demos': 0,
'conversion_rate': 0
}
def get_sales_trend(self):
"""Get sales trend data for charts"""
try:
sales_df = self.db.get_dataframe('sales')
if sales_df.empty:
return pd.DataFrame()
# Convert sale_date to datetime if it's not
sales_df['sale_date'] = pd.to_datetime(sales_df['sale_date'])
# Group by date
daily_sales = sales_df.groupby('sale_date')['total_amount'].sum().reset_index()
daily_sales = daily_sales.sort_values('sale_date')
return daily_sales
except Exception as e:
return pd.DataFrame()
def get_payment_distribution(self):
"""Get payment distribution for charts"""
try:
payments_df = self.db.get_dataframe('payments')
if payments_df.empty:
return pd.DataFrame()
payment_dist = payments_df.groupby('payment_method')['amount'].sum().reset_index()
return payment_dist
except Exception as e:
return pd.DataFrame()
def get_product_performance(self):
"""Get product performance data"""
try:
sale_items_df = self.db.get_dataframe('sale_items', '''
SELECT si.*, p.product_name
FROM sale_items si
JOIN products p ON si.product_id = p.product_id
''')
if sale_items_df.empty:
return pd.DataFrame()
product_perf = sale_items_df.groupby('product_name').agg({
'quantity': 'sum',
'amount': 'sum'
}).reset_index()
return product_perf
except Exception as e:
return pd.DataFrame() |