"""
Combined URL + HTML Feature Extraction from clean_dataset.csv
Reads URLs from clean_dataset.csv, extracts URL features and downloads HTML
to extract HTML features, combines them into a single feature dataset.
Produces a balanced combined_features.csv.
Usage:
python scripts/feature_extraction/extract_combined_features.py
python scripts/feature_extraction/extract_combined_features.py --workers 20 --timeout 15
python scripts/feature_extraction/extract_combined_features.py --limit 1000 --no-balance
"""
import argparse
import logging
import random
import sys
import time
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from threading import Lock
import numpy as np
import pandas as pd
import requests
import urllib3
from tqdm import tqdm
# Suppress SSL warnings (phishing sites often have invalid certs; requests
# is invoked with verify=False below, which would otherwise warn per request)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings('ignore', message='.*Unverified HTTPS.*')
# ---------------------------------------------------------------------------
# Project setup
# ---------------------------------------------------------------------------
# Two levels above scripts/feature_extraction/ is the repository root (see
# the usage lines in the module docstring); prepend it to sys.path so the
# scripts.* imports below resolve regardless of the current working dir.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(PROJECT_ROOT))
from scripts.feature_extraction.url.url_features_v3 import URLFeatureExtractorOptimized
from scripts.feature_extraction.html.html_feature_extractor import HTMLFeatureExtractor
from scripts.feature_extraction.html.feature_engineering import engineer_features
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S',
)
logger = logging.getLogger('extract_combined')
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Browser-like request headers so trivially bot-blocking sites still serve
# the page we want to featurize.
HEADERS = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    ),
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
}
# Intermediate results file used by extract_all() to resume interrupted
# runs; deleted by main() after the final CSV is written.
CHECKPOINT_FILE = PROJECT_ROOT / 'data' / 'features' / '_combined_checkpoint.csv'
# ---------------------------------------------------------------------------
# Feature extraction for a single URL (runs in thread)
# ---------------------------------------------------------------------------
def extract_single(
    url: str,
    label: int,
    url_extractor: URLFeatureExtractorOptimized,
    html_extractor: HTMLFeatureExtractor,
    timeout: int = 10,
) -> dict | None:
    """
    Extract URL + HTML features for a single URL.

    URL features are computed locally and are required: if that step
    raises, the row is skipped entirely. HTML features require a live
    download; a failed download is tolerated and the HTML feature
    columns are zero-filled so every returned row has the same schema.

    Args:
        url: The URL to featurize.
        label: Class label carried through unchanged into the result row.
        url_extractor: Shared URL feature extractor instance.
        html_extractor: Shared HTML feature extractor instance.
        timeout: HTTP timeout per request, in seconds.

    Returns:
        Combined feature dict with url, label, and all features,
        or None when URL feature extraction fails.
    """
    result = {'url': url, 'label': label}
    # --- 1. URL features (required; failure skips this URL) ---
    try:
        url_feats = url_extractor.extract_features(url)
        for k, v in url_feats.items():
            result[f'url_{k}'] = v
    except Exception as e:
        logger.debug(f"URL feature error for {url}: {e}")
        return None
    # --- 2. Download HTML & extract HTML features (best effort) ---
    html_ok = False
    try:
        # Context manager guarantees the connection is released even if
        # feature extraction below raises mid-way.
        with requests.get(
            url, timeout=timeout, verify=False, headers=HEADERS,
            allow_redirects=True,
        ) as resp:
            # Skip error pages and near-empty bodies (< 200 chars).
            if resp.status_code == 200 and len(resp.text) > 200:
                raw_feats = html_extractor.extract_features(resp.text)
                # Apply feature engineering
                raw_df = pd.DataFrame([raw_feats])
                eng_df = engineer_features(raw_df)
                eng_row = eng_df.iloc[0].to_dict()
                for k, v in eng_row.items():
                    result[f'html_{k}'] = v
                html_ok = True
    except Exception as e:
        # Phishing pages routinely time out / refuse connections; record
        # at debug level (was silently swallowed) and fall through to
        # the zero-fill path below.
        logger.debug(f"HTML download error for {url}: {e}")
    if not html_ok:
        # Fill HTML features with zeros: run the extractor + engineering
        # on an empty document purely to learn the full engineered
        # column set, then zero every column.
        dummy_html = html_extractor.extract_features('')
        dummy_df = pd.DataFrame([dummy_html])
        eng_df = engineer_features(dummy_df)
        for k in eng_df.columns:
            result[f'html_{k}'] = 0
    return result
