cacode commited on
Commit
8385899
·
verified ·
1 Parent(s): 728219c

Add detailed runtime container logging

Browse files
Files changed (4) hide show
  1. Dockerfile +1 -1
  2. core/runtime_logging.py +34 -0
  3. core/task_manager.py +79 -1
  4. space_app.py +60 -3
Dockerfile CHANGED
@@ -31,4 +31,4 @@ USER user
31
 
32
  EXPOSE 7860
33
 
34
- CMD ["gunicorn", "--bind", "0.0.0.0:7860", "--workers", "1", "--threads", "8", "--timeout", "180", "app:app"]
 
31
 
32
  EXPOSE 7860
33
 
34
+ CMD ["gunicorn", "--bind", "0.0.0.0:7860", "--workers", "1", "--threads", "8", "--timeout", "180", "--log-level", "info", "--error-logfile", "-", "--capture-output", "app:app"]
core/runtime_logging.py ADDED
@@ -0,0 +1,34 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+ import os
5
+ import sys
6
+
7
+
8
+ DEFAULT_LOG_LEVEL = os.getenv("APP_LOG_LEVEL", "INFO").upper()
9
+ LOG_FORMAT = "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
10
+
11
+
12
+ def configure_logging(level: str | None = None) -> None:
13
+ resolved_level_name = (level or DEFAULT_LOG_LEVEL).upper()
14
+ resolved_level = getattr(logging, resolved_level_name, logging.INFO)
15
+ formatter = logging.Formatter(LOG_FORMAT)
16
+
17
+ root_logger = logging.getLogger()
18
+ if not root_logger.handlers:
19
+ handler = logging.StreamHandler(sys.stdout)
20
+ handler.setFormatter(formatter)
21
+ root_logger.addHandler(handler)
22
+ else:
23
+ for handler in root_logger.handlers:
24
+ handler.setFormatter(formatter)
25
+
26
+ root_logger.setLevel(resolved_level)
27
+ logging.captureWarnings(True)
28
+
29
+ for logger_name in ("gunicorn.error", "gunicorn.access", "werkzeug"):
30
+ logging.getLogger(logger_name).setLevel(resolved_level)
31
+
32
+
33
+ def get_logger(name: str) -> logging.Logger:
34
+ return logging.getLogger(name)
core/task_manager.py CHANGED
@@ -1,5 +1,6 @@
1
  from __future__ import annotations
2
 
 
3
  import threading
4
  import time
5
  from dataclasses import dataclass
@@ -9,6 +10,9 @@ from core.db import Database
9
  from core.security import SecretBox
10
 
11
 
 
 
 
12
  @dataclass(slots=True)
13
  class RunningTask:
14
  task_id: int
@@ -27,35 +31,60 @@ class TaskManager:
27
  self._shutdown_event = threading.Event()
28
  self._dispatcher_thread: threading.Thread | None = None
29
  self._running: dict[int, RunningTask] = {}
 
30
 
31
  def start(self) -> None:
32
  with self._startup_lock:
33
  if self._started:
 
34
  return
 
 
 
 
 
35
  self.store.reset_inflight_tasks()
36
  self._dispatcher_thread = threading.Thread(target=self._dispatch_loop, name="task-dispatcher", daemon=True)
37
  self._dispatcher_thread.start()
38
  self._started = True
 
39
 
40
  def shutdown(self) -> None:
 
41
  self._shutdown_event.set()
42
  with self._running_lock:
43
  for running_task in self._running.values():
44
  running_task.stop_event.set()
 
45
 
46
  def queue_task(self, user_id: int, requested_by: str, requested_by_role: str) -> tuple[dict, bool]:
47
  active_task = self.store.find_active_task_for_user(user_id)
48
  if active_task is not None:
 
 
 
 
 
 
49
  return self.store.get_task(active_task["id"]) or active_task, False
50
 
51
  task_id = self.store.create_task(user_id, requested_by, requested_by_role)
 
 
 
 
 
 
 
52
  self._log(task_id, user_id, "SYSTEM", "INFO", f"任务已进入队列,触发者: {requested_by_role}:{requested_by}")
53
  return self.store.get_task(task_id), True
54
 
55
  def stop_task(self, task_id: int) -> bool:
56
  requested = self.store.request_task_stop(task_id)
57
  if not requested:
 
58
  return False
 
59
  self._log(task_id, None, "SYSTEM", "INFO", "收到停止请求,任务会在安全节点退出。")
