VerbaCorrect / database /database_manager.py
had345JER's picture
آخر نسخة 2
c9a5f71
# database/database_manager.py
from __future__ import annotations
import logging
import os
from datetime import date, timedelta
from typing import Optional
import bcrypt
import streamlit as st
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
load_dotenv()
# ── Logger ─────────────────────────────────────────────────────────────────────
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s — %(message)s",
)
# ── SRS constants ──────────────────────────────────────────────────────────────
def _next_review(mastery: float) -> str:
"""Return next review date as ISO-8601 string (YYYY-MM-DD).
Returned as str, not datetime.date, because the legacy {SQL Server}
ODBC driver raises HYC00 when binding Python date objects directly."""
mastery = max(0.0, min(mastery, 0.99))
interval = round((1 / (1 - mastery + 0.01)) ** 1.5)
interval = max(1, min(interval, 60))
return (date.today() + timedelta(days=interval)).isoformat()
def _new_mastery(old_mastery: float, passed: bool) -> float:
delta = 0.20 if passed else -0.10
return round(max(0.0, min(1.0, old_mastery + delta)), 4)
# ── SQLAlchemy Engine (singleton with QueuePool) ───────────────────────────────
_engine = None
def _get_engine():
global _engine
if _engine is not None:
return _engine
try:
conn_url = st.secrets["DATABASE_URL"]
if conn_url.startswith("postgres://"):
conn_url = conn_url.replace("postgres://", "postgresql://", 1)
except Exception:
# رابط التجربة الذي زودتِني به
conn_url = "postgresql://postgres.juyvtapnigpdavxaniqg:oklEE4CRY7VR3Jh2@aws-0-eu-west-1.pooler.supabase.com:6543/postgres"
_engine = create_engine(
conn_url,
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_pre_ping=True,
pool_recycle=1800,
)
logger.info("SQLAlchemy engine initialised for Supabase (PostgreSQL).")
return _engine
def _conn():
"""Context-managed connection from the pool."""
return _get_engine().connect()
# ── Cache busting helper ────────────────────────────────────────────────────────
def _bust_cache() -> None:
try:
get_performance_stats.clear()
get_top_errors.clear()
except Exception:
pass
# ── bcrypt helpers ─────────────────────────────────────────────────────────────
def _hash_password(plain: str) -> str:
return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def _verify_password(plain: str, hashed: str) -> bool:
try:
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
except Exception:
return False
# ── Input validation ───────────────────────────────────────────────────────────
def _validate_username(username: str) -> str:
username = username.strip()
if not username or len(username) < 3:
raise ValueError("Username must be at least 3 characters.")
if len(username) > 50:
raise ValueError("Username must be at most 50 characters.")
return username
def _validate_password(password: str) -> None:
if not password or len(password) < 6:
raise ValueError("Password must be at least 6 characters.")
# ══════════════════════════════════════════════════════════════════════════════
# AUTH
# ══════════════════════════════════════════════════════════════════════════════
def check_user(username: str, password: str) -> Optional[tuple]:
"""Return (Username, TotalXP, CurrentLevel, CurrentSentence, RegistrationDate) or None."""
try:
username = _validate_username(username)
except ValueError:
return None
try:
with _conn() as conn:
row = conn.execute(
text(
"SELECT Username, TotalXP, CurrentLevel, CurrentSentence, PasswordHash, "
"RegistrationDate "
"FROM Users WHERE Username = :u"
),
{"u": username},
).fetchone()
if row is None:
return None
if row[4] and _verify_password(password, row[4]):
return (row[0], row[1], row[2], row[3], row[5])
return None
except Exception as exc:
logger.error("check_user failed for '%s': %s", username, exc, exc_info=True)
st.error("Database connection error. Please try again.")
return None
def add_user(username: str, password: str) -> bool:
try:
username = _validate_username(username)
_validate_password(password)
except ValueError as ve:
st.error(str(ve))
return False
try:
hashed = _hash_password(password)
with _conn() as conn:
conn.execute(
text(
"INSERT INTO Users (Username, PasswordHash, TotalXP, CurrentLevel, "
"CurrentSentence, UserLevel) VALUES (:u, :h, 0, 0, 0, 'Beginner')"
),
{"u": username, "h": hashed},
)
conn.commit()
_bust_cache()
return True
except Exception as exc:
err_str = str(exc).lower()
if "unique" in err_str or "duplicate" in err_str or "violation" in err_str:
st.error("Username already taken.")
else:
logger.error("add_user failed for '%s': %s", username, exc, exc_info=True)
st.error("Could not create account. Please try again.")
return False
# ══════════════════════════════════════════════════════════════════════════════
# PROGRESS
# ══════════════════════════════════════════════════════════════════════════════
def update_user_progress(
username: str, xp: int, level: int, sentence: int
) -> None:
try:
with _conn() as conn:
conn.execute(
text(
"UPDATE Users SET TotalXP=:xp, CurrentLevel=:lv, "
"CurrentSentence=:sn WHERE Username=:u"
),
{"xp": xp, "lv": level, "sn": sentence, "u": username},
)
conn.commit()
except Exception as exc:
logger.warning("update_user_progress failed: %s", exc)
# ══════════════════════════════════════════════════════════════════════════════
# PERFORMANCE LOGGING
# ══════════════════════════════════════════════════════════════════════════════
def log_performance(
username: str,
target: str,
transcript: str,
accuracy: float,
status: str,
) -> None:
try:
with _conn() as conn:
conn.execute(
text(
"INSERT INTO PerformanceLog "
"(Username, TargetSentence, UserTranscript, AccuracyPercentage, FeedbackStatus) "
"VALUES (:u, :t, :tr, :a, :s)"
),
{
"u": username,
"t": target,
"tr": transcript,
"a": round(accuracy, 2),
"s": status,
},
)
conn.commit()
_bust_cache()
except Exception as exc:
logger.warning("log_performance failed: %s", exc)
@st.cache_data(ttl=30)
def get_performance_stats(username: str) -> list[tuple]:
try:
with _conn() as conn:
rows = conn.execute(
text(
"SELECT AccuracyPercentage, FeedbackStatus, TargetSentence, "
"UserTranscript, SessionTimestamp FROM PerformanceLog "
"WHERE Username=:u ORDER BY SessionTimestamp DESC LIMIT 10"
),
{"u": username},
).fetchall()
return [tuple(r) for r in rows]
except Exception as exc:
logger.warning("get_performance_stats failed: %s", exc)
st.warning("Stats fetch failed.")
return []
def get_all_performance_stats(username: str) -> list[tuple]:
try:
with _conn() as conn:
rows = conn.execute(
text(
"SELECT AccuracyPercentage, SessionTimestamp, TargetSentence "
"FROM PerformanceLog WHERE Username=:u ORDER BY SessionTimestamp ASC"
),
{"u": username},
).fetchall()
return [tuple(r) for r in rows]
except Exception as exc:
logger.warning("get_all_performance_stats failed: %s", exc)
st.warning("Full stats fetch failed.")
return []
# ══════════════════════════════════════════════════════════════════════════════
# WORD-ERROR TRACKING — SRS-enhanced
# ══════════════════════════════════════════════════════════════════════════════
def log_word_error(
username: str,
expected_word: str,
user_said: str,
phoneme_target: str = "",
) -> None:
try:
expected_word = expected_word.lower().strip()[:90]
user_said = user_said.lower().strip()[:90]
phoneme_target = phoneme_target[:254] if phoneme_target else ""
with _conn() as conn:
row = conn.execute(
text(
"SELECT ErrorID, ErrorCount, mastery_score FROM UserErrors "
"WHERE Username=:u AND ExpectedWord=:e AND UserSaid=:s"
),
{"u": username, "e": expected_word, "s": user_said},
).fetchone()
if row:
new_count = row[1] + 1
new_mastery = _new_mastery(float(row[2] or 0.0), passed=False)
new_review = _next_review(new_mastery)
conn.execute(
text(
"UPDATE UserErrors SET ErrorCount=:c, mastery_score=:m, "
"next_review_date=:r, LastSeen=CURRENT_TIMESTAMP, phoneme_target=:p "
"WHERE ErrorID=:id"
),
{
"c": new_count,
"m": new_mastery,
"r": new_review,
"p": phoneme_target,
"id": row[0],
},
)
else:
first_review = date.today().isoformat()
conn.execute(
text(
"INSERT INTO UserErrors "
"(Username, ExpectedWord, UserSaid, ErrorCount, mastery_score, "
"next_review_date, phoneme_target, LastSeen) "
"VALUES (:u, :e, :s, 1, 0.0, :r, :p, CURRENT_TIMESTAMP)"
),
{
"u": username,
"e": expected_word,
"s": user_said,
"r": first_review,
"p": phoneme_target,
},
)
conn.commit()
except Exception as exc:
logger.error("log_word_error فشل: %s", exc, exc_info=True)
st.error(f"SQL INSERT FAILED (log_word_error): {exc}")
def update_word_mastery(username: str, expected_word: str, passed: bool) -> None:
try:
expected_word = expected_word.lower().strip()[:90]
with _conn() as conn:
rows = conn.execute(
text(
"SELECT ErrorID, mastery_score FROM UserErrors "
"WHERE Username=:u AND ExpectedWord=:e"
),
{"u": username, "e": expected_word},
).fetchall()
for row in rows:
new_mastery = _new_mastery(float(row[1] or 0.0), passed=passed)
new_review = _next_review(new_mastery)
conn.execute(
text(
"UPDATE UserErrors SET mastery_score=:m, next_review_date=:r "
"WHERE ErrorID=:id"
),
{"m": new_mastery, "r": new_review, "id": row[0]},
)
conn.commit()
except Exception as exc:
logger.error("update_word_mastery failed: %s", exc, exc_info=True)
def get_recent_errors(username: str, limit: int = 10) -> list[dict]:
try:
with _conn() as conn:
rows = conn.execute(
text(
"SELECT ExpectedWord, UserSaid, ErrorCount, "
"COALESCE(mastery_score, 0.0), COALESCE(phoneme_target, '') "
"FROM UserErrors WHERE Username=:u "
"ORDER BY ErrorCount DESC, LastSeen DESC "
"LIMIT :lim OFFSET 0"
),
{"lim": limit, "u": username},
).fetchall()
return [
{
"expected": r[0],
"said": r[1],
"count": r[2],
"mastery": float(r[3]),
"phoneme": r[4],
}
for r in rows
]
except Exception as exc:
logger.error("get_recent_errors failed: %s", exc, exc_info=True)
return []
def get_srs_due_errors(username: str, limit: int = 10) -> list[dict]:
try:
with _conn() as conn:
rows = conn.execute(
text(
"SELECT ExpectedWord, UserSaid, ErrorCount, "
"COALESCE(mastery_score, 0.0), COALESCE(phoneme_target, '') "
"FROM UserErrors "
"WHERE Username=:u "
" AND COALESCE(mastery_score, 0.0) < 0.90 "
" AND (next_review_date IS NULL "
" OR next_review_date <= CURRENT_DATE) "
"ORDER BY COALESCE(mastery_score, 0.0) ASC, ErrorCount DESC "
"LIMIT :lim OFFSET 0"
),
{"lim": limit, "u": username},
).fetchall()
return [
{
"expected": r[0],
"said": r[1],
"count": r[2],
"mastery": float(r[3]),
"phoneme": r[4],
}
for r in rows
]
except Exception as exc:
logger.error("get_srs_due_errors failed: %s", exc, exc_info=True)
return []
@st.cache_data(ttl=30)
def get_top_errors(username: str, limit: int = 5) -> list[dict]:
try:
with _conn() as conn:
rows = conn.execute(
text(
"SELECT ExpectedWord, UserSaid, ErrorCount "
"FROM UserErrors WHERE Username=:u "
"ORDER BY ErrorCount DESC "
"LIMIT :lim OFFSET 0"
),
{"u": username, "lim": limit},
).fetchall()
return [{"expected": r[0], "said": r[1], "count": r[2]} for r in rows]
except Exception as exc:
logger.error("get_top_errors failed: %s", exc, exc_info=True)
return []
def backfill_null_dates() -> int:
try:
with _conn() as conn:
result = conn.execute(
text(
"UPDATE UserErrors SET LastSeen = CURRENT_TIMESTAMP "
"WHERE LastSeen IS NULL"
)
)
conn.commit()
count = result.rowcount
logger.info("backfill_null_dates: updated %d rows.", count)
return count
except Exception as exc:
logger.error("backfill_null_dates failed: %s", exc, exc_info=True)
return -1