Spaces:
Runtime error
Runtime error
| # threat_ra.py | |
| from fastapi import APIRouter, HTTPException | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| import os | |
| import openai | |
| import json | |
| import uuid | |
| # Environment Variables | |
| GROQ_API_KEY = os.environ.get("GROQ_API_KEY") | |
| if GROQ_API_KEY: | |
| GROQ_API_KEY = GROQ_API_KEY.strip() | |
| # Model Setup | |
| def generate_response(system_prompt: str, user_message: str): | |
| if not GROQ_API_KEY: | |
| raise Exception("GROQ_API_KEY environment variable is not set") | |
| client = openai.OpenAI(api_key=GROQ_API_KEY, base_url="https://api.groq.com/openai/v1") | |
| try: | |
| response = client.chat.completions.create( | |
| model="llama3-8b-8192", | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_message} | |
| ], | |
| temperature=0.4 | |
| ) | |
| return response.choices[0].message.content | |
| except Exception as e: | |
| raise Exception(f"GROQ API connection failed: {str(e)}") | |
| # Request Models | |
| class ThreatRiskGenerationRequest(BaseModel): | |
| domain: str | |
| category: str | |
| business_context: Optional[str] = "" | |
| specific_focus: Optional[str] = "" | |
| number_of_records: Optional[int] = 10 | |
| class ThreatRiskAnalysisRequest(BaseModel): | |
| domain: str | |
| risk_name: str | |
| threat: str | |
| vulnerability: str | |
| category: str | |
| # Response Models | |
| class ThreatRisk(BaseModel): | |
| id: str | |
| domain: str | |
| riskName: str | |
| threat: str | |
| vulnerability: str | |
| category: str | |
| likelihood: int | |
| impact: int | |
| rating: int | |
| likelihood_justification: str | |
| impact_justification: str | |
| threat_justification: str | |
| vulnerability_justification: str | |
| class ThreatRiskGenerationResponse(BaseModel): | |
| success: bool | |
| threatRisks: List[ThreatRisk] | |
| message: str | |
| class ThreatRiskAnalysisResponse(BaseModel): | |
| success: bool | |
| analysis: ThreatRisk | |
| recommendations: List[str] | |
| message: str | |
| # Threat RA Router | |
| threat_ra_router = APIRouter() | |
| def generate_threat_risks(request: ThreatRiskGenerationRequest): | |
| """ | |
| Generate comprehensive threat risk records for threat risk assessment | |
| """ | |
| system_prompt = """You are an expert threat risk analyst. Your task is to generate comprehensive threat risk records that include domain-specific risks, threats, vulnerabilities, and their assessments. | |
| CRITICAL: You must respond with ONLY a valid JSON object. Do not include any markdown formatting, code blocks, or additional text. | |
| For each threat risk record, you need to: | |
| 1. Create a specific risk name relevant to the domain | |
| 2. Identify a credible threat that could exploit vulnerabilities | |
| 3. Identify specific vulnerabilities that could be exploited | |
| 4. Assess likelihood (1-5 scale, where 1=very unlikely, 5=very likely) with justification | |
| 5. Assess impact (1-5 scale, where 1=minimal impact, 5=catastrophic impact) with justification | |
| 6. Provide justifications for why this threat and vulnerability are relevant to the domain | |
| Consider: | |
| - Domain-specific threats and vulnerabilities | |
| - Current threat landscape and attack vectors | |
| - Industry-specific risk factors | |
| - Realistic likelihood and impact assessments | |
| - Emerging threats and evolving attack methods | |
| - Statistical data and threat intelligence | |
| - Regulatory and compliance considerations | |
| - Technology and process vulnerabilities | |
| Provide specific justifications that reference: | |
| - Industry threat statistics and reports | |
| - Common attack patterns in the domain | |
| - Vulnerability prevalence in similar organizations | |
| - Historical incidents and case studies | |
| - Regulatory requirements and compliance gaps | |
| - Technology adoption risks | |
| - Operational and process weaknesses | |
| RESPOND WITH ONLY THIS EXACT JSON FORMAT (no markdown, no code blocks, no additional text): | |
| { | |
| "threatRisks": [ | |
| { | |
| "riskName": "Specific risk name", | |
| "threat": "Specific threat vector", | |
| "threat_justification": "Industry-specific reasoning for why this threat is particularly relevant to this domain", | |
| "vulnerability": "Specific vulnerability that could be exploited", | |
| "vulnerability_justification": "Explanation of why this vulnerability is common or likely in this domain", | |
| "likelihood": 3, | |
| "likelihood_justification": "Specific reasons for this likelihood score based on domain factors and threat intelligence", | |
| "impact": 4, | |
| "impact_justification": "Specific reasons for this impact score based on business dependencies and potential consequences" | |
| } | |
| ] | |
| }""" | |
| user_message = f""" | |
| Generate {request.number_of_records} threat risk records for the following context: | |
| Domain: {request.domain} | |
| Category: {request.category} | |
| Business Context: {request.business_context} | |
| Specific Focus: {request.specific_focus} | |
| Please provide comprehensive threat risk records that include specific risks, threats, and vulnerabilities relevant to this domain and category. | |
| """ | |
| try: | |
| result = generate_response(system_prompt, user_message) | |
| # Clean the response - remove markdown code blocks if present | |
| cleaned_result = result.strip() | |
| if cleaned_result.startswith('```json'): | |
| cleaned_result = cleaned_result[7:] # Remove ```json | |
| elif cleaned_result.startswith('```'): | |
| cleaned_result = cleaned_result[3:] # Remove ``` | |
| if cleaned_result.endswith('```'): | |
| cleaned_result = cleaned_result[:-3] # Remove trailing ``` | |
| cleaned_result = cleaned_result.strip() | |
| # Extract JSON from the response | |
| json_start = cleaned_result.find('{') | |
| json_end = cleaned_result.rfind('}') + 1 | |
| if json_start != -1 and json_end > json_start: | |
| json_str = cleaned_result[json_start:json_end] | |
| risks_data = json.loads(json_str) | |
| # Convert to our response format | |
| threat_risks = [] | |
| for risk_data in risks_data.get("threatRisks", []): | |
| likelihood = risk_data.get("likelihood", 3) | |
| impact = risk_data.get("impact", 3) | |
| rating = likelihood * impact | |
| threat_risk = ThreatRisk( | |
| id=str(uuid.uuid4())[:8], # Generate unique ID | |
| domain=request.domain, | |
| riskName=risk_data.get("riskName", ""), | |
| threat=risk_data.get("threat", ""), | |
| vulnerability=risk_data.get("vulnerability", ""), | |
| category=request.category, | |
| likelihood=likelihood, | |
| impact=impact, | |
| rating=rating, | |
| likelihood_justification=risk_data.get("likelihood_justification", "Standard domain assessment"), | |
| impact_justification=risk_data.get("impact_justification", "Based on business impact analysis"), | |
| threat_justification=risk_data.get("threat_justification", "Common threat in this domain"), | |
| vulnerability_justification=risk_data.get("vulnerability_justification", "Typical vulnerability for this category") | |
| ) | |
| threat_risks.append(threat_risk) | |
| return ThreatRiskGenerationResponse( | |
| success=True, | |
| threatRisks=threat_risks, | |
| message=f"Successfully generated {len(threat_risks)} threat risk records" | |
| ) | |
| else: | |
| raise ValueError("No valid JSON found in response") | |
| except (json.JSONDecodeError, ValueError) as e: | |
| # Fallback response with sample threat risks | |
| fallback_risks = [] | |
| for i in range(min(request.number_of_records, 3)): | |
| likelihood = 3 | |
| impact = 3 | |
| rating = likelihood * impact | |
| fallback_risk = ThreatRisk( | |
| id=str(uuid.uuid4())[:8], | |
| domain=request.domain, | |
| riskName=f"{request.category} Risk {i+1}", | |
| threat=f"Generic {request.category} Threat", | |
| vulnerability=f"System vulnerability in {request.domain}", | |
| category=request.category, | |
| likelihood=likelihood, | |
| impact=impact, | |
| rating=rating, | |
| likelihood_justification=f"Moderate likelihood based on typical {request.category} threats in {request.domain} domain", | |
| impact_justification=f"Moderate impact considering standard {request.domain} operational dependencies", | |
| threat_justification=f"Common threat vector observed in {request.category} category across similar organizations", | |
| vulnerability_justification=f"Typical vulnerability found in {request.domain} systems due to legacy infrastructure" | |
| ) | |
| fallback_risks.append(fallback_risk) | |
| return ThreatRiskGenerationResponse( | |
| success=True, | |
| threatRisks=fallback_risks, | |
| message="Generated fallback threat risks due to processing error" | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error generating threat risks: {str(e)}") | |
| def analyze_threat_risk(request: ThreatRiskAnalysisRequest): | |
| """ | |
| Provide detailed analysis and recommendations for a specific threat risk scenario | |
| """ | |
| system_prompt = """You are an expert threat risk analyst. Your task is to provide detailed analysis and recommendations for a specific threat risk scenario. | |
| CRITICAL: You must respond with ONLY a valid JSON object. Do not include any markdown formatting, code blocks, or additional text. | |
| Analyze the provided threat risk scenario and provide: | |
| 1. Likelihood assessment (1-5 scale) with detailed justification | |
| 2. Impact assessment (1-5 scale) with detailed justification | |
| 3. Overall risk rating (likelihood × impact) | |
| 4. Specific recommendations for risk mitigation | |
| 5. Detection and prevention strategies | |
| 6. Industry-specific context and reasoning | |
| Consider: | |
| - Current threat landscape and attack trends | |
| - Domain-specific vulnerabilities and exposures | |
| - Industry best practices for risk mitigation | |
| - Cost-effective security controls | |
| - Realistic implementation timelines | |
| - Regulatory and compliance requirements | |
| - Historical incident data and case studies | |
| - Technology and operational dependencies | |
| Provide specific justifications that reference: | |
| - Industry threat intelligence and statistics | |
| - Regulatory requirements and compliance standards | |
| - Common vulnerabilities in similar organizations | |
| - Attack patterns and methodologies | |
| - Business impact factors and dependencies | |
| - Technology-specific risk factors | |
| - Geographic and sector-specific considerations | |
| RESPOND WITH ONLY THIS EXACT JSON FORMAT (no markdown, no code blocks, no additional text): | |
| { | |
| "analysis": { | |
| "likelihood": 3, | |
| "likelihood_justification": "Detailed justification for the likelihood assessment based on threat intelligence, industry data, and domain-specific factors", | |
| "impact": 4, | |
| "impact_justification": "Detailed justification for the impact assessment based on business dependencies, regulatory requirements, and potential consequences" | |
| }, | |
| "recommendations": [ | |
| "Specific recommendation 1 with industry context", | |
| "Specific recommendation 2 with regulatory reference", | |
| "Specific recommendation 3 with cost-benefit analysis" | |
| ] | |
| }""" | |
| user_message = f""" | |
| Analyze the following threat risk scenario: | |
| Domain: {request.domain} | |
| Risk Name: {request.risk_name} | |
| Threat: {request.threat} | |
| Vulnerability: {request.vulnerability} | |
| Category: {request.category} | |
| Please provide a comprehensive analysis including likelihood and impact assessments, and specific recommendations for mitigating this threat risk. | |
| """ | |
| try: | |
| result = generate_response(system_prompt, user_message) | |
| # Clean the response - remove markdown code blocks if present | |
| cleaned_result = result.strip() | |
| if cleaned_result.startswith('```json'): | |
| cleaned_result = cleaned_result[7:] # Remove ```json | |
| elif cleaned_result.startswith('```'): | |
| cleaned_result = cleaned_result[3:] # Remove ``` | |
| if cleaned_result.endswith('```'): | |
| cleaned_result = cleaned_result[:-3] # Remove trailing ``` | |
| cleaned_result = cleaned_result.strip() | |
| # Extract JSON from the response | |
| json_start = cleaned_result.find('{') | |
| json_end = cleaned_result.rfind('}') + 1 | |
| if json_start != -1 and json_end > json_start: | |
| json_str = cleaned_result[json_start:json_end] | |
| analysis_data = json.loads(json_str) | |
| # Extract analysis data | |
| analysis_info = analysis_data.get("analysis", {}) | |
| likelihood = analysis_info.get("likelihood", 3) | |
| impact = analysis_info.get("impact", 3) | |
| rating = likelihood * impact | |
| # Create analysis response | |
| analysis = ThreatRisk( | |
| id=str(uuid.uuid4())[:8], | |
| domain=request.domain, | |
| riskName=request.risk_name, | |
| threat=request.threat, | |
| vulnerability=request.vulnerability, | |
| category=request.category, | |
| likelihood=likelihood, | |
| impact=impact, | |
| rating=rating, | |
| likelihood_justification=analysis_info.get("likelihood_justification", "Standard assessment"), | |
| impact_justification=analysis_info.get("impact_justification", "Based on business analysis"), | |
| threat_justification=f"Threat analysis for {request.threat} in {request.domain} domain", | |
| vulnerability_justification=f"Vulnerability assessment for {request.vulnerability} in {request.category} category" | |
| ) | |
| recommendations = analysis_data.get("recommendations", []) | |
| return ThreatRiskAnalysisResponse( | |
| success=True, | |
| analysis=analysis, | |
| recommendations=recommendations, | |
| message="Successfully analyzed threat risk scenario" | |
| ) | |
| else: | |
| raise ValueError("No valid JSON found in response") | |
| except (json.JSONDecodeError, ValueError) as e: | |
| # Fallback response | |
| likelihood = 3 | |
| impact = 3 | |
| rating = likelihood * impact | |
| fallback_analysis = ThreatRisk( | |
| id=str(uuid.uuid4())[:8], | |
| domain=request.domain, | |
| riskName=request.risk_name, | |
| threat=request.threat, | |
| vulnerability=request.vulnerability, | |
| category=request.category, | |
| likelihood=likelihood, | |
| impact=impact, | |
| rating=rating, | |
| likelihood_justification=f"Moderate likelihood based on common {request.threat} patterns in {request.domain} domain", | |
| impact_justification=f"Moderate impact considering typical {request.category} business dependencies", | |
| threat_justification=f"{request.threat} is a recognized threat vector in {request.domain} operations", | |
| vulnerability_justification=f"{request.vulnerability} is commonly found in {request.category} systems" | |
| ) | |
| fallback_recommendations = [ | |
| f"Implement security controls specific to {request.category}", | |
| f"Regular assessment and monitoring of {request.domain} systems", | |
| f"Employee training on {request.threat} prevention", | |
| f"Establish incident response procedures for {request.risk_name}" | |
| ] | |
| return ThreatRiskAnalysisResponse( | |
| success=True, | |
| analysis=fallback_analysis, | |
| recommendations=fallback_recommendations, | |
| message="Generated fallback analysis due to processing error" | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error analyzing threat risk: {str(e)}") | |
| def generate_bulk_threat_analysis(domains: List[str], categories: List[str]): | |
| """ | |
| Generate bulk threat risk analysis for multiple domains and categories | |
| """ | |
| try: | |
| results = [] | |
| for domain in domains: | |
| for category in categories: | |
| request = ThreatRiskGenerationRequest( | |
| domain=domain, | |
| category=category, | |
| number_of_records=5 | |
| ) | |
| response = generate_threat_risks(request) | |
| if response.success: | |
| results.extend(response.threatRisks) | |
| return { | |
| "success": True, | |
| "total_records": len(results), | |
| "threat_risks": results, | |
| "message": f"Successfully generated {len(results)} threat risk records across {len(domains)} domains and {len(categories)} categories" | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error in bulk analysis: {str(e)}") | |