Spaces:
Sleeping
Sleeping
| import psycopg2 | |
| from psycopg2 import pool | |
| from datetime import datetime, timedelta | |
| import os | |
| import json | |
| import secrets | |
| import time | |
| from functools import wraps | |
| # Load and parse DATABASE_CONFIG from environment variable | |
| database_config_json = os.getenv("neon_db") | |
| DB_CONFIG = json.loads(database_config_json) | |
| # Initialize connection pool | |
| db_pool = psycopg2.pool.SimpleConnectionPool(1, 20, **DB_CONFIG) | |
| # Retry decorator for handling EOF errors | |
| def retry_on_eof(max_attempts=2, delay=3): | |
| def decorator(func): | |
| def wrapper(*args, **kwargs): | |
| attempts = 0 | |
| while attempts < max_attempts: | |
| try: | |
| return func(*args, **kwargs) | |
| except psycopg2.OperationalError as e: | |
| if "SSL SYSCALL error: EOF detected" in str(e): | |
| print(f"EOF detected in {func.__name__}, retrying {attempts + 1}/{max_attempts} after {delay}s") | |
| attempts += 1 | |
| if attempts == max_attempts: | |
| raise # Re-raise the exception if max attempts reached | |
| time.sleep(delay) # Wait before retrying | |
| # Re-establish connection if closed | |
| conn = db_pool.getconn() | |
| if conn.closed: | |
| db_pool.putconn(conn, close=True) | |
| conn = db_pool.getconn() | |
| else: | |
| raise # Re-raise other OperationalErrors | |
| return func(*args, **kwargs) | |
| return wrapper | |
| return decorator | |
| def init_db(): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute("CREATE TABLE IF NOT EXISTS users (user_id SERIAL PRIMARY KEY, email TEXT UNIQUE, password TEXT)") | |
| c.execute(""" | |
| CREATE TABLE IF NOT EXISTS predictions ( | |
| pred_id SERIAL PRIMARY KEY, | |
| user_id INT, | |
| timestamp TEXT, | |
| audio_name TEXT, | |
| result TEXT, | |
| audio BYTEA, | |
| hive_id INT, | |
| FOREIGN KEY (hive_id) REFERENCES Hive(hive_id) | |
| ) | |
| """) | |
| c.execute("CREATE TABLE IF NOT EXISTS farm (farm_id SERIAL PRIMARY KEY, user_id INT UNIQUE, fullname TEXT, country TEXT, city TEXT, zip TEXT, hives INT DEFAULT 0)") | |
| c.execute(""" | |
| CREATE TABLE IF NOT EXISTS Hive ( | |
| hive_id SERIAL PRIMARY KEY, | |
| farm_id INT, | |
| hive_number INT, | |
| bee_type TEXT, | |
| number_of_frames INT, | |
| creation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| health_status TEXT, | |
| notes TEXT, | |
| FOREIGN KEY (farm_id) REFERENCES farm(farm_id) | |
| ) | |
| """) | |
| conn.commit() | |
| finally: | |
| db_pool.putconn(conn) | |
| def generate_reset_code(email): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| # Check if email exists | |
| c.execute("SELECT user_id FROM users WHERE email = %s", (email,)) | |
| user = c.fetchone() | |
| if not user: | |
| return None | |
| # Generate 6-digit code | |
| code = ''.join(str(secrets.randbelow(10)) for _ in range(6)) | |
| # Set expiration (15 minutes from now) | |
| expires_at = datetime.now() + timedelta(minutes=15) | |
| # Store code | |
| c.execute( | |
| """ | |
| INSERT INTO password_reset_codes (user_id, code, expires_at) | |
| VALUES (%s, %s, %s) | |
| ON CONFLICT (user_id) DO UPDATE | |
| SET code = %s, expires_at = %s | |
| """, | |
| (user[0], code, expires_at, code, expires_at) | |
| ) | |
| conn.commit() | |
| return code | |
| finally: | |
| db_pool.putconn(conn) | |
| def verify_reset_code(email, code): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute( | |
| """ | |
| SELECT code, expires_at | |
| FROM password_reset_codes prc | |
| JOIN users u ON prc.user_id = u.user_id | |
| WHERE u.email = %s AND prc.code = %s | |
| """, | |
| (email, code) | |
| ) | |
| result = c.fetchone() | |
| if result and result[1] > datetime.now(): | |
| # Delete code after verification | |
| c.execute("DELETE FROM password_reset_codes WHERE code = %s", (code,)) | |
| conn.commit() | |
| return True | |
| return False | |
| finally: | |
| db_pool.putconn(conn) | |
| def update_user_password(email, new_password): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute( | |
| "UPDATE users SET password = %s WHERE email = %s RETURNING user_id", | |
| (new_password, email) | |
| ) | |
| result = c.fetchone() | |
| conn.commit() | |
| return bool(result) | |
| finally: | |
| db_pool.putconn(conn) | |
| def change_user_password(user_id, current_password, new_password): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| # Verify user exists and current password matches | |
| c.execute( | |
| "SELECT user_id FROM users WHERE user_id = %s AND password = %s", | |
| (user_id, current_password) | |
| ) | |
| user = c.fetchone() | |
| if not user: | |
| return False | |
| # Update password | |
| c.execute( | |
| "UPDATE users SET password = %s WHERE user_id = %s", | |
| (new_password, user_id) | |
| ) | |
| conn.commit() | |
| return True | |
| finally: | |
| db_pool.putconn(conn) | |
| def add_hive_to_db(farm_id, hive_number, bee_type, number_of_frames, health_status, notes): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute(""" | |
| INSERT INTO Hive (farm_id, hive_number, bee_type, number_of_frames, health_status, notes) | |
| VALUES (%s, %s, %s, %s, %s, %s) | |
| RETURNING hive_id | |
| """, (farm_id, hive_number, bee_type, number_of_frames, health_status, notes)) | |
| hive_id = c.fetchone()[0] | |
| c.execute("UPDATE farm SET hives = hives + 1 WHERE farm_id = %s", (farm_id,)) | |
| conn.commit() | |
| return hive_id | |
| finally: | |
| db_pool.putconn(conn) | |
| def delete_hive_from_db(hive_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute("SELECT farm_id FROM Hive WHERE hive_id = %s", (hive_id,)) | |
| farm = c.fetchone() | |
| if farm: | |
| farm_id = farm[0] | |
| c.execute("DELETE FROM Hive WHERE hive_id = %s", (hive_id,)) | |
| if c.rowcount > 0: | |
| c.execute("UPDATE farm SET hives = hives - 1 WHERE farm_id = %s", (farm_id,)) | |
| conn.commit() | |
| return True | |
| return False | |
| finally: | |
| db_pool.putconn(conn) | |
| def register_user(fullname, email, password=None, google_id=None): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| # Check if email or google_id already exists | |
| c.execute("SELECT email, google_id FROM users WHERE email = %s OR google_id = %s", (email, google_id)) | |
| if c.fetchone(): | |
| return "email already exist" | |
| # Insert new user with default values | |
| c.execute( | |
| "INSERT INTO users (fullname, email, password, google_id, country, city, gender, phone_number) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING user_id", | |
| (fullname, email, password, google_id, "Pakistan", "Karachi", "Male", "0") | |
| ) | |
| user_id = c.fetchone()[0] | |
| # Create farm record for the new user with default values | |
| c.execute( | |
| "INSERT INTO farm (user_id, fullname, country, city, zip, hives) VALUES (%s, %s, %s, %s, %s, %s)", | |
| (user_id, "user farm", "Pakistan", "Karachi", "24700", 0) | |
| ) | |
| conn.commit() | |
| return "Signup successful" | |
| finally: | |
| db_pool.putconn(conn) | |
| def authenticate_google_user(google_id, email): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| # Get user_id and farm_id by google_id or email | |
| c.execute(""" | |
| SELECT u.user_id, f.farm_id | |
| FROM users u | |
| LEFT JOIN farm f ON u.user_id = f.user_id | |
| WHERE u.google_id = %s OR u.email = %s | |
| """, (google_id, email)) | |
| result = c.fetchone() | |
| return {"user_id": result[0], "farm_id": result[1]} if result else {"error": "User not found"} | |
| finally: | |
| db_pool.putconn(conn) | |
| def authenticate_user(email, password): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| # Get user_id and farm_id | |
| c.execute(""" | |
| SELECT u.user_id, f.farm_id | |
| FROM users u | |
| LEFT JOIN farm f ON u.user_id = f.user_id | |
| WHERE u.email = %s AND u.password = %s | |
| """, (email, password)) | |
| result = c.fetchone() | |
| return {"user_id": result[0], "farm_id": result[1]} if result else {"error": "Invalid credentials"} | |
| finally: | |
| db_pool.putconn(conn) | |
| def save_prediction(user_id, audio_name, result, file_id, hive_id=None, user_predict=None): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| # Add 5 hours to server time | |
| timestamp = (datetime.now() + timedelta(hours=5)).strftime("%Y-%m-%d %H:%M:%S") | |
| c.execute( | |
| "INSERT INTO predictions (user_id, timestamp, audio_name, result, file_id, hive_id, user_predict) " | |
| "VALUES (%s, %s, %s, %s, %s, %s, %s)", | |
| (user_id, timestamp, audio_name, result, file_id, hive_id, user_predict) | |
| ) | |
| conn.commit() | |
| finally: | |
| db_pool.putconn(conn) | |
| def get_history(user_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute(""" | |
| SELECT p.timestamp, p.audio_name, p.result, p.hive_id, h.hive_number | |
| FROM predictions p | |
| LEFT JOIN Hive h ON p.hive_id = h.hive_id | |
| WHERE p.user_id = %s | |
| ORDER BY p.timestamp DESC | |
| """, (user_id,)) | |
| return [{ | |
| "timestamp": row[0], | |
| "audio_name": row[1], | |
| "result": row[2], | |
| "hive_number": row[4] if row[3] else None | |
| } for row in c.fetchall()] | |
| finally: | |
| db_pool.putconn(conn) | |
| def get_user_profile(user_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute("SELECT email, fullname, country, city, gender, phone_number FROM users WHERE user_id = %s", (user_id,)) | |
| user = c.fetchone() | |
| return { | |
| "email": user[0], "fullname": user[1], "country": user[2], | |
| "city": user[3], "gender": user[4], "phone_number": user[5] | |
| } if user else {"error": "User not found"} | |
| finally: | |
| db_pool.putconn(conn) | |
| def update_user_profile(user_id, fullname, country, city, gender, phone_number): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute(""" | |
| UPDATE users | |
| SET fullname = %s, country = %s, city = %s, gender = %s, phone_number = %s | |
| WHERE user_id = %s | |
| """, (fullname, country, city, gender, phone_number, user_id)) | |
| conn.commit() | |
| return {"message": "Profile updated successfully"} | |
| finally: | |
| db_pool.putconn(conn) | |
| def get_farm_details_from_db(user_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute("SELECT farm_id, fullname, country, city, zip, hives FROM farm WHERE user_id = %s", (user_id,)) | |
| farm = c.fetchone() | |
| return { | |
| "farm_id": farm[0], | |
| "fullname": farm[1], | |
| "country": farm[2], | |
| "city": farm[3], | |
| "zip": farm[4] | |
| } if farm else None | |
| finally: | |
| db_pool.putconn(conn) | |
| def update_farm_details_in_db(user_id, fullname, country, city, zip_code): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute("SELECT farm_id FROM farm WHERE user_id = %s", (user_id,)) | |
| existing_farm = c.fetchone() | |
| if existing_farm: | |
| c.execute(""" | |
| UPDATE farm | |
| SET fullname = %s, country = %s, city = %s, zip = %s | |
| WHERE user_id = %s | |
| """, (fullname, country, city, zip_code, user_id)) | |
| else: | |
| c.execute(""" | |
| INSERT INTO farm (user_id, fullname, country, city, zip) | |
| VALUES (%s, %s, %s, %s, %s) | |
| """, (user_id, fullname, country, city, zip_code)) | |
| conn.commit() | |
| return {"message": "Farm details updated successfully"} | |
| finally: | |
| db_pool.putconn(conn) | |
| def get_farm_detailss_from_db(user_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute("SELECT farm_id FROM farm WHERE user_id = %s", (user_id,)) | |
| farm = c.fetchone() | |
| return {"farm_id": farm[0]} if farm else None | |
| finally: | |
| db_pool.putconn(conn) | |
| def get_hives_from_db(farm_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute("SELECT hive_id, hive_number, health_status FROM Hive WHERE farm_id = %s ORDER BY hive_number", (farm_id,)) | |
| return [{"hive_id": row[0], "hive_number": row[1], "health_status": row[2]} for row in c.fetchall()] | |
| finally: | |
| db_pool.putconn(conn) | |
| def get_hive_detail_from_db(hive_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute( | |
| "SELECT hive_number, bee_type, number_of_frames, creation_date, health_status, notes FROM Hive WHERE hive_id = %s", | |
| (hive_id,) | |
| ) | |
| row = c.fetchone() | |
| if row: | |
| return { | |
| "hive_number": row[0], | |
| "bee_type": row[1], | |
| "number_of_frames": row[2], | |
| "creation_date": row[3], | |
| "health_status": row[4], | |
| "notes": row[5] | |
| } | |
| return {"error": "Hive not found"} | |
| finally: | |
| db_pool.putconn(conn) | |
| def update_hive_health_in_db(hive_id, health_status): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute("UPDATE hive SET health_status = %s WHERE hive_id = %s", (health_status, hive_id)) | |
| if c.rowcount == 0: | |
| raise Exception("Hive not found") | |
| conn.commit() | |
| finally: | |
| db_pool.putconn(conn) | |
| def update_hive_in_db(hive_id, number_of_frames, bee_type, notes): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute( | |
| """ | |
| UPDATE Hive | |
| SET number_of_frames = %s, bee_type = %s, notes = %s | |
| WHERE hive_id = %s | |
| RETURNING hive_id | |
| """, | |
| (number_of_frames, bee_type, notes, hive_id) | |
| ) | |
| result = c.fetchone() | |
| conn.commit() | |
| return bool(result) | |
| finally: | |
| db_pool.putconn(conn) | |
| def add_task_to_db(user_id, task_name, hive_id, description, deadline_date, status): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute(""" | |
| INSERT INTO Task (user_id, task_name, hive_id, description, deadline_date, status) | |
| VALUES (%s, %s, %s, %s, %s, %s) | |
| RETURNING task_id | |
| """, (user_id, task_name, None if hive_id is None else hive_id, description, deadline_date, status)) | |
| task_id = c.fetchone()[0] | |
| conn.commit() | |
| return task_id | |
| finally: | |
| db_pool.putconn(conn) | |
| def get_user_tasks_from_db(user_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute(""" | |
| SELECT task_id, task_name, status, deadline_date | |
| FROM Task | |
| WHERE user_id = %s | |
| ORDER BY status = 'pending' DESC, deadline_date ASC NULLS LAST, task_id | |
| """, (user_id,)) | |
| return [{ | |
| "task_id": row[0], | |
| "task_name": row[1], | |
| "status": row[2], | |
| "deadline_date": row[3].strftime("%Y-%m-%d") if row[3] else None | |
| } for row in c.fetchall()] | |
| finally: | |
| db_pool.putconn(conn) | |
| def get_task_detail_from_db(task_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute(""" | |
| SELECT t.task_id, t.user_id, t.task_name, h.hive_number, t.description, | |
| t.deadline_date, t.status | |
| FROM Task t | |
| LEFT JOIN Hive h ON t.hive_id = h.hive_id | |
| WHERE t.task_id = %s | |
| """, (task_id,)) | |
| row = c.fetchone() | |
| if row: | |
| return { | |
| "task_id": row[0], | |
| "task_name": row[2], | |
| "hive_number": row[3], | |
| "description": row[4], | |
| "deadline_date": row[5].strftime("%Y-%m-%d") if row[5] else None, | |
| "status": row[6] | |
| } | |
| return None | |
| finally: | |
| db_pool.putconn(conn) | |
| def update_task_status_to_completed(task_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute(""" | |
| UPDATE Task | |
| SET status = 'completed' | |
| WHERE task_id = %s | |
| """, (task_id,)) | |
| if c.rowcount > 0: | |
| conn.commit() | |
| return True | |
| return False | |
| finally: | |
| db_pool.putconn(conn) | |
| def delete_task_from_db(task_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute("DELETE FROM Task WHERE task_id = %s", (task_id,)) | |
| if c.rowcount > 0: | |
| conn.commit() | |
| return True | |
| return False | |
| finally: | |
| db_pool.putconn(conn) | |
| def get_all_notifications(user_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute(""" | |
| SELECT notification_id, text, created_at, read_status | |
| FROM notifications | |
| WHERE user_id = %s | |
| ORDER BY read_status = 'unread' DESC, created_at DESC | |
| """, (user_id,)) | |
| return [{ | |
| "notification_id": row[0], | |
| "text": row[1], | |
| "created_at": row[2].strftime("%Y-%m-%d %H:%M:%S"), | |
| "read_status": row[3] | |
| } for row in c.fetchall()] | |
| finally: | |
| db_pool.putconn(conn) | |
| def add_notification(user_id, text): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute(""" | |
| INSERT INTO notifications (user_id, text, read_status, created_at) | |
| VALUES (%s, %s, %s, CURRENT_TIMESTAMP + INTERVAL '5 hours') | |
| RETURNING notification_id | |
| """, (user_id, text, 'unread')) | |
| notification_id = c.fetchone()[0] | |
| conn.commit() | |
| return notification_id | |
| finally: | |
| db_pool.putconn(conn) | |
| def mark_notification_as_read(notification_id): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute(""" | |
| UPDATE notifications | |
| SET read_status = 'read' | |
| WHERE notification_id = %s | |
| """, (notification_id,)) | |
| conn.commit() | |
| finally: | |
| db_pool.putconn(conn) | |
| def get_user_cities(): | |
| conn = db_pool.getconn() | |
| try: | |
| with conn.cursor() as c: | |
| c.execute("SELECT user_id, city FROM farm WHERE city IS NOT NULL AND city != ''") | |
| return {row[0]: row[1] for row in c.fetchall()} | |
| finally: | |
| db_pool.putconn(conn) |