ThesisProject / db /database.py
JeyBii's picture
Fix get_user_by_username missing password_hash
5133cc3 verified
Raw
History Blame Contribute Delete
14.2 kB
import os
import logging
import psycopg2
import psycopg2.extras
from datetime import datetime
from dotenv import load_dotenv
logger = logging.getLogger(__name__)
# Load .env from project root (does NOT override existing env vars)
load_dotenv(
os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), ".env")
)
# ── Main DB (users, check_history) ───────────────────────────────────
DATABASE_URL = os.getenv(
"DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/factcheck"
)
# ── External DB (scraped articles) ───────────────────────────────────
# Falls back to DATABASE_URL if not set, so existing setups keep working.
EXTERNAL_DATABASE_URL = os.getenv("EXTERNAL_DATABASE_URL", DATABASE_URL)
# Log masked URLs so we can debug connection issues
def _mask_url(url):
return url[:25] + "***" + url[-30:] if len(url) > 55 else "***"
logger.info("DATABASE_URL = %s", _mask_url(DATABASE_URL))
logger.info("EXTERNAL_DATABASE_URL = %s", _mask_url(EXTERNAL_DATABASE_URL))
def get_connection():
"""Get a connection to the main PostgreSQL database (users, history)."""
try:
conn = psycopg2.connect(DATABASE_URL)
return conn
except Exception as exc:
logger.error("Failed to connect to main database: %s", exc)
raise
def get_external_connection():
"""Get a connection to the external articles PostgreSQL database."""
try:
conn = psycopg2.connect(EXTERNAL_DATABASE_URL)
return conn
except Exception as exc:
logger.error("Failed to connect to external articles database: %s", exc)
raise
def init_db():
"""Create the users and check_history tables (main DB) + articles table (external DB)."""
# ── External DB: articles table ──────────────────────────────
try:
ext_conn = get_external_connection()
ext_cursor = ext_conn.cursor()
ext_cursor.execute(
"""
CREATE TABLE IF NOT EXISTS articles (
id SERIAL PRIMARY KEY,
title TEXT NOT NULL,
url TEXT NOT NULL UNIQUE,
source TEXT NOT NULL,
published TEXT,
scraped_at TEXT NOT NULL
)
"""
)
ext_conn.commit()
ext_conn.close()
logger.info("External articles DB initialized βœ“")
except Exception as exc:
logger.warning("Could not initialize external articles DB: %s", exc)
# ── Main DB: users + check_history ───────────────────────────
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
email TEXT NOT NULL UNIQUE,
username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
email_verified BOOLEAN NOT NULL DEFAULT FALSE,
verification_code TEXT,
verification_code_expires TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
)
"""
)
# Add UNIQUE constraint on username if it doesn't exist yet (for existing DBs)
try:
cursor.execute(
"ALTER TABLE users ADD CONSTRAINT users_username_unique UNIQUE (username)"
)
except psycopg2.errors.DuplicateTable:
conn.rollback()
cursor = conn.cursor()
# Add email verification columns for existing DBs
email_verified_added = False
for col, col_def in [
("email_verified", "BOOLEAN NOT NULL DEFAULT FALSE"),
("verification_code", "TEXT"),
("verification_code_expires", "TIMESTAMP"),
]:
try:
cursor.execute(f"ALTER TABLE users ADD COLUMN {col} {col_def}")
conn.commit()
if col == "email_verified":
email_verified_added = True
except psycopg2.errors.DuplicateColumn:
conn.rollback()
cursor = conn.cursor()
# Mark all pre-existing accounts as verified (they existed before this feature)
if email_verified_added:
cursor.execute(
"UPDATE users SET email_verified = TRUE WHERE verification_code IS NULL"
)
conn.commit()
logger.info("Marked all pre-existing accounts as email_verified = TRUE")
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS check_history (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
input_text TEXT NOT NULL,
final_verdict TEXT,
result_json TEXT,
checked_at TIMESTAMP NOT NULL DEFAULT NOW()
)
"""
)
conn.commit()
conn.close()
# ── User helpers ──────────────────────────────────────────────────────
def create_user(email: str, username: str, password_hash: str) -> dict:
"""Insert a new user and return the created record as a dict."""
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
try:
cursor.execute(
"INSERT INTO users (email, username, password_hash) "
"VALUES (%s, %s, %s) RETURNING id, email, username, created_at",
(email, username, password_hash),
)
user = dict(cursor.fetchone())
conn.commit()
return user
except psycopg2.errors.UniqueViolation:
conn.rollback()
return None
finally:
conn.close()
def get_user_by_email(email: str) -> dict | None:
"""Look up a user by email. Returns full row (incl. password_hash) or None."""
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute("SELECT * FROM users WHERE email = %s", (email,))
row = cursor.fetchone()
conn.close()
return dict(row) if row else None
def get_user_by_id(user_id: int) -> dict | None:
"""Look up a user by id. Returns row without password_hash or None."""
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute(
"SELECT id, email, username, email_verified, created_at FROM users WHERE id = %s",
(user_id,),
)
row = cursor.fetchone()
conn.close()
return dict(row) if row else None
def get_user_by_username(username: str) -> dict | None:
"""Look up a user by username. Returns row or None."""
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute(
"SELECT * FROM users WHERE LOWER(username) = LOWER(%s)",
(username,),
)
row = cursor.fetchone()
conn.close()
return dict(row) if row else None
def update_username(user_id: int, new_username: str) -> dict | None:
"""Update a user's username. Returns updated user dict or None on conflict."""
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
try:
cursor.execute(
"UPDATE users SET username = %s WHERE id = %s RETURNING id, email, username, created_at",
(new_username, user_id),
)
user = cursor.fetchone()
conn.commit()
return dict(user) if user else None
except psycopg2.errors.UniqueViolation:
conn.rollback()
return None
finally:
conn.close()
def update_user_password(user_id: int, new_password_hash: str) -> bool:
"""Update a user's password hash. Returns True on success."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"UPDATE users SET password_hash = %s WHERE id = %s",
(new_password_hash, user_id),
)
affected = cursor.rowcount
conn.commit()
conn.close()
return affected > 0
def delete_user(user_id: int) -> bool:
"""Delete a user account and all related data. Returns True if deleted."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
affected = cursor.rowcount
conn.commit()
conn.close()
return affected > 0
def set_verification_code(user_id: int, code: str, expires_at) -> bool:
"""Store a verification code for the user."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"UPDATE users SET verification_code = %s, verification_code_expires = %s WHERE id = %s",
(code, expires_at, user_id),
)
affected = cursor.rowcount
conn.commit()
conn.close()
return affected > 0
def verify_email_code(user_id: int, code: str) -> str:
"""
Verify the email code for a user.
Returns 'ok', 'invalid', or 'expired'.
"""
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute(
"SELECT verification_code, verification_code_expires FROM users WHERE id = %s",
(user_id,),
)
row = cursor.fetchone()
if not row or not row["verification_code"]:
conn.close()
return "invalid"
if row["verification_code"] != code:
conn.close()
return "invalid"
if row["verification_code_expires"] and row["verification_code_expires"] < datetime.now():
conn.close()
return "expired"
# Mark as verified and clear the code
cursor.execute(
"UPDATE users SET email_verified = TRUE, verification_code = NULL, verification_code_expires = NULL WHERE id = %s",
(user_id,),
)
conn.commit()
conn.close()
return "ok"
def is_email_verified(user_id: int) -> bool:
"""Check if a user's email is verified."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT email_verified FROM users WHERE id = %s", (user_id,))
row = cursor.fetchone()
conn.close()
return bool(row and row[0])
# ── Check-history helpers ─────────────────────────────────────────────
def save_check_history(user_id: int, input_text: str, final_verdict: str, result_json: str):
"""Save a fact-check result to the user's history."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"INSERT INTO check_history (user_id, input_text, final_verdict, result_json) "
"VALUES (%s, %s, %s, %s)",
(user_id, input_text, final_verdict, result_json),
)
conn.commit()
conn.close()
def get_user_history(user_id: int, limit: int = 50) -> list[dict]:
"""Return the most recent fact-check results for a user, keeping only the last 15 days."""
conn = get_connection()
cursor = conn.cursor()
# Auto-delete history older than 15 days for this user
cursor.execute(
"DELETE FROM check_history WHERE user_id = %s AND checked_at < NOW() - INTERVAL '15 days'",
(user_id,)
)
conn.commit()
cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute(
"SELECT id, input_text, final_verdict, result_json, checked_at "
"FROM check_history WHERE user_id = %s ORDER BY checked_at DESC LIMIT %s",
(user_id, limit),
)
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
# ── Article helpers (existing) ────────────────────────────────────────
def insert_articles(articles):
"""Insert articles into the external articles database, skipping duplicates.
Returns the number of newly inserted articles.
"""
conn = get_external_connection()
cursor = conn.cursor()
inserted = 0
for article in articles:
cursor.execute(
"INSERT INTO articles (title, url, source, published, scraped_at) "
"VALUES (%s, %s, %s, %s, %s) "
"ON CONFLICT (url) DO NOTHING",
(
article["title"],
article["url"],
article["source"],
article.get("published", "N/A"),
datetime.now().isoformat(),
),
)
if cursor.rowcount > 0:
inserted += 1
conn.commit()
conn.close()
return inserted
def get_all_articles():
"""Return all articles from the external articles database."""
conn = get_external_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute("SELECT * FROM articles ORDER BY scraped_at DESC")
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
def search_articles(query):
"""Search articles by keyword in title (case-insensitive) in external DB."""
conn = get_external_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute(
"SELECT * FROM articles WHERE title ILIKE %s ORDER BY scraped_at DESC",
(f"%{query}%",),
)
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
def get_article_count():
"""Return the total number of articles in the external database."""
try:
conn = get_external_connection()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM articles")
count = cursor.fetchone()[0]
conn.close()
return count
except Exception:
return 0