import gradio as gr import subprocess import os import shutil import tempfile import zipfile from pathlib import Path import docker import git def cleanup_resources(): """Clean up temporary directories, Docker images, and Trivy cache""" cleanup_log = [] # Clean tmp directories tmp_dirs = ['/tmp/trivy', '/tmp/uploads'] for tmp_dir in tmp_dirs: if os.path.exists(tmp_dir): try: shutil.rmtree(tmp_dir) os.makedirs(tmp_dir, exist_ok=True) cleanup_log.append(f"✓ Cleaned {tmp_dir}") except Exception as e: cleanup_log.append(f"✗ Error cleaning {tmp_dir}: {str(e)}") # Clean Docker images (dangling and unused) try: client = docker.from_env() # Remove dangling images dangling_images = client.images.list(filters={'dangling': True}) for img in dangling_images: try: client.images.remove(img.id, force=True) cleanup_log.append(f"✓ Removed dangling image: {img.id[:12]}") except Exception as e: cleanup_log.append(f"✗ Error removing image {img.id[:12]}: {str(e)}") except Exception as e: cleanup_log.append(f"✗ Docker cleanup error: {str(e)}") # Run trivy clean --all try: result = subprocess.run( ['trivy', 'clean', '--all'], capture_output=True, text=True, timeout=60 ) if result.returncode == 0: cleanup_log.append("✓ Trivy cache cleaned successfully") else: cleanup_log.append(f"✗ Trivy clean error: {result.stderr}") except Exception as e: cleanup_log.append(f"✗ Trivy clean exception: {str(e)}") return "\n".join(cleanup_log) def scan_docker_image(image_name, scan_type="vuln"): """Scan a Docker image using Trivy""" if not image_name.strip(): return "❌ Error: Please provide a Docker image name", "" cleanup_log = cleanup_resources() try: # Build trivy command scanners = scan_type if scan_type else "vuln" cmd = [ 'trivy', 'image', '--scanners', scanners, '--timeout', '10m', image_name.strip() ] # Run trivy scan result = subprocess.run( cmd, capture_output=True, text=True, timeout=600 ) output = result.stdout + result.stderr if result.returncode == 0: status = f"✅ Scan completed successfully for image: {image_name}" else: status = f"⚠️ Scan completed with warnings for image: {image_name}" return status, output, cleanup_log except subprocess.TimeoutExpired: return f"❌ Error: Scan timed out for image: {image_name}", "", cleanup_log except Exception as e: return f"❌ Error: {str(e)}", "", cleanup_log def scan_github_repo(repo_url, scan_type="vuln,secret,misconfig"): """Clone and scan a GitHub repository using Trivy""" if not repo_url.strip(): return "❌ Error: Please provide a GitHub repository URL", "" cleanup_log = cleanup_resources() temp_dir = None try: # Create temporary directory for cloning temp_dir = tempfile.mkdtemp(dir='/tmp/trivy') # Clone repository status = f"📥 Cloning repository: {repo_url}\n" git.Repo.clone_from(repo_url.strip(), temp_dir, depth=1) status += "✓ Repository cloned successfully\n\n" # Build trivy command scanners = scan_type if scan_type else "vuln,secret,misconfig" cmd = [ 'trivy', 'fs', '--scanners', scanners, '--timeout', '10m', temp_dir ] # Run trivy scan result = subprocess.run( cmd, capture_output=True, text=True, timeout=600 ) output = result.stdout + result.stderr if result.returncode == 0: status += f"✅ Scan completed successfully" else: status += f"⚠️ Scan completed with warnings" return status, output, cleanup_log except git.exc.GitCommandError as e: return f"❌ Git Error: {str(e)}", "", cleanup_log except subprocess.TimeoutExpired: return f"❌ Error: Scan timed out", "", cleanup_log except Exception as e: return f"❌ Error: {str(e)}", "", cleanup_log finally: # Clean up cloned repository if temp_dir and os.path.exists(temp_dir): shutil.rmtree(temp_dir, ignore_errors=True) def scan_uploaded_zip(zip_file, scan_type="vuln,secret,misconfig"): """Extract and scan an uploaded ZIP file using Trivy""" if zip_file is None: return "❌ Error: Please upload a ZIP file", "" cleanup_log = cleanup_resources() extract_dir = None try: # Create temporary directory for extraction extract_dir = tempfile.mkdtemp(dir='/tmp/uploads') # Extract ZIP file status = f"📦 Extracting ZIP file: {os.path.basename(zip_file.name)}\n" with zipfile.ZipFile(zip_file.name, 'r') as zip_ref: zip_ref.extractall(extract_dir) status += "✓ ZIP file extracted successfully\n\n" # Build trivy command scanners = scan_type if scan_type else "vuln,secret,misconfig" cmd = [ 'trivy', 'fs', '--scanners', scanners, '--timeout', '10m', extract_dir ] # Run trivy scan result = subprocess.run( cmd, capture_output=True, text=True, timeout=600 ) output = result.stdout + result.stderr if result.returncode == 0: status += f"✅ Scan completed successfully" else: status += f"⚠️ Scan completed with warnings" return status, output, cleanup_log except zipfile.BadZipFile: return f"❌ Error: Invalid ZIP file", "", cleanup_log except subprocess.TimeoutExpired: return f"❌ Error: Scan timed out", "", cleanup_log except Exception as e: return f"❌ Error: {str(e)}", "", cleanup_log finally: # Clean up extracted files if extract_dir and os.path.exists(extract_dir): shutil.rmtree(extract_dir, ignore_errors=True) def create_ui(): """Create Gradio UI""" with gr.Blocks(title="Trivy Security Scanner") as demo: gr.Markdown( """ # 🔒 Trivy Security Scanner Scan Docker images, GitHub repositories, or ZIP files for vulnerabilities, secrets, and misconfigurations. **Powered by [Aqua Security Trivy](https://github.com/aquasecurity/trivy)** """ ) with gr.Tabs(): # Docker Image Scan Tab with gr.Tab("🐳 Docker Image"): gr.Markdown("### Scan a Docker Image") with gr.Row(): with gr.Column(scale=2): docker_image = gr.Textbox( label="Docker Image", placeholder="e.g., python:3.4-alpine, nginx:latest, ubuntu:20.04" ) gr.Markdown("Enter a Docker image name and tag") with gr.Column(scale=1): docker_scanners = gr.Dropdown( choices=["vuln", "secret", "misconfig", "vuln,secret", "vuln,secret,misconfig", "vuln,misconfig"], value="vuln", label="Scanners", allow_custom_value=True ) gr.Markdown("Select scan types") docker_button = gr.Button("🔍 Scan Docker Image", variant="primary") docker_status = gr.Textbox(label="Status", lines=2) docker_output = gr.Textbox(label="Scan Results", lines=20, max_lines=50) docker_cleanup = gr.Textbox(label="Cleanup Log", lines=5) gr.Examples( examples=[ ["python:3.4-alpine", "vuln"], ["nginx:latest", "vuln,misconfig"], ["alpine:3.14", "vuln,secret"], ], inputs=[docker_image, docker_scanners], ) # GitHub Repository Scan Tab with gr.Tab("📦 GitHub Repository"): gr.Markdown("### Scan a GitHub Repository") with gr.Row(): with gr.Column(scale=2): github_url = gr.Textbox( label="GitHub Repository URL", placeholder="e.g., https://github.com/username/repository" ) gr.Markdown("Enter a public GitHub repository URL") with gr.Column(scale=1): github_scanners = gr.Dropdown( choices=["vuln", "secret", "misconfig", "vuln,secret", "vuln,secret,misconfig", "vuln,misconfig"], value="vuln,secret,misconfig", label="Scanners", allow_custom_value=True ) gr.Markdown("Select scan types") github_button = gr.Button("🔍 Scan Repository", variant="primary") github_status = gr.Textbox(label="Status", lines=3) github_output = gr.Textbox(label="Scan Results", lines=20, max_lines=50) github_cleanup = gr.Textbox(label="Cleanup Log", lines=5) gr.Examples( examples=[ ["https://github.com/aquasecurity/trivy", "vuln,secret,misconfig"], ], inputs=[github_url, github_scanners], ) # ZIP File Scan Tab with gr.Tab("📁 Upload ZIP"): gr.Markdown("### Scan an Uploaded ZIP File") with gr.Row(): with gr.Column(scale=2): zip_upload = gr.File( label="Upload ZIP File", file_types=[".zip"], type="filepath" ) gr.Markdown("Upload a ZIP file containing your project") with gr.Column(scale=1): zip_scanners = gr.Dropdown( choices=["vuln", "secret", "misconfig", "vuln,secret", "vuln,secret,misconfig", "vuln,misconfig"], value="vuln,secret,misconfig", label="Scanners", allow_custom_value=True ) gr.Markdown("Select scan types") zip_button = gr.Button("🔍 Scan ZIP File", variant="primary") zip_status = gr.Textbox(label="Status", lines=3) zip_output = gr.Textbox(label="Scan Results", lines=20, max_lines=50) zip_cleanup = gr.Textbox(label="Cleanup Log", lines=5) gr.Markdown( """ --- ### ℹ️ Information - **Vulnerabilities (vuln)**: Scans for known CVEs in dependencies and OS packages - **Secrets (secret)**: Detects hardcoded secrets, API keys, and credentials - **Misconfigurations (misconfig)**: Checks for security misconfigurations in IaC files **Note**: Resources are automatically cleaned up after each scan. """ ) # Connect buttons to functions docker_button.click( fn=scan_docker_image, inputs=[docker_image, docker_scanners], outputs=[docker_status, docker_output, docker_cleanup] ) github_button.click( fn=scan_github_repo, inputs=[github_url, github_scanners], outputs=[github_status, github_output, github_cleanup] ) zip_button.click( fn=scan_uploaded_zip, inputs=[zip_upload, zip_scanners], outputs=[zip_status, zip_output, zip_cleanup] ) return demo if __name__ == "__main__": # Create necessary directories os.makedirs('/tmp/trivy', exist_ok=True) os.makedirs('/tmp/uploads', exist_ok=True) # Launch Gradio app demo = create_ui() demo.launch( server_name="0.0.0.0", server_port=7860, share=False )