Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import subprocess | |
| import os | |
| import shutil | |
| import tempfile | |
| import zipfile | |
| from pathlib import Path | |
| import docker | |
| import git | |
| def cleanup_resources(): | |
| """Clean up temporary directories, Docker images, and Trivy cache""" | |
| cleanup_log = [] | |
| # Clean tmp directories | |
| tmp_dirs = ['/tmp/trivy', '/tmp/uploads'] | |
| for tmp_dir in tmp_dirs: | |
| if os.path.exists(tmp_dir): | |
| try: | |
| shutil.rmtree(tmp_dir) | |
| os.makedirs(tmp_dir, exist_ok=True) | |
| cleanup_log.append(f"β Cleaned {tmp_dir}") | |
| except Exception as e: | |
| cleanup_log.append(f"β Error cleaning {tmp_dir}: {str(e)}") | |
| # Clean Docker images (dangling and unused) | |
| try: | |
| client = docker.from_env() | |
| # Remove dangling images | |
| dangling_images = client.images.list(filters={'dangling': True}) | |
| for img in dangling_images: | |
| try: | |
| client.images.remove(img.id, force=True) | |
| cleanup_log.append(f"β Removed dangling image: {img.id[:12]}") | |
| except Exception as e: | |
| cleanup_log.append(f"β Error removing image {img.id[:12]}: {str(e)}") | |
| except Exception as e: | |
| cleanup_log.append(f"β Docker cleanup error: {str(e)}") | |
| # Run trivy clean --all | |
| try: | |
| result = subprocess.run( | |
| ['trivy', 'clean', '--all'], | |
| capture_output=True, | |
| text=True, | |
| timeout=60 | |
| ) | |
| if result.returncode == 0: | |
| cleanup_log.append("β Trivy cache cleaned successfully") | |
| else: | |
| cleanup_log.append(f"β Trivy clean error: {result.stderr}") | |
| except Exception as e: | |
| cleanup_log.append(f"β Trivy clean exception: {str(e)}") | |
| return "\n".join(cleanup_log) | |
| def scan_docker_image(image_name, scan_type="vuln"): | |
| """Scan a Docker image using Trivy""" | |
| if not image_name.strip(): | |
| return "β Error: Please provide a Docker image name", "" | |
| cleanup_log = cleanup_resources() | |
| try: | |
| # Build trivy command | |
| scanners = scan_type if scan_type else "vuln" | |
| cmd = [ | |
| 'trivy', 'image', | |
| '--scanners', scanners, | |
| '--timeout', '10m', | |
| image_name.strip() | |
| ] | |
| # Run trivy scan | |
| result = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| timeout=600 | |
| ) | |
| output = result.stdout + result.stderr | |
| if result.returncode == 0: | |
| status = f"β Scan completed successfully for image: {image_name}" | |
| else: | |
| status = f"β οΈ Scan completed with warnings for image: {image_name}" | |
| return status, output, cleanup_log | |
| except subprocess.TimeoutExpired: | |
| return f"β Error: Scan timed out for image: {image_name}", "", cleanup_log | |
| except Exception as e: | |
| return f"β Error: {str(e)}", "", cleanup_log | |
| def scan_github_repo(repo_url, scan_type="vuln,secret,misconfig"): | |
| """Clone and scan a GitHub repository using Trivy""" | |
| if not repo_url.strip(): | |
| return "β Error: Please provide a GitHub repository URL", "" | |
| cleanup_log = cleanup_resources() | |
| temp_dir = None | |
| try: | |
| # Create temporary directory for cloning | |
| temp_dir = tempfile.mkdtemp(dir='/tmp/trivy') | |
| # Clone repository | |
| status = f"π₯ Cloning repository: {repo_url}\n" | |
| git.Repo.clone_from(repo_url.strip(), temp_dir, depth=1) | |
| status += "β Repository cloned successfully\n\n" | |
| # Build trivy command | |
| scanners = scan_type if scan_type else "vuln,secret,misconfig" | |
| cmd = [ | |
| 'trivy', 'fs', | |
| '--scanners', scanners, | |
| '--timeout', '10m', | |
| temp_dir | |
| ] | |
| # Run trivy scan | |
| result = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| timeout=600 | |
| ) | |
| output = result.stdout + result.stderr | |
| if result.returncode == 0: | |
| status += f"β Scan completed successfully" | |
| else: | |
| status += f"β οΈ Scan completed with warnings" | |
| return status, output, cleanup_log | |
| except git.exc.GitCommandError as e: | |
| return f"β Git Error: {str(e)}", "", cleanup_log | |
| except subprocess.TimeoutExpired: | |
| return f"β Error: Scan timed out", "", cleanup_log | |
| except Exception as e: | |
| return f"β Error: {str(e)}", "", cleanup_log | |
| finally: | |
| # Clean up cloned repository | |
| if temp_dir and os.path.exists(temp_dir): | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| def scan_uploaded_zip(zip_file, scan_type="vuln,secret,misconfig"): | |
| """Extract and scan an uploaded ZIP file using Trivy""" | |
| if zip_file is None: | |
| return "β Error: Please upload a ZIP file", "" | |
| cleanup_log = cleanup_resources() | |
| extract_dir = None | |
| try: | |
| # Create temporary directory for extraction | |
| extract_dir = tempfile.mkdtemp(dir='/tmp/uploads') | |
| # Extract ZIP file | |
| status = f"π¦ Extracting ZIP file: {os.path.basename(zip_file.name)}\n" | |
| with zipfile.ZipFile(zip_file.name, 'r') as zip_ref: | |
| zip_ref.extractall(extract_dir) | |
| status += "β ZIP file extracted successfully\n\n" | |
| # Build trivy command | |
| scanners = scan_type if scan_type else "vuln,secret,misconfig" | |
| cmd = [ | |
| 'trivy', 'fs', | |
| '--scanners', scanners, | |
| '--timeout', '10m', | |
| extract_dir | |
| ] | |
| # Run trivy scan | |
| result = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| timeout=600 | |
| ) | |
| output = result.stdout + result.stderr | |
| if result.returncode == 0: | |
| status += f"β Scan completed successfully" | |
| else: | |
| status += f"β οΈ Scan completed with warnings" | |
| return status, output, cleanup_log | |
| except zipfile.BadZipFile: | |
| return f"β Error: Invalid ZIP file", "", cleanup_log | |
| except subprocess.TimeoutExpired: | |
| return f"β Error: Scan timed out", "", cleanup_log | |
| except Exception as e: | |
| return f"β Error: {str(e)}", "", cleanup_log | |
| finally: | |
| # Clean up extracted files | |
| if extract_dir and os.path.exists(extract_dir): | |
| shutil.rmtree(extract_dir, ignore_errors=True) | |
| def create_ui(): | |
| """Create Gradio UI""" | |
| with gr.Blocks(title="Trivy Security Scanner") as demo: | |
| gr.Markdown( | |
| """ | |
| # π Trivy Security Scanner | |
| Scan Docker images, GitHub repositories, or ZIP files for vulnerabilities, secrets, and misconfigurations. | |
| **Powered by [Aqua Security Trivy](https://github.com/aquasecurity/trivy)** | |
| """ | |
| ) | |
| with gr.Tabs(): | |
| # Docker Image Scan Tab | |
| with gr.Tab("π³ Docker Image"): | |
| gr.Markdown("### Scan a Docker Image") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| docker_image = gr.Textbox( | |
| label="Docker Image", | |
| placeholder="e.g., python:3.4-alpine, nginx:latest, ubuntu:20.04" | |
| ) | |
| gr.Markdown("Enter a Docker image name and tag") | |
| with gr.Column(scale=1): | |
| docker_scanners = gr.Dropdown( | |
| choices=["vuln", "secret", "misconfig", "vuln,secret", "vuln,secret,misconfig", "vuln,misconfig"], | |
| value="vuln", | |
| label="Scanners", | |
| allow_custom_value=True | |
| ) | |
| gr.Markdown("Select scan types") | |
| docker_button = gr.Button("π Scan Docker Image", variant="primary") | |
| docker_status = gr.Textbox(label="Status", lines=2) | |
| docker_output = gr.Textbox(label="Scan Results", lines=20, max_lines=50) | |
| docker_cleanup = gr.Textbox(label="Cleanup Log", lines=5) | |
| gr.Examples( | |
| examples=[ | |
| ["python:3.4-alpine", "vuln"], | |
| ["nginx:latest", "vuln,misconfig"], | |
| ["alpine:3.14", "vuln,secret"], | |
| ], | |
| inputs=[docker_image, docker_scanners], | |
| ) | |
| # GitHub Repository Scan Tab | |
| with gr.Tab("π¦ GitHub Repository"): | |
| gr.Markdown("### Scan a GitHub Repository") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| github_url = gr.Textbox( | |
| label="GitHub Repository URL", | |
| placeholder="e.g., https://github.com/username/repository" | |
| ) | |
| gr.Markdown("Enter a public GitHub repository URL") | |
| with gr.Column(scale=1): | |
| github_scanners = gr.Dropdown( | |
| choices=["vuln", "secret", "misconfig", "vuln,secret", "vuln,secret,misconfig", "vuln,misconfig"], | |
| value="vuln,secret,misconfig", | |
| label="Scanners", | |
| allow_custom_value=True | |
| ) | |
| gr.Markdown("Select scan types") | |
| github_button = gr.Button("π Scan Repository", variant="primary") | |
| github_status = gr.Textbox(label="Status", lines=3) | |
| github_output = gr.Textbox(label="Scan Results", lines=20, max_lines=50) | |
| github_cleanup = gr.Textbox(label="Cleanup Log", lines=5) | |
| gr.Examples( | |
| examples=[ | |
| ["https://github.com/aquasecurity/trivy", "vuln,secret,misconfig"], | |
| ], | |
| inputs=[github_url, github_scanners], | |
| ) | |
| # ZIP File Scan Tab | |
| with gr.Tab("π Upload ZIP"): | |
| gr.Markdown("### Scan an Uploaded ZIP File") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| zip_upload = gr.File( | |
| label="Upload ZIP File", | |
| file_types=[".zip"], | |
| type="filepath" | |
| ) | |
| gr.Markdown("Upload a ZIP file containing your project") | |
| with gr.Column(scale=1): | |
| zip_scanners = gr.Dropdown( | |
| choices=["vuln", "secret", "misconfig", "vuln,secret", "vuln,secret,misconfig", "vuln,misconfig"], | |
| value="vuln,secret,misconfig", | |
| label="Scanners", | |
| allow_custom_value=True | |
| ) | |
| gr.Markdown("Select scan types") | |
| zip_button = gr.Button("π Scan ZIP File", variant="primary") | |
| zip_status = gr.Textbox(label="Status", lines=3) | |
| zip_output = gr.Textbox(label="Scan Results", lines=20, max_lines=50) | |
| zip_cleanup = gr.Textbox(label="Cleanup Log", lines=5) | |
| gr.Markdown( | |
| """ | |
| --- | |
| ### βΉοΈ Information | |
| - **Vulnerabilities (vuln)**: Scans for known CVEs in dependencies and OS packages | |
| - **Secrets (secret)**: Detects hardcoded secrets, API keys, and credentials | |
| - **Misconfigurations (misconfig)**: Checks for security misconfigurations in IaC files | |
| **Note**: Resources are automatically cleaned up after each scan. | |
| """ | |
| ) | |
| # Connect buttons to functions | |
| docker_button.click( | |
| fn=scan_docker_image, | |
| inputs=[docker_image, docker_scanners], | |
| outputs=[docker_status, docker_output, docker_cleanup] | |
| ) | |
| github_button.click( | |
| fn=scan_github_repo, | |
| inputs=[github_url, github_scanners], | |
| outputs=[github_status, github_output, github_cleanup] | |
| ) | |
| zip_button.click( | |
| fn=scan_uploaded_zip, | |
| inputs=[zip_upload, zip_scanners], | |
| outputs=[zip_status, zip_output, zip_cleanup] | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| # Create necessary directories | |
| os.makedirs('/tmp/trivy', exist_ok=True) | |
| os.makedirs('/tmp/uploads', exist_ok=True) | |
| # Launch Gradio app | |
| demo = create_ui() | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False | |
| ) |