"""PostgreSQL data-access helpers backed by a shared psycopg2 connection pool."""

import json
import os
import secrets
import time
from contextlib import contextmanager
from datetime import datetime, timedelta
from functools import wraps

import psycopg2
from psycopg2 import pool

# Load and parse the connection settings from the "neon_db" environment variable.
database_config_json = os.getenv("neon_db")
if database_config_json is None:
    # Fail with an explicit message instead of the opaque TypeError that
    # json.loads(None) would raise.
    raise RuntimeError(
        'Environment variable "neon_db" is not set; it must contain the '
        "database connection settings as a JSON object"
    )
DB_CONFIG = json.loads(database_config_json)

# Initialize connection pool (between 1 and 20 connections).
db_pool = psycopg2.pool.SimpleConnectionPool(1, 20, **DB_CONFIG)

# Substring of the OperationalError message that marks a retryable failure.
_EOF_ERROR_MARKER = "SSL SYSCALL error: EOF detected"


@contextmanager
def _pooled_connection():
    """Check a connection out of ``db_pool`` and always hand it back."""
    conn = db_pool.getconn()
    try:
        yield conn
    finally:
        db_pool.putconn(conn)


def retry_on_eof(max_attempts=2, delay=3):
    """Build a decorator that retries a call after an SSL EOF error.

    Only ``psycopg2.OperationalError`` whose message contains
    "SSL SYSCALL error: EOF detected" is retried; any other
    ``OperationalError`` propagates immediately.

    Args:
        max_attempts: Total number of times the wrapped function may be
            called before the EOF error is re-raised.
        delay: Seconds to sleep between attempts.

    Returns:
        A decorator preserving the wrapped function's metadata.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except psycopg2.OperationalError as e:
                    if _EOF_ERROR_MARKER not in str(e):
                        raise  # Re-raise other OperationalErrors
                    print(f"EOF detected in {func.__name__}, retrying {attempt}/{max_attempts} after {delay}s")
                    if attempt == max_attempts:
                        raise  # Re-raise the exception if max attempts reached
                    # Do not check out a connection here: nothing would
                    # return it, so every retry would leak a pool slot.
                    # The wrapped function fetches its own connection on
                    # the next attempt (the pool is expected to discard
                    # connections it knows are closed when they are put
                    # back -- see the psycopg2 pool documentation).
                    time.sleep(delay)  # Wait before retrying
            # Only reached when max_attempts < 1: call once, without retry.
            return func(*args, **kwargs)
        return wrapper
    return decorator


@retry_on_eof(max_attempts=2, delay=3)
def init_db():
    """Create the users, farm, Hive and predictions tables if missing.

    Tables are created in dependency order: ``Hive`` references ``farm``
    and ``predictions`` references ``Hive``, so a referenced table must
    exist before the table that declares the foreign key.

    NOTE(review): other functions in this module use tables
    (password_reset_codes, Task, notifications) and columns (for example
    users.fullname, users.google_id, predictions.file_id) that are not
    created here -- presumably provisioned separately; confirm.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            "CREATE TABLE IF NOT EXISTS users ("
            "user_id SERIAL PRIMARY KEY, email TEXT UNIQUE, password TEXT)"
        )
        c.execute(
            "CREATE TABLE IF NOT EXISTS farm ("
            "farm_id SERIAL PRIMARY KEY, user_id INT UNIQUE, fullname TEXT, "
            "country TEXT, city TEXT, zip TEXT, hives INT DEFAULT 0)"
        )
        c.execute("""
            CREATE TABLE IF NOT EXISTS Hive (
                hive_id SERIAL PRIMARY KEY,
                farm_id INT,
                hive_number INT,
                bee_type TEXT,
                number_of_frames INT,
                creation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                health_status TEXT,
                notes TEXT,
                FOREIGN KEY (farm_id) REFERENCES farm(farm_id)
            )
        """)
        c.execute("""
            CREATE TABLE IF NOT EXISTS predictions (
                pred_id SERIAL PRIMARY KEY,
                user_id INT,
                timestamp TEXT,
                audio_name TEXT,
                result TEXT,
                audio BYTEA,
                hive_id INT,
                FOREIGN KEY (hive_id) REFERENCES Hive(hive_id)
            )
        """)
        conn.commit()


