Spaces:
Paused
Paused
| import pandas as pd | |
| import openai | |
| import chainlit as cl | |
| from dotenv import load_dotenv | |
| import re | |
| from openai import OpenAI | |
# Populate the process environment from a local .env file before any client
# object is constructed.
load_dotenv()

# Module-wide OpenAI client. It is built with no arguments, so its
# configuration is taken from the environment prepared above.
client = OpenAI()
# Load UDM fields CSV file
def load_udm_fields(csv_path):
    """Read the UDM field list from a CSV file.

    Args:
        csv_path: Location of the CSV file; passed unchanged to
            ``pandas.read_csv``.

    Returns:
        A ``pandas.DataFrame`` holding the parsed contents of the file.
    """
    udm_table = pd.read_csv(csv_path)
    return udm_table
# Function to retrieve relevant UDM fields for log mapping
def retrieve_udm_field(log_field, udm_fields):
    """Return the first UDM field whose name contains ``log_field``.

    The comparison is a case-insensitive *literal* substring test against the
    ``UDM_Field`` column of ``udm_fields``.

    Args:
        log_field: Name of the log field to look up.
        udm_fields: DataFrame with a ``UDM_Field`` column.

    Returns:
        The value of ``UDM_Field`` from the first matching row, or ``None``
        when ``log_field`` is empty or nothing matches.
    """
    # An empty needle is a substring of every value; treat it as "no match"
    # instead of silently returning the first row of the table.
    if not log_field:
        return None

    # regex=False: the field name is plain text, so characters such as
    # "(", "+" or "." must not be interpreted as a pattern.
    # na=False: missing cells count as non-matching so the mask stays boolean.
    hits = udm_fields['UDM_Field'].str.contains(
        log_field, case=False, regex=False, na=False
    )
    udm_match = udm_fields[hits]
    if udm_match.empty:
        return None
    return udm_match.iloc[0]['UDM_Field']
# Function to extract log fields from sample log or description
def extract_log_fields(user_input):
    """Extract candidate field names from free-form user input.

    Every run of word characters (letters, digits, underscore) in
    ``user_input`` is treated as a candidate field name.

    Args:
        user_input: Text that may contain log samples or field descriptions.

    Returns:
        A list of the distinct tokens, in the order they first appear in
        ``user_input``. An empty string yields an empty list.
    """
    tokens = re.findall(r'\b\w+\b', user_input)
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # result is the same on every run (a plain set() has arbitrary order).
    return list(dict.fromkeys(tokens))
# Function to map log fields to UDM fields
def map_log_fields_to_udm(log_fields, udm_fields):
    """Map each log field to a UDM field, falling back to a custom field.

    For every entry of ``log_fields`` the UDM field is looked up with
    ``retrieve_udm_field``. When no UDM field is found, the entry is assigned
    ``custom_fields.<field>``; if that name was already handed out, ``_1`` is
    appended until the name is unused.

    Args:
        log_fields: Iterable of log field names.
        udm_fields: DataFrame of known UDM fields, passed through to
            ``retrieve_udm_field``.

    Returns:
        A DataFrame with the columns ``Log_Field`` and ``UDM_Field``, one row
        per input field. The columns are present even when ``log_fields`` is
        empty.
    """
    columns = ['Log_Field', 'UDM_Field']
    rows = []
    used_custom = set()  # custom field names already assigned in this call

    for field in log_fields:
        udm_field = retrieve_udm_field(field, udm_fields)
        if not udm_field:
            udm_field = f"custom_fields.{field}"
            # Keep custom names unique within this mapping.
            while udm_field in used_custom:
                udm_field = f"{udm_field}_1"
            used_custom.add(udm_field)
        rows.append({'Log_Field': field, 'UDM_Field': udm_field})

    # Passing the column list keeps the frame's shape stable for empty input.
    return pd.DataFrame(rows, columns=columns)
