"""Pipeline definitions and execution engine for Chosun proofreading comparator.""" import json import logging import os import random import re import time from concurrent.futures import ThreadPoolExecutor from pathlib import Path import httpx from openai import OpenAI logger = logging.getLogger(__name__) from postprocess import ( apply_comma_removal, apply_correction_filter, apply_currency_compact, apply_paragraph_dedupe, apply_pronoun_postprocess, apply_unit_unicode, apply_vocabulary, ) from text_splitter import split_into_bulks # v24 helpers (self-consistency, tool-calling judge, apply_judge_decisions). # Imported lazily inside step handlers to avoid initialising the PMI lookup # at module import — the demo only loads the corpus when v24 is selected. import pipelines_v24 as _v24 # --- Pipeline definitions --- PIPELINES: dict[str, list[dict]] = { "251231_default": [ {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, { "name": "ft_correction", "type": "llm", "model": "ft:solar-news-correction", "max_tokens": 2000, }, { "name": "basic_correction", "type": "llm", "prompt_key": "step1", "max_tokens": 4000, }, { "name": "context_correction", "type": "llm", "prompt_key": "step2", "max_tokens": 4000, }, { "name": "style_correction", "type": "llm", "prompt_key": "step3", "max_tokens": 4000, }, {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, {"name": "post_process", "type": "rule", "rule": "post_process"}, ], "260430_pro3_v1": [ {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, { "name": "ft_correction", "type": "llm", "model": "ft:solar-news-correction", "max_tokens": 4000, }, { "name": "basic_correction", "type": "llm", "prompt_key": "step1", "reasoning_effort": "low", "max_tokens": 4000, }, {"name": "vocabulary_sub", "type": "rule", "rule": "vocabulary"}, { "name": "context_correction", "type": "llm", "prompt_key": "step2", "reasoning_effort": "low", "max_tokens": 4000, }, {"name": "vocabulary_reapply", "type": 
"rule", "rule": "vocabulary"}, { "name": "correction_filter", "type": "rule", "rule": "correction_filter", "config": {"max_char_diff": 2}, }, {"name": "post_process", "type": "rule", "rule": "post_process"}, ], # E1-B — A 프롬프트를 Pro3 1-pass로 이식. Total TP 96 (Baseline 94 초과). # Use with prompt_name='dev_260430_e1b'. "260430_e1b": [ {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, { "name": "ft_correction", "type": "llm", "model": "ft:solar-news-correction", "max_tokens": 2000, }, { "name": "basic_correction", "type": "llm", "prompt_key": "step1", "reasoning_effort": "low", "max_tokens": 4000, }, {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, {"name": "post_process", "type": "rule", "rule": "post_process"}, ], # F3C — miss-focus few-shot (균형형). L1 TP 84 / Crit TP 10. # Use with prompt_name='dev_260430_f3c'. "260430_f3c": [ {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, { "name": "ft_correction", "type": "llm", "model": "ft:solar-news-correction", "max_tokens": 2000, }, { "name": "basic_correction", "type": "llm", "prompt_key": "step1", "reasoning_effort": "low", "max_tokens": 4000, }, {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, {"name": "post_process", "type": "rule", "rule": "post_process"}, ], # E8 — Pro3 2-pass (basic+context 통합 / style). 과교정 최소. # Use with prompt_name='dev_260430_e8'. "260430_e8": [ {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, { "name": "ft_correction", "type": "llm", "model": "ft:solar-news-correction", "max_tokens": 2000, }, { "name": "basic_and_context_correction", "type": "llm", "prompt_key": "step1", "reasoning_effort": "low", "max_tokens": 4000, }, { "name": "style_correction", "type": "llm", "prompt_key": "step2", "reasoning_effort": "low", "max_tokens": 4000, }, {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, {"name": "post_process", "type": "rule", "rule": "post_process"}, ], # v16 — 260408 승자. 
cv1 + "실운영 피드백 기반 최종 점검 리마인더" 8개 섹션 (153줄). # paragraph F1 45.57 / ci_v1 F1 19.32 (전 버전 중 최고). Use with prompt_name='dev_260408_v16'. "260408_v16": [ {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, { "name": "ft_correction", "type": "llm", "model": "ft:solar-news-correction", "max_tokens": 2000, }, { "name": "basic_correction", "type": "llm", "prompt_key": "step1", "reasoning_effort": "low", "max_tokens": 4000, }, {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, # Safety net: LLM 이 확률적으로 뱉는 문단 에코/재시작 중복을 제거한다. # input 에 이미 있던 반복은 보존 (output_count > input_count 일 때만 drop). {"name": "paragraph_dedupe", "type": "rule", "rule": "paragraph_dedupe"}, {"name": "post_process", "type": "rule", "rule": "post_process"}, ], # v24 — solar-pro3 + FT + self-consistency × 2 + tool-calling judge. # 5 LLM calls/doc (FT + basic×2 + judge round1 + judge round2). Tool-calling # judge consults a top-K PMI table loaded from data/pmi_compounds_top.tsv.gz # (no external corpus required). Use with prompt_name='dev_260429_v24'. # paragraph F1 47.04 (vs solar-pro2 prod_251231 mean 44.86, +2.18). 
"260429_v24": [ {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, { "name": "ft_correction", "type": "llm", "model": "ft:solar-news-correction", "max_tokens": 4000, }, { "name": "basic_correction", "type": "llm_self_consistency", "prompt_key": "step1", "reasoning_effort": "low", "max_tokens": 4000, "repeat": 2, "repeat_temperatures": [0.0, 0.2], "intersection_threshold": 2, "response_format": { "type": "json_schema", "json_schema": { "name": "proofreading_result", "strict": True, "schema": { "type": "object", "properties": { "corrections": { "type": "array", "items": { "type": "object", "properties": { "original": {"type": "string"}, "corrected": {"type": "string"}, "type": {"type": "string"}, "reason": {"type": "string"}, }, "required": ["original", "corrected", "type", "reason"], "additionalProperties": False, }, }, "output": {"type": "string"}, "explanation": {"type": "string"}, }, "required": ["corrections", "output", "explanation"], "additionalProperties": False, }, }, }, }, { "name": "tool_calling_judge", "type": "tool_calling_judge", "prompt_key": "judge", "reasoning_effort": "low", "max_tokens": 4000, "max_tool_iterations": 4, "corrections_step": "basic_correction", "pmi_lookup": "data/pmi_compounds_top.tsv.gz", }, { "name": "apply_judge_decisions", "type": "rule", "rule": "apply_judge_decisions", "config": { "judge_step": "tool_calling_judge", "corrections_step": "basic_correction", }, }, {"name": "vocabulary_reapply", "type": "rule", "rule": "vocabulary"}, {"name": "paragraph_dedupe", "type": "rule", "rule": "paragraph_dedupe"}, {"name": "post_process", "type": "rule", "rule": "post_process"}, ], # E1 taxonomy pipeline — sequential specialists, grammar last @ RE=medium # Use with prompt_name='dev_260513_taxonomy_v6'. 
"260513_taxonomy_e1": [ {"name": "vocab_sub_pre", "type": "rule", "rule": "vocabulary"}, { "name": "ft_correction", "type": "llm", "model": "ft:solar-news-correction", "prompt_key": "proofread", "max_tokens": 4000, }, { "name": "spelling_vocab", "type": "specialist", "prompt_key": "spelling_vocab", "reasoning_effort": "low", "max_tokens": 3000, }, { "name": "punct_number", "type": "specialist", "prompt_key": "punct_number", "reasoning_effort": "low", "max_tokens": 3000, }, { "name": "context_dedup", "type": "specialist", "prompt_key": "context_dedup", "reasoning_effort": "low", "max_tokens": 3000, }, { "name": "grammar_particle", "type": "specialist", "prompt_key": "grammar_particle", "reasoning_effort": "medium", "max_tokens": 4000, }, {"name": "post_process", "type": "rule", "rule": "post_process"}, ], } # JSON schema enforcing specialist `corrections[]` output shape. _CORRECTIONS_SCHEMA: dict = { "type": "json_schema", "json_schema": { "name": "corrections_only", "strict": True, "schema": { "type": "object", "properties": { "corrections": { "type": "array", "items": { "type": "object", "properties": { "original": {"type": "string"}, "corrected": {"type": "string"}, "type": {"type": "string"}, "reason": {"type": "string"}, }, "required": ["original", "corrected", "type", "reason"], "additionalProperties": False, }, } }, "required": ["corrections"], "additionalProperties": False, }, }, } # FT model uses a fixed system prompt (behavior baked into fine-tuning) FT_SYSTEM_PROMPT = "입력된 문서에 대한 교열 결과를 생성해 주세요." # Matches reference pipeline (refer/chosun-projects/proofread/inference/run.py). # \n는 step1+ 호출 직전에 이 토큰으로 치환되고, 프롬프트의 "개수/위치 100% 보존" # 규칙이 문단 복제/에코를 구조적으로 억제한다. 응답 파싱 후 다시 \n으로 복원. PARAGRAPH_SEP = "" # Default pronoun replacements for post_process rule DEFAULT_PRONOUN_REPLACEMENTS: dict[str, str] = { "이재명 대표": "이재명 대통령", } # --- Prompt loading --- def list_prompts() -> list[str]: """List available prompt versions from prompts/ directory. 
def load_prompts(prompt_name: str) -> dict[str, str]:
    """Read every ``*.txt`` prompt of one prompt version.

    Args:
        prompt_name: Sub-directory of ``prompts/`` (next to this module) to
            read from.

    Returns:
        Mapping of file stem (e.g. ``'step1'``) to the stripped file content.
        Files that are empty after stripping, or whose content starts with
        ``# TODO``, are left out. A missing directory yields an empty dict.
    """
    base = Path(__file__).parent / "prompts" / prompt_name
    if not base.exists():
        return {}

    loaded: dict[str, str] = {}
    for path in base.glob("*.txt"):
        body = path.read_text(encoding="utf-8").strip()
        # Placeholder files are treated as "prompt not defined".
        if not body or body.startswith("# TODO"):
            continue
        loaded[path.stem] = body
    return loaded
