Buckets:
ktongue/docker_container / .vscode-server /extensions /ms-python.python-2026.4.0 /python_files /python_server.py
| import ast | |
| import contextlib | |
| import io | |
| import json | |
| import sys | |
| import traceback | |
| import uuid | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Union | |
| STDIN = sys.stdin | |
| STDOUT = sys.stdout | |
| STDERR = sys.stderr | |
| USER_GLOBALS = {} | |
| def _send_message(msg: str): | |
| # Content-Length is the data size in bytes. | |
| length_msg = len(msg.encode()) | |
| STDOUT.buffer.write(f"Content-Length: {length_msg}\r\n\r\n{msg}".encode()) | |
| STDOUT.buffer.flush() | |
| def send_message(**kwargs): | |
| _send_message(json.dumps({"jsonrpc": "2.0", **kwargs})) | |
| def print_log(msg: str): | |
| send_message(method="log", params=msg) | |
| def send_response( | |
| response: str, | |
| response_id: int, | |
| execution_status: bool = True, # noqa: FBT001, FBT002 | |
| ): | |
| send_message( | |
| id=response_id, | |
| result={"status": execution_status, "output": response}, | |
| ) | |
| def send_request(params: Optional[Union[List, Dict]] = None): | |
| request_id = uuid.uuid4().hex | |
| if params is None: | |
| send_message(id=request_id, method="input") | |
| else: | |
| send_message(id=request_id, method="input", params=params) | |
| return request_id | |
| original_input = input | |
| def custom_input(prompt=""): | |
| try: | |
| send_request({"prompt": prompt}) | |
| headers = get_headers() | |
| # Content-Length is the data size in bytes. | |
| content_length = int(headers.get("Content-Length", 0)) | |
| if content_length: | |
| message_text = STDIN.buffer.read(content_length).decode() | |
| message_json = json.loads(message_text) | |
| return message_json["result"]["userInput"] | |
| except EOFError: | |
| # Input stream closed, exit gracefully | |
| sys.exit(0) | |
| except Exception: | |
| print_log(traceback.format_exc()) | |
| # Set input to our custom input | |
| USER_GLOBALS["input"] = custom_input | |
| input = custom_input # noqa: A001 | |
| def handle_response(request_id): | |
| while True: | |
| try: | |
| headers = get_headers() | |
| # Content-Length is the data size in bytes. | |
| content_length = int(headers.get("Content-Length", 0)) | |
| if content_length: | |
| message_text = STDIN.buffer.read(content_length).decode() | |
| message_json = json.loads(message_text) | |
| our_user_input = message_json["result"]["userInput"] | |
| if message_json["id"] == request_id: | |
| send_response(our_user_input, message_json["id"]) | |
| elif message_json["method"] == "exit": | |
| sys.exit(0) | |
| except EOFError: # noqa: PERF203 | |
| # Input stream closed, exit gracefully | |
| sys.exit(0) | |
| except Exception: | |
| print_log(traceback.format_exc()) | |
| def exec_function(user_input): | |
| try: | |
| compile(user_input, "<stdin>", "eval") | |
| except SyntaxError: | |
| return exec | |
| return eval | |
| def check_valid_command(request): | |
| try: | |
| user_input = request["params"] | |
| ast.parse(user_input[0]) | |
| send_response("True", request["id"]) | |
| except SyntaxError: | |
| send_response("False", request["id"]) | |
| def execute(request, user_globals): | |
| str_output = CustomIO("<stdout>", encoding="utf-8") | |
| str_error = CustomIO("<stderr>", encoding="utf-8") | |
| str_input = CustomIO("<stdin>", encoding="utf-8", newline="\n") | |
| with contextlib.redirect_stdout(str_output), contextlib.redirect_stderr(str_error): | |
| original_stdin = sys.stdin | |
| try: | |
| sys.stdin = str_input | |
| execution_status = exec_user_input(request["params"], user_globals) | |
| finally: | |
| sys.stdin = original_stdin | |
| send_response(str_output.get_value(), request["id"], execution_status) | |
| def exec_user_input(user_input, user_globals) -> bool: | |
| user_input = user_input[0] if isinstance(user_input, list) else user_input | |
| try: | |
| callable_ = exec_function(user_input) | |
| retval = callable_(user_input, user_globals) | |
| if retval is not None: | |
| print(retval) | |
| return True | |
| except KeyboardInterrupt: | |
| print(traceback.format_exc()) | |
| return False | |
| except Exception: | |
| print(traceback.format_exc()) | |
| return False | |
| class CustomIO(io.TextIOWrapper): | |
| """Custom stream object to replace stdio.""" | |
| def __init__(self, name, encoding="utf-8", newline=None): | |
| self._buffer = io.BytesIO() | |
| self._custom_name = name | |
| super().__init__(self._buffer, encoding=encoding, newline=newline) | |
| def close(self): | |
| """Provide this close method which is used by some tools.""" | |
| # This is intentionally empty. | |
| def get_value(self) -> str: | |
| """Returns value from the buffer as string.""" | |
| self.seek(0) | |
| return self.read() | |
| def get_headers(): | |
| headers = {} | |
| while True: | |
| raw = STDIN.buffer.readline() | |
| # Detect EOF: readline() returns empty bytes when input stream is closed | |
| if raw == b"": | |
| raise EOFError("EOF reached while reading headers") | |
| line = raw.decode().strip() | |
| if not line: | |
| break | |
| name, value = line.split(":", 1) | |
| headers[name] = value.strip() | |
| return headers | |
| if __name__ == "__main__": | |
| # https://docs.python.org/3/tutorial/modules.html#the-module-search-path | |
| # The directory containing the input script (or the current directory when no file is specified). | |
| # Here we emulate the same behavior like no file is specified. | |
| input_script_dir = Path(__file__).parent | |
| script_dir_str = str(input_script_dir) | |
| if script_dir_str in sys.path: | |
| sys.path.remove(script_dir_str) | |
| while "" in sys.path: | |
| sys.path.remove("") | |
| sys.path.insert(0, "") | |
| while True: | |
| try: | |
| headers = get_headers() | |
| # Content-Length is the data size in bytes. | |
| content_length = int(headers.get("Content-Length", 0)) | |
| if content_length: | |
| request_text = STDIN.buffer.read(content_length).decode() | |
| request_json = json.loads(request_text) | |
| if request_json["method"] == "execute": | |
| execute(request_json, USER_GLOBALS) | |
| if request_json["method"] == "check_valid_command": | |
| check_valid_command(request_json) | |
| elif request_json["method"] == "exit": | |
| sys.exit(0) | |
| except EOFError: # noqa: PERF203 | |
| # Input stream closed (VS Code terminated), exit gracefully | |
| sys.exit(0) | |
| except Exception: | |
| print_log(traceback.format_exc()) | |
Xet Storage Details
- Size:
- 6.56 kB
- Xet hash:
- 59ecb3a54ee21165807cea0c6b049eca92579bbdc70a4b2c0cdca89f09d29bcd
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.