File size: 4,702 Bytes
91d209c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8e0b1d0
91d209c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
Authentication endpoints for user login and access control
"""
from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import Optional
import os
import hashlib
from jose import JWTError, jwt

router = APIRouter()
security = HTTPBearer()

# JWT Configuration
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "Adgenesis")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24 * 7  # 7 days

# User credentials (in production, use a database)
# Format: username -> hashed_password
# You can set these via environment variables or use a simple hash
ALLOWED_USERS = {}

def load_allowed_users():
    """Load allowed users from environment variables"""
    users_str = os.getenv("ALLOWED_USERS", "")
    if not users_str:
        # Default user for development (username: admin, password: admin)
        # In production, always set ALLOWED_USERS env var
        print("⚠️  Using default credentials (admin/admin). Set ALLOWED_USERS env var for production.")
        return {"admin": hash_password("admin")}
    
    users = {}
    for user_entry in users_str.split(","):
        if ":" in user_entry:
            username, password = user_entry.split(":", 1)
            users[username.strip()] = hash_password(password.strip())
    print(f"✅ Loaded {len(users)} user(s) from ALLOWED_USERS")
    return users

def hash_password(password: str) -> str:
    """Hash password using SHA256 (simple, for basic auth)"""
    return hashlib.sha256(password.encode()).hexdigest()

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify password against hash"""
    return hash_password(plain_password) == hashed_password

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create JWT access token"""
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify JWT token"""
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )
        return username
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

# Load users on module import
ALLOWED_USERS = load_allowed_users()

# Request/Response Models
class LoginRequest(BaseModel):
    username: str
    password: str

class LoginResponse(BaseModel):
    access_token: str
    token_type: str = "bearer"
    username: str

class VerifyResponse(BaseModel):
    authenticated: bool
    username: Optional[str] = None

@router.post("/auth/login", response_model=LoginResponse)
async def login(request: LoginRequest):
    """Login endpoint - returns JWT token"""
    username = request.username
    password = request.password
    
    # Check if user exists
    if username not in ALLOWED_USERS:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password"
        )
    
    # Verify password
    if not verify_password(password, ALLOWED_USERS[username]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password"
        )
    
    # Create access token
    access_token = create_access_token(data={"sub": username})
    
    return LoginResponse(
        access_token=access_token,
        token_type="bearer",
        username=username
    )

@router.get("/auth/verify", response_model=VerifyResponse)
async def verify_token_endpoint(username: str = Depends(verify_token)):
    """Verify if token is valid"""
    return VerifyResponse(authenticated=True, username=username)

@router.get("/auth/me", response_model=VerifyResponse)
async def get_current_user(username: str = Depends(verify_token)):
    """Get current authenticated user"""
    return VerifyResponse(authenticated=True, username=username)