|
|
import sqlite3
|
|
|
import pandas as pd
|
|
|
import os
|
|
|
import logging
|
|
|
from datetime import datetime, timedelta
|
|
|
from typing import List, Dict, Any, Optional
|
|
|
import random
|
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class DatabaseManager:
    """SQLite-backed data-access layer for the sales-management application."""

    def __init__(self, db_path: str = "sales_management.db"):
        """Store connection settings and build the schema immediately.

        Args:
            db_path: Path of the SQLite database file to open or create.
        """
        self.db_path = db_path
        # Re-entrancy guard: stops audit-log INSERTs from trying to audit-log
        # themselves (see execute_query / log_system_action).
        self._is_logging = False
        # Creates tables, seeds default data, migrates and indexes up front.
        self.init_database()
|
|
|
|
|
|
def get_connection(self):
    """Open a fresh connection to the configured SQLite file.

    The connection's row_factory is set to sqlite3.Row so rows can be read
    by column name. Any sqlite3.Error is logged and re-raised.
    """
    try:
        connection = sqlite3.connect(self.db_path)
        connection.row_factory = sqlite3.Row
    except sqlite3.Error as exc:
        logger.error(f"Database connection error: {exc}")
        raise
    return connection
|
|
|
|
|
|
def init_database(self):
    """Create every table the application needs (idempotent via IF NOT
    EXISTS), then seed default data, run migrations and build indexes.

    NOTE(review): the schema declares ON DELETE CASCADE / SET NULL actions,
    but get_connection() never issues "PRAGMA foreign_keys = ON", and SQLite
    does not enforce foreign keys by default — so these clauses appear to be
    inert; confirm before relying on them.
    """
    conn = self.get_connection()

    try:
        # Customer master records.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS customers (
                customer_id INTEGER PRIMARY KEY AUTOINCREMENT,
                customer_code TEXT UNIQUE,
                name TEXT NOT NULL,
                mobile TEXT,
                village TEXT,
                taluka TEXT,
                district TEXT,
                status TEXT DEFAULT 'Active',
                created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Distributor master records (with local "mantri" contact details).
        conn.execute("""
            CREATE TABLE IF NOT EXISTS distributors (
                distributor_id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                village TEXT,
                taluka TEXT,
                district TEXT,
                mantri_name TEXT,
                mantri_mobile TEXT,
                sabhasad_count INTEGER DEFAULT 0,
                contact_in_group INTEGER DEFAULT 0,
                status TEXT DEFAULT 'Active',
                created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Product catalog; seeded by initialize_default_data().
        conn.execute("""
            CREATE TABLE IF NOT EXISTS products (
                product_id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_name TEXT UNIQUE NOT NULL,
                packing_type TEXT,
                capacity_ltr REAL,
                category TEXT,
                standard_rate REAL,
                is_active INTEGER DEFAULT 1,
                created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Sale header; line items live in sale_items, money in payments.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS sales (
                sale_id INTEGER PRIMARY KEY AUTOINCREMENT,
                invoice_no TEXT UNIQUE NOT NULL,
                customer_id INTEGER,
                sale_date DATE,
                total_amount REAL DEFAULT 0,
                total_liters REAL DEFAULT 0,
                payment_status TEXT DEFAULT 'Pending',
                notes TEXT,
                created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (customer_id) REFERENCES customers (customer_id) ON DELETE SET NULL
            )
        """)

        # One row per product line on a sale.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS sale_items (
                item_id INTEGER PRIMARY KEY AUTOINCREMENT,
                sale_id INTEGER,
                product_id INTEGER,
                quantity INTEGER,
                rate REAL,
                amount REAL,
                created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (sale_id) REFERENCES sales (sale_id) ON DELETE CASCADE,
                FOREIGN KEY (product_id) REFERENCES products (product_id) ON DELETE SET NULL
            )
        """)

        # Payments received against a sale (possibly several per sale).
        conn.execute("""
            CREATE TABLE IF NOT EXISTS payments (
                payment_id INTEGER PRIMARY KEY AUTOINCREMENT,
                sale_id INTEGER,
                payment_date DATE,
                payment_method TEXT,
                amount REAL,
                rrn TEXT,
                reference TEXT,
                status TEXT DEFAULT 'Completed',
                notes TEXT,
                created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (sale_id) REFERENCES sales (sale_id) ON DELETE CASCADE
            )
        """)

        # Product demonstrations; demo_time/demo_location are also added by
        # migrate_database() for databases created before those columns.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS demos (
                demo_id INTEGER PRIMARY KEY AUTOINCREMENT,
                customer_id INTEGER,
                distributor_id INTEGER,
                demo_date DATE,
                demo_time TIME,
                product_id INTEGER,
                quantity_provided INTEGER,
                follow_up_date DATE,
                conversion_status TEXT DEFAULT 'Not Converted',
                notes TEXT,
                demo_location TEXT,
                created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (customer_id) REFERENCES customers (customer_id) ON DELETE SET NULL,
                FOREIGN KEY (distributor_id) REFERENCES distributors (distributor_id) ON DELETE SET NULL,
                FOREIGN KEY (product_id) REFERENCES products (product_id) ON DELETE SET NULL
            )
        """)

        # Outbound WhatsApp message audit trail.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS whatsapp_logs (
                log_id INTEGER PRIMARY KEY AUTOINCREMENT,
                customer_id INTEGER,
                distributor_id INTEGER,
                message_type TEXT,
                message_content TEXT,
                status TEXT,
                sent_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                response TEXT,
                FOREIGN KEY (customer_id) REFERENCES customers (customer_id) ON DELETE SET NULL,
                FOREIGN KEY (distributor_id) REFERENCES distributors (distributor_id) ON DELETE SET NULL
            )
        """)

        # Follow-up tasks, optionally linked to a demo.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS follow_ups (
                follow_up_id INTEGER PRIMARY KEY AUTOINCREMENT,
                customer_id INTEGER,
                distributor_id INTEGER,
                demo_id INTEGER,
                follow_up_date DATE,
                follow_up_type TEXT,
                notes TEXT,
                status TEXT DEFAULT 'Pending',
                next_follow_up_date DATE,
                created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (customer_id) REFERENCES customers (customer_id) ON DELETE SET NULL,
                FOREIGN KEY (distributor_id) REFERENCES distributors (distributor_id) ON DELETE SET NULL,
                FOREIGN KEY (demo_id) REFERENCES demos (demo_id) ON DELETE SET NULL
            )
        """)

        # Application-level audit log (written by log_system_action()).
        conn.execute("""
            CREATE TABLE IF NOT EXISTS system_logs (
                log_id INTEGER PRIMARY KEY AUTOINCREMENT,
                log_type TEXT,
                log_message TEXT,
                table_name TEXT,
                record_id INTEGER,
                action TEXT,
                created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                user_info TEXT
            )
        """)

        # Before/after snapshots written by create_rollback_point().
        conn.execute("""
            CREATE TABLE IF NOT EXISTS rollback_logs (
                rollback_id INTEGER PRIMARY KEY AUTOINCREMENT,
                table_name TEXT,
                record_id INTEGER,
                old_data TEXT,
                new_data TEXT,
                action TEXT,
                rollback_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                rolled_back_by TEXT
            )
        """)

        # Promotional offers, optionally tied to a product.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS offers (
                offer_id INTEGER PRIMARY KEY AUTOINCREMENT,
                offer_name TEXT NOT NULL,
                offer_description TEXT,
                product_id INTEGER,
                discount_percentage REAL,
                discount_amount REAL,
                start_date DATE,
                end_date DATE,
                status TEXT DEFAULT 'Active',
                created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (product_id) REFERENCES products (product_id) ON DELETE SET NULL
            )
        """)

        # Demo field teams; members/villages are free-text lists.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS demo_teams (
                team_id INTEGER PRIMARY KEY AUTOINCREMENT,
                team_name TEXT NOT NULL,
                team_leader TEXT,
                team_members TEXT,
                assigned_villages TEXT,
                status TEXT DEFAULT 'Active',
                created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        conn.commit()
        logger.info("Database tables initialized successfully")

    except sqlite3.Error as e:
        logger.error(f"Error initializing database: {e}")
        raise
    finally:
        conn.close()

    # Post-schema steps each open their own connection.
    self.initialize_default_data()
    self.migrate_database()
    self.create_indexes()
