# Carwala / app.py
# arshtech's picture
# Update app.py
# 85b8564 verified
from flask import Flask, render_template, request, redirect, url_for, session, flash
from pymongo import MongoClient
import bcrypt
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import os
# Flask application object; every route below registers itself on it.
app = Flask(__name__)
# The secret key signs the session cookie. Deployments should provide their
# own value through the SECRET_KEY environment variable; the literal is kept
# only as a fallback so existing setups keep working.
app.secret_key = os.environ.get('SECRET_KEY') or 'carwala_secret_key_2024'

# MongoDB configuration - using carwala1 database
# NOTE(review): the fallback URI embeds a database password that is now part
# of the source history. Rotate it and set MONGODB_URI in the environment.
MONGODB_URI = (
    os.environ.get('MONGODB_URI')
    or "mongodb+srv://arshbir:arshbir123@arshbir.9pulohe.mongodb.net/carwala1?retryWrites=true&w=majority"
)
client = MongoClient(MONGODB_URI)
db = client.carwala1

# Admin credentials
# Overridable through ADMIN_EMAIL / ADMIN_PASSWORD; the literals remain as
# backward-compatible defaults and should be replaced in any real deployment.
ADMIN_EMAIL = os.environ.get('ADMIN_EMAIL') or "singharshbir76@gmail.com"
ADMIN_PASSWORD = os.environ.get('ADMIN_PASSWORD') or "arshbir"
def initialize_database():
    """Initialize database and create collections if they don't exist.

    Ensures the indexes on ``users.email`` (unique), ``cars.seller_id`` and
    ``appointments.car_id`` exist, inserts the admin account when no user with
    ``ADMIN_EMAIL`` is stored yet, and seeds three approved sample cars when
    the ``cars`` collection is empty. Progress is reported with ``print``.
    """
    # Get or create collections
    users = db.users
    cars = db.cars
    appointments = db.appointments

    # Create indexes
    users.create_index("email", unique=True)
    cars.create_index("seller_id")
    appointments.create_index("car_id")

    # Create admin user if not exists
    if not users.find_one({"email": ADMIN_EMAIL}):
        hashed_password = bcrypt.hashpw(ADMIN_PASSWORD.encode('utf-8'), bcrypt.gensalt())
        users.insert_one({
            "name": "Admin User",
            "email": ADMIN_EMAIL,
            "password": hashed_password,
            "role": "admin",
            "address": "Admin Address",
            "phone": "0000000000",
            "approved": True,
            "created_at": datetime.now(),
        })
        print("✅ Admin user created successfully!")

    # Add some sample cars if database is empty
    if cars.count_documents({}) == 0:
        # (name, year, price, description) - every other field is shared.
        sample_specs = [
            ("Toyota Camry", 2020, 25000,
             "Excellent condition, low mileage, fuel efficient"),
            ("Honda Civic", 2019, 22000,
             "Well maintained, single owner, all services done"),
            ("Ford Mustang", 2021, 35000,
             "Powerful engine, sporty look, premium features"),
        ]
        sample_cars = [
            {
                "name": car_name,
                "year": year,
                "price": price,
                "description": description,
                "seller_id": "admin",
                "seller_email": ADMIN_EMAIL,
                "status": "approved",
                "created_at": datetime.now(),
            }
            for car_name, year, price, description in sample_specs
        ]
        cars.insert_many(sample_cars)
        print("✅ Sample cars added successfully!")

    print("✅ Database initialized successfully!")
