"""
IIS Log Parser using Polars for high-performance processing.

Handles large log files (200MB-1GB+) efficiently with streaming.
"""

import polars as pl
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from datetime import datetime
import re


class IISLogParser:
    """Parser for IIS W3C Extended Log Format."""

    # IIS log column names (W3C Extended Log Format, in file order).
    COLUMNS = [
        "date", "time", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query",
        "s_port", "cs_username", "c_ip", "cs_user_agent", "cs_referer",
        "sc_status", "sc_substatus", "sc_win32_status", "time_taken",
    ]

    def __init__(self, file_path: str):
        self.file_path = Path(file_path)
        # Primary service name for this log; determined from URI paths in parse().
        self.service_name: Optional[str] = None

    def parse(self, chunk_size: Optional[int] = None) -> pl.DataFrame:
        """
        Parse the IIS log file.

        Args:
            chunk_size: Reserved for chunked processing of very large files.
                Currently unused; kept for interface compatibility.

        Returns:
            Polars DataFrame with parsed log data. For an empty or
            comment-only file, an empty DataFrame *with the full parsed
            schema* is returned so downstream filters/groupings still
            resolve their columns instead of raising.
        """
        rows = self._read_rows()
        if not rows:
            # Previously service_name was left as None on this path and a
            # schemaless DataFrame was returned, breaking LogAnalyzer.
            self.service_name = "Unknown"
            return self._empty_frame()

        df = pl.DataFrame(rows, schema=self.COLUMNS, orient="row")
        df = self._apply_types(df)
        df = self._add_derived_columns(df)
        self.service_name = self._primary_service_name(df)
        return df

    def _read_rows(self) -> List[List[str]]:
        """Read the file, skip '#' directive lines, keep rows with the expected field count."""
        expected = len(self.COLUMNS)
        rows: List[List[str]] = []
        # errors="ignore": IIS logs occasionally contain stray bytes; skip them.
        with open(self.file_path, "r", encoding="utf-8", errors="ignore") as f:
            for line in f:
                # Header/comment lines start with '#'.
                if line.startswith("#"):
                    continue
                fields = line.split()
                # Drop malformed/truncated lines rather than failing the parse.
                if len(fields) == expected:
                    rows.append(fields)
        return rows

    def _empty_frame(self) -> pl.DataFrame:
        """Build an empty DataFrame carrying the same schema parse() produces."""
        df = pl.DataFrame(schema={name: pl.Utf8 for name in self.COLUMNS})
        return self._add_derived_columns(self._apply_types(df))

    def _apply_types(self, df: pl.DataFrame) -> pl.DataFrame:
        """Cast numeric columns and build the combined 'timestamp' Datetime column."""
        df = df.with_columns([
            pl.col("date").cast(pl.Utf8),
            pl.col("time").cast(pl.Utf8),
            # strict=False: IIS writes "-" for missing numeric fields; map those
            # to null instead of aborting the entire parse on one bad row.
            pl.col("sc_status").cast(pl.Int32, strict=False),
            pl.col("sc_substatus").cast(pl.Int32, strict=False),
            pl.col("sc_win32_status").cast(pl.Int32, strict=False),
            pl.col("time_taken").cast(pl.Int32, strict=False),
        ])
        # IIS logs date and time as separate fields; combine into one Datetime.
        return df.with_columns([
            (pl.col("date") + " " + pl.col("time"))
            .str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S")
            .alias("timestamp")
        ])

    def _add_derived_columns(self, df: pl.DataFrame) -> pl.DataFrame:
        """Add service/method columns derived from cs_uri_stem."""
        return df.with_columns([
            self._extract_service_name().alias("service_name"),
            self._extract_method_name().alias("method_name"),
            self._extract_full_method_path().alias("full_method_path"),
        ])

    def _primary_service_name(self, df: pl.DataFrame) -> str:
        """Return the most common service name in the frame, or 'Unknown'."""
        if df.height == 0:
            return "Unknown"
        counts = df.group_by("service_name").agg([
            pl.count().alias("count")
        ]).sort("count", descending=True)
        if counts.height == 0:
            return "Unknown"
        return counts.row(0, named=True)["service_name"]

    def _extract_service_name(self) -> pl.Expr:
        """Service name from the URI stem.

        Example: /AdministratorOfficeService/Contact/Get -> AdministratorOfficeService
        """
        return (
            pl.col("cs_uri_stem")
            .str.split("/")
            .list.get(1)  # first segment after the leading '/'
            .fill_null("Unknown")
        )

    def _extract_full_method_path(self) -> pl.Expr:
        """Full method path after the service name, for better error tracking.

        Example: /AdministratorOfficeService/Contact/Get -> Contact/Get
        """
        return (
            pl.col("cs_uri_stem")
            .str.split("/")
            .list.slice(2)  # skip the leading '' and the service segment
            .list.join("/")
            .fill_null("Unknown")
        )

    def _extract_method_name(self) -> pl.Expr:
        """Last URI segment (e.g. /Service/Contact/Get -> Get)."""
        return pl.col("cs_uri_stem").str.split("/").list.last().fill_null("Unknown")


