Spaces:
Runtime error
Runtime error
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| from pymongo import MongoClient | |
| from bson import ObjectId | |
| import os | |
| import oss2 | |
| import bcrypt | |
| from datetime import datetime, timedelta | |
| import jwt | |
| from functools import wraps | |
| import random | |
| from dotenv import load_dotenv | |
# Load environment variables from the .env file.
load_dotenv()

app = Flask(__name__)
CORS(app)

# MongoDB configuration: the connection URI is read from the environment.
mongo_uri = os.getenv("MONGO_URI")
client = MongoClient(mongo_uri)
db = client['stylespace']
# Create the 'carts' collection up front if the database does not have it yet.
if 'carts' not in db.list_collection_names():
    db.create_collection('carts')

# Aliyun OSS configuration, read from the environment.
access_key_id = os.getenv("OSS_ACCESS_KEY_ID")
access_key_secret = os.getenv("OSS_ACCESS_KEY_SECRET")
bucket_name = os.getenv("OSS_BUCKET_NAME")
endpoint = os.getenv("OSS_ENDPOINT")

# Create the Aliyun OSS bucket object.
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)

# JWT configuration. The signing key is taken from the SECRET_KEY environment
# variable. The old hard-coded value remains only as a fallback so existing
# deployments keep working; set SECRET_KEY to a long random value, because
# anyone who knows the key can forge tokens.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', '1234')
def token_required(f):
    """Decorator that authenticates the request with a JWT before calling ``f``.

    The token is read from the ``Authorization`` header; a leading
    ``"Bearer "`` prefix is accepted. When the token is valid and its
    ``user_id`` claim matches a document in ``db.users``, that document is
    passed to ``f`` as the first positional argument, followed by the
    original arguments.

    Returns:
        The wrapped view. On a missing, expired or invalid token the wrapper
        returns a JSON error message with status 401 and does not call ``f``.
    """
    # wraps() copies f's name and docstring onto the wrapper. Without it every
    # protected view is called "decorated", which breaks anything that
    # registers views by function name.
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'message': 'Token is missing!'}), 401
        try:
            token = token.split(" ")[1] if token.startswith("Bearer ") else token
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
            # A token without a usable user_id claim is treated as invalid
            # rather than raising KeyError / InvalidId.
            user_id = data.get('user_id')
            if not ObjectId.is_valid(user_id):
                raise jwt.InvalidTokenError
            current_user = db.users.find_one({'_id': ObjectId(user_id)})
            if not current_user:
                raise jwt.InvalidTokenError
        except jwt.ExpiredSignatureError:
            return jsonify({'message': 'Token has expired!'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'message': 'Invalid token!'}), 401
        return f(current_user, *args, **kwargs)
    return decorated
def register():
    """Create a user account from the JSON request body.

    The body must be a JSON object with non-empty string values for
    ``username``, ``email`` and ``password``. The password is stored as a
    bcrypt hash.

    Returns:
        201 with the new user's id on success; 400 when a field is missing or
        not a string, or when the username or email is already taken.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    # Requiring plain strings also keeps MongoDB operator objects such as
    # {"$ne": null} out of the queries below.
    invalid = [
        name for name in ('username', 'email', 'password')
        if not isinstance(data.get(name), str) or not data[name]
    ]
    if invalid:
        return jsonify({'message': f"Missing or invalid fields: {', '.join(invalid)}"}), 400
    if db.users.find_one({'username': data['username']}):
        return jsonify({'message': 'Username already exists'}), 400
    if db.users.find_one({'email': data['email']}):
        return jsonify({'message': 'Email already exists'}), 400
    hashed_password = bcrypt.hashpw(data['password'].encode('utf-8'), bcrypt.gensalt())
    user = {
        'username': data['username'],
        'email': data['email'],
        'password': hashed_password,
        'created_at': datetime.utcnow(),
    }
    result = db.users.insert_one(user)
    return jsonify({'message': 'User created successfully', 'id': str(result.inserted_id)}), 201
def login():
    """Check a username/password pair and issue a JWT.

    The body must be a JSON object with string ``username`` and ``password``
    values. On success the response carries an HS256 token that expires 24
    hours after issue, plus the user's id, username and email.

    Returns:
        200 with ``token`` and ``user`` on success; 400 when the credentials
        are missing or not strings; 401 when they do not match a user.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')
    # Non-string values are rejected so that a JSON object such as
    # {"$gt": ""} cannot be used as a MongoDB query operator.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'message': 'Username and password are required'}), 400
    user = db.users.find_one({'username': username})
    if user and bcrypt.checkpw(password.encode('utf-8'), user['password']):
        token = jwt.encode(
            {'user_id': str(user['_id']), 'exp': datetime.utcnow() + timedelta(hours=24)},
            app.config['SECRET_KEY'],
            algorithm="HS256",
        )
        return jsonify({'token': token, 'user': {'id': str(user['_id']), 'username': user['username'], 'email': user['email']}}), 200
    return jsonify({'message': 'Invalid username or password'}), 401
