"""Data-access helpers for the ``users``, ``request``, ``ban_list`` and
``videos`` tables of a PostgreSQL database, built on psycopg2.

Every public function opens its own short-lived connection from the
``db_params`` mapping it is given, and (with the exception of
``get_videos_data``, see its docstring) reports failures by printing a
message and returning ``False``/``None`` instead of raising.
"""

import uuid
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Optional

import psycopg2
from psycopg2 import sql  # noqa: F401  (not used in this module; kept for importers)

# NOTE(review): credentials are hard-coded in source. Consider loading them
# from the environment or a secrets store; the values are left untouched here
# so existing deployments keep working.
db_params = {
    'dbname': 'API_DB',
    'user': 'postgres',
    'password': '4b95dfe8-4644-46ce-a4fe-648d6d4860a4',
    'host': 'localhost',
    'port': '5432'
}


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

@contextmanager
def _open_cursor(db_params):
    """Yield ``(conn, cur)`` for a new connection and always close both.

    If ``psycopg2.connect`` itself fails, the exception propagates to the
    caller before anything needs closing, so callers never touch an unbound
    ``conn``/``cur`` in their cleanup path.
    """
    conn = psycopg2.connect(**db_params)
    try:
        cur = conn.cursor()
        try:
            yield conn, cur
        finally:
            cur.close()
    finally:
        conn.close()


def _execute_write(db_params, query, params, error_prefix, *, require_rows=True) -> bool:
    """Run one data-modifying statement, commit, and report success.

    Args:
        db_params: keyword arguments for ``psycopg2.connect``.
        query: SQL text with ``%s`` placeholders.
        params: sequence of values bound to the placeholders.
        error_prefix: text printed before the exception message on failure.
        require_rows: when True, success additionally requires that the
            statement affected at least one row (``cur.rowcount > 0``).

    Returns:
        True on success, False on any exception (which is printed).
    """
    try:
        with _open_cursor(db_params) as (conn, cur):
            cur.execute(query, params)
            conn.commit()
            return cur.rowcount > 0 if require_rows else True
    except Exception as e:
        print(f"{error_prefix}: {e}")
        return False


def _fetch_one(db_params, query, params):
    """Run a query and return the first row (or None). Exceptions propagate."""
    with _open_cursor(db_params) as (_conn, cur):
        cur.execute(query, params)
        return cur.fetchone()


def _fetch_all(db_params, query, params):
    """Run a query and return all rows. Exceptions propagate."""
    with _open_cursor(db_params) as (_conn, cur):
        cur.execute(query, params)
        return cur.fetchall()


def _format_video(video):
    """Map an ``(id, title, video_url, thumbnail_url)`` row to a response dict."""
    return {
        "id": video[0],
        "url": video[2],
        "title": video[1],
        "info": "",
        "thumbnail": video[3]
    }


# ---------------------------------------------------------------------------
# users table
# ---------------------------------------------------------------------------

def insert_user(db_params, uid: uuid.UUID, paid: bool, first_name: str, last_name: str, email: str, password: str):
    """Insert a row into ``users``; ``dor`` and ``la`` are both set to now.

    Returns False immediately when ``uid`` or ``paid`` is None, True once the
    insert has been committed, and False (after printing) on any error.
    """
    if uid is None or paid is None:
        return False

    insert_query = """
        INSERT INTO users (id, paid, first_name, last_name, email, dor, la, password)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
    """
    current_time = datetime.now()
    # NOTE(review): the password is stored exactly as received; if callers
    # pass plaintext, it is persisted as plaintext.
    return _execute_write(
        db_params,
        insert_query,
        (str(uid), paid, first_name, last_name, email, current_time, current_time, password),
        "Error",
        require_rows=False,
    )


def user_exists(db_params, user_id: uuid.UUID) -> bool:
    """Return whether a ``users`` row with this id exists (False on error)."""
    query = "SELECT EXISTS (SELECT 1 FROM users WHERE id = %s);"
    try:
        row = _fetch_one(db_params, query, (str(user_id),))
    except Exception as e:
        print(f"Error checking if user exists: {e}")
        return False
    return row[0] if row else False


def has_user_paid(db_params, user_id: uuid.UUID) -> bool:
    """Return the ``paid`` column for the user; False if missing or on error."""
    query = "SELECT paid FROM users WHERE id = %s;"
    try:
        row = _fetch_one(db_params, query, (str(user_id),))
    except Exception as e:
        print(f"Error checking if user has paid: {e}")
        return False
    # The stored value is returned as-is (it may be NULL/None in the table).
    return row[0] if row else False


