Spaces:
Paused
Paused
File size: 5,367 Bytes
d404a15 27a74af d404a15 1819b50 27a74af d404a15 d2ee6c8 27a74af d2ee6c8 d404a15 0095be4 e5596be 0095be4 5e727b9 0095be4 1819b50 0095be4 e5596be 0095be4 5e727b9 0095be4 5e727b9 0095be4 63c9bdc e5596be 0095be4 f2f12c2 27a74af 4f595c2 8afee73 4f595c2 63c9bdc 27a74af d2ee6c8 63c9bdc cf92e66 27a74af cf92e66 1819b50 63c9bdc 1819b50 27a74af cf92e66 76ad8cf cf92e66 76ad8cf cf92e66 63c9bdc f2f12c2 63c9bdc 27a74af | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | import pandas as pd
import openai
import chainlit as cl
from dotenv import load_dotenv
import re
from openai import OpenAI
# Load environment variables; this runs at import time, before the client below is created.
load_dotenv()
# Module-level OpenAI client (constructed with no explicit arguments) used by
# generate_udm_mapping_response.
# NOTE(review): presumably it picks up its API key from the environment loaded above — confirm.
client = OpenAI()
# Load UDM fields CSV file
def load_udm_fields(csv_path):
    """Read the UDM field list at ``csv_path`` and return it as a DataFrame."""
    udm_catalogue = pd.read_csv(csv_path)
    return udm_catalogue
# Function to retrieve relevant UDM fields for log mapping
def retrieve_udm_field(log_field, udm_fields):
    """Return the first UDM field whose name contains ``log_field``.

    The lookup is a case-insensitive, literal substring search over the
    ``UDM_Field`` column of ``udm_fields``.

    Args:
        log_field: Log field name to look up.
        udm_fields: DataFrame with a ``UDM_Field`` column.

    Returns:
        The first matching ``UDM_Field`` value, or ``None`` when
        ``log_field`` is empty or nothing matches.
    """
    # An empty needle is a substring of every value and would always
    # "match" the first row, so treat it as no match.
    if not log_field:
        return None
    # regex=False: a field name is data, not a pattern, so characters such
    # as "(" or "*" must not be interpreted as regex syntax.
    # na=False: missing cells count as "no match" rather than yielding a
    # NaN mask that cannot be used for boolean indexing.
    is_match = udm_fields['UDM_Field'].str.contains(
        log_field, case=False, regex=False, na=False
    )
    udm_match = udm_fields[is_match]
    if udm_match.empty:
        return None
    return udm_match.iloc[0]['UDM_Field']
# Function to extract log fields from sample log or description
def extract_log_fields(user_input):
    """Extract the unique word tokens from ``user_input``.

    The input may be a log sample or a free-text description; every run of
    word characters is treated as a candidate field name.

    Args:
        user_input: Raw text supplied by the user.

    Returns:
        A list of unique tokens in order of first appearance.
    """
    tokens = re.findall(r'\b\w+\b', user_input)
    # dict.fromkeys de-duplicates while keeping first-seen order; a set would
    # return the tokens in an arbitrary order that can differ between runs.
    return list(dict.fromkeys(tokens))
# Function to map log fields to UDM fields
def map_log_fields_to_udm(log_fields, udm_fields):
    """Map each log field to a UDM field, falling back to a custom field.

    Args:
        log_fields: Iterable of log field names.
        udm_fields: DataFrame of known UDM fields, passed through to
            ``retrieve_udm_field``.

    Returns:
        A DataFrame with columns ``Log_Field`` and ``UDM_Field``, one row per
        entry in ``log_fields``. The columns are present even when
        ``log_fields`` is empty.
    """
    mapped_fields = []
    custom_fields = set()
    for field in log_fields:
        udm_field = retrieve_udm_field(field, udm_fields)
        if not udm_field:
            # No known UDM field: use a custom_fields.* name, appending "_1"
            # until it is unique among the custom names generated so far.
            udm_field = f"custom_fields.{field}"
            while udm_field in custom_fields:
                udm_field = f"{udm_field}_1"
            custom_fields.add(udm_field)
        mapped_fields.append({'Log_Field': field, 'UDM_Field': udm_field})
    # Pin the columns so an empty mapping still has the expected schema.
    return pd.DataFrame(mapped_fields, columns=['Log_Field', 'UDM_Field'])
