doc-ai-api / Libraries /Common_MyUtils.py
LongK171's picture
Add all
dbe2c62
import logging
import re, os
import pandas as pd
import json, csv, openpyxl
from typing import Dict, List, Any, Tuple
from collections import Counter
# ===============================
# 0. ERROR CATCHER
# ===============================
def exc(func, fallback=None):
"""
Thực thi func() an toàn.
Nếu lỗi → log exception (e) và trả về fallback.
"""
try:
return func()
except Exception as e:
logging.warning(e)
return fallback
# ===============================
# 1. JSON
# ===============================
def read_json(path: str) -> Any:
if not os.path.exists(path):
return []
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def write_json(data: Any, path: str, indent: int = 2) -> None:
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=indent, ensure_ascii=False)
def insert_json(data: Any, path: str, indent: int = 2):
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
with open(path, 'a', encoding='utf-8') as f:
json.dump(data, f, indent=indent, ensure_ascii=False)
# ===============================
# 2. JSONL
# ===============================
def read_jsonl(path: str) -> List[dict]:
with open(path, "r", encoding="utf-8") as f:
return [json.loads(line) for line in f]
def write_jsonl(data: List[dict], path: str) -> None:
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
for item in data:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
def insert_jsonl(data: List[dict], path: str):
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
with open(path, 'a', encoding='utf-8') as f:
for item in data:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
# ===============================
# 3. CSV
# ===============================
def read_csv(path: str) -> List[dict]:
with open(path, "r", encoding="utf-8", newline="") as f:
return list(csv.DictReader(f))
def write_csv(data: List[dict], path: str) -> None:
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
if not data:
return
with open(path, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
# ===============================
# 4.XLSX
# ===============================
def read_xlsx(path: str, sheet_name: str = None) -> List[dict]:
wb = openpyxl.load_workbook(path)
sheet = wb[sheet_name] if sheet_name else wb.active
rows = list(sheet.values)
headers = rows[0]
return [dict(zip(headers, row)) for row in rows[1:]]
def write_xlsx(data: List[dict], path: str, sheet_name: str = "Sheet1") -> None:
dir_path = os.path.dirname(path)
if dir_path: os.makedirs(dir_path, exist_ok=True)
wb = openpyxl.Workbook()
ws = wb.active
ws.title = sheet_name
if not data:
wb.save(path)
return
ws.append(list(data[0].keys()))
for row in data:
ws.append(list(row.values()))
wb.save(path)
def convert_to_xlsx(json_path, xlsx_path):
os.makedirs(os.path.dirname(xlsx_path), exist_ok=True)
"""Chuyển file JSON (dạng list các object) hoặc JSONL sang XLSX."""
try:
if json_path.endswith('.jsonl'):
df = pd.read_json(json_path, lines=True)
else:
df = pd.read_json(json_path)
column_order = ["category", "sub_category", "url", "title", "description", "content", "date", "words"]
df = df[[col for col in column_order if col in df.columns]]
df.to_excel(xlsx_path, index=False, engine='openpyxl')
print(f"-> Đã xuất thành công file Excel tại {xlsx_path}")
except (FileNotFoundError, ValueError) as e:
print(f"-> Không có dữ liệu hoặc lỗi khi chuyển sang Excel: {e}")
# ===============================
# 5. Convert
# ===============================
def json_convert(data: Any, pretty: bool = True) -> str:
return json.dumps(data, ensure_ascii=False, indent=2 if pretty else None)
def jsonl_convert(data: List[dict]) -> str:
return "\n".join(json.dumps(item, ensure_ascii=False) for item in data)
# ===============================
# 6. Sort
# ===============================
def sort_records(data: List[dict], keys: List[str]) -> List[dict]:
"""Sắp xếp theo nhiều keys với ưu tiên từ trái sang phải"""
return sorted(data, key=lambda x: tuple(x.get(k) for k in keys))
# ===============================
# 7. Most Common
# ===============================
def most_common(values):
if not values:
return None
return Counter(values).most_common(1)[0][0]
DEFAULT_NON_KEEP_PATTERN = re.compile(r"[^\w\s\(\)\.\,\;\:\-–]", flags=re.UNICODE)
def preprocess_text(
text: Any,
non_keep_pattern: re.Pattern = DEFAULT_NON_KEEP_PATTERN,
max_chars_per_text: int | None = None,
) -> Any:
"""
Làm sạch chuỗi: strip, bỏ ký tự không mong muốn, rút gọn khoảng trắng.
Vẫn cho phép list/dict đi qua để hàm preprocess_data xử lý đệ quy.
"""
if isinstance(text, list):
# Truyền tiếp đủ tham số khi gọi đệ quy
return [preprocess_text(t, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text) for t in text]
if isinstance(text, str):
s = text.strip() # <-- sửa từ s = strip()
s = non_keep_pattern.sub("", s)
s = re.sub(r"[ ]{2,}", " ", s)
if max_chars_per_text is not None and len(s) > max_chars_per_text:
s = s[: max_chars_per_text]
return s
return text
def preprocess_data(
data: Any,
non_keep_pattern: re.Pattern = DEFAULT_NON_KEEP_PATTERN,
max_chars_per_text: int | None = None,
) -> Any:
"""Đệ quy tiền xử lý lên toàn bộ JSON."""
if isinstance(data, dict):
return {
k: preprocess_data(v, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text)
for k, v in data.items()
}
if isinstance(data, list):
return [
preprocess_data(x, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text)
for x in data
]
return preprocess_text(data, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text)
# ===============================
# 9. Json
# ===============================
def flatten_json(
data: Any,
prefix: str = "",
flatten_mode: str = "split", # mặc định: tách từng phần tử list
join_sep: str = "\n", # mặc định: xuống dòng khi join list
) -> Dict[str, Any]:
"""
Làm phẳng JSON với xử lý list theo flatten_mode.
- "split": mỗi phần tử list tạo key riêng: a.b[0], a.b[1], ...
Nếu phần tử là dict/list → tiếp tục flatten (được lồng chỉ số).
- "join": join list về 1 chuỗi (join_sep). (Phần tử không phải str sẽ str())
- "keep": giữ nguyên list (chỉ gán 1 key cho toàn list).
Trả về: dict key->giá trị (lá).
"""
flat: Dict[str, Any] = {}
def _recur(node: Any, pfx: str) -> None:
if isinstance(node, dict):
for k, v in node.items():
new_pfx = f"{pfx}{k}" if not pfx else f"{pfx}.{k}"
_recur(v, new_pfx)
return
if isinstance(node, list):
if flatten_mode == "split":
for i, item in enumerate(node):
idx_key = f"{pfx}[{i}]"
_recur(item, idx_key)
elif flatten_mode == "join":
joined = join_sep.join(str(x).strip() for x in node if str(x).strip())
flat[pfx] = joined
else: # "keep"
flat[pfx] = node
return
# lá: số/chuỗi/None/...
flat[pfx] = node
_recur(data, prefix.rstrip("."))
return flat
def deduplicates_by_key(pairs: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
"""
Lọc trùng theo value trong cùng key (hoặc base_key).
Giữ lại **lần xuất hiện đầu tiên** của mỗi (key, text),
loại bỏ những dòng có cùng key và cùng text lặp lại sau đó.
Args:
pairs: Danh sách (key, text) sau khi flatten.
Returns:
Danh sách (key, text) đã loại bỏ trùng lặp.
"""
seen_per_key: Dict[str, set] = {}
filtered: List[Tuple[str, str]] = []
for key, text in pairs:
text_norm = text.strip()
if not text_norm:
continue
base_key = re.sub(r"\[\d+\]", "", key)
if base_key not in seen_per_key:
seen_per_key[base_key] = set()
if text_norm in seen_per_key[base_key]:
continue
seen_per_key[base_key].add(text_norm)
filtered.append((key, text_norm))
return filtered