ey-catalyst / app.py
Yaswanth-Bolla's picture
Merge branch 'main' of https://huggingface.co/spaces/EY-Catalyst-RVCE/ey-catalyst
22e0141
raw
history blame
19.7 kB
# app.py
import json
import logging
import os
import re
from typing import List, Optional

import openai
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# FastAPI application object; the @app.post decorators further down register routes on it.
app = FastAPI()
# Environment Variables
# Default to "" so an unset variable no longer crashes the import with
# AttributeError (os.environ.get returns None, which has no .strip()).
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "").strip()
if not GROQ_API_KEY:
    # Make the misconfiguration visible; model calls are expected to fail
    # authentication until the key is provided.
    logging.getLogger(__name__).warning(
        "GROQ_API_KEY is not set or empty; model requests will not authenticate"
    )
# Model Setup
def generate_response(system_prompt: str, user_message: str):
    # Sends one system + user message pair to the chat-completions endpoint
    # and returns the text content of the first choice.
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_message},
    ]
    # The OpenAI client is pointed at Groq's OpenAI-style base URL.
    groq_client = openai.OpenAI(
        base_url="https://api.groq.com/openai/v1",
        api_key=GROQ_API_KEY,
    )
    completion = groq_client.chat.completions.create(
        model="llama3-8b-8192",
        messages=conversation,
        temperature=0.4,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
# Request models
# Request body for /bia/threat-assessment: one free-text paragraph to analyse.
class Message(BaseModel):
    message: str  # forwarded unchanged to the model as the user message
# Request body for /api/generate-threats describing one business process.
# Every field is a plain string that is interpolated into the model prompt.
class ProcessData(BaseModel):
    processName: str
    department: str
    description: str
    owner: str  # labelled "Process Owner" in the prompt
    businessContext: str
    rto: str  # Recovery Time Objective
    mtpd: str  # Maximum Tolerable Period of Disruption
    minTolerableDowntime: str
# Response models
# One threat entry in the /api/generate-threats response.
class Threat(BaseModel):
    id: int  # the prompt asks the model for a unique sequential ID
    name: str
    description: str
    likelihood: int  # prompt requests 1-5 (1=very unlikely, 5=very likely); the range is not enforced here
    impact: int  # prompt requests 1-5 (1=minimal, 5=catastrophic); the range is not enforced here
    category: str
    mitigation: str
# Response envelope for /api/generate-threats; mirrors the JSON shape requested in the prompt.
class ThreatsResponse(BaseModel):
    threats: List[Threat]
@app.post("/api/generate-threats", response_model=ThreatsResponse)
def generate_threats(process_data: ProcessData):
    """
    Generate threats for a given business process based on its characteristics
    """
    # Flow: build the prompts, ask the model for a JSON threat list, validate it
    # into a ThreatsResponse. Model or parsing failures are not propagated to the
    # client; a canned threat list is returned instead and the cause is logged.
    logger = logging.getLogger(__name__)
    system_prompt = """
You are an expert cybersecurity and business continuity risk analyst. Your task is to analyze business processes and identify potential threats that could disrupt operations.
Given the process information, generate a comprehensive list of threats that could affect this specific business process. Consider:
- Cybersecurity threats (malware, ransomware, phishing, insider threats)
- Operational threats (system failures, human error, supply chain disruption)
- Natural disasters and environmental threats
- Regulatory and compliance risks
- Third-party and vendor risks
- Physical security threats
For each threat, provide:
- A unique sequential ID
- A clear, specific name
- A detailed description of how it could impact this process
- Likelihood rating (1-5, where 1=very unlikely, 5=very likely)
- Impact rating (1-5, where 1=minimal impact, 5=catastrophic impact)
- A relevant category
- Practical mitigation strategies
Consider the process's RTO, MTPD, and minimum tolerable downtime when assessing impact.
Respond strictly in this JSON format:
{
"threats": [
{
"id": 1,
"name": "Threat Name",
"description": "Detailed description of the threat and its potential impact",
"likelihood": 3,
"impact": 4,
"category": "Security/Operational/Environmental/Regulatory",
"mitigation": "Specific mitigation strategies"
}
]
}
"""
    user_message = f"""
Process Details:
- Process Name: {process_data.processName}
- Department: {process_data.department}
- Description: {process_data.description}
- Process Owner: {process_data.owner}
- Business Context: {process_data.businessContext}
- Recovery Time Objective (RTO): {process_data.rto}
- Maximum Tolerable Period of Disruption (MTPD): {process_data.mtpd}
- Minimum Tolerable Downtime: {process_data.minTolerableDowntime}
Please analyze this process and generate 8-12 relevant threats with their risk assessments and mitigation strategies.
"""
    try:
        result = generate_response(system_prompt, user_message)
        # The model sometimes wraps the JSON in explanatory text, so keep only
        # the span from the first '{' to the last '}'.
        json_start = result.find('{')
        json_end = result.rfind('}') + 1
        if json_start == -1 or json_end <= json_start:
            raise ValueError("No valid JSON found in response")
        threats_data = json.loads(result[json_start:json_end])
        return ThreatsResponse(**threats_data)
    except (json.JSONDecodeError, ValueError):
        # Reply was missing, malformed, or (presumably, via pydantic's
        # ValueError-based validation error) did not match the schema.
        logger.warning(
            "Threat generation reply was not usable JSON; returning default threats",
            exc_info=True,
        )
        return ThreatsResponse(threats=[
            Threat(
                id=1,
                name="System Unavailability",
                description="Critical system failure affecting process execution",
                likelihood=3,
                impact=4,
                category="Operational",
                mitigation="Implement redundant systems and regular backups"
            ),
            Threat(
                id=2,
                name="Cyber Attack",
                description="Malicious attack disrupting core operations",
                likelihood=3,
                impact=4,
                category="Security",
                mitigation="Use firewalls and real-time monitoring"
            ),
            Threat(
                id=3,
                name="Data Breach",
                description="Unauthorized access to sensitive process data",
                likelihood=2,
                impact=5,
                category="Security",
                mitigation="Implement encryption and access controls"
            )
        ])
    except Exception:
        # Anything else (e.g. the model call itself failing): keep the endpoint
        # best-effort, but record the traceback instead of hiding it.
        logger.exception("Threat generation failed; returning generic fallback threat")
        return ThreatsResponse(threats=[
            Threat(
                id=1,
                name="Process Disruption",
                description="General process disruption due to unforeseen circumstances",
                likelihood=3,
                impact=3,
                category="Operational",
                mitigation="Develop comprehensive business continuity plans"
            )
        ])
@app.post("/bia/threat-assessment")
def bia_threat_assessment(req: Message):
    # Instructions sent as the system message; the caller's paragraph is the user message.
    analyst_instructions = """
You are a cybersecurity and geopolitical risk analyst AI working on Business Impact Assessment (BIA).
Given a paragraph, do the following:
1. Identify the **place** mentioned in the text.
2. List likely **threats** specific to that place and context.
3. For each threat:
- Give a **likelihood rating (1–5)**.
- Give a **severity rating (1–5)**.
- Describe the **potential impact**.
- Compute **threat rating = likelihood × severity**.
Respond strictly in this JSON format:
{
"place": "<place>",
"threats": [
{
"name": "<threat name>",
"likelihood": <1-5>,
"severity": <1-5>,
"impact": "<impact statement>",
"threat_rating": <likelihood * severity>
}
]
}
"""
    # The model's reply is handed back as-is; it is neither parsed nor validated here.
    return generate_response(analyst_instructions, req.message)
# One answered risk-assessment question submitted to /api/risk-mitigation.
class RiskQuestion(BaseModel):
    category: str  # compared case-insensitively with 'fire' / 'cybercrime' when a fallback is built
    question: str
    user_answer: str
# Request body for /api/risk-mitigation.
class RiskRequestModel(BaseModel):
    responses: List[RiskQuestion]  # the endpoint currently analyses only the first entry
# Trend section nested inside RiskSummary.
class RiskTrends(BaseModel):
    top_category: str  # prompt: "Most critical risk category"
    risk_severity: str  # prompt: "Overall severity assessment"
    observations: List[str]
# Summary section nested inside RiskAnalysis.
class RiskSummary(BaseModel):
    risk_classification_summary: str
    mitigation_suggestions: List[str]
    risk_trends: RiskTrends
# Full analysis of a single risk question. The value sets noted below are what
# the prompt asks the model for; they are plain strings/ints and not enforced here.
class RiskAnalysis(BaseModel):
    risk_id: str  # "RISK-XXX" format
    category: str
    question: str
    user_answer: str
    risk_name: str
    identified_threat: str
    likelihood: str  # Low / Medium / High / Very High
    impact: str  # Minor / Moderate / Significant / Severe
    risk_value: int  # 1-10 scale
    residual_risk: str  # Low / Moderate / High / Critical
    current_control_description: str
    current_control_rating: str  # Good / Fair / Poor
    business_unit: str
    risk_owner: str
    timeline: str  # Immediate / Short-term / Medium-term / Long-term
    mitigation_plan: str
    summary: RiskSummary
# Response envelope for /api/risk-mitigation; mirrors the JSON shape requested in the prompt.
class RiskMitigationResponse(BaseModel):
    risk_analysis: RiskAnalysis
def _category_fallback_analysis(item: RiskQuestion) -> RiskAnalysis:
    # Canned analysis used when the model reply cannot be parsed/validated.
    # 'fire' and 'cybercrime' (case-insensitive) get tailored text; any other
    # category gets a generic template built around the category name.
    category_key = item.category.lower()
    if category_key == 'fire':
        return RiskAnalysis(
            risk_id="RISK-001",
            category=item.category,
            question=item.question,
            user_answer=item.user_answer,
            risk_name="Fire safety control deficiency",
            identified_threat="Potential for uncontrolled fire damage due to inadequate fire suppression systems",
            likelihood="High",
            impact="Severe",
            risk_value=9,
            residual_risk="Critical",
            current_control_description="Basic manual fire control measures without automated systems",
            current_control_rating="Poor",
            business_unit="Facilities",
            risk_owner="Fire Safety Officer",
            timeline="Immediate",
            mitigation_plan="Install automated fire suppression systems, implement 24/7 monitoring, and conduct regular fire drills",
            summary=RiskSummary(
                risk_classification_summary="Critical fire safety risk requiring immediate mitigation",
                mitigation_suggestions=[
                    "Deploy automated fire suppression systems",
                    "Install early detection monitoring",
                    "Conduct regular fire drills"
                ],
                risk_trends=RiskTrends(
                    top_category="Fire",
                    risk_severity="Critical",
                    observations=[
                        "Fire safety systems are outdated or insufficient",
                        "Manual responses may be inadequate for rapid fire spread",
                        "Immediate automated solution implementation is recommended"
                    ]
                )
            )
        )
    if category_key == 'cybercrime':
        return RiskAnalysis(
            risk_id="RISK-002",
            category=item.category,
            question=item.question,
            user_answer=item.user_answer,
            risk_name="Outdated incident response planning",
            identified_threat="Delayed or ineffective response to cyber incidents due to outdated procedures",
            likelihood="High",
            impact="Severe",
            risk_value=8,
            residual_risk="High",
            current_control_description="Outdated incident response plan without recent testing",
            current_control_rating="Poor",
            business_unit="Information Technology",
            risk_owner="CISO",
            timeline="Immediate",
            mitigation_plan="Update incident response plan, conduct regular testing, and implement automated threat detection",
            summary=RiskSummary(
                risk_classification_summary="High-risk cybersecurity vulnerability requiring prompt remediation",
                mitigation_suggestions=[
                    "Update and test incident response plan quarterly",
                    "Implement automated threat detection systems",
                    "Conduct regular cybersecurity training"
                ],
                risk_trends=RiskTrends(
                    top_category="Cybercrime",
                    risk_severity="High",
                    observations=[
                        "Incident response plans are outdated across organization",
                        "Limited testing reduces effectiveness of responses",
                        "Regular plan updates and testing are essential"
                    ]
                )
            )
        )
    # Generic risk analysis for other categories
    return RiskAnalysis(
        risk_id="RISK-003",
        category=item.category,
        question=item.question,
        user_answer=item.user_answer,
        risk_name=f"Inadequate {item.category} controls",
        identified_threat=f"Increased vulnerability to {item.category}-related incidents",
        likelihood="Medium",
        impact="Moderate",
        risk_value=6,
        residual_risk="Moderate",
        current_control_description=f"Basic {item.category} controls with improvement opportunities",
        current_control_rating="Fair",
        business_unit="Operations",
        risk_owner="Risk Manager",
        timeline="Short-term",
        mitigation_plan=f"Enhance {item.category} controls, implement monitoring systems, and establish regular review procedures",
        summary=RiskSummary(
            risk_classification_summary=f"Moderate {item.category} risk requiring planned mitigation",
            mitigation_suggestions=[
                f"Enhance existing {item.category} controls",
                "Implement monitoring systems",
                "Conduct regular control reviews"
            ],
            risk_trends=RiskTrends(
                top_category=item.category,
                risk_severity="Moderate",
                observations=[
                    f"{item.category} controls need enhancement",
                    "Regular monitoring would improve risk posture",
                    "Structured improvement plan recommended"
                ]
            )
        )
    )


def _unassessed_fallback_analysis(item: RiskQuestion) -> RiskAnalysis:
    # Canned analysis used when something other than JSON parsing/validation
    # failed (for example the model call itself).
    return RiskAnalysis(
        risk_id="RISK-000",
        category=item.category,
        question=item.question,
        user_answer=item.user_answer,
        risk_name="Undefined risk",
        identified_threat="Potential business impact from unassessed risk",
        likelihood="Medium",
        impact="Moderate",
        risk_value=4,
        residual_risk="Moderate",
        current_control_description="Unknown or unassessed controls",
        current_control_rating="Fair",
        business_unit="Risk Management",
        risk_owner="Risk Officer",
        timeline="Short-term",
        mitigation_plan="Conduct comprehensive risk assessment and implement appropriate controls",
        summary=RiskSummary(
            risk_classification_summary="General risk requiring assessment and control implementation",
            mitigation_suggestions=[
                "Conduct detailed risk assessment",
                "Implement appropriate controls",
                "Establish regular monitoring"
            ],
            risk_trends=RiskTrends(
                top_category="General",
                risk_severity="Moderate",
                observations=[
                    "Risk assessment needs improvement",
                    "Control effectiveness should be evaluated",
                    "Regular risk monitoring recommended"
                ]
            )
        )
    )


@app.post("/api/risk-mitigation", response_model=RiskMitigationResponse)
def generate_risk_mitigation(request: RiskRequestModel):
    """
    Generate comprehensive risk analysis and mitigation plan based on user responses
    """
    # Raises HTTPException(400) when `responses` is empty. Every other failure
    # (model call, JSON parsing, schema validation) is logged and answered with
    # a canned analysis, so the endpoint stays best-effort.
    logger = logging.getLogger(__name__)
    # An empty list is a client error; report it as such instead of letting a
    # bare ValueError surface as a 500.
    if not request.responses:
        raise HTTPException(status_code=400, detail="No risk questions provided")
    # For simplicity only the first question in the list is analysed.
    item = request.responses[0]
    system_prompt = """
You are an expert risk management and business continuity analyst. Your task is to analyze user responses to risk assessment questions and provide detailed risk analysis with mitigation strategies.
For the risk question provided, you need to:
1. Create a unique risk identifier (RISK-XXX format)
2. Identify the specific risk from the user's answer
3. Assess likelihood (Low, Medium, High, Very High) and impact (Minor, Moderate, Significant, Severe)
4. Calculate a risk value (1-10 scale)
5. Determine residual risk (Low, Moderate, High, Critical)
6. Evaluate current controls based on the user's answer
7. Assign appropriate business unit and risk owner
8. Provide a mitigation plan with timeline
9. Create a comprehensive risk summary with classification, mitigation suggestions, and trends
Use your expertise to make reasonable assumptions about the business context when details are limited.
Respond strictly in this JSON format:
{
"risk_analysis": {
"risk_id": "RISK-XXX",
"category": "The risk category from input",
"question": "The original question",
"user_answer": "The user's response",
"risk_name": "Concise name of the identified risk",
"identified_threat": "Detailed description of the threat identified",
"likelihood": "High/Medium/Low/Very High",
"impact": "Severe/Significant/Moderate/Minor",
"risk_value": 1-10,
"residual_risk": "Critical/High/Moderate/Low",
"current_control_description": "Description of current controls based on user answer",
"current_control_rating": "Good/Fair/Poor",
"business_unit": "Relevant department responsible",
"risk_owner": "Specific role responsible for the risk",
"timeline": "Immediate/Short-term/Medium-term/Long-term",
"mitigation_plan": "Detailed plan to address the risk",
"summary": {
"risk_classification_summary": "Brief summary of the risk classification",
"mitigation_suggestions": [
"Suggestion 1",
"Suggestion 2",
"Suggestion 3"
],
"risk_trends": {
"top_category": "Most critical risk category",
"risk_severity": "Overall severity assessment",
"observations": [
"Observation 1",
"Observation 2",
"Observation 3"
]
}
}
}
}
"""
    user_message = f"""
Please analyze the following risk assessment response:
Category: {item.category}
Question: {item.question}
User Answer: {item.user_answer}
Provide a comprehensive risk analysis with mitigation plan based on this response.
"""
    try:
        result = generate_response(system_prompt, user_message)
        # Keep only the span from the first '{' to the last '}' in case the
        # model wrapped the JSON in explanatory text.
        json_start = result.find('{')
        json_end = result.rfind('}') + 1
        if json_start == -1 or json_end <= json_start:
            raise ValueError("No valid JSON found in response")
        json_str = result[json_start:json_end]
        try:
            risk_data = json.loads(json_str)
        except json.JSONDecodeError:
            # Raw newlines/tabs inside string values are invalid JSON; collapsing
            # every whitespace run to one space repairs that case. A second
            # failure propagates to the handler below.
            risk_data = json.loads(re.sub(r'\s+', ' ', json_str))
        return RiskMitigationResponse(**risk_data)
    except (json.JSONDecodeError, ValueError):
        # Reply was missing, malformed, or (presumably, via pydantic's
        # ValueError-based validation error) did not match the schema.
        logger.warning(
            "Risk analysis reply was not usable JSON; returning category fallback",
            exc_info=True,
        )
        return RiskMitigationResponse(risk_analysis=_category_fallback_analysis(item))
    except Exception:
        # Any other failure: stay best-effort but record the traceback.
        logger.exception("Risk analysis failed; returning generic fallback")
        return RiskMitigationResponse(risk_analysis=_unassessed_fallback_analysis(item))