# Initialize database when app starts. This is a module-level call, so it runs
# as soon as the file is imported or executed, before the server is started.
initialize_database()
# Routes
@app.route('/')
def index():
    """Render the home page with every car whose status is "approved"."""
    approved_only = {"status": "approved"}
    car_list = [car for car in db.cars.find(approved_only)]
    return render_template('index.html', cars=car_list)
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    The admin is recognised first by comparing the form values with
    ``ADMIN_EMAIL`` / ``ADMIN_PASSWORD``. Everyone else is looked up by email
    and checked against the stored bcrypt hash. Sellers that are not yet
    approved are turned away. On success ``user_id``, ``role`` and ``email``
    are stored in the session and the user is redirected to the landing page
    for their role. On failure a message is flashed and the form is shown
    again.
    """
    if request.method != 'POST':
        return render_template('login.html')

    # A missing form field comes back as None; fall back to '' so the
    # .encode() call below cannot raise and turn a bad request into a 500.
    email = request.form.get('email') or ''
    password = request.form.get('password') or ''

    # First check if it's the admin login
    if email == ADMIN_EMAIL and password == ADMIN_PASSWORD:
        admin_user = db.users.find_one({"email": ADMIN_EMAIL})
        if admin_user:
            session['user_id'] = str(admin_user['_id'])
            session['role'] = 'admin'
            session['email'] = ADMIN_EMAIL
            flash('Admin login successful!')
            return redirect(url_for('admin_dashboard'))

    # Regular user login
    user = db.users.find_one({"email": email})
    if not user:
        flash('User not found. Please register first.')
        return render_template('login.html')

    # Check password
    if not bcrypt.checkpw(password.encode('utf-8'), user['password']):
        flash('Invalid email or password')
        return render_template('login.html')

    # Check if seller is approved
    if user['role'] == 'seller' and not user.get('approved', False):
        flash('Your seller account is pending approval by admin.')
        return redirect(url_for('login'))

    session['user_id'] = str(user['_id'])
    session['role'] = user['role']
    session['email'] = email
    flash(f'Welcome back, {user["name"]}!')

    # Every authenticated role gets a destination. Previously only buyers and
    # sellers were redirected, so any other role was logged in but shown the
    # login form again.
    landing_pages = {
        'buyer': 'index',
        'seller': 'seller_dashboard',
        'admin': 'admin_dashboard',
    }
    return redirect(url_for(landing_pages.get(user['role'], 'index')))
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    Name, email and password are required, and the role must be ``buyer`` or
    ``seller``. Buyers are stored as approved straight away; sellers are
    stored unapproved and have to wait for an admin. Every outcome flashes a
    message: validation failures and duplicate emails redirect back to the
    form, successful registrations redirect to the login page.
    """
    if request.method != 'POST':
        return render_template('register.html')

    from pymongo.errors import DuplicateKeyError

    name = request.form.get('name')
    email = request.form.get('email')
    password = request.form.get('password')
    role = request.form.get('role')
    address = request.form.get('address')
    phone = request.form.get('phone')

    # Missing fields arrive as None; without this check password.encode()
    # below would raise and the request would end in a 500.
    if not (name and email and password):
        flash('Please fill in all required fields.')
        return redirect(url_for('register'))

    # The role comes straight from the form. Accept only the two self-service
    # roles so nobody can register themselves as "admin".
    if role not in ('buyer', 'seller'):
        flash('Invalid account type selected.')
        return redirect(url_for('register'))

    # Check if user already exists
    if db.users.find_one({"email": email}):
        flash('Email already registered')
        return redirect(url_for('register'))

    # Hash password
    hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())

    # Create user document
    user_data = {
        "name": name,
        "email": email,
        "password": hashed_password,
        "role": role,
        "address": address,
        "phone": phone,
        "approved": role == 'buyer',
        "created_at": datetime.now(),
    }
    try:
        db.users.insert_one(user_data)
    except DuplicateKeyError:
        # Two concurrent registrations can both pass the lookup above; the
        # unique index on "email" rejects the second insert.
        flash('Email already registered')
        return redirect(url_for('register'))

    if role == 'buyer':
        flash('Registration successful! Please login.')
    else:
        flash('Registration submitted. Waiting for admin approval.')
    return redirect(url_for('login'))
@app.route('/logout')
def logout():
    """Drop everything stored in the session and send the visitor home."""
    # Clear first: the flash message itself lives in the session.
    session.clear()
    flash('You have been logged out successfully.')
    home_url = url_for('index')
    return redirect(home_url)
