#!/usr/bin/env python3
"""
HaveIBeenPwned API - Hugging Face Spaces
"""
import urllib.parse
from datetime import datetime, timezone

import cloudscraper
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
# =============================================================================
# CONFIG
# =============================================================================
# Site root; endpoint paths are appended to this value.
HIBP_BASE_URL = "https://haveibeenpwned.com"

# Headers installed on the scraper session. They describe a same-origin
# fetch issued by Firefox on Windows, with the HIBP site as referer.
DEFAULT_HEADERS = {
    "Accept": "*/*",
    "Accept-Language": "fr,fr-FR;q=0.9,en-US;q=0.8,en;q=0.7",
    "DNT": "1",
    "Referer": "https://haveibeenpwned.com/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Sec-GPC": "1",
    # Product tokens are joined with a single space.
    "User-Agent": " ".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0)",
            "Gecko/20100101 Firefox/148.0",
        )
    ),
}
# =============================================================================
# CLIENT
# =============================================================================
class HIBPClient:
    """Client for the HaveIBeenPwned ``unifiedsearch`` endpoint.

    Requests are sent through a ``cloudscraper`` session that carries
    ``DEFAULT_HEADERS``.
    """

    def __init__(self, base_url: "str | None" = None, timeout: float = 30):
        """Create the scraper session.

        Args:
            base_url: Site root to query. Defaults to ``HIBP_BASE_URL``.
            timeout: Per-request timeout, in seconds, handed to the scraper.
        """
        self.base_url = (base_url or HIBP_BASE_URL).rstrip("/")
        self.timeout = timeout
        self.scraper = cloudscraper.create_scraper(
            browser={"browser": "firefox", "platform": "windows", "desktop": True},
            delay=5,
        )
        self.scraper.headers.update(DEFAULT_HEADERS)

    def search_email(self, email: str) -> dict:
        """Look up *email* and describe the outcome without raising.

        Args:
            email: Address to search; it is percent-encoded into the URL path.

        Returns:
            A dict with the keys ``email``, ``timestamp`` (UTC, ISO-8601 with a
            trailing ``Z``), ``status_code``, ``data`` and ``error``.
            ``data`` holds the decoded JSON body on a 200, a
            ``{"Breaches": None, "Pastes": None}`` placeholder on a 404, and
            ``None`` otherwise. ``error`` is ``None`` only on a successful 200.
        """
        encoded = urllib.parse.quote(email, safe="")
        url = f"{self.base_url}/unifiedsearch/{encoded}"

        # datetime.utcnow() is deprecated since Python 3.12. Take an aware
        # UTC "now" and drop tzinfo so isoformat() emits no "+00:00" offset.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        result = {
            "email": email,
            "timestamp": now.isoformat() + "Z",
            "status_code": None,
            "data": None,
            "error": None,
        }

        try:
            resp = self.scraper.get(url, timeout=self.timeout)
            status = resp.status_code
            result["status_code"] = status

            if status == 200:
                try:
                    result["data"] = resp.json()
                except ValueError as exc:
                    # A 200 whose body is not JSON is an upstream fault, not a
                    # local request failure, so report it as a bad gateway.
                    result["status_code"] = 502
                    result["error"] = f"Upstream returned a non-JSON body: {exc}"
            elif status == 404:
                result["data"] = {"Breaches": None, "Pastes": None}
                result["error"] = "Email not found in any known breach."
            elif status == 429:
                retry = resp.headers.get("Retry-After")
                if retry is None:
                    result["error"] = "Rate limited. No Retry-After header was provided."
                elif retry.strip().isdigit():
                    result["error"] = f"Rate limited. Retry after {retry.strip()}s."
                else:
                    # Retry-After may also be an HTTP-date; do not append "s".
                    result["error"] = f"Rate limited. Retry after {retry}."
            elif status == 403:
                result["error"] = "Blocked by Cloudflare."
            else:
                result["error"] = f"Unexpected status: {status}"
        except cloudscraper.exceptions.CloudflareChallengeError as exc:
            result["error"] = f"Cloudflare challenge failed: {exc}"
            result["status_code"] = 503
        except Exception as exc:
            # Deliberate catch-all: this method reports failures in the result
            # dict instead of propagating them to the caller.
            result["error"] = f"Request failed: {exc}"
            result["status_code"] = 500

        return result

    def parse_breaches(self, data: dict) -> list:
        """Flatten the ``Breaches`` entries of a search payload.

        Args:
            data: Decoded search payload. ``None``, a non-dict value, or a
                missing/null ``Breaches`` key all yield an empty list.

        Returns:
            One dict per breach with the keys ``name``, ``title``, ``domain``,
            ``breach_date``, ``pwn_count``, ``data_classes``, ``is_verified``,
            ``is_stealer_log`` and ``logo``. ``data_classes`` is always a list.
        """
        entries = data.get("Breaches") if isinstance(data, dict) else None
        return [
            {
                "name": entry.get("Name"),
                "title": entry.get("Title"),
                "domain": entry.get("Domain"),
                "breach_date": entry.get("BreachDate"),
                "pwn_count": entry.get("PwnCount"),
                # "or []" also covers an explicit null in the payload.
                "data_classes": entry.get("DataClasses") or [],
                "is_verified": entry.get("IsVerified"),
                "is_stealer_log": entry.get("IsStealerLog"),
                "logo": entry.get("LogoPath"),
            }
            for entry in (entries or [])
        ]
