# database/database_manager.py from __future__ import annotations import logging import os from datetime import date, timedelta from typing import Optional import bcrypt import streamlit as st from dotenv import load_dotenv from sqlalchemy import create_engine, text from sqlalchemy.pool import QueuePool load_dotenv() # ── Logger ───────────────────────────────────────────────────────────────────── logger = logging.getLogger(__name__) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s — %(message)s", ) # ── SRS constants ────────────────────────────────────────────────────────────── def _next_review(mastery: float) -> str: """Return next review date as ISO-8601 string (YYYY-MM-DD). Returned as str, not datetime.date, because the legacy {SQL Server} ODBC driver raises HYC00 when binding Python date objects directly.""" mastery = max(0.0, min(mastery, 0.99)) interval = round((1 / (1 - mastery + 0.01)) ** 1.5) interval = max(1, min(interval, 60)) return (date.today() + timedelta(days=interval)).isoformat() def _new_mastery(old_mastery: float, passed: bool) -> float: delta = 0.20 if passed else -0.10 return round(max(0.0, min(1.0, old_mastery + delta)), 4) # ── SQLAlchemy Engine (singleton with QueuePool) ─────────────────────────────── _engine = None def _get_engine(): global _engine if _engine is not None: return _engine try: conn_url = st.secrets["DATABASE_URL"] if conn_url.startswith("postgres://"): conn_url = conn_url.replace("postgres://", "postgresql://", 1) except Exception: # رابط التجربة الذي زودتِني به conn_url = "postgresql://postgres.juyvtapnigpdavxaniqg:oklEE4CRY7VR3Jh2@aws-0-eu-west-1.pooler.supabase.com:6543/postgres" _engine = create_engine( conn_url, poolclass=QueuePool, pool_size=5, max_overflow=10, pool_pre_ping=True, pool_recycle=1800, ) logger.info("SQLAlchemy engine initialised for Supabase (PostgreSQL).") return _engine def _conn(): """Context-managed connection from the pool.""" return _get_engine().connect() # ── Cache busting helper ──────────────────────────────────────────────────────── def _bust_cache() -> None: try: get_performance_stats.clear() get_top_errors.clear() except Exception: pass # ── bcrypt helpers ───────────────────────────────────────────────────────────── def _hash_password(plain: str) -> str: return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def _verify_password(plain: str, hashed: str) -> bool: try: return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) except Exception: return False # ── Input validation ─────────────────────────────────────────────────────────── def _validate_username(username: str) -> str: username = username.strip() if not username or len(username) < 3: raise ValueError("Username must be at least 3 characters.") if len(username) > 50: raise ValueError("Username must be at most 50 characters.") return username def _validate_password(password: str) -> None: if not password or len(password) < 6: raise ValueError("Password must be at least 6 characters.") # ══════════════════════════════════════════════════════════════════════════════ # AUTH # ══════════════════════════════════════════════════════════════════════════════ def check_user(username: str, password: str) -> Optional[tuple]: """Return (Username, TotalXP, CurrentLevel, CurrentSentence, RegistrationDate) or None.""" try: username = _validate_username(username) except ValueError: return None try: with _conn() as conn: row = conn.execute( text( "SELECT Username, TotalXP, CurrentLevel, CurrentSentence, PasswordHash, " "RegistrationDate " "FROM Users WHERE Username = :u" ), {"u": username}, ).fetchone() if row is None: return None if row[4] and _verify_password(password, row[4]): return (row[0], row[1], row[2], row[3], row[5]) return None except Exception as exc: logger.error("check_user failed for '%s': %s", username, exc, exc_info=True) st.error("Database connection error. Please try again.") return None def add_user(username: str, password: str) -> bool: try: username = _validate_username(username) _validate_password(password) except ValueError as ve: st.error(str(ve)) return False try: hashed = _hash_password(password) with _conn() as conn: conn.execute( text( "INSERT INTO Users (Username, PasswordHash, TotalXP, CurrentLevel, " "CurrentSentence, UserLevel) VALUES (:u, :h, 0, 0, 0, 'Beginner')" ), {"u": username, "h": hashed}, ) conn.commit() _bust_cache() return True except Exception as exc: err_str = str(exc).lower() if "unique" in err_str or "duplicate" in err_str or "violation" in err_str: st.error("Username already taken.") else: logger.error("add_user failed for '%s': %s", username, exc, exc_info=True) st.error("Could not create account. Please try again.") return False # ══════════════════════════════════════════════════════════════════════════════ # PROGRESS # ══════════════════════════════════════════════════════════════════════════════ def update_user_progress( username: str, xp: int, level: int, sentence: int ) -> None: try: with _conn() as conn: conn.execute( text( "UPDATE Users SET TotalXP=:xp, CurrentLevel=:lv, " "CurrentSentence=:sn WHERE Username=:u" ), {"xp": xp, "lv": level, "sn": sentence, "u": username}, ) conn.commit() except Exception as exc: logger.warning("update_user_progress failed: %s", exc) # ══════════════════════════════════════════════════════════════════════════════ # PERFORMANCE LOGGING # ══════════════════════════════════════════════════════════════════════════════ def log_performance( username: str, target: str, transcript: str, accuracy: float, status: str, ) -> None: try: with _conn() as conn: conn.execute( text( "INSERT INTO PerformanceLog " "(Username, TargetSentence, UserTranscript, AccuracyPercentage, FeedbackStatus) " "VALUES (:u, :t, :tr, :a, :s)" ), { "u": username, "t": target, "tr": transcript, "a": round(accuracy, 2), "s": status, }, ) conn.commit() _bust_cache() except Exception as exc: logger.warning("log_performance failed: %s", exc) @st.cache_data(ttl=30) def get_performance_stats(username: str) -> list[tuple]: try: with _conn() as conn: rows = conn.execute( text( "SELECT AccuracyPercentage, FeedbackStatus, TargetSentence, " "UserTranscript, SessionTimestamp FROM PerformanceLog " "WHERE Username=:u ORDER BY SessionTimestamp DESC LIMIT 10" ), {"u": username}, ).fetchall() return [tuple(r) for r in rows] except Exception as exc: logger.warning("get_performance_stats failed: %s", exc) st.warning("Stats fetch failed.") return [] def get_all_performance_stats(username: str) -> list[tuple]: try: with _conn() as conn: rows = conn.execute( text( "SELECT AccuracyPercentage, SessionTimestamp, TargetSentence " "FROM PerformanceLog WHERE Username=:u ORDER BY SessionTimestamp ASC" ), {"u": username}, ).fetchall() return [tuple(r) for r in rows] except Exception as exc: logger.warning("get_all_performance_stats failed: %s", exc) st.warning("Full stats fetch failed.") return [] # ══════════════════════════════════════════════════════════════════════════════ # WORD-ERROR TRACKING — SRS-enhanced # ══════════════════════════════════════════════════════════════════════════════ def log_word_error( username: str, expected_word: str, user_said: str, phoneme_target: str = "", ) -> None: try: expected_word = expected_word.lower().strip()[:90] user_said = user_said.lower().strip()[:90] phoneme_target = phoneme_target[:254] if phoneme_target else "" with _conn() as conn: row = conn.execute( text( "SELECT ErrorID, ErrorCount, mastery_score FROM UserErrors " "WHERE Username=:u AND ExpectedWord=:e AND UserSaid=:s" ), {"u": username, "e": expected_word, "s": user_said}, ).fetchone() if row: new_count = row[1] + 1 new_mastery = _new_mastery(float(row[2] or 0.0), passed=False) new_review = _next_review(new_mastery) conn.execute( text( "UPDATE UserErrors SET ErrorCount=:c, mastery_score=:m, " "next_review_date=:r, LastSeen=CURRENT_TIMESTAMP, phoneme_target=:p " "WHERE ErrorID=:id" ), { "c": new_count, "m": new_mastery, "r": new_review, "p": phoneme_target, "id": row[0], }, ) else: first_review = date.today().isoformat() conn.execute( text( "INSERT INTO UserErrors " "(Username, ExpectedWord, UserSaid, ErrorCount, mastery_score, " "next_review_date, phoneme_target, LastSeen) " "VALUES (:u, :e, :s, 1, 0.0, :r, :p, CURRENT_TIMESTAMP)" ), { "u": username, "e": expected_word, "s": user_said, "r": first_review, "p": phoneme_target, }, ) conn.commit() except Exception as exc: logger.error("log_word_error فشل: %s", exc, exc_info=True) st.error(f"SQL INSERT FAILED (log_word_error): {exc}") def update_word_mastery(username: str, expected_word: str, passed: bool) -> None: try: expected_word = expected_word.lower().strip()[:90] with _conn() as conn: rows = conn.execute( text( "SELECT ErrorID, mastery_score FROM UserErrors " "WHERE Username=:u AND ExpectedWord=:e" ), {"u": username, "e": expected_word}, ).fetchall() for row in rows: new_mastery = _new_mastery(float(row[1] or 0.0), passed=passed) new_review = _next_review(new_mastery) conn.execute( text( "UPDATE UserErrors SET mastery_score=:m, next_review_date=:r " "WHERE ErrorID=:id" ), {"m": new_mastery, "r": new_review, "id": row[0]}, ) conn.commit() except Exception as exc: logger.error("update_word_mastery failed: %s", exc, exc_info=True) def get_recent_errors(username: str, limit: int = 10) -> list[dict]: try: with _conn() as conn: rows = conn.execute( text( "SELECT ExpectedWord, UserSaid, ErrorCount, " "COALESCE(mastery_score, 0.0), COALESCE(phoneme_target, '') " "FROM UserErrors WHERE Username=:u " "ORDER BY ErrorCount DESC, LastSeen DESC " "LIMIT :lim OFFSET 0" ), {"lim": limit, "u": username}, ).fetchall() return [ { "expected": r[0], "said": r[1], "count": r[2], "mastery": float(r[3]), "phoneme": r[4], } for r in rows ] except Exception as exc: logger.error("get_recent_errors failed: %s", exc, exc_info=True) return [] def get_srs_due_errors(username: str, limit: int = 10) -> list[dict]: try: with _conn() as conn: rows = conn.execute( text( "SELECT ExpectedWord, UserSaid, ErrorCount, " "COALESCE(mastery_score, 0.0), COALESCE(phoneme_target, '') " "FROM UserErrors " "WHERE Username=:u " " AND COALESCE(mastery_score, 0.0) < 0.90 " " AND (next_review_date IS NULL " " OR next_review_date <= CURRENT_DATE) " "ORDER BY COALESCE(mastery_score, 0.0) ASC, ErrorCount DESC " "LIMIT :lim OFFSET 0" ), {"lim": limit, "u": username}, ).fetchall() return [ { "expected": r[0], "said": r[1], "count": r[2], "mastery": float(r[3]), "phoneme": r[4], } for r in rows ] except Exception as exc: logger.error("get_srs_due_errors failed: %s", exc, exc_info=True) return [] @st.cache_data(ttl=30) def get_top_errors(username: str, limit: int = 5) -> list[dict]: try: with _conn() as conn: rows = conn.execute( text( "SELECT ExpectedWord, UserSaid, ErrorCount " "FROM UserErrors WHERE Username=:u " "ORDER BY ErrorCount DESC " "LIMIT :lim OFFSET 0" ), {"u": username, "lim": limit}, ).fetchall() return [{"expected": r[0], "said": r[1], "count": r[2]} for r in rows] except Exception as exc: logger.error("get_top_errors failed: %s", exc, exc_info=True) return [] def backfill_null_dates() -> int: try: with _conn() as conn: result = conn.execute( text( "UPDATE UserErrors SET LastSeen = CURRENT_TIMESTAMP " "WHERE LastSeen IS NULL" ) ) conn.commit() count = result.rowcount logger.info("backfill_null_dates: updated %d rows.", count) return count except Exception as exc: logger.error("backfill_null_dates failed: %s", exc, exc_info=True) return -1