Spaces:
No application file
No application file
| # encoding: utf-8 | |
| """ | |
| IO related utilities. | |
| """ | |
| # Copyright (c) IPython Development Team. | |
| # Distributed under the terms of the Modified BSD License. | |
| import atexit | |
| import os | |
| import sys | |
| import tempfile | |
| from pathlib import Path | |
| from warnings import warn | |
| from IPython.utils.decorators import undoc | |
| from .capture import CapturedIO, capture_output | |
| class Tee(object): | |
| """A class to duplicate an output stream to stdout/err. | |
| This works in a manner very similar to the Unix 'tee' command. | |
| When the object is closed or deleted, it closes the original file given to | |
| it for duplication. | |
| """ | |
| # Inspired by: | |
| # http://mail.python.org/pipermail/python-list/2007-May/442737.html | |
| def __init__(self, file_or_name, mode="w", channel='stdout'): | |
| """Construct a new Tee object. | |
| Parameters | |
| ---------- | |
| file_or_name : filename or open filehandle (writable) | |
| File that will be duplicated | |
| mode : optional, valid mode for open(). | |
| If a filename was give, open with this mode. | |
| channel : str, one of ['stdout', 'stderr'] | |
| """ | |
| if channel not in ['stdout', 'stderr']: | |
| raise ValueError('Invalid channel spec %s' % channel) | |
| if hasattr(file_or_name, 'write') and hasattr(file_or_name, 'seek'): | |
| self.file = file_or_name | |
| else: | |
| encoding = None if "b" in mode else "utf-8" | |
| self.file = open(file_or_name, mode, encoding=encoding) | |
| self.channel = channel | |
| self.ostream = getattr(sys, channel) | |
| setattr(sys, channel, self) | |
| self._closed = False | |
| def close(self): | |
| """Close the file and restore the channel.""" | |
| self.flush() | |
| setattr(sys, self.channel, self.ostream) | |
| self.file.close() | |
| self._closed = True | |
| def write(self, data): | |
| """Write data to both channels.""" | |
| self.file.write(data) | |
| self.ostream.write(data) | |
| self.ostream.flush() | |
| def flush(self): | |
| """Flush both channels.""" | |
| self.file.flush() | |
| self.ostream.flush() | |
| def __del__(self): | |
| if not self._closed: | |
| self.close() | |
| def ask_yes_no(prompt, default=None, interrupt=None): | |
| """Asks a question and returns a boolean (y/n) answer. | |
| If default is given (one of 'y','n'), it is used if the user input is | |
| empty. If interrupt is given (one of 'y','n'), it is used if the user | |
| presses Ctrl-C. Otherwise the question is repeated until an answer is | |
| given. | |
| An EOF is treated as the default answer. If there is no default, an | |
| exception is raised to prevent infinite loops. | |
| Valid answers are: y/yes/n/no (match is not case sensitive).""" | |
| answers = {'y':True,'n':False,'yes':True,'no':False} | |
| ans = None | |
| while ans not in answers.keys(): | |
| try: | |
| ans = input(prompt+' ').lower() | |
| if not ans: # response was an empty string | |
| ans = default | |
| except KeyboardInterrupt: | |
| if interrupt: | |
| ans = interrupt | |
| print("\r") | |
| except EOFError: | |
| if default in answers.keys(): | |
| ans = default | |
| print() | |
| else: | |
| raise | |
| return answers[ans] | |
| def temp_pyfile(src, ext='.py'): | |
| """Make a temporary python file, return filename and filehandle. | |
| Parameters | |
| ---------- | |
| src : string or list of strings (no need for ending newlines if list) | |
| Source code to be written to the file. | |
| ext : optional, string | |
| Extension for the generated file. | |
| Returns | |
| ------- | |
| (filename, open filehandle) | |
| It is the caller's responsibility to close the open file and unlink it. | |
| """ | |
| fname = tempfile.mkstemp(ext)[1] | |
| with open(Path(fname), "w", encoding="utf-8") as f: | |
| f.write(src) | |
| f.flush() | |
| return fname | |
| def raw_print(*args, **kw): | |
| """DEPRECATED: Raw print to sys.__stdout__, otherwise identical interface to print().""" | |
| warn("IPython.utils.io.raw_print has been deprecated since IPython 7.0", DeprecationWarning, stacklevel=2) | |
| print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'), | |
| file=sys.__stdout__) | |
| sys.__stdout__.flush() | |
| def raw_print_err(*args, **kw): | |
| """DEPRECATED: Raw print to sys.__stderr__, otherwise identical interface to print().""" | |
| warn("IPython.utils.io.raw_print_err has been deprecated since IPython 7.0", DeprecationWarning, stacklevel=2) | |
| print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'), | |
| file=sys.__stderr__) | |
| sys.__stderr__.flush() | |