|
|
|
|
|
|
def create_indexes(self):
    """Create the lookup indexes used by the reporting queries (idempotent)."""
    index_statements = (
        "CREATE INDEX IF NOT EXISTS idx_customers_village ON customers(village)",
        "CREATE INDEX IF NOT EXISTS idx_customers_mobile ON customers(mobile)",
        "CREATE INDEX IF NOT EXISTS idx_sales_customer_id ON sales(customer_id)",
        "CREATE INDEX IF NOT EXISTS idx_sales_date ON sales(sale_date)",
        "CREATE INDEX IF NOT EXISTS idx_sales_invoice ON sales(invoice_no)",
        "CREATE INDEX IF NOT EXISTS idx_payments_sale_id ON payments(sale_id)",
        "CREATE INDEX IF NOT EXISTS idx_demos_customer_id ON demos(customer_id)",
        "CREATE INDEX IF NOT EXISTS idx_demos_date ON demos(demo_date)",
        "CREATE INDEX IF NOT EXISTS idx_sale_items_sale_id ON sale_items(sale_id)",
        "CREATE INDEX IF NOT EXISTS idx_follow_ups_date ON follow_ups(follow_up_date)",
        "CREATE INDEX IF NOT EXISTS idx_whatsapp_customer_id ON whatsapp_logs(customer_id)",
    )

    conn = self.get_connection()
    try:
        for statement in index_statements:
            conn.execute(statement)
        conn.commit()
        logger.info("Database indexes created successfully")
    except sqlite3.Error as e:
        logger.error(f"Error creating indexes: {e}")
    finally:
        conn.close()
