Spaces:
Running
Running
File size: 5,976 Bytes
ddc9c77 ef242c8 ddc9c77 ef242c8 ddc9c77 d4d55d1 ddc9c77 ef242c8 ddc9c77 0176a31 ddc9c77 0176a31 ddc9c77 0176a31 ef242c8 0176a31 ef242c8 ddc9c77 ef242c8 0176a31 ddc9c77 0176a31 ddc9c77 0176a31 ddc9c77 0176a31 ddc9c77 ef242c8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
"""
Authentication router for simple login system
"""
import os
import secrets
from datetime import datetime, timedelta
from typing import Dict, Optional
from fastapi import APIRouter, HTTPException, Response, Cookie, Form, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
import logging
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/auth", tags=["Authentication"])
# Session management
SESSION_SECRET_KEY = os.getenv("SESSION_SECRET_KEY", secrets.token_hex(32))
SESSION_MAX_AGE = 86400 # 24 hours in seconds
serializer = URLSafeTimedSerializer(SESSION_SECRET_KEY)
# In-memory session store (for simple use case)
# For production, consider using Redis or database
active_sessions: Dict[str, dict] = {}
class LoginRequest(BaseModel):
username: str
password: str
class LoginResponse(BaseModel):
success: bool
message: str
def create_session(username: str) -> str:
"""Create a new session token"""
session_id = secrets.token_urlsafe(32)
session_data = {
"username": username,
"created_at": datetime.utcnow().isoformat(),
"expires_at": (datetime.utcnow() + timedelta(seconds=SESSION_MAX_AGE)).isoformat()
}
# Store session
active_sessions[session_id] = session_data
# Create signed token
token = serializer.dumps(session_id)
return token
def verify_session(token: Optional[str]) -> Optional[dict]:
"""Verify session token and return session data"""
if not token:
return None
try:
# Verify signature and age
session_id = serializer.loads(token, max_age=SESSION_MAX_AGE)
# Check if session exists
session_data = active_sessions.get(session_id)
if not session_data:
return None
# Check expiration
expires_at = datetime.fromisoformat(session_data["expires_at"])
if datetime.utcnow() > expires_at:
# Clean up expired session
active_sessions.pop(session_id, None)
return None
return session_data
except (BadSignature, SignatureExpired):
return None
except Exception as e:
logger.error(f"Session verification error: {e}")
return None
def verify_credentials(username: str, password: str) -> bool:
"""Verify username and password against environment variables"""
expected_username = "volaris"
expected_password = "volaris123"
return username == expected_username and password == expected_password
@router.post("/login", response_model=LoginResponse)
async def login(
response: Response,
request: Request,
username: str = Form(...),
password: str = Form(...)
):
"""
Login endpoint - validates credentials and creates session
"""
# Log login attempt
logger.info(f"Login attempt for username: {username}, Origin: {request.headers.get('origin')}")
# Verify credentials
if not verify_credentials(username, password):
logger.warning(f"Failed login attempt for username: {username}")
raise HTTPException(status_code=401, detail="Invalid username or password")
# Create session
token = create_session(username)
logger.info(f"Session created for user: {username}")
# Set secure cookie
# Detect if we're running on HTTPS (Hugging Face Spaces use HTTPS)
is_https = request.url.scheme == "https" or request.headers.get("x-forwarded-proto") == "https"
# For HTTPS (production/HF Spaces), use SameSite=None with Secure=True for cross-origin
# For HTTP (local dev), use SameSite=Lax with Secure=False
if is_https:
samesite = "none"
secure = True
else:
samesite = "lax"
secure = False
logger.info(f"Setting cookie with samesite={samesite}, secure={secure}, is_https={is_https}")
response.set_cookie(
key="session_token",
value=token,
httponly=True,
max_age=SESSION_MAX_AGE,
samesite=samesite,
secure=secure,
path="/"
)
logger.info(f"Successful login for user: {username}")
return LoginResponse(
success=True,
message="Login successful"
)
@router.post("/logout")
async def logout(
response: Response,
session_token: Optional[str] = Cookie(None)
):
"""
Logout endpoint - invalidates session
"""
if session_token:
try:
session_id = serializer.loads(session_token, max_age=SESSION_MAX_AGE)
active_sessions.pop(session_id, None)
except Exception:
pass
# Clear cookie
response.delete_cookie(key="session_token")
return {"success": True, "message": "Logged out successfully"}
@router.get("/verify")
async def verify(session_token: Optional[str] = Cookie(None)):
"""
Verify if current session is valid
"""
session_data = verify_session(session_token)
if not session_data:
raise HTTPException(status_code=401, detail="Not authenticated")
return {
"authenticated": True,
"username": session_data.get("username")
}
@router.get("/status")
async def status(request: Request, session_token: Optional[str] = Cookie(None)):
"""
Check authentication status without raising exception
"""
logger.info(f"Status check - Cookie present: {session_token is not None}, Origin: {request.headers.get('origin')}")
session_data = verify_session(session_token)
if session_data:
logger.info(f"Status check - Authenticated as: {session_data.get('username')}")
else:
logger.info("Status check - Not authenticated")
return {
"authenticated": session_data is not None,
"username": session_data.get("username") if session_data else None
}
|