Spaces:
Runtime error
Runtime error
| """ | |
| helpers.py - utility helpers with async support and timing context | |
| """ | |
| import time | |
| import asyncio | |
| def async_run_if_possible(func): | |
| """Run async if event loop exists, else sync.""" | |
| def wrapper(*args, **kwargs): | |
| try: | |
| loop = asyncio.get_event_loop() | |
| if loop.is_running(): | |
| return asyncio.create_task(func(*args, **kwargs)) | |
| else: | |
| return func(*args, **kwargs) | |
| except RuntimeError: | |
| return func(*args, **kwargs) | |
| return wrapper | |
| class calculate_duration: | |
| """Context manager for timing code blocks.""" | |
| def __init__(self, activity_name=""): | |
| self.activity_name = activity_name | |
| def __enter__(self): | |
| self.start_time = time.time() | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| duration = time.time() - self.start_time | |
| if self.activity_name: | |
| print(f"[Duration] {self.activity_name}: {duration:.3f} seconds") | |
| else: | |
| print(f"[Duration]: {duration:.3f} seconds") | |
| def flux_pipe_call_that_returns_an_iterable_of_images(self, *args, **kwargs): | |
| # Placeholder, must be replaced by your actual flux_pipe method | |
| raise NotImplementedError("Must implement 'flux_pipe_call_that_returns_an_iterable_of_images'") | |