File size: 5,670 Bytes
01d5a5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import os
import sys
import time
import subprocess
from typing import Optional, Dict, Any

from lpm_kernel.configs.logging import get_train_process_logger
# Module-level logger used by ScriptRunner for progress and error messages.
logger = get_train_process_logger()


class ScriptRunner:
    """Script executor, supports executing Python and Bash scripts.

    A script is launched as a child process; its combined stdout/stderr is
    streamed line by line to the console and appended to the log file given
    at construction time.
    """

    def __init__(self, log_path: str = "data/local_logs/train.log"):
        """
        Initialize script executor
        Args:
            log_path: Path of the log file that script output is appended to.
                Its parent directory is also the root under which
                ``_prepare_log_file`` creates per-script-type directories.
        """
        self.base_log_path = log_path

    def _prepare_log_file(self, script_type: str) -> str:
        """
        Prepare a timestamped log file path for one script type
        Args:
            script_type: Script type, used for log directory naming
        Returns:
            str: Complete path to the log file (the directory is created,
                the file itself is not)
        """
        # The per-type directory lives next to the base log file. Only
        # ``base_log_path`` is stored on the instance, so derive the
        # directory from it.
        log_dir = os.path.join(os.path.dirname(self.base_log_path), script_type)
        os.makedirs(log_dir, exist_ok=True)

        # Generate log filename with timestamp
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        return os.path.join(log_dir, f"{script_type}_{timestamp}.log")

    def _check_execution_env(self) -> Dict[str, str]:
        """
        Get current execution environment information, supporting docker or regular system environment
        Returns:
            Dict[str, str]: Dictionary with a "type" key ("docker" or
                "system") and a "details" key describing the environment
        """
        # Docker is detected purely through the IN_DOCKER_ENV variable.
        if os.environ.get("IN_DOCKER_ENV") == "1":
            return {"type": "docker", "details": "docker-env-variable"}

        env_info = {
            "type": "system",
            "details": "Unknown environment"
        }

        # Regular system environment: the platform string is best-effort;
        # on failure the "Unknown environment" placeholder is kept.
        try:
            import platform
            env_info["details"] = platform.platform()
        except Exception as exc:
            logger.debug(f"Could not determine platform details: {exc}")

        return env_info

    def _check_python_version(self) -> str:
        """
        Get Python version information
        Returns:
            str: Version string of the running interpreter (``sys.version``)
        """
        return sys.version

    def execute_script(
        self,
        script_path: str,
        script_type: str,
        is_python: bool = False,
        args: Optional[list] = None,
    ) -> Dict[str, Any]:
        """
        Execute script and block until it finishes
        Args:
            script_path: Complete path to the script
            script_type: Script type, used in log messages
            is_python: Whether it is a Python script (run with the current
                interpreter); otherwise it is run with ``bash``
            args: List of additional script parameters
        Returns:
            Dict[str, Any]: Execution result with keys "pid", "environment",
                "log_file" and "exit_code"
        Raises:
            FileNotFoundError: If ``script_path`` does not exist
            Exception: Any other failure is logged and re-raised unchanged
        """
        try:
            # Check if script exists
            if not os.path.exists(script_path):
                raise FileNotFoundError(f"Script does not exist: {script_path}")

            # Get execution environment information
            env_info = self._check_execution_env()
            logger.info(f"Running in environment: {env_info['type']} ({env_info['details']})")

            # Output is appended to the shared base log file
            log_file = self.base_log_path
            logger.info(f"Starting {script_type} task, log file: {log_file}")

            # Make sure the log directory exists so opening the file for
            # append cannot fail on a fresh checkout.
            log_dir = os.path.dirname(log_file)
            if log_dir:
                os.makedirs(log_dir, exist_ok=True)

            # Ensure script has execution permission. The script is started
            # through an interpreter below, so a chmod failure (e.g. on a
            # read-only mount) is not fatal.
            try:
                os.chmod(script_path, 0o755)
            except OSError as chmod_error:
                logger.warning(
                    f"Could not set execute permission on {script_path}: {chmod_error}"
                )

            # Build command
            interpreter = sys.executable if is_python else "bash"
            command = [interpreter, script_path]

            # Add additional parameters
            if args:
                command.extend(args)

            # Record Python version (if it's a Python script)
            if is_python:
                logger.info(f"Python version: {self._check_python_version()}")

            # Open log file (line-buffered) and start the child. Using Popen
            # as a context manager closes the stdout pipe and reaps the
            # child even if streaming fails part-way.
            with open(log_file, "a", buffering=1) as f:
                with subprocess.Popen(
                    command,
                    shell=False,  # Use list form of command, no need for shell=True
                    cwd=os.getcwd(),
                    env=os.environ.copy(),
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,  # merge stderr into stdout
                    bufsize=1,
                    universal_newlines=True  # This ensures text mode output
                ) as process:
                    # Get process ID
                    pid = process.pid
                    logger.info(f"Process started, PID: {pid}")

                    # Read output in real-time; iteration stops at EOF,
                    # i.e. once the child has closed its end of the pipe.
                    for line in process.stdout:
                        # Write to file
                        f.write(line)
                        # Write to console
                        print(line, end='', flush=True)

                    # Wait for process to end
                    exit_code = process.wait()

                end_message = f"Process (PID: {pid}) has ended, exit code: {exit_code}"
                logger.info(end_message)
                print(end_message)
                f.write(f"\n{end_message}\n")

            return {
                "pid": pid,
                "environment": env_info,
                "log_file": log_file,
                "exit_code": exit_code,
            }

        except Exception as e:
            error_msg = f"Failed to execute script: {str(e)}"
            logger.error(error_msg)
            raise