darkfire514's picture
Upload 160 files
399b80c verified
import os
import platform
import shlex
import subprocess
import signal
import time
import json
import uuid
from datetime import datetime
from flask import Flask, request, jsonify, send_file, abort
import pyautogui
import threading
from io import BytesIO
import tempfile
from openspace.utils.logging import Logger
from openspace.local_server.utils import AccessibilityHelper, ScreenshotHelper
from openspace.local_server.platform_adapters import get_platform_adapter
from openspace.local_server.health_checker import HealthChecker
from openspace.local_server.feature_checker import FeatureChecker
platform_name = platform.system()
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB
pyautogui.PAUSE = 0
if platform_name == "Darwin":
pyautogui.DARWIN_CATCH_UP_TIME = 0
logger = Logger.get_logger(__name__)
TIMEOUT = 1800
recording_process = None
if platform_name == "Windows":
recording_path = os.path.join(os.environ.get('TEMP', 'C:\\Temp'), 'recording.mp4')
else:
recording_path = "/tmp/recording.mp4"
accessibility_helper = AccessibilityHelper()
screenshot_helper = ScreenshotHelper()
platform_adapter = get_platform_adapter()
feature_checker = FeatureChecker(
platform_adapter=platform_adapter,
accessibility_helper=accessibility_helper
)
def get_conda_activation_prefix(conda_env: str = None) -> str:
"""
Generate platform-specific conda activation command prefix
Args:
conda_env: Conda environment name (e.g., 'myenv')
Returns:
Activation command prefix string, empty if no conda_env
"""
if not conda_env:
return ""
if platform_name == "Windows":
# Windows: use conda.bat or conda.exe
# Try common conda installation paths
conda_paths = [
os.path.expandvars("%USERPROFILE%\\miniconda3\\Scripts\\activate.bat"),
os.path.expandvars("%USERPROFILE%\\anaconda3\\Scripts\\activate.bat"),
"C:\\ProgramData\\Miniconda3\\Scripts\\activate.bat",
"C:\\ProgramData\\Anaconda3\\Scripts\\activate.bat",
]
# Find first existing conda activate script
activate_script = None
for path in conda_paths:
if os.path.exists(path):
activate_script = path
break
if activate_script:
return f'call "{activate_script}" {conda_env} && '
else:
# Fallback: assume conda is in PATH
return f'conda activate {conda_env} && '
else:
# Linux/macOS: source conda.sh then activate
conda_paths = [
os.path.expanduser("~/miniconda3/etc/profile.d/conda.sh"),
os.path.expanduser("~/anaconda3/etc/profile.d/conda.sh"),
"/opt/conda/etc/profile.d/conda.sh",
"/usr/local/miniconda3/etc/profile.d/conda.sh",
"/usr/local/anaconda3/etc/profile.d/conda.sh",
]
# Find first existing conda.sh
conda_sh = None
for path in conda_paths:
if os.path.exists(path):
conda_sh = path
break
if conda_sh:
return f'source "{conda_sh}" && conda activate {conda_env} && '
else:
# Fallback: assume conda is already initialized in shell
return f'conda activate {conda_env} && '
def wrap_script_with_conda(script: str, conda_env: str = None) -> str:
"""
Wrap script with conda activation command.
If conda is not available, returns original script without conda activation.
"""
if not conda_env:
return script
if platform_name == "Windows":
activation_prefix = get_conda_activation_prefix(conda_env)
return f"{activation_prefix}{script}"
else:
conda_paths = [
os.path.expanduser("~/miniconda3/etc/profile.d/conda.sh"),
os.path.expanduser("~/anaconda3/etc/profile.d/conda.sh"),
os.path.expanduser("~/opt/anaconda3/etc/profile.d/conda.sh"),
"/opt/conda/etc/profile.d/conda.sh",
]
conda_sh = None
for path in conda_paths:
if os.path.exists(path):
conda_sh = path
break
if conda_sh:
# Use bash -i -c to run interactively, or directly source conda.sh
wrapped_script = f"""#!/bin/bash
# Initialize conda
if [ -f "{conda_sh}" ]; then
. "{conda_sh}"
conda activate {conda_env} 2>/dev/null || true
fi
# Run user script
{script}
"""
return wrapped_script
else:
# Conda not found - log warning and execute script directly without conda
logger.warning(f"Conda environment '{conda_env}' requested but conda not found. Executing with system Python.")
return script
health_checker = None
@app.route('/', methods=['GET'])
def health_check():
"""Health check interface - return features information"""
# Get features from health_checker
if health_checker:
features = health_checker.get_simple_features_dict()
else:
# Initial startup of health_checker may not have been initialized, fallback to feature_checker
features = feature_checker.check_all_features(use_cache=True)
return jsonify({
'status': 'ok',
'service': 'OpenSpace Desktop Server',
'version': '1.0.0',
'platform': platform_name,
'features': features,
'timestamp': datetime.now().isoformat()
})
@app.route('/platform', methods=['GET'])
def get_platform():
info = {
'system': platform_name,
'release': platform.release(),
'version': platform.version(),
'machine': platform.machine(),
'processor': platform.processor()
}
if platform_adapter and hasattr(platform_adapter, 'get_system_info'):
info.update(platform_adapter.get_system_info())
return jsonify(info)
@app.route('/execute', methods=['POST'])
@app.route('/setup/execute', methods=['POST'])
def execute_command():
data = request.json
# The 'command' key in the JSON request should contain the command to be executed.
shell = data.get('shell', False)
command = data.get('command', "" if shell else [])
timeout = data.get('timeout', 120)
if isinstance(command, str) and not shell:
command = shlex.split(command)
# Expand user directory
if isinstance(command, list):
for i, arg in enumerate(command):
if arg.startswith("~/"):
command[i] = os.path.expanduser(arg)
try:
if platform_name == "Windows":
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=shell,
text=True,
timeout=timeout,
creationflags=subprocess.CREATE_NO_WINDOW,
)
else:
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=shell,
text=True,
timeout=timeout,
)
return jsonify({
'status': 'success',
'output': result.stdout,
'error': result.stderr,
'returncode': result.returncode
})
except subprocess.TimeoutExpired:
return jsonify({
'status': 'error',
'message': f'Command timeout after {timeout} seconds'
}), 408
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/execute_with_verification', methods=['POST'])
@app.route('/setup/execute_with_verification', methods=['POST'])
def execute_command_with_verification():
"""Execute command and verify the result based on provided verification criteria"""
data = request.json
shell = data.get('shell', False)
command = data.get('command', "" if shell else [])
verification = data.get('verification', {})
max_wait_time = data.get('max_wait_time', 10) # Maximum wait time in seconds
check_interval = data.get('check_interval', 1) # Check interval in seconds
if isinstance(command, str) and not shell:
command = shlex.split(command)
# Expand user directory
if isinstance(command, list):
for i, arg in enumerate(command):
if arg.startswith("~/"):
command[i] = os.path.expanduser(arg)
# Execute the main command
try:
if platform_name == "Windows":
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=shell,
text=True,
timeout=120,
creationflags=subprocess.CREATE_NO_WINDOW,
)
else:
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=shell,
text=True,
timeout=120,
)
# If no verification is needed, return immediately
if not verification:
return jsonify({
'status': 'success',
'output': result.stdout,
'error': result.stderr,
'returncode': result.returncode
})
# Wait and verify the result
start_time = time.time()
while time.time() - start_time < max_wait_time:
verification_passed = True
# Check window existence if specified
if 'window_exists' in verification:
window_name = verification['window_exists']
try:
if platform_name == 'Linux':
wmctrl_result = subprocess.run(
['wmctrl', '-l'],
capture_output=True,
text=True,
check=True
)
if window_name.lower() not in wmctrl_result.stdout.lower():
verification_passed = False
elif platform_adapter:
# Use platform adapter to check window existence
windows = platform_adapter.list_windows() if hasattr(platform_adapter, 'list_windows') else []
if not any(window_name.lower() in str(w).lower() for w in windows):
verification_passed = False
except:
verification_passed = False
# Check command execution if specified
if 'command_success' in verification:
verify_cmd = verification['command_success']
try:
verify_result = subprocess.run(
verify_cmd,
shell=True,
capture_output=True,
text=True,
timeout=5
)
if verify_result.returncode != 0:
verification_passed = False
except:
verification_passed = False
if verification_passed:
return jsonify({
'status': 'success',
'output': result.stdout,
'error': result.stderr,
'returncode': result.returncode,
'verification': 'passed',
'wait_time': time.time() - start_time
})
time.sleep(check_interval)
# Verification failed
return jsonify({
'status': 'verification_failed',
'output': result.stdout,
'error': result.stderr,
'returncode': result.returncode,
'verification': 'failed',
'wait_time': max_wait_time
}), 500
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
def _get_machine_architecture() -> str:
"""Get the machine architecture, e.g., x86_64, arm64, aarch64, i386, etc.
Returns 'amd' for x86/AMD architectures, 'arm' for ARM architectures, or 'unknown'.
"""
architecture = platform.machine().lower()
if architecture in ['amd32', 'amd64', 'x86', 'x86_64', 'x86-64', 'x64', 'i386', 'i686']:
return 'amd'
elif architecture in ['arm64', 'aarch64', 'aarch32']:
return 'arm'
else:
return 'unknown'
@app.route('/setup/launch', methods=["POST"])
def launch_app():
data = request.json
shell = data.get("shell", False)
command = data.get("command", "" if shell else [])
if isinstance(command, str) and not shell:
command = shlex.split(command)
# Expand user directory
if isinstance(command, list):
for i, arg in enumerate(command):
if arg.startswith("~/"):
command[i] = os.path.expanduser(arg)
try:
# ARM architecture compatibility: replace google-chrome with chromium
# ARM64 Chrome is not available yet, can only use Chromium
if isinstance(command, list) and 'google-chrome' in command and _get_machine_architecture() == 'arm':
index = command.index('google-chrome')
command[index] = 'chromium'
logger.info("ARM architecture detected: replacing 'google-chrome' with 'chromium'")
subprocess.Popen(command, shell=shell)
cmd_str = command if shell else " ".join(command)
logger.info(f"Application launched successfully: {cmd_str}")
return jsonify({
'status': 'success',
'message': f'{cmd_str} launched successfully'
})
except Exception as e:
logger.error(f"Application launch failed: {str(e)}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route("/run_python", methods=['POST'])
def run_python():
data = request.json
code = data.get('code', None)
timeout = data.get('timeout', 30)
working_dir = data.get('working_dir', None)
env = data.get('env', None)
conda_env = data.get('conda_env', None)
if not code:
return jsonify({'status': 'error', 'message': 'Code not supplied!'}), 400
# Generate unique filename
if platform_name == "Windows":
temp_filename = os.path.join(tempfile.gettempdir(), f"python_exec_{uuid.uuid4().hex}.py")
else:
temp_filename = f"/tmp/python_exec_{uuid.uuid4().hex}.py"
try:
with open(temp_filename, 'w') as f:
f.write(code)
# Prepare environment variables
exec_env = os.environ.copy()
if env:
exec_env.update(env)
# If conda_env is specified, try to use bash/cmd to activate and run
# If conda is not available, fall back to system Python
if conda_env:
activation_cmd = get_conda_activation_prefix(conda_env)
# Check if conda activation command is empty (conda not found)
if not activation_cmd:
logger.warning(f"Conda environment '{conda_env}' requested but conda not found. Using system Python.")
conda_env = None # Disable conda and use default path
if conda_env and get_conda_activation_prefix(conda_env):
if platform_name == "Windows":
# Windows: use cmd with activation
activation_cmd = get_conda_activation_prefix(conda_env)
full_cmd = f'{activation_cmd}python "{temp_filename}"'
result = subprocess.run(
['cmd', '/c', full_cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=timeout,
cwd=working_dir or os.getcwd(),
env=exec_env
)
else:
# Linux/macOS: use bash with activation
activation_cmd = get_conda_activation_prefix(conda_env)
full_cmd = f'{activation_cmd}python3 "{temp_filename}"'
result = subprocess.run(
['/bin/bash', '-c', full_cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=timeout,
cwd=working_dir or os.getcwd(),
env=exec_env
)
else:
# No conda activation needed
python_cmd = 'python' if platform_name == "Windows" else 'python3'
result = subprocess.run(
[python_cmd, temp_filename],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=timeout,
cwd=working_dir or os.getcwd(),
env=exec_env
)
os.remove(temp_filename)
output = result.stdout + result.stderr
return jsonify({
'status': 'success' if result.returncode == 0 else 'error',
'content': output or "Code executed successfully (no output)",
'returncode': result.returncode
})
except subprocess.TimeoutExpired:
if os.path.exists(temp_filename):
os.remove(temp_filename)
return jsonify({
'status': 'error',
'message': f'Execution timeout after {timeout} seconds'
}), 408
except Exception as e:
if os.path.exists(temp_filename):
os.remove(temp_filename)
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route("/run_bash_script", methods=['POST'])
def run_bash_script():
data = request.json
script = data.get('script', None)
timeout = data.get('timeout', 30)
working_dir = data.get('working_dir', None)
env = data.get('env', None)
conda_env = data.get('conda_env', None)
if not script:
return jsonify({'status': 'error', 'message': 'Script not supplied!'}), 400
# Generate unique filename
if platform_name == "Windows":
temp_filename = os.path.join(tempfile.gettempdir(), f"bash_exec_{uuid.uuid4().hex}.sh")
else:
temp_filename = f"/tmp/bash_exec_{uuid.uuid4().hex}.sh"
try:
# Wrap script with conda activation if needed
final_script = wrap_script_with_conda(script, conda_env)
with open(temp_filename, 'w') as f:
f.write(final_script)
os.chmod(temp_filename, 0o755)
if platform_name == "Windows":
shell_cmd = ['bash', temp_filename]
else:
shell_cmd = ['/bin/bash', temp_filename]
# Prepare environment variables
exec_env = os.environ.copy()
if env:
exec_env.update(env)
result = subprocess.run(
shell_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
timeout=timeout,
cwd=working_dir or os.getcwd(),
env=exec_env
)
os.unlink(temp_filename)
return jsonify({
'status': 'success' if result.returncode == 0 else 'error',
'output': result.stdout,
'error': "",
'returncode': result.returncode
})
except subprocess.TimeoutExpired:
if os.path.exists(temp_filename):
os.unlink(temp_filename)
return jsonify({
'status': 'error',
'output': f'Script execution timed out after {timeout} seconds',
'error': "",
'returncode': -1
}), 500
except Exception as e:
if os.path.exists(temp_filename):
try:
os.unlink(temp_filename)
except:
pass
return jsonify({
'status': 'error',
'output': f'Failed to execute script: {str(e)}',
'error': "",
'returncode': -1
}), 500
@app.route('/screenshot', methods=['GET'])
def capture_screen_with_cursor():
"""Capture screenshot (including mouse cursor)"""
try:
buf = BytesIO()
tmp_path = os.path.join(tempfile.gettempdir(), f"screenshot_{uuid.uuid4().hex}.png")
if screenshot_helper.capture(tmp_path, with_cursor=True):
with open(tmp_path, 'rb') as f:
buf.write(f.read())
os.remove(tmp_path)
buf.seek(0)
return send_file(buf, mimetype='image/png')
else:
return jsonify({'status':'error','message':'Screenshot failed'}), 500
except Exception as e:
logger.error(f"Screenshot failed: {str(e)}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/cursor_position', methods=['GET'])
def get_cursor_position():
"""Get cursor position"""
try:
x, y = screenshot_helper.get_cursor_position()
return jsonify({'x': x, 'y': y, 'status': 'success'})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/screen_size', methods=['POST', 'GET'])
def get_screen_size():
"""Get screen size"""
try:
width, height = screenshot_helper.get_screen_size()
return jsonify({'width': width, 'height': height, 'status': 'success'})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 500
# Accessibility Tree
@app.route("/accessibility", methods=["GET"])
def get_accessibility_tree():
"""Get accessibility tree"""
try:
max_depth = request.args.get('max_depth', 10, type=int)
tree = accessibility_helper.get_tree(max_depth=max_depth)
return jsonify(tree)
except Exception as e:
logger.error(f"Failed to get accessibility tree: {str(e)}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
# File Operations
@app.route('/list_directory', methods=['POST'])
def list_directory():
"""List directory contents"""
data = request.json
path = data.get('path', '.')
try:
path = os.path.expanduser(path)
items = []
for item in os.listdir(path):
item_path = os.path.join(path, item)
items.append({
'name': item,
'is_dir': os.path.isdir(item_path),
'is_file': os.path.isfile(item_path),
'size': os.path.getsize(item_path) if os.path.isfile(item_path) else None
})
return jsonify({
'status': 'success',
'path': path,
'items': items
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/file', methods=['POST'])
def file_operation():
"""File operations"""
data = request.json
operation = data.get('operation', 'read')
path = data.get('path')
if not path:
return jsonify({'status': 'error', 'message': 'Path required'}), 400
path = os.path.expanduser(path)
try:
if operation == 'read':
with open(path, 'r') as f:
content = f.read()
return jsonify({
'status': 'success',
'content': content
})
elif operation == 'exists':
exists = os.path.exists(path)
return jsonify({
'status': 'success',
'exists': exists
})
else:
return jsonify({
'status': 'error',
'message': f'Unknown operation: {operation}'
}), 400
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/desktop_path', methods=['POST', 'GET'])
def get_desktop_path():
"""Get desktop path"""
try:
desktop = os.path.expanduser("~/Desktop")
return jsonify({
'status': 'success',
'path': desktop
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route("/setup/activate_window", methods=['POST'])
def activate_window():
"""Activate window"""
data = request.json
window_name = data.get("window_name")
strict = data.get("strict", False)
by_class_name = data.get("by_class", False)
if not window_name:
return jsonify({'status': 'error', 'message': 'window_name required'}), 400
try:
if platform_adapter and hasattr(platform_adapter, 'activate_window'):
result = platform_adapter.activate_window(window_name, strict=strict)
if result['status'] == 'success':
return jsonify(result)
else:
return jsonify(result), 400
else:
return jsonify({
'status': 'error',
'message': f'Window activation not supported on {platform_name}'
}), 501
except Exception as e:
logger.error(f"Window activation failed: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route("/setup/close_window", methods=["POST"])
def close_window():
"""Close window"""
data = request.json
window_name = data.get("window_name")
strict = data.get("strict", False)
by_class_name = data.get("by_class", False)
if not window_name:
return jsonify({'status': 'error', 'message': 'window_name required'}), 400
try:
if platform_adapter and hasattr(platform_adapter, 'close_window'):
result = platform_adapter.close_window(window_name, strict=strict)
if result['status'] == 'success':
return jsonify(result)
else:
return jsonify(result), 404
else:
return jsonify({
'status': 'error',
'message': f'Window closing not supported on {platform_name}'
}), 501
except Exception as e:
logger.error(f"Window closing failed: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/window_size', methods=['POST'])
def get_window_size():
"""Get window size"""
try:
width, height = screenshot_helper.get_screen_size()
return jsonify({
'status': 'success',
'width': width,
'height': height
})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/wallpaper', methods=['POST'])
@app.route('/setup/change_wallpaper', methods=['POST'])
def set_wallpaper():
"""Set wallpaper"""
data = request.json
image_path = data.get('path')
if not image_path:
return jsonify({'status': 'error', 'message': 'path required'}), 400
try:
if platform_adapter and hasattr(platform_adapter, 'set_wallpaper'):
result = platform_adapter.set_wallpaper(image_path)
if result['status'] == 'success':
return jsonify(result)
else:
return jsonify(result), 400
else:
return jsonify({
'status': 'error',
'message': f'Wallpaper setting not supported on {platform_name}'
}), 501
except Exception as e:
logger.error(f"Failed to set wallpaper: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)}), 500
# Screen Recording
@app.route('/start_recording', methods=['POST'])
def start_recording():
"""Start screen recording (supports Linux, macOS, Windows)"""
global recording_process
# Check if platform adapter supports recording
if not platform_adapter or not hasattr(platform_adapter, 'start_recording'):
return jsonify({
'status': 'error',
'message': f'Recording not supported on {platform_name}'
}), 501
# Check if recording is already in progress
if recording_process and recording_process.poll() is None:
return jsonify({
'status': 'error',
'message': 'Recording is already in progress.'
}), 400
# Clean up old recording file
if os.path.exists(recording_path):
try:
os.remove(recording_path)
except OSError as e:
logger.error(f"Cannot delete old recording file: {e}")
try:
# Use platform adapter to start recording
result = platform_adapter.start_recording(recording_path)
if result['status'] == 'success':
recording_process = result.get('process')
logger.info("Recording started successfully")
return jsonify({
'status': 'success',
'message': 'Recording started'
})
else:
logger.error(f"Failed to start recording: {result.get('message', 'Unknown error')}")
return jsonify({
'status': 'error',
'message': result.get('message', 'Failed to start recording')
}), 500
except Exception as e:
logger.error(f"Failed to start recording: {str(e)}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/end_recording', methods=['POST'])
def end_recording():
"""End screen recording (supports Linux, macOS, Windows)"""
global recording_process
# Check if recording is in progress
if not recording_process or recording_process.poll() is not None:
recording_process = None
return jsonify({
'status': 'error',
'message': 'No recording in progress'
}), 400
try:
# Use platform adapter to stop recording
if platform_adapter and hasattr(platform_adapter, 'stop_recording'):
result = platform_adapter.stop_recording(recording_process)
recording_process = None
if result['status'] != 'success':
logger.error(f"Failed to stop recording: {result.get('message', 'Unknown error')}")
return jsonify(result), 500
else:
# Fallback: terminate process directly
recording_process.send_signal(signal.SIGINT)
try:
recording_process.wait(timeout=15)
except subprocess.TimeoutExpired:
logger.warning("ffmpeg not responding, force terminating")
recording_process.kill()
recording_process.wait()
recording_process = None
# Check if recording file exists
# wait for ffmpeg to write the file header
for _ in range(10):
if os.path.exists(recording_path) and os.path.getsize(recording_path) > 0:
break
time.sleep(0.5)
if os.path.exists(recording_path) and os.path.getsize(recording_path) > 0:
logger.info("Recording ended, file saved")
return send_file(recording_path, as_attachment=True)
else:
logger.error("Recording file is missing or empty")
return abort(500, description="Recording file is missing or empty")
except Exception as e:
logger.error(f"Failed to end recording: {str(e)}")
if recording_process:
try:
recording_process.kill()
recording_process.wait()
except:
pass
recording_process = None
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/terminal', methods=['GET'])
def get_terminal_output():
"""Get terminal output (supports Linux, macOS, Windows)"""
try:
if platform_adapter and hasattr(platform_adapter, 'get_terminal_output'):
output = platform_adapter.get_terminal_output()
if output:
return jsonify({'output': output, 'status': 'success'})
else:
return jsonify({
'status': 'error',
'message': f'No terminal output available on {platform_name}',
'platform_note': 'Make sure a terminal window is open and active'
}), 404
else:
return jsonify({
'status': 'error',
'message': f'Terminal output not supported on {platform_name}'
}), 501
except Exception as e:
logger.error(f"Failed to get terminal output: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route("/setup/upload", methods=["POST"])
def upload_file():
"""Upload file"""
if 'file' not in request.files:
return jsonify({'status': 'error', 'message': 'No file provided'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'status': 'error', 'message': 'No file selected'}), 400
try:
# Get target path
target_path = request.form.get('path', os.path.expanduser('~/Desktop'))
target_path = os.path.expanduser(target_path)
# Ensure directory exists
os.makedirs(target_path, exist_ok=True)
# Save file
file_path = os.path.join(target_path, file.filename)
file.save(file_path)
logger.info(f"File uploaded successfully: {file_path}")
return jsonify({
'status': 'success',
'path': file_path,
'message': 'File uploaded successfully'
})
except Exception as e:
logger.error(f"File upload failed: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route("/setup/download_file", methods=["POST"])
def download_file():
"""Download file"""
data = request.json
path = data.get('path')
if not path:
return jsonify({'status': 'error', 'message': 'path required'}), 400
try:
path = os.path.expanduser(path)
if not os.path.exists(path):
return jsonify({'status': 'error', 'message': f'File not found: {path}'}), 404
return send_file(path, as_attachment=True)
except Exception as e:
logger.error(f"File download failed: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route("/setup/open_file", methods=['POST'])
def open_file():
"""Open file (using system default application)"""
data = request.json
path = data.get('path')
if not path:
return jsonify({'status': 'error', 'message': 'path required'}), 400
try:
path = os.path.expanduser(path)
if not os.path.exists(path):
return jsonify({'status': 'error', 'message': f'File not found: {path}'}), 404
if platform_name == "Darwin":
subprocess.Popen(['open', path])
elif platform_name == "Linux":
subprocess.Popen(['xdg-open', path])
elif platform_name == "Windows":
os.startfile(path)
logger.info(f"File opened successfully: {path}")
return jsonify({
'status': 'success',
'message': f'File opened: {path}'
})
except Exception as e:
logger.error(f"File opening failed: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)}), 500
def print_banner(host: str = "127.0.0.1", port: int = 5000, debug: bool = False):
"""Print startup banner with server information"""
from openspace.utils.display import print_banner as display_banner, print_section, print_separator, colorize
# STARTUP INFORMATION
display_banner("OpenSpace · Local Server")
server_url = f"http://{host}:{port}"
# Server section
info_lines = [
colorize(server_url, 'g', bold=True),
]
if host == '0.0.0.0':
info_lines.append(f"{colorize('Listening on all interfaces', 'gr')} {colorize('(0.0.0.0:' + str(port) + ')', 'y')}")
info_lines.append(f"{colorize(platform_name, 'gr')} · {colorize('Debug' if debug else 'Production', 'y' if debug else 'g')}")
print_section("Server", info_lines)
print()
print_separator()
print(f" {colorize('Press Ctrl+C to stop', 'gr')}")
print()
def run_health_check_async():
"""Asynchronous running health check"""
def _run():
from openspace.utils.display import colorize
time.sleep(2)
print(colorize("\n - Starting health check...\n", 'c', bold=True))
results = health_checker.check_all(test_endpoints=True)
health_checker.print_results(results, show_endpoint_details=False)
summary = health_checker.get_summary()
logger.info(f"Health check completed: {summary['fully_available']}/{summary['total']} fully available")
thread = threading.Thread(target=_run, daemon=True)
thread.start()
def run_server(host: str = "127.0.0.1", port: int = 5000, debug: bool = False):
"""
Start desktop control server
Args:
host: Listening address (127.0.0.1 for local, 0.0.0.0 for all interfaces)
port: Listening port
debug: Debug mode (display detailed logs)
"""
global health_checker
# Initialize health_checker
base_url = f"http://{host if host != '0.0.0.0' else '127.0.0.1'}:{port}"
health_checker = HealthChecker(feature_checker, base_url, auto_cleanup=False)
print_banner(host, port, debug)
if not debug:
run_health_check_async()
app.run(host=host, port=port, debug=debug, threaded=True)
def main():
import argparse
from openspace.config.utils import get_config_value
parser = argparse.ArgumentParser(
description='OpenSpace Local Server - Desktop Control Server'
)
parser.add_argument('--host', type=str, default='127.0.0.1',
help='Server host (default: 127.0.0.1)')
parser.add_argument('--port', type=int, default=5000,
help='Server port (default: 5000)')
parser.add_argument('--debug', action='store_true',
help='Enable debug mode')
parser.add_argument('--config', type=str,
help='Path to config.json file')
args = parser.parse_args()
config_path = args.config
if not config_path:
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
if os.path.exists(config_path):
try:
with open(config_path, 'r') as f:
config = json.load(f)
server_config = get_config_value(config, 'server', {})
host = args.host if args.host != '127.0.0.1' else get_config_value(server_config, 'host', '127.0.0.1')
port = args.port if args.port != 5000 else get_config_value(server_config, 'port', 5000)
debug = args.debug or get_config_value(server_config, 'debug', False)
run_server(host=host, port=port, debug=debug)
except Exception as e:
logger.error(f"Failed to load config: {e}")
run_server(host=args.host, port=args.port, debug=args.debug)
else:
run_server(host=args.host, port=args.port, debug=args.debug)
if __name__ == "__main__":
main()