# s3shastra/github_scanner.py
# Author: Atharv834
# Commit: Deploy S3Shastra backend - FastAPI + scanners + ML models (6a4dcb6)
import os
import sys
import re
import asyncio
import logging
import aiohttp
import shutil
from pathlib import Path
from typing import List, Dict, Optional
# Module logger; handlers/levels are configured by the host application.
logger = logging.getLogger("s3shastra.github")
# ==========================================
# CONFIGURATION
# ==========================================
# Use Environment Variable 'GITHUB_TOKEN' or set GITHUB_TOKEN_CONFIG below
# NEVER hardcode tokens in source code
# Checked first by get_github_headers(); falls back to the GITHUB_TOKEN env var.
GITHUB_TOKEN_CONFIG: str = os.getenv("GITHUB_TOKEN_CONFIG", "")
# Regexes adapted from SecretFinder, filtered down to AWS credential material.
# Keys are finding names, values are uncompiled patterns (re caches compiles).
SECRET_REGEX = {
    # AKIA/ASIA-prefixed access key IDs (16 chars after the prefix).
    'amazon_aws_access_key_id': r'A[SK]IA[0-9A-Z]{16}',
    # MWS auth tokens: "amzn.mws." followed by a UUID. The previous pattern
    # used r'amzn\\.mws\\....' inside a raw string, which requires literal
    # backslashes in the scanned text and could never match a real token.
    'amazon_mws_auth_toke': r'amzn\.mws\.[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
    # Common "aws_secret_access_key = <40 base64-ish chars>" assignments.
    'aws_secret_key': r'(?i)aws_secret_access_key\s*=\s*[a-zA-Z0-9/+=]{40}',
}
# Comprehensive Bucket Regex from scanner.py (reused for consistency)
# Matches bucket-style hostnames/URLs for: AWS S3, Google Cloud Storage,
# Azure Blob, DigitalOcean Spaces, Oracle OCI Object Storage, Linode,
# Wasabi, Cloudflare R2, Backblaze B2, Alibaba OSS, Tencent COS and
# Yandex Object Storage. Both virtual-hosted (bucket.host) and path-style
# (host/bucket) forms are covered where the provider supports both.
# (?ix) sets case-insensitive + verbose inline; re.VERBOSE is redundant but harmless.
BUCKET_REGEX = re.compile(r"""(?ix)
(?:[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]\.s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com
|s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com/[a-z0-9][a-z0-9.-]{1,61}[a-z0-9])
|(?:[a-z0-9.-]+\.storage\.googleapis\.com
|storage\.googleapis\.com/[a-z0-9.-]+)
|(?:[a-z0-9-]+\.blob\.core\.windows\.net)
|(?:[a-z0-9.-]+\.(?:nyc3|ams3|sfo3|fra1|sgp1|tor1|blr1)\.digitaloceanspaces\.com)
|(?:[a-z0-9.-]+\.objectstorage\.[a-z0-9-]+\.oraclecloud\.com)
|(?:[a-z0-9.-]+\.[a-z0-9-]+\.linodeobjects\.com)
|(?:s3\.[a-z0-9-]+\.wasabisys\.com/[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]
|[a-z0-9.-]+\.s3\.[a-z0-9-]+\.wasabisys\.com)
|(?:[a-z0-9.-]+\.r2\.cloudflarestorage\.com)
|(?:f\d+\.backblazeb2\.com/[a-z0-9./-]+)
|(?:[a-z0-9.-]+\.oss-[a-z0-9-]+\.aliyuncs\.com)
|(?:[a-z0-9.-]+\.cos\.[a-z0-9.-]+\.myqcloud\.com)
|(?:[a-z0-9.-]+\.storage\.yandexcloud\.net)
""", re.VERBOSE)
# File extensions skipped during scanning: binary/media formats that will not
# contain readable secrets or bucket URLs as UTF-8 text.
IGNORED_EXTENSIONS = {
    '.png', '.jpg', '.jpeg', '.gif', '.pdf', '.svg', '.bmp', '.ico', '.tiff',
    '.mp4', '.mp3', '.avi', '.mov', '.zip', '.tar', '.gz', '.7z', '.rar',
    '.exe', '.dll', '.so', '.dylib', '.bin', '.iso', '.dmg', '.class', '.jar'
}
# Repos larger than this are shallow-cloned (GitHub API reports size in KB).
MAX_REPO_SIZE_MB = 100
# Working directory for clones; removed again at the end of a scan.
TEMP_DIR = Path("temp_github_repos")
# Add common user agents or tokens if needed
# Base headers for every GitHub API call; Authorization is added on top
# by get_github_headers() when a token is available.
GITHUB_HEADERS = {
    "Accept": "application/vnd.github.v3+json",
    "User-Agent": "S3Shastra-Scanner"
}
async def get_github_headers():
    """Build GitHub API request headers, attaching auth when a token exists.

    Token lookup order: module-level GITHUB_TOKEN_CONFIG, then the
    GITHUB_TOKEN environment variable. Returns a fresh dict each call so
    callers can mutate it safely.
    """
    result = dict(GITHUB_HEADERS)
    token = GITHUB_TOKEN_CONFIG or os.environ.get("GITHUB_TOKEN")
    if token:
        result["Authorization"] = f"token {token}"
    return result
