#!/usr/bin/env python3
"""
HaveIBeenPwned Unified Search API Script
Uses cloudscraper to bypass Cloudflare protection and serves results via a local HTTP API.
"""
import json
import sys
import urllib.parse
from datetime import datetime, timezone

import cloudscraper
from flask import Flask, jsonify, request
# =============================================================================
# CONFIGURATION
# =============================================================================
# Root of the upstream site; request URLs and the Referer header derive from it.
HIBP_BASE_URL = "https://haveibeenpwned.com"

# Browser-like headers installed on the scraper session for every request.
# Key order is kept stable because the dict is applied to the session as-is.
DEFAULT_HEADERS = {
    "Accept": "*/*",
    "Accept-Language": "fr,fr-FR;q=0.9,en-US;q=0.8,en;q=0.7",
    "DNT": "1",
    "Referer": f"{HIBP_BASE_URL}/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Sec-GPC": "1",
    "TE": "trailers",
    "Priority": "u=0",
    # Desktop Firefox on Windows user agent string.
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
        "Gecko/20100101 Firefox/148.0"
    ),
}
# =============================================================================
# CLOUDSCRAPER CLIENT
# =============================================================================
class HIBPClient:
    """Client for the HaveIBeenPwned ``/unifiedsearch/<email>`` endpoint.

    Requests go through a cloudscraper session configured with a
    Firefox-on-Windows browser profile and the module's ``DEFAULT_HEADERS``.
    """

    def __init__(self):
        """Create the cloudscraper session and install the default headers."""
        self.scraper = cloudscraper.create_scraper(
            browser={
                "browser": "firefox",
                "platform": "windows",
                "desktop": True,
            },
            delay=5,
        )
        self.scraper.headers.update(DEFAULT_HEADERS)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string ending in ``Z``."""
        # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
        # time and drop tzinfo so isoformat() emits no "+00:00" offset and the
        # output keeps the same shape as before.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        return now_utc.isoformat() + "Z"

    def search_email(self, email: str) -> dict:
        """Look up *email* on the unified-search endpoint.

        Request failures are captured in the returned dict instead of being
        raised.

        Args:
            email: Address to search; it is percent-encoded into the URL path.

        Returns:
            A dict with the keys ``email``, ``timestamp``, ``status_code``,
            ``data`` and ``error``:

            * 200: ``data`` holds the decoded JSON body.
            * 404: ``data`` is ``{"Breaches": None, "Pastes": None}`` and
              ``error`` explains that nothing was found.
            * 429 / 403 / any other status: ``error`` describes the status and
              ``data`` stays ``None``.
            * Cloudflare challenge failure: ``status_code`` is set to 503.
            * Any other exception: ``status_code`` is set to 500.
        """
        # safe="" also encodes "/" so the address stays a single path segment.
        encoded_email = urllib.parse.quote(email, safe="")
        url = f"{HIBP_BASE_URL}/unifiedsearch/{encoded_email}"
        result = {
            "email": email,
            "timestamp": self._utc_timestamp(),
            "status_code": None,
            "data": None,
            "error": None,
        }
        try:
            response = self.scraper.get(url, timeout=30)
            result["status_code"] = response.status_code
            if response.status_code == 200:
                result["data"] = response.json()
            elif response.status_code == 404:
                result["data"] = {"Breaches": None, "Pastes": None}
                result["error"] = "Email not found in any known breach."
            elif response.status_code == 429:
                retry_after = response.headers.get("Retry-After", "unknown")
                result["error"] = f"Rate limited. Retry after {retry_after} seconds."
            elif response.status_code == 403:
                result["error"] = "Blocked by Cloudflare or access denied."
            else:
                result["error"] = f"Unexpected status code: {response.status_code}"
        except cloudscraper.exceptions.CloudflareChallengeError as e:
            result["error"] = f"Cloudflare challenge failed: {str(e)}"
            result["status_code"] = 503
        except Exception as e:
            # Boundary catch-all: every failure (network errors, an invalid
            # JSON body, ...) is reported to the caller as a 500 result.
            result["error"] = f"Request failed: {str(e)}"
            result["status_code"] = 500
        return result

    def parse_breaches(self, data: dict) -> list:
        """Flatten the ``Breaches`` entries of a search payload.

        Args:
            data: Payload as stored under ``"data"`` by ``search_email``.
                A missing or null ``Breaches`` key, or an empty/``None``
                payload, is treated as no breaches.

        Returns:
            One dict per breach with snake_case keys. ``data_classes`` is
            always a list, even when the payload carries a null value; other
            fields absent from the payload are ``None``.
        """
        if not data:
            return []
        breaches = data.get("Breaches") or []
        return [
            {
                "name": b.get("Name"),
                "title": b.get("Title"),
                "domain": b.get("Domain"),
                "breach_date": b.get("BreachDate"),
                "added_date": b.get("AddedDate"),
                "pwn_count": b.get("PwnCount"),
                # `or []` also covers a key that is present but null, matching
                # how "Breaches" itself is handled above.
                "data_classes": b.get("DataClasses") or [],
                "description": b.get("Description"),
                "logo": b.get("LogoPath"),
                "is_verified": b.get("IsVerified"),
                "is_sensitive": b.get("IsSensitive"),
                "is_stealer_log": b.get("IsStealerLog"),
                "is_malware": b.get("IsMalware"),
            }
            for b in breaches
        ]
# =============================================================================
# FLASK APP
# =============================================================================
# Flask application object; the route decorators below register handlers on it.
app = Flask(__name__)
# One HIBPClient created at import time and used by the route handlers.
hibp_client = HIBPClient()
@app.route("/", methods=["GET"])
def index():
    """Return a JSON description of the service and its endpoints."""
    endpoint_docs = {
        "GET /": "Documentation",
        "GET /search/<email>": "Raw HIBP search",
        "GET /breaches/<email>": "Parsed breach summaries",
        "GET /health": "Health check",
    }
    service_info = {
        "service": "HaveIBeenPwned Proxy API",
        "version": "1.0.0",
        "endpoints": endpoint_docs,
    }
    return jsonify(service_info)
@app.route("/health", methods=["GET"])
def health():
    """Return a JSON health report with the current UTC timestamp.

    The timestamp is an ISO-8601 string with a trailing ``Z``.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC time
    # and drop tzinfo so isoformat() emits no "+00:00" offset before the "Z".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return jsonify({
        "status": "ok",
        "timestamp": now_utc.isoformat() + "Z",
    })
