berangerthomas committed on
Commit
e978f7c
·
1 Parent(s): 50a6a65

Add log2duckdb

Browse files
Files changed (1) hide show
  1. utils/log2duckdb.py +88 -0
utils/log2duckdb.py ADDED
@@ -0,0 +1,88 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import duckdb
2
+ from dateutil.parser import parse
3
+
4
+ from config.log_definitions import log_definitions
5
+
6
+
7
class LogParser:
    """
    Parse a delimited log file and load the extracted entries into DuckDB.

    The parser takes a log file path and the key of a log definition (looked
    up in ``log_definitions``). A log definition is subscripted with:

    * ``"sep"``: the token separator passed to ``str.split``. When the key is
      missing, ``None`` or empty, lines are split on runs of whitespace.
    * ``"fields"``: a sequence of field descriptions, each providing a
      ``"name"``, a ``"pos"`` (an integer index or a ``slice`` into the token
      list) and optionally a ``"type"`` (``"datetime"``, ``"direction"`` or a
      callable such as ``int``).
    """

    def __init__(self, file_path, log_type, db_path=":memory:"):
        """
        Args:
            file_path: Path of the log file to parse.
            log_type: Key of the definition to use in ``log_definitions``.
            db_path: DuckDB database to connect to (in-memory by default).

        Raises:
            KeyError: If ``log_type`` is not a known log definition.
        """
        self.file_path = file_path
        self.log_definition = log_definitions[log_type]

        # The separator is optional: ``str.split(None)`` splits on any run of
        # whitespace, and an empty separator (which ``str.split`` rejects) is
        # treated the same way.
        try:
            separator = self.log_definition["sep"]
        except KeyError:
            separator = None
        self.log_separator = separator or None

        self.conn = duckdb.connect(db_path)
        self.table_name = f"log_{log_type}"

    @staticmethod
    def _extract_value(tokens, pos):
        """Return the token(s) found at ``pos``, or None if the index is out of range."""
        if isinstance(pos, slice):
            # A slice gathers several tokens back into one space-joined value.
            return " ".join(tokens[pos])
        try:
            return tokens[pos]
        except IndexError:
            return None

    @staticmethod
    def _convert_value(value, typ):
        """Convert a raw token according to the field type ``typ``."""
        if value is None:
            # A missing token stays missing whatever the target type is;
            # converting it would crash (datetime) or invent data
            # ("upload", the string "None", ...).
            return None

        if typ == "datetime":
            try:
                return parse(value)
            except (ValueError, OverflowError):
                # Unparsable date: store NULL rather than failing the line.
                return None

        if typ == "direction":
            return "download" if value == "o" else "upload"

        try:
            return typ(value)
        except Exception:
            # Deliberate best effort: ``typ`` is an arbitrary callable taken
            # from the log definition, so any failure keeps the raw token.
            return value

    def parse_line(self, line):
        """
        Parse one line of the log file using the log definition.

        Args:
            line: Raw line read from the log file.

        Returns:
            A dict mapping each field name to its converted value, or None
            when the line has fewer tokens than the definition has fields.
        """
        # Square brackets are treated as plain spaces before tokenizing.
        line = line.replace("[", " ").replace("]", " ")
        tokens = line.strip().split(self.log_separator)

        fields = self.log_definition["fields"]

        # Ignore the line if it does not contain enough tokens
        if len(tokens) < len(fields):
            return None

        entry = {}
        for field in fields:
            value = self._extract_value(tokens, field["pos"])
            if "type" in field:
                value = self._convert_value(value, field["type"])
            entry[field["name"]] = value

        return entry

    def parse_file(self):
        """
        Parse the whole log file and store the entries in a DuckDB table.

        Returns:
            The result of ``self.conn.table(self.table_name)`` for the table
            (re)created from the parsed entries, or None when no line of the
            file could be parsed.
        """
        data = []
        # Decode explicitly and replace undecodable bytes so that a single
        # corrupt byte does not abort the whole import.
        with open(self.file_path, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                parsed = self.parse_line(line)
                if parsed:
                    data.append(parsed)

        if not data:
            return None

        # NOTE(review): this binds the list of dicts as a parameter in table
        # position (``FROM ?``) - confirm that the DuckDB version in use
        # accepts it; otherwise build the Arrow table first and select from it.
        # ``relation`` looks unused but is referenced by name in the SQL below.
        relation = self.conn.execute("SELECT * FROM ?", [data]).fetch_arrow_table()

        # Create (or replace) the table from the Arrow table above
        self.conn.execute(
            f"CREATE OR REPLACE TABLE {self.table_name} AS SELECT * FROM relation"
        )

        return self.conn.table(self.table_name)