File size: 3,712 Bytes
01d5a5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import os
import subprocess
import sys
from typing import List, Optional, Dict, Any
from lpm_kernel.common.logging import logger

# Test if logging works properly
logger.debug("DEBUG: ScriptExecutor module loaded")
logger.info("INFO: ScriptExecutor module loaded")
logger.warning("WARNING: ScriptExecutor module loaded")
logger.error("ERROR: ScriptExecutor module loaded")


class ScriptExecutor:
    def __init__(self):
        # Check if running in Docker environment
        self.in_docker = os.getenv("IN_DOCKER_ENV") == "1" or os.path.exists("/.dockerenv")

    def execute(
        self,
        script_path: str,
        script_type: str,
        args: Optional[List[str]] = None,
        shell: bool = False,
        log_file: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Execute scripts directly

        Args:
            script_path: Script path or command
            script_type: Script type, used for logging
            args: Command line arguments
            shell: Whether to use shell for execution
            log_file: Log file path, if provided will redirect output to this file

        Returns:
            Execution result
        """
        try:
            # Build the complete command
            if script_path.endswith(".py"):
                # Python script
                cmd = ["python", "-u", script_path]  # Add -u parameter to disable output buffering
            elif script_path.endswith(".sh"):
                # Shell script
                cmd = ["bash", "-x", script_path]  # Add -x parameter to display executed commands
            else:
                # Other commands
                cmd = [script_path]

            # Add additional parameters
            if args:
                cmd.extend(args)

            logger.info(f"Executing command: {' '.join(cmd)}")

            # If logging is needed, ensure the log directory exists
            if log_file:
                os.makedirs(os.path.dirname(log_file), exist_ok=True)

            # Execute command and capture output
            process = subprocess.Popen(
                cmd,
                shell=shell,
                env=os.environ.copy(),
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # Redirect error output to standard output
                bufsize=1,  # Line buffering
                universal_newlines=True,  # Use text mode
            )

            # Read and process output in real-time
            while True:
                output = process.stdout.readline()
                if output == "" and process.poll() is not None:
                    break
                if output:
                    # Output to console
                    print(output.strip())
                    # If needed, write to log file
                    if log_file:
                        with open(log_file, "a", encoding="utf-8") as f:
                            f.write(output)

            # Wait for the process to end and get the return code
            return_code = process.wait()

            if return_code != 0:
                logger.error(f"Command execution failed, return code: {return_code}")
            else:
                logger.info(f"Command execution successful, return code: {return_code}")

            return {
                "returncode": return_code,
                "error": f"Execution failed, return code: {return_code}"
                if return_code != 0
                else None,
            }

        except Exception as e:
            error_msg = f"Error occurred while executing {script_type} script: {str(e)}"
            logger.error(error_msg)
            return {"error": error_msg, "returncode": -1}