import json
import sqlite3
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from loguru import logger
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from src.config import settings

# Setup logging: drop loguru's default sink and install a concise stderr one.
logger.remove()
logger.add(sys.stderr, format="{time:HH:mm:ss} | {level: <8} | {message}", level="INFO")

# Row shape returned by load_targets():
# (id, country_name, category_name, operator, satellite_name, url)
SatRow = Tuple[int, str, str, str, str, str]


class FastSatelliteScraper:
    """Concurrent scraper that enriches satellite rows from a SQLite database
    with description, specifications, launch history and images parsed from
    each satellite's detail page."""

    def __init__(self) -> None:
        """Initialize the scraper with a shared HTTP session and result buffer."""
        self.session = requests.Session()
        # Browser-like User-Agent to avoid trivial bot blocking on the target site.
        self.session.headers.update({
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/91.0.4472.124 Safari/537.36"
            )
        })
        # Accumulates one dict per successfully scraped satellite.
        self.results: List[Dict[str, Any]] = []

    def load_targets(self) -> List[SatRow]:
        """Load satellite targets from the SQLite database.

        Returns:
            All rows from the ``satellites`` table as
            (id, country_name, category_name, operator, satellite_name, url).
        """
        conn = sqlite3.connect(settings.DB_PATH)
        try:
            cursor = conn.cursor()
            return cursor.execute(
                "SELECT id, country_name, category_name, operator, "
                "satellite_name, url FROM satellites"
            ).fetchall()
        finally:
            # Always release the connection, even if the query raises.
            conn.close()

    def clean_text(self, text: Optional[str]) -> str:
        """Clean whitespace and non-breaking spaces from text.

        Returns an empty string for ``None`` or empty input.
        """
        if not text:
            return ""
        return text.strip().replace("\xa0", " ")

    def parse_specifications(self, soup: BeautifulSoup) -> Dict[str, str]:
        """Parse the specifications table (``<table id="satdata">``).

        Returns:
            Mapping of specification label (trailing colon removed) to value.
            Empty dict when the table is absent.
        """
        specs: Dict[str, str] = {}
        table = soup.find("table", id="satdata")
        if not table:
            return specs
        for row in table.find_all("tr"):
            th = row.find("th")
            td = row.find("td")
            if th and td:
                key = self.clean_text(th.get_text())
                val = self.clean_text(td.get_text())
                # Labels are rendered as "Name:" — normalize away the colon.
                if key.endswith(":"):
                    key = key[:-1]
                specs[key] = val
        return specs

    def parse_launches(self, soup: BeautifulSoup) -> List[Dict[str, Any]]:
        """Parse the launch history table (``<table id="satlist">``).

        Returns:
            One dict per data row. When a row's column count matches the
            header row, columns are keyed by header text; otherwise the raw
            cell values are stored under a ``"data"`` key. Empty list when
            the table is absent.
        """
        launches: List[Dict[str, Any]] = []
        table = soup.find("table", id="satlist")
        if not table:
            return launches

        # Get headers from the first row (cells may be <th> or <td>).
        headers: List[str] = []
        header_row = table.find("tr")
        if header_row:
            headers = [
                self.clean_text(cell.get_text())
                for cell in header_row.find_all(["th", "td"])
            ]

        # Process data rows, skipping the header row.
        for row in table.find_all("tr")[1:]:
            cols = row.find_all("td")
            if not cols:
                continue
            row_data = [self.clean_text(col.get_text()) for col in cols]
            if len(row_data) == len(headers):
                launch_item: Dict[str, Any] = dict(zip(headers, row_data))
            else:
                # Column count mismatch — keep the raw cells rather than
                # mis-aligning them with header names.
                launch_item = {"data": row_data}
            launches.append(launch_item)
        return launches

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((requests.RequestException, TimeoutError)),
    )
    def fetch_details(self, sat_tuple: SatRow) -> Optional[Dict[str, Any]]:
        """Fetch and parse details for a single satellite.

        Network errors (``requests.RequestException`` / ``TimeoutError``)
        are retried up to 3 times with exponential backoff; any other
        exception propagates immediately.

        Args:
            sat_tuple: (id, country, category, operator, name, url) row.

        Returns:
            Parsed record dict, or ``None`` on a non-200 HTTP response.
        """
        sat_id, country, category, operator, name, url = sat_tuple
        try:
            resp = self.session.get(url, timeout=settings.REQUEST_TIMEOUT)
            if resp.status_code != 200:
                logger.warning(f"Failed to fetch {name}: {resp.status_code}")
                return None

            soup = BeautifulSoup(resp.text, "lxml")

            # 1. Description: all paragraphs inside the description div.
            desc_div = soup.find("div", id="satdescription")
            description = ""
            if desc_div:
                description = "\n".join(
                    self.clean_text(p.get_text()) for p in desc_div.find_all("p")
                )

            # 2. Specifications
            specs = self.parse_specifications(soup)

            # 3. Launches
            launches = self.parse_launches(soup)

            # 4. Images: resolve relative srcs against the page URL.
            images: List[str] = []
            if desc_div:
                for img in desc_div.find_all("img"):
                    src = img.get("src")
                    if src:
                        images.append(urljoin(url, src))

            return {
                "id": sat_id,
                "name": name,
                "country": country,
                "category": category,
                "operator": operator,
                "url": url,
                "description": description,
                "specifications": specs,
                "launch_history": launches,
                "images": images,
            }
        except Exception as e:
            # Log for context; bare `raise` preserves the original traceback
            # and lets tenacity decide whether to retry.
            logger.debug(f"Error processing {name} (attempting retry if network error): {e}")
            raise

    def run(self) -> None:
        """Run the full scraping pipeline: load targets, fetch concurrently,
        then persist the results to disk."""
        satellites = self.load_targets()
        logger.info(f"Loaded {len(satellites)} satellites from DB.")

        start_time = time.time()
        with ThreadPoolExecutor(max_workers=settings.MAX_WORKERS) as executor:
            # _safe_fetch swallows exceptions that exhaust the retries so a
            # single bad satellite cannot abort the whole run.
            futures = {executor.submit(self._safe_fetch, sat): sat for sat in satellites}
            completed = 0
            for future in as_completed(futures):
                result = future.result()
                if result:
                    self.results.append(result)
                completed += 1
                if completed % 50 == 0:
                    logger.info(f"Progress: {completed}/{len(satellites)}")

        duration = time.time() - start_time
        logger.info(f"Scraping completed in {duration:.2f} seconds.")
        self.save_results()

    def _safe_fetch(self, sat: SatRow) -> Optional[Dict[str, Any]]:
        """Wrapper to catch exceptions that exhaust retries.

        Returns ``None`` (and logs an error) instead of propagating, so the
        thread pool keeps draining the remaining work.
        """
        try:
            return self.fetch_details(sat)
        except Exception as e:
            # sat[4] is the satellite_name column.
            logger.error(f"Failed to process satellite {sat[4]} after retries: {e}")
            return None

    def save_results(self) -> None:
        """Save accumulated results to the configured JSON output file."""
        logger.info(f"Saving {len(self.results)} records to {settings.OUTPUT_FILE}...")
        # Ensure the output directory exists before writing.
        settings.OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(settings.OUTPUT_FILE, "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        logger.info("Done.")


if __name__ == "__main__":
    if not settings.DB_PATH.exists():
        logger.error(f"Database not found at {settings.DB_PATH}. Please run the initial scraper first.")
        sys.exit(1)
    scraper = FastSatelliteScraper()
    scraper.run()