# ---------------------------------------------------------------------------
# Batch extraction with threading + checkpointing
# ---------------------------------------------------------------------------
def extract_all(
    df: pd.DataFrame,
    max_workers: int = 10,
    timeout: int = 10,
    checkpoint_every: int = 500,
) -> pd.DataFrame:
    """
    Extract combined features for all URLs using a thread pool.

    Progress is persisted to CHECKPOINT_FILE every `checkpoint_every`
    rows, and any URLs already present in the checkpoint are skipped on
    restart, so an interrupted run can resume.

    Args:
        df: DataFrame with 'url' and 'label' columns.
        max_workers: Parallel download threads.
        timeout: HTTP timeout per URL (seconds).
        checkpoint_every: Save intermediate results every N rows.

    Returns:
        DataFrame with combined features (one row per processed URL).
    """
    url_extractor = URLFeatureExtractorOptimized()
    html_extractor = HTMLFeatureExtractor()
    urls = df['url'].tolist()
    labels = df['label'].tolist()
    total = len(urls)
    # --- Load checkpoint if exists ---
    done_urls = set()
    results = []
    if CHECKPOINT_FILE.exists():
        ckpt = pd.read_csv(CHECKPOINT_FILE)
        done_urls = set(ckpt['url'].tolist())
        results = ckpt.to_dict('records')
        logger.info(f"Resuming from checkpoint: {len(done_urls):,} URLs already done")
    remaining = [(u, l) for u, l in zip(urls, labels) if u not in done_urls]
    logger.info(f"Remaining URLs to process: {len(remaining):,} / {total:,}")
    if not remaining:
        logger.info("All URLs already processed!")
        return pd.DataFrame(results)
    n_success = 0
    n_html_fail = 0
    n_fail = 0
    t_start = time.perf_counter()

    def _worker(url_label):
        u, l = url_label
        return extract_single(u, l, url_extractor, html_extractor, timeout)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(_worker, item): item for item in remaining}
        with tqdm(total=len(remaining), desc='Extracting', unit='url') as pbar:
            for future in as_completed(futures):
                pbar.update(1)
                # Guard future.result(): an unexpected exception inside a
                # worker would otherwise abort the whole batch and lose
                # all un-checkpointed progress. Count it as a failure.
                try:
                    result = future.result()
                except Exception as e:
                    logger.debug(f"Worker error for {futures[future][0]}: {e}")
                    result = None
                # No lock needed: results/counters are mutated only here,
                # in the single thread consuming as_completed().
                if result is not None:
                    results.append(result)
                    n_success += 1
                    # Check if HTML was zero-filled.
                    # NOTE(review): assumes the extractor emits a
                    # 'num_tags' feature that is non-zero for real pages
                    # — confirm against HTMLFeatureExtractor.
                    if result.get('html_num_tags', 0) == 0:
                        n_html_fail += 1
                else:
                    n_fail += 1
                # Periodic checkpoint so progress survives interruption.
                if len(results) % checkpoint_every == 0:
                    _save_checkpoint(results)
    elapsed = time.perf_counter() - t_start
    speed = len(remaining) / elapsed if elapsed > 0 else 0
    logger.info(f"\nExtraction complete in {elapsed:.1f}s ({speed:.0f} URLs/sec)")
    logger.info(f" Successful: {n_success:,}")
    logger.info(f" HTML download failed (zero-filled): {n_html_fail:,}")
    logger.info(f" Total failures (skipped): {n_fail:,}")
    # Final checkpoint
    _save_checkpoint(results)
    return pd.DataFrame(results)
def _save_checkpoint(results: list):
    """Persist the accumulated feature rows so an interrupted run can resume."""
    ckpt_dir = CHECKPOINT_FILE.parent
    ckpt_dir.mkdir(parents=True, exist_ok=True)
    frame = pd.DataFrame(results)
    frame.to_csv(CHECKPOINT_FILE, index=False)
