"""
Sample E-Commerce Application
This is a sample codebase for testing the Developer Productivity Agent
"""
# ============================================================================
# models/user.py
# ============================================================================
from dataclasses import dataclass
from typing import Optional, List
from datetime import datetime
import hashlib
@dataclass
class User:
    """
    User model representing a customer in the e-commerce system.

    Attributes:
        id: Unique user identifier
        email: User's email address
        name: User's full name
        password_hash: Encoded password hash as produced by ``hash_password``
            (``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``). Bare
            SHA-256 hex digests written by earlier versions are still
            accepted by ``verify_password``.
        created_at: Account creation timestamp
        preferences: User notification preferences
    """
    id: str
    email: str
    name: str
    password_hash: str
    created_at: datetime
    preferences: Optional[dict] = None

    # Deliberately unannotated so the dataclass machinery does not treat
    # these constants as instance fields.
    _HASH_SCHEME = "pbkdf2_sha256"
    _DEFAULT_ITERATIONS = 600_000

    @staticmethod
    def hash_password(password: str, iterations: int = _DEFAULT_ITERATIONS) -> str:
        """
        Hash a password with salted PBKDF2-HMAC-SHA256.

        A fresh random salt is generated on every call, so hashing the same
        password twice yields different strings; use ``verify_password`` to
        check a candidate instead of comparing hashes directly.

        Args:
            password: Plain text password
            iterations: PBKDF2 round count stored alongside the hash

        Returns:
            ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``
        """
        import secrets

        salt = secrets.token_bytes(16)
        digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
        return f"{User._HASH_SCHEME}${iterations}${salt.hex()}${digest.hex()}"

    def verify_password(self, password: str) -> bool:
        """
        Verify a password against the stored hash.

        Supports both the PBKDF2 format written by ``hash_password`` and the
        legacy unsalted SHA-256 hex digest. Comparisons are constant-time.

        Args:
            password: Plain text password to check

        Returns:
            True if the password matches, False otherwise (including when the
            stored hash is malformed)
        """
        import hmac

        stored = self.password_hash
        if stored.startswith(f"{self._HASH_SCHEME}$"):
            try:
                _, rounds_text, salt_hex, digest_hex = stored.split("$")
                rounds = int(rounds_text)
                salt = bytes.fromhex(salt_hex)
                expected = bytes.fromhex(digest_hex)
            except ValueError:
                # Wrong number of parts, non-numeric rounds or bad hex.
                return False
            if rounds < 1:
                # pbkdf2_hmac rejects non-positive iteration counts.
                return False
            candidate = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, rounds)
            return hmac.compare_digest(candidate, expected)

        # Legacy path: unsalted SHA-256 hex digest from earlier versions.
        legacy = hashlib.sha256(password.encode()).hexdigest()
        return hmac.compare_digest(legacy.encode(), stored.encode())

    def update_preferences(self, new_preferences: dict) -> None:
        """
        Merge ``new_preferences`` into the user's preferences.

        Creates the preferences dict on first use; existing keys are
        overwritten by the new values.
        """
        if self.preferences is None:
            self.preferences = {}
        self.preferences.update(new_preferences)
@dataclass
class UserProfile:
    """Supplementary profile details attached to a user account."""
    user_id: str
    avatar_url: Optional[str]
    bio: Optional[str]
    phone: Optional[str]
    address: Optional[dict]

    def get_full_address(self) -> str:
        """
        Render the address dict as a single comma-separated line.

        Components are emitted in street, city, state, zip, country order;
        missing or falsy components are skipped. Returns an empty string when
        no address is set.
        """
        address = self.address
        if not address:
            return ""
        field_order = ("street", "city", "state", "zip", "country")
        components = (address.get(key, "") for key in field_order)
        return ", ".join(filter(None, components))