class LogAnalyzer:
    """Analyze parsed IIS logs and generate performance metrics."""

    def __init__(self, df: pl.DataFrame, service_name: str = "Unknown",
                 slow_threshold: int = 3000):
        self.df = df
        self.service_name = service_name
        # Requests slower than this many milliseconds count as "slow".
        self.slow_threshold = slow_threshold
        # Cache for filter_logs(); filtering is computed once per analyzer.
        self._filtered_df: Optional[pl.DataFrame] = None

    @staticmethod
    def _is_error() -> pl.Expr:
        """Predicate for rows counted as errors: any status except 200 and 401.

        401s are excluded here because they are reported separately.
        """
        return ~pl.col("sc_status").is_in([200, 401])

    def filter_logs(self) -> pl.DataFrame:
        """
        Apply filtering rules:
            1. Exclude lines that are both HEAD and Zabbix (monitoring probes)
            2. (401 status codes are excluded later, during error counting)

        Returns:
            Filtered DataFrame (cached after the first call).
        """
        if self._filtered_df is not None:
            return self._filtered_df

        is_zabbix_head = (pl.col("cs_method") == "HEAD") & (
            pl.col("cs_user_agent").str.contains("Zabbix")
            | pl.col("cs_uri_stem").str.contains("Zabbix")
        )
        self._filtered_df = self.df.filter(~is_zabbix_head)
        return self._filtered_df

    def get_summary_stats(self) -> Dict:
        """Get overall summary statistics for the (filtered) log."""
        df = self.filter_logs()

        total_before = self.df.height
        total_after = df.height
        excluded = total_before - total_after

        # 401s are reported separately rather than counted as errors.
        count_401 = self.df.filter(pl.col("sc_status") == 401).height

        errors = df.filter(self._is_error()).height
        slow_requests = df.filter(pl.col("time_taken") > self.slow_threshold).height

        time_stats = df.select([
            pl.col("time_taken").min().alias("min_time"),
            pl.col("time_taken").max().alias("max_time"),
            pl.col("time_taken").mean().alias("avg_time"),
        ]).to_dicts()[0]

        rps_data = self._calculate_peak_rps(df)

        def _ms(value) -> int:
            # Explicit None check: a legitimate 0 ms minimum must not be
            # confused with "no data" (the old truthiness test conflated them).
            return int(value) if value is not None else 0

        return {
            "service_name": self.service_name,
            "total_requests_before": total_before,
            "excluded_requests": excluded,
            "excluded_401": count_401,
            "total_requests_after": total_after,
            "errors": errors,
            "slow_requests": slow_requests,
            "slow_threshold": self.slow_threshold,
            "min_time_ms": _ms(time_stats["min_time"]),
            "max_time_ms": _ms(time_stats["max_time"]),
            "avg_time_ms": _ms(time_stats["avg_time"]),
            "peak_rps": rps_data["peak_rps"],
            "peak_timestamp": rps_data["peak_timestamp"],
        }

    def _calculate_peak_rps(self, df: pl.DataFrame) -> Dict:
        """Calculate peak requests per second (timestamps have 1s resolution)."""
        if df.height == 0:
            return {"peak_rps": 0, "peak_timestamp": None}

        rps = df.group_by("timestamp").agg([
            pl.count().alias("count")
        ]).sort("count", descending=True)

        if rps.height == 0:
            return {"peak_rps": 0, "peak_timestamp": None}

        peak_row = rps.row(0, named=True)
        return {
            "peak_rps": peak_row["count"],
            "peak_timestamp": str(peak_row["timestamp"]),
        }

    def get_top_methods(self, n: int = 5) -> List[Dict]:
        """Get top N methods by request count, with avg time and error count."""
        df = self.filter_logs()
        if df.height == 0:
            return []

        method_stats = df.group_by("method_name").agg([
            pl.count().alias("count"),
            pl.col("time_taken").mean().alias("avg_time"),
            pl.col("sc_status").filter(self._is_error()).count().alias("errors"),
        ]).sort("count", descending=True).limit(n)

        return method_stats.to_dicts()

    def get_error_breakdown(self) -> List[Dict]:
        """Get breakdown of errors by status code."""
        df = self.filter_logs()
        errors = df.filter(self._is_error())
        if errors.height == 0:
            return []

        error_stats = errors.group_by("sc_status").agg([
            pl.count().alias("count")
        ]).sort("count", descending=True)

        return error_stats.to_dicts()

    def get_errors_by_method(self, n: int = 10) -> List[Dict]:
        """
        Get detailed error breakdown by method with full context.
        Shows which methods are causing the most errors.

        Args:
            n: Number of top error-prone methods to return

        Returns:
            List of dicts with method, error count, total calls, and error rate
        """
        df = self.filter_logs()
        if df.height == 0:
            return []

        error_status = pl.col("sc_status").filter(self._is_error())
        method_errors = df.group_by("full_method_path").agg([
            pl.count().alias("total_calls"),
            error_status.count().alias("error_count"),
            # mode().first() gives the actual most frequent error status;
            # the old .first() silently returned whichever error came first.
            error_status.mode().first().alias("most_common_error_status"),
            pl.col("time_taken").mean().alias("avg_response_time_ms"),
        ]).filter(
            pl.col("error_count") > 0
        ).with_columns([
            (pl.col("error_count") * 100.0 / pl.col("total_calls"))
            .alias("error_rate_percent")
        ]).sort("error_count", descending=True).limit(n)

        return method_errors.to_dicts()

    def get_error_details(self, method_path: Optional[str] = None,
                          limit: int = 100) -> List[Dict]:
        """
        Get detailed error logs with full context for debugging.

        Args:
            method_path: Optional filter for a specific full method path
            limit: Maximum number of error records to return

        Returns:
            List of error records with timestamp, method, status,
            response time, etc., most recent first.
        """
        df = self.filter_logs()
        errors = df.filter(self._is_error())

        if method_path:
            errors = errors.filter(pl.col("full_method_path") == method_path)

        if errors.height == 0:
            return []

        error_details = errors.select([
            "timestamp", "service_name", "full_method_path", "method_name",
            "sc_status", "sc_substatus", "sc_win32_status",
            "time_taken", "c_ip", "cs_uri_query",
        ]).sort("timestamp", descending=True).limit(limit)

        return error_details.to_dicts()

    def get_response_time_distribution(self, buckets: Optional[List[int]] = None) -> Dict:
        """Get response time distribution by buckets (half-open ranges in ms)."""
        if buckets is None:
            buckets = [0, 50, 100, 200, 500, 1000, 3000, 10000]

        df = self.filter_logs()
        if df.height == 0:
            return {}

        distribution = {}
        for lower, upper in zip(buckets, buckets[1:]):
            count = df.filter(
                (pl.col("time_taken") >= lower) & (pl.col("time_taken") < upper)
            ).height
            distribution[f"{lower}-{upper}ms"] = count

        # Open-ended bucket for everything above the last threshold.
        count = df.filter(pl.col("time_taken") >= buckets[-1]).height
        distribution[f">{buckets[-1]}ms"] = count

        return distribution

    def get_rps_timeline(self, interval: str = "1m") -> pl.DataFrame:
        """Get requests-per-interval over time for the given interval string."""
        df = self.filter_logs()
        if df.height == 0:
            return pl.DataFrame()

        # group_by_dynamic requires the key column to be sorted; log lines are
        # usually chronological but this is not guaranteed, so sort first.
        timeline = df.sort("timestamp").group_by_dynamic(
            "timestamp", every=interval
        ).agg([
            pl.count().alias("requests")
        ]).sort("timestamp")

        return timeline