def verify_token(current_user):
    """Return the id, username and email of ``current_user`` as JSON (200)."""
    profile = {
        'id': str(current_user['_id']),
        'username': current_user['username'],
        'email': current_user['email'],
    }
    return jsonify({'user': profile}), 200
def logout(current_user):
    """Acknowledge a logout request with a 200 JSON message.

    Nothing is done on the server: because JWTs are used, no special
    server-side handling is needed.
    """
    body = {'message': 'Successfully logged out'}
    return jsonify(body), 200
def get_products():
    """List products as JSON.

    Query parameters:
        type: ``hot`` or ``new`` returns a random sample of up to 4 products;
            any other value (default ``all``) returns the first ``limit``
            products.
        limit: maximum number of products for the non-sampled listing
            (default 20). A non-integer value falls back to the default and a
            negative value is treated as 0.

    Returns:
        200 with ``{'data': [...]}``, one summary dict per product.
    """
    product_type = request.args.get('type', 'all')
    # type=int makes the lookup fall back to the default on a bad value
    # instead of raising ValueError.
    limit = max(request.args.get('limit', 20, type=int), 0)
    all_products = list(db.products.find())
    if product_type in ('hot', 'new'):
        selected_products = random.sample(all_products, min(4, len(all_products)))
    else:
        selected_products = all_products[:limit]
    result = []
    for product in selected_products:
        # An existing but empty 'images' list must not raise IndexError.
        images = product.get('images') or []
        result.append({
            '_id': str(product['_id']),
            'name': product.get('name', 'Unknown Product'),
            'price': float(product.get('price', 0)),
            'originalPrice': float(product.get('originalPrice', 0)),
            'brief': product.get('brief', 'No description available'),
            'image': images[0] if images else None,
            'description': product.get('description', ''),
            'images': product.get('images', []),
            'sizes': product.get('sizes', []),
        })
    return jsonify({'data': result}), 200
def get_product_details(product_id):
    """Return the full product document for ``product_id`` as JSON.

    Returns:
        200 with ``{'data': product}`` (``_id`` converted to a string) when
        the product exists; 404 when it does not or when ``product_id`` is
        not a valid ObjectId.
    """
    # A malformed id cannot match any document; answer 404 instead of letting
    # ObjectId() raise.
    if not ObjectId.is_valid(product_id):
        return jsonify({'message': 'Product not found'}), 404
    product = db.products.find_one({'_id': ObjectId(product_id)})
    if product:
        product['_id'] = str(product['_id'])
        return jsonify({'data': product}), 200
    return jsonify({'message': 'Product not found'}), 404
def get_cart(current_user):
    """Return the items in ``current_user``'s cart, joined with product data.

    Cart entries whose product can no longer be found are skipped. A user
    without a cart gets an empty list. Always responds with status 200.
    """
    owner_id = str(current_user['_id'])
    cart = db.carts.find_one({'user_id': owner_id})
    if not cart:
        return jsonify({'data': []}), 200

    cart_items = []
    for entry in cart['items']:
        product = db.products.find_one({'_id': entry['product_id']})
        if not product:
            continue
        cart_items.append({
            '_id': str(entry['_id']),
            'product_id': str(product['_id']),
            'name': product['name'],
            'price': product['price'],
            'image': product['images'][0] if product['images'] else None,
            'quantity': entry['quantity'],
            'size': entry['size'],
        })
    return jsonify({'data': cart_items}), 200