# ============================================================================
# models/product.py
# ============================================================================
@dataclass
class Product:
    """
    Product model for items in the store.

    Attributes:
        id: Unique product identifier
        name: Product name
        description: Product description
        price: Product price in cents
        stock: Available inventory count
        category: Product category
        images: Optional list of image references; None when none were given
    """
    id: str
    name: str
    description: str
    price: int  # In cents
    stock: int
    category: str
    images: Optional[List[str]] = None

    def format_price(self) -> str:
        """Format price as currency string, e.g. 1999 -> "$19.99"."""
        return f"${self.price / 100:.2f}"

    def is_available(self) -> bool:
        """Check if product is in stock"""
        return self.stock > 0

    def reserve_stock(self, quantity: int) -> bool:
        """
        Reserve stock for an order.

        Args:
            quantity: Number of units to reserve; must be positive

        Returns:
            True if the units were deducted from ``stock``; False if the
            quantity is not positive or exceeds the available stock (in which
            case ``stock`` is left untouched)
        """
        # A zero or negative quantity is never a valid reservation; a
        # negative one would otherwise *increase* the stock count.
        if quantity <= 0:
            return False
        if self.stock < quantity:
            return False
        self.stock -= quantity
        return True
@dataclass
class ProductReview:
    """A user's rating and comment for a single product."""
    id: str
    product_id: str
    user_id: str
    rating: int  # 1-5
    comment: str
    created_at: datetime

    def validate(self) -> bool:
        """Return True when the rating is within 1-5 and the comment is non-empty."""
        rating_in_range = 1 <= self.rating <= 5
        if not rating_in_range:
            return False
        return len(self.comment) > 0
# ============================================================================
# models/order.py
# ============================================================================
from enum import Enum
class OrderStatus(Enum):
    """Closed set of states an order can be in; values are the lowercase names."""

    PENDING = "pending"
    CONFIRMED = "confirmed"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"
@dataclass
class OrderItem:
    """One product line within an order, priced at the time of purchase."""
    product_id: str
    quantity: int
    price_at_purchase: int

    def total(self) -> int:
        """Return the line total: quantity multiplied by the purchase price."""
        units = self.quantity
        unit_price = self.price_at_purchase
        return units * unit_price
@dataclass
class Order:
    """
    A customer purchase made up of one or more order items.

    Attributes:
        id: Unique order identifier
        user_id: ID of the user who placed the order
        items: List of order items
        status: Current order status
        created_at: Order creation timestamp
        shipping_address: Shipping address dict
    """
    id: str
    user_id: str
    items: List[OrderItem]
    status: OrderStatus
    created_at: datetime
    shipping_address: dict

    def calculate_total(self) -> int:
        """Add up the ``total()`` of every item; 0 for an order without items."""
        running_total = 0
        for line in self.items:
            running_total += line.total()
        return running_total

    def format_total(self) -> str:
        """Render the order total (held in cents) as a dollar string."""
        dollars = self.calculate_total() / 100
        return f"${dollars:.2f}"

    def can_cancel(self) -> bool:
        """Return True while the order is still PENDING or CONFIRMED."""
        cancellable_states = (OrderStatus.PENDING, OrderStatus.CONFIRMED)
        return self.status in cancellable_states
