"""DRIFT Bug Bot — the beast.

Orchestrates recon, findings tracking, report generation, and
Bugcrowd submission for bug bounty hunting.

Safe by design:
- Rate-limited API calls (1 req/sec default)
- Scanner rate limits (nuclei -rl 10, ffuf -rate 10)
- Scope enforcement before any active recon
- No destructive actions
"""

import json
import subprocess
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple

from infj_bot.core.config import PROJECT_ROOT
from infj_bot.core.memory import DriftMemory
from infj_bot.core.plugins.bugcrowd_client import BugcrowdClient
from infj_bot.core.plugins.findings_db import Finding, FindingsDB
from infj_bot.core.plugins.report_builder import ReportBuilder
from infj_bot.core.plugins.target_manager import TargetManager

# Root folder for recon artifacts (one sub-folder per program and run) and
# for drafted report files.
RECON_DIR = Path(PROJECT_ROOT, "recon")
# Plain-text activity log that BugBot appends to.
BUGBOT_LOG = Path(PROJECT_ROOT, "logs", "bugbot.log")

class BugBot:
    """Bug bounty engine: recon, findings tracking, reports and submission.

    Public methods return human-readable strings meant to be shown to the
    operator.

    Usage:
        bot = BugBot()
        bot.sync_programs()        # Pull from Bugcrowd
        bot.recon(program_id)      # Run scoped recon
        bot.add_finding(...)       # Log a finding
        bot.generate_report(fid)   # Draft a report
        bot.submit(finding_id)     # Submit to Bugcrowd

    NOTE(review): the glyphs at the start of most user-facing strings look
    mis-encoded (UTF-8 bytes read through a single-byte Greek code page).
    They are reproduced as found because the intended characters cannot be
    recovered with certainty. The only exception is the two success messages
    in ``sync_programs`` and ``submit``: their literals were split across two
    lines by the corrupted glyph and are rebuilt with a check-mark. Restore
    the rest from a clean copy of the file.
    """

    # Wordlist handed to ffuf; override on a subclass or instance if the
    # default path does not exist on the host.
    FFUF_WORDLIST = "/usr/share/wordlists/dirb/common.txt"

    def __init__(
        self,
        api_key: Optional[str] = None,
        # Quoted so the annotation is never evaluated when the class is built.
        memory: "Optional[DriftMemory]" = None,
    ):
        """Create the collaborators and make sure the working folders exist.

        Args:
            api_key: Forwarded to ``BugcrowdClient``.
            memory: Optional memory store. When provided, every finding
                added through ``add_finding`` is also recorded in it.
        """
        self.client = BugcrowdClient(api_key=api_key)
        self.findings = FindingsDB()
        self.targets = TargetManager()
        self.reports = ReportBuilder()
        self.memory = memory
        RECON_DIR.mkdir(parents=True, exist_ok=True)
        BUGBOT_LOG.parent.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _clip(text: str, limit: int) -> str:
        """Return *text* cut to *limit* characters, adding "..." only if cut."""
        return text if len(text) <= limit else text[:limit] + "..."

    # ------------------------------------------------------------------
    # Programs
    # ------------------------------------------------------------------

    def sync_programs(self) -> str:
        """Fetch Bugcrowd programs and mirror their scope into the target DB.

        For each program the stored targets are deleted and replaced by the
        freshly fetched in-scope and out-of-scope assets. A program that
        fails is logged and skipped; the others are still processed.

        Returns:
            A status line giving the number of programs that actually
            synced (plus a failure count when any were skipped) and the
            resulting target totals, or an error/warning line when the
            program list could not be fetched or is empty.
        """
        try:
            programs = self.client.list_programs()
        except Exception as exc:
            return f"β Failed to fetch programs: {exc}"

        if not programs:
            return "β οΈ No Bugcrowd programs found. Check your API key."

        synced = 0
        for prog in programs:
            try:
                # Fetch before deleting so a failed API call cannot wipe the
                # scope that is already stored for this program.
                full = self.client.get_program(prog.uuid)
                self.targets.delete_program(prog.uuid)
                self.targets.bulk_add(prog.uuid, prog.name, full.scope, scope="in")
                self.targets.bulk_add(prog.uuid, prog.name, full.oos, scope="out")
                self._log(
                    f"Synced program: {prog.name} ({len(full.scope)} in / {len(full.oos)} out)"
                )
                synced += 1
            except Exception as exc:
                self._log(f"β οΈ Failed to sync {prog.name}: {exc}")

        failed = len(programs) - synced
        failure_note = f" ({failed} failed, see log)" if failed else ""
        return (
            f"✅ Synced {synced} program(s){failure_note}. "
            f"Targets: {self.targets.count(scope='in')} in / {self.targets.count(scope='out')} out."
        )

    def list_programs(self) -> str:
        """Return a bullet list of the programs reported by the client."""
        try:
            programs = self.client.list_programs()
        except Exception as exc:
            return f"β {exc}"
        if not programs:
            return "No programs found."
        lines = ["π― Bugcrowd Programs"]
        lines.extend(
            f" β’ {p.name} β {p.status} ({p.rewards or 'no rewards info'})"
            for p in programs
        )
        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Recon
    # ------------------------------------------------------------------

    def recon(self, program_id: str, tool: str = "all") -> str:
        """Run scoped recon for a program.

        Only targets stored as in-scope for *program_id* are used. Artifacts
        go to a fresh timestamped folder under ``RECON_DIR / program_id``.

        Args:
            program_id: Program whose in-scope targets are scanned.
            tool: ``all`` | ``subdomains`` | ``nuclei`` | ``fuzz``. Any other
                value runs no scanner, yet the completion message is still
                returned.

        Returns:
            A summary with one line per scanner that ran and the artifact
            folder, or an error line when the program has no in-scope
            targets.
        """
        targets = self.targets.list_targets(program_id=program_id, scope="in")
        if not targets:
            return (
                f"β No in-scope targets for program {program_id}. Run /bug sync first."
            )

        domains = [t.asset for t in targets if t.asset_type in ("domain", "wildcard")]
        urls = [t.asset for t in targets if t.asset_type == "url"]
        scan_targets = domains + urls

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        out_dir = RECON_DIR / program_id / stamp
        out_dir.mkdir(parents=True, exist_ok=True)

        results: List[str] = []
        # Subdomain enumeration only makes sense for domain-like assets.
        if tool in ("all", "subdomains") and domains:
            results.append(self._run_subfinder(domains, out_dir))
        if tool in ("all", "nuclei") and scan_targets:
            results.append(self._run_nuclei(scan_targets, out_dir))
        if tool in ("all", "fuzz") and scan_targets:
            results.append(self._run_ffuf(scan_targets, out_dir))

        summary = "\n".join(results)
        self._log(f"Recon {program_id}/{tool}: {summary[:200]}")
        return f"π°οΈ Recon complete for {program_id}\n{summary}\nπ Artifacts: {out_dir}"

    def _run_subfinder(self, domains: List[str], out_dir: Path) -> str:
        """Run ``subfinder`` over *domains* and return a one-line summary.

        The domain list is written to ``domains.txt`` and results are read
        from ``subdomains.txt``, both inside *out_dir*. The run is capped at
        300 seconds; a non-zero exit status is not treated as an error.
        """
        domain_file = out_dir / "domains.txt"
        domain_file.write_text("\n".join(domains), encoding="utf-8")
        output = out_dir / "subdomains.txt"
        try:
            subprocess.run(
                ["subfinder", "-dL", str(domain_file), "-o", str(output), "-silent"],
                capture_output=True,
                text=True,
                timeout=300,
                check=False,
            )
            count = 0
            if output.exists():
                count = len(output.read_text(encoding="utf-8").splitlines())
            return f" π subfinder: {count} subdomains β {output.name}"
        except FileNotFoundError:
            # Raised by subprocess when the binary is not on PATH.
            return " β οΈ subfinder not installed"
        except Exception as exc:
            return f" β subfinder error: {exc}"

    def _run_nuclei(self, targets: List[str], out_dir: Path) -> str:
        """Run ``nuclei`` over *targets* and log critical/high hits as findings.

        Targets are written to ``nuclei_targets.txt`` and results are read
        from ``nuclei.json`` in *out_dir*. The scan is limited to
        critical/high/medium templates, rate-limited to 10 and capped at
        600 seconds.

        Returns:
            A one-line summary with the total result count, the number of
            critical/high results and how many were skipped as duplicates.
        """
        target_file = out_dir / "nuclei_targets.txt"
        target_file.write_text("\n".join(targets), encoding="utf-8")
        output = out_dir / "nuclei.json"
        try:
            subprocess.run(
                [
                    "nuclei",
                    "-l",
                    str(target_file),
                    "-severity",
                    "critical,high,medium",
                    "-rate-limit",
                    "10",
                    "-json-export",
                    str(output),
                    "-silent",
                ],
                capture_output=True,
                text=True,
                timeout=600,
                check=False,
            )
            if not output.exists():
                return " π― nuclei: no output"

            raw = output.read_text(encoding="utf-8")
            data = json.loads(raw) if raw.strip() else []
            if isinstance(data, dict):
                # Tolerate a single result object instead of an array.
                data = [data]

            ch_count, skipped = self._record_nuclei_findings(data)
            skip_note = f", {skipped} skipped (dedup)" if skipped else ""
            return f" π― nuclei: {len(data)} findings β {output.name} ({ch_count} critical/high{skip_note})"
        except FileNotFoundError:
            return " β οΈ nuclei not installed"
        except Exception as exc:
            return f" β nuclei error: {exc}"

    def _record_nuclei_findings(self, data: List[dict]) -> Tuple[int, int]:
        """Store critical/high nuclei results as findings, skipping duplicates.

        A result is a duplicate when the same (host, template) pair was
        already handled in this call or already exists in the findings DB.

        Returns:
            ``(critical_or_high_count, skipped_duplicates)``.
        """
        ch_count = 0
        skipped = 0
        seen_this_run = set()
        for item in data:
            info = item.get("info", {})
            sev = info.get("severity", "").lower()
            if sev not in ("critical", "high"):
                continue
            ch_count += 1

            asset = item.get("host", "")
            vuln_type = f"Nuclei: {item.get('template-id', 'unknown')}"
            dedup_key = (asset, vuln_type)
            if dedup_key in seen_this_run or self.findings.exists_by_asset_and_vuln_type(
                asset, vuln_type
            ):
                skipped += 1
                continue
            seen_this_run.add(dedup_key)

            # "auto" marks scanner-generated findings; submit() refuses them
            # until a real program ID has been set.
            self.add_finding(
                program_id="auto",
                title=info.get("name", "Nuclei finding"),
                vuln_type=vuln_type,
                severity="P2" if sev == "critical" else "P3",
                asset=asset,
                description=json.dumps(info, indent=2),
                confidence="medium",
            )
        return ch_count, skipped

    def _run_ffuf(self, targets: List[str], out_dir: Path) -> str:
        """Fuzz paths on the first target with ``ffuf`` and summarise the hits.

        Only ``targets[0]`` is fuzzed. Its scheme and path are dropped and
        the request is always made over https. Results are read from
        ``ffuf.json`` in *out_dir*; the run is rate-limited to 10 and capped
        at 300 seconds.
        """
        if not targets:
            return " πͺοΈ ffuf: no output"

        domain = targets[0].replace("https://", "").replace("http://", "").split("/")[0]
        # Presumably wildcard assets are stored as "*.example.com" (confirm
        # against TargetManager); the "*." cannot appear in a URL host.
        if domain.startswith("*."):
            domain = domain[2:]

        output = out_dir / "ffuf.json"
        try:
            subprocess.run(
                [
                    "ffuf",
                    "-u",
                    f"https://{domain}/FUZZ",
                    "-w",
                    self.FFUF_WORDLIST,
                    "-rate",
                    "10",
                    "-o",
                    str(output),
                    "-of",
                    "json",
                    "-s",
                ],
                capture_output=True,
                text=True,
                timeout=300,
                check=False,
            )
            if not output.exists():
                return " πͺοΈ ffuf: no output"
            raw = output.read_text(encoding="utf-8")
            data = json.loads(raw) if raw.strip() else {}
            results = data.get("results", [])
            return f" πͺοΈ ffuf: {len(results)} hits on {domain} β {output.name}"
        except FileNotFoundError:
            return " β οΈ ffuf not installed"
        except Exception as exc:
            return f" β ffuf error: {exc}"

    # ------------------------------------------------------------------
    # Findings
    # ------------------------------------------------------------------

    def add_finding(
        self,
        program_id: str = "",
        program_name: str = "",
        title: str = "",
        vuln_type: str = "",
        severity: str = "P5",
        cvss: float = 0.0,
        asset: str = "",
        description: str = "",
        reproduction: str = "",
        impact: str = "",
        fix: str = "",
        confidence: str = "low",
        notes: str = "",
    ) -> str:
        """Store a new finding and, when a memory store is set, record it there.

        All arguments are copied onto a ``Finding`` as-is. In the memory
        store, P1/P2 findings get importance 0.9 and everything else 0.7.

        Returns:
            A confirmation line containing the new finding ID.
        """
        finding = Finding(
            program_id=program_id,
            program_name=program_name,
            title=title,
            vuln_type=vuln_type,
            severity=severity,
            cvss=cvss,
            asset=asset,
            description=description,
            reproduction=reproduction,
            impact=impact,
            fix=fix,
            confidence=confidence,
            notes=notes,
        )
        fid = self.findings.add(finding)
        # Identity check: a memory object that happens to be falsy (for
        # example an empty container type) must still be used.
        if self.memory is not None:
            self.memory.learn_concept(
                f"Finding {fid}: {title}",
                f"{description}\n\nAsset: {asset}\nSeverity: {severity}\nConfidence: {confidence}",
                tags=["bug", "finding", program_id],
                importance=0.9 if severity in ("P1", "P2") else 0.7,
            )
        return f"π Finding logged: {fid} β {title} ({severity})"

    def get_finding(self, fid: str) -> str:
        """Return a multi-line summary of one finding (description cut at 200)."""
        f = self.findings.get(fid)
        if not f:
            return f"β Finding {fid} not found."
        ev = self.findings.get_evidence(fid)
        lines = [
            f"π Finding {f.id}",
            f" Title: {f.title}",
            f" Severity: {f.severity} | Confidence: {f.confidence} | Status: {f.status}",
            f" Asset: {f.asset or 'N/A'}",
            f" Type: {f.vuln_type or 'N/A'}",
            f" Description: {self._clip(f.description, 200)}",
            f" Evidence: {len(ev)} item(s)",
        ]
        return "\n".join(lines)

    def list_findings(
        self, program_id: Optional[str] = None, status: Optional[str] = None
    ) -> str:
        """Return a table of up to 100 findings, optionally filtered.

        Args:
            program_id: Passed to the findings DB as a filter when given.
            status: Passed to the findings DB as a filter when given.
        """
        items = self.findings.list(program_id=program_id, status=status, limit=100)
        if not items:
            return "No findings."
        lines = [
            f"{'ID':<10} {'Severity':<10} {'Status':<12} {'Title'}",
            "-" * 60,
        ]
        for f in items:
            title = self._clip(f.title, 35)
            lines.append(f"{f.id:<10} {f.severity:<10} {f.status:<12} {title}")
        return "\n".join(lines)

    def stats(self) -> str:
        """Return total, per-severity and per-status finding counts as text."""
        s = self.findings.stats()
        lines = [
            "π Bug Bot Stats",
            f" Total findings: {s['total']}",
            " By severity:",
        ]
        lines.extend(
            f" {sev}: {count}" for sev, count in sorted(s.get("by_severity", {}).items())
        )
        lines.append(" By status:")
        lines.extend(
            f" {st}: {count}" for st, count in sorted(s.get("by_status", {}).items())
        )
        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Reports
    # ------------------------------------------------------------------

    def generate_report(self, finding_id: str) -> str:
        """Build the report for a finding and save it under ``RECON_DIR``.

        Returns:
            The saved path followed by a preview of the report. The preview
            is cut to 500 characters and an ellipsis is added only when
            something was actually cut.
        """
        f = self.findings.get(finding_id)
        if not f:
            return f"β Finding {finding_id} not found."
        ev = self.findings.get_evidence(finding_id)
        report = self.reports.build(f, ev)
        out_path = RECON_DIR / f"report_{finding_id}.md"
        # Explicit UTF-8: report text may hold characters the locale
        # encoding cannot represent.
        out_path.write_text(report, encoding="utf-8")
        return f"π Report drafted: {out_path}\n\n{self._clip(report, 500)}"

    def preview_report(self, finding_id: str) -> str:
        """Return the full report text for a finding without saving it."""
        f = self.findings.get(finding_id)
        if not f:
            return f"β Finding {finding_id} not found."
        ev = self.findings.get_evidence(finding_id)
        return self.reports.build(f, ev)

    # ------------------------------------------------------------------
    # Submission and evidence
    # ------------------------------------------------------------------

    def submit(self, finding_id: str) -> str:
        """Submit a finding to Bugcrowd and mark it as submitted locally.

        Findings with no program ID, or with the scanner placeholder
        ``"auto"``, are refused.

        The remote submission and the local status update are handled
        separately: if the submission went through but the local update
        failed, the submission ID is still returned, so the operator does
        not resubmit the same finding.
        """
        f = self.findings.get(finding_id)
        if not f:
            return f"β Finding {finding_id} not found."
        if not f.program_id or f.program_id == "auto":
            return "β Finding has no Bugcrowd program ID. Set it with /bug update."

        try:
            sub_id = self.client.create_submission(
                program_uuid=f.program_id,
                title=f.title,
                vulnerability_type=f.vuln_type or "Other",
                severity=f.severity,
                description=f.description,
                reproduction=f.reproduction,
                impact=f.impact,
                asset=f.asset,
            )
        except Exception as exc:
            return f"β Submission failed: {exc}"

        try:
            self.findings.update(
                finding_id,
                status="submitted",
                bugcrowd_submission_id=sub_id,
                submitted_at=datetime.now().isoformat(timespec="seconds"),
            )
        except Exception as exc:
            self._log(
                f"Submission {sub_id} for {finding_id} succeeded but local update failed: {exc}"
            )
            return (
                f"✅ Submitted to Bugcrowd! Submission ID: {sub_id} "
                f"(local status update failed: {exc})"
            )
        return f"✅ Submitted to Bugcrowd! Submission ID: {sub_id}"

    def attach_evidence(
        self,
        finding_id: str,
        path: str,
        ev_type: str = "file",
        description: str = "",
    ) -> str:
        """Attach an evidence item to an existing finding.

        Args:
            finding_id: Finding to attach to; must exist.
            path: Evidence location, stored as given.
            ev_type: Evidence type label.
            description: Optional free-text description.
        """
        if not self.findings.get(finding_id):
            return f"β Finding {finding_id} not found."
        self.findings.add_evidence(finding_id, ev_type, path, description)
        return f"π Evidence attached to {finding_id}: [{ev_type}] {path}"

    # ------------------------------------------------------------------
    # Overview
    # ------------------------------------------------------------------

    def dashboard(self) -> str:
        """Return a text dashboard: totals, five latest findings, top program."""
        s = self.findings.stats()
        by_sev = s.get("by_severity", {})
        by_status = s.get("by_status", {})
        by_program = s.get("by_program", {})

        lines = [
            "βββ Bug Bot Dashboard βββ",
            f"Total findings: {s['total']}",
            "",
            "By Severity:",
        ]
        # Fixed P1..P5 order; severities with a zero count are left out.
        for sev in ("P1", "P2", "P3", "P4", "P5"):
            count = by_sev.get(sev, 0)
            if count:
                lines.append(f" {sev}: {count}")

        lines.append("")
        lines.append("By Status:")
        lines.extend(f" {st}: {count}" for st, count in sorted(by_status.items()))

        recent = self.findings.list(limit=5)
        if recent:
            lines.extend(
                [
                    "",
                    "Recent Findings:",
                    f"{'ID':<10} {'Sev':<5} {'Status':<14} {'Title'}",
                    "-" * 60,
                ]
            )
            for f in recent:
                title = self._clip(f.title, 40)
                lines.append(f"{f.id:<10} {f.severity:<5} {f.status:<14} {title}")

        if by_program:
            top_prog = max(by_program, key=by_program.get)
            # An empty program key is not worth showing.
            if top_prog:
                lines.append(
                    f"\nTop program: {top_prog} ({by_program[top_prog]} finding(s))"
                )

        lines.append(f"\nDB: {self.findings.db_path}")
        return "\n".join(lines)

    def draft_with_ai(self, finding_id: str, brain=None) -> str:
        """Return an AI-polished report, falling back to the standard one.

        Args:
            finding_id: Finding to report on.
            brain: Optional object exposing
                ``agent_turn(prompt, tools_enabled=False)``. When it is
                missing, raises, or returns nothing, the standard report is
                returned with a note appended.
        """
        f = self.findings.get(finding_id)
        if not f:
            return f"β Finding {finding_id} not found."
        ev = self.findings.get_evidence(finding_id)
        standard_report = self.reports.build(f, ev)

        if brain is not None:
            prompt = (
                f"You are a professional bug bounty report writer. "
                f"Please improve the following vulnerability report for clarity and impact. "
                f"Make it compelling, well-structured, and persuasive for a triage team. "
                f"Preserve all technical details.\n\n{standard_report}"
            )
            try:
                enhanced = brain.agent_turn(prompt, tools_enabled=False)
                if enhanced:
                    return enhanced
            except Exception as exc:
                # Still best-effort, but leave a trace of why it fell back.
                self._log(f"AI enhancement failed for {finding_id}: {exc}")

        return (
            standard_report
            + "\n\n---\n*Note: AI enhancement was unavailable. Showing standard report.*"
        )

    def health(self) -> str:
        """Return the API client's health line plus findings/target counts."""
        api = self.client.health()
        total = self.findings.stats()["total"]
        in_scope = self.targets.count(scope="in")
        out_scope = self.targets.count(scope="out")
        return f"{api}\nπ Findings DB: {total} finding(s)\nπ― Targets DB: {in_scope} in-scope / {out_scope} out-of-scope"

    def _log(self, msg: str):
        """Append a timestamped line to ``BUGBOT_LOG`` (best effort).

        Logging must never break the calling operation, so I/O and encoding
        failures are ignored.
        """
        line = f"[{datetime.now().isoformat(timespec='seconds')}] {msg}\n"
        try:
            # Explicit UTF-8 so glyph-bearing messages are not dropped on
            # hosts whose locale encoding cannot represent them.
            with open(BUGBOT_LOG, "a", encoding="utf-8") as fh:
                fh.write(line)
        except (OSError, ValueError):
            pass
