File size: 26,403 Bytes
8d5ac47
 
c5929bc
 
 
 
 
 
 
8d5ac47
8ab1bb1
 
 
 
5e4a3cc
8d5ac47
 
 
 
 
c5929bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3eacee3
 
 
 
8d5ac47
c5929bc
 
 
 
 
 
8d5ac47
 
 
 
 
 
 
 
 
 
 
bc9f049
 
 
 
 
8ab1bb1
 
 
bc9f049
 
8d5ac47
 
 
c5929bc
 
 
 
 
 
 
 
 
8d5ac47
8ab1bb1
 
 
 
 
 
 
abd380b
8ab1bb1
3eacee3
8ab1bb1
3eacee3
 
 
8ab1bb1
 
 
 
 
 
abd380b
8ab1bb1
3eacee3
 
 
5e4a3cc
 
 
8d5ac47
5e4a3cc
 
 
 
 
 
 
 
 
 
 
 
 
 
abd380b
5e4a3cc
 
 
 
 
8ab1bb1
abd380b
5e4a3cc
 
 
 
 
 
abd380b
5e4a3cc
 
8d5ac47
 
 
 
 
 
 
 
c5929bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e4a3cc
 
 
 
 
 
 
c5929bc
5e4a3cc
c5929bc
 
 
5e4a3cc
c5929bc
 
 
 
 
5e4a3cc
c5929bc
 
 
 
5e4a3cc
c5929bc
5e4a3cc
 
 
 
 
 
 
 
 
 
 
c5929bc
 
5e4a3cc
c5929bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8d5ac47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8ab1bb1
 
 
 
 
 
abd380b
8ab1bb1
8d5ac47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e4a3cc
 
 
 
 
 
8d5ac47
 
 
 
 
 
5e4a3cc
 
 
 
 
 
 
 
 
 
 
8d5ac47
 
 
 
 
 
 
 
5e4a3cc
 
 
 
 
 
 
 
 
 
 
8d5ac47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e4a3cc
3eacee3
 
 
 
 
 
 
 
 
 
5e4a3cc
 
 
8ab1bb1
abd380b
5e4a3cc
 
 
 
 
 
 
 
 
 
8ab1bb1
abd380b
5e4a3cc
 
8d5ac47
 
 
c5929bc
 
 
 
 
 
 
 
 
 
 
 
 
 
8d5ac47
 
 
 
 
 
 
 
 
 
 
 
 
5e4a3cc
 
 
 
 
 
 
 
 
8d5ac47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e4a3cc
 
 
 
 
 
 
8ab1bb1
 
 
 
8d5ac47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
from __future__ import annotations

import logging
import re
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from zoneinfo import ZoneInfo

from core.course_bot import CourseBot, TaskResult
from core.db import Database
from core.login_modes import LOGIN_MODE_LABELS, LOGIN_MODE_UNIFIED, normalize_login_mode
from core.security import SecretBox
from core.task_modes import TASK_RUN_MODE_LABELS, TASK_RUN_MODE_STABLE, normalize_task_run_mode


LOGGER = logging.getLogger("sacc.task_manager")


@dataclass(slots=True)
class RunningTask:
    task_id: int
    thread: threading.Thread
    stop_event: threading.Event


@dataclass(slots=True)
class LoginTwoFactorChallenge:
    task_id: int
    user_id: int
    student_id: str
    display_name: str
    phone_mask: str
    created_at: float
    expires_at: float
    event: threading.Event = field(default_factory=threading.Event)
    submitted_code: str = ""
    submitted_by: str = ""


