import logging
import time
from typing import Dict, Any, List, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse

from scrapers.base_scraper import BaseScraper
from scrapers.news_scraper import NewsScraper
from scrapers.blog_scraper import BlogScraper
from utils.rate_limiter import RateLimiter

logger = logging.getLogger(__name__)
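
# Note: this module assumes utils.rate_limiter.RateLimiter exposes a small
# sliding-window interface -- can_proceed(), get_wait_time(), and
# record_request() -- matching the calls made in scrape_url() below.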


class ScraperService:
    """Service to manage scraping operations"""

    def __init__(self, max_workers: int = 5, timeout: int = 30):
        """
        Initialize the scraper service.

        Args:
            max_workers: Maximum number of concurrent scrapers
            timeout: Timeout for each scraping operation in seconds
        """
        self.max_workers = max_workers
        self.timeout = timeout
        self.rate_limiters = {}  # Domain-specific rate limiters

        # Register available scrapers
        self.scrapers = {
            "news": NewsScraper(timeout),
            "blog": BlogScraper(timeout),
        }

    def get_scraper_for_url(self, url: str) -> Tuple[BaseScraper, str]:
        """
        Determine the appropriate scraper to use for a URL.

        Args:
            url: URL to scrape

        Returns:
            Tuple of (scraper instance, scraper type)
        """
        # Simple logic to determine the scraper type based on URL patterns.
        # This could be enhanced with more sophisticated detection.
        domain = urlparse(url).netloc.lower()

        # News site patterns
        news_patterns = ["news", "cnn", "bbc", "reuters", "nytimes", "washingtonpost",
                         "guardian", "aljazeera", "foxnews", "nbcnews", "abc"]

        # Blog patterns
        blog_patterns = ["blog", "medium", "wordpress", "blogspot", "tumblr",
                         "substack", "ghost", "hashnode"]

        # Check the domain against each pattern set
        for pattern in news_patterns:
            if pattern in domain:
                return self.scrapers["news"], "news"

        for pattern in blog_patterns:
            if pattern in domain:
                return self.scrapers["blog"], "blog"

        # Default to the news scraper
        return self.scrapers["news"], "news"

    def _get_rate_limiter(self, domain: str) -> RateLimiter:
        """Get or create a rate limiter for a specific domain"""
        if domain not in self.rate_limiters:
            # Default: 5 requests per minute for each domain
            self.rate_limiters[domain] = RateLimiter(window_size=60, max_requests=5)
        return self.rate_limiters[domain]

    def scrape_url(self, url: str, scraper_type: Optional[str] = None) -> Dict[str, Any]:
        """
        Scrape a single URL.

        Args:
            url: URL to scrape
            scraper_type: Optional type of scraper to use

        Returns:
            Dictionary with scraped data
        """
        try:
            # Parse the domain for rate limiting
            domain = urlparse(url).netloc
            rate_limiter = self._get_rate_limiter(domain)

            # Wait if the domain's rate limit has been reached
            if not rate_limiter.can_proceed():
                wait_time = rate_limiter.get_wait_time()
                logger.warning(f"Rate limit reached for {domain}. Waiting {wait_time:.2f} seconds")
                time.sleep(wait_time)

            # Select the appropriate scraper
            if scraper_type and scraper_type in self.scrapers:
                scraper = self.scrapers[scraper_type]
                selected_type = scraper_type
            else:
                scraper, selected_type = self.get_scraper_for_url(url)

            logger.info(f"Scraping {url} with {selected_type} scraper")

            # Perform scraping and record the request against the rate limiter
            result = scraper.scrape(url)
            rate_limiter.record_request()

            # Add metadata about the scraping run
            result["scraper_type"] = selected_type
            result["scraped_at"] = time.time()

            return result
        except Exception as e:
            logger.error(f"Error scraping URL {url}: {str(e)}")
            return {
                "success": False,
                "url": url,
                "error": str(e),
                "scraped_at": time.time(),
            }

    def scrape_multiple_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
        """
        Scrape multiple URLs in parallel.

        Args:
            urls: List of URLs to scrape

        Returns:
            List of dictionaries with scraped data
        """
        results = []

        # Use a ThreadPoolExecutor for concurrent scraping
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit one scraping task per URL
            future_to_url = {executor.submit(self.scrape_url, url): url for url in urls}

            # Collect results as they complete
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    results.append(result)
                    logger.info(f"Completed scraping: {url}")
                except Exception as e:
                    logger.error(f"Exception scraping {url}: {str(e)}")
                    results.append({
                        "success": False,
                        "url": url,
                        "error": str(e),
                        "scraped_at": time.time(),
                    })

        return results
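

# --- Illustrative usage sketch (not part of the original service) ---
# A minimal example of how ScraperService might be driven, assuming the
# scrapers and utils packages imported above are available and that each
# scraper's scrape() returns a dict (scrape_url adds "scraper_type" and
# "scraped_at" on success, or "success": False plus "error" on failure).
# The URLs below are hypothetical placeholders.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    service = ScraperService(max_workers=3, timeout=15)

    # Single URL: the scraper type is inferred from the domain
    single = service.scrape_url("https://www.bbc.com/news/world")
    print(single.get("scraper_type"), single.get("error"))

    # Multiple URLs scraped concurrently via the thread pool
    batch = service.scrape_multiple_urls([
        "https://medium.com/@example/some-post",
        "https://www.reuters.com/world/",
    ])
    for item in batch:
        print(item.get("url", "<unknown>"), item.get("error"))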