import os
import sys
import re
import asyncio
import logging
import shutil
from pathlib import Path
from typing import List, Dict, Optional

import aiohttp

logger = logging.getLogger("s3shastra.github")

# ==========================================
# CONFIGURATION
# ==========================================

# Use Environment Variable 'GITHUB_TOKEN' or set GITHUB_TOKEN_CONFIG below.
# NEVER hardcode tokens in source code.
GITHUB_TOKEN_CONFIG = os.getenv("GITHUB_TOKEN_CONFIG", "")

# Regexes from SecretFinder (filtered for AWS keys/secrets).
SECRET_REGEX = {
    # AWS access key IDs begin with AKIA (long-term) or ASIA (temporary).
    'amazon_aws_access_key_id': r'A[SK]IA[0-9A-Z]{16}',
    # FIX: the original wrote r'amzn\\.mws\\...' — inside a raw string,
    # `\\.` matches a literal backslash followed by any character, so the
    # pattern could never match a real MWS token. A raw string needs only
    # a single backslash to escape the dot.
    'amazon_mws_auth_toke': r'amzn\.mws\.[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
    'aws_secret_key': r'(?i)aws_secret_access_key\s*=\s*[a-zA-Z0-9/+=]{40}',  # Added common pattern for secret key
}

# Comprehensive bucket regex from scanner.py (reused for consistency).
# Covers AWS S3, GCS, Azure Blob, DigitalOcean Spaces, Oracle, Linode,
# Wasabi, Cloudflare R2, Backblaze B2, Alibaba OSS, Tencent COS, Yandex.
BUCKET_REGEX = re.compile(r"""(?ix)
    (?:[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]\.s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com
    |s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com/[a-z0-9][a-z0-9.-]{1,61}[a-z0-9])
    |(?:[a-z0-9.-]+\.storage\.googleapis\.com
    |storage\.googleapis\.com/[a-z0-9.-]+)
    |(?:[a-z0-9-]+\.blob\.core\.windows\.net)
    |(?:[a-z0-9.-]+\.(?:nyc3|ams3|sfo3|fra1|sgp1|tor1|blr1)\.digitaloceanspaces\.com)
    |(?:[a-z0-9.-]+\.objectstorage\.[a-z0-9-]+\.oraclecloud\.com)
    |(?:[a-z0-9.-]+\.[a-z0-9-]+\.linodeobjects\.com)
    |(?:s3\.[a-z0-9-]+\.wasabisys\.com/[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]
    |[a-z0-9.-]+\.s3\.[a-z0-9-]+\.wasabisys\.com)
    |(?:[a-z0-9.-]+\.r2\.cloudflarestorage\.com)
    |(?:f\d+\.backblazeb2\.com/[a-z0-9./-]+)
    |(?:[a-z0-9.-]+\.oss-[a-z0-9-]+\.aliyuncs\.com)
    |(?:[a-z0-9.-]+\.cos\.[a-z0-9.-]+\.myqcloud\.com)
    |(?:[a-z0-9.-]+\.storage\.yandexcloud\.net)
""", re.VERBOSE)

# Binary/media extensions that cannot usefully contain textual secrets.
IGNORED_EXTENSIONS = {
    '.png', '.jpg', '.jpeg', '.gif', '.pdf', '.svg', '.bmp', '.ico', '.tiff',
    '.mp4', '.mp3', '.avi', '.mov', '.zip', '.tar', '.gz', '.7z', '.rar',
    '.exe', '.dll', '.so', '.dylib', '.bin', '.iso', '.dmg', '.class', '.jar'
}
MAX_REPO_SIZE_MB = 100  # Repos reported larger than this are shallow-cloned.
TEMP_DIR = Path("temp_github_repos")  # Clone workspace; removed after the scan.

# Base headers for every GitHub API call.
# Add common user agents or tokens if needed.
GITHUB_HEADERS = {
    "Accept": "application/vnd.github.v3+json",
    "User-Agent": "S3Shastra-Scanner"
}


async def get_github_headers():
    """Build GitHub API headers, attaching an auth token when available.

    Priority: hardcoded config (GITHUB_TOKEN_CONFIG) > GITHUB_TOKEN env var.
    """
    token = GITHUB_TOKEN_CONFIG if GITHUB_TOKEN_CONFIG else os.environ.get("GITHUB_TOKEN")
    headers = GITHUB_HEADERS.copy()
    if token:
        headers["Authorization"] = f"token {token}"
    return headers