60
  with self._running_lock:
61
  running_task = self._running.get(task_id)
@@ -64,44 +93,70 @@ class TaskManager:
64
  return True
65
 
66
  def _dispatch_loop(self) -> None:
 
67
  while not self._shutdown_event.is_set():
68
  self._cleanup_running_registry()
69
  parallel_limit = self.store.get_parallel_limit()
70
  running_count = self._running_count()
71
  available_slots = max(0, parallel_limit - running_count)
 
72
  if available_slots > 0:
73
  pending_tasks = self.store.list_pending_tasks(available_slots)
 
 
 
 
 
 
 
 
 
 
 
 
 
74
  for task in pending_tasks:
75
  if self._shutdown_event.is_set():
76
  break
77
  if not task["is_active"]:
 
 
 
 
 
78
  self.store.finish_task(task["id"], "failed", "该用户已被禁用,任务未执行。")
79
  self._log(task["id"], task["user_id"], "SYSTEM", "WARNING", "用户已禁用,队���中的任务被取消。")
80
  continue
81
  self._launch_task(task["id"])
82
  time.sleep(1)
 
83
 
84
  def _launch_task(self, task_id: int) -> None:
85
  with self._running_lock:
86
  if task_id in self._running:
 
87
  return
88
  stop_event = threading.Event()
89
  thread = threading.Thread(target=self._run_task, args=(task_id, stop_event), name=f"task-{task_id}", daemon=True)
90
  self._running[task_id] = RunningTask(task_id=task_id, thread=thread, stop_event=stop_event)
 
91
  thread.start()
92
 
93
  def _run_task(self, task_id: int, stop_event: threading.Event) -> None:
94
  task = self.store.get_task(task_id)
95
  if task is None:
 
96
  self._remove_running(task_id)
97
  return
98
 
99
  user = self.store.get_user(task["user_id"])
100
  if user is None:
 
101
  self.store.finish_task(task_id, "failed", "用户不存在。")
102
  self._remove_running(task_id)
103
  return
104
 
 
105
  self.store.mark_task_running(task_id)
106
  self._log(task_id, user["id"], "SYSTEM", "INFO", "任务开始执行。")
107
 
@@ -117,6 +172,7 @@ class TaskManager:
117
  )
118
  result = bot.run(stop_event)
119
  except Exception as exc: # pragma: no cover - defensive fallback
 
120
  result = TaskResult(status="failed", error=str(exc))
121
  self._log(task_id, user["id"], "SYSTEM", "ERROR", f"任务初始化失败: {exc}")
122
 
@@ -131,10 +187,28 @@ class TaskManager:
131
  final_text = result.error or f"任务已结束,状态: {final_status}"
132
  level = "ERROR" if final_status == "failed" else "INFO"
133
  self._log(task_id, user["id"], "SYSTEM", level, final_text)
 
 
 
 
 
 
134
  self._remove_running(task_id)
135
 
136
  def _log(self, task_id: int | None, user_id: int | None, scope: str, level: str, message: str) -> None:
137
  self.store.add_log(task_id, user_id, scope, level, message)
 
 
 
 
 
 
 
 
 
 
 
 
138
 
139
  def _running_count(self) -> int:
140
  with self._running_lock:
@@ -148,7 +222,11 @@ class TaskManager:
148
  finished_ids.append(task_id)
149
  for task_id in finished_ids:
150
  self._running.pop(task_id, None)
 
 
151
 
152
  def _remove_running(self, task_id: int) -> None:
153
  with self._running_lock:
154
- self._running.pop(task_id, None)
 
 
 
1
  from __future__ import annotations
2
 
3
+ import logging
4
  import threading
5
  import time
6
  from dataclasses import dataclass
 
10
  from core.security import SecretBox
11
 
12
 
13
+ LOGGER = logging.getLogger("sacc.task_manager")
14
+
15
+
16
  @dataclass(slots=True)
17
  class RunningTask:
18
  task_id: int
 
31
  self._shutdown_event = threading.Event()
32
  self._dispatcher_thread: threading.Thread | None = None
33
  self._running: dict[int, RunningTask] = {}
34
+ self._last_dispatch_snapshot: tuple[int, int, int] | None = None
35
 
36
  def start(self) -> None:
37
  with self._startup_lock:
38
  if self._started:
39
+ LOGGER.info("Task manager start skipped because it is already running")
40
  return
