"""Seed the application database with demo data.

Creates a demo user plus fake users (each with a default address),
gift-card products, one variant per denomination, reviews with the
denormalized rating fields filled in, and search suggestions.

Every ``seed_*`` function is a no-op when its table already holds at least
one row, so the script can be re-run safely.
"""

from __future__ import annotations

import random
import sqlite3
import sys
from collections import defaultdict
from pathlib import Path

# Allow running this script from any working directory by ensuring the
# `apps/api` directory (parent of the `app` package) is on sys.path.
_API_ROOT = Path(__file__).resolve().parents[1]
if str(_API_ROOT) not in sys.path:
    sys.path.insert(0, str(_API_ROOT))

from faker import Faker  # noqa: E402
from sqlmodel import Session, select  # noqa: E402

from app.core.database import engine, init_db  # noqa: E402
from app.core.security import hash_password  # noqa: E402
from app.models.address import Address  # noqa: E402
from app.models.product import Product  # noqa: E402
from app.models.review import Review  # noqa: E402
from app.models.suggestion import SearchSuggestion  # noqa: E402
from app.models.user import User  # noqa: E402
from app.models.variant import ProductVariant  # noqa: E402

fake = Faker()

# Pre-computed bcrypt hash for "password123!" to avoid expensive hashing per user
_BULK_HASH = hash_password("password123!")

# Curated Unsplash sources (seed must use images.unsplash.com)
# NOTE(review): the laptop photo is listed twice, which doubles its weight in
# random.choice(); left as-is so existing seeded output does not shift.
UNSPLASH = [
    "https://images.unsplash.com/photo-1512436991641-6745cdb1723f",  # clothes
    "https://images.unsplash.com/photo-1523275335684-37898b6baf30",  # watch
    "https://images.unsplash.com/photo-1503602642458-232111445657",  # laptop
    "https://images.unsplash.com/photo-1505740420928-5e560c06d30e",  # headphones
    "https://images.unsplash.com/photo-1511707171634-5f897ff02aa9",  # phone
    "https://images.unsplash.com/photo-1526170375885-4d8ecf77b99f",  # camera
    "https://images.unsplash.com/photo-1503602642458-232111445657",  # laptop
    "https://images.unsplash.com/photo-1542291026-7eec264c27ff",  # sneakers
    "https://images.unsplash.com/photo-1516979187457-637abb4f9353",  # headphones alt
]


def u(url: str, *, w: int = 800, q: int = 80) -> str:
    """Return *url* with crop/format query parameters for width *w* and quality *q*."""
    return f"{url}?auto=format&fit=crop&w={w}&q={q}"


def _exists(session: Session, model) -> bool:
    """Return True when the table mapped by *model* has at least one row."""
    # LIMIT 1: only the presence of a row matters, not the full result set.
    return session.exec(select(model).limit(1)).first() is not None


def seed_users(session: Session, n: int = 15) -> None:
    """Insert the demo user and ``n - 1`` fake users, each with a default address.

    The demo user is always created, even when ``n`` is 0 or negative.
    Does nothing if any user already exists. Commits before returning.
    """
    if _exists(session, User):
        return

    demo_email = "james@example.com"
    demo = User(
        email=demo_email,
        full_name="James Smith",
        # Same password as every bulk user, so reuse the pre-computed hash
        # instead of paying for another hashing round.
        password_hash=_BULK_HASH,
    )
    session.add(demo)
    session.flush()  # populate demo.id for the address below
    session.add(
        Address(
            user_id=demo.id,
            name=demo.full_name,
            line1="410 Terry Ave N",
            line2=None,
            city="Seattle",
            state="WA",
            postal_code="98109",
            country="United States",
            is_default=True,
        )
    )

    for _ in range(max(0, n - 1)):
        full_name = fake.name()
        email = fake.unique.email().lower()
        user = User(email=email, full_name=full_name, password_hash=_BULK_HASH)
        session.add(user)
        session.flush()  # populate user.id for the address below
        session.add(
            Address(
                user_id=user.id,
                name=full_name,
                line1=fake.street_address(),
                # Roughly 70% of addresses have no second line.
                line2=None if random.random() < 0.7 else fake.secondary_address(),
                city=fake.city(),
                state=fake.state_abbr(),
                postal_code=fake.postcode(),
                country="United States",
                is_default=True,
            )
        )

    session.commit()


