Spaces:
Running
Running
| # auth.py - COMPLETE VERSION | |
| from flask import Blueprint, request, jsonify, session, redirect, url_for | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| from database import Database | |
| import random | |
| import string | |
| from datetime import datetime, timedelta | |
| import re | |
| import requests | |
| import urllib.parse | |
| import os | |
# Flask blueprint named 'auth', created when this module is imported.
# NOTE(review): no @auth_bp.route decorators are visible on the handlers in
# this view of the file -- confirm where the routes are registered.
auth_bp = Blueprint(name='auth', import_name=__name__)

# Single Database instance created at import time; the handlers in this
# module use it for every user lookup and update.
db = Database()
# ============ GOOGLE OAUTH CONFIGURATION ============
# All three settings are read from the process environment at import time.
# The client ID and secret default to an empty string; the redirect URI
# defaults to a localhost callback URL.
GOOGLE_CLIENT_ID = os.getenv('GOOGLE_CLIENT_ID', '')
GOOGLE_CLIENT_SECRET = os.getenv('GOOGLE_CLIENT_SECRET', '')
GOOGLE_REDIRECT_URI = os.getenv(
    'GOOGLE_REDIRECT_URI',
    'http://localhost:5001/auth/google/callback',
)
def generate_verification_code():
    """Return a random 6-digit verification code as a string.

    The code gates account verification, so the digits are drawn from the
    operating system's CSPRNG via ``secrets`` rather than the predictable
    ``random`` generator. Leading zeros are possible (e.g. ``"004217"``).

    Returns:
        str: exactly six characters, each one of ``0``-``9``.
    """
    # Local import keeps this function self-contained.
    import secrets

    return ''.join(secrets.choice(string.digits) for _ in range(6))
def validate_email(email):
    """Check whether *email* looks like an e-mail address.

    A falsy value (``None``, ``""``) is treated as "not provided" and is
    accepted, because e-mail is an optional field for callers.

    Args:
        email: the candidate address, or a falsy value when absent.

    Returns:
        bool: ``True`` when *email* is falsy or matches the expected shape,
        ``False`` otherwise (including for truthy non-string values).
    """
    if not email:
        return True
    if not isinstance(email, str):
        # A truthy non-string (e.g. a JSON number) can never be an address.
        return False
    # fullmatch, not match + '$': '$' also matches just before a trailing
    # newline, which would let "user@example.com\n" through.
    return re.fullmatch(r'[\w\.-]+@[\w\.-]+\.\w+', email) is not None
def validate_phone(phone):
    """Check whether *phone* matches the accepted mobile-number format.

    Accepted shape: a ``+62``, ``62`` or ``0`` prefix, then ``8``, then one
    digit ``1``-``9``, then 6 to 10 further digits. A falsy value (``None``,
    ``""``) is treated as "not provided" and is accepted.

    Args:
        phone: the candidate number, or a falsy value when absent.

    Returns:
        bool: ``True`` when *phone* is falsy or matches the format,
        ``False`` otherwise (including for truthy non-string values).
    """
    if not phone:
        return True
    if not isinstance(phone, str):
        # e.g. a JSON number; the regex engine would raise TypeError on it.
        return False
    # fullmatch rejects a trailing newline that '$' would silently allow.
    return re.fullmatch(r'(\+62|62|0)8[1-9][0-9]{6,10}', phone) is not None
def validate_username(username):
    """Check whether *username* is 3-20 characters of ``[a-zA-Z0-9_]``.

    Unlike e-mail and phone, a username is mandatory, so empty or missing
    values are rejected.

    Args:
        username: the candidate username.

    Returns:
        bool: ``True`` only when *username* is a string that consists solely
        of ASCII letters, digits and underscores and is 3 to 20 characters
        long.
    """
    if not isinstance(username, str):
        # None or a non-string would make the regex engine raise TypeError.
        return False
    # fullmatch rejects a trailing newline that '$' would silently allow.
    return re.fullmatch(r'[a-zA-Z0-9_]{3,20}', username) is not None
