ey-catalyst / app.py
Yaswanth-Bolla's picture
Added endpoints for both the features
d46032a
raw
history blame
14 kB
# app.py
import json
import logging
import os
import re
from typing import List, Optional

import openai
from fastapi import FastAPI
from pydantic import BaseModel
# Application object; the @app.post decorators in this file register their routes on it.
app = FastAPI()
# Environment Variables
# Key handed to the Groq client in generate_response(); None when the variable is unset.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
# Model Setup
def generate_response(
    system_prompt: str,
    user_message: str,
    *,
    model: str = "llama3-8b-8192",
    temperature: float = 0.4,
) -> Optional[str]:
    """Send one system/user message pair to Groq and return the reply text.

    The request is made with the OpenAI SDK pointed at Groq's
    OpenAI-compatible endpoint.

    Args:
        system_prompt: Content of the ``system`` message.
        user_message: Content of the ``user`` message.
        model: Model identifier sent with the request.
        temperature: Sampling temperature sent with the request.

    Returns:
        The ``content`` of the first choice's message, exactly as the SDK
        returns it (the SDK types this field as optional).

    Raises:
        RuntimeError: If ``GROQ_API_KEY`` is not configured.
    """
    if not GROQ_API_KEY:
        # With api_key=None the OpenAI SDK falls back to the OPENAI_API_KEY
        # environment variable, which would send an unrelated credential to
        # Groq's endpoint. Fail early with an explicit message instead.
        raise RuntimeError("GROQ_API_KEY environment variable is not set")

    client = openai.OpenAI(
        api_key=GROQ_API_KEY,
        base_url="https://api.groq.com/openai/v1",
    )
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
        temperature=temperature,
    )
    return response.choices[0].message.content
# Request models
class Message(BaseModel):
    """Request body that carries a single free-text field."""

    # Free-form text; the /bia/threat-assessment endpoint forwards it to the
    # model as the user message.
    message: str
class ProcessData(BaseModel):
    """Request body for ``/api/generate-threats``: one business process.

    All fields are plain strings; the endpoint interpolates each of them
    into the user prompt under the label noted beside it.
    """

    processName: str  # "Process Name"
    department: str  # "Department"
    description: str  # "Description"
    owner: str  # "Process Owner"
    businessContext: str  # "Business Context"
    rto: str  # "Recovery Time Objective (RTO)"
    mtpd: str  # "Maximum Tolerable Period of Disruption (MTPD)"
    minTolerableDowntime: str  # "Minimum Tolerable Downtime"
# Response models
class Threat(BaseModel):
    """One threat entry in the ``/api/generate-threats`` response."""

    id: int  # the prompt asks the model for sequential IDs
    name: str
    description: str
    # The prompt requests ratings on a 1-5 scale; this model only checks
    # that they are integers, not that they fall inside that range.
    likelihood: int
    impact: int
    category: str
    mitigation: str
class ThreatsResponse(BaseModel):
    """Response envelope for ``/api/generate-threats``."""

    # Threat entries, either parsed from the model reply or taken from the
    # endpoint's canned fallback lists.
    threats: List[Threat]
