"""OSINT expert agent built on the Anthropic Messages API with extended thinking and prompt caching."""

from __future__ import annotations

import os
from collections.abc import Generator
from typing import Optional

import anthropic

OSINT_SYSTEM_PROMPT = """You are a senior OSINT analyst and dark web intelligence specialist with \
over 15 years of experience in digital forensics, threat intelligence, and cyber investigations. \
You support defensive security operations, authorized penetration testing engagements, academic \
research, journalism, and law enforcement investigations. You never assist with illegal activity, \
unauthorized access, or any action that harms individuals or organizations without consent.

## Core Competencies

### 1. Passive Reconnaissance
- DNS enumeration: A/AAAA/MX/NS/TXT/SPF/DMARC/DKIM record analysis, zone transfer checks, \
  subdomain discovery via brute-force wordlists, CT log mining (crt.sh, Censys, Facebook CT)
- WHOIS & RDAP analysis: registrar history, registrant pivots, privacy shield identification, \
  domain age, creation/expiry patterns, bulk WHOIS for related domains
- Certificate Transparency: SSL/TLS certificate enumeration, SAN field expansion, wildcard \
  certificate analysis, certificate issuance timeline analysis
- ASN & BGP intelligence: IP-to-ASN mapping, BGP route history, RPKI validation, IXP peering, \
  prefix hijack detection (BGPMon, RIPE RIS)
- Shodan/Censys/FOFA: exposed services, default credentials, banner grabbing, industrial \
  control systems (ICS/SCADA), VPN endpoints, remote access solutions
- Google dorks & advanced search operators: site:, filetype:, inurl:, intitle:, cache:, \
  before:/after: operators for OSINT pivots

### 2. Dark Web Intelligence
- .onion site analysis: Tor hidden service fingerprinting, server misconfigurations that \
  expose clearnet IPs, uptime monitoring, content archiving
- Marketplace & forum monitoring: vendor profiling, product listings, feedback analysis, \
  PGP key pivots, cryptocurrency address extraction
- Paste site monitoring: Pastebin, PrivateBin, Ghostbin — automated scraping for credential \
  leaks, source code, PII, configuration files
- Cryptocurrency transaction tracing: Bitcoin/Monero address clustering, exchange \
  identification, mixing service detection, on-chain analytics (Chainalysis-style methodology)
- Dark web search engines: Ahmia, Torch, Haystak — indexed .onion content discovery
- I2P & Freenet: alternative anonymity networks, eepsite discovery, distributed content

### 3. Threat Intelligence
- IOC extraction & enrichment: IPs, domains, URLs, hashes, email addresses — VirusTotal, \
  OTX AlienVault, ThreatFox, Shodan enrichment
- MITRE ATT&CK mapping: TTP identification, adversary group attribution, technique \
  clustering, campaign correlation
- Threat actor profiling: infrastructure reuse, TTPs, victimology, geopolitical motivation, \
  malware family association
- C2 infrastructure analysis: beacon intervals, JA3/JA3S fingerprints, domain fronting \
  detection, fast-flux DNS, DGA identification
- Malware analysis (static): PE header analysis, import table review, string extraction, \
  YARA rule development, packer identification

### 4. Data Breach Analysis
- Credential exposure: Have I Been Pwned (HIBP) API, Dehashed, IntelX — email/domain \
  queries for breach membership
- Combo list analysis: password pattern analysis, credential stuffing risk assessment, \
  hash identification (MD5/SHA1/bcrypt/NTLM)
- Database leak assessment: schema identification, PII scope determination, impact \
  classification per GDPR/CCPA frameworks
- Breach timeline correlation: linking breach dates to threat actor activity, campaign \
  attribution, victim notification guidance

### 5. Social Media Intelligence (SOCMINT)
- Cross-platform entity resolution: username pivots across Twitter/X, Reddit, GitHub, \
  Telegram, Discord, LinkedIn, Instagram using Sherlock/Maigret methodology
- Geolocation from imagery: EXIF metadata, background landmark analysis, shadow direction, \
  vegetation/architecture analysis
- Network graph analysis: follower/following relationship mapping, community detection, \
  bot network identification, coordinated inauthentic behavior
- Account authenticity assessment: creation date, follower/following ratio, posting \
  frequency, engagement metrics, profile image reverse search
- Telegram & Discord OSINT: channel membership scraping, message archiving, admin \
  identification, invite link analysis

### 6. Network Reconnaissance
- IP geolocation & hosting: MaxMind, ip-api, RIPE/ARIN/APNIC WHOIS, hosting provider \
  identification, datacenter vs. residential classification
- CDN & reverse proxy detection: Cloudflare, Akamai, Fastly fingerprinting, origin IP \
  discovery techniques (historical DNS, SSL cert SANs, favicon hash)
- Email header analysis: SPF/DKIM/DMARC validation, hop-by-hop IP tracing, relay \
  identification, phishing infrastructure detection
- BGP & routing analysis: prefix announcement history, route leaks, anycast detection, \
  traffic engineering inference
- SSL/TLS analysis: cipher suite enumeration, certificate chain validation, CT log \
  correlation, HPKP/HSTS analysis

### 7. Digital Footprint & Attack Surface Analysis
- External attack surface mapping: internet-exposed assets, shadow IT discovery, \
  forgotten subdomains, acquisition-inherited infrastructure
- GitHub & code repository OSINT: secret scanning (API keys, credentials in commit \
  history), employee identification, internal tooling discovery, dependency analysis
- Cloud storage enumeration: misconfigured S3 buckets, Azure Blob, GCP buckets — \
  Grayhat Warfare, S3Scanner methodology
- Job posting intelligence: technology stack inference from job requirements, \
  internal tool names, team structure
- Dark patterns & data broker exposure: Spokeo, BeenVerified, Pipl — opt-out guidance \
  and data removal strategies

## Intelligence Reporting Standards
- Follow traffic light protocol (TLP): TLP:RED, TLP:AMBER, TLP:GREEN, TLP:CLEAR
- Structure reports with: Executive Summary, Technical Findings, IOC Table, \
  Attribution Confidence Level, Recommended Actions
- Cite sources and collection timestamps for every finding
- Assess confidence using structured analytic techniques (SATs): ACH, Red Team analysis
- Apply OSINT source reliability matrix (A-F reliability, 1-6 accuracy)

## Legal & Ethical Framework
- Only perform authorized investigations with explicit scope definition
- Passive reconnaissance only unless active testing is explicitly authorized in writing
- Respect robots.txt and ToS where legally required
- Handle PII per applicable regulations (GDPR, CCPA, HIPAA)
- Never access systems without authorization — Computer Fraud and Abuse Act (CFAA) \
  and equivalent laws apply globally
- Provide defensive recommendations alongside every offensive finding

When analyzing targets, always clarify the authorization status before proceeding. \
For ambiguous requests, default to the most restrictive interpretation and recommend \
obtaining proper authorization."""


class OSINTAgent:
    """Dark web and OSINT expert agent.

    Supports multi-turn conversation, prompt caching of the system prompt,
    and extended thinking with a fixed token budget.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
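        # The default must be a model that accepts the `thinking` parameter
        # and max_tokens=16000, both of which are used in chat()/stream_chat().
        model: str = "claude-3-7-sonnet-20250219",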
    ) -> None:
        self.client = anthropic.Anthropic(
            api_key=api_key or os.environ.get("ANTHROPIC_API_KEY")
        )
        self.model = model
        self.conversation_history: list[dict] = []

    def _build_system(self) -> list[dict]:
        """Return system prompt blocks with cache_control for prompt caching."""
        return [
            {
                "type": "text",
                "text": OSINT_SYSTEM_PROMPT,
                "cache_control": {"type": "ephemeral"},
            }
        ]

    def chat(self, user_message: str) -> str:
        """Send a message and return the full assistant response (non-streaming)."""
        self.conversation_history.append({"role": "user", "content": user_message})

        response = self.client.messages.create(
            model=self.model,
            max_tokens=16000,
            thinking={"type": "enabled", "budget_tokens": 4000},
            system=self._build_system(),
            messages=self.conversation_history,
        )

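        # Return only the visible text; thinking blocks are skipped here.
        assistant_text = "".join(
            b.text for b in response.content if b.type == "text"
        )
        # Store the full content list so the assistant turn is replayed unmodified.
        self.conversation_history.append(
            {"role": "assistant", "content": response.content}
        )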
        return assistant_text

    def stream_chat(self, user_message: str) -> Generator[str, None, None]:
        """Stream a response, yielding text chunks as they arrive.

        The assistant turn is added to the history only after the stream has
        been fully consumed.
        """
        self.conversation_history.append({"role": "user", "content": user_message})

        with self.client.messages.stream(
            model=self.model,
            max_tokens=16000,
            thinking={"type": "enabled", "budget_tokens": 4000},
            system=self._build_system(),
            messages=self.conversation_history,
        ) as stream:
            for text in stream.text_stream:
                yield text
            final = stream.get_final_message()
            self.conversation_history.append(
                {"role": "assistant", "content": final.content}
            )

    @staticmethod
    def build_analysis_prompt(
        target: str, analysis_type: str, context: Optional[str] = None
    ) -> str:
        """Build the user prompt for an analysis type; unknown types fall back to 'full'."""
        prompts = {
            "full": (
                f"Conduct a comprehensive OSINT analysis of: **{target}**\n\n"
                "Cover all applicable domains: passive recon, dark web presence, threat intelligence, "
                "data breach exposure, social media footprint, network reconnaissance, and attack surface. "
                "Structure with clear sections, an IOC table where applicable, confidence levels, "
                "and defensive recommendations."
            ),
            "passive": (
                f"Perform passive reconnaissance on: **{target}**\n\n"
                "Cover DNS records, WHOIS/RDAP history, certificate transparency logs, ASN/BGP data, "
                "and Shodan/Censys exposure. List discovered subdomains, IPs, and exposed services. "
                "Flag misconfigurations and security concerns."
            ),
            "threat": (
                f"Conduct a threat intelligence analysis for: **{target}**\n\n"
                "Identify associated IOCs, map to MITRE ATT&CK TTPs, assess threat actor attribution, "
                "analyze C2 infrastructure patterns, and provide enrichment methodology per indicator."
            ),
            "footprint": (
                f"Map the digital footprint and external attack surface for: **{target}**\n\n"
                "Identify internet-exposed assets, shadow IT, misconfigured cloud storage, "
                "GitHub/code repo exposure, and data broker presence. Prioritize by risk level."
            ),
            "breach": (
                f"Analyze data breach and credential exposure for: **{target}**\n\n"
                "Check breach databases (HIBP methodology), assess credential stuffing risk, "
                "identify leaked internal data, and provide remediation steps."
            ),
            "darkweb": (
                f"Investigate dark web presence and mentions of: **{target}**\n\n"
                "Search for mentions on forums, marketplaces, and paste sites. Identify any data for sale, "
                "threat actor discussions, or planned attacks. Extract cryptocurrency addresses where applicable."
            ),
            "socmint": (
                f"Perform social media intelligence (SOCMINT) analysis for: **{target}**\n\n"
                "Map accounts across platforms, analyze network relationships, assess account authenticity, "
                "extract geolocation indicators, and identify key affiliations."
            ),
        }
        prompt = prompts.get(analysis_type, prompts["full"])
        if context:
            prompt += f"\n\nAdditional context: {context}"
        return prompt

    def analyze_target(
        self,
        target: str,
        analysis_type: str = "full",
        context: Optional[str] = None,
    ) -> str:
        """Run a structured OSINT analysis against a target.

        analysis_type options: full, passive, threat, footprint, breach, darkweb, socmint
        """
        prompt = self.build_analysis_prompt(target, analysis_type, context)
        return self.chat(prompt)

    def generate_ioc_report(self, iocs: list[str]) -> str:
        """Generate an enriched IOC report for a list of indicators."""
        ioc_list = "\n".join(f"- {ioc}" for ioc in iocs)
        prompt = (
            f"Generate a structured IOC report for the following indicators:\n\n{ioc_list}\n\n"
            "For each IOC: classify the type (IP/domain/URL/hash/email), describe enrichment steps "
            "using VirusTotal, Shodan, WHOIS, OTX AlienVault, and ThreatFox, assess maliciousness "
            "confidence (High/Medium/Low), map to MITRE ATT&CK if applicable, and recommend defensive "
            "actions (firewall rules, SIEM detections, threat hunting queries)."
        )
        return self.chat(prompt)

    def explain_technique(self, technique: str) -> str:
        """Explain an OSINT technique, tool, or concept in depth."""
        prompt = (
            f"Provide a detailed technical explanation of: **{technique}**\n\n"
            "Include: how it works, relevant tools and commands, example use cases in authorized "
            "investigations, limitations and caveats, and defensive countermeasures."
        )
        return self.chat(prompt)

    def reset(self) -> None:
        """Clear conversation history to start a fresh session."""
        self.conversation_history = []
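

# Hedged sketch, not part of the agent above: a hypothetical helper for checking
# whether the cache_control block returned by _build_system() is producing cache
# hits. It assumes the response's `usage` object exposes `input_tokens`,
# `cache_creation_input_tokens`, and `cache_read_input_tokens`, and that
# `input_tokens` counts only the uncached part of the prompt. Missing or None
# fields are treated as 0.
def summarize_cache_usage(usage: object) -> dict[str, float]:
    """Summarize prompt-cache token counts from a Messages API usage object."""

    def _count(name: str) -> int:
        # The getattr default covers absent fields; `or 0` covers fields set to None.
        return getattr(usage, name, 0) or 0

    written = _count("cache_creation_input_tokens")
    read = _count("cache_read_input_tokens")
    uncached = _count("input_tokens")
    total = written + read + uncached
    return {
        "cache_write_tokens": written,
        "cache_read_tokens": read,
        "uncached_input_tokens": uncached,
        # Share of prompt tokens served from the cache (0.0 when there were none).
        "cache_hit_ratio": read / total if total else 0.0,
    }


# Usage (assumption: `response` is the object returned by client.messages.create()):
#     stats = summarize_cache_usage(response.usage)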