import sqlite3, hashlib, secrets, time, random, json, os import urllib.request, urllib.error, urllib.parse BREVO_API_KEY = os.environ.get("BREVO_API_KEY", "") EMAIL_SENDER = os.environ.get("EMAIL_SENDER", "noreply@fitpro.app") SENDER_NAME = "FitPro AI" DB_PATH = "/tmp/fitpro.db" _conn = None def _db(): global _conn if _conn is None: _conn = sqlite3.connect(DB_PATH, check_same_thread=False) _conn.row_factory = sqlite3.Row _conn.executescript(""" CREATE TABLE IF NOT EXISTS users ( username TEXT PRIMARY KEY, email TEXT UNIQUE NOT NULL, password TEXT NOT NULL, token TEXT, created_at REAL, last_login REAL); CREATE TABLE IF NOT EXISTS otps ( key TEXT PRIMARY KEY, otp TEXT NOT NULL, expires_at REAL NOT NULL, username TEXT, password_hash TEXT); CREATE TABLE IF NOT EXISTS profiles ( username TEXT PRIMARY KEY, name TEXT, age INTEGER, gender TEXT, height REAL, weight REAL, goal TEXT, level TEXT, equipment TEXT, updated_at REAL); CREATE TABLE IF NOT EXISTS workouts ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, plan_text TEXT, days_total INTEGER, created_at REAL); CREATE TABLE IF NOT EXISTS tracking ( username TEXT NOT NULL, date TEXT NOT NULL, day_idx INTEGER, status TEXT DEFAULT 'pending', PRIMARY KEY (username, date)); """) # Add columns if upgrading from old schema try: _conn.execute("ALTER TABLE otps ADD COLUMN username TEXT") _conn.commit() except Exception: pass try: _conn.execute("ALTER TABLE otps ADD COLUMN password_hash TEXT") _conn.commit() except Exception: pass return _conn def _hash(pw): return hashlib.sha256(pw.encode()).hexdigest() def _token(): return secrets.token_hex(32) def _otp(): return str(random.randint(100000, 999999)) # ── User ops ────────────────────────────────────────────────────────────────── def get_user(username_or_email): v = username_or_email.strip().lower() c = _db().execute( "SELECT * FROM users WHERE lower(username)=? OR lower(email)=?", (v, v) ).fetchone() return dict(c) if c else None def get_user_by_username(username): c = _db().execute( "SELECT * FROM users WHERE lower(username)=?", (username.strip().lower(),) ).fetchone() return dict(c) if c else None def get_user_by_email(email): c = _db().execute( "SELECT * FROM users WHERE lower(email)=?", (email.strip().lower(),) ).fetchone() return dict(c) if c else None def create_user(username, email, password_hash_or_plain, already_hashed=False): token = _token() pw = password_hash_or_plain if already_hashed else _hash(password_hash_or_plain) _db().execute( "INSERT INTO users (username,email,password,token,created_at) VALUES (?,?,?,?,?)", (username.strip(), email.strip().lower(), pw, token, time.time()) ) _db().commit() return token def update_user_token(username, token): _db().execute( "UPDATE users SET token=?,last_login=? WHERE lower(username)=?", (token, time.time(), username.lower()) ) _db().commit() def logout_user(username): _db().execute( "UPDATE users SET token=NULL WHERE lower(username)=?", (username.lower(),) ) _db().commit() # ── Profile ops ─────────────────────────────────────────────────────────────── def save_profile(username, data: dict): _db().execute(""" INSERT INTO profiles (username,name,age,gender,height,weight,goal,level,equipment,updated_at) VALUES (?,?,?,?,?,?,?,?,?,?) ON CONFLICT(username) DO UPDATE SET name=excluded.name, age=excluded.age, gender=excluded.gender, height=excluded.height, weight=excluded.weight, goal=excluded.goal, level=excluded.level, equipment=excluded.equipment, updated_at=excluded.updated_at """, ( username, data.get("name",""), data.get("age",25), data.get("gender","Male"), data.get("height",170), data.get("weight",70), data.get("goal","Build Muscle"), data.get("level","Beginner"), json.dumps(data.get("equipment",[])), time.time() )) _db().commit() def load_profile(username): c = _db().execute("SELECT * FROM profiles WHERE username=?", (username,)).fetchone() if not c: return None d = dict(c) try: d["equipment"] = json.loads(d.get("equipment") or "[]") except Exception: d["equipment"] = [] return d # ── Workout ops ─────────────────────────────────────────────────────────────── def save_workout(username, plan_text, days_total): _db().execute( "INSERT INTO workouts (username,plan_text,days_total,created_at) VALUES (?,?,?,?)", (username, plan_text, days_total, time.time()) ) _db().commit() def load_latest_workout(username): c = _db().execute( "SELECT * FROM workouts WHERE username=? ORDER BY created_at DESC LIMIT 1", (username,) ).fetchone() return dict(c) if c else None # ── Tracking ops ────────────────────────────────────────────────────────────── def save_tracking(username, date_str, day_idx, status): _db().execute(""" INSERT INTO tracking (username,date,day_idx,status) VALUES (?,?,?,?) ON CONFLICT(username,date) DO UPDATE SET status=excluded.status, day_idx=excluded.day_idx """, (username, date_str, day_idx, status)) _db().commit() def load_tracking(username): rows = _db().execute( "SELECT date,day_idx,status FROM tracking WHERE username=?", (username,) ).fetchall() return {r["date"]: {"day_idx": r["day_idx"], "status": r["status"]} for r in rows} def delete_tracking(username): _db().execute("DELETE FROM tracking WHERE username=?", (username,)) _db().commit() # ── OTP ops — stores username+password_hash in DB so session_state not needed ─ def store_otp(email, otp, username="", password_plain=""): key = f"signup:{email.strip().lower()}" pw_hash = _hash(password_plain) if password_plain else "" _db().execute( """INSERT INTO otps(key,otp,expires_at,username,password_hash) VALUES(?,?,?,?,?) ON CONFLICT(key) DO UPDATE SET otp=excluded.otp, expires_at=excluded.expires_at, username=excluded.username, password_hash=excluded.password_hash""", (key, otp, time.time() + 600, username, pw_hash) ) _db().commit() def check_otp(email, otp_input): """Returns (ok, msg, username, password_hash)""" key = f"signup:{email.strip().lower()}" c = _db().execute("SELECT * FROM otps WHERE key=?", (key,)).fetchone() if not c: return False, "Code not found. Please sign up again.", "", "" if time.time() > c["expires_at"]: _db().execute("DELETE FROM otps WHERE key=?", (key,)) _db().commit() return False, "Code expired. Please sign up again.", "", "" if c["otp"] != otp_input.strip(): return False, "Incorrect code. Try again.", "", "" username = c["username"] or "" pw_hash = c["password_hash"] or "" _db().execute("DELETE FROM otps WHERE key=?", (key,)) _db().commit() return True, "OK", username, pw_hash # ── Brevo email ─────────────────────────────────────────────────────────────── def send_otp_email(to_email, otp): if not BREVO_API_KEY: return True, "__NO_EMAIL__" html = f"""
FITPRO AI
VERIFICATION
{otp}

