|
|
|
|
|
import pandas as pd
|
|
|
import re
|
|
|
from datetime import datetime
|
|
|
import numpy as np
|
|
|
import sqlite3
|
|
|
import os
|
|
|
import streamlit as st
|
|
|
|
|
|
class SalesDataProcessor:
    """Standardise sales rows read from Excel workbooks and store them in a database.

    The processor cleans every row of every sheet (names, locations, product
    names, dates, amounts), derives sale type / payment status / payment
    method, writes the result to the ``sales`` table and rebuilds the
    ``customers`` aggregate table from it. Progress and errors are reported
    through Streamlit (``st``).
    """

    # Column order shared by the INSERT and UPDATE statements of
    # insert_into_database(); the names match the keys of the dicts built by
    # standardize_record().
    _SALES_COLUMNS = (
        'source_sheet', 'sr_no', 'customer_name', 'village', 'taluka',
        'district', 'invoice_no', 'reference', 'dispatch_date',
        'product_type', 'quantity', 'rate_per_unit', 'amount',
        'final_amount', 'total_liters', 'payment_date', 'gpay_amount',
        'cash_amount', 'cheque_amount', 'rrn_number', 'sold_by',
        'sale_type', 'payment_status', 'payment_method', 'source_file',
    )

    # Numeric dates are interpreted as day offsets from this epoch
    # (the Excel serial-date convention).
    _EXCEL_EPOCH = datetime(1899, 12, 30)

    # Text date layouts tried in order by parse_date().
    _DATE_FORMATS = (
        '%Y-%m-%d %H:%M:%S',
        '%d/%m/%Y',
        '%Y-%m-%d',
        '%d-%m-%Y',
        '%d/%m/%Y %H:%M:%S',
    )

    def __init__(self, db):
        """Store the database handle, build the lookup tables and create the tables.

        Args:
            db: Object whose ``get_connection()`` returns a DB-API connection
                accepting ``?`` placeholders (presumably sqlite3 - confirm
                against the caller).
        """
        self.db = db
        self.setup_product_mapping()
        self.setup_location_mapping()
        self.setup_database_tables()

    def setup_database_tables(self):
        """Create the ``sales`` and ``customers`` tables if they do not exist."""
        conn = self.db.get_connection()
        try:
            cursor = conn.cursor()

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS sales (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    source_sheet TEXT,
                    sr_no TEXT,
                    customer_name TEXT,
                    village TEXT,
                    taluka TEXT,
                    district TEXT,
                    invoice_no TEXT UNIQUE,
                    reference TEXT,
                    dispatch_date TEXT,
                    product_type TEXT,
                    quantity INTEGER,
                    rate_per_unit REAL,
                    amount REAL,
                    final_amount REAL,
                    total_liters REAL,
                    payment_date TEXT,
                    gpay_amount REAL,
                    cash_amount REAL,
                    cheque_amount REAL,
                    rrn_number TEXT,
                    sold_by TEXT,
                    sale_type TEXT,
                    payment_status TEXT,
                    payment_method TEXT,
                    processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    source_file TEXT
                )
            ''')

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS customers (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    customer_name TEXT,
                    village TEXT,
                    taluka TEXT,
                    district TEXT,
                    total_purchases REAL DEFAULT 0,
                    total_orders INTEGER DEFAULT 0,
                    last_order_date TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            conn.commit()
        finally:
            # Release the connection even when a statement fails.
            conn.close()

    def setup_product_mapping(self):
        """Define the exact packaging names and their standard product codes."""
        self.PRODUCT_MAPPING = {
            '1 LTR PLASTIC JAR': '1L_PLASTIC_JAR',
            '2 LTR PLASTIC JAR': '2L_PLASTIC_JAR',
            '5 LTR PLASTIC JAR': '5L_PLASTIC_JAR',
            '10 LTR PLASTIC JAR': '10L_PLASTIC_JAR',
            '5 LTR STEEL BARNI': '5L_STEEL_BARNI',
            '10 LTR STEEL BARNI': '10L_STEEL_BARNI',
            '20 LTR STEEL BARNI': '20L_STEEL_BARNI',
            '20 LTR PLASTIC CAN': '20L_PLASTIC_CAN',
            '1 LTR PET BOTTLE': '1L_PET_BOTTLE',
            '20 LTR CARBO': '20L_CARBO',
        }

    def setup_location_mapping(self):
        """Define the Gujarati place names and their upper-case Latin spellings."""
        self.GUJARATI_LOCALITIES = {
            'રામપુરા': 'RAMPURA',
            'શેખડી': 'SHEKHADI',
            'સિંહોલ': 'SINHOL',
            'વનાદરા': 'VANADARA',
            'માવલી': 'MAVLI',
            'સિમરડા': 'SIMRADA',
            'બિલપડ': 'BILPAD',
            'વઘોડિયા': 'VAGHODIA',
            'સાકરિયા': 'SAKARIYA',
        }

    def safe_float(self, value):
        """Convert ``value`` to a finite float, returning 0.0 when that is impossible.

        Missing markers (NaN, ``None``, ``''``, ``'NOT_AVAILABLE'``, ``'_'``),
        unparsable text and non-finite results (NaN / infinity) all yield 0.0.
        Commas in text are treated as thousands separators and ignored.
        """
        if pd.isna(value) or value in ['', 'NOT_AVAILABLE', None, '_']:
            return 0.0

        if isinstance(value, str):
            # "1,200.50" would otherwise be rejected by float() and become 0.0.
            value = value.replace(',', '')

        try:
            number = float(value)
        except (ValueError, TypeError):
            return 0.0

        # float() accepts "nan" / "inf"; callers (e.g. safe_int) need a finite number.
        return number if np.isfinite(number) else 0.0

    def safe_int(self, value):
        """Convert ``value`` to int via safe_float(), truncating toward zero."""
        return int(self.safe_float(value))

    def parse_date(self, date_str):
        """Normalise a date cell to ``'YYYY-MM-DD'``.

        Args:
            date_str: A ``datetime`` (including ``pd.Timestamp``), a number
                interpreted as a serial day count from 1899-12-30, or text in
                one of the layouts in ``_DATE_FORMATS``.

        Returns:
            The ISO date string, ``'NOT_AVAILABLE'`` for missing values, or
            ``'INVALID_DATE'`` when the value cannot be interpreted.
        """
        if pd.isna(date_str) or date_str in ['', 'NOT_AVAILABLE', None, '_']:
            return 'NOT_AVAILABLE'

        # Already a date-time object: format it directly instead of relying on
        # its string form (which breaks when microseconds are present).
        if isinstance(date_str, datetime):
            return date_str.strftime('%Y-%m-%d')

        is_number = isinstance(date_str, (int, float, np.integer, np.floating))
        if is_number and not isinstance(date_str, bool):
            serial = float(date_str)
            if not np.isfinite(serial):
                return 'INVALID_DATE'
            try:
                moment = self._EXCEL_EPOCH + pd.Timedelta(days=serial)
                return moment.strftime('%Y-%m-%d')
            except (ValueError, TypeError, ArithmeticError):
                # Out-of-range serial numbers cannot be represented as a date.
                return 'INVALID_DATE'

        text = str(date_str).strip()
        for fmt in self._DATE_FORMATS:
            try:
                return datetime.strptime(text, fmt).strftime('%Y-%m-%d')
            except ValueError:
                continue

        return 'INVALID_DATE'

    def clean_name(self, name):
        """Trim ``name`` and collapse internal whitespace runs to single spaces.

        Returns ``'NOT_AVAILABLE'`` for NaN, ``None``, ``''``, ``'-'`` and ``'_'``.
        """
        if pd.isna(name) or name in ['', '-', '_', None]:
            return 'NOT_AVAILABLE'
        return ' '.join(str(name).strip().split())

    def standardize_location(self, location):
        """Map a location to its standard upper-case name.

        Text containing one of the known Gujarati names is replaced by the
        mapped Latin spelling; anything else is stripped and upper-cased.
        Missing values yield ``'NOT_AVAILABLE'``.
        """
        if pd.isna(location) or location in ['', 'NOT_AVAILABLE', None]:
            return 'NOT_AVAILABLE'

        location_str = str(location).strip()
        for guj_name, eng_name in self.GUJARATI_LOCALITIES.items():
            if guj_name in location_str:
                return eng_name

        return location_str.upper()

    def standardize_product(self, product_name):
        """Convert a free-form packaging description to a standard product code.

        Exact names from ``PRODUCT_MAPPING`` are tried first, then a volume +
        container heuristic. A volume only matches when it is not preceded by
        another digit or a decimal point, so ``'15 LTR ...'`` is not mistaken
        for the 5 litre product.

        Returns:
            The product code, ``'UNKNOWN_PRODUCT'`` for missing values, or
            ``'UNKNOWN_<NAME>'`` when nothing matches.
        """
        if pd.isna(product_name) or product_name in ['', 'NOT_AVAILABLE', None]:
            return 'UNKNOWN_PRODUCT'

        product_upper = str(product_name).strip().upper()
        # Matching works on a whitespace-collapsed copy; the fallback label
        # below keeps the original spacing.
        text = ' '.join(product_upper.split())

        for key, value in self.PRODUCT_MAPPING.items():
            if re.search(r'(?<![\d.])' + re.escape(key), text):
                return value

        def has_volume(litres):
            # Accepts "<n> LTR" and "<n>L..." just like the plain substring
            # test, but rejects matches that are the tail of a larger number.
            pattern = r'(?<![\d.])' + litres + r'(?: LTR|L)'
            return re.search(pattern, text) is not None

        def mentions(*words):
            return any(word in text for word in words)

        if has_volume('1'):
            if mentions('PLASTIC', 'JAR'):
                return '1L_PLASTIC_JAR'
            if mentions('PET', 'BOTTLE'):
                return '1L_PET_BOTTLE'
        elif has_volume('2'):
            return '2L_PLASTIC_JAR'
        elif has_volume('5'):
            if mentions('STEEL', 'BARNI'):
                return '5L_STEEL_BARNI'
            return '5L_PLASTIC_JAR'
        elif has_volume('10'):
            if mentions('STEEL', 'BARNI'):
                return '10L_STEEL_BARNI'
            return '10L_PLASTIC_JAR'
        elif has_volume('20'):
            if mentions('STEEL', 'BARNI'):
                return '20L_STEEL_BARNI'
            if mentions('PLASTIC', 'CAN'):
                return '20L_PLASTIC_CAN'
            if mentions('CARBO'):
                return '20L_CARBO'

        return f"UNKNOWN_{product_upper.replace(' ', '_')}"

    def detect_sale_type(self, row):
        """Return ``'DEMO_SALE'`` for a single-unit or DEMO-referenced row, else ``'BULK_SALE'``."""
        quantity = self.safe_int(row.get('QTN', 0))
        reference = str(row.get('REF.', '')).strip().upper()

        if reference == 'DEMO' or quantity == 1:
            return 'DEMO_SALE'
        return 'BULK_SALE'

    def _resolve_amounts(self, row):
        """Return ``(amount, final_amount)`` for ``row``, deriving missing values.

        A zero/missing ``AMT`` is computed as ``QTN * RATE`` when both are
        positive; a zero/missing ``FINAL AMT`` falls back to the amount.
        """
        amount = self.safe_float(row.get('AMT', 0))
        final_amount = self.safe_float(row.get('FINAL AMT', 0))

        if amount == 0:
            quantity = self.safe_int(row.get('QTN', 0))
            rate = self.safe_float(row.get('RATE', 0))
            if quantity > 0 and rate > 0:
                amount = quantity * rate

        if final_amount == 0 and amount > 0:
            final_amount = amount

        return amount, final_amount

    def calculate_payment_status(self, row):
        """Classify the row as ``PAID``, ``PARTIAL_PAID``, ``PENDING`` or ``UNPAID``.

        The amount due is the same (possibly derived) final amount that
        standardize_record() stores, so a row with a blank ``FINAL AMT`` is
        not reported as paid merely because zero was compared with zero.
        ``PENDING`` means nothing was paid but a valid payment date exists.
        """
        _, final_amt = self._resolve_amounts(row)
        paid_amt = (
            self.safe_float(row.get('G-PAY', 0))
            + self.safe_float(row.get('CASH', 0))
            + self.safe_float(row.get('CHQ', 0))
        )

        if paid_amt >= final_amt:
            return 'PAID'
        if paid_amt > 0:
            return 'PARTIAL_PAID'
        if self.parse_date(row.get('PAYMENT DATE')) not in ['NOT_AVAILABLE', 'INVALID_DATE']:
            return 'PENDING'
        return 'UNPAID'

    def detect_payment_method(self, row):
        """Return the first non-zero payment channel: GPAY, CASH, CHEQUE or NOT_PAID.

        When several channels carry an amount only the first in that order is
        reported.
        """
        if self.safe_float(row.get('G-PAY', 0)) > 0:
            return 'GPAY'
        if self.safe_float(row.get('CASH', 0)) > 0:
            return 'CASH'
        if self.safe_float(row.get('CHQ', 0)) > 0:
            return 'CHEQUE'
        return 'NOT_PAID'

    def process_dataframe(self, df, sheet_name, source_file):
        """Standardise every non-blank row of ``df``.

        Rows whose NAME, PACKING and INV NO are all missing (or whose columns
        do not exist) are skipped. A row that fails to convert is reported
        with ``st.error`` and skipped.

        Returns:
            List of record dicts as produced by standardize_record().
        """
        standardized_records = []

        for idx, row in df.iterrows():
            # Default of None (not '') so that an absent column counts as missing.
            if all(pd.isna(row.get(column)) for column in ('NAME', 'PACKING', 'INV NO')):
                continue

            try:
                standardized_records.append(
                    self.standardize_record(row, sheet_name, source_file)
                )
            except Exception as e:
                st.error(f"⚠️ Error processing row {idx}: {e}")
                continue

        return standardized_records

    def standardize_record(self, row, sheet_name, source_file):
        """Build the standard record dict for a single spreadsheet row.

        Args:
            row: Mapping-like row (anything with ``.get``), keyed by the
                spreadsheet column headers.
            sheet_name: Name of the sheet the row came from.
            source_file: Path of the workbook; only its base name is stored.

        Returns:
            Dict whose keys are the ``sales`` table columns written by
            insert_into_database().
        """
        amount, final_amount = self._resolve_amounts(row)

        return {
            'source_sheet': sheet_name,
            'sr_no': self.clean_name(row.get('SR NO.', 'NOT_AVAILABLE')),
            'customer_name': self.clean_name(row.get('NAME', 'NOT_AVAILABLE')),
            'village': self.standardize_location(row.get('VILLAGE', 'NOT_AVAILABLE')),
            'taluka': self.standardize_location(row.get('TALUKA', 'NOT_AVAILABLE')),
            'district': self.standardize_location(row.get('DISTRICT', 'NOT_AVAILABLE')),
            'invoice_no': self.clean_name(row.get('INV NO', 'NOT_AVAILABLE')),
            'reference': self.clean_name(row.get('REF.', 'NOT_AVAILABLE')),
            'dispatch_date': self.parse_date(row.get('DISPATCH DATE')),
            'product_type': self.standardize_product(row.get('PACKING', 'NOT_AVAILABLE')),
            'quantity': self.safe_int(row.get('QTN', 0)),
            'rate_per_unit': self.safe_float(row.get('RATE', 0)),
            'amount': amount,
            'final_amount': final_amount,
            'total_liters': self.safe_float(row.get('TOTAL LTR', 0)),
            'payment_date': self.parse_date(row.get('PAYMENT DATE')),
            'gpay_amount': self.safe_float(row.get('G-PAY', 0)),
            'cash_amount': self.safe_float(row.get('CASH', 0)),
            'cheque_amount': self.safe_float(row.get('CHQ', 0)),
            'rrn_number': self.clean_name(row.get('RRN', 'NOT_AVAILABLE')),
            'sold_by': self.clean_name(row.get('BY', 'NOT_AVAILABLE')),
            'sale_type': self.detect_sale_type(row),
            'payment_status': self.calculate_payment_status(row),
            'payment_method': self.detect_payment_method(row),
            'source_file': os.path.basename(source_file),
        }

    def insert_into_database(self, records):
        """Insert new records and update existing ones, keyed by invoice number.

        A record that fails is reported with ``st.error`` and skipped; the
        remaining records are still written. Afterwards the ``customers``
        table is rebuilt.

        Returns:
            Tuple ``(inserted_count, updated_count)``.
        """
        columns = self._SALES_COLUMNS
        update_columns = [name for name in columns if name != 'invoice_no']

        insert_query = 'INSERT INTO sales ({}) VALUES ({})'.format(
            ', '.join(columns), ', '.join('?' for _ in columns)
        )
        update_query = 'UPDATE sales SET {} WHERE invoice_no=?'.format(
            ', '.join(f'{name}=?' for name in update_columns)
        )

        inserted_count = 0
        updated_count = 0

        conn = self.db.get_connection()
        try:
            cursor = conn.cursor()

            for record in records:
                try:
                    # NOTE(review): every row without an invoice number carries
                    # 'NOT_AVAILABLE' here, so such rows all match the same
                    # stored row and overwrite one another - confirm whether
                    # that is intended.
                    cursor.execute(
                        'SELECT id FROM sales WHERE invoice_no = ?',
                        (record['invoice_no'],),
                    )
                    if cursor.fetchone():
                        values = [record[name] for name in update_columns]
                        values.append(record['invoice_no'])
                        cursor.execute(update_query, values)
                        updated_count += 1
                    else:
                        cursor.execute(
                            insert_query, [record[name] for name in columns]
                        )
                        inserted_count += 1
                except Exception as e:
                    st.error(f"❌ Database error for invoice {record.get('invoice_no')}: {e}")
                    continue

            conn.commit()
            self.update_customers_table(conn)
        finally:
            # Close the connection even if the commit or the rebuild fails.
            conn.close()

        return inserted_count, updated_count

    def update_customers_table(self, conn):
        """Rebuild the ``customers`` table from the current ``sales`` rows.

        All existing customer rows are deleted and re-created, one per
        distinct (customer_name, village, taluka, district), excluding sales
        whose customer name is ``'NOT_AVAILABLE'``. Commits on ``conn``; the
        caller remains responsible for closing it.
        """
        cursor = conn.cursor()

        cursor.execute('DELETE FROM customers')

        cursor.execute('''
            INSERT INTO customers (customer_name, village, taluka, district, total_purchases, total_orders, last_order_date)
            SELECT
                customer_name,
                village,
                taluka,
                district,
                SUM(final_amount) as total_purchases,
                COUNT(*) as total_orders,
                MAX(dispatch_date) as last_order_date
            FROM sales
            WHERE customer_name != 'NOT_AVAILABLE'
            GROUP BY customer_name, village, taluka, district
        ''')

        conn.commit()

    def process_excel_file(self, file_path):
        """Import every sheet of the workbook at ``file_path`` (Streamlit entry point).

        Returns:
            ``True`` when at least one record was inserted or updated,
            ``False`` otherwise (including when any error occurred, which is
            reported with ``st.error``).
        """
        try:
            st.info(f"🔄 Processing: {os.path.basename(file_path)}")

            all_records = []

            # Open the workbook once and close it when done; each sheet is
            # parsed from the already-open handle.
            with pd.ExcelFile(file_path) as xl:
                for sheet_name in xl.sheet_names:
                    with st.spinner(f"Processing sheet: {sheet_name}..."):
                        df = xl.parse(sheet_name)
                        all_records.extend(
                            self.process_dataframe(df, sheet_name, file_path)
                        )

            if not all_records:
                st.warning("⚠️ No valid records found in the file")
                return False

            with st.spinner("Inserting into database..."):
                inserted, updated = self.insert_into_database(all_records)

            if inserted > 0 or updated > 0:
                st.success(f"✅ Processed {len(all_records)} records from {os.path.basename(file_path)}")
                st.success(f"📊 New: {inserted}, Updated: {updated}")
                self.show_import_summary(all_records)
                return True

            st.warning("⚠️ No records were inserted or updated")
            return False

        except Exception as e:
            # Top-level UI boundary: report the failure instead of crashing the app.
            st.error(f"❌ Error processing file: {e}")
            return False

    def show_import_summary(self, records):
        """Render headline metrics and the five most frequent products for ``records``."""
        if not records:
            return

        df = pd.DataFrame(records)

        col1, col2, col3, col4 = st.columns(4)

        with col1:
            st.metric("Total Records", len(records))
        with col2:
            demo_sales = len(df[df['sale_type'] == 'DEMO_SALE'])
            st.metric("Demo Sales", demo_sales)
        with col3:
            bulk_sales = len(df[df['sale_type'] == 'BULK_SALE'])
            st.metric("Bulk Sales", bulk_sales)
        with col4:
            total_amount = df['final_amount'].sum()
            st.metric("Total Amount", f"₹{total_amount:,.2f}")

        st.subheader("📦 Products Imported")
        product_summary = df['product_type'].value_counts().head(5)
        for product, count in product_summary.items():
            st.write(f"- {product}: {count} records")

    def get_import_stats(self):
        """Return import statistics for the dashboard.

        Returns:
            Dict with ``total_records`` (int), ``files_processed`` (int, number
            of distinct source files) and ``recent_imports`` (list of dicts
            for the five most recently processed files).
        """
        conn = self.db.get_connection()

        try:
            total_records = pd.read_sql(
                'SELECT COUNT(*) as count FROM sales', conn
            )['count'].iloc[0]

            files_processed = pd.read_sql(
                'SELECT COUNT(DISTINCT source_file) as count FROM sales', conn
            )['count'].iloc[0]

            recent_imports = pd.read_sql('''
                SELECT source_file, COUNT(*) as records, MAX(processed_at) as last_import
                FROM sales
                GROUP BY source_file
                ORDER BY last_import DESC
                LIMIT 5
            ''', conn)

            return {
                # Plain ints rather than numpy scalars, so the result can be
                # serialised or displayed without surprises.
                'total_records': int(total_records),
                'files_processed': int(files_processed),
                'recent_imports': recent_imports.to_dict('records'),
            }
        finally:
            conn.close()