|
|
import psycopg2 |
|
|
from psycopg2 import sql |
|
|
|
|
|
import uuid |
|
|
from datetime import datetime, timedelta |
|
|
|
|
|
db_params = { |
|
|
'dbname': 'API_DB', |
|
|
'user': 'postgres', |
|
|
'password': '4b95dfe8-4644-46ce-a4fe-648d6d4860a4', |
|
|
'host': 'localhost', |
|
|
'port': '5432' |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def insert_user(db_params ,uid: uuid.UUID, paid: bool, first_name: str, last_name: str, email: str, password: str): |
|
|
|
|
|
if uid is None or paid is None: |
|
|
return False |
|
|
|
|
|
insert_query = """ |
|
|
INSERT INTO users (id, paid, first_name, last_name, email, dor, la, password) |
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s); |
|
|
""" |
|
|
|
|
|
try: |
|
|
current_time = datetime.now() |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
cur.execute(insert_query, (str(uid), paid, first_name, last_name, email, current_time, current_time, password)) |
|
|
conn.commit() |
|
|
return True |
|
|
except Exception as e: |
|
|
print(f"Error: {e}") |
|
|
return False |
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def user_exists(db_params, user_id: uuid.UUID) -> bool: |
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
query = "SELECT EXISTS (SELECT 1 FROM users WHERE id = %s);" |
|
|
cur.execute(query, (str(user_id),)) |
|
|
result = cur.fetchone() |
|
|
|
|
|
if result: |
|
|
exists = result[0] |
|
|
return exists |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error checking if user exists: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def has_user_paid(db_params, user_id: uuid.UUID) -> bool: |
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
query = "SELECT paid FROM users WHERE id = %s;" |
|
|
cur.execute(query, (str(user_id),)) |
|
|
result = cur.fetchone() |
|
|
|
|
|
if result: |
|
|
paid_status = result[0] |
|
|
return paid_status |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error checking if user has paid: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def get_user_id_by_email(db_params, email: str) -> uuid.UUID: |
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
query = "SELECT id FROM users WHERE email = %s;" |
|
|
cur.execute(query, (email,)) |
|
|
result = cur.fetchone() |
|
|
|
|
|
if result: |
|
|
user_id = result[0] |
|
|
return uuid.UUID(user_id) |
|
|
else: |
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error retrieving user ID by email: {e}") |
|
|
return None |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def user_change_password(db_params, email: str, old_password: str, new_password: str) -> bool: |
|
|
try: |
|
|
user_id = get_user_id_by_email(db_params, email) |
|
|
|
|
|
if user_id is None: |
|
|
return False |
|
|
|
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
query = "SELECT password FROM users WHERE id = %s;" |
|
|
cur.execute(query, (str(user_id),)) |
|
|
result = cur.fetchone() |
|
|
|
|
|
if result: |
|
|
current_password = result[0] |
|
|
if current_password == old_password: |
|
|
update_query = "UPDATE users SET password = %s WHERE id = %s;" |
|
|
cur.execute(update_query, (new_password, str(user_id))) |
|
|
conn.commit() |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error changing password: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def update_user_paid_state(db_params, email: str = None, user_id: uuid.UUID = None, paid_state: bool = False) -> bool: |
|
|
|
|
|
if email is None and user_id is None: |
|
|
print("Error: Either email or user ID must be provided.") |
|
|
return False |
|
|
|
|
|
if email is not None: |
|
|
user_id = get_user_id_by_email(db_params, email) |
|
|
if user_id is None: |
|
|
return False |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
update_query = "UPDATE users SET paid = %s WHERE id = %s;" |
|
|
cur.execute(update_query, (paid_state, str(user_id))) |
|
|
conn.commit() |
|
|
|
|
|
if cur.rowcount > 0: |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error updating paid state: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def delete_user_by_id(db_params, user_id: uuid.UUID) -> bool: |
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
delete_query = "DELETE FROM users WHERE id = %s;" |
|
|
cur.execute(delete_query, (str(user_id),)) |
|
|
conn.commit() |
|
|
|
|
|
if cur.rowcount > 0: |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error deleting user by ID: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def update_last_activity(db_params, email: str = None, user_id: uuid.UUID = None) -> bool: |
|
|
|
|
|
if email is None and user_id is None: |
|
|
print("Error: Either email or user ID must be provided.") |
|
|
return False |
|
|
|
|
|
if email is not None: |
|
|
user_id = get_user_id_by_email(db_params, email) |
|
|
if user_id is None: |
|
|
return False |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
current_time = datetime.now() |
|
|
update_query = "UPDATE users SET la = %s WHERE id = %s;" |
|
|
cur.execute(update_query, (current_time, str(user_id))) |
|
|
conn.commit() |
|
|
|
|
|
if cur.rowcount > 0: |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error updating last activity: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
|
|
|
|
|
|
def insert_request(db_params, request_id: uuid.UUID, video_id: uuid.UUID, user_id: uuid.UUID, image_base64: str, public: bool, report: bool, |
|
|
request_service: str, registration_token: str) -> bool: |
|
|
if not user_exists(db_params, user_id): |
|
|
print("Error: User does not exist.") |
|
|
return False |
|
|
|
|
|
paid = has_user_paid(db_params, user_id) |
|
|
|
|
|
insert_query = """ |
|
|
INSERT INTO request (id, video_id, user_id, image_base64, status, tor, toc, public, paid, report, request_service, registration_token) |
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); |
|
|
""" |
|
|
|
|
|
conn = None |
|
|
cur = None |
|
|
try: |
|
|
current_time = datetime.now() |
|
|
|
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
cur.execute(insert_query, ( |
|
|
str(request_id), |
|
|
str(video_id), |
|
|
str(user_id), |
|
|
image_base64, |
|
|
'Requested', |
|
|
current_time, |
|
|
None, |
|
|
public, |
|
|
paid, |
|
|
report, |
|
|
request_service, |
|
|
registration_token |
|
|
)) |
|
|
conn.commit() |
|
|
|
|
|
if cur.rowcount > 0: |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error inserting request: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def get_request_data(db_params, request_id: uuid.UUID): |
|
|
select_query = """ |
|
|
SELECT image_base64, video_id, request_service, registration_token |
|
|
FROM request |
|
|
WHERE id = %s; |
|
|
""" |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
cur.execute(select_query, (str(request_id),)) |
|
|
|
|
|
result = cur.fetchone() |
|
|
|
|
|
if result is None: |
|
|
return None |
|
|
|
|
|
image_base64 = result[0] |
|
|
video_id = result[1] |
|
|
request_service = result[2] |
|
|
registration_token = result[3] |
|
|
|
|
|
return [image_base64, video_id, request_service, registration_token] |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error retrieving image and video: {e}") |
|
|
return None |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def update_request_status(db_params, request_id: uuid.UUID, new_status: str) -> bool: |
|
|
update_query = """ |
|
|
UPDATE request |
|
|
SET status = %s, |
|
|
dor = %s |
|
|
WHERE id = %s; |
|
|
""" |
|
|
|
|
|
conn = None |
|
|
cur = None |
|
|
try: |
|
|
current_time = datetime.now() |
|
|
|
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
cur.execute(update_query, (new_status, current_time, str(request_id))) |
|
|
conn.commit() |
|
|
|
|
|
if cur.rowcount > 0: |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error updating request status: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def check_and_update_request(db_params, request_service: str, video_id: uuid.UUID, image_base64: str, registration_token: str): |
|
|
five_minutes_ago = datetime.now() - timedelta(minutes=5) |
|
|
|
|
|
check_query = """ |
|
|
SELECT id, dor |
|
|
FROM request |
|
|
WHERE video_id = %s |
|
|
AND image_base64 = %s |
|
|
AND created_at >= %s |
|
|
AND status = 'Requested' |
|
|
AND request_service = %s |
|
|
""" |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
cur.execute(check_query, (str(video_id), image_base64, five_minutes_ago, request_service)) |
|
|
existing_request = cur.fetchone() |
|
|
|
|
|
if existing_request: |
|
|
existing_request_id, dor, existing_token, existing_ls = existing_request |
|
|
|
|
|
update_query = """ |
|
|
UPDATE request |
|
|
SET dor = %s, |
|
|
registration_token = %s, |
|
|
ls = %s |
|
|
WHERE id = %s |
|
|
""" |
|
|
cur.execute(update_query, (datetime.now(), registration_token, datetime.now(), str(existing_request_id))) |
|
|
conn.commit() |
|
|
|
|
|
return existing_request_id |
|
|
|
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error checking or updating request: {e}") |
|
|
return None |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def update_reported_status(db_params, request_id: uuid.UUID) -> bool: |
|
|
update_query = """ |
|
|
UPDATE request |
|
|
SET report = TRUE |
|
|
WHERE id = %s; |
|
|
""" |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
cur.execute(update_query, (str(request_id),)) |
|
|
conn.commit() |
|
|
|
|
|
if cur.rowcount > 0: |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error updating reported status: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def request_id_exists(db_params, request_id: uuid.UUID) -> bool: |
|
|
check_query = """ |
|
|
SELECT EXISTS ( |
|
|
SELECT 1 |
|
|
FROM request |
|
|
WHERE id = %s |
|
|
); |
|
|
""" |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
cur.execute(check_query, (str(request_id),)) |
|
|
exists = cur.fetchone()[0] |
|
|
return exists |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error checking if request ID exists: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def delete_request(db_params, request_id: uuid.UUID) -> bool: |
|
|
delete_query = """ |
|
|
DELETE FROM request |
|
|
WHERE id = %s; |
|
|
""" |
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
cur.execute(delete_query, (str(request_id),)) |
|
|
conn.commit() |
|
|
|
|
|
if cur.rowcount > 0: |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error deleting request: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def update_request_status(db_params, request_id: uuid.UUID, new_status: str) -> bool: |
|
|
update_query = """ |
|
|
UPDATE request |
|
|
SET status = %s |
|
|
WHERE id = %s; |
|
|
""" |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
cur.execute(update_query, (new_status, str(request_id))) |
|
|
conn.commit() |
|
|
|
|
|
if cur.rowcount > 0: |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error updating request status: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def get_oldest_requests(db_params, limit: int): |
|
|
select_query = """ |
|
|
SELECT id, video_id, image_base64, tor, paid |
|
|
FROM request |
|
|
WHERE status = 'Requested'-- AND paid = True |
|
|
ORDER BY CASE WHEN paid THEN 0 ELSE 1 END, tor ASC |
|
|
LIMIT %s; |
|
|
""" |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
cur.execute(select_query, (limit,)) |
|
|
rows = cur.fetchall() |
|
|
|
|
|
if rows: |
|
|
return rows |
|
|
else: |
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error retrieving oldest requests: {e}") |
|
|
return None |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def request_remove_image(db_params, request_id: uuid.UUID) -> bool: |
|
|
check_query = """ |
|
|
SELECT image_base64 |
|
|
FROM request |
|
|
WHERE id = %s; |
|
|
""" |
|
|
|
|
|
update_query = """ |
|
|
UPDATE request |
|
|
SET image_base64 = 'image_placeholder' |
|
|
WHERE id = %s AND image_base64 IS NOT NULL AND image_base64 <> ''; |
|
|
""" |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
cur.execute(check_query, (str(request_id),)) |
|
|
result = cur.fetchone() |
|
|
|
|
|
if result is None or result[0] in (None, ''): |
|
|
return False |
|
|
|
|
|
cur.execute(update_query, (str(request_id),)) |
|
|
conn.commit() |
|
|
|
|
|
if cur.rowcount > 0: |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error updating image_base64: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
|
|
|
def insert_ban(db_params, ip: str, reason: str) -> bool: |
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cursor = conn.cursor() |
|
|
cursor.execute('''INSERT INTO ban_list (ip, reason) VALUES (%s, %s)''', (ip, reason)) |
|
|
conn.commit() |
|
|
return True |
|
|
except Exception as e: |
|
|
print(f"Error inserting ban: {e}") |
|
|
return False |
|
|
finally: |
|
|
if cursor: |
|
|
cursor.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def search_ban(db_params, ip: str) -> bool: |
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cursor = conn.cursor() |
|
|
cursor.execute('''SELECT * FROM ban_list WHERE ip = %s''', (ip,)) |
|
|
rows = cursor.fetchall() |
|
|
return len(rows) > 0 |
|
|
except Exception as e: |
|
|
print(f"Error searching ban list: {e}") |
|
|
return False |
|
|
finally: |
|
|
if cursor: |
|
|
cursor.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
|
|
|
|
|
|
def get_videos_data(db_params): |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cursor = conn.cursor() |
|
|
|
|
|
try: |
|
|
cursor.execute(''' |
|
|
SELECT id, title, backend_video_url, backend_thumbnail_url |
|
|
FROM videos |
|
|
ORDER BY created_at DESC |
|
|
LIMIT 4 |
|
|
''') |
|
|
latest_videos = cursor.fetchall() |
|
|
|
|
|
cursor.execute(''' |
|
|
SELECT id, title, backend_video_url, backend_thumbnail_url |
|
|
FROM videos |
|
|
ORDER BY likes DESC |
|
|
LIMIT 4 |
|
|
''') |
|
|
trending_videos = cursor.fetchall() |
|
|
|
|
|
cursor.execute(''' |
|
|
SELECT id, title, backend_video_url, backend_thumbnail_url |
|
|
FROM videos |
|
|
ORDER BY hits DESC |
|
|
LIMIT 4 |
|
|
''') |
|
|
hot_videos = cursor.fetchall() |
|
|
|
|
|
cursor.execute(''' |
|
|
SELECT DISTINCT category FROM videos |
|
|
''') |
|
|
categories = cursor.fetchall() |
|
|
|
|
|
def format_video(video): |
|
|
return { |
|
|
"id": video[0], |
|
|
"url": video[2], |
|
|
"title": video[1], |
|
|
"info": "", |
|
|
"thumbnail": video[3] |
|
|
} |
|
|
|
|
|
response = [] |
|
|
|
|
|
latest_category = { |
|
|
"id": "latest", |
|
|
"title": "Latest", |
|
|
"urls": [format_video(video) for video in latest_videos] |
|
|
} |
|
|
|
|
|
trending_category = { |
|
|
"id": "trending", |
|
|
"title": "Trending", |
|
|
"urls": [format_video(video) for video in trending_videos] |
|
|
} |
|
|
|
|
|
hot_category = { |
|
|
"id": "hot", |
|
|
"title": "Hot", |
|
|
"urls": [format_video(video) for video in hot_videos] |
|
|
} |
|
|
|
|
|
response.extend([trending_category, latest_category, hot_category]) |
|
|
|
|
|
for category in categories: |
|
|
category_name = category[0] |
|
|
|
|
|
cursor.execute(''' |
|
|
SELECT id, title, backend_video_url, backend_thumbnail_url |
|
|
FROM videos |
|
|
WHERE category = %s |
|
|
''', (category_name,)) |
|
|
videos_in_category = cursor.fetchall() |
|
|
|
|
|
category_data = { |
|
|
"id": category_name, |
|
|
"title": category_name, |
|
|
"urls": [format_video(video) for video in videos_in_category] |
|
|
} |
|
|
|
|
|
response.append(category_data) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"An error occurred: {e}") |
|
|
|
|
|
finally: |
|
|
if cursor is not None: |
|
|
cursor.close() |
|
|
if conn is not None: |
|
|
conn.close() |
|
|
|
|
|
return response |
|
|
|
|
|
def video_id_exists(db_params, video_id: uuid.UUID) -> bool: |
|
|
check_query = """ |
|
|
SELECT EXISTS ( |
|
|
SELECT 1 |
|
|
FROM videos |
|
|
WHERE id = %s |
|
|
); |
|
|
""" |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
cur.execute(check_query, (str(video_id),)) |
|
|
exists = cur.fetchone()[0] |
|
|
|
|
|
return exists |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error checking if video ID exists: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def increment_likes(db_params, video_id: uuid.UUID) -> bool: |
|
|
increment_query = """ |
|
|
UPDATE videos |
|
|
SET likes = likes + 1 |
|
|
WHERE id = %s; |
|
|
""" |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
cur.execute(increment_query, (str(video_id),)) |
|
|
conn.commit() |
|
|
|
|
|
if cur.rowcount > 0: |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error incrementing likes: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def increment_dislikes(db_params, video_id: uuid.UUID) -> bool: |
|
|
increment_query = """ |
|
|
UPDATE videos |
|
|
SET dislikes = dislikes + 1 |
|
|
WHERE id = %s; |
|
|
""" |
|
|
|
|
|
try: |
|
|
conn = psycopg2.connect(**db_params) |
|
|
cur = conn.cursor() |
|
|
|
|
|
cur.execute(increment_query, (str(video_id),)) |
|
|
conn.commit() |
|
|
|
|
|
if cur.rowcount > 0: |
|
|
return True |
|
|
else: |
|
|
return False |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error incrementing dislikes: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
if cur: |
|
|
cur.close() |
|
|
if conn: |
|
|
conn.close() |
|
|
|