| # ============ GOOGLE LOGIN ROUTES ============ | |
def google_login():
    """Start the Google OAuth authorization-code flow.

    Stores a fresh random ``state`` value in the session and redirects the
    browser to Google's consent screen. When no client ID is configured the
    user is sent back to the login page with ``error=google_not_configured``.

    Returns:
        A Flask redirect response.
    """
    # Local import keeps this function self-contained.
    import secrets

    if not GOOGLE_CLIENT_ID:
        return redirect(url_for('login_page') + '?error=google_not_configured')

    # The state value is what google_callback() compares against to reject
    # forged callbacks, so it must be unpredictable: use the CSPRNG, not
    # ``random``. 24 random bytes encode to 32 URL-safe characters.
    state = secrets.token_urlsafe(24)
    session['google_oauth_state'] = state

    params = {
        'client_id': GOOGLE_CLIENT_ID,
        'redirect_uri': GOOGLE_REDIRECT_URI,
        'response_type': 'code',
        'scope': 'email profile',
        'state': state,
        'access_type': 'online',
        'prompt': 'select_account',
    }
    auth_url = "https://accounts.google.com/o/oauth2/auth?" + urllib.parse.urlencode(params)
    return redirect(auth_url)
def google_callback():
    """Handle Google's redirect back to the app and sign the user in.

    Flow:
      1. Bail out if Google reported an error.
      2. Check the ``state`` query value against the one stored in the
         session by ``google_login`` (the stored value is consumed).
      3. Exchange the authorization ``code`` for an access token.
      4. Fetch the user's profile with that token.
      5. Look the user up by e-mail; create a verified account with a random
         password if none exists, or mark an existing account verified.
      6. Store the user in the session and redirect to the dashboard.

    Returns:
        A Flask redirect response: to ``dashboard`` on success, otherwise to
        ``login_page`` with an ``error`` query parameter naming the failure.
    """
    # Local imports keep this function self-contained.
    import logging
    import secrets

    def fail(reason):
        # Percent-encode the reason: it can come straight from the query
        # string, and raw '&' / '#' would inject extra URL components.
        # The fixed reason codes used below are unchanged by the encoding.
        return redirect(
            url_for('login_page') + '?error=' + urllib.parse.quote(str(reason), safe='')
        )

    error = request.args.get('error')
    if error:
        return fail(error)

    # Consume the stored state immediately so it is single-use whether or
    # not the rest of the flow succeeds.
    state = request.args.get('state')
    stored_state = session.pop('google_oauth_state', None)
    if not state or state != stored_state:
        return fail('invalid_state')

    code = request.args.get('code')
    if not code:
        return fail('no_code')

    token_data = {
        'client_id': GOOGLE_CLIENT_ID,
        'client_secret': GOOGLE_CLIENT_SECRET,
        'code': code,
        'grant_type': 'authorization_code',
        'redirect_uri': GOOGLE_REDIRECT_URI,
    }
    # Without a timeout, requests waits indefinitely on a stalled connection
    # and would tie up the worker handling this callback.
    timeout_seconds = 10

    try:
        token_response = requests.post(
            'https://oauth2.googleapis.com/token',
            data=token_data,
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
            timeout=timeout_seconds,
        )
        if token_response.status_code != 200:
            return fail('token_failed')

        access_token = token_response.json().get('access_token')
        if not access_token:
            return fail('no_token')

        userinfo_response = requests.get(
            'https://www.googleapis.com/oauth2/v2/userinfo',
            headers={'Authorization': f'Bearer {access_token}'},
            timeout=timeout_seconds,
        )
        if userinfo_response.status_code != 200:
            return fail('userinfo_failed')

        user_info = userinfo_response.json()
        email = user_info.get('email')
        name = user_info.get('name', '')
        if not email:
            return fail('no_email')

        # This handler marks the matching local account as verified, so an
        # address Google itself reports as unverified must not be trusted.
        # Only an explicit False is rejected; a missing flag keeps the
        # previous behaviour.
        if user_info.get('verified_email') is False:
            return fail('email_not_verified')

        user = db.get_user_by_email(email)
        if not user:
            # First Google sign-in for this address: create an account with
            # an unguessable password the user never needs to know.
            password_hash = generate_password_hash(secrets.token_urlsafe(16))

            # Derive a username from the e-mail local part, appending a
            # counter until it does not clash with an existing account.
            original_username = email.split('@')[0]
            username = original_username
            counter = 1
            while db.get_user_by_username(username):
                username = f"{original_username}{counter}"
                counter += 1

            user = db.create_user(
                username=username,
                password_hash=password_hash,
                email=email,
                phone=None,
                full_name=name,
                is_verified=True,
            )
            if not user:
                return fail('create_failed')
        elif not user.is_verified:
            db.update_user(user.id, is_verified=True)

        # Populate the same session keys that login() sets, so code reading
        # the session sees the same data regardless of sign-in method.
        session['user_id'] = user.id
        session['username'] = user.username
        session['user_email'] = user.email
        session['user_fullname'] = user.full_name
        db.update_user(user.id, last_login=datetime.utcnow())
        return redirect(url_for('dashboard'))
    except Exception:
        # Boundary handler: never leak internals to the browser, but keep
        # the full traceback in the server log.
        logging.getLogger(__name__).exception("Error during Google OAuth")
        return fail('exception')