41
+ LOGGER.info(
42
+ "Task manager starting | db_path=%s default_parallel_limit=%s",
43
+ self.store.path,
44
+ getattr(self.config, "default_parallel_limit", "-"),
45
+ )
46
  self.store.reset_inflight_tasks()
47
  self._dispatcher_thread = threading.Thread(target=self._dispatch_loop, name="task-dispatcher", daemon=True)
48
  self._dispatcher_thread.start()
49
  self._started = True
50
+ LOGGER.info("Task manager started")
51
 
52
  def shutdown(self) -> None:
53
+ LOGGER.info("Task manager shutdown requested")
54
  self._shutdown_event.set()
55
  with self._running_lock:
56
  for running_task in self._running.values():
57
  running_task.stop_event.set()
58
+ LOGGER.info("Task manager stop signal broadcast to %s running task(s)", self._running_count())
59
 
60
  def queue_task(self, user_id: int, requested_by: str, requested_by_role: str) -> tuple[dict, bool]:
61
  active_task = self.store.find_active_task_for_user(user_id)
62
  if active_task is not None:
63
+ LOGGER.info(
64
+ "Task queue skipped because an active task already exists | task_id=%s user_id=%s status=%s",
65
+ active_task["id"],
66
+ user_id,
67
+ active_task["status"],
68
+ )
69
  return self.store.get_task(active_task["id"]) or active_task, False
70
 
71
  task_id = self.store.create_task(user_id, requested_by, requested_by_role)
72
+ LOGGER.info(
73
+ "Task queued | task_id=%s user_id=%s requested_by_role=%s requested_by=%s",
74
+ task_id,
75
+ user_id,
76
+ requested_by_role,
77
+ requested_by,
78
+ )
79
  self._log(task_id, user_id, "SYSTEM", "INFO", f"任务已进入队列,触发者: {requested_by_role}:{requested_by}")
80
  return self.store.get_task(task_id), True
81
 
82
  def stop_task(self, task_id: int) -> bool:
83
  requested = self.store.request_task_stop(task_id)
84
  if not requested:
85
+ LOGGER.info("Task stop request ignored because task is not active | task_id=%s", task_id)
86
  return False
87
+ LOGGER.info("Task stop requested | task_id=%s", task_id)
88
  self._log(task_id, None, "SYSTEM", "INFO", "收到停止请求,任务会在安全节点退出。")
89
  with self._running_lock:
90
  running_task = self._running.get(task_id)
 
93
  return True
94
 
95
  def _dispatch_loop(self) -> None:
96
+ LOGGER.info("Dispatcher loop started")
97
  while not self._shutdown_event.is_set():
98
  self._cleanup_running_registry()
99
  parallel_limit = self.store.get_parallel_limit()
100
  running_count = self._running_count()
101
  available_slots = max(0, parallel_limit - running_count)
102
+ pending_tasks: list[dict] = []
103
  if available_slots > 0:
104
  pending_tasks = self.store.list_pending_tasks(available_slots)
105
+
106
+ snapshot = (parallel_limit, running_count, len(pending_tasks))
107
+ if snapshot != self._last_dispatch_snapshot and (running_count > 0 or pending_tasks):
108
+ LOGGER.info(
109
+ "Dispatcher snapshot | parallel_limit=%s running=%s available_slots=%s fetched_pending=%s",
110
+ parallel_limit,
111
+ running_count,
112
+ available_slots,
113
+ len(pending_tasks),
114
+ )
115
+ self._last_dispatch_snapshot = snapshot
116
+
117
+ if pending_tasks:
118
  for task in pending_tasks:
119
  if self._shutdown_event.is_set():
120
  break
121
  if not task["is_active"]:
122
+ LOGGER.warning(
123
+ "Queued task cancelled because user is inactive | task_id=%s user_id=%s",
124
+ task["id"],
125
+ task["user_id"],
126
+ )
127
  self.store.finish_task(task["id"], "failed", "该用户已被禁用,任务未执行。")
128
  self._log(task["id"], task["user_id"], "SYSTEM", "WARNING", "用户已禁用,队���中的任务被取消。")
129
  continue
130
  self._launch_task(task["id"])
131
  time.sleep(1)
132
+ LOGGER.info("Dispatcher loop stopped")
133
 
134
  def _launch_task(self, task_id: int) -> None:
135
  with self._running_lock:
136
  if task_id in self._running:
137
+ LOGGER.info("Task launch skipped because thread already exists | task_id=%s", task_id)
138
  return
139
  stop_event = threading.Event()
140
  thread = threading.Thread(target=self._run_task, args=(task_id, stop_event), name=f"task-{task_id}", daemon=True)
141
  self._running[task_id] = RunningTask(task_id=task_id, thread=thread, stop_event=stop_event)
142
+ LOGGER.info("Launching task thread | task_id=%s thread_name=%s", task_id, thread.name)
143
  thread.start()
144
 
145
  def _run_task(self, task_id: int, stop_event: threading.Event) -> None:
146
  task = self.store.get_task(task_id)
147
  if task is None:
148
+ LOGGER.warning("Task record disappeared before execution | task_id=%s", task_id)
149
  self._remove_running(task_id)
150
  return
151
 
152
  user = self.store.get_user(task["user_id"])
153
  if user is None:
154
+ LOGGER.error("Task cannot start because user does not exist | task_id=%s user_id=%s", task_id, task["user_id"])
155
  self.store.finish_task(task_id, "failed", "用户不存在。")
156
  self._remove_running(task_id)
157
  return
158
 
159
+ LOGGER.info("Task runner starting | task_id=%s user_id=%s", task_id, user["id"])
160
  self.store.mark_task_running(task_id)
161
  self._log(task_id, user["id"], "SYSTEM", "INFO", "任务开始执行。")
162
 
 
172
  )
173
  result = bot.run(stop_event)
174
  except Exception as exc: # pragma: no cover - defensive fallback
175
+ LOGGER.exception("Task initialization failed | task_id=%s user_id=%s", task_id, user["id"])
176
  result = TaskResult(status="failed", error=str(exc))
177
  self._log(task_id, user["id"], "SYSTEM", "ERROR", f"任务初始化失败: {exc}")
178
 
 
187
  final_text = result.error or f"任务已结束,状态: {final_status}"
188
  level = "ERROR" if final_status == "failed" else "INFO"
189
  self._log(task_id, user["id"], "SYSTEM", level, final_text)
190
+ LOGGER.info(
191
+ "Task runner finished | task_id=%s user_id=%s final_status=%s",
192
+ task_id,
193
+ user["id"],
194
+ final_status,
195
+ )
196
  self._remove_running(task_id)
197
 
198
  def _log(self, task_id: int | None, user_id: int | None, scope: str, level: str, message: str) -> None:
199
  self.store.add_log(task_id, user_id, scope, level, message)
200
+ self._emit_runtime_log(task_id=task_id, user_id=user_id, scope=scope, level=level, message=message)
201
+
202
+ @staticmethod
203
+ def _emit_runtime_log(*, task_id: int | None, user_id: int | None, scope: str, level: str, message: str) -> None:
204
+ upper_level = level.upper()
205
+ rendered = f"task_id={task_id or '-'} user_id={user_id or '-'} scope={scope} level={upper_level} | {message}"
206
+ if upper_level == "ERROR":
207
+ LOGGER.error(rendered)
208
+ elif upper_level == "WARNING":
209
+ LOGGER.warning(rendered)
210
+ else:
211
+ LOGGER.info(rendered)
212
 
213
  def _running_count(self) -> int:
214
  with self._running_lock:
 
222
  finished_ids.append(task_id)
223
  for task_id in finished_ids:
224
  self._running.pop(task_id, None)
225
+ for task_id in finished_ids:
226
+ LOGGER.info("Task thread cleaned from registry | task_id=%s", task_id)
227
 
228
  def _remove_running(self, task_id: int) -> None:
229
  with self._running_lock:
230
+ removed = self._running.pop(task_id, None)
231
+ if removed is not None:
232
+ LOGGER.info("Task removed from running registry | task_id=%s", task_id)
space_app.py CHANGED
@@ -1,4 +1,4 @@
1
- from __future__ import annotations
2
 
3
  import atexit
4
  import json
@@ -6,14 +6,18 @@ import time
6
  from functools import wraps
7
  from typing import Callable
8
 
9
- from flask import Flask, Response, flash, jsonify, redirect, render_template, request, session, stream_with_context, url_for
10
 
11
  from core.config import AppConfig
12
  from core.db import Database
13
  from core.security import SecretBox, hash_password, verify_password
14
  from core.task_manager import TaskManager
 
15
 