@app.post("/api/generate-threats", response_model=ThreatsResponse)
def generate_threats(process_data: ProcessData):
    """Generate threats for a given business process based on its characteristics.

    Builds a prompt from ``process_data``, asks the model for a JSON threat
    list and validates it into a ``ThreatsResponse``.

    The endpoint is best-effort: if the model call fails or its reply cannot
    be parsed/validated, the failure is logged and a canned threat list is
    returned instead of an error response.

    Args:
        process_data: Description of the business process to analyse.

    Returns:
        A ``ThreatsResponse`` built from the model reply, or one of two
        canned fallbacks (three threats for unparseable replies, one threat
        for any other failure).
    """
    logger = logging.getLogger(__name__)

    # NOTE: prompt text is kept flush-left so the string contents stay
    # exactly as they were.
    system_prompt = """
You are an expert cybersecurity and business continuity risk analyst. Your task is to analyze business processes and identify potential threats that could disrupt operations.
Given the process information, generate a comprehensive list of threats that could affect this specific business process. Consider:
- Cybersecurity threats (malware, ransomware, phishing, insider threats)
- Operational threats (system failures, human error, supply chain disruption)
- Natural disasters and environmental threats
- Regulatory and compliance risks
- Third-party and vendor risks
- Physical security threats
For each threat, provide:
- A unique sequential ID
- A clear, specific name
- A detailed description of how it could impact this process
- Likelihood rating (1-5, where 1=very unlikely, 5=very likely)
- Impact rating (1-5, where 1=minimal impact, 5=catastrophic impact)
- A relevant category
- Practical mitigation strategies
Consider the process's RTO, MTPD, and minimum tolerable downtime when assessing impact.
Respond strictly in this JSON format:
{
"threats": [
{
"id": 1,
"name": "Threat Name",
"description": "Detailed description of the threat and its potential impact",
"likelihood": 3,
"impact": 4,
"category": "Security/Operational/Environmental/Regulatory",
"mitigation": "Specific mitigation strategies"
}
]
}
"""
    user_message = f"""
Process Details:
- Process Name: {process_data.processName}
- Department: {process_data.department}
- Description: {process_data.description}
- Process Owner: {process_data.owner}
- Business Context: {process_data.businessContext}
- Recovery Time Objective (RTO): {process_data.rto}
- Maximum Tolerable Period of Disruption (MTPD): {process_data.mtpd}
- Minimum Tolerable Downtime: {process_data.minTolerableDowntime}
Please analyze this process and generate 8-12 relevant threats with their risk assessments and mitigation strategies.
"""
    try:
        result = generate_response(system_prompt, user_message)
        # The model sometimes wraps the JSON in explanatory text, so start
        # at the first '{'.
        json_start = result.find('{')
        if json_start == -1:
            raise ValueError("No valid JSON found in response")
        # raw_decode parses the first complete JSON value and ignores
        # whatever follows it, so trailing prose (even prose containing
        # braces) no longer breaks parsing.
        threats_data, _ = json.JSONDecoder().raw_decode(result, json_start)
        return ThreatsResponse(**threats_data)
    except ValueError:
        # json.JSONDecodeError derives from ValueError, as does pydantic's
        # ValidationError, so this covers both bad JSON and bad shape.
        logger.warning(
            "generate_threats: unusable model reply for process %r; "
            "returning canned threats",
            process_data.processName,
            exc_info=True,
        )
        return ThreatsResponse(threats=[
            Threat(
                id=1,
                name="System Unavailability",
                description="Critical system failure affecting process execution",
                likelihood=3,
                impact=4,
                category="Operational",
                mitigation="Implement redundant systems and regular backups",
            ),
            Threat(
                id=2,
                name="Cyber Attack",
                description="Malicious attack disrupting core operations",
                likelihood=3,
                impact=4,
                category="Security",
                mitigation="Use firewalls and real-time monitoring",
            ),
            Threat(
                id=3,
                name="Data Breach",
                description="Unauthorized access to sensitive process data",
                likelihood=2,
                impact=5,
                category="Security",
                mitigation="Implement encryption and access controls",
            ),
        ])
    except Exception:
        # Deliberate best-effort boundary: keep answering, but leave a trace
        # so outages and misconfiguration are visible in the logs.
        logger.exception(
            "generate_threats: model call failed for process %r; "
            "returning generic fallback",
            process_data.processName,
        )
        return ThreatsResponse(threats=[
            Threat(
                id=1,
                name="Process Disruption",
                description="General process disruption due to unforeseen circumstances",
                likelihood=3,
                impact=3,
                category="Operational",
                mitigation="Develop comprehensive business continuity plans",
            ),
        ])
@app.post("/bia/threat-assessment")
def bia_threat_assessment(req: Message):
    """Run the BIA threat-assessment prompt against ``req.message``.

    The model reply is returned as-is (it is not parsed here), and any
    exception raised by ``generate_response`` propagates to the caller.
    """
    # NOTE: prompt text is kept flush-left so the string contents stay
    # exactly as they were.
    system_prompt = """
You are a cybersecurity and geopolitical risk analyst AI working on Business Impact Assessment (BIA).
Given a paragraph, do the following:
1. Identify the **place** mentioned in the text.
2. List likely **threats** specific to that place and context.
3. For each threat:
- Give a **likelihood rating (1–5)**.
- Give a **severity rating (1–5)**.
- Describe the **potential impact**.
- Compute **threat rating = likelihood × severity**.
Respond strictly in this JSON format:
{
"place": "<place>",
"threats": [
{
"name": "<threat name>",
"likelihood": <1-5>,
"severity": <1-5>,
"impact": "<impact statement>",
"threat_rating": <likelihood * severity>
}
]
}
"""
    return generate_response(system_prompt, req.message)
