# NOTE: removed copy/paste artifacts ("Spaces:" and duplicated "Runtime error"
# banners) that were hosting-log residue, not part of the source.
| # app.py | |
| from fastapi import FastAPI | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| import os | |
| import openai | |
| import json | |
app = FastAPI()  # ASGI application instance (no routes registered in this chunk)
# Environment Variables
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")  # NOTE(review): may be None if unset; the API client then fails at request time, not here
| # Model Setup | |
def generate_response(system_prompt: str, user_message: str) -> str:
    """Send a system+user prompt pair to the Groq chat API and return the reply text.

    Args:
        system_prompt: Instructions framing the assistant's role.
        user_message: The end-user content to respond to.

    Returns:
        The content string of the first chat-completion choice.

    Raises:
        openai.OpenAIError: On network, authentication, or API failures.
    """
    # Reuse a single client across calls instead of rebuilding it per request.
    # Cached on the function object to avoid introducing a module-level global.
    client = getattr(generate_response, "_client", None)
    if client is None:
        client = openai.OpenAI(
            api_key=GROQ_API_KEY,
            base_url="https://api.groq.com/openai/v1",
        )
        generate_response._client = client
    response = client.chat.completions.create(
        model="llama3-8b-8192",  # Try this supported model
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
        temperature=0.4,  # low temperature for more deterministic, structured output
    )
    return response.choices[0].message.content
| # Request models | |
class Message(BaseModel):
    """Request body carrying a single free-text message for the LLM."""
    message: str  # raw user text forwarded verbatim as the user prompt
class ProcessData(BaseModel):
    """Request body describing one business process for threat generation."""
    processName: str           # human-readable process name
    department: str            # owning department
    description: str           # what the process does
    owner: str                 # process owner (person or role)
    businessContext: str       # why the process matters to the business
    rto: str                   # Recovery Time Objective (free-form string)
    mtpd: str                  # Maximum Tolerable Period of Disruption (free-form string)
    minTolerableDowntime: str  # minimum tolerable downtime (free-form string)
| # Response models | |
class Threat(BaseModel):
    """One identified threat plus its risk assessment, as returned to clients."""
    id: int           # unique sequential identifier within a response
    name: str         # short, specific threat name
    description: str  # how the threat could impact the process
    likelihood: int   # 1 (very unlikely) .. 5 (very likely), per the prompt contract
    impact: int       # 1 (minimal) .. 5 (catastrophic), per the prompt contract
    category: str     # e.g. Security / Operational / Environmental / Regulatory
    mitigation: str   # practical mitigation strategies
class ThreatsResponse(BaseModel):
    """Envelope for the list of generated threats."""
    threats: List[Threat]  # ordered by their sequential ids
def _extract_json_object(text: str) -> dict:
    """Return the first top-level JSON object embedded in *text*.

    LLM replies sometimes wrap the JSON payload in explanatory prose, so the
    span from the first '{' to the last '}' is sliced out before parsing.

    Raises:
        ValueError: If no brace-delimited span is present.
        json.JSONDecodeError: If the sliced span is not valid JSON.
    """
    start = text.find('{')
    end = text.rfind('}') + 1
    if start == -1 or end <= start:
        raise ValueError("No valid JSON found in response")
    return json.loads(text[start:end])


