"""Load a question corpus (CSV) and a knowledge graph (JSON) and link them.

The corpus is a CSV whose headers are resolved through ``COLUMN_MAP``; the
graph is a node-link style JSON document with ``nodes`` and ``edges`` (or
``links``) arrays.  Helper functions map corpus rows onto graph question
nodes and back-fill topic/concept fields from the graph.
"""

import json
import re
from collections import Counter
from typing import Dict, List, Optional

import networkx as nx
import pandas as pd

# Logical field name -> accepted CSV header names (matched case-insensitively).
# "topic" and "concept" have no candidates, so they are never read from the
# CSV and always start out empty in load_corpus().
COLUMN_MAP = {
    "id": ["id"],
    "question": ["Pertanyaan"],
    "context": ["Konteks"],
    "bloom_level": ["Level Kognitif"],
    "topic": [],
    "concept": [],
}

# "c1" .. "c6" style codes at the start of the (lower-cased) value.
_BLOOM_CODE_RE = re.compile(r"c(\d)")

# Keyword -> level table, searched as substrings of the lower-cased value.
_BLOOM_KEYWORDS = {
    "mengingat": 1,
    "memahami": 2,
    "mengaplikasikan": 3,
    "menerapkan": 3,
    "menganalisis": 4,
    "mengevaluasi": 5,
    "mencipta": 6,
    "membuat": 6,
}

_DEFAULT_BLOOM_LEVEL = 1


def parse_bloom(value) -> int:
    """Convert a raw cognitive-level value into an integer from 1 to 6.

    Accepted forms, tried in this order:

    1. A whole number in ``1..6`` in any numeric-like representation:
       ``3``, ``3.0``, ``"3"`` or a NumPy integer.  Spreadsheet and JSON
       sources frequently deliver levels as strings or floats, so these must
       not silently collapse to the default.
    2. A code starting with ``c`` followed by a digit (case-insensitive),
       e.g. ``"C4"``; the digit is clamped into ``1..6``.
    3. Any keyword from ``_BLOOM_KEYWORDS`` occurring in the text.

    Anything else (including out-of-range numbers) yields level 1.
    """
    try:
        numeric = float(value)
    except (TypeError, ValueError, OverflowError):
        numeric = None
    if numeric is not None and numeric.is_integer() and 1 <= numeric <= 6:
        return int(numeric)

    text = str(value).strip().lower()
    code = _BLOOM_CODE_RE.match(text)
    if code:
        return min(max(int(code.group(1)), 1), 6)

    for keyword, level in _BLOOM_KEYWORDS.items():
        if keyword in text:
            return level
    return _DEFAULT_BLOOM_LEVEL


