Spaces:
Sleeping
Sleeping
File size: 3,074 Bytes
e978f7c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
import duckdb
from dateutil.parser import parse
from config.log_definitions import log_definitions
class LogParser:
    """Parse a delimited log file into a DuckDB table.

    The parser is driven by a log definition looked up in ``log_definitions``
    by ``log_type``. The definition is a mapping with:

    * ``"sep"`` (optional): the token separator passed to ``str.split``;
      when absent or ``None`` the line is split on runs of whitespace.
    * ``"fields"``: a list of field mappings, each with a ``"name"``, a
      ``"pos"`` (an integer index or a ``slice`` into the token list) and an
      optional ``"type"`` (``"datetime"``, ``"direction"`` or a callable
      used to convert the raw token).

    The instance owns a DuckDB connection; call :meth:`close` (or use the
    instance as a context manager) to release it.
    """

    # Square brackets are turned into spaces before tokenising.
    _BRACKET_TABLE = str.maketrans("[]", "  ")

    def __init__(self, file_path, log_type, db_path=":memory:"):
        """Bind the parser to a file, a log definition and a DuckDB database.

        Args:
            file_path: Path of the log file to read.
            log_type: Key into ``log_definitions``; also used to build the
                destination table name ``log_<log_type>``.
            db_path: DuckDB database path; defaults to an in-memory database.

        Raises:
            KeyError: If ``log_type`` is not a key of ``log_definitions``.
        """
        self.file_path = file_path
        self.log_definition = log_definitions[log_type]
        # A missing "sep" means "split on whitespace" (str.split(None)).
        self.log_separator = self.log_definition.get("sep")
        self.conn = duckdb.connect(db_path)
        self.table_name = f"log_{log_type}"

    def close(self):
        """Close the underlying DuckDB connection."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def parse_line(self, line):
        """Parse one log line according to the log definition.

        Args:
            line: Raw line of text from the log file.

        Returns:
            A dict mapping each field name to its (possibly converted) value,
            or ``None`` when the line has fewer tokens than the definition
            has fields. A field whose integer position lies beyond the last
            token is stored as ``None`` and is not type-converted.
        """
        fields = self.log_definition["fields"]

        # Brackets become spaces so e.g. "[timestamp]" tokenises cleanly.
        tokens = line.translate(self._BRACKET_TABLE).strip().split(self.log_separator)

        # Ignore the line if it does not contain enough tokens.
        if len(tokens) < len(fields):
            return None

        entry = {}
        for field in fields:
            pos = field["pos"]

            # Extract the raw value at the indicated position.
            if isinstance(pos, slice):
                value = " ".join(tokens[pos])
            else:
                try:
                    value = tokens[pos]
                except IndexError:
                    value = None

            # Convert only real tokens: a missing value must stay None rather
            # than become "None", "upload" or an exception in the date parser.
            if value is not None and "type" in field:
                value = self._convert(value, field["type"])

            entry[field["name"]] = value
        return entry

    @staticmethod
    def _convert(value, typ):
        """Convert a raw token to the type requested by the field definition."""
        if typ == "datetime":
            try:
                return parse(value)
            except (ValueError, OverflowError):
                # Unparsable date: store NULL instead of failing the line.
                return None
        if typ == "direction":
            return "download" if value == "o" else "upload"
        try:
            return typ(value)
        except Exception:
            # Deliberate best effort: when the converter rejects the token,
            # keep the raw string so the line is not lost.
            return value

    def parse_file(self):
        """Parse the whole log file and load it into a DuckDB table.

        Returns:
            A DuckDB relation over the table ``self.table_name``, or ``None``
            when no line of the file could be parsed.
        """
        data = []
        # errors="replace" keeps one undecodable byte from aborting the run;
        # the platform default encoding is kept as before.
        with open(self.file_path, "r", errors="replace") as f:
            for line in f:
                parsed = self.parse_line(line)
                if parsed:
                    data.append(parsed)

        if not data:
            return None

        # NOTE(review): this passes the list of dicts as a bound parameter in
        # the FROM clause; confirm the installed DuckDB version accepts it.
        relation = self.conn.execute("SELECT * FROM ?", [data]).fetch_arrow_table()

        # The SQL below refers to the local variable `relation` by name, so
        # the variable must not be renamed or removed.
        self.conn.execute(
            f"CREATE OR REPLACE TABLE {self.table_name} AS SELECT * FROM relation"
        )

        # Return a DuckDB relation over the freshly created table.
        return self.conn.table(self.table_name)
|