"""
Download HTML Content from Verified Online Phishing URLs

This script downloads HTML content from phishing URLs that are verified and
online. Saves HTML files for later feature extraction.
"""

import hashlib
import json
import logging
import random
import re
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from threading import Lock
from urllib.parse import urlparse

import pandas as pd
import requests
import urllib3
from bs4 import BeautifulSoup
from bs4.element import PreformattedString
from requests.adapters import HTTPAdapter
from tqdm import tqdm
from urllib3.util.retry import Retry

# Disable SSL warnings (expected when downloading phishing sites with invalid certificates)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger("html_downloader")

# Pre-compiled patterns used by HTMLDownloader._optimize_html.
# The comment pattern is assembled from pieces on purpose: tools that strip
# markup comments from text would otherwise blank the literal and silently
# turn the substitution into a no-op.
_HTML_COMMENT_RE = re.compile(r'<!' + r'--.*?--' + r'>', re.DOTALL)
_BASE64_IMAGE_RE = re.compile(r'data:image/[^;]+;base64,[A-Za-z0-9+/=]+')
_WHITESPACE_RE = re.compile(r'\s+')
_BLANK_LINES_RE = re.compile(r'\n\s*\n+')

# Maximum number of characters kept inside a <script>/<style> element.
_MAX_INLINE_CONTENT = 500

# Responses shorter than this many characters are treated as error pages.
_MIN_HTML_LENGTH = 200


