Spaces:
Runtime error
Runtime error
| # enterprise_ra.py | |
| from fastapi import APIRouter, HTTPException | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| import os | |
| import openai | |
| import json | |
| import uuid | |
| # Environment Variables | |
| GROQ_API_KEY = os.environ.get("GROQ_API_KEY") | |
| if GROQ_API_KEY: | |
| GROQ_API_KEY = GROQ_API_KEY.strip() | |
| # Model Setup | |
| def generate_response(system_prompt: str, user_message: str): | |
| if not GROQ_API_KEY: | |
| raise Exception("GROQ_API_KEY environment variable is not set") | |
| client = openai.OpenAI(api_key=GROQ_API_KEY, base_url="https://api.groq.com/openai/v1") | |
| try: | |
| response = client.chat.completions.create( | |
| model="llama3-8b-8192", | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_message} | |
| ], | |
| temperature=0.4 | |
| ) | |
| return response.choices[0].message.content | |
| except Exception as e: | |
| raise Exception(f"GROQ API connection failed: {str(e)}") | |
| # Request Models | |
| class RiskGenerationRequest(BaseModel): | |
| category: str | |
| department: str | |
| business_context: Optional[str] = "" | |
| specific_concerns: Optional[str] = "" | |
| number_of_risks: Optional[int] = 5 | |
| class ThreatGenerationRequest(BaseModel): | |
| risk_name: str | |
| category: str | |
| department: str | |
| number_of_threats: Optional[int] = 3 | |
| # Response Models | |
| class Threat(BaseModel): | |
| name: str | |
| description: str | |
| justification: str | |
| class Risk(BaseModel): | |
| id: str | |
| category: str | |
| name: str | |
| description: str | |
| likelihood: int | |
| impact: int | |
| likelihood_justification: str | |
| impact_justification: str | |
| treatment: str | |
| department: str | |
| escalated: bool | |
| threats: List[Threat] | |
| class RiskGenerationResponse(BaseModel): | |
| success: bool | |
| risks: List[Risk] | |
| message: str | |
| class ThreatGenerationResponse(BaseModel): | |
| success: bool | |
| threats: List[Threat] | |
| message: str | |
| # Enterprise RA Router | |
| enterprise_ra_router = APIRouter() | |
| def generate_enterprise_risks(request: RiskGenerationRequest): | |
| """ | |
| Generate comprehensive enterprise risks based on category and department | |
| """ | |
| system_prompt = """You are an expert enterprise risk analyst. Your task is to generate comprehensive risks for organizations based on the provided category, department, and business context. | |
| CRITICAL: You must respond with ONLY a valid JSON object. Do not include any markdown formatting, code blocks, or additional text. | |
| For each risk, you need to: | |
| 1. Create a clear, specific risk name | |
| 2. Provide a detailed description of the risk | |
| 3. Assess likelihood (1-5 scale, where 1=very unlikely, 5=very likely) with justification | |
| 4. Assess impact (1-5 scale, where 1=minimal impact, 5=catastrophic impact) with justification | |
| 5. Provide appropriate treatment strategies | |
| 6. Generate relevant threats associated with each risk with industry-specific justifications | |
| Consider: | |
| - Industry best practices for risk identification | |
| - Department-specific risks and challenges | |
| - Current business environment factors | |
| - Regulatory and compliance considerations | |
| - Technological and operational dependencies | |
| - Industry statistics and common threat patterns | |
| - Regional and sector-specific risk factors | |
| Provide specific justifications that reference: | |
| - Industry trends and statistics | |
| - Regulatory requirements for the sector | |
| - Common attack vectors in similar organizations | |
| - Historical incidents in the industry | |
| - Technology adoption patterns | |
| - Business model vulnerabilities | |
| RESPOND WITH ONLY THIS EXACT JSON FORMAT (no markdown, no code blocks, no additional text): | |
| { | |
| "risks": [ | |
| { | |
| "name": "Clear, specific risk name", | |
| "description": "Detailed description of the risk and its potential impact on the organization", | |
| "likelihood": 3, | |
| "likelihood_justification": "Specific reasons for this likelihood score based on industry data, trends, and organizational factors", | |
| "impact": 4, | |
| "impact_justification": "Specific reasons for this impact score based on business dependencies, regulatory requirements, and potential consequences", | |
| "treatment": "Specific treatment strategies to mitigate the risk", | |
| "threats": [ | |
| { | |
| "name": "Threat name", | |
| "description": "Detailed description of the threat", | |
| "justification": "Industry-specific reasoning for why this threat is relevant, including statistics, trends, or common occurrences in this sector" | |
| } | |
| ] | |
| } | |
| ] | |
| }""" | |
| user_message = f""" | |
| Generate {request.number_of_risks} enterprise risks for the following context: | |
| Category: {request.category} | |
| Department: {request.department} | |
| Business Context: {request.business_context} | |
| Specific Concerns: {request.specific_concerns} | |
| Please provide comprehensive risks that are relevant to this department and category, including appropriate likelihood and impact assessments, treatment strategies, and associated threats. | |
| """ | |
| try: | |
| result = generate_response(system_prompt, user_message) | |
| # Clean the response - remove markdown code blocks if present | |
| cleaned_result = result.strip() | |
| if cleaned_result.startswith('```json'): | |
| cleaned_result = cleaned_result[7:] # Remove ```json | |
| elif cleaned_result.startswith('```'): | |
| cleaned_result = cleaned_result[3:] # Remove ``` | |
| if cleaned_result.endswith('```'): | |
| cleaned_result = cleaned_result[:-3] # Remove trailing ``` | |
| cleaned_result = cleaned_result.strip() | |
| # Extract JSON from the response | |
| json_start = cleaned_result.find('{') | |
| json_end = cleaned_result.rfind('}') + 1 | |
| if json_start != -1 and json_end > json_start: | |
| json_str = cleaned_result[json_start:json_end] | |
| risks_data = json.loads(json_str) | |
| # Convert to our response format | |
| risks = [] | |
| for risk_data in risks_data.get("risks", []): | |
| risk = Risk( | |
| id=str(uuid.uuid4())[:8], # Generate unique ID | |
| category=request.category, | |
| name=risk_data.get("name", ""), | |
| description=risk_data.get("description", ""), | |
| likelihood=risk_data.get("likelihood", 3), | |
| impact=risk_data.get("impact", 3), | |
| likelihood_justification=risk_data.get("likelihood_justification", "Standard industry assessment"), | |
| impact_justification=risk_data.get("impact_justification", "Based on business impact analysis"), | |
| treatment=risk_data.get("treatment", ""), | |
| department=request.department, | |
| escalated=False, | |
| threats=[Threat(**threat) for threat in risk_data.get("threats", [])] | |
| ) | |
| risks.append(risk) | |
| return RiskGenerationResponse( | |
| success=True, | |
| risks=risks, | |
| message=f"Successfully generated {len(risks)} enterprise risks" | |
| ) | |
| else: | |
| raise ValueError("No valid JSON found in response") | |
| except (json.JSONDecodeError, ValueError) as e: | |
| # Fallback response with sample risks | |
| fallback_risks = [ | |
| Risk( | |
| id=str(uuid.uuid4())[:8], | |
| category=request.category, | |
| name=f"{request.category} Risk Assessment", | |
| description=f"Potential risks related to {request.category} operations in {request.department} department", | |
| likelihood=3, | |
| impact=3, | |
| likelihood_justification=f"Moderate likelihood based on common {request.category} challenges in {request.department} departments", | |
| impact_justification=f"Moderate impact considering typical {request.department} operational dependencies", | |
| treatment=f"Implement comprehensive {request.category} risk management framework", | |
| department=request.department, | |
| escalated=False, | |
| threats=[ | |
| Threat( | |
| name="Operational Disruption", | |
| description="Potential for operational processes to be disrupted", | |
| justification=f"Common threat in {request.department} departments due to process dependencies" | |
| ), | |
| Threat( | |
| name="Compliance Violation", | |
| description="Risk of non-compliance with regulatory requirements", | |
| justification=f"Regulatory compliance is critical in {request.category} category with increasing oversight" | |
| ) | |
| ] | |
| ) | |
| ] | |
| return RiskGenerationResponse( | |
| success=True, | |
| risks=fallback_risks, | |
| message="Generated fallback risks due to processing error" | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error generating risks: {str(e)}") | |
| def generate_threats_for_risk(request: ThreatGenerationRequest): | |
| """ | |
| Generate specific threats for a given risk | |
| """ | |
| system_prompt = """You are an expert threat analyst. Your task is to generate specific threats that could lead to or contribute to a given risk. | |
| CRITICAL: You must respond with ONLY a valid JSON object. Do not include any markdown formatting, code blocks, or additional text. | |
| For each threat, provide: | |
| 1. A clear, specific threat name | |
| 2. A detailed description of how this threat could manifest and impact the organization | |
| 3. Industry-specific justification for why this threat is relevant | |
| Consider: | |
| - Direct and indirect threat vectors | |
| - Internal and external threat sources | |
| - Current threat landscape and emerging risks | |
| - Department-specific threat considerations | |
| - Industry-relevant threat patterns | |
| - Statistical data on threat frequency in similar organizations | |
| - Regulatory and compliance threat vectors | |
| - Technology-specific vulnerabilities | |
| - Geographic and sector-specific threat patterns | |
| Provide specific justifications that reference: | |
| - Industry statistics and threat intelligence reports | |
| - Common attack patterns in the sector | |
| - Historical incidents and case studies | |
| - Regulatory requirements and compliance risks | |
| - Technology adoption vulnerabilities | |
| - Supply chain and third-party risks | |
| - Insider threat patterns specific to the department | |
| RESPOND WITH ONLY THIS EXACT JSON FORMAT (no markdown, no code blocks, no additional text): | |
| { | |
| "threats": [ | |
| { | |
| "name": "Specific threat name", | |
| "description": "Detailed description of the threat and how it could impact the organization", | |
| "justification": "Industry-specific reasoning for why this threat is particularly relevant, including statistics, trends, regulatory factors, or common occurrences in this sector and department" | |
| } | |
| ] | |
| }""" | |
| user_message = f""" | |
| Generate {request.number_of_threats} specific threats for the following risk: | |
| Risk Name: {request.risk_name} | |
| Category: {request.category} | |
| Department: {request.department} | |
| Please provide threats that are directly relevant to this risk and could realistically occur in this department context. | |
| """ | |
| try: | |
| result = generate_response(system_prompt, user_message) | |
| # Clean the response - remove markdown code blocks if present | |
| cleaned_result = result.strip() | |
| if cleaned_result.startswith('```json'): | |
| cleaned_result = cleaned_result[7:] # Remove ```json | |
| elif cleaned_result.startswith('```'): | |
| cleaned_result = cleaned_result[3:] # Remove ``` | |
| if cleaned_result.endswith('```'): | |
| cleaned_result = cleaned_result[:-3] # Remove trailing ``` | |
| cleaned_result = cleaned_result.strip() | |
| # Extract JSON from the response | |
| json_start = cleaned_result.find('{') | |
| json_end = cleaned_result.rfind('}') + 1 | |
| if json_start != -1 and json_end > json_start: | |
| json_str = cleaned_result[json_start:json_end] | |
| threats_data = json.loads(json_str) | |
| threats = [Threat(**threat) for threat in threats_data.get("threats", [])] | |
| return ThreatGenerationResponse( | |
| success=True, | |
| threats=threats, | |
| message=f"Successfully generated {len(threats)} threats for risk: {request.risk_name}" | |
| ) | |
| else: | |
| raise ValueError("No valid JSON found in response") | |
| except (json.JSONDecodeError, ValueError) as e: | |
| # Fallback response | |
| fallback_threats = [ | |
| Threat( | |
| name="System Failure", | |
| description="Critical system components may fail leading to operational disruption", | |
| justification=f"System failures are common in {request.department} departments due to technology dependencies and aging infrastructure" | |
| ), | |
| Threat( | |
| name="Human Error", | |
| description="Mistakes by personnel could trigger or worsen the risk scenario", | |
| justification=f"Human error accounts for 80% of security incidents in {request.category} category according to industry reports" | |
| ), | |
| Threat( | |
| name="External Dependencies", | |
| description="Failure of external services or suppliers could contribute to the risk", | |
| justification=f"Third-party dependencies are increasing in {request.department} operations, creating additional threat vectors" | |
| ) | |
| ] | |
| return ThreatGenerationResponse( | |
| success=True, | |
| threats=fallback_threats[:request.number_of_threats], | |
| message="Generated fallback threats due to processing error" | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error generating threats: {str(e)}") | |