# GitHub dork-based secret scanner (async) -- searches the GitHub code-search
# API for each dork, fetches the top matching files, and reports contents
# matching known secret patterns.
import asyncio
import base64
import re
from itertools import cycle
from urllib.parse import quote_plus

import httpx
# A dictionary of regex patterns to find secrets.
# Maps a human-readable secret type to the raw-string regex used to detect it
# in decoded file contents (applied with re.finditer in search() below).
SECRET_PATTERNS = {
    "GitHub Token": r'ghp_[0-9a-zA-Z]{36}',
    "GitHub App Token": r'ghu_[0-9a-zA-Z]{36}',
    "GitHub Refresh Token": r'ghr_[0-9a-zA-Z]{76}',
    "AWS Access Key": r'AKIA[0-9A-Z]{16}',
    "Google API Key": r'AIza[0-9A-Za-z\-_]{35}',
    # NOTE(review): this is just a generic UUID pattern, not Heroku-specific --
    # expect false positives on ANY UUID appearing in scanned files.
    "Heroku API Key": r'[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}',
    "Stripe API Key": r'sk_live_[0-9a-zA-Z]{24}',
    "Slack Token": r'xox[baprs]-[0-9a-zA-Z]{10,48}',
    # Matches only the PEM header line; the match itself spans one line.
    "SSH Private Key": r'-----BEGIN (RSA|OPENSSH|EC) PRIVATE KEY-----',
    "Clash Subscription": r'https?://[\w.-]+(?:\:[0-9]+)?(?:/.*)?(?:clash|sub|v2ray|trojan|shadowsocks)(?:/.*)?',
}

# GitHub code-search endpoint; queried below as ?q=<query>&per_page=5.
GITHUB_API_URL = "https://api.github.com/search/code"
def urlencode(s: str) -> str:
    """Percent-encode *s* for use as a URL query-string value.

    Delegates to urllib.parse.quote_plus. The previous hand-rolled version
    only translated ':', '"' and space, so a query containing '&', '#',
    '+', '=' etc. silently corrupted the request URL. quote_plus preserves
    the old mappings (':' -> '%3A', '"' -> '%22', ' ' -> '+') and encodes
    every other reserved character correctly.

    Args:
        s: Raw query text.

    Returns:
        The encoded string, safe to place after '?q=' in a URL.
    """
    return quote_plus(s)
async def search(tokens: list, query: str, dorks: list):
    """Scan GitHub code-search results for leaked secrets, one dork at a time.

    Async generator: for each dork, runs a code search, fetches the top
    matching files, decodes them, and scans the text against SECRET_PATTERNS.

    Args:
        tokens: GitHub API tokens, rotated round-robin across requests.
        query: Base search query prepended to every dork.
        dorks: Search-qualifier fragments appended to the base query.

    Yields:
        Human-readable "[INFO]"/"[WARN]"/"[ERROR]"/"[-]"/"[+] FOUND" lines.
    """
    if not tokens:
        yield "[ERROR] No GitHub tokens provided."
        return
    token_cycler = cycle(tokens)
    headers = {"Accept": "application/vnd.github.v3+json"}
    # Hoist loop invariants: compile each pattern once instead of on every
    # scanned file. (re.MULTILINE only changes ^/$ anchoring -- none of these
    # patterns use anchors, so it is kept purely for parity with the old code;
    # it never made patterns "span lines".)
    compiled_patterns = {
        name: re.compile(pattern, re.MULTILINE)
        for name, pattern in SECRET_PATTERNS.items()
    }
    total = len(dorks)
    async with httpx.AsyncClient(timeout=30.0) as client:
        for i, dork in enumerate(dorks):
            full_query = f"{query} {dork}"
            search_url = f"{GITHUB_API_URL}?q={urlencode(full_query)}&per_page=5"  # Limit to top 5 results per dork
            yield f"[INFO] [{i+1}/{total}] Searching with dork: {dork}"
            try:
                headers["Authorization"] = f"token {next(token_cycler)}"
                search_res = await client.get(search_url, headers=headers)
                if search_res.status_code == 403:  # Rate limit
                    yield "[WARN] Rate limit hit. Sleeping for 60s..."
                    await asyncio.sleep(60)
                    # Bug fix: rotate to the NEXT token before retrying --
                    # retrying on the token that just hit its limit defeats
                    # the purpose of supplying several tokens. (With a single
                    # token, cycle() hands back the same one: no behavior change.)
                    headers["Authorization"] = f"token {next(token_cycler)}"
                    search_res = await client.get(search_url, headers=headers)  # Retry
                search_res.raise_for_status()
                search_data = search_res.json()
                if not search_data.get("items"):
                    yield f"[-] No files found for: {dork}"
                    await asyncio.sleep(2.1)  # Still sleep to respect search rate limit
                    continue
                # Stage 2: fetch and scan content of found files.
                for item in search_data["items"]:
                    file_api_url = item.get("url")
                    if not file_api_url:
                        continue
                    file_res = await client.get(file_api_url, headers=headers)
                    if file_res.status_code != 200:
                        continue  # Skip if we can't fetch content
                    content_b64 = file_res.json().get("content")
                    if not content_b64:
                        continue
                    try:
                        # b64decode tolerates the embedded newlines GitHub puts
                        # in its base64 payloads; only undecodable blobs skip.
                        decoded_content = base64.b64decode(content_b64).decode("utf-8")
                    except Exception:
                        continue  # Skip binary / non-utf8 files
                    # Scan the decoded text against every secret pattern.
                    for secret_type, pattern in compiled_patterns.items():
                        for match in pattern.finditer(decoded_content):
                            # Show only the first line of the match so
                            # multi-line hits (e.g. SSH keys) stay readable.
                            found_secret_snippet = match.group(0).strip().split('\n')[0]
                            yield f"[+] FOUND [{secret_type}] in {item['html_url']}: {found_secret_snippet}..."
            except httpx.HTTPStatusError as e:
                yield f"[ERROR] HTTP Error for '{dork}': {e.response.status_code}"
            except Exception as e:
                yield f"[ERROR] Unexpected error for '{dork}': {str(e)}"
            # Main sleep to respect the search API rate limit (30/min)
            await asyncio.sleep(2.1)