import sqlite3 import os import datetime import random # Define the path for the database DB_DIR = "data" DB_PATH = os.path.join(DB_DIR, "ecommerce_data.db") def create_database(): """Creates the SQLite database and tables if they don't exist.""" # Ensure the data directory exists os.makedirs(DB_DIR, exist_ok=True) conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Create users table cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( user_id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, email TEXT UNIQUE NOT NULL, address TEXT ); """) # Create products table cursor.execute(""" CREATE TABLE IF NOT EXISTS products ( product_id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, description TEXT, price REAL NOT NULL CHECK(price > 0), stock INTEGER NOT NULL DEFAULT 0 CHECK(stock >= 0) ); """) # Create orders table cursor.execute(""" CREATE TABLE IF NOT EXISTS orders ( order_id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, order_date TEXT NOT NULL, status TEXT NOT NULL CHECK(status IN ('Pending', 'Processing', 'Shipped', 'Delivered', 'Cancelled')), total_amount REAL NOT NULL CHECK(total_amount >= 0), FOREIGN KEY (user_id) REFERENCES users(user_id) ); """) # Create order_items table (linking orders and products) cursor.execute(""" CREATE TABLE IF NOT EXISTS order_items ( order_item_id INTEGER PRIMARY KEY AUTOINCREMENT, order_id INTEGER NOT NULL, product_id INTEGER NOT NULL, quantity INTEGER NOT NULL CHECK(quantity > 0), price_per_unit REAL NOT NULL CHECK(price_per_unit > 0), FOREIGN KEY (order_id) REFERENCES orders(order_id), FOREIGN KEY (product_id) REFERENCES products(product_id) ); """) conn.commit() conn.close() print(f"Database created/verified at {DB_PATH}") def populate_data(): """Populates the database with sample data if tables are empty.""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() try: # Check if tables are already populated cursor.execute("SELECT COUNT(*) FROM users") if cursor.fetchone()[0] > 0: print("Database already contains data. Skipping population.") return # Sample Users users_data = [ ('Alice Wonderland', 'alice@example.com', '123 Fantasy Lane'), ('Bob The Builder', 'bob@example.com', '456 Construction Ave'), ('Charlie Chaplin', 'charlie@example.com', '789 Silent Film St') ] cursor.executemany("INSERT INTO users (name, email, address) VALUES (?, ?, ?)", users_data) print(f"Inserted {len(users_data)} users.") # Sample Products products_data = [ ('Laptop Pro', 'High-end laptop for professionals', 1200.00, 100), ('Wireless Mouse', 'Ergonomic wireless mouse', 25.50, 500), ('Mechanical Keyboard', 'RGB Mechanical Keyboard', 75.00, 200), ('USB-C Hub', '7-in-1 USB-C Hub', 40.00, 300), ('Monitor 27"', '27 inch 4K Monitor', 300.00, 150) ] cursor.executemany("INSERT INTO products (name, description, price, stock) VALUES (?, ?, ?, ?)", products_data) print(f"Inserted {len(products_data)} products.") # Sample Orders and Order Items order_statuses = ['Pending', 'Processing', 'Shipped', 'Delivered', 'Cancelled'] user_ids = [row[0] for row in cursor.execute("SELECT user_id FROM users").fetchall()] product_info = {row[0]: row[1] for row in cursor.execute("SELECT product_id, price FROM products").fetchall()} product_ids = list(product_info.keys()) for i in range(5): # Create 5 sample orders user_id = random.choice(user_ids) order_date = (datetime.datetime.now() - datetime.timedelta(days=random.randint(1, 30))).strftime('%Y-%m-%d %H:%M:%S') status = random.choice(order_statuses) # Insert order first with a placeholder total cursor.execute("INSERT INTO orders (user_id, order_date, status, total_amount) VALUES (?, ?, ?, ?)", (user_id, order_date, status, 0.0)) order_id = cursor.lastrowid order_total = 0.0 num_items = random.randint(1, 3) items_in_order = random.sample(product_ids, num_items) order_items_data = [] for product_id in items_in_order: quantity = random.randint(1, 2) price_per_unit = product_info[product_id] order_items_data.append((order_id, product_id, quantity, price_per_unit)) order_total += quantity * price_per_unit cursor.executemany("INSERT INTO order_items (order_id, product_id, quantity, price_per_unit) VALUES (?, ?, ?, ?)", order_items_data) # Update order total cursor.execute("UPDATE orders SET total_amount = ? WHERE order_id = ?", (round(order_total, 2), order_id)) print(f"Inserted 5 sample orders with items.") conn.commit() except sqlite3.Error as e: print(f"An error occurred: {e}") conn.rollback() finally: conn.close() if __name__ == "__main__": create_database() populate_data() print("Data generation complete.")