"""MySQL data-access helpers for users, roles, permissions, locations and events.

Every public function opens its own short-lived connection through
``get_connection`` and closes it before returning. Rows are returned as
dictionaries because the connection is created with ``DictCursor``.
"""

import os

import pymysql
from dotenv import load_dotenv
from pymysql.cursors import DictCursor

load_dotenv()

MYSQL_HOST = os.getenv("MYSQL_HOST")
# Fall back to MySQL's standard port when MYSQL_PORT is unset or empty;
# int(None) would otherwise raise TypeError as soon as the module is imported.
MYSQL_PORT = int(os.getenv("MYSQL_PORT") or 3306)
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_DB = os.getenv("MYSQL_DB")

# Seconds; applied to connect, read and write operations alike.
timeout = 10

# DDL statements in dependency order: tables referenced by foreign keys are
# created before the tables that reference them.
_SCHEMA_STATEMENTS = (
    # Users table
    """
    CREATE TABLE IF NOT EXISTS Users (
        user_id INT AUTO_INCREMENT PRIMARY KEY,
        username VARCHAR(255) UNIQUE NOT NULL,
        email VARCHAR(255) UNIQUE NOT NULL,
        password_hash VARCHAR(255) NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        first_name VARCHAR(255),
        last_name VARCHAR(255),
        bio TEXT,
        profile_picture VARCHAR(255)
    )
    """,
    # Roles table
    """
    CREATE TABLE IF NOT EXISTS Roles (
        role_id INT AUTO_INCREMENT PRIMARY KEY,
        role_name VARCHAR(255) UNIQUE NOT NULL,
        description TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
    )
    """,
    # Permissions table
    """
    CREATE TABLE IF NOT EXISTS Permissions (
        permission_id INT AUTO_INCREMENT PRIMARY KEY,
        permission_name VARCHAR(255) UNIQUE NOT NULL,
        description TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
    )
    """,
    # RolePermissions junction table
    """
    CREATE TABLE IF NOT EXISTS RolePermissions (
        role_id INT NOT NULL,
        permission_id INT NOT NULL,
        PRIMARY KEY (role_id, permission_id),
        FOREIGN KEY (role_id) REFERENCES Roles(role_id),
        FOREIGN KEY (permission_id) REFERENCES Permissions(permission_id)
    )
    """,
    # UserRoles junction table
    """
    CREATE TABLE IF NOT EXISTS UserRoles (
        user_id INT NOT NULL,
        role_id INT NOT NULL,
        PRIMARY KEY (user_id, role_id),
        FOREIGN KEY (user_id) REFERENCES Users(user_id),
        FOREIGN KEY (role_id) REFERENCES Roles(role_id)
    )
    """,
    # Locations table
    """
    CREATE TABLE IF NOT EXISTS Locations (
        location_id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        address VARCHAR(255) NOT NULL,
        city VARCHAR(255) NOT NULL,
        state VARCHAR(255),
        country VARCHAR(255),
        latitude DECIMAL(10, 8) NOT NULL,
        longitude DECIMAL(11, 8) NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
    )
    """,
    # Events table
    """
    CREATE TABLE IF NOT EXISTS Events (
        event_id INT AUTO_INCREMENT PRIMARY KEY,
        host_id INT NOT NULL,
        location_id INT NOT NULL,
        title VARCHAR(255) NOT NULL,
        description TEXT,
        start_time TIMESTAMP NOT NULL,
        end_time TIMESTAMP NOT NULL,
        category VARCHAR(255),
        max_participants INT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        event_picture VARCHAR(255),
        is_recurring BOOLEAN DEFAULT FALSE,
        recurrence_type ENUM('daily','weekly','monthly','yearly','custom'),
        recurrence_interval INT,
        recurrence_end_date DATE,
        custom_recurrence_pattern TEXT,
        FOREIGN KEY (host_id) REFERENCES Users(user_id),
        FOREIGN KEY (location_id) REFERENCES Locations(location_id)
    )
    """,
    # EventParticipants junction table
    """
    CREATE TABLE IF NOT EXISTS EventParticipants (
        event_id INT NOT NULL,
        user_id INT NOT NULL,
        joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (event_id, user_id),
        FOREIGN KEY (event_id) REFERENCES Events(event_id),
        FOREIGN KEY (user_id) REFERENCES Users(user_id)
    )
    """,
)


