phi-drift / core /bug_bot.py
crexs's picture
sync: update core/bug_bot.py
4c037e7 verified
Raw
History Blame Contribute Delete
19.5 kB
"""DRIFT Bug Bot β€” the beast.
Orchestrates recon, findings tracking, report generation, and
Bugcrowd submission for bug bounty hunting.
Safe by design:
- Rate-limited API calls (1 req/sec default)
- Scanner rate limits (nuclei -rl 10, ffuf -rate 10)
- Scope enforcement before any active recon
- No destructive actions
"""
import json
import subprocess
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from infj_bot.core.config import PROJECT_ROOT
from infj_bot.core.memory import DriftMemory
from infj_bot.core.plugins.bugcrowd_client import BugcrowdClient
from infj_bot.core.plugins.findings_db import Finding, FindingsDB
from infj_bot.core.plugins.report_builder import ReportBuilder
from infj_bot.core.plugins.target_manager import TargetManager
RECON_DIR = Path(PROJECT_ROOT) / "recon"
BUGBOT_LOG = Path(PROJECT_ROOT) / "logs" / "bugbot.log"
class BugBot:
"""
The main Bug Bounty engine.
Usage:
bot = BugBot()
bot.sync_programs() # Pull from Bugcrowd
bot.recon(program_id) # Run scoped recon
bot.add_finding(...) # Log a finding
bot.generate_report(fid) # Draft a report
bot.submit(finding_id) # Submit to Bugcrowd
"""
def __init__(
self,
api_key: Optional[str] = None,
memory: Optional[DriftMemory] = None,
):
self.client = BugcrowdClient(api_key=api_key)
self.findings = FindingsDB()
self.targets = TargetManager()
self.reports = ReportBuilder()
self.memory = memory
RECON_DIR.mkdir(parents=True, exist_ok=True)
BUGBOT_LOG.parent.mkdir(parents=True, exist_ok=True)
# ── Program Sync ──────────────────────────────────────────────
def sync_programs(self) -> str:
"""Fetch Bugcrowd programs and sync scope to local target DB."""
try:
programs = self.client.list_programs()
except Exception as exc:
return f"❌ Failed to fetch programs: {exc}"
if not programs:
return "⚠️ No Bugcrowd programs found. Check your API key."
for prog in programs:
try:
full = self.client.get_program(prog.uuid)
self.targets.delete_program(prog.uuid)
self.targets.bulk_add(prog.uuid, prog.name, full.scope, scope="in")
self.targets.bulk_add(prog.uuid, prog.name, full.oos, scope="out")
self._log(
f"Synced program: {prog.name} ({len(full.scope)} in / {len(full.oos)} out)"
)
except Exception as exc:
self._log(f"⚠️ Failed to sync {prog.name}: {exc}")
return f"βœ… Synced {len(programs)} program(s). Targets: {self.targets.count(scope='in')} in / {self.targets.count(scope='out')} out."
def list_programs(self) -> str:
"""Pretty-print enrolled programs."""
try:
programs = self.client.list_programs()
except Exception as exc:
return f"❌ {exc}"
if not programs:
return "No programs found."
lines = ["🎯 Bugcrowd Programs"]
for p in programs:
lines.append(
f" β€’ {p.name} β€” {p.status} ({p.rewards or 'no rewards info'})"
)
return "\n".join(lines)
# ── Recon ─────────────────────────────────────────────────────
def recon(self, program_id: str, tool: str = "all") -> str:
"""
Run scoped recon for a program.
tool: all | subdomains | nuclei | fuzz
"""
targets = self.targets.list_targets(program_id=program_id, scope="in")
if not targets:
return (
f"❌ No in-scope targets for program {program_id}. Run /bug sync first."
)
domains = [t.asset for t in targets if t.asset_type in ("domain", "wildcard")]
urls = [t.asset for t in targets if t.asset_type == "url"]
results: List[str] = []
stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
out_dir = RECON_DIR / program_id / stamp
out_dir.mkdir(parents=True, exist_ok=True)
# Subdomain enumeration
if tool in ("all", "subdomains") and domains:
results.append(self._run_subfinder(domains, out_dir))
# Nuclei scan
if tool in ("all", "nuclei"):
scan_targets = domains + urls
if scan_targets:
results.append(self._run_nuclei(scan_targets, out_dir))
# Directory fuzzing
if tool in ("all", "fuzz") and (domains or urls):
results.append(self._run_ffuf(domains + urls, out_dir))
summary = "\n".join(results)
self._log(f"Recon {program_id}/{tool}: {summary[:200]}")
return f"πŸ›°οΈ Recon complete for {program_id}\n{summary}\nπŸ“ Artifacts: {out_dir}"
def _run_subfinder(self, domains: List[str], out_dir: Path) -> str:
domain_file = out_dir / "domains.txt"
domain_file.write_text("\n".join(domains))
output = out_dir / "subdomains.txt"
try:
subprocess.run(
["subfinder", "-dL", str(domain_file), "-o", str(output), "-silent"],
capture_output=True,
text=True,
timeout=300,
check=False,
)
count = len(output.read_text().splitlines()) if output.exists() else 0
return f" πŸ” subfinder: {count} subdomains β†’ {output.name}"
except FileNotFoundError:
return " ⚠️ subfinder not installed"
except Exception as exc:
return f" ❌ subfinder error: {exc}"
def _run_nuclei(self, targets: List[str], out_dir: Path) -> str:
target_file = out_dir / "nuclei_targets.txt"
target_file.write_text("\n".join(targets))
output = out_dir / "nuclei.json"
try:
subprocess.run(
[
"nuclei",
"-l",
str(target_file),
"-severity",
"critical,high,medium",
"-rate-limit",
"10",
"-json-export",
str(output),
"-silent",
],
capture_output=True,
text=True,
timeout=600,
check=False,
)
if output.exists():
data = (
json.loads(output.read_text()) if output.stat().st_size > 0 else []
)
# Auto-ingest critical/high findings with deduplication
skipped = 0
seen_this_run: set = set()
for item in data:
sev = item.get("info", {}).get("severity", "").lower()
if sev in ("critical", "high"):
asset = item.get("host", "")
vuln_type = f"Nuclei: {item.get('template-id', 'unknown')}"
dedup_key = (asset, vuln_type)
# DB-level dedup check β€” no arbitrary limit
duplicate = dedup_key in seen_this_run or self.findings.exists_by_asset_and_vuln_type(asset, vuln_type)
if duplicate:
skipped += 1
continue
seen_this_run.add(dedup_key)
self.add_finding(
program_id="auto",
title=item.get("info", {}).get("name", "Nuclei finding"),
vuln_type=vuln_type,
severity="P2" if sev == "critical" else "P3",
asset=asset,
description=json.dumps(item.get("info", {}), indent=2),
confidence="medium",
)
ch_count = len(
[
d
for d in data
if d.get("info", {}).get("severity", "").lower() in ("critical", "high")
]
)
skip_note = f", {skipped} skipped (dedup)" if skipped else ""
return f" 🎯 nuclei: {len(data)} findings β†’ {output.name} ({ch_count} critical/high{skip_note})"
return " 🎯 nuclei: no output"
except FileNotFoundError:
return " ⚠️ nuclei not installed"
except Exception as exc:
return f" ❌ nuclei error: {exc}"
def _run_ffuf(self, targets: List[str], out_dir: Path) -> str:
# Run ffuf on first domain only (safety)
domain = targets[0].replace("https://", "").replace("http://", "").split("/")[0]
output = out_dir / "ffuf.json"
try:
subprocess.run(
[
"ffuf",
"-u",
f"https://{domain}/FUZZ",
"-w",
"/usr/share/wordlists/dirb/common.txt",
"-rate",
"10",
"-o",
str(output),
"-of",
"json",
"-s",
],
capture_output=True,
text=True,
timeout=300,
check=False,
)
if output.exists():
data = (
json.loads(output.read_text()) if output.stat().st_size > 0 else {}
)
results = data.get("results", [])
return f" πŸŒͺ️ ffuf: {len(results)} hits on {domain} β†’ {output.name}"
return " πŸŒͺ️ ffuf: no output"
except FileNotFoundError:
return " ⚠️ ffuf not installed"
except Exception as exc:
return f" ❌ ffuf error: {exc}"
# ── Findings ──────────────────────────────────────────────────
def add_finding(
self,
program_id: str = "",
program_name: str = "",
title: str = "",
vuln_type: str = "",
severity: str = "P5",
cvss: float = 0.0,
asset: str = "",
description: str = "",
reproduction: str = "",
impact: str = "",
fix: str = "",
confidence: str = "low",
notes: str = "",
) -> str:
finding = Finding(
program_id=program_id,
program_name=program_name,
title=title,
vuln_type=vuln_type,
severity=severity,
cvss=cvss,
asset=asset,
description=description,
reproduction=reproduction,
impact=impact,
fix=fix,
confidence=confidence,
notes=notes,
)
fid = self.findings.add(finding)
if self.memory:
self.memory.learn_concept(
f"Finding {fid}: {title}",
f"{description}\n\nAsset: {asset}\nSeverity: {severity}\nConfidence: {confidence}",
tags=["bug", "finding", program_id],
importance=0.9 if severity in ("P1", "P2") else 0.7,
)
return f"πŸ› Finding logged: {fid} β€” {title} ({severity})"
def get_finding(self, fid: str) -> str:
f = self.findings.get(fid)
if not f:
return f"❌ Finding {fid} not found."
ev = self.findings.get_evidence(fid)
lines = [
f"πŸ› Finding {f.id}",
f" Title: {f.title}",
f" Severity: {f.severity} | Confidence: {f.confidence} | Status: {f.status}",
f" Asset: {f.asset or 'N/A'}",
f" Type: {f.vuln_type or 'N/A'}",
f" Description: {f.description[:200]}..."
if len(f.description) > 200
else f" Description: {f.description}",
f" Evidence: {len(ev)} item(s)",
]
return "\n".join(lines)
def list_findings(
self, program_id: Optional[str] = None, status: Optional[str] = None
) -> str:
items = self.findings.list(program_id=program_id, status=status, limit=100)
if not items:
return "No findings."
lines = [f"{'ID':<10} {'Severity':<10} {'Status':<12} {'Title'}"]
lines.append("-" * 60)
for f in items:
title = (f.title[:35] + "...") if len(f.title) > 35 else f.title
lines.append(f"{f.id:<10} {f.severity:<10} {f.status:<12} {title}")
return "\n".join(lines)
def stats(self) -> str:
s = self.findings.stats()
lines = [
"πŸ“Š Bug Bot Stats",
f" Total findings: {s['total']}",
" By severity:",
]
for sev, count in sorted(s.get("by_severity", {}).items()):
lines.append(f" {sev}: {count}")
lines.append(" By status:")
for st, count in sorted(s.get("by_status", {}).items()):
lines.append(f" {st}: {count}")
return "\n".join(lines)
# ── Reports ───────────────────────────────────────────────────
def generate_report(self, finding_id: str) -> str:
f = self.findings.get(finding_id)
if not f:
return f"❌ Finding {finding_id} not found."
ev = self.findings.get_evidence(finding_id)
report = self.reports.build(f, ev)
out_path = RECON_DIR / f"report_{finding_id}.md"
out_path.write_text(report)
return f"πŸ“ Report drafted: {out_path}\n\n{report[:500]}..."
def preview_report(self, finding_id: str) -> str:
f = self.findings.get(finding_id)
if not f:
return f"❌ Finding {finding_id} not found."
ev = self.findings.get_evidence(finding_id)
return self.reports.build(f, ev)
# ── Bugcrowd Submission ───────────────────────────────────────
def submit(self, finding_id: str) -> str:
f = self.findings.get(finding_id)
if not f:
return f"❌ Finding {finding_id} not found."
if not f.program_id or f.program_id == "auto":
return "❌ Finding has no Bugcrowd program ID. Set it with /bug update."
try:
sub_id = self.client.create_submission(
program_uuid=f.program_id,
title=f.title,
vulnerability_type=f.vuln_type or "Other",
severity=f.severity,
description=f.description,
reproduction=f.reproduction,
impact=f.impact,
asset=f.asset,
)
self.findings.update(
finding_id,
status="submitted",
bugcrowd_submission_id=sub_id,
submitted_at=datetime.now().isoformat(timespec="seconds"),
)
return f"βœ… Submitted to Bugcrowd! Submission ID: {sub_id}"
except Exception as exc:
return f"❌ Submission failed: {exc}"
def attach_evidence(
self,
finding_id: str,
path: str,
ev_type: str = "file",
description: str = "",
) -> str:
"""Attach evidence to a finding."""
f = self.findings.get(finding_id)
if not f:
return f"❌ Finding {finding_id} not found."
self.findings.add_evidence(finding_id, ev_type, path, description)
return f"πŸ“Ž Evidence attached to {finding_id}: [{ev_type}] {path}"
def dashboard(self) -> str:
"""Return a rich text dashboard summary."""
s = self.findings.stats()
total = s["total"]
by_sev = s.get("by_severity", {})
by_status = s.get("by_status", {})
by_program = s.get("by_program", {})
lines = [
"═══ Bug Bot Dashboard ═══",
f"Total findings: {total}",
"",
"By Severity:",
]
for sev in ("P1", "P2", "P3", "P4", "P5"):
count = by_sev.get(sev, 0)
if count:
lines.append(f" {sev}: {count}")
lines.append("")
lines.append("By Status:")
for st, count in sorted(by_status.items()):
lines.append(f" {st}: {count}")
# Recent 5 findings table
recent = self.findings.list(limit=5)
if recent:
lines.extend(
[
"",
"Recent Findings:",
f"{'ID':<10} {'Sev':<5} {'Status':<14} {'Title'}",
"-" * 60,
]
)
for f in recent:
title = (f.title[:40] + "...") if len(f.title) > 40 else f.title
lines.append(f"{f.id:<10} {f.severity:<5} {f.status:<14} {title}")
# Top program by finding count
if by_program:
top_prog = max(by_program, key=lambda k: by_program[k])
if top_prog:
lines.append(
f"\nTop program: {top_prog} ({by_program[top_prog]} finding(s))"
)
lines.append(f"\nDB: {self.findings.db_path}")
return "\n".join(lines)
def draft_with_ai(self, finding_id: str, brain=None) -> str:
"""Generate an AI-enhanced report for a finding."""
f = self.findings.get(finding_id)
if not f:
return f"❌ Finding {finding_id} not found."
ev = self.findings.get_evidence(finding_id)
standard_report = self.reports.build(f, ev)
if brain is not None:
prompt = (
f"You are a professional bug bounty report writer. "
f"Please improve the following vulnerability report for clarity and impact. "
f"Make it compelling, well-structured, and persuasive for a triage team. "
f"Preserve all technical details.\n\n{standard_report}"
)
try:
enhanced = brain.agent_turn(prompt, tools_enabled=False)
if enhanced:
return enhanced
except Exception:
pass
return (
standard_report
+ "\n\n---\n*Note: AI enhancement was unavailable. Showing standard report.*"
)
# ── Utility ───────────────────────────────────────────────────
def health(self) -> str:
api = self.client.health()
return f"{api}\nπŸ“ Findings DB: {self.findings.stats()['total']} finding(s)\n🎯 Targets DB: {self.targets.count(scope='in')} in-scope / {self.targets.count(scope='out')} out-of-scope"
def _log(self, msg: str):
line = f"[{datetime.now().isoformat(timespec='seconds')}] {msg}\n"
try:
with open(BUGBOT_LOG, "a") as fh:
fh.write(line)
except Exception:
pass