"""OSINT Expert Agent using Claude 3.5 Sonnet with extended thinking and prompt caching."""
from __future__ import annotations
import os
from collections.abc import Generator
from typing import Optional
import anthropic
# System prompt text for the OSINT agent. OSINTAgent._build_system() wraps this
# string in a single cacheable "text" block and passes it as the `system`
# argument of every request.
#
# The literal is a regular (non-raw) triple-quoted string, so each trailing
# backslash is a line continuation: the wrapped source lines are joined into
# one line of prompt text without an embedded newline.
OSINT_SYSTEM_PROMPT = """You are a senior OSINT analyst and dark web intelligence specialist with \
over 15 years of experience in digital forensics, threat intelligence, and cyber investigations. \
You support defensive security operations, authorized penetration testing engagements, academic \
research, journalism, and law enforcement investigations. You never assist with illegal activity, \
unauthorized access, or any action that harms individuals or organizations without consent.
## Core Competencies
### 1. Passive Reconnaissance
- DNS enumeration: A/AAAA/MX/NS/TXT/SPF/DMARC/DKIM record analysis, zone transfer checks, \
subdomain discovery via brute-force wordlists, CT log mining (crt.sh, Censys, Facebook CT)
- WHOIS & RDAP analysis: registrar history, registrant pivots, privacy shield identification, \
domain age, creation/expiry patterns, bulk WHOIS for related domains
- Certificate Transparency: SSL/TLS certificate enumeration, SAN field expansion, wildcard \
certificate analysis, certificate issuance timeline analysis
- ASN & BGP intelligence: IP-to-ASN mapping, BGP route history, RPKI validation, IXP peering, \
prefix hijack detection (BGPMon, RIPE RIS)
- Shodan/Censys/FOFA: exposed services, default credentials, banner grabbing, industrial \
control systems (ICS/SCADA), VPN endpoints, remote access solutions
- Google dorks & advanced search operators: site:, filetype:, inurl:, intitle:, cache:, \
before:/after: operators for OSINT pivots
### 2. Dark Web Intelligence
- .onion site analysis: Tor hidden service fingerprinting, server misconfigurations that \
expose clearnet IPs, uptime monitoring, content archiving
- Marketplace & forum monitoring: vendor profiling, product listings, feedback analysis, \
PGP key pivots, cryptocurrency address extraction
- Paste site monitoring: Pastebin, PrivateBin, Ghostbin — automated scraping for credential \
leaks, source code, PII, configuration files
- Cryptocurrency transaction tracing: Bitcoin/Monero address clustering, exchange \
identification, mixing service detection, on-chain analytics (Chainalysis-style methodology)
- Dark web search engines: Ahmia, Torch, Haystak — indexed .onion content discovery
- I2P & Freenet: alternative anonymity networks, eepsite discovery, distributed content
### 3. Threat Intelligence
- IOC extraction & enrichment: IPs, domains, URLs, hashes, email addresses — VirusTotal, \
OTX AlienVault, ThreatFox, Shodan enrichment
- MITRE ATT&CK mapping: TTP identification, adversary group attribution, technique \
clustering, campaign correlation
- Threat actor profiling: infrastructure reuse, TTPs, victimology, geopolitical motivation, \
malware family association
- C2 infrastructure analysis: beacon intervals, JA3/JA3S fingerprints, domain fronting \
detection, fast-flux DNS, DGA identification
- Malware analysis (static): PE header analysis, import table review, string extraction, \
YARA rule development, packer identification
### 4. Data Breach Analysis
- Credential exposure: Have I Been Pwned (HIBP) API, Dehashed, IntelX — email/domain \
queries for breach membership
- Combo list analysis: password pattern analysis, credential stuffing risk assessment, \
hash identification (MD5/SHA1/bcrypt/NTLM)
- Database leak assessment: schema identification, PII scope determination, impact \
classification per GDPR/CCPA frameworks
- Breach timeline correlation: linking breach dates to threat actor activity, campaign \
attribution, victim notification guidance
### 5. Social Media Intelligence (SOCMINT)
- Cross-platform entity resolution: username pivots across Twitter/X, Reddit, GitHub, \
Telegram, Discord, LinkedIn, Instagram using Sherlock/Maigret methodology
- Geolocation from imagery: EXIF metadata, background landmark analysis, shadow direction, \
vegetation/architecture analysis
- Network graph analysis: follower/following relationship mapping, community detection, \
bot network identification, coordinated inauthentic behavior
- Account authenticity assessment: creation date, follower/following ratio, posting \
frequency, engagement metrics, profile image reverse search
- Telegram & Discord OSINT: channel membership scraping, message archiving, admin \
identification, invite link analysis
### 6. Network Reconnaissance
- IP geolocation & hosting: MaxMind, ip-api, RIPE/ARIN/APNIC WHOIS, hosting provider \
identification, datacenter vs. residential classification
- CDN & reverse proxy detection: Cloudflare, Akamai, Fastly fingerprinting, origin IP \
discovery techniques (historical DNS, SSL cert SANs, favicon hash)
- Email header analysis: SPF/DKIM/DMARC validation, hop-by-hop IP tracing, relay \
identification, phishing infrastructure detection
- BGP & routing analysis: prefix announcement history, route leaks, anycast detection, \
traffic engineering inference
- SSL/TLS analysis: cipher suite enumeration, certificate chain validation, CT log \
correlation, HPKP/HSTS analysis
### 7. Digital Footprint & Attack Surface Analysis
- External attack surface mapping: internet-exposed assets, shadow IT discovery, \
forgotten subdomains, acquisition-inherited infrastructure
- GitHub & code repository OSINT: secret scanning (API keys, credentials in commit \
history), employee identification, internal tooling discovery, dependency analysis
- Cloud storage enumeration: misconfigured S3 buckets, Azure Blob, GCP buckets — \
Grayhat Warfare, S3Scanner methodology
- Job posting intelligence: technology stack inference from job requirements, \
internal tool names, team structure
- Dark patterns & data broker exposure: Spokeo, BeenVerified, Pipl — opt-out guidance \
and data removal strategies
## Intelligence Reporting Standards
- Follow traffic light protocol (TLP): TLP:RED, TLP:AMBER, TLP:GREEN, TLP:CLEAR
- Structure reports with: Executive Summary, Technical Findings, IOC Table, \
Attribution Confidence Level, Recommended Actions
- Cite sources and collection timestamps for every finding
- Assess confidence using structured analytic techniques (SATs): ACH, Red Team analysis
- Apply OSINT source reliability matrix (A-F reliability, 1-6 accuracy)
## Legal & Ethical Framework
- Only perform authorized investigations with explicit scope definition
- Passive reconnaissance only unless active testing is explicitly authorized in writing
- Respect robots.txt and ToS where legally required
- Handle PII per applicable regulations (GDPR, CCPA, HIPAA)
- Never access systems without authorization — Computer Fraud and Abuse Act (CFAA) \
and equivalent laws apply globally
- Provide defensive recommendations alongside every offensive finding
When analyzing targets, always clarify the authorization status before proceeding. \
For ambiguous requests, default to the most restrictive interpretation and recommend \
obtaining proper authorization."""
class OSINTAgent:
    """Dark web and OSINT expert agent with multi-turn conversation, prompt caching, and thinking.

    The agent keeps the whole conversation in ``conversation_history`` and
    resends it with every request, together with the module-level
    ``OSINT_SYSTEM_PROMPT`` marked for prompt caching.

    Attributes are plain and may be changed between calls:
        client: ``anthropic.Anthropic`` client used for all requests.
        model: model identifier sent with every request.
        max_tokens: ``max_tokens`` value sent with every request.
        thinking_budget: ``budget_tokens`` for extended thinking, or ``None``
            to send requests without a ``thinking`` parameter.
        conversation_history: list of ``{"role", "content"}`` message dicts.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: str = "claude-3-5-sonnet-20241022",
        max_tokens: int = 16000,
        thinking_budget: Optional[int] = 4000,
    ) -> None:
        """Create the API client and start with an empty conversation.

        Args:
            api_key: Anthropic API key. When falsy, the ``ANTHROPIC_API_KEY``
                environment variable is used instead.
            model: Model identifier to send with every request.
            max_tokens: Output token limit to send with every request.
            thinking_budget: Token budget for extended thinking; pass ``None``
                to disable the ``thinking`` request parameter entirely.
        """
        # NOTE(review): per Anthropic's model documentation, extended thinking
        # and a 16000-token output limit do not appear to be available on the
        # default "claude-3-5-sonnet-20241022" model. The defaults are kept for
        # backward compatibility; if the API rejects requests, pass a newer
        # model or set thinking_budget=None and a smaller max_tokens — confirm.
        self.client = anthropic.Anthropic(
            api_key=api_key or os.environ.get("ANTHROPIC_API_KEY")
        )
        self.model = model
        self.max_tokens = max_tokens
        self.thinking_budget = thinking_budget
        self.conversation_history: list[dict] = []

    def _build_system(self) -> list[dict]:
        """Return system prompt blocks with cache_control for prompt caching."""
        return [
            {
                "type": "text",
                "text": OSINT_SYSTEM_PROMPT,
                "cache_control": {"type": "ephemeral"},
            }
        ]

    def _request_kwargs(self) -> dict:
        """Return the keyword arguments shared by streaming and non-streaming requests."""
        kwargs: dict = {
            "model": self.model,
            "max_tokens": self.max_tokens,
            "system": self._build_system(),
            "messages": self.conversation_history,
        }
        if self.thinking_budget is not None:
            kwargs["thinking"] = {
                "type": "enabled",
                "budget_tokens": self.thinking_budget,
            }
        return kwargs

    def _discard_turn(self, turn: dict) -> None:
        """Remove ``turn`` from the end of the history if it is still the last entry."""
        # Identity check: never drop an entry that a concurrent reset() or a
        # completed exchange has put in its place.
        history = self.conversation_history
        if history and history[-1] is turn:
            history.pop()

    def chat(self, user_message: str) -> str:
        """Send a message and return the full assistant response (non-streaming).

        Args:
            user_message: Text of the user turn to append to the conversation.

        Returns:
            The concatenated text of all ``text`` blocks in the response, or
            ``""`` when the response contains none.

        Raises:
            Whatever the underlying client raises. In that case the user turn
            is removed again so the history never ends with an unanswered
            user message.
        """
        user_turn = {"role": "user", "content": user_message}
        self.conversation_history.append(user_turn)
        try:
            response = self.client.messages.create(**self._request_kwargs())
        except Exception:
            self._discard_turn(user_turn)
            raise

        # The full content list (not just the text) is stored so that every
        # block the API returned is replayed unchanged on the next turn.
        self.conversation_history.append(
            {"role": "assistant", "content": response.content}
        )
        return "".join(
            block.text for block in response.content if block.type == "text"
        )

    def stream_chat(self, user_message: str) -> Generator[str, None, None]:
        """Stream a response; yields text chunks as they arrive.

        The assistant turn is added to the history only after the stream has
        been consumed to the end. If the request fails, or the caller stops
        iterating early, the pending user turn is removed again.

        Args:
            user_message: Text of the user turn to append to the conversation.

        Yields:
            Text chunks from the stream's ``text_stream``.
        """
        user_turn = {"role": "user", "content": user_message}
        self.conversation_history.append(user_turn)
        completed = False
        try:
            with self.client.messages.stream(**self._request_kwargs()) as stream:
                yield from stream.text_stream
                final = stream.get_final_message()
            self.conversation_history.append(
                {"role": "assistant", "content": final.content}
            )
            completed = True
        finally:
            # Also runs on GeneratorExit, i.e. when the consumer abandons the
            # generator before the stream is exhausted.
            if not completed:
                self._discard_turn(user_turn)

    @staticmethod
    def build_analysis_prompt(
        target: str, analysis_type: str, context: Optional[str] = None
    ) -> str:
        """Build the user prompt for a structured analysis of ``target``.

        Args:
            target: Subject of the analysis; interpolated verbatim into the prompt.
            analysis_type: One of ``full``, ``passive``, ``threat``,
                ``footprint``, ``breach``, ``darkweb``, ``socmint``. Any other
                value falls back to the ``full`` prompt.
            context: Optional extra text appended as "Additional context".
                Ignored when empty or ``None``.

        Returns:
            The prompt string.
        """
        prompts = {
            "full": (
                f"Conduct a comprehensive OSINT analysis of: **{target}**\n\n"
                "Cover all applicable domains: passive recon, dark web presence, threat intelligence, "
                "data breach exposure, social media footprint, network reconnaissance, and attack surface. "
                "Structure with clear sections, an IOC table where applicable, confidence levels, "
                "and defensive recommendations."
            ),
            "passive": (
                f"Perform passive reconnaissance on: **{target}**\n\n"
                "Cover DNS records, WHOIS/RDAP history, certificate transparency logs, ASN/BGP data, "
                "and Shodan/Censys exposure. List discovered subdomains, IPs, and exposed services. "
                "Flag misconfigurations and security concerns."
            ),
            "threat": (
                f"Conduct a threat intelligence analysis for: **{target}**\n\n"
                "Identify associated IOCs, map to MITRE ATT&CK TTPs, assess threat actor attribution, "
                "analyze C2 infrastructure patterns, and provide enrichment methodology per indicator."
            ),
            "footprint": (
                f"Map the digital footprint and external attack surface for: **{target}**\n\n"
                "Identify internet-exposed assets, shadow IT, misconfigured cloud storage, "
                "GitHub/code repo exposure, and data broker presence. Prioritize by risk level."
            ),
            "breach": (
                f"Analyze data breach and credential exposure for: **{target}**\n\n"
                "Check breach databases (HIBP methodology), assess credential stuffing risk, "
                "identify leaked internal data, and provide remediation steps."
            ),
            "darkweb": (
                f"Investigate dark web presence and mentions of: **{target}**\n\n"
                "Search for mentions on forums, marketplaces, and paste sites. Identify any data for sale, "
                "threat actor discussions, or planned attacks. Extract cryptocurrency addresses where applicable."
            ),
            "socmint": (
                f"Perform social media intelligence (SOCMINT) analysis for: **{target}**\n\n"
                "Map accounts across platforms, analyze network relationships, assess account authenticity, "
                "extract geolocation indicators, and identify key affiliations."
            ),
        }
        prompt = prompts.get(analysis_type, prompts["full"])
        if context:
            prompt += f"\n\nAdditional context: {context}"
        return prompt

    def analyze_target(
        self,
        target: str,
        analysis_type: str = "full",
        context: Optional[str] = None,
    ) -> str:
        """Run a structured OSINT analysis against a target.

        analysis_type options: full, passive, threat, footprint, breach, darkweb, socmint

        Args:
            target: Subject of the analysis.
            analysis_type: Which prompt template to use; unknown values use ``full``.
            context: Optional extra context appended to the prompt.

        Returns:
            The assistant's text response, as returned by ``chat``.
        """
        # The builder is public (no leading underscore); the previous call to
        # ``self._build_analysis_prompt`` raised AttributeError.
        prompt = self.build_analysis_prompt(target, analysis_type, context)
        return self.chat(prompt)

    def generate_ioc_report(self, iocs: list[str]) -> str:
        """Generate an enriched IOC report for a list of indicators.

        Args:
            iocs: Indicators to report on; each is rendered as one bullet line.

        Returns:
            The assistant's text response, as returned by ``chat``.
        """
        ioc_list = "\n".join(f"- {ioc}" for ioc in iocs)
        prompt = (
            f"Generate a structured IOC report for the following indicators:\n\n{ioc_list}\n\n"
            "For each IOC: classify the type (IP/domain/URL/hash/email), describe enrichment steps "
            "using VirusTotal, Shodan, WHOIS, OTX AlienVault, and ThreatFox, assess maliciousness "
            "confidence (High/Medium/Low), map to MITRE ATT&CK if applicable, and recommend defensive "
            "actions (firewall rules, SIEM detections, threat hunting queries)."
        )
        return self.chat(prompt)

    def explain_technique(self, technique: str) -> str:
        """Explain an OSINT technique, tool, or concept in depth.

        Args:
            technique: Name of the technique, tool, or concept to explain.

        Returns:
            The assistant's text response, as returned by ``chat``.
        """
        prompt = (
            f"Provide a detailed technical explanation of: **{technique}**\n\n"
            "Include: how it works, relevant tools and commands, example use cases in authorized "
            "investigations, limitations and caveats, and defensive countermeasures."
        )
        return self.chat(prompt)

    def reset(self) -> None:
        """Clear conversation history to start a fresh session."""
        self.conversation_history = []
|