srilakshu012456's picture
Rename login.py to services/login.py
69d6ba4 verified
# services/login.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
import sqlite3, os, hmac, hashlib
from typing import Optional
router = APIRouter()
# Resolve absolute path to db/users.db (adjust if your file is elsewhere)
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # .../services
PROJECT_ROOT = os.path.dirname(BASE_DIR) # project root
DB_PATH = os.path.join(PROJECT_ROOT, "db", "users.db") # .../db/users.db
def get_conn():
# Helpful debug: see exactly which file is used
print(f"[login.py] Connecting to DB: {DB_PATH} | exists={os.path.exists(DB_PATH)}")
return sqlite3.connect(DB_PATH)
def pbkdf2_hash(password: str, salt_hex: Optional[str] = None) -> tuple[str, str]:
if not salt_hex:
salt_hex = os.urandom(16).hex()
salt = bytes.fromhex(salt_hex)
dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 100_000)
return dk.hex(), salt_hex
def verify_password(password: str, stored_hash_hex: str, salt_hex: str) -> bool:
actual_hash_hex, _ = pbkdf2_hash(password, salt_hex)
return hmac.compare_digest(actual_hash_hex, stored_hash_hex)
class LoginRequest(BaseModel):
username: str
password: str
@router.post("/login")
def login(req: LoginRequest):
username = req.username.strip()
password = req.password
if not username or not password:
raise HTTPException(status_code=400, detail="Username and password required.")
conn = get_conn()
try:
cur = conn.execute("SELECT password_hash, salt FROM users WHERE username = ?", (username,))
row = cur.fetchone()
if not row:
raise HTTPException(status_code=401, detail="Invalid credentials.")
stored_hash, salt = row
if not verify_password(password, stored_hash, salt):
raise HTTPException(status_code=401, detail="Invalid credentials.")
return {"ok": True, "username": username}
finally:
conn.close()