Spaces:
Runtime error
Runtime error
| # app.py | |
| from fastapi import FastAPI | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| import os | |
| import openai | |
| import json | |
| app = FastAPI() | |
# Environment variables
# The Groq API key is read once, at import time. ``os.environ.get`` returns
# ``None`` for an unset variable, and calling ``.strip()`` on that raised
# AttributeError before the app could start. Defaulting to "" keeps the module
# importable; surrounding whitespace (e.g. a trailing newline pasted into a
# secrets UI) is still removed.
# NOTE(review): with an empty key the remote API is expected to reject the
# request at call time — confirm that is the desired failure mode.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "").strip()
# Model setup
def generate_response(system_prompt: str, user_message: str) -> str:
    """Send one system/user message pair to the chat model and return its reply text.

    A fresh OpenAI-compatible client pointed at Groq's endpoint is created on
    every call, authenticated with the module-level ``GROQ_API_KEY``.

    Args:
        system_prompt: Instructions sent with the ``system`` role.
        user_message: Content sent with the ``user`` role.

    Returns:
        The text of the first completion choice. An absent (``None``) content
        field is normalised to ``""`` so callers can safely use string methods
        such as ``find`` on the result.

    Raises:
        Any exception raised by the client call is propagated unchanged; nothing
        is caught here. ``IndexError`` is raised if the response has no choices.
    """
    client = openai.OpenAI(
        api_key=GROQ_API_KEY,
        base_url="https://api.groq.com/openai/v1",
    )
    response = client.chat.completions.create(
        # NOTE(review): confirm this model id is still served by the provider.
        model="llama3-8b-8192",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
        temperature=0.4,
    )
    content = response.choices[0].message.content
    # The SDK types ``content`` as optional; never hand ``None`` to callers.
    return content or ""
# Request models
class Message(BaseModel):
    # Request body carrying a single free-text field.
    # ``bia_threat_assessment`` forwards this text to the model verbatim as
    # the user message.
    message: str
class ProcessData(BaseModel):
    # Description of one business process, as consumed by ``generate_threats``.
    # Every field is free text; each value is interpolated into the prompt
    # under the label noted beside it.

    processName: str  # "Process Name"
    department: str  # "Department"
    description: str  # "Description"
    owner: str  # "Process Owner"
    businessContext: str  # "Business Context"
    rto: str  # "Recovery Time Objective (RTO)"
    mtpd: str  # "Maximum Tolerable Period of Disruption (MTPD)"
    minTolerableDowntime: str  # "Minimum Tolerable Downtime"
# Response models
class Threat(BaseModel):
    # One identified threat. The ``generate_threats`` prompt asks the model
    # for the 1-5 rating scales noted below; the model itself only enforces
    # the field types, not the ranges.

    id: int  # sequential identifier requested from the model
    name: str  # short threat name
    description: str  # how the threat could affect the process
    likelihood: int  # prompt asks for 1 (very unlikely) .. 5 (very likely)
    impact: int  # prompt asks for 1 (minimal) .. 5 (catastrophic)
    category: str  # e.g. "Security", "Operational"
    mitigation: str  # mitigation strategies as free text
class ThreatsResponse(BaseModel):
    # Envelope returned by ``generate_threats``; mirrors the top-level
    # ``{"threats": [...]}`` object the prompt asks the model to produce.
    threats: List[Threat]
_THREATS_SYSTEM_PROMPT = """
You are an expert cybersecurity and business continuity risk analyst. Your task is to analyze business processes and identify potential threats that could disrupt operations.
Given the process information, generate a comprehensive list of threats that could affect this specific business process. Consider:
- Cybersecurity threats (malware, ransomware, phishing, insider threats)
- Operational threats (system failures, human error, supply chain disruption)
- Natural disasters and environmental threats
- Regulatory and compliance risks
- Third-party and vendor risks
- Physical security threats
For each threat, provide:
- A unique sequential ID
- A clear, specific name
- A detailed description of how it could impact this process
- Likelihood rating (1-5, where 1=very unlikely, 5=very likely)
- Impact rating (1-5, where 1=minimal impact, 5=catastrophic impact)
- A relevant category
- Practical mitigation strategies
Consider the process's RTO, MTPD, and minimum tolerable downtime when assessing impact.
Respond strictly in this JSON format:
{
"threats": [
{
"id": 1,
"name": "Threat Name",
"description": "Detailed description of the threat and its potential impact",
"likelihood": 3,
"impact": 4,
"category": "Security/Operational/Environmental/Regulatory",
"mitigation": "Specific mitigation strategies"
}
]
}
"""


