Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import psycopg2 | |
| import psycopg2.extras | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| logger = logging.getLogger(__name__) | |
| # Load .env from project root (does NOT override existing env vars) | |
| load_dotenv( | |
| os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), ".env") | |
| ) | |
| # ββ Main DB (users, check_history) βββββββββββββββββββββββββββββββββββ | |
| DATABASE_URL = os.getenv( | |
| "DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/factcheck" | |
| ) | |
| # ββ External DB (scraped articles) βββββββββββββββββββββββββββββββββββ | |
| # Falls back to DATABASE_URL if not set, so existing setups keep working. | |
| EXTERNAL_DATABASE_URL = os.getenv("EXTERNAL_DATABASE_URL", DATABASE_URL) | |
| # Log masked URLs so we can debug connection issues | |
| def _mask_url(url): | |
| return url[:25] + "***" + url[-30:] if len(url) > 55 else "***" | |
| logger.info("DATABASE_URL = %s", _mask_url(DATABASE_URL)) | |
| logger.info("EXTERNAL_DATABASE_URL = %s", _mask_url(EXTERNAL_DATABASE_URL)) | |
| def get_connection(): | |
| """Get a connection to the main PostgreSQL database (users, history).""" | |
| try: | |
| conn = psycopg2.connect(DATABASE_URL) | |
| return conn | |
| except Exception as exc: | |
| logger.error("Failed to connect to main database: %s", exc) | |
| raise | |
| def get_external_connection(): | |
| """Get a connection to the external articles PostgreSQL database.""" | |
| try: | |
| conn = psycopg2.connect(EXTERNAL_DATABASE_URL) | |
| return conn | |
| except Exception as exc: | |
| logger.error("Failed to connect to external articles database: %s", exc) | |
| raise | |
| def init_db(): | |
| """Create the users and check_history tables (main DB) + articles table (external DB).""" | |
| # ββ External DB: articles table ββββββββββββββββββββββββββββββ | |
| try: | |
| ext_conn = get_external_connection() | |
| ext_cursor = ext_conn.cursor() | |
| ext_cursor.execute( | |
| """ | |
| CREATE TABLE IF NOT EXISTS articles ( | |
| id SERIAL PRIMARY KEY, | |
| title TEXT NOT NULL, | |
| url TEXT NOT NULL UNIQUE, | |
| source TEXT NOT NULL, | |
| published TEXT, | |
| scraped_at TEXT NOT NULL | |
| ) | |
| """ | |
| ) | |
| ext_conn.commit() | |
| ext_conn.close() | |
| logger.info("External articles DB initialized β") | |
| except Exception as exc: | |
| logger.warning("Could not initialize external articles DB: %s", exc) | |
| # ββ Main DB: users + check_history βββββββββββββββββββββββββββ | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| """ | |
| CREATE TABLE IF NOT EXISTS users ( | |
| id SERIAL PRIMARY KEY, | |
| email TEXT NOT NULL UNIQUE, | |
| username TEXT NOT NULL UNIQUE, | |
| password_hash TEXT NOT NULL, | |
| email_verified BOOLEAN NOT NULL DEFAULT FALSE, | |
| verification_code TEXT, | |
| verification_code_expires TIMESTAMP, | |
| created_at TIMESTAMP NOT NULL DEFAULT NOW() | |
| ) | |
| """ | |
| ) | |
| # Add UNIQUE constraint on username if it doesn't exist yet (for existing DBs) | |
| try: | |
| cursor.execute( | |
| "ALTER TABLE users ADD CONSTRAINT users_username_unique UNIQUE (username)" | |
| ) | |
| except psycopg2.errors.DuplicateTable: | |
| conn.rollback() | |
| cursor = conn.cursor() | |
| # Add email verification columns for existing DBs | |
| email_verified_added = False | |
| for col, col_def in [ | |
| ("email_verified", "BOOLEAN NOT NULL DEFAULT FALSE"), | |
| ("verification_code", "TEXT"), | |
| ("verification_code_expires", "TIMESTAMP"), | |
| ]: | |
| try: | |
| cursor.execute(f"ALTER TABLE users ADD COLUMN {col} {col_def}") | |
| conn.commit() | |
| if col == "email_verified": | |
| email_verified_added = True | |
| except psycopg2.errors.DuplicateColumn: | |
| conn.rollback() | |
| cursor = conn.cursor() | |
| # Mark all pre-existing accounts as verified (they existed before this feature) | |
| if email_verified_added: | |
| cursor.execute( | |
| "UPDATE users SET email_verified = TRUE WHERE verification_code IS NULL" | |
| ) | |
| conn.commit() | |
| logger.info("Marked all pre-existing accounts as email_verified = TRUE") | |
| cursor.execute( | |
| """ | |
| CREATE TABLE IF NOT EXISTS check_history ( | |
| id SERIAL PRIMARY KEY, | |
| user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, | |
| input_text TEXT NOT NULL, | |
| final_verdict TEXT, | |
| result_json TEXT, | |
| checked_at TIMESTAMP NOT NULL DEFAULT NOW() | |
| ) | |
| """ | |
| ) | |
| conn.commit() | |
| conn.close() | |
| # ββ User helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def create_user(email: str, username: str, password_hash: str) -> dict: | |
| """Insert a new user and return the created record as a dict.""" | |
| conn = get_connection() | |
| cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | |
| try: | |
| cursor.execute( | |
| "INSERT INTO users (email, username, password_hash) " | |
| "VALUES (%s, %s, %s) RETURNING id, email, username, created_at", | |
| (email, username, password_hash), | |
| ) | |
| user = dict(cursor.fetchone()) | |
| conn.commit() | |
| return user | |
| except psycopg2.errors.UniqueViolation: | |
| conn.rollback() | |
| return None | |
| finally: | |
| conn.close() | |
| def get_user_by_email(email: str) -> dict | None: | |
| """Look up a user by email. Returns full row (incl. password_hash) or None.""" | |
| conn = get_connection() | |
| cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | |
| cursor.execute("SELECT * FROM users WHERE email = %s", (email,)) | |
| row = cursor.fetchone() | |
| conn.close() | |
| return dict(row) if row else None | |
| def get_user_by_id(user_id: int) -> dict | None: | |
| """Look up a user by id. Returns row without password_hash or None.""" | |
| conn = get_connection() | |
| cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | |
| cursor.execute( | |
| "SELECT id, email, username, email_verified, created_at FROM users WHERE id = %s", | |
| (user_id,), | |
| ) | |
| row = cursor.fetchone() | |
| conn.close() | |
| return dict(row) if row else None | |
| def get_user_by_username(username: str) -> dict | None: | |
| """Look up a user by username. Returns row or None.""" | |
| conn = get_connection() | |
| cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | |
| cursor.execute( | |
| "SELECT * FROM users WHERE LOWER(username) = LOWER(%s)", | |
| (username,), | |
| ) | |
| row = cursor.fetchone() | |
| conn.close() | |
| return dict(row) if row else None | |
| def update_username(user_id: int, new_username: str) -> dict | None: | |
| """Update a user's username. Returns updated user dict or None on conflict.""" | |
| conn = get_connection() | |
| cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | |
| try: | |
| cursor.execute( | |
| "UPDATE users SET username = %s WHERE id = %s RETURNING id, email, username, created_at", | |
| (new_username, user_id), | |
| ) | |
| user = cursor.fetchone() | |
| conn.commit() | |
| return dict(user) if user else None | |
| except psycopg2.errors.UniqueViolation: | |
| conn.rollback() | |
| return None | |
| finally: | |
| conn.close() | |
| def update_user_password(user_id: int, new_password_hash: str) -> bool: | |
| """Update a user's password hash. Returns True on success.""" | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "UPDATE users SET password_hash = %s WHERE id = %s", | |
| (new_password_hash, user_id), | |
| ) | |
| affected = cursor.rowcount | |
| conn.commit() | |
| conn.close() | |
| return affected > 0 | |
| def delete_user(user_id: int) -> bool: | |
| """Delete a user account and all related data. Returns True if deleted.""" | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute("DELETE FROM users WHERE id = %s", (user_id,)) | |
| affected = cursor.rowcount | |
| conn.commit() | |
| conn.close() | |
| return affected > 0 | |
| def set_verification_code(user_id: int, code: str, expires_at) -> bool: | |
| """Store a verification code for the user.""" | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "UPDATE users SET verification_code = %s, verification_code_expires = %s WHERE id = %s", | |
| (code, expires_at, user_id), | |
| ) | |
| affected = cursor.rowcount | |
| conn.commit() | |
| conn.close() | |
| return affected > 0 | |
| def verify_email_code(user_id: int, code: str) -> str: | |
| """ | |
| Verify the email code for a user. | |
| Returns 'ok', 'invalid', or 'expired'. | |
| """ | |
| conn = get_connection() | |
| cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | |
| cursor.execute( | |
| "SELECT verification_code, verification_code_expires FROM users WHERE id = %s", | |
| (user_id,), | |
| ) | |
| row = cursor.fetchone() | |
| if not row or not row["verification_code"]: | |
| conn.close() | |
| return "invalid" | |
| if row["verification_code"] != code: | |
| conn.close() | |
| return "invalid" | |
| if row["verification_code_expires"] and row["verification_code_expires"] < datetime.now(): | |
| conn.close() | |
| return "expired" | |
| # Mark as verified and clear the code | |
| cursor.execute( | |
| "UPDATE users SET email_verified = TRUE, verification_code = NULL, verification_code_expires = NULL WHERE id = %s", | |
| (user_id,), | |
| ) | |
| conn.commit() | |
| conn.close() | |
| return "ok" | |
| def is_email_verified(user_id: int) -> bool: | |
| """Check if a user's email is verified.""" | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute("SELECT email_verified FROM users WHERE id = %s", (user_id,)) | |
| row = cursor.fetchone() | |
| conn.close() | |
| return bool(row and row[0]) | |
| # ββ Check-history helpers βββββββββββββββββββββββββββββββββββββββββββββ | |
| def save_check_history(user_id: int, input_text: str, final_verdict: str, result_json: str): | |
| """Save a fact-check result to the user's history.""" | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "INSERT INTO check_history (user_id, input_text, final_verdict, result_json) " | |
| "VALUES (%s, %s, %s, %s)", | |
| (user_id, input_text, final_verdict, result_json), | |
| ) | |
| conn.commit() | |
| conn.close() | |
| def get_user_history(user_id: int, limit: int = 50) -> list[dict]: | |
| """Return the most recent fact-check results for a user, keeping only the last 15 days.""" | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| # Auto-delete history older than 15 days for this user | |
| cursor.execute( | |
| "DELETE FROM check_history WHERE user_id = %s AND checked_at < NOW() - INTERVAL '15 days'", | |
| (user_id,) | |
| ) | |
| conn.commit() | |
| cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | |
| cursor.execute( | |
| "SELECT id, input_text, final_verdict, result_json, checked_at " | |
| "FROM check_history WHERE user_id = %s ORDER BY checked_at DESC LIMIT %s", | |
| (user_id, limit), | |
| ) | |
| rows = cursor.fetchall() | |
| conn.close() | |
| return [dict(row) for row in rows] | |
| # ββ Article helpers (existing) ββββββββββββββββββββββββββββββββββββββββ | |
| def insert_articles(articles): | |
| """Insert articles into the external articles database, skipping duplicates. | |
| Returns the number of newly inserted articles. | |
| """ | |
| conn = get_external_connection() | |
| cursor = conn.cursor() | |
| inserted = 0 | |
| for article in articles: | |
| cursor.execute( | |
| "INSERT INTO articles (title, url, source, published, scraped_at) " | |
| "VALUES (%s, %s, %s, %s, %s) " | |
| "ON CONFLICT (url) DO NOTHING", | |
| ( | |
| article["title"], | |
| article["url"], | |
| article["source"], | |
| article.get("published", "N/A"), | |
| datetime.now().isoformat(), | |
| ), | |
| ) | |
| if cursor.rowcount > 0: | |
| inserted += 1 | |
| conn.commit() | |
| conn.close() | |
| return inserted | |
| def get_all_articles(): | |
| """Return all articles from the external articles database.""" | |
| conn = get_external_connection() | |
| cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | |
| cursor.execute("SELECT * FROM articles ORDER BY scraped_at DESC") | |
| rows = cursor.fetchall() | |
| conn.close() | |
| return [dict(row) for row in rows] | |
| def search_articles(query): | |
| """Search articles by keyword in title (case-insensitive) in external DB.""" | |
| conn = get_external_connection() | |
| cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) | |
| cursor.execute( | |
| "SELECT * FROM articles WHERE title ILIKE %s ORDER BY scraped_at DESC", | |
| (f"%{query}%",), | |
| ) | |
| rows = cursor.fetchall() | |
| conn.close() | |
| return [dict(row) for row in rows] | |
| def get_article_count(): | |
| """Return the total number of articles in the external database.""" | |
| try: | |
| conn = get_external_connection() | |
| cursor = conn.cursor() | |
| cursor.execute("SELECT COUNT(*) FROM articles") | |
| count = cursor.fetchone()[0] | |
| conn.close() | |
| return count | |
| except Exception: | |
| return 0 | |