import os import logging import psycopg2 import psycopg2.extras from datetime import datetime from dotenv import load_dotenv logger = logging.getLogger(__name__) # Load .env from project root (does NOT override existing env vars) load_dotenv( os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), ".env") ) # ── Main DB (users, check_history) ─────────────────────────────────── DATABASE_URL = os.getenv( "DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/factcheck" ) # ── External DB (scraped articles) ─────────────────────────────────── # Falls back to DATABASE_URL if not set, so existing setups keep working. EXTERNAL_DATABASE_URL = os.getenv("EXTERNAL_DATABASE_URL", DATABASE_URL) # Log masked URLs so we can debug connection issues def _mask_url(url): return url[:25] + "***" + url[-30:] if len(url) > 55 else "***" logger.info("DATABASE_URL = %s", _mask_url(DATABASE_URL)) logger.info("EXTERNAL_DATABASE_URL = %s", _mask_url(EXTERNAL_DATABASE_URL)) def get_connection(): """Get a connection to the main PostgreSQL database (users, history).""" try: conn = psycopg2.connect(DATABASE_URL) return conn except Exception as exc: logger.error("Failed to connect to main database: %s", exc) raise def get_external_connection(): """Get a connection to the external articles PostgreSQL database.""" try: conn = psycopg2.connect(EXTERNAL_DATABASE_URL) return conn except Exception as exc: logger.error("Failed to connect to external articles database: %s", exc) raise def init_db(): """Create the users and check_history tables (main DB) + articles table (external DB).""" # ── External DB: articles table ────────────────────────────── try: ext_conn = get_external_connection() ext_cursor = ext_conn.cursor() ext_cursor.execute( """ CREATE TABLE IF NOT EXISTS articles ( id SERIAL PRIMARY KEY, title TEXT NOT NULL, url TEXT NOT NULL UNIQUE, source TEXT NOT NULL, published TEXT, scraped_at TEXT NOT NULL ) """ ) ext_conn.commit() ext_conn.close() logger.info("External articles DB initialized ✓") except Exception as exc: logger.warning("Could not initialize external articles DB: %s", exc) # ── Main DB: users + check_history ─────────────────────────── conn = get_connection() cursor = conn.cursor() cursor.execute( """ CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, email TEXT NOT NULL UNIQUE, username TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, email_verified BOOLEAN NOT NULL DEFAULT FALSE, verification_code TEXT, verification_code_expires TIMESTAMP, created_at TIMESTAMP NOT NULL DEFAULT NOW() ) """ ) # Add UNIQUE constraint on username if it doesn't exist yet (for existing DBs) try: cursor.execute( "ALTER TABLE users ADD CONSTRAINT users_username_unique UNIQUE (username)" ) except psycopg2.errors.DuplicateTable: conn.rollback() cursor = conn.cursor() # Add email verification columns for existing DBs email_verified_added = False for col, col_def in [ ("email_verified", "BOOLEAN NOT NULL DEFAULT FALSE"), ("verification_code", "TEXT"), ("verification_code_expires", "TIMESTAMP"), ]: try: cursor.execute(f"ALTER TABLE users ADD COLUMN {col} {col_def}") conn.commit() if col == "email_verified": email_verified_added = True except psycopg2.errors.DuplicateColumn: conn.rollback() cursor = conn.cursor() # Mark all pre-existing accounts as verified (they existed before this feature) if email_verified_added: cursor.execute( "UPDATE users SET email_verified = TRUE WHERE verification_code IS NULL" ) conn.commit() logger.info("Marked all pre-existing accounts as email_verified = TRUE") cursor.execute( """ CREATE TABLE IF NOT EXISTS check_history ( id SERIAL PRIMARY KEY, user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, input_text TEXT NOT NULL, final_verdict TEXT, result_json TEXT, checked_at TIMESTAMP NOT NULL DEFAULT NOW() ) """ ) conn.commit() conn.close() # ── User helpers ────────────────────────────────────────────────────── def create_user(email: str, username: str, password_hash: str) -> dict: """Insert a new user and return the created record as a dict.""" conn = get_connection() cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) try: cursor.execute( "INSERT INTO users (email, username, password_hash) " "VALUES (%s, %s, %s) RETURNING id, email, username, created_at", (email, username, password_hash), ) user = dict(cursor.fetchone()) conn.commit() return user except psycopg2.errors.UniqueViolation: conn.rollback() return None finally: conn.close() def get_user_by_email(email: str) -> dict | None: """Look up a user by email. Returns full row (incl. password_hash) or None.""" conn = get_connection() cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute("SELECT * FROM users WHERE email = %s", (email,)) row = cursor.fetchone() conn.close() return dict(row) if row else None def get_user_by_id(user_id: int) -> dict | None: """Look up a user by id. Returns row without password_hash or None.""" conn = get_connection() cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute( "SELECT id, email, username, email_verified, created_at FROM users WHERE id = %s", (user_id,), ) row = cursor.fetchone() conn.close() return dict(row) if row else None def get_user_by_username(username: str) -> dict | None: """Look up a user by username. Returns row or None.""" conn = get_connection() cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute( "SELECT * FROM users WHERE LOWER(username) = LOWER(%s)", (username,), ) row = cursor.fetchone() conn.close() return dict(row) if row else None def update_username(user_id: int, new_username: str) -> dict | None: """Update a user's username. Returns updated user dict or None on conflict.""" conn = get_connection() cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) try: cursor.execute( "UPDATE users SET username = %s WHERE id = %s RETURNING id, email, username, created_at", (new_username, user_id), ) user = cursor.fetchone() conn.commit() return dict(user) if user else None except psycopg2.errors.UniqueViolation: conn.rollback() return None finally: conn.close() def update_user_password(user_id: int, new_password_hash: str) -> bool: """Update a user's password hash. Returns True on success.""" conn = get_connection() cursor = conn.cursor() cursor.execute( "UPDATE users SET password_hash = %s WHERE id = %s", (new_password_hash, user_id), ) affected = cursor.rowcount conn.commit() conn.close() return affected > 0 def delete_user(user_id: int) -> bool: """Delete a user account and all related data. Returns True if deleted.""" conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM users WHERE id = %s", (user_id,)) affected = cursor.rowcount conn.commit() conn.close() return affected > 0 def set_verification_code(user_id: int, code: str, expires_at) -> bool: """Store a verification code for the user.""" conn = get_connection() cursor = conn.cursor() cursor.execute( "UPDATE users SET verification_code = %s, verification_code_expires = %s WHERE id = %s", (code, expires_at, user_id), ) affected = cursor.rowcount conn.commit() conn.close() return affected > 0 def verify_email_code(user_id: int, code: str) -> str: """ Verify the email code for a user. Returns 'ok', 'invalid', or 'expired'. """ conn = get_connection() cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute( "SELECT verification_code, verification_code_expires FROM users WHERE id = %s", (user_id,), ) row = cursor.fetchone() if not row or not row["verification_code"]: conn.close() return "invalid" if row["verification_code"] != code: conn.close() return "invalid" if row["verification_code_expires"] and row["verification_code_expires"] < datetime.now(): conn.close() return "expired" # Mark as verified and clear the code cursor.execute( "UPDATE users SET email_verified = TRUE, verification_code = NULL, verification_code_expires = NULL WHERE id = %s", (user_id,), ) conn.commit() conn.close() return "ok" def is_email_verified(user_id: int) -> bool: """Check if a user's email is verified.""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT email_verified FROM users WHERE id = %s", (user_id,)) row = cursor.fetchone() conn.close() return bool(row and row[0]) # ── Check-history helpers ───────────────────────────────────────────── def save_check_history(user_id: int, input_text: str, final_verdict: str, result_json: str): """Save a fact-check result to the user's history.""" conn = get_connection() cursor = conn.cursor() cursor.execute( "INSERT INTO check_history (user_id, input_text, final_verdict, result_json) " "VALUES (%s, %s, %s, %s)", (user_id, input_text, final_verdict, result_json), ) conn.commit() conn.close() def get_user_history(user_id: int, limit: int = 50) -> list[dict]: """Return the most recent fact-check results for a user, keeping only the last 15 days.""" conn = get_connection() cursor = conn.cursor() # Auto-delete history older than 15 days for this user cursor.execute( "DELETE FROM check_history WHERE user_id = %s AND checked_at < NOW() - INTERVAL '15 days'", (user_id,) ) conn.commit() cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute( "SELECT id, input_text, final_verdict, result_json, checked_at " "FROM check_history WHERE user_id = %s ORDER BY checked_at DESC LIMIT %s", (user_id, limit), ) rows = cursor.fetchall() conn.close() return [dict(row) for row in rows] # ── Article helpers (existing) ──────────────────────────────────────── def insert_articles(articles): """Insert articles into the external articles database, skipping duplicates. Returns the number of newly inserted articles. """ conn = get_external_connection() cursor = conn.cursor() inserted = 0 for article in articles: cursor.execute( "INSERT INTO articles (title, url, source, published, scraped_at) " "VALUES (%s, %s, %s, %s, %s) " "ON CONFLICT (url) DO NOTHING", ( article["title"], article["url"], article["source"], article.get("published", "N/A"), datetime.now().isoformat(), ), ) if cursor.rowcount > 0: inserted += 1 conn.commit() conn.close() return inserted def get_all_articles(): """Return all articles from the external articles database.""" conn = get_external_connection() cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute("SELECT * FROM articles ORDER BY scraped_at DESC") rows = cursor.fetchall() conn.close() return [dict(row) for row in rows] def search_articles(query): """Search articles by keyword in title (case-insensitive) in external DB.""" conn = get_external_connection() cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute( "SELECT * FROM articles WHERE title ILIKE %s ORDER BY scraped_at DESC", (f"%{query}%",), ) rows = cursor.fetchall() conn.close() return [dict(row) for row in rows] def get_article_count(): """Return the total number of articles in the external database.""" try: conn = get_external_connection() cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM articles") count = cursor.fetchone()[0] conn.close() return count except Exception: return 0