| # ============ REGISTRATION ============ | |
def register():
    """Create a new, unverified account from a JSON request body.

    Expected JSON fields: ``username``, ``password``, ``full_name`` and at
    least one of ``email`` / ``phone``. Missing, ``null`` or non-string
    fields are treated as empty instead of crashing the handler.

    Returns:
        A JSON response. On success it contains ``success``, ``user_id``,
        ``destination``, ``verification_code`` and ``message``. On failure
        it contains ``error`` with status 400 (validation or duplicate) or
        500 (account creation failed).
    """
    # silent=True: a missing or malformed JSON body yields None instead of
    # an HTML error page, so the client always gets a JSON error below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def _field(key):
        # Trimmed string value of a field; '' when absent, null or non-string.
        value = data.get(key)
        return value.strip() if isinstance(value, str) else ''

    username = _field('username')
    email = _field('email') or None
    phone = _field('phone') or None
    full_name = _field('full_name')
    password = data.get('password')
    if not isinstance(password, str):
        # Falls through to the minimum-length error below.
        password = ''

    if not email and not phone:
        return jsonify({'error': 'Email atau nomor HP harus diisi'}), 400
    if not validate_username(username):
        return jsonify({'error': 'Username harus 3-20 karakter (huruf, angka, underscore)'}), 400
    if db.get_user_by_username(username):
        return jsonify({'error': 'Username sudah digunakan'}), 400
    if email and db.get_user_by_email(email):
        return jsonify({'error': 'Email sudah terdaftar'}), 400
    if phone and db.get_user_by_phone(phone):
        return jsonify({'error': 'Nomor HP sudah terdaftar'}), 400
    if email and not validate_email(email):
        return jsonify({'error': 'Format email tidak valid'}), 400
    if phone and not validate_phone(phone):
        return jsonify({'error': 'Format nomor HP tidak valid (contoh: 081234567890)'}), 400
    if len(password) < 6:
        return jsonify({'error': 'Password minimal 6 karakter'}), 400

    password_hash = generate_password_hash(password)
    user = db.create_user(username, password_hash, email, phone, full_name, is_verified=False)
    if not user:
        return jsonify({'error': 'Gagal membuat akun'}), 500

    # The code is valid for 10 minutes.
    code = generate_verification_code()
    expires = datetime.utcnow() + timedelta(minutes=10)
    db.set_verification_code(user.id, code, expires)
    destination = email if email else phone

    # The verification code is sent directly in the response.
    # SECURITY NOTE(review): this lets anyone verify an address they do not
    # control. Kept because clients rely on it; replace with real e-mail/SMS
    # delivery and drop 'verification_code' from the payload.
    return jsonify({
        'success': True,
        'user_id': user.id,
        'destination': destination,
        'verification_code': code,
        'message': f'Registrasi berhasil! Kode verifikasi Anda: {code}'
    })
| # ============ VERIFICATION ============ | |
def verify():
    """Verify an account using the code issued at registration.

    Expected JSON fields: ``user_id`` and ``code``.

    Returns:
        A JSON response: ``success`` and ``message`` when
        ``db.verify_user`` accepts the pair, otherwise ``error`` with
        status 400.
    """
    # silent=True plus the dict check: a missing, malformed or non-object
    # body becomes "fields missing" (400) instead of an AttributeError (500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    user_id = data.get('user_id')
    code = data.get('code')
    if not user_id or not code:
        return jsonify({'error': 'User ID dan kode verifikasi diperlukan'}), 400

    if not db.verify_user(user_id, code):
        return jsonify({'error': 'Kode verifikasi salah atau sudah kadaluarsa'}), 400
    return jsonify({'success': True, 'message': 'Akun berhasil diverifikasi'})
