|
|
""" |
|
|
Authentication router for a simple login system.
|
|
""" |
|
|
import logging
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional
from urllib.parse import urlparse

from fastapi import APIRouter, HTTPException, Response, Cookie, Form, Request
from fastapi.responses import JSONResponse
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from pydantic import BaseModel
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Every route declared on this router is served under the /auth prefix.
router = APIRouter(prefix="/auth", tags=["Authentication"])

# Key used to sign session tokens. A variable that is set but empty is treated
# the same as an unset one, so the serializer never ends up with an empty key.
_configured_secret = os.getenv("SESSION_SECRET_KEY")
if not _configured_secret:
    # The fallback key is regenerated on every process start, which makes all
    # previously issued tokens unverifiable after a restart.
    logger.warning(
        "SESSION_SECRET_KEY is not set; using a random per-process key, "
        "so sessions will not survive a restart"
    )
SESSION_SECRET_KEY = _configured_secret or secrets.token_hex(32)

# Session lifetime in seconds; used for the token signature age, the stored
# expiry timestamp and the cookie max-age.
SESSION_MAX_AGE = 86400

# Signs and verifies the session id carried in the cookie.
serializer = URLSafeTimedSerializer(SESSION_SECRET_KEY)

# In-memory session store: session id -> session data dict.
active_sessions: Dict[str, dict] = {}
|
|
|
|
|
|
|
|
class LoginRequest(BaseModel):
    """Body schema carrying the credentials of a login attempt."""

    # Account name supplied by the client.
    username: str
    # Password supplied by the client.
    password: str
|
|
|
|
|
|
|
|
class LoginResponse(BaseModel):
    """Response schema returned by the login endpoint."""

    # Whether the login succeeded.
    success: bool
    # Human-readable description of the outcome.
    message: str
|
|
|
|
|
|
|
|
def _purge_expired_sessions(now: datetime) -> None:
    """Remove every stored session whose ``expires_at`` lies before *now*."""
    expired_ids = []
    for stored_id, stored_data in active_sessions.items():
        try:
            if datetime.fromisoformat(stored_data["expires_at"]) < now:
                expired_ids.append(stored_id)
        except (KeyError, TypeError, ValueError):
            # Entry without a parseable expiry: leave it untouched.
            continue

    for stored_id in expired_ids:
        active_sessions.pop(stored_id, None)


def create_session(username: str) -> str:
    """Register a new session for *username* and return its signed token.

    A random session id is generated, the session data (username, creation
    and expiry timestamps as naive-UTC ISO strings) is stored in
    ``active_sessions`` under that id, and the id is signed with
    ``serializer``.

    Args:
        username: Name of the user the session belongs to.

    Returns:
        The signed token that encodes the session id.
    """
    # One clock reading for both timestamps, kept naive (UTC) so the stored
    # strings have the same format as before.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # Sessions are otherwise only removed on logout or when an expired one is
    # verified again, so drop stale entries here to keep the store bounded.
    _purge_expired_sessions(now)

    session_id = secrets.token_urlsafe(32)
    active_sessions[session_id] = {
        "username": username,
        "created_at": now.isoformat(),
        "expires_at": (now + timedelta(seconds=SESSION_MAX_AGE)).isoformat(),
    }

    return serializer.dumps(session_id)
|
|
|
|
|
|
|
|
def verify_session(token: Optional[str]) -> Optional[dict]:
    """Validate a session token and return the stored session data.

    Args:
        token: Signed token as produced by ``create_session``, or ``None``.

    Returns:
        The session data dict, or ``None`` when the token is missing, its
        signature is invalid or older than ``SESSION_MAX_AGE``, the session
        id is unknown, the stored expiry has passed (the entry is then
        removed), or any unexpected error occurs.
    """
    if not token:
        return None

    try:
        session_id = serializer.loads(token, max_age=SESSION_MAX_AGE)

        session_data = active_sessions.get(session_id)
        if not session_data:
            return None

        # Stored timestamps are naive UTC, so compare against a naive UTC now.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        expires_at = datetime.fromisoformat(session_data["expires_at"])
        if now > expires_at:
            active_sessions.pop(session_id, None)
            return None

        return session_data
    except (BadSignature, SignatureExpired):
        return None
    except Exception as e:
        # Verification must never raise to the caller; keep the traceback in
        # the log so the unexpected failure can be diagnosed.
        logger.exception(f"Session verification error: {e}")
        return None
|
|
|
|
|
|
|
|
def verify_credentials(username: str, password: str) -> bool:
    """Check a username/password pair against the configured credentials.

    The expected values are read at call time from the ``AUTH_USERNAME`` and
    ``AUTH_PASSWORD`` environment variables. When a variable is unset or
    empty, the previous built-in value is used so existing deployments keep
    working.

    Args:
        username: Username supplied by the client.
        password: Password supplied by the client.

    Returns:
        ``True`` only when both values match; ``False`` otherwise, including
        when either argument is not a string.
    """
    # NOTE(review): the fallback credentials are hard-coded in source; set the
    # environment variables in any real deployment.
    expected_username = os.getenv("AUTH_USERNAME") or "volaris"
    expected_password = os.getenv("AUTH_PASSWORD") or "volaris123"

    if not isinstance(username, str) or not isinstance(password, str):
        return False

    def _matches(supplied: str, expected: str) -> bool:
        # compare_digest rejects non-ASCII str, so compare the encoded bytes.
        return secrets.compare_digest(
            supplied.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )

    # Evaluate both comparisons before combining them so the time taken does
    # not reveal which of the two fields was wrong.
    username_ok = _matches(username, expected_username)
    password_ok = _matches(password, expected_password)
    return username_ok and password_ok