async def resolve_github_target(domain: str, websocket) -> Optional[str]:
    """
    Attempts to resolve a domain to a GitHub organization or user.

    Strategies:
    1. Try exact match (e.g. 'kalshi.com' -> users/kalshi.com) - rare but possible
    2. Try name without TLD (e.g. 'kalshi.com' -> users/kalshi)
    3. Search GitHub for the domain string and pick top user/org result

    Returns the resolved login name, or None if nothing matched.
    """
    headers = await get_github_headers()

    # Strategy 2: strip TLD (most likely form of the login).
    target_name = domain.split('.')[0]

    async with aiohttp.ClientSession(headers=headers) as session:

        async def check_entity(name):
            # Returns `name` if a user/org with that exact login exists.
            url = f"https://api.github.com/users/{name}"
            async with session.get(url) as resp:
                if resp.status == 200:
                    return name
                elif resp.status == 403:
                    await websocket.send_json({"type": "log", "message": f"[GitHub] API Rate Limit hit while checking {name}."})
                return None

        # Try stripped name first (e.g. 'kalshi')
        if await check_entity(target_name):
            return target_name
        # Try full domain (e.g. 'kalshi.com')
        if await check_entity(domain):
            return domain

        # Strategy 3: Search
        await websocket.send_json({"type": "status", "message": f"[GitHub] Direct match failed. Searching GitHub for '{domain}'..."})
        search_url = f"https://api.github.com/search/users?q={domain}&type=org"
        async with session.get(search_url) as resp:
            if resp.status == 200:
                data = await resp.json()
                items = data.get("items", [])
                if items:
                    # Pick the most relevant? For now, the first one.
                    best_match = items[0]["login"]
                    await websocket.send_json({"type": "status", "message": f"[GitHub] Search found: {best_match}"})
                    return best_match

        # Fallback search for just the name part.
        search_url_2 = f"https://api.github.com/search/users?q={target_name}&type=org"
        async with session.get(search_url_2) as resp2:
            if resp2.status == 200:
                data = await resp2.json()
                items = data.get("items", [])
                if items:
                    best_match = items[0]["login"]
                    await websocket.send_json({"type": "status", "message": f"[GitHub] Search (name) found: {best_match}"})
                    return best_match
    return None


async def get_org_repos(org_name: str, websocket):
    """Fetches repositories for a GitHub organization/user.

    Pages through /users/{org}/repos; stops on rate limit, error, or
    an empty page. Returns a list of dicts with name/clone_url/size/
    html_url/default_branch.
    """
    repos = []
    page = 1
    headers = await get_github_headers()
    async with aiohttp.ClientSession(headers=headers) as session:
        while True:
            url = f"https://api.github.com/users/{org_name}/repos?page={page}&per_page=100"
            async with session.get(url) as response:
                if response.status == 403:
                    await websocket.send_json({"type": "error", "message": f"[GitHub] Rate limit exceeded. Set GITHUB_TOKEN to increase limits."})
                    break
                if response.status != 200:
                    break
                data = await response.json()
                if not data:
                    break
                for repo in data:
                    repos.append({
                        "name": repo["name"],
                        "clone_url": repo["clone_url"],
                        "size": repo["size"],  # Size is in KB
                        "html_url": repo["html_url"],
                        "default_branch": repo.get("default_branch", "master")
                    })
                page += 1
    return repos


async def clone_repo(repo, websocket):
    """Clones a repository into TEMP_DIR and returns its path.

    Shallow clone is always faster for scanning, but secret scanning of
    history is valuable (and expensive). Per requirements we only go
    shallow (--depth 1) when the repo exceeds MAX_REPO_SIZE_MB.
    """
    repo_path = TEMP_DIR / repo["name"]
    if repo_path.exists():
        shutil.rmtree(repo_path, ignore_errors=True)

    size_mb = repo["size"] / 1024  # GitHub reports size in KB.
    cmd = ["git", "clone", repo["clone_url"], str(repo_path)]
    if size_mb > MAX_REPO_SIZE_MB:
        cmd.extend(["--depth", "1"])

    # Suppress output
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL
    )
    await proc.wait()
    return repo_path


def scan_content(content: str, file_path_str: str, repo_url: str) -> List[dict]:
    """Scan one file's text for secret patterns and cloud bucket URLs.

    Returns a list of finding dicts (type/name/match/file/repo/line).
    """
    findings = []

    # Check secret patterns.
    for name, pattern in SECRET_REGEX.items():
        for match in re.finditer(pattern, content):
            findings.append({
                "type": "secret",
                "name": name,
                "match": match.group(0),
                "file": file_path_str,
                "repo": repo_url,
                # Line number = newlines before the match start + 1.
                "line": content.count('\n', 0, match.start()) + 1
            })

    # Check buckets.
    for match in BUCKET_REGEX.finditer(content):
        findings.append({
            "type": "bucket",
            "name": "Cloud Bucket",
            "match": match.group(0),
            "file": file_path_str,
            "repo": repo_url,
            "line": content.count('\n', 0, match.start()) + 1
        })
    return findings