@retry_on_eof(max_attempts=2, delay=3)
def generate_reset_code(email):
    """Create or replace the password-reset code for ``email``.

    Returns:
        The new 6-digit code as a string, or ``None`` when no user has
        that email. The stored code expires 15 minutes after creation.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        # Check if email exists
        c.execute("SELECT user_id FROM users WHERE email = %s", (email,))
        user = c.fetchone()
        if not user:
            return None

        # Zero-padded so the code always has exactly six digits.
        code = f"{secrets.randbelow(10 ** 6):06d}"
        expires_at = datetime.now() + timedelta(minutes=15)

        # Upsert: one row per user, replaced on every new request.
        c.execute(
            """
            INSERT INTO password_reset_codes (user_id, code, expires_at)
            VALUES (%s, %s, %s)
            ON CONFLICT (user_id)
            DO UPDATE SET code = EXCLUDED.code, expires_at = EXCLUDED.expires_at
            """,
            (user[0], code, expires_at),
        )
        conn.commit()
        return code


@retry_on_eof(max_attempts=2, delay=3)
def verify_reset_code(email, code):
    """Check a reset code for ``email`` and consume it when valid.

    Returns:
        ``True`` if the code matches the user and has not expired (the
        code is then deleted), otherwise ``False``.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            SELECT prc.user_id, prc.expires_at
            FROM password_reset_codes prc
            JOIN users u ON prc.user_id = u.user_id
            WHERE u.email = %s AND prc.code = %s
            """,
            (email, code),
        )
        result = c.fetchone()
        if result and result[1] > datetime.now():
            # Delete only this user's code: another user may hold the
            # same 6-digit value and must keep theirs.
            c.execute(
                "DELETE FROM password_reset_codes WHERE user_id = %s AND code = %s",
                (result[0], code),
            )
            conn.commit()
            return True
        return False


@retry_on_eof(max_attempts=2, delay=3)
def update_user_password(email, new_password):
    """Set the password of the user with ``email``.

    NOTE(review): ``new_password`` is stored exactly as given --
    presumably callers hash it first; confirm.

    Returns:
        ``True`` if a user row was updated, ``False`` otherwise.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            "UPDATE users SET password = %s WHERE email = %s RETURNING user_id",
            (new_password, email),
        )
        result = c.fetchone()
        conn.commit()
        return bool(result)


@retry_on_eof(max_attempts=2, delay=3)
def change_user_password(user_id, current_password, new_password):
    """Replace a user's password after checking the current one.

    NOTE(review): passwords are compared and stored exactly as given --
    presumably callers hash them first; confirm.

    Returns:
        ``True`` on success, ``False`` when ``user_id`` and
        ``current_password`` do not match a row.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        # Verify user exists and current password matches
        c.execute(
            "SELECT user_id FROM users WHERE user_id = %s AND password = %s",
            (user_id, current_password),
        )
        if not c.fetchone():
            return False

        c.execute(
            "UPDATE users SET password = %s WHERE user_id = %s",
            (new_password, user_id),
        )
        conn.commit()
        return True


@retry_on_eof(max_attempts=2, delay=3)
def add_hive_to_db(farm_id, hive_number, bee_type, number_of_frames, health_status, notes):
    """Insert a hive and increment the owning farm's hive counter.

    Returns:
        The ``hive_id`` of the new row.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            INSERT INTO Hive (farm_id, hive_number, bee_type, number_of_frames, health_status, notes)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING hive_id
            """,
            (farm_id, hive_number, bee_type, number_of_frames, health_status, notes),
        )
        hive_id = c.fetchone()[0]
        c.execute("UPDATE farm SET hives = hives + 1 WHERE farm_id = %s", (farm_id,))
        conn.commit()
        return hive_id


@retry_on_eof(max_attempts=2, delay=3)
def delete_hive_from_db(hive_id):
    """Delete a hive and decrement the owning farm's hive counter.

    Returns:
        ``True`` if the hive existed and was deleted, else ``False``.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute("SELECT farm_id FROM Hive WHERE hive_id = %s", (hive_id,))
        farm = c.fetchone()
        if not farm:
            return False

        c.execute("DELETE FROM Hive WHERE hive_id = %s", (hive_id,))
        if c.rowcount <= 0:
            # Nothing was deleted, so there is nothing to commit.
            return False

        c.execute("UPDATE farm SET hives = hives - 1 WHERE farm_id = %s", (farm[0],))
        conn.commit()
        return True


