import duckdb
from dateutil.parser import parse

from config.log_definitions import log_definitions


class LogParser:
    """Parse a log file into a DuckDB table according to a log definition.

    The definition is looked up in ``log_definitions`` by ``log_type``. The
    code reads two keys from it:

    * ``"fields"``: a list of dicts, each with a ``"name"``, a ``"pos"``
      (an integer index or a ``slice`` into the line's tokens) and an
      optional ``"type"`` (``"datetime"``, ``"direction"`` or a callable
      such as ``int``).
    * ``"sep"``: the token separator; when missing or empty, lines are
      split on runs of whitespace.
    """

    def __init__(self, file_path, log_type, db_path=":memory:"):
        """Store the parsing configuration and open the DuckDB connection.

        Args:
            file_path: Path of the log file to parse.
            log_type: Key selecting the definition in ``log_definitions``.
            db_path: DuckDB database location; defaults to an in-memory
                database.

        Raises:
            KeyError: If ``log_type`` is not a key of ``log_definitions``.
        """
        self.file_path = file_path
        self.log_definition = log_definitions[log_type]
        # ``str.split(None)`` splits on whitespace, which is the documented
        # fallback when the definition has no (or an empty) separator.
        # NOTE(review): assumes the definition is dict-like (supports .get).
        self.log_separator = self.log_definition.get("sep") or None
        self.conn = duckdb.connect(db_path)
        self.table_name = f"log_{log_type}"

    @staticmethod
    def _convert_value(value, typ):
        """Convert a raw token to the type requested by a field definition.

        Args:
            value: The extracted token (``None`` when the position was out
                of range).
            typ: ``"datetime"``, ``"direction"`` or a callable converter.

        Returns:
            The converted value. Unparsable dates become ``None``; a failing
            callable conversion leaves the value unchanged.
        """
        if typ == "datetime":
            try:
                return parse(value)
            except (ValueError, OverflowError, TypeError):
                # ValueError/OverflowError: the text is not a usable date.
                # TypeError: the token was missing, so ``value`` is None.
                return None

        if typ == "direction":
            # Only "o" maps to "download"; every other value, including a
            # missing token, is reported as "upload".
            return "download" if value == "o" else "upload"

        try:
            return typ(value)
        except Exception:
            # Deliberate best-effort: keep the raw token when the converter
            # rejects it (or when ``typ`` is not callable at all).
            return value

    def parse_line(self, line):
        """Parse one log line using the configured definition.

        Args:
            line: A raw line from the log file.

        Returns:
            A dict mapping field names to extracted (and possibly converted)
            values, or ``None`` when the line has fewer tokens than the
            definition has fields.
        """
        # Brackets are treated as plain whitespace before tokenizing.
        cleaned = line.replace("[", " ").replace("]", " ")
        tokens = cleaned.strip().split(self.log_separator)

        fields = self.log_definition["fields"]
        # Ignore lines that cannot possibly hold every field.
        if len(tokens) < len(fields):
            return None

        entry = {}
        for field in fields:
            pos = field["pos"]
            if isinstance(pos, slice):
                # A slice gathers several tokens back into one string.
                value = " ".join(tokens[pos])
            else:
                try:
                    value = tokens[pos]
                except IndexError:
                    value = None

            if "type" in field:
                value = self._convert_value(value, field["type"])

            entry[field["name"]] = value
        return entry

    def parse_file(self):
        """Parse the whole log file and store the entries in a DuckDB table.

        Returns:
            A DuckDB relation over the table ``self.table_name``, or ``None``
            when no line of the file could be parsed.
        """
        data = []
        with open(self.file_path, "r") as f:
            for line in f:
                parsed = self.parse_line(line)
                if parsed:
                    data.append(parsed)

        if not data:
            return None

        # NOTE(review): this binds the list of dicts as a parameter in the
        # FROM clause and needs pyarrow for fetch_arrow_table(); confirm that
        # the DuckDB version in use accepts this form.
        relation = self.conn.execute(  # noqa: F841 - referenced by name in SQL
            "SELECT * FROM ?", [data]
        ).fetch_arrow_table()

        # ``relation`` in the SQL text refers to the local variable above;
        # DuckDB resolves it through its Python replacement scan.
        self.conn.execute(
            f"CREATE OR REPLACE TABLE {self.table_name} AS SELECT * FROM relation"
        )

        return self.conn.table(self.table_name)