@app.post("/bia/impact-analysis")
def bia_impact_analysis(req: Message):
    """Placeholder endpoint: ignores ``req`` and returns a fixed payload."""
    placeholder = dict(
        status="placeholder",
        note="This endpoint is reserved for BIA impact analysis logic.",
    )
    return placeholder
class RiskItem(BaseModel):
    """One existing risk entry submitted to ``/api/risk-mitigation``.

    Every field is a string and is interpolated into the user prompt.
    """

    enablerType: str
    enablerDomain: str  # also used to build the fallback owner, "<enablerDomain> Team"
    majorCategory: str  # compared case-insensitively when choosing a fallback plan
    mappedThreat: str
    existingControls: str
    complianceStatus: str
    # Current ratings arrive as strings; the fallback path of the endpoint
    # converts impact and likelihood to numbers.
    impact: str
    likelihood: str
    riskValue: str
class MitigationResponse(BaseModel):
    """Mitigation outcome for a single risk item."""

    revisedImpact: int
    revisedLikelihood: int
    # The prompt and the fallback code both define this as
    # revised impact x revised likelihood; the model itself does not enforce it.
    revisedRiskValue: int
    mitigationPlan: str  # bullet points joined into one string
    ownership: str  # team or role responsible for the plan
class RiskMitigationResponse(BaseModel):
    """Response envelope for ``/api/risk-mitigation``."""

    # Mitigation entries, from the model reply or from the endpoint's
    # fallback logic.
    mitigatedRisks: List[MitigationResponse]
def _parse_rating(value: str, default: int = 3) -> int:
    """Convert a rating string such as "4" or "4.0" to an int.

    Returns ``default`` (the midpoint of the 1-5 scale used by the prompts)
    when the text is not numeric, e.g. "High".
    """
    for convert in (int, float):
        try:
            return int(convert(value))
        except (TypeError, ValueError, OverflowError):
            continue
    return default


def _fallback_mitigation(item: RiskItem) -> MitigationResponse:
    """Build a rule-based mitigation for one risk item without using the model."""
    category = item.majorCategory.lower()

    # How far each rating is lowered depends on the threat category.
    if category in ('fire', 'natural disaster'):
        impact_reduction, likelihood_reduction = 2, 2
    elif category in ('cyber security', 'security'):
        impact_reduction, likelihood_reduction = 1, 2
    else:
        impact_reduction, likelihood_reduction = 1, 1

    # Ratings never drop below 1.
    revised_impact = max(1, _parse_rating(item.impact) - impact_reduction)
    revised_likelihood = max(1, _parse_rating(item.likelihood) - likelihood_reduction)

    if category == 'fire':
        mitigation_plan = "• Install automatic fire suppression systems • Conduct quarterly safety training • Implement 24/7 monitoring alerts"
        ownership = "Facilities Management Team"
    elif category in ('cyber security', 'security'):
        mitigation_plan = "• Deploy endpoint detection response systems • Implement network segmentation controls • Conduct regular penetration testing"
        ownership = "Information Security Team"
    else:
        mitigation_plan = f"• Enhance existing {category} controls • Implement continuous monitoring systems • Establish incident response procedures"
        ownership = f"{item.enablerDomain} Team"

    return MitigationResponse(
        revisedImpact=revised_impact,
        revisedLikelihood=revised_likelihood,
        revisedRiskValue=revised_impact * revised_likelihood,
        mitigationPlan=mitigation_plan,
        ownership=ownership,
    )


