File size: 6,750 Bytes
b59a07e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
from __future__ import annotations

import os
import random
from typing import Optional

import psycopg2
from faker import Faker


class ConnectionError(Exception):
    pass


class DBREPostgres:

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[str] = None,
        database: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
    ) -> None:
        self.host = host or os.getenv("DB_HOST", "localhost")
        self.port = port or os.getenv("DB_PORT", "5432")
        self.database = database or os.getenv("DB_NAME", "dbre")
        self.user = user or os.getenv("DB_USER", "postgres")
        self.password = password or os.getenv("DB_PASSWORD", "postgres")
        self.conn = None
        self.fake = Faker()

    def connect(self) -> None:
        try:
            self.conn = psycopg2.connect(
                host=self.host, port=self.port, database=self.database,
                user=self.user, password=self.password
            )
            self.conn.autocommit = False
        except psycopg2.Error as e:
            raise ConnectionError(f"Failed to connect: {e}")

    def create_tables(self) -> None:
        if not self.conn:
            self.connect()
        try:
            cur = self.conn.cursor()
            cur.execute("""
                CREATE TABLE IF NOT EXISTS customers (
                    customer_id SERIAL PRIMARY KEY, name TEXT NOT NULL,
                    email TEXT UNIQUE NOT NULL, city TEXT,
                    created_at TIMESTAMP DEFAULT NOW()
                )
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    product_id SERIAL PRIMARY KEY, name TEXT NOT NULL,
                    category TEXT NOT NULL, price NUMERIC NOT NULL,
                    stock INTEGER DEFAULT 0
                )
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS orders (
                    order_id SERIAL PRIMARY KEY,
                    customer_id INTEGER REFERENCES customers(customer_id),
                    order_date TIMESTAMP DEFAULT NOW(),
                    status TEXT DEFAULT 'pending'
                )
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS order_items (
                    item_id SERIAL PRIMARY KEY,
                    order_id INTEGER REFERENCES orders(order_id),
                    product_id INTEGER REFERENCES products(product_id),
                    quantity INTEGER NOT NULL,
                    unit_price NUMERIC NOT NULL
                )
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS reviews (
                    review_id SERIAL PRIMARY KEY,
                    customer_id INTEGER REFERENCES customers(customer_id),
                    product_id INTEGER REFERENCES products(product_id),
                    rating INTEGER CHECK(rating BETWEEN 1 AND 5),
                    review_text TEXT,
                    created_at TIMESTAMP DEFAULT NOW()
                )
            """)
            self.conn.commit()
            cur.close()
        except psycopg2.Error as e:
            self.conn.rollback()
            raise ConnectionError(f"Failed to create tables: {e}")

    def seed_data(self) -> None:
        """Seed the database with realistic test data. Drops and rebuilds schema first."""
        if not self.conn:
            self.connect()
        random.seed(42)
        Faker.seed(42)
        try:
            cur = self.conn.cursor()
            cur.execute("DROP TABLE IF EXISTS reviews, order_items, orders, products, customers CASCADE;")
            self.conn.commit()
            cur.close()
        except Exception:
            self.conn.rollback()
        
        self.create_tables()
        
        try:
            cur = self.conn.cursor()
            customer_ids = []
            for _ in range(100):
                name = self.fake.name()
                email = self.fake.unique.email()
                city = self.fake.city()
                cur.execute(
                    "INSERT INTO customers (name, email, city) VALUES (%s, %s, %s) RETURNING customer_id",
                    (name, email, city),
                )
                customer_ids.append(cur.fetchone()[0])

            categories = ["Electronics", "Clothing", "Home", "Sports", "Books"]
            product_ids = []
            for _ in range(50):
                name = self.fake.catch_phrase()
                category = random.choice(categories)
                price = round(random.uniform(10, 500), 2)
                stock = random.randint(0, 100)
                cur.execute(
                    "INSERT INTO products (name, category, price, stock) VALUES (%s, %s, %s, %s) RETURNING product_id",
                    (name, category, price, stock),
                )
                product_ids.append(cur.fetchone()[0])

            order_ids = []
            statuses = ["pending", "processing", "shipped", "delivered", "cancelled"]
            for _ in range(300):
                customer_id = random.choice(customer_ids)
                order_date = self.fake.date_time_between(start_date="-1y", end_date="now")
                status = random.choice(statuses)
                cur.execute(
                    "INSERT INTO orders (customer_id, order_date, status) VALUES (%s, %s, %s) RETURNING order_id",
                    (customer_id, order_date, status),
                )
                order_ids.append(cur.fetchone()[0])

            for _ in range(600):
                order_id = random.choice(order_ids)
                product_id = random.choice(product_ids)
                quantity = random.randint(1, 10)
                unit_price = round(random.uniform(10, 500), 2)
                cur.execute(
                    "INSERT INTO order_items (order_id, product_id, quantity, unit_price) VALUES (%s, %s, %s, %s)",
                    (order_id, product_id, quantity, unit_price),
                )

            for _ in range(200):
                customer_id = random.choice(customer_ids)
                product_id = random.choice(product_ids)
                rating = random.randint(1, 5)
                review_text = self.fake.text(max_nb_chars=200)
                cur.execute(
                    "INSERT INTO reviews (customer_id, product_id, rating, review_text) VALUES (%s, %s, %s, %s)",
                    (customer_id, product_id, rating, review_text),
                )

            self.conn.commit()
            cur.close()
        except psycopg2.Error as e:
            self.conn.rollback()
            raise ConnectionError(f"Failed to seed data: {e}")