""" if value is None: return "" if isinstance(value, str): return value # Lists/dicts/numbers — serialize back to JSON so no information is lost, # but the downstream text-processing pipeline receives a real string. try: return json.dumps(value, ensure_ascii=False) except (TypeError, ValueError): return str(value) if not content: return "" if not isinstance(content, str): return _as_str(content) # Strategy 1: entire response is valid JSON try: data = json.loads(content) if isinstance(data, dict) and "output" in data: return _as_str(data["output"]) except (json.JSONDecodeError, AttributeError): pass # Strategy 2: markdown code blocks (last block first) json_blocks = list(re.finditer(r"```(?:json)?\s*(.*?)```", content, re.DOTALL)) for match in reversed(json_blocks): try: data = json.loads(match.group(1).strip()) if isinstance(data, dict) and "output" in data: return _as_str(data["output"]) except (json.JSONDecodeError, AttributeError): continue # Strategy 3: find last "output" key with brace matching last_idx = content.rfind('"output"') if last_idx != -1: brace_start = content.rfind("{", 0, last_idx) if brace_start != -1: depth = 0 for i in range(brace_start, len(content)): if content[i] == "{": depth += 1 elif content[i] == "}": depth -= 1 if depth == 0: try: data = json.loads(content[brace_start : i + 1]) if isinstance(data, dict) and "output" in data: return _as_str(data["output"]) except (json.JSONDecodeError, AttributeError): pass break # Strategy 4: truncated JSON — extract "output" value before "explanation" output_key_match = re.search(r'"output"\s*:\s*"', content) if output_key_match: start = output_key_match.end() remaining = content[start:] boundary = re.search(r'"\s*,\s*"explanation"\s*:', remaining) if boundary: raw_value = remaining[: boundary.start()] try: return _as_str(json.loads('"' + raw_value + '"')) except (json.JSONDecodeError, ValueError): return raw_value # Fallback: if no JSON `output` field recovered, return caller-supplied # fallback 
def call_llm(
    client: OpenAI,
    system_prompt: str,
    user_content: str,
    model: str = "solar-pro2",
    # At temperature=0 the Upstage serving stack takes a greedy decoding path,
    # and that path was observed to reproduce a "restart" pattern from the
    # training data with high repeatability on certain inputs (truncated
    # articles, etc.). 0.0001 nudges decoding onto the sampling path to shake
    # that bias while leaving the argmax probability mass almost unchanged.
    temperature: float = 0.0001,
    reasoning_effort: str | None = None,
    max_tokens: int | None = None,
    response_format: dict | None = None,
) -> str:
    """Send one chat-completion request to the Upstage API.

    Args:
        client: OpenAI client configured for the Upstage API; passed through
            to ``_post_chat_completion``.
        system_prompt: System prompt text; no system message is sent when it
            is empty.
        user_content: User message content.
        model: Model name.
        temperature: Sampling temperature.
        reasoning_effort: Optional reasoning effort level (for solar-pro3),
            sent as ``extra_body``.
        max_tokens: Optional ``max_tokens`` limit. FT models (``ft:`` prefix)
            require one, so 4000 is used when the caller omits it.
        response_format: Optional ``response_format`` payload, sent only when
            provided.

    Returns:
        LLM response text as returned by ``_post_chat_completion``.
    """
    # FT models require max_tokens — supply a safe default if it was omitted.
    if max_tokens is None and model.startswith("ft:"):
        max_tokens = 4000

    conversation: list[dict] = [{"role": "user", "content": user_content}]
    if system_prompt:
        conversation.insert(0, {"role": "system", "content": system_prompt})

    request: dict = dict(
        model=model,
        messages=conversation,
        stream=False,
        temperature=temperature,
    )
    optional_fields = (
        ("max_tokens", max_tokens, max_tokens is not None),
        ("extra_body", {"reasoning_effort": reasoning_effort}, bool(reasoning_effort)),
        ("response_format", response_format, response_format is not None),
    )
    for field, value, enabled in optional_fields:
        if enabled:
            request[field] = value

    # The openai SDK is bypassed on purpose: its pydantic ChatCompletion
    # validator raised AssertionError / IndexError inside create() when
    # Upstage returned a body that does not strictly match the OpenAI spec.
    # A direct HTTP POST with manual JSON parsing is schema-agnostic.
    return _post_chat_completion(client, request)
# Shared httpx client — reuse connections across pipeline steps.
_HTTPX_TIMEOUT = httpx.Timeout(60.0, connect=10.0)
_HTTPX_CLIENT: httpx.Client | None = None


def _get_httpx_client() -> httpx.Client:
    """Return the module-wide httpx client, creating it on first use."""
    global _HTTPX_CLIENT
    if _HTTPX_CLIENT is None:
        _HTTPX_CLIENT = httpx.Client(timeout=_HTTPX_TIMEOUT)
    return _HTTPX_CLIENT


def _post_chat_completion(client: OpenAI, payload: dict) -> str:
    """POST ``/chat/completions`` directly via httpx, bypassing the openai SDK.

    ``base_url`` and ``api_key`` are read from ``client`` so the caller does
    not need to thread them around; the key falls back to the
    ``UPSTAGE_API_KEY`` environment variable. ``extra_body`` entries in
    ``payload`` are flattened into top-level JSON fields.

    Failure handling — request failures never raise out of this function:
      * transport errors (timeouts, connection/protocol failures) and HTTP
        408/429/500/502/503/504 are retried, three attempts in total, with
        exponential backoff plus jitter;
      * any other httpx request error, and any other status >= 400, is
        logged and yields ``""`` without retrying;
      * a 2xx body that is not valid JSON yields ``""``.

    Args:
        client: OpenAI client carrying ``base_url`` / ``api_key``.
        payload: Chat-completion request fields.

    Returns:
        The text of ``choices[0].message.content``, or ``""`` when the request
        failed or the response carried no usable content.
    """
    # Flatten extra_body so it becomes top-level JSON fields.
    body = dict(payload)
    body.update(body.pop("extra_body", None) or {})

    base_url = str(getattr(client, "base_url", "")).rstrip("/")
    api_key = getattr(client, "api_key", None) or os.getenv("UPSTAGE_API_KEY", "")
    url = f"{base_url}/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    http = _get_httpx_client()
    max_attempts = 3
    base_delay = 1.0
    retryable_statuses = (408, 429, 500, 502, 503, 504)
    last_exc: Exception | None = None

    def _backoff(attempt: int) -> None:
        """Sleep 1s, 2s, ... (doubling per attempt) plus up to 0.5s jitter."""
        time.sleep(base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.5))

    for attempt in range(1, max_attempts + 1):
        try:
            response = http.post(url, headers=headers, json=body)
        except httpx.TransportError as exc:
            # Covers TimeoutException and NetworkError as before, plus
            # protocol-level failures such as a server that disconnects
            # without answering.
            last_exc = exc
            if attempt == max_attempts:
                break
            _backoff(attempt)
            continue
        except httpx.RequestError as exc:
            # Remaining request errors (redirect loops, undecodable bodies)
            # will not be cured by retrying.
            logger.error("Upstage request could not be completed: %s", exc)
            return ""

        # Transient HTTP statuses retry.
        if response.status_code in retryable_statuses:
            last_exc = RuntimeError(
                f"HTTP {response.status_code} from Upstage: "
                f"{response.text[:200]}"
            )
            if attempt == max_attempts:
                break
            _backoff(attempt)
            continue

        # Non-retryable errors surface their body for debugging.
        if response.status_code >= 400:
            logger.error(
                "Upstage HTTP %d: %s",
                response.status_code,
                response.text[:500],
            )
            return ""

        # 2xx — parse body and pull content.
        try:
            data = response.json()
        except (json.JSONDecodeError, ValueError):
            logger.warning("Upstage 2xx response was not valid JSON")
            return ""
        return _extract_content_from_body(data)

    if last_exc is not None:
        logger.warning(
            "Upstage request failed after %d attempts: %s", max_attempts, last_exc
        )
    return ""
if response.status_code in (408, 429, 500, 502, 503, 504): last_exc = RuntimeError( f"HTTP {response.status_code} from Upstage: " f"{response.text[:200]}" ) if attempt == max_attempts: break time.sleep(base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.5)) continue # Non-retryable errors surface their body for debugging. if response.status_code >= 400: logger.error( "Upstage HTTP %d: %s", response.status_code, response.text[:500], ) return "" # 2xx — parse body and pull content. try: data = response.json() except (json.JSONDecodeError, ValueError): logger.warning("Upstage 2xx response was not valid JSON") return "" return _extract_content_from_body(data) if last_exc is not None: logger.warning("Upstage request failed after %d attempts: %s", max_attempts, last_exc) return "" def _extract_content_from_body(body: object) -> str: """Pull choices[0].message.content out of a (possibly loose) response body.""" if not isinstance(body, dict): return "" choices = body.get("choices") or [] if not isinstance(choices, list) or not choices: return "" first = choices[0] if not isinstance(first, dict): return "" message = first.get("message") or {} content = message.get("content") if isinstance(message, dict) else None if not content: content = first.get("text") return content if isinstance(content, str) else "" # --- Specialist (taxonomy) helpers --- def extract_corrections(content: str) -> list[dict]: """Extract `corrections[]` array from a specialist JSON response. Tries progressively more lenient strategies: 1. Raw JSON parse of the whole response. 2. JSON inside markdown code blocks (``` ... ```). 3. Last `{ ... "corrections": [...] ... }` block with brace matching. 4. Salvage: regex-scan for individual correction objects when the surrounding JSON is truncated or malformed (e.g. the model hit max_tokens mid-array). 
A correction is accepted if it has at least `original` and `corrected` string fields; `type`/`reason` are treated as optional metadata that downstream `apply_specialist_corrections` ignores anyway. """ if not content: return [] def _valid(corrections_list: list) -> list[dict]: return [ c for c in corrections_list if isinstance(c, dict) and isinstance(c.get("original"), str) and isinstance(c.get("corrected"), str) ] # Strategy 1: raw JSON try: data = json.loads(content) if isinstance(data, dict) and isinstance(data.get("corrections"), list): return _valid(data["corrections"]) except (json.JSONDecodeError, AttributeError): pass # Strategy 2: markdown code blocks (last-first wins) for match in reversed(list(re.finditer(r"```(?:json)?\s*(.*?)```", content, re.DOTALL))): try: data = json.loads(match.group(1).strip()) if isinstance(data, dict) and isinstance(data.get("corrections"), list): return _valid(data["corrections"]) except (json.JSONDecodeError, AttributeError): continue # Strategy 3: scan for enclosing {...} around last "corrections" key last_idx = content.rfind('"corrections"') if last_idx != -1: brace_start = content.rfind("{", 0, last_idx) if brace_start != -1: depth = 0 for i in range(brace_start, len(content)): if content[i] == "{": depth += 1 elif content[i] == "}": depth -= 1 if depth == 0: try: data = json.loads(content[brace_start : i + 1]) if isinstance(data, dict) and isinstance( data.get("corrections"), list ): return _valid(data["corrections"]) except (json.JSONDecodeError, AttributeError): pass break # Strategy 4: salvage individual correction objects when the outer JSON is # broken or truncated. Scan the entire content for `{ ... "original": ... # "corrected": ... }` substrings and parse each in isolation. This recovers # partial arrays when the model hit max_tokens mid-output. 
def apply_specialist_corrections(text: str, corrections: list[dict]) -> str:
    """Apply one specialist's corrections to ``text``.

    Entries that are not dicts, lack string ``original`` / ``corrected``
    fields, have an empty ``original`` or are no-ops (``original ==
    corrected``) are ignored, as are repeated ``(original, corrected)`` pairs.
    Longer originals are planned first. Each correction replaces only its
    first occurrence that does not overlap an already planned edit, and edits
    are applied right-to-left so earlier indices stay valid.

    Args:
        text: Text to edit.
        corrections: Correction dicts; ``None`` is treated as empty.

    Returns:
        The edited text.
    """
    pairs: list[tuple[str, str]] = []
    seen_pairs: set[tuple[str, str]] = set()
    for item in corrections or []:
        if not isinstance(item, dict):
            continue
        source, target = item.get("original"), item.get("corrected")
        if not (isinstance(source, str) and isinstance(target, str)):
            continue
        if source == "" or source == target:
            continue
        if (source, target) in seen_pairs:
            continue
        seen_pairs.add((source, target))
        pairs.append((source, target))

    # Stable descending sort: ties keep their input order.
    pairs.sort(key=lambda pair: len(pair[0]), reverse=True)

    planned: list[tuple[int, int, str]] = []
    for source, target in pairs:
        pos = text.find(source)
        while pos != -1:
            stop = pos + len(source)
            # Free when the span lies entirely before or after every planned edit.
            if all(stop <= begin or pos >= finish for begin, finish, _ in planned):
                planned.append((pos, stop, target))
                break
            pos = text.find(source, pos + 1)

    for begin, finish, target in sorted(planned, key=lambda edit: edit[0], reverse=True):
        text = text[:begin] + target + text[finish:]
    return text
""" clean: list[dict[str, str]] = [] seen: set[tuple[str, str]] = set() for c in corrections or []: if not isinstance(c, dict): continue orig = c.get("original") corrected = c.get("corrected") if not isinstance(orig, str) or not isinstance(corrected, str): continue if not orig or orig == corrected: continue key = (orig, corrected) if key in seen: continue seen.add(key) clean.append({"original": orig, "corrected": corrected}) clean.sort(key=lambda c: -len(c["original"])) edits: list[tuple[int, int, str]] = [] for c in clean: idx = text.find(c["original"]) while idx != -1: end = idx + len(c["original"]) if not any(not (end <= s or idx >= e) for s, e, _ in edits): edits.append((idx, end, c["corrected"])) break idx = text.find(c["original"], idx + 1) edits.sort(key=lambda e: e[0], reverse=True) for start, end, corrected in edits: text = text[:start] + corrected + text[end:] return text def run_specialist( step: dict, text: str, original_text: str, prompts: dict[str, str], model: str, client: OpenAI, ) -> str: """Run one taxonomy specialist: call LLM, parse corrections, apply.""" prompt_key = step.get("prompt_key", step["name"]) system_prompt = prompts.get(prompt_key, "") user_content = f"<원문>\n{original_text}\n\n\n<교열_초안>\n{text}\n" response = call_llm( client=client, system_prompt=system_prompt, user_content=user_content, model=step.get("model", model), reasoning_effort=step.get("reasoning_effort"), max_tokens=step.get("max_tokens"), response_format=_CORRECTIONS_SCHEMA, ) corrections = extract_corrections(response) return apply_specialist_corrections(text, corrections) # --- Rule application --- def apply_rule( step: dict, text: str, original_text: str, vocabulary: list[dict[str, str]], ) -> str: """Apply a rule-based pipeline step. Args: step: Step definition dict. text: Current text. original_text: Original input text (for correction_filter). vocabulary: Loaded vocabulary list. Returns: Processed text. 
""" rule = step.get("rule", step["name"]) if rule == "vocabulary": return apply_vocabulary(text, vocabulary) elif rule == "post_process": text = apply_pronoun_postprocess(text, DEFAULT_PRONOUN_REPLACEMENTS) text = apply_currency_compact(text) text = apply_comma_removal(text) text = apply_unit_unicode(text) return text elif rule == "correction_filter": config = step.get("config", {}) return apply_correction_filter( text, original_text, max_char_diff=config.get("max_char_diff", 2), allow_spacing=config.get("allow_spacing", True), ) elif rule == "paragraph_dedupe": config = step.get("config", {}) # prefix_len 기본값은 apply_paragraph_dedupe 의 기본값(30)을 따른다. # 명시적으로 config 에 지정된 경우만 override. kwargs: dict = {} if "min_len" in config: kwargs["min_len"] = config["min_len"] if "prefix_len" in config: kwargs["prefix_len"] = config["prefix_len"] return apply_paragraph_dedupe(text, original_text, **kwargs) elif rule == "apply_judge_decisions": # v24 rule. text here is the post-judge bulk text — but the judge step # is a no-op on text (decisions live in pipelines_v24._JUDGE_RAW_BY_BULK # keyed by bulk index). The dispatcher in run_pipeline applies this # rule per-bulk by re-splitting and looking up each bulk's decisions. # When called at the doc level (this path), we just return text — the # actual revert happens in the per-bulk apply path inside run_pipeline. return text return text # --- Bulk parallel processing --- def _process_single_bulk( bulk: str, step: dict, model: str, prompts: dict[str, str], client: OpenAI, original_bulk: str, ) -> str: """Process a single text bulk through an LLM step. Args: bulk: Text bulk to process. step: Step definition. model: Model name to use. prompts: Loaded prompt texts. client: OpenAI client. original_bulk: Pre-correction text **for this bulk** (not the full chunk). Used as <원문> in step1 and as the leading-lookup fallback in the hallucination guard. 
def _process_single_bulk(
    bulk: str,
    step: dict,
    model: str,
    prompts: dict[str, str],
    client: OpenAI,
    original_bulk: str,
) -> str:
    """Process a single text bulk through an LLM step.

    Args:
        bulk: Text bulk to process.
        step: Step definition (``model``, ``prompt_key``, ``reasoning_effort``
            and ``max_tokens`` are read from it).
        model: Default model name when the step does not set one.
        prompts: Loaded prompt texts.
        client: OpenAI client.
        original_bulk: Pre-correction text **for this bulk** (not the full
            chunk). Sent as the original-text section of the ``step1`` message
            and used as the second anchor of the hallucination guard.

    Returns:
        The processed bulk. ``bulk`` itself is returned when the LLM call
        produced no text, when the response is not usable JSON, or when a
        duplicated response cannot be cleanly recovered.
    """
    step_model = step.get("model", model)
    prompt_key = step.get("prompt_key")
    reasoning_effort = step.get("reasoning_effort")
    max_tokens = step.get("max_tokens")
    is_ft_model = step_model.startswith("ft:")

    def _to_llm(value: str) -> str:
        """Swap newlines for the paragraph separator token."""
        return value.replace("\n", PARAGRAPH_SEP) if PARAGRAPH_SEP else value

    def _from_llm(value: str) -> str:
        """Swap the paragraph separator token back to newlines."""
        # Guarded: with an empty separator str.replace("", "\n") would insert
        # a newline between every character.
        return value.replace(PARAGRAPH_SEP, "\n") if PARAGRAPH_SEP else value

    if prompt_key:
        system_prompt = prompts.get(prompt_key, "")
        # Prompted steps assume the structure-preservation rule: newlines are
        # replaced by the separator token right before the call and restored
        # after parsing.
        if prompt_key == "step1":
            original_sep = _to_llm(original_bulk)
            bulk_sep = _to_llm(bulk)
            user_content = (
                f"<원문>\n{original_sep}\n\n\n"
                f"<교열_모델_수정결과>\n{bulk_sep}\n"
            )
        else:
            user_content = _to_llm(bulk)
    elif is_ft_model:
        # FT models have instructions baked in — send user content only.
        # Sending a system prompt makes the Upstage API return a 500 error
        # for this model.
        system_prompt = ""
        user_content = bulk
    else:
        system_prompt = FT_SYSTEM_PROMPT
        user_content = bulk

    response = call_llm(
        client=client,
        system_prompt=system_prompt,
        user_content=user_content,
        model=step_model,
        reasoning_effort=reasoning_effort,
        max_tokens=max_tokens,
    )

    # An empty response means the request failed or carried no content.
    # Keep the input rather than replacing this bulk with nothing.
    if not response:
        return bulk

    # FT model returns raw text (no JSON); other steps return JSON with an
    # "output" field.
    if is_ft_model:
        # The separator must only exist at the LLM boundary, so restore
        # newlines before comparing against `bulk` or returning.
        if prompt_key:
            response = _from_llm(response)

        # FT duplication guard. Symptoms seen in the wild:
        #   (a) Half-half echo: model returns the input twice (length ~2x).
        #   (b) Prefix echo: model returns the full input verbatim, then
        #       appends a partial re-correction tail (length ~1.2-1.5x).
        # Both produce a downstream "duplicated paragraph" bug, so the guard
        # triggers at 1.25x and tries the recovery patterns before falling
        # back to the input.
        if len(response) >= len(bulk) * 1.25:
            stripped = response.strip()
            bulk_stripped = bulk.strip()
            # (b) Output starts with the full input → drop the appended tail.
            if (
                bulk_stripped
                and stripped.startswith(bulk_stripped)
                and len(stripped) > len(bulk_stripped) * 1.05
            ):
                return bulk_stripped
            # (a) Half-half echo.
            first_end = len(stripped) // 2
            head = stripped[:first_end].rstrip()
            tail = stripped[first_end:].strip()
            if head and (head == tail or head in tail or tail in head):
                return head
            # Unknown duplication pattern → safer to return the input.
            return bulk
        return response

    # Non-JSON output falls back to the input bulk.
    extracted = extract_json_output(response, fallback=bulk)

    # Restore newlines: the guard below and all downstream steps expect plain
    # "\n" (rule steps may not handle the token).
    if prompt_key:
        extracted = _from_llm(extracted)

    # Hallucination guard: the step1 prompt feeds the model both the original
    # and the draft, and occasionally the model concatenates two variants into
    # one "output" string (result length ~2x input). A second occurrence of
    # the leading text of `bulk` (or `original_bulk`) inside the output is a
    # strong duplication signal; cut the output right before it.
    if extracted and len(extracted) >= len(bulk) * 1.4:
        stripped = extracted.strip()

        def _leading_lookup(needle_src: str) -> str | None:
            head = needle_src.strip()[:80]
            if len(head) < 30:
                # Too short to be a reliable anchor.
                return None
            second = stripped.find(head, len(head))
            if second > len(head):
                return stripped[:second].rstrip()
            return None

        recovered = _leading_lookup(bulk) or _leading_lookup(original_bulk)
        if recovered:
            return recovered
        # No clean recovery — keep the input rather than the bloated output.
        return bulk

    return extracted