@retry_on_eof(max_attempts=2, delay=3)
def register_user(fullname, email, password=None, google_id=None):
    """Create a user together with a default farm record.

    The user row gets fixed default country, city, gender and phone
    values; the farm row gets a fixed default name, location and zip.

    Returns:
        ``"email already exist"`` when the email or ``google_id`` is
        already present, otherwise ``"Signup successful"``.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        # Check if email or google_id already exists
        c.execute(
            "SELECT email, google_id FROM users WHERE email = %s OR google_id = %s",
            (email, google_id),
        )
        if c.fetchone():
            return "email already exist"

        # Insert new user with default values
        c.execute(
            "INSERT INTO users (fullname, email, password, google_id, country, city, gender, phone_number) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING user_id",
            (fullname, email, password, google_id, "Pakistan", "Karachi", "Male", "0"),
        )
        user_id = c.fetchone()[0]

        # Create farm record for the new user with default values
        c.execute(
            "INSERT INTO farm (user_id, fullname, country, city, zip, hives) "
            "VALUES (%s, %s, %s, %s, %s, %s)",
            (user_id, "user farm", "Pakistan", "Karachi", "24700", 0),
        )
        conn.commit()
        return "Signup successful"


@retry_on_eof(max_attempts=2, delay=3)
def authenticate_google_user(google_id, email):
    """Look up a user by ``google_id`` or ``email``.

    Returns:
        ``{"user_id": ..., "farm_id": ...}`` for the first match
        (``farm_id`` is ``None`` when the user has no farm), or
        ``{"error": "User not found"}``.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            SELECT u.user_id, f.farm_id
            FROM users u
            LEFT JOIN farm f ON u.user_id = f.user_id
            WHERE u.google_id = %s OR u.email = %s
            """,
            (google_id, email),
        )
        result = c.fetchone()
        if not result:
            return {"error": "User not found"}
        return {"user_id": result[0], "farm_id": result[1]}


@retry_on_eof(max_attempts=2, delay=3)
def authenticate_user(email, password):
    """Look up a user by email and password.

    NOTE(review): ``password`` is compared with the stored value as-is --
    presumably callers pass a hash; confirm.

    Returns:
        ``{"user_id": ..., "farm_id": ...}`` on a match, or
        ``{"error": "Invalid credentials"}``.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            SELECT u.user_id, f.farm_id
            FROM users u
            LEFT JOIN farm f ON u.user_id = f.user_id
            WHERE u.email = %s AND u.password = %s
            """,
            (email, password),
        )
        result = c.fetchone()
        if not result:
            return {"error": "Invalid credentials"}
        return {"user_id": result[0], "farm_id": result[1]}


@retry_on_eof(max_attempts=2, delay=3)
def save_prediction(user_id, audio_name, result, file_id, hive_id=None, user_predict=None):
    """Store one prediction row stamped with the shifted server time."""
    with _pooled_connection() as conn, conn.cursor() as c:
        # Add 5 hours to server time.
        # NOTE(review): presumably converts server time to a UTC+5 local
        # time -- confirm the server's timezone.
        timestamp = (datetime.now() + timedelta(hours=5)).strftime("%Y-%m-%d %H:%M:%S")
        c.execute(
            "INSERT INTO predictions (user_id, timestamp, audio_name, result, file_id, hive_id, user_predict) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s)",
            (user_id, timestamp, audio_name, result, file_id, hive_id, user_predict),
        )
        conn.commit()


@retry_on_eof(max_attempts=2, delay=3)
def get_history(user_id):
    """Return the user's predictions, newest timestamp first.

    Returns:
        A list of dicts with keys ``timestamp``, ``audio_name``,
        ``result`` and ``hive_number`` (``None`` when the prediction has
        no hive).
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            SELECT p.timestamp, p.audio_name, p.result, p.hive_id, h.hive_number
            FROM predictions p
            LEFT JOIN Hive h ON p.hive_id = h.hive_id
            WHERE p.user_id = %s
            ORDER BY p.timestamp DESC
            """,
            (user_id,),
        )
        return [
            {
                "timestamp": row[0],
                "audio_name": row[1],
                "result": row[2],
                "hive_number": row[4] if row[3] else None,
            }
            for row in c.fetchall()
        ]


@retry_on_eof(max_attempts=2, delay=3)
def get_user_profile(user_id):
    """Return the user's profile fields or ``{"error": "User not found"}``."""
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            "SELECT email, fullname, country, city, gender, phone_number FROM users WHERE user_id = %s",
            (user_id,),
        )
        user = c.fetchone()
        if not user:
            return {"error": "User not found"}
        return {
            "email": user[0],
            "fullname": user[1],
            "country": user[2],
            "city": user[3],
            "gender": user[4],
            "phone_number": user[5],
        }


