from flask import Flask, render_template, request, jsonify
import os
import logging
from datetime import datetime
from typing import Dict, List, Optional, Any

import requests
from dataclasses import dataclass, asdict
import ipaddress
from urllib.parse import urlparse

import openai
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
import joblib

from config import config

# Ensure the log directory exists before the FileHandler below tries to open it
os.makedirs('logs', exist_ok=True)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/app.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Load configuration
config_name = os.environ.get('FLASK_ENV', 'development')
app.config.from_object(config[config_name])


@dataclass
class ThreatIndicator:
    """Represents a threat indicator (IP, domain, or hash)"""
    value: str
    indicator_type: str  # 'ip', 'domain', 'hash'
    confidence: float = 0.0
    first_seen: Optional[str] = None
    last_seen: Optional[str] = None
    sources: Optional[List[str]] = None

    def __post_init__(self):
        if self.sources is None:
            self.sources = []


@dataclass
class ThreatReport:
    """Represents a comprehensive threat analysis report"""
    indicator: ThreatIndicator
    risk_level: str    # 'Low', 'Medium', 'High'
    risk_score: float  # 0.0 to 1.0
    threat_types: List[str]
    summary: str
    detailed_analysis: str
    recommendations: List[str]
    source_links: List[Dict[str, str]]
    analysis_timestamp: str
    confidence: float


class ThreatIntelligenceAPI:
    """Handles API calls to various threat intelligence sources"""

    def __init__(self):
        self.alienvault_base = "https://otx.alienvault.com/api/v1"
        self.abuseipdb_base = "https://api.abuseipdb.com/api/v2"

    def check_ip_alienvault(self, ip_address: str) -> Dict[str, Any]:
        """Check IP address against AlienVault OTX"""
        try:
            if not app.config['ALIENVAULT_API_KEY']:
                return {
                    "source": "AlienVault OTX",
                    "malicious": False,
                    "pulse_count": 0,
                    "pulses": [],
                    "url": f"https://otx.alienvault.com/indicator/ip/{ip_address}",
                    "note": "API key not configured"
                }

            headers = {'X-OTX-API-KEY': app.config['ALIENVAULT_API_KEY']}
            url = f"{self.alienvault_base}/indicators/IPv4/{ip_address}/general"
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            data = response.json()

            return {
                "source": "AlienVault OTX",
                "malicious": data.get('pulse_info', {}).get('count', 0) > 0,
                "pulse_count": data.get('pulse_info', {}).get('count', 0),
                "pulses": data.get('pulse_info', {}).get('pulses', []),
                "url": f"https://otx.alienvault.com/indicator/ip/{ip_address}"
            }
        except Exception as e:
            logger.error(f"AlienVault API error for {ip_address}: {str(e)}")
            return {"error": str(e)}

    def check_ip_abuseipdb(self, ip_address: str) -> Dict[str, Any]:
        """Check IP address against AbuseIPDB"""
        try:
            if not app.config['ABUSEIPDB_API_KEY']:
                return {
                    "source": "AbuseIPDB",
                    "malicious": False,
                    "confidence": 0,
                    "usage_type": "Unknown",
                    "country": "Unknown",
                    "isp": "Unknown",
                    "reports": 0,
                    "url": f"https://www.abuseipdb.com/check/{ip_address}",
                    "note": "API key not configured"
                }

            headers = {'Key': app.config['ABUSEIPDB_API_KEY'], 'Accept': 'application/json'}
            params = {'ipAddress': ip_address, 'maxAgeInDays': 90, 'verbose': ''}
            response = requests.get(f"{self.abuseipdb_base}/check",
                                    headers=headers, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            result = data.get('data', {})

            # The v2 /check endpoint reports this field as 'abuseConfidenceScore'
            return {
                "source": "AbuseIPDB",
                "malicious": result.get('abuseConfidenceScore', 0) > 0,
                "confidence": result.get('abuseConfidenceScore', 0),
                "usage_type": result.get('usageType', 'Unknown'),
                "country": result.get('countryCode', 'Unknown'),
                "isp": result.get('isp', 'Unknown'),
                "reports": result.get('totalReports', 0),
                "url": f"https://www.abuseipdb.com/check/{ip_address}"
            }
        except Exception as e:
            logger.error(f"AbuseIPDB API error for {ip_address}: {str(e)}")
            return {"error": str(e)}
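
    # Both feed wrappers return one flat dict per source so the rest of the
    # pipeline can stay source-agnostic. Illustrative shape (assumed values):
    #
    #   {"source": "AbuseIPDB", "malicious": True, "confidence": 85,
    #    "usage_type": "Data Center", "country": "US", "isp": "ExampleNet",
    #    "reports": 12, "url": "https://www.abuseipdb.com/check/198.51.100.7"}
    #
    # Failures collapse to {"error": "..."} and missing keys add a
    # "note": "API key not configured" marker; both are filtered downstream.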

    def check_domain_alienvault(self, domain: str) -> Dict[str, Any]:
        """Check domain against AlienVault OTX"""
        try:
            if not app.config['ALIENVAULT_API_KEY']:
                return {
                    "source": "AlienVault OTX",
                    "malicious": False,
                    "pulse_count": 0,
                    "pulses": [],
                    "url": f"https://otx.alienvault.com/indicator/domain/{domain}",
                    "note": "API key not configured"
                }

            headers = {'X-OTX-API-KEY': app.config['ALIENVAULT_API_KEY']}
            url = f"{self.alienvault_base}/indicators/domain/{domain}/general"
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            data = response.json()

            return {
                "source": "AlienVault OTX",
                "malicious": data.get('pulse_info', {}).get('count', 0) > 0,
                "pulse_count": data.get('pulse_info', {}).get('count', 0),
                "pulses": data.get('pulse_info', {}).get('pulses', []),
                "url": f"https://otx.alienvault.com/indicator/domain/{domain}"
            }
        except Exception as e:
            logger.error(f"AlienVault domain API error for {domain}: {str(e)}")
            return {"error": str(e)}

    def check_hash_alienvault(self, file_hash: str) -> Dict[str, Any]:
        """Check file hash against AlienVault OTX"""
        try:
            if not app.config['ALIENVAULT_API_KEY']:
                return {
                    "source": "AlienVault OTX",
                    "malicious": False,
                    "pulse_count": 0,
                    "pulses": [],
                    "url": f"https://otx.alienvault.com/indicator/file/{file_hash}",
                    "note": "API key not configured"
                }

            headers = {'X-OTX-API-KEY': app.config['ALIENVAULT_API_KEY']}
            url = f"{self.alienvault_base}/indicators/file/{file_hash}/general"
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            data = response.json()

            return {
                "source": "AlienVault OTX",
                "malicious": data.get('pulse_info', {}).get('count', 0) > 0,
                "pulse_count": data.get('pulse_info', {}).get('count', 0),
                "pulses": data.get('pulse_info', {}).get('pulses', []),
                "url": f"https://otx.alienvault.com/indicator/file/{file_hash}"
            }
        except Exception as e:
            logger.error(f"AlienVault hash API error for {file_hash}: {str(e)}")
            return {"error": str(e)}


class RAGSystem:
    """Retrieval-Augmented Generation system for threat intelligence synthesis"""

    def __init__(self):
        self.openai_client = None
        if app.config['OPENAI_API_KEY']:
            openai.api_key = app.config['OPENAI_API_KEY']
            self.openai_client = openai

    def synthesize_threat_intelligence(self, indicator: ThreatIndicator,
                                       raw_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Synthesize threat intelligence using RAG approach"""
        # Extract relevant information from raw data
        threat_context = self._extract_threat_context(raw_data)

        # Generate summary using OpenAI if available, otherwise use rule-based approach
        if self.openai_client:
            return self._generate_ai_summary(indicator, threat_context)
        else:
            return self._generate_rule_based_summary(indicator, threat_context)

    def _extract_threat_context(self, raw_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Extract relevant threat context from raw API data"""
        context = {
            "malicious_indicators": [],
            "threat_types": set(),
            "confidence_scores": [],
            "source_reports": [],
            "geographic_info": [],
            "temporal_info": []
        }

        for data in raw_data:
            if "error" in data:
                continue
            # Skip data with notes about missing API keys
            if "note" in data and "API key not configured" in data["note"]:
                continue

            source = data.get("source", "Unknown")
            if data.get("malicious", False):
                context["malicious_indicators"].append(source)
            if "pulse_count" in data and data["pulse_count"] > 0:
                context["confidence_scores"].append(min(data["pulse_count"] * 10, 100))
            if "confidence" in data:
                context["confidence_scores"].append(data["confidence"])
            if "pulses" in data:
                for pulse in data["pulses"]:
                    if "tags" in pulse:
                        context["threat_types"].update(pulse["tags"])
            if "country" in data:
                context["geographic_info"].append(data["country"])
            if "url" in data:
                context["source_reports"].append({"source": source, "url": data["url"]})

        context["threat_types"] = list(context["threat_types"])
        return context
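
    # Illustrative sketch of the context built from a single OTX hit (values
    # assumed, not real feed data); each pulse contributes 10 confidence
    # points, capped at 100:
    #
    #   >>> ctx = RAGSystem()._extract_threat_context([
    #   ...     {"source": "AlienVault OTX", "malicious": True, "pulse_count": 3,
    #   ...      "pulses": [{"tags": ["botnet"]}],
    #   ...      "url": "https://otx.alienvault.com/indicator/ip/198.51.100.7"}])
    #   >>> ctx["malicious_indicators"], ctx["confidence_scores"], ctx["threat_types"]
    #   (['AlienVault OTX'], [30], ['botnet'])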

    def _generate_ai_summary(self, indicator: ThreatIndicator,
                             context: Dict[str, Any]) -> Dict[str, Any]:
        """Generate AI-powered threat summary"""
        try:
            prompt = f"""
Analyze this threat intelligence data for {indicator.indicator_type.upper()}: {indicator.value}

Context:
- Malicious indicators: {context['malicious_indicators']}
- Threat types: {context['threat_types']}
- Confidence scores: {context['confidence_scores']}
- Geographic info: {context['geographic_info']}

Provide:
1. Risk level (Low/Medium/High)
2. Threat types involved
3. Plain-language summary
4. Detailed analysis
5. Recommendations
"""
            # Uses the legacy (pre-1.0) openai SDK interface; openai>=1.0
            # replaced ChatCompletion with client.chat.completions
            response = self.openai_client.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=500,
                temperature=0.3
            )
            ai_summary = response.choices[0].message.content

            return {
                "summary": ai_summary,
                "detailed_analysis": ai_summary,
                "recommendations": self._extract_recommendations(ai_summary)
            }
        except Exception as e:
            logger.error(f"OpenAI API error: {str(e)}")
            return self._generate_rule_based_summary(indicator, context)

    def _generate_rule_based_summary(self, indicator: ThreatIndicator,
                                     context: Dict[str, Any]) -> Dict[str, Any]:
        """Generate rule-based threat summary when AI is not available"""
        malicious_count = len(context["malicious_indicators"])
        avg_confidence = (sum(context["confidence_scores"]) / len(context["confidence_scores"])
                          if context["confidence_scores"] else 0)

        # Determine risk level
        if malicious_count >= 2 or avg_confidence >= 70:
            risk_level = "High"
        elif malicious_count >= 1 or avg_confidence >= 30:
            risk_level = "Medium"
        else:
            risk_level = "Low"

        # Generate threat types
        threat_types = context["threat_types"] if context["threat_types"] else ["Unknown"]

        # Generate summary
        if risk_level == "High":
            summary = (f"This {indicator.indicator_type} has been flagged as malicious by "
                       f"{malicious_count} sources with high confidence. It has been associated "
                       f"with {', '.join(threat_types[:3])} activities.")
        elif risk_level == "Medium":
            summary = (f"This {indicator.indicator_type} shows suspicious activity with moderate "
                       f"confidence. It may be involved in {', '.join(threat_types[:2])} activities.")
        else:
            summary = (f"This {indicator.indicator_type} shows minimal or no malicious activity "
                       f"in our threat intelligence sources.")
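
        # Worked example of the thresholds above (assumed inputs): one source
        # flags the indicator as malicious and the average confidence is 50,
        # so malicious_count == 1 trips the "Medium" branch even though the
        # confidence alone (50 < 70) would not reach "High".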
        # Generate recommendations
        recommendations = []
        if risk_level == "High":
            recommendations.extend([
                "Block this indicator immediately",
                "Investigate any systems that have communicated with this indicator",
                "Monitor for related indicators"
            ])
        elif risk_level == "Medium":
            recommendations.extend([
                "Monitor this indicator closely",
                "Consider blocking if it appears in your environment",
                "Investigate if suspicious activity is observed"
            ])
        else:
            recommendations.extend([
                "Continue normal monitoring",
                "No immediate action required"
            ])

        return {
            "summary": summary,
            "detailed_analysis": (f"Analysis based on {malicious_count} threat intelligence "
                                  f"sources. Average confidence: {avg_confidence:.1f}%"),
            "recommendations": recommendations
        }

    def _extract_recommendations(self, ai_summary: str) -> List[str]:
        """Extract recommendations from AI summary"""
        # Simple extraction - in a real implementation, you'd use more sophisticated NLP
        recommendations = []
        if "block" in ai_summary.lower():
            recommendations.append("Consider blocking this indicator")
        if "monitor" in ai_summary.lower():
            recommendations.append("Monitor for suspicious activity")
        if "investigate" in ai_summary.lower():
            recommendations.append("Investigate further")
        return recommendations if recommendations else ["Review the analysis and take appropriate action"]


class RiskAssessmentModel:
    """Machine Learning model for risk assessment"""

    def __init__(self):
        self.model = None
        self.vectorizer = None
        self.load_or_train_model()

    def load_or_train_model(self):
        """Load existing model or train a new one"""
        model_path = os.path.join(app.config.get('MODEL_PATH', 'models/'),
                                  app.config.get('RISK_MODEL_FILE', 'risk_assessment_model.pkl'))
        vectorizer_path = os.path.join(app.config.get('MODEL_PATH', 'models/'),
                                       app.config.get('VECTORIZER_FILE', 'vectorizer.pkl'))

        try:
            if os.path.exists(model_path) and os.path.exists(vectorizer_path):
                self.model = joblib.load(model_path)
                self.vectorizer = joblib.load(vectorizer_path)
                logger.info("Loaded existing risk assessment model")
            else:
                self._train_model()
        except Exception as e:
            logger.error(f"Error loading model: {str(e)}")
            self._train_model()

    def _train_model(self):
        """Train a simple risk assessment model"""
        # Get model paths
        model_path = os.path.join(app.config.get('MODEL_PATH', 'models/'),
                                  app.config.get('RISK_MODEL_FILE', 'risk_assessment_model.pkl'))
        vectorizer_path = os.path.join(app.config.get('MODEL_PATH', 'models/'),
                                       app.config.get('VECTORIZER_FILE', 'vectorizer.pkl'))

        # Sample training data - in production, you'd use real threat intelligence data
        training_data = [
            ("malware distribution botnet command control", "High"),
            ("brute force ssh rdp attacks", "High"),
            ("phishing spam email campaigns", "Medium"),
            ("suspicious network activity", "Medium"),
            ("legitimate business website", "Low"),
            ("cdn content delivery network", "Low"),
            ("tor exit node proxy", "Medium"),
            ("cryptocurrency mining pool", "Low"),
            ("ransomware distribution", "High"),
            ("data exfiltration", "High")
        ]

        texts = [item[0] for item in training_data]
        labels = [item[1] for item in training_data]

        # Convert labels to numeric
        label_map = {"Low": 0, "Medium": 1, "High": 2}
        numeric_labels = [label_map[label] for label in labels]

        # Vectorize text
        self.vectorizer = TfidfVectorizer(max_features=100, stop_words='english')
        X = self.vectorizer.fit_transform(texts)

        # Train model
        self.model = RandomForestClassifier(n_estimators=10, random_state=42)
        self.model.fit(X, numeric_labels)

        # Save model
        os.makedirs(app.config.get('MODEL_PATH', 'models/'), exist_ok=True)
        joblib.dump(self.model, model_path)
        joblib.dump(self.vectorizer, vectorizer_path)
        logger.info("Trained new risk assessment model")
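
    # Illustrative call (assumed inputs; the tiny training sample above is a
    # stand-in for real labeled data, and the probability depends on the
    # fitted forest):
    #
    #   >>> model = RiskAssessmentModel()
    #   >>> model.assess_risk({"threat_types": ["ransomware", "distribution"],
    #   ...                    "malicious_indicators": ["AlienVault OTX"],
    #   ...                    "confidence_scores": [80]})
    #   ('High', 0.8)  # approximate score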

    def assess_risk(self, threat_context: Dict[str, Any]) -> tuple:
        """Assess risk level and score"""
        if not self.model or not self.vectorizer:
            return "Medium", 0.5

        # Create feature text from context
        feature_text = " ".join([
            " ".join(threat_context.get("threat_types", [])),
            " ".join(threat_context.get("malicious_indicators", [])),
            str(threat_context.get("confidence_scores", [0]))
        ])

        # Clean up the feature text and check if it's meaningful
        feature_text = feature_text.strip()

        # If feature text is empty or just contains brackets/spaces, treat as low risk
        if not feature_text or feature_text in ['[]', ' []', '[] ', ' [] ']:
            return "Low", 0.1

        # Vectorize and predict
        X = self.vectorizer.transform([feature_text])
        prediction = self.model.predict(X)[0]
        probabilities = self.model.predict_proba(X)[0]

        # Convert back to label; cast the numpy float to a plain float so the
        # report stays JSON-serializable
        label_map = {0: "Low", 1: "Medium", 2: "High"}
        risk_level = label_map[prediction]
        risk_score = float(probabilities[prediction])

        return risk_level, risk_score


class ThreatAnalysisEngine:
    """Main threat analysis engine that orchestrates all components"""

    def __init__(self):
        self.api = ThreatIntelligenceAPI()
        self.rag = RAGSystem()
        self.risk_model = RiskAssessmentModel()

    def analyze_indicator(self, indicator_value: str) -> ThreatReport:
        """Analyze a threat indicator and return comprehensive report"""
        # Validate and determine indicator type
        indicator = self._validate_indicator(indicator_value)
        if not indicator:
            raise ValueError(f"Invalid indicator: {indicator_value}")

        # Gather raw data from APIs
        raw_data = self._gather_threat_data(indicator)

        # Extract threat context
        threat_context = self.rag._extract_threat_context(raw_data)

        # Assess risk - prefer rule-based using live API data; fall back to ML if no signal
        risk_level, risk_score = self._assess_risk_rule_based(raw_data, threat_context)
        if risk_level is None:
            risk_level, risk_score = self.risk_model.assess_risk(threat_context)

        # Synthesize intelligence
        synthesis = self.rag.synthesize_threat_intelligence(indicator, raw_data)

        # Create comprehensive report
        scores = threat_context.get("confidence_scores", [])
        report = ThreatReport(
            indicator=indicator,
            risk_level=risk_level,
            risk_score=risk_score,
            threat_types=threat_context.get("threat_types", ["Unknown"]),
            summary=synthesis["summary"],
            detailed_analysis=synthesis["detailed_analysis"],
            recommendations=synthesis["recommendations"],
            source_links=threat_context.get("source_reports", []),
            analysis_timestamp=datetime.now().isoformat(),
            confidence=sum(scores) / len(scores) if scores else 0
        )
        return report

    def _assess_risk_rule_based(self, raw_data: List[Dict[str, Any]],
                                threat_context: Dict[str, Any]) -> tuple:
        """Assess risk using clear, deterministic thresholds from feed responses.

        Returns (risk_level, risk_score), or (None, None) if there is no usable signal.
        """
        if not raw_data:
            return None, None

        max_abuse_confidence = None
        total_pulse_count = 0
        any_feed_present = False

        for entry in raw_data:
            if not isinstance(entry, dict):
                continue
            if "error" in entry:
                continue
            source = entry.get("source")
            if source:
                any_feed_present = True
            # AbuseIPDB confidence
            if source == "AbuseIPDB" and isinstance(entry.get("confidence"), (int, float)):
                conf = float(entry.get("confidence", 0))
                max_abuse_confidence = (conf if max_abuse_confidence is None
                                        else max(max_abuse_confidence, conf))
            # OTX pulse count
            if source == "AlienVault OTX" and isinstance(entry.get("pulse_count"), (int, float)):
                total_pulse_count += int(entry.get("pulse_count", 0))

        if not any_feed_present:
            return None, None

        # Determine risk level from thresholds:
        #   AbuseIPDB confidence >= 70 or total OTX pulses >= 5 -> High
        #   AbuseIPDB confidence >= 30 or total OTX pulses >= 1 -> Medium
        #   at least one clean feed response                    -> Low
        # High if strong signal from either feed
        if (max_abuse_confidence is not None and max_abuse_confidence >= 70) or total_pulse_count >= 5:
            return "High", (0.9 if max_abuse_confidence and max_abuse_confidence >= 90 else 0.75)

        # Medium for moderate signals
        if (max_abuse_confidence is not None and max_abuse_confidence >= 30) or total_pulse_count >= 1:
            # Scale score between 0.4 and 0.7
            base = 0.5
            if max_abuse_confidence is not None:
                base = max(base, 0.3 + (max_abuse_confidence / 100) * 0.4)
            if total_pulse_count:
                base = max(base, 0.45 + min(total_pulse_count, 5) * 0.05)
            return "Medium", min(base, 0.7)

        # Otherwise Low
        return "Low", 0.15

    def _validate_indicator(self, value: str) -> Optional[ThreatIndicator]:
        """Validate and determine the type of threat indicator"""
        value = value.strip()

        # Check if it's an IP address
        try:
            ipaddress.ip_address(value)
            return ThreatIndicator(value=value, indicator_type="ip")
        except ValueError:
            pass

        # Check if it's a domain
        if self._is_valid_domain(value):
            return ThreatIndicator(value=value, indicator_type="domain")

        # Check if it's a hash (MD5, SHA-1, SHA-256)
        if self._is_valid_hash(value):
            return ThreatIndicator(value=value, indicator_type="hash")

        return None

    def _is_valid_domain(self, domain: str) -> bool:
        """Check if string is a valid domain"""
        try:
            result = urlparse(f"http://{domain}")
            return all([result.scheme, result.netloc]) and '.' in domain
        except Exception:
            return False

    def _is_valid_hash(self, hash_value: str) -> bool:
        """Check if string is a valid hash"""
        hash_value = hash_value.lower()
        return (len(hash_value) in [32, 40, 64] and
                all(c in '0123456789abcdef' for c in hash_value))

    def _gather_threat_data(self, indicator: ThreatIndicator) -> List[Dict[str, Any]]:
        """Gather threat data from all available sources"""
        raw_data = []
        if indicator.indicator_type == "ip":
            raw_data.append(self.api.check_ip_alienvault(indicator.value))
            raw_data.append(self.api.check_ip_abuseipdb(indicator.value))
        elif indicator.indicator_type == "domain":
            raw_data.append(self.api.check_domain_alienvault(indicator.value))
        elif indicator.indicator_type == "hash":
            raw_data.append(self.api.check_hash_alienvault(indicator.value))
        return raw_data


# Initialize the analysis engine
analysis_engine = ThreatAnalysisEngine()

# Global variable to store current analysis results for chatbot context
current_analysis_results = None


@app.route('/')
def index():
    """Main page"""
    return render_template('index.html')


@app.route('/analyze', methods=['POST'])
def analyze():
    """Analyze a threat indicator"""
    global current_analysis_results

    try:
        # silent=True yields None instead of raising on a non-JSON body
        data = request.get_json(silent=True) or {}
        indicator_value = data.get('indicator', '').strip()

        if not indicator_value:
            return jsonify({'error': 'No indicator provided'}), 400

        # Analyze the indicator
        report = analysis_engine.analyze_indicator(indicator_value)

        # Convert to JSON-serializable format (asdict recurses into the
        # nested ThreatIndicator dataclass)
        report_dict = asdict(report)

        # Store results for chatbot context
        current_analysis_results = report_dict

        return jsonify({
            'success': True,
            'report': report_dict
        })
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.error(f"Analysis error: {str(e)}")
        return jsonify({'error': 'Analysis failed. Please try again.'}), 500


@app.route('/chat', methods=['POST'])
def chat():
    """Handle chatbot messages"""
    global current_analysis_results

    try:
        data = request.get_json(silent=True) or {}
        message = data.get('message', '').strip()

        if not message:
            return jsonify({'error': 'No message provided'}), 400

        # Generate response based on current analysis results
        response = generate_chat_response(message, current_analysis_results)

        return jsonify({
            'success': True,
            'response': response
        })
    except Exception as e:
        logger.error(f"Chat error: {str(e)}")
        return jsonify({'error': 'Chat failed. Please try again.'}), 500
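
# Example requests against the dev server (assumed default port 7860):
#
#   curl -s -X POST http://localhost:7860/analyze \
#        -H 'Content-Type: application/json' -d '{"indicator": "8.8.8.8"}'
#   curl -s -X POST http://localhost:7860/chat \
#        -H 'Content-Type: application/json' -d '{"message": "how risky is it?"}'
#
# Note: current_analysis_results is a single module-level slot, so concurrent
# users share (and overwrite) each other's chatbot context.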

def generate_chat_response(message: str, analysis_results: Optional[Dict[str, Any]]) -> str:
    """Generate a contextual chat response based on the message and analysis results"""
    message_lower = message.lower()

    # If no analysis results are available yet
    if not analysis_results:
        if any(word in message_lower for word in ['analyze', 'check', 'threat', 'indicator']):
            return ("I'd be happy to help you analyze a threat indicator! Please enter an IP "
                    "address, domain, or file hash in the main interface to get started.")
        elif any(word in message_lower for word in ['hello', 'hi', 'hey']):
            return ("Hello! I'm your threat intelligence assistant. I can help you understand "
                    "threat analysis results and answer questions about cybersecurity. Try "
                    "analyzing a threat indicator first!")
        else:
            return ("I'm here to help with threat intelligence analysis. Please analyze a "
                    "threat indicator first, then I can answer questions about the results!")

    # Context-aware responses based on analysis results
    indicator = analysis_results.get('indicator', {})
    risk_level = analysis_results.get('risk_level', 'Unknown')
    risk_score = analysis_results.get('risk_score', 0)
    threat_types = analysis_results.get('threat_types', [])
    summary = analysis_results.get('summary', '')
    recommendations = analysis_results.get('recommendations', [])

    # Risk level questions
    if any(word in message_lower for word in ['risk', 'dangerous', 'safe', 'threat']):
        risk_percentage = int(risk_score * 100)
        if risk_level == 'High':
            return (f"The {indicator.get('indicator_type', 'indicator')} "
                    f"{indicator.get('value', '')} has a HIGH risk level with a "
                    f"{risk_percentage}% risk score. This indicates significant threat "
                    f"activity and should be treated with caution.")
        elif risk_level == 'Medium':
            return (f"The {indicator.get('indicator_type', 'indicator')} "
                    f"{indicator.get('value', '')} has a MEDIUM risk level with a "
                    f"{risk_percentage}% risk score. It shows some suspicious activity "
                    f"and should be monitored closely.")
        else:
            return (f"The {indicator.get('indicator_type', 'indicator')} "
                    f"{indicator.get('value', '')} has a LOW risk level with a "
                    f"{risk_percentage}% risk score. It appears to be relatively safe "
                    f"based on current threat intelligence.")

    # Threat type questions
    if any(word in message_lower for word in ['type', 'kind', 'category', 'malware', 'attack']):
        if threat_types and threat_types != ['Unknown']:
            threat_list = ', '.join(threat_types[:3])  # Show top 3
            return (f"This indicator has been associated with the following threat types: "
                    f"{threat_list}. This information comes from threat intelligence feeds "
                    f"and security research.")
        else:
            return ("No specific threat types have been identified for this indicator, or the "
                    "threat type information is not available in our current data sources.")

    # Recommendation questions
    if any(word in message_lower for word in ['recommend', 'should', 'action', 'do', 'next']):
        if recommendations:
            rec_list = '; '.join(recommendations[:3])  # Show top 3
            return f"Based on the analysis, here are the key recommendations: {rec_list}"
        else:
            return ("Based on the risk assessment, I recommend monitoring this indicator and "
                    "taking appropriate security measures based on your organization's policies.")

    # Summary questions
    if any(word in message_lower for word in ['summary', 'overview', 'explain', 'what', 'tell me']):
        return f"Here's the analysis summary: {summary}"

    # General questions about the indicator
    if any(word in message_lower for word in ['indicator', 'ip', 'domain', 'hash', 'address']):
        return (f"I analyzed the {indicator.get('indicator_type', 'indicator')} "
                f"'{indicator.get('value', '')}' and found it has a {risk_level} risk level. "
                f"{summary}")

    # Default response
    return (f"I can help you understand the analysis results for "
            f"{indicator.get('value', 'the current indicator')}. You can ask me about the risk "
            f"level, threat types, recommendations, or request a summary. What would you like "
            f"to know?")
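
# Keyword-routing sketch (assumed inputs; responses follow the branches above):
#
#   >>> generate_chat_response("hello", None)
#   "Hello! I'm your threat intelligence assistant. ..."
#   >>> generate_chat_response("what should I do next?", current_analysis_results)
#   'Based on the analysis, here are the key recommendations: ...'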

@app.route('/health')
def health():
    """Health check endpoint"""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'version': '1.0.0'
    })


if __name__ == '__main__':
    # Create necessary directories
    os.makedirs('templates', exist_ok=True)
    os.makedirs('static', exist_ok=True)
    os.makedirs('models', exist_ok=True)
    os.makedirs('logs', exist_ok=True)

    # Re-initialize the analysis engine defensively (module import above
    # already created one)
    try:
        analysis_engine = ThreatAnalysisEngine()
        logger.info("Threat Intelligence Assistant initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize analysis engine: {str(e)}")
        analysis_engine = None

    # Run the application
    port = int(os.environ.get('PORT', 7860))
    debug = app.config.get('DEBUG', False)

    logger.info(f"Starting Threat Intelligence Assistant on port {port}")
    app.run(debug=debug, host='0.0.0.0', port=port)
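
# Launch sketch (module name and environment assumed; config.py supplies the
# per-environment settings and API keys):
#
#   FLASK_ENV=development PORT=7860 python app.py
#   curl http://localhost:7860/health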