File size: 3,128 Bytes
47258ea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
管理后台认证中间件
"""
from fastapi import Request, HTTPException, status
from fastapi.responses import RedirectResponse
from typing import Optional
import hashlib
import secrets
from datetime import datetime, timedelta

from app.core.config import settings

# 简单的内存 Session 存储(生产环境建议使用 Redis)
_sessions = {}

# Session 有效期(小时)
SESSION_EXPIRE_HOURS = 24


def generate_session_token() -> str:
    """生成随机 session token"""
    return secrets.token_urlsafe(32)


def create_session(password: str) -> Optional[str]:
    """
    创建 session

    Args:
        password: 用户输入的密码

    Returns:
        session_token 或 None(密码错误)
    """
    # 验证密码
    if password != settings.ADMIN_PASSWORD:
        return None

    # 生成 session token
    session_token = generate_session_token()

    # 存储 session(包含过期时间)
    _sessions[session_token] = {
        "created_at": datetime.now(),
        "expires_at": datetime.now() + timedelta(hours=SESSION_EXPIRE_HOURS),
        "authenticated": True
    }

    return session_token


def verify_session(session_token: Optional[str]) -> bool:
    """
    验证 session 是否有效

    Args:
        session_token: Session token

    Returns:
        是否已认证
    """
    if not session_token:
        return False

    session = _sessions.get(session_token)
    if not session:
        return False

    # 检查是否过期
    if datetime.now() > session["expires_at"]:
        # 删除过期 session
        del _sessions[session_token]
        return False

    return session.get("authenticated", False)


def delete_session(session_token: Optional[str]):
    """删除 session(登出)"""
    if session_token and session_token in _sessions:
        del _sessions[session_token]


def get_session_token_from_request(request: Request) -> Optional[str]:
    """从请求中获取 session token"""
    return request.cookies.get("admin_session")


async def require_auth(request: Request):
    """
    认证依赖项:要求用户已登录

    在路由中使用:
    @router.get("/admin", dependencies=[Depends(require_auth)])
    """
    session_token = get_session_token_from_request(request)

    if not verify_session(session_token):
        # 未认证,重定向到登录页
        raise HTTPException(
            status_code=status.HTTP_303_SEE_OTHER,
            detail="未登录",
            headers={"Location": "/admin/login"}
        )


def get_authenticated_user(request: Request) -> bool:
    """
    获取当前认证状态(用于模板)

    Returns:
        是否已认证
    """
    session_token = get_session_token_from_request(request)
    return verify_session(session_token)


def cleanup_expired_sessions():
    """清理过期的 session(定时任务调用)"""
    now = datetime.now()
    expired_tokens = [
        token for token, session in _sessions.items()
        if now > session["expires_at"]
    ]

    for token in expired_tokens:
        del _sessions[token]

    return len(expired_tokens)