import duckdb
from dateutil.parser import parse

from config.log_definitions import log_definitions


class LogParser:
    """
    Parse a log file according to one of the definitions in log_definitions.

    Takes a log file path and a log type (a key of log_definitions), parses the
    file line by line, and returns a DuckDB relation containing the extracted data.
    """

    def __init__(self, file_path, log_type, db_path=":memory:"):
        self.file_path = file_path
        self.log_definition = log_definitions[log_type]
        # A missing or None "sep" makes str.split() fall back to whitespace splitting
        self.log_separator = self.log_definition.get("sep")
        self.conn = duckdb.connect(db_path)
        self.table_name = f"log_{log_type}"

    def parse_line(self, line):
        """Parse a line from the log file using the provided definition."""
        # Start by replacing [ and ] with spaces
        line = line.replace("[", " ").replace("]", " ")

        # Split on the configured separator (str.split treats None as any whitespace)
        tokens = line.strip().split(self.log_separator)

        # Ignore the line if it does not contain enough tokens
        if len(tokens) < len(self.log_definition["fields"]):
            return None

        entry = {}
        for field in self.log_definition["fields"]:
            pos = field["pos"]

            # Extract the value according to the indicated position
            if isinstance(pos, slice):
                value = " ".join(tokens[pos])
            else:
                try:
                    value = tokens[pos]
                except IndexError:
                    value = None

            # Type conversion
            if "type" in field:
                typ = field["type"]
                if typ == "datetime":
                    # Try to parse the date with dateutil.parser
                    try:
                        value = parse(value)
                    except (ValueError, TypeError, OverflowError):
                        # Unparsable or missing date (parse rejects None with
                        # TypeError): store None instead
                        value = None
                elif typ == "direction":
                    value = "download" if value == "o" else "upload"
                else:
                    try:
                        value = typ(value)
                    except Exception:
                        # Conversion failed: keep the raw token unchanged
                        pass

            entry[field["name"]] = value

        return entry

    def parse_file(self):
        """Iterate through the entire log file and return a DuckDB relation containing the parsed entries."""
        data = []
        with open(self.file_path, "r") as f:
            for line in f:
                parsed = self.parse_line(line)
                if parsed:
                    data.append(parsed)

        if not data:
            return None

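        # Convert the parsed rows to an Arrow table and register it under an
        # explicit name so the CREATE TABLE statement below can select from it
        import pyarrow as pa  # local import; pyarrow is needed for the Arrow conversion

        rows = pa.Table.from_pylist(data)
        self.conn.register("parsed_rows", rows)

        # Materialize the registered view as a table, quoting the generated name
        self.conn.execute(
            f'CREATE OR REPLACE TABLE "{self.table_name}" AS SELECT * FROM parsed_rows'
        )
        self.conn.unregister("parsed_rows")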

        # Return a DuckDB relation
        return self.conn.table(self.table_name)
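

# Illustrative sketch, not part of the original module: the shape of a log
# definition that parse_line appears to expect, inferred only from the keys it
# reads ("sep", "fields", "name", "pos", "type"). EXAMPLE_DEFINITION and
# extract_fields are hypothetical names; the real entries live in
# config.log_definitions and may differ. The helper mirrors only the position
# logic (int index vs. slice) and callable type conversion; it skips the
# "datetime" and "direction" types and the error handling of parse_line.
EXAMPLE_DEFINITION = {
    "sep": None,  # None makes str.split() split on runs of whitespace
    "fields": [
        {"name": "host", "pos": 0},
        {"name": "size", "pos": 1, "type": int},
        {"name": "message", "pos": slice(2, None)},  # join all remaining tokens
    ],
}


def extract_fields(line, definition):
    """Pick tokens by position, following the same rules as LogParser.parse_line."""
    # Brackets are replaced by spaces before splitting, as in parse_line
    cleaned = line.replace("[", " ").replace("]", " ").strip()
    tokens = cleaned.split(definition["sep"])

    entry = {}
    for field in definition["fields"]:
        pos = field["pos"]
        # A slice selects several tokens, which are joined back with spaces
        value = " ".join(tokens[pos]) if isinstance(pos, slice) else tokens[pos]
        convert = field.get("type")
        entry[field["name"]] = convert(value) if callable(convert) else value
    return entry


# Possible usage, under the assumptions above:
#   extract_fields("srv1 512 [disk] almost full", EXAMPLE_DEFINITION)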