detector-ai / auth.py
Slovand's picture
Update auth.py
26a79c1 verified
# auth.py - COMPLETE VERSION
from flask import Blueprint, request, jsonify, session, redirect, url_for
from werkzeug.security import generate_password_hash, check_password_hash
from database import Database
import random
import string
from datetime import datetime, timedelta
import re
import requests
import urllib.parse
import os
auth_bp = Blueprint('auth', __name__)
db = Database()
# ============ GOOGLE OAUTH CONFIGURATION ============
GOOGLE_CLIENT_ID = os.environ.get('GOOGLE_CLIENT_ID', '')
GOOGLE_CLIENT_SECRET = os.environ.get('GOOGLE_CLIENT_SECRET', '')
GOOGLE_REDIRECT_URI = os.environ.get('GOOGLE_REDIRECT_URI', 'http://localhost:5001/auth/google/callback')
def generate_verification_code():
return ''.join(random.choices(string.digits, k=6))
def validate_email(email):
if not email:
return True
pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
return re.match(pattern, email) is not None
def validate_phone(phone):
if not phone:
return True
pattern = r'^(\+62|62|0)8[1-9][0-9]{6,10}$'
return re.match(pattern, phone) is not None
def validate_username(username):
pattern = r'^[a-zA-Z0-9_]{3,20}$'
return re.match(pattern, username) is not None
# ============ GOOGLE LOGIN ROUTES ============
@auth_bp.route('/auth/google')
def google_login():
if not GOOGLE_CLIENT_ID:
return redirect(url_for('login_page') + '?error=google_not_configured')
state = ''.join(random.choices(string.ascii_letters + string.digits, k=32))
session['google_oauth_state'] = state
params = {
'client_id': GOOGLE_CLIENT_ID,
'redirect_uri': GOOGLE_REDIRECT_URI,
'response_type': 'code',
'scope': 'email profile',
'state': state,
'access_type': 'online',
'prompt': 'select_account'
}
auth_url = "https://accounts.google.com/o/oauth2/auth?" + urllib.parse.urlencode(params)
return redirect(auth_url)
@auth_bp.route('/auth/google/callback')
def google_callback():
error = request.args.get('error')
if error:
return redirect(url_for('login_page') + '?error=' + error)
state = request.args.get('state')
stored_state = session.get('google_oauth_state')
if not state or state != stored_state:
return redirect(url_for('login_page') + '?error=invalid_state')
code = request.args.get('code')
if not code:
return redirect(url_for('login_page') + '?error=no_code')
token_data = {
'client_id': GOOGLE_CLIENT_ID,
'client_secret': GOOGLE_CLIENT_SECRET,
'code': code,
'grant_type': 'authorization_code',
'redirect_uri': GOOGLE_REDIRECT_URI
}
try:
token_response = requests.post(
'https://oauth2.googleapis.com/token',
data=token_data,
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
if token_response.status_code != 200:
return redirect(url_for('login_page') + '?error=token_failed')
tokens = token_response.json()
access_token = tokens.get('access_token')
if not access_token:
return redirect(url_for('login_page') + '?error=no_token')
userinfo_response = requests.get(
'https://www.googleapis.com/oauth2/v2/userinfo',
headers={'Authorization': f'Bearer {access_token}'}
)
if userinfo_response.status_code != 200:
return redirect(url_for('login_page') + '?error=userinfo_failed')
user_info = userinfo_response.json()
email = user_info.get('email')
name = user_info.get('name', '')
if not email:
return redirect(url_for('login_page') + '?error=no_email')
user = db.get_user_by_email(email)
if not user:
import secrets
random_password = secrets.token_urlsafe(16)
password_hash = generate_password_hash(random_password)
username = email.split('@')[0]
counter = 1
original_username = username
while db.get_user_by_username(username):
username = f"{original_username}{counter}"
counter += 1
user = db.create_user(
username=username,
password_hash=password_hash,
email=email,
phone=None,
full_name=name,
is_verified=True
)
if not user:
return redirect(url_for('login_page') + '?error=create_failed')
elif not user.is_verified:
db.update_user(user.id, is_verified=True)
session['user_id'] = user.id
session['username'] = user.username
db.update_user(user.id, last_login=datetime.utcnow())
session.pop('google_oauth_state', None)
return redirect(url_for('dashboard'))
except Exception as e:
print(f"Error during Google OAuth: {e}")
return redirect(url_for('login_page') + '?error=exception')
# ============ REGISTRATION ============
@auth_bp.route('/api/auth/register', methods=['POST'])
def register():
data = request.get_json()
username = data.get('username', '').strip()
email = data.get('email', '').strip() or None
phone = data.get('phone', '').strip() or None
password = data.get('password', '')
full_name = data.get('full_name', '').strip()
if not email and not phone:
return jsonify({'error': 'Email atau nomor HP harus diisi'}), 400
if not validate_username(username):
return jsonify({'error': 'Username harus 3-20 karakter (huruf, angka, underscore)'}), 400
if db.get_user_by_username(username):
return jsonify({'error': 'Username sudah digunakan'}), 400
if email and db.get_user_by_email(email):
return jsonify({'error': 'Email sudah terdaftar'}), 400
if phone and db.get_user_by_phone(phone):
return jsonify({'error': 'Nomor HP sudah terdaftar'}), 400
if email and not validate_email(email):
return jsonify({'error': 'Format email tidak valid'}), 400
if phone and not validate_phone(phone):
return jsonify({'error': 'Format nomor HP tidak valid (contoh: 081234567890)'}), 400
if len(password) < 6:
return jsonify({'error': 'Password minimal 6 karakter'}), 400
password_hash = generate_password_hash(password)
user = db.create_user(username, password_hash, email, phone, full_name, is_verified=False)
if not user:
return jsonify({'error': 'Gagal membuat akun'}), 500
code = generate_verification_code()
expires = datetime.utcnow() + timedelta(minutes=10)
db.set_verification_code(user.id, code, expires)
destination = email if email else phone
# KIRIM KODE LANGSUNG KE RESPONSE
return jsonify({
'success': True,
'user_id': user.id,
'destination': destination,
'verification_code': code,
'message': f'Registrasi berhasil! Kode verifikasi Anda: {code}'
})
# ============ VERIFICATION ============
@auth_bp.route('/api/auth/verify', methods=['POST'])
def verify():
data = request.get_json()
user_id = data.get('user_id')
code = data.get('code')
if not user_id or not code:
return jsonify({'error': 'User ID dan kode verifikasi diperlukan'}), 400
if db.verify_user(user_id, code):
return jsonify({'success': True, 'message': 'Akun berhasil diverifikasi'})
else:
return jsonify({'error': 'Kode verifikasi salah atau sudah kadaluarsa'}), 400
@auth_bp.route('/api/auth/resend-code', methods=['POST'])
def resend_code():
data = request.get_json()
user_id = data.get('user_id')
if not user_id:
return jsonify({'error': 'User ID diperlukan'}), 400
user = db.get_user_by_id(user_id)
if not user:
return jsonify({'error': 'User tidak ditemukan'}), 404
if user.is_verified:
return jsonify({'error': 'Akun sudah diverifikasi'}), 400
code = generate_verification_code()
expires = datetime.utcnow() + timedelta(minutes=10)
db.set_verification_code(user.id, code, expires)
destination = user.email if user.email else user.phone
return jsonify({
'success': True,
'destination': destination,
'verification_code': code,
'message': f'Kode verifikasi baru: {code}'
})
# ============ LOGIN ============
@auth_bp.route('/api/auth/login', methods=['POST'])
def login():
data = request.get_json()
identifier = data.get('identifier', '').strip()
password = data.get('password', '')
if not identifier or not password:
return jsonify({'error': 'Username/Email/HP dan password diperlukan'}), 400
user = db.get_user_by_username(identifier)
if not user and '@' in identifier:
user = db.get_user_by_email(identifier)
if not user and re.match(r'^(\+62|62|0)8', identifier):
user = db.get_user_by_phone(identifier)
if not user:
return jsonify({'error': 'Username/Email/HP atau password salah'}), 401
if not check_password_hash(user.password_hash, password):
return jsonify({'error': 'Username/Email/HP atau password salah'}), 401
if not user.is_verified:
return jsonify({
'error': 'Akun belum diverifikasi',
'requires_verification': True,
'user_id': user.id
}), 401
db.update_user(user.id, last_login=datetime.utcnow())
session['user_id'] = user.id
session['username'] = user.username
session['user_email'] = user.email
session['user_fullname'] = user.full_name
return jsonify({
'success': True,
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'phone': user.phone,
'full_name': user.full_name,
'is_verified': user.is_verified
}
})
@auth_bp.route('/api/auth/logout', methods=['POST'])
def logout():
session.clear()
return jsonify({'success': True})
@auth_bp.route('/api/auth/me', methods=['GET'])
def get_current_user():
user_id = session.get('user_id')
if not user_id:
return jsonify({'error': 'Not logged in'}), 401
user = db.get_user_by_id(user_id)
if not user:
return jsonify({'error': 'User not found'}), 404
return jsonify({
'id': user.id,
'username': user.username,
'email': user.email,
'phone': user.phone,
'full_name': user.full_name,
'is_verified': user.is_verified,
'created_at': user.created_at.isoformat() if user.created_at else None
})
# ============ PASSWORD MANAGEMENT ============
@auth_bp.route('/api/auth/change-password', methods=['POST'])
def change_password():
user_id = session.get('user_id')
if not user_id:
return jsonify({'error': 'Not logged in'}), 401
data = request.get_json()
old_password = data.get('old_password')
new_password = data.get('new_password')
if not old_password or not new_password:
return jsonify({'error': 'Password lama dan baru diperlukan'}), 400
if len(new_password) < 6:
return jsonify({'error': 'Password baru minimal 6 karakter'}), 400
user = db.get_user_by_id(user_id)
if not check_password_hash(user.password_hash, old_password):
return jsonify({'error': 'Password lama salah'}), 401
new_hash = generate_password_hash(new_password)
if db.update_user(user.id, password_hash=new_hash):
return jsonify({'success': True, 'message': 'Password berhasil diubah'})
else:
return jsonify({'error': 'Gagal mengubah password'}), 500
# ============ PROFILE UPDATE ============
@auth_bp.route('/api/auth/update-profile', methods=['PUT'])
def update_profile():
user_id = session.get('user_id')
if not user_id:
return jsonify({'error': 'Not logged in'}), 401
data = request.get_json()
email = data.get('email')
phone = data.get('phone')
full_name = data.get('full_name')
if email is not None:
if email and not validate_email(email):
return jsonify({'error': 'Format email tidak valid'}), 400
if email:
existing = db.get_user_by_email(email)
if existing and existing.id != user_id:
return jsonify({'error': 'Email sudah digunakan oleh akun lain'}), 400
if phone is not None:
if phone and not validate_phone(phone):
return jsonify({'error': 'Format nomor HP tidak valid'}), 400
if phone:
existing = db.get_user_by_phone(phone)
if existing and existing.id != user_id:
return jsonify({'error': 'Nomor HP sudah digunakan oleh akun lain'}), 400
updates = {}
if email is not None:
updates['email'] = email if email else None
if phone is not None:
updates['phone'] = phone if phone else None
if full_name is not None:
updates['full_name'] = full_name
if db.update_user(user_id, **updates):
if 'full_name' in updates:
session['user_fullname'] = updates['full_name']
return jsonify({'success': True, 'message': 'Profil berhasil diperbarui'})
else:
return jsonify({'error': 'Gagal memperbarui profil'}), 500