File size: 5,976 Bytes
ddc9c77
 
 
 
 
 
 
ef242c8
ddc9c77
 
 
 
ef242c8
ddc9c77
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d4d55d1
 
ddc9c77
 
 
 
 
 
 
ef242c8
ddc9c77
 
 
 
 
 
0176a31
 
 
ddc9c77
 
 
 
 
 
 
0176a31
ddc9c77
 
0176a31
 
 
 
 
 
 
 
 
 
 
ef242c8
0176a31
ef242c8
ddc9c77
 
 
 
 
ef242c8
0176a31
 
ddc9c77
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0176a31
ddc9c77
 
 
0176a31
ddc9c77
 
0176a31
 
 
 
 
ddc9c77
 
 
ef242c8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""
Authentication router for simple login system
"""
import os
import secrets
from datetime import datetime, timedelta
from typing import Dict, Optional
from fastapi import APIRouter, HTTPException, Response, Cookie, Form, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
import logging
from urllib.parse import urlparse

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/auth", tags=["Authentication"])

# Session management
SESSION_SECRET_KEY = os.getenv("SESSION_SECRET_KEY", secrets.token_hex(32))
SESSION_MAX_AGE = 86400  # 24 hours in seconds
serializer = URLSafeTimedSerializer(SESSION_SECRET_KEY)

# In-memory session store (for simple use case)
# For production, consider using Redis or database
active_sessions: Dict[str, dict] = {}


class LoginRequest(BaseModel):
    username: str
    password: str


class LoginResponse(BaseModel):
    success: bool
    message: str


def create_session(username: str) -> str:
    """Create a new session token"""
    session_id = secrets.token_urlsafe(32)
    session_data = {
        "username": username,
        "created_at": datetime.utcnow().isoformat(),
        "expires_at": (datetime.utcnow() + timedelta(seconds=SESSION_MAX_AGE)).isoformat()
    }
    
    # Store session
    active_sessions[session_id] = session_data
    
    # Create signed token
    token = serializer.dumps(session_id)
    return token


def verify_session(token: Optional[str]) -> Optional[dict]:
    """Verify session token and return session data"""
    if not token:
        return None
    
    try:
        # Verify signature and age
        session_id = serializer.loads(token, max_age=SESSION_MAX_AGE)
        
        # Check if session exists
        session_data = active_sessions.get(session_id)
        if not session_data:
            return None
        
        # Check expiration
        expires_at = datetime.fromisoformat(session_data["expires_at"])
        if datetime.utcnow() > expires_at:
            # Clean up expired session
            active_sessions.pop(session_id, None)
            return None
        
        return session_data
    except (BadSignature, SignatureExpired):
        return None
    except Exception as e:
        logger.error(f"Session verification error: {e}")
        return None


def verify_credentials(username: str, password: str) -> bool:
    """Verify username and password against environment variables"""
    expected_username = "volaris"
    expected_password = "volaris123"
    
    return username == expected_username and password == expected_password


@router.post("/login", response_model=LoginResponse)
async def login(
    response: Response,
    request: Request,
    username: str = Form(...),
    password: str = Form(...)
):
    """
    Login endpoint - validates credentials and creates session
    """
    # Log login attempt
    logger.info(f"Login attempt for username: {username}, Origin: {request.headers.get('origin')}")
    
    # Verify credentials
    if not verify_credentials(username, password):
        logger.warning(f"Failed login attempt for username: {username}")
        raise HTTPException(status_code=401, detail="Invalid username or password")
    
    # Create session
    token = create_session(username)
    logger.info(f"Session created for user: {username}")
    
    # Set secure cookie
    # Detect if we're running on HTTPS (Hugging Face Spaces use HTTPS)
    is_https = request.url.scheme == "https" or request.headers.get("x-forwarded-proto") == "https"
    
    # For HTTPS (production/HF Spaces), use SameSite=None with Secure=True for cross-origin
    # For HTTP (local dev), use SameSite=Lax with Secure=False
    if is_https:
        samesite = "none"
        secure = True
    else:
        samesite = "lax"
        secure = False
    
    logger.info(f"Setting cookie with samesite={samesite}, secure={secure}, is_https={is_https}")

    response.set_cookie(
        key="session_token",
        value=token,
        httponly=True,
        max_age=SESSION_MAX_AGE,
        samesite=samesite,
        secure=secure,
        path="/"
    )
    
    logger.info(f"Successful login for user: {username}")
    
    return LoginResponse(
        success=True,
        message="Login successful"
    )


@router.post("/logout")
async def logout(
    response: Response,
    session_token: Optional[str] = Cookie(None)
):
    """
    Logout endpoint - invalidates session
    """
    if session_token:
        try:
            session_id = serializer.loads(session_token, max_age=SESSION_MAX_AGE)
            active_sessions.pop(session_id, None)
        except Exception:
            pass
    
    # Clear cookie
    response.delete_cookie(key="session_token")
    
    return {"success": True, "message": "Logged out successfully"}


@router.get("/verify")
async def verify(session_token: Optional[str] = Cookie(None)):
    """
    Verify if current session is valid
    """
    session_data = verify_session(session_token)
    
    if not session_data:
        raise HTTPException(status_code=401, detail="Not authenticated")
    
    return {
        "authenticated": True,
        "username": session_data.get("username")
    }


@router.get("/status")
async def status(request: Request, session_token: Optional[str] = Cookie(None)):
    """
    Check authentication status without raising exception
    """
    logger.info(f"Status check - Cookie present: {session_token is not None}, Origin: {request.headers.get('origin')}")
    session_data = verify_session(session_token)
    
    if session_data:
        logger.info(f"Status check - Authenticated as: {session_data.get('username')}")
    else:
        logger.info("Status check - Not authenticated")
    
    return {
        "authenticated": session_data is not None,
        "username": session_data.get("username") if session_data else None
    }