@retry_on_eof(max_attempts=2, delay=3)
def update_user_profile(user_id, fullname, country, city, gender, phone_number):
    """Overwrite the user's profile fields.

    Returns:
        A success message dict; it is returned even when no row matched
        ``user_id``.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            UPDATE users
            SET fullname = %s, country = %s, city = %s, gender = %s, phone_number = %s
            WHERE user_id = %s
            """,
            (fullname, country, city, gender, phone_number, user_id),
        )
        conn.commit()
        return {"message": "Profile updated successfully"}


@retry_on_eof(max_attempts=2, delay=3)
def get_farm_details_from_db(user_id):
    """Return the user's farm id, name and location, or ``None``."""
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            "SELECT farm_id, fullname, country, city, zip, hives FROM farm WHERE user_id = %s",
            (user_id,),
        )
        farm = c.fetchone()
        if not farm:
            return None
        # The selected "hives" column is not part of the returned dict.
        return {
            "farm_id": farm[0],
            "fullname": farm[1],
            "country": farm[2],
            "city": farm[3],
            "zip": farm[4],
        }


@retry_on_eof(max_attempts=2, delay=3)
def update_farm_details_in_db(user_id, fullname, country, city, zip_code):
    """Update the user's farm, creating it when none exists yet.

    Returns:
        A success message dict.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute("SELECT farm_id FROM farm WHERE user_id = %s", (user_id,))
        if c.fetchone():
            c.execute(
                """
                UPDATE farm
                SET fullname = %s, country = %s, city = %s, zip = %s
                WHERE user_id = %s
                """,
                (fullname, country, city, zip_code, user_id),
            )
        else:
            c.execute(
                """
                INSERT INTO farm (user_id, fullname, country, city, zip)
                VALUES (%s, %s, %s, %s, %s)
                """,
                (user_id, fullname, country, city, zip_code),
            )
        conn.commit()
        return {"message": "Farm details updated successfully"}


@retry_on_eof(max_attempts=2, delay=3)
def get_farm_detailss_from_db(user_id):
    """Return ``{"farm_id": ...}`` for the user's farm, or ``None``."""
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute("SELECT farm_id FROM farm WHERE user_id = %s", (user_id,))
        farm = c.fetchone()
        return {"farm_id": farm[0]} if farm else None


@retry_on_eof(max_attempts=2, delay=3)
def get_hives_from_db(farm_id):
    """Return the farm's hives ordered by hive number.

    Returns:
        A list of dicts with keys ``hive_id``, ``hive_number`` and
        ``health_status``.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            "SELECT hive_id, hive_number, health_status FROM Hive WHERE farm_id = %s ORDER BY hive_number",
            (farm_id,),
        )
        return [
            {"hive_id": row[0], "hive_number": row[1], "health_status": row[2]}
            for row in c.fetchall()
        ]


@retry_on_eof(max_attempts=2, delay=3)
def get_hive_detail_from_db(hive_id):
    """Return one hive's details or ``{"error": "Hive not found"}``."""
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            "SELECT hive_number, bee_type, number_of_frames, creation_date, health_status, notes "
            "FROM Hive WHERE hive_id = %s",
            (hive_id,),
        )
        row = c.fetchone()
        if not row:
            return {"error": "Hive not found"}
        return {
            "hive_number": row[0],
            "bee_type": row[1],
            "number_of_frames": row[2],
            "creation_date": row[3],
            "health_status": row[4],
            "notes": row[5],
        }


