Spaces:
Running
Running
| import logging | |
| import threading | |
| import time | |
| import schedule | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, List, Callable, Optional | |
| from app import app | |
| from models import db, ScheduledJob | |
# Module-level logger named after this module; SchedulerService below sends all of its log output through it.
logger = logging.getLogger(__name__)
class SchedulerService:
    """Service for managing scheduled scraping jobs.

    Jobs are registered on the ``schedule`` module and executed by a
    background daemon thread that polls for pending work once per second.
    Job definitions (name, frequency, enabled flag, last/next run) are
    persisted through the ``ScheduledJob`` model. Jobs loaded from the
    database are mapped by name onto this class's private job methods
    (see ``_get_job_function``).
    """

    # Signs processed, in this order, by the consolidation job.
    _ZODIAC_SIGNS = (
        "aries", "taurus", "gemini", "cancer",
        "leo", "virgo", "libra", "scorpio",
        "sagittarius", "capricorn", "aquarius", "pisces",
    )

    def __init__(self):
        """Initialize the scheduler service (does not start the thread)."""
        self.scheduler = schedule
        self.running = False
        self.thread = None
        self.jobs = {}  # Store job references by name

    def start(self):
        """Start the scheduler in a background thread and load jobs from the database.

        Calling this while the scheduler is already running only logs a warning.
        """
        if self.running:
            logger.warning("Scheduler already running")
            return

        self.running = True
        self.thread = threading.Thread(target=self._run_schedule)
        self.thread.daemon = True
        self.thread.start()
        logger.info("Scheduler started")

        # Load jobs from database
        self._load_jobs_from_db()

    def stop(self):
        """Stop the scheduler thread and drop every registered job.

        Calling this while the scheduler is not running only logs a warning.
        """
        if not self.running:
            logger.warning("Scheduler not running")
            return

        # The polling loop checks this flag once per iteration.
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
            self.thread = None

        # Clear all jobs. ``self.scheduler`` is the shared ``schedule`` module,
        # so this clears everything registered on it, not only ``self.jobs``.
        self.scheduler.clear()
        self.jobs = {}
        logger.info("Scheduler stopped")

    def _run_schedule(self):
        """Run the scheduler loop until ``self.running`` becomes False."""
        while self.running:
            try:
                self.scheduler.run_pending()
            except Exception as e:
                # Keep polling: if this thread died, ``running`` would stay
                # True and no job would ever fire again.
                logger.error(f"Error running pending jobs: {str(e)}")
            time.sleep(1)

    def _load_jobs_from_db(self):
        """Schedule every enabled ``ScheduledJob`` row found in the database.

        Errors are logged and swallowed so that a database problem does not
        prevent the scheduler from starting.
        """
        try:
            with app.app_context():
                jobs = ScheduledJob.query.filter_by(enabled=True).all()
                loaded = 0
                for job in jobs:
                    added = self.add_job(
                        job.name,
                        job.frequency,
                        None,  # We'll map by name in add_job
                        update_db=False,  # Don't create duplicate DB entries
                    )
                    if added:
                        loaded += 1
                # Report only the jobs that were actually scheduled.
                logger.info(f"Loaded {loaded} of {len(jobs)} scheduled jobs from database")
        except Exception as e:
            logger.error(f"Error loading jobs from database: {str(e)}")

    def add_job(self, name: str, frequency: str, job_func: Optional[Callable] = None, update_db: bool = True) -> bool:
        """
        Add a new scheduled job

        Args:
            name: Unique name for the job
            frequency: Frequency string (e.g., 'daily', 'hourly', '30 minutes')
            job_func: Function to call when job runs; when omitted it is
                looked up by name via ``_get_job_function``
            update_db: Whether to update the database with the new job

        Returns:
            Success status. When False is returned the job is not scheduled.
        """
        if name in self.jobs:
            logger.warning(f"Job '{name}' already exists")
            return False

        # Map job name to function if not provided
        if job_func is None:
            job_func = self._get_job_function(name)
        if not job_func:
            logger.error(f"No function mapped for job '{name}'")
            return False

        # Schedule job based on frequency
        scheduled_job = self._schedule_by_frequency(name, frequency, job_func)
        if not scheduled_job:
            logger.error(f"Failed to schedule job '{name}' with frequency '{frequency}'")
            return False

        # Store job reference
        self.jobs[name] = scheduled_job

        # Update database
        if update_db:
            try:
                with app.app_context():
                    next_run = self._calculate_next_run(frequency)
                    # Check if job already exists
                    existing = ScheduledJob.query.filter_by(name=name).first()
                    if existing:
                        existing.frequency = frequency
                        # The stored next run belongs to the previous
                        # frequency/enablement, so refresh it as well.
                        existing.next_run = next_run
                        existing.enabled = True
                    else:
                        # Create new job
                        job = ScheduledJob()
                        job.name = name
                        job.frequency = frequency
                        job.next_run = next_run
                        job.enabled = True
                        db.session.add(job)
                    db.session.commit()
                logger.info(f"Added job '{name}' to database")
            except Exception as e:
                logger.error(f"Error adding job to database: {str(e)}")
                # Undo the in-memory registration: otherwise the job would keep
                # running although we report failure, and a retry would be
                # rejected as "already exists".
                self.scheduler.cancel_job(scheduled_job)
                self.jobs.pop(name, None)
                return False

        logger.info(f"Added job '{name}' with frequency '{frequency}'")
        return True

    def remove_job(self, name: str) -> bool:
        """
        Remove a scheduled job

        The job is cancelled in the scheduler and its database row, if any,
        is marked as disabled (the row itself is kept).

        Args:
            name: Name of the job to remove

        Returns:
            Success status. False if the job is unknown or the database
            update failed (the job is cancelled in the scheduler either way).
        """
        if name not in self.jobs:
            logger.warning(f"Job '{name}' not found")
            return False

        # Cancel the job
        scheduled_job = self.jobs.pop(name)
        self.scheduler.cancel_job(scheduled_job)

        # Update database
        try:
            with app.app_context():
                job = ScheduledJob.query.filter_by(name=name).first()
                if job:
                    job.enabled = False
                    db.session.commit()
                    logger.info(f"Disabled job '{name}' in database")
                else:
                    logger.warning(f"Job '{name}' not found in database")
        except Exception as e:
            logger.error(f"Error removing job from database: {str(e)}")
            return False

        logger.info(f"Removed job '{name}'")
        return True

    def get_all_jobs(self) -> List[Dict[str, Any]]:
        """Get list of all scheduled jobs.

        Returns:
            One ``to_dict()`` mapping per ``ScheduledJob`` row, each extended
            with an ``"active"`` flag telling whether the job is currently
            registered in this scheduler. Empty on database errors.
        """
        job_list = []
        try:
            with app.app_context():
                for job in ScheduledJob.query.all():
                    job_info = job.to_dict()
                    job_info["active"] = job.name in self.jobs
                    job_list.append(job_info)
        except Exception as e:
            logger.error(f"Error getting jobs from database: {str(e)}")
        return job_list

    def update_job_status(self, name: str, success: bool) -> bool:
        """
        Update job status after running

        Sets ``last_run`` to the current UTC time and recomputes ``next_run``.

        Args:
            name: Name of the job
            success: Whether the job ran successfully. Currently not
                persisted; accepted so callers can report the outcome.

        Returns:
            Success status of the database update
        """
        try:
            with app.app_context():
                job = ScheduledJob.query.filter_by(name=name).first()
                if job:
                    job.last_run = self._utcnow()
                    job.next_run = self._calculate_next_run(job.frequency)
                    db.session.commit()
                    logger.info(f"Updated job '{name}' status")
                    return True
                logger.warning(f"Job '{name}' not found in database")
        except Exception as e:
            logger.error(f"Error updating job status: {str(e)}")
        return False

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Same value as ``datetime.utcnow()``, without relying on that
        deprecated call.
        """
        from datetime import timezone
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_interval(frequency: str) -> Optional[int]:
        """Return the leading positive integer of a frequency string.

        ``'30 minutes'`` -> ``30``. Returns None when the first word is not an
        integer, when the string is empty, or when the number is not positive.
        """
        try:
            count = int(frequency.split()[0])
        except (ValueError, IndexError):
            return None
        return count if count > 0 else None

    def _schedule_by_frequency(self, name: str, frequency: str, job_func: Callable) -> "Optional[schedule.Job]":
        """Schedule a job based on frequency string.

        Supported values: ``'daily'``, ``'hourly'``, ``'weekly'``,
        ``'<n> minutes'`` and ``'<n> hours'`` with a positive integer ``n``.

        Returns:
            The scheduled job object, or None if the frequency is not supported.
        """
        # Create a wrapper function to update job status
        def job_wrapper():
            try:
                logger.info(f"Running scheduled job: {name}")
                result = job_func()
                self.update_job_status(name, True)
                return result
            except Exception as e:
                logger.error(f"Error running job '{name}': {str(e)}")
                self.update_job_status(name, False)

        # A missing frequency (e.g. NULL in the database) must not raise, or a
        # single bad row would abort loading of all remaining jobs.
        if not isinstance(frequency, str):
            logger.error(f"Unsupported frequency: {frequency}")
            return None

        # Schedule based on frequency patterns
        if frequency == 'daily':
            return self.scheduler.every().day.at("00:00").do(job_wrapper)
        if frequency == 'hourly':
            # NOTE(review): _calculate_next_run assumes "top of the next hour"
            # for 'hourly'; confirm this registration matches that expectation.
            return self.scheduler.every().hour.do(job_wrapper)
        if 'minute' in frequency:
            # Extract number of minutes (e.g., '30 minutes' -> 30)
            minutes = self._parse_interval(frequency)
            if minutes is None:
                logger.error(f"Invalid minutes format: {frequency}")
                return None
            return self.scheduler.every(minutes).minutes.do(job_wrapper)
        if 'hour' in frequency:
            # Extract number of hours (e.g., '6 hours' -> 6)
            hours = self._parse_interval(frequency)
            if hours is None:
                logger.error(f"Invalid hours format: {frequency}")
                return None
            return self.scheduler.every(hours).hours.do(job_wrapper)
        if frequency == 'weekly':
            return self.scheduler.every().week.do(job_wrapper)

        logger.error(f"Unsupported frequency: {frequency}")
        return None

    def _calculate_next_run(self, frequency: str) -> datetime:
        """Calculate next run time (naive UTC) based on frequency.

        Unparseable or non-positive counts fall back to 30 minutes / 1 hour;
        any unrecognised frequency falls back to one day from now.
        """
        now = self._utcnow()
        if not isinstance(frequency, str):
            return now + timedelta(days=1)  # Default to daily

        if frequency == 'daily':
            # Next run at midnight
            return now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
        if frequency == 'hourly':
            # Next run at the top of the next hour
            return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
        if 'minute' in frequency:
            # Default to 30 minutes
            return now + timedelta(minutes=self._parse_interval(frequency) or 30)
        if 'hour' in frequency:
            # Default to 1 hour
            return now + timedelta(hours=self._parse_interval(frequency) or 1)
        if frequency == 'weekly':
            return now + timedelta(days=7)
        return now + timedelta(days=1)  # Default to daily

    def _get_job_function(self, name: str) -> Optional[Callable]:
        """Map job name to function; returns None for unknown names."""
        # Map of job names to functions. Each job method imports the service
        # it needs itself, so nothing has to be imported here.
        job_map = {
            "scrape_daily_horoscopes": self._scrape_daily_horoscopes,
            "consolidate_horoscopes": self._consolidate_horoscopes,
            "export_to_wordpress": self._export_to_wordpress,
        }
        return job_map.get(name)

    def _scrape_daily_horoscopes(self):
        """Job function to scrape daily horoscopes.

        Returns:
            Whatever ``horoscope_service.scrape_all_horoscopes`` returns for
            today's date.
        """
        from services.horoscope_service import horoscope_service

        logger.info("Running daily horoscope scraping job")
        # Get today's date
        today = datetime.today().strftime('%Y-%m-%d')
        # Run the scraping operation
        results = horoscope_service.scrape_all_horoscopes(date_str=today)
        logger.info(f"Daily horoscope scraping completed: {len(results)} horoscopes scraped")
        return results

    def _consolidate_horoscopes(self):
        """Job function to consolidate horoscopes using LLM.

        For each zodiac sign, today's ``Horoscope`` rows are passed to the
        LLM service and the result is stored as a ``ConsolidatedHoroscope``.
        Signs without horoscopes, or that are already consolidated, are skipped.

        Returns:
            True when the loop completed, False if an exception aborted it.
        """
        from services.llm_service import llm_service
        from models import Horoscope, ConsolidatedHoroscope
        import json

        logger.info("Running horoscope consolidation job")
        today = datetime.today().date()
        try:
            with app.app_context():
                for sign in self._ZODIAC_SIGNS:
                    # Find horoscopes for today and this sign
                    horoscopes = Horoscope.query.filter_by(sign=sign, date=today).all()
                    if not horoscopes:
                        logger.warning(f"No horoscopes found for {sign} on {today}")
                        continue

                    # Check if already consolidated
                    existing = ConsolidatedHoroscope.query.filter_by(sign=sign, date=today).first()
                    if existing:
                        logger.info(f"Horoscopes for {sign} on {today} already consolidated")
                        continue

                    # Convert to format needed by LLM service
                    horoscope_data = [h.to_dict() for h in horoscopes]
                    # Consolidate data using LLM
                    consolidated = llm_service.consolidate_horoscopes(horoscope_data)
                    if not consolidated or "error" in consolidated:
                        # ``consolidated`` may be None/empty here, so it must
                        # not be dereferenced directly.
                        error = (consolidated or {}).get('error', 'Unknown error')
                        logger.error(f"Error consolidating horoscopes for {sign}: {error}")
                        continue

                    # Create new consolidated horoscope
                    sources = [h.source for h in horoscopes]
                    new_consolidated = ConsolidatedHoroscope()
                    new_consolidated.sign = sign
                    new_consolidated.date = today
                    new_consolidated.consolidated_prediction = consolidated.get("consolidated_prediction", "")
                    new_consolidated.sources = json.dumps(sources)
                    db.session.add(new_consolidated)
                    db.session.commit()
                    logger.info(f"Consolidated horoscope created for {sign} on {today}")

            logger.info("Horoscope consolidation job completed")
            return True
        except Exception as e:
            logger.error(f"Error in consolidation job: {str(e)}")
            return False

    def _export_to_wordpress(self):
        """Job function to export horoscopes to WordPress.

        Every ``ConsolidatedHoroscope`` without a ``WordPressExport`` row is
        published, and a ``WordPressExport`` row with status ``"published"``
        or ``"failed"`` is recorded. Because a failed attempt also creates a
        row, it is not picked up again by later runs.

        Returns:
            True when the loop completed, False if an exception aborted it.
        """
        from services.wordpress_service import wordpress_service
        from models import ConsolidatedHoroscope, WordPressExport

        logger.info("Running WordPress export job")
        try:
            with app.app_context():
                # Find consolidated horoscopes that haven't been exported
                consolidated_horoscopes = (
                    db.session.query(ConsolidatedHoroscope)
                    .outerjoin(WordPressExport, ConsolidatedHoroscope.id == WordPressExport.horoscope_id)
                    .filter(WordPressExport.id.is_(None))
                    .all()
                )
                if not consolidated_horoscopes:
                    logger.info("No new horoscopes to export to WordPress")
                    return True

                logger.info(f"Found {len(consolidated_horoscopes)} horoscopes to export")
                for horoscope in consolidated_horoscopes:
                    # Export to WordPress
                    result = wordpress_service.publish_horoscope(horoscope)

                    export = WordPressExport()
                    export.horoscope_id = horoscope.id

                    if not result or "error" in result:
                        # ``result`` may be None/empty here, so it must not be
                        # dereferenced directly.
                        error = (result or {}).get('error', 'Unknown error')
                        logger.error(f"Error exporting horoscope {horoscope.id} to WordPress: {error}")
                        # Create failed export record
                        export.status = "failed"
                        db.session.add(export)
                        db.session.commit()
                        continue

                    # Create successful export record
                    export.wordpress_post_id = result.get("post_id")
                    export.wordpress_url = result.get("url")
                    export.status = "published"
                    db.session.add(export)
                    db.session.commit()
                    logger.info(f"Exported horoscope {horoscope.id} to WordPress as post {result.get('post_id')}")

            logger.info("WordPress export job completed")
            return True
        except Exception as e:
            logger.error(f"Error in WordPress export job: {str(e)}")
            return False
# Create a singleton instance: one shared SchedulerService built at import time.
# Construction only initialises fields; the background thread is not started until start() is called.
scheduler_service = SchedulerService()