def get_connection():
    """Open a new MySQL connection configured from the MYSQL_* settings.

    The connection uses ``DictCursor`` (rows come back as dicts), the
    ``utf8mb4`` charset, and the module-level ``timeout`` for connect, read
    and write operations. The caller is responsible for closing it.
    """
    # Set the session time zone to UTC so all timestamps are in UTC.
    return pymysql.connect(
        host=MYSQL_HOST,
        port=MYSQL_PORT,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        db=MYSQL_DB,
        charset="utf8mb4",
        cursorclass=DictCursor,
        connect_timeout=timeout,
        read_timeout=timeout,
        write_timeout=timeout,
        init_command="SET time_zone='+00:00';",  # Force UTC for all time operations.
    )


# ---------------------------------------------------------------------------
# Private helpers shared by the public functions below
# ---------------------------------------------------------------------------

def _fetch_one(sql, params=None):
    """Run a query on a fresh connection and return the first row, or None."""
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.fetchone()
    finally:
        connection.close()


def _fetch_all(sql, params=None):
    """Run a query on a fresh connection and return every resulting row."""
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.fetchall()
    finally:
        connection.close()


def _execute_insert(sql, params):
    """Run an INSERT, commit it, and return the cursor's ``lastrowid``."""
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, params)
            connection.commit()
            return cursor.lastrowid
    finally:
        connection.close()


def _execute_write(sql, params):
    """Run a data-modifying statement, commit it, and return ``rowcount``."""
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, params)
            connection.commit()
            return cursor.rowcount
    finally:
        connection.close()


def _update_row(table, key_column, key_value, fields):
    """Update the non-None ``fields`` of a single row and return ``rowcount``.

    ``fields`` maps column name to new value; entries whose value is None are
    skipped, so a column cannot be set to NULL through this helper. Returns 0
    without touching the database when nothing is left to update.

    ``table``, ``key_column`` and the keys of ``fields`` are interpolated
    directly into the SQL text, so they must only ever be literals defined in
    this module, never caller-supplied input. Values are always passed as
    query parameters.
    """
    assignments = {column: value for column, value in fields.items() if value is not None}
    if not assignments:
        return 0
    set_clause = ", ".join(f"{column} = %s" for column in assignments)
    sql = f"UPDATE {table} SET {set_clause} WHERE {key_column} = %s"
    return _execute_write(sql, (*assignments.values(), key_value))


def _fetch_page(table, order_column, page, per_page):
    """Return one page of rows from ``table``, ordered by ``order_column``.

    ``page`` is 1-based; the computed offset is clamped so it never goes
    below zero. The explicit ORDER BY keeps pages stable: without it the
    server may return rows in any order, so consecutive pages could overlap
    or skip rows. ``table`` and ``order_column`` must be module-defined
    literals because they are interpolated into the SQL text.
    """
    offset = max((page - 1) * per_page, 0)
    sql = f"SELECT * FROM {table} ORDER BY {order_column} LIMIT %s OFFSET %s"
    return _fetch_all(sql, (per_page, offset))


def create_tables():
    """Create every table of the schema if it does not exist yet.

    All statements run on a single connection, which is committed once at
    the end and always closed.
    """
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            for statement in _SCHEMA_STATEMENTS:
                cursor.execute(statement)
        connection.commit()
    finally:
        connection.close()


# CRUD functions for Users

