Spaces:
Running
Running
| import duckdb | |
| from dateutil.parser import parse | |
| from config.log_definitions import log_definitions | |
| class LogParser: | |
| """ | |
| A class that takes a log file path and a log definition (for example from log_definitions), | |
| then parses the file and returns a DuckDB relation containing the extracted data. | |
| """ | |
| def __init__(self, file_path, log_type, db_path=":memory:"): | |
| self.file_path = file_path | |
| self.log_definition = log_definitions[log_type] | |
| self.log_separator = self.log_definition["sep"] | |
| self.conn = duckdb.connect(db_path) | |
| self.table_name = f"log_{log_type}" | |
| def parse_line(self, line): | |
| """Parse a line from the log file using the provided definition.""" | |
| # Start by replacing [ and ] with spaces | |
| line = line.replace("[", " ").replace("]", " ") | |
| # Get separator from log definition, default to whitespace if not specified | |
| tokens = line.strip().split(self.log_separator) | |
| # Ignore the line if it does not contain enough tokens | |
| if len(tokens) < len(self.log_definition["fields"]): | |
| return None | |
| entry = {} | |
| for field in self.log_definition["fields"]: | |
| pos = field["pos"] | |
| # Extract the value according to the indicated position | |
| if isinstance(pos, slice): | |
| value = " ".join(tokens[pos]) | |
| else: | |
| try: | |
| value = tokens[pos] | |
| except IndexError: | |
| value = None | |
| # Type conversion | |
| if "type" in field: | |
| typ = field["type"] | |
| if typ == "datetime": | |
| # Try to parse the date with dateutil.parser | |
| try: | |
| value = parse(value) | |
| except ValueError: | |
| # If the date is not parsable, try several formats | |
| value = None | |
| elif typ == "direction": | |
| value = "download" if value == "o" else "upload" | |
| else: | |
| try: | |
| value = typ(value) | |
| except Exception: | |
| pass | |
| entry[field["name"]] = value | |
| return entry | |
| def parse_file(self): | |
| """Iterate through the entire log file and return a DuckDB relation containing the parsed entries.""" | |
| data = [] | |
| with open(self.file_path, "r") as f: | |
| for line in f: | |
| parsed = self.parse_line(line) | |
| if parsed: | |
| data.append(parsed) | |
| if not data: | |
| return None | |
| # Create a DuckDB relation directly from the list of dictionaries | |
| relation = self.conn.execute("SELECT * FROM ?", [data]).fetch_arrow_table() | |
| # Create a table from the relation | |
| self.conn.execute( | |
| f"CREATE OR REPLACE TABLE {self.table_name} AS SELECT * FROM relation" | |
| ) | |
| # Return a DuckDB relation | |
| return self.conn.table(self.table_name) | |