|
|
import re |
|
|
import os |
|
|
from typing import List |
|
|
|
|
|
# Compiled once at import time instead of on every paragraph/sentence.
_PARAGRAPH_BREAK = re.compile(r"[\r\n]+")
# Split *after* sentence-ending punctuation, keeping the punctuation attached.
_SENTENCE_BREAK = re.compile(r"(?<=[.!?…])\s+")
# Split *after* clause punctuation (comma, semicolon, colon, hyphen, dashes).
_CLAUSE_BREAK = re.compile(r"(?<=[,;:\-–—])\s+")


def _pack_words(piece: str, max_chars: int, chunks: List[str]) -> str:
    """Greedily pack the words of *piece* into *chunks*; return the open tail.

    Completed chunks are appended to *chunks*. The returned string is the
    last, still unfinished chunk (never longer than *max_chars*) so the
    caller can keep appending to it.
    """
    current = ""
    for word in piece.split():
        # A single token that can never fit is sliced hard, otherwise the
        # size limit promised by the public function would be violated.
        while len(word) > max_chars:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(word[:max_chars])
            word = word[max_chars:]

        if current and len(current) + 1 + len(word) > max_chars:
            chunks.append(current)
            current = word
        else:
            current = f"{current} {word}" if current else word
    return current


def split_text_into_chunks(text: str, max_chars: int = 256) -> List[str]:
    """Split raw text into chunks no longer than ``max_chars``.

    The text is broken down progressively, using the coarsest boundary that
    still fits:

    1. line breaks separate paragraphs; chunks never span two paragraphs;
    2. whole sentences (ending in ``.``, ``!``, ``?`` or ``…``) are packed
       together, joined by a single space;
    3. a sentence longer than ``max_chars`` is split after clause
       punctuation (``,``, ``;``, ``:``, ``-``, ``–``, ``—``);
    4. a clause that is still too long is split on whitespace (runs of
       whitespace inside it collapse to single spaces);
    5. a single word longer than ``max_chars`` is sliced into pieces of
       ``max_chars`` characters.

    Args:
        text: The raw input text.
        max_chars: Maximum length of each returned chunk; must be >= 1.

    Returns:
        The non-empty chunks in reading order. Empty or whitespace-only
        input yields an empty list.

    Raises:
        ValueError: If ``max_chars`` is smaller than 1.
    """
    if max_chars < 1:
        raise ValueError("max_chars must be a positive integer")

    chunks: List[str] = []

    for paragraph in _PARAGRAPH_BREAK.split(text.strip()):
        paragraph = paragraph.strip()
        if not paragraph:
            continue

        buffer = ""
        for sentence in _SENTENCE_BREAK.split(paragraph):
            sentence = sentence.strip()
            if not sentence:
                continue

            if len(sentence) <= max_chars:
                # Normal case: append to the open chunk if it still fits.
                if buffer and len(buffer) + 1 + len(sentence) > max_chars:
                    chunks.append(buffer)
                    buffer = sentence
                else:
                    buffer = f"{buffer} {sentence}" if buffer else sentence
                continue

            # Over-long sentence: close the open chunk so the sentence
            # starts on a fresh one, then fall back to clause boundaries.
            if buffer:
                chunks.append(buffer)
                buffer = ""

            for clause in _CLAUSE_BREAK.split(sentence):
                clause = clause.strip()
                if not clause:
                    continue

                if buffer and len(buffer) + 1 + len(clause) <= max_chars:
                    buffer = f"{buffer} {clause}"
                    continue

                if buffer:
                    chunks.append(buffer)
                if len(clause) <= max_chars:
                    buffer = clause
                else:
                    # Still too long: fall back to word boundaries.
                    buffer = _pack_words(clause, max_chars, chunks)

        # Flush at the paragraph end so paragraphs are never merged.
        if buffer:
            chunks.append(buffer)

    return chunks
|
|
|
|
|
def env_bool(name: str, default: bool = False) -> bool:
    """Interpret the environment variable ``name`` as a boolean flag.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is not set at all.

    Returns:
        ``default`` if the variable is unset. Otherwise ``True`` when the
        value, with surrounding whitespace removed and lower-cased, is one
        of ``1``, ``true``, ``yes``, ``y`` or ``on``; ``False`` for any
        other value (including the empty string).
    """
    raw = os.environ.get(name)
    if raw is not None:
        normalized = raw.strip().lower()
        return normalized in {"1", "true", "yes", "y", "on"}
    return default