def _extract_threats_payload(raw: str) -> dict:
    """Return the JSON object that starts at the first ``{`` in *raw*.

    ``raw_decode`` stops at the end of that object, so explanatory text after
    the JSON — even text containing braces — no longer breaks parsing.

    Raises:
        ValueError: if *raw* contains no ``{`` or the object is malformed
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    start = raw.find("{")
    if start == -1:
        raise ValueError("No valid JSON found in response")
    payload, _end = json.JSONDecoder().raw_decode(raw, start)
    return payload


def _unparseable_reply_threats():
    """Build the canned threat list used when the model reply cannot be parsed."""
    return [
        Threat(
            id=1,
            name="System Unavailability",
            description="Critical system failure affecting process execution",
            likelihood=3,
            impact=4,
            category="Operational",
            mitigation="Implement redundant systems and regular backups",
        ),
        Threat(
            id=2,
            name="Cyber Attack",
            description="Malicious attack disrupting core operations",
            likelihood=3,
            impact=4,
            category="Security",
            mitigation="Use firewalls and real-time monitoring",
        ),
        Threat(
            id=3,
            name="Data Breach",
            description="Unauthorized access to sensitive process data",
            likelihood=2,
            impact=5,
            category="Security",
            mitigation="Implement encryption and access controls",
        ),
    ]


def _unexpected_error_threats():
    """Build the canned threat list used when any other error occurs."""
    return [
        Threat(
            id=1,
            name="Process Disruption",
            description="General process disruption due to unforeseen circumstances",
            likelihood=3,
            impact=3,
            category="Operational",
            mitigation="Develop comprehensive business continuity plans",
        ),
    ]


def generate_threats(process_data: ProcessData):
    """Generate threats for a business process from its characteristics.

    The process details are formatted into a user message, sent to the model
    with a fixed system prompt, and the JSON object in the reply is validated
    into a ``ThreatsResponse``.

    This function is best-effort and never raises for model or parsing
    problems. A reply that is not valid JSON or does not fit the response
    model yields three canned threats; any other exception (for example from
    the API call) yields one canned threat. Both cases are logged, because
    the canned lists are unrelated to the submitted process and are otherwise
    indistinguishable from a real analysis.

    Args:
        process_data: Description of the process to analyse.

    Returns:
        A ``ThreatsResponse`` with model-generated or canned threats.
    """
    import logging

    logger = logging.getLogger(__name__)

    user_message = "\n".join(
        [
            "",
            "Process Details:",
            f"- Process Name: {process_data.processName}",
            f"- Department: {process_data.department}",
            f"- Description: {process_data.description}",
            f"- Process Owner: {process_data.owner}",
            f"- Business Context: {process_data.businessContext}",
            f"- Recovery Time Objective (RTO): {process_data.rto}",
            f"- Maximum Tolerable Period of Disruption (MTPD): {process_data.mtpd}",
            f"- Minimum Tolerable Downtime: {process_data.minTolerableDowntime}",
            "Please analyze this process and generate 8-12 relevant threats with their risk assessments and mitigation strategies.",
            "",
        ]
    )

    try:
        result = generate_response(_THREATS_SYSTEM_PROMPT, user_message)
        return ThreatsResponse(**_extract_threats_payload(result))
    except ValueError:
        # Covers json.JSONDecodeError (a ValueError subclass) as in the
        # original ``except (json.JSONDecodeError, ValueError)`` clause.
        logger.warning(
            "generate_threats: model reply could not be parsed; "
            "returning canned threats",
            exc_info=True,
        )
        return ThreatsResponse(threats=_unparseable_reply_threats())
    except Exception:
        logger.exception(
            "generate_threats: unexpected error; returning canned threat"
        )
        return ThreatsResponse(threats=_unexpected_error_threats())
_BIA_SYSTEM_PROMPT = """
You are a cybersecurity and geopolitical risk analyst AI working on Business Impact Assessment (BIA).
Given a paragraph, do the following:
1. Identify the **place** mentioned in the text.
2. List likely **threats** specific to that place and context.
3. For each threat:
- Give a **likelihood rating (1–5)**.
- Give a **severity rating (1–5)**.
- Describe the **potential impact**.
- Compute **threat rating = likelihood × severity**.
Respond strictly in this JSON format:
{
"place": "<place>",
"threats": [
{
"name": "<threat name>",
"likelihood": <1-5>,
"severity": <1-5>,
"impact": "<impact statement>",
"threat_rating": <likelihood * severity>
}
]
}
"""


def bia_threat_assessment(req: Message):
    """Ask the model for a place-specific BIA threat assessment of ``req.message``.

    The prompt requests a JSON document, but the reply is handed back exactly
    as ``generate_response`` returns it: it is neither parsed nor validated
    here, and errors from ``generate_response`` propagate to the caller.
    """
    return generate_response(_BIA_SYSTEM_PROMPT, req.message)
class RiskQuestion(BaseModel):
    # One answered risk-assessment question.

    category: str  # risk category; "fire" / "cybercrime" pick dedicated fallbacks
    question: str  # the question that was asked
    user_answer: str  # the answer given to that question
class RiskRequestModel(BaseModel):
    # Request body for ``generate_risk_mitigation``.
    # Only the first entry is analysed by that function; an empty list makes
    # it raise ValueError.
    responses: List[RiskQuestion]
class RiskTrends(BaseModel):
    # Trend section nested inside ``RiskSummary``.

    top_category: str  # prompt: "Most critical risk category"
    risk_severity: str  # prompt: "Overall severity assessment"
    observations: List[str]  # free-text observations
class RiskSummary(BaseModel):
    # Summary section nested inside ``RiskAnalysis``.

    risk_classification_summary: str  # brief classification summary
    mitigation_suggestions: List[str]  # list of suggested actions
    risk_trends: RiskTrends  # nested trend information
class RiskAnalysis(BaseModel):
    # Full analysis of a single risk question. The value vocabularies noted
    # below are what the ``generate_risk_mitigation`` prompt asks the model
    # for; the model class only enforces the field types.

    risk_id: str  # "RISK-XXX" style identifier
    category: str  # echoed from the request
    question: str  # echoed from the request
    user_answer: str  # echoed from the request
    risk_name: str  # concise name of the identified risk
    identified_threat: str  # description of the threat
    likelihood: str  # Low / Medium / High / Very High
    impact: str  # Minor / Moderate / Significant / Severe
    risk_value: int  # prompt asks for a 1-10 scale
    residual_risk: str  # Low / Moderate / High / Critical
    current_control_description: str  # controls inferred from the answer
    current_control_rating: str  # Good / Fair / Poor
    business_unit: str  # department responsible
    risk_owner: str  # role responsible for the risk
    timeline: str  # Immediate / Short-term / Medium-term / Long-term
    mitigation_plan: str  # plan to address the risk
    summary: RiskSummary  # nested summary section
class RiskMitigationResponse(BaseModel):
    # Envelope returned by ``generate_risk_mitigation``; mirrors the top-level
    # ``{"risk_analysis": {...}}`` object the prompt asks the model to produce.
    risk_analysis: RiskAnalysis
_RISK_SYSTEM_PROMPT = """
You are an expert risk management and business continuity analyst. Your task is to analyze user responses to risk assessment questions and provide detailed risk analysis with mitigation strategies.
For the risk question provided, you need to:
1. Create a unique risk identifier (RISK-XXX format)
2. Identify the specific risk from the user's answer
3. Assess likelihood (Low, Medium, High, Very High) and impact (Minor, Moderate, Significant, Severe)
4. Calculate a risk value (1-10 scale)
5. Determine residual risk (Low, Moderate, High, Critical)
6. Evaluate current controls based on the user's answer
7. Assign appropriate business unit and risk owner
8. Provide a mitigation plan with timeline
9. Create a comprehensive risk summary with classification, mitigation suggestions, and trends
Use your expertise to make reasonable assumptions about the business context when details are limited.
Respond strictly in this JSON format:
{
"risk_analysis": {
"risk_id": "RISK-XXX",
"category": "The risk category from input",
"question": "The original question",
"user_answer": "The user's response",
"risk_name": "Concise name of the identified risk",
"identified_threat": "Detailed description of the threat identified",
"likelihood": "High/Medium/Low/Very High",
"impact": "Severe/Significant/Moderate/Minor",
"risk_value": 1-10,
"residual_risk": "Critical/High/Moderate/Low",
"current_control_description": "Description of current controls based on user answer",
"current_control_rating": "Good/Fair/Poor",
"business_unit": "Relevant department responsible",
"risk_owner": "Specific role responsible for the risk",
"timeline": "Immediate/Short-term/Medium-term/Long-term",
"mitigation_plan": "Detailed plan to address the risk",
"summary": {
"risk_classification_summary": "Brief summary of the risk classification",
"mitigation_suggestions": [
"Suggestion 1",
"Suggestion 2",
"Suggestion 3"
],
"risk_trends": {
"top_category": "Most critical risk category",
"risk_severity": "Overall severity assessment",
"observations": [
"Observation 1",
"Observation 2",
"Observation 3"
]
}
}
}
}
"""


def _parse_risk_payload(raw: str) -> dict:
    """Return the JSON object that starts at the first ``{`` in *raw*.

    The text is decoded as-is first. If that fails, every whitespace run is
    collapsed to a single space and decoding is retried; this rescues replies
    whose string values contain raw line breaks, which strict JSON forbids.
    One ``\\s+`` pass is equivalent to the previous four overlapping
    substitutions.

    Raises:
        ValueError: if *raw* contains no ``{`` or both attempts fail
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    import re

    start = raw.find("{")
    if start == -1:
        raise ValueError("No valid JSON found in response")
    decoder = json.JSONDecoder()
    try:
        payload, _end = decoder.raw_decode(raw, start)
    except json.JSONDecodeError:
        flattened = re.sub(r"\s+", " ", raw[start:])
        payload, _end = decoder.raw_decode(flattened)
    return payload


def _category_fallback_analysis(item):
    """Build the canned analysis used when the model reply cannot be parsed.

    "fire" and "cybercrime" (case-insensitive) have dedicated templates; every
    other category gets a generic template mentioning the category name.
    """
    category_key = item.category.lower()

    if category_key == "fire":
        return RiskAnalysis(
            risk_id="RISK-001",
            category=item.category,
            question=item.question,
            user_answer=item.user_answer,
            risk_name="Fire safety control deficiency",
            identified_threat="Potential for uncontrolled fire damage due to inadequate fire suppression systems",
            likelihood="High",
            impact="Severe",
            risk_value=9,
            residual_risk="Critical",
            current_control_description="Basic manual fire control measures without automated systems",
            current_control_rating="Poor",
            business_unit="Facilities",
            risk_owner="Fire Safety Officer",
            timeline="Immediate",
            mitigation_plan="Install automated fire suppression systems, implement 24/7 monitoring, and conduct regular fire drills",
            summary=RiskSummary(
                risk_classification_summary="Critical fire safety risk requiring immediate mitigation",
                mitigation_suggestions=[
                    "Deploy automated fire suppression systems",
                    "Install early detection monitoring",
                    "Conduct regular fire drills",
                ],
                risk_trends=RiskTrends(
                    top_category="Fire",
                    risk_severity="Critical",
                    observations=[
                        "Fire safety systems are outdated or insufficient",
                        "Manual responses may be inadequate for rapid fire spread",
                        "Immediate automated solution implementation is recommended",
                    ],
                ),
            ),
        )

    if category_key == "cybercrime":
        return RiskAnalysis(
            risk_id="RISK-002",
            category=item.category,
            question=item.question,
            user_answer=item.user_answer,
            risk_name="Outdated incident response planning",
            identified_threat="Delayed or ineffective response to cyber incidents due to outdated procedures",
            likelihood="High",
            impact="Severe",
            risk_value=8,
            residual_risk="High",
            current_control_description="Outdated incident response plan without recent testing",
            current_control_rating="Poor",
            business_unit="Information Technology",
            risk_owner="CISO",
            timeline="Immediate",
            mitigation_plan="Update incident response plan, conduct regular testing, and implement automated threat detection",
            summary=RiskSummary(
                risk_classification_summary="High-risk cybersecurity vulnerability requiring prompt remediation",
                mitigation_suggestions=[
                    "Update and test incident response plan quarterly",
                    "Implement automated threat detection systems",
                    "Conduct regular cybersecurity training",
                ],
                risk_trends=RiskTrends(
                    top_category="Cybercrime",
                    risk_severity="High",
                    observations=[
                        "Incident response plans are outdated across organization",
                        "Limited testing reduces effectiveness of responses",
                        "Regular plan updates and testing are essential",
                    ],
                ),
            ),
        )

    # Generic template for every other category.
    return RiskAnalysis(
        risk_id="RISK-003",
        category=item.category,
        question=item.question,
        user_answer=item.user_answer,
        risk_name=f"Inadequate {item.category} controls",
        identified_threat=f"Increased vulnerability to {item.category}-related incidents",
        likelihood="Medium",
        impact="Moderate",
        risk_value=6,
        residual_risk="Moderate",
        current_control_description=f"Basic {item.category} controls with improvement opportunities",
        current_control_rating="Fair",
        business_unit="Operations",
        risk_owner="Risk Manager",
        timeline="Short-term",
        mitigation_plan=f"Enhance {item.category} controls, implement monitoring systems, and establish regular review procedures",
        summary=RiskSummary(
            risk_classification_summary=f"Moderate {item.category} risk requiring planned mitigation",
            mitigation_suggestions=[
                f"Enhance existing {item.category} controls",
                "Implement monitoring systems",
                "Conduct regular control reviews",
            ],
            risk_trends=RiskTrends(
                top_category=item.category,
                risk_severity="Moderate",
                observations=[
                    f"{item.category} controls need enhancement",
                    "Regular monitoring would improve risk posture",
                    "Structured improvement plan recommended",
                ],
            ),
        ),
    )


def _unassessed_fallback_analysis(item):
    """Build the canned analysis used when any other error occurs."""
    return RiskAnalysis(
        risk_id="RISK-000",
        category=item.category,
        question=item.question,
        user_answer=item.user_answer,
        risk_name="Undefined risk",
        identified_threat="Potential business impact from unassessed risk",
        likelihood="Medium",
        impact="Moderate",
        risk_value=4,
        residual_risk="Moderate",
        current_control_description="Unknown or unassessed controls",
        current_control_rating="Fair",
        business_unit="Risk Management",
        risk_owner="Risk Officer",
        timeline="Short-term",
        mitigation_plan="Conduct comprehensive risk assessment and implement appropriate controls",
        summary=RiskSummary(
            risk_classification_summary="General risk requiring assessment and control implementation",
            mitigation_suggestions=[
                "Conduct detailed risk assessment",
                "Implement appropriate controls",
                "Establish regular monitoring",
            ],
            risk_trends=RiskTrends(
                top_category="General",
                risk_severity="Moderate",
                observations=[
                    "Risk assessment needs improvement",
                    "Control effectiveness should be evaluated",
                    "Regular risk monitoring recommended",
                ],
            ),
        ),
    )


def generate_risk_mitigation(request: RiskRequestModel):
    """Generate a risk analysis and mitigation plan from the user's responses.

    Only the first entry of ``request.responses`` is analysed; any further
    entries are ignored. The entry is sent to the model and the JSON object
    in the reply is validated into a ``RiskMitigationResponse``.

    Apart from the empty-input check, this function is best-effort. A reply
    that is not valid JSON or does not fit the response model yields a canned
    analysis chosen by category; any other exception (for example from the
    API call) yields a generic canned analysis. Both cases are logged, because
    the canned text is not derived from the user's answer and is otherwise
    indistinguishable from a real analysis.

    Args:
        request: The submitted risk questions and answers.

    Returns:
        A ``RiskMitigationResponse`` with a model-generated or canned analysis.

    Raises:
        ValueError: if ``request.responses`` is empty.
    """
    import logging

    logger = logging.getLogger(__name__)

    if not request.responses:
        raise ValueError("No risk questions provided")

    # The guard above guarantees a first element, so the fallbacks below can
    # reuse ``item`` without checking for an empty list again.
    item = request.responses[0]

    user_message = "\n".join(
        [
            "",
            "Please analyze the following risk assessment response:",
            f"Category: {item.category}",
            f"Question: {item.question}",
            f"User Answer: {item.user_answer}",
            "Provide a comprehensive risk analysis with mitigation plan based on this response.",
            "",
        ]
    )

    try:
        result = generate_response(_RISK_SYSTEM_PROMPT, user_message)
        return RiskMitigationResponse(**_parse_risk_payload(result))
    except ValueError:
        # Covers json.JSONDecodeError (a ValueError subclass) as in the
        # original ``except (json.JSONDecodeError, ValueError)`` clause.
        logger.warning(
            "generate_risk_mitigation: model reply could not be parsed; "
            "returning category fallback",
            exc_info=True,
        )
        return RiskMitigationResponse(
            risk_analysis=_category_fallback_analysis(item)
        )
    except Exception:
        logger.exception(
            "generate_risk_mitigation: unexpected error; returning generic fallback"
        )
        return RiskMitigationResponse(
            risk_analysis=_unassessed_fallback_analysis(item)
        )