Spaces:
Sleeping
Sleeping
| import os | |
| from fastapi import FastAPI, Request | |
| from huggingface_hub import InferenceClient | |
| import json | |
| app = FastAPI() | |
| HF_API_KEY = os.getenv("HF_API_KEY") # π₯ No Hardcoded Keys! | |
| # β Set up Hugging Face Inference Client | |
| client = InferenceClient( | |
| provider="hf-inference", | |
| api_key=HF_API_KEY # Use the key from environment | |
| ) | |
| # β Tag descriptions (helps model understand each tag's meaning) | |
| TAGS_DESCRIPTION = """ | |
| IPWhitelisting: Adding IP addresses into Okta Security Networks fields if blocked. | |
| AppLocker: Used when adding hashes to allow execution of applications (AppLocker or CrowdStrike). | |
| ADSecurityGroup: Removing a user from an AD Security group to stop or allow access. | |
| AttachmentRelease: Releasing an attachment if password protected. | |
| EmailWhitelisting: Allowing emails through ProofPoint or Checkpoint for delivery. | |
| AppAssignment: Adding a user to an Application. | |
| chatops:mfa-bypass: Automates MFA AD group changes or Requests from the bypass MFA. | |
| VM: Removing hosts from Nessus or decommissioning hosts from tracking in nessus and tenable or All requests for tenable and nessus. | |
| PhishingReport: When a user submits a phishing report or Email investigation or Email Analysis. | |
| GenericInformation: Generic tickets assigned to SecOps. | |
| PasswordReset: When a ticket needs Okta or AD password reset. | |
| KeeperAccounts: Issues with Keeper Password Manager accounts. | |
| MemberIssues: Investigating possible security issues with Member accounts. | |
| ADPassword: AD password issues, including resets. | |
| OktaMFAResets: Okta MFA resets performed by Security or Requests for MFA reset. | |
| Zscaler: Issues related to Zscaler security or Domain/URL allow requests or Domain/URL block requests. | |
| chatops:cs-usb: Automates individual USB whitelisting. | |
| USBDeviceControl: Requests for workstation USB whitelisting. | |
| Imperva: Issues related to Imperva and CDN security. | |
| """ | |
| # β Function to summarize & assign a single tag | |
| def analyze_ticket(title, body): | |
| messages = [ | |
| {"role": "system", | |
| "content": "You are an AI that summarizes security tickets and assigns a single most relevant tag based on " | |
| "the title and body."}, | |
| {"role": "user", "content": f"Title: {title}\nBody: {body}\n\n" | |
| "### TASK 1: Summarization\nSummarize the ticket body in 1-2 sentences.\n\n" | |
| "### TASK 2: Tagging\nChoose the **one best tag** from this list that matches the " | |
| "ticket:\n " | |
| f"{TAGS_DESCRIPTION}\n\n" | |
| "Return output in JSON format with keys: 'summary' (ticket summary) and 'tag' (" | |
| "best matching tag)."} | |
| ] | |
| completion = client.chat.completions.create( | |
| model="mistralai/Mistral-7B-Instruct-v0.3", | |
| messages=messages, | |
| max_tokens=200 | |
| ) | |
| # β Parse JSON response | |
| try: | |
| response = json.loads(completion.choices[0].message.content.strip()) | |
| except json.JSONDecodeError: | |
| response = {"summary": "Could not generate summary", "tag": "Unknown"} | |
| return response | |
| # β API Endpoint to accept input from Tines | |
| async def process_ticket(request: Request): | |
| data = await request.json() | |
| # β Extract from "inputs" instead of top-level JSON | |
| inputs = data.get("inputs", {}) | |
| # β Extract input fields | |
| ticket_id = inputs.get("id", "No ID") | |
| title = inputs.get("title", "No Title") | |
| body = inputs.get("body", "No Body") | |
| # β Get AI-generated summary & tag | |
| result = analyze_ticket(title, body) | |
| # β Return response including ID | |
| return { | |
| "id": ticket_id, # β Keep original ID | |
| "summary": result["summary"], | |
| "tag": result["tag"] | |
| } | |
| # def process_input_file(filename): | |
| # try: | |
| # with open(filename, "r", encoding="utf-8") as file: | |
| # data = json.load(file) # β Load JSON file | |
| # | |
| # title = data.get("title", "No Title") | |
| # body = data.get("body", "No Body") | |
| # | |
| # # β Get AI-generated summary & tag | |
| # result = analyze_ticket(title, body) | |
| # | |
| # return result # β Returns {'summary': "...", 'tag': "..."} | |
| # | |
| # except FileNotFoundError: | |
| # print(f"β Error: The file '{filename}' was not found.") | |
| # return {"error": "File not found"} | |
| # | |
| # except json.JSONDecodeError: | |
| # print(f"β Error: The file '{filename}' contains invalid JSON.") | |
| # return {"error": "Invalid JSON format"} | |
| # | |
| # | |
| # # β Load input from `input.json` | |
| # output = process_input_file("input.json") | |
| # | |
| # # β Print the output | |
| # print(json.dumps(output, indent=2)) | |