umyunsang's picture
Upload folder using huggingface_hub
9e65b56 verified
"""AI Hub ๋ฐ์ดํ„ฐ์…‹ ํŒŒ์„œ ๋ชจ๋“ˆ.
๊ฐ ํŒŒ์„œ๋Š” ๋‹จ์ผ JSON ํŒŒ์ผ์„ ํŒŒ์‹ฑํ•˜์—ฌ ํ•™์Šต ๋ ˆ์ฝ”๋“œ ๋ชฉ๋ก์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
๋ฐ˜ํ™˜ ํ˜•์‹:
{
"question": str,
"answer": str,
"source": str,
"category": str,
"metadata": dict,
}
"""
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any
def _load_json(filepath: Path) -> Any:
with open(filepath, encoding="utf-8") as f:
return json.load(f)
class GukripParser:
    """Parser for dataset 71852, National Asian Culture Center consultations.

    The counselor utterances found in ``consulting_content`` are joined and
    used as the answer, and ``instructions[0].data[0].instruction`` is used as
    the question.
    """

    # Line prefix that marks a counselor turn inside ``consulting_content``.
    # NOTE(review): this literal and the source label below appear to be
    # mis-decoded (mojibake) forms of Korean text -- confirm against the
    # upstream file before relying on them to match real data.
    _AGENT_PREFIX = "์ƒ๋‹ด์›:"
    _SOURCE = "71852_๊ตญ๋ฆฝ์•„์‹œ์•„๋ฌธํ™”์ „๋‹น"

    def parse(self, filepath: Path) -> list[dict]:
        """Parse one JSON file and return all records found in it.

        The file may hold a single item (JSON object) or a list of items.
        """
        data = _load_json(filepath)
        items = data if isinstance(data, list) else [data]
        records: list[dict] = []
        for item in items:
            records.extend(self._parse_item(item))
        return records

    def _parse_item(self, item: dict) -> list[dict]:
        """Convert one consultation item into zero or one training record.

        Returns an empty list when the item has no non-empty counselor
        utterance or no usable question.
        """
        # ``or ""`` covers keys that are present but hold JSON null, which
        # would otherwise fail on ``.split`` / ``.strip``.
        content: str = item.get("consulting_content") or ""

        # Collect the text of every counselor turn, one turn per line.
        prefix = self._AGENT_PREFIX
        agent_turns = []
        for raw_line in content.split("\n"):
            line = raw_line.strip()
            if not line.startswith(prefix):
                continue
            turn_text = line[len(prefix):].strip()
            if turn_text:
                agent_turns.append(turn_text)
        if not agent_turns:
            return []

        # The question is the first instruction of the first instruction group.
        instructions = item.get("instructions")
        if not instructions:
            return []
        data_list = instructions[0].get("data")
        if not data_list:
            return []
        question = (data_list[0].get("instruction") or "").strip()
        if not question:
            return []

        return [
            {
                "question": question,
                "answer": " ".join(agent_turns),
                "source": self._SOURCE,
                "category": item.get("consulting_category", ""),
                "metadata": {
                    "source_id": item.get("source_id", ""),
                    "consulting_date": item.get("consulting_date", ""),
                },
            }
        ]
