"""
Process management service.

Responsible for starting and stopping spider processes.
"""
import asyncio
import contextlib
import os
import signal
import sys
from datetime import datetime
from typing import Dict

from src.utils import build_task_log_path


class ProcessService:
    """Track, start and stop one spider subprocess per task ID."""

    def __init__(self):
        # task_id -> handle of the subprocess most recently started for it.
        self.processes: Dict[int, asyncio.subprocess.Process] = {}
        # task_id -> path of the log file that subprocess writes to.
        self.log_paths: Dict[int, str] = {}

    def is_running(self, task_id: int) -> bool:
        """Return True if a tracked process for ``task_id`` has not exited."""
        process = self.processes.get(task_id)
        return process is not None and process.returncode is None

    async def start_task(self, task_id: int, task_name: str) -> bool:
        """Spawn ``spider_v2.py`` for the given task.

        The child's stdout and stderr are appended to the path returned by
        ``build_task_log_path(task_id, task_name)``.

        Args:
            task_id: Key under which the process is tracked.
            task_name: Passed to the child as ``--task-name``.

        Returns:
            True if the process was started; False if the task is already
            running or any error occurred while starting it.
        """
        if self.is_running(task_id):
            print(f"任务 '{task_name}' (ID: {task_id}) 已在运行中")
            return False

        try:
            os.makedirs("logs", exist_ok=True)
            log_file_path = build_task_log_path(task_id, task_name)

            # On POSIX the child becomes leader of its own session/process
            # group so stop_task() can signal the whole group with killpg().
            preexec_fn = os.setsid if sys.platform != "win32" else None

            # Force UTF-8 I/O in the child regardless of the host locale.
            child_env = os.environ.copy()
            child_env["PYTHONIOENCODING"] = "utf-8"
            child_env["PYTHONUTF8"] = "1"

            # The child inherits its own copy of the descriptor, so the
            # parent's handle is closed as soon as the spawn returns (or
            # fails) instead of leaking one open file per started task.
            with open(log_file_path, 'a', encoding='utf-8') as log_file_handle:
                process = await asyncio.create_subprocess_exec(
                    sys.executable, "-u", "spider_v2.py", "--task-name", task_name,
                    stdout=log_file_handle,
                    stderr=log_file_handle,
                    preexec_fn=preexec_fn,
                    env=child_env,
                )

            self.processes[task_id] = process
            self.log_paths[task_id] = log_file_path
            print(f"启动任务 '{task_name}' (PID: {process.pid})")
            return True
        except Exception as e:
            # Best effort: report the failure and drop any log path still
            # registered for this task.
            self.log_paths.pop(task_id, None)
            print(f"启动任务 '{task_name}' 失败: {e}")
            return False

    def _append_stop_marker(self, log_path: str | None) -> None:
        """Append a timestamped "task terminated" line to ``log_path``.

        Does nothing when ``log_path`` is falsy. Write errors are printed and
        otherwise ignored.
        """
        if not log_path:
            return
        try:
            ts = datetime.now().strftime(' %Y-%m-%d %H:%M:%S')
            with open(log_path, 'a', encoding='utf-8') as f:
                f.write(f"[{ts}] !!! 任务已被终止 !!!\n")
        except Exception as e:
            print(f"写入任务终止标记失败: {e}")

    async def stop_task(self, task_id: int) -> bool:
        """Terminate the process tracked for ``task_id``.

        Sends SIGTERM to the child's process group (``terminate()`` on
        Windows), waits up to 20 seconds, then escalates to SIGKILL
        (``kill()`` on Windows). The task is removed from the tracking
        tables whether or not the stop succeeds.

        Returns:
            True if a live process was stopped by this call; False if there
            was no tracked process, it had already exited, or an error
            occurred.
        """
        process = self.processes.pop(task_id, None)
        log_path = self.log_paths.pop(task_id, None)

        if not process:
            print(f"任务 ID {task_id} 没有正在运行的进程")
            return False

        if process.returncode is not None:
            print(f"任务进程 {process.pid} (ID: {task_id}) 已退出,略过停止")
            return False

        try:
            if sys.platform != "win32":
                os.killpg(os.getpgid(process.pid), signal.SIGTERM)
            else:
                process.terminate()

            try:
                await asyncio.wait_for(process.wait(), timeout=20)
            except asyncio.TimeoutError:
                print(f"任务进程 {process.pid} (ID: {task_id}) 未在 20 秒内退出,准备强制终止...")
                if sys.platform != "win32":
                    # The group may vanish between the timeout and the kill.
                    with contextlib.suppress(ProcessLookupError):
                        os.killpg(os.getpgid(process.pid), signal.SIGKILL)
                else:
                    process.kill()
                await process.wait()

            self._append_stop_marker(log_path)
            print(f"任务进程 {process.pid} (ID: {task_id}) 已终止")
            return True
        except ProcessLookupError:
            print(f"进程 (ID: {task_id}) 已不存在")
            return False
        except Exception as e:
            print(f"停止任务进程 (ID: {task_id}) 时出错: {e}")
            return False

    async def stop_all(self):
        """Stop every tracked task, one after another."""
        # Snapshot the keys: stop_task() mutates self.processes.
        task_ids = list(self.processes.keys())
        for task_id in task_ids:
            await self.stop_task(task_id)