# test / analyzer.py
# Sakib Ahmed
# Your commit message
# 90b3b3f
import re
import json
def extract_entities(log_content):
    """Extract named entities from raw log text with regular expressions.

    The whole text is scanned with a fixed set of patterns. Patterns that are
    anchored to the start of a line (syslog-style ``Mon DD HH:MM:SS`` prefixes,
    ``YYYY-MM-DD HH:MM:SS`` prefixes, dnsmasq lines, file names) are applied
    to each line separately; the rest are applied to the full text.

    Args:
        log_content: The log text as a single string. It is split on
            ``"\\n"`` for the line-based patterns.

    Returns:
        A dict mapping entity type names (for example ``"DateTime"``,
        ``"IPAddress"``, ``"Username"``) to a list of the distinct matched
        strings, in order of first appearance. Entity types for which nothing
        matched are omitted, so empty input yields ``{}``.
    """
    # One bucket per entity type; key order here is the key order of the result.
    entities = {
        "DateTime": [],
        "System": [],
        "Service": [],
        "Process": [],
        "Action": [],
        "IPAddress": [],
        "DNSName": [],
        "Username": [],
        "Role": [],
        "Metadata": [],
        "Status": [],
        "Error": [],
        "Severity": [],
        "SessionID": [],
        "SessionStatus": [],
        "FileName": [],
        "Object": [],
        "ApplicationSpecific": [],
        "AuthenticationType": [],
        "ResourceType": [],
        "ResourceUsage": [],
        # Add new entity types
        "TimeServer": [],
        "Port": [],
        "SourcePort": [],
        "DestinationPort": [],
        "Protocol": [],
        "Interface": [],
        "InterfaceType": [],
        "Subnet": [],
        "Rule": [],
        "TTL": [],
        "MAC": [],
        "Flags": [],
        "CPU": [],
        "MemoryInfo": [],
        "Hypervisor": [],
        "Device": [],
        "FileSystem": [],
        "DataBus": [],
        "EventID": [],
        "CMD": [],
    }

    # Process log content line by line to maintain proper context
    lines = log_content.split('\n')

    # ---- DateTime ---------------------------------------------------------
    # ISO timestamps in JSON - starts with @timestamp": " and ends with "
    iso_timestamps = re.findall(r'@timestamp\":\s*\"([^\"]+)\"', log_content)
    entities["DateTime"].extend(iso_timestamps)
    # Log timestamps - starts with year or month and ends with space
    for line in lines:
        # YYYY-MM-DD HH:MM:SS format (starts with digit, ends with space)
        date_match = re.search(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s', line)
        if date_match:
            entities["DateTime"].append(date_match.group(1))
        # Month DD HH:MM:SS format (starts with month name, ends with space)
        syslog_match = re.match(r'^(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+(\d{1,2})\s+(\d{2}:\d{2}:\d{2})\s', line)
        if syslog_match:
            month, day, clock = syslog_match.groups()
            # Re-joined with single spaces, so padded days ("Jan  5") normalise.
            entities["DateTime"].append(f"{month} {day} {clock}")
    # Audit timestamps - starts with msg=audit( and ends with :
    audit_timestamps = re.findall(r'msg=audit\((\d+\.\d+):', log_content)
    entities["DateTime"].extend(audit_timestamps)

    # ---- System / node ----------------------------------------------------
    # Extract hostname from JSON - starts with "hostname": " and ends with "
    hostnames = re.findall(r'\"hostname\":\s*\"([^\"]+)\"', log_content)
    entities["System"].extend(hostnames)
    # Extract hostname from JSON host field - starts with "host": {"name": " and ends with "
    host_names = re.findall(r'\"host\":\s*\{[^}]*\"name\":\s*\"([^\"]+)\"', log_content)
    entities["System"].extend(host_names)
    # Extract hostname from syslog format - after timestamp and before service
    for line in lines:
        syslog_match = re.match(r'^(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}\s+([^\s:]+)\s', line)
        # "?" and "-" are treated as placeholders rather than host names.
        if syslog_match and syslog_match.group(1) not in ["?", "-"]:
            entities["System"].append(syslog_match.group(1))

    # ---- Service ----------------------------------------------------------
    # Extract service from syslog format - after hostname and before colon or bracket
    for line in lines:
        service_match = re.search(r'^\w+\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}\s+[^\s]+\s+([^:\[\s]+)(?:\[\d+\])?:', line)
        if service_match and service_match.group(1) not in ["?", "-"]:
            entities["Service"].append(service_match.group(1))
    # Extract service from JSON - starts with "service": {"type": " and ends with "
    service_types = re.findall(r'\"service\":\s*\{\s*\"type\":\s*\"([^\"]+)\"', log_content)
    entities["Service"].extend(service_types)
    # Extract agent types - starts with "agent": and contains "type": " and ends with "
    agent_types = re.findall(r'\"agent\":[^}]*\"type\":\s*\"([^\"]+)\"', log_content)
    entities["Service"].extend(agent_types)

    # ---- Process IDs ------------------------------------------------------
    # Extract PIDs from brackets - starts with [ and ends with ]
    for line in lines:
        pid_matches = re.findall(r'(?:sshd|dnsmasq|cron|systemd|openvpn|metricbeat)\[(\d+)\]', line)
        entities["Process"].extend(pid_matches)
    # Extract PIDs from audit logs - starts with pid= and ends with space
    pid_audit = re.findall(r'pid=(\d+)\s', log_content)
    entities["Process"].extend(pid_audit)

    # ---- Actions ----------------------------------------------------------
    # Session actions - starts with session and ends with for user or space
    session_actions = re.findall(r'session\s+(opened|closed)(?:\s+for\s+user|\s)', log_content)
    entities["Action"].extend(session_actions)
    # DNS actions - starts with dnsmasq[PID]: and ends with space
    for line in lines:
        if "dnsmasq" in line:
            dns_match = re.search(r'dnsmasq\[\d+\]:\s+(query|forwarded|reply|cached|NODATA-IPv[46])(?:\s)', line)
            if dns_match:
                entities["Action"].append(dns_match.group(1))
    # VPN actions - starts with clear identifier and ends with space
    vpn_actions = re.findall(r'(?:TLS|VERIFY)\s+(OK|soft\s+reset)(?:\s|$)', log_content)
    entities["Action"].extend(vpn_actions)

    # ---- IP addresses -----------------------------------------------------
    # Find IPs with context - starts with from/to/is and ends with space or port
    for line in lines:
        ip_from = re.findall(r'from\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(?:\s|$|:)', line)
        entities["IPAddress"].extend(ip_from)
        ip_to = re.findall(r'to\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(?:\s|$)', line)
        entities["IPAddress"].extend(ip_to)
        ip_is = re.findall(r'is\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(?:\s|$)', line)
        entities["IPAddress"].extend(ip_is)
    # Find IPs in VPN logs - starts with username/ and ends with :
    vpn_ips = re.findall(r'[a-zA-Z0-9]+/(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):', log_content)
    entities["IPAddress"].extend(vpn_ips)
    # Extract Source IP more comprehensively
    src_ips = re.findall(r'SRC=(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', log_content)
    entities["IPAddress"].extend(src_ips)
    # Extract Destination IP more comprehensively
    dst_ips = re.findall(r'DST=(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', log_content)
    entities["IPAddress"].extend(dst_ips)

    # ---- DNS names --------------------------------------------------------
    # Find domains in DNS queries - starts with query/forwarded/reply and ends with from/to/is
    for line in lines:
        if "dnsmasq" in line:
            dns_match = re.search(r'(?:query\[[A-Z]+\]|forwarded|reply)\s+([-a-zA-Z0-9.*_/]+(?:\.[a-zA-Z0-9.*_/-]+)+)(?:\s+from|\s+to|\s+is)', line)
            if dns_match:
                entities["DNSName"].append(dns_match.group(1))

    # ---- Usernames --------------------------------------------------------
    # Extract usernames from quotes - starts with user=" or acct=" and ends with "
    usernames_quoted = re.findall(r'(?:user|acct)=\"([^\"]+)\"', log_content)
    entities["Username"].extend(usernames_quoted)
    # Extract usernames from VPN logs - username before slash and IP
    vpn_users = re.findall(r'(\w+)/\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:', log_content)
    entities["Username"].extend(vpn_users)
    # Extract usernames from session logs - starts with for user and ends with space or by
    usernames_session = re.findall(r'for\s+user\s+(\w+)(?:\s|$|by)', log_content)
    entities["Username"].extend(usernames_session)
    # Extract usernames from SSH logs - starts with Accepted type for and ends with from
    usernames_ssh = re.findall(r'Accepted\s+\w+\s+for\s+(\w+)\s+from', log_content)
    entities["Username"].extend(usernames_ssh)

    # ---- Network details --------------------------------------------------
    # Time Server patterns - NTP servers with port 123
    time_servers = re.findall(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):123\s+\(([^)]+)\)', log_content)
    for ip, name in time_servers:
        entities["TimeServer"].append(f"{ip}:123 ({name})")
    # Port patterns - explicit port mentions ("port N") or ":N" suffixes.
    # The lookbehind rejects a colon that follows "dd:dd", so the seconds
    # field of an HH:MM:SS timestamp is not reported as a port.
    # NOTE(review): other ":<digits>" tokens followed by space, comma or ")"
    # (e.g. the serial in "msg=audit(1.2:3)") are still captured here.
    port_patterns = re.findall(r'(?:port\s+|(?<!\d\d:\d\d):)(\d+)(?:\s|$|,|\))', log_content)
    entities["Port"].extend(port_patterns)
    # Source Port patterns
    source_ports = re.findall(r'SPT=(\d+)', log_content)
    entities["SourcePort"].extend(source_ports)
    # Destination Port patterns
    dest_ports = re.findall(r'DPT=(\d+)', log_content)
    entities["DestinationPort"].extend(dest_ports)
    # Protocol patterns (letters optionally followed by digits, so purely
    # numeric protocol values are not captured)
    protocols = re.findall(r'(?:PROTO=|protocol\s+)([a-zA-Z]+\d*)', log_content)
    entities["Protocol"].extend(protocols)
    # Add common protocols if mentioned. The search is case-insensitive, so
    # "tcp" and "TCP" are always added together when either spelling occurs.
    for proto in ["tcp", "udp", "icmp", "TCP", "IPv4", "IPv6"]:
        if re.search(r'\b' + proto + r'\b', log_content, re.IGNORECASE):
            entities["Protocol"].append(proto)
    # Interface patterns
    interfaces = re.findall(r'(?:interface|dev)\s+(ens\d+|eth\d+|wlan\d+|lo)', log_content)
    entities["Interface"].extend(interfaces)
    # Interface Type patterns
    interface_types = re.findall(r'(?:zone|type)\s+(inet|lan|dmz|wan)', log_content, re.IGNORECASE)
    entities["InterfaceType"].extend(interface_types)
    # Subnet patterns
    subnets = re.findall(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d{1,2})', log_content)
    entities["Subnet"].extend(subnets)
    # Rule patterns
    rules = re.findall(r'(DNAT|ACCEPT|REJECT|DROP|Policy)\s', log_content)
    entities["Rule"].extend(rules)
    # TTL patterns
    ttls = re.findall(r'TTL=(\d+)', log_content)
    entities["TTL"].extend(ttls)
    # MAC Address patterns
    macs = re.findall(r'((?:[0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}|(?:[0-9a-fA-F]{2}-){5}[0-9a-fA-F]{2})', log_content)
    entities["MAC"].extend(macs)
    # Flags patterns
    flags = re.findall(r'(?:flags|FLAG)\s+(\w+)', log_content)
    entities["Flags"].extend(flags)
    # Add specific flags if found (case-sensitive whole-word match)
    for flag in ["RST", "DF", "ACK", "SYN", "FIN", "PSH", "URG"]:
        if re.search(r'\b' + flag + r'\b', log_content):
            entities["Flags"].append(flag)

    # ---- Hardware / platform ----------------------------------------------
    # CPU patterns
    cpu_info = re.findall(r'(Intel GenuineIntel|AMD AuthenticAMD|Centaur CentaurHauls)', log_content)
    entities["CPU"].extend(cpu_info)
    # Memory Info patterns
    memory_info = re.findall(r'mem\s+0x[0-9a-f]+-0x[0-9a-f]+\s+([a-z]+)', log_content)
    entities["MemoryInfo"].extend(memory_info)
    # Hypervisor patterns
    hypervisor = re.findall(r'Hypervisor detected:\s+(\w+)', log_content)
    entities["Hypervisor"].extend(hypervisor)
    # Device/Component patterns
    devices = re.findall(r'(?:device|component):\s+([a-zA-Z0-9_-]+)', log_content)
    entities["Device"].extend(devices)
    # Add common devices if found
    for device in ["PCI-DMA", "ehci_hcd", "usb", "rtc_cmos", "virtio-pci", "i8042", "ata_piix"]:
        if re.search(r'\b' + device + r'\b', log_content):
            entities["Device"].append(device)
    # File System patterns
    filesystems = re.findall(r'(squashfs|ext4|xfs|btrfs):\s', log_content)
    entities["FileSystem"].extend(filesystems)
    # Data Bus patterns
    for bus in ["PCI", "USB", "i2c", "PS/2", "Serial", "ATA", "SATA", "TUN/TAP"]:
        if re.search(r'\b' + bus + r'\b', log_content):
            entities["DataBus"].append(bus)
    # Event ID patterns - a bracketed decimal number such as "[12.345678]"
    event_ids = re.findall(r'\[(\d+\.\d+)\]', log_content)
    entities["EventID"].extend(event_ids)
    # CMD patterns
    cmd_patterns = re.findall(r'CMD\s+\(([^)]+)\)', log_content)
    entities["CMD"].extend(cmd_patterns)

    # ---- Files ------------------------------------------------------------
    # Enhanced file path detection
    file_paths = re.findall(r'(/etc/[a-zA-Z0-9_/.-]+)', log_content)
    entities["FileName"].extend(file_paths)
    # File name patterns - with clear file extensions
    for line in lines:
        # Look for common file extensions - starts with word character and
        # ends with known extension. Only the first match per line is kept.
        file_match = re.search(r'(?<!\S)([a-zA-Z0-9_-]+\.(?:xlsx|txt|java|log|csv|pdf|docx|cfg|conf))(?:\s|$|\.)', line)
        if file_match:
            entities["FileName"].append(file_match.group(1))

    # ---- Severity, status, errors -----------------------------------------
    # Enhanced severity level detection (case-insensitive whole-word match;
    # the lowercase keyword is recorded, not the matched spelling)
    for severity in ["warning", "info", "error", "debug", "notice", "critical", "alert", "emergency"]:
        if re.search(r'\b' + severity + r'\b', log_content, re.IGNORECASE):
            entities["Severity"].append(severity)
    # Session status patterns
    session_status = re.findall(r'session\s+(opened|closed)', log_content)
    entities["SessionStatus"].extend(session_status)
    # Error message patterns
    error_patterns = re.findall(r'(?:error|failure|failed):\s+([^,\n]+)', log_content, re.IGNORECASE)
    entities["Error"].extend(error_patterns)
    # Authentication type patterns
    auth_types = re.findall(r'Accepted\s+(\w+)', log_content)
    entities["AuthenticationType"].extend(auth_types)
    # Extract PAM results - starts with res= and ends with space or quote
    status_pam = re.findall(r'res=(\w+)(?:\s|\'|\")', log_content)
    entities["Status"].extend(status_pam)
    # Extract verification results - starts with VERIFY and ends with space
    status_verify = re.findall(r'VERIFY\s+(OK|FAILED|KU OK|EKU OK)(?:\s|$)', log_content)
    entities["Status"].extend(status_verify)

    # ---- Resources --------------------------------------------------------
    # Resource type patterns - with clear indicators
    resource_types = re.findall(r'(?:resource|type):\s+([a-zA-Z0-9_-]+)', log_content)
    entities["ResourceType"].extend(resource_types)
    # Add CPU as resource type if system CPU metrics are present
    if re.search(r'\"system\":\s*\{\s*\"cpu\":', log_content):
        entities["ResourceType"].append("CPU")

    # ---- Sessions, objects, applications, roles, metadata -----------------
    # SessionID patterns - starts with session_id= or sessionId= and ends with space or comma
    session_id_patterns = re.findall(r'session(?:_id|Id)=([a-zA-Z0-9-]+)(?:\s|,|$)', log_content)
    entities["SessionID"].extend(session_id_patterns)
    # Extract session numbers - starts with session space and ends with space
    session_numbers = re.findall(r'session\s+(\d+)(?:\s|$)', log_content)
    entities["SessionID"].extend(session_numbers)
    # Extract ses= format SessionIDs from audit logs
    ses_patterns = re.findall(r'ses=(\d+)(?:\s|,|$)', log_content)
    entities["SessionID"].extend(ses_patterns)
    # Object patterns - starts with object= and ends with space or comma
    object_patterns = re.findall(r'object=([a-zA-Z0-9_-]+)(?:\s|,|$)', log_content)
    entities["Object"].extend(object_patterns)
    # Extract unit names from systemd logs
    unit_objects = re.findall(r'unit=([a-zA-Z0-9_-]+)(?:\s|,|$)', log_content)
    entities["Object"].extend(unit_objects)
    # Application specific patterns - starts with app= and ends with space or comma
    app_specific_patterns = re.findall(r'app=([a-zA-Z0-9_-]+)(?:\s|,|$)', log_content)
    entities["ApplicationSpecific"].extend(app_specific_patterns)
    # Extract specific applications from content
    if "OpenVPN" in log_content:
        entities["ApplicationSpecific"].append("OpenVPN")
    if "metricbeat" in log_content:
        entities["ApplicationSpecific"].append("metricbeat")
    # Role patterns - starts with role= and ends with space or comma
    role_patterns = re.findall(r'role=([a-zA-Z0-9_-]+)(?:\s|,|$)', log_content)
    entities["Role"].extend(role_patterns)
    # Metadata patterns - starts with metadata={ and ends with }
    metadata_patterns = re.findall(r'metadata=\{([^}]+)\}', log_content)
    entities["Metadata"].extend(metadata_patterns)
    # Resource usage patterns - CPU and memory metrics
    cpu_usage = re.findall(r'\"cpu\":\s*{\s*\"pct\":\s*([0-9.]+)', log_content)
    if cpu_usage:
        entities["ResourceUsage"].extend([f"CPU: {usage}%" for usage in cpu_usage])

    # Drop empty entity types and de-duplicate the rest. dict.fromkeys keeps
    # first-seen order, so the result is the same on every run (a set would
    # order strings differently between interpreter runs).
    return {
        entity_type: list(dict.fromkeys(values))
        for entity_type, values in entities.items()
        if values
    }