class HTMLDownloader:
    """Optimized HTML downloader with retry, checkpointing, and rate limiting."""

    def __init__(self, output_dir='data/html', max_workers=20, timeout=8, checkpoint_interval=100):
        """
        Initialize optimized HTML downloader.

        Creates the ``legitimate/`` and ``phishing/`` output directories,
        builds one requests session per worker and loads any existing
        checkpoint from ``<output_dir>/download_checkpoint.json``.

        Args:
            output_dir: Base directory to save HTML files
            max_workers: Number of parallel download threads
            timeout: Read timeout in seconds (connect timeout is fixed at 3s)
            checkpoint_interval: Save progress every N URLs

        Raises:
            ValueError: If max_workers is smaller than 1.
        """
        if max_workers < 1:
            raise ValueError("max_workers must be at least 1")

        self.output_dir = Path(output_dir)
        self.legit_dir = self.output_dir / 'legitimate'
        self.phishing_dir = self.output_dir / 'phishing'
        self.legit_dir.mkdir(parents=True, exist_ok=True)
        self.phishing_dir.mkdir(parents=True, exist_ok=True)

        self.max_workers = max_workers
        self.timeout = timeout
        self.checkpoint_interval = checkpoint_interval

        # Stats (updated from worker threads, always through _bump_stat)
        self.stats = {
            'total': 0,
            'success': 0,
            'failed': 0,
            'timeout': 0,
            'error': 0,
            'retried': 0,
            'http_fallback': 0
        }
        self._stats_lock = Lock()

        # User agents rotation (avoid blocks)
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/120.0.0.0',
        ]

        # Domain rate limiting (delay per domain)
        self.domain_last_access = defaultdict(float)
        self.domain_lock = Lock()
        self.min_domain_delay = 0.5  # 500ms between requests to same domain

        # Session pool for connection reuse
        self.sessions = [self._create_session() for _ in range(max_workers)]

        # Checkpoint file
        self.checkpoint_file = self.output_dir / 'download_checkpoint.json'
        self.completed_urls = self._load_checkpoint()

    def _bump_stat(self, key, amount=1):
        """Increment a stats counter under a lock (workers update stats concurrently)."""
        with self._stats_lock:
            self.stats[key] = self.stats.get(key, 0) + amount

    def _create_session(self):
        """Create optimized requests session with retry and compression.

        Returns:
            A requests.Session whose http/https adapters retry up to 3 times
            on 429/5xx responses and which sends browser-like default headers.
        """
        session = requests.Session()

        # Retry strategy: 3 retries with exponential backoff
        retry_strategy = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "HEAD"]
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=100,
            pool_maxsize=100,
            pool_block=False
        )
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        # Enable compression
        session.headers.update({
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Connection': 'keep-alive',
        })
        return session

    def _get_random_user_agent(self):
        """Get random user agent to avoid detection."""
        return random.choice(self.user_agents)

    def _load_checkpoint(self):
        """Load checkpoint of already downloaded URLs.

        Returns:
            Set of URLs recorded as completed; empty if the checkpoint file
            is missing or cannot be read.
        """
        if not self.checkpoint_file.exists():
            return set()
        try:
            with open(self.checkpoint_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            completed = set(data.get('completed_urls', []))
        except (OSError, ValueError, AttributeError, TypeError) as e:
            # Best effort: a corrupt checkpoint only costs re-downloading.
            logger.warning(f"Failed to load checkpoint: {e}")
            return set()
        logger.info(f"Loaded checkpoint: {len(completed):,} URLs already downloaded")
        return completed

    def _save_checkpoint(self, results):
        """Save checkpoint of completed URLs.

        Args:
            results: Iterable of result dictionaries; URLs of those with
                status 'success' are merged into the completed set before
                the checkpoint file is rewritten.
        """
        try:
            completed = [r['url'] for r in results if r.get('status') == 'success']
            self.completed_urls.update(completed)
            payload = {
                'completed_urls': list(self.completed_urls),
                'timestamp': datetime.now().isoformat(),
                'total_completed': len(self.completed_urls)
            }
            # Write to a temp file and rename so an interrupted run never
            # leaves a truncated checkpoint behind.
            tmp_file = self.checkpoint_file.with_suffix('.json.tmp')
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(payload, f)
            tmp_file.replace(self.checkpoint_file)
        except (OSError, TypeError, ValueError) as e:
            logger.warning(f"Failed to save checkpoint: {e}")

    def _rate_limit_domain(self, url):
        """Apply per-domain rate limiting.

        Reserves the next free time slot for the URL's domain while holding
        the lock, then sleeps outside the lock so requests to other domains
        are not held up.

        Args:
            url: URL including scheme (the domain is taken from its netloc)
        """
        try:
            domain = urlparse(url).netloc.lower()
        except ValueError as e:
            # Unparsable URL: skip rate limiting, the request itself will fail.
            logger.debug(f"Rate limiting skipped for {url!r}: {e}")
            return

        with self.domain_lock:
            now = time.time()
            slot = max(now, self.domain_last_access[domain] + self.min_domain_delay)
            self.domain_last_access[domain] = slot

        wait = slot - now
        if wait > 0:
            time.sleep(wait)

    def _url_to_filename(self, url):
        """Convert URL to safe filename using hash."""
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return f"{url_hash}.html"

    def _optimize_html(self, html_content):
        """
        Aggressively optimize HTML for feature extraction.

        Removes unnecessary content while preserving structure:
        - Comments, excessive whitespace
        - Inline styles (keeps style tags for counting)
        - Large script/style content (keeps tags for counting)
        - Base64 embedded images (huge size, not needed for features)

        Args:
            html_content: Raw HTML content

        Returns:
            Optimized HTML string. If parsing fails, the input with only
            comments and blank lines removed.
        """
        try:
            # Quick regex cleanup before parsing
            html_content = _HTML_COMMENT_RE.sub('', html_content)
            html_content = _BASE64_IMAGE_RE.sub('data:image', html_content)

            # Parse HTML (use lxml parser if available, it's faster)
            try:
                soup = BeautifulSoup(html_content, 'lxml')
            except Exception:
                soup = BeautifulSoup(html_content, 'html.parser')

            # Remove inline styles (but keep style tags for counting)
            for tag in soup.find_all(style=True):
                del tag['style']

            # Truncate large script/style content (keep tags for counting)
            for tag in soup.find_all(['script', 'style']):
                if tag.string and len(tag.string) > _MAX_INLINE_CONTENT:
                    tag.string = tag.string[:_MAX_INLINE_CONTENT] + '...'

            # Normalize whitespace in text nodes
            for text in soup.find_all(string=True):
                # Doctype/comment/CDATA nodes are string subclasses too;
                # replacing them with a plain string would turn them into
                # visible text, so leave them alone.
                if isinstance(text, PreformattedString):
                    continue
                parent = text.parent
                if parent is not None and parent.name in ('script', 'style'):
                    continue
                original_text = str(text)
                normalized = _WHITESPACE_RE.sub(' ', original_text.strip())
                if normalized and normalized != original_text:
                    text.replace_with(normalized)

            # Convert back to string; final cleanup removes blank lines
            return _BLANK_LINES_RE.sub('\n', str(soup))

        except Exception as e:
            logger.warning(f"HTML optimization failed: {e}, returning original")
            # Fallback: at least remove comments and excessive whitespace
            html_content = _HTML_COMMENT_RE.sub('', html_content)
            html_content = _BLANK_LINES_RE.sub('\n', html_content)
            return html_content

    def _fetch_html(self, session, attempt_url):
        """Fetch one URL variant and validate the response.

        Returns:
            Tuple ``(html, error)``. ``error`` is None when the response is
            a usable HTML page; otherwise it is a short reason string and
            ``html`` is None unless the body was fetched but rejected.
        """
        # Random user agent for each attempt
        headers = {'User-Agent': self._get_random_user_agent()}
        response = session.get(
            attempt_url,
            headers=headers,
            timeout=(3, self.timeout),  # (connect timeout, read timeout)
            allow_redirects=True,
            verify=False,  # Phishing sites often have invalid SSL
            stream=False   # We need full content
        )

        if response.status_code != 200:
            return None, f"HTTP {response.status_code}"

        content_type = response.headers.get('Content-Type', '')
        lowered = content_type.lower()
        if 'text/html' not in lowered and 'application/xhtml' not in lowered:
            return None, f'Non-HTML content: {content_type}'

        html_content = response.text
        if len(html_content) < _MIN_HTML_LENGTH:
            return html_content, 'HTML too small (< 200 bytes)'
        return html_content, None

    def _save_html(self, html_content, original_url, label, result):
        """Optimize the HTML, write it to the label's directory and fill in result."""
        optimized_html = self._optimize_html(html_content)

        filename = self._url_to_filename(original_url)
        target_dir = self.legit_dir if label == 0 else self.phishing_dir
        with open(target_dir / filename, 'w', encoding='utf-8', errors='ignore') as f:
            f.write(optimized_html)

        saved_fraction = 1 - len(optimized_html) / max(result['original_size'], 1)
        result['status'] = 'success'
        result['error'] = None
        result['filename'] = filename
        result['size'] = len(optimized_html)
        result['target_dir'] = str(target_dir.name)
        result['compression_ratio'] = f"{saved_fraction * 100:.1f}%"

    def download_single_url(self, url, label, url_id=None, session=None):
        """
        Download HTML with retry logic and HTTP fallback.

        The HTTPS variant is tried first; if it fails, the plain HTTP
        variant is tried. The status of the last attempt is reported and
        counted once in ``self.stats``.

        Args:
            url: URL to download
            label: Label (0=legitimate, anything else=phishing)
            url_id: Optional ID from dataset
            session: Requests session (for connection pooling); a temporary
                one is created and closed if omitted

        Returns:
            Dictionary with download result. ``status`` is one of
            'success', 'skipped', 'failed', 'timeout' or 'error'.
        """
        result = {
            'url': url,
            'label': label,
            'url_id': url_id,
            'status': 'failed',
            'error': None,
            'filename': None,
            'size': 0,
            'original_size': 0
        }

        # Missing values from the dataset (e.g. NaN) cannot be downloaded
        if not isinstance(url, str) or not url.strip():
            result['status'] = 'error'
            result['error'] = 'Invalid URL'
            self._bump_stat('error')
            return result

        # Skip if already downloaded
        if url in self.completed_urls:
            result['status'] = 'skipped'
            result['error'] = 'Already downloaded'
            return result

        # Add scheme if missing (default HTTPS)
        original_url = url
        if not url.lower().startswith(('http://', 'https://')):
            url = 'https://' + url

        # If HTTPS, also try HTTP as fallback
        attempts = [url]
        if url.lower().startswith('https://'):
            attempts.append('http://' + url[len('https://'):])

        # Rate limit on the normalized URL: without a scheme urlparse reports
        # an empty netloc and every such URL would share one bucket.
        self._rate_limit_domain(url)

        # Use provided session or create temporary one
        owns_session = session is None
        if owns_session:
            session = self._create_session()

        try:
            for attempt_num, attempt_url in enumerate(attempts):
                try:
                    html_content, error = self._fetch_html(session, attempt_url)
                    if html_content is not None:
                        result['original_size'] = len(html_content)
                    if error is not None:
                        result['status'] = 'failed'
                        result['error'] = error
                        continue

                    self._save_html(html_content, original_url, label, result)
                    if attempt_num > 0:
                        result['http_fallback'] = True
                        self._bump_stat('http_fallback')
                    self._bump_stat('success')
                    return result

                except requests.Timeout:
                    result['status'] = 'timeout'
                    result['error'] = 'Timeout'
                except requests.RequestException as e:
                    result['status'] = 'error'
                    result['error'] = f"{type(e).__name__}: {str(e)[:80]}"
                except Exception as e:
                    result['status'] = 'error'
                    result['error'] = f"Unknown: {str(e)[:80]}"
        finally:
            if owns_session:
                session.close()

        # Every attempt failed: count the outcome of the last one exactly once
        self._bump_stat(result['status'])
        return result

    def download_batch(self, urls_df, label_column='label', id_column=None, resume=True):
        """
        Download HTML with checkpointing and session pooling.

        Args:
            urls_df: DataFrame with URLs in a 'url' or 'URL' column
            label_column: Column name for labels (label defaults to 1 if absent)
            id_column: Optional column name for IDs (row index if absent)
            resume: Resume from checkpoint if available; when False the
                loaded checkpoint is discarded and every URL is downloaded

        Returns:
            DataFrame with download results
        """
        self.stats['total'] = len(urls_df)
        url_column = 'url' if 'url' in urls_df.columns else 'URL'

        if not resume:
            # Otherwise download_single_url would still skip checkpointed URLs
            if self.completed_urls:
                logger.info(f"Ignoring checkpoint with {len(self.completed_urls):,} URLs (resume disabled)")
            self.completed_urls = set()
        elif self.completed_urls:
            # Filter already downloaded URLs
            urls_df = urls_df[~urls_df[url_column].isin(self.completed_urls)].copy()
            skipped = self.stats['total'] - len(urls_df)
            if skipped > 0:
                logger.info(f"Resuming: {skipped:,} URLs already downloaded, {len(urls_df):,} remaining")

        logger.info(f"Starting optimized download of {len(urls_df):,} URLs...")
        logger.info(f"Workers: {self.max_workers} | Timeout: {self.timeout}s | Checkpoint: every {self.checkpoint_interval} URLs")
        logger.info(f"Output: {self.output_dir.absolute()}")
        logger.info(f"Features: Session pooling, retry logic, HTTP fallback, rate limiting, compression")

        # Prepare tasks column-wise (much cheaper than iterating rows)
        urls = urls_df[url_column].tolist()
        if label_column in urls_df.columns:
            labels = urls_df[label_column].tolist()
        else:
            labels = [1] * len(urls)
        if id_column and id_column in urls_df.columns:
            ids = urls_df[id_column].tolist()
        else:
            ids = urls_df.index.tolist()
        tasks = list(zip(urls, labels, ids))

        results = []
        saved_upto = 0  # results[:saved_upto] are already in the checkpoint

        # Download in parallel with progress bar and checkpointing
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit tasks with round-robin session assignment
            future_to_task = {}
            for task_num, (url, label, url_id) in enumerate(tasks):
                session = self.sessions[task_num % len(self.sessions)]
                future = executor.submit(self.download_single_url, url, label, url_id, session)
                future_to_task[future] = (url, label, url_id)

            # Process completed tasks with progress bar
            with tqdm(total=len(tasks), desc="Downloading", unit="url") as pbar:
                for future in as_completed(future_to_task):
                    try:
                        result = future.result()
                    except Exception as e:
                        # A crashing worker must not abort the whole batch
                        url, label, url_id = future_to_task[future]
                        result = {
                            'url': url,
                            'label': label,
                            'url_id': url_id,
                            'status': 'error',
                            'error': f"Unknown: {str(e)[:80]}",
                            'filename': None,
                            'size': 0,
                            'original_size': 0
                        }
                        self._bump_stat('error')
                    results.append(result)
                    pbar.update(1)

                    # Save checkpoint periodically (only the new results)
                    if len(results) - saved_upto >= self.checkpoint_interval:
                        self._save_checkpoint(results[saved_upto:])
                        saved_upto = len(results)

                    # Update progress bar with detailed stats
                    pbar.set_postfix({
                        'OK': self.stats['success'],
                        'Fail': self.stats['failed'],
                        'Timeout': self.stats['timeout'],
                        'HTTP↓': self.stats['http_fallback']
                    })

        # Final checkpoint save
        self._save_checkpoint(results[saved_upto:])

        # Create results DataFrame
        results_df = pd.DataFrame(results)

        # Print summary
        self._print_summary(results_df)
        return results_df

    def _print_summary(self, results_df):
        """Print detailed download summary with optimization metrics.

        Args:
            results_df: DataFrame of result dictionaries as produced by
                download_batch (may be empty)
        """
        logger.info("\n" + "="*80)
        logger.info("DOWNLOAD SUMMARY")
        logger.info("="*80)

        total = self.stats['total']
        success = self.stats['success']
        logger.info(f"\nTotal URLs processed: {total:,}")
        logger.info(f" ✓ Successful: {success:,} ({success/max(total,1)*100:.1f}%)")
        logger.info(f" ✗ Failed: {self.stats['failed']:,}")
        logger.info(f" ⏱ Timeout: {self.stats['timeout']:,}")
        logger.info(f" ⚠ Error: {self.stats['error']:,}")
        logger.info(f" ↓ HTTP Fallback: {self.stats['http_fallback']:,}")

        # Detailed stats if we have results
        if results_df.empty or 'status' not in results_df.columns:
            logger.info("="*80)
            return

        successful = results_df[results_df['status'] == 'success']

        # Success by label
        if 'label' in results_df.columns:
            success_by_label = successful.groupby('label').size()
            if not success_by_label.empty:
                logger.info(f"\nSuccessful downloads by type:")
                for label, count in success_by_label.items():
                    label_name = 'Phishing' if label == 1 else 'Legitimate'
                    logger.info(f" {label_name}: {count:,}")

        # Size statistics
        if not successful.empty and 'size' in successful.columns:
            total_optimized = successful['size'].sum()
            total_original = successful.get('original_size', successful['size']).sum()
            logger.info(f"\nStorage statistics:")
            logger.info(f" Original size: {total_original/1024/1024:.2f} MB")
            logger.info(f" Optimized size: {total_optimized/1024/1024:.2f} MB")
            if total_original > 0:
                saved = (1 - total_optimized / total_original) * 100
                logger.info(f" Space saved: {saved:.1f}%")

        # Error breakdown (skipped URLs are not failures)
        failed = results_df[~results_df['status'].isin(['success', 'skipped'])]
        if not failed.empty and 'error' in failed.columns:
            error_counts = failed['error'].value_counts().head(5)
            if not error_counts.empty:
                logger.info(f"\nTop failure reasons:")
                for error, count in error_counts.items():
                    logger.info(f" {error}: {count:,}")

        logger.info("="*80)


def main():
    """Main function to download HTML from verified online phishing URLs.

    Parses command-line arguments, loads the URL dataset, optionally
    balances/limits it, downloads the pages and writes a results CSV plus a
    URL-to-filename metadata CSV into the output directory. Relative input
    and output paths are resolved against the directory three levels above
    this file.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Download HTML content from URLs and organize by label')
    parser.add_argument('--input', type=str, default='data/processed/clean_dataset.csv',
                        help='Input CSV file with URLs (must have url,label,type columns)')
    parser.add_argument('--output', type=str, default='data/html',
                        help='Base output directory (will create legitimate/ and phishing/ subdirectories)')
    parser.add_argument('--workers', type=int, default=20,
                        help='Number of parallel download workers (default: 20)')
    parser.add_argument('--timeout', type=int, default=8,
                        help='Request timeout in seconds (default: 8s)')
    parser.add_argument('--checkpoint', type=int, default=100,
                        help='Save progress every N URLs (default: 100)')
    parser.add_argument('--resume', action='store_true', default=True,
                        help='Resume from checkpoint (default: True)')
    parser.add_argument('--no-resume', dest='resume', action='store_false',
                        help='Start fresh, ignore checkpoint')
    parser.add_argument('--limit', type=int, default=None,
                        help='Limit number of URLs to download (for testing)')
    parser.add_argument('--balance', action='store_true',
                        help='Download equal number of legitimate and phishing URLs')
    args = parser.parse_args()

    logger.info("="*80)
    logger.info("HTML CONTENT DOWNLOADER - Phishing Detection")
    logger.info("="*80)

    # Load URLs
    script_dir = Path(__file__).parent.parent.parent
    input_path = (script_dir / args.input).resolve()
    logger.info(f"\nLoading URLs from: {input_path}")
    if not input_path.exists():
        logger.error(f"Input file not found: {input_path}")
        return

    df = pd.read_csv(input_path)
    logger.info(f"Loaded: {len(df):,} URLs")

    # Show columns
    logger.info(f"Columns: {list(df.columns)}")

    # Verify required columns
    if 'url' not in df.columns and 'URL' not in df.columns:
        logger.error("No 'url' or 'URL' column found in dataset!")
        return
    if 'label' not in df.columns:
        logger.error("No 'label' column found in dataset!")
        return

    # Show label distribution
    logger.info(f"\nLabel distribution in dataset:")
    label_counts = df['label'].value_counts()
    for label, count in label_counts.items():
        label_name = 'Legitimate' if label == 0 else 'Phishing'
        logger.info(f" {label_name} (label={label}): {count:,}")

    # Balance dataset if requested
    if args.balance:
        min_count = label_counts.min()
        legit = df[df['label'] == 0]
        phishing = df[df['label'] == 1]
        df = pd.concat([
            legit.sample(n=min(min_count, len(legit)), random_state=42),
            phishing.sample(n=min(min_count, len(phishing)), random_state=42)
        ]).sample(frac=1, random_state=42).reset_index(drop=True)
        logger.info(f"\nBalanced dataset to {min_count:,} samples per class")
        logger.info(f"Total URLs after balancing: {len(df):,}")

    # Limit for testing
    if args.limit:
        df = df.head(args.limit)
        logger.info(f"Limited to first {args.limit:,} URLs for testing")

    # Initialize optimized downloader
    output_dir = (script_dir / args.output).resolve()
    downloader = HTMLDownloader(
        output_dir=output_dir,
        max_workers=args.workers,
        timeout=args.timeout,
        checkpoint_interval=args.checkpoint
    )

    # Download HTML content with checkpointing
    results_df = downloader.download_batch(
        df,
        label_column='label',
        id_column='phish_id' if 'phish_id' in df.columns else None,
        resume=args.resume
    )

    # Save results
    results_file = output_dir / f'download_results_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
    results_df.to_csv(results_file, index=False)
    logger.info(f"\n✓ Results saved to: {results_file}")

    # Save metadata mapping (URL to filename)
    metadata_columns = ['url', 'label', 'filename', 'url_id']
    if 'status' in results_df.columns:
        metadata = results_df.loc[results_df['status'] == 'success', metadata_columns]
    else:
        # Nothing was attempted (e.g. everything already checkpointed)
        metadata = pd.DataFrame(columns=metadata_columns)
    metadata_file = output_dir / 'html_metadata.csv'
    metadata.to_csv(metadata_file, index=False)
    logger.info(f"✓ Metadata saved to: {metadata_file}")

    logger.info("\n" + "="*80)
    logger.info("✓ HTML DOWNLOAD COMPLETE!")
    logger.info("="*80)
    logger.info(f"\nFiles saved to:")
    logger.info(f" Legitimate: {output_dir / 'legitimate'}")
    logger.info(f" Phishing: {output_dir / 'phishing'}")
    logger.info(f"\nHTML files have been optimized for feature extraction:")
    logger.info(f" - Comments removed")
    logger.info(f" - Whitespace normalized")
    logger.info(f" - Inline styles removed")
    logger.info(f" - Structure preserved for feature extraction")
    logger.info("="*80)


if __name__ == "__main__":
    main()