def analyze_multiple_logs(log_files: List[str],
                          slow_threshold: int = 3000) -> Tuple[Dict, List[Dict]]:
    """
    Analyze multiple log files and generate a combined report.

    Args:
        log_files: List of log file paths
        slow_threshold: Response-time threshold in ms for "slow" requests
            (forwarded to each LogAnalyzer; defaults to the analyzer default)

    Returns:
        Tuple of (combined_stats, individual_stats)
    """
    individual_stats = []

    for log_file in log_files:
        parser = IISLogParser(log_file)
        df = parser.parse()
        analyzer = LogAnalyzer(df, parser.service_name or "Unknown",
                               slow_threshold=slow_threshold)

        stats = {
            "summary": analyzer.get_summary_stats(),
            "top_methods": analyzer.get_top_methods(),
            "error_breakdown": analyzer.get_error_breakdown(),
            "errors_by_method": analyzer.get_errors_by_method(n=10),
            "response_time_dist": analyzer.get_response_time_distribution(),
            "analyzer": analyzer,
        }
        individual_stats.append(stats)

    # Combined totals are simple sums over the per-file summaries.
    combined = {
        "total_requests_before": sum(s["summary"]["total_requests_before"] for s in individual_stats),
        "excluded_requests": sum(s["summary"]["excluded_requests"] for s in individual_stats),
        "excluded_401": sum(s["summary"]["excluded_401"] for s in individual_stats),
        "total_requests_after": sum(s["summary"]["total_requests_after"] for s in individual_stats),
        "errors": sum(s["summary"]["errors"] for s in individual_stats),
        "slow_requests": sum(s["summary"]["slow_requests"] for s in individual_stats),
    }

    return combined, individual_stats