| import sqlite3 |
| import requests |
| from bs4 import BeautifulSoup |
| import json |
| import os |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from urllib.parse import urljoin |
| from loguru import logger |
| import sys |
| import time |
|
|
| |
# Path to the SQLite database containing the satellite list to scrape.
DB_PATH = "data/satellites.db"
# Destination JSON file for the scraped per-satellite detail records.
OUTPUT_FILE = "data/satellites_detailed.json"
# Number of concurrent HTTP fetch threads used by the thread pool.
MAX_WORKERS = 10


# Replace loguru's default sink with a colorized, timestamped stderr sink
# (INFO and above only).
logger.remove()
logger.add(sys.stderr, format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>", level="INFO")
|
|
class FastSatelliteScraper:
    """Concurrently scrape satellite detail pages and dump them to JSON.

    Targets (id, country, category, operator, name, url) are read from a
    SQLite database; each URL is fetched with a shared ``requests.Session``
    on a thread pool, parsed with BeautifulSoup, and the collected records
    are written to a single JSON file.
    """

    def __init__(self, db_path, output_file):
        self.db_path = db_path
        self.output_file = output_file
        # One shared session: connection pooling + a browser-like UA so the
        # target site does not reject the requests as a bare script.
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        })
        self.results = []

    def load_targets(self):
        """Return all satellite rows from the DB as a list of 6-tuples.

        Each tuple is (id, country_name, category_name, operator,
        satellite_name, url). The connection is always closed, even if the
        query raises (the original leaked it on error).
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            return cursor.execute(
                "SELECT id, country_name, category_name, operator, satellite_name, url FROM satellites"
            ).fetchall()
        finally:
            conn.close()

    def clean_text(self, text):
        """Strip whitespace and replace non-breaking spaces; '' for falsy input."""
        if text:
            return text.strip().replace(u'\xa0', u' ')
        return ""

    def parse_specifications(self, soup):
        """Parse the <table id="satdata"> key/value spec table into a dict.

        Returns an empty dict when the table is absent. Trailing ':' is
        stripped from keys.
        """
        specs = {}
        table = soup.find("table", id="satdata")
        if not table:
            return specs

        for row in table.find_all("tr"):
            th = row.find("th")
            td = row.find("td")
            if th and td:
                key = self.clean_text(th.get_text())
                val = self.clean_text(td.get_text())
                if key.endswith(":"):
                    key = key[:-1]
                specs[key] = val
        return specs

    def parse_launches(self, soup):
        """Parse the <table id="satlist"> launch-history table.

        Returns a list of dicts keyed by the header row when the column
        count matches, else ``{"data": [...]}`` fallback rows. Empty list
        when the table is absent.
        """
        launches = []
        table = soup.find("table", id="satlist")
        if not table:
            return launches

        headers = []
        header_row = table.find("tr")
        if header_row:
            headers = [self.clean_text(th.get_text()) for th in header_row.find_all(["th", "td"])]

        # Skip the header row; rows without <td> cells (e.g. nested headers)
        # are ignored.
        for row in table.find_all("tr")[1:]:
            cols = row.find_all("td")
            if not cols:
                continue

            row_data = [self.clean_text(col.get_text()) for col in cols]

            if len(row_data) == len(headers):
                launch_item = dict(zip(headers, row_data))
            else:
                launch_item = {"data": row_data}

            launches.append(launch_item)

        return launches

    def fetch_details(self, sat_tuple):
        """Fetch and parse one satellite page.

        ``sat_tuple`` is a row from :meth:`load_targets`. Returns the
        assembled record dict, or None on HTTP failure / any parsing error
        (logged, never raised — one bad page must not kill the pool).
        """
        sat_id, country, category, operator, name, url = sat_tuple

        try:
            resp = self.session.get(url, timeout=10)
            if resp.status_code != 200:
                logger.warning(f"Failed to fetch {name}: {resp.status_code}")
                return None

            # NOTE: "lxml" requires the lxml package; kept for speed/parity
            # with the original behavior.
            soup = BeautifulSoup(resp.text, "lxml")

            desc_div = soup.find("div", id="satdescription")
            description = ""
            if desc_div:
                description = "\n".join([self.clean_text(p.get_text()) for p in desc_div.find_all("p")])

            specs = self.parse_specifications(soup)
            launches = self.parse_launches(soup)

            # Resolve relative image paths against the page URL.
            images = []
            if desc_div:
                for img in desc_div.find_all("img"):
                    src = img.get("src")
                    if src:
                        images.append(urljoin(url, src))

            return {
                "id": sat_id,
                "name": name,
                "country": country,
                "category": category,
                "operator": operator,
                "url": url,
                "description": description,
                "specifications": specs,
                "launch_history": launches,
                "images": images
            }

        except Exception as e:
            # Broad catch is deliberate at this worker boundary: log and skip.
            logger.error(f"Error processing {name}: {e}")
            return None

    def run(self):
        """Scrape every target concurrently and write results to JSON."""
        satellites = self.load_targets()
        logger.info(f"loaded {len(satellites)} satellites from DB.")

        start_time = time.time()

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = [executor.submit(self.fetch_details, sat) for sat in satellites]

            completed = 0
            for future in as_completed(futures):
                result = future.result()
                if result:
                    self.results.append(result)

                completed += 1
                if completed % 50 == 0:
                    logger.info(f"Progress: {completed}/{len(satellites)}")

        duration = time.time() - start_time
        logger.info(f"Scraping completed in {duration:.2f} seconds.")

        logger.info(f"Saving {len(self.results)} records to {self.output_file}...")
        # Ensure the output directory exists (fresh checkouts lack data/);
        # '.' fallback covers bare filenames with no directory component.
        os.makedirs(os.path.dirname(self.output_file) or ".", exist_ok=True)
        with open(self.output_file, "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        logger.info("Done.")
|
|
| if __name__ == "__main__": |
| scraper = FastSatelliteScraper(DB_PATH, OUTPUT_FILE) |
| scraper.run() |
|
|