Spaces:
Sleeping
Sleeping
File size: 5,670 Bytes
01d5a5d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
import os
import sys
import time
import subprocess
from typing import Optional, Dict, Any
from lpm_kernel.configs.logging import get_train_process_logger
logger = get_train_process_logger()
class ScriptRunner:
"""Script executor, supports executing Python and Bash scripts"""
def __init__(self, log_path: str = "data/local_logs/train.log"):
"""
Initialize script executor
Args:
log_path: Base path for log files
"""
self.base_log_path = log_path
def _prepare_log_file(self, script_type: str) -> str:
"""
Prepare log file
Args:
script_type: Script type, used for log directory naming
Returns:
str: Complete path to the log file
"""
# Create log directory
log_dir = os.path.join(self.base_log_dir, script_type)
os.makedirs(log_dir, exist_ok=True)
# Generate log filename with timestamp
timestamp = time.strftime("%Y%m%d_%H%M%S")
return os.path.join(log_dir, f"{script_type}_{timestamp}.log")
def _check_execution_env(self) -> Dict[str, str]:
"""
Get current execution environment information, supporting docker or regular system environment
Returns:
Dict[str, str]: Dictionary containing environment type and detailed information
"""
env_info = {
"type": "system",
"details": "Unknown environment"
}
# Check if in docker environment - first check environment variable
if os.environ.get("IN_DOCKER_ENV") == "1":
env_info["type"] = "docker"
env_info["details"] = "docker-env-variable"
return env_info
# Regular system environment
try:
import platform
system_info = platform.platform()
env_info["details"] = system_info
except Exception:
pass
return env_info
def _check_python_version(self) -> str:
"""
Get Python version information
Returns:
str: Python version information
"""
return sys.version
def execute_script(
self,
script_path: str,
script_type: str,
is_python: bool = False,
args: Optional[list] = None,
) -> Dict[str, Any]:
"""
Execute script
Args:
script_path: Complete path to the script
script_type: Script type, used for log directory naming
is_python: Whether it is a Python script
args: List of additional script parameters
Returns:
Dict[str, Any]: Execution result, including process ID, environment information and log file path
"""
try:
# Check if script exists
if not os.path.exists(script_path):
raise FileNotFoundError(f"Script does not exist: {script_path}")
# Get execution environment information
env_info = self._check_execution_env()
logger.info(f"Running in environment: {env_info['type']} ({env_info['details']})")
# Prepare log file
log_file = self.base_log_path
logger.info(f"Starting {script_type} task, log file: {log_file}")
# Ensure script has execution permission
os.chmod(script_path, 0o755)
# Build command
if is_python:
command = [sys.executable, script_path]
else:
command = ["bash", script_path]
# Add additional parameters
if args:
command.extend(args)
# Record Python version (if it's a Python script)
if is_python:
logger.info(f"Python version: {self._check_python_version()}")
# Execute script
from subprocess import PIPE
# Open log file
with open(log_file, "a", buffering=1) as f:
process = subprocess.Popen(
command,
shell=False, # Use list form of command, no need for shell=True
cwd=os.getcwd(),
env=os.environ.copy(),
stdout=PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True # This ensures text mode output
)
# Get process ID
pid = process.pid
logger.info(f"Process started, PID: {pid}")
# Read output in real-time
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
# Write to file
f.write(output)
# Write to console
print(output, end='', flush=True)
# Wait for process to end
exit_code = process.wait()
end_message = f"Process (PID: {pid}) has ended, exit code: {exit_code}"
logger.info(end_message)
print(end_message)
f.write(f"\n{end_message}\n")
return {
"pid": pid,
"environment": env_info,
"log_file": log_file,
"exit_code": exit_code,
}
except Exception as e:
error_msg = f"Failed to execute script: {str(e)}"
logger.error(error_msg)
raise
|