# ============================================================================
# services/user_service.py
# ============================================================================
class UserService:
    """Business-logic layer for user accounts, backed by a ``db`` handle."""

    def __init__(self, db):
        # ``db`` is expected to expose a ``users`` collection offering
        # insert / find_one / update, as used by the methods below.
        self.db = db

    def create_user(self, email: str, name: str, password: str) -> User:
        """
        Build a new User, store it and hand it back.

        Args:
            email: User's email address
            name: User's full name
            password: Plain text password (only its hash is stored)

        Returns:
            The User object that was inserted into ``db.users``
        """
        new_id = self._generate_id()
        hashed = User.hash_password(password)
        new_user = User(
            id=new_id,
            email=email,
            name=name,
            password_hash=hashed,
            created_at=datetime.now(),
        )
        self.db.users.insert(new_user)
        return new_user

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Look up a user by email via ``db.users.find_one``."""
        query = {"email": email}
        return self.db.users.find_one(query)

    def update_user_preferences(self, user_id: str, preferences: dict) -> bool:
        """
        Merge new notification preferences into a stored user.

        Args:
            user_id: User's unique identifier
            preferences: Dictionary of preference settings

        Returns:
            True if the user was found and written back, False otherwise
        """
        found = self.db.users.find_one({"id": user_id})
        if not found:
            return False
        found.update_preferences(preferences)
        self.db.users.update({"id": user_id}, found)
        return True

    def authenticate(self, email: str, password: str) -> Optional[User]:
        """Return the user when email and password match, otherwise None."""
        candidate = self.get_user_by_email(email)
        if not candidate:
            return None
        if not candidate.verify_password(password):
            return None
        return candidate

    def _generate_id(self) -> str:
        """Produce a new user ID as the string form of a random UUID4."""
        import uuid
        fresh = uuid.uuid4()
        return str(fresh)
# ============================================================================
# services/order_service.py
# ============================================================================
class OrderService:
    """Service for order-related operations"""

    def __init__(self, db, product_service):
        # ``db`` must expose an ``orders`` collection (insert / find /
        # find_one / update); ``product_service`` must expose
        # ``get_product`` and ``update_product``.
        self.db = db
        self.product_service = product_service

    def create_order(
        self,
        user_id: str,
        items: List[dict],
        shipping_address: dict
    ) -> Order:
        """
        Create a new order.

        Stock is reserved item by item. If anything fails part-way (a bad
        item, an unavailable product, or the insert itself), every
        reservation made so far is released before the error propagates, so
        a failed order never leaves stock deducted.

        Args:
            user_id: ID of the user placing the order
            items: List of {product_id, quantity} dicts
            shipping_address: Shipping address dict

        Returns:
            Created Order object

        Raises:
            ValueError: If ``items`` is empty, an item is missing a key, a
                quantity is not a positive integer, or a product is missing
                or lacks sufficient stock
        """
        if not items:
            raise ValueError("Order must contain at least one item")

        order_items = []
        reserved = []  # (product, quantity) pairs to undo on failure
        try:
            for item in items:
                product_id, quantity = self._parse_item(item)
                product = self.product_service.get_product(product_id)
                if not product or not product.reserve_stock(quantity):
                    raise ValueError(f"Product {product_id} not available")
                reserved.append((product, quantity))
                order_items.append(OrderItem(
                    product_id=product.id,
                    quantity=quantity,
                    price_at_purchase=product.price
                ))
            order = Order(
                id=self._generate_id(),
                user_id=user_id,
                items=order_items,
                status=OrderStatus.PENDING,
                created_at=datetime.now(),
                shipping_address=shipping_address
            )
            self.db.orders.insert(order)
        except Exception:
            # Release everything reserved so far, then let the error surface.
            for product, quantity in reserved:
                product.stock += quantity
            raise
        # NOTE(review): cancel_order persists restored stock through
        # product_service.update_product, but reservations made here are only
        # applied to the object returned by get_product. Confirm that object
        # is tracked/persisted by the product service.
        return order

    @staticmethod
    def _parse_item(item: dict) -> tuple:
        """Extract (product_id, quantity) from an item dict, raising ValueError if invalid."""
        try:
            product_id = item["product_id"]
            quantity = item["quantity"]
        except (KeyError, TypeError) as exc:
            raise ValueError(
                "Each order item requires 'product_id' and 'quantity'"
            ) from exc
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(quantity, bool) or not isinstance(quantity, int) or quantity <= 0:
            raise ValueError(f"Invalid quantity for product {product_id}")
        return product_id, quantity

    def get_user_orders(self, user_id: str) -> List[Order]:
        """Get all orders for a user"""
        return self.db.orders.find({"user_id": user_id})

    def update_order_status(self, order_id: str, status: OrderStatus) -> bool:
        """
        Update order status.

        Returns:
            True if the order was found and written back, False otherwise
        """
        order = self.db.orders.find_one({"id": order_id})
        if order:
            order.status = status
            self.db.orders.update({"id": order_id}, order)
            return True
        return False

    def cancel_order(self, order_id: str) -> bool:
        """
        Cancel an order and restore stock.

        Returns:
            True if the order existed and was still cancellable; False if it
            was not found or ``can_cancel()`` returned False
        """
        order = self.db.orders.find_one({"id": order_id})
        if order and order.can_cancel():
            # Restore product stock; products that no longer exist are skipped.
            for item in order.items:
                product = self.product_service.get_product(item.product_id)
                if product:
                    product.stock += item.quantity
                    self.product_service.update_product(product)
            order.status = OrderStatus.CANCELLED
            self.db.orders.update({"id": order_id}, order)
            return True
        return False

    def _generate_id(self) -> str:
        """Generate an order ID: "ORD-" plus the first 8 characters of a UUID4, upper-cased."""
        import uuid
        return f"ORD-{str(uuid.uuid4())[:8].upper()}"
# ============================================================================
# api/auth_routes.py
# ============================================================================
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
# Router collecting the authentication endpoints under the /auth prefix.
router = APIRouter(
    prefix="/auth",
    tags=["authentication"],
)
# Request body accepted by the login endpoint.
class LoginRequest(BaseModel):
    email: str     # address the account was registered with
    password: str  # plain text password, checked against the stored hash
# Request body accepted by the register endpoint.
class RegisterRequest(BaseModel):
    email: str     # address for the new account
    name: str      # user's full name
    password: str  # plain text password
# Response returned by both the login and register endpoints.
class AuthResponse(BaseModel):
    token: str    # JWT produced by generate_jwt_token
    user_id: str  # ID of the authenticated / newly created user
@router.post("/login", response_model=AuthResponse)
async def login(request: LoginRequest, user_service: UserService = Depends()):
    """
    Check the supplied credentials and issue a JWT.

    Args:
        request: Login credentials

    Returns:
        JWT token and user ID

    Raises:
        HTTPException: 401 when the credentials do not match a user
    """
    authenticated = user_service.authenticate(request.email, request.password)
    if authenticated:
        return AuthResponse(
            token=generate_jwt_token(authenticated.id),
            user_id=authenticated.id,
        )
    raise HTTPException(status_code=401, detail="Invalid credentials")
@router.post("/register", response_model=AuthResponse)
async def register(request: RegisterRequest, user_service: UserService = Depends()):
    """
    Create a user account and issue a JWT for it.

    Args:
        request: Registration details

    Returns:
        JWT token and user ID

    Raises:
        HTTPException: 400 when the email already belongs to an account
    """
    if user_service.get_user_by_email(request.email):
        raise HTTPException(status_code=400, detail="Email already registered")

    created = user_service.create_user(
        email=request.email,
        name=request.name,
        password=request.password,
    )
    return AuthResponse(token=generate_jwt_token(created.id), user_id=created.id)
def generate_jwt_token(user_id: str, expires_in: int = 3600) -> str:
    """
    Generate an HS256-signed JWT for a user.

    Args:
        user_id: ID stored in the token's ``user_id`` claim
        expires_in: Token lifetime in seconds (default: 1 hour)

    Returns:
        The encoded JWT as returned by ``jwt.encode``
    """
    import os
    import time

    import jwt

    # SECURITY: a signing key committed to source control lets anyone who can
    # read the code forge tokens. Set JWT_SECRET_KEY in the environment; the
    # literal fallback only keeps existing deployments working and must not
    # be relied on in production.
    secret = os.environ.get("JWT_SECRET_KEY", "secret_key")

    payload = {
        "user_id": user_id,
        # Whole seconds since the epoch rather than a float timestamp.
        "exp": int(time.time()) + expires_in,
    }
    return jwt.encode(payload, secret, algorithm="HS256")
# ============================================================================
# api/user_routes.py
# ============================================================================
# Router collecting the user endpoints under the /users prefix.
user_router = APIRouter(
    prefix="/users",
    tags=["users"],
)
# Request body for updating notification preferences; every field has a default.
class PreferencesRequest(BaseModel):
    email_notifications: bool = True
    push_notifications: bool = True
    # Intended values per the original author: immediate, daily, weekly
    # (not enforced by the model).
    notification_frequency: str = "daily"
# Request body for a partial profile update; fields left as None are not changed.
class ProfileUpdateRequest(BaseModel):
    name: Optional[str] = None
    phone: Optional[str] = None
    bio: Optional[str] = None
@user_router.get("/{user_id}")
async def get_user(user_id: str, user_service: UserService = Depends()):
    """
    Get user by ID.

    Only an allow-list of public fields is returned, so the stored
    ``password_hash`` is never sent to API clients.

    Args:
        user_id: User's ID

    Returns:
        Dict with id, email, name, created_at and preferences

    Raises:
        HTTPException: 404 when no user has this ID
    """
    user = user_service.db.users.find_one({"id": user_id})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    public_fields = ("id", "email", "name", "created_at", "preferences")
    # The rest of the file treats find_one results as User objects; the dict
    # branch covers a store that returns raw documents instead.
    if isinstance(user, dict):
        return {field: user.get(field) for field in public_fields}
    return {field: getattr(user, field, None) for field in public_fields}
@user_router.put("/{user_id}/preferences")
async def update_preferences(
    user_id: str,
    preferences: PreferencesRequest,
    user_service: UserService = Depends()
):
    """
    Store new notification preferences for a user.

    Args:
        user_id: User's ID
        preferences: New preference settings

    Returns:
        Confirmation message together with the submitted preferences

    Raises:
        HTTPException: 404 when the user does not exist
    """
    new_settings = preferences.model_dump()
    updated = user_service.update_user_preferences(user_id, new_settings)
    if updated:
        return {"message": "Preferences updated", "preferences": preferences}
    raise HTTPException(status_code=404, detail="User not found")
@user_router.put("/{user_id}/profile")
async def update_profile(
    user_id: str,
    profile: ProfileUpdateRequest,
    user_service: UserService = Depends()
):
    """
    Update user profile information.

    Only fields that were supplied (not None) are written. When the request
    carries no fields at all, no database write is issued.

    Args:
        user_id: User's ID
        profile: Fields to change

    Returns:
        Confirmation message

    Raises:
        HTTPException: 404 when the user does not exist
    """
    user = user_service.db.users.find_one({"id": user_id})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    update_data = profile.model_dump(exclude_none=True)
    # An empty "$set" is at best a wasted round trip and is rejected by some
    # document stores, so skip the write when nothing was supplied.
    if update_data:
        user_service.db.users.update({"id": user_id}, {"$set": update_data})
    return {"message": "Profile updated"}
# ============================================================================
# api/order_routes.py
# ============================================================================
# Router collecting the order endpoints under the /orders prefix.
order_router = APIRouter(
    prefix="/orders",
    tags=["orders"],
)
# Request body for placing an order.
class CreateOrderRequest(BaseModel):
    # Each entry is expected to look like {product_id, quantity}.
    items: List[dict]
    shipping_address: dict
@order_router.post("/")
async def create_order(
    request: CreateOrderRequest,
    user_id: str,  # From auth middleware
    order_service: OrderService = Depends()
):
    """
    Place an order on behalf of a user.

    Args:
        request: Order details
        user_id: Authenticated user's ID

    Returns:
        Created order

    Raises:
        HTTPException: 400 carrying the message of any ValueError raised by
            the order service
    """
    try:
        created = order_service.create_order(
            user_id=user_id,
            items=request.items,
            shipping_address=request.shipping_address,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return created
@order_router.get("/")
async def get_orders(
    user_id: str,
    order_service: OrderService = Depends()
):
    """Return every order the order service holds for ``user_id``."""
    orders = order_service.get_user_orders(user_id)
    return orders
@order_router.post("/{order_id}/cancel")
async def cancel_order(
    order_id: str,
    order_service: OrderService = Depends()
):
    """
    Cancel an order through the order service.

    Raises:
        HTTPException: 400 when the service reports the order was not cancelled
    """
    cancelled = order_service.cancel_order(order_id)
    if cancelled:
        return {"message": "Order cancelled"}
    raise HTTPException(status_code=400, detail="Cannot cancel order")
# ============================================================================
# components/ProfilePage.jsx (React Component Example)
# ============================================================================
"""
// ProfilePage.jsx
import React, { useState, useEffect } from 'react';
import { useAuth } from '../hooks/useAuth';
import { userApi } from '../api/userApi';
export const ProfilePage = () => {
const { user } = useAuth();
const [profile, setProfile] = useState(null);
const [loading, setLoading] = useState(true);
useEffect(() => {
const fetchProfile = async () => {
try {
const data = await userApi.getProfile(user.id);
setProfile(data);
} catch (error) {
console.error('Failed to load profile:', error);
} finally {
setLoading(false);
}
};
if (user) {
fetchProfile();
}
}, [user]);
const handleSave = async (updates) => {
try {
await userApi.updateProfile(user.id, updates);
setProfile({ ...profile, ...updates });
} catch (error) {
console.error('Failed to update profile:', error);
}
};
if (loading) return