async def resolve_github_target(domain: str, websocket) -> Optional[str]:
    """
    Attempts to resolve a domain to a GitHub organization or user.
    Strategies:
    1. Try exact match (e.g. 'kalshi.com' -> users/kalshi.com) - rare but possible
    2. Try name without TLD (e.g. 'kalshi.com' -> users/kalshi)
    3. Search GitHub for the domain string and pick top user/org result

    Returns the matched GitHub login, or None if every strategy fails.
    Status and rate-limit notices are streamed to the caller via *websocket*.
    """
    headers = await get_github_headers()
    # Strategy 2: Strip TLD (Most likely)
    target_name = domain.split('.')[0]
    async with aiohttp.ClientSession(headers=headers) as session:
        # Check if direct user/org exists
        async def check_entity(name):
            # /users/{name} answers 200 for both user and organization accounts.
            url = f"https://api.github.com/users/{name}"
            async with session.get(url) as resp:
                if resp.status == 200:
                    return name
                elif resp.status == 403:
                    # 403 on this endpoint is typically the API rate limit,
                    # not a permissions problem.
                    await websocket.send_json({"type": "log", "message": f"[GitHub] API Rate Limit hit while checking {name}."})
            # Any other status (404, 5xx, ...) means "no match".
            return None
        # Try stripped name first (e.g. 'kalshi')
        if await check_entity(target_name):
            return target_name
        # Try full domain (e.g. 'kalshi.com')
        if await check_entity(domain):
            return domain
        # Strategy 3: Search
        await websocket.send_json({"type": "status", "message": f"[GitHub] Direct match failed. Searching GitHub for '{domain}'..."})
        search_url = f"https://api.github.com/search/users?q={domain}&type=org"
        async with session.get(search_url) as resp:
            if resp.status == 200:
                data = await resp.json()
                items = data.get("items", [])
                if items:
                    # Pick the most relevant? For now, the first one.
                    best_match = items[0]["login"]
                    await websocket.send_json({"type": "status", "message": f"[GitHub] Search found: {best_match}"})
                    return best_match
        # fallback search for just the name part
        search_url_2 = f"https://api.github.com/search/users?q={target_name}&type=org"
        async with session.get(search_url_2) as resp2:
            if resp2.status == 200:
                data = await resp2.json()
                items = data.get("items", [])
                if items:
                    best_match = items[0]["login"]
                    await websocket.send_json({"type": "status", "message": f"[GitHub] Search (name) found: {best_match}"})
                    return best_match
    return None