def create_user(username: str, email: str, password_hash: str, first_name: str = None,
                last_name: str = None, bio: str = None, profile_picture: str = None):
    """Insert a user row and return its auto-generated ``user_id``."""
    sql = """
        INSERT INTO Users (username, email, password_hash, first_name, last_name, bio, profile_picture)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    return _execute_insert(
        sql,
        (username, email, password_hash, first_name, last_name, bio, profile_picture),
    )


def get_user_by_id(user_id: int):
    """Return the user row with the given id, or None if there is none."""
    return _fetch_one("SELECT * FROM Users WHERE user_id = %s", (user_id,))


def get_user_by_username(username: str):
    """Return the user row with the given username, or None if there is none."""
    return _fetch_one("SELECT * FROM Users WHERE username = %s", (username,))


def update_user(user_id: int, username: str = None, email: str = None, password_hash: str = None,
                first_name: str = None, last_name: str = None, bio: str = None,
                profile_picture: str = None):
    """Update the supplied (non-None) user fields and return the affected row count.

    Returns 0 when no field is supplied.
    """
    return _update_row("Users", "user_id", user_id, {
        "username": username,
        "email": email,
        "password_hash": password_hash,
        "first_name": first_name,
        "last_name": last_name,
        "bio": bio,
        "profile_picture": profile_picture,
    })


def delete_user(user_id: int):
    """Delete the user with the given id and return the affected row count."""
    return _execute_write("DELETE FROM Users WHERE user_id = %s", (user_id,))


def get_all_users(page: int, per_page: int):
    """Return page ``page`` (1-based) of users, ``per_page`` rows per page."""
    return _fetch_page("Users", "user_id", page, per_page)


# CRUD functions for Roles

def create_role(role_name: str, description: str = None):
    """Insert a role row and return its auto-generated ``role_id``."""
    return _execute_insert(
        "INSERT INTO Roles (role_name, description) VALUES (%s, %s)",
        (role_name, description),
    )


def get_role_by_id(role_id: int):
    """Return the role row with the given id, or None if there is none."""
    return _fetch_one("SELECT * FROM Roles WHERE role_id = %s", (role_id,))


def get_role_by_name(role_name: str):
    """Return the role row with the given name, or None if there is none."""
    return _fetch_one("SELECT * FROM Roles WHERE role_name = %s", (role_name,))


def update_role(role_id: int, role_name: str = None, description: str = None):
    """Update the supplied (non-None) role fields and return the affected row count.

    Returns 0 when no field is supplied.
    """
    return _update_row("Roles", "role_id", role_id, {
        "role_name": role_name,
        "description": description,
    })


def delete_role(role_id: int):
    """Delete the role with the given id and return the affected row count."""
    return _execute_write("DELETE FROM Roles WHERE role_id = %s", (role_id,))


def get_all_roles(page: int, per_page: int):
    """Return page ``page`` (1-based) of roles, ``per_page`` rows per page."""
    return _fetch_page("Roles", "role_id", page, per_page)


# CRUD functions for Permissions

def create_permission(permission_name: str, description: str = None):
    """Insert a permission row and return its auto-generated ``permission_id``."""
    return _execute_insert(
        "INSERT INTO Permissions (permission_name, description) VALUES (%s, %s)",
        (permission_name, description),
    )


def get_permission_by_id(permission_id: int):
    """Return the permission row with the given id, or None if there is none."""
    return _fetch_one("SELECT * FROM Permissions WHERE permission_id = %s", (permission_id,))


def get_permission_by_name(permission_name: str):
    """Return the permission row with the given name, or None if there is none."""
    return _fetch_one("SELECT * FROM Permissions WHERE permission_name = %s", (permission_name,))


def update_permission(permission_id: int, permission_name: str = None, description: str = None):
    """Update the supplied (non-None) permission fields and return the affected row count.

    Returns 0 when no field is supplied.
    """
    return _update_row("Permissions", "permission_id", permission_id, {
        "permission_name": permission_name,
        "description": description,
    })


def delete_permission(permission_id: int):
    """Delete the permission with the given id and return the affected row count."""
    return _execute_write("DELETE FROM Permissions WHERE permission_id = %s", (permission_id,))


def get_all_permissions(page: int, per_page: int):
    """Return page ``page`` (1-based) of permissions, ``per_page`` rows per page."""
    return _fetch_page("Permissions", "permission_id", page, per_page)


# Functions for RolePermissions junction table

def add_role_permission(role_id: int, permission_id: int):
    """Link a permission to a role and return the affected row count."""
    return _execute_write(
        "INSERT INTO RolePermissions (role_id, permission_id) VALUES (%s, %s)",
        (role_id, permission_id),
    )


def remove_role_permission(role_id: int, permission_id: int):
    """Unlink a permission from a role and return the affected row count."""
    return _execute_write(
        "DELETE FROM RolePermissions WHERE role_id = %s AND permission_id = %s",
        (role_id, permission_id),
    )


def get_permissions_by_role(role_id: int):
    """Return all permission rows linked to the given role."""
    sql = """
        SELECT p.* FROM Permissions p
        JOIN RolePermissions rp ON p.permission_id = rp.permission_id
        WHERE rp.role_id = %s
    """
    return _fetch_all(sql, (role_id,))


def get_roles_by_permission(permission_id: int):
    """Return all role rows linked to the given permission."""
    sql = """
        SELECT r.* FROM Roles r
        JOIN RolePermissions rp ON r.role_id = rp.role_id
        WHERE rp.permission_id = %s
    """
    return _fetch_all(sql, (permission_id,))


# Functions for UserRoles junction table

def add_user_role(user_id: int, role_id: int):
    """Assign a role to a user and return the affected row count."""
    return _execute_write(
        "INSERT INTO UserRoles (user_id, role_id) VALUES (%s, %s)",
        (user_id, role_id),
    )


def remove_user_role(user_id: int, role_id: int):
    """Remove a role from a user and return the affected row count."""
    return _execute_write(
        "DELETE FROM UserRoles WHERE user_id = %s AND role_id = %s",
        (user_id, role_id),
    )


def get_roles_by_user(user_id: int):
    """Return all role rows assigned to the given user."""
    sql = """
        SELECT r.* FROM Roles r
        JOIN UserRoles ur ON r.role_id = ur.role_id
        WHERE ur.user_id = %s
    """
    return _fetch_all(sql, (user_id,))


def get_users_by_role(role_id: int):
    """Return all user rows that hold the given role."""
    sql = """
        SELECT u.* FROM Users u
        JOIN UserRoles ur ON u.user_id = ur.user_id
        WHERE ur.role_id = %s
    """
    return _fetch_all(sql, (role_id,))


# CRUD functions for Locations

def create_location(name: str, address: str, city: str, state: str = None, country: str = None,
                    latitude: float = None, longitude: float = None):
    """Insert a location row and return its auto-generated ``location_id``.

    ``latitude`` and ``longitude`` default to None for signature
    compatibility, but the schema in ``create_tables`` declares both columns
    NOT NULL, so callers should always supply them.
    """
    sql = """
        INSERT INTO Locations (name, address, city, state, country, latitude, longitude)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    return _execute_insert(sql, (name, address, city, state, country, latitude, longitude))