def resend_code():
    """Issue a fresh verification code for a not-yet-verified account.

    Expected JSON field: ``user_id``.

    Returns:
        A JSON response. On success it contains ``success``,
        ``destination``, ``verification_code`` and ``message``. On failure
        it contains ``error`` with status 400 (missing ID or already
        verified) or 404 (unknown user).
    """
    # silent=True plus the dict check: a missing, malformed or non-object
    # body becomes "ID missing" (400) instead of an AttributeError (500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    user_id = data.get('user_id')
    if not user_id:
        return jsonify({'error': 'User ID diperlukan'}), 400

    user = db.get_user_by_id(user_id)
    if not user:
        return jsonify({'error': 'User tidak ditemukan'}), 404
    if user.is_verified:
        return jsonify({'error': 'Akun sudah diverifikasi'}), 400

    # The new code is valid for 10 minutes.
    code = generate_verification_code()
    expires = datetime.utcnow() + timedelta(minutes=10)
    db.set_verification_code(user.id, code, expires)
    destination = user.email if user.email else user.phone

    # SECURITY NOTE(review): the code is returned to the caller rather than
    # delivered to `destination`, so any client that knows a user_id can
    # obtain a valid code. Kept because clients rely on it; replace with
    # real e-mail/SMS delivery.
    return jsonify({
        'success': True,
        'destination': destination,
        'verification_code': code,
        'message': f'Kode verifikasi baru: {code}'
    })
| # ============ LOGIN ============ | |
def login():
    """Authenticate with username, e-mail or phone plus password.

    Expected JSON fields: ``identifier`` and ``password``. The identifier is
    tried as a username first, then as an e-mail (if it contains ``@``),
    then as a phone number (if it starts like one).

    Returns:
        A JSON response. On success it contains ``success`` and a ``user``
        object, and the session is populated. On failure it contains
        ``error`` with status 400 (missing fields) or 401 (bad credentials
        or unverified account; the latter also carries
        ``requires_verification`` and ``user_id``).
    """
    # silent=True plus the dict check: a missing, malformed or non-object
    # body becomes "fields missing" (400) instead of an AttributeError (500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # null or non-string values are treated as missing rather than crashing
    # .strip() or check_password_hash().
    identifier = data.get('identifier')
    identifier = identifier.strip() if isinstance(identifier, str) else ''
    password = data.get('password')
    if not isinstance(password, str):
        password = ''

    if not identifier or not password:
        return jsonify({'error': 'Username/Email/HP dan password diperlukan'}), 400

    user = db.get_user_by_username(identifier)
    if not user and '@' in identifier:
        user = db.get_user_by_email(identifier)
    if not user and re.match(r'^(\+62|62|0)8', identifier):
        user = db.get_user_by_phone(identifier)

    # Same message for "no such user" and "wrong password", so the response
    # does not reveal which accounts exist.
    if not user or not check_password_hash(user.password_hash, password):
        return jsonify({'error': 'Username/Email/HP atau password salah'}), 401

    if not user.is_verified:
        return jsonify({
            'error': 'Akun belum diverifikasi',
            'requires_verification': True,
            'user_id': user.id
        }), 401

    db.update_user(user.id, last_login=datetime.utcnow())
    session['user_id'] = user.id
    session['username'] = user.username
    session['user_email'] = user.email
    session['user_fullname'] = user.full_name

    return jsonify({
        'success': True,
        'user': {
            'id': user.id,
            'username': user.username,
            'email': user.email,
            'phone': user.phone,
            'full_name': user.full_name,
            'is_verified': user.is_verified
        }
    })
def logout():
    """Remove everything stored in the session and confirm with JSON."""
    session.clear()
    payload = {'success': True}
    return jsonify(payload)
def get_current_user():
    """Return the profile of the user stored in the session as JSON.

    Returns:
        A JSON response with ``id``, ``username``, ``email``, ``phone``,
        ``full_name``, ``is_verified`` and ``created_at`` (ISO 8601 string
        or ``None``). When the session has no ``user_id`` the response is
        ``error`` with status 401; when the ID matches no user it is
        ``error`` with status 404.
    """
    current_id = session.get('user_id')
    if not current_id:
        return jsonify({'error': 'Not logged in'}), 401

    account = db.get_user_by_id(current_id)
    if not account:
        return jsonify({'error': 'User not found'}), 404

    # Copy the plain attributes first, then add the formatted timestamp.
    plain_fields = ('id', 'username', 'email', 'phone', 'full_name', 'is_verified')
    profile = {field: getattr(account, field) for field in plain_fields}
    joined = account.created_at
    profile['created_at'] = joined.isoformat() if joined else None
    return jsonify(profile)