# GPT-4-based generation function with few-shot learning
def generate_udm_mapping_response(log_fields, udm_fields_csv):
    """Ask GPT-4 to map log fields to UDM fields and return its answer.

    A first-pass mapping is computed locally from the UDM CSV and sent to the
    model together with the raw field list and two worked examples.

    Args:
        log_fields: Log field names extracted from the user's message.
        udm_fields_csv: Path of the CSV file listing the UDM fields.

    Returns:
        The text content of the first choice in the chat completion.
    """
    # Local first-pass mapping, rendered as plain text for the prompt.
    udm_catalogue = load_udm_fields(udm_fields_csv)
    draft_mapping = map_log_fields_to_udm(log_fields, udm_catalogue)
    mapped_fields_text = draft_mapping.to_string(index=False)

    # Few-shot examples. The lines are flush-left because they are part of
    # the string literal that is sent to the model.
    examples = """
### Example 1: Fortinet Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
devname | intermediary.hostname
devid | intermediary.asset.hardware.serial_number
srcip | principal.ip
dstip | target.ip
dstport | target.port
### Example 2: Palo Alto Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
src_ip | principal.ip
dest_ip | target.ip
dest_port | target.port
action | security_result.action_details
severity | security_result.severity_details
"""

    # Instructions that frame the model's role and the expected procedure.
    system_template = """You are a cybersecurity expert specialized in log analysis and data normalization, helping security teams to map security log fields to Google Chronicle's Unified Data Model (UDM).
Please follow these steps: 1. Identify the vendor name from the input, and understand the specific log field conventions of that vendor.
2. Extract log fields while ignoring general words that are not part of the field names.
3. Use web search or previous examples to consult the latest log documentation for the product provided and Google Chronicle UDM schema documentation.
4. Map each product log field to its corresponding UDM field based on known mappings or documentation. For the mapping to UDM fields,
use the full list of UDM fields in the csv file provided udm_fields_csv.
5. Only attempt to map the user input that you deem as log fields for this product.
6.For fields that don't have a direct match in UDM, place them into custom fields, ensuring that each custom field is unique and logically consistent.
7. Organize the mapping into a structured table format and provide the user the option to download your mapping into a csv file.
Think through your response step by step, and always aim for accurate, and professional responses with a focus on precision.
"""

    user_prompt = f"Here are the log fields: {log_fields}. Please map them to UDM:\n\n{mapped_fields_text}\n\n{examples}"
    chat_messages = [
        {"role": "system", "content": system_template},
        {"role": "user", "content": user_prompt},
    ]

    # Call GPT-4 for the final output with additional explanation.
    completion = client.chat.completions.create(
        model="gpt-4",
        messages=chat_messages,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
# Chainlit app functionality
@cl.on_chat_start  # register with Chainlit; without this the hook never runs
async def start_chat():
    """Greet the user and store default model settings in the session.

    Sends a welcome message, then saves a settings dictionary under the
    ``"settings"`` key of the Chainlit user session.

    NOTE(review): nothing in the visible code reads these settings back; the
    completion call elsewhere in this file hard-codes its own model. Confirm
    whether they are still needed.
    """
    await cl.Message(
        content="Welcome! Please provide the product name and the log fields or sample log you want to map to UDM."
    ).send()

    settings = {
        "model": "gpt-3.5-turbo",
        "temperature": 0,
        "max_tokens": 500,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0,
    }
    cl.user_session.set("settings", settings)
@cl.on_message  # register with Chainlit so incoming user messages reach this handler
async def main(message: cl.Message):
    """Handle one user message: extract fields, map them, reply.

    Args:
        message: The incoming Chainlit message; its ``content`` is treated as
            the product name plus log fields or a sample log.
    """
    user_input = message.content
    log_fields = extract_log_fields(user_input)
    udm_fields_csv = 'udm_field_list_v2.csv'

    # The mapping call is synchronous (CSV read + OpenAI request). Run it via
    # cl.make_async so it does not block the event loop while it waits.
    response = await cl.make_async(generate_udm_mapping_response)(
        log_fields, udm_fields_csv
    )

    # Send the response back to Chainlit
    await cl.Message(content=f"Here is the mapped log fields to UDM:\n\n{response}").send()