@app.post("/api/risk-mitigation", response_model=RiskMitigationResponse)
def generate_risk_mitigation(risk_items: List[RiskItem]):
    """Generate mitigation plans and revised risk assessments for identified threats.

    Formats ``risk_items`` into a prompt, asks the model for a JSON list of
    mitigations and validates it into a ``RiskMitigationResponse``.

    The endpoint is best-effort: failures are logged and answered with
    fallback data instead of an error response.

    Args:
        risk_items: Risk entries to analyse. An empty list yields an empty
            result without calling the model.

    Returns:
        A ``RiskMitigationResponse`` built from the model reply; or, when the
        reply is unusable, one rule-based entry per input item; or, on any
        other failure, one generic entry per input item.

    NOTE(review): the number of entries in a model reply is not checked
    against ``len(risk_items)``.
    """
    logger = logging.getLogger(__name__)

    if not risk_items:
        # Nothing to analyse; avoid asking the model about zero items.
        return RiskMitigationResponse(mitigatedRisks=[])

    # NOTE: prompt text is kept flush-left so the string contents stay
    # exactly as they were.
    system_prompt = """
You are an expert risk management and business continuity analyst. Your task is to analyze existing risk items and provide comprehensive mitigation strategies that will reduce the overall risk.
For each risk item provided, you need to:
1. Analyze the current risk assessment (impact, likelihood, risk value)
2. Evaluate existing controls and compliance status
3. Recommend additional mitigation measures
4. Provide revised risk ratings after implementing the mitigation plan
5. Assign appropriate ownership for the mitigation activities
Consider:
- Current controls effectiveness and compliance status
- Industry best practices for the specific threat type
- Cost-effective mitigation strategies
- Realistic timeline for implementation
- Appropriate ownership based on enabler type and domain
For revised ratings:
- Impact (1-5): Consider how mitigation reduces potential damage
- Likelihood (1-5): Consider how mitigation reduces probability of occurrence
- Risk Value: Calculate as revised impact × revised likelihood
For mitigation plans:
- Maximum 3 bullet points
- Each point maximum 10 words
- Be concise and actionable
Respond strictly in this JSON format (no newlines within strings):
{
"mitigatedRisks": [
{
"revisedImpact": 2,
"revisedLikelihood": 2,
"revisedRiskValue": 4,
"mitigationPlan": "• Install fire suppression systems • Conduct quarterly training • Implement monitoring alerts",
"ownership": "Responsible party/department"
}
]
}
"""
    # Format the risk items for the AI
    risk_data = [
        f"""
Risk Item {i}:
- Enabler Type: {item.enablerType}
- Enabler Domain: {item.enablerDomain}
- Major Category: {item.majorCategory}
- Mapped Threat: {item.mappedThreat}
- Existing Controls: {item.existingControls}
- Compliance Status: {item.complianceStatus}
- Current Impact: {item.impact}
- Current Likelihood: {item.likelihood}
- Current Risk Value: {item.riskValue}
"""
        for i, item in enumerate(risk_items, 1)
    ]
    user_message = f"""
Please analyze the following risk items and provide mitigation strategies:
{''.join(risk_data)}
For each risk item, provide:
1. Revised impact rating (1-5) after implementing mitigation
2. Revised likelihood rating (1-5) after implementing mitigation
3. Revised risk value (impact × likelihood)
4. Concise mitigation plan (max 3 points, 10 words each)
5. Appropriate ownership assignment (department/role responsible)
Consider the existing controls and compliance status when developing mitigation plans.
"""
    try:
        result = generate_response(system_prompt, user_message)
        # The model may wrap the JSON in prose, so start at the first '{'.
        json_start = result.find('{')
        if json_start == -1:
            raise ValueError("No valid JSON found in response")
        payload = result[json_start:]
        decoder = json.JSONDecoder()
        try:
            # raw_decode stops after the first complete JSON value, so any
            # trailing explanatory text is ignored.
            mitigation_data, _ = decoder.raw_decode(payload)
        except json.JSONDecodeError:
            # Raw newlines/tabs inside string values are invalid JSON.
            # Collapse every whitespace run to one space and retry once.
            mitigation_data, _ = decoder.raw_decode(re.sub(r'\s+', ' ', payload))
        return RiskMitigationResponse(**mitigation_data)
    except ValueError:
        # Covers json.JSONDecodeError and pydantic's ValidationError (both
        # derive from ValueError) as well as the explicit raise above.
        logger.warning(
            "generate_risk_mitigation: unusable model reply; "
            "using rule-based fallback for %d item(s)",
            len(risk_items),
            exc_info=True,
        )
        return RiskMitigationResponse(
            mitigatedRisks=[_fallback_mitigation(item) for item in risk_items]
        )
    except Exception:
        # Deliberate best-effort boundary: keep answering, but leave a trace
        # so outages and misconfiguration are visible in the logs.
        logger.exception(
            "generate_risk_mitigation: model call failed; "
            "using generic fallback for %d item(s)",
            len(risk_items),
        )
        return RiskMitigationResponse(
            mitigatedRisks=[
                MitigationResponse(
                    revisedImpact=2,
                    revisedLikelihood=2,
                    revisedRiskValue=4,
                    mitigationPlan="• Implement enhanced risk controls • Establish monitoring procedures • Conduct regular assessments",
                    ownership="Risk Management Team",
                )
                for _ in risk_items
            ]
        )