from flask import Blueprint, request, jsonify
from flask_jwt_extended import create_access_token
import bcrypt
from pymongo import MongoClient
import os
import re
import requests
from dotenv import load_dotenv
import secrets
from datetime import datetime, timedelta
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
ABSTRACT_API_KEY = os.getenv("ABSTRACT_API_KEY", "")
RESEND_API_KEY = os.getenv("RESEND_API_KEY", "")
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173")
BREVO_API_KEY = os.getenv("BREVO_API_KEY", "")
auth_bp = Blueprint('auth', __name__)
client = MongoClient(MONGO_URI)
db = client["codewhisperer"]
users = db["users"]
reset_tokens = db["reset_tokens"]
# Common disposable email domains to block
DISPOSABLE_DOMAINS = {
'10minutemail.com', 'tempmail.com', 'guerrillamail.com', 'mailinator.com',
'throwaway.email', 'temp-mail.org', 'getnada.com', 'maildrop.cc',
'trashmail.com', 'yopmail.com', 'fakeinbox.com', 'sharklasers.com',
'guerrillamailblock.com', 'pokemail.net', 'spam4.me', 'tempr.email',
'throwawaymail.com', 'wegwerfemail.de', 'mintemail.com', 'mytrashmail.com'
}
def validate_email_format(email):
"""Basic email format validation"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def is_disposable_email(email):
"""Check if email is from a disposable domain"""
domain = email.split('@')[1].lower() if '@' in email else ''
return domain in DISPOSABLE_DOMAINS
def verify_email_with_api(email):
"""
Verify email using AbstractAPI (free tier: 100 requests/month)
Returns: (is_valid, error_message)
"""
if not ABSTRACT_API_KEY:
# Fallback to basic validation if no API key
return validate_email_format(email), None
try:
url = f"https://emailvalidation.abstractapi.com/v1/"
params = {
'api_key': ABSTRACT_API_KEY,
'email': email
}
response = requests.get(url, params=params, timeout=3)
if response.status_code == 200:
data = response.json()
# Check various quality indicators
is_valid_format = data.get('is_valid_format', {}).get('value', False)
is_mx_found = data.get('is_mx_found', {}).get('value', False)
is_smtp_valid = data.get('is_smtp_valid', {}).get('value', False)
is_free_email = data.get('is_free_email', {}).get('value', False)
is_disposable = data.get('is_disposable_email', {}).get('value', False)
is_role_email = data.get('is_role_email', {}).get('value', False) # e.g., admin@, info@
# Strict validation
if not is_valid_format:
return False, "Invalid email format"
if is_disposable:
return False, "Disposable email addresses are not allowed"
if not is_mx_found:
return False, "Email domain does not exist"
if is_smtp_valid is False: # Explicitly False, not None
return False, "Email address does not exist"
# Optional: Block role emails (like admin@, support@)
if is_role_email:
return False, "Role-based email addresses are not allowed. Please use a personal email."
return True, None
else:
# API failed, fallback to basic validation
return validate_email_format(email), None
except requests.exceptions.Timeout:
# API timeout, fallback to basic validation
return validate_email_format(email), None
except Exception as e:
print(f"Email validation API error: {str(e)}")
# Fallback to basic validation
return validate_email_format(email), None
@auth_bp.route("/register", methods=["POST"])
def register():
data = request.json
# Validate input
username = data.get("username", "").strip()
email = data.get("email", "").strip().lower()
password = data.get("password", "")
if not username or not email or not password:
return jsonify({"error": "All fields are required"}), 400
# Username validation
if len(username) < 3:
return jsonify({"error": "Username must be at least 3 characters"}), 400
if len(username) > 30:
return jsonify({"error": "Username must be less than 30 characters"}), 400
# Password strength validation
if len(password) < 8:
return jsonify({"error": "Password must be at least 8 characters"}), 400
# Basic email format check
if not validate_email_format(email):
return jsonify({"error": "Invalid email format"}), 400
# Check disposable email (fast, local check)
if is_disposable_email(email):
return jsonify({"error": "Disposable email addresses are not allowed"}), 400
# Check if user already exists
if users.find_one({"username": username}):
return jsonify({"error": "Username already taken"}), 400
if users.find_one({"email": email}):
return jsonify({"error": "Email already registered"}), 400
# Verify email with API (with fallback)
is_valid, error_message = verify_email_with_api(email)
if not is_valid:
return jsonify({"error": error_message or "Invalid email address"}), 400
# Hash password and create user
hashed_pw = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
users.insert_one({
"username": username,
"email": email,
"password": hashed_pw,
"created_at": __import__('datetime').datetime.utcnow()
})
token = create_access_token(identity=username)
return jsonify({
"token": token,
"message": "Registration successful!"
}), 201
@auth_bp.route("/login", methods=["POST"])
def login():
data = request.json
username = data.get("username", "").strip()
password = data.get("password", "")
if not username or not password:
return jsonify({"error": "Username and password are required"}), 400
user = users.find_one({"username": username})
if not user:
return jsonify({"error": "Invalid credentials"}), 401
if not bcrypt.checkpw(password.encode("utf-8"), user["password"]):
return jsonify({"error": "Invalid credentials"}), 401
token = create_access_token(identity=username)
return jsonify({"token": token, "message": "Login successful"})
@auth_bp.route("/validate-email", methods=["POST"])
def validate_email_endpoint():
"""
Separate endpoint for real-time email validation (optional)
"""
data = request.json
email = data.get("email", "").strip().lower()
if not email:
return jsonify({"valid": False, "error": "Email is required"}), 400
# Basic format check
if not validate_email_format(email):
return jsonify({"valid": False, "error": "Invalid email format"}), 200
# Disposable check
if is_disposable_email(email):
return jsonify({"valid": False, "error": "Disposable emails not allowed"}), 200
# Check if already registered
if users.find_one({"email": email}):
return jsonify({"valid": False, "error": "Email already registered"}), 200
# API validation
is_valid, error_message = verify_email_with_api(email)
if not is_valid:
return jsonify({"valid": False, "error": error_message}), 200
return jsonify({"valid": True, "message": "Email is valid"}), 200
def send_reset_email(email, reset_token, username):
"""
Send password reset email using Brevo (Sendinblue) API
FREE: 300 emails/day forever!
"""
if not BREVO_API_KEY:
print("⚠️ Warning: BREVO_API_KEY not set, email not sent")
return False
try:
reset_link = f"{FRONTEND_URL}/reset-password?token={reset_token}"
# HTML email template
html_content = f"""
Hi {username},
We received a request to reset your password for your AI Code Security account.
Click the button below to create a new password:
Or copy and paste this link into your browser:
{reset_link}
⚠️ Security Notice:
- This link will expire in 1 hour
- If you didn't request this, please ignore this email
- Never share this link with anyone
- Your account is safe until you click the link
If you didn't request a password reset, your account is still secure and you can safely ignore this email.
Best regards,
AI Code Security Team
"""
# Plain text version (fallback)
text_content = f"""
Password Reset Request
Hi {username},
We received a request to reset your password for your AI Code Security account.
Click this link to reset your password:
{reset_link}
This link will expire in 1 hour.
If you didn't request this, please ignore this email. Your account is safe.
Best regards,
AI Code Security Team
"""
# Send via Brevo API
url = "https://api.brevo.com/v3/smtp/email"
headers = {
"accept": "application/json",
"api-key": BREVO_API_KEY,
"content-type": "application/json"
}
payload = {
"sender": {
"name": "AI Code Security",
"email": "bihanbanerjee26@gmail.com" # Can be any email!
},
"to": [
{
"email": email,
"name": username
}
],
"subject": "Reset Your Password - AI Code Security",
"htmlContent": html_content,
"textContent": text_content
}
response = requests.post(url, json=payload, headers=headers, timeout=10)
if response.status_code in [200, 201]:
print(f"✅ Reset email sent successfully to {email}")
print(f"📧 Message ID: {response.json().get('messageId', 'N/A')}")
return True
else:
print(f"❌ Failed to send email: {response.status_code}")
print(f"❌ Response: {response.text}")
return False
except requests.exceptions.Timeout:
print("❌ Email sending timeout")
return False
except Exception as e:
print(f"❌ Email sending error: {str(e)}")
return False
@auth_bp.route("/forgot-password", methods=["POST"])
def forgot_password():
"""
Request password reset - generates token and sends email
"""
data = request.json
email = data.get("email", "").strip().lower()
if not email:
return jsonify({"error": "Email is required"}), 400
if not validate_email_format(email):
return jsonify({"error": "Invalid email format"}), 400
# Find user by email
user = users.find_one({"email": email})
# Always return success to prevent email enumeration
# But only send email if user exists
if user:
# Generate secure random token
reset_token = secrets.token_urlsafe(32)
# Store token with expiration (1 hour)
reset_tokens.insert_one({
"email": email,
"token": reset_token,
"created_at": datetime.utcnow(),
"expires_at": datetime.utcnow() + timedelta(hours=1),
"used": False
})
# Send email
send_reset_email(email, reset_token, user["username"])
# Always return success message (security best practice)
return jsonify({
"message": "If an account exists with that email, a password reset link has been sent."
}), 200
@auth_bp.route("/verify-reset-token", methods=["POST"])
def verify_reset_token():
"""
Verify if reset token is valid
"""
data = request.json
token = data.get("token", "")
if not token:
return jsonify({"valid": False, "error": "Token is required"}), 400
# Find token in database
token_doc = reset_tokens.find_one({
"token": token,
"used": False
})
if not token_doc:
return jsonify({"valid": False, "error": "Invalid or expired token"}), 400
# Check if token is expired
if datetime.utcnow() > token_doc["expires_at"]:
return jsonify({"valid": False, "error": "Token has expired"}), 400
return jsonify({"valid": True, "email": token_doc["email"]}), 200
@auth_bp.route("/reset-password", methods=["POST"])
def reset_password():
"""
Reset password using valid token
"""
data = request.json
token = data.get("token", "")
new_password = data.get("password", "")
if not token or not new_password:
return jsonify({"error": "Token and new password are required"}), 400
# Validate password strength
if len(new_password) < 8:
return jsonify({"error": "Password must be at least 8 characters"}), 400
# Find valid token
token_doc = reset_tokens.find_one({
"token": token,
"used": False
})
if not token_doc:
return jsonify({"error": "Invalid or expired reset token"}), 400
# Check expiration
if datetime.utcnow() > token_doc["expires_at"]:
return jsonify({"error": "Reset token has expired"}), 400
# Hash new password
hashed_pw = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt())
# Update user password
users.update_one(
{"email": token_doc["email"]},
{"$set": {"password": hashed_pw, "updated_at": datetime.utcnow()}}
)
# Mark token as used
reset_tokens.update_one(
{"_id": token_doc["_id"]},
{"$set": {"used": True, "used_at": datetime.utcnow()}}
)
return jsonify({"message": "Password reset successfully!"}), 200