Expires in 10 minutes. Don't share this code.

""" payload = json.dumps({ "sender": {"name": SENDER_NAME, "email": EMAIL_SENDER}, "to": [{"email": to_email}], "subject": f"FitPro AI — Your code: {otp}", "htmlContent": html, }).encode() req = urllib.request.Request( "https://api.brevo.com/v3/smtp/email", data=payload, headers={"accept": "application/json", "api-key": BREVO_API_KEY, "content-type": "application/json"}, method="POST" ) try: with urllib.request.urlopen(req, timeout=15) as r: return (r.status in (200, 201, 202)), "Sent" except Exception as e: return False, str(e) # ── Auth flows ──────────────────────────────────────────────────────────────── def initiate_signup(username, email, password): """ Returns (ok, mode_flag, token_or_none) ok=True, mode_flag='__DIRECT__', token → no Brevo, registered immediately ok=True, mode_flag='__OTP__', None → OTP sent, call complete_signup next ok=False, mode_flag=error_string, None → validation / duplicate error """ username = username.strip() email = email.strip().lower() if get_user_by_username(username): return False, "Username already taken.", None if get_user_by_email(email): return False, "Email already registered.", None if len(password) < 6: return False, "Password must be at least 6 characters.", None # No Brevo → register directly, log in immediately if not BREVO_API_KEY: token = create_user(username, email, password) return True, "__DIRECT__", token # Brevo configured → send OTP and store everything in DB otp = _otp() ok, msg = send_otp_email(email, otp) if not ok: return False, msg, None # Store username + hashed password in the OTP row so we don't need session_state store_otp(email, otp, username=username, password_plain=password) return True, "__OTP__", None def complete_signup(email, otp_input): """ No longer needs username/password — reads them from the OTP row in DB. Returns (ok, token_or_none, username_or_error_msg) """ ok, msg, username, pw_hash = check_otp(email, otp_input) if not ok: return False, None, msg if not username or not pw_hash: return False, None, "Session data missing. Please sign up again." token = create_user(username, email, pw_hash, already_hashed=True) return True, token, username def login(username_or_email, password): u = get_user(username_or_email) if not u: return False, None, None, "Account not found." if u["password"] != _hash(password): return False, None, None, "Incorrect password." token = _token() update_user_token(u["username"], token) return True, token, u["username"], "Welcome back!"