File size: 3,779 Bytes
c481f8a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import json
import os

from dotenv import load_dotenv
from loguru import logger

from xhs_utils.cookie_util import trans_cookies


def _mask(s: str) -> str:
    if not s:
        return ""
    if len(s) <= 8:
        return "*" * len(s)
    return s[:4] + "*" * (len(s) - 8) + s[-4:]


class SessionManager:
    def __init__(self, cookies_file: str | None = None):
        base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
        self.base_dir = base_dir
        self.cookies_file = cookies_file or os.path.join(base_dir, "datas", "cookies.json")
        self.env_file = os.path.join(base_dir, ".env")

    def validate_cookie(self, cookies_str: str | None):
        if not cookies_str or not cookies_str.strip():
            return False, "cookies_empty"
        try:
            cookies = trans_cookies(cookies_str)
        except Exception:
            return False, "cookies_parse_failed"
        if "a1" not in cookies or not cookies["a1"]:
            return False, "missing_a1"
        return True, "ok"

    def load_from_env(self) -> str | None:
        load_dotenv()
        cookies_str = os.getenv("COOKIES")
        if cookies_str is None:
            return None
        return cookies_str.strip()

    def load_from_file(self) -> str | None:
        if not os.path.exists(self.cookies_file):
            return None
        with open(self.cookies_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            return None
        val = data.get("COOKIES")
        if not val:
            return None
        return str(val).strip()

    def save_to_file(self, cookies_str: str):
        os.makedirs(os.path.dirname(self.cookies_file), exist_ok=True)
        with open(self.cookies_file, "w", encoding="utf-8") as f:
            json.dump({"COOKIES": cookies_str}, f, ensure_ascii=False, separators=(",", ":"))

    def save_to_env_file(self, cookies_str: str, env_file: str | None = None):
        env_file = os.path.abspath(env_file or self.env_file)
        os.makedirs(os.path.dirname(env_file), exist_ok=True)
        if os.path.exists(env_file):
            with open(env_file, "r", encoding="utf-8") as f:
                lines = f.read().splitlines()
        else:
            lines = []

        safe = cookies_str.replace("\\", "\\\\").replace("\"", "\\\"")
        new_line = f"COOKIES=\"{safe}\""

        replaced = False
        out = []
        for line in lines:
            if line.strip().startswith("COOKIES="):
                out.append(new_line)
                replaced = True
            else:
                out.append(line)
        if not replaced:
            out.append(new_line)
        with open(env_file, "w", encoding="utf-8") as f:
            f.write("\n".join(out).rstrip("\n") + "\n")

    def load_cookies(self, mode: str = "both") -> str | None:
        candidates = []
        if mode in ("manual", "both", "auto"):
            candidates.append(("env", self.load_from_env()))
            candidates.append(("file", self.load_from_file()))
        for source, cookies_str in candidates:
            ok, reason = self.validate_cookie(cookies_str)
            if ok:
                logger.info(f"cookies_loaded source={source} a1={_mask(trans_cookies(cookies_str).get('a1', ''))}")
                return cookies_str
            if cookies_str:
                logger.warning(f"cookies_invalid source={source} reason={reason}")
        return None

    def refresh_cookies_if_needed(self, cookies_str: str | None, mode: str = "both"):
        ok, _ = self.validate_cookie(cookies_str)
        if ok:
            return cookies_str
        if mode in ("auto", "both"):
            raise ValueError("cookies_invalid_need_login")
        return None