moazx's picture
Merge branch 'main' of https://huggingface.co/spaces/moazx/API
2c7b0ce
raw
history blame
5.11 kB
"""
Authentication router for simple login system
"""
import os
import secrets
from datetime import datetime, timedelta
from typing import Dict, Optional
from fastapi import APIRouter, HTTPException, Response, Cookie, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Every route declared on this router is served under the /auth prefix.
router = APIRouter(prefix="/auth", tags=["Authentication"])
# Session management
# Signing key for session tokens. When SESSION_SECRET_KEY is not set in the
# environment, a fresh random key is generated at import time, so tokens
# signed by one process will not verify in another process or after a restart.
SESSION_SECRET_KEY = os.getenv("SESSION_SECRET_KEY", secrets.token_hex(32))
SESSION_MAX_AGE = 86400 # 24 hours in seconds; used for token age, stored expiry and cookie max_age
# Signs and timestamps session ids; loads() is called with max_age to reject old tokens.
serializer = URLSafeTimedSerializer(SESSION_SECRET_KEY)
# In-memory session store (for simple use case)
# For production, consider using Redis or database
# Maps session id -> {"username", "created_at", "expires_at"} (ISO-8601 strings).
active_sessions: Dict[str, dict] = {}
# Username/password pair as a pydantic model.
# NOTE(review): not referenced by the handlers visible in this file; the
# /auth/login handler reads the same two values as form fields instead.
class LoginRequest(BaseModel):
    username: str  # login name supplied by the client
    password: str  # plaintext password supplied by the client
# Response body of POST /auth/login (declared as that route's response_model).
class LoginResponse(BaseModel):
    success: bool  # the login handler sets this to True on a successful login
    message: str  # human-readable outcome, e.g. "Login successful"
def create_session(username: str) -> str:
    """Create a server-side session for ``username`` and return its signed token.

    A random session id is generated, the session record is stored in the
    module-level ``active_sessions`` dict, and the id is signed with the
    module-level ``serializer`` so it can be handed to the client.

    Records whose ``expires_at`` has already passed are purged from
    ``active_sessions`` first, so sessions of clients that never return do
    not accumulate for the lifetime of the process.

    Args:
        username: Name of the authenticated user to record in the session.

    Returns:
        The signed, URL-safe token wrapping the new session id.
    """
    # Naive UTC on purpose: verify_session compares the stored expiry against
    # datetime.utcnow(), and naive/aware datetimes cannot be compared.
    # A single timestamp keeps created_at and expires_at exactly
    # SESSION_MAX_AGE seconds apart.
    now = datetime.utcnow()

    # Collect first, then delete: a dict must not change size while iterated.
    stale_ids = [
        sid
        for sid, data in active_sessions.items()
        if datetime.fromisoformat(data["expires_at"]) < now
    ]
    for sid in stale_ids:
        active_sessions.pop(sid, None)

    session_id = secrets.token_urlsafe(32)
    active_sessions[session_id] = {
        "username": username,
        "created_at": now.isoformat(),
        "expires_at": (now + timedelta(seconds=SESSION_MAX_AGE)).isoformat(),
    }

    # Only the signed id leaves the server; the record itself stays in memory.
    return serializer.dumps(session_id)
def verify_session(token: Optional[str]) -> Optional[dict]:
    """Resolve a signed session token to its stored session record.

    The record from ``active_sessions`` is returned only when the token is
    present, carries a valid signature no older than ``SESSION_MAX_AGE``,
    maps to a stored record, and that record's ``expires_at`` has not passed.
    In every other case ``None`` is returned. A record found to be expired is
    removed from the store as a side effect.

    Args:
        token: Signed token taken from the session cookie, or ``None``.

    Returns:
        The stored session dict, or ``None`` when the session is not valid.
    """
    if not token:
        return None

    try:
        # loads() checks both the signature and the token's age.
        sid = serializer.loads(token, max_age=SESSION_MAX_AGE)
        record = active_sessions.get(sid)
        if record:
            deadline = datetime.fromisoformat(record["expires_at"])
            if datetime.utcnow() <= deadline:
                return record
            # Past the stored deadline: drop the record from the store.
            active_sessions.pop(sid, None)
    except (BadSignature, SignatureExpired):
        # Tampered or too-old token: treated as "not logged in", not an error.
        pass
    except Exception as e:
        logger.error(f"Session verification error: {e}")

    return None