@app.route('/admin')
def admin_dashboard():
    """Render the admin overview: pending sellers, pending cars and totals.

    Visitors without an admin session are flashed a message and redirected to
    the login page.
    """
    is_admin = 'user_id' in session and session.get('role') == 'admin'
    if not is_admin:
        flash('Please login as admin to access this page.')
        return redirect(url_for('login'))

    context = {
        # Sellers still waiting for approval
        'pending_sellers': list(db.users.find({"role": "seller", "approved": False})),
        # Cars still waiting for approval
        'pending_cars': list(db.cars.find({"status": "pending"})),
        # Collection-wide counters
        'total_users': db.users.count_documents({}),
        'total_cars': db.cars.count_documents({}),
        'total_appointments': db.appointments.count_documents({}),
    }
    return render_template('admin.html', **context)
@app.route('/approve_seller/<user_id>')
def approve_seller(user_id):
    """Mark the user with the given id as approved (admin only).

    Non-admin visitors are redirected to the login page. A malformed or
    unknown id flashes an error instead of crashing or reporting a false
    success. In every admin case the response redirects back to the admin
    dashboard.
    """
    if 'user_id' not in session or session.get('role') != 'admin':
        return redirect(url_for('login'))

    from bson.errors import InvalidId
    from bson.objectid import ObjectId

    try:
        seller_oid = ObjectId(user_id)
    except InvalidId:
        # The id comes from the URL, so it may be any string.
        flash('Invalid seller id')
        return redirect(url_for('admin_dashboard'))

    result = db.users.update_one({"_id": seller_oid}, {"$set": {"approved": True}})
    if result.matched_count:
        flash('Seller approved successfully')
    else:
        flash('Seller not found')
    return redirect(url_for('admin_dashboard'))
@app.route('/seller')
def seller_dashboard():
    """Render the logged-in seller's dashboard with the cars they listed.

    Visitors without a seller session are redirected to the login page, as
    are sellers whose account is missing or not yet approved.
    """
    if 'user_id' not in session or session.get('role') != 'seller':
        return redirect(url_for('login'))

    from bson.objectid import ObjectId

    user = db.users.find_one({"_id": ObjectId(session['user_id'])})
    if user is None:
        # The session outlived the account (e.g. the user was deleted). Drop
        # it rather than failing on user.get() below.
        session.clear()
        flash('Your account could not be found. Please login again.')
        return redirect(url_for('login'))

    # Check if seller is approved
    if not user.get('approved', False):
        flash('Your seller account is pending approval.')
        return redirect(url_for('login'))

    # Get seller's cars
    seller_cars = list(db.cars.find({"seller_id": session['user_id']}))
    return render_template('seller_dashboard.html', cars=seller_cars)
@app.route('/add_car', methods=['POST'])
def add_car():
    """Store a new car listing for the logged-in seller with status "pending".

    The form must contain a non-empty ``name``, an integer ``year`` and a
    positive, finite ``price``. Year and price are stored as numbers so they
    have the same type as the seeded sample cars. Invalid input flashes an
    error; every seller case redirects back to the seller dashboard.
    """
    if 'user_id' not in session or session.get('role') != 'seller':
        return redirect(url_for('login'))

    name = (request.form.get('name') or '').strip()
    description = request.form.get('description')

    # Form values are strings (or None when missing); convert them up front.
    try:
        year = int(request.form.get('year'))
        price = float(request.form.get('price'))
    except (TypeError, ValueError):
        flash('Please enter a valid year and price.')
        return redirect(url_for('seller_dashboard'))

    # The chained comparison also rejects NaN (all comparisons are False)
    # and infinity.
    if not name or not (0 < price < float('inf')):
        flash('Please enter a car name and a positive price.')
        return redirect(url_for('seller_dashboard'))

    # Keep whole-number prices as ints, like the sample data.
    if price.is_integer():
        price = int(price)

    car_data = {
        "name": name,
        "year": year,
        "price": price,
        "description": description,
        "seller_id": session['user_id'],
        "seller_email": session['email'],
        "status": "pending",
        "created_at": datetime.now(),
    }
    db.cars.insert_one(car_data)
    flash('Car added successfully. Waiting for admin approval.')
    return redirect(url_for('seller_dashboard'))
