| """Target scope manager for Bug Bounty programs. |
| |
| Tracks in-scope and out-of-scope assets, syncs with Bugcrowd, |
| and enforces authorization boundaries for recon tools. |
| """ |
|
|
| import sqlite3 |
| from dataclasses import dataclass, field |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Set |
|
|
|
|
| DEFAULT_DB_PATH = ( |
| Path(__file__).resolve().parent.parent.parent / "data" / "bugbot_targets.db" |
| ) |
|
|
|
|
| @dataclass |
| class Target: |
| id: int = 0 |
| program_id: str = "" |
| program_name: str = "" |
| asset: str = "" |
| asset_type: str = ( |
| "domain" |
| ) |
| scope: str = "in" |
| priority: str = "normal" |
| notes: str = "" |
| created_at: str = field( |
| default_factory=lambda: datetime.now().isoformat(timespec="seconds") |
| ) |
|
|
|
|
| class TargetManager: |
| def __init__(self, db_path: Optional[Path] = None): |
| self.db_path = db_path or DEFAULT_DB_PATH |
| self.db_path.parent.mkdir(parents=True, exist_ok=True) |
| self._init_db() |
|
|
| def _conn(self) -> sqlite3.Connection: |
| conn = sqlite3.connect(str(self.db_path)) |
| conn.row_factory = sqlite3.Row |
| return conn |
|
|
| def _init_db(self): |
| with self._conn() as conn: |
| conn.execute( |
| """ |
| CREATE TABLE IF NOT EXISTS targets ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| program_id TEXT, |
| program_name TEXT, |
| asset TEXT, |
| asset_type TEXT, |
| scope TEXT, |
| priority TEXT, |
| notes TEXT, |
| created_at TEXT |
| ) |
| """ |
| ) |
| conn.execute("CREATE INDEX IF NOT EXISTS idx_asset ON targets(asset)") |
| conn.execute( |
| "CREATE INDEX IF NOT EXISTS idx_program ON targets(program_id)" |
| ) |
| conn.execute("CREATE INDEX IF NOT EXISTS idx_scope ON targets(scope)") |
| conn.commit() |
|
|
| def add(self, target: Target) -> int: |
| with self._conn() as conn: |
| cur = conn.execute( |
| """ |
| INSERT INTO targets (program_id, program_name, asset, asset_type, scope, priority, notes, created_at) |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?) |
| """, |
| ( |
| target.program_id, |
| target.program_name, |
| target.asset.lower().strip(), |
| target.asset_type, |
| target.scope, |
| target.priority, |
| target.notes, |
| target.created_at, |
| ), |
| ) |
| conn.commit() |
| return cur.lastrowid |
|
|
| def bulk_add( |
| self, |
| program_id: str, |
| program_name: str, |
| assets: List[Dict[str, Any]], |
| scope: str = "in", |
| ): |
| """Add multiple assets from a Bugcrowd scope sync.""" |
| for asset in assets: |
| self.add( |
| Target( |
| program_id=program_id, |
| program_name=program_name, |
| asset=asset.get("uri") or asset.get("name", ""), |
| asset_type=self._guess_type( |
| asset.get("uri") or asset.get("name", "") |
| ), |
| scope=scope, |
| priority=asset.get("priority", "normal"), |
| ) |
| ) |
|
|
| def is_authorized(self, asset: str) -> bool: |
| """Check if an asset is explicitly in-scope.""" |
| hostname = ( |
| asset.lower() |
| .strip() |
| .replace("https://", "") |
| .replace("http://", "") |
| .split("/")[0] |
| ) |
| with self._conn() as conn: |
| |
| row = conn.execute( |
| "SELECT scope FROM targets WHERE asset = ?", (hostname,) |
| ).fetchone() |
| if row: |
| return row["scope"] == "in" |
| |
| rows = conn.execute( |
| "SELECT asset, scope FROM targets WHERE asset LIKE '*.%'" |
| ).fetchall() |
| for r in rows: |
| wildcard = r["asset"].replace("*.", "") |
| if hostname.endswith(wildcard): |
| return r["scope"] == "in" |
| return False |
|
|
| def is_out_of_scope(self, asset: str) -> bool: |
| hostname = ( |
| asset.lower() |
| .strip() |
| .replace("https://", "") |
| .replace("http://", "") |
| .split("/")[0] |
| ) |
| with self._conn() as conn: |
| row = conn.execute( |
| "SELECT scope FROM targets WHERE asset = ? AND scope = 'out'", |
| (hostname,), |
| ).fetchone() |
| if row: |
| return True |
| rows = conn.execute( |
| "SELECT asset FROM targets WHERE asset LIKE '*.%' AND scope = 'out'" |
| ).fetchall() |
| for r in rows: |
| wildcard = r["asset"].replace("*.", "") |
| if hostname.endswith(wildcard): |
| return True |
| return False |
|
|
| def list_targets( |
| self, |
| program_id: Optional[str] = None, |
| scope: Optional[str] = None, |
| asset_type: Optional[str] = None, |
| ) -> List[Target]: |
| query = "SELECT * FROM targets WHERE 1=1" |
| params: List[Any] = [] |
| if program_id: |
| query += " AND program_id = ?" |
| params.append(program_id) |
| if scope: |
| query += " AND scope = ?" |
| params.append(scope) |
| if asset_type: |
| query += " AND asset_type = ?" |
| params.append(asset_type) |
| query += " ORDER BY priority DESC, created_at DESC" |
| with self._conn() as conn: |
| rows = conn.execute(query, params).fetchall() |
| return [Target(**{k: r[k] for k in r.keys()}) for r in rows] |
|
|
| def get_authorized_targets(self) -> Set[str]: |
| """Return all in-scope assets for quick set checks.""" |
| with self._conn() as conn: |
| rows = conn.execute( |
| "SELECT asset FROM targets WHERE scope = 'in'" |
| ).fetchall() |
| return {r["asset"] for r in rows} |
|
|
| def delete_program(self, program_id: str) -> int: |
| with self._conn() as conn: |
| cur = conn.execute( |
| "DELETE FROM targets WHERE program_id = ?", (program_id,) |
| ) |
| conn.commit() |
| return cur.rowcount |
|
|
| def count( |
| self, program_id: Optional[str] = None, scope: Optional[str] = None |
| ) -> int: |
| query = "SELECT COUNT(*) FROM targets WHERE 1=1" |
| params: List[Any] = [] |
| if program_id: |
| query += " AND program_id = ?" |
| params.append(program_id) |
| if scope: |
| query += " AND scope = ?" |
| params.append(scope) |
| with self._conn() as conn: |
| return conn.execute(query, params).fetchone()[0] |
|
|
| def _guess_type(self, asset: str) -> str: |
| a = asset.lower().strip() |
| if "/" in a and (a.startswith("http://") or a.startswith("https://")): |
| return "url" |
| if "/" in a: |
| return "cidr" |
| if a.startswith("*."): |
| return "wildcard" |
| try: |
| parts = a.split(".") |
| if len(parts) == 4 and all( |
| p.isdigit() and 0 <= int(p) <= 255 for p in parts |
| ): |
| return "ip" |
| except Exception: |
| pass |
| return "domain" |
|
|