def verify_credentials(username: str, password: str) -> bool:
    """Check a username/password pair against the configured credentials.

    The expected values are read from the ``AUTH_USERNAME`` and
    ``AUTH_PASSWORD`` environment variables on every call. When a variable is
    unset, the previous built-in value is used, so deployments that do not
    set them behave exactly as before.

    Args:
        username: Username submitted by the client.
        password: Password submitted by the client.

    Returns:
        ``True`` only when both values match the expected credentials.
    """
    # NOTE(review): the fallbacks below are credentials committed to source
    # control; set both environment variables in any real deployment.
    expected_username = os.getenv("AUTH_USERNAME", "volaris")
    expected_password = os.getenv("AUTH_PASSWORD", "volaris123")

    # compare_digest avoids leaking, through timing, how many leading
    # characters matched. It is fed bytes because its str form only accepts
    # ASCII and would raise on a non-ASCII username or password.
    username_ok = secrets.compare_digest(
        username.encode("utf-8"), expected_username.encode("utf-8")
    )
    password_ok = secrets.compare_digest(
        password.encode("utf-8"), expected_password.encode("utf-8")
    )

    # Both comparisons have already run, so the time taken does not depend on
    # which of the two fields was wrong.
    return username_ok and password_ok
@router.post("/login", response_model=LoginResponse)
async def login(
    response: Response,
    username: str = Form(...),
    password: str = Form(...)
):
    """
    Login endpoint - validates credentials and creates session
    """
    # Reject bad credentials up front with a 401; nothing is stored and no
    # cookie is set on this path.
    credentials_ok = verify_credentials(username, password)
    if not credentials_ok:
        logger.warning(f"Failed login attempt for username: {username}")
        raise HTTPException(status_code=401, detail="Invalid username or password")

    signed_token = create_session(username)

    # Cookie policy depends on the ENVIRONMENT variable:
    #   production (HTTPS)  -> SameSite=None with the Secure flag
    #   anything else (HTTP) -> SameSite=Lax without the Secure flag
    in_production = os.getenv("ENVIRONMENT", "development") == "production"
    if in_production:
        same_site = "none"
    else:
        same_site = "lax"

    response.set_cookie(
        key="session_token",
        value=signed_token,
        max_age=SESSION_MAX_AGE,
        httponly=True,
        secure=in_production,
        samesite=same_site,
    )

    logger.info(f"Successful login for user: {username}")
    return LoginResponse(success=True, message="Login successful")
@router.post("/logout")
async def logout(
    response: Response,
    session_token: Optional[str] = Cookie(None)
):
    """
    Logout endpoint - invalidates session
    """
    # Removing the server-side record is best effort: logout always succeeds
    # from the client's point of view, even when the token cannot be decoded.
    if session_token:
        try:
            session_id = serializer.loads(session_token, max_age=SESSION_MAX_AGE)
            active_sessions.pop(session_id, None)
        except (BadSignature, SignatureExpired):
            # Tampered or expired token: there is no session id to remove.
            logger.debug("Logout called with an invalid or expired session token")
        except Exception as e:
            # Unexpected failure: record it instead of hiding it, but still
            # go on to clear the cookie.
            logger.warning(f"Could not invalidate session on logout: {e}")

    # Clear the cookie with the same attributes the login handler used when
    # setting it. Without them the clearing header would go out as
    # SameSite=Lax and not Secure, which a browser may ignore when the
    # production cookie was set as SameSite=None; Secure.
    is_production = os.getenv("ENVIRONMENT", "development") == "production"
    response.delete_cookie(
        key="session_token",
        httponly=True,
        samesite="none" if is_production else "lax",
        secure=is_production,
    )
    return {"success": True, "message": "Logged out successfully"}
@router.get("/verify")
async def verify(session_token: Optional[str] = Cookie(None)):
    """
    Verify if current session is valid
    """
    # A valid session yields the payload below; anything else (missing,
    # invalid or expired cookie) is answered with a 401.
    session = verify_session(session_token)
    if session:
        return {"authenticated": True, "username": session.get("username")}
    raise HTTPException(status_code=401, detail="Not authenticated")
@router.get("/status")
async def status(session_token: Optional[str] = Cookie(None)):
    """
    Check authentication status without raising exception
    """
    # Same lookup as /verify, but an unauthenticated caller receives a normal
    # response with authenticated=False instead of a 401.
    session = verify_session(session_token)
    current_user = session.get("username") if session else None
    return {"authenticated": session is not None, "username": current_user}