# Source file: sangsangfinder/api/core/utils.py
# Revision 54656fc ("Initial clean deploy"), uploaded by cksleigen; 3.82 kB.
import os
import re
from urllib.parse import urlparse, urlencode, parse_qs, urlunparse
from .config import (
BASE_MODEL_EMBED, EMBED_MODEL_PATH,
CATEGORY_PATTERN, SUFFIX_PATTERN,
CATEGORY_PREFIX, CATEGORY_KEYWORDS,
CHUNK_SIZE, CHUNK_OVERLAP,
)
# Matches a token made up solely of precomposed Hangul syllables
# (U+AC00 "가" through U+D7A3 "힣"). The range is written with escapes so the
# pattern stays valid even if the source file is decoded with a wrong codec.
_KOREAN_TOKEN_PATTERN = re.compile(r"[\uAC00-\uD7A3]+")

# Domain keywords that are emitted as extra tokens whenever they occur inside
# a longer all-Hangul token (Korean compounds are often written without
# spaces, so a plain whitespace/word split would miss them).
_DOMAIN_SPLIT_HINTS = (
    # overseas, volunteering, volunteer activity, volunteer corps,
    # international, exchange, dispatch, exchange student
    "해외", "봉사", "봉사활동", "봉사단", "국제", "교류", "파견", "교환학생",
    # language study abroad, recruitment, application/support, scholarship,
    # intern, field practicum, hiring, supporters
    "어학연수", "모집", "지원", "장학", "인턴", "현장실습", "채용", "서포터즈",
    # mentoring, education, special lecture, contest, competition,
    # dormitory, national work-study
    "멘토링", "교육", "특강", "공모전", "경진대회", "기숙사", "국가근로",
)

# Process-wide tokenizer cache; stays None until _get_chunk_tokenizer() loads it.
_chunk_tokenizer = None
def clean_url(url: str) -> str:
    """Return *url* with the ``layout`` query parameter removed.

    The query string is parsed, ``layout`` is dropped, and the remaining
    parameters are re-encoded. Scheme, host, path and fragment are left
    untouched.

    Notes:
        * Repeated keys (``?tag=a&tag=b``) keep all of their values.
        * Parameters with an empty value (``?q=``) are discarded, because
          ``parse_qs`` skips blank values by default.
        * Values are re-encoded by ``urlencode``, so their percent-encoding
          may be normalised.
    """
    parsed = urlparse(url)
    params = parse_qs(parsed.query)
    params.pop("layout", None)
    # doseq=True expands each value list into repeated key=value pairs
    # instead of truncating to the first value.
    new_query = urlencode(params, doseq=True)
    return urlunparse(parsed._replace(query=new_query))
def clean_title(raw: str) -> str:
    """Normalise a raw title string.

    Line breaks become spaces, runs of two or more whitespace characters
    collapse to a single space, and whatever ``CATEGORY_PATTERN`` and then
    ``SUFFIX_PATTERN`` (both defined in the config module) match is removed.
    The result is stripped of surrounding whitespace after every step.
    """
    flattened = raw.replace("\n", " ").replace("\r", " ")
    title = re.sub(r"\s{2,}", " ", flattened).strip()
    # Order matters: the category pattern is applied before the suffix pattern.
    for pattern in (CATEGORY_PATTERN, SUFFIX_PATTERN):
        title = pattern.sub("", title).strip()
    return title
def infer_category(title: str, body: str) -> str:
    """Infer a category label for a notice from its title and body.

    Rules are evaluated in priority order; the first match wins:

    1. Volunteering: the combined text mentions "봉사" together with an
       overseas marker ("해외", "WFK", "월드프렌즈", "KOICA"), or the title
       names a volunteer corps / overseas volunteering.
    2. The title starts with a key of ``CATEGORY_PREFIX``.
    3. The title contains a keyword from ``CATEGORY_KEYWORDS``.
    4. The body contains a keyword from ``CATEGORY_KEYWORDS``.

    Returns:
        The matched category, or "기타" ("other") when nothing matches.
    """
    text = f"{title} {body}"

    overseas_volunteering = "봉사" in text and any(
        term in text for term in ("해외", "WFK", "월드프렌즈", "KOICA")
    )
    volunteer_title = any(
        term in title
        for term in ("해외봉사", "청년봉사단", "프로젝트 봉사단", "봉사단")
    )
    if overseas_volunteering or volunteer_title:
        return "봉사/서포터즈"

    for prefix, cat in CATEGORY_PREFIX.items():
        if title.startswith(prefix):
            return cat

    # Title keywords are checked across *all* categories before any body
    # keyword, so a title hit always outranks a body hit.
    for cat, keywords in CATEGORY_KEYWORDS.items():
        if any(kw in title for kw in keywords):
            return cat
    for cat, keywords in CATEGORY_KEYWORDS.items():
        if any(kw in body for kw in keywords):
            return cat

    return "기타"