|
|
|
|
|
|
def migrate_database(self):
    """Add demos columns introduced after the original schema (idempotent)."""
    conn = self.get_connection()
    try:
        cursor = conn.cursor()

        # PRAGMA table_info rows: (cid, name, type, notnull, dflt, pk).
        cursor.execute("PRAGMA table_info(demos)")
        existing = {info[1] for info in cursor.fetchall()}

        if "demo_time" not in existing:
            cursor.execute("ALTER TABLE demos ADD COLUMN demo_time TIME")
            logger.info("Added demo_time column to demos table")

        if "demo_location" not in existing:
            cursor.execute("ALTER TABLE demos ADD COLUMN demo_location TEXT")
            logger.info("Added demo_location column to demos table")

        conn.commit()
        logger.info("Database migration completed successfully")
    except sqlite3.Error as e:
        logger.error(f"Error during database migration: {e}")
        conn.rollback()
    finally:
        conn.close()
|
|
|
|
|
|
def initialize_default_data(self):
    """Seed the products and demo_teams tables with defaults.

    Uses INSERT OR IGNORE so re-running on an existing database is a no-op
    (products are unique on product_name; demo_teams rows are simply
    re-inserted candidates that the same clause skips on conflict, if any
    unique constraint applies — demo_teams declares none, so duplicates are
    only avoided by this running once per fresh database).
    """
    # (product_name, packing_type, capacity_ltr, category, standard_rate)
    default_products = [
        ("1 LTR PLASTIC JAR", "PLASTIC_JAR", 1.0, "Regular", 95),
        ("2 LTR PLASTIC JAR", "PLASTIC_JAR", 2.0, "Regular", 185),
        ("5 LTR PLASTIC JAR", "PLASTIC_JAR", 5.0, "Regular", 460),
        ("5 LTR STEEL BARNI", "STEEL_BARNI", 5.0, "Premium", 680),
        ("10 LTR STEEL BARNI", "STEEL_BARNI", 10.0, "Premium", 1300),
        ("20 LTR STEEL BARNI", "STEEL_BARNI", 20.0, "Premium", 2950),
        ("20 LTR PLASTIC CAN", "PLASTIC_CAN", 20.0, "Regular", 2400),
        ("1 LTR PET BOTTLE", "PET_BOTTLE", 1.0, "Regular", 85),
    ]

    # (team_name, team_leader, team_members, assigned_villages)
    default_teams = [
        (
            "Team A - North Region",
            "Rajesh Kumar",
            "Mohan, Suresh, Priya",
            "Amiyad, Amvad, Ankalav",
        ),
        (
            "Team B - South Region",
            "Sunil Patel",
            "Anita, Vijay, Deepak",
            "Petlad, Borsad, Vadodara",
        ),
    ]

    conn = self.get_connection()
    try:
        for product in default_products:
            conn.execute(
                """
                INSERT OR IGNORE INTO products (product_name, packing_type, capacity_ltr, category, standard_rate)
                VALUES (?, ?, ?, ?, ?)
                """,
                product,
            )

        for team in default_teams:
            conn.execute(
                """
                INSERT OR IGNORE INTO demo_teams (team_name, team_leader, team_members, assigned_villages)
                VALUES (?, ?, ?, ?)
                """,
                team,
            )

        conn.commit()
        logger.info("Default data initialized successfully")

    except sqlite3.Error as e:
        logger.error(f"Error initializing default data: {e}")
    finally:
        conn.close()