@app.route('/approve_car/<car_id>')
def approve_car(car_id):
    """Set the status of the car with the given id to "approved" (admin only).

    Non-admin visitors are redirected to the login page. A malformed or
    unknown id flashes an error instead of crashing or reporting a false
    success. In every admin case the response redirects back to the admin
    dashboard.
    """
    if 'user_id' not in session or session.get('role') != 'admin':
        return redirect(url_for('login'))

    from bson.errors import InvalidId
    from bson.objectid import ObjectId

    try:
        car_oid = ObjectId(car_id)
    except InvalidId:
        # The id comes from the URL, so it may be any string.
        flash('Invalid car id')
        return redirect(url_for('admin_dashboard'))

    result = db.cars.update_one({"_id": car_oid}, {"$set": {"status": "approved"}})
    if result.matched_count:
        flash('Car approved successfully')
    else:
        flash('Car not found')
    return redirect(url_for('admin_dashboard'))
@app.route('/book_appointment/<car_id>', methods=['GET', 'POST'])
def book_appointment(car_id):
    """Show the booking form for a car (GET) or store the request (POST).

    Only buyers may book; everyone else is redirected to the login page. If
    the id is malformed or no such car exists, an error is flashed and the
    visitor is sent back to the home page. A successful POST inserts a
    "pending" appointment built from the form fields and redirects home.
    """
    if 'user_id' not in session or session.get('role') != 'buyer':
        return redirect(url_for('login'))

    from bson.errors import InvalidId
    from bson.objectid import ObjectId

    # The id comes from the URL: it may be malformed or point at nothing.
    try:
        car = db.cars.find_one({"_id": ObjectId(car_id)})
    except InvalidId:
        car = None
    if car is None:
        flash('Car not found.')
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('appointment.html', car=car)

    form = request.form
    appointment_data = {
        "car_id": car_id,
        "car_name": car['name'],
        "buyer_id": session['user_id'],
        "buyer_name": form.get('name'),
        "buyer_email": form.get('email'),
        "buyer_address": form.get('address'),
        "buyer_phone": form.get('phone'),
        "preferred_date": form.get('preferred_date'),
        "preferred_time": form.get('preferred_time'),
        "status": "pending",
        "created_at": datetime.now(),
    }
    db.appointments.insert_one(appointment_data)
    flash('Appointment booked successfully. Seller will contact you soon.')
    return redirect(url_for('index'))
@app.route('/seller_appointments/<car_id>')
def seller_appointments(car_id):
    """List the appointment requests for one of the logged-in seller's cars.

    Visitors without a seller session are redirected to the login page. If
    the id is malformed, the car does not exist, or it belongs to a different
    seller, an error is flashed and the seller is sent back to the dashboard.
    """
    if 'user_id' not in session or session.get('role') != 'seller':
        return redirect(url_for('login'))

    from bson.errors import InvalidId
    from bson.objectid import ObjectId

    try:
        car = db.cars.find_one({"_id": ObjectId(car_id)})
    except InvalidId:
        car = None

    # Appointments hold buyers' contact details, so only the seller who owns
    # the car may see them. Unknown and foreign cars get the same answer so
    # the response does not reveal which ids exist.
    if car is None or car.get('seller_id') != session['user_id']:
        flash('Car not found.')
        return redirect(url_for('seller_dashboard'))

    car_appointments = list(db.appointments.find({"car_id": car_id}))
    return render_template('seller.html', appointments=car_appointments, car=car)
