Spaces:
Runtime error
Runtime error
| import threading | |
| import time | |
| from typing import Optional, Callable | |
| import logging | |
| class SheetMonitor: | |
| def __init__(self, sheet_manager, check_interval: float = 1.0): | |
| """ | |
| Initialize SheetMonitor with a sheet manager instance. | |
| """ | |
| self.sheet_manager = sheet_manager | |
| self.check_interval = check_interval | |
| # Threading control | |
| self.monitor_thread = None | |
| self.is_running = threading.Event() | |
| self.pause_monitoring = threading.Event() | |
| self.monitor_paused = threading.Event() | |
| # Queue status | |
| self.has_data = threading.Event() | |
| # Logging setup | |
| logging.basicConfig(level=logging.INFO) | |
| self.logger = logging.getLogger(__name__) | |
| def start_monitoring(self): | |
| """Start the monitoring thread.""" | |
| if self.monitor_thread is not None and self.monitor_thread.is_alive(): | |
| self.logger.warning("Monitoring thread is already running") | |
| return | |
| self.is_running.set() | |
| self.pause_monitoring.clear() | |
| self.monitor_thread = threading.Thread(target=self._monitor_loop) | |
| self.monitor_thread.daemon = True | |
| self.monitor_thread.start() | |
| self.logger.info("Started monitoring thread") | |
| def stop_monitoring(self): | |
| """Stop the monitoring thread.""" | |
| self.is_running.clear() | |
| if self.monitor_thread: | |
| self.monitor_thread.join() | |
| self.logger.info("Stopped monitoring thread") | |
| def pause(self): | |
| """Pause the monitoring.""" | |
| self.pause_monitoring.set() | |
| self.monitor_paused.wait() | |
| self.logger.info("Monitoring paused") | |
| def resume(self): | |
| """Resume the monitoring.""" | |
| self.pause_monitoring.clear() | |
| self.monitor_paused.clear() | |
| # ์ฆ์ ์ฒดํฌ ์ํ | |
| self.logger.info("Monitoring resumed, checking for new data...") | |
| values = self.sheet_manager.get_all_values() | |
| if values: | |
| self.has_data.set() | |
| self.logger.info(f"Found data after resume: {values}") | |
| def _monitor_loop(self): | |
| """Main monitoring loop that checks for data in sheet.""" | |
| while self.is_running.is_set(): | |
| if self.pause_monitoring.is_set(): | |
| self.monitor_paused.set() | |
| self.pause_monitoring.wait() | |
| self.monitor_paused.clear() | |
| # continue | |
| try: | |
| # Check if there's any data in the sheet | |
| values = self.sheet_manager.get_all_values() | |
| self.logger.info(f"Monitoring: Current column={self.sheet_manager.column_name}, " | |
| f"Values found={len(values)}, " | |
| f"Has data={self.has_data.is_set()}") | |
| if values: # If there's any non-empty value | |
| self.has_data.set() | |
| self.logger.info(f"Data detected: {values}") | |
| else: | |
| self.has_data.clear() | |
| self.logger.info("No data in sheet, waiting...") | |
| time.sleep(self.check_interval) | |
| except Exception as e: | |
| self.logger.error(f"Error in monitoring loop: {str(e)}") | |
| time.sleep(self.check_interval) | |
| class MainLoop: | |
| def __init__(self, sheet_manager, sheet_monitor, callback_function: Callable = None): | |
| """ | |
| Initialize MainLoop with sheet manager and monitor instances. | |
| """ | |
| self.sheet_manager = sheet_manager | |
| self.monitor = sheet_monitor | |
| self.callback = callback_function | |
| self.is_running = threading.Event() | |
| self.logger = logging.getLogger(__name__) | |
| def start(self): | |
| """Start the main processing loop.""" | |
| self.is_running.set() | |
| self.monitor.start_monitoring() | |
| self._main_loop() | |
| def stop(self): | |
| """Stop the main processing loop.""" | |
| self.is_running.clear() | |
| self.monitor.stop_monitoring() | |
| def process_new_value(self): | |
| """Process values by calling pop function for multiple columns and custom callback.""" | |
| try: | |
| # Store original column | |
| original_column = self.sheet_manager.column_name | |
| # Pop from huggingface_id column | |
| model_id = self.sheet_manager.pop() | |
| if model_id: | |
| # Pop from benchmark_name column | |
| self.sheet_manager.change_column("benchmark_name") | |
| benchmark_name = self.sheet_manager.pop() | |
| # Pop from prompt_cfg_name column | |
| self.sheet_manager.change_column("prompt_cfg_name") | |
| prompt_cfg_name = self.sheet_manager.pop() | |
| # Return to original column | |
| self.sheet_manager.change_column(original_column) | |
| self.logger.info(f"Processed values - model_id: {model_id}, " | |
| f"benchmark_name: {benchmark_name}, " | |
| f"prompt_cfg_name: {prompt_cfg_name}") | |
| if self.callback: | |
| # Pass all three values to callback | |
| self.callback(model_id, benchmark_name, prompt_cfg_name) | |
| return model_id, benchmark_name, prompt_cfg_name | |
| except Exception as e: | |
| self.logger.error(f"Error processing values: {str(e)}") | |
| # Return to original column in case of error | |
| try: | |
| self.sheet_manager.change_column(original_column) | |
| except: | |
| pass | |
| return None | |
| def _main_loop(self): | |
| """Main processing loop.""" | |
| while self.is_running.is_set(): | |
| # Wait for data to be available | |
| if self.monitor.has_data.wait(timeout=1.0): | |
| # Pause monitoring | |
| self.monitor.pause() | |
| # Process the value | |
| self.process_new_value() | |
| # Check if there's still data in the sheet | |
| values = self.sheet_manager.get_all_values() | |
| self.logger.info(f"After processing: Current column={self.sheet_manager.column_name}, " | |
| f"Values remaining={len(values)}") | |
| if not values: | |
| self.monitor.has_data.clear() | |
| self.logger.info("All data processed, clearing has_data flag") | |
| else: | |
| self.logger.info(f"Remaining data: {values}") | |
| # Resume monitoring | |
| self.monitor.resume() | |
| ## TODO | |
| # API ๋ถ๋น ํธ์ถ ๋ฌธ์ ๋ก ๋ง์ฝ์ ์ฐธ์กฐํ๋ค๊ฐ ์คํจํ ๊ฒฝ์ฐ ๋๊ธฐํ๋ค๊ฐ ๋ค์ ์๋ํ๊ฒ๋ ์ค๊ณ | |
| # Example usage | |
| if __name__ == "__main__": | |
| import sys | |
| from pathlib import Path | |
| sys.path.append(str(Path(__file__).parent.parent.parent)) | |
| from sheet_manager.sheet_crud.sheet_crud import SheetManager | |
| from pia_bench.pipe_line.piepline import PiaBenchMark | |
| def my_custom_function(huggingface_id, benchmark_name, prompt_cfg_name): | |
| piabenchmark = PiaBenchMark(huggingface_id, benchmark_name, prompt_cfg_name) | |
| piabenchmark.bench_start() | |
| # Initialize components | |
| sheet_manager = SheetManager() | |
| monitor = SheetMonitor(sheet_manager, check_interval=10.0) | |
| main_loop = MainLoop(sheet_manager, monitor, callback_function=my_custom_function) | |
| try: | |
| main_loop.start() | |
| while True: | |
| time.sleep(5) | |
| except KeyboardInterrupt: | |
| main_loop.stop() |