def get_user_id_by_email(db_params, email: str) -> Optional[uuid.UUID]:
    """Look up a user's id by email.

    Returns:
        The id as ``uuid.UUID``, or None when no row matches or on error.
    """
    query = "SELECT id FROM users WHERE email = %s;"
    try:
        row = _fetch_one(db_params, query, (email,))
        if not row:
            return None
        # str() makes this work whether the driver hands back text or a
        # ready-made UUID object.
        return uuid.UUID(str(row[0]))
    except Exception as e:
        print(f"Error retrieving user ID by email: {e}")
        return None


def user_change_password(db_params, email: str, old_password: str, new_password: str) -> bool:
    """Replace the user's password if ``old_password`` matches the stored one.

    Returns True when the password was updated; False when the email is
    unknown, the old password does not match, or an error occurs.
    """
    try:
        user_id = get_user_id_by_email(db_params, email)
        if user_id is None:
            return False

        with _open_cursor(db_params) as (conn, cur):
            cur.execute("SELECT password FROM users WHERE id = %s;", (str(user_id),))
            row = cur.fetchone()
            # NOTE(review): direct equality on the stored value implies
            # passwords are kept unhashed — confirm and consider hashing.
            if not row or row[0] != old_password:
                return False

            cur.execute(
                "UPDATE users SET password = %s WHERE id = %s;",
                (new_password, str(user_id)),
            )
            conn.commit()
            return True
    except Exception as e:
        print(f"Error changing password: {e}")
        return False


def update_user_paid_state(db_params, email: str = None, user_id: uuid.UUID = None, paid_state: bool = False) -> bool:
    """Set ``users.paid`` for the user identified by email or id.

    When ``email`` is given it takes precedence: the id is looked up from it
    and any ``user_id`` argument is ignored.

    Returns True if a row was updated, otherwise False.
    """
    if email is None and user_id is None:
        print("Error: Either email or user ID must be provided.")
        return False

    if email is not None:
        user_id = get_user_id_by_email(db_params, email)
        if user_id is None:
            return False

    return _execute_write(
        db_params,
        "UPDATE users SET paid = %s WHERE id = %s;",
        (paid_state, str(user_id)),
        "Error updating paid state",
    )


def delete_user_by_id(db_params, user_id: uuid.UUID) -> bool:
    """Delete the user row with this id; True if a row was removed."""
    return _execute_write(
        db_params,
        "DELETE FROM users WHERE id = %s;",
        (str(user_id),),
        "Error deleting user by ID",
    )


def update_last_activity(db_params, email: str = None, user_id: uuid.UUID = None) -> bool:
    """Set ``users.la`` to the current time for the given user.

    When ``email`` is given it takes precedence over ``user_id``.

    Returns True if a row was updated, otherwise False.
    """
    if email is None and user_id is None:
        print("Error: Either email or user ID must be provided.")
        return False

    if email is not None:
        user_id = get_user_id_by_email(db_params, email)
        if user_id is None:
            return False

    return _execute_write(
        db_params,
        "UPDATE users SET la = %s WHERE id = %s;",
        (datetime.now(), str(user_id)),
        "Error updating last activity",
    )


# ---------------------------------------------------------------------------
# request table
# ---------------------------------------------------------------------------

def insert_request(db_params, request_id: uuid.UUID, video_id: uuid.UUID, user_id: uuid.UUID, image_base64: str, public: bool, report: bool, request_service: str, registration_token: str) -> bool:
    """Insert a new request with status ``'Requested'`` for an existing user.

    The row's ``paid`` column is copied from the user's current paid flag,
    ``tor`` is set to now and ``toc`` to NULL.

    Returns True if the row was inserted; False if the user does not exist
    or on error.
    """
    if not user_exists(db_params, user_id):
        print("Error: User does not exist.")
        return False

    paid = has_user_paid(db_params, user_id)

    insert_query = """
        INSERT INTO request (id, video_id, user_id, image_base64, status, tor, toc, public, paid, report, request_service, registration_token)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """
    values = (
        str(request_id),
        str(video_id),
        str(user_id),
        image_base64,
        'Requested',
        datetime.now(),
        None,
        public,
        paid,
        report,
        request_service,
        registration_token,
    )
    return _execute_write(db_params, insert_query, values, "Error inserting request")


def get_request_data(db_params, request_id: uuid.UUID):
    """Fetch the payload columns of a request.

    Returns:
        ``[image_base64, video_id, request_service, registration_token]``,
        or None when the request is not found or on error.
    """
    select_query = """
        SELECT image_base64, video_id, request_service, registration_token
        FROM request
        WHERE id = %s;
    """
    try:
        row = _fetch_one(db_params, select_query, (str(request_id),))
    except Exception as e:
        print(f"Error retrieving image and video: {e}")
        return None
    return None if row is None else list(row)


