import os from fastapi import FastAPI, Request from huggingface_hub import InferenceClient import json app = FastAPI() HF_API_KEY = os.getenv("HF_API_KEY") # 🔥 No Hardcoded Keys! # ✅ Set up Hugging Face Inference Client client = InferenceClient( provider="hf-inference", api_key=HF_API_KEY # Use the key from environment ) # ✅ Tag descriptions (helps model understand each tag's meaning) TAGS_DESCRIPTION = """ IPWhitelisting: Adding IP addresses into Okta Security Networks fields if blocked. AppLocker: Used when adding hashes to allow execution of applications (AppLocker or CrowdStrike). ADSecurityGroup: Removing a user from an AD Security group to stop or allow access. AttachmentRelease: Releasing an attachment if password protected. EmailWhitelisting: Allowing emails through ProofPoint or Checkpoint for delivery. AppAssignment: Adding a user to an Application. chatops:mfa-bypass: Automates MFA AD group changes or Requests from the bypass MFA. VM: Removing hosts from Nessus or decommissioning hosts from tracking in nessus and tenable or All requests for tenable and nessus. PhishingReport: When a user submits a phishing report or Email investigation or Email Analysis. GenericInformation: Generic tickets assigned to SecOps. PasswordReset: When a ticket needs Okta or AD password reset. KeeperAccounts: Issues with Keeper Password Manager accounts. MemberIssues: Investigating possible security issues with Member accounts. ADPassword: AD password issues, including resets. OktaMFAResets: Okta MFA resets performed by Security or Requests for MFA reset. Zscaler: Issues related to Zscaler security or Domain/URL allow requests or Domain/URL block requests. chatops:cs-usb: Automates individual USB whitelisting. USBDeviceControl: Requests for workstation USB whitelisting. Imperva: Issues related to Imperva and CDN security. """ # ✅ Function to summarize & assign a single tag def analyze_ticket(title, body): messages = [ {"role": "system", "content": "You are an AI that summarizes security tickets and assigns a single most relevant tag based on " "the title and body."}, {"role": "user", "content": f"Title: {title}\nBody: {body}\n\n" "### TASK 1: Summarization\nSummarize the ticket body in 1-2 sentences.\n\n" "### TASK 2: Tagging\nChoose the **one best tag** from this list that matches the " "ticket:\n " f"{TAGS_DESCRIPTION}\n\n" "Return output in JSON format with keys: 'summary' (ticket summary) and 'tag' (" "best matching tag)."} ] completion = client.chat.completions.create( model="mistralai/Mistral-7B-Instruct-v0.3", messages=messages, max_tokens=200 ) # ✅ Parse JSON response try: response = json.loads(completion.choices[0].message.content.strip()) except json.JSONDecodeError: response = {"summary": "Could not generate summary", "tag": "Unknown"} return response # ✅ API Endpoint to accept input from Tines @app.post("/process-ticket/") async def process_ticket(request: Request): data = await request.json() # ✅ Extract from "inputs" instead of top-level JSON inputs = data.get("inputs", {}) # ✅ Extract input fields ticket_id = inputs.get("id", "No ID") title = inputs.get("title", "No Title") body = inputs.get("body", "No Body") # ✅ Get AI-generated summary & tag result = analyze_ticket(title, body) # ✅ Return response including ID return { "id": ticket_id, # ✅ Keep original ID "summary": result["summary"], "tag": result["tag"] } # def process_input_file(filename): # try: # with open(filename, "r", encoding="utf-8") as file: # data = json.load(file) # ✅ Load JSON file # # title = data.get("title", "No Title") # body = data.get("body", "No Body") # # # ✅ Get AI-generated summary & tag # result = analyze_ticket(title, body) # # return result # ✅ Returns {'summary': "...", 'tag': "..."} # # except FileNotFoundError: # print(f"❌ Error: The file '{filename}' was not found.") # return {"error": "File not found"} # # except json.JSONDecodeError: # print(f"❌ Error: The file '{filename}' contains invalid JSON.") # return {"error": "Invalid JSON format"} # # # # ✅ Load input from `input.json` # output = process_input_file("input.json") # # # ✅ Print the output # print(json.dumps(output, indent=2))