# =============================================================================
# FASTAPI APP
# =============================================================================
# Application object; the route decorators further down register on it.
app = FastAPI(
    title="HaveIBeenPwned Proxy API",
    version="1.0.0",
    description="HIBP search with Cloudflare bypass via cloudscraper",
)

# CORS policy: every origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)

# Single client instance, created at import time and used by the routes.
client = HIBPClient()
@app.get("/")
def index():
    # Static service descriptor: name, version and a route -> summary map.
    routes = {
        "GET /": "This page",
        "GET /search/{email}": "Raw HIBP search",
        "GET /breaches/{email}": "Parsed breach summaries",
        "GET /health": "Health check",
    }
    return {
        "service": "HaveIBeenPwned Proxy API",
        "version": "1.0.0",
        "endpoints": routes,
    }
@app.get("/health")
def health():
    """Report liveness together with the current UTC time.

    Returns:
        ``{"status": "ok", "timestamp": ...}`` where the timestamp is an
        ISO-8601 UTC string with a trailing ``Z``.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so isoformat() emits no "+00:00" offset, which
    # keeps the existing "...Z" wire format.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return {"status": "ok", "timestamp": now.isoformat() + "Z"}
@app.get("/search/{email:path}")
def search(email: str):
    # Raw lookup: the client's result dict is returned untouched, so upstream
    # failures appear in its "status_code"/"error" fields.
    has_at_sign = "@" in email
    if not has_at_sign:
        # Only a minimal sanity check: the value must contain "@".
        raise HTTPException(status_code=400, detail="Invalid email")
    return client.search_email(email)
@app.get("/breaches/{email:path}")
def breaches(email: str):
    """Return a parsed breach and paste summary for *email*.

    Args:
        email: Address to look up; it must contain ``@``.

    Returns:
        A dict with ``email``, ``timestamp``, ``total_breaches``,
        ``total_pastes``, ``breaches``, ``pastes`` and ``is_pwned``.

    Raises:
        HTTPException: 400 when *email* has no ``@``. Otherwise, when the
            lookup produced no usable payload, the upstream error status is
            relayed (500 if none was recorded, 502 if the recorded status
            is not an error code).
    """
    if "@" not in email:
        raise HTTPException(status_code=400, detail="Invalid email")

    result = client.search_email(email)
    data = result["data"]

    # Test the payload's type, not its truthiness: an empty dict is a valid
    # "nothing found" answer, and a non-dict payload has no .get().
    if isinstance(data, dict):
        parsed = client.parse_breaches(data)
        pastes = data.get("Pastes") or []
        return {
            "email": email,
            "timestamp": result["timestamp"],
            "total_breaches": len(parsed),
            "total_pastes": len(pastes),
            "breaches": parsed,
            "pastes": pastes,
            "is_pwned": len(parsed) > 0 or len(pastes) > 0,
        }

    status = result["status_code"]
    detail = result["error"]
    if status is None:
        status = 500
    elif status < 400:
        # Never raise an "error" carrying a success/redirect status code.
        status = 502
    raise HTTPException(
        status_code=status,
        detail=detail or "Unexpected response format from upstream.",
    )