File size: 6,174 Bytes
e00e744
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import json
import os
import re
from pathlib import Path
from typing import Dict, Any, Optional, Union
from datetime import datetime


def clean_json_string(text: str) -> str:
    """
    Strip markdown code-fence markers from a string so only the payload remains.

    Every occurrence of a triple-backtick fence is removed (not only a leading
    or trailing one), together with any whitespace that directly follows it.
    A ``json`` language tag attached to a fence is removed as well; the tag is
    matched case-insensitively so ``JSON`` / ``Json`` fences are handled the
    same way as ``json``.

    Args:
        text (str): Raw text, possibly wrapped in markdown code fences.

    Returns:
        str: The text without fence markers and without leading/trailing
        whitespace.
    """
    # Remove tagged fences first; otherwise the generic pattern below would
    # strip only the backticks and leave the tag glued to the payload.
    text = re.sub(r'```json\s*\n?', '', text, flags=re.IGNORECASE)
    # Remove any remaining (untagged or closing) fences.
    text = re.sub(r'```\s*\n?', '', text)

    # Remove leading/trailing whitespace and newlines
    return text.strip()


def dump_json_record(filename: str, record: Union[Dict[str, Any], str], lineage_extraction_dumps_folder: str = "lineage_extraction_dumps") -> Union[Dict[str, Any], str]:
    """
    Normalise a record into its JSON-ready form and return it.

    NOTE: appending the record to ``<folder>/<filename>.json`` is currently
    disabled (the write path is commented out below), so this function only
    normalises and returns the record; ``filename`` and
    ``lineage_extraction_dumps_folder`` are not used by the active code.

    Args:
        filename (str): Target file name without extension (unused while
            writing is disabled).
        record (Union[Dict[str, Any], str]): The record to process. A dict is
            returned as-is. Anything else is converted to text, stripped of
            markdown fences via ``clean_json_string`` and parsed as JSON when
            possible.
        lineage_extraction_dumps_folder (str): Target folder name (unused
            while writing is disabled; default: "lineage_extraction_dumps").

    Returns:
        Union[Dict[str, Any], str]: The dict that was passed in, the value
        parsed from the cleaned text, or the cleaned text itself when it is
        not valid JSON.

    Raises:
        TypeError: If ``record`` is a dict that ``json.dumps`` cannot
            serialise.

    Example:
        dumped_data = dump_json_record("user_queries", {"query": "SELECT * FROM users"})
        dumped_data = dump_json_record("outputs", "This is a string output")
    """
    compact = {"ensure_ascii": False, "separators": (',', ':')}

    if isinstance(record, dict):
        payload = record
        # Serialising eagerly keeps non-serialisable dicts failing here.
        serialized = json.dumps(payload, **compact)
    else:
        # Strings are used directly; every other type goes through str().
        raw_text = record if isinstance(record, str) else str(record)
        stripped = clean_json_string(raw_text)
        try:
            payload = json.loads(stripped)
        except json.JSONDecodeError:
            # Not valid JSON: keep the cleaned text as a plain string record.
            payload = stripped
            serialized = json.dumps(payload, ensure_ascii=False)
        else:
            serialized = json.dumps(payload, **compact)

    # Persistence is switched off; `serialized` is the line that would be
    # appended. Original write path, kept for reference:
    #   folder_path = Path(lineage_extraction_dumps_folder)
    #   folder_path.mkdir(exist_ok=True)
    #   file_path = folder_path / f"{filename}.json"
    #   with open(file_path, "a", encoding="utf-8") as f:
    #       f.write(serialized + "\n")

    return payload


def read_json_records(filename: str, lineagedb_folder: str = "lineagedb") -> list:
    """
    Load every parseable JSON line from ``<lineagedb_folder>/<filename>.json``.

    Blank lines are ignored. A line that is not valid JSON is reported with a
    printed warning and skipped; it does not abort the read.

    Args:
        filename (str): The name of the file (without extension)
        lineagedb_folder (str): Folder containing the file (default: "lineagedb")

    Returns:
        list: The decoded records in file order; empty when the file does not
        exist.
    """
    source = Path(lineagedb_folder, f"{filename}.json")
    if not source.exists():
        return []

    parsed = []
    with source.open("r", encoding="utf-8") as handle:
        for raw in handle:
            line = raw.strip()
            if not line:
                # Nothing to decode on an empty line.
                continue
            try:
                parsed.append(json.loads(line))
            except json.JSONDecodeError as e:
                print(f"Warning: Could not parse JSON line: {line[:50]}... Error: {e}")

    return parsed


def clear_json_file(filename: str, lineagedb_folder: str = "lineagedb") -> None:
    """
    Delete ``<lineagedb_folder>/<filename>.json`` if it is present.

    The file is removed outright (not truncated) and a confirmation line is
    printed. Nothing happens when the file does not exist.

    Args:
        filename (str): The name of the file (without extension)
        lineagedb_folder (str): Folder containing the file (default: "lineagedb")
    """
    file_path = Path(lineagedb_folder, f"{filename}.json")
    if not file_path.exists():
        return

    file_path.unlink()
    print(f"Cleared file: {file_path}")

def get_file_stats(filename: str, lineagedb_folder: str = "lineagedb") -> Dict[str, Any]:
    """
    Get statistics about a JSON file in the lineagedb folder.

    Args:
        filename (str): The name of the file (without extension)
        lineagedb_folder (str): The folder name for lineage database files (default: "lineagedb")

    Returns:
        Dict[str, Any]: A dict with the keys
            - "filename": file name including the ``.json`` extension
            - "exists": whether the file was found
            - "record_count": number of non-blank lines (0 if missing)
            - "file_size_bytes": size on disk (0 if missing)
            - "created_time": ISO-8601 local timestamp, or None if missing
            - "modified_time": ISO-8601 local timestamp, or None if missing
    """
    file_path = Path(lineagedb_folder) / f"{filename}.json"
    exists = file_path.exists()

    stats: Dict[str, Any] = {
        "filename": f"{filename}.json",
        "exists": exists,
        "record_count": 0,
        "file_size_bytes": 0,
        "created_time": None,
        "modified_time": None
    }
    if not exists:
        return stats

    # One stat() call so size and timestamps come from the same snapshot.
    st = file_path.stat()

    # st_ctime is the last metadata change on Unix, not the creation time.
    # Prefer st_birthtime where the platform exposes it and fall back to
    # st_ctime otherwise.
    created_ts = getattr(st, "st_birthtime", None)
    if created_ts is None:
        created_ts = st.st_ctime

    stats["file_size_bytes"] = st.st_size
    stats["created_time"] = datetime.fromtimestamp(created_ts).isoformat()
    stats["modified_time"] = datetime.fromtimestamp(st.st_mtime).isoformat()

    # Count records: one record per non-blank line.
    with open(file_path, "r", encoding="utf-8") as f:
        stats["record_count"] = sum(1 for line in f if line.strip())

    return stats