| """Pipeline definitions and execution engine for Chosun proofreading comparator.""" |
|
|
| import json |
| import logging |
| import os |
| import random |
| import re |
| import time |
| from concurrent.futures import ThreadPoolExecutor |
| from pathlib import Path |
|
|
| import httpx |
| from openai import OpenAI |
|
|
| logger = logging.getLogger(__name__) |
| from postprocess import ( |
| apply_comma_removal, |
| apply_correction_filter, |
| apply_currency_compact, |
| apply_paragraph_dedupe, |
| apply_pronoun_postprocess, |
| apply_unit_unicode, |
| apply_vocabulary, |
| ) |
| from text_splitter import split_into_bulks |
|
|
| |
| |
| |
| import pipelines_v24 as _v24 |
|
|
| |
|
|
| PIPELINES: dict[str, list[dict]] = { |
| "251231_default": [ |
| {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, |
| { |
| "name": "ft_correction", |
| "type": "llm", |
| "model": "ft:solar-news-correction", |
| "max_tokens": 2000, |
| }, |
| { |
| "name": "basic_correction", |
| "type": "llm", |
| "prompt_key": "step1", |
| "max_tokens": 4000, |
| }, |
| { |
| "name": "context_correction", |
| "type": "llm", |
| "prompt_key": "step2", |
| "max_tokens": 4000, |
| }, |
| { |
| "name": "style_correction", |
| "type": "llm", |
| "prompt_key": "step3", |
| "max_tokens": 4000, |
| }, |
| {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, |
| {"name": "post_process", "type": "rule", "rule": "post_process"}, |
| ], |
| "260430_pro3_v1": [ |
| {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, |
| { |
| "name": "ft_correction", |
| "type": "llm", |
| "model": "ft:solar-news-correction", |
| "max_tokens": 4000, |
| }, |
| { |
| "name": "basic_correction", |
| "type": "llm", |
| "prompt_key": "step1", |
| "reasoning_effort": "low", |
| "max_tokens": 4000, |
| }, |
| {"name": "vocabulary_sub", "type": "rule", "rule": "vocabulary"}, |
| { |
| "name": "context_correction", |
| "type": "llm", |
| "prompt_key": "step2", |
| "reasoning_effort": "low", |
| "max_tokens": 4000, |
| }, |
| {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, |
| { |
| "name": "correction_filter", |
| "type": "rule", |
| "rule": "correction_filter", |
| "config": {"max_char_diff": 2}, |
| }, |
| {"name": "post_process", "type": "rule", "rule": "post_process"}, |
| ], |
| |
| |
| "260430_e1b": [ |
| {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, |
| { |
| "name": "ft_correction", |
| "type": "llm", |
| "model": "ft:solar-news-correction", |
| "max_tokens": 2000, |
| }, |
| { |
| "name": "basic_correction", |
| "type": "llm", |
| "prompt_key": "step1", |
| "reasoning_effort": "low", |
| "max_tokens": 4000, |
| }, |
| {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, |
| {"name": "post_process", "type": "rule", "rule": "post_process"}, |
| ], |
| |
| |
| "260430_f3c": [ |
| {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, |
| { |
| "name": "ft_correction", |
| "type": "llm", |
| "model": "ft:solar-news-correction", |
| "max_tokens": 2000, |
| }, |
| { |
| "name": "basic_correction", |
| "type": "llm", |
| "prompt_key": "step1", |
| "reasoning_effort": "low", |
| "max_tokens": 4000, |
| }, |
| {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, |
| {"name": "post_process", "type": "rule", "rule": "post_process"}, |
| ], |
| |
| |
| "260430_e8": [ |
| {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, |
| { |
| "name": "ft_correction", |
| "type": "llm", |
| "model": "ft:solar-news-correction", |
| "max_tokens": 2000, |
| }, |
| { |
| "name": "basic_and_context_correction", |
| "type": "llm", |
| "prompt_key": "step1", |
| "reasoning_effort": "low", |
| "max_tokens": 4000, |
| }, |
| { |
| "name": "style_correction", |
| "type": "llm", |
| "prompt_key": "step2", |
| "reasoning_effort": "low", |
| "max_tokens": 4000, |
| }, |
| {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, |
| {"name": "post_process", "type": "rule", "rule": "post_process"}, |
| ], |
| |
| |
| "260408_v16": [ |
| {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, |
| { |
| "name": "ft_correction", |
| "type": "llm", |
| "model": "ft:solar-news-correction", |
| "max_tokens": 2000, |
| }, |
| { |
| "name": "basic_correction", |
| "type": "llm", |
| "prompt_key": "step1", |
| "reasoning_effort": "low", |
| "max_tokens": 4000, |
| }, |
| {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, |
| |
| |
| {"name": "paragraph_dedupe", "type": "rule", "rule": "paragraph_dedupe"}, |
| {"name": "post_process", "type": "rule", "rule": "post_process"}, |
| ], |
| |
| |
| |
| |
| |
| "260429_v24": [ |
| {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, |
| { |
| "name": "ft_correction", |
| "type": "llm", |
| "model": "ft:solar-news-correction", |
| "max_tokens": 4000, |
| }, |
| { |
| "name": "basic_correction", |
| "type": "llm_self_consistency", |
| "prompt_key": "step1", |
| "reasoning_effort": "low", |
| "max_tokens": 4000, |
| "repeat": 2, |
| "repeat_temperatures": [0.0, 0.2], |
| "intersection_threshold": 2, |
| "response_format": { |
| "type": "json_schema", |
| "json_schema": { |
| "name": "proofreading_result", |
| "strict": True, |
| "schema": { |
| "type": "object", |
| "properties": { |
| "corrections": { |
| "type": "array", |
| "items": { |
| "type": "object", |
| "properties": { |
| "original": {"type": "string"}, |
| "corrected": {"type": "string"}, |
| "type": {"type": "string"}, |
| "reason": {"type": "string"}, |
| }, |
| "required": ["original", "corrected", "type", "reason"], |
| "additionalProperties": False, |
| }, |
| }, |
| "output": {"type": "string"}, |
| "explanation": {"type": "string"}, |
| }, |
| "required": ["corrections", "output", "explanation"], |
| "additionalProperties": False, |
| }, |
| }, |
| }, |
| }, |
| { |
| "name": "tool_calling_judge", |
| "type": "tool_calling_judge", |
| "prompt_key": "judge", |
| "reasoning_effort": "low", |
| "max_tokens": 4000, |
| "max_tool_iterations": 4, |
| "corrections_step": "basic_correction", |
| "pmi_lookup": "data/pmi_compounds_top.tsv.gz", |
| }, |
| { |
| "name": "apply_judge_decisions", |
| "type": "rule", |
| "rule": "apply_judge_decisions", |
| "config": { |
| "judge_step": "tool_calling_judge", |
| "corrections_step": "basic_correction", |
| }, |
| }, |
| {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, |
| {"name": "paragraph_dedupe", "type": "rule", "rule": "paragraph_dedupe"}, |
| {"name": "post_process", "type": "rule", "rule": "post_process"}, |
| ], |
| |
| |
| "260513_taxonomy_e1": [ |
| {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, |
| { |
| "name": "ft_correction", |
| "type": "llm", |
| "model": "ft:solar-news-correction", |
| "prompt_key": "proofread", |
| "max_tokens": 4000, |
| }, |
| { |
| "name": "spelling_vocab", |
| "type": "specialist", |
| "prompt_key": "spelling_vocab", |
| "reasoning_effort": "low", |
| "max_tokens": 3000, |
| }, |
| { |
| "name": "punct_number", |
| "type": "specialist", |
| "prompt_key": "punct_number", |
| "reasoning_effort": "low", |
| "max_tokens": 3000, |
| }, |
| { |
| "name": "context_dedup", |
| "type": "specialist", |
| "prompt_key": "context_dedup", |
| "reasoning_effort": "low", |
| "max_tokens": 3000, |
| }, |
| { |
| "name": "grammar_particle", |
| "type": "specialist", |
| "prompt_key": "grammar_particle", |
| "reasoning_effort": "medium", |
| "max_tokens": 4000, |
| }, |
| {"name": "post_process", "type": "rule", "rule": "post_process"}, |
| ], |
| } |
|
|
| |
# Field names of a single correction object; all are required strings.
_CORRECTION_FIELDS: tuple[str, ...] = ("original", "corrected", "type", "reason")

# Strict JSON-schema ``response_format`` requesting only a ``corrections``
# array; passed to ``call_llm`` by ``run_specialist``.
_CORRECTIONS_SCHEMA: dict = {
    "type": "json_schema",
    "json_schema": {
        "name": "corrections_only",
        "strict": True,
        "schema": {
            "type": "object",
            "properties": {
                "corrections": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            field: {"type": "string"} for field in _CORRECTION_FIELDS
                        },
                        "required": list(_CORRECTION_FIELDS),
                        "additionalProperties": False,
                    },
                }
            },
            "required": ["corrections"],
            "additionalProperties": False,
        },
    },
}
|
|
| |
# System prompt sent for LLM steps that define no ``prompt_key``. In
# ``_process_single_bulk`` an ``ft:`` model without a prompt key is instead
# called with an empty system prompt. The Korean text reads: "Please generate
# the proofreading result for the input document."
FT_SYSTEM_PROMPT = "입력된 문서에 대한 교열 결과를 생성해 주세요."


# Placeholder substituted for "\n" in the user content of prompt-keyed steps;
# it is converted back to "\n" after the step output has been extracted.
PARAGRAPH_SEP = "<paragraph_separator>"


# Replacement table handed to ``apply_pronoun_postprocess`` by the
# ``post_process`` rule. The single entry rewrites the title "대표" (party
# leader) after the name 이재명 to "대통령" (president).
# NOTE(review): how the table is matched is defined in ``postprocess`` — confirm there.
DEFAULT_PRONOUN_REPLACEMENTS: dict[str, str] = {
    "이재명 대표": "이재명 대통령",
}
|
|
| |
|
|
|
|
def list_prompts() -> list[str]:
    """Return the names of the prompt versions found under ``prompts/``.

    A prompt version is any non-hidden sub-directory (name not starting with
    ``.``) of the ``prompts`` folder that sits next to this module.

    Returns:
        Directory names in ascending order; an empty list when the
        ``prompts`` folder does not exist.
    """
    root = Path(__file__).parent / "prompts"
    if not root.exists():
        return []

    names = [
        entry.name
        for entry in root.iterdir()
        if entry.is_dir() and not entry.name.startswith(".")
    ]
    names.sort()
    return names
|
|
|
|
def load_prompts(prompt_name: str) -> dict[str, str]:
    """Read every usable ``*.txt`` prompt in ``prompts/{prompt_name}/``.

    Files whose stripped content is empty or begins with ``# TODO`` are
    treated as placeholders and skipped.

    Args:
        prompt_name: Name of the prompt directory.

    Returns:
        Mapping of file stem (e.g. ``'step1'``) to the stripped prompt text;
        empty when the directory does not exist.
    """
    folder = Path(__file__).parent / "prompts" / prompt_name
    if not folder.exists():
        return {}

    loaded: dict[str, str] = {}
    for path in folder.glob("*.txt"):
        body = path.read_text(encoding="utf-8").strip()
        if not body or body.startswith("# TODO"):
            continue
        loaded[path.stem] = body
    return loaded
|
|
|
|
| |
|
|
|
|
| def extract_json_output(content: str, fallback: str | None = None) -> str: |
| """Extract 'output' field from LLM JSON response. |
| |
| Tries multiple strategies in order: |
| 1. Raw JSON parse |
| 2. Markdown code block extraction |
| 3. Find last "output" key with brace matching |
| 4. Truncated JSON recovery |
| |
| Args: |
| content: Raw LLM response text. |
| fallback: If all JSON-extraction strategies fail, return this string |
| instead of raw content. Matches reference pipeline semantics |
| (run.py parse_step_output) — LLM hallucination / non-JSON blobs |
| are replaced with the pre-step input so they don't propagate. |
| |
| Returns: |
| Extracted output text as a string. Non-string JSON values |
| (null, numbers, arrays) are coerced to str to prevent downstream |
| "expected string or buffer" errors from re.sub() calls. |
| """ |
|
|
| def _as_str(value: object) -> str: |
| """Coerce extracted output to a string. |
| |
| LLMs occasionally return {"output": null}, {"output": 123}, or a |
| list/dict despite being prompted for text. Returning the raw value |
| would crash re.sub() later with "expected string or buffer". |
| """ |
| if value is None: |
| return "" |
| if isinstance(value, str): |
| return value |
| |
| |
| try: |
| return json.dumps(value, ensure_ascii=False) |
| except (TypeError, ValueError): |
| return str(value) |
|
|
| if not content: |
| return "" |
| if not isinstance(content, str): |
| return _as_str(content) |
|
|
| |
| try: |
| data = json.loads(content) |
| if isinstance(data, dict) and "output" in data: |
| return _as_str(data["output"]) |
| except (json.JSONDecodeError, AttributeError): |
| pass |
|
|
| |
| json_blocks = list(re.finditer(r"```(?:json)?\s*(.*?)```", content, re.DOTALL)) |
| for match in reversed(json_blocks): |
| try: |
| data = json.loads(match.group(1).strip()) |
| if isinstance(data, dict) and "output" in data: |
| return _as_str(data["output"]) |
| except (json.JSONDecodeError, AttributeError): |
| continue |
|
|
| |
| last_idx = content.rfind('"output"') |
| if last_idx != -1: |
| brace_start = content.rfind("{", 0, last_idx) |
| if brace_start != -1: |
| depth = 0 |
| for i in range(brace_start, len(content)): |
| if content[i] == "{": |
| depth += 1 |
| elif content[i] == "}": |
| depth -= 1 |
| if depth == 0: |
| try: |
| data = json.loads(content[brace_start : i + 1]) |
| if isinstance(data, dict) and "output" in data: |
| return _as_str(data["output"]) |
| except (json.JSONDecodeError, AttributeError): |
| pass |
| break |
|
|
| |
| output_key_match = re.search(r'"output"\s*:\s*"', content) |
| if output_key_match: |
| start = output_key_match.end() |
| remaining = content[start:] |
| boundary = re.search(r'"\s*,\s*"explanation"\s*:', remaining) |
| if boundary: |
| raw_value = remaining[: boundary.start()] |
| try: |
| return _as_str(json.loads('"' + raw_value + '"')) |
| except (json.JSONDecodeError, ValueError): |
| return raw_value |
|
|
| |
| |
| |
| if fallback is not None: |
| return fallback |
| return content |
|
|
|
|
| |
|
|
|
|
def call_llm(
    client: OpenAI,
    system_prompt: str,
    user_content: str,
    model: str = "solar-pro2",
    temperature: float = 0.0001,
    reasoning_effort: str | None = None,
    max_tokens: int | None = None,
    response_format: dict | None = None,
) -> str:
    """Send one chat-completion request and return the reply text.

    Builds the request payload and hands it to ``_post_chat_completion``.

    Args:
        client: OpenAI client configured for the Upstage API.
        system_prompt: System prompt text; no system message is sent when it
            is empty.
        user_content: User message content.
        model: Model name.
        temperature: Sampling temperature.
        reasoning_effort: Optional reasoning effort level, forwarded through
            ``extra_body`` when truthy.
        max_tokens: Optional completion limit. When omitted for a model whose
            name starts with ``ft:``, 4000 is used.
        response_format: Optional ``response_format`` object forwarded as-is.

    Returns:
        LLM response text.
    """
    token_cap = 4000 if max_tokens is None and model.startswith("ft:") else max_tokens

    conversation: list[dict] = [{"role": "user", "content": user_content}]
    if system_prompt:
        conversation.insert(0, {"role": "system", "content": system_prompt})

    request: dict = dict(
        model=model,
        messages=conversation,
        stream=False,
        temperature=temperature,
    )
    if token_cap is not None:
        request["max_tokens"] = token_cap
    if reasoning_effort:
        request["extra_body"] = {"reasoning_effort": reasoning_effort}
    if response_format is not None:
        request["response_format"] = response_format

    return _post_chat_completion(client, request)
|
|
|
|
| |
# Timeout configuration shared by every request made through the cached client.
_HTTPX_TIMEOUT = httpx.Timeout(60.0, connect=10.0)
# Lazily created process-wide client; populated by ``_get_httpx_client``.
_HTTPX_CLIENT: httpx.Client | None = None


def _get_httpx_client() -> httpx.Client:
    """Return the module-wide ``httpx.Client``, creating it on first use."""
    global _HTTPX_CLIENT
    shared = _HTTPX_CLIENT
    if shared is None:
        shared = httpx.Client(timeout=_HTTPX_TIMEOUT)
        _HTTPX_CLIENT = shared
    return shared
|
|
|
|
def _post_chat_completion(client: OpenAI, payload: dict) -> str:
    """POST ``/chat/completions`` directly via httpx, bypassing the openai SDK.

    ``base_url`` and ``api_key`` are read from ``client`` (the key falls back
    to the ``UPSTAGE_API_KEY`` environment variable). ``extra_body`` entries
    in ``payload`` are merged into the top level of the JSON body.

    Transient failures are retried up to three times with exponential backoff
    plus jitter: timeouts, network errors, a server that drops the connection
    (``httpx.RemoteProtocolError``) and HTTP 408/429/500/502/503/504.

    Args:
        client: OpenAI client supplying ``base_url`` and ``api_key``.
        payload: Chat-completion request fields; not mutated.

    Returns:
        The text of ``choices[0].message.content``. ``""`` is returned, after
        logging, when retries are exhausted, when the server answers with any
        other status >= 400, or when the response body is not valid JSON.
    """
    body = dict(payload)
    body.update(body.pop("extra_body", None) or {})

    base_url = str(getattr(client, "base_url", "")).rstrip("/")
    api_key = getattr(client, "api_key", None) or os.getenv("UPSTAGE_API_KEY", "")
    url = f"{base_url}/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    http = _get_httpx_client()

    max_attempts = 3
    base_delay = 1.0
    retryable_statuses = (408, 429, 500, 502, 503, 504)
    # RemoteProtocolError covers "server disconnected without sending a
    # response", which is as transient as a timeout and must not escape.
    transient_errors = (
        httpx.TimeoutException,
        httpx.NetworkError,
        httpx.RemoteProtocolError,
    )

    last_exc: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        try:
            response = http.post(url, headers=headers, json=body)
        except transient_errors as exc:
            last_exc = exc
        else:
            status = response.status_code
            if status not in retryable_statuses:
                if status >= 400:
                    # Non-retryable client/server error: report and give up.
                    logger.error("Upstage HTTP %d: %s", status, response.text[:500])
                    return ""
                try:
                    data = response.json()
                except (json.JSONDecodeError, ValueError):
                    logger.warning("Upstage 2xx response was not valid JSON")
                    return ""
                return _extract_content_from_body(data)
            last_exc = RuntimeError(
                f"HTTP {status} from Upstage: {response.text[:200]}"
            )

        # Back off before the next attempt: 1s, 2s, ... plus up to 0.5s jitter.
        if attempt < max_attempts:
            time.sleep(base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.5))

    if last_exc is not None:
        logger.warning(
            "Upstage request failed after %d attempts: %s", max_attempts, last_exc
        )
    return ""
|
|
|
|
def _extract_content_from_body(body: object) -> str:
    """Return the reply text of the first choice in a response body.

    Reads ``choices[0].message.content`` and, when that is missing or empty,
    ``choices[0].text``. Any unexpected shape, or a non-string value, yields
    ``""``.
    """
    if not isinstance(body, dict):
        return ""

    choices = body.get("choices") or []
    if not (isinstance(choices, list) and choices):
        return ""

    head = choices[0]
    if not isinstance(head, dict):
        return ""

    message = head.get("message") or {}
    text = message.get("content") if isinstance(message, dict) else None
    if not text:
        # Fall back to the plain "text" field of the choice.
        text = head.get("text")

    if isinstance(text, str):
        return text
    return ""
|
|
|
|
| |
|
|
|
|
| def extract_corrections(content: str) -> list[dict]: |
| """Extract `corrections[]` array from a specialist JSON response. |
| |
| Tries progressively more lenient strategies: |
| 1. Raw JSON parse of the whole response. |
| 2. JSON inside markdown code blocks (``` ... ```). |
| 3. Last `{ ... "corrections": [...] ... }` block with brace matching. |
| 4. Salvage: regex-scan for individual correction objects when the |
| surrounding JSON is truncated or malformed (e.g. the model hit |
| max_tokens mid-array). |
| |
| A correction is accepted if it has at least `original` and `corrected` |
| string fields; `type`/`reason` are treated as optional metadata that |
| downstream `apply_specialist_corrections` ignores anyway. |
| """ |
| if not content: |
| return [] |
|
|
| def _valid(corrections_list: list) -> list[dict]: |
| return [ |
| c |
| for c in corrections_list |
| if isinstance(c, dict) |
| and isinstance(c.get("original"), str) |
| and isinstance(c.get("corrected"), str) |
| ] |
|
|
| |
| try: |
| data = json.loads(content) |
| if isinstance(data, dict) and isinstance(data.get("corrections"), list): |
| return _valid(data["corrections"]) |
| except (json.JSONDecodeError, AttributeError): |
| pass |
|
|
| |
| for match in reversed(list(re.finditer(r"```(?:json)?\s*(.*?)```", content, re.DOTALL))): |
| try: |
| data = json.loads(match.group(1).strip()) |
| if isinstance(data, dict) and isinstance(data.get("corrections"), list): |
| return _valid(data["corrections"]) |
| except (json.JSONDecodeError, AttributeError): |
| continue |
|
|
| |
| last_idx = content.rfind('"corrections"') |
| if last_idx != -1: |
| brace_start = content.rfind("{", 0, last_idx) |
| if brace_start != -1: |
| depth = 0 |
| for i in range(brace_start, len(content)): |
| if content[i] == "{": |
| depth += 1 |
| elif content[i] == "}": |
| depth -= 1 |
| if depth == 0: |
| try: |
| data = json.loads(content[brace_start : i + 1]) |
| if isinstance(data, dict) and isinstance( |
| data.get("corrections"), list |
| ): |
| return _valid(data["corrections"]) |
| except (json.JSONDecodeError, AttributeError): |
| pass |
| break |
|
|
| |
| |
| |
| |
| salvaged: list[dict] = [] |
| seen: set[tuple[str, str]] = set() |
| |
| |
| |
| cursor = 0 |
| while True: |
| brace_start = content.find("{", cursor) |
| if brace_start == -1: |
| break |
| depth = 0 |
| end = -1 |
| for i in range(brace_start, len(content)): |
| ch = content[i] |
| if ch == "{": |
| depth += 1 |
| elif ch == "}": |
| depth -= 1 |
| if depth == 0: |
| end = i |
| break |
| if end == -1: |
| |
| break |
| candidate = content[brace_start : end + 1] |
| cursor = end + 1 |
| |
| if '"original"' not in candidate or '"corrected"' not in candidate: |
| continue |
| try: |
| obj = json.loads(candidate) |
| except (json.JSONDecodeError, AttributeError): |
| continue |
| if not isinstance(obj, dict): |
| continue |
| orig = obj.get("original") |
| corrected = obj.get("corrected") |
| if not isinstance(orig, str) or not isinstance(corrected, str): |
| continue |
| key = (orig, corrected) |
| if key in seen: |
| continue |
| seen.add(key) |
| salvaged.append( |
| { |
| "original": orig, |
| "corrected": corrected, |
| "type": obj.get("type", ""), |
| "reason": obj.get("reason", ""), |
| } |
| ) |
| if salvaged: |
| logger.info( |
| "extract_corrections: salvaged %d correction(s) from malformed JSON", |
| len(salvaged), |
| ) |
| return salvaged |
|
|
|
|
def apply_specialist_corrections(text: str, corrections: list[dict]) -> str:
    """Apply one specialist's corrections to ``text``.

    Entries that are not dicts, lack string ``original``/``corrected``
    fields, have an empty ``original`` or would not change anything are
    ignored, and repeated ``(original, corrected)`` pairs count once. Longer
    originals are planned first; each pair rewrites only its first occurrence
    that does not overlap a span already claimed by another pair.

    Args:
        text: Text to correct.
        corrections: Correction dicts; ``None`` is treated as empty.

    Returns:
        The corrected text.
    """
    # Insertion-ordered dict doubles as an order-preserving de-duplicator.
    unique_pairs: dict[tuple[str, str], None] = {}
    for entry in corrections or []:
        if not isinstance(entry, dict):
            continue
        source, target = entry.get("original"), entry.get("corrected")
        if not (isinstance(source, str) and isinstance(target, str)):
            continue
        if source and source != target:
            unique_pairs.setdefault((source, target))

    # Stable sort: pairs with equally long originals keep their input order.
    ordered = sorted(unique_pairs, key=lambda pair: -len(pair[0]))

    planned: list[tuple[int, int, str]] = []
    for source, target in ordered:
        pos = text.find(source)
        while pos != -1:
            stop = pos + len(source)
            if all(stop <= taken_start or pos >= taken_end for taken_start, taken_end, _ in planned):
                planned.append((pos, stop, target))
                break
            pos = text.find(source, pos + 1)

    # Spans are disjoint, so stitching left to right is equivalent to
    # splicing from the right-most edit backwards.
    pieces: list[str] = []
    consumed = 0
    for start, stop, target in sorted(planned, key=lambda span: span[0]):
        pieces.append(text[consumed:start])
        pieces.append(target)
        consumed = stop
    pieces.append(text[consumed:])
    return "".join(pieces)
|
|
|
|
def run_specialist(
    step: dict,
    text: str,
    original_text: str,
    prompts: dict[str, str],
    model: str,
    client: OpenAI,
) -> str:
    """Run one taxonomy specialist: call the LLM, parse corrections, apply them.

    Args:
        step: Step definition. The system prompt is looked up under
            ``step["prompt_key"]``, or under ``step["name"]`` when no prompt
            key is given.
        text: Current draft to be corrected.
        original_text: Original input text, sent alongside the draft.
        prompts: Loaded prompt texts; a missing key yields an empty system
            prompt.
        model: Default model, overridden by ``step["model"]`` when present.
        client: OpenAI client.

    Returns:
        ``text`` with the specialist's corrections applied (unchanged when
        no corrections could be extracted).

    Raises:
        KeyError: If ``step`` has neither ``prompt_key`` nor ``name``.
    """
    # A plain ``step.get("prompt_key", step["name"])`` evaluates the default
    # eagerly and would raise for steps that set a prompt key but no name.
    prompt_key = step["prompt_key"] if "prompt_key" in step else step["name"]
    system_prompt = prompts.get(prompt_key, "")
    user_content = f"<원문>\n{original_text}\n</원문>\n\n<교열_초안>\n{text}\n</교열_초안>"
    response = call_llm(
        client=client,
        system_prompt=system_prompt,
        user_content=user_content,
        model=step.get("model", model),
        reasoning_effort=step.get("reasoning_effort"),
        max_tokens=step.get("max_tokens"),
        response_format=_CORRECTIONS_SCHEMA,
    )
    corrections = extract_corrections(response)
    return apply_specialist_corrections(text, corrections)
|
|
|
|
| |
|
|
|
|
def apply_rule(
    step: dict,
    text: str,
    original_text: str,
    vocabulary: list[dict[str, str]],
) -> str:
    """Apply a rule-based pipeline step.

    Supported rules:
        * ``vocabulary`` - ``apply_vocabulary``.
        * ``post_process`` - pronoun replacement, currency compaction, comma
          removal and unit-unicode conversion, in that order.
        * ``correction_filter`` - ``apply_correction_filter`` with
          ``max_char_diff`` (default 2) and ``allow_spacing`` (default True)
          taken from ``step["config"]``.
        * ``paragraph_dedupe`` - ``apply_paragraph_dedupe``; ``min_len`` and
          ``prefix_len`` are forwarded only when present in the config.
        * ``apply_judge_decisions`` - no-op here; ``text`` is returned as-is.

    Any other rule name leaves ``text`` unchanged.

    Args:
        step: Step definition dict. The rule is ``step["rule"]``, or
            ``step["name"]`` when no rule is given.
        text: Current text.
        original_text: Original input text (used by ``correction_filter``
            and ``paragraph_dedupe``).
        vocabulary: Loaded vocabulary list.

    Returns:
        Processed text.

    Raises:
        KeyError: If ``step`` has neither ``rule`` nor ``name``.
    """
    # A plain ``step.get("rule", step["name"])`` evaluates the default eagerly
    # and would raise for steps that define a rule but no name.
    rule = step["rule"] if "rule" in step else step["name"]

    if rule == "vocabulary":
        return apply_vocabulary(text, vocabulary)

    if rule == "post_process":
        text = apply_pronoun_postprocess(text, DEFAULT_PRONOUN_REPLACEMENTS)
        text = apply_currency_compact(text)
        text = apply_comma_removal(text)
        return apply_unit_unicode(text)

    if rule == "correction_filter":
        config = step.get("config", {})
        return apply_correction_filter(
            text,
            original_text,
            max_char_diff=config.get("max_char_diff", 2),
            allow_spacing=config.get("allow_spacing", True),
        )

    if rule == "paragraph_dedupe":
        config = step.get("config", {})
        # Forward only explicitly configured options so the helper's own
        # defaults apply otherwise.
        overrides = {
            option: config[option]
            for option in ("min_len", "prefix_len")
            if option in config
        }
        return apply_paragraph_dedupe(text, original_text, **overrides)

    # "apply_judge_decisions" and unknown rules: nothing to do in this function.
    return text
|
|
|
|
| |
|
|
|
|
def _process_single_bulk(
    bulk: str,
    step: dict,
    model: str,
    prompts: dict[str, str],
    client: OpenAI,
    original_bulk: str,
) -> str:
    """Process a single text bulk through an LLM step.

    Prompt construction:
        * With a ``prompt_key``: the system prompt comes from ``prompts`` and
          newlines in the user content are replaced by ``PARAGRAPH_SEP``. For
          ``step1`` the user content carries both ``original_bulk`` and
          ``bulk`` in tagged sections.
        * Without a ``prompt_key``: ``ft:`` models get an empty system prompt,
          other models get ``FT_SYSTEM_PROMPT``; the bulk is sent verbatim.

    Output handling:
        * ``ft:`` models return plain text. Separators are converted back to
          newlines when the request used them. A response at least 1.25x the
          bulk length is treated as a duplication artifact: the bulk is
          returned if the response starts with it, else the first half if the
          two halves contain one another, else the bulk unchanged.
        * Other models return JSON; the ``output`` field is extracted with
          the bulk as fallback. An output at least 1.4x the bulk length is cut
          at the second occurrence of the bulk's (or original bulk's) leading
          80 characters, or replaced by the bulk when no such anchor exists.

    Args:
        bulk: Text bulk to process.
        step: Step definition.
        model: Model name to use unless ``step["model"]`` overrides it.
        prompts: Loaded prompt texts.
        client: OpenAI client.
        original_bulk: Pre-correction text **for this bulk** (not the full
            chunk). Used as <원문> in step1 and as the fallback anchor in the
            length guard.

    Returns:
        Processed bulk text.
    """
    step_model = step.get("model", model)
    prompt_key = step.get("prompt_key")
    is_ft_model = step_model.startswith("ft:")

    if prompt_key:
        system_prompt = prompts.get(prompt_key, "")
        if prompt_key == "step1":
            original_sep = original_bulk.replace("\n", PARAGRAPH_SEP)
            bulk_sep = bulk.replace("\n", PARAGRAPH_SEP)
            user_content = (
                f"<원문>\n{original_sep}\n</원문>\n\n"
                f"<교열_모델_수정결과>\n{bulk_sep}\n</교열_모델_수정결과>"
            )
        else:
            user_content = bulk.replace("\n", PARAGRAPH_SEP)
    elif is_ft_model:
        system_prompt = ""
        user_content = bulk
    else:
        system_prompt = FT_SYSTEM_PROMPT
        user_content = bulk

    response = call_llm(
        client=client,
        system_prompt=system_prompt,
        user_content=user_content,
        model=step_model,
        reasoning_effort=step.get("reasoning_effort"),
        max_tokens=step.get("max_tokens"),
    )

    if is_ft_model:
        # The request replaced newlines with PARAGRAPH_SEP whenever a prompt
        # key was set; undo that here so the marker never leaks into the text
        # and the length guard below compares like with like.
        if prompt_key and response:
            response = response.replace(PARAGRAPH_SEP, "\n")

        if response and len(response) >= len(bulk) * 1.25:
            stripped = response.strip()
            bulk_stripped = bulk.strip()

            # Response is the input followed by extra text: keep the input.
            if (
                bulk_stripped
                and stripped.startswith(bulk_stripped)
                and len(stripped) > len(bulk_stripped) * 1.05
            ):
                return bulk_stripped

            # Response looks like the same text emitted twice: keep one half.
            first_end = len(stripped) // 2
            head = stripped[:first_end].rstrip()
            tail = stripped[first_end:].strip()
            if head and (head == tail or head in tail or tail in head):
                return head

            # Over-long for an unrecognised reason: discard the response.
            return bulk
        return response

    extracted = extract_json_output(response, fallback=bulk)

    if prompt_key:
        extracted = extracted.replace(PARAGRAPH_SEP, "\n")

    if extracted and len(extracted) >= len(bulk) * 1.4:
        stripped = extracted.strip()

        def _leading_lookup(needle_src: str) -> str | None:
            """Cut ``stripped`` where the leading 80 chars of ``needle_src`` recur."""
            head = needle_src.strip()[:80]
            if len(head) < 30:
                # Too short to be a reliable anchor.
                return None
            second = stripped.find(head, len(head))
            if second > len(head):
                return stripped[:second].rstrip()
            return None

        recovered = _leading_lookup(bulk) or _leading_lookup(original_bulk)
        if recovered:
            return recovered
        return bulk

    return extracted
|
|
|
|
def _process_single_bulk_with_raw(
    *,
    bulk: str,
    step: dict,
    model: str,
    prompts: dict[str, str],
    client: OpenAI,
    original_bulk: str,
) -> tuple[str, str]:
    """Run a non-FT LLM step on one bulk and return ``(post_text, raw_response)``.

    The prompt is built as in the LLM branch of ``_process_single_bulk``,
    except that there is no FT-specific handling; ``response_format`` and
    ``temperature`` from the step are forwarded when set. The extracted
    output is subject to the same ``len >= 1.4x`` guard: it is cut at the
    second occurrence of the bulk's (or original bulk's) leading 80
    characters, or replaced by the bulk when no such anchor is found.

    Args:
        bulk: Text bulk to process.
        step: Step definition.
        model: Model name to use unless ``step["model"]`` overrides it.
        prompts: Loaded prompt texts.
        client: OpenAI client.
        original_bulk: Pre-correction text for this bulk.

    Returns:
        The post-processed text and the raw response; ``("", "")`` when
        ``call_llm`` raises.
    """
    prompt_key = step.get("prompt_key")

    if not prompt_key:
        system_prompt = FT_SYSTEM_PROMPT
        user_content = bulk
    else:
        system_prompt = prompts.get(prompt_key, "")
        bulk_sep = bulk.replace("\n", PARAGRAPH_SEP)
        if prompt_key == "step1":
            original_sep = original_bulk.replace("\n", PARAGRAPH_SEP)
            user_content = (
                f"<원문>\n{original_sep}\n</원문>\n\n"
                f"<교열_모델_수정결과>\n{bulk_sep}\n</교열_모델_수정결과>"
            )
        else:
            user_content = bulk_sep

    request = {
        "client": client,
        "system_prompt": system_prompt,
        "user_content": user_content,
        "model": step.get("model", model),
        "reasoning_effort": step.get("reasoning_effort"),
        "max_tokens": step.get("max_tokens"),
        "response_format": step.get("response_format"),
        "temperature": step.get("temperature"),
    }
    # Unset options are dropped so ``call_llm`` applies its own defaults.
    request = {name: value for name, value in request.items() if value is not None}

    try:
        raw = call_llm(**request)
    except Exception as exc:
        logger.warning(
            "[_process_single_bulk_with_raw] call_llm failed: %s", exc
        )
        return ("", "")

    extracted = extract_json_output(raw, fallback=bulk)
    if prompt_key:
        extracted = extracted.replace(PARAGRAPH_SEP, "\n")

    if not extracted or len(extracted) < len(bulk) * 1.4:
        return (extracted, raw)

    # Over-long output: try to cut it where the leading text starts repeating.
    stripped = extracted.strip()
    for anchor_src in (bulk, original_bulk):
        anchor = anchor_src.strip()[:80]
        if len(anchor) < 30:
            # Too short to be a reliable anchor.
            continue
        repeat_at = stripped.find(anchor, len(anchor))
        if repeat_at > len(anchor):
            trimmed = stripped[:repeat_at].rstrip()
            if trimmed:
                return (trimmed, raw)

    return (bulk, raw)
|
|
|
|
def process_bulks_parallel(
    bulks: list[str],
    original_bulks: list[str],
    step: dict,
    model: str,
    prompts: dict[str, str],
    client: OpenAI,
    max_workers: int = 10,
) -> list[str]:
    """Process multiple text bulks in parallel.

    A single bulk is processed inline without starting a thread pool.

    Args:
        bulks: List of text bulks (current pipeline state).
        original_bulks: Per-bulk pre-correction text, paired 1:1 with
            ``bulks``. Used as <원문> in step1 and as fallback anchor in
            the hallucination guard.
        step: Step definition.
        model: Model name.
        prompts: Loaded prompt texts.
        client: OpenAI client.
        max_workers: Maximum thread pool workers.

    Returns:
        List of processed bulks in original order; empty when ``bulks`` is
        empty.

    Raises:
        ValueError: If ``bulks`` and ``original_bulks`` differ in length.
    """
    if len(bulks) != len(original_bulks):
        raise ValueError(
            f"bulks/original_bulks length mismatch: {len(bulks)} vs {len(original_bulks)}"
        )

    # Nothing to do; indexing bulks[0] below would raise IndexError.
    if not bulks:
        return []

    if len(bulks) == 1:
        return [
            _process_single_bulk(bulks[0], step, model, prompts, client, original_bulks[0])
        ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                _process_single_bulk, bulk, step, model, prompts, client, orig_bulk
            )
            for bulk, orig_bulk in zip(bulks, original_bulks)
        ]
        # Collect in submission order so results line up with ``bulks``.
        return [future.result() for future in futures]
|
|
|
|
| |
|
|
|
|
def _pipeline_align_originals(snapshot: str, bulks: list[str]) -> list[str]:
    """Pair every current bulk with a pre-correction counterpart.

    Args:
        snapshot: Full pre-correction text.
        bulks: Bulks of the current pipeline text.

    Returns:
        The snapshot split into bulks when that yields exactly one bulk per
        current bulk; otherwise the whole snapshot repeated once per bulk.
    """
    candidate = split_into_bulks(snapshot)
    if len(candidate) == len(bulks):
        return candidate
    # Bulk boundaries no longer line up, so positional pairing is impossible:
    # hand every bulk the full snapshot instead.
    return [snapshot] * len(bulks)


def _pipeline_originals_for_step(
    step: dict, snapshot: str, bulks: list[str]
) -> list[str]:
    """Choose the per-bulk "original" text handed to an LLM-type step.

    Only steps whose ``prompt_key`` is ``"step1"`` are anchored to the
    snapshot; every other step uses a copy of the current bulks.
    """
    if step.get("prompt_key") == "step1":
        return _pipeline_align_originals(snapshot, bulks)
    return list(bulks)


def _pipeline_join_bulks(processed) -> str:
    """Join processed bulks with newlines, coercing non-string items via ``str``."""
    return "\n".join(p if isinstance(p, str) else str(p) for p in processed)


def _pipeline_map_bulks(fn, bulks: list[str], max_workers: int = 10) -> list:
    """Call ``fn(index, bulk)`` for every bulk and return results in order.

    A single bulk is processed inline; several bulks are fanned out over a
    thread pool. ``bulks`` must be non-empty.
    """
    if len(bulks) == 1:
        return [fn(0, bulks[0])]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, range(len(bulks)), bulks))


def _pipeline_apply_judge_decisions(step: dict, text: str) -> str:
    """Run the ``apply_judge_decisions`` rule bulk by bulk.

    Each bulk is delegated to ``_v24.apply_judge_decisions_for_bulk``. A bulk
    whose call raises is logged and kept unchanged so one failure does not
    discard the others. Text that splits into no bulks is returned as is.
    """
    cfg = step.get("config", {})
    bulks = split_into_bulks(text)
    if not bulks:
        return text

    judge_step_name = cfg.get("judge_step", "tool_calling_judge")
    corr_step_name = cfg.get("corrections_step", "basic_correction")
    result: list[str] = []
    for i, bulk in enumerate(bulks):
        try:
            result.append(
                _v24.apply_judge_decisions_for_bulk(
                    bulk_idx=i,
                    bulk=bulk,
                    judge_step_name=judge_step_name,
                    corrections_step_name=corr_step_name,
                )
            )
        except Exception as exc:
            logger.warning(
                "[apply_judge_decisions] bulk %d failed (%s); keeping",
                i,
                exc,
            )
            result.append(bulk)
    return "\n".join(result)


def _pipeline_llm_step(
    step: dict,
    text: str,
    snapshot: str,
    model: str,
    prompts: dict[str, str],
    client: OpenAI,
) -> str:
    """Run a plain ``llm`` step over all bulks and return the joined text."""
    bulks = split_into_bulks(text)
    if not bulks:
        return text
    original_bulks = _pipeline_originals_for_step(step, snapshot, bulks)
    processed = process_bulks_parallel(
        bulks, original_bulks, step, model, prompts, client
    )
    return _pipeline_join_bulks(processed)


def _pipeline_self_consistency_step(
    step: dict,
    text: str,
    snapshot: str,
    model: str,
    prompts: dict[str, str],
    client: OpenAI,
) -> str:
    """Run an ``llm_self_consistency`` step over all bulks.

    Each bulk is delegated to ``_v24.run_self_consistency_for_bulk``; the
    per-bulk results are joined with newlines.
    """
    bulks = split_into_bulks(text)
    if not bulks:
        return text
    original_bulks = _pipeline_originals_for_step(step, snapshot, bulks)

    def _one(i: int, bulk: str):
        return _v24.run_self_consistency_for_bulk(
            bulk_idx=i,
            bulk=bulk,
            original_bulk=original_bulks[i],
            step=step,
            model=model,
            prompts=prompts,
            client=client,
            process_single_bulk_fn=_process_single_bulk_with_raw,
        )

    return _pipeline_join_bulks(_pipeline_map_bulks(_one, bulks))


def _pipeline_tool_calling_judge_step(
    step: dict,
    text: str,
    snapshot: str,
    model: str,
    prompts: dict[str, str],
    client: OpenAI,
) -> None:
    """Run a ``tool_calling_judge`` step over all bulks.

    The return values of ``_v24.run_tool_calling_judge_for_bulk`` are
    discarded and the pipeline text is not modified by this step.
    NOTE(review): the verdicts are presumably recorded inside ``_v24`` for a
    later ``apply_judge_decisions`` rule -- confirm against ``pipelines_v24``.
    """
    bulks = split_into_bulks(text)
    if not bulks:
        return
    # The judge is always anchored to the snapshot, whatever the prompt key.
    original_bulks = _pipeline_align_originals(snapshot, bulks)
    corrections_step = step.get("corrections_step", "basic_correction")

    def _one(i: int, bulk: str):
        return _v24.run_tool_calling_judge_for_bulk(
            bulk_idx=i,
            bulk=bulk,
            original_bulk=original_bulks[i],
            corrections_step_name=corrections_step,
            step=step,
            model=model,
            prompts=prompts,
            client=client,
            call_llm_fn=call_llm,
        )

    _pipeline_map_bulks(_one, bulks)


def run_pipeline(
    text: str,
    pipeline_name: str,
    model: str,
    prompt_name: str,
    client: OpenAI,
    vocabulary: list[dict[str, str]],
) -> dict:
    """Execute a full pipeline on input text.

    Steps run in order. A step that raises, or an LLM-type step whose output
    collapses to under 10% of an input longer than 50 characters, is rolled
    back to the text it received and recorded in ``step_errors``; the
    remaining steps still run.

    Args:
        text: Input text to process.
        pipeline_name: Name of the pipeline to run (key of ``PIPELINES``).
        model: Default model name for LLM steps.
        prompt_name: Prompt version directory name.
        client: OpenAI client.
        vocabulary: Loaded vocabulary list.

    Returns:
        Dict with 'output' (processed text), 'processing_time' (seconds) and
        'step_errors' (list of ``"<step name>: <reason>"`` strings, empty
        when every step succeeded). Empty or whitespace-only input is
        returned untouched with a processing time of 0.0.

    Raises:
        KeyError: If ``pipeline_name`` is not defined in ``PIPELINES``.
    """
    if not text or not text.strip():
        # Same keys as the normal return so callers can always read
        # result["step_errors"].
        return {"output": text, "processing_time": 0.0, "step_errors": []}

    steps = PIPELINES[pipeline_name]
    prompts = load_prompts(prompt_name)
    original_text = text

    # Start every run from a clean v24 module state.
    _v24.reset_v24_state()

    # Activate the PMI lookup of the first step that declares one. A load
    # failure is logged and the run continues without it.
    for candidate in steps:
        pmi_path = candidate.get("pmi_lookup")
        if not pmi_path:
            continue
        full_path = Path(__file__).parent / pmi_path
        try:
            _v24.set_active_pmi_path(full_path)
        except Exception as exc:
            logger.warning("[v24] PMI load failed at %s: %s", full_path, exc)
        break

    # Monotonic clock: immune to wall-clock adjustments during the run.
    start_time = time.perf_counter()

    # Text as it stood before the most recent fine-tuned ("ft:") model step;
    # used as the pre-correction anchor for step1 and the judge.
    pre_ft_snapshot = text

    step_errors: list[str] = []
    for step in steps:
        prev_text = text
        step_name = step.get("name", step["type"])
        try:
            step_type = step["type"]
            if step_type == "rule" and step.get("rule") == "apply_judge_decisions":
                text = _pipeline_apply_judge_decisions(step, text)
            elif step_type == "rule":
                text = apply_rule(step, text, original_text, vocabulary)
            elif step_type == "llm":
                if step.get("model", model).startswith("ft:"):
                    pre_ft_snapshot = text
                text = _pipeline_llm_step(
                    step, text, pre_ft_snapshot, model, prompts, client
                )
            elif step_type == "specialist":
                text = run_specialist(
                    step, text, original_text, prompts, model, client
                )
            elif step_type == "llm_self_consistency":
                text = _pipeline_self_consistency_step(
                    step, text, pre_ft_snapshot, model, prompts, client
                )
            elif step_type == "tool_calling_judge":
                _pipeline_tool_calling_judge_step(
                    step, text, pre_ft_snapshot, model, prompts, client
                )
            else:
                # Surface typos in pipeline definitions instead of silently
                # skipping the step. The text is left unchanged.
                logger.warning(
                    "Step '%s' has unknown type %r; skipping.",
                    step_name,
                    step_type,
                )

            # Guard against a model returning almost nothing for a
            # non-trivial input: discard the step's output.
            if (
                step_type in ("llm", "specialist", "llm_self_consistency")
                and len(prev_text) > 50
                and len(text) < len(prev_text) * 0.1
            ):
                logger.warning(
                    "Step '%s' produced suspiciously short output (%d → %d chars); "
                    "falling back to previous text.",
                    step_name,
                    len(prev_text),
                    len(text),
                )
                text = prev_text
                step_errors.append(f"{step_name}: short_output")
        except Exception as e:
            # Best effort: one failing step must not abort the whole run.
            logger.exception(
                "Step '%s' failed (%s); falling back to previous text and continuing.",
                step_name,
                type(e).__name__,
            )
            text = prev_text
            step_errors.append(f"{step_name}: {type(e).__name__}")

    elapsed = time.perf_counter() - start_time
    return {
        "output": text,
        "processing_time": elapsed,
        "step_errors": step_errors,
    }
|
|