docker / app.py
starlord3307's picture
Update app.py
1eeb956 verified
import os
from fastapi import FastAPI, Request
from huggingface_hub import InferenceClient
import json
# Read the Hugging Face token from the environment so no key is hardcoded here.
HF_API_KEY = os.environ.get("HF_API_KEY")

# FastAPI application object that the route decorators below attach to.
app = FastAPI()

# ✅ Set up Hugging Face Inference Client (authenticated with the environment key)
client = InferenceClient(provider="hf-inference", api_key=HF_API_KEY)
# ✅ Tag descriptions (helps model understand each tag's meaning)
# One "TagName: description" entry per line. analyze_ticket interpolates this
# whole block into the user prompt, so the text before each colon is the tag
# name the model is asked to choose from.
# NOTE: the literal begins and ends with a newline, and every character here is
# sent to the model verbatim - editing an entry changes the prompt.
TAGS_DESCRIPTION = """
IPWhitelisting: Adding IP addresses into Okta Security Networks fields if blocked.
AppLocker: Used when adding hashes to allow execution of applications (AppLocker or CrowdStrike).
ADSecurityGroup: Removing a user from an AD Security group to stop or allow access.
AttachmentRelease: Releasing an attachment if password protected.
EmailWhitelisting: Allowing emails through ProofPoint or Checkpoint for delivery.
AppAssignment: Adding a user to an Application.
chatops:mfa-bypass: Automates MFA AD group changes or Requests from the bypass MFA.
VM: Removing hosts from Nessus or decommissioning hosts from tracking in nessus and tenable or All requests for tenable and nessus.
PhishingReport: When a user submits a phishing report or Email investigation or Email Analysis.
GenericInformation: Generic tickets assigned to SecOps.
PasswordReset: When a ticket needs Okta or AD password reset.
KeeperAccounts: Issues with Keeper Password Manager accounts.
MemberIssues: Investigating possible security issues with Member accounts.
ADPassword: AD password issues, including resets.
OktaMFAResets: Okta MFA resets performed by Security or Requests for MFA reset.
Zscaler: Issues related to Zscaler security or Domain/URL allow requests or Domain/URL block requests.
chatops:cs-usb: Automates individual USB whitelisting.
USBDeviceControl: Requests for workstation USB whitelisting.
Imperva: Issues related to Imperva and CDN security.
"""
# ✅ Function to summarize & assign a single tag
def analyze_ticket(title, body):
    """Summarize a ticket and choose its single best-matching tag.

    Builds a two-message chat prompt (system + user) containing the ticket
    title, the ticket body and ``TAGS_DESCRIPTION``, sends it to the
    chat-completion model through the module-level ``client``, and parses the
    JSON object found in the reply.

    Args:
        title: Ticket title interpolated into the prompt.
        body: Ticket body interpolated into the prompt.

    Returns:
        A dict that always contains the keys ``"summary"`` and ``"tag"``.
        If the reply is empty, is not valid JSON, or is not a JSON object,
        the placeholders ``"Could not generate summary"`` and ``"Unknown"``
        are returned. A key the model omitted is filled with its placeholder;
        any additional keys the model returned are passed through.

    Errors raised by the inference client call itself are not caught here.
    """
    fallback = {"summary": "Could not generate summary", "tag": "Unknown"}

    messages = [
        {"role": "system",
         "content": "You are an AI that summarizes security tickets and assigns a single most relevant tag based on "
                    "the title and body."},
        {"role": "user", "content": f"Title: {title}\nBody: {body}\n\n"
                                    "### TASK 1: Summarization\nSummarize the ticket body in 1-2 sentences.\n\n"
                                    "### TASK 2: Tagging\nChoose the **one best tag** from this list that matches the "
                                    "ticket:\n "
                                    f"{TAGS_DESCRIPTION}\n\n"
                                    "Return output in JSON format with keys: 'summary' (ticket summary) and 'tag' ("
                                    "best matching tag)."}
    ]

    completion = client.chat.completions.create(
        model="mistralai/Mistral-7B-Instruct-v0.3",
        messages=messages,
        max_tokens=200
    )

    # The message content can be None; treat that the same as an empty reply
    # instead of crashing on ``None.strip()``.
    reply = completion.choices[0].message.content or ""

    # The model may wrap its JSON in Markdown code fences or add a sentence
    # around it, so parse only the outermost {...} span when one exists.
    first, last = reply.find("{"), reply.rfind("}")
    candidate = reply[first:last + 1] if 0 <= first < last else reply.strip()

    # ✅ Parse JSON response
    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        return fallback

    # Valid JSON that is not an object (list, string, number) is unusable:
    # the caller indexes the result by "summary" and "tag".
    if not isinstance(parsed, dict):
        return fallback

    # Guarantee both keys exist while keeping whatever the model supplied.
    return {**fallback, **parsed}
# ✅ API Endpoint to accept input from Tines
@app.post("/process-ticket/")
async def process_ticket(request: Request):
    """Summarize and tag the ticket carried in the request body.

    Expects a JSON body shaped like
    ``{"inputs": {"id": ..., "title": ..., "body": ...}}``. Missing fields
    fall back to ``"No ID"``, ``"No Title"`` and ``"No Body"``; a payload
    that is not a JSON object, or whose ``"inputs"`` is not an object, is
    treated as having no fields at all.

    Args:
        request: Incoming request whose body is parsed as JSON.

    Returns:
        A dict with the original ``"id"`` plus the ``"summary"`` and
        ``"tag"`` produced by ``analyze_ticket`` (placeholders
        ``"Could not generate summary"`` / ``"Unknown"`` when either is
        missing from the analysis result).
    """
    # Function-scope import so this handler does not depend on a change to
    # the file's import block.
    import asyncio

    data = await request.json()

    # ✅ Extract from "inputs" instead of top-level JSON; tolerate payloads
    # where the body or "inputs" is null or not an object.
    inputs = data.get("inputs") if isinstance(data, dict) else None
    if not isinstance(inputs, dict):
        inputs = {}

    # ✅ Extract input fields
    ticket_id = inputs.get("id", "No ID")
    title = inputs.get("title", "No Title")
    body = inputs.get("body", "No Body")

    # ✅ Get AI-generated summary & tag.
    # analyze_ticket makes a synchronous client call; running it directly in
    # this coroutine would stall the event loop for every other request, so
    # it is handed to the default thread-pool executor instead.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, analyze_ticket, title, body)
    if not isinstance(result, dict):
        result = {}

    # ✅ Return response including ID
    return {
        "id": ticket_id,  # Keep original ID
        "summary": result.get("summary", "Could not generate summary"),
        "tag": result.get("tag", "Unknown")
    }
# def process_input_file(filename):
# try:
# with open(filename, "r", encoding="utf-8") as file:
# data = json.load(file) # ✅ Load JSON file
#
# title = data.get("title", "No Title")
# body = data.get("body", "No Body")
#
# # ✅ Get AI-generated summary & tag
# result = analyze_ticket(title, body)
#
# return result # ✅ Returns {'summary': "...", 'tag': "..."}
#
# except FileNotFoundError:
# print(f"❌ Error: The file '{filename}' was not found.")
# return {"error": "File not found"}
#
# except json.JSONDecodeError:
# print(f"❌ Error: The file '{filename}' contains invalid JSON.")
# return {"error": "Invalid JSON format"}
#
#
# # ✅ Load input from `input.json`
# output = process_input_file("input.json")
#
# # ✅ Print the output
# print(json.dumps(output, indent=2))