# Source: CG-IR / src/data_loader.py
# (Hugging Face file-viewer residue: uploaded by Lifunn, "Upload 10 files",
#  commit 9342074, 6.15 kB)
import re
import json
import pandas as pd
import networkx as nx
from typing import List, Dict, Optional
# Logical field name -> candidate CSV header names. Candidates are compared
# case-insensitively against the CSV's actual headers when the corpus is loaded.
COLUMN_MAP = {
    # Fields that must be present in the CSV.
    "id": ["id"],
    "question": ["Pertanyaan"],
    "context": ["Konteks"],
    "bloom_level": ["Level Kognitif"],
    # No CSV header is mapped for these; they load as empty strings.
    "topic": [],
    "concept": [],
}
def parse_bloom(value) -> int:
    """Convert a Bloom-taxonomy label into an integer level in 1..6.

    Accepted forms, checked in this order:

    * a Python ``int`` already in the range 1-6;
    * any value whose text form is a bare level ``1``-``6``, optionally
      followed by ``.0`` (digit strings, floats, numpy integers);
    * text starting with ``c<digit>`` (case-insensitive); the digit is
      clamped into 1-6;
    * text containing one of the Indonesian Bloom verbs listed below.

    Anything else falls back to level 1.
    """
    if isinstance(value, int) and 1 <= value <= 6:
        return int(value)  # int() turns a bool True into a plain 1

    s = str(value).strip().lower()

    # Numeric levels frequently do not arrive as a Python int (numpy integers
    # from a DataFrame, floats, digit strings), so the isinstance check above
    # misses them. Recognise them by their text form instead of silently
    # defaulting to level 1.
    numeric = re.fullmatch(r"([1-6])(?:\.0+)?", s)
    if numeric:
        return int(numeric.group(1))

    m = re.match(r"c(\d)", s)
    if m:
        return min(max(int(m.group(1)), 1), 6)

    keywords = {
        "mengingat": 1, "memahami": 2, "mengaplikasikan": 3,
        "menerapkan": 3, "menganalisis": 4, "mengevaluasi": 5,
        "mencipta": 6, "membuat": 6,
    }
    for kw, lvl in keywords.items():
        if kw in s:
            return lvl
    return 1