def tokenize_ko(text: str) -> list[str]:
    """Tokenise *text* for keyword search, with extra sub-tokens for Korean.

    The lower-cased text is split into runs of word characters. Every token is
    kept as-is; tokens consisting solely of Hangul syllables are additionally
    expanded with:

    * each entry of ``_DOMAIN_SPLIT_HINTS`` that occurs inside the token, and
    * for tokens of 4 to 12 characters, every character n-gram of length
      2 through ``min(6, len(token))``.

    Returns:
        The original tokens followed by the expansion tokens. Duplicates are
        kept; an empty list is returned when the text has no word characters.
    """
    # ``\w`` already covers Hangul for str patterns; the explicit syllable
    # range (U+AC00-U+D7A3) is kept for clarity and written with escapes so
    # it cannot be corrupted by a wrong source encoding.
    tokens = re.findall(r"[\w\uAC00-\uD7A3]+", text.lower())
    expanded = list(tokens)
    for token in tokens:
        if not _KOREAN_TOKEN_PATTERN.fullmatch(token):
            continue
        expanded.extend(hint for hint in _DOMAIN_SPLIT_HINTS if hint in token)
        # Very short tokens gain nothing from n-grams; very long ones would
        # produce too many.
        if 4 <= len(token) <= 12:
            max_n = min(6, len(token))
            expanded.extend(
                token[start : start + n]
                for n in range(2, max_n + 1)
                for start in range(0, len(token) - n + 1)
            )
    return expanded
def _get_chunk_tokenizer():
    """Return the shared tokenizer used for chunking, loading it on first use.

    The tokenizer is loaded from ``EMBED_MODEL_PATH`` when that path exists on
    disk, otherwise from ``BASE_MODEL_EMBED``. Loading is restricted to local
    files when ``TRANSFORMERS_OFFLINE`` or ``HF_HUB_OFFLINE`` is set to "1".
    The instance is cached in the module-level ``_chunk_tokenizer``.
    """
    global _chunk_tokenizer
    if _chunk_tokenizer is not None:
        return _chunk_tokenizer

    # Imported lazily so the module can be imported without transformers
    # being loaded up front.
    from transformers import AutoTokenizer

    if os.path.exists(EMBED_MODEL_PATH):
        source = EMBED_MODEL_PATH
    else:
        source = BASE_MODEL_EMBED
    offline = any(
        os.getenv(flag) == "1"
        for flag in ("TRANSFORMERS_OFFLINE", "HF_HUB_OFFLINE")
    )

    _chunk_tokenizer = AutoTokenizer.from_pretrained(source, local_files_only=offline)
    # Raise the length limit to at least one billion tokens; presumably so
    # that encoding whole documents is not flagged as over-length.
    current_limit = _chunk_tokenizer.model_max_length
    _chunk_tokenizer.model_max_length = max(current_limit, 1_000_000_000)
    return _chunk_tokenizer
def chunk_text(text: str) -> list[str]:
    """Split *text* into overlapping chunks measured in tokenizer tokens.

    The text is encoded without special tokens, cut into windows of at most
    ``CHUNK_SIZE`` token ids that advance by ``CHUNK_SIZE - CHUNK_OVERLAP``,
    and each window is decoded back to a string.

    Returns:
        The decoded chunks in order, or an empty list when the text encodes
        to no tokens.

    Raises:
        ValueError: If ``CHUNK_OVERLAP`` is not smaller than ``CHUNK_SIZE``
            (checked only when the text produced at least one token).
    """
    tokenizer = _get_chunk_tokenizer()
    ids = tokenizer.encode(text, add_special_tokens=False)
    if not ids:
        return []

    stride = CHUNK_SIZE - CHUNK_OVERLAP
    if stride <= 0:
        raise ValueError("CHUNK_OVERLAP must be smaller than CHUNK_SIZE.")

    total = len(ids)
    pieces = []
    for offset in range(0, total, stride):
        window = ids[offset : offset + CHUNK_SIZE]
        decoded = tokenizer.decode(
            window,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )
        pieces.append(decoded)
        # Stop once this window reaches the end, so no trailing chunk made
        # up only of already-covered overlap tokens is emitted.
        if offset + CHUNK_SIZE >= total:
            break
    return pieces