odatalogparser / log_parser.py
pilotstuki's picture
Fix slow request threshold not applying from UI
113119c
"""
IIS Log Parser using Polars for high-performance processing.
Handles large log files (200MB-1GB+) efficiently with streaming.
"""
import polars as pl
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from datetime import datetime
import re
class IISLogParser:
    """Parser for IIS W3C Extended Log Format.

    Every data line must contain exactly the whitespace-separated fields
    listed in ``COLUMNS``. Directive lines (starting with ``#``) and rows
    with a different number of fields are skipped.

    Attributes:
        file_path: Location of the log file as a ``pathlib.Path``.
        service_name: Most frequent first URI segment found by ``parse()``.
            ``None`` until ``parse()`` has run; ``"Unknown"`` when the file
            held no usable rows.
    """

    # IIS log column names, in the order the fields must appear on each line.
    COLUMNS = [
        "date", "time", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query",
        "s_port", "cs_username", "c_ip", "cs_user_agent", "cs_referer",
        "sc_status", "sc_substatus", "sc_win32_status", "time_taken"
    ]

    # Columns converted from text to 32-bit integers after loading.
    _INT_COLUMNS = ("sc_status", "sc_substatus", "sc_win32_status", "time_taken")

    def __init__(self, file_path: str):
        """Remember where the log lives; nothing is read until ``parse()``."""
        self.file_path = Path(file_path)
        self.service_name = None  # Will be determined from URI paths during parsing

    def parse(self, chunk_size: Optional[int] = None) -> pl.DataFrame:
        """
        Parse IIS log file.

        Args:
            chunk_size: Accepted for interface compatibility only; it is
                currently ignored and the file is always parsed in one go.

        Returns:
            Polars DataFrame holding the ``COLUMNS`` fields plus the derived
            ``timestamp``, ``service_name``, ``method_name`` and
            ``full_method_path`` columns. When the file has no usable rows an
            empty, column-less DataFrame is returned and ``service_name`` is
            set to ``"Unknown"``.

        Note:
            The integer casts and the timestamp parsing are not guarded here,
            so a row with the right field count but malformed values is
            expected to surface as a polars error.
        """
        expected_fields = len(self.COLUMNS)
        rows = []

        # "utf-8-sig" drops a leading byte-order mark so it cannot hide the
        # '#' of the first directive or corrupt the first date value.
        with open(self.file_path, "r", encoding="utf-8-sig", errors="ignore") as handle:
            for raw_line in handle:
                # Skip header/comment lines starting with #
                if raw_line.startswith("#"):
                    continue
                # split() without arguments also discards surrounding
                # whitespace, and blank lines yield [] which fails the
                # field-count check below.
                fields = raw_line.split()
                if len(fields) == expected_fields:
                    rows.append(fields)

        if not rows:
            # Without this, callers would see service_name == None.
            self.service_name = "Unknown"
            return pl.DataFrame()

        df = pl.DataFrame(rows, schema=self.COLUMNS, orient="row")

        # Numeric columns arrive as text; convert them.
        df = df.with_columns([
            pl.col(name).cast(pl.Int32) for name in self._INT_COLUMNS
        ])

        # Combine date and time into a single datetime column.
        df = df.with_columns([
            (pl.col("date") + " " + pl.col("time"))
            .str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S")
            .alias("timestamp")
        ])

        # Extract service name and method name from URI
        df = df.with_columns([
            self._extract_service_name().alias("service_name"),
            self._extract_method_name().alias("method_name"),
            self._extract_full_method_path().alias("full_method_path")
        ])

        # The frame has at least one row here, so the grouping below always
        # produces at least one service. Ties on count are broken by name so
        # the chosen service is deterministic.
        service_counts = (
            df.group_by("service_name")
            .agg([pl.count().alias("count")])
            .sort(["count", "service_name"], descending=[True, False])
        )
        self.service_name = service_counts.row(0, named=True)["service_name"]

        return df

    def _extract_service_name(self) -> pl.Expr:
        """Extract service name from URI stem (e.g., AdministratorOfficeService, CustomerOfficeService)."""
        # Example: /AdministratorOfficeService/Contact/Get -> AdministratorOfficeService
        # slice(1, 1) + first() picks the element after the leading "/" and
        # yields null, rather than an out-of-bounds error, when the stem
        # contains no "/" at all (e.g. "*").
        return (
            pl.col("cs_uri_stem")
            .str.split("/")
            .list.slice(1, 1)
            .list.first()
            .fill_null("Unknown")
        )

    def _extract_full_method_path(self) -> pl.Expr:
        """Extract full method path for better error tracking (e.g., Contact/Get, Order/Create)."""
        # Example: /AdministratorOfficeService/Contact/Get -> Contact/Get
        return (
            pl.col("cs_uri_stem")
            .str.split("/")
            .list.slice(2)  # Skip leading / and service name
            .list.join("/")
            .fill_null("Unknown")  # only replaces nulls
        )

    def _extract_method_name(self) -> pl.Expr:
        """Extract method name from URI stem."""
        # Last part of the URI path (e.g., /Service/Contact/Get -> Get)
        return pl.col("cs_uri_stem").str.split("/").list.last().fill_null("Unknown")
