bielik_app_service/app/logic/infill_utils.py
Patryk Studzinski
Improve Polish grammar in infill prompt + remove debug logs
14fc89e
raw
history blame
7.54 kB
"""
Infill Utilities for Batch Gap-Filling
Handles gap detection, JSON parsing from LLM output, and text reconstruction.
Gap Notation Support:
- [GAP:n]: Explicit numbered gaps (preferred)
- ___: Underscores (auto-numbered in scan order)
FUTURE: Chunking Support
-------------------------
For texts exceeding ~2000 tokens (approx 6000 chars), implement per-gap prompting:
1. Split text into chunks preserving gap context (±150 tokens around each gap)
2. Process each gap individually with left/right context
3. Merge results back into full text
4. This avoids context window overflow on smaller models (2k-4k context)
Current implementation assumes texts fit within model context window.
Add chunking when processing long-form content (articles, full listings).
"""
import re
import json
from typing import List, Optional, Tuple
from dataclasses import dataclass
@dataclass
class GapInfo:
    """Information about a single gap marker detected in text."""
    index: int  # 1-based gap number (taken from the [GAP:n] tag, or scan order for ___)
    marker: str  # Original marker string exactly as it appears in the text
    start: int  # Start offset of the marker within the text
    end: int  # End offset (exclusive) of the marker within the text
def detect_gaps(text: str, notation: str = "auto") -> List[GapInfo]:
    """
    Locate gap markers in text and report their positions.

    Args:
        text: Input text containing gap markers
        notation: "auto", "[GAP:n]", or "___"

    Returns:
        List of GapInfo objects ordered by position in the text

    Examples:
        >>> detect_gaps("Buy this [GAP:1] car with [GAP:2] features")
        [GapInfo(index=1, marker='[GAP:1]', ...), GapInfo(index=2, marker='[GAP:2]', ...)]
        >>> detect_gaps("Buy this ___ car with ___ features")
        [GapInfo(index=1, marker='___', ...), GapInfo(index=2, marker='___', ...)]
    """
    tagged_re = r'\[GAP:(\d+)\]'  # explicit numbered markers
    underscore_re = r'_{3,}'      # runs of 3 or more underscores

    # Auto-detect: prefer the explicit tagged notation when present,
    # otherwise fall back to underscore runs.
    if notation == "auto":
        notation = "[GAP:n]" if re.search(tagged_re, text) else "___"

    if notation == "[GAP:n]":
        # The gap number comes from the tag itself.
        found = [
            GapInfo(index=int(m.group(1)), marker=m.group(0),
                    start=m.start(), end=m.end())
            for m in re.finditer(tagged_re, text)
        ]
    else:  # "___"
        # Underscore runs are numbered 1..N in scan order.
        found = [
            GapInfo(index=n, marker=m.group(0), start=m.start(), end=m.end())
            for n, m in enumerate(re.finditer(underscore_re, text), start=1)
        ]

    # finditer already yields matches left-to-right; sort defensively anyway.
    return sorted(found, key=lambda g: g.start)
