Spaces:
Build error
Build error
| #!/usr/bin/env python3 | |
| """ | |
| Bug Bounty Security Chatbot | |
| Specialized in Network Security and Web Application Testing | |
| Uses fine-tuned language models for security analysis and guidance | |
| """ | |
| import gradio as gr | |
| import torch | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForSequenceClassification, | |
| AutoModelForCausalLM, | |
| pipeline, | |
| BitsAndBytesConfig | |
| ) | |
| import json | |
| import re | |
| import os | |
| from typing import List, Dict, Optional, Tuple | |
| import logging | |
# Send INFO-and-above records through the root logger's default handler,
# then create the module-scoped logger used throughout this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BugBountyChatbot:
    """Keyword-driven security assistant with an optional model backend.

    Queries are first classified with simple keyword rules into a testing
    category (web application, network, infrastructure or general) and a
    risk level. A templated Markdown answer is then built from that
    analysis. When a text-generation pipeline has been loaded, its output
    is appended as an extra section.
    """

    def __init__(
        self,
        model_path: Optional[str] = None,
        model_type: str = "classification",
        base_model: Optional[str] = None,
    ):
        """
        Initialize the Bug Bounty Chatbot

        Args:
            model_path: Local directory or Hugging Face Hub repo id of the
                fine-tuned model. When falsy, no model is loaded and only
                the rule-based answers are produced.
            model_type: Type of model ("classification" or "generation")
            base_model: Identifier of the base checkpoint the fine-tuned
                model was derived from. It is stored on the instance but
                not used for loading.
        """
        self.model_path = model_path
        self.model_type = model_type
        self.base_model = base_model
        self.tokenizer = None
        self.model = None
        self.pipeline = None

        # Security testing categories and methodologies
        self.security_categories = {
            "web_app": [
                "SQL Injection", "XSS (Cross-Site Scripting)", "CSRF (Cross-Site Request Forgery)",
                "Authentication Bypass", "Authorization Flaws", "File Upload Vulnerabilities",
                "Directory Traversal", "Server-Side Request Forgery (SSRF)", "XML External Entity (XXE)",
                "Insecure Direct Object References", "Security Misconfiguration",
            ],
            "network": [
                "Port Scanning", "Service Enumeration", "Network Sniffing", "Man-in-the-Middle",
                "DNS Spoofing", "ARP Poisoning", "Network Segmentation Bypass",
                "Wireless Security Testing", "VPN Vulnerabilities", "Firewall Bypass",
            ],
            "infrastructure": [
                "Server Misconfiguration", "Default Credentials", "Privilege Escalation",
                "Container Security", "Cloud Security", "API Security", "Database Security",
            ],
        }

        # Common tools and techniques
        self.security_tools = {
            "reconnaissance": ["nmap", "masscan", "sublist3r", "amass", "theHarvester"],
            "web_testing": ["burp_suite", "owasp_zap", "sqlmap", "nikto", "dirb"],
            "network_testing": ["wireshark", "tcpdump", "netcat", "metasploit", "nmap"],
            "exploitation": ["metasploit", "exploit_db", "custom_scripts", "burp_suite"],
        }

        # from_pretrained() resolves local directories and Hub repo ids
        # alike, so the identifier is not required to exist on disk; a
        # filesystem check here would silently skip every Hub-hosted model.
        if model_path:
            self.load_model()

    def load_model(self):
        """Load the fine-tuned model and tokenizer.

        On any failure the error is logged and ``model``, ``tokenizer`` and
        ``pipeline`` are reset to ``None``, leaving the rule-based answers
        available.
        """
        try:
            logger.info(f"Loading model from {self.model_path}")
            use_cuda = torch.cuda.is_available()
            dtype = torch.float16 if use_cuda else torch.float32
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)

            if self.model_type == "classification":
                self.model = AutoModelForSequenceClassification.from_pretrained(
                    self.model_path,
                    torch_dtype=dtype,
                )
                self.pipeline = pipeline(
                    "text-classification",
                    model=self.model,
                    tokenizer=self.tokenizer,
                    device=0 if use_cuda else -1,
                )
            else:
                # For generation models (like CodeGemma)
                self.model = AutoModelForCausalLM.from_pretrained(
                    self.model_path,
                    torch_dtype=dtype,
                    device_map="auto" if use_cuda else None,
                )
                # A model placed via device_map="auto" must not be moved
                # again, so the explicit device is only passed on CPU.
                placement = {} if use_cuda else {"device": -1}
                self.pipeline = pipeline(
                    "text-generation",
                    model=self.model,
                    tokenizer=self.tokenizer,
                    **placement,
                )
            logger.info("Model loaded successfully")
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            self.model = None
            self.tokenizer = None
            self.pipeline = None

    @staticmethod
    def _mentions_any(text: str, terms: List[str]) -> bool:
        """Return True when any term occurs at the start of a word in text."""
        # Anchoring at a word start keeps short terms such as "ip" or "port"
        # from matching inside "scripting" or "report", while still
        # accepting inflected forms like "ports" and "scanning".
        pattern = r"\b(?:" + "|".join(re.escape(term) for term in terms) + ")"
        return re.search(pattern, text) is not None

    def analyze_security_query(self, query: str) -> Dict:
        """
        Analyze a security-related query and provide structured response

        Args:
            query: User's security question or request

        Returns:
            Dictionary with the keys "category", "vulnerability_types",
            "tools_suggested", "methodology", "risk_level" and "response".
            The list values are copies, so callers may modify them freely.
        """
        analysis = {
            "category": "general",
            "vulnerability_types": [],
            "tools_suggested": [],
            "methodology": [],
            "risk_level": "medium",
            "response": "",
        }
        query_lower = query.lower()

        # Ordered rules: the first category whose keywords appear wins.
        category_rules = [
            ("web_app", "web_testing",
             ["web", "website", "application", "app", "http", "https"]),
            ("network", "network_testing",
             ["network", "port", "scan", "tcp", "udp", "ip"]),
            ("infrastructure", "exploitation",
             ["server", "infrastructure", "cloud", "container"]),
        ]
        for category, tool_group, keywords in category_rules:
            if self._mentions_any(query_lower, keywords):
                analysis["category"] = category
                analysis["vulnerability_types"] = list(self.security_categories[category])
                analysis["tools_suggested"] = list(self.security_tools[tool_group])
                break

        # Determine risk level based on keywords
        high_risk_terms = ["exploit", "bypass", "injection", "privilege", "escalation"]
        if any(term in query_lower for term in high_risk_terms):
            analysis["risk_level"] = "high"
        return analysis

    def generate_security_response(self, query: str, analysis: Dict) -> str:
        """
        Generate a comprehensive security response based on analysis

        Args:
            query: Original user query
            analysis: Analysis results from analyze_security_query

        Returns:
            Markdown-formatted response string ending with a disclaimer
        """
        query_lower = query.lower()
        response_parts = []

        # Header with category and risk level
        risk_emoji = {"low": "🟢", "medium": "🟡", "high": "🔴"}
        category_label = analysis["category"].replace("_", " ").title()
        response_parts.append(
            f"## {risk_emoji.get(analysis['risk_level'], '🟡')} Security Analysis - {category_label}"
        )

        # Main response based on query type
        if "how to" in query_lower or "method" in query_lower:
            response_parts.extend([
                "### Methodology:",
                "1. **Reconnaissance Phase**",
                " - Gather information about the target",
                " - Identify attack surface",
                " - Map network topology",
                "\n2. **Scanning Phase**",
                " - Port scanning and service enumeration",
                " - Vulnerability scanning",
                " - Web application scanning",
                "\n3. **Exploitation Phase**",
                " - Attempt to exploit identified vulnerabilities",
                " - Document findings",
                " - Maintain access if required",
            ])
        elif "tool" in query_lower or "scan" in query_lower:
            response_parts.append("### Recommended Tools:")
            response_parts.extend(
                f"- **{tool.replace('_', ' ').title()}**"
                for tool in analysis["tools_suggested"][:5]  # Limit to top 5
            )
        elif "vulnerability" in query_lower or "exploit" in query_lower:
            response_parts.append("### Common Vulnerabilities:")
            response_parts.extend(
                f"- {vuln}"
                for vuln in analysis["vulnerability_types"][:5]  # Limit to top 5
            )
        else:
            # General security guidance
            response_parts.append("### Security Guidance:")
            response_parts.append("Based on your query, here are key security considerations:")
            guidance_by_category = {
                "web_app": [
                    "- Focus on OWASP Top 10 vulnerabilities",
                    "- Test authentication and authorization mechanisms",
                    "- Validate all input parameters",
                    "- Check for insecure direct object references",
                ],
                "network": [
                    "- Perform comprehensive port scanning",
                    "- Analyze network traffic patterns",
                    "- Test network segmentation",
                    "- Verify firewall rules and configurations",
                ],
                "infrastructure": [
                    "- Review server configurations",
                    "- Check for default credentials",
                    "- Analyze privilege levels",
                    "- Test container and cloud security",
                ],
            }
            response_parts.extend(guidance_by_category.get(analysis["category"], []))

        # Add model-based response if available
        if self.pipeline and self.model_type == "generation":
            try:
                # Create a prompt for the model
                prompt = (
                    "<|system|>\n"
                    "You are a cybersecurity expert specializing in bug bounty hunting and penetration testing.\n"
                    "Provide detailed, actionable security guidance.\n"
                    "<|user|>\n"
                    f"{query}\n"
                    "<|assistant|>"
                )
                # max_new_tokens bounds only the generated continuation;
                # max_length would also count the prompt and can leave a
                # long query with no room for an answer.
                model_response = self.pipeline(
                    prompt,
                    max_new_tokens=512,
                    num_return_sequences=1,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=self.tokenizer.eos_token_id,
                )
                if model_response:
                    generated_text = model_response[0]['generated_text']
                    # Extract only the assistant's response
                    if "<|assistant|>" in generated_text:
                        assistant_response = generated_text.split("<|assistant|>")[-1].strip()
                        response_parts.append(f"\n### AI-Generated Insights:\n{assistant_response}")
            except Exception as e:
                # Best effort: the rule-based answer is still returned.
                logger.error(f"Error generating model response: {e}")

        # Add disclaimer
        response_parts.append("\n---")
        response_parts.append("⚠️ **Disclaimer**: This information is for educational and authorized testing purposes only.")
        response_parts.append("Always ensure you have proper authorization before testing any systems.")
        return "\n".join(response_parts)

    def chat(self, message: str, history: List[List[str]]) -> Tuple[str, List[List[str]]]:
        """
        Main chat function for Gradio interface

        Args:
            message: User's message
            history: Chat history as [user_message, bot_response] pairs;
                ``None`` is treated as an empty history

        Returns:
            Tuple of (new textbox value, updated_history). The textbox
            value is empty after a successful turn and holds a hint when
            the message was blank. A non-None history list is extended in
            place.
        """
        if history is None:
            history = []
        if not message or not message.strip():
            return "Please enter a security-related question or request.", history

        # Analyze the query
        analysis = self.analyze_security_query(message)
        # Generate response
        response = self.generate_security_response(message, analysis)
        # Update history
        history.append([message, response])
        return "", history