# GPT-4-based generation function with few-shot learning
def generate_udm_mapping_response(log_fields, udm_fields_csv, model="gpt-4"):
    """Ask a chat model to map log fields to UDM fields.

    A preliminary mapping is built locally from the UDM field CSV and sent to
    the model together with few-shot examples and a system prompt.

    Args:
        log_fields: Log field names extracted from the user's input.
        udm_fields_csv: Path to the CSV file listing the UDM fields.
        model: Chat model name passed to the OpenAI client; defaults to
            ``"gpt-4"``.

    Returns:
        The text of the model's first choice, or an empty string when the
        model returns no text content.
    """
    # Load UDM fields and build the preliminary mapping used as context.
    udm_fields = load_udm_fields(udm_fields_csv)
    mapped_fields_df = map_log_fields_to_udm(log_fields, udm_fields)
    mapped_fields_text = mapped_fields_df.to_string(index=False)
    # Few-shot learning examples
    examples = """
### Example 1: Fortinet Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
devname | intermediary.hostname
devid | intermediary.asset.hardware.serial_number
srcip | principal.ip
dstip | target.ip
dstport | target.port
### Example 2: Palo Alto Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
src_ip | principal.ip
dest_ip | target.ip
dest_port | target.port
action | security_result.action_details
severity | security_result.severity_details
"""
    system_template = """You are a cybersecurity expert specialized in log analysis and data normalization, helping security teams to map security log fields to Google Chronicle's Unified Data Model (UDM).
Please follow these steps: 1. Identify the vendor name from the input, and understand the specific log field conventions of that vendor.
2. Extract log fields while ignoring general words that are not part of the field names.
3. Use web search or previous examples to consult the latest log documentation for the product provided and Google Chronicle UDM schema documentation.
4. Map each product log field to its corresponding UDM field based on known mappings or documentation. For the mapping to UDM fields,
use the full list of UDM fields in the csv file provided udm_fields_csv.
5. Only attempt to map the user input that you deem as log fields for this product.
6.For fields that don't have a direct match in UDM, place them into custom fields, ensuring that each custom field is unique and logically consistent.
7. Organize the mapping into a structured table format and provide the user the option to download your mapping into a csv file.
Think through your response step by step, and always aim for accurate, and professional responses with a focus on precision.
"""
    # Call the chat model for the final output with additional explanation.
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_template},
            {"role": "user", "content": f"Here are the log fields: {log_fields}. Please map them to UDM:\n\n{mapped_fields_text}\n\n{examples}"}
        ]
    )
    # The SDK types message.content as optional; normalise a missing value to
    # an empty string so callers never end up formatting ``None``.
    return response.choices[0].message.content or ""
# Chainlit app functionality
@cl.on_chat_start
async def start_chat():
    """Greet the user and store the default model settings in the session."""
    greeting = "Welcome! Please provide the product name and the log fields or sample log you want to map to UDM."
    await cl.Message(content=greeting).send()
    # Defaults kept in the user session under the "settings" key.
    cl.user_session.set(
        "settings",
        {
            "model": "gpt-3.5-turbo",
            "temperature": 0,
            "max_tokens": 500,
            "top_p": 1,
            "frequency_penalty": 0,
            "presence_penalty": 0,
        },
    )
@cl.on_message
async def main(message: cl.Message):
    """Handle a user message: extract log fields, map them, and reply.

    Args:
        message: Incoming Chainlit message whose ``content`` holds the
            product name and log fields or a sample log.
    """
    user_input = message.content
    log_fields = extract_log_fields(user_input)
    udm_fields_csv = 'udm_field_list_v2.csv'
    # generate_udm_mapping_response performs blocking file and network I/O;
    # run it through cl.make_async so this async handler does not block the
    # event loop while waiting for the model.
    response = await cl.make_async(generate_udm_mapping_response)(
        log_fields, udm_fields_csv
    )
    # Send the response back to Chainlit
    await cl.Message(content=f"Here is the mapped log fields to UDM:\n\n{response}").send()