# test/src/full_scraper.py
# Author: Kirtan001
# Fresh Start: Clean Repo without binaries (commit ad06665)
import sqlite3
import requests
import json
import time
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin
from typing import List, Dict, Any, Optional, Tuple
from loguru import logger
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from src.config import settings
# Configure loguru: drop the default sink, then attach a colorized stderr sink
# (HH:mm:ss timestamp | padded level | message) filtered to INFO and above.
logger.remove()
logger.add(sys.stderr, format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>", level="INFO")
class FastSatelliteScraper:
    """Scrape satellite detail pages concurrently and dump the results to JSON.

    Targets are read from the SQLite database configured in ``settings``.
    Each satellite page is fetched with retry/backoff (tenacity) and parsed
    for description, specifications, launch history and images; the combined
    records are written to ``settings.OUTPUT_FILE``.
    """

    def __init__(self) -> None:
        """Initialize the scraper with a shared HTTP session and result store."""
        self.session = requests.Session()
        # A browser-like User-Agent avoids trivial bot blocking.
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        })
        self.results: List[Dict[str, Any]] = []

    def load_targets(self) -> List[Tuple]:
        """Load satellite target rows from the SQLite database.

        Returns:
            List of (id, country_name, category_name, operator,
            satellite_name, url) tuples.
        """
        conn = sqlite3.connect(settings.DB_PATH)
        try:
            cursor = conn.cursor()
            return cursor.execute(
                "SELECT id, country_name, category_name, operator, satellite_name, url FROM satellites"
            ).fetchall()
        finally:
            # Always release the connection, even if the query raises.
            conn.close()

    def clean_text(self, text: Optional[str]) -> str:
        """Strip surrounding whitespace and replace non-breaking spaces.

        Returns an empty string for None/empty input.
        """
        if not text:
            return ""
        return text.strip().replace('\xa0', ' ')

    def parse_specifications(self, soup: BeautifulSoup) -> Dict[str, str]:
        """Parse the key/value specification table (id="satdata") into a dict.

        A single trailing colon is stripped from each key ("Mass:" -> "Mass").
        Returns an empty dict when the table is absent.
        """
        specs: Dict[str, str] = {}
        table = soup.find("table", id="satdata")
        if not table:
            return specs
        for row in table.find_all("tr"):
            th = row.find("th")
            td = row.find("td")
            if th and td:
                key = self.clean_text(th.get_text())
                val = self.clean_text(td.get_text())
                if key.endswith(":"):
                    key = key[:-1]
                specs[key] = val
        return specs

    def parse_launches(self, soup: BeautifulSoup) -> List[Dict[str, Any]]:
        """Parse the launch-history table (id="satlist") into a list of row dicts.

        Rows whose column count matches the header row are keyed by header
        text; mismatched rows keep their raw cell list under "data" rather
        than guessing an alignment. Returns [] when the table is absent.
        """
        launches: List[Dict[str, Any]] = []
        table = soup.find("table", id="satlist")
        if not table:
            return launches
        headers: List[str] = []
        header_row = table.find("tr")
        if header_row:
            headers = [self.clean_text(th.get_text()) for th in header_row.find_all(["th", "td"])]
        for row in table.find_all("tr")[1:]:  # skip the header row
            cols = row.find_all("td")
            if not cols:
                continue
            row_data = [self.clean_text(col.get_text()) for col in cols]
            if len(row_data) == len(headers):
                launches.append(dict(zip(headers, row_data)))
            else:
                launches.append({"data": row_data})
        return launches

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((requests.RequestException, TimeoutError))
    )
    def fetch_details(self, sat_tuple: Tuple) -> Optional[Dict[str, Any]]:
        """Fetch and parse the detail page for a single satellite.

        Retries up to 3 times with exponential backoff on network errors and
        on 5xx server responses (often transient). Non-retryable HTTP
        failures (e.g. 404) are logged and return None.
        """
        sat_id, country, category, operator, name, url = sat_tuple
        try:
            resp = self.session.get(url, timeout=settings.REQUEST_TIMEOUT)
            if resp.status_code >= 500:
                # Server errors are frequently transient: raise HTTPError
                # (a RequestException) so the tenacity decorator retries.
                resp.raise_for_status()
            if resp.status_code != 200:
                logger.warning(f"Failed to fetch {name}: {resp.status_code}")
                return None
            soup = BeautifulSoup(resp.text, "lxml")
            # 1. Description: all paragraphs inside div#satdescription.
            desc_div = soup.find("div", id="satdescription")
            description = ""
            if desc_div:
                description = "\n".join(self.clean_text(p.get_text()) for p in desc_div.find_all("p"))
            # 2. Specifications table.
            specs = self.parse_specifications(soup)
            # 3. Launch history table.
            launches = self.parse_launches(soup)
            # 4. Images inside the description, resolved to absolute URLs.
            images = []
            if desc_div:
                for img in desc_div.find_all("img"):
                    src = img.get("src")
                    if src:
                        images.append(urljoin(url, src))
            return {
                "id": sat_id,
                "name": name,
                "country": country,
                "category": category,
                "operator": operator,
                "url": url,
                "description": description,
                "specifications": specs,
                "launch_history": launches,
                "images": images,
            }
        except Exception as e:
            logger.debug(f"Error processing {name} (attempting retry if network error): {e}")
            # Bare raise preserves the original traceback; tenacity decides
            # whether the exception type warrants another attempt.
            raise

    def run(self) -> None:
        """Run the full pipeline: load targets, fetch concurrently, save JSON."""
        satellites = self.load_targets()
        logger.info(f"Loaded {len(satellites)} satellites from DB.")
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=settings.MAX_WORKERS) as executor:
            # _safe_fetch swallows exceptions that exhaust the retry budget,
            # so one bad satellite cannot abort the whole run.
            futures = {executor.submit(self._safe_fetch, sat): sat for sat in satellites}
            completed = 0
            for future in as_completed(futures):
                result = future.result()
                if result:
                    self.results.append(result)
                # Count every finished future (success or failure) so the
                # progress log reflects actual throughput.
                completed += 1
                if completed % 50 == 0:
                    logger.info(f"Progress: {completed}/{len(satellites)}")
        duration = time.time() - start_time
        logger.info(f"Scraping completed in {duration:.2f} seconds.")
        self.save_results()

    def _safe_fetch(self, sat: Tuple) -> Optional[Dict[str, Any]]:
        """Wrapper around fetch_details that absorbs exhausted-retry errors.

        Returns None on failure so run() can keep processing the remaining
        satellites.
        """
        try:
            return self.fetch_details(sat)
        except Exception as e:
            # sat[4] is satellite_name in the load_targets row layout.
            logger.error(f"Failed to process satellite {sat[4]} after retries: {e}")
            return None

    def save_results(self) -> None:
        """Write accumulated results to settings.OUTPUT_FILE as pretty JSON."""
        logger.info(f"Saving {len(self.results)} records to {settings.OUTPUT_FILE}...")
        # Ensure the output directory exists before opening the file.
        settings.OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(settings.OUTPUT_FILE, "w", encoding="utf-8") as f:
            # ensure_ascii=False keeps non-ASCII satellite names readable.
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        logger.info("Done.")
if __name__ == "__main__":
    # The targets database is a prerequisite: bail out with a clear error
    # if the initial scraper has not produced it yet.
    if not settings.DB_PATH.exists():
        logger.error(f"Database not found at {settings.DB_PATH}. Please run the initial scraper first.")
        sys.exit(1)
    FastSatelliteScraper().run()