# test/development_logs/legacy_src/fast_scraper.py
import sqlite3
import requests
from bs4 import BeautifulSoup
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin
from loguru import logger
import sys
import time
# Configuration
DB_PATH = "data/satellites.db"
OUTPUT_FILE = "data/satellites_detailed.json"
MAX_WORKERS = 10
# Setup logging
logger.remove()
logger.add(sys.stderr, format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>", level="INFO")
class FastSatelliteScraper:
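    """Scrape per-satellite detail pages listed in a SQLite database and dump them to a JSON file."""
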
    def __init__(self, db_path, output_file):
        self.db_path = db_path
        self.output_file = output_file
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        })
        self.results = []

    def load_targets(self):
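        """Return (id, country, category, operator, name, url) tuples for every row in the satellites table."""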
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Fetch every satellite present in the DB; any narrowing (e.g. to the
        # 637 Chinese satellites) was already applied when the DB was built.
        satellites = cursor.execute("SELECT id, country_name, category_name, operator, satellite_name, url FROM satellites").fetchall()
        conn.close()
        return satellites

    def clean_text(self, text):
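        """Strip whitespace and replace non-breaking spaces; return "" for falsy input."""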
        if text:
            return text.strip().replace("\xa0", " ")
        return ""

    def parse_specifications(self, soup):
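        """Parse the specifications table (id "satdata") into a {label: value} dict."""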
        specs = {}
        table = soup.find("table", id="satdata")
        if not table:
            return specs
        for row in table.find_all("tr"):
            th = row.find("th")
            td = row.find("td")
            if th and td:
                key = self.clean_text(th.get_text())
                val = self.clean_text(td.get_text())
                # Remove trailing colon
                if key.endswith(":"):
                    key = key[:-1]
                specs[key] = val
        return specs

    def parse_launches(self, soup):
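        """Parse the launch-history table (id "satlist") into a list of dicts,
        falling back to raw cell lists when a row does not match the header count."""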
        launches = []
        table = soup.find("table", id="satlist")
        if not table:
            return launches
        # Get headers
        headers = []
        header_row = table.find("tr")
        if header_row:
            headers = [self.clean_text(th.get_text()) for th in header_row.find_all(["th", "td"])]
        # Process rows
        rows = table.find_all("tr")[1:]  # Skip header row
        for row in rows:
            cols = row.find_all("td")
            if not cols:
                continue
            # Keep the raw list of cell values; complex rowspans can misalign
            # cells against headers, and a raw dump is safer against data loss.
            row_data = [self.clean_text(col.get_text()) for col in cols]
            # Map to headers only when the column counts match
            if len(row_data) == len(headers):
                launch_item = dict(zip(headers, row_data))
            else:
                launch_item = {"data": row_data}
            launches.append(launch_item)
        return launches

    def fetch_details(self, sat_tuple):
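        """Fetch and parse a single satellite page; return a record dict, or None on any failure."""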
        sat_id, country, category, operator, name, url = sat_tuple
        try:
            resp = self.session.get(url, timeout=10)
            if resp.status_code != 200:
                logger.warning(f"Failed to fetch {name}: {resp.status_code}")
                return None
            soup = BeautifulSoup(resp.text, "lxml")
            # 1. Description
            desc_div = soup.find("div", id="satdescription")
            description = ""
            if desc_div:
                # Join paragraph text for a cleaner description
                description = "\n".join([self.clean_text(p.get_text()) for p in desc_div.find_all("p")])
            # 2. Specifications
            specs = self.parse_specifications(soup)
            # 3. Launches
            launches = self.parse_launches(soup)
            # 4. Images (optional but useful)
            images = []
            if desc_div:
                img_tags = desc_div.find_all("img")
                for img in img_tags:
                    src = img.get("src")
                    if src:
                        images.append(urljoin(url, src))
            return {
                "id": sat_id,
                "name": name,
                "country": country,
                "category": category,
                "operator": operator,
                "url": url,
                "description": description,
                "specifications": specs,
                "launch_history": launches,
                "images": images,
            }
        except Exception as e:
            logger.error(f"Error processing {name}: {e}")
            return None

    def run(self):
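        """Scrape all targets concurrently, then write the collected records to the output JSON file."""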
        satellites = self.load_targets()
        logger.info(f"Loaded {len(satellites)} satellites from DB.")
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = [executor.submit(self.fetch_details, sat) for sat in satellites]
            completed = 0
            for future in as_completed(futures):
                result = future.result()
                if result:
                    self.results.append(result)
                completed += 1
                if completed % 50 == 0:
                    logger.info(f"Progress: {completed}/{len(satellites)}")
        duration = time.time() - start_time
        logger.info(f"Scraping completed in {duration:.2f} seconds.")
        # Save to JSON, defensively creating the output directory if it is missing
        os.makedirs(os.path.dirname(self.output_file) or ".", exist_ok=True)
        logger.info(f"Saving {len(self.results)} records to {self.output_file}...")
        with open(self.output_file, "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        logger.info("Done.")

if __name__ == "__main__":
    scraper = FastSatelliteScraper(DB_PATH, OUTPUT_FILE)
    scraper.run()