def generate_threats(process_data: ProcessData):
    """
    Generate threats for a given business process based on its characteristics.

    Builds a BIA-oriented prompt from *process_data*, asks the model for 8-12
    threats in a strict JSON schema, and validates the parsed payload into a
    ThreatsResponse. If the reply cannot be parsed -- or any other error occurs
    (API failure, model validation error) -- a canned fallback threat list is
    returned so callers always receive a well-formed response.
    """
    system_prompt = """
You are an expert cybersecurity and business continuity risk analyst. Your task is to analyze business processes and identify potential threats that could disrupt operations.
Given the process information, generate a comprehensive list of threats that could affect this specific business process. Consider:
- Cybersecurity threats (malware, ransomware, phishing, insider threats)
- Operational threats (system failures, human error, supply chain disruption)
- Natural disasters and environmental threats
- Regulatory and compliance risks
- Third-party and vendor risks
- Physical security threats
For each threat, provide:
- A unique sequential ID
- A clear, specific name
- A detailed description of how it could impact this process
- Likelihood rating (1-5, where 1=very unlikely, 5=very likely)
- Impact rating (1-5, where 1=minimal impact, 5=catastrophic impact)
- A relevant category
- Practical mitigation strategies
Consider the process's RTO, MTPD, and minimum tolerable downtime when assessing impact.
Respond strictly in this JSON format:
{
    "threats": [
        {
            "id": 1,
            "name": "Threat Name",
            "description": "Detailed description of the threat and its potential impact",
            "likelihood": 3,
            "impact": 4,
            "category": "Security/Operational/Environmental/Regulatory",
            "mitigation": "Specific mitigation strategies"
        }
    ]
}
"""
    user_message = f"""
Process Details:
- Process Name: {process_data.processName}
- Department: {process_data.department}
- Description: {process_data.description}
- Process Owner: {process_data.owner}
- Business Context: {process_data.businessContext}
- Recovery Time Objective (RTO): {process_data.rto}
- Maximum Tolerable Period of Disruption (MTPD): {process_data.mtpd}
- Minimum Tolerable Downtime: {process_data.minTolerableDowntime}
Please analyze this process and generate 8-12 relevant threats with their risk assessments and mitigation strategies.
"""
    try:
        result = generate_response(system_prompt, user_message)
        # Extract JSON from the response (AI sometimes adds explanatory text)
        threats_data = _extract_json_object(result)
        return ThreatsResponse(**threats_data)
    except (json.JSONDecodeError, ValueError):
        # Fallback response if JSON parsing fails: a generic but plausible set.
        return ThreatsResponse(threats=[
            Threat(
                id=1,
                name="System Unavailability",
                description="Critical system failure affecting process execution",
                likelihood=3,
                impact=4,
                category="Operational",
                mitigation="Implement redundant systems and regular backups",
            ),
            Threat(
                id=2,
                name="Cyber Attack",
                description="Malicious attack disrupting core operations",
                likelihood=3,
                impact=4,
                category="Security",
                mitigation="Use firewalls and real-time monitoring",
            ),
            Threat(
                id=3,
                name="Data Breach",
                description="Unauthorized access to sensitive process data",
                likelihood=2,
                impact=5,
                category="Security",
                mitigation="Implement encryption and access controls",
            ),
        ])
    except Exception:
        # Fallback response for any other errors (API failure, validation error).
        return ThreatsResponse(threats=[
            Threat(
                id=1,
                name="Process Disruption",
                description="General process disruption due to unforeseen circumstances",
                likelihood=3,
                impact=3,
                category="Operational",
                mitigation="Develop comprehensive business continuity plans",
            ),
        ])
def bia_threat_assessment(req: Message):
    """Run a place-based BIA threat assessment over free-text input.

    Returns the raw model reply string. NOTE(review): unlike generate_threats,
    no JSON extraction, validation, or fallback is applied here -- callers
    receive whatever text the model emitted; confirm they tolerate non-JSON.
    """
    prompt = """
You are a cybersecurity and geopolitical risk analyst AI working on Business Impact Assessment (BIA).
Given a paragraph, do the following:
1. Identify the **place** mentioned in the text.
2. List likely **threats** specific to that place and context.
3. For each threat:
- Give a **likelihood rating (1–5)**.
- Give a **severity rating (1–5)**.
- Describe the **potential impact**.
- Compute **threat rating = likelihood × severity**.
Respond strictly in this JSON format:
{
    "place": "<place>",
    "threats": [
        {
            "name": "<threat name>",
            "likelihood": <1-5>,
            "severity": <1-5>,
            "impact": "<impact statement>",
            "threat_rating": <likelihood * severity>
        }
    ]
}
"""
    result = generate_response(prompt, req.message)
    return result