async def get_org_repos(org_name: str, websocket):
    """Fetches repositories for a GitHub organization/user.

    Pages through /users/{org_name}/repos (works for orgs and users alike)
    until an empty page or an error status, returning a list of dicts with
    name, clone_url, size (KB), html_url and default_branch.
    """
    collected = []
    headers = await get_github_headers()
    async with aiohttp.ClientSession(headers=headers) as session:
        page = 1
        while True:
            page_url = (
                f"https://api.github.com/users/{org_name}/repos"
                f"?page={page}&per_page=100"
            )
            async with session.get(page_url) as resp:
                if resp.status == 403:
                    # Almost certainly the unauthenticated rate limit.
                    await websocket.send_json({"type": "error", "message": f"[GitHub] Rate limit exceeded. Set GITHUB_TOKEN to increase limits."})
                    break
                if resp.status != 200:
                    break
                batch = await resp.json()
            if not batch:
                # Empty page: we have walked past the last repository.
                break
            collected.extend(
                {
                    "name": item["name"],
                    "clone_url": item["clone_url"],
                    "size": item["size"],  # Size is in KB
                    "html_url": item["html_url"],
                    "default_branch": item.get("default_branch", "master"),
                }
                for item in batch
            )
            page += 1
    return collected
async def clone_repo(repo, websocket):
    """Clone *repo* into TEMP_DIR and return the local path.

    Repos larger than MAX_REPO_SIZE_MB are shallow-cloned (--depth 1);
    smaller ones get full history, since commit history matters for secret
    scanning but is expensive on big repos. The returned path may not exist
    if git failed; callers must check.
    """
    # Ensure the working dir exists so this function is safe standalone,
    # not only when scan_github() has already created it.
    TEMP_DIR.mkdir(parents=True, exist_ok=True)
    repo_path = TEMP_DIR / repo["name"]
    if repo_path.exists():
        # Stale clone from a previous run — start fresh.
        shutil.rmtree(repo_path, ignore_errors=True)
    size_mb = repo["size"] / 1024  # GitHub API reports size in KB
    cmd = ["git", "clone", repo["clone_url"], str(repo_path)]
    if size_mb > MAX_REPO_SIZE_MB:
        cmd.extend(["--depth", "1"])
    # Suppress git output entirely; success is inferred from repo_path.exists().
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    await proc.wait()
    return repo_path
def scan_content(content: str, file_path_str: str, repo_url: str) -> List[dict]:
    """Scan one file's text for secret patterns and cloud-bucket URLs.

    Returns a list of finding dicts (type/name/match/file/repo/line);
    line numbers are 1-based.
    """
    def _record(kind: str, label: str, m) -> dict:
        # Shape one regex match into the common finding dict.
        return {
            "type": kind,
            "name": label,
            "match": m.group(0),
            "file": file_path_str,
            "repo": repo_url,
            # Newlines before the match start give the 0-based line; +1 for 1-based.
            "line": content.count('\n', 0, m.start()) + 1,
        }

    findings: List[dict] = []
    # Secret patterns first, in dict order.
    for secret_name, secret_pattern in SECRET_REGEX.items():
        findings.extend(
            _record("secret", secret_name, m)
            for m in re.finditer(secret_pattern, content)
        )
    # Then bucket URL hits from the precompiled multi-provider regex.
    findings.extend(
        _record("bucket", "Cloud Bucket", m)
        for m in BUCKET_REGEX.finditer(content)
    )
    return findings
async def scan_repo(repo_path: Path, repo_url: str, websocket):
    """Walk a cloned repo and scan every readable text file.

    Returns (findings, files_scanned). The blocking file I/O runs in a
    worker thread via asyncio.to_thread so the event loop stays responsive.
    """
    files_scanned = 0

    def _scan_sync():
        nonlocal files_scanned
        local_findings = []
        for root, dirs, files in os.walk(repo_path):
            # Prune the VCS metadata directory in place. The previous
            # `".git" in root` substring test also skipped legitimate paths
            # such as "docs.github/" and still descended into .git/.
            if ".git" in dirs:
                dirs.remove(".git")
            for file in files:
                file_path = Path(root) / file
                if file_path.suffix.lower() in IGNORED_EXTENSIONS:
                    continue
                try:
                    # Read as utf-8, silently dropping undecodable bytes.
                    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                        content = f.read()
                    files_scanned += 1
                    if len(content) > 10_000_000:  # skip files over ~10MB of text
                        continue
                    local_findings.extend(scan_content(content, str(file_path), repo_url))
                except Exception:
                    # Best-effort: unreadable files are skipped, not fatal.
                    pass
        return local_findings

    findings = await asyncio.to_thread(_scan_sync)
    return findings, files_scanned