async def scan_repo(repo_path: Path, repo_url: str, websocket):
    """Walk a cloned repo and collect findings.

    File I/O runs in a worker thread (asyncio.to_thread) so the event
    loop stays responsive. Returns (findings, files_scanned).
    """
    files_scanned = 0

    def _scan_sync():
        nonlocal files_scanned
        local_findings = []
        for root, dirs, files in os.walk(repo_path):
            # FIX: the original tested `".git" in root`, a substring match
            # on the whole path that also skipped unrelated directories
            # (e.g. "tools.github"). Prune the VCS metadata dir properly.
            if ".git" in dirs:
                dirs.remove(".git")
            for file in files:
                file_path = Path(root) / file
                if file_path.suffix.lower() in IGNORED_EXTENSIONS:
                    continue
                try:
                    # FIX: check the 10 MB cap via stat() BEFORE reading —
                    # the original read the whole file into memory first.
                    # Still counted as visited, matching prior accounting.
                    if file_path.stat().st_size > 10_000_000:
                        files_scanned += 1
                        continue
                    # Read as utf-8, ignoring undecodable bytes.
                    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                        content = f.read()
                    files_scanned += 1
                    local_findings.extend(scan_content(content, str(file_path), repo_url))
                except OSError:
                    # Unreadable file (permissions, broken symlink, ...) —
                    # best-effort scan, skip and continue.
                    pass
        return local_findings

    findings = await asyncio.to_thread(_scan_sync)
    return findings, files_scanned


async def scan_github(domain: str, websocket):
    """Main entry point for GitHub scanning.

    Resolves the domain to a GitHub login, fetches its repos, then
    clones and scans up to 3 repos concurrently, streaming findings
    and status messages over the websocket.
    """
    try:
        await websocket.send_json({"type": "status", "message": f"[GitHub] Resolving target for {domain}..."})
        org_name = await resolve_github_target(domain, websocket)

        if not org_name:
            await websocket.send_json({"type": "error", "message": f"[GitHub] Could not find a matching GitHub Organization or User for {domain}."})
            return

        await websocket.send_json({"type": "status", "message": f"[GitHub] Resolved to: {org_name}. Fetching repos..."})
        repos = await get_org_repos(org_name, websocket)

        if not repos:
            await websocket.send_json({"type": "status", "message": f"[GitHub] No repositories found for {org_name} (or rate limited)."})
            return

        # FIX: this message literal was broken across a physical line in the
        # original source (a syntax error); rejoined onto one line.
        await websocket.send_json({"type": "status", "message": f"[GitHub] Found {len(repos)} repositories. Starting clone & scan..."})

        # Ensure temp dir exists
        TEMP_DIR.mkdir(parents=True, exist_ok=True)

        sem = asyncio.Semaphore(3)  # Cap concurrent clone+scan pipelines.
        total_files_scanned = 0
        total_findings_count = 0

        async def process_repo(repo):
            nonlocal total_files_scanned, total_findings_count
            async with sem:
                try:
                    # Clone
                    path = await clone_repo(repo, websocket)
                    if not path.exists():
                        return
                    # Scan
                    repo_findings, repo_files_count = await scan_repo(path, repo["html_url"], websocket)
                    total_files_scanned += repo_files_count
                    total_findings_count += len(repo_findings)
                    # Report
                    for f in repo_findings:
                        provider_label = "GitHub"
                        status_label = "Found Secret" if f['type'] == 'secret' else "Found Bucket"
                        result = {
                            "subdomain": f['repo'],
                            "reference": f['match'],
                            "provider": provider_label,
                            "status": status_label,
                            "server": "GitHub",
                            "url": f['match'],
                            "extra": f"File: {f['file']} (L{f['line']})"
                        }
                        await websocket.send_json({"type": "result", "data": result})
                    # Cleanup
                    shutil.rmtree(path, ignore_errors=True)
                except Exception as e:
                    # Keep scanning the remaining repos if one fails.
                    logger.error("Error processing repo %s: %s", repo['name'], e)

        tasks = [asyncio.create_task(process_repo(repo)) for repo in repos]
        await asyncio.gather(*tasks)

        # Final cleanup. rmtree(ignore_errors=True) already swallows
        # failures, so the original bare `except: pass` was removed.
        if TEMP_DIR.exists():
            shutil.rmtree(TEMP_DIR, ignore_errors=True)

        await websocket.send_json({"type": "status", "message": f"[GitHub] Scan completed for {org_name}. Scanned {total_files_scanned} files across {len(repos)} repos. Found {total_findings_count} potential secrets/buckets."})
    except Exception as e:
        logger.error("GitHub scan critical error: %s", e, exc_info=True)
        await websocket.send_json({"type": "error", "message": "[GitHub] A critical error occurred during scanning."})