berangerthomas committed on
Commit
cc08730
·
1 Parent(s): e978f7c

Add log2polars

Browse files
Files changed (1) hide show
  1. utils/log2polars.py +74 -0
utils/log2polars.py ADDED
@@ -0,0 +1,74 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import polars as pl
2
+ from dateutil.parser import parse
3
+
4
+ from config.log_definitions import log_definitions
5
+
6
+
7
class LogParser:
    """
    Parse a delimited log file into a polars DataFrame.

    The parser is driven by a log definition looked up in ``log_definitions``:
    a mapping with an optional ``"sep"`` entry (the token separator) and a
    ``"fields"`` list. Each field is a mapping with a ``"name"``, a ``"pos"``
    (an integer index or a ``slice`` into the token list) and an optional
    ``"type"``: ``"datetime"``, ``"direction"``, or a callable applied to the
    raw token.
    """

    # Single-pass translation table turning "[" and "]" into spaces.
    _BRACKETS_TO_SPACES = str.maketrans("[]", "  ")

    def __init__(self, file_path, log_type):
        """
        Store the file path and resolve the log definition for ``log_type``.

        Args:
            file_path: Path of the log file to read.
            log_type: Key selecting an entry of ``log_definitions``.

        Raises:
            KeyError: If ``log_type`` is not a key of ``log_definitions``.
        """
        self.file_path = file_path
        self.log_definition = log_definitions[log_type]
        # A definition without "sep" yields None, which makes str.split()
        # split on runs of whitespace.
        self.log_separator = self.log_definition.get("sep")

    @staticmethod
    def _convert(value, typ):
        """Convert one raw (non-None) token according to the field type ``typ``."""
        if typ == "datetime":
            try:
                return parse(value)
            except (ValueError, OverflowError):
                # Unparsable or out-of-range date: record it as missing.
                return None
        if typ == "direction":
            return "download" if value == "o" else "upload"
        try:
            return typ(value)
        except Exception:
            # Deliberate best-effort: ``typ`` is an arbitrary callable from the
            # log definition; when it rejects the token, keep the raw string.
            return value

    def parse_line(self, line):
        """
        Parse one line of the log file using the log definition.

        Args:
            line: Raw text of a single log line.

        Returns:
            A dict mapping each field name to its (possibly converted) value,
            or ``None`` when the line has fewer tokens than the definition has
            fields. A field whose integer position lies beyond the last token
            is reported as ``None`` without any type conversion.
        """
        # Brackets are treated as plain spacing around tokens.
        cleaned = line.translate(self._BRACKETS_TO_SPACES)
        tokens = cleaned.strip().split(self.log_separator)

        fields = self.log_definition["fields"]
        # Ignore the line if it does not contain enough tokens.
        if len(tokens) < len(fields):
            return None

        entry = {}
        for field in fields:
            pos = field["pos"]

            # Extract the value according to the indicated position.
            if isinstance(pos, slice):
                value = " ".join(tokens[pos])
            else:
                try:
                    value = tokens[pos]
                except IndexError:
                    value = None

            # A missing token stays None; converting it would either crash
            # (datetime) or invent a value (direction).
            if value is not None and "type" in field:
                value = self._convert(value, field["type"])

            entry[field["name"]] = value

        return entry

    def parse_file(self):
        """
        Parse every line of the log file.

        Lines for which ``parse_line`` returns ``None`` (or an empty entry)
        are skipped.

        Returns:
            A polars DataFrame built from the list of parsed entries.
        """
        # Explicit encoding keeps decoding platform-independent; undecodable
        # bytes are replaced so one bad byte does not abort the whole file.
        with open(self.file_path, "r", encoding="utf-8", errors="replace") as f:
            data = [entry for entry in map(self.parse_line, f) if entry]

        return pl.DataFrame(data)