class LogAnalyzer:
    """Analyze parsed IIS logs and generate performance metrics.

    Conventions used by every method of this class:

    * An "error" is a request whose ``sc_status`` is neither 200 nor 401.
      Other 2xx/3xx codes therefore count as errors as well.
    * Unless stated otherwise, metrics are computed on the frame returned by
      ``filter_logs()``.
    * A frame without rows (including a column-less one) yields zero/empty
      results instead of raising.
    """

    def __init__(self, df: pl.DataFrame, service_name: str = "Unknown", slow_threshold: int = 3000):
        """
        Args:
            df: Parsed log frame; the methods read the columns ``cs_method``,
                ``cs_user_agent``, ``cs_uri_stem``, ``sc_status``,
                ``time_taken``, ``timestamp``, ``method_name`` and
                ``full_method_path`` (plus a few more in
                ``get_error_details``).
            service_name: Label copied into the summary statistics.
            slow_threshold: A request is "slow" when its ``time_taken`` is
                strictly greater than this value.
        """
        self.df = df
        self.service_name = service_name
        self.slow_threshold = slow_threshold
        self._filtered_df = None  # cache for filter_logs()

    @staticmethod
    def _error_condition() -> pl.Expr:
        """Return the row predicate for errors: status is neither 200 nor 401."""
        status = pl.col("sc_status")
        return (status != 200) & (status != 401)

    def filter_logs(self) -> pl.DataFrame:
        """
        Drop monitoring noise: rows whose method is HEAD and whose user agent
        or URI stem contains "Zabbix".

        401 responses are NOT removed here; they are only left out of the
        error counts by the error predicate.

        Returns:
            Filtered DataFrame (cached after the first call)
        """
        if self._filtered_df is not None:
            return self._filtered_df

        if self.df.height == 0:
            # Nothing to filter, and an empty frame may have no columns at
            # all, in which case the expression below would fail.
            self._filtered_df = self.df
            return self._filtered_df

        mentions_zabbix = (
            pl.col("cs_user_agent").str.contains("Zabbix", literal=True)
            | pl.col("cs_uri_stem").str.contains("Zabbix", literal=True)
        )
        is_monitoring_probe = (pl.col("cs_method") == "HEAD") & mentions_zabbix

        self._filtered_df = self.df.filter(~is_monitoring_probe)
        return self._filtered_df

    def get_summary_stats(self) -> Dict:
        """Get overall summary statistics.

        Returns:
            Dict with request counts before/after filtering, the number of
            401 responses in the unfiltered frame, error and slow-request
            counts, min/max/avg ``time_taken`` truncated to int, and the
            peak per-second request count with its timestamp.
        """
        total_before = self.df.height
        if total_before == 0:
            # Empty input: report zeros without touching any column.
            return {
                "service_name": self.service_name,
                "total_requests_before": 0,
                "excluded_requests": 0,
                "excluded_401": 0,
                "total_requests_after": 0,
                "errors": 0,
                "slow_requests": 0,
                "slow_threshold": self.slow_threshold,
                "min_time_ms": 0,
                "max_time_ms": 0,
                "avg_time_ms": 0,
                "peak_rps": 0,
                "peak_timestamp": None,
            }

        df = self.filter_logs()
        total_after = df.height

        # 401s are counted on the unfiltered frame.
        count_401 = self.df.filter(pl.col("sc_status") == 401).height
        errors = df.filter(self._error_condition()).height
        slow_requests = df.filter(pl.col("time_taken") > self.slow_threshold).height

        # Aggregates are None when every row was filtered out.
        time_stats = df.select([
            pl.col("time_taken").min().alias("min_time"),
            pl.col("time_taken").max().alias("max_time"),
            pl.col("time_taken").mean().alias("avg_time"),
        ]).to_dicts()[0]

        def as_int(value) -> int:
            return int(value) if value is not None else 0

        rps_data = self._calculate_peak_rps(df)

        return {
            "service_name": self.service_name,
            "total_requests_before": total_before,
            "excluded_requests": total_before - total_after,
            "excluded_401": count_401,
            "total_requests_after": total_after,
            "errors": errors,
            "slow_requests": slow_requests,
            "slow_threshold": self.slow_threshold,
            "min_time_ms": as_int(time_stats["min_time"]),
            "max_time_ms": as_int(time_stats["max_time"]),
            "avg_time_ms": as_int(time_stats["avg_time"]),
            "peak_rps": rps_data["peak_rps"],
            "peak_timestamp": rps_data["peak_timestamp"],
        }

    def _calculate_peak_rps(self, df: pl.DataFrame) -> Dict:
        """Calculate peak requests per second.

        Args:
            df: Frame with a ``timestamp`` column (one-second resolution).

        Returns:
            Dict with ``peak_rps`` (largest number of rows sharing one
            timestamp) and ``peak_timestamp`` (that timestamp as a string,
            or None for an empty frame). When several seconds tie, the
            earliest one is reported.
        """
        if df.height == 0:
            return {"peak_rps": 0, "peak_timestamp": None}

        per_second = (
            df.group_by("timestamp")
            .agg([pl.count().alias("count")])
            .sort(["count", "timestamp"], descending=[True, False])
        )
        busiest = per_second.row(0, named=True)
        return {
            "peak_rps": busiest["count"],
            "peak_timestamp": str(busiest["timestamp"]),
        }

    def get_top_methods(self, n: int = 5) -> List[Dict]:
        """Get top N methods by request count.

        Returns:
            List of dicts with ``method_name``, ``count``, ``avg_time`` and
            ``errors``, ordered by count (ties by method name).
        """
        df = self.filter_logs()
        if df.height == 0:
            return []

        method_stats = (
            df.group_by("method_name")
            .agg([
                pl.count().alias("count"),
                pl.col("time_taken").mean().alias("avg_time"),
                pl.col("sc_status").filter(self._error_condition()).count().alias("errors"),
            ])
            .sort(["count", "method_name"], descending=[True, False])
            .limit(n)
        )
        return method_stats.to_dicts()

    def get_error_breakdown(self) -> List[Dict]:
        """Get breakdown of errors by status code.

        Returns:
            List of dicts with ``sc_status`` and ``count``, most frequent
            first (ties by status code); empty when there are no errors.
        """
        df = self.filter_logs()
        if df.height == 0:
            return []

        errors = df.filter(self._error_condition())
        if errors.height == 0:
            return []

        error_stats = (
            errors.group_by("sc_status")
            .agg([pl.count().alias("count")])
            .sort(["count", "sc_status"], descending=[True, False])
        )
        return error_stats.to_dicts()

    def get_errors_by_method(self, n: int = 10) -> List[Dict]:
        """
        Get detailed error breakdown by method with full context.
        Shows which methods are causing the most errors.

        Args:
            n: Number of top error-prone methods to return

        Returns:
            List of dicts with ``full_method_path``, ``total_calls``,
            ``error_count``, ``most_common_error_status``,
            ``avg_response_time_ms`` and ``error_rate_percent``; only
            methods with at least one error are included.
        """
        df = self.filter_logs()
        if df.height == 0:
            return []

        error_statuses = pl.col("sc_status").filter(self._error_condition())

        method_errors = (
            df.group_by("full_method_path")
            .agg([
                pl.count().alias("total_calls"),
                error_statuses.count().alias("error_count"),
                # mode() may return several equally frequent codes; min()
                # picks the lowest so the result is a single, stable value.
                error_statuses.mode().min().alias("most_common_error_status"),
                pl.col("time_taken").mean().alias("avg_response_time_ms"),
            ])
            .filter(pl.col("error_count") > 0)
            .with_columns([
                (pl.col("error_count") * 100.0 / pl.col("total_calls")).alias("error_rate_percent")
            ])
            .sort(["error_count", "full_method_path"], descending=[True, False])
            .limit(n)
        )
        return method_errors.to_dicts()

    def get_error_details(self, method_path: str = None, limit: int = 100) -> List[Dict]:
        """
        Get detailed error logs with full context for debugging.

        Args:
            method_path: Optional filter for specific method path; a falsy
                value (None or "") disables the filter
            limit: Maximum number of error records to return

        Returns:
            List of error records (newest first) with timestamp, method,
            status, response time, client IP and query string.
        """
        df = self.filter_logs()
        if df.height == 0:
            return []

        errors = df.filter(self._error_condition())
        if method_path:
            errors = errors.filter(pl.col("full_method_path") == method_path)
        if errors.height == 0:
            return []

        debug_columns = [
            "timestamp",
            "service_name",
            "full_method_path",
            "method_name",
            "sc_status",
            "sc_substatus",
            "sc_win32_status",
            "time_taken",
            "c_ip",
            "cs_uri_query",
        ]
        error_details = errors.select(debug_columns).sort("timestamp", descending=True).limit(limit)
        return error_details.to_dicts()

    def get_response_time_distribution(self, buckets: List[int] = None) -> Dict:
        """Get response time distribution by buckets.

        Args:
            buckets: Ascending bucket boundaries. ``None`` or an empty list
                selects the default boundaries.

        Returns:
            Dict mapping ``"<lower>-<upper>ms"`` to the number of requests
            with ``lower <= time_taken < upper``, plus a final
            ``"><last>ms"`` entry counting ``time_taken >= last``. Values
            below the first boundary are not counted. Empty dict when there
            are no requests.
        """
        if not buckets:
            # An empty list previously failed on buckets[-1].
            buckets = [0, 50, 100, 200, 500, 1000, 3000, 10000]

        df = self.filter_logs()
        if df.height == 0:
            return {}

        elapsed = pl.col("time_taken")
        distribution = {}
        for lower, upper in zip(buckets, buckets[1:]):
            distribution[f"{lower}-{upper}ms"] = df.filter(
                (elapsed >= lower) & (elapsed < upper)
            ).height

        # Add bucket for values above last threshold
        distribution[f">{buckets[-1]}ms"] = df.filter(elapsed >= buckets[-1]).height
        return distribution

    def get_rps_timeline(self, interval: str = "1m") -> pl.DataFrame:
        """Get request counts over time with specified interval.

        Args:
            interval: Window length passed to ``group_by_dynamic(every=...)``.

        Returns:
            DataFrame with ``timestamp`` and ``requests`` (number of rows in
            each window, not divided by the window length), ordered by
            time; an empty column-less DataFrame when there are no requests.
        """
        df = self.filter_logs()
        if df.height == 0:
            return pl.DataFrame()

        # group_by_dynamic needs its index column in ascending order; log
        # rows are not guaranteed to be, so sort explicitly first.
        timeline = (
            df.sort("timestamp")
            .group_by_dynamic("timestamp", every=interval)
            .agg([pl.count().alias("requests")])
            .sort("timestamp")
        )
        return timeline
