|
|
import sqlite3 |
|
|
import requests |
|
|
import json |
|
|
import time |
|
|
import sys |
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
from urllib.parse import urljoin |
|
|
from typing import List, Dict, Any, Optional, Tuple |
|
|
from loguru import logger |
|
|
from bs4 import BeautifulSoup |
|
|
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type |
|
|
|
|
|
from src.config import settings |
|
|
|
|
|
|
|
|
# Configure loguru for console output: drop the default sink, then attach a
# colorized stderr sink at INFO level (DEBUG messages from fetch_details are
# therefore suppressed unless this level is lowered).
logger.remove()
logger.add(sys.stderr, format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>", level="INFO")
|
|
|
|
|
class FastSatelliteScraper:
    """Concurrently scrape per-satellite detail pages and export them to JSON.

    Targets (id, country, category, operator, name, url) are loaded from the
    SQLite database configured in ``settings``. Each detail page is fetched in
    a thread pool, parsed with BeautifulSoup (description paragraphs, the
    specification table, the launch-history table, and image URLs), and the
    merged records are written to ``settings.OUTPUT_FILE`` as UTF-8 JSON.
    """

    def __init__(self) -> None:
        """Initialize the scraper with a shared HTTP session and configuration."""
        self.session = requests.Session()
        # Present a browser-like User-Agent; some hosts reject the default
        # python-requests client string.
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        })
        # Appended to only on the main thread (inside run()'s as_completed
        # loop), so no locking is required.
        self.results: List[Dict[str, Any]] = []

    def load_targets(self) -> List[Tuple]:
        """Load satellite targets from the SQLite database.

        Returns:
            All rows as tuples of
            (id, country_name, category_name, operator, satellite_name, url).
        """
        conn = sqlite3.connect(settings.DB_PATH)
        cursor = conn.cursor()
        try:
            return cursor.execute(
                "SELECT id, country_name, category_name, operator, satellite_name, url FROM satellites"
            ).fetchall()
        finally:
            # Close even if the query raises (e.g. missing table).
            conn.close()

    def clean_text(self, text: Optional[str]) -> str:
        """Return *text* stripped of surrounding whitespace and NBSPs.

        Falsy input (None or "") yields the empty string.
        """
        if not text:
            return ""
        # \xa0 is the non-breaking space frequently emitted in HTML tables.
        return text.strip().replace('\xa0', ' ')

    def parse_specifications(self, soup: BeautifulSoup) -> Dict[str, str]:
        """Parse the specification table into a key/value mapping.

        Expects ``<table id="satdata">`` rows shaped ``<th>Label:</th><td>value</td>``;
        returns an empty dict when the table is absent.
        """
        specs: Dict[str, str] = {}
        table = soup.find("table", id="satdata")
        if not table:
            return specs

        for row in table.find_all("tr"):
            th = row.find("th")
            td = row.find("td")
            if not (th and td):
                continue
            key = self.clean_text(th.get_text())
            val = self.clean_text(td.get_text())
            # Labels are rendered as "Key:" — drop the single trailing colon.
            if key.endswith(":"):
                key = key[:-1]
            specs[key] = val
        return specs

    def parse_launches(self, soup: BeautifulSoup) -> List[Dict[str, Any]]:
        """Parse the launch-history table into a list of per-row dicts.

        Returns [] when ``<table id="satlist">`` is absent. Rows whose column
        count matches the header row are keyed by header text; mismatched rows
        (e.g. due to row spans) keep their raw cells under a "data" key so no
        information is silently dropped.
        """
        launches: List[Dict[str, Any]] = []
        table = soup.find("table", id="satlist")
        if not table:
            return launches

        # The first row carries the column headers (either th or td cells).
        headers: List[str] = []
        header_row = table.find("tr")
        if header_row:
            headers = [self.clean_text(th.get_text()) for th in header_row.find_all(["th", "td"])]

        # Remaining rows are data; skip header-only or empty rows.
        for row in table.find_all("tr")[1:]:
            cols = row.find_all("td")
            if not cols:
                continue

            row_data = [self.clean_text(col.get_text()) for col in cols]

            if len(row_data) == len(headers):
                launch_item: Dict[str, Any] = dict(zip(headers, row_data))
            else:
                launch_item = {"data": row_data}

            launches.append(launch_item)

        return launches

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((requests.RequestException, TimeoutError))
    )
    def fetch_details(self, sat_tuple: Tuple) -> Optional[Dict[str, Any]]:
        """Fetch and parse the detail page for a single satellite.

        Network errors (requests.RequestException / TimeoutError) are retried
        up to 3 times with exponential backoff; other exceptions propagate
        immediately. Returns None for a non-200 response.
        """
        sat_id, country, category, operator, name, url = sat_tuple

        try:
            resp = self.session.get(url, timeout=settings.REQUEST_TIMEOUT)
            if resp.status_code != 200:
                logger.warning(f"Failed to fetch {name}: {resp.status_code}")
                return None

            soup = BeautifulSoup(resp.text, "lxml")

            # Free-text description paragraphs; the same div also hosts images.
            desc_div = soup.find("div", id="satdescription")
            description = ""
            if desc_div:
                description = "\n".join([self.clean_text(p.get_text()) for p in desc_div.find_all("p")])

            specs = self.parse_specifications(soup)
            launches = self.parse_launches(soup)
            images = self._extract_images(desc_div, url)

            return {
                "id": sat_id,
                "name": name,
                "country": country,
                "category": category,
                "operator": operator,
                "url": url,
                "description": description,
                "specifications": specs,
                "launch_history": launches,
                "images": images
            }

        except Exception as e:
            logger.debug(f"Error processing {name} (attempting retry if network error): {e}")
            # Bare raise preserves the original traceback for tenacity's
            # retry predicate and for _safe_fetch's error report.
            raise

    def _extract_images(self, desc_div, base_url: str) -> List[str]:
        """Collect absolute image URLs from the description div (may be None)."""
        images: List[str] = []
        if desc_div:
            for img in desc_div.find_all("img"):
                src = img.get("src")
                if src:
                    # Resolve relative src attributes against the page URL.
                    images.append(urljoin(base_url, src))
        return images

    def run(self) -> None:
        """Run the full pipeline: load targets, fetch concurrently, save JSON."""
        satellites = self.load_targets()
        logger.info(f"Loaded {len(satellites)} satellites from DB.")

        start_time = time.time()

        with ThreadPoolExecutor(max_workers=settings.MAX_WORKERS) as executor:
            futures = {executor.submit(self._safe_fetch, sat): sat for sat in satellites}

            completed = 0
            for future in as_completed(futures):
                # _safe_fetch swallows exceptions, so result() never raises.
                result = future.result()
                if result:
                    self.results.append(result)

                completed += 1
                if completed % 50 == 0:
                    logger.info(f"Progress: {completed}/{len(satellites)}")

        duration = time.time() - start_time
        logger.info(f"Scraping completed in {duration:.2f} seconds.")

        self.save_results()

    def _safe_fetch(self, sat: Tuple) -> Optional[Dict[str, Any]]:
        """Wrapper to catch exceptions that exhaust retries.

        Returns None instead of raising so one failing satellite cannot
        abort the whole run.
        """
        try:
            return self.fetch_details(sat)
        except Exception as e:
            # sat[4] is the satellite_name column of the target tuple.
            logger.error(f"Failed to process satellite {sat[4]} after retries: {e}")
            return None

    def save_results(self) -> None:
        """Write accumulated results to settings.OUTPUT_FILE as UTF-8 JSON."""
        logger.info(f"Saving {len(self.results)} records to {settings.OUTPUT_FILE}...")

        # OUTPUT_FILE is used as a pathlib.Path; make sure its directory exists.
        settings.OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)

        with open(settings.OUTPUT_FILE, "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        logger.info("Done.")
|
|
|
|
|
if __name__ == "__main__":
    # This detail scraper depends on the satellite list produced by the
    # initial scraper; bail out with a non-zero exit code if that DB is
    # missing instead of failing later inside load_targets().
    if not settings.DB_PATH.exists():
        logger.error(f"Database not found at {settings.DB_PATH}. Please run the initial scraper first.")
        sys.exit(1)

    scraper = FastSatelliteScraper()
    scraper.run()
|
|
|