@app.route('/approve_appointment/<appointment_id>', methods=['POST'])
def approve_appointment(appointment_id):
    """Approve an appointment request and notify the buyer by email.

    Visitors without a seller session are redirected to the login page. The
    appointment must exist and belong to a car owned by the logged-in seller;
    otherwise an error is flashed and the seller is sent back to the
    dashboard. On success the meeting date, time and place from the form are
    stored, the status becomes "approved", and a confirmation email is
    attempted. The flashed message says whether the email went out.
    """
    if 'user_id' not in session or session.get('role') != 'seller':
        return redirect(url_for('login'))

    from bson.errors import InvalidId
    from bson.objectid import ObjectId

    meeting_date = request.form.get('meeting_date')
    meeting_time = request.form.get('meeting_time')
    meeting_place = request.form.get('meeting_place')

    # The id comes from the URL: it may be malformed or point at nothing.
    try:
        appointment_oid = ObjectId(appointment_id)
    except InvalidId:
        appointment = None
    else:
        appointment = db.appointments.find_one({"_id": appointment_oid})
    if appointment is None:
        flash('Appointment not found.')
        return redirect(url_for('seller_dashboard'))

    # Only the seller who owns the car may approve its appointments.
    try:
        car = db.cars.find_one({"_id": ObjectId(appointment['car_id'])})
    except (InvalidId, TypeError):
        car = None
    if car is None or car.get('seller_id') != session['user_id']:
        flash('Appointment not found.')
        return redirect(url_for('seller_dashboard'))

    # Update appointment status
    db.appointments.update_one(
        {"_id": appointment_oid},
        {"$set": {
            "status": "approved",
            "meeting_date": meeting_date,
            "meeting_time": meeting_time,
            "meeting_place": meeting_place,
        }},
    )

    # Send email to buyer. The helper returns False instead of raising when
    # delivery fails.
    email_sent = send_meeting_email(
        appointment['buyer_email'],
        appointment['car_name'],
        meeting_date,
        meeting_time,
        meeting_place,
    )
    if email_sent:
        flash('Appointment approved and email sent to buyer')
    else:
        flash('Appointment approved, but the confirmation email could not be sent')
    return redirect(url_for('seller_appointments', car_id=appointment['car_id']))
def send_meeting_email(buyer_email, car_name, date, time, place):
    """Email the meeting details for an approved appointment to the buyer.

    The sender account is read from the ``SMTP_EMAIL`` and ``SMTP_PASSWORD``
    environment variables. The message is sent through ``smtp.gmail.com`` on
    port 587 using STARTTLS.

    Args:
        buyer_email: Recipient address.
        car_name: Name of the car, used in the subject and body.
        date: Meeting date as text.
        time: Meeting time as text.
        place: Meeting place as text.

    Returns:
        True when the message was handed to the SMTP server, False when the
        sender is not configured or anything goes wrong. This function never
        raises; failures are reported with ``print``.
    """
    sender = os.environ.get("SMTP_EMAIL")
    sender_password = os.environ.get("SMTP_PASSWORD")
    if not sender or not sender_password:
        print("Email error: SMTP_EMAIL and SMTP_PASSWORD are not configured")
        return False

    # Deliberately broad: sending mail is best effort and callers rely on a
    # boolean result rather than an exception.
    try:
        subject = f"Appointment Confirmation for {car_name}"
        body = f"""
Dear Buyer,
Your appointment for {car_name} has been approved.
Meeting Details:
Date: {date}
Time: {time}
Place: {place}
Please arrive on time for the test drive.
Best regards,
Carwala Team
"""
        msg = MIMEMultipart()
        msg['From'] = sender
        msg['To'] = buyer_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        # The context manager closes the connection even when login or
        # sending fails; the timeout keeps a dead server from blocking the
        # request indefinitely.
        with smtplib.SMTP('smtp.gmail.com', 587, timeout=30) as server:
            server.starttls()
            server.login(sender, sender_password)
            server.sendmail(sender, buyer_email, msg.as_string())
        return True
    except Exception as e:
        print(f"Email error: {e}")
        return False
if __name__ == "__main__":
    # Serve on every network interface, port 7860, with the debugger off.
    run_options = {"host": "0.0.0.0", "port": 7860, "debug": False}
    app.run(**run_options)