def analyze_multiple_logs(
    log_files: List[str], slow_threshold: int = 3000
) -> Tuple[Dict, List[Dict]]:
    """
    Analyze multiple log files and generate combined report.

    Args:
        log_files: List of log file paths
        slow_threshold: Slow-request threshold handed to every
            ``LogAnalyzer``; defaults to 3000, the value that was
            previously always in effect.

    Returns:
        Tuple of (combined_stats, individual_stats). ``individual_stats``
        holds one dict per file with the keys ``summary``, ``top_methods``,
        ``error_breakdown``, ``errors_by_method``, ``response_time_dist``
        and ``analyzer`` (the ``LogAnalyzer`` instance itself).
        ``combined_stats`` sums the count fields of the per-file summaries.
    """
    individual_stats = []
    for log_file in log_files:
        parser = IISLogParser(log_file)
        df = parser.parse()

        # The parser may leave service_name unset; fall back to the same
        # label LogAnalyzer uses by default.
        analyzer = LogAnalyzer(
            df,
            parser.service_name or "Unknown",
            slow_threshold=slow_threshold,
        )

        individual_stats.append({
            "summary": analyzer.get_summary_stats(),
            "top_methods": analyzer.get_top_methods(),
            "error_breakdown": analyzer.get_error_breakdown(),
            "errors_by_method": analyzer.get_errors_by_method(n=10),
            "response_time_dist": analyzer.get_response_time_distribution(),
            "analyzer": analyzer,
        })

    # Summary fields that are plain counts and can be added across files.
    summed_keys = (
        "total_requests_before",
        "excluded_requests",
        "excluded_401",
        "total_requests_after",
        "errors",
        "slow_requests",
    )
    combined = {
        key: sum(stats["summary"][key] for stats in individual_stats)
        for key in summed_keys
    }
    return combined, individual_stats