Spaces:
Runtime error
Runtime error
File size: 1,307 Bytes
955066e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
"""
helpers.py - utility helpers: an async-aware call decorator, a timing context manager, and a placeholder flux_pipe image-generation hook
"""
import asyncio
import functools
import time
def async_run_if_possible(func):
    """Decorate *func* so its coroutine is scheduled when a loop is running.

    The wrapper calls ``func`` exactly once per invocation:

    * Inside a running asyncio event loop, if ``func`` returned a coroutine,
      that coroutine is scheduled on the loop and the ``asyncio.Task`` is
      returned.
    * In every other case (no running loop, or ``func`` returned a plain
      value) the result of ``func`` is returned unchanged. Note that with no
      running loop a coroutine function therefore yields an un-awaited
      coroutine object, which the caller is responsible for running.

    Exceptions raised by ``func`` propagate to the caller unchanged.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Only the loop lookup is guarded: get_running_loop() raises
        # RuntimeError when no loop is running. Keeping the call to func
        # outside the try block ensures a RuntimeError raised by func itself
        # is not mistaken for "no loop" and does not trigger a second call.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        result = func(*args, **kwargs)

        # create_task() only accepts coroutines; plain return values from
        # synchronous functions are passed through as-is.
        if loop is not None and asyncio.iscoroutine(result):
            return loop.create_task(result)
        return result

    return wrapper
class calculate_duration:
    """Context manager that times a ``with`` block and prints the result.

    On exit it prints ``[Duration] <activity_name>: <seconds> seconds``, or
    ``[Duration]: <seconds> seconds`` when ``activity_name`` is empty.
    Exceptions raised inside the block are not suppressed.

    Attributes:
        activity_name: label included in the printed line (may be empty).
        start_time: wall-clock ``time.time()`` captured on entry; ``None``
            before the block is entered.
        duration: elapsed seconds of the most recent block; ``None`` until
            the block has exited.
    """

    def __init__(self, activity_name=""):
        self.activity_name = activity_name
        self.start_time = None
        self.duration = None
        self._start_counter = None

    def __enter__(self):
        # start_time stays a wall-clock timestamp for callers that read it;
        # the interval itself is measured with the monotonic perf_counter so
        # system clock adjustments cannot skew (or negate) the duration.
        self.start_time = time.time()
        self._start_counter = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.duration = time.perf_counter() - self._start_counter
        if self.activity_name:
            label = f"[Duration] {self.activity_name}"
        else:
            label = "[Duration]"
        print(f"{label}: {self.duration:.3f} seconds")
        # Falsy return: never swallow an exception from the timed block.
        return False
def flux_pipe_call_that_returns_an_iterable_of_images(self, *args, **kwargs):
    """Stub for the flux_pipe call; always raises ``NotImplementedError``.

    This is a placeholder that must be replaced by the actual flux_pipe
    implementation. Every argument is accepted and ignored.

    Raises:
        NotImplementedError: unconditionally.
    """
    message = "Must implement 'flux_pipe_call_that_returns_an_iterable_of_images'"
    raise NotImplementedError(message)
|