Spaces:
Sleeping
Sleeping
"""
IIS Log Parser using Polars for high-performance processing.
Handles large log files (200MB-1GB+) by filtering lines on a single
read pass and then processing them with Polars' vectorised operations.
"""
| import polars as pl | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple, Optional | |
| from datetime import datetime | |
| import re | |
class IISLogParser:
    """Parser for IIS W3C Extended Log Format files.

    Reads a log file line by line, skips ``#`` header/comment lines, and
    builds a Polars DataFrame with typed columns plus derived
    ``timestamp``, ``service_name``, ``method_name`` and
    ``full_method_path`` columns.
    """

    # Column layout of the W3C Extended Log Format as configured on the
    # target IIS servers.  Rows with a different field count are dropped.
    COLUMNS = [
        "date", "time", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query",
        "s_port", "cs_username", "c_ip", "cs_user_agent", "cs_referer",
        "sc_status", "sc_substatus", "sc_win32_status", "time_taken",
    ]

    def __init__(self, file_path: str):
        self.file_path = Path(file_path)
        # Primary service served by this log file; determined from URI
        # paths during parse(). "Unknown" when no valid rows are found.
        self.service_name: Optional[str] = None

    def parse(self, chunk_size: Optional[int] = None) -> pl.DataFrame:
        """
        Parse the IIS log file into a typed Polars DataFrame.

        Args:
            chunk_size: Reserved for future chunked processing.  Currently
                ignored — the whole file is parsed in one pass.

        Returns:
            Polars DataFrame with parsed log data, or an empty DataFrame
            when the file contains no valid data rows.
        """
        expected_fields = len(self.COLUMNS)
        data: List[List[str]] = []
        # Single pass: skip '#' header/comment lines, split on whitespace
        # and drop malformed rows immediately, instead of materialising
        # the raw line list first (matters for multi-hundred-MB files).
        with open(self.file_path, 'r', encoding='utf-8', errors='ignore') as f:
            for line in f:
                if line.startswith('#'):
                    continue
                fields = line.split()
                if len(fields) == expected_fields:
                    data.append(fields)

        if not data:
            # Keep service_name consistent with the "no rows" case below
            # instead of leaving it None (callers read this attribute).
            self.service_name = "Unknown"
            return pl.DataFrame()

        df = pl.DataFrame(data, schema=self.COLUMNS, orient="row")

        # Cast numeric columns; date/time stay as strings so they can be
        # concatenated into a timestamp below.
        df = df.with_columns([
            pl.col("date").cast(pl.Utf8),
            pl.col("time").cast(pl.Utf8),
            pl.col("sc_status").cast(pl.Int32),
            pl.col("sc_substatus").cast(pl.Int32),
            pl.col("sc_win32_status").cast(pl.Int32),
            pl.col("time_taken").cast(pl.Int32),
        ])

        # Build a proper datetime column from the separate date/time fields.
        df = df.with_columns([
            (pl.col("date") + " " + pl.col("time")).alias("timestamp")
        ]).with_columns([
            pl.col("timestamp").str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S")
        ])

        # Derive service/method columns from the URI stem.
        df = df.with_columns([
            self._extract_service_name().alias("service_name"),
            self._extract_method_name().alias("method_name"),
            self._extract_full_method_path().alias("full_method_path"),
        ])

        # The log file's primary service is the most frequent service name.
        # (df is non-empty here, so the group-by always yields rows.)
        service_counts = df.group_by("service_name").agg([
            pl.count().alias("count")
        ]).sort("count", descending=True)
        if service_counts.height > 0:
            self.service_name = service_counts.row(0, named=True)["service_name"]
        else:
            self.service_name = "Unknown"

        return df

    def _extract_service_name(self) -> pl.Expr:
        """Extract service name from URI stem (e.g., AdministratorOfficeService, CustomerOfficeService)."""
        # First path segment after the leading slash.
        # Example: /AdministratorOfficeService/Contact/Get -> AdministratorOfficeService
        return (
            pl.col("cs_uri_stem")
            .str.split("/")
            .list.get(1)  # element 0 is the empty string before the leading /
            .fill_null("Unknown")
        )

    def _extract_full_method_path(self) -> pl.Expr:
        """Extract full method path for better error tracking (e.g., Contact/Get, Order/Create)."""
        # Everything after the service-name segment, re-joined with '/'.
        # Example: /AdministratorOfficeService/Contact/Get -> Contact/Get
        return (
            pl.col("cs_uri_stem")
            .str.split("/")
            .list.slice(2)  # skip empty leading element and service name
            .list.join("/")
            .fill_null("Unknown")
        )

    def _extract_method_name(self) -> pl.Expr:
        """Extract method name from URI stem (last path segment, e.g. /Service/Contact/Get -> Get)."""
        return pl.col("cs_uri_stem").str.split("/").list.last().fill_null("Unknown")