def _process_single_bulk_with_raw(
    *,
    bulk: str,
    step: dict,
    model: str,
    prompts: dict[str, str],
    client: OpenAI,
    original_bulk: str,
) -> tuple[str, str]:
    """Run an LLM step on one bulk and return ``(post_text, raw_response)``.

    Same message construction and ``len >= 1.4x`` hallucination guard as the
    JSON branch of ``_process_single_bulk``, without the FT-specific
    duplication guard. The raw response is returned alongside the extracted
    text so a caller can inspect the full JSON (e.g. its ``corrections``
    array). The step's ``response_format`` and ``temperature`` are forwarded
    when set.

    Returns:
        ``(text, raw)``; ``("", "")`` when ``call_llm`` raises.
    """
    prompt_key = step.get("prompt_key")

    if not prompt_key:
        system_prompt, user_content = FT_SYSTEM_PROMPT, bulk
    else:
        system_prompt = prompts.get(prompt_key, "")
        if prompt_key != "step1":
            user_content = bulk.replace("\n", PARAGRAPH_SEP)
        else:
            original_sep = original_bulk.replace("\n", PARAGRAPH_SEP)
            bulk_sep = bulk.replace("\n", PARAGRAPH_SEP)
            user_content = (
                f"<원문>\n{original_sep}\n\n\n"
                f"<교열_모델_수정결과>\n{bulk_sep}\n"
            )

    request: dict = {
        "client": client,
        "system_prompt": system_prompt,
        "user_content": user_content,
        "model": step.get("model", model),
        "reasoning_effort": step.get("reasoning_effort"),
        "max_tokens": step.get("max_tokens"),
        "response_format": step.get("response_format"),
        "temperature": step.get("temperature"),
    }
    try:
        # Unset options are dropped so call_llm's own defaults apply.
        raw = call_llm(
            **{name: value for name, value in request.items() if value is not None}
        )
    except Exception as exc:  # noqa: BLE001
        logger.warning(
            "[_process_single_bulk_with_raw] call_llm failed: %s", exc
        )
        return ("", "")

    extracted = extract_json_output(raw, fallback=bulk)
    if prompt_key:
        extracted = extracted.replace(PARAGRAPH_SEP, "\n")

    # Output of plausible length (or empty) passes through untouched.
    if not extracted or len(extracted) < len(bulk) * 1.4:
        return (extracted, raw)

    # Bloated output: look for a second occurrence of the leading text of the
    # draft, then of the original, and cut right before it.
    haystack = extracted.strip()
    for source in (bulk, original_bulk):
        probe = source.strip()[:80]
        if len(probe) < 30:
            continue
        repeat_at = haystack.find(probe, len(probe))
        if repeat_at > len(probe):
            trimmed = haystack[:repeat_at].rstrip()
            if trimmed:
                return (trimmed, raw)

    # No clean recovery — fall back to the input bulk.
    return (bulk, raw)