# ---------------------------------------------------------------------------
# Balance dataset
# ---------------------------------------------------------------------------
def balance_dataset(df: pd.DataFrame, random_state: int = 42) -> pd.DataFrame:
    """
    Undersample the majority class so every label has equal support.

    Args:
        df: DataFrame with a 'label' column.
        random_state: Seed used for both per-class sampling and the
            final shuffle, making the output deterministic.

    Returns:
        Shuffled DataFrame with min-class count rows per label and a
        fresh 0..n-1 index.
    """
    counts = df['label'].value_counts()
    min_count = counts.min()
    # Same logger as the module-level one ('extract_combined').
    log = logging.getLogger('extract_combined')
    log.info(f"Balancing: {counts.to_dict()} → {min_count:,} per class")
    # DataFrameGroupBy.sample draws min_count rows per class directly,
    # avoiding the deprecated groupby().apply(lambda g: g.sample(...))
    # pattern (FutureWarning in recent pandas).
    balanced = df.groupby('label').sample(n=min_count, random_state=random_state)
    # Shuffle so classes are interleaved, then drop the original index.
    balanced = balanced.sample(frac=1, random_state=random_state).reset_index(drop=True)
    return balanced
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: load URLs, extract combined features, balance, save."""
    parser = argparse.ArgumentParser(
        description='Extract combined URL + HTML features from clean_dataset.csv')
    parser.add_argument('--input', type=str,
                        default='data/processed/clean_dataset.csv',
                        help='Input CSV with url,label columns')
    parser.add_argument('--output', type=str,
                        default='data/features/combined_features.csv',
                        help='Output CSV path')
    parser.add_argument('--workers', type=int, default=10,
                        help='Parallel download threads (default: 10)')
    parser.add_argument('--timeout', type=int, default=10,
                        help='HTTP timeout in seconds (default: 10)')
    parser.add_argument('--limit', type=int, default=None,
                        help='Limit total URLs (for testing)')
    parser.add_argument('--checkpoint-every', type=int, default=500,
                        help='Save checkpoint every N URLs (default: 500)')
    parser.add_argument('--no-balance', action='store_true',
                        help='Do not balance the output dataset')
    args = parser.parse_args()
    # Relative --input/--output are resolved against the repository root,
    # not the current working directory.
    input_path = (PROJECT_ROOT / args.input).resolve()
    output_path = (PROJECT_ROOT / args.output).resolve()
    logger.info("=" * 70)
    logger.info("COMBINED URL + HTML FEATURE EXTRACTION")
    logger.info("=" * 70)
    logger.info(f" Input: {input_path}")
    logger.info(f" Output: {output_path}")
    logger.info(f" Workers: {args.workers}")
    logger.info(f" Timeout: {args.timeout}s")
    logger.info(f" Balance: {'YES' if not args.no_balance else 'NO'}")
    # --- Load dataset ---
    df = pd.read_csv(input_path)
    logger.info(f"\nLoaded {len(df):,} URLs")
    logger.info(f" Label distribution: {df['label'].value_counts().to_dict()}")
    if args.limit:
        # Stratified limit: half the budget per class.
        # NOTE(review): assumes exactly two label values; with an odd
        # --limit the result is one row short of the requested total.
        per_class = args.limit // 2
        df = (
            df.groupby('label', group_keys=False)
            .apply(lambda g: g.sample(n=min(per_class, len(g)), random_state=42))
        )
        df = df.reset_index(drop=True)
        logger.info(f" Limited to: {len(df):,} URLs")
    # --- Extract features (threaded, checkpointed) ---
    features_df = extract_all(
        df,
        max_workers=args.workers,
        timeout=args.timeout,
        checkpoint_every=args.checkpoint_every,
    )
    if features_df.empty:
        logger.error("No features extracted!")
        sys.exit(1)
    logger.info(f"\nExtracted features: {features_df.shape}")
    logger.info(f" Label distribution: {features_df['label'].value_counts().to_dict()}")
    # --- Balance (undersample majority class) unless --no-balance ---
    if not args.no_balance:
        features_df = balance_dataset(features_df)
        logger.info(f" After balancing: {features_df.shape}")
        logger.info(f" Label dist: {features_df['label'].value_counts().to_dict()}")
    # --- Reorder columns: url, label first, then sorted features ---
    meta_cols = ['url', 'label']
    feature_cols = sorted([c for c in features_df.columns if c not in meta_cols])
    features_df = features_df[meta_cols + feature_cols]
    # --- Clean up infinities / NaNs (replaced with 0 across all columns) ---
    features_df = features_df.replace([np.inf, -np.inf], 0)
    features_df = features_df.fillna(0)
    # --- Save ---
    output_path.parent.mkdir(parents=True, exist_ok=True)
    features_df.to_csv(output_path, index=False)
    # --- Cleanup checkpoint (only after the final CSV has been written) ---
    if CHECKPOINT_FILE.exists():
        CHECKPOINT_FILE.unlink()
        logger.info("Checkpoint file cleaned up")
    # --- Summary ---
    logger.info("\n" + "=" * 70)
    logger.info("EXTRACTION COMPLETE")
    logger.info("=" * 70)
    logger.info(f" Total samples: {len(features_df):,}")
    logger.info(f" Legitimate: {(features_df['label'] == 0).sum():,}")
    logger.info(f" Phishing: {(features_df['label'] == 1).sum():,}")
    logger.info(f" Total features: {len(feature_cols)}")
    url_feats = [c for c in feature_cols if c.startswith('url_')]
    html_feats = [c for c in feature_cols if c.startswith('html_')]
    logger.info(f" URL features: {len(url_feats)}")
    logger.info(f" HTML features: {len(html_feats)}")
    logger.info(f" Output: {output_path}")
    logger.info("=" * 70)
# Script entry point: only run when executed directly, not on import.
if __name__ == '__main__':
    main()