# File-viewer header (not program code): File size: 17,748 Bytes
# 9b4e2a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
#!/usr/bin/env python3
"""
Bug Bounty Security Chatbot
Specialized in Network Security and Web Application Testing
Uses fine-tuned language models for security analysis and guidance
"""

import gradio as gr
import torch
from transformers import (
    AutoTokenizer, 
    AutoModelForSequenceClassification,
    AutoModelForCausalLM,
    pipeline,
    BitsAndBytesConfig
)
import json
import re
import os
from typing import List, Dict, Optional, Tuple
import logging

# Configure logging: emit records at INFO level and above so that the
# model-loading progress and error messages logged by this module are shown.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by the chatbot class for load/generation messages.
logger = logging.getLogger(__name__)

class BugBountyChatbot:
    """Keyword-driven security assistant with an optional fine-tuned model.

    Replies are assembled from static guidance chosen by keyword heuristics.
    When a text-generation pipeline has been loaded successfully, its output
    is appended to the reply as an extra section.
    """

    # Categorisation rules, checked in order; the first match wins.
    # Each entry is (category key, key into ``security_tools``, pattern).
    # Keywords are anchored at the start of a word so that short terms such
    # as "ip", "port" or "app" do not fire inside unrelated words ("script",
    # "report", "happen"), while plurals and derived forms ("ports",
    # "scanning") still match.
    _CATEGORY_RULES = (
        ("web_app", "web_testing",
         re.compile(r"\b(?:web|website|application|app|http|https)")),
        ("network", "network_testing",
         re.compile(r"\b(?:network|port|scan|tcp|udp|ip)")),
        ("infrastructure", "exploitation",
         re.compile(r"\b(?:server|infrastructure|cloud|container)")),
    )

    # Substrings that raise the reported risk level to "high".
    _HIGH_RISK_TERMS = ("exploit", "bypass", "injection", "privilege", "escalation")

    # Badge shown in the reply heading for each risk level.
    _RISK_BADGES = {"low": "🟢", "medium": "🟡", "high": "🔴"}

    # Static body of the "how to" / "method" reply.
    _METHODOLOGY_LINES = (
        "### Methodology:",
        "1. **Reconnaissance Phase**",
        "   - Gather information about the target",
        "   - Identify attack surface",
        "   - Map network topology",
        "\n2. **Scanning Phase**",
        "   - Port scanning and service enumeration",
        "   - Vulnerability scanning",
        "   - Web application scanning",
        "\n3. **Exploitation Phase**",
        "   - Attempt to exploit identified vulnerabilities",
        "   - Document findings",
        "   - Maintain access if required",
    )

    # Per-category bullet points for the generic guidance reply.
    _GUIDANCE_LINES = {
        "web_app": (
            "- Focus on OWASP Top 10 vulnerabilities",
            "- Test authentication and authorization mechanisms",
            "- Validate all input parameters",
            "- Check for insecure direct object references",
        ),
        "network": (
            "- Perform comprehensive port scanning",
            "- Analyze network traffic patterns",
            "- Test network segmentation",
            "- Verify firewall rules and configurations",
        ),
        "infrastructure": (
            "- Review server configurations",
            "- Check for default credentials",
            "- Analyze privilege levels",
            "- Test container and cloud security",
        ),
    }

    def __init__(
        self,
        model_path: Optional[str] = None,
        model_type: str = "classification",
        base_model: Optional[str] = None,
    ):
        """
        Initialize the Bug Bounty Chatbot

        Args:
            model_path: Local directory or Hugging Face Hub repository ID of
                the fine-tuned model. When omitted, no model is loaded and
                only the keyword-based guidance is available.
            model_type: Type of model ("classification" or "generation")
            base_model: Name of the checkpoint the fine-tuned model was
                derived from. It is stored on the instance for reference
                only; the loading code does not read it.
        """
        self.model_path = model_path
        self.model_type = model_type
        self.base_model = base_model
        self.tokenizer = None
        self.model = None
        self.pipeline = None

        # Security testing categories and methodologies
        self.security_categories = {
            "web_app": [
                "SQL Injection", "XSS (Cross-Site Scripting)", "CSRF (Cross-Site Request Forgery)",
                "Authentication Bypass", "Authorization Flaws", "File Upload Vulnerabilities",
                "Directory Traversal", "Server-Side Request Forgery (SSRF)", "XML External Entity (XXE)",
                "Insecure Direct Object References", "Security Misconfiguration"
            ],
            "network": [
                "Port Scanning", "Service Enumeration", "Network Sniffing", "Man-in-the-Middle",
                "DNS Spoofing", "ARP Poisoning", "Network Segmentation Bypass",
                "Wireless Security Testing", "VPN Vulnerabilities", "Firewall Bypass"
            ],
            "infrastructure": [
                "Server Misconfiguration", "Default Credentials", "Privilege Escalation",
                "Container Security", "Cloud Security", "API Security", "Database Security"
            ]
        }

        # Common tools and techniques
        self.security_tools = {
            "reconnaissance": ["nmap", "masscan", "sublist3r", "amass", "theHarvester"],
            "web_testing": ["burp_suite", "owasp_zap", "sqlmap", "nikto", "dirb"],
            "network_testing": ["wireshark", "tcpdump", "netcat", "metasploit", "nmap"],
            "exploitation": ["metasploit", "exploit_db", "custom_scripts", "burp_suite"]
        }

        # Attempt a load whenever a path/ID was supplied. A local-existence
        # check here would silently skip Hub repository IDs, which are not
        # filesystem paths; load_model() already degrades gracefully when the
        # checkpoint cannot be resolved.
        if model_path:
            self.load_model()

    def load_model(self):
        """Load the fine-tuned model and tokenizer.

        Builds a "text-classification" pipeline when ``model_type`` is
        "classification" and a "text-generation" pipeline otherwise. Any
        failure is logged and leaves ``model``, ``tokenizer`` and
        ``pipeline`` set to ``None``; no exception propagates to the caller.
        """
        try:
            logger.info("Loading model from %s", self.model_path)

            use_cuda = torch.cuda.is_available()
            dtype = torch.float16 if use_cuda else torch.float32
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)

            if self.model_type == "classification":
                self.model = AutoModelForSequenceClassification.from_pretrained(
                    self.model_path,
                    torch_dtype=dtype,
                )
                self.pipeline = pipeline(
                    "text-classification",
                    model=self.model,
                    tokenizer=self.tokenizer,
                    device=0 if use_cuda else -1,
                )
            else:
                # For generation models (like CodeGemma)
                self.model = AutoModelForCausalLM.from_pretrained(
                    self.model_path,
                    torch_dtype=dtype,
                    device_map="auto" if use_cuda else None,
                )
                # A model placed with device_map="auto" must not also be
                # given an explicit pipeline device, so only pin the device
                # on the CPU path where no device map was used.
                placement = {} if use_cuda else {"device": -1}
                self.pipeline = pipeline(
                    "text-generation",
                    model=self.model,
                    tokenizer=self.tokenizer,
                    **placement,
                )

            logger.info("Model loaded successfully")

        except Exception as e:
            # Best effort: fall back to keyword-only answers.
            logger.exception("Error loading model: %s", e)
            self.model = None
            self.tokenizer = None
            self.pipeline = None

    def analyze_security_query(self, query: str) -> Dict:
        """
        Analyze a security-related query and provide structured response

        The category is chosen by the first keyword rule that matches
        (web application, then network, then infrastructure); otherwise it
        stays "general" with empty suggestion lists.

        Args:
            query: User's security question or request

        Returns:
            Dictionary with keys "category", "vulnerability_types",
            "tools_suggested", "methodology", "risk_level" and "response".
            The list values are fresh copies, so callers may modify them
            without affecting the chatbot's own tables.
        """
        analysis = {
            "category": "general",
            "vulnerability_types": [],
            "tools_suggested": [],
            "methodology": [],
            "risk_level": "medium",
            "response": ""
        }

        query_lower = query.lower()

        # Categorize the query
        for category, tool_group, pattern in self._CATEGORY_RULES:
            if pattern.search(query_lower):
                analysis["category"] = category
                analysis["vulnerability_types"] = list(self.security_categories[category])
                analysis["tools_suggested"] = list(self.security_tools[tool_group])
                break

        # Determine risk level based on keywords
        if any(term in query_lower for term in self._HIGH_RISK_TERMS):
            analysis["risk_level"] = "high"

        return analysis

    def generate_security_response(self, query: str, analysis: Dict) -> str:
        """
        Generate a comprehensive security response based on analysis

        The reply consists of a heading (risk badge and category), one body
        section selected from the wording of the query, an optional
        model-generated section, and a fixed disclaimer.

        Args:
            query: Original user query
            analysis: Analysis results from analyze_security_query

        Returns:
            Formatted response string
        """
        query_lower = query.lower()
        category = analysis["category"]

        # Header with category and risk level
        badge = self._RISK_BADGES.get(analysis["risk_level"], self._RISK_BADGES["medium"])
        heading = category.replace("_", " ").title()
        response_parts = [f"## {badge} Security Analysis - {heading}"]

        # Main response based on query type
        if "how to" in query_lower or "method" in query_lower:
            response_parts.extend(self._METHODOLOGY_LINES)

        elif "tool" in query_lower or "scan" in query_lower:
            response_parts.append("### Recommended Tools:")
            response_parts.extend(
                f"- **{tool.replace('_', ' ').title()}**"
                for tool in analysis["tools_suggested"][:5]  # Limit to top 5
            )

        # "vulnerabilit" covers both "vulnerability" and "vulnerabilities".
        elif "vulnerabilit" in query_lower or "exploit" in query_lower:
            response_parts.append("### Common Vulnerabilities:")
            response_parts.extend(
                f"- {vuln}"
                for vuln in analysis["vulnerability_types"][:5]  # Limit to top 5
            )

        else:
            # General security guidance
            response_parts.append("### Security Guidance:")
            response_parts.append("Based on your query, here are key security considerations:")
            response_parts.extend(self._GUIDANCE_LINES.get(category, ()))

        # Add model-based response if available
        insights = self._generate_model_insights(query)
        if insights:
            response_parts.append(f"\n### AI-Generated Insights:\n{insights}")

        # Add disclaimer
        response_parts.append("\n---")
        response_parts.append("⚠️ **Disclaimer**: This information is for educational and authorized testing purposes only.")
        response_parts.append("Always ensure you have proper authorization before testing any systems.")

        return "\n".join(response_parts)

    def _generate_model_insights(self, query: str) -> str:
        """Return the generation model's answer to *query*, or "" if none.

        An empty string is returned when no generation pipeline is loaded,
        when generation fails (the error is logged), or when the output does
        not contain the assistant marker.
        """
        if not self.pipeline or self.model_type != "generation":
            return ""

        # Create a prompt for the model
        prompt = (
            "<|system|>\n"
            "You are a cybersecurity expert specializing in bug bounty hunting and penetration testing. \n"
            "Provide detailed, actionable security guidance.\n"
            "<|user|>\n"
            f"{query}\n"
            "<|assistant|>"
        )

        try:
            model_response = self.pipeline(
                prompt,
                # Bound only the generated part; a total-length cap would
                # also count the prompt and starve long questions.
                max_new_tokens=512,
                num_return_sequences=1,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
            )
            if not model_response:
                return ""
            generated_text = model_response[0]["generated_text"]
        except Exception as e:
            logger.error(f"Error generating model response: {e}")
            return ""

        # Extract only the assistant's response
        _, marker, answer = generated_text.rpartition("<|assistant|>")
        return answer.strip() if marker else ""

    def chat(
        self,
        message: str,
        history: Optional[List[List[str]]],
    ) -> Tuple[str, List[List[str]]]:
        """
        Main chat function for Gradio interface

        Args:
            message: User's message
            history: Chat history as a list of [user, bot] pairs; ``None``
                is treated as an empty history. A provided list is extended
                in place.

        Returns:
            Tuple of (textbox_value, updated_history). The first element is
            "" after a successful turn, or a prompt asking for input when
            the message is empty or whitespace-only.
        """
        if history is None:
            history = []

        if not message or not message.strip():
            return "Please enter a security-related question or request.", history

        # Analyze the query
        analysis = self.analyze_security_query(message)

        # Generate response
        response = self.generate_security_response(message, analysis)

        # Update history
        history.append([message, response])

        return "", history