def seed_products(session: Session, n: int = 60) -> None:
    """Insert *n* gift-card products with random brand, price range, and images.

    Does nothing if any product already exists. Commits before returning.
    """
    if _exists(session, Product):
        return

    brands = [
        "Amazon Basics",
        "Google",
        "Roblox",
        "Apple",
        "Netflix",
        "Spotify",
        "Uber",
        "Starbucks",
        "Best Buy",
        "GameStop",
        "Visa",
        "Outback Steakhouse",
        "Lyft",
    ]

    for _ in range(n):
        brand = random.choice(brands)
        kind = random.choices(
            ["Gift Card", "Digital Gift Card", "E-Gift Card"],
            weights=[0.35, 0.45, 0.20],
            k=1,
        )[0]
        title = f"{brand} {kind}" if brand != "Google" else f"Google Play {kind}"

        price_min = random.choice([10, 15, 20, 25, 50])
        price_max = random.choice([100, 150, 200, 500, 2000])
        # Defensive only: with the choice lists above the maximum is always
        # larger, but keep the invariant if those lists are ever edited.
        if price_max < price_min:
            price_min, price_max = price_max, price_min

        base_img = u(random.choice(UNSPLASH), w=600)
        images = [u(random.choice(UNSPLASH), w=1200) for _ in range(random.randint(4, 7))]

        p = Product(
            title=title,
            brand=brand,
            description=fake.paragraph(nb_sentences=7),
            image_url=base_img,
            images=images,
            rating_avg=0.0,
            rating_count=0,
            price_min=price_min,
            price_max=price_max,
        )
        session.add(p)

    session.commit()


def seed_variants(session: Session) -> None:
    """Add one variant per denomination for every product.

    Does nothing if any variant already exists. Does not commit; the caller
    is expected to.
    """
    if _exists(session, ProductVariant):
        return

    products = session.exec(select(Product)).all()
    for p in products:
        # Standard gift-card denominations plus the product's own min and max
        # price; the set removes duplicates when those coincide.
        denoms = sorted({10, 15, 25, 50, 100, p.price_min, p.price_max})
        for d in denoms:
            if d <= 0:
                continue
            v = ProductVariant(
                product_id=p.id,
                sku=f"{str(p.id)[:8]}-{d}",
                price=int(d),
                attributes={
                    "type": "gift_card",
                    "denomination": int(d),
                    "delivery": "email",
                    "region": "US",
                },
            )
            session.add(v)


def seed_reviews(session: Session, target_reviews: int = 300) -> None:
    """Add *target_reviews* random reviews and refresh per-product rating stats.

    Does nothing if any review already exists or if there are no users or no
    products. Does not commit; the caller is expected to.
    """
    if _exists(session, Review):
        return

    users = session.exec(select(User)).all()
    products = session.exec(select(Product)).all()
    if not users or not products:
        return

    # Reviewer and product are picked uniformly at random; only the star
    # rating is skewed (towards 4-5 stars) by the weights below.
    ratings_for_product: dict[str, list[int]] = defaultdict(list)

    for _ in range(target_reviews):
        uobj = random.choice(users)
        pobj = random.choice(products)
        rating = random.choices(
            [5, 4, 3, 2, 1], weights=[0.45, 0.35, 0.13, 0.05, 0.02], k=1
        )[0]
        title = fake.sentence(nb_words=5).rstrip(".")
        body = fake.paragraph(nb_sentences=random.randint(2, 6))
        images: list[str] = []
        # About 7% of reviews carry 1-3 photos.
        if random.random() < 0.07:
            images = [u(random.choice(UNSPLASH), w=900) for _ in range(random.randint(1, 3))]

        r = Review(
            product_id=pobj.id,
            user_id=uobj.id,
            display_name=uobj.full_name,
            rating=rating,
            title=title,
            body=body,
            verified_purchase=random.random() < 0.55,
            images=images,
        )
        session.add(r)
        ratings_for_product[str(pobj.id)].append(rating)

    # Update denormalized rating fields
    for p in products:
        arr = ratings_for_product.get(str(p.id), [])
        if not arr:
            continue
        p.rating_count = len(arr)
        p.rating_avg = round(sum(arr) / len(arr), 1)
        session.add(p)


