| | """ |
| | Tests for platutils.py |
| | """ |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | import sys |
| | import signal |
| | import os |
| | import time |
| | from _thread import interrupt_main |
| | import threading |
| |
|
| | import pytest |
| |
|
| | from IPython.utils.process import (find_cmd, FindCmdError, arg_split, |
| | system, getoutput, getoutputerror, |
| | get_output_error_code) |
| | from IPython.utils.capture import capture_output |
| | from IPython.testing import decorators as dec |
| | from IPython.testing import tools as tt |
| |
|
| | python = os.path.basename(sys.executable) |
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | @dec.skip_win32 |
| | def test_find_cmd_ls(): |
| | """Make sure we can find the full path to ls.""" |
| | path = find_cmd("ls") |
| | assert path.endswith("ls") |
| |
|
| | |
| | @dec.skip_if_not_win32 |
| | def test_find_cmd_pythonw(): |
| | """Try to find pythonw on Windows.""" |
| | path = find_cmd('pythonw') |
| | assert path.lower().endswith('pythonw.exe'), path |
| |
|
| |
|
| | def test_find_cmd_fail(): |
| | """Make sure that FindCmdError is raised if we can't find the cmd.""" |
| | pytest.raises(FindCmdError, find_cmd, "asdfasdf") |
| |
|
| |
|
| | @dec.skip_win32 |
| | @pytest.mark.parametrize( |
| | "argstr, argv", |
| | [ |
| | ("hi", ["hi"]), |
| | ("hello there", ["hello", "there"]), |
| | |
| | |
| | |
| | ("h\u01cello", ["h\u01cello"]), |
| | ('something "with quotes"', ["something", '"with quotes"']), |
| | ], |
| | ) |
| | def test_arg_split(argstr, argv): |
| | """Ensure that argument lines are correctly split like in a shell.""" |
| | assert arg_split(argstr) == argv |
| |
|
| |
|
| | @dec.skip_if_not_win32 |
| | @pytest.mark.parametrize( |
| | "argstr,argv", |
| | [ |
| | ("hi", ["hi"]), |
| | ("hello there", ["hello", "there"]), |
| | ("h\u01cello", ["h\u01cello"]), |
| | ('something "with quotes"', ["something", "with quotes"]), |
| | ], |
| | ) |
| | def test_arg_split_win32(argstr, argv): |
| | """Ensure that argument lines are correctly split like in a shell.""" |
| | assert arg_split(argstr) == argv |
| |
|
| |
|
| | class SubProcessTestCase(tt.TempFileMixin): |
| | def setUp(self): |
| | """Make a valid python temp file.""" |
| | lines = [ "import sys", |
| | "print('on stdout', end='', file=sys.stdout)", |
| | "print('on stderr', end='', file=sys.stderr)", |
| | "sys.stdout.flush()", |
| | "sys.stderr.flush()"] |
| | self.mktmp('\n'.join(lines)) |
| |
|
| | def test_system(self): |
| | status = system(f'{python} "{self.fname}"') |
| | self.assertEqual(status, 0) |
| |
|
| | def test_system_quotes(self): |
| | status = system('%s -c "import sys"' % python) |
| | self.assertEqual(status, 0) |
| |
|
| | def assert_interrupts(self, command): |
| | """ |
| | Interrupt a subprocess after a second. |
| | """ |
| | if threading.main_thread() != threading.current_thread(): |
| | raise pytest.skip("Can't run this test if not in main thread.") |
| |
|
| | |
| | |
| | |
| | signal.signal(signal.SIGINT, signal.default_int_handler) |
| |
|
| | def interrupt(): |
| | |
| | time.sleep(0.5) |
| | interrupt_main() |
| |
|
| | threading.Thread(target=interrupt).start() |
| | start = time.time() |
| | try: |
| | result = command() |
| | except KeyboardInterrupt: |
| | |
| | pass |
| | end = time.time() |
| | self.assertTrue( |
| | end - start < 2, "Process didn't die quickly: %s" % (end - start) |
| | ) |
| | return result |
| |
|
| | def test_system_interrupt(self): |
| | """ |
| | When interrupted in the way ipykernel interrupts IPython, the |
| | subprocess is interrupted. |
| | """ |
| | def command(): |
| | return system('%s -c "import time; time.sleep(5)"' % python) |
| |
|
| | status = self.assert_interrupts(command) |
| | self.assertNotEqual( |
| | status, 0, f"The process wasn't interrupted. Status: {status}" |
| | ) |
| |
|
| | def test_getoutput(self): |
| | out = getoutput(f'{python} "{self.fname}"') |
| | |
| | try: |
| | self.assertEqual(out, 'on stderron stdout') |
| | except AssertionError: |
| | self.assertEqual(out, 'on stdouton stderr') |
| |
|
| | def test_getoutput_quoted(self): |
| | out = getoutput('%s -c "print (1)"' % python) |
| | self.assertEqual(out.strip(), '1') |
| |
|
| | |
| | @dec.skip_win32 |
| | def test_getoutput_quoted2(self): |
| | out = getoutput("%s -c 'print (1)'" % python) |
| | self.assertEqual(out.strip(), '1') |
| | out = getoutput("%s -c 'print (\"1\")'" % python) |
| | self.assertEqual(out.strip(), '1') |
| |
|
| | def test_getoutput_error(self): |
| | out, err = getoutputerror(f'{python} "{self.fname}"') |
| | self.assertEqual(out, 'on stdout') |
| | self.assertEqual(err, 'on stderr') |
| |
|
| | def test_get_output_error_code(self): |
| | quiet_exit = '%s -c "import sys; sys.exit(1)"' % python |
| | out, err, code = get_output_error_code(quiet_exit) |
| | self.assertEqual(out, '') |
| | self.assertEqual(err, '') |
| | self.assertEqual(code, 1) |
| | out, err, code = get_output_error_code(f'{python} "{self.fname}"') |
| | self.assertEqual(out, 'on stdout') |
| | self.assertEqual(err, 'on stderr') |
| | self.assertEqual(code, 0) |
| |
|
| | |
| |
|