bee_notbee / database.py
bilalhasanniazi's picture
Update database.py
6a42ae2 verified
import psycopg2
from psycopg2 import pool
from datetime import datetime, timedelta
import os
import json
import secrets
import time
from functools import wraps
# Load and parse DATABASE_CONFIG from environment variable
database_config_json = os.getenv("neon_db")
DB_CONFIG = json.loads(database_config_json)
# Initialize connection pool
db_pool = psycopg2.pool.SimpleConnectionPool(1, 20, **DB_CONFIG)
# Retry decorator for handling EOF errors
def retry_on_eof(max_attempts=2, delay=3):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
attempts = 0
while attempts < max_attempts:
try:
return func(*args, **kwargs)
except psycopg2.OperationalError as e:
if "SSL SYSCALL error: EOF detected" in str(e):
print(f"EOF detected in {func.__name__}, retrying {attempts + 1}/{max_attempts} after {delay}s")
attempts += 1
if attempts == max_attempts:
raise # Re-raise the exception if max attempts reached
time.sleep(delay) # Wait before retrying
# Re-establish connection if closed
conn = db_pool.getconn()
if conn.closed:
db_pool.putconn(conn, close=True)
conn = db_pool.getconn()
else:
raise # Re-raise other OperationalErrors
return func(*args, **kwargs)
return wrapper
return decorator
@retry_on_eof(max_attempts=2, delay=3)
def init_db():
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("CREATE TABLE IF NOT EXISTS users (user_id SERIAL PRIMARY KEY, email TEXT UNIQUE, password TEXT)")
c.execute("""
CREATE TABLE IF NOT EXISTS predictions (
pred_id SERIAL PRIMARY KEY,
user_id INT,
timestamp TEXT,
audio_name TEXT,
result TEXT,
audio BYTEA,
hive_id INT,
FOREIGN KEY (hive_id) REFERENCES Hive(hive_id)
)
""")
c.execute("CREATE TABLE IF NOT EXISTS farm (farm_id SERIAL PRIMARY KEY, user_id INT UNIQUE, fullname TEXT, country TEXT, city TEXT, zip TEXT, hives INT DEFAULT 0)")
c.execute("""
CREATE TABLE IF NOT EXISTS Hive (
hive_id SERIAL PRIMARY KEY,
farm_id INT,
hive_number INT,
bee_type TEXT,
number_of_frames INT,
creation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
health_status TEXT,
notes TEXT,
FOREIGN KEY (farm_id) REFERENCES farm(farm_id)
)
""")
conn.commit()
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def generate_reset_code(email):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
# Check if email exists
c.execute("SELECT user_id FROM users WHERE email = %s", (email,))
user = c.fetchone()
if not user:
return None
# Generate 6-digit code
code = ''.join(str(secrets.randbelow(10)) for _ in range(6))
# Set expiration (15 minutes from now)
expires_at = datetime.now() + timedelta(minutes=15)
# Store code
c.execute(
"""
INSERT INTO password_reset_codes (user_id, code, expires_at)
VALUES (%s, %s, %s)
ON CONFLICT (user_id) DO UPDATE
SET code = %s, expires_at = %s
""",
(user[0], code, expires_at, code, expires_at)
)
conn.commit()
return code
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def verify_reset_code(email, code):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute(
"""
SELECT code, expires_at
FROM password_reset_codes prc
JOIN users u ON prc.user_id = u.user_id
WHERE u.email = %s AND prc.code = %s
""",
(email, code)
)
result = c.fetchone()
if result and result[1] > datetime.now():
# Delete code after verification
c.execute("DELETE FROM password_reset_codes WHERE code = %s", (code,))
conn.commit()
return True
return False
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def update_user_password(email, new_password):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute(
"UPDATE users SET password = %s WHERE email = %s RETURNING user_id",
(new_password, email)
)
result = c.fetchone()
conn.commit()
return bool(result)
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def change_user_password(user_id, current_password, new_password):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
# Verify user exists and current password matches
c.execute(
"SELECT user_id FROM users WHERE user_id = %s AND password = %s",
(user_id, current_password)
)
user = c.fetchone()
if not user:
return False
# Update password
c.execute(
"UPDATE users SET password = %s WHERE user_id = %s",
(new_password, user_id)
)
conn.commit()
return True
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def add_hive_to_db(farm_id, hive_number, bee_type, number_of_frames, health_status, notes):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("""
INSERT INTO Hive (farm_id, hive_number, bee_type, number_of_frames, health_status, notes)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING hive_id
""", (farm_id, hive_number, bee_type, number_of_frames, health_status, notes))
hive_id = c.fetchone()[0]
c.execute("UPDATE farm SET hives = hives + 1 WHERE farm_id = %s", (farm_id,))
conn.commit()
return hive_id
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def delete_hive_from_db(hive_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("SELECT farm_id FROM Hive WHERE hive_id = %s", (hive_id,))
farm = c.fetchone()
if farm:
farm_id = farm[0]
c.execute("DELETE FROM Hive WHERE hive_id = %s", (hive_id,))
if c.rowcount > 0:
c.execute("UPDATE farm SET hives = hives - 1 WHERE farm_id = %s", (farm_id,))
conn.commit()
return True
return False
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def register_user(fullname, email, password=None, google_id=None):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
# Check if email or google_id already exists
c.execute("SELECT email, google_id FROM users WHERE email = %s OR google_id = %s", (email, google_id))
if c.fetchone():
return "email already exist"
# Insert new user with default values
c.execute(
"INSERT INTO users (fullname, email, password, google_id, country, city, gender, phone_number) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING user_id",
(fullname, email, password, google_id, "Pakistan", "Karachi", "Male", "0")
)
user_id = c.fetchone()[0]
# Create farm record for the new user with default values
c.execute(
"INSERT INTO farm (user_id, fullname, country, city, zip, hives) VALUES (%s, %s, %s, %s, %s, %s)",
(user_id, "user farm", "Pakistan", "Karachi", "24700", 0)
)
conn.commit()
return "Signup successful"
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def authenticate_google_user(google_id, email):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
# Get user_id and farm_id by google_id or email
c.execute("""
SELECT u.user_id, f.farm_id
FROM users u
LEFT JOIN farm f ON u.user_id = f.user_id
WHERE u.google_id = %s OR u.email = %s
""", (google_id, email))
result = c.fetchone()
return {"user_id": result[0], "farm_id": result[1]} if result else {"error": "User not found"}
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def authenticate_user(email, password):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
# Get user_id and farm_id
c.execute("""
SELECT u.user_id, f.farm_id
FROM users u
LEFT JOIN farm f ON u.user_id = f.user_id
WHERE u.email = %s AND u.password = %s
""", (email, password))
result = c.fetchone()
return {"user_id": result[0], "farm_id": result[1]} if result else {"error": "Invalid credentials"}
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def save_prediction(user_id, audio_name, result, file_id, hive_id=None, user_predict=None):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
# Add 5 hours to server time
timestamp = (datetime.now() + timedelta(hours=5)).strftime("%Y-%m-%d %H:%M:%S")
c.execute(
"INSERT INTO predictions (user_id, timestamp, audio_name, result, file_id, hive_id, user_predict) "
"VALUES (%s, %s, %s, %s, %s, %s, %s)",
(user_id, timestamp, audio_name, result, file_id, hive_id, user_predict)
)
conn.commit()
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def get_history(user_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("""
SELECT p.timestamp, p.audio_name, p.result, p.hive_id, h.hive_number
FROM predictions p
LEFT JOIN Hive h ON p.hive_id = h.hive_id
WHERE p.user_id = %s
ORDER BY p.timestamp DESC
""", (user_id,))
return [{
"timestamp": row[0],
"audio_name": row[1],
"result": row[2],
"hive_number": row[4] if row[3] else None
} for row in c.fetchall()]
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def get_user_profile(user_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("SELECT email, fullname, country, city, gender, phone_number FROM users WHERE user_id = %s", (user_id,))
user = c.fetchone()
return {
"email": user[0], "fullname": user[1], "country": user[2],
"city": user[3], "gender": user[4], "phone_number": user[5]
} if user else {"error": "User not found"}
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def update_user_profile(user_id, fullname, country, city, gender, phone_number):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("""
UPDATE users
SET fullname = %s, country = %s, city = %s, gender = %s, phone_number = %s
WHERE user_id = %s
""", (fullname, country, city, gender, phone_number, user_id))
conn.commit()
return {"message": "Profile updated successfully"}
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def get_farm_details_from_db(user_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("SELECT farm_id, fullname, country, city, zip, hives FROM farm WHERE user_id = %s", (user_id,))
farm = c.fetchone()
return {
"farm_id": farm[0],
"fullname": farm[1],
"country": farm[2],
"city": farm[3],
"zip": farm[4]
} if farm else None
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def update_farm_details_in_db(user_id, fullname, country, city, zip_code):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("SELECT farm_id FROM farm WHERE user_id = %s", (user_id,))
existing_farm = c.fetchone()
if existing_farm:
c.execute("""
UPDATE farm
SET fullname = %s, country = %s, city = %s, zip = %s
WHERE user_id = %s
""", (fullname, country, city, zip_code, user_id))
else:
c.execute("""
INSERT INTO farm (user_id, fullname, country, city, zip)
VALUES (%s, %s, %s, %s, %s)
""", (user_id, fullname, country, city, zip_code))
conn.commit()
return {"message": "Farm details updated successfully"}
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def get_farm_detailss_from_db(user_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("SELECT farm_id FROM farm WHERE user_id = %s", (user_id,))
farm = c.fetchone()
return {"farm_id": farm[0]} if farm else None
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def get_hives_from_db(farm_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("SELECT hive_id, hive_number, health_status FROM Hive WHERE farm_id = %s ORDER BY hive_number", (farm_id,))
return [{"hive_id": row[0], "hive_number": row[1], "health_status": row[2]} for row in c.fetchall()]
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def get_hive_detail_from_db(hive_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute(
"SELECT hive_number, bee_type, number_of_frames, creation_date, health_status, notes FROM Hive WHERE hive_id = %s",
(hive_id,)
)
row = c.fetchone()
if row:
return {
"hive_number": row[0],
"bee_type": row[1],
"number_of_frames": row[2],
"creation_date": row[3],
"health_status": row[4],
"notes": row[5]
}
return {"error": "Hive not found"}
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def update_hive_health_in_db(hive_id, health_status):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("UPDATE hive SET health_status = %s WHERE hive_id = %s", (health_status, hive_id))
if c.rowcount == 0:
raise Exception("Hive not found")
conn.commit()
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def update_hive_in_db(hive_id, number_of_frames, bee_type, notes):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute(
"""
UPDATE Hive
SET number_of_frames = %s, bee_type = %s, notes = %s
WHERE hive_id = %s
RETURNING hive_id
""",
(number_of_frames, bee_type, notes, hive_id)
)
result = c.fetchone()
conn.commit()
return bool(result)
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def add_task_to_db(user_id, task_name, hive_id, description, deadline_date, status):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("""
INSERT INTO Task (user_id, task_name, hive_id, description, deadline_date, status)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING task_id
""", (user_id, task_name, None if hive_id is None else hive_id, description, deadline_date, status))
task_id = c.fetchone()[0]
conn.commit()
return task_id
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def get_user_tasks_from_db(user_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("""
SELECT task_id, task_name, status, deadline_date
FROM Task
WHERE user_id = %s
ORDER BY status = 'pending' DESC, deadline_date ASC NULLS LAST, task_id
""", (user_id,))
return [{
"task_id": row[0],
"task_name": row[1],
"status": row[2],
"deadline_date": row[3].strftime("%Y-%m-%d") if row[3] else None
} for row in c.fetchall()]
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def get_task_detail_from_db(task_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("""
SELECT t.task_id, t.user_id, t.task_name, h.hive_number, t.description,
t.deadline_date, t.status
FROM Task t
LEFT JOIN Hive h ON t.hive_id = h.hive_id
WHERE t.task_id = %s
""", (task_id,))
row = c.fetchone()
if row:
return {
"task_id": row[0],
"task_name": row[2],
"hive_number": row[3],
"description": row[4],
"deadline_date": row[5].strftime("%Y-%m-%d") if row[5] else None,
"status": row[6]
}
return None
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def update_task_status_to_completed(task_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("""
UPDATE Task
SET status = 'completed'
WHERE task_id = %s
""", (task_id,))
if c.rowcount > 0:
conn.commit()
return True
return False
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def delete_task_from_db(task_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("DELETE FROM Task WHERE task_id = %s", (task_id,))
if c.rowcount > 0:
conn.commit()
return True
return False
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def get_all_notifications(user_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("""
SELECT notification_id, text, created_at, read_status
FROM notifications
WHERE user_id = %s
ORDER BY read_status = 'unread' DESC, created_at DESC
""", (user_id,))
return [{
"notification_id": row[0],
"text": row[1],
"created_at": row[2].strftime("%Y-%m-%d %H:%M:%S"),
"read_status": row[3]
} for row in c.fetchall()]
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def add_notification(user_id, text):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("""
INSERT INTO notifications (user_id, text, read_status, created_at)
VALUES (%s, %s, %s, CURRENT_TIMESTAMP + INTERVAL '5 hours')
RETURNING notification_id
""", (user_id, text, 'unread'))
notification_id = c.fetchone()[0]
conn.commit()
return notification_id
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def mark_notification_as_read(notification_id):
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("""
UPDATE notifications
SET read_status = 'read'
WHERE notification_id = %s
""", (notification_id,))
conn.commit()
finally:
db_pool.putconn(conn)
@retry_on_eof(max_attempts=2, delay=3)
def get_user_cities():
conn = db_pool.getconn()
try:
with conn.cursor() as c:
c.execute("SELECT user_id, city FROM farm WHERE city IS NOT NULL AND city != ''")
return {row[0]: row[1] for row in c.fetchall()}
finally:
db_pool.putconn(conn)