def seed_suggestions(session: Session, n: int = 100) -> None:
    """Add search suggestions with random scores until about *n* exist.

    The fixed base queries are always inserted, so fewer than ``len(base)``
    cannot be requested. If *n* exceeds the number of distinct queries the
    generator can produce, every reachable query is inserted instead of
    looping forever. Does nothing if any suggestion already exists. Does not
    commit; the caller is expected to.
    """
    if _exists(session, SearchSuggestion):
        return

    base_queries = [
        "google play",
        "google play gift card",
        "google play gift card email delivery",
        "amazon gift card",
        "roblox gift card",
        "netflix gift card",
        "spotify gift card",
        "uber gift card",
        "starbucks gift card",
        "visa gift card",
        "best buy gift card",
        "game stop gift card",
        "digital gift card",
        "gift cards",
        "e gift card",
    ]

    brands = [
        "google play",
        "amazon",
        "roblox",
        "netflix",
        "spotify",
        "uber",
        "starbucks",
        "best buy",
        "gamestop",
        "visa",
        "lyft",
    ]

    templates = [
        "{b}",
        "{b} gift card",
        "{b} gift card email delivery",
        "{b} gift card digital code",
        "{b} gift",
        "{b} store gift card",
        "{b} card",
        "{b} card email delivery",
    ]

    denominations = [10, 15, 20, 25, 50, 100]

    queries: set[str] = {q.lower() for q in base_queries}

    # Everything the loop below can ever yield. Capping the target at this
    # size guarantees termination for arbitrarily large n.
    plain = {t.format(b=b).strip().lower() for b in brands for t in templates}
    reachable = queries | plain | {f"{q} {d}" for q in plain for d in denominations}
    target = min(n, len(reachable))

    while len(queries) < target:
        b = random.choice(brands)
        t = random.choice(templates)
        q = t.format(b=b).strip().lower()
        # Add denomination variants
        if random.random() < 0.25:
            q = f"{q} {random.choice(denominations)}"
        queries.add(q)

    # Sort before drawing scores: set order of strings varies between
    # interpreter runs (hash randomization), which would otherwise make the
    # score-to-query pairing irreproducible even with a fixed random seed.
    for q in sorted(queries):
        session.add(SearchSuggestion(query=q, score=random.randint(1, 5000)))


def _db_is_intact(db_path: Path) -> bool:
    """Return True when SQLite's integrity check on *db_path* reports "ok"."""
    try:
        conn = sqlite3.connect(str(db_path))
    except sqlite3.Error:
        return False
    try:
        # integrity_check reports problems as result rows rather than raising,
        # so the rows must be read; a healthy database yields a single "ok".
        rows = conn.execute("PRAGMA integrity_check").fetchall()
    except sqlite3.Error:
        # Raised e.g. when the file is not a SQLite database at all.
        return False
    finally:
        # Always release the handle so the file can be deleted afterwards.
        conn.close()
    return rows == [("ok",)]


def main() -> None:
    """Initialise the database and run every seeder with a fixed random seed.

    NOTE(review): the corruption check assumes the engine from
    ``app.core.database`` points at ``<api root>/data/app.db`` - confirm.
    """
    random.seed(42)
    Faker.seed(42)

    data_dir = _API_ROOT / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    db_path = data_dir / "app.db"

    # Remove corrupt DB if present
    if db_path.exists() and not _db_is_intact(db_path):
        print(f"Removing corrupt DB: {db_path}")
        db_path.unlink(missing_ok=True)

    init_db()

    with Session(engine) as session:
        seed_users(session)
        seed_products(session)
        seed_variants(session)
        seed_reviews(session)
        seed_suggestions(session)
        session.commit()


if __name__ == "__main__":
    main()