class TaskManager:
    login_2fa_timeout_seconds = 300

    def __init__(self, *, config, store: Database, secret_box: SecretBox) -> None:
        self.config = config
        self.store = store
        self.secret_box = secret_box
        self._started = False
        self._startup_lock = threading.Lock()
        self._queue_lock = threading.Lock()
        self._running_lock = threading.Lock()
        self._shutdown_event = threading.Event()
        self._dispatcher_thread: threading.Thread | None = None
        self._running: dict[int, RunningTask] = {}
        self._last_dispatch_snapshot: tuple[int, int, int] | None = None
        self._schedule_timezone = ZoneInfo(getattr(self.config, "schedule_timezone", "Asia/Shanghai"))
        self._login_2fa_lock = threading.Lock()
        self._login_2fa_challenges: dict[int, LoginTwoFactorChallenge] = {}

    def start(self) -> None:
        with self._startup_lock:
            if self._started:
                LOGGER.info("Task manager start skipped because it is already running")
                return
            LOGGER.info(
                "Task manager starting | db_path=%s default_parallel_limit=%s schedule_timezone=%s",
                self.store.path,
                getattr(self.config, "default_parallel_limit", "-"),
                getattr(self.config, "schedule_timezone", "Asia/Shanghai"),
            )
            self.store.reset_inflight_tasks()
            normalized_modes = self.store.normalize_legacy_task_modes()
            if normalized_modes:
                LOGGER.info("Normalized %s legacy task mode record(s) to stable", normalized_modes)
            normalized_login_modes = self.store.normalize_legacy_login_modes()
            if normalized_login_modes:
                LOGGER.info("Normalized %s legacy login mode record(s) to unified", normalized_login_modes)
            self._dispatcher_thread = threading.Thread(target=self._dispatch_loop, name="task-dispatcher", daemon=True)
            self._dispatcher_thread.start()
            self._started = True
            LOGGER.info("Task manager started")

    def shutdown(self) -> None:
        LOGGER.info("Task manager shutdown requested")
        self._shutdown_event.set()
        with self._running_lock:
            for running_task in self._running.values():
                running_task.stop_event.set()
        with self._login_2fa_lock:
            self._login_2fa_challenges.clear()
        LOGGER.info("Task manager stop signal broadcast to %s running task(s)", self._running_count())

    def queue_task(
        self,
        user_id: int,
        requested_by: str,
        requested_by_role: str,
        requested_mode: str = TASK_RUN_MODE_STABLE,
        login_mode: str = LOGIN_MODE_UNIFIED,
        use_proxy: bool = False,
    ) -> tuple[dict, bool]:
        normalized_mode = normalize_task_run_mode(requested_mode)
        normalized_login_mode = normalize_login_mode(login_mode)
        with self._queue_lock:
            active_task = self.store.find_active_task_for_user(user_id)
            if active_task is None:
                task_id = self.store.create_task(
                    user_id,
                    requested_by,
                    requested_by_role,
                    requested_mode=normalized_mode,
                    login_mode=normalized_login_mode,
                    use_proxy=use_proxy,
                )
            else:
                task_id = int(active_task["id"])

        if active_task is not None:
            LOGGER.info(
                "Task queue skipped because an active task already exists | task_id=%s user_id=%s status=%s",
                active_task["id"],
                user_id,
                active_task["status"],
            )
            self._log(
                int(active_task["id"]),
                user_id,
                "SYSTEM",
                "INFO",
                f"收到新的启动请求,但已有活动任务处于 {active_task['status']} 状态,本次不重复创建。触发者: {requested_by_role}:{requested_by}。",
            )
            return self.store.get_task(active_task["id"]) or active_task, False

        course_count = len(self.store.list_courses_for_user(user_id))
        LOGGER.info(
            "Task queued | task_id=%s user_id=%s requested_by_role=%s requested_by=%s requested_mode=%s login_mode=%s use_proxy=%s",
            task_id,
            user_id,
            requested_by_role,
            requested_by,
            normalized_mode,
            normalized_login_mode,
            bool(use_proxy),
        )
        self._log(
            task_id,
            user_id,
            "SYSTEM",
            "INFO",
            f"任务已进入队列,触发者: {requested_by_role}:{requested_by},模式: {self._mode_label(normalized_mode)},登录方式: {self._login_mode_label(normalized_login_mode)},代理: {'启用' if use_proxy else '关闭'},待选课程 {course_count} 门。",
        )
        return self.store.get_task(task_id), True

    def stop_task(self, task_id: int) -> bool:
        requested = self.store.request_task_stop(task_id)
        if not requested:
            LOGGER.info("Task stop request ignored because task is not active | task_id=%s", task_id)
            return False
        LOGGER.info("Task stop requested | task_id=%s", task_id)
        self._log(task_id, None, "SYSTEM", "INFO", "收到停止请求,任务会在安全节点退出。")
        with self._running_lock:
            running_task = self._running.get(task_id)
            if running_task is not None:
                running_task.stop_event.set()
        self._remove_login_2fa_challenge(task_id)
        return True

    def request_login_2fa_code(self, task_id: int, user: dict, phone_mask: str, stop_event: threading.Event) -> str:
        now = time.time()
        challenge = LoginTwoFactorChallenge(
            task_id=task_id,
            user_id=int(user["id"]),
            student_id=str(user.get("student_id") or ""),
            display_name=str(user.get("display_name") or ""),
            phone_mask=str(phone_mask or "绑定手机"),
            created_at=now,
            expires_at=now + self.login_2fa_timeout_seconds,
        )
        with self._login_2fa_lock:
            self._prune_login_2fa_challenges_locked(now=now)
            self._login_2fa_challenges[task_id] = challenge

        self.store.update_task_status(task_id, "waiting_2fa", "等待短信验证码。")
        self._log(
            task_id,
            int(user["id"]),
            "SYSTEM",
            "INFO",
            f"已发送短信验证码,请在 {self.login_2fa_timeout_seconds // 60} 分钟内到面板输入 6 位验证码。手机号={challenge.phone_mask}。",
        )

        next_wait_log_at = now + 60
        while time.time() < challenge.expires_at:
            if stop_event.is_set() or self._shutdown_event.is_set():
                self._remove_login_2fa_challenge(task_id)
                self._log(task_id, int(user["id"]), "SYSTEM", "WARNING", "短信验证码等待被停止,已清理本次验证码挑战。")
                raise RuntimeError("短信验证码等待已停止。")
            if challenge.event.wait(timeout=0.2):
                code = challenge.submitted_code
                self._remove_login_2fa_challenge(task_id)
                if not code:
                    self._log(task_id, int(user["id"]), "SYSTEM", "WARNING", "短信验证码等待被停止,未收到有效提交。")
                    raise RuntimeError("短信验证码等待已停止。")
                current_task = self.store.get_task(task_id)
                if current_task and current_task.get("status") == "waiting_2fa":
                    self.store.update_task_status(task_id, "running", "")
                self._log(task_id, int(user["id"]), "SYSTEM", "INFO", "短信验证码已转交后台 Selenium,会继续完成统一认证。")
                return code
            current_time = time.time()
            if current_time >= next_wait_log_at:
                remaining_seconds = max(0, int(challenge.expires_at - current_time))
                self._log(
                    task_id,
                    int(user["id"]),
                    "SYSTEM",
                    "INFO",
                    f"仍在等待短信验证码提交,剩余约 {remaining_seconds} 秒。",
                )
                next_wait_log_at = current_time + 60

        self._remove_login_2fa_challenge(task_id)
        self._log(task_id, int(user["id"]), "SYSTEM", "ERROR", "等待短信验证码超时,已清理本次验证码挑战。")
        raise TimeoutError("等待短信验证码超时,请重新启动任务。")

    def submit_login_2fa_code(self, task_id: int, code: str, *, submitted_by: str) -> tuple[bool, str]:
        normalized_code = str(code or "").strip()
        if not re.fullmatch(r"\d{6}", normalized_code):
            return False, "短信验证码必须是 6 位数字。"
        now = time.time()
        with self._login_2fa_lock:
            self._prune_login_2fa_challenges_locked(now=now)
            challenge = self._login_2fa_challenges.get(task_id)
            if challenge is None:
                return False, "当前没有等待验证码的任务,或验证码已过期。"
            if challenge.event.is_set():
                return False, "验证码已经提交,请等待后台继续登录。"
            challenge.submitted_code = normalized_code
            challenge.submitted_by = submitted_by
            challenge.event.set()

        self._log(task_id, challenge.user_id, "SYSTEM", "INFO", f"已收到短信验证码,后台继续登录。提交者: {submitted_by}")
        return True, "验证码已提交,后台会继续完成登录。"

    def get_login_2fa_challenge(self, task_id: int) -> dict | None:
        with self._login_2fa_lock:
            self._prune_login_2fa_challenges_locked()
            challenge = self._login_2fa_challenges.get(task_id)
            return None if challenge is None else self._serialize_login_2fa_challenge(challenge)

    def get_login_2fa_challenge_for_user(self, user_id: int) -> dict | None:
        with self._login_2fa_lock:
            self._prune_login_2fa_challenges_locked()
            for challenge in self._login_2fa_challenges.values():
                if challenge.user_id == int(user_id):
                    return self._serialize_login_2fa_challenge(challenge)
        return None

    def list_login_2fa_challenges(self) -> list[dict]:
        with self._login_2fa_lock:
            self._prune_login_2fa_challenges_locked()
            challenges = [self._serialize_login_2fa_challenge(challenge) for challenge in self._login_2fa_challenges.values()]
        return sorted(challenges, key=lambda item: int(item["remaining_seconds"]))

    def _remove_login_2fa_challenge(self, task_id: int) -> None:
        with self._login_2fa_lock:
            self._login_2fa_challenges.pop(task_id, None)

    def _prune_login_2fa_challenges_locked(self, *, now: float | None = None) -> None:
        current_time = time.time() if now is None else now
        expired_task_ids = [
            task_id
            for task_id, challenge in self._login_2fa_challenges.items()
            if current_time >= challenge.expires_at
        ]
        for task_id in expired_task_ids:
            self._login_2fa_challenges.pop(task_id, None)

    @staticmethod
    def _serialize_login_2fa_challenge(challenge: LoginTwoFactorChallenge) -> dict:
        remaining_seconds = max(0, int(challenge.expires_at - time.time() + 0.999))
        return {
            "task_id": challenge.task_id,
            "user_id": challenge.user_id,
            "student_id": challenge.student_id,
            "display_name": challenge.display_name,
            "phone_mask": challenge.phone_mask,
            "remaining_seconds": remaining_seconds,
        }

    def _current_schedule_now(self) -> datetime:
        return datetime.now(self._schedule_timezone)

    def _within_schedule_date_window(self, schedule: dict, today_text: str) -> bool:
        start_date = str(schedule.get("start_date") or "").strip()
        end_date = str(schedule.get("end_date") or "").strip()
        if start_date and today_text < start_date:
            return False
        if end_date and today_text > end_date:
            return False
        return True

    def _apply_user_schedules(self) -> None:
        now = self._current_schedule_now()
        today_text = now.date().isoformat()
        current_time_text = now.strftime("%H:%M")

        for schedule in self.store.list_enabled_user_schedules():
            if not bool(schedule.get("user_is_active", 0)):
                continue
            if not self._within_schedule_date_window(schedule, today_text):
                continue

            start_time = str(schedule.get("daily_start_time") or "").strip()
            stop_time = str(schedule.get("daily_stop_time") or "").strip()
            if not start_time or not stop_time:
                continue

            user_id = int(schedule["user_id"])
            active_task = self.store.find_active_task_for_user(user_id)

            if current_time_text >= stop_time and str(schedule.get("last_auto_stop_on") or "") != today_text:
                if active_task and self.stop_task(int(active_task["id"])):
                    self._log(int(active_task["id"]), user_id, "SYSTEM", "INFO", "已按管理员定时设置发送停止请求。")
                else:
                    self._log(None, user_id, "SYSTEM", "INFO", "已到定时停止时间,当前没有运行中的任务。")
                self.store.mark_schedule_auto_stop(user_id, today_text)
                continue

            if not (start_time <= current_time_text < stop_time):
                continue
            if str(schedule.get("last_auto_start_on") or "") == today_text:
                continue

            user = self.store.get_user(user_id)
            if user is None:
                self.store.mark_schedule_auto_start(user_id, today_text)
                continue
            if not self.store.list_courses_for_user(user_id):
                self._log(None, user_id, "SYSTEM", "INFO", "已到定时启动时间,但当前没有课程目标,今天不自动启动。")
                self.store.mark_schedule_auto_start(user_id, today_text)
                continue

            task, created = self.queue_task(
                user_id,
                requested_by="scheduler",
                requested_by_role="system",
                requested_mode=TASK_RUN_MODE_STABLE,
                login_mode=LOGIN_MODE_UNIFIED,
                use_proxy=False,
            )
            if created:
                self._log(task["id"], user_id, "SYSTEM", "INFO", "已按管理员定时设置自动加入任务队列。")
            else:
                self._log(task["id"], user_id, "SYSTEM", "INFO", "已到定时启动时间,但当前已有任务在运行或排队。")
            self.store.mark_schedule_auto_start(user_id, today_text)

    def _dispatch_loop(self) -> None:
        LOGGER.info("Dispatcher loop started")
        while not self._shutdown_event.is_set():
            self._cleanup_running_registry()
            self._apply_user_schedules()
            parallel_limit = self.store.get_parallel_limit()
            running_count = self._running_count()
            available_slots = max(0, parallel_limit - running_count)
            pending_tasks: list[dict] = []
            if available_slots > 0:
                pending_tasks = self.store.list_pending_tasks(available_slots)

            snapshot = (parallel_limit, running_count, len(pending_tasks))
            if snapshot != self._last_dispatch_snapshot and (running_count > 0 or pending_tasks):
                LOGGER.info(
                    "Dispatcher snapshot | parallel_limit=%s running=%s available_slots=%s fetched_pending=%s",
                    parallel_limit,
                    running_count,
                    available_slots,
                    len(pending_tasks),
                )
                self._last_dispatch_snapshot = snapshot

            if pending_tasks:
                for task in pending_tasks:
                    if self._shutdown_event.is_set():
                        break
                    if not task["is_active"]:
                        LOGGER.warning(
                            "Queued task cancelled because user is inactive | task_id=%s user_id=%s",
                            task["id"],
                            task["user_id"],
                        )
                        self.store.finish_task(task["id"], "failed", "该用户已被禁用,任务未执行。")
                        self._log(task["id"], task["user_id"], "SYSTEM", "WARNING", "用户已禁用,队列中的任务被取消。")
                        continue
                    self._log(
                        task["id"],
                        task["user_id"],
                        "SYSTEM",
                        "INFO",
                        f"调度器已获取执行名额,准备启动任务线程。当前并发 {running_count}/{parallel_limit}。",
                    )
                    self._launch_task(task["id"])
            time.sleep(1)
        LOGGER.info("Dispatcher loop stopped")

    def _launch_task(self, task_id: int) -> None:
        with self._running_lock:
            if task_id in self._running:
                LOGGER.info("Task launch skipped because thread already exists | task_id=%s", task_id)
                return
            stop_event = threading.Event()
            thread = threading.Thread(target=self._run_task, args=(task_id, stop_event), name=f"task-{task_id}", daemon=True)
            self._running[task_id] = RunningTask(task_id=task_id, thread=thread, stop_event=stop_event)
            LOGGER.info("Launching task thread | task_id=%s thread_name=%s", task_id, thread.name)
            task = self.store.get_task(task_id)
            self._log(
                task_id,
                int(task["user_id"]) if task else None,
                "SYSTEM",
                "INFO",
                f"调度器已分配执行线程 {thread.name},任务即将开始运行。",
            )
            thread.start()

    def _run_task(self, task_id: int, stop_event: threading.Event) -> None:
        task = self.store.get_task(task_id)
        if task is None:
            LOGGER.warning("Task record disappeared before execution | task_id=%s", task_id)
            self._remove_running(task_id)
            return

        user = self.store.get_user(task["user_id"])
        if user is None:
            LOGGER.error("Task cannot start because user does not exist | task_id=%s user_id=%s", task_id, task["user_id"])
            self.store.finish_task(task_id, "failed", "用户不存在。")
            self._remove_running(task_id)
            return

        LOGGER.info("Task runner starting | task_id=%s user_id=%s", task_id, user["id"])
        if not self.store.mark_task_running(task_id):
            latest_task = self.store.get_task(task_id) or task
            latest_status = str(latest_task.get("status") or "")
            if latest_status == "cancel_requested":
                self.store.finish_task(task_id, "stopped", "任务启动前已收到停止请求。")
                self._log(task_id, user["id"], "SYSTEM", "INFO", "任务启动前已收到停止请求,未启动 Selenium。")
            else:
                self._log(task_id, user["id"], "SYSTEM", "WARNING", f"任务启动前状态已变为 {latest_status or '未知'},未覆盖为运行态。")
            self._remove_running(task_id)
            return
        course_count = len(self.store.list_courses_for_user(user["id"]))
        requested_mode = normalize_task_run_mode(task.get("requested_mode"))
        effective_mode = normalize_task_run_mode(task.get("effective_mode"))
        login_mode = normalize_login_mode(task.get("login_mode"))
        use_proxy = bool(int(task.get("use_proxy") or 0))
        refresh_interval = user.get("refresh_interval_seconds") or getattr(self.config, "poll_interval_seconds", "-")
        self._log(
            task_id,
            user["id"],
            "SYSTEM",
            "INFO",
            "任务开始执行。"
            f"学号={user.get('student_id') or '-'},"
            f"待选课程 {course_count} 门,"
            f"请求模式={self._mode_label(requested_mode)},当前模式={self._mode_label(effective_mode)},"
            f"登录方式={self._login_mode_label(login_mode)},"
            f"代理={'启用' if use_proxy else '关闭'},"
            f"刷新间隔={refresh_interval} 秒。",
        )

        try:
            password = self.secret_box.decrypt(user["password_encrypted"])
            bot = CourseBot(
                config=self.config,
                store=self.store,
                task_id=task_id,
                user=user,
                password=password,
                logger=lambda level, message: self._log(task_id, user["id"], "RUNNER", level, message),
                login_2fa_code_provider=lambda phone_mask, current_stop_event: self.request_login_2fa_code(
                    task_id,
                    user,
                    phone_mask,
                    current_stop_event,
                ),
            )
            result = bot.run(stop_event)
        except Exception as exc:  # pragma: no cover - defensive fallback
            LOGGER.exception("Task initialization failed | task_id=%s user_id=%s", task_id, user["id"])
            result = TaskResult(status="failed", error=str(exc))
            self._log(task_id, user["id"], "SYSTEM", "ERROR", f"任务初始化失败: {exc}")

        current_task = self.store.get_task(task_id)
        final_status = result.status
        if current_task and current_task["status"] == "cancel_requested" and result.status != "completed":
            final_status = "stopped"
        elif stop_event.is_set() and result.status != "completed":
            final_status = "stopped"

        self.store.finish_task(task_id, final_status, result.error)
        final_task = self.store.get_task(task_id) or {}
        final_attempts = int(final_task.get("total_attempts") or 0)
        final_errors = int(final_task.get("total_errors") or 0)
        remaining_courses = len(self.store.list_courses_for_user(user["id"]))
        final_text = result.error or f"任务已结束,状态: {final_status}"
        final_text = f"{final_text} 总尝试 {final_attempts} 次,累计异常 {final_errors} 次,剩余待选课程 {remaining_courses} 门。"
        level = "ERROR" if final_status == "failed" else "INFO"
        self._log(task_id, user["id"], "SYSTEM", level, final_text)
        LOGGER.info(
            "Task runner finished | task_id=%s user_id=%s final_status=%s",
            task_id,
            user["id"],
            final_status,
        )
        self._remove_running(task_id)

    def _log(self, task_id: int | None, user_id: int | None, scope: str, level: str, message: str) -> None:
        upper_level = level.upper()
        self.store.add_log(task_id, user_id, scope, upper_level, message)
        if task_id is not None and upper_level in {"WARNING", "ERROR"}:
            self.store.increment_task_errors(task_id)
        self._emit_runtime_log(task_id=task_id, user_id=user_id, scope=scope, level=upper_level, message=message)

    @staticmethod
    def _emit_runtime_log(*, task_id: int | None, user_id: int | None, scope: str, level: str, message: str) -> None:
        upper_level = level.upper()
        rendered = f"task_id={task_id or '-'} user_id={user_id or '-'} scope={scope} level={upper_level} | {message}"
        if upper_level == "ERROR":
            LOGGER.error(rendered)
        elif upper_level == "WARNING":
            LOGGER.warning(rendered)
        else:
            LOGGER.info(rendered)

    def _running_count(self) -> int:
        with self._running_lock:
            return len(self._running)

    @staticmethod
    def _mode_label(mode: str) -> str:
        return TASK_RUN_MODE_LABELS.get(mode, mode)

    @staticmethod
    def _login_mode_label(mode: str) -> str:
        return LOGIN_MODE_LABELS.get(mode, mode)

    def _cleanup_running_registry(self) -> None:
        finished_ids: list[int] = []
        with self._running_lock:
            for task_id, running_task in self._running.items():
                if not running_task.thread.is_alive():
                    finished_ids.append(task_id)
            for task_id in finished_ids:
                self._running.pop(task_id, None)
        for task_id in finished_ids:
            LOGGER.info("Task thread cleaned from registry | task_id=%s", task_id)

    def _remove_running(self, task_id: int) -> None:
        with self._running_lock:
            removed = self._running.pop(task_id, None)
        if removed is not None:
            LOGGER.info("Task removed from running registry | task_id=%s", task_id)