|
|
import pymysql |
|
|
from pymysql.cursors import DictCursor |
|
|
from dotenv import load_dotenv |
|
|
import os |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
MYSQL_HOST = os.getenv('MYSQL_HOST') |
|
|
MYSQL_PORT = int(os.getenv('MYSQL_PORT')) |
|
|
MYSQL_USER = os.getenv('MYSQL_USER') |
|
|
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD') |
|
|
MYSQL_DB = os.getenv('MYSQL_DB') |
|
|
|
|
|
timeout = 10 |
|
|
|
|
|
def get_connection(): |
|
|
|
|
|
return pymysql.connect( |
|
|
host=MYSQL_HOST, |
|
|
port=MYSQL_PORT, |
|
|
user=MYSQL_USER, |
|
|
password=MYSQL_PASSWORD, |
|
|
db=MYSQL_DB, |
|
|
charset="utf8mb4", |
|
|
cursorclass=DictCursor, |
|
|
connect_timeout=timeout, |
|
|
read_timeout=timeout, |
|
|
write_timeout=timeout, |
|
|
init_command="SET time_zone='+00:00';" |
|
|
) |
|
|
|
|
|
def create_tables(): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS Users ( |
|
|
user_id INT AUTO_INCREMENT PRIMARY KEY, |
|
|
username VARCHAR(255) UNIQUE NOT NULL, |
|
|
email VARCHAR(255) UNIQUE NOT NULL, |
|
|
password_hash VARCHAR(255) NOT NULL, |
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, |
|
|
first_name VARCHAR(255), |
|
|
last_name VARCHAR(255), |
|
|
bio TEXT, |
|
|
profile_picture VARCHAR(255) |
|
|
) |
|
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS Roles ( |
|
|
role_id INT AUTO_INCREMENT PRIMARY KEY, |
|
|
role_name VARCHAR(255) UNIQUE NOT NULL, |
|
|
description TEXT, |
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP |
|
|
) |
|
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS Permissions ( |
|
|
permission_id INT AUTO_INCREMENT PRIMARY KEY, |
|
|
permission_name VARCHAR(255) UNIQUE NOT NULL, |
|
|
description TEXT, |
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP |
|
|
) |
|
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS RolePermissions ( |
|
|
role_id INT NOT NULL, |
|
|
permission_id INT NOT NULL, |
|
|
PRIMARY KEY (role_id, permission_id), |
|
|
FOREIGN KEY (role_id) REFERENCES Roles(role_id), |
|
|
FOREIGN KEY (permission_id) REFERENCES Permissions(permission_id) |
|
|
) |
|
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS UserRoles ( |
|
|
user_id INT NOT NULL, |
|
|
role_id INT NOT NULL, |
|
|
PRIMARY KEY (user_id, role_id), |
|
|
FOREIGN KEY (user_id) REFERENCES Users(user_id), |
|
|
FOREIGN KEY (role_id) REFERENCES Roles(role_id) |
|
|
) |
|
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS Locations ( |
|
|
location_id INT AUTO_INCREMENT PRIMARY KEY, |
|
|
name VARCHAR(255) NOT NULL, |
|
|
address VARCHAR(255) NOT NULL, |
|
|
city VARCHAR(255) NOT NULL, |
|
|
state VARCHAR(255), |
|
|
country VARCHAR(255), |
|
|
latitude DECIMAL(10, 8) NOT NULL, |
|
|
longitude DECIMAL(11, 8) NOT NULL, |
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP |
|
|
) |
|
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS Events ( |
|
|
event_id INT AUTO_INCREMENT PRIMARY KEY, |
|
|
host_id INT NOT NULL, |
|
|
location_id INT NOT NULL, |
|
|
title VARCHAR(255) NOT NULL, |
|
|
description TEXT, |
|
|
start_time TIMESTAMP NOT NULL, |
|
|
end_time TIMESTAMP NOT NULL, |
|
|
category VARCHAR(255), |
|
|
max_participants INT, |
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, |
|
|
event_picture VARCHAR(255), |
|
|
is_recurring BOOLEAN DEFAULT FALSE, |
|
|
recurrence_type ENUM('daily','weekly','monthly','yearly','custom'), |
|
|
recurrence_interval INT, |
|
|
recurrence_end_date DATE, |
|
|
custom_recurrence_pattern TEXT, |
|
|
FOREIGN KEY (host_id) REFERENCES Users(user_id), |
|
|
FOREIGN KEY (location_id) REFERENCES Locations(location_id) |
|
|
) |
|
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS EventParticipants ( |
|
|
event_id INT NOT NULL, |
|
|
user_id INT NOT NULL, |
|
|
joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
|
|
PRIMARY KEY (event_id, user_id), |
|
|
FOREIGN KEY (event_id) REFERENCES Events(event_id), |
|
|
FOREIGN KEY (user_id) REFERENCES Users(user_id) |
|
|
) |
|
|
""") |
|
|
connection.commit() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
|
|
|
def create_user(username: str, email: str, password_hash: str, first_name: str = None, last_name: str = None, bio: str = None, profile_picture: str = None): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = """ |
|
|
INSERT INTO Users (username, email, password_hash, first_name, last_name, bio, profile_picture) |
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s) |
|
|
""" |
|
|
cursor.execute(sql, (username, email, password_hash, first_name, last_name, bio, profile_picture)) |
|
|
connection.commit() |
|
|
return cursor.lastrowid |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_user_by_id(user_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "SELECT * FROM Users WHERE user_id = %s" |
|
|
cursor.execute(sql, (user_id,)) |
|
|
return cursor.fetchone() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_user_by_username(username: str): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "SELECT * FROM Users WHERE username = %s" |
|
|
cursor.execute(sql, (username,)) |
|
|
return cursor.fetchone() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def update_user(user_id: int, username: str = None, email: str = None, password_hash: str = None, first_name: str = None, last_name: str = None, bio: str = None, profile_picture: str = None): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
updates = [] |
|
|
params = [] |
|
|
if username is not None: |
|
|
updates.append("username = %s") |
|
|
params.append(username) |
|
|
if email is not None: |
|
|
updates.append("email = %s") |
|
|
params.append(email) |
|
|
if password_hash is not None: |
|
|
updates.append("password_hash = %s") |
|
|
params.append(password_hash) |
|
|
if first_name is not None: |
|
|
updates.append("first_name = %s") |
|
|
params.append(first_name) |
|
|
if last_name is not None: |
|
|
updates.append("last_name = %s") |
|
|
params.append(last_name) |
|
|
if bio is not None: |
|
|
updates.append("bio = %s") |
|
|
params.append(bio) |
|
|
if profile_picture is not None: |
|
|
updates.append("profile_picture = %s") |
|
|
params.append(profile_picture) |
|
|
if not updates: |
|
|
return 0 |
|
|
params.append(user_id) |
|
|
sql = "UPDATE Users SET " + ", ".join(updates) + " WHERE user_id = %s" |
|
|
cursor.execute(sql, tuple(params)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def delete_user(user_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "DELETE FROM Users WHERE user_id = %s" |
|
|
cursor.execute(sql, (user_id,)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_all_users(page: int, per_page: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
offset = max((page - 1) * per_page, 0) |
|
|
sql = "SELECT * FROM Users LIMIT %s OFFSET %s" |
|
|
cursor.execute(sql, (per_page, offset)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
|
|
|
def create_role(role_name: str, description: str = None): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "INSERT INTO Roles (role_name, description) VALUES (%s, %s)" |
|
|
cursor.execute(sql, (role_name, description)) |
|
|
connection.commit() |
|
|
return cursor.lastrowid |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_role_by_id(role_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "SELECT * FROM Roles WHERE role_id = %s" |
|
|
cursor.execute(sql, (role_id,)) |
|
|
return cursor.fetchone() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_role_by_name(role_name: str): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "SELECT * FROM Roles WHERE role_name = %s" |
|
|
cursor.execute(sql, (role_name,)) |
|
|
return cursor.fetchone() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def update_role(role_id: int, role_name: str = None, description: str = None): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
updates = [] |
|
|
params = [] |
|
|
if role_name is not None: |
|
|
updates.append("role_name = %s") |
|
|
params.append(role_name) |
|
|
if description is not None: |
|
|
updates.append("description = %s") |
|
|
params.append(description) |
|
|
if not updates: |
|
|
return 0 |
|
|
params.append(role_id) |
|
|
sql = "UPDATE Roles SET " + ", ".join(updates) + " WHERE role_id = %s" |
|
|
cursor.execute(sql, tuple(params)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def delete_role(role_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "DELETE FROM Roles WHERE role_id = %s" |
|
|
cursor.execute(sql, (role_id,)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_all_roles(page: int, per_page: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
offset = max((page - 1) * per_page, 0) |
|
|
sql = "SELECT * FROM Roles LIMIT %s OFFSET %s" |
|
|
cursor.execute(sql, (per_page, offset)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
|
|
|
def create_permission(permission_name: str, description: str = None): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "INSERT INTO Permissions (permission_name, description) VALUES (%s, %s)" |
|
|
cursor.execute(sql, (permission_name, description)) |
|
|
connection.commit() |
|
|
return cursor.lastrowid |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_permission_by_id(permission_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "SELECT * FROM Permissions WHERE permission_id = %s" |
|
|
cursor.execute(sql, (permission_id,)) |
|
|
return cursor.fetchone() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_permission_by_name(permission_name: str): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "SELECT * FROM Permissions WHERE permission_name = %s" |
|
|
cursor.execute(sql, (permission_name,)) |
|
|
return cursor.fetchone() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def update_permission(permission_id: int, permission_name: str = None, description: str = None): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
updates = [] |
|
|
params = [] |
|
|
if permission_name is not None: |
|
|
updates.append("permission_name = %s") |
|
|
params.append(permission_name) |
|
|
if description is not None: |
|
|
updates.append("description = %s") |
|
|
params.append(description) |
|
|
if not updates: |
|
|
return 0 |
|
|
params.append(permission_id) |
|
|
sql = "UPDATE Permissions SET " + ", ".join(updates) + " WHERE permission_id = %s" |
|
|
cursor.execute(sql, tuple(params)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def delete_permission(permission_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "DELETE FROM Permissions WHERE permission_id = %s" |
|
|
cursor.execute(sql, (permission_id,)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_all_permissions(page: int, per_page: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
offset = max((page - 1) * per_page, 0) |
|
|
sql = "SELECT * FROM Permissions LIMIT %s OFFSET %s" |
|
|
cursor.execute(sql, (per_page, offset)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
|
|
|
def add_role_permission(role_id: int, permission_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "INSERT INTO RolePermissions (role_id, permission_id) VALUES (%s, %s)" |
|
|
cursor.execute(sql, (role_id, permission_id)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def remove_role_permission(role_id: int, permission_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "DELETE FROM RolePermissions WHERE role_id = %s AND permission_id = %s" |
|
|
cursor.execute(sql, (role_id, permission_id)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_permissions_by_role(role_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = """ |
|
|
SELECT p.* FROM Permissions p |
|
|
JOIN RolePermissions rp ON p.permission_id = rp.permission_id |
|
|
WHERE rp.role_id = %s |
|
|
""" |
|
|
cursor.execute(sql, (role_id,)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_roles_by_permission(permission_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = """ |
|
|
SELECT r.* FROM Roles r |
|
|
JOIN RolePermissions rp ON r.role_id = rp.role_id |
|
|
WHERE rp.permission_id = %s |
|
|
""" |
|
|
cursor.execute(sql, (permission_id,)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
|
|
|
def add_user_role(user_id: int, role_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "INSERT INTO UserRoles (user_id, role_id) VALUES (%s, %s)" |
|
|
cursor.execute(sql, (user_id, role_id)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def remove_user_role(user_id: int, role_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "DELETE FROM UserRoles WHERE user_id = %s AND role_id = %s" |
|
|
cursor.execute(sql, (user_id, role_id)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_roles_by_user(user_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = """ |
|
|
SELECT r.* FROM Roles r |
|
|
JOIN UserRoles ur ON r.role_id = ur.role_id |
|
|
WHERE ur.user_id = %s |
|
|
""" |
|
|
cursor.execute(sql, (user_id,)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_users_by_role(role_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = """ |
|
|
SELECT u.* FROM Users u |
|
|
JOIN UserRoles ur ON u.user_id = ur.user_id |
|
|
WHERE ur.role_id = %s |
|
|
""" |
|
|
cursor.execute(sql, (role_id,)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
|
|
|
def create_location(name: str, address: str, city: str, state: str = None, country: str = None, latitude: float = None, longitude: float = None): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = """ |
|
|
INSERT INTO Locations (name, address, city, state, country, latitude, longitude) |
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s) |
|
|
""" |
|
|
cursor.execute(sql, (name, address, city, state, country, latitude, longitude)) |
|
|
connection.commit() |
|
|
return cursor.lastrowid |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_location_by_id(location_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "SELECT * FROM Locations WHERE location_id = %s" |
|
|
cursor.execute(sql, (location_id,)) |
|
|
return cursor.fetchone() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def update_location(location_id: int, name: str = None, address: str = None, city: str = None, state: str = None, country: str = None, latitude: float = None, longitude: float = None): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
updates = [] |
|
|
params = [] |
|
|
if name is not None: |
|
|
updates.append("name = %s") |
|
|
params.append(name) |
|
|
if address is not None: |
|
|
updates.append("address = %s") |
|
|
params.append(address) |
|
|
if city is not None: |
|
|
updates.append("city = %s") |
|
|
params.append(city) |
|
|
if state is not None: |
|
|
updates.append("state = %s") |
|
|
params.append(state) |
|
|
if country is not None: |
|
|
updates.append("country = %s") |
|
|
params.append(country) |
|
|
if latitude is not None: |
|
|
updates.append("latitude = %s") |
|
|
params.append(latitude) |
|
|
if longitude is not None: |
|
|
updates.append("longitude = %s") |
|
|
params.append(longitude) |
|
|
if not updates: |
|
|
return 0 |
|
|
params.append(location_id) |
|
|
sql = "UPDATE Locations SET " + ", ".join(updates) + " WHERE location_id = %s" |
|
|
cursor.execute(sql, tuple(params)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def delete_location(location_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "DELETE FROM Locations WHERE location_id = %s" |
|
|
cursor.execute(sql, (location_id,)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_all_locations(page: int, per_page: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
offset = max((page - 1) * per_page, 0) |
|
|
sql = "SELECT * FROM Locations LIMIT %s OFFSET %s" |
|
|
cursor.execute(sql, (per_page, offset)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
|
|
|
def create_event(host_id: int, location_id: int, title: str, start_time, end_time, description: str = None, category: str = None, max_participants: int = None, event_picture: str = None, |
|
|
is_recurring: bool = False, recurrence_type: str = None, recurrence_interval: int = None, recurrence_end_date = None, custom_recurrence_pattern: str = None): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = """ |
|
|
INSERT INTO Events (host_id, location_id, title, description, start_time, end_time, category, max_participants, event_picture, |
|
|
is_recurring, recurrence_type, recurrence_interval, recurrence_end_date, custom_recurrence_pattern) |
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
|
|
""" |
|
|
cursor.execute(sql, (host_id, location_id, title, description, start_time, end_time, category, max_participants, event_picture, |
|
|
is_recurring, recurrence_type, recurrence_interval, recurrence_end_date, custom_recurrence_pattern)) |
|
|
connection.commit() |
|
|
return cursor.lastrowid |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_event_by_id(event_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "SELECT * FROM Events WHERE event_id = %s" |
|
|
cursor.execute(sql, (event_id,)) |
|
|
return cursor.fetchone() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def update_event(event_id: int, host_id: int = None, location_id: int = None, title: str = None, start_time = None, end_time = None, description: str = None, |
|
|
category: str = None, max_participants: int = None, event_picture: str = None, is_recurring: bool = None, recurrence_type: str = None, |
|
|
recurrence_interval: int = None, recurrence_end_date = None, custom_recurrence_pattern: str = None): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
updates = [] |
|
|
params = [] |
|
|
if host_id is not None: |
|
|
updates.append("host_id = %s") |
|
|
params.append(host_id) |
|
|
if location_id is not None: |
|
|
updates.append("location_id = %s") |
|
|
params.append(location_id) |
|
|
if title is not None: |
|
|
updates.append("title = %s") |
|
|
params.append(title) |
|
|
if start_time is not None: |
|
|
updates.append("start_time = %s") |
|
|
params.append(start_time) |
|
|
if end_time is not None: |
|
|
updates.append("end_time = %s") |
|
|
params.append(end_time) |
|
|
if description is not None: |
|
|
updates.append("description = %s") |
|
|
params.append(description) |
|
|
if category is not None: |
|
|
updates.append("category = %s") |
|
|
params.append(category) |
|
|
if max_participants is not None: |
|
|
updates.append("max_participants = %s") |
|
|
params.append(max_participants) |
|
|
if event_picture is not None: |
|
|
updates.append("event_picture = %s") |
|
|
params.append(event_picture) |
|
|
if is_recurring is not None: |
|
|
updates.append("is_recurring = %s") |
|
|
params.append(is_recurring) |
|
|
if recurrence_type is not None: |
|
|
updates.append("recurrence_type = %s") |
|
|
params.append(recurrence_type) |
|
|
if recurrence_interval is not None: |
|
|
updates.append("recurrence_interval = %s") |
|
|
params.append(recurrence_interval) |
|
|
if recurrence_end_date is not None: |
|
|
updates.append("recurrence_end_date = %s") |
|
|
params.append(recurrence_end_date) |
|
|
if custom_recurrence_pattern is not None: |
|
|
updates.append("custom_recurrence_pattern = %s") |
|
|
params.append(custom_recurrence_pattern) |
|
|
if not updates: |
|
|
return 0 |
|
|
params.append(event_id) |
|
|
sql = "UPDATE Events SET " + ", ".join(updates) + " WHERE event_id = %s" |
|
|
cursor.execute(sql, tuple(params)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def delete_event(event_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "DELETE FROM Events WHERE event_id = %s" |
|
|
cursor.execute(sql, (event_id,)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_all_events(page: int, per_page: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
offset = max((page - 1) * per_page, 0) |
|
|
sql = "SELECT * FROM Events LIMIT %s OFFSET %s" |
|
|
cursor.execute(sql, (per_page, offset)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_events_by_host(host_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "SELECT * FROM Events WHERE host_id = %s" |
|
|
cursor.execute(sql, (host_id,)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_all_event_categories(): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "SELECT DISTINCT category FROM Events" |
|
|
cursor.execute(sql) |
|
|
categories = [row['category'] for row in cursor.fetchall() if row['category'] is not None] |
|
|
return categories |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_events_by_category(category: str): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "SELECT * FROM Events WHERE category = %s" |
|
|
cursor.execute(sql, (category,)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
|
|
|
def add_event_participant(event_id: int, user_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "INSERT INTO EventParticipants (event_id, user_id) VALUES (%s, %s)" |
|
|
cursor.execute(sql, (event_id, user_id)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def remove_event_participant(event_id: int, user_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = "DELETE FROM EventParticipants WHERE event_id = %s AND user_id = %s" |
|
|
cursor.execute(sql, (event_id, user_id)) |
|
|
connection.commit() |
|
|
return cursor.rowcount |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_participants_by_event(event_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = """ |
|
|
SELECT u.* FROM Users u |
|
|
JOIN EventParticipants ep ON u.user_id = ep.user_id |
|
|
WHERE ep.event_id = %s |
|
|
""" |
|
|
cursor.execute(sql, (event_id,)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|
|
|
def get_events_by_participant(user_id: int): |
|
|
connection = get_connection() |
|
|
try: |
|
|
with connection.cursor() as cursor: |
|
|
sql = """ |
|
|
SELECT e.* FROM Events e |
|
|
JOIN EventParticipants ep ON e.event_id = ep.event_id |
|
|
WHERE ep.user_id = %s |
|
|
""" |
|
|
cursor.execute(sql, (user_id,)) |
|
|
return cursor.fetchall() |
|
|
finally: |
|
|
connection.close() |
|
|
|