def process_bulks_parallel(
    bulks: list[str],
    original_bulks: list[str],
    step: dict,
    model: str,
    prompts: dict[str, str],
    client: OpenAI,
    max_workers: int = 10,
) -> list[str]:
    """Process multiple text bulks through one LLM step, in parallel.

    Args:
        bulks: Text bulks (current pipeline state).
        original_bulks: Per-bulk pre-correction text, paired 1:1 with
            ``bulks``.
        step: Step definition.
        model: Model name.
        prompts: Loaded prompt texts.
        client: OpenAI client.
        max_workers: Maximum thread pool workers.

    Returns:
        Processed bulks in input order; an empty list for empty input.

    Raises:
        ValueError: If ``bulks`` and ``original_bulks`` differ in length.
    """
    if len(bulks) != len(original_bulks):
        raise ValueError(
            f"bulks/original_bulks length mismatch: {len(bulks)} vs {len(original_bulks)}"
        )

    # Nothing to process — previously this path indexed bulks[0] and raised
    # IndexError.
    if not bulks:
        return []

    # A single bulk gains nothing from a thread pool.
    if len(bulks) == 1:
        return [
            _process_single_bulk(
                bulks[0], step, model, prompts, client, original_bulks[0]
            )
        ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                _process_single_bulk, bulk, step, model, prompts, client, orig_bulk
            )
            for bulk, orig_bulk in zip(bulks, original_bulks)
        ]
        # Collect in submission order so output aligns with input.
        return [future.result() for future in futures]