|
|
|
|
|
|
def _execute_query_internal(self, query: str, params: tuple = None) -> List[tuple]:
|
|
|
"""Internal method to execute SQL query without logging"""
|
|
|
conn = self.get_connection()
|
|
|
try:
|
|
|
cursor = conn.cursor()
|
|
|
if params:
|
|
|
cursor.execute(query, params)
|
|
|
else:
|
|
|
cursor.execute(query)
|
|
|
|
|
|
|
|
|
if query.strip().upper().startswith("SELECT"):
|
|
|
result = cursor.fetchall()
|
|
|
elif query.strip().upper().startswith("INSERT"):
|
|
|
|
|
|
result = [(cursor.lastrowid,)]
|
|
|
else:
|
|
|
result = []
|
|
|
|
|
|
conn.commit()
|
|
|
return result
|
|
|
|
|
|
except sqlite3.Error as e:
|
|
|
logger.error(f"Database query error: {e}")
|
|
|
conn.rollback()
|
|
|
raise
|
|
|
finally:
|
|
|
conn.close()
|
|
|
|
|
|
def execute_query(
    self, query: str, params: tuple = None, log_action: bool = True
) -> List[tuple]:
    """Execute a SQL statement, optionally recording it in system_logs.

    Args:
        query: SQL text; "?" placeholders are bound from *params*.
        params: Optional bind parameters.
        log_action: When True (and not already inside a logging call),
            an audit row is written to system_logs.

    Returns:
        SELECT rows / [(lastrowid,)] for INSERTs / [] otherwise, and []
        on unexpected errors.

    Raises:
        sqlite3.IntegrityError: re-raised so callers can react to
            constraint violations — the old blanket "return []" made
            add_customer's UNIQUE-code retry loop unreachable.
    """
    try:
        result = self._execute_query_internal(query, params)

        # Best-effort audit trail; _is_logging prevents the audit INSERT
        # from recursively auditing itself.
        if log_action and not self._is_logging:
            try:
                self._is_logging = True
                self._execute_query_internal(
                    """
                    INSERT INTO system_logs (log_type, log_message, table_name, record_id, action)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (
                        "QUERY_EXECUTION",
                        f"Executed query: {query[:100]}...",
                        None,
                        None,
                        "EXECUTE",
                    ),
                )
            except Exception as e:
                logger.error(f"Error logging system action: {e}")
            finally:
                self._is_logging = False

        return result
    except sqlite3.IntegrityError:
        # Constraint violations are meaningful to callers (duplicate
        # customer_code / invoice_no); do not swallow them.
        raise
    except Exception as e:
        logger.error(f"Error in execute_query: {e}")
        return []
|
|
|
|
|
|
def get_dataframe(
    self, table_name: str = None, query: str = None, params: tuple = None
) -> pd.DataFrame:
    """Load rows into a pandas DataFrame.

    When *query* is given it is executed with optional *params*; otherwise
    every row of *table_name* is selected. Returns an empty DataFrame on
    any failure instead of raising.
    """
    conn = self.get_connection()
    try:
        sql = query or f"SELECT * FROM {table_name}"
        return pd.read_sql_query(sql, conn, params=params)
    except Exception as exc:
        logger.error(
            f"Error getting DataFrame for {table_name if table_name else 'query'}: {exc}"
        )
        return pd.DataFrame()
    finally:
        conn.close()
|
|
|
|
|
|
def add_customer(
    self,
    name: str,
    mobile: str = "",
    village: str = "",
    taluka: str = "",
    district: str = "",
    customer_code: str = None,
) -> int:
    """Insert a customer, de-duplicating on mobile or name+village.

    Args:
        name: Customer name (required).
        mobile/village/taluka/district: Optional contact/location fields.
        customer_code: Optional unique code; auto-generated when omitted.

    Returns:
        The existing customer_id when a matching customer is found, the
        new customer_id on success, or -1 on failure.
    """
    if not customer_code:
        customer_code = f"CUST{datetime.now().strftime('%Y%m%d%H%M%S')}{random.randint(100, 999)}"

    try:
        existing_customer = self.execute_query(
            "SELECT customer_id FROM customers WHERE mobile = ? OR (name = ? AND village = ?)",
            (mobile, name, village),
            log_action=False,
        )
        if existing_customer:
            return existing_customer[0][0]

        # Retry with a fresh code if the generated customer_code collides.
        result = []
        max_attempts = 5
        for attempt in range(max_attempts):
            try:
                result = self.execute_query(
                    """
                    INSERT INTO customers (customer_code, name, mobile, village, taluka, district)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (customer_code, name, mobile, village, taluka, district),
                    log_action=False,
                )
                break
            except sqlite3.IntegrityError as e:
                if (
                    "UNIQUE constraint failed: customers.customer_code" in str(e)
                    and attempt < max_attempts - 1
                ):
                    customer_code = f"CUST{datetime.now().strftime('%Y%m%d%H%M%S')}{random.randint(1000, 9999)}"
                    continue
                raise

        # BUG FIX: the old code ran "SELECT last_insert_rowid()" through
        # execute_query, which opens a NEW connection each call, so that
        # SELECT always returned 0. _execute_query_internal already returns
        # [(lastrowid,)] for INSERT statements — use that instead.
        if not result:
            return -1
        customer_id = result[0][0]

        self.log_system_action(
            "CUSTOMER_ADD",
            f"Added customer: {name}",
            "customers",
            customer_id,
            "INSERT",
        )
        return customer_id
    except Exception as e:
        logger.error(f"Error adding customer: {e}")
        return -1