def create_chatbot_interface():
    """Create and configure the Gradio interface.

    Builds a ``gr.Blocks`` layout with a chat panel, an input row and a
    sidebar of example questions, and wires every control to a single
    ``BugBountyChatbot`` instance.

    Returns:
        The configured, not yet launched, ``gr.Blocks`` interface.
    """
    # Initialize chatbot with CodeGemma 7B model from Hugging Face Hub.
    # The base checkpoint (unsloth/codegemma-7b) is not passed: model
    # loading only ever reads model_path and model_type.
    chatbot = BugBountyChatbot(
        model_path="BenjaminKaindu0506/codegemma-7b-bugbounty",
        model_type="generation",
    )

    # Custom CSS for better styling
    css = """
    .gradio-container {
        font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    }
    .chat-message {
        padding: 10px;
        margin: 5px 0;
        border-radius: 10px;
    }
    .user-message {
        background-color: #e3f2fd;
        margin-left: 20%;
    }
    .bot-message {
        background-color: #f5f5f5;
        margin-right: 20%;
    }
    """

    examples = [
        "How to test for SQL injection vulnerabilities?",
        "What tools should I use for network reconnaissance?",
        "How to perform web application security testing?",
        "What are common authentication bypass techniques?",
        "How to scan for open ports and services?",
        "What is the OWASP Top 10 and how to test for them?",
        "How to perform privilege escalation testing?",
        "What are the steps for a complete penetration test?",
    ]

    # Create Gradio interface
    with gr.Blocks(css=css, title="Bug Bounty Security Chatbot") as interface:
        # NOTE(review): the icons on the first three bullets were not
        # recoverable from the garbled source text; these are best guesses.
        gr.Markdown("""
        # 🛡️ Bug Bounty Security Chatbot

        **Specialized in Network Security and Web Application Testing**

        This AI-powered chatbot provides expert guidance on:

        - 🔍 **Reconnaissance techniques**
        - 🌐 **Web application security testing**
        - 🔒 **Network security analysis**
        - ⚡ **Vulnerability assessment**
        - 🛠️ **Security tool recommendations**

        Ask me about security testing methodologies, tools, vulnerabilities, or specific attack techniques!
        """)

        with gr.Row():
            with gr.Column(scale=3):
                chatbot_interface = gr.Chatbot(
                    label="Security Chat",
                    height=600,
                    show_label=True,
                    container=True,
                    bubble_full_width=False,
                )
                with gr.Row():
                    msg_input = gr.Textbox(
                        placeholder="Ask about security testing, vulnerabilities, tools, or methodologies...",
                        label="Your Security Question",
                        lines=2,
                        scale=4,
                    )
                    send_btn = gr.Button("Send", variant="primary", scale=1)
                with gr.Row():
                    clear_btn = gr.Button("Clear Chat", variant="secondary")
                    # NOTE(review): this button has never had a handler
                    # attached; decide what it should do or remove it.
                    example_btn = gr.Button("Load Examples", variant="secondary")

            with gr.Column(scale=1):
                gr.Markdown("### 🎯 Quick Examples")
                # "secondary" replaces "outline", which is not a variant
                # that gr.Button defines.
                example_buttons = [
                    gr.Button(example, size="sm", variant="secondary")
                    for example in examples
                ]
                gr.Markdown("### 🔧 Security Categories")
                gr.Markdown("""
                - **Web Applications**: XSS, SQLi, CSRF, Auth bypass
                - **Network Security**: Port scanning, traffic analysis
                - **Infrastructure**: Server configs, privilege escalation
                - **Cloud Security**: Container security, API testing
                """)

        # Event handlers
        def user_input(message, history):
            return chatbot.chat(message, history)

        # Connect events
        send_btn.click(
            user_input,
            inputs=[msg_input, chatbot_interface],
            outputs=[msg_input, chatbot_interface],
        )
        msg_input.submit(
            user_input,
            inputs=[msg_input, chatbot_interface],
            outputs=[msg_input, chatbot_interface],
        )
        clear_btn.click(
            lambda: ([], ""),
            outputs=[chatbot_interface, msg_input],
        )

        # Example button clicks: copy the example into the textbox, then
        # submit it. Only the textbox is written in the first step, so the
        # existing conversation is kept; the default argument binds each
        # lambda to its own example text.
        for btn, example in zip(example_buttons, examples):
            btn.click(
                lambda text=example: text,
                outputs=[msg_input],
            ).then(
                user_input,
                inputs=[msg_input, chatbot_interface],
                outputs=[msg_input, chatbot_interface],
            )

    return interface
def main():
    """Main function to run the chatbot.

    Builds the Gradio interface and serves it on all network interfaces
    at port 7860.
    """
    print("🛡️ Initializing Bug Bounty Security Chatbot...")
    # Create and launch the interface
    interface = create_chatbot_interface()
    print("🚀 Starting chatbot interface...")

    # Hugging Face Spaces sets SPACE_ID and serves the app itself; share
    # links are not supported there, so one is only requested when running
    # elsewhere.
    on_spaces = "SPACE_ID" in os.environ
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=not on_spaces,
        show_error=True,
    )


if __name__ == "__main__":
    main()