Spaces:
Running
Running
| import logging | |
| import json | |
| from typing import Dict, Any, List, Optional | |
| from datetime import datetime, date | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from scrapers.horoscope_scraper import HoroscopeScraper | |
| from scrapers.astrology_com_scraper import AstrologyComScraper | |
| from scrapers.horoscope_com_scraper import HoroscopeComScraper | |
| from utils.rate_limiter import RateLimiter | |
| from models import db, Horoscope | |
| from app import app | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class HoroscopeService:
    """Service to manage horoscope scraping operations.

    Coordinates the registered scrapers, applies a per-source rate limiter
    before each scrape, stores successful results through the ``Horoscope``
    model, and serves stored horoscopes (scraping on demand when nothing is
    stored yet).
    """

    def __init__(self, max_workers: int = 3):
        """
        Initialize horoscope service

        Args:
            max_workers: Maximum number of concurrent scrapers
        """
        self.max_workers = max_workers
        self.rate_limiters = {}  # Domain-specific rate limiters, created lazily

        # Register available scrapers, keyed by source name
        self.scrapers = {
            "astrology.com": AstrologyComScraper(),
            "horoscope.com": HoroscopeComScraper(),
        }

    def _get_rate_limiter(self, domain: str) -> "RateLimiter":
        """Get or create a rate limiter for a specific domain

        Args:
            domain: Key under which the limiter is cached

        Returns:
            The limiter cached for ``domain``
        """
        limiter = self.rate_limiters.get(domain)
        if limiter is None:
            # scrape_sign runs on several worker threads at once; setdefault
            # makes the first inserted limiter win so every thread ends up
            # sharing one limiter per domain instead of overwriting it.
            # Default: window_size=60, max_requests=5 for each domain.
            limiter = self.rate_limiters.setdefault(
                domain, RateLimiter(window_size=60, max_requests=5)
            )
        return limiter

    def scrape_sign(self, source: str, sign: str, date_str: Optional[str] = None) -> Dict[str, Any]:
        """
        Scrape horoscope for a specific sign from a specific source

        Args:
            source: Source name (e.g., 'astrology.com')
            sign: Zodiac sign
            date_str: Optional date string (YYYY-MM-DD)

        Returns:
            Dictionary with horoscope data, or a ``success: False`` dictionary
            when ``source`` is not registered

        Raises:
            Whatever the underlying scraper raises; it is not caught here.
        """
        if source not in self.scrapers:
            return {"success": False, "error": f"Unknown source: {source}"}

        scraper = self.scrapers[source]
        base_url = scraper.base_url

        # Apply rate limiting
        rate_limiter = self._get_rate_limiter(source)
        if not rate_limiter.can_proceed():
            wait_time = rate_limiter.get_wait_time()
            logger.warning(f"Rate limit reached for {source}. Waiting {wait_time:.2f} seconds")
            # time.sleep rejects negative values, so only sleep for a real wait.
            if wait_time > 0:
                time.sleep(wait_time)

        # Perform scraping; the attempt is recorded even when the scraper
        # raises, so failed attempts still count against the limit.
        try:
            result = scraper.scrape_sign(base_url, sign, date_str)
        finally:
            rate_limiter.record_request()

        # Save to database if successful
        if result.get('success', False):
            self._save_to_database(result, source, sign, date_str)

        return result

    def _scrape_concurrently(self, jobs: List[tuple], date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """Run ``scrape_sign`` for every (source, sign) pair on a thread pool

        Args:
            jobs: List of ``(source, sign)`` pairs to scrape
            date_str: Optional date string (YYYY-MM-DD) passed to every job

        Returns:
            One dictionary per job, in completion order. A job whose scrape
            raised is reported as a ``success: False`` dictionary carrying the
            sign, source, error text and a ``time.time()`` timestamp.
        """
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit scraping tasks, remembering which job each future belongs to
            future_to_job = {
                executor.submit(self.scrape_sign, source, sign, date_str): (source, sign)
                for source, sign in jobs
            }

            # Collect results as they complete
            for future in as_completed(future_to_job):
                source, sign = future_to_job[future]
                try:
                    results.append(future.result())
                    logger.info(f"Completed scraping {sign} horoscope from {source}")
                except Exception as e:
                    logger.error(f"Exception scraping {sign} from {source}: {str(e)}")
                    results.append({
                        "success": False,
                        "sign": sign,
                        "source": source,
                        "error": str(e),
                        "scraped_at": time.time()
                    })
        return results

    def scrape_all_signs(self, source: str, date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Scrape horoscopes for all zodiac signs from a specific source

        Args:
            source: Source name (e.g., 'astrology.com')
            date_str: Optional date string (YYYY-MM-DD)

        Returns:
            List of dictionaries with horoscope data (a single-element error
            list when ``source`` is not registered)
        """
        if source not in self.scrapers:
            return [{"success": False, "error": f"Unknown source: {source}"}]

        # The scraper itself declares which signs it supports
        zodiac_signs = self.scrapers[source].ZODIAC_SIGNS
        return self._scrape_concurrently([(source, sign) for sign in zodiac_signs], date_str)

    def scrape_sign_from_all_sources(self, sign: str, date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Scrape horoscope for a specific sign from all available sources

        Args:
            sign: Zodiac sign
            date_str: Optional date string (YYYY-MM-DD)

        Returns:
            List of dictionaries with horoscope data
        """
        return self._scrape_concurrently([(source, sign) for source in self.scrapers], date_str)

    def scrape_all_horoscopes(self, date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Scrape horoscopes for all signs from all sources

        Sources are processed one after another; the signs of each source are
        scraped concurrently by ``scrape_all_signs``.

        Args:
            date_str: Optional date string (YYYY-MM-DD)

        Returns:
            List of dictionaries with horoscope data
        """
        all_results = []
        for source in self.scrapers:
            all_results.extend(self.scrape_all_signs(source, date_str))
        return all_results

    def _save_to_database(self, result: Dict[str, Any], source: str, sign: str, date_str: Optional[str] = None) -> None:
        """Save horoscope data to database

        Updates the stored row for (sign, date, source) when one exists and
        inserts a new row otherwise. Best effort: any failure is logged and
        swallowed so a storage problem never breaks the scrape itself.

        Args:
            result: Scraper result; its 'prediction' and 'date' keys are read
            source: Source name the result came from
            sign: Zodiac sign (stored lower-cased)
            date_str: Optional date string (YYYY-MM-DD); takes precedence over
                the date reported in ``result``
        """
        try:
            # Extract data from result
            prediction = result.get('prediction', '')

            # Prefer the requested date, then the scraper's date, then today.
            # `or` also covers a scraper that reports 'date': None or ''.
            raw_date = date_str or result.get('date') or date.today().isoformat()
            horoscope_date = datetime.strptime(raw_date, '%Y-%m-%d').date()

            with app.app_context():
                try:
                    # Check if horoscope already exists for this sign, date, and source
                    existing = Horoscope.query.filter_by(
                        sign=sign.lower(),
                        date=horoscope_date,
                        source=source
                    ).first()

                    if existing:
                        # Update existing horoscope
                        existing.prediction = prediction
                        db.session.commit()
                        logger.info(f"Updated horoscope for {sign} on {horoscope_date} from {source}")
                    else:
                        # Create new horoscope
                        horoscope = Horoscope()
                        horoscope.sign = sign.lower()
                        horoscope.date = horoscope_date
                        horoscope.prediction = prediction
                        horoscope.source = source
                        db.session.add(horoscope)
                        db.session.commit()
                        logger.info(f"Added horoscope for {sign} on {horoscope_date} from {source}")
                except Exception:
                    # Discard the failed transaction so the session is usable
                    # again, then let the outer handler log the error.
                    db.session.rollback()
                    raise
        except Exception as e:
            logger.error(f"Error saving horoscope to database: {str(e)}")

    def get_horoscope(self, sign: str, date_str: Optional[str] = None, source: Optional[str] = None) -> Dict[str, Any]:
        """
        Retrieve horoscope from database

        When nothing is stored for the request, the horoscope is scraped (from
        ``source`` if given, otherwise from every registered source) and the
        lookup is repeated once.

        Args:
            sign: Zodiac sign
            date_str: Optional date string (YYYY-MM-DD); defaults to today
            source: Optional source name

        Returns:
            Dictionary with horoscope data: the row's ``to_dict()`` when exactly
            one row matches, ``{"horoscopes": [...]}`` when several match, or
            ``{"error": ...}`` when nothing is found or an exception occurs
        """
        try:
            # Parse date
            if date_str:
                horoscope_date = datetime.strptime(date_str, '%Y-%m-%d').date()
            else:
                horoscope_date = date.today()

            # One filter set serves both the first lookup and the retry
            filters = {"sign": sign.lower(), "date": horoscope_date}
            if source:
                filters["source"] = source

            with app.app_context():
                horoscopes = Horoscope.query.filter_by(**filters).all()

                if not horoscopes:
                    # If no horoscope found, try to scrape it
                    if source:
                        self.scrape_sign(source, sign, date_str)
                    else:
                        # Try all sources
                        self.scrape_sign_from_all_sources(sign, date_str)
                    # Try to fetch again
                    horoscopes = Horoscope.query.filter_by(**filters).all()

                if not horoscopes:
                    return {"error": f"No horoscope found for {sign} on {horoscope_date}"}

                # If multiple horoscopes found, return them all
                if len(horoscopes) > 1:
                    return {"horoscopes": [h.to_dict() for h in horoscopes]}
                return horoscopes[0].to_dict()
        except Exception as e:
            logger.error(f"Error getting horoscope from database: {str(e)}")
            return {"error": str(e)}

    def get_horoscopes_for_date(self, date_str: Optional[str] = None) -> Dict[str, Any]:
        """
        Retrieve all horoscopes for a specific date

        When nothing is stored for the date, every sign is scraped from every
        source and the lookup is repeated once.

        Args:
            date_str: Optional date string (YYYY-MM-DD); defaults to today

        Returns:
            ``{"date": <ISO date>, "horoscopes": {sign: [row dicts]}}`` on
            success, or ``{"error": ...}`` when nothing is found or an
            exception occurs
        """
        try:
            # Parse date
            if date_str:
                horoscope_date = datetime.strptime(date_str, '%Y-%m-%d').date()
            else:
                horoscope_date = date.today()

            with app.app_context():
                horoscopes = Horoscope.query.filter_by(date=horoscope_date).all()

                if not horoscopes:
                    # If no horoscopes found, try to scrape them
                    self.scrape_all_horoscopes(date_str)
                    # Try to fetch again
                    horoscopes = Horoscope.query.filter_by(date=horoscope_date).all()

                if not horoscopes:
                    return {"error": f"No horoscopes found for {horoscope_date}"}

                # Group by sign
                by_sign = {}
                for horoscope in horoscopes:
                    by_sign.setdefault(horoscope.sign, []).append(horoscope.to_dict())

                return {"date": horoscope_date.isoformat(), "horoscopes": by_sign}
        except Exception as e:
            logger.error(f"Error getting horoscopes for date: {str(e)}")
            return {"error": str(e)}
# Create a module-level singleton instance; it is built once, when this
# module is imported, using the default HoroscopeService() arguments.
horoscope_service = HoroscopeService()