|
|
|
|
|
|
def add_distributor(
    self,
    name: str,
    village: str = "",
    taluka: str = "",
    district: str = "",
    mantri_name: str = "",
    mantri_mobile: str = "",
    sabhasad_count: int = 0,
    contact_in_group: int = 0,
    status: str = "Active",
) -> int:
    """Insert a distributor, de-duplicating on name+village+taluka.

    Returns:
        The existing distributor_id when a match is found, the new
        distributor_id on success, or -1 on failure.
    """
    try:
        existing_distributor = self.execute_query(
            "SELECT distributor_id FROM distributors WHERE name = ? AND village = ? AND taluka = ?",
            (name, village, taluka),
            log_action=False,
        )
        if existing_distributor:
            return existing_distributor[0][0]

        result = self.execute_query(
            """
            INSERT INTO distributors (name, village, taluka, district, mantri_name, mantri_mobile,
                                      sabhasad_count, contact_in_group, status)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                name,
                village,
                taluka,
                district,
                mantri_name,
                mantri_mobile,
                sabhasad_count,
                contact_in_group,
                status,
            ),
            log_action=False,
        )

        # BUG FIX: the old code issued "SELECT last_insert_rowid()" via
        # execute_query, which uses a brand-new connection, so it always
        # returned 0. The INSERT's own result is [(lastrowid,)].
        if not result:
            return -1
        distributor_id = result[0][0]

        self.log_system_action(
            "DISTRIBUTOR_ADD",
            f"Added distributor: {name}",
            "distributors",
            distributor_id,
            "INSERT",
        )

        return distributor_id

    except Exception as e:
        logger.error(f"Error adding distributor: {e}")
        return -1
|
|
|
|
|
|
def get_distributor_by_location(self, village: str, taluka: str) -> Optional[Dict]:
    """Return the first distributor in *village*/*taluka* as a dict, or None.

    None is also returned on lookup errors (logged).
    """
    try:
        matches = self.execute_query(
            "SELECT * FROM distributors WHERE village = ? AND taluka = ?",
            (village, taluka),
            log_action=False,
        )
        return dict(matches[0]) if matches else None
    except Exception as exc:
        logger.error(f"Error getting distributor by location: {exc}")
        return None
|
|
|
|
|
|
def distributor_exists(self, name: str, village: str, taluka: str) -> bool:
    """True when a distributor with this exact name/village/taluka exists.

    Returns False on lookup errors (logged).
    """
    try:
        matches = self.execute_query(
            "SELECT distributor_id FROM distributors WHERE name = ? AND village = ? AND taluka = ?",
            (name, village, taluka),
            log_action=False,
        )
        return bool(matches)
    except Exception as exc:
        logger.error(f"Error checking distributor existence: {exc}")
        return False
|
|
|
|
|
|
|
|
|
|
|
|
def generate_invoice_number(self, prefix="INVCL"):
    """Generate the next invoice number in the format PREFIXmmyyNNN.

    BUG FIX: this class previously defined generate_invoice_number TWICE;
    the first (un-parameterized) definition was dead code because the later
    one shadowed it in the class body. Only the parameterized version is
    kept — its default prefix reproduces the old no-argument behavior.

    Args:
        prefix: Invoice prefix (default "INVCL").

    Returns:
        e.g. "INVCL0125004" — prefix + month + 2-digit year + zero-padded
        serial, continuing from the highest invoice already stored for the
        current month. Falls back to prefix + unix timestamp on error.
    """
    try:
        now = datetime.now()
        month = now.strftime("%m")
        year = now.strftime("%y")

        # Latest invoice for this prefix/month/year decides the next serial.
        result = self.execute_query(
            "SELECT invoice_no FROM sales WHERE invoice_no LIKE ? ORDER BY sale_id DESC LIMIT 1",
            (f"{prefix}{month}{year}%",),
            log_action=False,
        )

        new_serial = 1
        if result:
            last_invoice = result[0][0]
            try:
                # Serial starts after prefix + "mmyy" (4 chars).
                new_serial = int(last_invoice[len(prefix) + 4 :]) + 1
            except ValueError:
                new_serial = 1

        return f"{prefix}{month}{year}{new_serial:03d}"

    except Exception as e:
        logger.error(f"Error generating invoice number: {e}")
        return f"{prefix}{int(datetime.now().timestamp())}"
|
|
|
|
|
|
|
|
|
|
|
|
def add_sale(
    self,
    invoice_no: str,
    customer_id: int,
    sale_date,
    items: List[Dict],
    payments: List[Dict] = None,
    notes: str = "",
) -> int:
    """Create a sale with its line items and optional payments atomically.

    BUG FIX: removed the leftover emoji debug print() calls that polluted
    stdout on every sale; errors are still reported through *logger*.

    Args:
        invoice_no: Unique invoice number (see generate_invoice_number).
        customer_id: customers.customer_id the sale belongs to.
        sale_date: Sale date, stored as-is in the DATE column.
        items: Dicts with "product_id", "quantity", "rate" and optional
            "liters"; amount per line is quantity * rate.
        payments: Optional dicts with "payment_date", "method", "amount"
            and optional "rrn"/"reference".
        notes: Free-text note stored on the sale.

    Returns:
        The new sale_id.

    Raises:
        Re-raises any error after rolling the transaction back.
    """
    conn = self.get_connection()
    try:
        cursor = conn.cursor()

        # Totals are derived from the items, never passed by the caller.
        total_amount = sum(item["quantity"] * item["rate"] for item in items)
        total_liters = sum(item.get("liters", 0) for item in items)

        cursor.execute(
            """
            INSERT INTO sales (invoice_no, customer_id, sale_date, total_amount, total_liters, notes)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (invoice_no, customer_id, sale_date, total_amount, total_liters, notes),
        )
        sale_id = cursor.lastrowid

        for item in items:
            amount = item["quantity"] * item["rate"]
            cursor.execute(
                """
                INSERT INTO sale_items (sale_id, product_id, quantity, rate, amount)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    sale_id,
                    item["product_id"],
                    item["quantity"],
                    item["rate"],
                    amount,
                ),
            )

        if payments:
            for payment in payments:
                cursor.execute(
                    """
                    INSERT INTO payments (sale_id, payment_date, payment_method, amount, rrn, reference)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (
                        sale_id,
                        payment["payment_date"],
                        payment["method"],
                        payment["amount"],
                        payment.get("rrn", ""),
                        payment.get("reference", ""),
                    ),
                )

        conn.commit()

        # Derive Paid/Partial/Pending from the payments just recorded.
        self._update_payment_status(sale_id)

        return sale_id

    except Exception as e:
        conn.rollback()
        logger.error(f"Error adding sale: {e}")
        raise
    finally:
        conn.close()
|
|
|
|
|
|
def _update_payment_status(self, sale_id: int):
|
|
|
"""Update payment status for a sale"""
|
|
|
conn = self.get_connection()
|
|
|
try:
|
|
|
|
|
|
cursor = conn.cursor()
|
|
|
cursor.execute(
|
|
|
"SELECT COALESCE(SUM(amount), 0) FROM payments WHERE sale_id = ?",
|
|
|
(sale_id,),
|
|
|
)
|
|
|
total_paid = cursor.fetchone()[0]
|
|
|
|
|
|
|
|
|
cursor.execute(
|
|
|
"SELECT total_amount FROM sales WHERE sale_id = ?", (sale_id,)
|
|
|
)
|
|
|
sale_total = cursor.fetchone()[0]
|
|
|
|
|
|
|
|
|
if total_paid >= sale_total:
|
|
|
status = "Paid"
|
|
|
elif total_paid > 0:
|
|
|
status = "Partial"
|
|
|
else:
|
|
|
status = "Pending"
|
|
|
|
|
|
|
|
|
cursor.execute(
|
|
|
"UPDATE sales SET payment_status = ? WHERE sale_id = ?",
|
|
|
(status, sale_id),
|
|
|
)
|
|
|
conn.commit()
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error updating payment status: {e}")
|
|
|
finally:
|
|
|
conn.close()
|
|
|
|
|
|
def get_pending_payments(self) -> pd.DataFrame:
    """Sales with an outstanding balance, with customer contact details.

    pending_amount = total_amount - SUM(payments); only rows with
    payment_status 'Pending'/'Partial' and pending_amount > 0 are kept.
    The "sales" argument is only used by get_dataframe for its error
    message, since an explicit query is supplied.
    """
    return self.get_dataframe(
        "sales",
        """
        SELECT s.sale_id, s.invoice_no, s.sale_date, c.name as customer_name,
               c.mobile, c.village, s.total_amount,
               (s.total_amount - COALESCE(SUM(p.amount), 0)) as pending_amount,
               COALESCE(SUM(p.amount), 0) as paid_amount
        FROM sales s
        LEFT JOIN customers c ON s.customer_id = c.customer_id
        LEFT JOIN payments p ON s.sale_id = p.sale_id
        WHERE s.payment_status IN ('Pending', 'Partial')
        GROUP BY s.sale_id
        HAVING pending_amount > 0
        ORDER BY s.sale_date DESC
        """,
    )
|
|
|
|
|
|
def get_demo_conversions(self) -> pd.DataFrame:
    """All demos joined with customer/product/distributor names.

    Adds a 0/1 `converted` column derived from
    conversion_status == 'Converted'; newest demos first.
    """
    return self.get_dataframe(
        "demos",
        """
        SELECT d.*, c.name as customer_name, p.product_name,
               dist.name as distributor_name, c.village, c.taluka,
               CASE WHEN d.conversion_status = 'Converted' THEN 1 ELSE 0 END as converted
        FROM demos d
        LEFT JOIN customers c ON d.customer_id = c.customer_id
        LEFT JOIN products p ON d.product_id = p.product_id
        LEFT JOIN distributors dist ON d.distributor_id = dist.distributor_id
        ORDER BY d.demo_date DESC
        """,
    )
|
|
|
|
|
|
def get_sales_analytics(self, start_date: str = None, end_date: str = None) -> Dict:
    """Summarize sales between two dates (inclusive).

    Defaults: start_date = 30 days ago, end_date = today. Returns a dict
    with totals, averages, unique customers and payment counts — all
    coalesced to 0 — or {} when the query yields nothing.
    """
    start_date = start_date or (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
    end_date = end_date or datetime.now().strftime("%Y-%m-%d")

    query = """
        SELECT
            COUNT(*) as total_sales,
            SUM(total_amount) as total_revenue,
            AVG(total_amount) as avg_sale_value,
            COUNT(DISTINCT customer_id) as unique_customers,
            SUM(CASE WHEN payment_status = 'Paid' THEN 1 ELSE 0 END) as completed_payments,
            SUM(CASE WHEN payment_status IN ('Pending', 'Partial') THEN 1 ELSE 0 END) as pending_payments
        FROM sales
        WHERE sale_date BETWEEN ? AND ?
    """

    rows = self.execute_query(query, (start_date, end_date), log_action=False)
    if not rows:
        return {}

    keys = (
        "total_sales",
        "total_revenue",
        "avg_sale_value",
        "unique_customers",
        "completed_payments",
        "pending_payments",
    )
    # NULL aggregates (empty range) are reported as 0.
    return {key: (rows[0][i] or 0) for i, key in enumerate(keys)}
|
|
|
|
|
|
def log_system_action(
    self,
    log_type: str,
    message: str,
    table_name: str = None,
    record_id: int = None,
    action: str = None,
):
    """Append an audit row to system_logs.

    No-op while a logging call is already in flight (the _is_logging flag
    prevents the audit INSERT from recursively auditing itself). Failures
    are logged and swallowed.
    """
    if self._is_logging:
        return

    self._is_logging = True
    try:
        self._execute_query_internal(
            """
            INSERT INTO system_logs (log_type, log_message, table_name, record_id, action)
            VALUES (?, ?, ?, ?, ?)
            """,
            (log_type, message, table_name, record_id, action),
        )
    except Exception as exc:
        logger.error(f"Error logging system action: {exc}")
    finally:
        self._is_logging = False
|
|
|
|
|
|
def create_rollback_point(
    self, table_name: str, record_id: int, old_data: str, new_data: str, action: str
):
    """Persist before/after snapshots of a record into rollback_logs.

    Best-effort: failures are logged and swallowed so callers never break
    because the rollback trail could not be written.
    """
    insert_sql = """
            INSERT INTO rollback_logs (table_name, record_id, old_data, new_data, action)
            VALUES (?, ?, ?, ?, ?)
            """
    try:
        self.execute_query(
            insert_sql,
            (table_name, record_id, old_data, new_data, action),
            log_action=False,
        )
    except Exception as exc:
        logger.error(f"Error creating rollback point: {exc}")
|
|
|
|
|
|
def get_recent_activity(self, limit: int = 10) -> pd.DataFrame:
    """Return the most recent *limit* system_logs entries, newest first.

    BUG FIX: LIMIT is now bound as a SQL parameter instead of being
    interpolated into the statement with an f-string, so a non-integer
    argument cannot alter the query text.
    """
    return self.get_dataframe(
        "system_logs",
        """
        SELECT log_type, log_message, table_name, record_id, action, created_date
        FROM system_logs
        ORDER BY created_date DESC
        LIMIT ?
        """,
        (limit,),
    )
|
|
|
|
|
|
def backup_database(self, backup_path: str = None):
    """Copy the live database to *backup_path* using SQLite's backup API.

    Args:
        backup_path: Destination file; defaults to a timestamped
            "backup_YYYYmmdd_HHMMSS.db" in the working directory.

    Returns:
        The backup path on success, None on failure.
    """
    if not backup_path:
        backup_path = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"

    conn = None
    backup_conn = None
    try:
        conn = self.get_connection()
        backup_conn = sqlite3.connect(backup_path)

        with backup_conn:
            conn.backup(backup_conn)

        logger.info(f"Database backup created: {backup_path}")
        return backup_path

    except Exception as e:
        logger.error(f"Error creating database backup: {e}")
        return None
    finally:
        # BUG FIX: the old code only closed the connections on the success
        # path, leaking both whenever the backup failed.
        if backup_conn is not None:
            backup_conn.close()
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def get_village_wise_sales(self) -> pd.DataFrame:
    """Sales totals per customer village, highest revenue first.

    Rows with a NULL or empty village are excluded; the inner JOIN also
    drops sales whose customer row no longer exists.
    """
    return self.get_dataframe(
        "sales",
        """
        SELECT c.village, COUNT(s.sale_id) as total_sales,
               SUM(s.total_amount) as total_revenue,
               AVG(s.total_amount) as avg_sale_value,
               COUNT(DISTINCT s.customer_id) as unique_customers
        FROM sales s
        JOIN customers c ON s.customer_id = c.customer_id
        WHERE c.village IS NOT NULL AND c.village != ''
        GROUP BY c.village
        ORDER BY total_revenue DESC
        """,
    )
|
|
|
|
|
|
def get_product_performance(self) -> pd.DataFrame:
    """Per-product sales counts, quantities, revenue and average rate.

    Inner JOIN drops line items whose product row no longer exists;
    ordered by total revenue, highest first.
    """
    return self.get_dataframe(
        "sale_items",
        """
        SELECT p.product_name, COUNT(si.item_id) as times_sold,
               SUM(si.quantity) as total_quantity,
               SUM(si.amount) as total_revenue,
               AVG(si.rate) as avg_rate
        FROM sale_items si
        JOIN products p ON si.product_id = p.product_id
        GROUP BY p.product_id, p.product_name
        ORDER BY total_revenue DESC
        """,
    )
|
|
|
|
|
|
def get_upcoming_follow_ups(self) -> pd.DataFrame:
    """Up to 20 pending follow-ups dated today or later, soonest first.

    Joined with customer, distributor and originating-demo details.
    """
    return self.get_dataframe(
        "follow_ups",
        """
        SELECT f.*, c.name as customer_name, c.mobile,
               d.name as distributor_name, dm.demo_date
        FROM follow_ups f
        LEFT JOIN customers c ON f.customer_id = c.customer_id
        LEFT JOIN distributors d ON f.distributor_id = d.distributor_id
        LEFT JOIN demos dm ON f.demo_id = dm.demo_id
        WHERE f.follow_up_date >= date('now')
        AND f.status = 'Pending'
        ORDER BY f.follow_up_date ASC
        LIMIT 20
        """,
    )
|
|
|
|
|
|
def get_whatsapp_logs(self, customer_id: int = None) -> pd.DataFrame:
    """WhatsApp message history, newest first.

    With *customer_id*: every message for that customer (no limit).
    Without: the 50 most recent messages across all customers.
    """
    if customer_id:
        return self.get_dataframe(
            "whatsapp_logs",
            """
            SELECT w.*, c.name as customer_name, c.mobile
            FROM whatsapp_logs w
            LEFT JOIN customers c ON w.customer_id = c.customer_id
            WHERE w.customer_id = ?
            ORDER BY w.sent_date DESC
            """,
            (customer_id,),
        )
    else:
        return self.get_dataframe(
            "whatsapp_logs",
            """
            SELECT w.*, c.name as customer_name, c.mobile
            FROM whatsapp_logs w
            LEFT JOIN customers c ON w.customer_id = c.customer_id
            ORDER BY w.sent_date DESC
            LIMIT 50
            """,
        )
|
|
|
|
|
|
def cleanup_old_data(self, days: int = 365):
    """Delete system_logs and rollback_logs rows older than *days* days.

    Errors are logged and swallowed.
    """
    try:
        cutoff_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")

        for statement in (
            "DELETE FROM system_logs WHERE created_date < ?",
            "DELETE FROM rollback_logs WHERE rollback_date < ?",
        ):
            self.execute_query(statement, (cutoff_date,), log_action=False)

        logger.info(f"Cleaned up data older than {days} days")

    except Exception as exc:
        logger.error(f"Error cleaning up old data: {exc}")
|
|
|
|
|
|
|
|
|
|
|
|
def check_database_health(db_path: str = "sales_management.db") -> Dict:
    """Open the database and report row counts, file size and integrity.

    BUG FIX: "integrity_check" was hard-coded to "passed" without running
    any check; it now reflects the real result of SQLite's
    PRAGMA integrity_check.

    Returns:
        Dict with status, per-table row counts, size in MB and the
        integrity-check verdict; on failure, a dict with status "error"
        and the exception text.
    """
    try:
        db = DatabaseManager(db_path)

        tables = ["customers", "sales", "distributors", "demos", "payments", "products"]
        counts = {}
        for table in tables:
            result = db.execute_query(f"SELECT COUNT(*) FROM {table}", log_action=False)
            counts[table] = result[0][0] if result else 0

        db_size = os.path.getsize(db_path) if os.path.exists(db_path) else 0

        # PRAGMA rows are not fetched by execute_query (it only fetches for
        # SELECT), so run the check on a raw connection.
        conn = db.get_connection()
        try:
            verdict = conn.execute("PRAGMA integrity_check").fetchone()
            integrity = "passed" if verdict and verdict[0] == "ok" else "failed"
        finally:
            conn.close()

        return {
            "status": "healthy",
            "table_counts": counts,
            "database_size_mb": round(db_size / (1024 * 1024), 2),
            "last_backup": "N/A",
            "integrity_check": integrity,
        }

    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "table_counts": {},
            "database_size_mb": 0,
            "integrity_check": "failed",
        }
|
|
|
|