|
|
import logging
import os
import random
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import pandas as pd
|
|
|
|
|
|
|
|
# Configure the root logger at import time (INFO and above) and create the
# module-level logger used by everything in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class DatabaseManager:
    """SQLite data-access layer for the sales management application.

    Creating an instance opens (or creates) the database at ``db_path``,
    makes sure every table exists, seeds default products and demo teams,
    applies column migrations and creates indexes.  Each method opens its own
    short-lived connection, so an instance never holds an open handle.

    Attributes:
        db_path: Path of the SQLite database file.
        enforce_foreign_keys: When true (default) every connection runs
            ``PRAGMA foreign_keys = ON`` so the ``ON DELETE CASCADE`` /
            ``ON DELETE SET NULL`` clauses declared in the schema actually
            take effect.  SQLite leaves them off unless asked.
    """

    # DDL for every table, in dependency order (referenced tables first).
    _TABLE_DEFINITIONS = (
        """
        CREATE TABLE IF NOT EXISTS customers (
            customer_id INTEGER PRIMARY KEY AUTOINCREMENT,
            customer_code TEXT UNIQUE,
            name TEXT NOT NULL,
            mobile TEXT,
            village TEXT,
            taluka TEXT,
            district TEXT,
            status TEXT DEFAULT 'Active',
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS distributors (
            distributor_id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            village TEXT,
            taluka TEXT,
            district TEXT,
            mantri_name TEXT,
            mantri_mobile TEXT,
            sabhasad_count INTEGER DEFAULT 0,
            contact_in_group INTEGER DEFAULT 0,
            status TEXT DEFAULT 'Active',
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS products (
            product_id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_name TEXT UNIQUE NOT NULL,
            packing_type TEXT,
            capacity_ltr REAL,
            category TEXT,
            standard_rate REAL,
            is_active INTEGER DEFAULT 1,
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS sales (
            sale_id INTEGER PRIMARY KEY AUTOINCREMENT,
            invoice_no TEXT UNIQUE NOT NULL,
            customer_id INTEGER,
            sale_date DATE,
            total_amount REAL DEFAULT 0,
            total_liters REAL DEFAULT 0,
            payment_status TEXT DEFAULT 'Pending',
            notes TEXT,
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (customer_id) REFERENCES customers (customer_id) ON DELETE SET NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS sale_items (
            item_id INTEGER PRIMARY KEY AUTOINCREMENT,
            sale_id INTEGER,
            product_id INTEGER,
            quantity INTEGER,
            rate REAL,
            amount REAL,
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (sale_id) REFERENCES sales (sale_id) ON DELETE CASCADE,
            FOREIGN KEY (product_id) REFERENCES products (product_id) ON DELETE SET NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS payments (
            payment_id INTEGER PRIMARY KEY AUTOINCREMENT,
            sale_id INTEGER,
            payment_date DATE,
            payment_method TEXT,
            amount REAL,
            rrn TEXT,
            reference TEXT,
            status TEXT DEFAULT 'Completed',
            notes TEXT,
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (sale_id) REFERENCES sales (sale_id) ON DELETE CASCADE
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS demos (
            demo_id INTEGER PRIMARY KEY AUTOINCREMENT,
            customer_id INTEGER,
            distributor_id INTEGER,
            demo_date DATE,
            demo_time TIME,
            product_id INTEGER,
            quantity_provided INTEGER,
            follow_up_date DATE,
            conversion_status TEXT DEFAULT 'Not Converted',
            notes TEXT,
            demo_location TEXT,
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (customer_id) REFERENCES customers (customer_id) ON DELETE SET NULL,
            FOREIGN KEY (distributor_id) REFERENCES distributors (distributor_id) ON DELETE SET NULL,
            FOREIGN KEY (product_id) REFERENCES products (product_id) ON DELETE SET NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS whatsapp_logs (
            log_id INTEGER PRIMARY KEY AUTOINCREMENT,
            customer_id INTEGER,
            distributor_id INTEGER,
            message_type TEXT,
            message_content TEXT,
            status TEXT,
            sent_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            response TEXT,
            FOREIGN KEY (customer_id) REFERENCES customers (customer_id) ON DELETE SET NULL,
            FOREIGN KEY (distributor_id) REFERENCES distributors (distributor_id) ON DELETE SET NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS follow_ups (
            follow_up_id INTEGER PRIMARY KEY AUTOINCREMENT,
            customer_id INTEGER,
            distributor_id INTEGER,
            demo_id INTEGER,
            follow_up_date DATE,
            follow_up_type TEXT,
            notes TEXT,
            status TEXT DEFAULT 'Pending',
            next_follow_up_date DATE,
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (customer_id) REFERENCES customers (customer_id) ON DELETE SET NULL,
            FOREIGN KEY (distributor_id) REFERENCES distributors (distributor_id) ON DELETE SET NULL,
            FOREIGN KEY (demo_id) REFERENCES demos (demo_id) ON DELETE SET NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS system_logs (
            log_id INTEGER PRIMARY KEY AUTOINCREMENT,
            log_type TEXT,
            log_message TEXT,
            table_name TEXT,
            record_id INTEGER,
            action TEXT,
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            user_info TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS rollback_logs (
            rollback_id INTEGER PRIMARY KEY AUTOINCREMENT,
            table_name TEXT,
            record_id INTEGER,
            old_data TEXT,
            new_data TEXT,
            action TEXT,
            rollback_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            rolled_back_by TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS offers (
            offer_id INTEGER PRIMARY KEY AUTOINCREMENT,
            offer_name TEXT NOT NULL,
            offer_description TEXT,
            product_id INTEGER,
            discount_percentage REAL,
            discount_amount REAL,
            start_date DATE,
            end_date DATE,
            status TEXT DEFAULT 'Active',
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (product_id) REFERENCES products (product_id) ON DELETE SET NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS demo_teams (
            team_id INTEGER PRIMARY KEY AUTOINCREMENT,
            team_name TEXT NOT NULL,
            team_leader TEXT,
            team_members TEXT,
            assigned_villages TEXT,
            status TEXT DEFAULT 'Active',
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
    )

    # Secondary indexes on the columns used by the lookups and joins below.
    _INDEX_DEFINITIONS = (
        "CREATE INDEX IF NOT EXISTS idx_customers_village ON customers(village)",
        "CREATE INDEX IF NOT EXISTS idx_customers_mobile ON customers(mobile)",
        "CREATE INDEX IF NOT EXISTS idx_sales_customer_id ON sales(customer_id)",
        "CREATE INDEX IF NOT EXISTS idx_sales_date ON sales(sale_date)",
        "CREATE INDEX IF NOT EXISTS idx_sales_invoice ON sales(invoice_no)",
        "CREATE INDEX IF NOT EXISTS idx_payments_sale_id ON payments(sale_id)",
        "CREATE INDEX IF NOT EXISTS idx_demos_customer_id ON demos(customer_id)",
        "CREATE INDEX IF NOT EXISTS idx_demos_date ON demos(demo_date)",
        "CREATE INDEX IF NOT EXISTS idx_sale_items_sale_id ON sale_items(sale_id)",
        "CREATE INDEX IF NOT EXISTS idx_follow_ups_date ON follow_ups(follow_up_date)",
        "CREATE INDEX IF NOT EXISTS idx_whatsapp_customer_id ON whatsapp_logs(customer_id)",
    )

    def __init__(self, db_path="sales_management.db", enforce_foreign_keys=True):
        """Store settings and bring the database schema up to date.

        Args:
            db_path: Path of the SQLite file (created when missing).
            enforce_foreign_keys: Pass ``False`` to keep SQLite's default of
                not enforcing the declared foreign keys.

        Raises:
            sqlite3.Error: If the connection or table creation fails.
        """
        self.db_path = db_path
        self.enforce_foreign_keys = enforce_foreign_keys
        # Re-entrancy guard so writing an audit row never triggers another one.
        self._is_logging = False
        self.init_database()

    # ------------------------------------------------------------------
    # Connection and schema management
    # ------------------------------------------------------------------

    def get_connection(self):
        """Open a new connection whose rows are ``sqlite3.Row`` objects.

        The caller owns the connection and must close it.

        Raises:
            sqlite3.Error: If the database cannot be opened.
        """
        try:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            # Foreign keys are a per-connection setting and default to OFF.
            if getattr(self, "enforce_foreign_keys", True):
                conn.execute("PRAGMA foreign_keys = ON")
            return conn
        except sqlite3.Error as e:
            logger.error(f"Database connection error: {e}")
            raise

    def init_database(self):
        """Create all tables, then seed defaults, migrate and index.

        Raises:
            sqlite3.Error: If any ``CREATE TABLE`` statement fails.
        """
        with closing(self.get_connection()) as conn:
            try:
                for ddl in self._TABLE_DEFINITIONS:
                    conn.execute(ddl)
                conn.commit()
                logger.info("Database tables initialized successfully")
            except sqlite3.Error as e:
                logger.error(f"Error initializing database: {e}")
                raise

        self.initialize_default_data()
        self.migrate_database()
        self.create_indexes()

    def create_indexes(self):
        """Create the secondary indexes; failures are logged, not raised."""
        with closing(self.get_connection()) as conn:
            try:
                for index_sql in self._INDEX_DEFINITIONS:
                    conn.execute(index_sql)
                conn.commit()
                logger.info("Database indexes created successfully")
            except sqlite3.Error as e:
                logger.error(f"Error creating indexes: {e}")

    def migrate_database(self):
        """Add columns missing from ``demos`` in databases created earlier.

        Failures are logged and rolled back, not raised.
        """
        with closing(self.get_connection()) as conn:
            try:
                cursor = conn.cursor()
                cursor.execute("PRAGMA table_info(demos)")
                # Column 1 of each table_info row is the column name.
                existing_columns = {row[1] for row in cursor.fetchall()}

                if "demo_time" not in existing_columns:
                    cursor.execute("ALTER TABLE demos ADD COLUMN demo_time TIME")
                    logger.info("Added demo_time column to demos table")

                if "demo_location" not in existing_columns:
                    cursor.execute("ALTER TABLE demos ADD COLUMN demo_location TEXT")
                    logger.info("Added demo_location column to demos table")

                conn.commit()
                logger.info("Database migration completed successfully")
            except sqlite3.Error as e:
                logger.error(f"Error during database migration: {e}")
                conn.rollback()

    def initialize_default_data(self):
        """Seed the default products and demo teams exactly once.

        Products rely on the UNIQUE ``product_name`` column.  ``demo_teams``
        has no unique column, so ``INSERT OR IGNORE`` would add the same teams
        again on every start-up; teams are therefore inserted only when no row
        with that name exists yet.  Failures are logged, not raised.
        """
        default_products = [
            ("1 LTR PLASTIC JAR", "PLASTIC_JAR", 1.0, "Regular", 95),
            ("2 LTR PLASTIC JAR", "PLASTIC_JAR", 2.0, "Regular", 185),
            ("5 LTR PLASTIC JAR", "PLASTIC_JAR", 5.0, "Regular", 460),
            ("5 LTR STEEL BARNI", "STEEL_BARNI", 5.0, "Premium", 680),
            ("10 LTR STEEL BARNI", "STEEL_BARNI", 10.0, "Premium", 1300),
            ("20 LTR STEEL BARNI", "STEEL_BARNI", 20.0, "Premium", 2950),
            ("20 LTR PLASTIC CAN", "PLASTIC_CAN", 20.0, "Regular", 2400),
            ("1 LTR PET BOTTLE", "PET_BOTTLE", 1.0, "Regular", 85),
        ]
        default_teams = [
            (
                "Team A - North Region",
                "Rajesh Kumar",
                "Mohan, Suresh, Priya",
                "Amiyad, Amvad, Ankalav",
            ),
            (
                "Team B - South Region",
                "Sunil Patel",
                "Anita, Vijay, Deepak",
                "Petlad, Borsad, Vadodara",
            ),
        ]

        with closing(self.get_connection()) as conn:
            try:
                conn.executemany(
                    """
                    INSERT OR IGNORE INTO products (product_name, packing_type, capacity_ltr, category, standard_rate)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    default_products,
                )
                conn.executemany(
                    """
                    INSERT INTO demo_teams (team_name, team_leader, team_members, assigned_villages)
                    SELECT ?, ?, ?, ?
                    WHERE NOT EXISTS (SELECT 1 FROM demo_teams WHERE team_name = ?)
                    """,
                    # The team name is bound a second time for the NOT EXISTS test.
                    [team + (team[0],) for team in default_teams],
                )
                conn.commit()
                logger.info("Default data initialized successfully")
            except sqlite3.Error as e:
                logger.error(f"Error initializing default data: {e}")

    # ------------------------------------------------------------------
    # Generic query helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier (``schema.table`` aware)."""
        return ".".join(
            '"' + part.replace('"', '""') + '"' for part in str(name).split(".")
        )

    def _execute_query_internal(self, query: str, params: tuple = None) -> List[tuple]:
        """Run one statement on a fresh connection without audit logging.

        Returns:
            ``[(lastrowid,)]`` for ``INSERT`` statements, all fetched rows for
            any other statement that produces a result set (``SELECT``,
            ``WITH``, ``PRAGMA`` ...), otherwise ``[]``.

        Raises:
            sqlite3.Error: Re-raised after the transaction is rolled back.
        """
        with closing(self.get_connection()) as conn:
            try:
                cursor = conn.cursor()
                cursor.execute(query, params if params else ())

                if query.lstrip().upper().startswith("INSERT"):
                    result = [(cursor.lastrowid,)]
                elif cursor.description is not None:
                    # A non-None description means the statement returned rows.
                    result = cursor.fetchall()
                else:
                    result = []

                conn.commit()
                return result
            except sqlite3.Error as e:
                logger.error(f"Database query error: {e}")
                conn.rollback()
                raise

    def execute_query(
        self, query: str, params: tuple = None, log_action: bool = True
    ) -> List[tuple]:
        """Run a statement and never raise.

        Args:
            query: SQL text with ``?`` placeholders.
            params: Values bound to the placeholders.
            log_action: When true, record the execution in ``system_logs``.

        Returns:
            Same shape as ``_execute_query_internal``; ``[]`` when the
            statement fails (the error is logged).  Callers that need to see
            the exception must use ``_execute_query_internal`` instead.
        """
        try:
            result = self._execute_query_internal(query, params)
            if log_action:
                # log_system_action carries the recursion guard and swallows
                # its own failures, so auditing can never fail the query.
                self.log_system_action(
                    "QUERY_EXECUTION",
                    f"Executed query: {query[:100]}...",
                    None,
                    None,
                    "EXECUTE",
                )
            return result
        except Exception as e:
            logger.error(f"Error in execute_query: {e}")
            return []

    def get_dataframe(
        self, table_name: str = None, query: str = None, params: tuple = None
    ) -> pd.DataFrame:
        """Return query results, or a whole table, as a DataFrame.

        Args:
            table_name: Table to dump when ``query`` is not given; also used
                in the error message.  It is quoted as an identifier, so it
                cannot inject SQL.
            query: Optional SQL to run instead of ``SELECT *``.
            params: Values bound to ``query``'s placeholders.

        Returns:
            The resulting DataFrame, or an empty one on any error.
        """
        with closing(self.get_connection()) as conn:
            try:
                if query:
                    return pd.read_sql_query(query, conn, params=params)
                if not table_name:
                    raise ValueError("Either table_name or query must be provided")
                quoted = self._quote_identifier(table_name)
                return pd.read_sql_query(f"SELECT * FROM {quoted}", conn)
            except Exception as e:
                logger.error(
                    f"Error getting DataFrame for {table_name if table_name else 'query'}: {e}"
                )
                return pd.DataFrame()

    # ------------------------------------------------------------------
    # Customers and distributors
    # ------------------------------------------------------------------

    @staticmethod
    def _new_customer_code(low: int, high: int) -> str:
        """Build a ``CUST<timestamp><random>`` code with a suffix in [low, high]."""
        stamp = datetime.now().strftime("%Y%m%d%H%M%S")
        return f"CUST{stamp}{random.randint(low, high)}"

    def add_customer(
        self,
        name: str,
        mobile: str = "",
        village: str = "",
        taluka: str = "",
        district: str = "",
        customer_code: str = None,
    ) -> int:
        """Insert a customer, or return the id of a matching existing one.

        A customer counts as existing when the (non-empty) mobile matches, or
        when both name and village match.  An empty mobile is never used for
        matching; otherwise every customer without a phone number would be
        treated as the same person.

        Returns:
            The customer id, or ``-1`` when the insert fails.
        """
        if not customer_code:
            customer_code = self._new_customer_code(100, 999)

        try:
            existing_customer = self.execute_query(
                """
                SELECT customer_id FROM customers
                WHERE (? != '' AND mobile = ?) OR (name = ? AND village = ?)
                """,
                (mobile, mobile, name, village),
                log_action=False,
            )
            if existing_customer:
                return existing_customer[0][0]

            max_attempts = 5
            for attempt in range(max_attempts):
                try:
                    # Use the raising helper: execute_query would swallow the
                    # IntegrityError this retry loop depends on.
                    inserted = self._execute_query_internal(
                        """
                        INSERT INTO customers (customer_code, name, mobile, village, taluka, district)
                        VALUES (?, ?, ?, ?, ?, ?)
                        """,
                        (customer_code, name, mobile, village, taluka, district),
                    )
                    break
                except sqlite3.IntegrityError as e:
                    code_clash = (
                        "UNIQUE constraint failed: customers.customer_code" in str(e)
                    )
                    if not code_clash or attempt == max_attempts - 1:
                        raise
                    customer_code = self._new_customer_code(1000, 9999)

            # last_insert_rowid() is per connection, so the id must come from
            # the INSERT itself rather than from a later query.
            customer_id = inserted[0][0]

            self.log_system_action(
                "CUSTOMER_ADD",
                f"Added customer: {name}",
                "customers",
                customer_id,
                "INSERT",
            )
            return customer_id
        except Exception as e:
            logger.error(f"Error adding customer: {e}")
            return -1

    def add_distributor(
        self,
        name: str,
        village: str = "",
        taluka: str = "",
        district: str = "",
        mantri_name: str = "",
        mantri_mobile: str = "",
        sabhasad_count: int = 0,
        contact_in_group: int = 0,
        status: str = "Active",
    ) -> int:
        """Insert a distributor, or return the id of an existing one.

        A distributor counts as existing when name, village and taluka match.

        Returns:
            The distributor id, or ``-1`` when the insert fails.
        """
        try:
            existing_distributor = self.execute_query(
                "SELECT distributor_id FROM distributors WHERE name = ? AND village = ? AND taluka = ?",
                (name, village, taluka),
                log_action=False,
            )
            if existing_distributor:
                return existing_distributor[0][0]

            inserted = self._execute_query_internal(
                """
                INSERT INTO distributors (name, village, taluka, district, mantri_name, mantri_mobile,
                                          sabhasad_count, contact_in_group, status)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    name,
                    village,
                    taluka,
                    district,
                    mantri_name,
                    mantri_mobile,
                    sabhasad_count,
                    contact_in_group,
                    status,
                ),
            )
            # Take the id from the INSERT; a separate last_insert_rowid()
            # query runs on another connection and always yields 0.
            distributor_id = inserted[0][0]

            self.log_system_action(
                "DISTRIBUTOR_ADD",
                f"Added distributor: {name}",
                "distributors",
                distributor_id,
                "INSERT",
            )
            return distributor_id
        except Exception as e:
            logger.error(f"Error adding distributor: {e}")
            return -1

    def get_distributor_by_location(self, village: str, taluka: str) -> Optional[Dict]:
        """Return the first distributor in the given village/taluka as a dict, or None."""
        try:
            rows = self.execute_query(
                "SELECT * FROM distributors WHERE village = ? AND taluka = ?",
                (village, taluka),
                log_action=False,
            )
            return dict(rows[0]) if rows else None
        except Exception as e:
            logger.error(f"Error getting distributor by location: {e}")
            return None

    def distributor_exists(self, name: str, village: str, taluka: str) -> bool:
        """Return True when a distributor with this name, village and taluka exists."""
        try:
            rows = self.execute_query(
                "SELECT distributor_id FROM distributors WHERE name = ? AND village = ? AND taluka = ?",
                (name, village, taluka),
                log_action=False,
            )
            return len(rows) > 0
        except Exception as e:
            logger.error(f"Error checking distributor existence: {e}")
            return False

    # ------------------------------------------------------------------
    # Sales and payments
    # ------------------------------------------------------------------

    def generate_invoice_number(self, prefix="INVCL"):
        """Return the next invoice number in the form ``<prefix><mm><yy><serial>``.

        The serial continues from the most recently inserted sale whose
        invoice number starts with this month's ``<prefix><mm><yy>``; it
        restarts at 1 when there is none or its suffix is not numeric.  The
        serial is zero-padded to at least three digits.
        """
        try:
            now = datetime.now()
            month = now.strftime("%m")
            year = now.strftime("%y")
            stem = f"{prefix}{month}{year}"

            rows = self.execute_query(
                "SELECT invoice_no FROM sales WHERE invoice_no LIKE ? ORDER BY sale_id DESC LIMIT 1",
                (f"{stem}%",),
                log_action=False,
            )

            new_serial = 1
            if rows:
                last_invoice = rows[0][0]
                try:
                    # Everything after prefix + mmyy is the running serial.
                    new_serial = int(last_invoice[len(prefix) + 4:]) + 1
                except ValueError:
                    new_serial = 1

            return f"{stem}{new_serial:03d}"
        except Exception as e:
            logger.error(f"Error generating invoice number: {e}")
            return f"{prefix}{int(datetime.now().timestamp())}"

    def add_sale(
        self,
        invoice_no: str,
        customer_id: int,
        sale_date,
        items: List[Dict],
        payments: List[Dict] = None,
        notes: str = "",
    ) -> int:
        """Insert a sale with its items and optional payments atomically.

        Args:
            invoice_no: Unique invoice number.
            customer_id: Id of the buying customer (may be None).
            sale_date: Date value stored in ``sales.sale_date``.
            items: Dicts with ``product_id``, ``quantity``, ``rate`` and an
                optional ``liters``.
            payments: Optional dicts with ``payment_date``, ``method``,
                ``amount`` and optional ``rrn`` / ``reference``.
            notes: Free-text note stored with the sale.

        Returns:
            The id of the new sale.

        Raises:
            Exception: Any failure is logged, rolled back and re-raised.
        """
        with closing(self.get_connection()) as conn:
            try:
                cursor = conn.cursor()

                total_amount = sum(item["quantity"] * item["rate"] for item in items)
                total_liters = sum(item.get("liters", 0) for item in items)
                # Diagnostics go to the logger: printing emoji to a console
                # that is not UTF-8 can raise and abort the transaction.
                logger.debug(
                    "Creating sale - Invoice: %s, Customer: %s, Total: %s",
                    invoice_no,
                    customer_id,
                    total_amount,
                )

                cursor.execute(
                    """
                    INSERT INTO sales (invoice_no, customer_id, sale_date, total_amount, total_liters, notes)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (invoice_no, customer_id, sale_date, total_amount, total_liters, notes),
                )
                # Capture the id now; later inserts overwrite cursor.lastrowid.
                sale_id = cursor.lastrowid

                cursor.executemany(
                    """
                    INSERT INTO sale_items (sale_id, product_id, quantity, rate, amount)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    [
                        (
                            sale_id,
                            item["product_id"],
                            item["quantity"],
                            item["rate"],
                            item["quantity"] * item["rate"],
                        )
                        for item in items
                    ],
                )

                if payments:
                    cursor.executemany(
                        """
                        INSERT INTO payments (sale_id, payment_date, payment_method, amount, rrn, reference)
                        VALUES (?, ?, ?, ?, ?, ?)
                        """,
                        [
                            (
                                sale_id,
                                payment["payment_date"],
                                payment["method"],
                                payment["amount"],
                                payment.get("rrn", ""),
                                payment.get("reference", ""),
                            )
                            for payment in payments
                        ],
                    )

                conn.commit()
            except Exception as e:
                conn.rollback()
                logger.error(f"Error adding sale: {e}")
                raise

        self._update_payment_status(sale_id)
        logger.debug("Sale %s completed successfully", sale_id)
        return sale_id

    def _update_payment_status(self, sale_id: int):
        """Set ``sales.payment_status`` to Paid, Partial or Pending.

        The status follows from the sum of the sale's payments compared with
        its ``total_amount``.  Errors are logged, not raised.
        """
        with closing(self.get_connection()) as conn:
            try:
                sale_row = conn.execute(
                    "SELECT total_amount FROM sales WHERE sale_id = ?", (sale_id,)
                ).fetchone()
                if sale_row is None:
                    logger.error(
                        f"Error updating payment status: sale {sale_id} not found"
                    )
                    return
                sale_total = sale_row[0] or 0

                total_paid = conn.execute(
                    "SELECT COALESCE(SUM(amount), 0) FROM payments WHERE sale_id = ?",
                    (sale_id,),
                ).fetchone()[0]

                if total_paid >= sale_total:
                    status = "Paid"
                elif total_paid > 0:
                    status = "Partial"
                else:
                    status = "Pending"

                conn.execute(
                    "UPDATE sales SET payment_status = ? WHERE sale_id = ?",
                    (status, sale_id),
                )
                conn.commit()
            except Exception as e:
                logger.error(f"Error updating payment status: {e}")

    def get_pending_payments(self) -> pd.DataFrame:
        """Return Pending/Partial sales with an outstanding balance, newest first."""
        return self.get_dataframe(
            "sales",
            """
            SELECT s.sale_id, s.invoice_no, s.sale_date, c.name as customer_name,
                   c.mobile, c.village, s.total_amount,
                   (s.total_amount - COALESCE(SUM(p.amount), 0)) as pending_amount,
                   COALESCE(SUM(p.amount), 0) as paid_amount
            FROM sales s
            LEFT JOIN customers c ON s.customer_id = c.customer_id
            LEFT JOIN payments p ON s.sale_id = p.sale_id
            WHERE s.payment_status IN ('Pending', 'Partial')
            GROUP BY s.sale_id
            HAVING pending_amount > 0
            ORDER BY s.sale_date DESC
            """,
        )

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_demo_conversions(self) -> pd.DataFrame:
        """Return every demo with customer, product and distributor names.

        Adds a 0/1 ``converted`` column; rows are ordered newest first.
        """
        return self.get_dataframe(
            "demos",
            """
            SELECT d.*, c.name as customer_name, p.product_name,
                   dist.name as distributor_name, c.village, c.taluka,
                   CASE WHEN d.conversion_status = 'Converted' THEN 1 ELSE 0 END as converted
            FROM demos d
            LEFT JOIN customers c ON d.customer_id = c.customer_id
            LEFT JOIN products p ON d.product_id = p.product_id
            LEFT JOIN distributors dist ON d.distributor_id = dist.distributor_id
            ORDER BY d.demo_date DESC
            """,
        )

    def get_sales_analytics(self, start_date: str = None, end_date: str = None) -> Dict:
        """Return aggregate sales figures for a date range.

        Args:
            start_date: ``YYYY-MM-DD`` lower bound; defaults to 30 days ago.
            end_date: ``YYYY-MM-DD`` upper bound; defaults to today.

        Returns:
            A dict of totals with missing values as 0, or ``{}`` when the
            query fails.
        """
        if not start_date:
            start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")

        query = """
            SELECT
                COUNT(*) as total_sales,
                SUM(total_amount) as total_revenue,
                AVG(total_amount) as avg_sale_value,
                COUNT(DISTINCT customer_id) as unique_customers,
                SUM(CASE WHEN payment_status = 'Paid' THEN 1 ELSE 0 END) as completed_payments,
                SUM(CASE WHEN payment_status IN ('Pending', 'Partial') THEN 1 ELSE 0 END) as pending_payments
            FROM sales
            WHERE sale_date BETWEEN ? AND ?
        """
        rows = self.execute_query(query, (start_date, end_date), log_action=False)
        if not rows:
            return {}

        keys = (
            "total_sales",
            "total_revenue",
            "avg_sale_value",
            "unique_customers",
            "completed_payments",
            "pending_payments",
        )
        # Aggregates over an empty range are NULL; report them as 0.
        return {key: value or 0 for key, value in zip(keys, rows[0])}

    def get_village_wise_sales(self) -> pd.DataFrame:
        """Return sales totals per customer village, highest revenue first."""
        return self.get_dataframe(
            "sales",
            """
            SELECT c.village, COUNT(s.sale_id) as total_sales,
                   SUM(s.total_amount) as total_revenue,
                   AVG(s.total_amount) as avg_sale_value,
                   COUNT(DISTINCT s.customer_id) as unique_customers
            FROM sales s
            JOIN customers c ON s.customer_id = c.customer_id
            WHERE c.village IS NOT NULL AND c.village != ''
            GROUP BY c.village
            ORDER BY total_revenue DESC
            """,
        )

    def get_product_performance(self) -> pd.DataFrame:
        """Return quantity and revenue per product, highest revenue first."""
        return self.get_dataframe(
            "sale_items",
            """
            SELECT p.product_name, COUNT(si.item_id) as times_sold,
                   SUM(si.quantity) as total_quantity,
                   SUM(si.amount) as total_revenue,
                   AVG(si.rate) as avg_rate
            FROM sale_items si
            JOIN products p ON si.product_id = p.product_id
            GROUP BY p.product_id, p.product_name
            ORDER BY total_revenue DESC
            """,
        )

    def get_upcoming_follow_ups(self) -> pd.DataFrame:
        """Return up to 20 pending follow-ups dated today or later, soonest first."""
        return self.get_dataframe(
            "follow_ups",
            """
            SELECT f.*, c.name as customer_name, c.mobile,
                   d.name as distributor_name, dm.demo_date
            FROM follow_ups f
            LEFT JOIN customers c ON f.customer_id = c.customer_id
            LEFT JOIN distributors d ON f.distributor_id = d.distributor_id
            LEFT JOIN demos dm ON f.demo_id = dm.demo_id
            WHERE f.follow_up_date >= date('now')
            AND f.status = 'Pending'
            ORDER BY f.follow_up_date ASC
            LIMIT 20
            """,
        )

    def get_whatsapp_logs(self, customer_id: int = None) -> pd.DataFrame:
        """Return WhatsApp logs, newest first.

        With a truthy ``customer_id`` all logs of that customer are returned;
        otherwise the 50 most recent logs across all customers.
        """
        if customer_id:
            return self.get_dataframe(
                "whatsapp_logs",
                """
                SELECT w.*, c.name as customer_name, c.mobile
                FROM whatsapp_logs w
                LEFT JOIN customers c ON w.customer_id = c.customer_id
                WHERE w.customer_id = ?
                ORDER BY w.sent_date DESC
                """,
                (customer_id,),
            )
        return self.get_dataframe(
            "whatsapp_logs",
            """
            SELECT w.*, c.name as customer_name, c.mobile
            FROM whatsapp_logs w
            LEFT JOIN customers c ON w.customer_id = c.customer_id
            ORDER BY w.sent_date DESC
            LIMIT 50
            """,
        )

    # ------------------------------------------------------------------
    # Audit trail and maintenance
    # ------------------------------------------------------------------

    def log_system_action(
        self,
        log_type: str,
        message: str,
        table_name: str = None,
        record_id: int = None,
        action: str = None,
    ):
        """Write one row to ``system_logs``; never raises.

        Does nothing when called while another audit row is being written
        (``_is_logging`` guard).
        """
        if self._is_logging:
            return

        try:
            self._is_logging = True
            self._execute_query_internal(
                """
                INSERT INTO system_logs (log_type, log_message, table_name, record_id, action)
                VALUES (?, ?, ?, ?, ?)
                """,
                (log_type, message, table_name, record_id, action),
            )
        except Exception as e:
            logger.error(f"Error logging system action: {e}")
        finally:
            self._is_logging = False

    def create_rollback_point(
        self, table_name: str, record_id: int, old_data: str, new_data: str, action: str
    ):
        """Store the before/after data of a change in ``rollback_logs``.

        Failures are logged, not raised.
        """
        try:
            # The raising helper is used so a failure reaches this handler.
            self._execute_query_internal(
                """
                INSERT INTO rollback_logs (table_name, record_id, old_data, new_data, action)
                VALUES (?, ?, ?, ?, ?)
                """,
                (table_name, record_id, old_data, new_data, action),
            )
        except Exception as e:
            logger.error(f"Error creating rollback point: {e}")

    def get_recent_activity(self, limit: int = 10) -> pd.DataFrame:
        """Return the ``limit`` most recent ``system_logs`` rows, newest first."""
        return self.get_dataframe(
            "system_logs",
            """
            SELECT log_type, log_message, table_name, record_id, action, created_date
            FROM system_logs
            ORDER BY created_date DESC
            LIMIT ?
            """,
            # Bound as a parameter instead of being formatted into the SQL.
            (limit,),
        )

    def backup_database(self, backup_path: str = None):
        """Copy the database to ``backup_path`` using SQLite's backup API.

        Args:
            backup_path: Destination file; defaults to a timestamped
                ``backup_YYYYmmdd_HHMMSS.db`` in the working directory.

        Returns:
            The backup path, or ``None`` when the backup fails.
        """
        if not backup_path:
            backup_path = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"

        try:
            # closing() guarantees both handles are released on failure too.
            with closing(self.get_connection()) as source_conn, closing(
                sqlite3.connect(backup_path)
            ) as backup_conn:
                with backup_conn:
                    source_conn.backup(backup_conn)

            logger.info(f"Database backup created: {backup_path}")
            return backup_path
        except Exception as e:
            logger.error(f"Error creating database backup: {e}")
            return None

    def cleanup_old_data(self, days: int = 365):
        """Delete ``system_logs`` and ``rollback_logs`` rows older than ``days`` days.

        Failures are logged, not raised.
        """
        try:
            cutoff_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")

            # The raising helper is used so success is only logged when both
            # deletes really ran.
            self._execute_query_internal(
                "DELETE FROM system_logs WHERE created_date < ?",
                (cutoff_date,),
            )
            self._execute_query_internal(
                "DELETE FROM rollback_logs WHERE rollback_date < ?",
                (cutoff_date,),
            )

            logger.info(f"Cleaned up data older than {days} days")
        except Exception as e:
            logger.error(f"Error cleaning up old data: {e}")
|
|
|
|
|
|
|
|
|
|
|
def check_database_health(db_path: str = "sales_management.db") -> Dict:
    """Report row counts, file size and integrity of the database.

    Opening the database goes through ``DatabaseManager``, so a missing file
    or missing tables are created as a side effect.

    Args:
        db_path: Path of the SQLite database file to inspect.

    Returns:
        A dict with ``status`` (``"healthy"`` or ``"error"``),
        ``table_counts``, ``database_size_mb``, ``last_backup`` and
        ``integrity_check`` (``"passed"`` or ``"failed"``).  On failure the
        dict also carries an ``error`` message.  This function never raises.
    """
    try:
        db = DatabaseManager(db_path)

        # Fixed list of table names, so formatting them into SQL is safe.
        tables = ["customers", "sales", "distributors", "demos", "payments", "products"]
        counts = {}
        for table in tables:
            result = db.execute_query(f"SELECT COUNT(*) FROM {table}", log_action=False)
            counts[table] = result[0][0] if result else 0

        # Actually run SQLite's integrity check instead of assuming success;
        # a sound database yields the single row "ok".
        conn = db.get_connection()
        try:
            findings = [row[0] for row in conn.execute("PRAGMA integrity_check")]
        finally:
            conn.close()
        if findings != ["ok"]:
            raise RuntimeError(
                "integrity check reported: " + "; ".join(str(f) for f in findings)
            )

        db_size = os.path.getsize(db_path) if os.path.exists(db_path) else 0

        return {
            "status": "healthy",
            "table_counts": counts,
            "database_size_mb": round(db_size / (1024 * 1024), 2),
            "last_backup": "N/A",
            "integrity_check": "passed",
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "table_counts": {},
            "database_size_mb": 0,
            "last_backup": "N/A",
            "integrity_check": "failed",
        }
|
|
|