class LogAnalyzer:
    """Analyze parsed IIS logs and generate performance metrics."""

    def __init__(self, df: pl.DataFrame, service_name: str = "Unknown", slow_threshold: int = 3000):
        # Parsed log DataFrame as produced by IISLogParser.parse().
        self.df = df
        self.service_name = service_name
        # Requests taking longer than this many milliseconds count as "slow".
        self.slow_threshold = slow_threshold
        # Cache for filter_logs() so the filter expression runs only once.
        self._filtered_df: Optional[pl.DataFrame] = None

    def filter_logs(self) -> pl.DataFrame:
        """
        Apply filtering rules:
        1. Exclude HEAD requests from Zabbix monitoring (user agent or
           URI stem contains "Zabbix").

        Note: 401 responses are NOT removed here; the individual metric
        methods exclude them from error counts instead.

        Returns:
            Filtered DataFrame (cached after the first call)
        """
        if self._filtered_df is not None:
            return self._filtered_df
        # Filter out HEAD + Zabbix health-check noise.
        filtered = self.df.filter(
            ~(
                (pl.col("cs_method") == "HEAD") &
                (
                    pl.col("cs_user_agent").str.contains("Zabbix") |
                    pl.col("cs_uri_stem").str.contains("Zabbix")
                )
            )
        )
        self._filtered_df = filtered
        return filtered

    def get_summary_stats(self) -> Dict:
        """Get overall summary statistics as a flat dict (counts, timings, peak RPS)."""
        df = self.filter_logs()
        # Request counts before/after the Zabbix filter.
        total_before = self.df.height
        total_after = df.height
        excluded = total_before - total_after
        # 401s are reported separately (counted over the unfiltered data).
        count_401 = self.df.filter(pl.col("sc_status") == 401).height
        # Errors: any status other than 200, with 401 excluded by policy.
        errors = df.filter(
            (pl.col("sc_status") != 200) & (pl.col("sc_status") != 401)
        ).height
        # Slow requests (using the configured threshold).
        slow_requests = df.filter(pl.col("time_taken") > self.slow_threshold).height
        # Response time statistics; values are None when df is empty.
        time_stats = df.select([
            pl.col("time_taken").min().alias("min_time"),
            pl.col("time_taken").max().alias("max_time"),
            pl.col("time_taken").mean().alias("avg_time"),
        ]).to_dicts()[0]
        # Peak RPS.
        rps_data = self._calculate_peak_rps(df)
        return {
            "service_name": self.service_name,
            "total_requests_before": total_before,
            "excluded_requests": excluded,
            "excluded_401": count_401,
            "total_requests_after": total_after,
            "errors": errors,
            "slow_requests": slow_requests,
            "slow_threshold": self.slow_threshold,
            # Use explicit None checks so a legitimate 0 ms value is kept.
            "min_time_ms": int(time_stats["min_time"]) if time_stats["min_time"] is not None else 0,
            "max_time_ms": int(time_stats["max_time"]) if time_stats["max_time"] is not None else 0,
            "avg_time_ms": int(time_stats["avg_time"]) if time_stats["avg_time"] is not None else 0,
            "peak_rps": rps_data["peak_rps"],
            "peak_timestamp": rps_data["peak_timestamp"],
        }

    def _calculate_peak_rps(self, df: pl.DataFrame) -> Dict:
        """Calculate peak requests per second (timestamps have 1 s resolution)."""
        if df.height == 0:
            return {"peak_rps": 0, "peak_timestamp": None}
        # Group by second and count requests.
        rps = df.group_by("timestamp").agg([
            pl.count().alias("count")
        ]).sort("count", descending=True)
        if rps.height == 0:
            return {"peak_rps": 0, "peak_timestamp": None}
        peak_row = rps.row(0, named=True)
        return {
            "peak_rps": peak_row["count"],
            "peak_timestamp": str(peak_row["timestamp"])
        }

    def get_top_methods(self, n: int = 5) -> List[Dict]:
        """Get top N methods by request count, with avg time and error count."""
        df = self.filter_logs()
        if df.height == 0:
            return []
        # Group by method name; errors exclude 200 and 401 per policy.
        method_stats = df.group_by("method_name").agg([
            pl.count().alias("count"),
            pl.col("time_taken").mean().alias("avg_time"),
            pl.col("sc_status").filter(
                (pl.col("sc_status") != 200) & (pl.col("sc_status") != 401)
            ).count().alias("errors")
        ]).sort("count", descending=True).limit(n)
        return method_stats.to_dicts()

    def get_error_breakdown(self) -> List[Dict]:
        """Get breakdown of errors by status code (200 and 401 excluded)."""
        df = self.filter_logs()
        errors = df.filter(
            (pl.col("sc_status") != 200) & (pl.col("sc_status") != 401)
        )
        if errors.height == 0:
            return []
        error_stats = errors.group_by("sc_status").agg([
            pl.count().alias("count")
        ]).sort("count", descending=True)
        return error_stats.to_dicts()

    def get_errors_by_method(self, n: int = 10) -> List[Dict]:
        """
        Get detailed error breakdown by method with full context.
        Shows which methods are causing the most errors.

        Args:
            n: Number of top error-prone methods to return

        Returns:
            List of dicts with method, error count, total calls, and error rate
        """
        df = self.filter_logs()
        if df.height == 0:
            return []
        # Get error counts and total counts per full method path.
        method_errors = df.group_by("full_method_path").agg([
            pl.count().alias("total_calls"),
            pl.col("sc_status").filter(
                (pl.col("sc_status") != 200) & (pl.col("sc_status") != 401)
            ).count().alias("error_count"),
            # BUG FIX: previously .first() reported the first error status
            # seen, not the most common one; mode() matches the column name.
            pl.col("sc_status").filter(
                (pl.col("sc_status") != 200) & (pl.col("sc_status") != 401)
            ).mode().first().alias("most_common_error_status"),
            pl.col("time_taken").mean().alias("avg_response_time_ms"),
        ]).filter(
            pl.col("error_count") > 0
        ).with_columns([
            (pl.col("error_count") * 100.0 / pl.col("total_calls")).alias("error_rate_percent")
        ]).sort("error_count", descending=True).limit(n)
        return method_errors.to_dicts()

    def get_error_details(self, method_path: Optional[str] = None, limit: int = 100) -> List[Dict]:
        """
        Get detailed error logs with full context for debugging.

        Args:
            method_path: Optional filter for specific method path
            limit: Maximum number of error records to return

        Returns:
            List of error records with timestamp, method, status, response time, etc.
        """
        df = self.filter_logs()
        # Filter for errors only (200 and 401 excluded per policy).
        errors = df.filter(
            (pl.col("sc_status") != 200) & (pl.col("sc_status") != 401)
        )
        # Apply method filter if specified.
        if method_path:
            errors = errors.filter(pl.col("full_method_path") == method_path)
        if errors.height == 0:
            return []
        # Select the columns useful for debugging, newest errors first.
        error_details = errors.select([
            "timestamp",
            "service_name",
            "full_method_path",
            "method_name",
            "sc_status",
            "sc_substatus",
            "sc_win32_status",
            "time_taken",
            "c_ip",
            "cs_uri_query"
        ]).sort("timestamp", descending=True).limit(limit)
        return error_details.to_dicts()

    def get_response_time_distribution(self, buckets: Optional[List[int]] = None) -> Dict:
        """Get response time distribution by buckets ({"0-50ms": count, ...})."""
        if buckets is None:
            buckets = [0, 50, 100, 200, 500, 1000, 3000, 10000]
        df = self.filter_logs()
        if df.height == 0:
            return {}
        distribution = {}
        # Half-open buckets [lower, upper) over consecutive thresholds.
        for i in range(len(buckets) - 1):
            lower = buckets[i]
            upper = buckets[i + 1]
            count = df.filter(
                (pl.col("time_taken") >= lower) & (pl.col("time_taken") < upper)
            ).height
            distribution[f"{lower}-{upper}ms"] = count
        # Open-ended bucket for values at or above the last threshold.
        count = df.filter(pl.col("time_taken") >= buckets[-1]).height
        distribution[f">{buckets[-1]}ms"] = count
        return distribution

    def get_rps_timeline(self, interval: str = "1m") -> pl.DataFrame:
        """Get RPS over time with the specified interval (polars duration string, e.g. "1m")."""
        df = self.filter_logs()
        if df.height == 0:
            return pl.DataFrame()
        # BUG FIX: group_by_dynamic requires the index column to be sorted;
        # log rows are not guaranteed to be in timestamp order.
        timeline = df.sort("timestamp").group_by_dynamic("timestamp", every=interval).agg([
            pl.count().alias("requests")
        ]).sort("timestamp")
        return timeline
def analyze_multiple_logs(log_files: List[str]) -> Tuple[Dict, List[Dict]]:
    """
    Analyze multiple log files and generate combined report.

    Args:
        log_files: List of log file paths

    Returns:
        Tuple of (combined_stats, individual_stats)
    """
    individual_stats = []
    for path in log_files:
        parser = IISLogParser(path)
        frame = parser.parse()
        analyzer = LogAnalyzer(frame, parser.service_name)
        # Collect the per-file report; the analyzer itself is kept so
        # callers can run further queries on the parsed data.
        individual_stats.append({
            "summary": analyzer.get_summary_stats(),
            "top_methods": analyzer.get_top_methods(),
            "error_breakdown": analyzer.get_error_breakdown(),
            "errors_by_method": analyzer.get_errors_by_method(n=10),
            "response_time_dist": analyzer.get_response_time_distribution(),
            "analyzer": analyzer,
        })

    # Combined statistics: sum the additive summary counters across files.
    summed_keys = (
        "total_requests_before",
        "excluded_requests",
        "excluded_401",
        "total_requests_after",
        "errors",
        "slow_requests",
    )
    combined = {
        key: sum(entry["summary"][key] for entry in individual_stats)
        for key in summed_keys
    }
    return combined, individual_stats