def check_and_update_request(db_params, request_service: str, video_id: uuid.UUID, image_base64: str, registration_token: str):
    """Reuse a matching ``'Requested'`` request from the last five minutes.

    Looks for a request with the same video, image and service created
    within the last five minutes. If one is found, its ``dor``,
    ``registration_token`` and ``ls`` columns are refreshed.

    Returns:
        The id of the reused request, or None when there is no match or on
        error.
    """
    five_minutes_ago = datetime.now() - timedelta(minutes=5)

    check_query = """
        SELECT id, dor FROM request
        WHERE video_id = %s AND image_base64 = %s AND created_at >= %s AND status = 'Requested' AND request_service = %s
    """
    update_query = """
        UPDATE request
        SET dor = %s, registration_token = %s, ls = %s
        WHERE id = %s
    """
    try:
        with _open_cursor(db_params) as (conn, cur):
            cur.execute(check_query, (str(video_id), image_base64, five_minutes_ago, request_service))
            existing_request = cur.fetchone()
            if not existing_request:
                return None

            # Only the id is needed. The query returns two columns, so the
            # row must not be unpacked into more names than that.
            existing_request_id = existing_request[0]

            now = datetime.now()
            cur.execute(update_query, (now, registration_token, now, str(existing_request_id)))
            conn.commit()
            return existing_request_id
    except Exception as e:
        print(f"Error checking or updating request: {e}")
        return None


def update_reported_status(db_params, request_id: uuid.UUID) -> bool:
    """Set ``report = TRUE`` on the request; True if a row was updated."""
    update_query = """
        UPDATE request
        SET report = TRUE
        WHERE id = %s;
    """
    return _execute_write(db_params, update_query, (str(request_id),), "Error updating reported status")


def request_id_exists(db_params, request_id: uuid.UUID) -> bool:
    """Return whether a request with this id exists (False on error)."""
    check_query = """
        SELECT EXISTS (
            SELECT 1
            FROM request
            WHERE id = %s
        );
    """
    try:
        return _fetch_one(db_params, check_query, (str(request_id),))[0]
    except Exception as e:
        print(f"Error checking if request ID exists: {e}")
        return False


def delete_request(db_params, request_id: uuid.UUID) -> bool:
    """Delete the request with this id; True if a row was removed."""
    delete_query = """
        DELETE FROM request
        WHERE id = %s;
    """
    return _execute_write(db_params, delete_query, (str(request_id),), "Error deleting request")


def update_request_status(db_params, request_id: uuid.UUID, new_status: str) -> bool:
    """Set ``request.status``; True if a row was updated.

    NOTE(review): this module previously defined this function twice. The
    earlier definition also wrote the current time to a ``dor`` column but
    was shadowed by the later one, so it never ran. Only the effective
    behaviour (status update only) is kept — confirm that is what is wanted.
    """
    update_query = """
        UPDATE request
        SET status = %s
        WHERE id = %s;
    """
    return _execute_write(db_params, update_query, (new_status, str(request_id)), "Error updating request status")


def get_oldest_requests(db_params, limit: int):
    """Return up to ``limit`` pending requests, paid ones first, oldest first.

    Returns:
        A list of ``(id, video_id, image_base64, tor, paid)`` rows, or None
        when there are no pending requests or on error.
    """
    # The trailing "--" SQL comment must stay on its own line so it does not
    # swallow the ORDER BY / LIMIT clauses.
    select_query = """
        SELECT id, video_id, image_base64, tor, paid
        FROM request
        WHERE status = 'Requested'-- AND paid = True
        ORDER BY
            CASE WHEN paid THEN 0 ELSE 1 END,
            tor ASC
        LIMIT %s;
    """
    try:
        rows = _fetch_all(db_params, select_query, (limit,))
    except Exception as e:
        print(f"Error retrieving oldest requests: {e}")
        return None
    return rows if rows else None


def request_remove_image(db_params, request_id: uuid.UUID) -> bool:
    """Overwrite a request's stored image with ``'image_placeholder'``.

    Returns True if the image was replaced; False if the request is missing,
    has no image (NULL or empty), or on error.
    """
    check_query = """
        SELECT image_base64
        FROM request
        WHERE id = %s;
    """
    update_query = """
        UPDATE request
        SET image_base64 = 'image_placeholder'
        WHERE id = %s AND image_base64 IS NOT NULL AND image_base64 <> '';
    """
    try:
        with _open_cursor(db_params) as (conn, cur):
            cur.execute(check_query, (str(request_id),))
            row = cur.fetchone()
            if row is None or row[0] in (None, ''):
                return False

            cur.execute(update_query, (str(request_id),))
            conn.commit()
            return cur.rowcount > 0
    except Exception as e:
        print(f"Error updating image_base64: {e}")
        return False


