Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| from urllib.parse import urlparse, urlencode, parse_qs, urlunparse | |
| from .config import ( | |
| BASE_MODEL_EMBED, EMBED_MODEL_PATH, | |
| CATEGORY_PATTERN, SUFFIX_PATTERN, | |
| CATEGORY_PREFIX, CATEGORY_KEYWORDS, | |
| CHUNK_SIZE, CHUNK_OVERLAP, | |
| ) | |
# Pattern used by tokenize_ko() (via fullmatch) to decide whether a token
# qualifies for hint/n-gram expansion.
# NOTE(review): the character range below looks like a mis-encoded Hangul
# range in this copy of the file -- confirm against the original source.
_KOREAN_TOKEN_PATTERN = re.compile(r"[๊ฐ-ํฃ]+")

# Substrings that tokenize_ko() emits as extra tokens whenever they occur
# inside a token matched by _KOREAN_TOKEN_PATTERN.
# NOTE(review): these literals also appear mis-encoded here -- verify before
# relying on them.
_DOMAIN_SPLIT_HINTS = (
    "ํด์ธ", "๋ด์ฌ", "๋ด์ฌํ๋", "๋ด์ฌ๋จ", "๊ตญ์ ", "๊ต๋ฅ", "ํ๊ฒฌ", "๊ตํํ์",
    "์ดํ์ฐ์", "๋ชจ์ง", "์ง์", "์ฅํ", "์ธํด", "ํ์ฅ์ค์ต", "์ฑ์ฉ", "์ํฌํฐ์ฆ",
    "๋ฉํ ๋ง", "๊ต์ก", "ํน๊ฐ", "๊ณต๋ชจ์ ", "๊ฒฝ์ง๋ํ", "๊ธฐ์์ฌ", "๊ตญ๊ฐ๊ทผ๋ก",
)

# Process-wide tokenizer cache; populated on first use by
# _get_chunk_tokenizer().
_chunk_tokenizer = None
def clean_url(url: str) -> str:
    """Return *url* with the ``layout`` query parameter removed.

    All other query parameters are kept, including repeated keys
    (``?tag=a&tag=b``) and parameters with an empty value (``?q=``).
    Values belonging to the same key are emitted next to each other, in
    the order of each key's first appearance. The remaining URL components
    (scheme, host, path, params, fragment) are passed through unchanged.

    Args:
        url: The URL to normalise.

    Returns:
        The URL re-assembled without any ``layout`` parameter. The query
        string is re-encoded by ``urlencode``, so values come back
        percent-/plus-encoded.
    """
    parsed = urlparse(url)
    # keep_blank_values=True: otherwise "key=" pairs are silently discarded.
    params = parse_qs(parsed.query, keep_blank_values=True)
    params.pop("layout", None)
    # doseq=True writes every value of a repeated key instead of collapsing
    # the list to its first element.
    new_query = urlencode(params, doseq=True)
    return urlunparse(parsed._replace(query=new_query))
def clean_title(raw: str) -> str:
    """Normalise a raw title string.

    Line breaks become spaces, runs of two or more whitespace characters
    collapse to a single space, and whatever CATEGORY_PATTERN and then
    SUFFIX_PATTERN match is removed (the result is stripped after each
    step).

    Args:
        raw: Title text as extracted from the source page.

    Returns:
        The cleaned title.
    """
    flattened = raw.replace("\n", " ").replace("\r", " ")
    cleaned = re.sub(r"\s{2,}", " ", flattened).strip()
    # Apply the config-defined removals in their original order.
    for pattern in (CATEGORY_PATTERN, SUFFIX_PATTERN):
        cleaned = pattern.sub("", cleaned).strip()
    return cleaned
def infer_category(title: str, body: str) -> str:
    """Pick a category label for a post from its title and body.

    Rules are tried in order and the first hit wins:

    1. A hard-coded rule: the volunteer keyword plus one of the overseas
       terms anywhere in title+body, or one of the corps terms in the title.
    2. A title prefix listed in CATEGORY_PREFIX.
    3. A CATEGORY_KEYWORDS keyword found in the title.
    4. A CATEGORY_KEYWORDS keyword found in the body.

    Args:
        title: Post title.
        body: Post body text.

    Returns:
        The matched category label, or the fallback label when nothing
        matches.
    """
    combined = f"{title} {body}"
    overseas_terms = ("ํด์ธ", "WFK", "์๋ํ๋ ์ฆ", "KOICA")
    corps_terms = ("ํด์ธ๋ด์ฌ", "์ฒญ๋ ๋ด์ฌ๋จ", "ํ๋ก์ ํธ ๋ด์ฌ๋จ", "๋ด์ฌ๋จ")

    volunteer_in_text = "๋ด์ฌ" in combined and any(
        term in combined for term in overseas_terms
    )
    if volunteer_in_text or any(term in title for term in corps_terms):
        return "๋ด์ฌ/์ํฌํฐ์ฆ"

    for lead, label in CATEGORY_PREFIX.items():
        if title.startswith(lead):
            return label

    # Title matches take priority over body matches, so scan the whole
    # keyword table against the title before looking at the body.
    for haystack in (title, body):
        for label, keywords in CATEGORY_KEYWORDS.items():
            if any(kw in haystack for kw in keywords):
                return label

    return "๊ธฐํ"