def add_to_cart(current_user):
    """Add a product to ``current_user``'s cart, creating the cart if needed.

    The JSON body must contain ``productId`` (a 24-character hex ObjectId
    string), ``size`` and ``quantity`` (a positive integer). If the cart
    already holds the same product in the same size, its quantity is
    increased; otherwise a new entry is appended.

    Returns:
        200 on success; 400 when the body or one of its fields is invalid.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'size' not in data:
        return jsonify({'message': 'Invalid request body'}), 400
    product_id = data.get('productId')
    if not isinstance(product_id, str) or not ObjectId.is_valid(product_id):
        return jsonify({'message': 'Invalid product id'}), 400
    quantity = data.get('quantity')
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(quantity, bool) or not isinstance(quantity, int) or quantity <= 0:
        return jsonify({'message': 'Quantity must be a positive integer'}), 400
    size = data['size']
    product_oid = ObjectId(product_id)

    cart = db.carts.find_one({'user_id': str(current_user['_id'])})
    if not cart:
        cart = {
            'user_id': str(current_user['_id']),
            'items': [],
        }
        result = db.carts.insert_one(cart)
        cart['_id'] = result.inserted_id

    # Compare via the normalised ObjectId string so an upper-case hex id from
    # the client still matches the stored entry.
    existing = next(
        (entry for entry in cart['items']
         if str(entry['product_id']) == str(product_oid) and entry['size'] == size),
        None,
    )
    if existing:
        existing['quantity'] += quantity
    else:
        cart['items'].append({
            '_id': ObjectId(),
            'product_id': product_oid,
            'quantity': quantity,
            'size': size,
        })
    # Only the items list changed, so only that field is written back.
    db.carts.update_one({'_id': cart['_id']}, {'$set': {'items': cart['items']}})
    return jsonify({'message': 'Item added to cart successfully'}), 200
def update_cart_item(current_user, item_id):
    """Set the quantity of one cart entry to the ``quantity`` in the JSON body.

    Returns:
        404 when the user has no cart. Otherwise 200: the updated entry joined
        with its product data when both the entry and the product are found,
        or a plain success message when either is missing.
    """
    data = request.json
    cart = db.carts.find_one({'user_id': str(current_user['_id'])})
    if not cart:
        return jsonify({'message': 'Cart not found'}), 404

    # Locate the entry once; the quantity is only read from the body when a
    # matching entry exists.
    target = next((entry for entry in cart['items'] if str(entry['_id']) == item_id), None)
    if target is not None:
        target['quantity'] = data['quantity']

    # The cart is written back whether or not an entry matched.
    db.carts.update_one({'_id': cart['_id']}, {'$set': cart})

    if target:
        product = db.products.find_one({'_id': target['product_id']})
        if product:
            payload = {
                '_id': str(target['_id']),
                'product_id': str(product['_id']),
                'name': product['name'],
                'price': product['price'],
                'image': product['images'][0] if product['images'] else None,
                'quantity': target['quantity'],
                'size': target['size'],
            }
            return jsonify({'data': payload}), 200
    return jsonify({'message': 'Cart item updated successfully'}), 200
def remove_from_cart(current_user, item_id):
    """Remove the entry with id ``item_id`` from ``current_user``'s cart.

    Returns:
        200 once the cart has been written back (also when no entry matched);
        404 when the user has no cart.
    """
    cart = db.carts.find_one({'user_id': str(current_user['_id'])})
    if not cart:
        return jsonify({'message': 'Cart not found'}), 404

    remaining = []
    for entry in cart['items']:
        if str(entry['_id']) != item_id:
            remaining.append(entry)
    cart['items'] = remaining
    db.carts.update_one({'_id': cart['_id']}, {'$set': cart})
    return jsonify({'message': 'Item removed from cart successfully'}), 200
def create_order(current_user):
    """Create an unpaid order for ``current_user`` from the JSON request body.

    The body must contain a non-empty ``items`` list whose elements each carry
    a numeric ``price`` and ``quantity``. ``shippingAddress`` is optional. A
    fixed shipping fee of 10 is added to the subtotal.

    Returns:
        201 with the new order's id on success; 400 when ``items`` is missing,
        empty, or contains an element without a usable price or quantity.
    """
    data = request.get_json(silent=True)
    items = data.get('items') if isinstance(data, dict) else None
    if not isinstance(items, list) or not items:
        return jsonify({'message': 'Order items are required'}), 400
    # NOTE(review): prices come straight from the request body and are not
    # checked against db.products, so a client can choose its own prices.
    try:
        subtotal = sum(item['price'] * item['quantity'] for item in items)
    except (KeyError, TypeError):
        return jsonify({'message': 'Each order item needs a numeric price and quantity'}), 400

    shipping_fee = 10  # Fixed shipping fee; change as needed.
    now = datetime.utcnow()
    order = {
        'user_id': str(current_user['_id']),
        'orderNumber': generate_order_number(),
        'items': items,
        'subtotal': subtotal,
        'shippingFee': shipping_fee,
        'total': subtotal + shipping_fee,
        'status': 'unpaid',
        'createdAt': now,
        'updatedAt': now,
        'paymentMethod': 'Pending',
        'shippingAddress': data.get('shippingAddress', {}),
    }
    result = db.orders.insert_one(order)
    return jsonify({'message': 'Order created successfully', 'order_id': str(result.inserted_id)}), 201
def get_order_details(current_user, order_id):
    """Return one of ``current_user``'s orders as JSON.

    Returns:
        200 with ``{'data': order}`` (``_id`` converted to a string) when the
        order exists and belongs to the user; 404 otherwise, including when
        ``order_id`` is not a valid ObjectId.
    """
    # A malformed id cannot match any order; answer 404 instead of letting
    # ObjectId() raise.
    if not ObjectId.is_valid(order_id):
        return jsonify({'message': 'Order not found'}), 404
    order = db.orders.find_one({'_id': ObjectId(order_id), 'user_id': str(current_user['_id'])})
    if order:
        order['_id'] = str(order['_id'])
        return jsonify({'data': order}), 200
    return jsonify({'message': 'Order not found'}), 404
def update_order_status(current_user, order_id):
    """Change the status of one of ``current_user``'s orders.

    The JSON body must contain ``status``. When the new status is
    ``unshipped`` the order's ``paymentMethod`` is also set, from the body's
    ``paymentMethod`` or ``'Not specified'``. ``updatedAt`` is refreshed.

    Returns:
        200 on success; 400 when ``status`` is missing; 404 when the order
        does not exist, is not the user's, or ``order_id`` is malformed.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    # NOTE(review): any non-empty status value from the client is accepted;
    # confirm whether it should be limited to a known set of statuses.
    new_status = data.get('status')
    if not new_status:
        return jsonify({'message': 'Status is required'}), 400
    # A malformed id cannot match any order; answer 404 instead of letting
    # ObjectId() raise.
    if not ObjectId.is_valid(order_id):
        return jsonify({'message': 'Order not found'}), 404
    order_oid = ObjectId(order_id)
    order = db.orders.find_one({'_id': order_oid, 'user_id': str(current_user['_id'])})
    if not order:
        return jsonify({'message': 'Order not found'}), 404
    update_data = {
        'status': new_status,
        'updatedAt': datetime.utcnow(),
    }
    if new_status == 'unshipped':
        update_data['paymentMethod'] = data.get('paymentMethod', 'Not specified')
    db.orders.update_one({'_id': order_oid}, {'$set': update_data})
    return jsonify({'message': 'Order status updated successfully'}), 200
