phi-drift / core /plugins /target_manager.py
crexs's picture
Upload folder using huggingface_hub
914e970 verified
Raw
History Blame Contribute Delete
7.7 kB
"""Target scope manager for Bug Bounty programs.
Tracks in-scope and out-of-scope assets, syncs with Bugcrowd,
and enforces authorization boundaries for recon tools.
"""
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
DEFAULT_DB_PATH = (
Path(__file__).resolve().parent.parent.parent / "data" / "bugbot_targets.db"
)
@dataclass
class Target:
id: int = 0
program_id: str = ""
program_name: str = ""
asset: str = "" # domain, URL, IP, CIDR, wildcard
asset_type: str = (
"domain" # domain | url | ip | cidr | wildcard | api | mobile | other
)
scope: str = "in" # in | out | pending
priority: str = "normal" # normal | high
notes: str = ""
created_at: str = field(
default_factory=lambda: datetime.now().isoformat(timespec="seconds")
)
class TargetManager:
def __init__(self, db_path: Optional[Path] = None):
self.db_path = db_path or DEFAULT_DB_PATH
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_db()
def _conn(self) -> sqlite3.Connection:
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
return conn
def _init_db(self):
with self._conn() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS targets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
program_id TEXT,
program_name TEXT,
asset TEXT,
asset_type TEXT,
scope TEXT,
priority TEXT,
notes TEXT,
created_at TEXT
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_asset ON targets(asset)")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_program ON targets(program_id)"
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_scope ON targets(scope)")
conn.commit()
def add(self, target: Target) -> int:
with self._conn() as conn:
cur = conn.execute(
"""
INSERT INTO targets (program_id, program_name, asset, asset_type, scope, priority, notes, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
target.program_id,
target.program_name,
target.asset.lower().strip(),
target.asset_type,
target.scope,
target.priority,
target.notes,
target.created_at,
),
)
conn.commit()
return cur.lastrowid
def bulk_add(
self,
program_id: str,
program_name: str,
assets: List[Dict[str, Any]],
scope: str = "in",
):
"""Add multiple assets from a Bugcrowd scope sync."""
for asset in assets:
self.add(
Target(
program_id=program_id,
program_name=program_name,
asset=asset.get("uri") or asset.get("name", ""),
asset_type=self._guess_type(
asset.get("uri") or asset.get("name", "")
),
scope=scope,
priority=asset.get("priority", "normal"),
)
)
def is_authorized(self, asset: str) -> bool:
"""Check if an asset is explicitly in-scope."""
hostname = (
asset.lower()
.strip()
.replace("https://", "")
.replace("http://", "")
.split("/")[0]
)
with self._conn() as conn:
# Exact match
row = conn.execute(
"SELECT scope FROM targets WHERE asset = ?", (hostname,)
).fetchone()
if row:
return row["scope"] == "in"
# Wildcard match
rows = conn.execute(
"SELECT asset, scope FROM targets WHERE asset LIKE '*.%'"
).fetchall()
for r in rows:
wildcard = r["asset"].replace("*.", "")
if hostname.endswith(wildcard):
return r["scope"] == "in"
return False
def is_out_of_scope(self, asset: str) -> bool:
hostname = (
asset.lower()
.strip()
.replace("https://", "")
.replace("http://", "")
.split("/")[0]
)
with self._conn() as conn:
row = conn.execute(
"SELECT scope FROM targets WHERE asset = ? AND scope = 'out'",
(hostname,),
).fetchone()
if row:
return True
rows = conn.execute(
"SELECT asset FROM targets WHERE asset LIKE '*.%' AND scope = 'out'"
).fetchall()
for r in rows:
wildcard = r["asset"].replace("*.", "")
if hostname.endswith(wildcard):
return True
return False
def list_targets(
self,
program_id: Optional[str] = None,
scope: Optional[str] = None,
asset_type: Optional[str] = None,
) -> List[Target]:
query = "SELECT * FROM targets WHERE 1=1"
params: List[Any] = []
if program_id:
query += " AND program_id = ?"
params.append(program_id)
if scope:
query += " AND scope = ?"
params.append(scope)
if asset_type:
query += " AND asset_type = ?"
params.append(asset_type)
query += " ORDER BY priority DESC, created_at DESC"
with self._conn() as conn:
rows = conn.execute(query, params).fetchall()
return [Target(**{k: r[k] for k in r.keys()}) for r in rows]
def get_authorized_targets(self) -> Set[str]:
"""Return all in-scope assets for quick set checks."""
with self._conn() as conn:
rows = conn.execute(
"SELECT asset FROM targets WHERE scope = 'in'"
).fetchall()
return {r["asset"] for r in rows}
def delete_program(self, program_id: str) -> int:
with self._conn() as conn:
cur = conn.execute(
"DELETE FROM targets WHERE program_id = ?", (program_id,)
)
conn.commit()
return cur.rowcount
def count(
self, program_id: Optional[str] = None, scope: Optional[str] = None
) -> int:
query = "SELECT COUNT(*) FROM targets WHERE 1=1"
params: List[Any] = []
if program_id:
query += " AND program_id = ?"
params.append(program_id)
if scope:
query += " AND scope = ?"
params.append(scope)
with self._conn() as conn:
return conn.execute(query, params).fetchone()[0]
def _guess_type(self, asset: str) -> str:
a = asset.lower().strip()
if "/" in a and (a.startswith("http://") or a.startswith("https://")):
return "url"
if "/" in a:
return "cidr"
if a.startswith("*."):
return "wildcard"
try:
parts = a.split(".")
if len(parts) == 4 and all(
p.isdigit() and 0 <= int(p) <= 255 for p in parts
):
return "ip"
except Exception:
pass
return "domain"