# HFGitDorker / dorker.py
# (uploaded via huggingface_hub, revision 2133ad1)
import asyncio
import base64
import re
from itertools import cycle
from urllib.parse import quote_plus

import httpx
# Regex patterns used to detect leaked credentials in fetched file contents,
# keyed by a human-readable secret type (reported in the "[+] FOUND [...]" line).
SECRET_PATTERNS = {
"GitHub Token": r'ghp_[0-9a-zA-Z]{36}',
"GitHub App Token": r'ghu_[0-9a-zA-Z]{36}',
"GitHub Refresh Token": r'ghr_[0-9a-zA-Z]{76}',
"AWS Access Key": r'AKIA[0-9A-Z]{16}',
"Google API Key": r'AIza[0-9A-Za-z\-_]{35}',
# NOTE(review): this is just a generic UUID pattern, not Heroku-specific —
# expect many false positives on any file containing UUIDs.
"Heroku API Key": r'[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}',
"Stripe API Key": r'sk_live_[0-9a-zA-Z]{24}',
"Slack Token": r'xox[baprs]-[0-9a-zA-Z]{10,48}',
# Matches only the PEM header line of the key, not the key body.
"SSH Private Key": r'-----BEGIN (RSA|OPENSSH|EC) PRIVATE KEY-----',
# URLs containing a proxy-subscription keyword; the greedy '(?:/.*)?' before
# the keyword group relies on backtracking to locate the keyword in the path.
"Clash Subscription": r'https?://[\w.-]+(?:\:[0-9]+)?(?:/.*)?(?:clash|sub|v2ray|trojan|shadowsocks)(?:/.*)?',
}
# GitHub REST v3 code-search endpoint (30 requests/min authenticated limit).
GITHUB_API_URL = "https://api.github.com/search/code"
def urlencode(s: str) -> str:
    """Percent-encode *s* for use as a URL query-string value.

    Delegates to urllib.parse.quote_plus so that EVERY reserved character
    is encoded. The previous hand-rolled version only replaced ':', '"'
    and space, leaving characters such as '&', '=', '#' and '%' unescaped,
    which would corrupt the search query. quote_plus produces the exact
    same output for the three characters the old code handled:
    ':' -> '%3A', '"' -> '%22', ' ' -> '+'.
    """
    return quote_plus(s)
async def search(tokens: list, query: str, dorks: list):
    """Search GitHub code for leaked secrets using a list of dorks.

    Async generator. For each dork it runs a code search ("{query} {dork}"),
    fetches the top results' contents, scans them against SECRET_PATTERNS,
    and yields human-readable progress/result strings prefixed with
    "[INFO]", "[WARN]", "[ERROR]", "[-]" or "[+]".

    Args:
        tokens: GitHub personal-access tokens, rotated round-robin so the
            per-token rate limits are spread across the pool.
        query: Base search query prepended to every dork.
        dorks: Dork strings; one search request is issued per dork.

    Yields:
        str: status and finding lines for display/logging.
    """
    if not tokens:
        yield "[ERROR] No GitHub tokens provided."
        return
    token_cycler = cycle(tokens)
    headers = {"Accept": "application/vnd.github.v3+json"}
    async with httpx.AsyncClient(timeout=30.0) as client:
        for i, dork in enumerate(dorks):
            full_query = f"{query} {dork}"
            # Limit to top 5 results per dork.
            search_url = f"{GITHUB_API_URL}?q={urlencode(full_query)}&per_page=5"
            yield f"[INFO] [{i+1}/{len(dorks)}] Searching with dork: {dork}"
            try:
                headers["Authorization"] = f"token {next(token_cycler)}"
                search_res = await client.get(search_url, headers=headers)
                # GitHub signals rate limiting with 403 (primary) or 429
                # (secondary); treat both the same.
                if search_res.status_code in (403, 429):
                    yield "[WARN] Rate limit hit. Sleeping for 60s..."
                    await asyncio.sleep(60)
                    # Rotate to the NEXT token before retrying, so the retry
                    # uses a fresh quota instead of the exhausted token.
                    headers["Authorization"] = f"token {next(token_cycler)}"
                    search_res = await client.get(search_url, headers=headers)
                search_res.raise_for_status()
                search_data = search_res.json()
                if not search_data.get("items"):
                    yield f"[-] No files found for: {dork}"
                    # Still sleep to respect the 30 req/min search rate limit.
                    await asyncio.sleep(2.1)
                    continue
                # Stage 2: fetch and scan the content of each found file.
                for item in search_data["items"]:
                    file_api_url = item.get("url")
                    if not file_api_url:
                        continue
                    file_res = await client.get(file_api_url, headers=headers)
                    if file_res.status_code != 200:
                        continue  # Skip files whose content we can't fetch.
                    content_b64 = file_res.json().get("content")
                    if not content_b64:
                        continue
                    try:
                        decoded_content = base64.b64decode(content_b64).decode("utf-8")
                    except Exception:
                        continue  # Skip binary / non-UTF-8 files.
                    # Scan the decoded file against every secret pattern.
                    # (The original passed re.MULTILINE here, but no pattern
                    # uses ^/$ anchors, so the flag was a no-op and is dropped.)
                    for secret_type, pattern in SECRET_PATTERNS.items():
                        for match in re.finditer(pattern, decoded_content):
                            # Report only the first line of the match so
                            # multi-line hits (e.g. SSH keys) stay readable.
                            found_secret_snippet = match.group(0).strip().split('\n')[0]
                            yield f"[+] FOUND [{secret_type}] in {item['html_url']}: {found_secret_snippet}..."
            except httpx.HTTPStatusError as e:
                yield f"[ERROR] HTTP Error for '{dork}': {e.response.status_code}"
            except Exception as e:
                yield f"[ERROR] Unexpected error for '{dork}': {str(e)}"
            # Main sleep to respect the search API rate limit (30/min).
            await asyncio.sleep(2.1)