Spaces:
Sleeping
Sleeping
File size: 9,272 Bytes
dbe2c62 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 | import logging
import re, os
import pandas as pd
import json, csv, openpyxl
from typing import Dict, List, Any, Tuple
from collections import Counter
# ===============================
# 0. ERROR CATCHER
# ===============================
def exc(func, fallback=None):
"""
Thực thi func() an toàn.
Nếu lỗi → log exception (e) và trả về fallback.
"""
try:
return func()
except Exception as e:
logging.warning(e)
return fallback
# ===============================
# 1. JSON
# ===============================
def read_json(path: str) -> Any:
if not os.path.exists(path):
return []
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def write_json(data: Any, path: str, indent: int = 2) -> None:
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=indent, ensure_ascii=False)
def insert_json(data: Any, path: str, indent: int = 2):
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
with open(path, 'a', encoding='utf-8') as f:
json.dump(data, f, indent=indent, ensure_ascii=False)
# ===============================
# 2. JSONL
# ===============================
def read_jsonl(path: str) -> List[dict]:
with open(path, "r", encoding="utf-8") as f:
return [json.loads(line) for line in f]
def write_jsonl(data: List[dict], path: str) -> None:
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
for item in data:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
def insert_jsonl(data: List[dict], path: str):
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
with open(path, 'a', encoding='utf-8') as f:
for item in data:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
# ===============================
# 3. CSV
# ===============================
def read_csv(path: str) -> List[dict]:
with open(path, "r", encoding="utf-8", newline="") as f:
return list(csv.DictReader(f))
def write_csv(data: List[dict], path: str) -> None:
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
if not data:
return
with open(path, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
# ===============================
# 4.XLSX
# ===============================
def read_xlsx(path: str, sheet_name: str = None) -> List[dict]:
wb = openpyxl.load_workbook(path)
sheet = wb[sheet_name] if sheet_name else wb.active
rows = list(sheet.values)
headers = rows[0]
return [dict(zip(headers, row)) for row in rows[1:]]
def write_xlsx(data: List[dict], path: str, sheet_name: str = "Sheet1") -> None:
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
wb = openpyxl.Workbook()
ws = wb.active
ws.title = sheet_name
if not data:
wb.save(path)
return
ws.append(list(data[0].keys()))
for row in data:
ws.append(list(row.values()))
wb.save(path)
def convert_to_xlsx(json_path, xlsx_path):
os.makedirs(os.path.dirname(xlsx_path), exist_ok=True)
"""Chuyển file JSON (dạng list các object) hoặc JSONL sang XLSX."""
try:
if json_path.endswith('.jsonl'):
df = pd.read_json(json_path, lines=True)
else:
df = pd.read_json(json_path)
column_order = ["category", "sub_category", "url", "title", "description", "content", "date", "words"]
df = df[[col for col in column_order if col in df.columns]]
df.to_excel(xlsx_path, index=False, engine='openpyxl')
print(f"-> Đã xuất thành công file Excel tại {xlsx_path}")
except (FileNotFoundError, ValueError) as e:
print(f"-> Không có dữ liệu hoặc lỗi khi chuyển sang Excel: {e}")
# ===============================
# 5. Convert
# ===============================
def json_convert(data: Any, pretty: bool = True) -> str:
return json.dumps(data, ensure_ascii=False, indent=2 if pretty else None)
def jsonl_convert(data: List[dict]) -> str:
return "\n".join(json.dumps(item, ensure_ascii=False) for item in data)
# ===============================
# 6. Sort
# ===============================
def sort_records(data: List[dict], keys: List[str]) -> List[dict]:
"""Sắp xếp theo nhiều keys với ưu tiên từ trái sang phải"""
return sorted(data, key=lambda x: tuple(x.get(k) for k in keys))
# ===============================
# 7. Most Common
# ===============================
def most_common(values):
if not values:
return None
return Counter(values).most_common(1)[0][0]
DEFAULT_NON_KEEP_PATTERN = re.compile(r"[^\w\s\(\)\.\,\;\:\-–]", flags=re.UNICODE)
def preprocess_text(
text: Any,
non_keep_pattern: re.Pattern = DEFAULT_NON_KEEP_PATTERN,
max_chars_per_text: int | None = None,
) -> Any:
"""
Làm sạch chuỗi: strip, bỏ ký tự không mong muốn, rút gọn khoảng trắng.
Vẫn cho phép list/dict đi qua để hàm preprocess_data xử lý đệ quy.
"""
if isinstance(text, list):
# Truyền tiếp đủ tham số khi gọi đệ quy
return [preprocess_text(t, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text) for t in text]
if isinstance(text, str):
s = text.strip() # <-- sửa từ s = strip()
s = non_keep_pattern.sub("", s)
s = re.sub(r"[ ]{2,}", " ", s)
if max_chars_per_text is not None and len(s) > max_chars_per_text:
s = s[: max_chars_per_text]
return s
return text
def preprocess_data(
data: Any,
non_keep_pattern: re.Pattern = DEFAULT_NON_KEEP_PATTERN,
max_chars_per_text: int | None = None,
) -> Any:
"""Đệ quy tiền xử lý lên toàn bộ JSON."""
if isinstance(data, dict):
return {
k: preprocess_data(v, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text)
for k, v in data.items()
}
if isinstance(data, list):
return [
preprocess_data(x, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text)
for x in data
]
return preprocess_text(data, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text)
# ===============================
# 9. Json
# ===============================
def flatten_json(
data: Any,
prefix: str = "",
flatten_mode: str = "split", # mặc định: tách từng phần tử list
join_sep: str = "\n", # mặc định: xuống dòng khi join list
) -> Dict[str, Any]:
"""
Làm phẳng JSON với xử lý list theo flatten_mode.
- "split": mỗi phần tử list tạo key riêng: a.b[0], a.b[1], ...
Nếu phần tử là dict/list → tiếp tục flatten (được lồng chỉ số).
- "join": join list về 1 chuỗi (join_sep). (Phần tử không phải str sẽ str())
- "keep": giữ nguyên list (chỉ gán 1 key cho toàn list).
Trả về: dict key->giá trị (lá).
"""
flat: Dict[str, Any] = {}
def _recur(node: Any, pfx: str) -> None:
if isinstance(node, dict):
for k, v in node.items():
new_pfx = f"{pfx}{k}" if not pfx else f"{pfx}.{k}"
_recur(v, new_pfx)
return
if isinstance(node, list):
if flatten_mode == "split":
for i, item in enumerate(node):
idx_key = f"{pfx}[{i}]"
_recur(item, idx_key)
elif flatten_mode == "join":
joined = join_sep.join(str(x).strip() for x in node if str(x).strip())
flat[pfx] = joined
else: # "keep"
flat[pfx] = node
return
# lá: số/chuỗi/None/...
flat[pfx] = node
_recur(data, prefix.rstrip("."))
return flat
def deduplicates_by_key(pairs: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
"""
Lọc trùng theo value trong cùng key (hoặc base_key).
Giữ lại **lần xuất hiện đầu tiên** của mỗi (key, text),
loại bỏ những dòng có cùng key và cùng text lặp lại sau đó.
Args:
pairs: Danh sách (key, text) sau khi flatten.
Returns:
Danh sách (key, text) đã loại bỏ trùng lặp.
"""
seen_per_key: Dict[str, set] = {}
filtered: List[Tuple[str, str]] = []
for key, text in pairs:
text_norm = text.strip()
if not text_norm:
continue
base_key = re.sub(r"\[\d+\]", "", key)
if base_key not in seen_per_key:
seen_per_key[base_key] = set()
if text_norm in seen_per_key[base_key]:
continue
seen_per_key[base_key].add(text_norm)
filtered.append((key, text_norm))
return filtered
|