def bia_impact_analysis(req: Message):
    """Placeholder BIA impact-analysis handler; returns a static stub payload.

    The request body is accepted but not inspected; real analysis logic is
    yet to be implemented.
    """
    placeholder_payload = {
        "status": "placeholder",
        "note": "This endpoint is reserved for BIA impact analysis logic.",
    }
    return placeholder_payload
class RiskItem(BaseModel):
    """One existing risk entry submitted for mitigation planning.

    All fields arrive as free-form strings from the client; impact and
    likelihood are numeric ratings encoded as strings (the fallback path
    parses them with int()).
    """
    enablerType: str       # kind of enabler the risk is attached to
    enablerDomain: str     # domain used for default ownership assignment
    majorCategory: str     # threat category, e.g. "Fire", "Cyber Security"
    mappedThreat: str      # threat this risk is mapped to
    existingControls: str  # controls already in place
    complianceStatus: str  # compliance state of those controls
    impact: str            # current impact rating (stringified number)
    likelihood: str        # current likelihood rating (stringified number)
    riskValue: str         # current risk value (stringified number)
class MitigationResponse(BaseModel):
    """Revised assessment and mitigation plan for one risk item."""
    revisedImpact: int      # 1-5 rating after mitigation
    revisedLikelihood: int  # 1-5 rating after mitigation
    revisedRiskValue: int   # revisedImpact * revisedLikelihood
    mitigationPlan: str     # bullet-point plan text
    ownership: str          # responsible party/department
class RiskMitigationResponse(BaseModel):
    """Envelope for mitigated risks, one entry per submitted risk item."""
    mitigatedRisks: List[MitigationResponse]
def _safe_rating(value: str, default: int = 3) -> int:
    """Parse an integer rating from a string; return *default* on bad input.

    The original code called int() directly inside an except handler, where a
    ValueError on a non-numeric rating would escape the whole try statement
    (exceptions raised in a handler are not caught by sibling except clauses)
    and crash the fallback path.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _fallback_mitigation(item: RiskItem) -> MitigationResponse:
    """Build a heuristic MitigationResponse for one risk item.

    Used when the LLM reply cannot be parsed. Rating reductions and plan
    text are keyed off the item's major threat category.
    """
    current_impact = _safe_rating(item.impact)
    current_likelihood = _safe_rating(item.likelihood)
    category = item.majorCategory.lower()

    # How much mitigation is assumed to lower each rating, by category.
    impact_reduction, likelihood_reduction = 1, 1
    if category in ('fire', 'natural disaster'):
        impact_reduction, likelihood_reduction = 2, 2
    elif category in ('cyber security', 'security'):
        impact_reduction, likelihood_reduction = 1, 2

    # Ratings are floored at 1 (never mitigated to zero).
    revised_impact = max(1, current_impact - impact_reduction)
    revised_likelihood = max(1, current_likelihood - likelihood_reduction)

    # Category-specific plan text and ownership.
    if category == 'fire':
        mitigation_plan = "• Install automatic fire suppression systems • Conduct quarterly safety training • Implement 24/7 monitoring alerts"
        ownership = "Facilities Management Team"
    elif category in ('cyber security', 'security'):
        mitigation_plan = "• Deploy endpoint detection response systems • Implement network segmentation controls • Conduct regular penetration testing"
        ownership = "Information Security Team"
    else:
        mitigation_plan = f"• Enhance existing {category} controls • Implement continuous monitoring systems • Establish incident response procedures"
        ownership = f"{item.enablerDomain} Team"

    return MitigationResponse(
        revisedImpact=revised_impact,
        revisedLikelihood=revised_likelihood,
        revisedRiskValue=revised_impact * revised_likelihood,
        mitigationPlan=mitigation_plan,
        ownership=ownership,
    )


def generate_risk_mitigation(risk_items: List[RiskItem]):
    """
    Generate mitigation plans and revised risk assessments for identified threats.

    Sends the risk items to the LLM and validates the JSON reply into a
    RiskMitigationResponse. If parsing fails, a per-item heuristic fallback is
    produced; any other error yields a generic fallback, so callers always
    receive a well-formed response.
    """
    system_prompt = """