class GovQAParser:
    """Parser for dataset 71852, central/local administrative agencies.

    Splits ``consulting_content`` in its Q/A layout to extract the official
    government answer. Every auxiliary question found in
    ``instructions[0].data[*].instruction`` yields an additional record that
    shares the same answer.
    """

    # Answer separator: a newline followed by "A :" or "A:".
    _A_SEP = re.compile(r"\nA\s*:")
    # Question marker "Q :" / "Q:" at the very start of the block or right
    # after a newline; captures up to the first blank line or the end.
    _Q_RE = re.compile(r"(?:^|\n)Q\s*:(.*?)(?=\n\n|\Z)", re.DOTALL)
    # Fallback: the rest of the title line.
    # NOTE(review): this literal and the source label below appear to be
    # mis-decoded (mojibake) forms of Korean text -- confirm against the
    # upstream file before relying on them to match real data.
    _TITLE_RE = re.compile(r"์ œ๋ชฉ\s*:\s*(.+)")
    _SOURCE = "71852_์ค‘์•™ํ–‰์ •๊ธฐ๊ด€"

    def parse(self, filepath: Path) -> list[dict]:
        """Parse one JSON file and return all records found in it.

        The file may hold a single item (JSON object) or a list of items.
        """
        data = _load_json(filepath)
        items = data if isinstance(data, list) else [data]
        records: list[dict] = []
        for item in items:
            records.extend(self._parse_item(item))
        return records

    def _parse_item(self, item: dict) -> list[dict]:
        """Convert one item into a main record plus auxiliary-question records.

        Returns an empty list when the content has no answer separator, the
        answer is empty, or no question text can be extracted.
        """
        # ``or ""`` covers keys that are present but hold JSON null, which
        # the regex split would reject with a TypeError.
        content: str = item.get("consulting_content") or ""

        parts = self._A_SEP.split(content, maxsplit=1)
        if len(parts) < 2:
            return []
        q_part, answer = parts[0], parts[1].strip()
        if not answer:
            return []

        question = self._extract_question(q_part)
        if not question:
            return []

        category = item.get("consulting_category", "")
        base_meta = {
            "source_id": item.get("source_id", ""),
            "consulting_date": item.get("consulting_date", ""),
            "org": item.get("source", ""),
        }
        records = [self._make_record(question, answer, category, dict(base_meta))]

        # Auxiliary questions reuse the same answer; the one equal to the main
        # question is skipped so it is not emitted twice.
        instructions = item.get("instructions")
        if instructions:
            for instr_item in instructions[0].get("data") or []:
                sub_q = (instr_item.get("instruction") or "").strip()
                if sub_q and sub_q != question:
                    aux_meta = {**base_meta, "question_type": "auxiliary"}
                    records.append(
                        self._make_record(sub_q, answer, category, aux_meta)
                    )
        return records

    def _make_record(
        self, question: str, answer: str, category: str, metadata: dict
    ) -> dict:
        """Assemble one output record in the module's common format."""
        return {
            "question": question,
            "answer": answer,
            "source": self._SOURCE,
            "category": category,
            "metadata": metadata,
        }

    @staticmethod
    def _extract_question(q_part: str) -> str:
        """Extract the question text from the part preceding the answer.

        Tried in order: the text after a "Q :" / "Q:" marker (up to the first
        blank line), the text after the title marker, and finally the whole
        stripped block.
        """
        q_match = GovQAParser._Q_RE.search(q_part)
        if q_match:
            return q_match.group(1).strip()
        title_match = GovQAParser._TITLE_RE.search(q_part)
        if title_match:
            return title_match.group(1).strip()
        return q_part.strip()
class GovQALocalParser(GovQAParser):
    """Parser for dataset 71852, local administrative agencies.

    Reuses the GovQAParser logic unchanged; only the ``source`` label of the
    produced records differs.
    """

    def _parse_item(self, item: dict) -> list[dict]:
        """Parse *item* with the parent logic, then relabel every record."""
        parsed = super()._parse_item(item)
        for record in parsed:
            record.update(source="71852_์ง€๋ฐฉํ–‰์ •๊ธฐ๊ด€")
        return parsed
class AdminLawParser:
    """Parser for dataset 71847 (administrative law).

    ``label.input`` is used as the question and ``label.output`` as the
    answer. The decision-case QA set and the statute QA set share this
    structure, so one parser serves both; ``source_label`` tells them apart.
    """

    def __init__(self, source_label: str = "71847_๊ฒฐ์ •๋ก€"):
        """Store the value written to the ``source`` field of every record."""
        self.source_label = source_label

    def parse(self, filepath: Path) -> list[dict]:
        """Parse one JSON file and return all records found in it.

        The file may hold a single item (JSON object) or a list of items.
        """
        data = _load_json(filepath)
        items = data if isinstance(data, list) else [data]
        records: list[dict] = []
        for item in items:
            records.extend(self._parse_item(item))
        return records

    def _parse_item(self, item: dict) -> list[dict]:
        """Convert one item into zero or one training record.

        Returns an empty list when either the question or the answer is
        missing or blank.
        """
        # ``or`` fallbacks cover keys that are present but hold JSON null,
        # which would otherwise fail on ``.get`` / ``.strip``.
        label = item.get("label") or {}
        question = (label.get("input") or "").strip()
        answer = (label.get("output") or "").strip()
        if not (question and answer):
            return []

        info = item.get("info") or {}
        # Each field has a second key that is used when the first is absent.
        case_name = info.get("caseName", info.get("title", ""))
        category = info.get("ministry", info.get("caseCode", ""))
        return [
            {
                "question": question,
                "answer": answer,
                "source": self.source_label,
                "category": category,
                "metadata": {
                    "case_name": case_name,
                    "law_class": info.get("lawClass", ""),
                },
            }
        ]