# File size: 3,132 Bytes (revision dc4e6da)
import itertools
import sys
import time
import threading
class StatusLine:
    """Single-line console status indicator driven by a background thread.

    When stdout is a TTY the line is redrawn in place with a spinner frame,
    the current message and the whole seconds elapsed since ``start()``.
    When stdout is not a TTY (for example when redirected to a file) a plain
    line is written only when the message changes or a new 30-second window
    begins, and no carriage-return/erase sequences are emitted.

    A ``StatusLine`` wraps a single ``threading.Thread``, so ``start()`` can
    be called at most once per instance.
    """

    _BAR = "|/-\\"
    # NOTE(review): _DOTS, _ARROW, _CIRCLE and _DASH look mis-encoded in this
    # copy of the file (multi-byte glyphs collapsed to Greek letters). They
    # are not used by the class and are kept verbatim; restore them from the
    # upstream source if these frame sets are ever needed.
    _DOTS = "ββββ"
    _ARROW = "ββββββββ"
    # Braille spinner frames U+280B ... U+280F, written as escapes so the
    # literal survives any re-encoding of the source file.
    _BRAILLE = "\u280b\u2819\u2839\u2838\u283c\u2834\u2826\u2827\u2807\u280f"
    _CIRCLE = "ββββ"
    _DASH = "ββΎβΌβ"

    # Number of columns blanked when the status line is erased.
    _LINE_WIDTH = 80

    def __init__(self, delay: float = 0.1):
        """Create a stopped status line.

        Args:
            delay: Seconds to sleep between two refreshes of the line.
        """
        self._stop = False
        self._spinner = itertools.cycle(self._BRAILLE)  # rotating chars
        self._delay = delay
        self.message = ""
        # Set by start(); used to compute the "waiting Ns" counter.
        self.start_time = None
        self.lock = threading.Lock()
        self.thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        """Record the start time and launch the refresh thread."""
        self.start_time = time.time()
        self.thread.start()

    def stop(self):
        """Stop the refresh thread (if it is running) and erase the line.

        Safe to call when ``start()`` was never called and safe to call more
        than once.
        """
        self._stop = True
        # join() raises RuntimeError on a thread that was never started.
        if self.thread.is_alive():
            self.thread.join()
        with self.lock:
            self._clear_line()
            sys.stdout.flush()

    def update_message(self, msg: str):
        """Replace the message shown on the status line."""
        with self.lock:
            self.message = msg

    def log(self, msg: str):
        """Print ``msg`` on its own line, above the status line on a TTY."""
        # Hold the lock so the refresh thread cannot redraw mid-message.
        with self.lock:
            self._clear_line()
            sys.stdout.write(msg + "\n")
            sys.stdout.flush()

    def _clear_line(self):
        """Blank the status line; a no-op when stdout is not a TTY."""
        if sys.stdout.isatty():
            sys.stdout.write("\r" + " " * self._LINE_WIDTH + "\r")

    def _run(self):
        """Refresh loop executed by the background thread until stopped."""
        last_logged_message = ""
        last_logged_period = -1
        is_tty = sys.stdout.isatty()
        visible_width = self._LINE_WIDTH - 1

        while not self._stop:
            with self.lock:
                spinner_char = next(self._spinner)
                now = time.time()
                elapsed = int(now - self.start_time)
                current_msg = self.message
                line = f"[{spinner_char}] {current_msg} | waiting {elapsed}s"

                if is_tty:
                    # Pad to a fixed width so a shorter line fully overwrites
                    # the previous, longer one.
                    sys.stdout.write("\r" + line[:visible_width].ljust(visible_width))
                    sys.stdout.flush()
                else:
                    # Log-file mode: only print when the message changes or a
                    # new 30-second wall-clock window starts.
                    current_period = int(now) // 30
                    if (
                        current_msg != last_logged_message
                        or current_period > last_logged_period
                    ):
                        sys.stdout.write(f"{line}\n")
                        sys.stdout.flush()
                        last_logged_message = current_msg
                        last_logged_period = current_period

            # Sleep outside the lock so update_message()/log() are not blocked.
            time.sleep(self._delay)
# Example usage: show the status line while five simulated batches complete.
if __name__ == "__main__":
    status = StatusLine()
    status.start()
    try:
        for i in range(5):
            status.update_message(f"Awaiting {5 - i} batches")
            time.sleep(3)  # stand-in for real work
            status.log(f"Batch {i + 1} ended!")
    finally:
        # Always stop the refresh thread, even on Ctrl-C or an error, so the
        # terminal is not left with a half-drawn status line.
        status.stop()
    print("Done.")
def get_progress_bar():
    """Return a new ``rich.progress.Progress`` with this project's columns.

    Columns, left to right: the task description, a ``BarColumn``, a
    ``TaskProgressColumn``, a yellow ``(completed/total)`` counter, a
    ``TimeElapsedColumn`` and a ``TimeRemainingColumn``.

    ``rich`` is imported inside the function, so it is only needed when a
    progress bar is actually requested.
    """
    from rich import progress as rich_progress

    columns = (
        "[progress.description]{task.description}",
        rich_progress.BarColumn(),
        rich_progress.TaskProgressColumn(),
        "[yellow]({task.completed}/{task.total})",
        rich_progress.TimeElapsedColumn(),
        rich_progress.TimeRemainingColumn(),
    )
    return rich_progress.Progress(*columns)
|