File size: 6,462 Bytes
dbe2c62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
from typing import Dict, List, Any, Optional, Iterable

# --------- A. Tiện ích cơ bản ---------

def _ordered_unique_chunk_ids(reranked: List[Dict[str, Any]]) -> List[int]:
    seen, ordered = set(), []
    for r in reranked:
        for cid in r.get("chunk_ids", []):
            if isinstance(cid, (int, str)) and str(cid).isdigit():
                cid = int(cid)
                if cid not in seen:
                    seen.add(cid)
                    ordered.append(cid)
    return ordered


def _filter_fields_recursive(obj: Any, drop_lower: set) -> Any:
    """Loại bỏ các field có tên xuất hiện trong drop_lower (case-insensitive) trên toàn cấu trúc."""
    if isinstance(obj, dict):
        return {
            k: _filter_fields_recursive(v, drop_lower)
            for k, v in obj.items()
            if k.lower() not in drop_lower
        }
    if isinstance(obj, list):
        return [_filter_fields_recursive(x, drop_lower) for x in obj]
    return obj


def _iter_values_no_keys(obj: Any) -> Iterable[str]:
    """Duyệt đệ quy, chỉ yield GIÁ TRỊ (bỏ key), split theo '\n' nếu là chuỗi."""
    if isinstance(obj, dict):
        for v in obj.values():
            yield from _iter_values_no_keys(v)
    elif isinstance(obj, list):
        for item in obj:
            yield from _iter_values_no_keys(item)
    elif isinstance(obj, str):
        for line in obj.splitlines():
            yield line
    else:
        yield str(obj)


def _get_by_path(obj: Any, path: str) -> Any:
    """
    Lấy giá trị theo path kiểu 'A.B.C'.
    - Nếu gặp list trong quá trình đi xuống → thu thập giá trị từ từng phần tử (map-collect).
    - Nếu path không tồn tại → trả về None.
    """
    parts = path.split(".")
    def _step(o, idx=0):
        if idx == len(parts):
            return o
        key = parts[idx]
        if isinstance(o, dict):
            if key not in o:
                return None
            return _step(o[key], idx + 1)
        if isinstance(o, list):
            collected = []
            for it in o:
                collected.append(_step(it, idx))
            # gộp phẳng các None
            flat = []
            for v in collected:
                if v is None:
                    continue
                if isinstance(v, list):
                    flat.extend(v)
                else:
                    flat.append(v)
            return flat
        return None
    return _step(obj, 0)


# --------- B. Các hàm chính ---------

def extract_chunks_from_rerank_flexible(
    reranked_results: List[Dict[str, Any]],
    SegmentDict: List[Dict[str, Any]],
    n_chunks: Optional[int] = None,
    drop_fields: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """
    - Lấy chunk theo thứ tự từ reranked.
    - Giới hạn số lượng chunk gốc trả về bằng n_chunks (nếu có).
    - Áp dụng bỏ trường theo drop_fields (toàn bộ cấu trúc).
    - Kết quả: [{"chunk_id": int, "data": <json đã lọc>}]
    """
    if not reranked_results:
        return []

    ordered_ids = _ordered_unique_chunk_ids(reranked_results)
    if n_chunks is not None:
        ordered_ids = ordered_ids[:int(n_chunks)]

    drop_lower = set(x.lower() for x in (drop_fields or []))

    out = []
    seen = set()
    for cid in ordered_ids:
        if cid in seen:
            continue
        seen.add(cid)
        if 1 <= cid <= len(SegmentDict):
            data = SegmentDict[cid - 1]
            filtered = _filter_fields_recursive(data, drop_lower) if drop_lower else data
            out.append({"chunk_id": cid, "data": filtered})
    return out


def collect_chunk_text(chunks: List[Dict[str, Any]]) -> str:
    """Biến toàn bộ danh sách chunk thành text (bỏ key, split dòng)."""
    if not chunks:
        return "(Không có chunk nào)"

    lines: List[str] = []
    for ch in chunks:
        for line in _iter_values_no_keys(ch["data"]):
            lines.append(line)
        lines.append("")
    return "\n".join(lines).strip()


def extract_fields_for_each_chunk(
    chunks: List[Dict[str, Any]],
    fields: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """
    - Với mỗi chunk gốc, lấy những TRƯỜNG được truyền vào (hỗ trợ path 'A.B.C').
    - Nếu fields=None → lấy TẤT CẢ top-level fields còn lại trong chunk['data'].
    - Trả về list theo từng chunk: {"chunk_id": ..., "fields": {...}}
    """
    results = []
    for ch in chunks:
        data = ch["data"]
        if not isinstance(data, dict):
            results.append({"chunk_id": ch["chunk_id"], "fields": data})
            continue

        if fields is None:
            payload = {k: v for k, v in data.items()}
        else:
            payload = {}
            for f in fields:
                payload[f] = _get_by_path(data, f)
        results.append({"chunk_id": ch["chunk_id"], "fields": payload})
    return results


def process_chunks_pipeline(
    reranked_results: List[Dict[str, Any]],
    SegmentDict: List[Dict[str, Any]],
    drop_fields: Optional[List[str]] = None,     # Trường bị bỏ qua (áp dụng toàn bộ)
    fields: Optional[List[str]] = None,          # Trường muốn trích xuất (None → tất cả top-level)
    n_chunks: Optional[int] = None               # Số lượng chunk gốc & text (nếu None → tất cả)
) -> Dict[str, Any]:
    """
    Trả về:
      - chunks_json: đúng số lượng chunk gốc (đã drop_fields)
      - chunks_text: text từ cùng số lượng chunk (bỏ key, split dòng)
      - extracted_fields: các trường được chỉ định cho mỗi chunk
    """
    # 1️⃣ Lấy chunk gốc (JSON)
    chunks_json = extract_chunks_from_rerank_flexible(
        reranked_results=reranked_results,
        SegmentDict=SegmentDict,
        n_chunks=n_chunks,
        drop_fields=drop_fields,
    )

    # 2️⃣ Biến thành text (cùng số lượng chunk)
    chunks_text = collect_chunk_text(chunks_json)

    # 3️⃣ Lấy các trường cụ thể
    extracted_fields = extract_fields_for_each_chunk(chunks_json, fields=fields)

    return {
        "chunks_json": chunks_json,          # JSON chuẩn
        "chunks_text": chunks_text,          # text của cùng số lượng chunk
        "extracted_fields": extracted_fields # field được chọn
    }