|
|
|
|
|
|
|
|
@router.post("/login", response_model=LoginResponse)
async def login(
    response: Response,
    request: Request,
    username: str = Form(...),
    password: str = Form(...)
):
    """
    Login endpoint - validates credentials and creates session

    On success a signed session token is stored in the ``session_token``
    cookie and a ``LoginResponse`` is returned.

    Raises:
        HTTPException: 401 when the username or password is wrong.
    """
    logger.info(f"Login attempt for username: {username}, Origin: {request.headers.get('origin')}")

    if not verify_credentials(username, password):
        logger.warning(f"Failed login attempt for username: {username}")
        raise HTTPException(status_code=401, detail="Invalid username or password")

    token = create_session(username)
    logger.info(f"Session created for user: {username}")

    # The forwarded header may differ in casing or list one scheme per proxy
    # hop ("https, http"); only the first entry is considered here.
    forwarded_proto = (
        request.headers.get("x-forwarded-proto", "").split(",")[0].strip().lower()
    )
    is_https = request.url.scheme == "https" or forwarded_proto == "https"

    # SameSite=None is only used together with Secure (i.e. over HTTPS);
    # plain HTTP falls back to SameSite=Lax without Secure.
    samesite = "none" if is_https else "lax"
    secure = is_https

    logger.info(f"Setting cookie with samesite={samesite}, secure={secure}, is_https={is_https}")

    response.set_cookie(
        key="session_token",
        value=token,
        httponly=True,
        max_age=SESSION_MAX_AGE,
        samesite=samesite,
        secure=secure,
        path="/"
    )

    logger.info(f"Successful login for user: {username}")

    return LoginResponse(
        success=True,
        message="Login successful"
    )
|
|
|
|
|
|
|
|
@router.post("/logout")
async def logout(
    response: Response,
    session_token: Optional[str] = Cookie(None),
    request: Request = None,
):
    """
    Logout endpoint - invalidates session

    Removes the server-side session referenced by the cookie (best effort)
    and expires the ``session_token`` cookie. Always reports success.
    """
    if session_token:
        try:
            session_id = serializer.loads(session_token, max_age=SESSION_MAX_AGE)
            active_sessions.pop(session_id, None)
        except (BadSignature, SignatureExpired):
            # Tampered or expired token: there is nothing to invalidate.
            logger.debug("Logout called with an invalid or expired session token")
        except Exception:
            # Logout stays best-effort, but unexpected failures are recorded.
            logger.exception("Unexpected error while invalidating session on logout")

    # Work out the scheme the same way login does, so the removal cookie
    # carries the same SameSite/Secure attributes as the one being removed.
    is_https = False
    if request is not None:
        forwarded_proto = (
            request.headers.get("x-forwarded-proto", "").split(",")[0].strip().lower()
        )
        is_https = request.url.scheme == "https" or forwarded_proto == "https"

    # Expire the cookie explicitly (empty value, max_age/expires of 0) with
    # attributes matching login, so a browser that accepted the
    # SameSite=None; Secure cookie also accepts its removal.
    response.set_cookie(
        key="session_token",
        value="",
        max_age=0,
        expires=0,
        httponly=True,
        samesite="none" if is_https else "lax",
        secure=is_https,
        path="/",
    )

    return {"success": True, "message": "Logged out successfully"}
|
|
|
|
|
|
|
|
@router.get("/verify")
async def verify(session_token: Optional[str] = Cookie(None)):
    """
    Report the authenticated user for the current session cookie.

    Raises:
        HTTPException: 401 when the cookie does not map to a valid session.
    """
    session = verify_session(session_token)

    # Happy path first; anything falsy means there is no usable session.
    if session:
        return {"authenticated": True, "username": session.get("username")}

    raise HTTPException(status_code=401, detail="Not authenticated")
|
|
|
|
|
|
|
|
@router.get("/status")
async def status(request: Request, session_token: Optional[str] = Cookie(None)):
    """
    Report whether the request carries a valid session, without raising.

    Returns a dict with ``authenticated`` and ``username`` (``None`` when
    there is no valid session).
    """
    logger.info(f"Status check - Cookie present: {session_token is not None}, Origin: {request.headers.get('origin')}")
    session_data = verify_session(session_token)

    if not session_data:
        logger.info("Status check - Not authenticated")
        username = None
    else:
        logger.info(f"Status check - Authenticated as: {session_data.get('username')}")
        username = session_data.get("username")

    result = {}
    result["authenticated"] = session_data is not None
    result["username"] = username
    return result
|
|
|