Spaces:
Running
Running
| """ | |
| core/models.py β Shared dataclasses for all pipeline stages. | |
| Every agent imports from here so data flows cleanly across parts. | |
| """ | |
| import datetime | |
| import hashlib | |
| from dataclasses import dataclass, field, asdict | |
| from typing import Optional | |
| from dataclasses import dataclass, field | |
| from typing import Literal | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # PART 1 β RECON MODELS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class ReconFindings: | |
| """Output of Part 1 β feeds directly into Part 2.""" | |
| target: str | |
| timestamp: str = field(default_factory=lambda: datetime.datetime.utcnow().isoformat()) | |
| whois_info: dict = field(default_factory=dict) | |
| dns_records: dict = field(default_factory=dict) | |
| subdomains: list = field(default_factory=list) | |
| open_ports: list = field(default_factory=list) | |
| technologies: list = field(default_factory=list) | |
| emails: list = field(default_factory=list) | |
| shodan_data: dict = field(default_factory=dict) | |
| http_headers: dict = field(default_factory=dict) | |
| ssl_info: dict = field(default_factory=dict) | |
| notes: list = field(default_factory=list) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # PART 2 β VULNERABILITY MODELS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class Vulnerability: | |
| """Single vulnerability finding from Part 2.""" | |
| vuln_id: str | |
| title: str | |
| description: str | |
| severity: str # CRITICAL / HIGH / MEDIUM / LOW / INFO / NONE | |
| cvss_score: float | |
| category: str # SQLi / XSS / MissingHeader / CVE / PathExposure / etc. | |
| target_url: str | |
| evidence: str | |
| remediation: str | |
| cve_ids: list = field(default_factory=list) | |
| tool_source: str = "" | |
| verified: bool = False | |
| cve_verified: bool = False | |
| confidence: Literal['low', 'medium', 'high'] = 'medium' | |
| notes: str = "" | |
| manual_review_required: bool = True | |
| class VulnFindings: | |
| """Output of Part 2 β feeds directly into Part 3.""" | |
| target: str | |
| scan_start: str = field(default_factory=lambda: datetime.datetime.utcnow().isoformat()) | |
| scan_end: str = "" | |
| vulnerabilities: list = field(default_factory=list) # list[Vulnerability] | |
| skipped_checks: list = field(default_factory=list) | |
| notes: list = field(default_factory=list) | |
| def add_vuln(self, vuln: Vulnerability): | |
| existing_ids = {v.vuln_id for v in self.vulnerabilities} | |
| if vuln.vuln_id not in existing_ids: | |
| self.vulnerabilities.append(vuln) | |
| def summary(self) -> dict: | |
| counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "INFO": 0} | |
| for v in self.vulnerabilities: | |
| counts[v.severity] = counts.get(v.severity, 0) + 1 | |
| return counts | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # PART 3 β EXPLOIT MODELS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class ExploitResult: | |
| """Single confirmed/failed exploit attempt from Part 3.""" | |
| exploit_id: str | |
| vuln_title: str | |
| vuln_category: str | |
| target_url: str | |
| status: str # CONFIRMED / FAILED / SKIPPED / ERROR | |
| technique: str | |
| request_sent: str | |
| response_snippet: str | |
| impact: str | |
| cvss_score: float | |
| severity: str | |
| evidence_type: str # data_leak / error_message / delay / reflection / auth_success | |
| timestamp: str = field(default_factory=lambda: datetime.datetime.utcnow().isoformat()) | |
| notes: str = "" | |
| class ExploitSession: | |
| """Output of Part 3 β feeds directly into Part 4.""" | |
| target: str | |
| session_start: str = field(default_factory=lambda: datetime.datetime.utcnow().isoformat()) | |
| session_end: str = "" | |
| results: list = field(default_factory=list) # list[ExploitResult] | |
| dry_run: bool = False | |
| def add_result(self, r: ExploitResult): | |
| self.results.append(r) | |
| def confirmed(self) -> list: | |
| return [r for r in self.results if r.status == "CONFIRMED"] | |
| def summary(self) -> dict: | |
| counts: dict = {} | |
| for r in self.results: | |
| counts[r.status] = counts.get(r.status, 0) + 1 | |
| return counts | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # HELPERS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def make_id(label: str, url: str) -> str: | |
| """Stable 10-char dedup key.""" | |
| return hashlib.md5(f"{label}:{url}".encode()).hexdigest()[:10] | |
| def to_dict(obj) -> dict: | |
| """Recursively convert dataclass to plain dict.""" | |
| return asdict(obj) | |