def get_user_orders(current_user):
    """List ``current_user``'s orders, newest first, as JSON (200).

    An optional ``status`` query parameter restricts the result to orders
    with that status. Each order's ``_id`` is converted to a string.
    """
    status_filter = request.args.get('status')
    criteria = {'user_id': str(current_user['_id'])}
    if status_filter:
        criteria['status'] = status_filter

    found = list(db.orders.find(criteria).sort('createdAt', -1))
    for entry in found:
        entry['_id'] = str(entry['_id'])
    return jsonify({'data': found}), 200
def upload_image(current_user):
    """Upload the multipart ``file`` field to the OSS bucket.

    The object key is ``uploads/<local timestamp>_<file name>``. Only the
    final path component of the client-supplied file name is used.

    Returns:
        200 with the object's URL on success; 400 when the request has no
        ``file`` part or the file name is empty.
    """
    if 'file' not in request.files:
        return jsonify({'message': 'No file part in the request'}), 400
    file = request.files['file']
    # Drop any directory components (either separator style) so the client
    # cannot steer the object key outside the intended "uploads/<name>" shape.
    safe_name = os.path.basename((file.filename or '').replace('\\', '/'))
    if safe_name == '':
        return jsonify({'message': 'No file selected for uploading'}), 400
    filename = f"uploads/{datetime.now().strftime('%Y%m%d%H%M%S')}_{safe_name}"
    bucket.put_object(filename, file)
    # The configured endpoint may include a scheme ("https://..."); strip it
    # so the public URL is not built as "https://bucket.https://...".
    host = (endpoint or '').split('://', 1)[-1].rstrip('/')
    url = f"https://{bucket_name}.{host}/{filename}"
    return jsonify({'message': 'File uploaded successfully', 'url': url}), 200
def generate_order_number():
    """Build an order number: ``ORD`` + UTC timestamp (YYYYMMDDHHMMSS) + a random 4-digit number."""
    timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
    suffix = random.randint(1000, 9999)
    return "ORD" + timestamp + str(suffix)
if __name__ == '__main__':
    # The interactive debugger lets anyone who can reach the server run
    # arbitrary code, and this server listens on all interfaces. Debug mode is
    # therefore opt-in through the FLASK_DEBUG environment variable.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)