| # ============ PASSWORD MANAGEMENT ============ | |
def change_password():
    """Change the logged-in user's password after checking the old one.

    Expected JSON fields: ``old_password`` and ``new_password`` (at least 6
    characters).

    Returns:
        A JSON response: ``success`` and ``message`` when the password was
        updated, otherwise ``error`` with status 401 (not logged in or
        wrong old password), 400 (missing fields or too short), 404
        (session refers to a user that no longer exists) or 500 (update
        failed).
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': 'Not logged in'}), 401

    # silent=True plus the dict check: a missing, malformed or non-object
    # body becomes "fields missing" (400) instead of an AttributeError (500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    old_password = data.get('old_password')
    new_password = data.get('new_password')
    # Non-string values are rejected here too; they would otherwise crash
    # len() or the hashing helpers.
    if not (isinstance(old_password, str) and old_password
            and isinstance(new_password, str) and new_password):
        return jsonify({'error': 'Password lama dan baru diperlukan'}), 400
    if len(new_password) < 6:
        return jsonify({'error': 'Password baru minimal 6 karakter'}), 400

    user = db.get_user_by_id(user_id)
    if not user:
        # Stale session: the ID no longer matches a user. Same response as
        # get_current_user() gives in that situation.
        return jsonify({'error': 'User not found'}), 404
    if not check_password_hash(user.password_hash, old_password):
        return jsonify({'error': 'Password lama salah'}), 401

    new_hash = generate_password_hash(new_password)
    if not db.update_user(user.id, password_hash=new_hash):
        return jsonify({'error': 'Gagal mengubah password'}), 500
    return jsonify({'success': True, 'message': 'Password berhasil diubah'})
| # ============ PROFILE UPDATE ============ | |
def update_profile():
    """Update the logged-in user's e-mail, phone and/or full name.

    Expected JSON fields (all optional): ``email``, ``phone``,
    ``full_name``. A field that is absent or ``null`` is left untouched; an
    empty string for ``email`` or ``phone`` clears that value.

    Returns:
        A JSON response: ``success`` and ``message`` when the update was
        stored, otherwise ``error`` with status 401 (not logged in), 400
        (invalid format or value already used by another account) or 500
        (update failed).
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': 'Not logged in'}), 401

    # silent=True plus the dict check: a missing, malformed or non-object
    # body is treated as "no fields supplied" instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    email = data.get('email')
    phone = data.get('phone')
    full_name = data.get('full_name')

    if email is not None:
        if not isinstance(email, str):
            # e.g. a JSON number: can never be a valid address.
            return jsonify({'error': 'Format email tidak valid'}), 400
        # Trim like register() does, so the stored value and the duplicate
        # lookup use the same normalised form.
        email = email.strip()
        if email and not validate_email(email):
            return jsonify({'error': 'Format email tidak valid'}), 400
        if email:
            existing = db.get_user_by_email(email)
            if existing and existing.id != user_id:
                return jsonify({'error': 'Email sudah digunakan oleh akun lain'}), 400

    if phone is not None:
        if not isinstance(phone, str):
            return jsonify({'error': 'Format nomor HP tidak valid'}), 400
        phone = phone.strip()
        if phone and not validate_phone(phone):
            return jsonify({'error': 'Format nomor HP tidak valid'}), 400
        if phone:
            existing = db.get_user_by_phone(phone)
            if existing and existing.id != user_id:
                return jsonify({'error': 'Nomor HP sudah digunakan oleh akun lain'}), 400

    # Only supplied fields are written; '' is stored as None (cleared).
    updates = {}
    if email is not None:
        updates['email'] = email if email else None
    if phone is not None:
        updates['phone'] = phone if phone else None
    if full_name is not None:
        updates['full_name'] = full_name

    if not db.update_user(user_id, **updates):
        return jsonify({'error': 'Gagal memperbarui profil'}), 500

    # Keep the session copies that login() populates in step with the
    # stored profile.
    if 'full_name' in updates:
        session['user_fullname'] = updates['full_name']
    if 'email' in updates:
        session['user_email'] = updates['email']
    return jsonify({'success': True, 'message': 'Profil berhasil diperbarui'})