Spaces:
Sleeping
Sleeping
| from flask import Blueprint, render_template, request, redirect, url_for, flash, session, jsonify | |
| from requests import delete | |
| from .models import User, Pincodes, Offices, EF71929279, Bookings | |
| from . import db, oauth | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| # from sqlalchemy import text | |
| import jwt # Import PyJWT | |
| import datetime | |
| from functools import wraps | |
| from flask import current_app | |
| # import sqlite3 | |
| from math import ceil | |
| from urllib.parse import urlencode | |
| import secrets | |
| from flask_mail import Mail, Message | |
| from itsdangerous import URLSafeTimedSerializer | |
| auth = Blueprint('auth', __name__) | |
| def calc_post(service_type, article, weight): | |
| if service_type == 'DOMESTIC': | |
| if article == 'Single Card': | |
| return 0.5 | |
| elif article == 'Reply Card': | |
| return 1 | |
| elif article == 'Meghdoot Post Card': | |
| return 0.25 | |
| elif article == 'Printed Card': | |
| return 6 | |
| elif article == 'Inland Letter Card': | |
| return 2.5 | |
| elif article == 'Letters/Documents': | |
| if weight <= 20: | |
| return 5 | |
| else: | |
| total = ceil(weight / 20) * 5 | |
| return total | |
| elif article == 'Book Pattern and Sample Packets': | |
| if weight <= 50: | |
| return 4 | |
| else: | |
| total = ceil((weight - 50) / 50) * 3 + 4 | |
| return total | |
| elif article == 'Blind Literature packets ': | |
| return 0 | |
| elif article == 'Parcels': | |
| if weight <= 500: | |
| return 19 | |
| else: | |
| return ceil((weight - 500) / 500) * 16 + 19 | |
| elif article == 'Registered News Paper': | |
| if weight <= 50: | |
| return 0.25 | |
| elif weight <= 100: | |
| return 0.5 | |
| else: | |
| total = ceil((weight - 100) / 100) * 0.2 + 0.5 | |
| else: | |
| return 10 | |
| elif service_type == 'INTERNATIONAL': | |
| if article == 'DOCUMENT': | |
| if weight <= 250: | |
| return 150 | |
| else: | |
| return ceil((weight - 250) / 250) * 15 + 150 | |
| elif article == 'MERCHANDISE': | |
| if weight <= 250: | |
| return 400 | |
| else: | |
| return ceil((weight - 250) / 250) * 40 + 400 | |
| else: | |
| return 300 | |
| # def get_db_connection(): | |
| # conn = sqlite3.connect('postoffice.db') # Replace with your database path | |
| # conn.row_factory = sqlite3.Row # This will allow us to fetch rows as dictionaries | |
| # print("Connection Successful") | |
| # res = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='pincodes';") | |
| # print(res) | |
| # return conn | |
| def token_required(f): | |
| def decorated(*args, **kwargs): | |
| token = session.get('token') # Retrieve token from session | |
| if not token: | |
| # jsonify({'message': 'Token is missing!'}), 403 | |
| return redirect(url_for('auth.login')) | |
| try: | |
| data = jwt.decode(token, current_app.config['JWT_SECRET_KEY'], algorithms=["HS256"]) | |
| except jwt.ExpiredSignatureError: | |
| # return jsonify({'message': 'Token has expired!'}), 403 | |
| return redirect(url_for('auth.login')) | |
| except jwt.InvalidTokenError: | |
| # return jsonify({'message': 'Invalid token!'}), 403 | |
| return redirect(url_for('auth.login')) | |
| return f(*args, **kwargs) | |
| return decorated | |
| def register(): | |
| if request.method == 'POST': | |
| userid = request.form.get('userid') | |
| pwd = request.form.get('password') | |
| user = User.query.filter_by(userid=userid).first() | |
| print(user) | |
| if not user: | |
| session['userid'] = userid | |
| session['pwd'] = pwd | |
| print("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") | |
| return redirect(url_for('auth.newUser')) | |
| else: | |
| print('Email Exists!') | |
| return redirect(url_for('auth.login')) | |
| return render_template("register.html") | |
| def login(): | |
| if request.method == 'POST': | |
| userid = request.form.get('userid') | |
| pwd = request.form.get('password') | |
| user = User.query.filter_by(userid=userid).first() | |
| if not user: | |
| return redirect(url_for('auth.register')) | |
| else: | |
| if check_password_hash(user.password, pwd): | |
| token = jwt.encode({ | |
| 'user_id': user.userid, | |
| 'exp': datetime.datetime.now(datetime.UTC) + datetime.timedelta(hours=1) # Token expires in 1 hour | |
| }, current_app.config['JWT_SECRET_KEY'], algorithm="HS256") | |
| session['token'] = token # Store token in session | |
| session['name'] = f"{user.firstname} {user.lastname}" | |
| # print(name) | |
| return redirect(url_for('views.homepage'), 201) | |
| else: | |
| flash("Wrong Password Try Again!") | |
| return redirect(url_for('auth.login')) | |
| return render_template("login.html") | |
| def google_login(): | |
| # Google OAuth2 endpoint | |
| google_auth_url = "https://accounts.google.com/o/oauth2/v2/auth" | |
| # Parameters for the request | |
| params = { | |
| 'client_id': current_app.config['GOOGLE_CLIENT_ID'], | |
| 'redirect_uri': url_for('auth.google_callback', _external=True), | |
| 'response_type': 'code', | |
| 'scope': 'openid email profile', | |
| 'access_type': 'offline', | |
| 'include_granted_scopes': 'true', | |
| 'state': secrets.token_urlsafe(16) # Generate a random state parameter | |
| } | |
| # Store state in session for verification | |
| session['oauth_state'] = params['state'] | |
| # Build the authorization URL | |
| auth_url = f"{google_auth_url}?" + urlencode(params) | |
| return redirect(auth_url) | |
| def google_callback(): | |
| try: | |
| # Get token info directly from request | |
| if 'error' in request.args: | |
| raise Exception(f"OAuth error: {request.args['error']}") | |
| if 'id_token' in request.args: | |
| # If the ID token is in URL parameters | |
| id_token = request.args.get('id_token') | |
| print(id_token) | |
| else: | |
| # Get the authorization code | |
| code = request.args.get('code') | |
| print(f"Received authorization code: {code}") | |
| user_info = { | |
| 'email': 'soorya.sivaramakrishnan22@spit.ac.in', | |
| 'given_name': 'Soorya', | |
| 'family_name': 'Siva' | |
| } | |
| # Check if user exists | |
| user = User.query.filter_by(userid=user_info['email']).first() | |
| if not user: | |
| session['userid'] = user_info['email'] | |
| session['pwd'] = None | |
| return redirect(url_for('auth.newUser')) | |
| else: | |
| # Generate JWT token | |
| token = jwt.encode({ | |
| 'user_id': user_info['email'], | |
| 'exp': datetime.datetime.now(datetime.UTC) + datetime.timedelta(hours=1) | |
| }, current_app.config['JWT_SECRET_KEY'], algorithm="HS256") | |
| session['token'] = token | |
| session['name'] = f"{user.firstname} {user.lastname}" | |
| return redirect(url_for('views.homepage')) | |
| except Exception as e: | |
| print(f"OAuth error: {str(e)}") | |
| import traceback | |
| print(f"Traceback: {traceback.format_exc()}") | |
| print(f"Request args: {request.args}") # Print request args for debugging | |
| flash("Authentication failed. Please try again.") | |
| return redirect(url_for('auth.login')) | |
| def newUser(): | |
| userid = session.get('userid') | |
| pwd = session.get('pwd') | |
| if request.method == 'POST': | |
| fname = request.form.get('firstname') | |
| lname = request.form.get('lastname') | |
| mobile = request.form.get('mobile') | |
| gender = request.form.get('gender') | |
| pincode = request.form.get('pincode') | |
| city = request.form.get('city-district') | |
| state = request.form.get('state') | |
| country = request.form.get('country') | |
| add1 = request.form.get('address1') | |
| add2 = request.form.get('address2') | |
| print( | |
| f"Inputs: {userid}, {fname}, {lname}, {mobile}, {gender}, {pincode}, {city}, {state}, {country}, {add1}, {add2}") | |
| print("AAAAAAAAAAAAAAAAAAAAAAAAA") | |
| if pwd is None: | |
| pwd = "abcd" | |
| new_user = User(userid=userid, firstname=fname, lastname=lname, mobile=mobile, gender=gender, pincode=pincode, | |
| city=city, state=state, country=country, add1=add1, add2=add2, | |
| password=generate_password_hash(pwd, method='pbkdf2:sha256')) | |
| print("BBBBBBBBBBBBBBBBBBBBBBBBBBBBBB") | |
| db.session.add(new_user) | |
| print("CCCCCCCCCCCCCCCCCCCCCCCCCCCCCC") | |
| db.session.commit() | |
| print("DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD") | |
| token = jwt.encode({ | |
| 'user_id': new_user.userid, | |
| 'exp': datetime.datetime.now(datetime.UTC) + datetime.timedelta(hours=1) # Token expires in 1 hour | |
| }, current_app.config['JWT_SECRET_KEY'], algorithm="HS256") | |
| session['token'] = token # Store token in session | |
| return redirect(url_for('auth.login'), 201) | |
| print("*******************************************") | |
| return render_template('newUser.html') | |
| def logout(): | |
| session.clear() | |
| return redirect(url_for('views.homepage')) | |
| def track(): | |
| token = session.get('token') # Check if the token exists in the session | |
| if token: | |
| name = session.get('name') | |
| # If token exists, render the findpincode page | |
| if request.method == 'POST': | |
| id_ = request.form.get('trackingid') | |
| if id_ != 'EF71929279': | |
| return render_template("trackconsignment.html", result=None, Name=name) | |
| consign = EF71929279.query.all() | |
| print(consign) | |
| return render_template("trackconsignment.html", result=consign, Name=name) | |
| return render_template("trackconsignment.html", result=None, Name=name) | |
| else: | |
| # If no token exists, redirect to the homepage | |
| return redirect(url_for('auth.login')) | |
| def calcpost(): | |
| token = session.get('token') # Check if the token exists in the session | |
| if token: | |
| name = session.get('name') | |
| # If token exists, render the findpincode page | |
| if request.method == 'POST': | |
| service_type = request.form.get('service-type').upper() | |
| service = request.form.get('service') | |
| weight = request.form.get('weight') | |
| weight = int(weight) | |
| print(service_type, service, weight) | |
| value = calc_post(service_type, service, weight) | |
| print(value) | |
| return render_template("calculatepostage.html", Name=name, value=value) | |
| return render_template("calculatepostage.html", Name=name, value=0) | |
| else: | |
| # If no token exists, redirect to the homepage | |
| return redirect(url_for('auth.login')) | |
| def findpincode(): | |
| token = session.get('token') # Check if the token exists in the session | |
| if token: | |
| name = session.get('name') | |
| if request.method == 'POST': | |
| poname = request.form.get('poname').strip() | |
| city = request.form.get('city-district').strip().upper() | |
| state = request.form.get('state').strip().upper() | |
| print(poname, city, state) | |
| # conn = get_db_connection() | |
| if poname == 'NULL': | |
| results = Pincodes.query.filter_by(District=city, StateName=state).all() | |
| else: | |
| results = Pincodes.query.filter_by(OfficeName=poname).all() | |
| # conn.close() | |
| print(results) | |
| # Pass the results to the template | |
| return render_template("findpincode.html", Name=name, results=results) | |
| # If token exists, render the findpincode page | |
| return render_template("findpincode.html", Name=name, results=None) | |
| else: | |
| # If no token exists, redirect to the homepage | |
| return redirect(url_for('auth.login')) | |
| def locpost(): | |
| token = session.get('token') # Check if the token exists in the session | |
| if token: | |
| name = session.get('name') | |
| if request.method == 'POST': | |
| pincode = request.form.get('pincode').strip().upper() | |
| # city = request.form.get('city-district').upper() | |
| # state = request.form.get('state').upper() | |
| # conn = get_db_connection() | |
| results = Offices.query.filter_by(Pincode=pincode).all() | |
| return render_template("locatepostoffice.html", Name=name, results=results) | |
| return render_template("locatepostoffice.html", Name=name, results=None) | |
| else: | |
| # If no token exists, redirect to the homepage | |
| return redirect(url_for('views.homepage')) | |
| def stamp(): | |
| token = session['token'] | |
| if token: | |
| name = session.get('name') | |
| return render_template('buystamps.html', Name=name) | |
| else: | |
| return redirect(url_for('auth.login')) | |
| def tender(): | |
| token = session['token'] | |
| if token: | |
| name = session.get('name') | |
| return render_template('tenders.html', Name=name) | |
| else: | |
| return redirect(url_for('auth.login')) | |
| def recruit(): | |
| token = session['token'] | |
| if token: | |
| name = session.get('name') | |
| return render_template('recruitment.html', Name=name) | |
| else: | |
| return redirect(url_for('auth.login')) | |
| def addhaarbooking(): | |
| token = session['token'] | |
| if token: | |
| name = session.get('name') | |
| if request.method == 'POST': | |
| name = request.form.get('fullname') | |
| mobile = request.form.get('mobile') | |
| email = request.form.get('email') | |
| aadhar = request.form.get('addhaar') | |
| service = request.form.get('addhaarbookingtype') | |
| date = request.form.get('date') | |
| time = request.form.get('time') | |
| print(name, mobile, email, aadhar, service, date, time) | |
| new_book = Bookings(name=name, mobile=mobile, email=email, aadhar=aadhar, service=service, | |
| date=date, time=time) | |
| db.session.add(new_book) | |
| db.session.commit() | |
| return render_template('addhaarbooking.html', Name=name) | |
| else: | |
| return redirect(url_for('auth.login')) | |
| def insurance(): | |
| token = session['token'] | |
| if token: | |
| name = session.get('name') | |
| return render_template('insurance.html', Name=name) | |
| else: | |
| return redirect(url_for('auth.login')) | |