pcsales / analytics.py
omgy's picture
Upload 12 files
943deb6 verified
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class Analytics:
def __init__(self, db_manager):
self.db = db_manager
def get_sales_summary(self):
"""Get comprehensive sales summary statistics"""
try:
sales_df = self.db.get_dataframe('sales')
payments_df = self.db.get_dataframe('payments')
if sales_df.empty:
return {
'total_sales': 0,
'total_payments': 0,
'pending_amount': 0,
'total_transactions': 0,
'avg_sale_value': 0
}
total_sales = sales_df['total_amount'].sum()
total_payments = payments_df['amount'].sum() if not payments_df.empty else 0
pending_amount = total_sales - total_payments
return {
'total_sales': total_sales,
'total_payments': total_payments,
'pending_amount': pending_amount,
'total_transactions': len(sales_df),
'avg_sale_value': sales_df['total_amount'].mean()
}
except Exception as e:
return {
'total_sales': 0,
'total_payments': 0,
'pending_amount': 0,
'total_transactions': 0,
'avg_sale_value': 0
}
def get_customer_analysis(self):
"""Analyze customer data"""
try:
customers_df = self.db.get_dataframe('customers')
sales_df = self.db.get_dataframe('sales')
if customers_df.empty:
return {
'total_customers': 0,
'village_distribution': {},
'top_customers': {}
}
# Customer distribution by village
village_stats = customers_df['village'].value_counts().head(10)
# Top customers by spending
if not sales_df.empty:
customer_sales = sales_df.groupby('customer_id')['total_amount'].sum()
top_customers = customer_sales.nlargest(10)
else:
top_customers = pd.Series(dtype=float)
return {
'total_customers': len(customers_df),
'village_distribution': village_stats.to_dict(),
'top_customers': top_customers.to_dict()
}
except Exception as e:
return {
'total_customers': 0,
'village_distribution': {},
'top_customers': {}
}
def get_payment_analysis(self):
"""Analyze payment data"""
try:
pending_payments = self.db.get_pending_payments()
payments_df = self.db.get_dataframe('payments')
if pending_payments.empty:
return {
'total_pending': 0,
'customer_pending': {},
'payment_methods': {}
}
# Group by customer
customer_pending = pending_payments.groupby('customer_id')['pending_amount'].sum()
# Payment method distribution
if not payments_df.empty:
payment_methods = payments_df['payment_method'].value_counts()
else:
payment_methods = pd.Series(dtype=object)
return {
'total_pending': pending_payments['pending_amount'].sum(),
'customer_pending': customer_pending.to_dict(),
'payment_methods': payment_methods.to_dict()
}
except Exception as e:
return {
'total_pending': 0,
'customer_pending': {},
'payment_methods': {}
}
def get_demo_conversion_rates(self):
"""Calculate demo conversion rates"""
try:
demos_df = self.db.get_demo_conversions()
if demos_df.empty:
return {
'total_demos': 0,
'converted_demos': 0,
'conversion_rate': 0
}
total_demos = len(demos_df)
converted_demos = len(demos_df[demos_df['conversion_status'] == 'Converted'])
conversion_rate = (converted_demos / total_demos) * 100 if total_demos > 0 else 0
return {
'total_demos': total_demos,
'converted_demos': converted_demos,
'conversion_rate': conversion_rate
}
except Exception as e:
return {
'total_demos': 0,
'converted_demos': 0,
'conversion_rate': 0
}
def get_sales_trend(self):
"""Get sales trend data for charts"""
try:
sales_df = self.db.get_dataframe('sales')
if sales_df.empty:
return pd.DataFrame()
# Convert sale_date to datetime if it's not
sales_df['sale_date'] = pd.to_datetime(sales_df['sale_date'])
# Group by date
daily_sales = sales_df.groupby('sale_date')['total_amount'].sum().reset_index()
daily_sales = daily_sales.sort_values('sale_date')
return daily_sales
except Exception as e:
return pd.DataFrame()
def get_payment_distribution(self):
"""Get payment distribution for charts"""
try:
payments_df = self.db.get_dataframe('payments')
if payments_df.empty:
return pd.DataFrame()
payment_dist = payments_df.groupby('payment_method')['amount'].sum().reset_index()
return payment_dist
except Exception as e:
return pd.DataFrame()
def get_product_performance(self):
"""Get product performance data"""
try:
sale_items_df = self.db.get_dataframe('sale_items', '''
SELECT si.*, p.product_name
FROM sale_items si
JOIN products p ON si.product_id = p.product_id
''')
if sale_items_df.empty:
return pd.DataFrame()
product_perf = sale_items_df.groupby('product_name').agg({
'quantity': 'sum',
'amount': 'sum'
}).reset_index()
return product_perf
except Exception as e:
return pd.DataFrame()