File size: 4,115 Bytes
5378afe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
进程管理服务
负责管理爬虫进程的启动和停止
"""
import asyncio
import sys
import os
import signal
from datetime import datetime
from typing import Dict
from src.utils import build_task_log_path


class ProcessService:
    """进程管理服务"""

    def __init__(self):
        self.processes: Dict[int, asyncio.subprocess.Process] = {}
        self.log_paths: Dict[int, str] = {}

    def is_running(self, task_id: int) -> bool:
        """检查任务是否正在运行"""
        process = self.processes.get(task_id)
        return process is not None and process.returncode is None

    async def start_task(self, task_id: int, task_name: str) -> bool:
        """启动任务进程"""
        if self.is_running(task_id):
            print(f"任务 '{task_name}' (ID: {task_id}) 已在运行中")
            return False

        try:
            os.makedirs("logs", exist_ok=True)
            log_file_path = build_task_log_path(task_id, task_name)
            log_file_handle = open(log_file_path, 'a', encoding='utf-8')

            preexec_fn = os.setsid if sys.platform != "win32" else None
            child_env = os.environ.copy()
            child_env["PYTHONIOENCODING"] = "utf-8"
            child_env["PYTHONUTF8"] = "1"

            process = await asyncio.create_subprocess_exec(
                sys.executable, "-u", "spider_v2.py", "--task-name", task_name,
                stdout=log_file_handle,
                stderr=log_file_handle,
                preexec_fn=preexec_fn,
                env=child_env
            )

            self.processes[task_id] = process
            self.log_paths[task_id] = log_file_path
            print(f"启动任务 '{task_name}' (PID: {process.pid})")
            return True

        except Exception as e:
            if task_id in self.log_paths:
                del self.log_paths[task_id]
            print(f"启动任务 '{task_name}' 失败: {e}")
            return False

    def _append_stop_marker(self, log_path: str | None) -> None:
        if not log_path:
            return
        try:
            ts = datetime.now().strftime(' %Y-%m-%d %H:%M:%S')
            with open(log_path, 'a', encoding='utf-8') as f:
                f.write(f"[{ts}] !!! 任务已被终止 !!!\n")
        except Exception as e:
            print(f"写入任务终止标记失败: {e}")

    async def stop_task(self, task_id: int) -> bool:
        """停止任务进程"""
        process = self.processes.pop(task_id, None)
        log_path = self.log_paths.pop(task_id, None)
        if not process:
            print(f"任务 ID {task_id} 没有正在运行的进程")
            return False
        if process.returncode is not None:
            print(f"任务进程 {process.pid} (ID: {task_id}) 已退出,略过停止")
            return False

        try:
            if sys.platform != "win32":
                os.killpg(os.getpgid(process.pid), signal.SIGTERM)
            else:
                process.terminate()

            try:
                await asyncio.wait_for(process.wait(), timeout=20)
            except asyncio.TimeoutError:
                print(f"任务进程 {process.pid} (ID: {task_id}) 未在 20 秒内退出,准备强制终止...")
                if sys.platform != "win32":
                    with contextlib.suppress(ProcessLookupError):
                        os.killpg(os.getpgid(process.pid), signal.SIGKILL)
                else:
                    process.kill()
                await process.wait()

            self._append_stop_marker(log_path)
            print(f"任务进程 {process.pid} (ID: {task_id}) 已终止")
            return True

        except ProcessLookupError:
            print(f"进程 (ID: {task_id}) 已不存在")
            return False
        except Exception as e:
            print(f"停止任务进程 (ID: {task_id}) 时出错: {e}")
            return False

    async def stop_all(self):
        """停止所有任务进程"""
        task_ids = list(self.processes.keys())
        for task_id in task_ids:
            await self.stop_task(task_id)