def get_location_by_id(location_id: int):
    """Return the location row with the given id, or None if there is none."""
    return _fetch_one("SELECT * FROM Locations WHERE location_id = %s", (location_id,))


def update_location(location_id: int, name: str = None, address: str = None, city: str = None,
                    state: str = None, country: str = None, latitude: float = None,
                    longitude: float = None):
    """Update the supplied (non-None) location fields and return the affected row count.

    Returns 0 when no field is supplied.
    """
    return _update_row("Locations", "location_id", location_id, {
        "name": name,
        "address": address,
        "city": city,
        "state": state,
        "country": country,
        "latitude": latitude,
        "longitude": longitude,
    })


def delete_location(location_id: int):
    """Delete the location with the given id and return the affected row count."""
    return _execute_write("DELETE FROM Locations WHERE location_id = %s", (location_id,))


def get_all_locations(page: int, per_page: int):
    """Return page ``page`` (1-based) of locations, ``per_page`` rows per page."""
    return _fetch_page("Locations", "location_id", page, per_page)


# CRUD functions for Events

def create_event(host_id: int, location_id: int, title: str, start_time, end_time,
                 description: str = None, category: str = None, max_participants: int = None,
                 event_picture: str = None, is_recurring: bool = False,
                 recurrence_type: str = None, recurrence_interval: int = None,
                 recurrence_end_date=None, custom_recurrence_pattern: str = None):
    """Insert an event row and return its auto-generated ``event_id``."""
    sql = """
        INSERT INTO Events (host_id, location_id, title, description, start_time, end_time,
                            category, max_participants, event_picture, is_recurring,
                            recurrence_type, recurrence_interval, recurrence_end_date,
                            custom_recurrence_pattern)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    return _execute_insert(sql, (
        host_id, location_id, title, description, start_time, end_time,
        category, max_participants, event_picture, is_recurring,
        recurrence_type, recurrence_interval, recurrence_end_date,
        custom_recurrence_pattern,
    ))


def get_event_by_id(event_id: int):
    """Return the event row with the given id, or None if there is none."""
    return _fetch_one("SELECT * FROM Events WHERE event_id = %s", (event_id,))


def update_event(event_id: int, host_id: int = None, location_id: int = None, title: str = None,
                 start_time=None, end_time=None, description: str = None, category: str = None,
                 max_participants: int = None, event_picture: str = None,
                 is_recurring: bool = None, recurrence_type: str = None,
                 recurrence_interval: int = None, recurrence_end_date=None,
                 custom_recurrence_pattern: str = None):
    """Update the supplied (non-None) event fields and return the affected row count.

    ``is_recurring=False`` counts as supplied because only None is skipped.
    Returns 0 when no field is supplied.
    """
    return _update_row("Events", "event_id", event_id, {
        "host_id": host_id,
        "location_id": location_id,
        "title": title,
        "start_time": start_time,
        "end_time": end_time,
        "description": description,
        "category": category,
        "max_participants": max_participants,
        "event_picture": event_picture,
        "is_recurring": is_recurring,
        "recurrence_type": recurrence_type,
        "recurrence_interval": recurrence_interval,
        "recurrence_end_date": recurrence_end_date,
        "custom_recurrence_pattern": custom_recurrence_pattern,
    })


def delete_event(event_id: int):
    """Delete the event with the given id and return the affected row count."""
    return _execute_write("DELETE FROM Events WHERE event_id = %s", (event_id,))


def get_all_events(page: int, per_page: int):
    """Return page ``page`` (1-based) of events, ``per_page`` rows per page."""
    return _fetch_page("Events", "event_id", page, per_page)


def get_events_by_host(host_id: int):
    """Return all event rows hosted by the given user."""
    return _fetch_all("SELECT * FROM Events WHERE host_id = %s", (host_id,))


def get_all_event_categories():
    """Return the distinct non-NULL event categories as a list of strings."""
    rows = _fetch_all("SELECT DISTINCT category FROM Events")
    return [row['category'] for row in rows if row['category'] is not None]


def get_events_by_category(category: str):
    """Return all event rows whose category equals ``category``."""
    return _fetch_all("SELECT * FROM Events WHERE category = %s", (category,))


# Functions for EventParticipants junction table

def add_event_participant(event_id: int, user_id: int):
    """Register a user as a participant of an event and return the affected row count."""
    return _execute_write(
        "INSERT INTO EventParticipants (event_id, user_id) VALUES (%s, %s)",
        (event_id, user_id),
    )


def remove_event_participant(event_id: int, user_id: int):
    """Remove a user from an event's participants and return the affected row count."""
    return _execute_write(
        "DELETE FROM EventParticipants WHERE event_id = %s AND user_id = %s",
        (event_id, user_id),
    )


def get_participants_by_event(event_id: int):
    """Return the user rows of everyone participating in the given event."""
    sql = """
        SELECT u.* FROM Users u
        JOIN EventParticipants ep ON u.user_id = ep.user_id
        WHERE ep.event_id = %s
    """
    return _fetch_all(sql, (event_id,))


def get_events_by_participant(user_id: int):
    """Return the event rows the given user participates in."""
    sql = """
        SELECT e.* FROM Events e
        JOIN EventParticipants ep ON e.event_id = ep.event_id
        WHERE ep.user_id = %s
    """
    return _fetch_all(sql, (user_id,))