You are an expert risk management and business continuity analyst. Your task is to analyze existing risk items and provide comprehensive mitigation strategies that will reduce the overall risk.
For each risk item provided, you need to:
1. Analyze the current risk assessment (impact, likelihood, risk value)
2. Evaluate existing controls and compliance status
3. Recommend additional mitigation measures
4. Provide revised risk ratings after implementing the mitigation plan
5. Assign appropriate ownership for the mitigation activities
Consider:
- Current controls effectiveness and compliance status
- Industry best practices for the specific threat type
- Cost-effective mitigation strategies
- Realistic timeline for implementation
- Appropriate ownership based on enabler type and domain
For revised ratings:
- Impact (1-5): Consider how mitigation reduces potential damage
- Likelihood (1-5): Consider how mitigation reduces probability of occurrence
- Risk Value: Calculate as revised impact × revised likelihood
For mitigation plans:
- Maximum 3 bullet points
- Each point maximum 10 words
- Be concise and actionable
Respond strictly in this JSON format (no newlines within strings):
{
    "mitigatedRisks": [
        {
            "revisedImpact": 2,
            "revisedLikelihood": 2,
            "revisedRiskValue": 4,
            "mitigationPlan": "• Install fire suppression systems • Conduct quarterly training • Implement monitoring alerts",
            "ownership": "Responsible party/department"
        }
    ]
}
"""
    # Format the risk items for the AI
    risk_data = []
    for i, item in enumerate(risk_items, 1):
        risk_data.append(f"""
Risk Item {i}:
- Enabler Type: {item.enablerType}
- Enabler Domain: {item.enablerDomain}
- Major Category: {item.majorCategory}
- Mapped Threat: {item.mappedThreat}
- Existing Controls: {item.existingControls}
- Compliance Status: {item.complianceStatus}
- Current Impact: {item.impact}
- Current Likelihood: {item.likelihood}
- Current Risk Value: {item.riskValue}
""")
    user_message = f"""
Please analyze the following risk items and provide mitigation strategies:
{''.join(risk_data)}
For each risk item, provide:
1. Revised impact rating (1-5) after implementing mitigation
2. Revised likelihood rating (1-5) after implementing mitigation
3. Revised risk value (impact × likelihood)
4. Concise mitigation plan (max 3 points, 10 words each)
5. Appropriate ownership assignment (department/role responsible)
Consider the existing controls and compliance status when developing mitigation plans.
"""
    try:
        result = generate_response(system_prompt, user_message)
        # Extract JSON from the response (AI sometimes adds explanatory text).
        json_start = result.find('{')
        json_end = result.rfind('}') + 1
        if json_start == -1 or json_end <= json_start:
            raise ValueError("No valid JSON found in response")
        json_str = result[json_start:json_end]
        try:
            mitigation_data = json.loads(json_str)
        except json.JSONDecodeError:
            # Collapse all whitespace runs and retry; a single \s+ substitution
            # subsumes the separate \n/\r/\t passes the old code chained, and
            # the prompt forbids newlines inside strings, so this is lossless.
            import re
            mitigation_data = json.loads(re.sub(r'\s+', ' ', json_str))
        return RiskMitigationResponse(**mitigation_data)
    except (json.JSONDecodeError, ValueError):
        # Parsing failed: provide an intelligent per-item fallback.
        return RiskMitigationResponse(
            mitigatedRisks=[_fallback_mitigation(item) for item in risk_items]
        )
    except Exception:
        # General fallback for any other errors (API failure, validation error).
        return RiskMitigationResponse(mitigatedRisks=[
            MitigationResponse(
                revisedImpact=2,
                revisedLikelihood=2,
                revisedRiskValue=4,
                mitigationPlan="• Implement enhanced risk controls • Establish monitoring procedures • Conduct regular assessments",
                ownership="Risk Management Team",
            )
            for _ in risk_items
        ])