Spaces:
Sleeping
Sleeping
| """Monitoring scheduler for NASA solar image downloads.""" | |
| import time | |
| import logging | |
| import threading | |
| from datetime import datetime, timedelta | |
| from typing import Optional, Callable, List | |
| import schedule | |
| from ..downloader.url_generator import URLGenerator | |
| from ..downloader.image_fetcher import DownloadManager | |
| from ..storage.storage_organizer import StorageOrganizer | |
| from ..models import DownloadTask | |
| class MonitoringLoop: | |
| """Manages 5-minute monitoring cycles for new NASA images.""" | |
| def __init__(self, url_generator: URLGenerator, download_manager: DownloadManager, | |
| storage_organizer: StorageOrganizer, check_interval_minutes: int = 5, | |
| monitoring_range_days: int = 1): | |
| """ | |
| Initialize monitoring loop. | |
| Args: | |
| url_generator: URLGenerator instance | |
| download_manager: DownloadManager instance | |
| storage_organizer: StorageOrganizer instance | |
| check_interval_minutes: Minutes between checks (default 5) | |
| monitoring_range_days: Days to look back for new images (default 1) | |
| """ | |
| self.url_generator = url_generator | |
| self.download_manager = download_manager | |
| self.storage = storage_organizer | |
| self.check_interval = check_interval_minutes | |
| self.monitoring_range_days = monitoring_range_days | |
| self.logger = logging.getLogger(__name__) | |
| self.is_running = False | |
| self.monitoring_thread = None | |
| self.last_check_time = None | |
| self.total_checks = 0 | |
| self.new_images_found = 0 | |
| # Callbacks for status updates | |
| self.on_check_start: Optional[Callable] = None | |
| self.on_check_complete: Optional[Callable] = None | |
| self.on_new_images_found: Optional[Callable] = None | |
| def start_monitoring(self): | |
| """Start the monitoring loop in a background thread.""" | |
| if self.is_running: | |
| self.logger.warning("Monitoring loop is already running") | |
| return | |
| self.is_running = True | |
| self.logger.info(f"Starting monitoring loop with {self.check_interval}-minute intervals") | |
| # Schedule the monitoring job | |
| schedule.every(self.check_interval).minutes.do(self._check_for_new_images) | |
| # Start the scheduler thread | |
| self.monitoring_thread = threading.Thread(target=self._run_scheduler, daemon=True) | |
| self.monitoring_thread.start() | |
| # Run initial check immediately | |
| self._check_for_new_images() | |
| def stop_monitoring(self): | |
| """Stop the monitoring loop.""" | |
| if not self.is_running: | |
| self.logger.warning("Monitoring loop is not running") | |
| return | |
| self.is_running = False | |
| schedule.clear() | |
| if self.monitoring_thread and self.monitoring_thread.is_alive(): | |
| self.monitoring_thread.join(timeout=5) | |
| self.logger.info("Monitoring loop stopped") | |
| def _run_scheduler(self): | |
| """Run the scheduler in a background thread.""" | |
| while self.is_running: | |
| schedule.run_pending() | |
| time.sleep(1) # Check every second | |
| def _check_for_new_images(self): | |
| """Check for new images and download them.""" | |
| check_start_time = datetime.now() | |
| self.total_checks += 1 | |
| self.logger.info(f"Starting monitoring check #{self.total_checks} at {check_start_time}") | |
| if self.on_check_start: | |
| self.on_check_start(check_start_time, self.total_checks) | |
| try: | |
| # Get URLs for recent images (configurable range) | |
| end_date = datetime.now() | |
| start_date = end_date - timedelta(days=self.monitoring_range_days) | |
| recent_urls = [] | |
| current_date = start_date | |
| while current_date <= end_date: | |
| daily_urls = self.url_generator.generate_daily_urls(current_date) | |
| recent_urls.extend(daily_urls) | |
| current_date += timedelta(days=1) | |
| self.logger.debug(f"Generated {len(recent_urls)} URLs to check (last {self.monitoring_range_days} days)") | |
| # Filter to only new images (not already downloaded) | |
| new_urls = self._filter_new_images(recent_urls) | |
| if new_urls: | |
| self.logger.info(f"Found {len(new_urls)} new images to download") | |
| self.new_images_found += len(new_urls) | |
| if self.on_new_images_found: | |
| self.on_new_images_found(new_urls) | |
| # Download new images | |
| self._download_new_images(new_urls) | |
| else: | |
| self.logger.info("No new images found") | |
| self.last_check_time = check_start_time | |
| check_duration = (datetime.now() - check_start_time).total_seconds() | |
| self.logger.info(f"Monitoring check completed in {check_duration:.1f}s") | |
| if self.on_check_complete: | |
| self.on_check_complete(check_start_time, len(new_urls), check_duration) | |
| except Exception as e: | |
| self.logger.error(f"Error during monitoring check: {e}") | |
| import traceback | |
| self.logger.debug(traceback.format_exc()) | |
| def _filter_new_images(self, urls: List[str]) -> List[str]: | |
| """ | |
| Filter URLs to only include images not already downloaded. | |
| Args: | |
| urls: List of URLs to check | |
| Returns: | |
| List of URLs for images not yet downloaded | |
| """ | |
| new_urls = [] | |
| for url in urls: | |
| # Extract metadata from URL | |
| date, time_seq = self.url_generator.extract_metadata_from_url(url) | |
| if not date or not time_seq: | |
| continue | |
| # Get filename | |
| filename = url.split('/')[-1] | |
| # Check if already exists locally | |
| if not self.storage.file_exists(filename, date): | |
| new_urls.append(url) | |
| return new_urls | |
| def _download_new_images(self, urls: List[str]): | |
| """ | |
| Download a list of new images. | |
| Args: | |
| urls: List of URLs to download | |
| """ | |
| successful_downloads = 0 | |
| for url in urls: | |
| try: | |
| # Extract metadata | |
| date, time_seq = self.url_generator.extract_metadata_from_url(url) | |
| if not date or not time_seq: | |
| self.logger.warning(f"Could not extract metadata from URL: {url}") | |
| continue | |
| filename = url.split('/')[-1] | |
| local_path = self.storage.get_local_path(filename, date) | |
| # Create download task | |
| task = DownloadTask(url=url, target_path=local_path) | |
| # Attempt download | |
| success = self.download_manager.download_and_save(task) | |
| if success: | |
| successful_downloads += 1 | |
| self.logger.info(f"Downloaded: {filename}") | |
| else: | |
| self.logger.warning(f"Failed to download: {filename} - {task.error_message}") | |
| except Exception as e: | |
| self.logger.error(f"Error downloading {url}: {e}") | |
| self.logger.info(f"Downloaded {successful_downloads}/{len(urls)} new images") | |
| def get_status(self) -> dict: | |
| """ | |
| Get current monitoring status. | |
| Returns: | |
| Dictionary with monitoring statistics | |
| """ | |
| return { | |
| 'is_running': self.is_running, | |
| 'check_interval_minutes': self.check_interval, | |
| 'monitoring_range_days': self.monitoring_range_days, | |
| 'total_checks': self.total_checks, | |
| 'last_check_time': self.last_check_time, | |
| 'new_images_found': self.new_images_found, | |
| 'total_downloads': self.download_manager.get_download_count(), | |
| 'failed_downloads': len(self.download_manager.get_failed_tasks()) | |
| } | |
| def force_check(self): | |
| """Force an immediate check for new images.""" | |
| if not self.is_running: | |
| self.logger.warning("Cannot force check - monitoring loop is not running") | |
| return | |
| self.logger.info("Forcing immediate check for new images") | |
| self._check_for_new_images() | |
| def set_monitoring_range(self, days: int): | |
| """ | |
| Set the monitoring range in days. | |
| Args: | |
| days: Number of days to look back for new images | |
| """ | |
| if days < 1: | |
| raise ValueError("Monitoring range must be at least 1 day") | |
| old_range = self.monitoring_range_days | |
| self.monitoring_range_days = days | |
| self.logger.info(f"Changed monitoring range from {old_range} to {days} days") | |
| def get_monitoring_range(self) -> int: | |
| """ | |
| Get the current monitoring range in days. | |
| Returns: | |
| Number of days being monitored | |
| """ | |
| return self.monitoring_range_days | |
| def set_monitoring_range(self, days: int): | |
| """ | |
| Set the monitoring range in days. | |
| Args: | |
| days: Number of days to look back for new images | |
| """ | |
| if days < 1: | |
| raise ValueError("Monitoring range must be at least 1 day") | |
| old_range = self.monitoring_range_days | |
| self.monitoring_range_days = days | |
| self.logger.info(f"Monitoring range changed from {old_range} to {days} days") | |
| def get_monitoring_range(self) -> int: | |
| """ | |
| Get the current monitoring range in days. | |
| Returns: | |
| Number of days being monitored | |
| """ | |
| return self.monitoring_range_days | |
| class TaskCoordinator: | |
| """Coordinates download tasks and UI updates.""" | |
| def __init__(self, monitoring_loop: MonitoringLoop): | |
| """ | |
| Initialize task coordinator. | |
| Args: | |
| monitoring_loop: MonitoringLoop instance to coordinate | |
| """ | |
| self.monitoring_loop = monitoring_loop | |
| self.logger = logging.getLogger(__name__) | |
| # Set up callbacks | |
| self.monitoring_loop.on_check_start = self._on_check_start | |
| self.monitoring_loop.on_check_complete = self._on_check_complete | |
| self.monitoring_loop.on_new_images_found = self._on_new_images_found | |
| def _on_check_start(self, check_time: datetime, check_number: int): | |
| """Handle monitoring check start.""" | |
| self.logger.info(f"Check #{check_number} started at {check_time.strftime('%H:%M:%S')}") | |
| def _on_check_complete(self, check_time: datetime, new_images: int, duration: float): | |
| """Handle monitoring check completion.""" | |
| self.logger.info(f"Check completed: {new_images} new images, {duration:.1f}s duration") | |
| def _on_new_images_found(self, urls: List[str]): | |
| """Handle new images found.""" | |
| self.logger.info(f"New images found: {len(urls)}") | |
| for url in urls[:5]: # Log first 5 URLs | |
| filename = url.split('/')[-1] | |
| self.logger.debug(f" - {filename}") | |
| if len(urls) > 5: | |
| self.logger.debug(f" ... and {len(urls) - 5} more") | |
| class StatusReporter: | |
| """Reports monitoring activity and download results.""" | |
| def __init__(self, monitoring_loop: MonitoringLoop): | |
| """ | |
| Initialize status reporter. | |
| Args: | |
| monitoring_loop: MonitoringLoop instance to report on | |
| """ | |
| self.monitoring_loop = monitoring_loop | |
| self.logger = logging.getLogger(__name__) | |
| def print_status(self): | |
| """Print current monitoring status.""" | |
| status = self.monitoring_loop.get_status() | |
| print("\n" + "="*50) | |
| print("NASA Solar Image Downloader - Status Report") | |
| print("="*50) | |
| print(f"Monitoring: {'Running' if status['is_running'] else 'Stopped'}") | |
| print(f"Check interval: {status['check_interval_minutes']} minutes") | |
| print(f"Monitoring range: {status['monitoring_range_days']} days") | |
| print(f"Total checks: {status['total_checks']}") | |
| if status['last_check_time']: | |
| last_check = status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S') | |
| print(f"Last check: {last_check}") | |
| print(f"New images found: {status['new_images_found']}") | |
| print(f"Total downloads: {status['total_downloads']}") | |
| print(f"Failed downloads: {status['failed_downloads']}") | |
| print("="*50) | |
| def log_periodic_status(self, interval_minutes: int = 30): | |
| """ | |
| Log status periodically. | |
| Args: | |
| interval_minutes: Minutes between status logs | |
| """ | |
| def log_status(): | |
| status = self.monitoring_loop.get_status() | |
| self.logger.info( | |
| f"Status: {status['total_checks']} checks, " | |
| f"{status['new_images_found']} new images, " | |
| f"{status['total_downloads']} downloads" | |
| ) | |
| schedule.every(interval_minutes).minutes.do(log_status) |