backend / database.py
Chandima Prabhath
Add functions to retrieve event categories and events by category
73578f3
import pymysql
from pymysql.cursors import DictCursor
from dotenv import load_dotenv
import os
load_dotenv()
MYSQL_HOST = os.getenv('MYSQL_HOST')
MYSQL_PORT = int(os.getenv('MYSQL_PORT'))
MYSQL_USER = os.getenv('MYSQL_USER')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD')
MYSQL_DB = os.getenv('MYSQL_DB')
timeout = 10
def get_connection():
# Set the session time zone to UTC so all timestamps are in UTC.
return pymysql.connect(
host=MYSQL_HOST,
port=MYSQL_PORT,
user=MYSQL_USER,
password=MYSQL_PASSWORD,
db=MYSQL_DB,
charset="utf8mb4",
cursorclass=DictCursor,
connect_timeout=timeout,
read_timeout=timeout,
write_timeout=timeout,
init_command="SET time_zone='+00:00';" # Force UTC for all time operations.
)
def create_tables():
connection = get_connection()
try:
with connection.cursor() as cursor:
# Users table
cursor.execute("""
CREATE TABLE IF NOT EXISTS Users (
user_id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(255) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
first_name VARCHAR(255),
last_name VARCHAR(255),
bio TEXT,
profile_picture VARCHAR(255)
)
""")
# Roles table
cursor.execute("""
CREATE TABLE IF NOT EXISTS Roles (
role_id INT AUTO_INCREMENT PRIMARY KEY,
role_name VARCHAR(255) UNIQUE NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
""")
# Permissions table
cursor.execute("""
CREATE TABLE IF NOT EXISTS Permissions (
permission_id INT AUTO_INCREMENT PRIMARY KEY,
permission_name VARCHAR(255) UNIQUE NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
""")
# RolePermissions junction table
cursor.execute("""
CREATE TABLE IF NOT EXISTS RolePermissions (
role_id INT NOT NULL,
permission_id INT NOT NULL,
PRIMARY KEY (role_id, permission_id),
FOREIGN KEY (role_id) REFERENCES Roles(role_id),
FOREIGN KEY (permission_id) REFERENCES Permissions(permission_id)
)
""")
# UserRoles junction table
cursor.execute("""
CREATE TABLE IF NOT EXISTS UserRoles (
user_id INT NOT NULL,
role_id INT NOT NULL,
PRIMARY KEY (user_id, role_id),
FOREIGN KEY (user_id) REFERENCES Users(user_id),
FOREIGN KEY (role_id) REFERENCES Roles(role_id)
)
""")
# Locations table
cursor.execute("""
CREATE TABLE IF NOT EXISTS Locations (
location_id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
address VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
state VARCHAR(255),
country VARCHAR(255),
latitude DECIMAL(10, 8) NOT NULL,
longitude DECIMAL(11, 8) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
""")
# Events table
cursor.execute("""
CREATE TABLE IF NOT EXISTS Events (
event_id INT AUTO_INCREMENT PRIMARY KEY,
host_id INT NOT NULL,
location_id INT NOT NULL,
title VARCHAR(255) NOT NULL,
description TEXT,
start_time TIMESTAMP NOT NULL,
end_time TIMESTAMP NOT NULL,
category VARCHAR(255),
max_participants INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
event_picture VARCHAR(255),
is_recurring BOOLEAN DEFAULT FALSE,
recurrence_type ENUM('daily','weekly','monthly','yearly','custom'),
recurrence_interval INT,
recurrence_end_date DATE,
custom_recurrence_pattern TEXT,
FOREIGN KEY (host_id) REFERENCES Users(user_id),
FOREIGN KEY (location_id) REFERENCES Locations(location_id)
)
""")
# EventParticipants junction table
cursor.execute("""
CREATE TABLE IF NOT EXISTS EventParticipants (
event_id INT NOT NULL,
user_id INT NOT NULL,
joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (event_id, user_id),
FOREIGN KEY (event_id) REFERENCES Events(event_id),
FOREIGN KEY (user_id) REFERENCES Users(user_id)
)
""")
connection.commit()
finally:
connection.close()
# CRUD functions for Users
def create_user(username: str, email: str, password_hash: str, first_name: str = None, last_name: str = None, bio: str = None, profile_picture: str = None):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = """
INSERT INTO Users (username, email, password_hash, first_name, last_name, bio, profile_picture)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(sql, (username, email, password_hash, first_name, last_name, bio, profile_picture))
connection.commit()
return cursor.lastrowid
finally:
connection.close()
def get_user_by_id(user_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM Users WHERE user_id = %s"
cursor.execute(sql, (user_id,))
return cursor.fetchone()
finally:
connection.close()
def get_user_by_username(username: str):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM Users WHERE username = %s"
cursor.execute(sql, (username,))
return cursor.fetchone()
finally:
connection.close()
def update_user(user_id: int, username: str = None, email: str = None, password_hash: str = None, first_name: str = None, last_name: str = None, bio: str = None, profile_picture: str = None):
connection = get_connection()
try:
with connection.cursor() as cursor:
updates = []
params = []
if username is not None:
updates.append("username = %s")
params.append(username)
if email is not None:
updates.append("email = %s")
params.append(email)
if password_hash is not None:
updates.append("password_hash = %s")
params.append(password_hash)
if first_name is not None:
updates.append("first_name = %s")
params.append(first_name)
if last_name is not None:
updates.append("last_name = %s")
params.append(last_name)
if bio is not None:
updates.append("bio = %s")
params.append(bio)
if profile_picture is not None:
updates.append("profile_picture = %s")
params.append(profile_picture)
if not updates:
return 0
params.append(user_id)
sql = "UPDATE Users SET " + ", ".join(updates) + " WHERE user_id = %s"
cursor.execute(sql, tuple(params))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def delete_user(user_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "DELETE FROM Users WHERE user_id = %s"
cursor.execute(sql, (user_id,))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def get_all_users(page: int, per_page: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
offset = max((page - 1) * per_page, 0)
sql = "SELECT * FROM Users LIMIT %s OFFSET %s"
cursor.execute(sql, (per_page, offset))
return cursor.fetchall()
finally:
connection.close()
# CRUD functions for Roles
def create_role(role_name: str, description: str = None):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "INSERT INTO Roles (role_name, description) VALUES (%s, %s)"
cursor.execute(sql, (role_name, description))
connection.commit()
return cursor.lastrowid
finally:
connection.close()
def get_role_by_id(role_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM Roles WHERE role_id = %s"
cursor.execute(sql, (role_id,))
return cursor.fetchone()
finally:
connection.close()
def get_role_by_name(role_name: str):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM Roles WHERE role_name = %s"
cursor.execute(sql, (role_name,))
return cursor.fetchone()
finally:
connection.close()
def update_role(role_id: int, role_name: str = None, description: str = None):
connection = get_connection()
try:
with connection.cursor() as cursor:
updates = []
params = []
if role_name is not None:
updates.append("role_name = %s")
params.append(role_name)
if description is not None:
updates.append("description = %s")
params.append(description)
if not updates:
return 0
params.append(role_id)
sql = "UPDATE Roles SET " + ", ".join(updates) + " WHERE role_id = %s"
cursor.execute(sql, tuple(params))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def delete_role(role_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "DELETE FROM Roles WHERE role_id = %s"
cursor.execute(sql, (role_id,))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def get_all_roles(page: int, per_page: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
offset = max((page - 1) * per_page, 0)
sql = "SELECT * FROM Roles LIMIT %s OFFSET %s"
cursor.execute(sql, (per_page, offset))
return cursor.fetchall()
finally:
connection.close()
# CRUD functions for Permissions
def create_permission(permission_name: str, description: str = None):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "INSERT INTO Permissions (permission_name, description) VALUES (%s, %s)"
cursor.execute(sql, (permission_name, description))
connection.commit()
return cursor.lastrowid
finally:
connection.close()
def get_permission_by_id(permission_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM Permissions WHERE permission_id = %s"
cursor.execute(sql, (permission_id,))
return cursor.fetchone()
finally:
connection.close()
def get_permission_by_name(permission_name: str):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM Permissions WHERE permission_name = %s"
cursor.execute(sql, (permission_name,))
return cursor.fetchone()
finally:
connection.close()
def update_permission(permission_id: int, permission_name: str = None, description: str = None):
connection = get_connection()
try:
with connection.cursor() as cursor:
updates = []
params = []
if permission_name is not None:
updates.append("permission_name = %s")
params.append(permission_name)
if description is not None:
updates.append("description = %s")
params.append(description)
if not updates:
return 0
params.append(permission_id)
sql = "UPDATE Permissions SET " + ", ".join(updates) + " WHERE permission_id = %s"
cursor.execute(sql, tuple(params))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def delete_permission(permission_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "DELETE FROM Permissions WHERE permission_id = %s"
cursor.execute(sql, (permission_id,))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def get_all_permissions(page: int, per_page: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
offset = max((page - 1) * per_page, 0)
sql = "SELECT * FROM Permissions LIMIT %s OFFSET %s"
cursor.execute(sql, (per_page, offset))
return cursor.fetchall()
finally:
connection.close()
# Functions for RolePermissions junction table
def add_role_permission(role_id: int, permission_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "INSERT INTO RolePermissions (role_id, permission_id) VALUES (%s, %s)"
cursor.execute(sql, (role_id, permission_id))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def remove_role_permission(role_id: int, permission_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "DELETE FROM RolePermissions WHERE role_id = %s AND permission_id = %s"
cursor.execute(sql, (role_id, permission_id))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def get_permissions_by_role(role_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = """
SELECT p.* FROM Permissions p
JOIN RolePermissions rp ON p.permission_id = rp.permission_id
WHERE rp.role_id = %s
"""
cursor.execute(sql, (role_id,))
return cursor.fetchall()
finally:
connection.close()
def get_roles_by_permission(permission_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = """
SELECT r.* FROM Roles r
JOIN RolePermissions rp ON r.role_id = rp.role_id
WHERE rp.permission_id = %s
"""
cursor.execute(sql, (permission_id,))
return cursor.fetchall()
finally:
connection.close()
# Functions for UserRoles junction table
def add_user_role(user_id: int, role_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "INSERT INTO UserRoles (user_id, role_id) VALUES (%s, %s)"
cursor.execute(sql, (user_id, role_id))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def remove_user_role(user_id: int, role_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "DELETE FROM UserRoles WHERE user_id = %s AND role_id = %s"
cursor.execute(sql, (user_id, role_id))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def get_roles_by_user(user_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = """
SELECT r.* FROM Roles r
JOIN UserRoles ur ON r.role_id = ur.role_id
WHERE ur.user_id = %s
"""
cursor.execute(sql, (user_id,))
return cursor.fetchall()
finally:
connection.close()
def get_users_by_role(role_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = """
SELECT u.* FROM Users u
JOIN UserRoles ur ON u.user_id = ur.user_id
WHERE ur.role_id = %s
"""
cursor.execute(sql, (role_id,))
return cursor.fetchall()
finally:
connection.close()
# CRUD functions for Locations
def create_location(name: str, address: str, city: str, state: str = None, country: str = None, latitude: float = None, longitude: float = None):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = """
INSERT INTO Locations (name, address, city, state, country, latitude, longitude)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(sql, (name, address, city, state, country, latitude, longitude))
connection.commit()
return cursor.lastrowid
finally:
connection.close()
def get_location_by_id(location_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM Locations WHERE location_id = %s"
cursor.execute(sql, (location_id,))
return cursor.fetchone()
finally:
connection.close()
def update_location(location_id: int, name: str = None, address: str = None, city: str = None, state: str = None, country: str = None, latitude: float = None, longitude: float = None):
connection = get_connection()
try:
with connection.cursor() as cursor:
updates = []
params = []
if name is not None:
updates.append("name = %s")
params.append(name)
if address is not None:
updates.append("address = %s")
params.append(address)
if city is not None:
updates.append("city = %s")
params.append(city)
if state is not None:
updates.append("state = %s")
params.append(state)
if country is not None:
updates.append("country = %s")
params.append(country)
if latitude is not None:
updates.append("latitude = %s")
params.append(latitude)
if longitude is not None:
updates.append("longitude = %s")
params.append(longitude)
if not updates:
return 0
params.append(location_id)
sql = "UPDATE Locations SET " + ", ".join(updates) + " WHERE location_id = %s"
cursor.execute(sql, tuple(params))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def delete_location(location_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "DELETE FROM Locations WHERE location_id = %s"
cursor.execute(sql, (location_id,))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def get_all_locations(page: int, per_page: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
offset = max((page - 1) * per_page, 0)
sql = "SELECT * FROM Locations LIMIT %s OFFSET %s"
cursor.execute(sql, (per_page, offset))
return cursor.fetchall()
finally:
connection.close()
# CRUD functions for Events
def create_event(host_id: int, location_id: int, title: str, start_time, end_time, description: str = None, category: str = None, max_participants: int = None, event_picture: str = None,
is_recurring: bool = False, recurrence_type: str = None, recurrence_interval: int = None, recurrence_end_date = None, custom_recurrence_pattern: str = None):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = """
INSERT INTO Events (host_id, location_id, title, description, start_time, end_time, category, max_participants, event_picture,
is_recurring, recurrence_type, recurrence_interval, recurrence_end_date, custom_recurrence_pattern)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(sql, (host_id, location_id, title, description, start_time, end_time, category, max_participants, event_picture,
is_recurring, recurrence_type, recurrence_interval, recurrence_end_date, custom_recurrence_pattern))
connection.commit()
return cursor.lastrowid
finally:
connection.close()
def get_event_by_id(event_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM Events WHERE event_id = %s"
cursor.execute(sql, (event_id,))
return cursor.fetchone()
finally:
connection.close()
def update_event(event_id: int, host_id: int = None, location_id: int = None, title: str = None, start_time = None, end_time = None, description: str = None,
category: str = None, max_participants: int = None, event_picture: str = None, is_recurring: bool = None, recurrence_type: str = None,
recurrence_interval: int = None, recurrence_end_date = None, custom_recurrence_pattern: str = None):
connection = get_connection()
try:
with connection.cursor() as cursor:
updates = []
params = []
if host_id is not None:
updates.append("host_id = %s")
params.append(host_id)
if location_id is not None:
updates.append("location_id = %s")
params.append(location_id)
if title is not None:
updates.append("title = %s")
params.append(title)
if start_time is not None:
updates.append("start_time = %s")
params.append(start_time)
if end_time is not None:
updates.append("end_time = %s")
params.append(end_time)
if description is not None:
updates.append("description = %s")
params.append(description)
if category is not None:
updates.append("category = %s")
params.append(category)
if max_participants is not None:
updates.append("max_participants = %s")
params.append(max_participants)
if event_picture is not None:
updates.append("event_picture = %s")
params.append(event_picture)
if is_recurring is not None:
updates.append("is_recurring = %s")
params.append(is_recurring)
if recurrence_type is not None:
updates.append("recurrence_type = %s")
params.append(recurrence_type)
if recurrence_interval is not None:
updates.append("recurrence_interval = %s")
params.append(recurrence_interval)
if recurrence_end_date is not None:
updates.append("recurrence_end_date = %s")
params.append(recurrence_end_date)
if custom_recurrence_pattern is not None:
updates.append("custom_recurrence_pattern = %s")
params.append(custom_recurrence_pattern)
if not updates:
return 0
params.append(event_id)
sql = "UPDATE Events SET " + ", ".join(updates) + " WHERE event_id = %s"
cursor.execute(sql, tuple(params))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def delete_event(event_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "DELETE FROM Events WHERE event_id = %s"
cursor.execute(sql, (event_id,))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def get_all_events(page: int, per_page: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
offset = max((page - 1) * per_page, 0)
sql = "SELECT * FROM Events LIMIT %s OFFSET %s"
cursor.execute(sql, (per_page, offset))
return cursor.fetchall()
finally:
connection.close()
def get_events_by_host(host_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM Events WHERE host_id = %s"
cursor.execute(sql, (host_id,))
return cursor.fetchall()
finally:
connection.close()
def get_all_event_categories():
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT DISTINCT category FROM Events"
cursor.execute(sql)
categories = [row['category'] for row in cursor.fetchall() if row['category'] is not None]
return categories
finally:
connection.close()
def get_events_by_category(category: str):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM Events WHERE category = %s"
cursor.execute(sql, (category,))
return cursor.fetchall()
finally:
connection.close()
# Functions for EventParticipants junction table
def add_event_participant(event_id: int, user_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "INSERT INTO EventParticipants (event_id, user_id) VALUES (%s, %s)"
cursor.execute(sql, (event_id, user_id))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def remove_event_participant(event_id: int, user_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "DELETE FROM EventParticipants WHERE event_id = %s AND user_id = %s"
cursor.execute(sql, (event_id, user_id))
connection.commit()
return cursor.rowcount
finally:
connection.close()
def get_participants_by_event(event_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = """
SELECT u.* FROM Users u
JOIN EventParticipants ep ON u.user_id = ep.user_id
WHERE ep.event_id = %s
"""
cursor.execute(sql, (event_id,))
return cursor.fetchall()
finally:
connection.close()
def get_events_by_participant(user_id: int):
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = """
SELECT e.* FROM Events e
JOIN EventParticipants ep ON e.event_id = ep.event_id
WHERE ep.user_id = %s
"""
cursor.execute(sql, (user_id,))
return cursor.fetchall()
finally:
connection.close()