|
|
import asyncio
import logging
import os
import re
import shutil
import sys
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import quote_plus

import aiohttp
|
|
| logger = logging.getLogger("s3shastra.github") |
|
|
| |
| |
| |
| |
| |
| GITHUB_TOKEN_CONFIG = os.getenv("GITHUB_TOKEN_CONFIG", "") |
|
|
| |
| SECRET_REGEX = { |
| 'amazon_aws_access_key_id': r'A[SK]IA[0-9A-Z]{16}', |
| 'amazon_mws_auth_toke': r'amzn\\.mws\\.[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', |
| 'aws_secret_key': r'(?i)aws_secret_access_key\s*=\s*[a-zA-Z0-9/+=]{40}', |
| } |
|
|
| |
| BUCKET_REGEX = re.compile(r"""(?ix) |
| (?:[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]\.s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com |
| |s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com/[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]) |
| |(?:[a-z0-9.-]+\.storage\.googleapis\.com |
| |storage\.googleapis\.com/[a-z0-9.-]+) |
| |(?:[a-z0-9-]+\.blob\.core\.windows\.net) |
| |(?:[a-z0-9.-]+\.(?:nyc3|ams3|sfo3|fra1|sgp1|tor1|blr1)\.digitaloceanspaces\.com) |
| |(?:[a-z0-9.-]+\.objectstorage\.[a-z0-9-]+\.oraclecloud\.com) |
| |(?:[a-z0-9.-]+\.[a-z0-9-]+\.linodeobjects\.com) |
| |(?:s3\.[a-z0-9-]+\.wasabisys\.com/[a-z0-9][a-z0-9.-]{1,61}[a-z0-9] |
| |[a-z0-9.-]+\.s3\.[a-z0-9-]+\.wasabisys\.com) |
| |(?:[a-z0-9.-]+\.r2\.cloudflarestorage\.com) |
| |(?:f\d+\.backblazeb2\.com/[a-z0-9./-]+) |
| |(?:[a-z0-9.-]+\.oss-[a-z0-9-]+\.aliyuncs\.com) |
| |(?:[a-z0-9.-]+\.cos\.[a-z0-9.-]+\.myqcloud\.com) |
| |(?:[a-z0-9.-]+\.storage\.yandexcloud\.net) |
| """, re.VERBOSE) |
|
|
| IGNORED_EXTENSIONS = { |
| '.png', '.jpg', '.jpeg', '.gif', '.pdf', '.svg', '.bmp', '.ico', '.tiff', |
| '.mp4', '.mp3', '.avi', '.mov', '.zip', '.tar', '.gz', '.7z', '.rar', |
| '.exe', '.dll', '.so', '.dylib', '.bin', '.iso', '.dmg', '.class', '.jar' |
| } |
|
|
| MAX_REPO_SIZE_MB = 100 |
| TEMP_DIR = Path("temp_github_repos") |
|
|
| |
| GITHUB_HEADERS = { |
| "Accept": "application/vnd.github.v3+json", |
| "User-Agent": "S3Shastra-Scanner" |
| } |
|
|
| async def get_github_headers(): |
| |
| token = GITHUB_TOKEN_CONFIG if GITHUB_TOKEN_CONFIG else os.environ.get("GITHUB_TOKEN") |
| |
| headers = GITHUB_HEADERS.copy() |
| if token: |
| headers["Authorization"] = f"token {token}" |
| return headers |
|
|
| async def resolve_github_target(domain: str, websocket) -> Optional[str]: |
| """ |
| Attempts to resolve a domain to a GitHub organization or user. |
| Strategies: |
| 1. Try exact match (e.g. 'kalshi.com' -> users/kalshi.com) - rare but possible |
| 2. Try name without TLD (e.g. 'kalshi.com' -> users/kalshi) |
| 3. Search GitHub for the domain string and pick top user/org result |
| """ |
| headers = await get_github_headers() |
| |
| |
| target_name = domain.split('.')[0] |
| |
| async with aiohttp.ClientSession(headers=headers) as session: |
| |
| async def check_entity(name): |
| url = f"https://api.github.com/users/{name}" |
| async with session.get(url) as resp: |
| if resp.status == 200: |
| return name |
| elif resp.status == 403: |
| await websocket.send_json({"type": "log", "message": f"[GitHub] API Rate Limit hit while checking {name}."}) |
| return None |
|
|
| |
| if await check_entity(target_name): |
| return target_name |
| |
| |
| if await check_entity(domain): |
| return domain |
|
|
| |
| await websocket.send_json({"type": "status", "message": f"[GitHub] Direct match failed. Searching GitHub for '{domain}'..."}) |
| search_url = f"https://api.github.com/search/users?q={domain}&type=org" |
| async with session.get(search_url) as resp: |
| if resp.status == 200: |
| data = await resp.json() |
| items = data.get("items", []) |
| if items: |
| |
| best_match = items[0]["login"] |
| await websocket.send_json({"type": "status", "message": f"[GitHub] Search found: {best_match}"}) |
| return best_match |
| |
| |
| search_url_2 = f"https://api.github.com/search/users?q={target_name}&type=org" |
| async with session.get(search_url_2) as resp2: |
| if resp2.status == 200: |
| data = await resp2.json() |
| items = data.get("items", []) |
| if items: |
| best_match = items[0]["login"] |
| await websocket.send_json({"type": "status", "message": f"[GitHub] Search (name) found: {best_match}"}) |
| return best_match |
|
|
| return None |
|
|
| async def get_org_repos(org_name: str, websocket): |
| """Fetches repositories for a GitHub organization/user.""" |
| repos = [] |
| page = 1 |
| headers = await get_github_headers() |
|
|
| async with aiohttp.ClientSession(headers=headers) as session: |
| while True: |
| url = f"https://api.github.com/users/{org_name}/repos?page={page}&per_page=100" |
| async with session.get(url) as response: |
| if response.status == 403: |
| await websocket.send_json({"type": "error", "message": f"[GitHub] Rate limit exceeded. Set GITHUB_TOKEN to increase limits."}) |
| break |
| |
| if response.status != 200: |
| break |
| |
| data = await response.json() |
| if not data: |
| break |
| |
| for repo in data: |
| repos.append({ |
| "name": repo["name"], |
| "clone_url": repo["clone_url"], |
| "size": repo["size"], |
| "html_url": repo["html_url"], |
| "default_branch": repo.get("default_branch", "master") |
| }) |
| page += 1 |
| return repos |
|
|
| async def clone_repo(repo, websocket): |
| """Clones a repository.""" |
| repo_path = TEMP_DIR / repo["name"] |
| if repo_path.exists(): |
| shutil.rmtree(repo_path, ignore_errors=True) |
|
|
| size_mb = repo["size"] / 1024 |
| |
| cmd = ["git", "clone", repo["clone_url"], str(repo_path)] |
| if size_mb > MAX_REPO_SIZE_MB: |
| cmd.extend(["--depth", "1"]) |
| else: |
| |
| |
| |
| pass |
|
|
| |
| proc = await asyncio.create_subprocess_exec( |
| *cmd, |
| stdout=asyncio.subprocess.DEVNULL, |
| stderr=asyncio.subprocess.DEVNULL |
| ) |
| await proc.wait() |
| return repo_path |
|
|
| def scan_content(content: str, file_path_str: str, repo_url: str) -> List[dict]: |
| findings = [] |
| |
| |
| for name, pattern in SECRET_REGEX.items(): |
| for match in re.finditer(pattern, content): |
| findings.append({ |
| "type": "secret", |
| "name": name, |
| "match": match.group(0), |
| "file": file_path_str, |
| "repo": repo_url, |
| "line": content.count('\n', 0, match.start()) + 1 |
| }) |
|
|
| |
| for match in BUCKET_REGEX.finditer(content): |
| findings.append({ |
| "type": "bucket", |
| "name": "Cloud Bucket", |
| "match": match.group(0), |
| "file": file_path_str, |
| "repo": repo_url, |
| "line": content.count('\n', 0, match.start()) + 1 |
| }) |
| |
| return findings |
|
|
| async def scan_repo(repo_path: Path, repo_url: str, websocket): |
| findings = [] |
| files_scanned = 0 |
| |
| |
| def _scan_sync(): |
| local_findings = [] |
| nonlocal files_scanned |
| for root, _, files in os.walk(repo_path): |
| if ".git" in root: continue |
| |
| for file in files: |
| file_path = Path(root) / file |
| if file_path.suffix.lower() in IGNORED_EXTENSIONS: |
| continue |
| |
| try: |
| |
| with open(file_path, "r", encoding="utf-8", errors="ignore") as f: |
| content = f.read() |
| files_scanned += 1 |
| |
| if len(content) > 10_000_000: |
| continue |
| |
| f_findings = scan_content(content, str(file_path), repo_url) |
| local_findings.extend(f_findings) |
| except Exception: |
| pass |
| return local_findings |
|
|
| findings = await asyncio.to_thread(_scan_sync) |
| return findings, files_scanned |
|
|
| async def scan_github(domain: str, websocket): |
| """Main entry point for GitHub scanning.""" |
| try: |
| await websocket.send_json({"type": "status", "message": f"[GitHub] Resolving target for {domain}..."}) |
| |
| org_name = await resolve_github_target(domain, websocket) |
| |
| if not org_name: |
| await websocket.send_json({"type": "error", "message": f"[GitHub] Could not find a matching GitHub Organization or User for {domain}."}) |
| return |
|
|
| await websocket.send_json({"type": "status", "message": f"[GitHub] Resolved to: {org_name}. Fetching repos..."}) |
| repos = await get_org_repos(org_name, websocket) |
| |
| if not repos: |
| await websocket.send_json({"type": "status", "message": f"[GitHub] No repositories found for {org_name} (or rate limited)."}) |
| return |
|
|
| await websocket.send_json({"type": "status", "message": f"[GitHub] Found {len(repos)} repositories. Starting clone & scan..."}) |
|
|
| |
| TEMP_DIR.mkdir(parents=True, exist_ok=True) |
|
|
| sem = asyncio.Semaphore(3) |
| total_files_scanned = 0 |
| total_findings_count = 0 |
|
|
| async def process_repo(repo): |
| nonlocal total_files_scanned, total_findings_count |
| async with sem: |
| try: |
| |
| path = await clone_repo(repo, websocket) |
| if not path.exists(): |
| return |
| |
| |
| repo_findings, repo_files_count = await scan_repo(path, repo["html_url"], websocket) |
| total_files_scanned += repo_files_count |
| total_findings_count += len(repo_findings) |
| |
| |
| for f in repo_findings: |
| provider_label = "GitHub" |
| status_label = "Found Secret" if f['type'] == 'secret' else "Found Bucket" |
| |
| result = { |
| "subdomain": f['repo'], |
| "reference": f['match'], |
| "provider": provider_label, |
| "status": status_label, |
| "server": "GitHub", |
| "url": f['match'], |
| "extra": f"File: {f['file']} (L{f['line']})" |
| } |
| await websocket.send_json({"type": "result", "data": result}) |
| |
| |
| shutil.rmtree(path, ignore_errors=True) |
| except Exception as e: |
| logger.error("Error processing repo %s: %s", repo['name'], e) |
|
|
| tasks = [asyncio.create_task(process_repo(repo)) for repo in repos] |
| await asyncio.gather(*tasks) |
| |
| |
| if TEMP_DIR.exists(): |
| try: |
| shutil.rmtree(TEMP_DIR, ignore_errors=True) |
| except: pass |
| |
| await websocket.send_json({"type": "status", "message": f"[GitHub] Scan completed for {org_name}. Scanned {total_files_scanned} files across {len(repos)} repos. Found {total_findings_count} potential secrets/buckets."}) |
|
|
| except Exception as e: |
| logger.error("GitHub scan critical error: %s", e, exc_info=True) |
| await websocket.send_json({"type": "error", "message": "[GitHub] A critical error occurred during scanning."}) |
|
|
|
|