def _resolve_column(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
    """Return the real header in *df* matching the first usable candidate.

    Headers and candidates are compared case-insensitively. Candidates are
    tried in order; ``None`` is returned when none of them is present.
    """
    # Lower-cased header -> header as spelled in the frame. When two headers
    # differ only by case, the later one wins.
    actual_by_lower = {}
    for header in df.columns:
        actual_by_lower[header.lower()] = header

    wanted_keys = (name.lower() for name in candidates)
    return next(
        (actual_by_lower[key] for key in wanted_keys if key in actual_by_lower),
        None,
    )
def _normalize_text(text: str) -> str:
    """Lower-case *text*, trim it, and collapse whitespace runs to one space."""
    lowered = text.lower()
    trimmed = lowered.strip()
    return re.sub(r"\s+", " ", trimmed)
def _print_corpus_stats(docs: List[Dict]):
    """Print the Bloom-level histogram and the five most frequent topics.

    Documents with an empty topic are left out of the topic statistics.
    """
    from collections import Counter

    level_counts = Counter()
    for doc in docs:
        level_counts[doc["bloom_level"]] += 1

    topic_counts = Counter()
    for doc in docs:
        if doc["topic"]:
            topic_counts[doc["topic"]] += 1

    # Levels are shown in ascending order with a "C" prefix (C1..C6).
    bloom_summary = {f'C{k}': level_counts[k] for k in sorted(level_counts)}
    top_topics = dict(topic_counts.most_common(5))
    unique_topics = len(topic_counts)

    print(f" Bloom dist : {bloom_summary}")
    print(f" Topics : {unique_topics} unique, top-5: {top_topics}")
def _read_corpus_csv(csv_path: str) -> pd.DataFrame:
    """Read the corpus CSV, trying ';' as the delimiter first and ',' second."""

    def read(sep: str) -> pd.DataFrame:
        # dtype=str keeps every cell exactly as written in the file. Letting
        # pandas infer types would rewrite values before they are stringified
        # (an id "001" becomes 1, an integer column with a blank becomes 3.0).
        return pd.read_csv(csv_path, sep=sep, dtype=str).fillna("")

    try:
        df = read(";")
    except pd.errors.ParserError:
        # Ragged rows under ';' usually mean the file is comma-separated.
        return read(",")
    # A single column means ';' never split anything: retry with ','.
    if len(df.columns) < 2:
        return read(",")
    return df


def load_corpus(csv_path: str) -> List[Dict]:
    """Load the question corpus from a CSV file.

    The delimiter is auto-detected (';' first, then ','). Headers are resolved
    through ``COLUMN_MAP``; every logical field except ``topic`` and
    ``concept`` must resolve to a column.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        One dict per CSV row with the keys ``id``, ``question``, ``context``,
        ``bloom_level`` (int, via ``parse_bloom``), ``topic`` and ``concept``.
        Text fields are stripped; unmapped optional fields are ``""``.

    Raises:
        ValueError: If a required column cannot be found in the CSV.
    """
    df = _read_corpus_csv(csv_path)

    col = {key: _resolve_column(df, cands) for key, cands in COLUMN_MAP.items()}
    missing = [
        k for k, v in col.items()
        if v is None and k not in ("topic", "concept")
    ]
    if missing:
        raise ValueError(
            f"Kolom tidak ditemukan: {missing}\n"
            f"Kolom tersedia: {list(df.columns)}\n"
            f"Sesuaikan COLUMN_MAP di data_loader.py."
        )

    def _cell(row, key: str) -> str:
        # Stripped text of the column mapped to *key*, or "" when unmapped.
        column = col[key]
        return str(row[column]).strip() if column else ""

    documents = []
    for idx, row in df.iterrows():
        documents.append({
            # Fall back to the row index if no id column is mapped.
            "id": _cell(row, "id") if col["id"] else str(idx),
            "question": _cell(row, "question"),
            "context": _cell(row, "context"),
            "bloom_level": parse_bloom(row[col["bloom_level"]]),
            "topic": _cell(row, "topic"),
            "concept": _cell(row, "concept"),
        })

    print(f"Loaded {len(documents)} documents from '{csv_path}'")
    _print_corpus_stats(documents)
    return documents
def load_graph(graph_json_path: str) -> nx.DiGraph:
    """Build a directed knowledge graph from a JSON file.

    The JSON object is expected to carry a ``nodes`` list (each node a dict
    with at least an ``id``) and an edge list under ``edges`` or ``links``
    (each edge a dict with ``source``, ``target`` and an optional
    ``relation``). All node keys are stored as node attributes; a
    ``bloom_level`` attribute is normalised with ``parse_bloom``. Only the
    ``relation`` attribute of an edge is kept (``""`` when absent).

    Args:
        graph_json_path: Path of the JSON file to read.

    Returns:
        The populated ``nx.DiGraph``. A short summary is printed as a side
        effect.
    """
    from collections import Counter

    # JSON text is UTF-8 by specification. Relying on the platform default
    # encoding can corrupt or reject non-ASCII labels on some systems.
    with open(graph_json_path, encoding="utf-8") as f:
        data = json.load(f)

    G = nx.DiGraph()
    for node in data.get("nodes", []):
        attrs = dict(node)
        if "bloom_level" in attrs:
            attrs["bloom_level"] = parse_bloom(attrs["bloom_level"])
        G.add_node(node["id"], **attrs)

    # Accept both "edges" and "links" as the name of the edge list.
    raw_edges = data.get("edges") or data.get("links") or []
    for edge in raw_edges:
        G.add_edge(edge["source"], edge["target"], relation=edge.get("relation", ""))

    rel_counts = Counter(d["relation"] for _, _, d in G.edges(data=True))
    type_counts = Counter(attrs.get("type", "?") for _, attrs in G.nodes(data=True))
    print(f"Loaded KG: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges")
    print(f" Node types : {dict(type_counts)}")
    print(f" Edge types : {dict(rel_counts)}")
    return G
def link_corpus_to_graph(documents: List[Dict], graph: nx.DiGraph) -> Dict[str, str]:
    """Build mapping: doc_id -> graph question node id.

    Each document is linked to a graph node whose ``type`` is ``"question"``,
    using two strategies in order:

    1. direct: the document id is itself a question node id;
    2. text: the normalised question text equals a node's normalised label.

    Documents matching neither way are left out of the mapping. A summary of
    the hit counts is printed.

    Args:
        documents: Corpus entries carrying at least ``id`` and ``question``.
        graph: Knowledge graph containing the question nodes.

    Returns:
        Dict from document id to the id of the linked question node.
    """
    q_nodes = {
        nid: attrs
        for nid, attrs in graph.nodes(data=True)
        if attrs.get("type") == "question"
    }

    # Index question nodes by normalised label. Nodes with no label (missing,
    # None or empty) are skipped: they cannot be matched by text, and an empty
    # label would otherwise pair with every document whose question is empty.
    q_by_label = {}
    for nid, attrs in q_nodes.items():
        label = attrs.get("label")
        if not label:
            continue
        q_by_label[_normalize_text(str(label))] = nid

    mapping = {}
    direct_hits = 0
    text_hits = 0
    for doc in documents:
        doc_id = doc["id"]
        if doc_id in q_nodes:
            mapping[doc_id] = doc_id
            direct_hits += 1
            continue
        norm_q = _normalize_text(doc["question"])
        if norm_q in q_by_label:
            mapping[doc_id] = q_by_label[norm_q]
            text_hits += 1

    unlinked = len(documents) - direct_hits - text_hits
    print(f"Corpus-Graph linking: {direct_hits} direct, {text_hits} text match, {unlinked} unlinked")
    return mapping
def enrich_with_graph_topics(
    documents: List[Dict],
    graph: nx.DiGraph,
    doc_graph_map: Dict[str, str],
) -> List[Dict]:
    """Fill empty topic & concept fields from graph parent nodes.

    Parents are the source nodes of ``has_question`` edges. A parent of type
    ``topic_coarse`` supplies the topic; a parent of type ``topic_fine`` or
    ``concept`` supplies the concept. When several parents of the same kind
    exist, the one seen last wins.

    Args:
        documents: Corpus entries; modified in place.
        graph: Knowledge graph holding the ``has_question`` edges.
        doc_graph_map: Document id -> question node id.

    Returns:
        The same ``documents`` list, for convenience.
    """
    q_to_parent: Dict[str, Dict] = {}
    for src, tgt, data in graph.edges(data=True):
        if data.get("relation") != "has_question":
            continue
        src_attrs = graph.nodes[src]
        src_type = src_attrs.get("type", "")
        src_label = src_attrs.get("label", "")
        parent = q_to_parent.setdefault(tgt, {"topic": "", "concept": ""})
        if src_type == "topic_coarse":
            parent["topic"] = src_label
        elif src_type in ("topic_fine", "concept"):
            parent["concept"] = src_label

    enriched = 0
    for doc in documents:
        if doc.get("topic") and doc.get("concept"):
            continue
        graph_id = doc_graph_map.get(doc["id"])
        # Test for None explicitly so a falsy but valid node id (e.g. 0) is
        # still looked up.
        if graph_id is None or graph_id not in q_to_parent:
            continue
        parent = q_to_parent[graph_id]
        filled = False
        for field in ("topic", "concept"):
            if not doc.get(field):
                doc[field] = parent[field]
                filled = filled or bool(parent[field])
        # Count a document only when the graph actually supplied a value;
        # copying an empty parent field is not an enrichment.
        if filled:
            enriched += 1

    print(f"Enriched {enriched} documents with topic/concept from graph")
    return documents