vocabulary: Loaded vocabulary list. Returns: Dict with 'output' (processed text) and 'processing_time' (seconds). """ if not text or not text.strip(): return {"output": text, "processing_time": 0.0} steps = PIPELINES[pipeline_name] prompts = load_prompts(prompt_name) original_text = text # v24 uses bulk-indexed scratch state (per-bulk corrections + judge # decisions). Reset at the start of each request so concurrent users # don't see each other's data. _v24.reset_v24_state() # Locate v24's PMI table (the demo bundles a top-K subset). Activate # only if the pipeline declares one — otherwise the lookup is unused. for s in steps: pmi_path = s.get("pmi_lookup") if pmi_path: full_path = Path(__file__).parent / pmi_path try: _v24.set_active_pmi_path(full_path) except Exception as exc: # noqa: BLE001 logger.warning("[v24] PMI load failed at %s: %s", full_path, exc) break start_time = time.time() # step1(basic_correction)의 <원문> 소스. 기존 파이프라인(run.py)에서 step1은 # 같은 bulk의 post-vocab / pre-FT 텍스트(= step0)를 <원문>으로 받았다. # 여기서도 FT 직전 text를 스냅샷으로 잡아두고, step1 실행 시 현재 bulk들과 # 같은 splitter로 쪼개서 index-align된 per-bulk 원문을 넘긴다. pre_ft_snapshot = text # Per-step fallback: if any step raises, keep the previous `text` and # move on. A single failing step (transient API issue, template # mismatch, empty response in mid-pipeline) shouldn't nuke the whole # output. Specialist steps already had this guard; now rule/llm do too. step_errors: list[str] = [] for step in steps: prev_text = text step_name = step.get("name", step["type"]) try: # v24 apply_judge_decisions handled BEFORE the generic rule # branch so the per-bulk revert path (which needs index-aligned # bulks) wins over apply_rule's no-op. 
if step["type"] == "rule" and step.get("rule") == "apply_judge_decisions": cfg = step.get("config", {}) bulks = split_into_bulks(text) if bulks: judge_step_name = cfg.get("judge_step", "tool_calling_judge") corr_step_name = cfg.get("corrections_step", "basic_correction") reverted: list[str] = [] for i, b in enumerate(bulks): try: reverted.append( _v24.apply_judge_decisions_for_bulk( bulk_idx=i, bulk=b, judge_step_name=judge_step_name, corrections_step_name=corr_step_name, ) ) except Exception as exc: # noqa: BLE001 logger.warning( "[apply_judge_decisions] bulk %d failed (%s); keeping", i, exc, ) reverted.append(b) text = "\n".join(reverted) elif step["type"] == "rule": text = apply_rule(step, text, original_text, vocabulary) elif step["type"] == "llm": step_model_name = step.get("model", model) # FT 직전 text를 스냅샷 — 다음 step1이 이를 <원문>으로 소비한다. if step_model_name.startswith("ft:"): pre_ft_snapshot = text bulks = split_into_bulks(text) if bulks: # step1은 pre-FT 스냅샷을 per-bulk 원문으로 사용. # 그 외 LLM step은 <원문>을 쓰지 않으므로 현재 bulk을 그대로 # 원문으로 넘겨도 무해(hallucination 가드의 fallback 앵커로만 쓰임). if step.get("prompt_key") == "step1": orig_candidate = split_into_bulks(pre_ft_snapshot) if len(orig_candidate) == len(bulks): original_bulks = orig_candidate else: # Splitter 출력이 FT 전후로 어긋난 경우(문단 수 변동 등): # chunk-wide pre-FT 텍스트를 모든 bulk에 공통으로 사용 → # 기존 동작과 동일. 적어도 retreat, never regress. original_bulks = [pre_ft_snapshot] * len(bulks) else: original_bulks = list(bulks) processed = process_bulks_parallel( bulks, original_bulks, step, model, prompts, client ) # Bulk 경계는 원본에서 \n 로 분리돼 있었으므로 합칠 때도 \n 로 # 연결한다. "".join 으로 붙이면 인접 bulk 의 문단이 한 줄로 # 뭉개지고, 이후 split_into_bulks 재실행 시 문단 수가 줄어 # step1 의 <원문>↔<교열_모델_수정결과> 정렬이 어긋나며 구조 # 보존 규칙까지 깨진다 (참조: run.py:326 "\n".join). 
text = "\n".join( str(p) if not isinstance(p, str) else p for p in processed ) elif step["type"] == "specialist": text = run_specialist(step, text, original_text, prompts, model, client) elif step["type"] == "llm_self_consistency": # v24 basic_correction: per-bulk × N runs, intersect corrections. # Splits like a normal LLM step (per-paragraph bulks), but each # bulk runs through pipelines_v24.run_self_consistency_for_bulk # which calls _process_single_bulk_with_raw N times and emits # the synthesized post-text + raw JSON. bulks = split_into_bulks(text) if bulks: if step.get("prompt_key") == "step1": orig_candidate = split_into_bulks(pre_ft_snapshot) if len(orig_candidate) == len(bulks): original_bulks = orig_candidate else: original_bulks = [pre_ft_snapshot] * len(bulks) else: original_bulks = list(bulks) def _sc_one(idx_bulk): i, b = idx_bulk return _v24.run_self_consistency_for_bulk( bulk_idx=i, bulk=b, original_bulk=original_bulks[i], step=step, model=model, prompts=prompts, client=client, process_single_bulk_fn=_process_single_bulk_with_raw, ) if len(bulks) == 1: processed = [_sc_one((0, bulks[0]))] else: with ThreadPoolExecutor(max_workers=10) as ex: processed = list( ex.map(_sc_one, list(enumerate(bulks))) ) text = "\n".join( str(p) if not isinstance(p, str) else p for p in processed ) elif step["type"] == "tool_calling_judge": # v24 judge: per-bulk 2-round emulation. Decisions are stashed # in _JUDGE_RAW_BY_BULK keyed by bulk index. text passes # through unchanged — the apply_judge_decisions rule reverts. bulks = split_into_bulks(text) if bulks: # Always anchor on pre-FT snapshot for <원문> since the # judge is reasoning about whether each correction was # over-correction relative to the source article. 
orig_candidate = split_into_bulks(pre_ft_snapshot) if len(orig_candidate) == len(bulks): original_bulks = orig_candidate else: original_bulks = [pre_ft_snapshot] * len(bulks) corrections_step = step.get("corrections_step", "basic_correction") def _judge_one(idx_bulk): i, b = idx_bulk return _v24.run_tool_calling_judge_for_bulk( bulk_idx=i, bulk=b, original_bulk=original_bulks[i], corrections_step_name=corrections_step, step=step, model=model, prompts=prompts, client=client, call_llm_fn=call_llm, ) if len(bulks) == 1: _judge_one((0, bulks[0])) else: with ThreadPoolExecutor(max_workers=10) as ex: list(ex.map(_judge_one, list(enumerate(bulks)))) # text unchanged # If a step produced an unusably short output (< 10% of input), the # model likely truncated or refused. Treat as no-op so downstream # steps see a sensible baseline. if ( step["type"] in ("llm", "specialist", "llm_self_consistency") and len(prev_text) > 50 and len(text) < len(prev_text) * 0.1 ): logger.warning( "Step '%s' produced suspiciously short output (%d → %d chars); " "falling back to previous text.", step_name, len(prev_text), len(text), ) text = prev_text step_errors.append(f"{step_name}: short_output") except Exception as e: logger.exception( "Step '%s' failed (%s); falling back to previous text and continuing.", step_name, type(e).__name__, ) text = prev_text step_errors.append(f"{step_name}: {type(e).__name__}") elapsed = time.time() - start_time return { "output": text, "processing_time": elapsed, "step_errors": step_errors, }