def tokenize_ko(text: str) -> list[str]:
    """Tokenise *text* and append extra sub-tokens for matching words.

    The text is lower-cased and split into word-character runs. For every
    token that fully matches _KOREAN_TOKEN_PATTERN, two kinds of extra
    tokens are appended after the base tokens:

    * each entry of _DOMAIN_SPLIT_HINTS that occurs inside the token;
    * for tokens of length 4..12, every character n-gram of length
      2..min(6, len(token)).

    Args:
        text: Input text.

    Returns:
        The base tokens followed by all expansion tokens (duplicates are
        kept).
    """
    base_tokens = re.findall(r"[\w๊ฐ-ํฃ]+", text.lower())
    result = base_tokens.copy()

    for word in base_tokens:
        if _KOREAN_TOKEN_PATTERN.fullmatch(word) is None:
            continue

        for hint in _DOMAIN_SPLIT_HINTS:
            if hint in word:
                result.append(hint)

        length = len(word)
        if length < 4 or length > 12:
            continue

        # Shorter n-grams first, each size scanned left to right.
        for size in range(2, min(6, length) + 1):
            for begin in range(length - size + 1):
                result.append(word[begin : begin + size])

    return result
def _get_chunk_tokenizer():
    """Return the shared tokenizer used for chunking, loading it on first call.

    The tokenizer is loaded from EMBED_MODEL_PATH when that path exists,
    otherwise from BASE_MODEL_EMBED. Loading is restricted to local files
    when TRANSFORMERS_OFFLINE or HF_HUB_OFFLINE is set to "1". The result is
    cached in the module-level ``_chunk_tokenizer``.
    """
    global _chunk_tokenizer
    if _chunk_tokenizer is not None:
        return _chunk_tokenizer

    # Imported lazily so the dependency is only needed once chunking is used.
    from transformers import AutoTokenizer

    if os.path.exists(EMBED_MODEL_PATH):
        source = EMBED_MODEL_PATH
    else:
        source = BASE_MODEL_EMBED
    offline = any(
        os.getenv(flag) == "1"
        for flag in ("TRANSFORMERS_OFFLINE", "HF_HUB_OFFLINE")
    )

    tokenizer = AutoTokenizer.from_pretrained(source, local_files_only=offline)
    _chunk_tokenizer = tokenizer
    # Raise the tokenizer's length limit to at least 1e9; presumably so whole
    # documents can be encoded before being split -- confirm.
    tokenizer.model_max_length = max(tokenizer.model_max_length, 1_000_000_000)
    return _chunk_tokenizer
def chunk_text(text: str) -> list[str]:
    """Split *text* into overlapping chunks measured in tokenizer tokens.

    The text is encoded without special tokens, cut into windows of
    CHUNK_SIZE token ids that advance by CHUNK_SIZE - CHUNK_OVERLAP, and
    each window is decoded back to a string.

    Args:
        text: The text to split.

    Returns:
        The decoded chunks in order; an empty list when the text encodes to
        no tokens.

    Raises:
        ValueError: If CHUNK_OVERLAP is not smaller than CHUNK_SIZE (only
            checked when the text produced at least one token).
    """
    tokenizer = _get_chunk_tokenizer()
    ids = tokenizer.encode(text, add_special_tokens=False)
    if not ids:
        return []

    stride = CHUNK_SIZE - CHUNK_OVERLAP
    if stride <= 0:
        raise ValueError("CHUNK_OVERLAP must be smaller than CHUNK_SIZE.")

    total = len(ids)
    pieces = []
    for offset in range(0, total, stride):
        window = ids[offset : offset + CHUNK_SIZE]
        decoded = tokenizer.decode(
            window,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )
        pieces.append(decoded)
        # Stop once a window has reached the end, so no trailing chunk made
        # up purely of overlap is emitted.
        if offset + CHUNK_SIZE >= total:
            break
    return pieces