@retry_on_eof(max_attempts=2, delay=3)
def update_hive_health_in_db(hive_id, health_status):
    """Set a hive's health status.

    Raises:
        Exception: With message "Hive not found" when no row matched.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            "UPDATE hive SET health_status = %s WHERE hive_id = %s",
            (health_status, hive_id),
        )
        if c.rowcount == 0:
            raise Exception("Hive not found")
        conn.commit()


@retry_on_eof(max_attempts=2, delay=3)
def update_hive_in_db(hive_id, number_of_frames, bee_type, notes):
    """Update a hive's frame count, bee type and notes.

    Returns:
        ``True`` if a hive row was updated, ``False`` otherwise.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            UPDATE Hive
            SET number_of_frames = %s, bee_type = %s, notes = %s
            WHERE hive_id = %s
            RETURNING hive_id
            """,
            (number_of_frames, bee_type, notes, hive_id),
        )
        result = c.fetchone()
        conn.commit()
        return bool(result)


@retry_on_eof(max_attempts=2, delay=3)
def add_task_to_db(user_id, task_name, hive_id, description, deadline_date, status):
    """Insert a task (``hive_id`` may be ``None``).

    Returns:
        The ``task_id`` of the new row.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            INSERT INTO Task (user_id, task_name, hive_id, description, deadline_date, status)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING task_id
            """,
            (user_id, task_name, hive_id, description, deadline_date, status),
        )
        task_id = c.fetchone()[0]
        conn.commit()
        return task_id


@retry_on_eof(max_attempts=2, delay=3)
def get_user_tasks_from_db(user_id):
    """Return the user's tasks, pending first, then by deadline.

    Returns:
        A list of dicts with keys ``task_id``, ``task_name``, ``status``
        and ``deadline_date`` (``"YYYY-MM-DD"`` or ``None``).
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            SELECT task_id, task_name, status, deadline_date
            FROM Task
            WHERE user_id = %s
            ORDER BY status = 'pending' DESC, deadline_date ASC NULLS LAST, task_id
            """,
            (user_id,),
        )
        return [
            {
                "task_id": row[0],
                "task_name": row[1],
                "status": row[2],
                "deadline_date": row[3].strftime("%Y-%m-%d") if row[3] else None,
            }
            for row in c.fetchall()
        ]


@retry_on_eof(max_attempts=2, delay=3)
def get_task_detail_from_db(task_id):
    """Return one task with its hive number, or ``None`` if not found."""
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            SELECT t.task_id, t.user_id, t.task_name, h.hive_number,
                   t.description, t.deadline_date, t.status
            FROM Task t
            LEFT JOIN Hive h ON t.hive_id = h.hive_id
            WHERE t.task_id = %s
            """,
            (task_id,),
        )
        row = c.fetchone()
        if not row:
            return None
        # row[1] (user_id) is selected but not part of the result.
        return {
            "task_id": row[0],
            "task_name": row[2],
            "hive_number": row[3],
            "description": row[4],
            "deadline_date": row[5].strftime("%Y-%m-%d") if row[5] else None,
            "status": row[6],
        }


@retry_on_eof(max_attempts=2, delay=3)
def update_task_status_to_completed(task_id):
    """Mark a task as completed.

    Returns:
        ``True`` if a task row was updated, ``False`` otherwise.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            UPDATE Task
            SET status = 'completed'
            WHERE task_id = %s
            """,
            (task_id,),
        )
        if c.rowcount > 0:
            conn.commit()
            return True
        return False


@retry_on_eof(max_attempts=2, delay=3)
def delete_task_from_db(task_id):
    """Delete a task.

    Returns:
        ``True`` if a task row was deleted, ``False`` otherwise.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute("DELETE FROM Task WHERE task_id = %s", (task_id,))
        if c.rowcount > 0:
            conn.commit()
            return True
        return False


@retry_on_eof(max_attempts=2, delay=3)
def get_all_notifications(user_id):
    """Return the user's notifications, unread first, newest first.

    Returns:
        A list of dicts with keys ``notification_id``, ``text``,
        ``created_at`` (``"YYYY-MM-DD HH:MM:SS"``, or ``None`` when the
        column is NULL) and ``read_status``.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            SELECT notification_id, text, created_at, read_status
            FROM notifications
            WHERE user_id = %s
            ORDER BY read_status = 'unread' DESC, created_at DESC
            """,
            (user_id,),
        )
        return [
            {
                "notification_id": row[0],
                "text": row[1],
                # Guard against NULL, as the task date formatting does.
                "created_at": row[2].strftime("%Y-%m-%d %H:%M:%S") if row[2] else None,
                "read_status": row[3],
            }
            for row in c.fetchall()
        ]


@retry_on_eof(max_attempts=2, delay=3)
def add_notification(user_id, text):
    """Insert an unread notification stamped five hours past DB time.

    Returns:
        The ``notification_id`` of the new row.
    """
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            INSERT INTO notifications (user_id, text, read_status, created_at)
            VALUES (%s, %s, %s, CURRENT_TIMESTAMP + INTERVAL '5 hours')
            RETURNING notification_id
            """,
            (user_id, text, 'unread'),
        )
        notification_id = c.fetchone()[0]
        conn.commit()
        return notification_id


@retry_on_eof(max_attempts=2, delay=3)
def mark_notification_as_read(notification_id):
    """Set a notification's ``read_status`` to ``'read'``."""
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute(
            """
            UPDATE notifications
            SET read_status = 'read'
            WHERE notification_id = %s
            """,
            (notification_id,),
        )
        conn.commit()


@retry_on_eof(max_attempts=2, delay=3)
def get_user_cities():
    """Return a ``{user_id: city}`` mapping for farms with a city set."""
    with _pooled_connection() as conn, conn.cursor() as c:
        c.execute("SELECT user_id, city FROM farm WHERE city IS NOT NULL AND city != ''")
        return {row[0]: row[1] for row in c.fetchall()}