def _resolve_column(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
    """Return the actual column of *df* matching the first usable candidate.

    Matching is case-insensitive.  Returns ``None`` when no candidate is
    present (always the case for an empty candidate list).
    """
    actual_by_lower = {name.lower(): name for name in df.columns}
    for candidate in candidates:
        wanted = candidate.lower()
        if wanted in actual_by_lower:
            return actual_by_lower[wanted]
    return None


def _normalize_text(text: str) -> str:
    """Lower-case *text*, trim it and collapse whitespace runs to one space."""
    return re.sub(r"\s+", " ", text.lower().strip())


def _print_corpus_stats(docs: List[Dict]):
    """Print the Bloom-level distribution and the most common topics."""
    bloom_dist = Counter(doc["bloom_level"] for doc in docs)
    topic_dist = Counter(doc["topic"] for doc in docs if doc["topic"])
    bloom_by_code = {f"C{level}": count for level, count in sorted(bloom_dist.items())}
    print(f" Bloom dist : {bloom_by_code}")
    print(f" Topics : {len(topic_dist)} unique, top-5: {dict(topic_dist.most_common(5))}")


def _read_corpus_frame(csv_path: str) -> pd.DataFrame:
    """Read the corpus CSV, trying ``;`` first and falling back to ``,``.

    The fallback is taken when the ``;`` attempt fails to parse or yields
    fewer than two columns.  Only ``ValueError`` (the base class of pandas'
    parser errors and of decoding errors) triggers the retry; I/O errors such
    as a missing file are raised immediately instead of being retried.
    Missing cells are replaced by empty strings.
    """
    try:
        frame = pd.read_csv(csv_path, sep=";")
    except ValueError:
        frame = None
    if frame is None or len(frame.columns) < 2:
        frame = pd.read_csv(csv_path, sep=",")
    return frame.fillna("")


def load_corpus(csv_path: str) -> List[Dict]:
    """Load the question corpus from *csv_path*.

    Returns a list of dicts with the keys ``id``, ``question``, ``context``,
    ``bloom_level`` (int, via :func:`parse_bloom`), ``topic`` and ``concept``.
    All text values are stripped strings.

    Raises:
        ValueError: if any field other than ``topic``/``concept`` cannot be
            resolved to a CSV column through ``COLUMN_MAP``.
    """
    df = _read_corpus_frame(csv_path)

    col = {key: _resolve_column(df, cands) for key, cands in COLUMN_MAP.items()}
    missing = [
        key
        for key, resolved in col.items()
        if resolved is None and key not in ("topic", "concept")
    ]
    if missing:
        raise ValueError(
            f"Kolom tidak ditemukan: {missing}\n"
            f"Kolom tersedia: {list(df.columns)}\n"
            f"Sesuaikan COLUMN_MAP di data_loader.py."
        )

    def text_of(row, key: str) -> str:
        # Unresolved (optional) columns contribute an empty string.
        return str(row[col[key]]).strip() if col[key] else ""

    documents = []
    for idx, row in df.iterrows():
        documents.append({
            "id": str(row[col["id"]]).strip() if col["id"] else str(idx),
            "question": str(row[col["question"]]).strip(),
            "context": text_of(row, "context"),
            "bloom_level": parse_bloom(row[col["bloom_level"]]),
            "topic": text_of(row, "topic"),
            "concept": text_of(row, "concept"),
        })

    print(f"Loaded {len(documents)} documents from '{csv_path}'")
    _print_corpus_stats(documents)
    return documents


def load_graph(graph_json_path: str) -> nx.DiGraph:
    """Build a directed graph from a JSON file with nodes and edges.

    Every entry of ``nodes`` becomes a node keyed by its ``id``, carrying all
    of the entry's fields as attributes; a ``bloom_level`` attribute is
    normalised with :func:`parse_bloom`.  Edges are read from ``edges`` or,
    when that is absent or empty, from ``links``; each edge stores its
    ``relation`` (empty string when not given).
    """
    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    with open(graph_json_path, encoding="utf-8") as f:
        data = json.load(f)

    G = nx.DiGraph()
    for node in data.get("nodes", []):
        attrs = dict(node)
        if "bloom_level" in attrs:
            attrs["bloom_level"] = parse_bloom(attrs["bloom_level"])
        G.add_node(node["id"], **attrs)

    raw_edges = data.get("edges") or data.get("links") or []
    for edge in raw_edges:
        G.add_edge(edge["source"], edge["target"], relation=edge.get("relation", ""))

    rel_counts = Counter(edge_data["relation"] for _, _, edge_data in G.edges(data=True))
    type_counts = Counter(attrs.get("type", "?") for _, attrs in G.nodes(data=True))
    print(f"Loaded KG: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges")
    print(f" Node types : {dict(type_counts)}")
    print(f" Edge types : {dict(rel_counts)}")
    return G


def link_corpus_to_graph(documents: List[Dict], graph: nx.DiGraph) -> Dict[str, str]:
    """Build mapping: doc_id -> graph question node id.

    A document is linked directly when its id is itself the id of a node with
    ``type == "question"``; otherwise it is linked when its normalised
    question text equals the normalised ``label`` of such a node.  Documents
    matching neither way are left out of the mapping.
    """
    q_nodes = {
        nid: attrs
        for nid, attrs in graph.nodes(data=True)
        if attrs.get("type") == "question"
    }

    # Question nodes without a label cannot be text-matched; skip them rather
    # than failing on the missing attribute.
    q_by_label = {}
    for nid, attrs in q_nodes.items():
        label = attrs.get("label")
        if label is None:
            continue
        q_by_label[_normalize_text(str(label))] = nid

    mapping = {}
    direct_hits = 0
    text_hits = 0
    for doc in documents:
        doc_id = doc["id"]
        if doc_id in q_nodes:
            mapping[doc_id] = doc_id
            direct_hits += 1
            continue
        norm_q = _normalize_text(doc["question"])
        if norm_q in q_by_label:
            mapping[doc_id] = q_by_label[norm_q]
            text_hits += 1

    unlinked = len(documents) - direct_hits - text_hits
    print(f"Corpus-Graph linking: {direct_hits} direct, {text_hits} text match, {unlinked} unlinked")
    return mapping


def enrich_with_graph_topics(
    documents: List[Dict],
    graph: nx.DiGraph,
    doc_graph_map: Dict[str, str],
) -> List[Dict]:
    """Fill empty topic & concept fields from graph parent nodes.

    For every ``has_question`` edge, the source node's label is recorded for
    the target question: as ``topic`` when the source type is
    ``topic_coarse``, as ``concept`` when it is ``topic_fine`` or
    ``concept``.  Documents are modified in place (only fields that are
    currently empty) and the same list is returned.
    """
    q_to_parent: Dict[str, Dict] = {}
    for src, tgt, data in graph.edges(data=True):
        if data.get("relation") != "has_question":
            continue
        src_attrs = graph.nodes[src]
        src_type = src_attrs.get("type", "")
        src_label = src_attrs.get("label", "")
        parent = q_to_parent.setdefault(tgt, {"topic": "", "concept": ""})
        if src_type == "topic_coarse":
            parent["topic"] = src_label
        elif src_type in ("topic_fine", "concept"):
            parent["concept"] = src_label

    enriched = 0
    for doc in documents:
        if doc.get("topic") and doc.get("concept"):
            continue
        graph_id = doc_graph_map.get(doc["id"])
        if not graph_id or graph_id not in q_to_parent:
            continue
        parent = q_to_parent[graph_id]
        gained_value = False
        for field in ("topic", "concept"):
            if not doc.get(field):
                doc[field] = parent[field]
                gained_value = gained_value or bool(parent[field])
        # Count only documents that actually received a non-empty value.
        if gained_value:
            enriched += 1

    print(f"Enriched {enriched} documents with topic/concept from graph")
    return documents