def create_chatbot_interface():
    """Create and configure the Gradio interface.

    Builds a two-column Blocks layout: the chat area (history, input box,
    send/clear buttons) on the left and a panel of one-click example
    questions on the right, then wires the events to a BugBountyChatbot.

    Returns:
        The configured ``gr.Blocks`` object, not yet launched.
    """

    # Initialize chatbot with CodeGemma 7B model from Hugging Face Hub.
    # Only the arguments needed to identify the checkpoint and its type are
    # passed (base checkpoint, for reference: unsloth/codegemma-7b).
    chatbot = BugBountyChatbot(
        model_path="BenjaminKaindu0506/codegemma-7b-bugbounty",
        model_type="generation",
    )

    # Custom CSS for better styling
    css = """
    .gradio-container {
        font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    }
    .chat-message {
        padding: 10px;
        margin: 5px 0;
        border-radius: 10px;
    }
    .user-message {
        background-color: #e3f2fd;
        margin-left: 20%;
    }
    .bot-message {
        background-color: #f5f5f5;
        margin-right: 20%;
    }
    """

    # Create Gradio interface
    with gr.Blocks(css=css, title="Bug Bounty Security Chatbot") as interface:
        gr.Markdown("""
        # 🛡️ Bug Bounty Security Chatbot
        
        **Specialized in Network Security and Web Application Testing**
        
        This AI-powered chatbot provides expert guidance on:
        - 🔍 **Reconnaissance techniques**
        - 🌐 **Web application security testing**
        - 🔗 **Network security analysis**
        - ⚡ **Vulnerability assessment**
        - 🛠️ **Security tool recommendations**
        
        Ask me about security testing methodologies, tools, vulnerabilities, or specific attack techniques!
        """)

        with gr.Row():
            with gr.Column(scale=3):
                chatbot_interface = gr.Chatbot(
                    label="Security Chat",
                    height=600,
                    show_label=True,
                    container=True,
                    bubble_full_width=False
                )

                with gr.Row():
                    msg_input = gr.Textbox(
                        placeholder="Ask about security testing, vulnerabilities, tools, or methodologies...",
                        label="Your Security Question",
                        lines=2,
                        scale=4
                    )
                    send_btn = gr.Button("Send", variant="primary", scale=1)

                with gr.Row():
                    clear_btn = gr.Button("Clear Chat", variant="secondary")
                    # NOTE(review): this button is rendered but no handler is
                    # attached to it anywhere in this function.
                    example_btn = gr.Button("Load Examples", variant="secondary")

            with gr.Column(scale=1):
                gr.Markdown("### 🎯 Quick Examples")

                examples = [
                    "How to test for SQL injection vulnerabilities?",
                    "What tools should I use for network reconnaissance?",
                    "How to perform web application security testing?",
                    "What are common authentication bypass techniques?",
                    "How to scan for open ports and services?",
                    "What is the OWASP Top 10 and how to test for them?",
                    "How to perform privilege escalation testing?",
                    "What are the steps for a complete penetration test?"
                ]

                example_buttons = [
                    gr.Button(example, size="sm", variant="outline")
                    for example in examples
                ]

                gr.Markdown("### 🔧 Security Categories")
                gr.Markdown("""
                - **Web Applications**: XSS, SQLi, CSRF, Auth bypass
                - **Network Security**: Port scanning, traffic analysis
                - **Infrastructure**: Server configs, privilege escalation
                - **Cloud Security**: Container security, API testing
                """)

        # Event handlers
        def user_input(message, history):
            # Returns (new textbox value, updated history).
            return chatbot.chat(message, history)

        # Connect events: the Send button and pressing Enter in the textbox
        # trigger the same handler with the same inputs and outputs.
        for trigger in (send_btn.click, msg_input.submit):
            trigger(
                user_input,
                inputs=[msg_input, chatbot_interface],
                outputs=[msg_input, chatbot_interface]
            )

        clear_btn.click(
            lambda: ([], ""),
            outputs=[chatbot_interface, msg_input]
        )

        # Example button clicks: put the example text in the textbox and
        # reset the chat to an empty history (a list, which is the value type
        # the Chatbot component works with), then submit it like a typed
        # message. The default argument binds each example to its own button.
        for btn, example in zip(example_buttons, examples):
            btn.click(
                lambda text=example: (text, []),
                outputs=[msg_input, chatbot_interface]
            ).then(
                user_input,
                inputs=[msg_input, chatbot_interface],
                outputs=[msg_input, chatbot_interface]
            )

    return interface

def main():
    """Main function to run the chatbot.

    Builds the Gradio interface and serves it on all network interfaces at
    port 7860, printing a short progress banner before each step.
    """
    print("🛡️ Initializing Bug Bounty Security Chatbot...")

    # Create and launch the interface
    interface = create_chatbot_interface()

    print("🚀 Starting chatbot interface...")
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        # Requests a public share link.
        # NOTE(review): Gradio documents share links as unsupported when
        # running on Hugging Face Spaces — confirm whether this flag is
        # still wanted for that deployment.
        share=True,
        show_error=True
    )


if __name__ == "__main__":
    main()