@app.route("/search/<path:email>", methods=["GET"])
def search_email(email: str):
    """Return the raw lookup result for *email* as JSON.

    Responds 400 when the value is empty or contains no "@". Upstream 200 and
    404 both map to HTTP 200; any other upstream status is passed through,
    falling back to 500 when no status is available.
    """
    looks_like_email = bool(email) and "@" in email
    if not looks_like_email:
        return jsonify({"error": "Invalid email address.", "email": email}), 400

    lookup = hibp_client.search_email(email)
    upstream_status = lookup["status_code"]
    # "Found" and "not found" are both successful lookups for the caller.
    http_status = 200 if upstream_status in (200, 404) else (upstream_status or 500)
    return jsonify(lookup), http_status
@app.route("/breaches/<path:email>", methods=["GET"])
def get_breaches(email: str):
    """Return parsed breach and paste summaries for *email* as JSON.

    Responds 400 when the value is empty or contains no "@". When the lookup
    yields a dict payload, the response lists the parsed breaches, the pastes
    and an ``is_pwned`` flag. Otherwise the response carries the lookup's
    error with ``is_pwned`` set to ``None`` and the lookup's status code
    (500 if none is available).
    """
    if not email or "@" not in email:
        return jsonify({"error": "Invalid email address.", "email": email}), 400

    result = hibp_client.search_email(email)
    data = result["data"]

    # Check the payload's type, not its truthiness: an empty dict is a valid
    # "nothing found" answer, and a non-dict body must not reach .get().
    if isinstance(data, dict):
        breaches = hibp_client.parse_breaches(data)
        pastes = data.get("Pastes") or []
        return jsonify({
            "email": email,
            "timestamp": result["timestamp"],
            "total_breaches": len(breaches),
            "total_pastes": len(pastes),
            "breaches": breaches,
            "pastes": pastes,
            "is_pwned": len(breaches) > 0 or len(pastes) > 0,
        })

    error = result["error"]
    http_status = result["status_code"] or 500
    if error is None:
        # No recorded error yet no usable payload: the upstream answered 200
        # with a body that is not a JSON object. Report it as a bad gateway
        # instead of returning HTTP 200 with a null error.
        error = "Unexpected response format from upstream."
        http_status = 502
    return jsonify({
        "email": email,
        "timestamp": result["timestamp"],
        "error": error,
        "is_pwned": None,
    }), http_status
# =============================================================================
# ENTRY POINT
# =============================================================================
if __name__ == "__main__":
    import os

    # Bind address and port come from the environment, with local defaults.
    bind_host = os.environ.get("HOST", "0.0.0.0")
    bind_port = int(os.environ.get("PORT", 5000))
    app.run(host=bind_host, port=bind_port, debug=False)