Spaces:
Paused
Paused
File size: 2,784 Bytes
eff8aa5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
from datetime import datetime, timedelta
import random
from sqlalchemy import create_engine, MetaData, inspect, text
from app.core.config import settings
engine = create_engine(settings.DATABASE_URL)
metadata = MetaData()
def ensure_sales_dataset():
inspector = inspect(engine)
if inspector.has_table("sales"):
return
with engine.begin() as conn:
conn.exec_driver_sql(
"""
CREATE TABLE IF NOT EXISTS sales (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date DATE,
product_category TEXT,
product_name TEXT,
quantity INTEGER,
unit_price REAL,
total_amount REAL,
region TEXT
)
"""
)
categories = ["Electronics", "Clothing", "Home", "Books"]
products = {
"Electronics": ["Laptop", "Smartphone", "Headphones", "Monitor"],
"Clothing": ["T-Shirt", "Jeans", "Jacket", "Sneakers"],
"Home": ["Sofa", "Table", "Lamp", "Rug"],
"Books": ["Fiction", "Non-Fiction", "Sci-Fi", "Biography"],
}
regions = ["North", "South", "East", "West"]
rows = []
start_date = datetime(2023, 1, 1)
for _ in range(365):
date = start_date + timedelta(days=random.randint(0, 364))
category = random.choice(categories)
product = random.choice(products[category])
quantity = random.randint(1, 10)
unit_price = round(random.uniform(10.0, 1200.0), 2)
total_amount = round(quantity * unit_price, 2)
region = random.choice(regions)
rows.append(
{
"date": date.strftime("%Y-%m-%d"),
"product_category": category,
"product_name": product,
"quantity": quantity,
"unit_price": unit_price,
"total_amount": total_amount,
"region": region,
}
)
conn.execute(
text(
"""
INSERT INTO sales (date, product_category, product_name, quantity, unit_price, total_amount, region)
VALUES (:date, :product_category, :product_name, :quantity, :unit_price, :total_amount, :region)
"""
),
rows,
)
ensure_sales_dataset()
def get_db_schema():
metadata.reflect(bind=engine)
schema_info = []
for table in metadata.tables.values():
columns = [f"{col.name} ({col.type})" for col in table.columns]
schema_info.append(f"Table: {table.name}\nColumns: {', '.join(columns)}")
return "\n\n".join(schema_info)
|