File size: 7,468 Bytes
ad06665 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
import sqlite3
import requests
import json
import time
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin
from typing import List, Dict, Any, Optional, Tuple
from loguru import logger
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from src.config import settings
# Configure loguru: drop the default sink, then log INFO+ to stderr with colors.
logger.remove()
_LOG_FORMAT = "<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>"
logger.add(sys.stderr, format=_LOG_FORMAT, level="INFO")
class FastSatelliteScraper:
    """Concurrently scrape satellite detail pages and dump the results to JSON.

    Targets (id, country, category, operator, name, url) are read from the
    SQLite database at ``settings.DB_PATH``; the combined records are written
    to ``settings.OUTPUT_FILE``.
    """

    def __init__(self) -> None:
        """Initialize the shared HTTP session and the results accumulator."""
        self.session = requests.Session()
        # A desktop-browser User-Agent avoids trivial bot blocking on the site.
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        })
        self.results: List[Dict[str, Any]] = []

    def load_targets(self) -> List[Tuple]:
        """Load satellite target rows from the SQLite database.

        Returns:
            A list of ``(id, country_name, category_name, operator,
            satellite_name, url)`` tuples, one per satellite.
        """
        conn = sqlite3.connect(settings.DB_PATH)
        try:
            cursor = conn.cursor()
            return cursor.execute(
                "SELECT id, country_name, category_name, operator, satellite_name, url FROM satellites"
            ).fetchall()
        finally:
            # Always release the connection, even if the query fails.
            conn.close()

    def clean_text(self, text: Optional[str]) -> str:
        """Strip surrounding whitespace and replace non-breaking spaces.

        Returns "" for None or empty input.
        """
        if text:
            return text.strip().replace(u'\xa0', u' ')
        return ""

    def parse_specifications(self, soup: BeautifulSoup) -> Dict[str, str]:
        """Parse the key/value specifications table (``table#satdata``).

        Returns an empty dict when the table is absent.
        """
        specs: Dict[str, str] = {}
        table = soup.find("table", id="satdata")
        if not table:
            return specs
        for row in table.find_all("tr"):
            th = row.find("th")
            td = row.find("td")
            if th and td:
                key = self.clean_text(th.get_text())
                val = self.clean_text(td.get_text())
                # Header cells carry a trailing colon ("Operator:") — drop it.
                if key.endswith(":"):
                    key = key[:-1]
                specs[key] = val
        return specs

    def parse_launches(self, soup: BeautifulSoup) -> List[Dict[str, Any]]:
        """Parse the launch-history table (``table#satlist``).

        Rows whose cell count matches the header row become header->value
        dicts; mismatched rows are kept raw under the ``"data"`` key so no
        information is silently dropped. Returns [] when the table is absent.
        """
        launches: List[Dict[str, Any]] = []
        table = soup.find("table", id="satlist")
        if not table:
            return launches
        headers: List[str] = []
        header_row = table.find("tr")
        if header_row:
            # Header cells may be <th> or <td> depending on the page.
            headers = [self.clean_text(cell.get_text()) for cell in header_row.find_all(["th", "td"])]
        for row in table.find_all("tr")[1:]:  # skip the header row
            cols = row.find_all("td")
            if not cols:
                continue
            row_data = [self.clean_text(col.get_text()) for col in cols]
            if len(row_data) == len(headers):
                launches.append(dict(zip(headers, row_data)))
            else:
                # Column-count mismatch: preserve the raw cell values.
                launches.append({"data": row_data})
        return launches

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((requests.RequestException, TimeoutError)),
    )
    def fetch_details(self, sat_tuple: Tuple) -> Optional[Dict[str, Any]]:
        """Fetch and parse the detail page for a single satellite.

        Network errors (requests.RequestException, TimeoutError) are retried
        up to 3 times with exponential backoff via tenacity. Non-200 responses
        return None without retrying; other exceptions propagate to the caller.
        """
        sat_id, country, category, operator, name, url = sat_tuple
        try:
            resp = self.session.get(url, timeout=settings.REQUEST_TIMEOUT)
            if resp.status_code != 200:
                logger.warning(f"Failed to fetch {name}: {resp.status_code}")
                return None
            soup = BeautifulSoup(resp.text, "lxml")
            # 1. Description paragraphs.
            desc_div = soup.find("div", id="satdescription")
            description = ""
            if desc_div:
                description = "\n".join([self.clean_text(p.get_text()) for p in desc_div.find_all("p")])
            # 2. Specifications table.
            specs = self.parse_specifications(soup)
            # 3. Launch history table.
            launches = self.parse_launches(soup)
            # 4. Images inside the description, resolved to absolute URLs.
            images = []
            if desc_div:
                for img in desc_div.find_all("img"):
                    src = img.get("src")
                    if src:
                        images.append(urljoin(url, src))
            return {
                "id": sat_id,
                "name": name,
                "country": country,
                "category": category,
                "operator": operator,
                "url": url,
                "description": description,
                "specifications": specs,
                "launch_history": launches,
                "images": images
            }
        except Exception as e:
            # Log context; tenacity decides whether the exception is retryable.
            logger.debug(f"Error processing {name} (attempting retry if network error): {e}")
            # Bare raise keeps the original traceback intact for tenacity.
            raise

    def run(self) -> None:
        """Run the full pipeline: load targets, fetch concurrently, save JSON."""
        satellites = self.load_targets()
        total = len(satellites)  # hoisted: used in the progress log each iteration
        logger.info(f"Loaded {total} satellites from DB.")
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=settings.MAX_WORKERS) as executor:
            # _safe_fetch swallows exceptions that exhausted their retries, so
            # one bad satellite cannot abort the whole run.
            futures = [executor.submit(self._safe_fetch, sat) for sat in satellites]
            completed = 0
            for future in as_completed(futures):
                result = future.result()
                if result:
                    self.results.append(result)
                completed += 1
                if completed % 50 == 0:
                    logger.info(f"Progress: {completed}/{total}")
        duration = time.time() - start_time
        logger.info(f"Scraping completed in {duration:.2f} seconds.")
        self.save_results()

    def _safe_fetch(self, sat: Tuple) -> Optional[Dict[str, Any]]:
        """Wrapper around fetch_details that absorbs retry-exhausted errors.

        Returns None on failure so the thread pool keeps processing.
        """
        try:
            return self.fetch_details(sat)
        except Exception as e:
            # sat[4] is the satellite name column from load_targets().
            logger.error(f"Failed to process satellite {sat[4]} after retries: {e}")
            return None

    def save_results(self) -> None:
        """Write accumulated results to ``settings.OUTPUT_FILE`` as UTF-8 JSON."""
        logger.info(f"Saving {len(self.results)} records to {settings.OUTPUT_FILE}...")
        # Ensure the output directory exists before opening the file.
        settings.OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(settings.OUTPUT_FILE, "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        logger.info("Done.")
if __name__ == "__main__":
    # Refuse to start without the target database produced by the initial scraper.
    db_path = settings.DB_PATH
    if not db_path.exists():
        logger.error(f"Database not found at {db_path}. Please run the initial scraper first.")
        sys.exit(1)
    FastSatelliteScraper().run()
|