Spaces:
Sleeping
Sleeping
| import logging | |
| import re, os | |
| import pandas as pd | |
| import json, csv, openpyxl | |
| from typing import Dict, List, Any, Tuple | |
| from collections import Counter | |
| # =============================== | |
| # 0. ERROR CATCHER | |
| # =============================== | |
| def exc(func, fallback=None): | |
| """ | |
| Thực thi func() an toàn. | |
| Nếu lỗi → log exception (e) và trả về fallback. | |
| """ | |
| try: | |
| return func() | |
| except Exception as e: | |
| logging.warning(e) | |
| return fallback | |
| # =============================== | |
| # 1. JSON | |
| # =============================== | |
| def read_json(path: str) -> Any: | |
| if not os.path.exists(path): | |
| return [] | |
| with open(path, "r", encoding="utf-8") as f: | |
| return json.load(f) | |
| def write_json(data: Any, path: str, indent: int = 2) -> None: | |
| dir_path = os.path.dirname(path) | |
| if dir_path: os.makedirs(dir_path, exist_ok=True) | |
| with open(path, "w", encoding="utf-8") as f: | |
| json.dump(data, f, indent=indent, ensure_ascii=False) | |
| def insert_json(data: Any, path: str, indent: int = 2): | |
| dir_path = os.path.dirname(path) | |
| if dir_path: os.makedirs(dir_path, exist_ok=True) | |
| with open(path, 'a', encoding='utf-8') as f: | |
| json.dump(data, f, indent=indent, ensure_ascii=False) | |
| # =============================== | |
| # 2. JSONL | |
| # =============================== | |
| def read_jsonl(path: str) -> List[dict]: | |
| with open(path, "r", encoding="utf-8") as f: | |
| return [json.loads(line) for line in f] | |
| def write_jsonl(data: List[dict], path: str) -> None: | |
| dir_path = os.path.dirname(path) | |
| if dir_path: os.makedirs(dir_path, exist_ok=True) | |
| with open(path, "w", encoding="utf-8") as f: | |
| for item in data: | |
| f.write(json.dumps(item, ensure_ascii=False) + "\n") | |
| def insert_jsonl(data: List[dict], path: str): | |
| dir_path = os.path.dirname(path) | |
| if dir_path: os.makedirs(dir_path, exist_ok=True) | |
| with open(path, 'a', encoding='utf-8') as f: | |
| for item in data: | |
| f.write(json.dumps(item, ensure_ascii=False) + '\n') | |
| # =============================== | |
| # 3. CSV | |
| # =============================== | |
| def read_csv(path: str) -> List[dict]: | |
| with open(path, "r", encoding="utf-8", newline="") as f: | |
| return list(csv.DictReader(f)) | |
| def write_csv(data: List[dict], path: str) -> None: | |
| dir_path = os.path.dirname(path) | |
| if dir_path: os.makedirs(dir_path, exist_ok=True) | |
| if not data: | |
| return | |
| with open(path, "w", encoding="utf-8", newline="") as f: | |
| writer = csv.DictWriter(f, fieldnames=data[0].keys()) | |
| writer.writeheader() | |
| writer.writerows(data) | |
| # =============================== | |
| # 4.XLSX | |
| # =============================== | |
| def read_xlsx(path: str, sheet_name: str = None) -> List[dict]: | |
| wb = openpyxl.load_workbook(path) | |
| sheet = wb[sheet_name] if sheet_name else wb.active | |
| rows = list(sheet.values) | |
| headers = rows[0] | |
| return [dict(zip(headers, row)) for row in rows[1:]] | |
| def write_xlsx(data: List[dict], path: str, sheet_name: str = "Sheet1") -> None: | |
| dir_path = os.path.dirname(path) | |
| if dir_path: os.makedirs(dir_path, exist_ok=True) | |
| wb = openpyxl.Workbook() | |
| ws = wb.active | |
| ws.title = sheet_name | |
| if not data: | |
| wb.save(path) | |
| return | |
| ws.append(list(data[0].keys())) | |
| for row in data: | |
| ws.append(list(row.values())) | |
| wb.save(path) | |
| def convert_to_xlsx(json_path, xlsx_path): | |
| os.makedirs(os.path.dirname(xlsx_path), exist_ok=True) | |
| """Chuyển file JSON (dạng list các object) hoặc JSONL sang XLSX.""" | |
| try: | |
| if json_path.endswith('.jsonl'): | |
| df = pd.read_json(json_path, lines=True) | |
| else: | |
| df = pd.read_json(json_path) | |
| column_order = ["category", "sub_category", "url", "title", "description", "content", "date", "words"] | |
| df = df[[col for col in column_order if col in df.columns]] | |
| df.to_excel(xlsx_path, index=False, engine='openpyxl') | |
| print(f"-> Đã xuất thành công file Excel tại {xlsx_path}") | |
| except (FileNotFoundError, ValueError) as e: | |
| print(f"-> Không có dữ liệu hoặc lỗi khi chuyển sang Excel: {e}") | |
| # =============================== | |
| # 5. Convert | |
| # =============================== | |
| def json_convert(data: Any, pretty: bool = True) -> str: | |
| return json.dumps(data, ensure_ascii=False, indent=2 if pretty else None) | |
| def jsonl_convert(data: List[dict]) -> str: | |
| return "\n".join(json.dumps(item, ensure_ascii=False) for item in data) | |
| # =============================== | |
| # 6. Sort | |
| # =============================== | |
| def sort_records(data: List[dict], keys: List[str]) -> List[dict]: | |
| """Sắp xếp theo nhiều keys với ưu tiên từ trái sang phải""" | |
| return sorted(data, key=lambda x: tuple(x.get(k) for k in keys)) | |
| # =============================== | |
| # 7. Most Common | |
| # =============================== | |
| def most_common(values): | |
| if not values: | |
| return None | |
| return Counter(values).most_common(1)[0][0] | |
| DEFAULT_NON_KEEP_PATTERN = re.compile(r"[^\w\s\(\)\.\,\;\:\-–]", flags=re.UNICODE) | |
| def preprocess_text( | |
| text: Any, | |
| non_keep_pattern: re.Pattern = DEFAULT_NON_KEEP_PATTERN, | |
| max_chars_per_text: int | None = None, | |
| ) -> Any: | |
| """ | |
| Làm sạch chuỗi: strip, bỏ ký tự không mong muốn, rút gọn khoảng trắng. | |
| Vẫn cho phép list/dict đi qua để hàm preprocess_data xử lý đệ quy. | |
| """ | |
| if isinstance(text, list): | |
| # Truyền tiếp đủ tham số khi gọi đệ quy | |
| return [preprocess_text(t, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text) for t in text] | |
| if isinstance(text, str): | |
| s = text.strip() # <-- sửa từ s = strip() | |
| s = non_keep_pattern.sub("", s) | |
| s = re.sub(r"[ ]{2,}", " ", s) | |
| if max_chars_per_text is not None and len(s) > max_chars_per_text: | |
| s = s[: max_chars_per_text] | |
| return s | |
| return text | |
| def preprocess_data( | |
| data: Any, | |
| non_keep_pattern: re.Pattern = DEFAULT_NON_KEEP_PATTERN, | |
| max_chars_per_text: int | None = None, | |
| ) -> Any: | |
| """Đệ quy tiền xử lý lên toàn bộ JSON.""" | |
| if isinstance(data, dict): | |
| return { | |
| k: preprocess_data(v, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text) | |
| for k, v in data.items() | |
| } | |
| if isinstance(data, list): | |
| return [ | |
| preprocess_data(x, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text) | |
| for x in data | |
| ] | |
| return preprocess_text(data, non_keep_pattern=non_keep_pattern, max_chars_per_text=max_chars_per_text) | |
| # =============================== | |
| # 9. Json | |
| # =============================== | |
| def flatten_json( | |
| data: Any, | |
| prefix: str = "", | |
| flatten_mode: str = "split", # mặc định: tách từng phần tử list | |
| join_sep: str = "\n", # mặc định: xuống dòng khi join list | |
| ) -> Dict[str, Any]: | |
| """ | |
| Làm phẳng JSON với xử lý list theo flatten_mode. | |
| - "split": mỗi phần tử list tạo key riêng: a.b[0], a.b[1], ... | |
| Nếu phần tử là dict/list → tiếp tục flatten (được lồng chỉ số). | |
| - "join": join list về 1 chuỗi (join_sep). (Phần tử không phải str sẽ str()) | |
| - "keep": giữ nguyên list (chỉ gán 1 key cho toàn list). | |
| Trả về: dict key->giá trị (lá). | |
| """ | |
| flat: Dict[str, Any] = {} | |
| def _recur(node: Any, pfx: str) -> None: | |
| if isinstance(node, dict): | |
| for k, v in node.items(): | |
| new_pfx = f"{pfx}{k}" if not pfx else f"{pfx}.{k}" | |
| _recur(v, new_pfx) | |
| return | |
| if isinstance(node, list): | |
| if flatten_mode == "split": | |
| for i, item in enumerate(node): | |
| idx_key = f"{pfx}[{i}]" | |
| _recur(item, idx_key) | |
| elif flatten_mode == "join": | |
| joined = join_sep.join(str(x).strip() for x in node if str(x).strip()) | |
| flat[pfx] = joined | |
| else: # "keep" | |
| flat[pfx] = node | |
| return | |
| # lá: số/chuỗi/None/... | |
| flat[pfx] = node | |
| _recur(data, prefix.rstrip(".")) | |
| return flat | |
| def deduplicates_by_key(pairs: List[Tuple[str, str]]) -> List[Tuple[str, str]]: | |
| """ | |
| Lọc trùng theo value trong cùng key (hoặc base_key). | |
| Giữ lại **lần xuất hiện đầu tiên** của mỗi (key, text), | |
| loại bỏ những dòng có cùng key và cùng text lặp lại sau đó. | |
| Args: | |
| pairs: Danh sách (key, text) sau khi flatten. | |
| Returns: | |
| Danh sách (key, text) đã loại bỏ trùng lặp. | |
| """ | |
| seen_per_key: Dict[str, set] = {} | |
| filtered: List[Tuple[str, str]] = [] | |
| for key, text in pairs: | |
| text_norm = text.strip() | |
| if not text_norm: | |
| continue | |
| base_key = re.sub(r"\[\d+\]", "", key) | |
| if base_key not in seen_per_key: | |
| seen_per_key[base_key] = set() | |
| if text_norm in seen_per_key[base_key]: | |
| continue | |
| seen_per_key[base_key].add(text_norm) | |
| filtered.append((key, text_norm)) | |
| return filtered | |