async def scan_github(domain: str, websocket):
    """Main entry point for GitHub scanning.

    Resolves *domain* to a GitHub org/user, clones its repositories (at most
    3 concurrently), scans them for secrets and bucket URLs, and streams
    status/result messages over *websocket*. All errors are reported via the
    websocket and logged; nothing propagates to the caller.
    """
    try:
        await websocket.send_json({"type": "status", "message": f"[GitHub] Resolving target for {domain}..."})
        org_name = await resolve_github_target(domain, websocket)
        if not org_name:
            await websocket.send_json({"type": "error", "message": f"[GitHub] Could not find a matching GitHub Organization or User for {domain}."})
            return
        await websocket.send_json({"type": "status", "message": f"[GitHub] Resolved to: {org_name}. Fetching repos..."})
        repos = await get_org_repos(org_name, websocket)
        if not repos:
            await websocket.send_json({"type": "status", "message": f"[GitHub] No repositories found for {org_name} (or rate limited)."})
            return
        await websocket.send_json({"type": "status", "message": f"[GitHub] Found {len(repos)} repositories. Starting clone & scan..."})
        # Ensure temp dir exists
        TEMP_DIR.mkdir(parents=True, exist_ok=True)
        # Cap concurrent clone+scan pipelines to bound disk and network load.
        sem = asyncio.Semaphore(3)
        total_files_scanned = 0
        total_findings_count = 0

        async def process_repo(repo):
            # Counter updates are safe: tasks interleave on one event loop.
            nonlocal total_files_scanned, total_findings_count
            async with sem:
                path = None
                try:
                    path = await clone_repo(repo, websocket)
                    if not path.exists():
                        return  # clone failed; nothing to scan
                    repo_findings, repo_files_count = await scan_repo(path, repo["html_url"], websocket)
                    total_files_scanned += repo_files_count
                    total_findings_count += len(repo_findings)
                    # Stream each finding to the client as a result row.
                    for f in repo_findings:
                        status_label = "Found Secret" if f['type'] == 'secret' else "Found Bucket"
                        result = {
                            "subdomain": f['repo'],
                            "reference": f['match'],
                            "provider": "GitHub",
                            "status": status_label,
                            "server": "GitHub",
                            "url": f['match'],
                            "extra": f"File: {f['file']} (L{f['line']})"
                        }
                        await websocket.send_json({"type": "result", "data": result})
                except Exception as e:
                    logger.error("Error processing repo %s: %s", repo['name'], e)
                finally:
                    # Reclaim disk space even when clone/scan raised; the old
                    # code only cleaned up on the success path.
                    if path is not None:
                        shutil.rmtree(path, ignore_errors=True)

        await asyncio.gather(*(asyncio.create_task(process_repo(r)) for r in repos))
        # Final cleanup: ignore_errors makes this best-effort, so the old
        # bare `except: pass` wrapper was redundant and is dropped.
        if TEMP_DIR.exists():
            shutil.rmtree(TEMP_DIR, ignore_errors=True)
        await websocket.send_json({"type": "status", "message": f"[GitHub] Scan completed for {org_name}. Scanned {total_files_scanned} files across {len(repos)} repos. Found {total_findings_count} potential secrets/buckets."})
    except Exception as e:
        logger.error("GitHub scan critical error: %s", e, exc_info=True)
        await websocket.send_json({"type": "error", "message": "[GitHub] A critical error occurred during scanning."})