# ---------------------------------------------------------------------------
# ban list
# ---------------------------------------------------------------------------

def insert_ban(db_params, ip: str, reason: str) -> bool:
    """Add an IP and reason to ``ban_list``; True once committed."""
    return _execute_write(
        db_params,
        '''INSERT INTO ban_list (ip, reason) VALUES (%s, %s)''',
        (ip, reason),
        "Error inserting ban",
        require_rows=False,
    )


def search_ban(db_params, ip: str) -> bool:
    """Return whether the IP appears in ``ban_list`` (False on error)."""
    try:
        rows = _fetch_all(db_params, '''SELECT * FROM ban_list WHERE ip = %s''', (ip,))
    except Exception as e:
        print(f"Error searching ban list: {e}")
        return False
    return len(rows) > 0


# ---------------------------------------------------------------------------
# content (videos table)
# ---------------------------------------------------------------------------

def get_videos_data(db_params):
    """Build the video listing grouped into sections.

    The result is a list of ``{"id", "title", "urls"}`` dicts: first the
    fixed sections Trending (top 4 by likes), Latest (top 4 by created_at)
    and Hot (top 4 by hits), then one section per distinct ``category``
    containing all of that category's videos.

    A failure to connect raises. A failure while querying is printed and
    whatever sections were assembled so far (possibly none) are returned.
    """
    # Bound before any query runs so the final return is always valid.
    response = []

    with _open_cursor(db_params) as (_conn, cursor):
        try:
            cursor.execute('''
                SELECT id, title, backend_video_url, backend_thumbnail_url
                FROM videos
                ORDER BY created_at DESC
                LIMIT 4
            ''')
            latest_videos = cursor.fetchall()

            cursor.execute('''
                SELECT id, title, backend_video_url, backend_thumbnail_url
                FROM videos
                ORDER BY likes DESC
                LIMIT 4
            ''')
            trending_videos = cursor.fetchall()

            cursor.execute('''
                SELECT id, title, backend_video_url, backend_thumbnail_url
                FROM videos
                ORDER BY hits DESC
                LIMIT 4
            ''')
            hot_videos = cursor.fetchall()

            cursor.execute('''
                SELECT DISTINCT category
                FROM videos
            ''')
            categories = cursor.fetchall()

            fixed_sections = (
                ("trending", "Trending", trending_videos),
                ("latest", "Latest", latest_videos),
                ("hot", "Hot", hot_videos),
            )
            response.extend(
                {
                    "id": section_id,
                    "title": section_title,
                    "urls": [_format_video(video) for video in videos],
                }
                for section_id, section_title, videos in fixed_sections
            )

            for category in categories:
                category_name = category[0]
                cursor.execute('''
                    SELECT id, title, backend_video_url, backend_thumbnail_url
                    FROM videos
                    WHERE category = %s
                ''', (category_name,))
                videos_in_category = cursor.fetchall()
                response.append({
                    "id": category_name,
                    "title": category_name,
                    "urls": [_format_video(video) for video in videos_in_category]
                })
        except Exception as e:
            print(f"An error occurred: {e}")

    return response


def video_id_exists(db_params, video_id: uuid.UUID) -> bool:
    """Return whether a video with this id exists (False on error)."""
    check_query = """
        SELECT EXISTS (
            SELECT 1
            FROM videos
            WHERE id = %s
        );
    """
    try:
        return _fetch_one(db_params, check_query, (str(video_id),))[0]
    except Exception as e:
        print(f"Error checking if video ID exists: {e}")
        return False


def increment_likes(db_params, video_id: uuid.UUID) -> bool:
    """Add one to the video's ``likes``; True if a row was updated."""
    increment_query = """
        UPDATE videos
        SET likes = likes + 1
        WHERE id = %s;
    """
    return _execute_write(db_params, increment_query, (str(video_id),), "Error incrementing likes")


def increment_dislikes(db_params, video_id: uuid.UUID) -> bool:
    """Add one to the video's ``dislikes``; True if a row was updated."""
    increment_query = """
        UPDATE videos
        SET dislikes = dislikes + 1
        WHERE id = %s;
    """
    return _execute_write(db_params, increment_query, (str(video_id),), "Error incrementing dislikes")