def parse_infill_json(raw_output: str) -> Optional[dict]:
    """
    Extract and parse a JSON object from raw LLM output.

    Tolerates common LLM quirks:
    - JSON wrapped in markdown code blocks
    - Leading/trailing text before/after the JSON
    - Function-call style wrapper ({"name": "...", "arguments": {...}})
    - Double-escaped JSON strings in the arguments field
    - Minor formatting issues

    Returns:
        Parsed dict with 'filled_text' and 'gaps' keys, or None if parsing fails
    """
    if not raw_output:
        return None

    # Prefer the contents of a markdown code fence when one is present.
    fenced = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', raw_output)
    candidate = fenced.group(1) if fenced else raw_output

    # Locate the first balanced {...} span by brace counting.
    open_pos = candidate.find('{')
    if open_pos == -1:
        return None
    close_pos = -1
    depth = 0
    for pos in range(open_pos, len(candidate)):
        ch = candidate[pos]
        if ch == '{':
            depth += 1
        elif ch == '}':
            depth -= 1
            if depth == 0:
                close_pos = pos + 1
                break
    if close_pos == -1:
        return None

    try:
        parsed = json.loads(candidate[open_pos:close_pos])
    except json.JSONDecodeError:
        return None

    # Unwrap function-call style payloads. The 'arguments' field may be
    # a plain dict or a double-escaped JSON string, e.g.:
    # {"name": "fill_in_text", "arguments": "{\"filled_text\": \"...\"}"}
    if 'arguments' in parsed:
        wrapped = parsed['arguments']
        if isinstance(wrapped, str):
            try:
                parsed = json.loads(wrapped)
            except json.JSONDecodeError:
                return None
        elif isinstance(wrapped, dict):
            parsed = wrapped

    # Same unwrapping for: {"name": "...", "parameters": {...}}
    if 'parameters' in parsed:
        wrapped = parsed['parameters']
        if isinstance(wrapped, str):
            try:
                parsed = json.loads(wrapped)
            except json.JSONDecodeError:
                return None
        elif isinstance(wrapped, dict):
            parsed = wrapped

    # Reject payloads missing both expected fields.
    if 'filled_text' not in parsed and 'gaps' not in parsed:
        return None
    return parsed
def apply_fills(original_text: str, gaps: List[GapInfo], fills: dict) -> str:
"""
Apply gap fills to original text.
Uses fills from parsed JSON, replacing markers with chosen words.
This is a fallback when LLM's 'filled_text' might be corrupted.
Args:
original_text: Original text with gap markers
gaps: Detected gaps from detect_gaps()
fills: Dict mapping gap index to fill choice
e.g., {1: "excellent", 2: "powerful"}
Returns:
Text with gaps replaced by fill choices
"""
if not gaps or not fills:
return original_text
# Process from end to start to preserve positions
result = original_text
for gap in reversed(gaps):
if gap.index in fills:
result = result[:gap.start] + fills[gap.index] + result[gap.end:]
return result
def build_fills_dict(gaps_list: List[dict]) -> dict:
    """
    Convert a gaps list from parsed JSON into a fills dict.

    LLMs frequently emit the gap index as a JSON string (e.g. "1")
    rather than a number. A string key would never match GapInfo.index
    (an int) in apply_fills(), silently dropping every fill, so indices
    are coerced to int here. Entries missing 'index'/'choice' or with a
    non-numeric index are skipped rather than raising.

    Args:
        gaps_list: List of gap dicts from parsed JSON,
                   e.g. [{"index": 1, "choice": "word"}, ...]

    Returns:
        Dict mapping int index to choice: {1: "word", ...}
    """
    fills = {}
    for gap in gaps_list:
        if 'index' not in gap or 'choice' not in gap:
            continue
        try:
            index = int(gap['index'])
        except (TypeError, ValueError):
            continue  # non-numeric index: skip the malformed entry
        fills[index] = gap['choice']
    return fills
def normalize_gaps_to_tagged(text: str) -> Tuple[str, List[GapInfo]]:
    """
    Rewrite any supported gap notation into the canonical [GAP:n] form.

    Useful for standardizing input before processing.

    Args:
        text: Text using any supported gap notation

    Returns:
        Tuple of (normalized_text, gaps), where gaps reflect positions
        in the normalized text.
    """
    detected = detect_gaps(text, "auto")
    if not detected:
        return text, []

    # Already tagged: nothing to rewrite.
    if detected[0].marker.startswith('[GAP:'):
        return text, detected

    # Swap each ___ run for its [GAP:n] tag, right-to-left so the
    # offsets of not-yet-processed gaps remain valid.
    normalized = text
    for gap in reversed(detected):
        normalized = (
            normalized[:gap.start] + f"[GAP:{gap.index}]" + normalized[gap.end:]
        )

    # Marker lengths changed, so re-detect positions on the rewritten text.
    return normalized, detect_gaps(normalized, "[GAP:n]")