16
 
 
 
 
17
  config = AppConfig.load()
18
  store = Database(config.db_path, default_parallel_limit=config.default_parallel_limit)
19
  store.init_db()
@@ -55,6 +59,14 @@ atexit.register(task_manager.shutdown)
55
  app = Flask(__name__, template_folder="templates", static_folder="static")
56
  app.secret_key = config.session_secret
57
  app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SAMESITE="Lax")
 
 
 
 
 
 
 
 
58
 
59
  CATEGORY_LABELS = {
60
  "plan": "方案选课",
@@ -70,6 +82,51 @@ TASK_LABELS = {
70
  }
71
 
72
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  @app.context_processor
74
  def inject_globals() -> dict:
75
  return {
@@ -517,4 +574,4 @@ def stream_admin_logs():
517
  response = Response(generate(), mimetype="text/event-stream")
518
  response.headers["Cache-Control"] = "no-cache"
519
  response.headers["X-Accel-Buffering"] = "no"
520
- return response
 
1
+ from __future__ import annotations
2
 
3
  import atexit
4
  import json
 
6
  from functools import wraps
7
  from typing import Callable
8
 
9
+ from flask import Flask, Response, flash, g, jsonify, redirect, render_template, request, session, stream_with_context, url_for
10
 
11
  from core.config import AppConfig
12
  from core.db import Database
13
  from core.security import SecretBox, hash_password, verify_password
14
  from core.task_manager import TaskManager
15
+ from core.runtime_logging import configure_logging, get_logger
16
 
17
 
18
+ configure_logging()
19
+ APP_LOGGER = get_logger("sacc.web")
20
+
21
  config = AppConfig.load()
22
  store = Database(config.db_path, default_parallel_limit=config.default_parallel_limit)
23
  store.init_db()
 
59
  app = Flask(__name__, template_folder="templates", static_folder="static")
60
  app.secret_key = config.session_secret
61
  app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SAMESITE="Lax")
62
+ APP_LOGGER.info(
63
+ "Application bootstrap complete | data_dir=%s db_path=%s chrome_binary=%s chromedriver_path=%s default_parallel_limit=%s",
64
+ config.data_dir,
65
+ config.db_path,
66
+ config.chrome_binary,
67
+ config.chromedriver_path,
68
+ config.default_parallel_limit,
69
+ )
70
 
71
  CATEGORY_LABELS = {
72
  "plan": "方案选课",
 
82
  }
83
 
84
 
85
+ SKIPPED_REQUEST_LOG_PREFIXES = (
86
+ "/static/",
87
+ "/api/",
88
+ )
89
+ SKIPPED_REQUEST_LOG_PATHS = {
90
+ "/favicon.ico",
91
+ }
92
+
93
+
94
+ def _current_actor_label() -> str:
95
+ role = session.get("role", "guest")
96
+ if role == "user":
97
+ return f"user:{session.get('user_id', '-')}"
98
+ if role == "admin":
99
+ return f"admin:{session.get('admin_username', '-')}"
100
+ return "guest"
101
+
102
+
103
+ @app.before_request
104
+ def before_request_logging() -> None:
105
+ g.request_started_at = time.perf_counter()
106
+
107
+
108
+ @app.after_request
109
+ def after_request_logging(response):
110
+ if request.path in SKIPPED_REQUEST_LOG_PATHS:
111
+ return response
112
+ if any(request.path.startswith(prefix) for prefix in SKIPPED_REQUEST_LOG_PREFIXES):
113
+ return response
114
+
115
+ started_at = getattr(g, "request_started_at", None)
116
+ duration_ms = 0.0 if started_at is None else (time.perf_counter() - started_at) * 1000
117
+ remote_addr = request.headers.get("x-forwarded-for") or request.remote_addr or "-"
118
+ APP_LOGGER.info(
119
+ "HTTP %s %s -> %s in %.1fms | actor=%s remote=%s",
120
+ request.method,
121
+ request.path,
122
+ response.status_code,
123
+ duration_ms,
124
+ _current_actor_label(),
125
+ remote_addr,
126
+ )
127
+ return response
128
+
129
+
130
  @app.context_processor
131
  def inject_globals() -> dict:
132
  return {
 
574
  response = Response(generate(), mimetype="text/event-stream")
575
  response.headers["Cache-Control"] = "no-cache"
576
  response.headers["X-Accel-Buffering"] = "no"
577
+ return response