| """ | |
| 进程管理服务 | |
| 负责管理爬虫进程的启动和停止 | |
| """ | |
import asyncio
import contextlib
import os
import signal
import sys
from datetime import datetime
from typing import Dict

from src.utils import build_task_log_path
| class ProcessService: | |
| """进程管理服务""" | |
| def __init__(self): | |
| self.processes: Dict[int, asyncio.subprocess.Process] = {} | |
| self.log_paths: Dict[int, str] = {} | |
| def is_running(self, task_id: int) -> bool: | |
| """检查任务是否正在运行""" | |
| process = self.processes.get(task_id) | |
| return process is not None and process.returncode is None | |
| async def start_task(self, task_id: int, task_name: str) -> bool: | |
| """启动任务进程""" | |
| if self.is_running(task_id): | |
| print(f"任务 '{task_name}' (ID: {task_id}) 已在运行中") | |
| return False | |
| try: | |
| os.makedirs("logs", exist_ok=True) | |
| log_file_path = build_task_log_path(task_id, task_name) | |
| log_file_handle = open(log_file_path, 'a', encoding='utf-8') | |
| preexec_fn = os.setsid if sys.platform != "win32" else None | |
| child_env = os.environ.copy() | |
| child_env["PYTHONIOENCODING"] = "utf-8" | |
| child_env["PYTHONUTF8"] = "1" | |
| process = await asyncio.create_subprocess_exec( | |
| sys.executable, "-u", "spider_v2.py", "--task-name", task_name, | |
| stdout=log_file_handle, | |
| stderr=log_file_handle, | |
| preexec_fn=preexec_fn, | |
| env=child_env | |
| ) | |
| self.processes[task_id] = process | |
| self.log_paths[task_id] = log_file_path | |
| print(f"启动任务 '{task_name}' (PID: {process.pid})") | |
| return True | |
| except Exception as e: | |
| if task_id in self.log_paths: | |
| del self.log_paths[task_id] | |
| print(f"启动任务 '{task_name}' 失败: {e}") | |
| return False | |
| def _append_stop_marker(self, log_path: str | None) -> None: | |
| if not log_path: | |
| return | |
| try: | |
| ts = datetime.now().strftime(' %Y-%m-%d %H:%M:%S') | |
| with open(log_path, 'a', encoding='utf-8') as f: | |
| f.write(f"[{ts}] !!! 任务已被终止 !!!\n") | |
| except Exception as e: | |
| print(f"写入任务终止标记失败: {e}") | |
| async def stop_task(self, task_id: int) -> bool: | |
| """停止任务进程""" | |
| process = self.processes.pop(task_id, None) | |
| log_path = self.log_paths.pop(task_id, None) | |
| if not process: | |
| print(f"任务 ID {task_id} 没有正在运行的进程") | |
| return False | |
| if process.returncode is not None: | |
| print(f"任务进程 {process.pid} (ID: {task_id}) 已退出,略过停止") | |
| return False | |
| try: | |
| if sys.platform != "win32": | |
| os.killpg(os.getpgid(process.pid), signal.SIGTERM) | |
| else: | |
| process.terminate() | |
| try: | |
| await asyncio.wait_for(process.wait(), timeout=20) | |
| except asyncio.TimeoutError: | |
| print(f"任务进程 {process.pid} (ID: {task_id}) 未在 20 秒内退出,准备强制终止...") | |
| if sys.platform != "win32": | |
| with contextlib.suppress(ProcessLookupError): | |
| os.killpg(os.getpgid(process.pid), signal.SIGKILL) | |
| else: | |
| process.kill() | |
| await process.wait() | |
| self._append_stop_marker(log_path) | |
| print(f"任务进程 {process.pid} (ID: {task_id}) 已终止") | |
| return True | |
| except ProcessLookupError: | |
| print(f"进程 (ID: {task_id}) 已不存在") | |
| return False | |
| except Exception as e: | |
| print(f"停止任务进程 (ID: {task_id}) 时出错: {e}") | |
| return False | |
| async def stop_all(self): | |
| """停止所有任务进程""" | |
| task_ids = list(self.processes.keys()) | |
| for task_id in task_ids: | |
| await self.stop_task(task_id) | |