Spaces:
Running
Running
| import base64, copy, hashlib, json, math, os, re, struct, time, unicodedata, cv2, httpx, numpy as np, budoux | |
| from urllib.parse import parse_qs, urlencode, urlparse | |
| from PIL import Image, ImageChops, ImageDraw, ImageFilter, ImageFont | |
# --- Run configuration -------------------------------------------------------
# NOTE(review): most of these settings are consumed outside the visible part of
# this file; the group headings below are inferred from the names — confirm.

# Input image, output JSON path and target language code.
IMAGE_PATH = "33.jpg"
OUT_JSON = "output.json"
LANG = "th"
# API key read from the environment once at import time ("" when unset).
AI_API_KEY = os.getenv("AI_API_KEY", "").strip()
FIREBASE_URL = "https://cookie-6e1cd-default-rtdb.asia-southeast1.firebasedatabase.app/lens/cookie.json"
# Output / pipeline stage toggles.
WRITE_OUT_JSON = True
DECODE_IMAGEURL_TO_DATAURI = True
DO_ORIGINAL = True
DO_TRANSLATED = True
DO_ORIGINAL_HTML = True
DO_TRANSLATED_HTML = True
DO_AI_HTML = True
HTML_INCLUDE_CSS = True
# Overlay rendering toggles, output paths and font scaling.
DRAW_OVERLAY_ORIGINAL = False
DRAW_OVERLAY_TRANSLATED = False
OVERLAY_ORIGINAL_PATH = "overlay_original.png"
OVERLAY_TRANSLATED_PATH = "overlay_translated.png"
TRANSLATED_OVERLAY_FONT_SCALE = 1.0
TRANSLATED_OVERLAY_FIT_TO_BOX = True
AI_OVERLAY_FONT_SCALE = 1.5
AI_OVERLAY_FIT_TO_BOX = True
# AI stage settings. DO_AI_JSON selects the JSON response contract and data
# template instead of the plain-text ones (see _active_ai_contract /
# _active_ai_data_template).
DO_AI = True
DO_AI_JSON = False
DO_AI_OVERLAY = False
AI_CACHE = False
AI_CACHE_PATH = "ai_cache.json"
AI_PATH_OVERLAY = "overlay_ai.png"
# "auto" (or "") lets _resolve_ai_config pick the provider from the API key
# prefix and the model / base URL from AI_PROVIDER_DEFAULTS.
AI_PROVIDER = "auto"
AI_MODEL = "auto"
AI_BASE_URL = "auto"
# Sampling temperature, output token limit and HTTP timeout used by the
# provider request helpers.
AI_TEMPERATURE = 0.2
AI_MAX_TOKENS = 1200
AI_TIMEOUT_SEC = 120
# Drawing colours (4-tuples, presumably RGBA) and outline widths.
DRAW_BOX_OUTLINE = True
AUTO_TEXT_COLOR = True
TEXT_COLOR = (0, 0, 0, 255)
TEXT_COLOR_DARK = (0, 0, 0, 255)
TEXT_COLOR_LIGHT = (255, 255, 255, 255)
BOX_OUTLINE = (0, 255, 0, 255)
BOX_OUTLINE_WIDTH = 2
DRAW_OUTLINE_PARA = False
DRAW_OUTLINE_ITEM = False
DRAW_OUTLINE_SPAN = False
PARA_OUTLINE = (0, 0, 255, 255)
ITEM_OUTLINE = (255, 0, 0, 255)
# Span outlines reuse the generic box outline colour and width.
SPAN_OUTLINE = BOX_OUTLINE
PARA_OUTLINE_WIDTH = 3
ITEM_OUTLINE_WIDTH = 2
SPAN_OUTLINE_WIDTH = BOX_OUTLINE_WIDTH
# Old-text erasing / inpainting tuning (pixel units, per the *_PX suffixes).
ERASE_OLD_TEXT_WITH_ORIGINAL_BOXES = True
ERASE_PADDING_PX = 2
ERASE_SAMPLE_MARGIN_PX = 6
ERASE_MODE = "inpaint"
ERASE_MOSAIC_BLOCK_PX = 10
ERASE_CLONE_GAP_PX = 4
ERASE_CLONE_BORDER_PX = 6
ERASE_CLONE_FEATHER_PX = 3
ERASE_BLEND_GAP_PX = 3
ERASE_BLEND_FEATHER_PX = 4
INPAINT_RADIUS = 3
INPAINT_METHOD = "telea"
INPAINT_DILATE_PX = 1
BG_SAMPLE_BORDER_PX = 3
BASELINE_SHIFT = True
BASELINE_SHIFT_FACTOR = 0.40
# NOTE(review): "DOWNLOD" looks like a typo for "DOWNLOAD"; the name is kept
# because code outside this view may reference it.
FONT_DOWNLOD = True
FONT_THAI_PATH = "NotoSansThai-Regular.ttf"
FONT_LATIN_PATH = "NotoSans-Regular.ttf"
# Font download sources: each *_URLS list holds one or more URLs for the font
# file named by the matching *_PATH constant.
# NOTE(review): the download logic is outside this view; presumably the URLs
# are tried in order until one succeeds — confirm.
FONT_THAI_URLS = [
    "https://github.com/google/fonts/raw/main/ofl/notosansthai/NotoSansThai-Regular.ttf",
    "https://github.com/google/fonts/raw/main/ofl/notosansthaiui/NotoSansThaiUI-Regular.ttf",
]
FONT_LATIN_URLS = [
    "https://github.com/google/fonts/raw/main/ofl/notosans/NotoSans-Regular.ttf",
]
# Japanese (Noto Sans CJK JP).
FONT_JA_PATH = "NotoSansCJKjp-Regular.otf"
FONT_JA_URLS = [
    "https://raw.githubusercontent.com/googlefonts/noto-cjk/main/Sans/OTF/Japanese/NotoSansCJKjp-Regular.otf",
    "https://github.com/googlefonts/noto-cjk/raw/main/Sans/OTF/Japanese/NotoSansCJKjp-Regular.otf",
]
# Simplified Chinese (Noto Sans CJK SC).
FONT_ZH_SC_PATH = "NotoSansCJKsc-Regular.otf"
FONT_ZH_SC_URLS = [
    "https://raw.githubusercontent.com/googlefonts/noto-cjk/main/Sans/OTF/SimplifiedChinese/NotoSansCJKsc-Regular.otf",
    "https://github.com/googlefonts/noto-cjk/raw/main/Sans/OTF/SimplifiedChinese/NotoSansCJKsc-Regular.otf",
]
# Traditional Chinese (Noto Sans CJK TC).
FONT_ZH_TC_PATH = "NotoSansCJKtc-Regular.otf"
FONT_ZH_TC_URLS = [
    "https://raw.githubusercontent.com/googlefonts/noto-cjk/main/Sans/OTF/TraditionalChinese/NotoSansCJKtc-Regular.otf",
    "https://github.com/googlefonts/noto-cjk/raw/main/Sans/OTF/TraditionalChinese/NotoSansCJKtc-Regular.otf",
]
# Language choices as {"code", "name"} records.
# NOTE(review): the consumer is not visible here; presumably this feeds a UI
# language selector — confirm.
UI_LANGUAGES = [
    {"code": "en", "name": "English"},
    {"code": "th", "name": "Thai"},
    {"code": "ja", "name": "Japanese"},
    {"code": "ko", "name": "Korean"},
    {"code": "zh-CN", "name": "Chinese (Simplified)"},
    {"code": "vi", "name": "Vietnamese"},
    {"code": "es", "name": "Spanish"},
    {"code": "de", "name": "German"},
    {"code": "fr", "name": "French"},
]
# Per-provider presets, keyed by canonical provider name. _resolve_model uses
# "model" when the configured model is empty/"auto" (falling back to the
# "openai" entry), and _resolve_ai_config uses "base_url" when the configured
# base URL is empty/"auto".
AI_PROVIDER_DEFAULTS = {
    "gemini": {
        "model": "gemini-2.5-flash",
        # Left empty: _resolve_ai_config does not substitute the OpenAI base
        # URL for "gemini" or "anthropic".
        "base_url": "",
    },
    "openai": {
        "model": "gpt-4o-mini",
        "base_url": "https://api.openai.com/v1",
    },
    "openrouter": {
        "model": "openai/o4-mini",
        "base_url": "https://openrouter.ai/api/v1",
    },
    "huggingface": {
        "model": "google/gemma-2-2b-it",
        "base_url": "https://router.huggingface.co/v1",
    },
    "featherless": {
        "model": "Qwen/Qwen2.5-7B-Instruct",
        "base_url": "https://api.featherless.ai/v1",
    },
    "groq": {
        "model": "openai/gpt-oss-20b",
        "base_url": "https://api.groq.com/openai/v1",
    },
    "together": {
        "model": "openai/gpt-oss-20b",
        "base_url": "https://api.together.xyz/v1",
    },
    "deepseek": {
        "model": "deepseek-chat",
        "base_url": "https://api.deepseek.com/v1",
    },
    "anthropic": {
        "model": "claude-sonnet-4-20250514",
        "base_url": "https://api.anthropic.com",
    },
}
# Alternate provider spellings -> canonical provider name. _canonical_provider
# looks names up here after stripping and lower-casing them.
AI_PROVIDER_ALIASES = {
    "hf": "huggingface",
    "huggingface_router": "huggingface",
    "hf_router": "huggingface",
    "openai_compat": "openai",
    "openai-compatible": "openai",
    "gemini3": "gemini",
    "gemini-3": "gemini",
    "google": "gemini",
}
# Per-provider short model names -> full model ids. _resolve_model looks up the
# lower-cased requested model here and keeps it unchanged when there is no alias.
AI_MODEL_ALIASES = {
    "gemini": {
        "flash-lite": "gemini-2.5-flash-lite",
        "flash": "gemini-2.5-flash",
        "pro": "gemini-2.5-pro",
        "3-flash": "gemini-3-flash-preview",
        "3-pro": "gemini-3-pro-preview",
        "3-pro-image": "gemini-3-pro-image-preview",
        "flash-image": "gemini-2.5-flash-image",
    }
}
# Language-independent opening section of the system prompt.
# _build_ai_prompt_packet joins it (blank-line separated) with the per-language
# style text and the active response contract.
AI_PROMPT_SYSTEM_BASE = (
    "You are a professional manga translator and dialogue localizer.\n"
    "Rewrite each paragraph as natural dialogue in the target language while preserving meaning, tone, intent, and character voice.\n"
    "Keep lines concise for speech bubbles. Do not add new information. Do not omit meaning. Do not explain.\n"
    "Preserve emphasis (… ! ?). Avoid excessive punctuation.\n"
    "If the input is already in the target language, improve it (dialogue polish) without changing meaning."
)
# Per-language style guidance appended to the system prompt, keyed by the
# normalized language code; "default" is used when the language has no entry
# (see _build_ai_prompt_packet).
# The Thai entry previously used double-escaped "\\n", which put literal
# backslash-n text into the prompt; it now uses real newlines like the others.
AI_LANG_STYLE = {
    "th": (
        "Target language: Thai\n"
        "Write Thai manga dialogue that reads like a high-quality Thai scanlation: natural, concise, and in-character.\n"
        "Keep lines short for speech bubbles; avoid stiff, literal phrasing.\n"
        "Default: omit pronouns and omit gendered polite sentence-final particles unless the source line clearly requires them.\n"
        "Never use the word 'ฉัน'. Prefer omitting the subject.\n"
        "Never use a male-coded second-person pronoun. When addressing someone by name, do not add a second-person pronoun after the name; prefer NAME + clause.\n"
        "If a second-person reference is unavoidable, use a neutral/casual form appropriate to tone, but keep it gender-neutral and consistent with the line.\n"
        "Use particles/interjections sparingly to match tone; do not overuse.\n"
        "Keep names/terms consistent; transliterate when appropriate.\n"
        "Output only the translated text."
    ),
    "en": (
        "Target language: English\n"
        "Write natural English manga dialogue: concise, conversational, with contractions where natural.\n"
        "Localize tone and character voice; keep emotion and emphasis.\n"
        "Keep proper nouns consistent; do not over-explain."
    ),
    "ja": (
        "Target language: Japanese\n"
        "Write natural Japanese manga dialogue: concise, spoken.\n"
        "Choose 丁寧語/タメ口 to match context; keep emotion and emphasis.\n"
        "Keep proper nouns consistent; keep SFX natural in Japanese."
    ),
    "default": (
        "Write natural manga dialogue in the target language: concise, spoken, faithful to meaning and tone."
    ),
}
# Response-format instructions appended to the system prompt.
# _active_ai_contract picks the JSON variant when DO_AI_JSON is truthy and the
# plain-text variant otherwise.
# The doubled backslashes in the JSON variant are intentional: the model is
# told to emit the two-character sequence backslash-n inside the JSON string.
AI_PROMPT_RESPONSE_CONTRACT_JSON = (
    "Return ONLY valid JSON (no markdown, no extra text).\n"
    "Output JSON MUST have exactly one key: \"aiTextFull\".\n"
    "\"aiTextFull\" MUST be a single JSON string WITHOUT raw newlines.\n"
    "Use literal \\n and \\n\\n to represent line breaks.\n"
    "You MUST preserve paragraph boundaries and order. Paragraphs are separated by a blank line (\\n\\n).\n"
    "Do NOT add extra paragraphs. Do NOT remove paragraphs.\n"
    "Never include code fences or XML/HTML tags.\n"
    "All string values MUST NOT contain raw newlines."
)
AI_PROMPT_RESPONSE_CONTRACT_TEXT = (
    "Return ONLY the translated text (no JSON, no markdown, no commentary).\n"
    "You MUST preserve paragraph boundaries and order. Paragraphs are separated by a blank line.\n"
    "Use actual newlines for line breaks.\n"
    "Do NOT add extra paragraphs. Do NOT remove paragraphs.\n"
    "Never include code fences or XML/HTML tags."
)
# User-message templates, filled with str.format by _build_ai_prompt_packet.
# The JSON variant takes {input_json} and {output_schema}; the text variant
# takes only {input_json}.
AI_PROMPT_DATA_TEMPLATE = (
    "Input JSON:\n{input_json}\n\n"
    "Output JSON schema (MUST match exactly):\n{output_schema}"
)
AI_PROMPT_DATA_TEMPLATE_TEXT = (
    "Input JSON:\n{input_json}\n\n"
    "Return the translation as plain text only."
)
# Integer read from the FIREBASE_COOKIE_TTL_SEC environment variable
# (default "900"); per the name, a cookie time-to-live in seconds.
FIREBASE_COOKIE_TTL_SEC = int(os.getenv("FIREBASE_COOKIE_TTL_SEC", "900"))
# NOTE(review): presumably the last fetched cookie payload with its fetch time
# and source URL — the consumer is outside this view; confirm.
_FIREBASE_COOKIE_CACHE = {"ts": 0.0, "url": "", "data": None}
_FONT_RESOLVE_CACHE = {}
# Model-id lists fetched by _hf_router_available_models, keyed by a SHA-1 of
# the hashed API key and base URL; each entry is {"ts": <time>, "models": [...]}.
_HF_MODELS_CACHE = {}
_FONT_PAIR_CACHE = {}
_TP_HTML_EPS_PX = 0.0
# Zero-width space. Word tokens equal to it are skipped when the layout helpers
# count or join words.
ZWSP = "\u200b"
def _active_ai_contract() -> str:
    """Return the response-contract prompt for the current output mode.

    The JSON contract is returned when ``DO_AI_JSON`` is truthy, otherwise the
    plain-text contract.
    """
    if DO_AI_JSON:
        return AI_PROMPT_RESPONSE_CONTRACT_JSON
    return AI_PROMPT_RESPONSE_CONTRACT_TEXT
def _active_ai_data_template() -> str:
    """Return the user-message template for the current output mode.

    The JSON template (with an ``{output_schema}`` placeholder) is returned
    when ``DO_AI_JSON`` is truthy, otherwise the plain-text template.
    """
    if DO_AI_JSON:
        return AI_PROMPT_DATA_TEMPLATE
    return AI_PROMPT_DATA_TEMPLATE_TEXT
def _canonical_provider(provider: str) -> str:
    """Normalize a provider name and map known aliases to the canonical name.

    The input is stripped and lower-cased (``None`` is treated as ""). Names
    without an entry in ``AI_PROVIDER_ALIASES`` are returned in normalized form.
    """
    normalized = (provider or "").strip().lower()
    if normalized in AI_PROVIDER_ALIASES:
        return AI_PROVIDER_ALIASES[normalized]
    return normalized
def _resolve_model(provider: str, model: str) -> str:
    """Resolve the model id to use for ``provider``.

    An empty or "auto" model yields the provider's preset model, falling back
    to the OpenAI preset when the provider has none. Any other value is looked
    up (lower-cased) in the provider's alias table and returned unchanged when
    there is no alias.
    """
    requested = (model or "").strip()
    if requested and requested.lower() != "auto":
        provider_aliases = AI_MODEL_ALIASES.get(provider) or {}
        return provider_aliases.get(requested.lower()) or requested
    preset = AI_PROVIDER_DEFAULTS.get(provider) or {}
    preset_model = (preset.get("model") or "").strip()
    return preset_model or AI_PROVIDER_DEFAULTS["openai"]["model"]
| def _normalize_lang(lang: str) -> str: | |
| t = (lang or "").strip().lower() | |
| if t in ("jp", "jpn", "japanese"): | |
| return "ja" | |
| if t in ("thai",): | |
| return "th" | |
| if t in ("eng", "english"): | |
| return "en" | |
| if t.startswith("zh"): | |
| return t | |
| if len(t) >= 2: | |
| return t[:2] | |
| return t | |
| def _sha1(s: str) -> str: | |
| return hashlib.sha1(s.encode("utf-8")).hexdigest() | |
| def _hf_router_available_models(api_key: str, base_url: str) -> list[str]: | |
| if not api_key or not base_url: | |
| return [] | |
| key = _sha1(f"{_sha1(api_key)}|{base_url}") | |
| now = time.time() | |
| cached = _HF_MODELS_CACHE.get(key) or {} | |
| if cached.get("ts") and now - float(cached["ts"]) < 3600 and isinstance(cached.get("models"), list): | |
| return cached["models"] | |
| url = base_url.rstrip("/") + "/models" | |
| headers = {"Authorization": f"Bearer {api_key}"} | |
| try: | |
| with httpx.Client(timeout=float(AI_TIMEOUT_SEC)) as client: | |
| r = client.get(url, headers=headers) | |
| r.raise_for_status() | |
| data = r.json() | |
| except Exception: | |
| return [] | |
| models = [] | |
| for m in (data.get("data") or []): | |
| mid = (m.get("id") if isinstance(m, dict) else None) | |
| if isinstance(mid, str) and mid.strip(): | |
| models.append(mid.strip()) | |
| _HF_MODELS_CACHE[key] = {"ts": now, "models": models} | |
| return models | |
| def _pick_hf_fallback_model(models: list[str]) -> str: | |
| if not models: | |
| return "" | |
| priority_substrings = ( | |
| "gemma-3", | |
| "gemma-2", | |
| "llama-3.1", | |
| "llama-3", | |
| "mistral", | |
| "qwen", | |
| "glm", | |
| ) | |
| lowered = [(m, m.lower()) for m in models] | |
| for sub in priority_substrings: | |
| for m, ml in lowered: | |
| if sub in ml and ("instruct" in ml or ml.endswith("-it") or ":" in ml): | |
| return m | |
| for m, ml in lowered: | |
| if "instruct" in ml or ml.endswith("-it") or ":" in ml: | |
| return m | |
| return models[0] | |
| def _load_ai_cache(path: str): | |
| if not path: | |
| return {} | |
| if not os.path.exists(path): | |
| return {} | |
| try: | |
| with open(path, "r", encoding="utf-8") as f: | |
| d = json.load(f) | |
| return d if isinstance(d, dict) else {} | |
| except Exception: | |
| return {} | |
| def _save_ai_cache(path: str, cache: dict): | |
| if not path: | |
| return | |
| tmp = path + ".tmp" | |
| with open(tmp, "w", encoding="utf-8") as f: | |
| json.dump(cache, f, ensure_ascii=False) | |
| os.replace(tmp, path) | |
def _build_ai_prompt_packet(target_lang: str, original_text_full: str):
    """Build the ``(system_text, user_parts)`` pair for one translation request.

    ``system_text`` joins the base system prompt, the style text for the
    normalized target language (or the default style) and the active response
    contract with blank lines. ``user_parts`` is a one-element list holding the
    active data template filled with the request JSON.
    """
    lang = _normalize_lang(target_lang)
    request = {"target_lang": lang, "originalTextFull": original_text_full}
    format_args = {"input_json": json.dumps(request, ensure_ascii=False)}
    template = _active_ai_data_template()
    if DO_AI_JSON:
        # Only the JSON template has an {output_schema} placeholder.
        format_args["output_schema"] = json.dumps({"aiTextFull": "..."}, ensure_ascii=False)
    data_text = template.format(**format_args)

    style = AI_LANG_STYLE.get(lang) or AI_LANG_STYLE.get("default") or ""
    sections = [AI_PROMPT_SYSTEM_BASE, style, _active_ai_contract()]
    system_text = "\n\n".join(section for section in sections if section)
    return system_text, [data_text]
def _gemini_generate_json(api_key: str, model: str, system_text: str, user_parts: list[str]):
    """Call the Gemini ``generateContent`` endpoint and return the reply text.

    Args:
        api_key: Gemini API key. It is sent in the ``x-goog-api-key`` header
            rather than the URL so it cannot leak through URLs embedded in
            httpx error messages or request logs.
        model: Model id placed in the request path.
        system_text: System instruction text.
        user_parts: Text parts of the single user turn; blank parts are dropped.

    Returns:
        The concatenated, stripped text of the first candidate.

    Raises:
        Exception: On an HTTP error status, or when the response has no
            candidates, no content parts, or only empty text.
    """
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent"
    headers = {"x-goog-api-key": api_key}
    parts = [{"text": p} for p in user_parts if (p or "").strip()]
    payload = {
        "systemInstruction": {"parts": [{"text": system_text}]},
        "contents": [{"role": "user", "parts": parts}],
        "generationConfig": {
            "temperature": float(AI_TEMPERATURE),
            "maxOutputTokens": int(AI_MAX_TOKENS),
            "responseMimeType": "text/plain",
        },
    }
    with httpx.Client(timeout=float(AI_TIMEOUT_SEC)) as client:
        r = client.post(url, json=payload, headers=headers)
        try:
            r.raise_for_status()
        except httpx.HTTPStatusError as e:
            raise Exception(f"Gemini HTTP {r.status_code}: {r.text}") from e
        data = r.json()
    candidates = data.get("candidates") or []
    if not candidates:
        raise Exception("Gemini returned no candidates")
    content = candidates[0].get("content") or {}
    out_parts = content.get("parts") or []
    if not out_parts:
        raise Exception("Gemini returned empty content parts")
    # Ignore any non-dict part rather than failing on it.
    txt = "".join(str(p.get("text") or "") for p in out_parts if isinstance(p, dict)).strip()
    if not txt:
        raise Exception("Gemini returned empty text")
    return txt
| def _read_first_env(*names: str) -> str: | |
| for n in names: | |
| v = (os.environ.get(n) or "").strip() | |
| if v: | |
| return v | |
| return "" | |
| def _detect_ai_provider_from_key(api_key: str) -> str: | |
| k = (api_key or "").strip() | |
| if k.startswith("AIza"): | |
| return "gemini" | |
| if k.startswith("hf_"): | |
| return "huggingface" | |
| if k.startswith("sk-or-"): | |
| return "openrouter" | |
| if k.startswith("sk-ant-"): | |
| return "anthropic" | |
| if k.startswith("gsk_"): | |
| return "groq" | |
| return "openai" | |
def _resolve_ai_config():
    """Resolve the effective ``(provider, api_key, model, base_url)`` tuple.

    The key comes from ``AI_API_KEY`` or, when that is empty, from the first
    non-blank provider-specific environment variable. An "auto" provider is
    detected from the key prefix; "auto" model and base URL come from the
    provider presets. Providers other than "gemini" and "anthropic" fall back
    to the OpenAI base URL when none is configured.
    """
    env_fallbacks = (
        "AI_API_KEY",
        "OPENAI_API_KEY",
        "HF_TOKEN",
        "HUGGINGFACEHUB_API_TOKEN",
        "GEMINI_API_KEY",
        "OPENROUTER_API_KEY",
        "FEATHERLESS_API_KEY",
        "GROQ_API_KEY",
        "TOGETHER_API_KEY",
        "DEEPSEEK_API_KEY",
        "ANTHROPIC_API_KEY",
    )
    api_key = (AI_API_KEY or _read_first_env(*env_fallbacks)).strip()
    provider = _canonical_provider(AI_PROVIDER or "auto")
    model = (AI_MODEL or "auto").strip()
    base_url = (AI_BASE_URL or "auto").strip()

    if provider in ("", "auto"):
        provider = _canonical_provider(_detect_ai_provider_from_key(api_key))
    model = _resolve_model(provider, model)

    if base_url in ("", "auto"):
        preset = AI_PROVIDER_DEFAULTS.get(provider) or {}
        base_url = (preset.get("base_url") or "").strip()
    if not base_url and provider not in ("gemini", "anthropic"):
        openai_preset = AI_PROVIDER_DEFAULTS.get("openai") or {}
        base_url = openai_preset.get("base_url") or "https://api.openai.com/v1"
    return provider, api_key, model, base_url
def _openai_compat_generate_json(api_key: str, base_url: str, model: str, system_text: str, user_parts: list[str]):
    """Call an OpenAI-compatible ``/chat/completions`` endpoint.

    On the Hugging Face router, a 400 "model_not_supported" error for an
    auto-selected (or preset) model triggers one retry with a fallback model
    chosen from the router's model list.

    Returns:
        ``(text, used_model)``: the stripped reply text and the model id that
        actually produced it.

    Raises:
        Exception: On an HTTP error status that cannot be recovered by the
            fallback, or when the response has no choices or empty text.
    """
    endpoint = base_url.rstrip("/") + "/chat/completions"
    messages = [{"role": "system", "content": system_text}]
    messages.extend(
        {"role": "user", "content": part}
        for part in user_parts
        if (part or "").strip()
    )
    payload = {
        "model": model,
        "messages": messages,
        "temperature": float(AI_TEMPERATURE),
        "max_tokens": int(AI_MAX_TOKENS),
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    used_model = model
    with httpx.Client(timeout=float(AI_TIMEOUT_SEC)) as client:
        resp = client.post(endpoint, json=payload, headers=headers)
        try:
            resp.raise_for_status()
            data = resp.json()
        except httpx.HTTPStatusError as e:
            # The fallback applies only to a 400 from the HF router when the
            # model was not explicitly chosen by the user.
            model_was_auto = (AI_MODEL or "").strip().lower() in ("", "auto")
            hf_preset_model = (AI_PROVIDER_DEFAULTS.get("huggingface") or {}).get("model")
            fallback_eligible = (
                resp.status_code == 400
                and "router.huggingface.co" in (base_url or "")
                and (model_was_auto or model == hf_preset_model)
            )
            if not fallback_eligible:
                raise Exception(f"AI HTTP {resp.status_code}: {resp.text}") from e
            try:
                err = resp.json().get("error") or {}
            except Exception:
                err = {}
            if (err.get("code") or "") != "model_not_supported":
                raise Exception(
                    f"AI HTTP {resp.status_code}: {resp.text}") from e
            models = _hf_router_available_models(api_key, base_url)
            fallback = _pick_hf_fallback_model(models)
            if not fallback or fallback == model:
                preview = ", ".join(models[:8])
                hint = f"\nAvailable models (first 8): {preview}" if preview else ""
                raise Exception(
                    f"AI HTTP {resp.status_code}: {resp.text}{hint}") from e
            payload["model"] = fallback
            used_model = fallback
            retry = client.post(endpoint, json=payload, headers=headers)
            try:
                retry.raise_for_status()
            except httpx.HTTPStatusError as e2:
                raise Exception(
                    f"AI HTTP {retry.status_code}: {retry.text}") from e2
            data = retry.json()
    choices = data.get("choices") or []
    if not choices:
        raise Exception("AI returned no choices")
    message = choices[0].get("message") or {}
    txt = (message.get("content") or "").strip()
    if not txt:
        raise Exception("AI returned empty text")
    return txt, used_model
def _anthropic_generate_json(api_key: str, model: str, system_text: str, user_parts: list[str]):
    """Call the Anthropic Messages API and return the reply text.

    Args:
        api_key: Anthropic API key, sent as ``x-api-key``.
        model: Model id.
        system_text: System prompt.
        user_parts: User message texts; blank entries are dropped.

    Returns:
        The concatenated, stripped text of all ``"text"`` content blocks.

    Raises:
        Exception: On an HTTP error status or when the reply has no text.
    """
    url = "https://api.anthropic.com/v1/messages"
    messages = [
        {"role": "user", "content": part}
        for part in user_parts
        if (part or "").strip()
    ]
    payload = {
        "model": model,
        "max_tokens": int(AI_MAX_TOKENS),
        "temperature": float(AI_TEMPERATURE),
        "system": system_text,
        "messages": messages,
    }
    headers = {
        "x-api-key": api_key,
        # The Messages API requires an explicit API version header; the
        # original request omitted it.
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    with httpx.Client(timeout=float(AI_TIMEOUT_SEC)) as client:
        r = client.post(url, json=payload, headers=headers)
        try:
            r.raise_for_status()
        except httpx.HTTPStatusError as e:
            raise Exception(f"Anthropic HTTP {r.status_code}: {r.text}") from e
        data = r.json()
    content = data.get("content") or []
    txt = "".join(
        (block.get("text") or "")
        for block in content
        if isinstance(block, dict) and block.get("type") == "text"
    ).strip()
    if not txt:
        raise Exception("Anthropic returned empty text")
    return txt
| def _strip_wrappers(s: str) -> str: | |
| t = (s or "").strip() | |
| if not t: | |
| return "" | |
| t = t.replace("\r\n", "\n").replace("\r", "\n") | |
| if "```" in t: | |
| t = re.sub(r"```[a-zA-Z0-9_-]*", "", t) | |
| t = t.replace("```", "") | |
| t = re.sub(r"</?AiTextFull>", "", t, flags=re.IGNORECASE).strip() | |
| return t | |
def _sanitize_json_like_text(raw: str) -> str:
    """Repair common defects in model-produced JSON text.

    After wrapper stripping, the text is scanned character by character. Inside
    double-quoted strings, raw newlines and tabs become the escapes ``\\n`` and
    ``\\t``, and any run of one repeated character is capped at three copies.
    Text outside strings is copied unchanged.
    """
    text = _strip_wrappers(raw)
    if not text:
        return ""
    pieces = []
    inside_string = False
    escaped = False
    pending_char = ""
    pending_count = 0

    def _emit_pending():
        # Write out the buffered run, capped at three copies, and reset it.
        nonlocal pending_char, pending_count
        if pending_count:
            pieces.append(pending_char * min(pending_count, 3))
        pending_char = ""
        pending_count = 0

    for ch in text:
        if not inside_string:
            _emit_pending()
            pieces.append(ch)
            if ch == '"':
                inside_string = True
                escaped = False
            continue
        if escaped:
            # Character following a backslash: copy verbatim.
            _emit_pending()
            pieces.append(ch)
            escaped = False
        elif ch == "\\":
            _emit_pending()
            pieces.append(ch)
            escaped = True
        elif ch == '"':
            _emit_pending()
            pieces.append(ch)
            inside_string = False
        elif ch == "\n":
            _emit_pending()
            pieces.append("\\n")
        elif ch == "\t":
            _emit_pending()
            pieces.append("\\t")
        elif ch == pending_char:
            pending_count += 1
        else:
            _emit_pending()
            pending_char = ch
            pending_count = 1
    _emit_pending()
    return "".join(pieces)
def _extract_first_json(raw: str):
    """Parse and return the first balanced ``{...}`` object found in ``raw``.

    The text is sanitized first, then scanned from the first "{" while tracking
    string and escape state so braces inside strings are ignored.

    Raises:
        Exception: When the text is empty, contains no "{", or no balanced
            object is found. ``json.loads`` errors propagate unchanged.
    """
    text = _sanitize_json_like_text(raw)
    if not text:
        raise Exception("AI returned empty text")
    first_brace = text.find("{")
    if first_brace < 0:
        raise Exception("AI returned no JSON object")
    inside_string = False
    escaped = False
    depth = 0
    object_start = None
    for pos in range(first_brace, len(text)):
        ch = text[pos]
        if inside_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                inside_string = False
        elif ch == '"':
            inside_string = True
        elif ch == "{":
            if depth == 0:
                object_start = pos
            depth += 1
        elif ch == "}":
            if depth > 0:
                depth -= 1
            if depth == 0 and object_start is not None:
                return json.loads(text[object_start: pos + 1])
    raise Exception("Failed to parse AI JSON")
def _parse_ai_textfull_only(raw: str) -> str:
    """Extract the translated text from a JSON-style model reply.

    Reads ``aiTextFull`` (or ``textFull`` as a fallback) from the first JSON
    object in ``raw``. Literal ``\\n`` sequences are turned into real newlines
    only when the value contains no real newline; line endings are normalized
    and the result is stripped.

    Raises:
        Exception: When the JSON is not an object or has neither key.
    """
    obj = _extract_first_json(raw)
    if not isinstance(obj, dict):
        raise Exception("AI JSON is not an object")
    for key in ("aiTextFull", "textFull"):
        value = obj.get(key)
        if value is not None:
            break
    else:
        raise Exception("AI JSON missing aiTextFull")
    text = str(value)
    if "\n" not in text and "\\n" in text:
        text = text.replace("\\n", "\n")
    return text.replace("\r\n", "\n").replace("\r", "\n").strip()
def _parse_ai_textfull_text_only(raw: str) -> str:
    """Extract the translated text from a plain-text model reply.

    Replies that still start with "{" are handed to the JSON parser. Otherwise
    literal ``\\n`` sequences become newlines (only when no real newline is
    present) and a leading ``aiTextFull:`` / ``aiTextFull=`` label is removed.

    Raises:
        Exception: When the reply is empty after wrapper stripping.
    """
    text = _strip_wrappers(raw)
    if not text:
        raise Exception("AI returned empty text")
    if text.lstrip().startswith("{"):
        return _parse_ai_textfull_only(text)
    if "\n" not in text and "\\n" in text:
        text = text.replace("\\n", "\n")
    return re.sub(r"^aiTextFull\s*[:=]\s*", "", text, flags=re.IGNORECASE).strip()
def _budoux_parser_for_lang(lang: str):
    """Return a BudouX parser for ``lang``, or ``None`` when unavailable.

    Thai, Japanese and Simplified/Traditional Chinese use BudouX's bundled
    default parsers. Any other language uses the model JSON file named by the
    ``BUDOUX_MODEL_PATH`` environment variable, when that is set.
    """
    lang = _normalize_lang(lang)
    if not budoux:
        return None
    simplified = "load_default_simplified_chinese_parser"
    traditional = "load_default_traditional_chinese_parser"
    # Loader names are resolved lazily so only the requested one is touched.
    loader_names = {
        "th": "load_default_thai_parser",
        "ja": "load_default_japanese_parser",
        "zh": simplified,
        "zh-hans": simplified,
        "zh_cn": simplified,
        "zh-cn": simplified,
        "zh_hans": simplified,
        "zh-hant": traditional,
        "zh_tw": traditional,
        "zh-tw": traditional,
        "zh_hant": traditional,
    }
    loader_name = loader_names.get(lang)
    if loader_name is not None:
        return getattr(budoux, loader_name)()
    model_path = os.environ.get("BUDOUX_MODEL_PATH")
    if not model_path:
        return None
    with open(model_path, "r", encoding="utf-8") as handle:
        custom_model = json.load(handle)
    return budoux.Parser(custom_model)
| def _ensure_box_fields(box: dict): | |
| if not isinstance(box, dict): | |
| return {} | |
| b = copy.deepcopy(box) | |
| if "rotation_deg" not in b: | |
| b["rotation_deg"] = 0.0 | |
| if "rotation_deg_css" not in b: | |
| b["rotation_deg_css"] = 0.0 | |
| if "center" not in b and all(k in b for k in ("left", "top", "width", "height")): | |
| b["center"] = {"x": b["left"] + b["width"] / | |
| 2.0, "y": b["top"] + b["height"]/2.0} | |
| if all(k in b for k in ("left", "top", "width", "height")): | |
| if "left_pct" not in b: | |
| b["left_pct"] = b["left"] * 100.0 | |
| if "top_pct" not in b: | |
| b["top_pct"] = b["top"] * 100.0 | |
| if "width_pct" not in b: | |
| b["width_pct"] = b["width"] * 100.0 | |
| if "height_pct" not in b: | |
| b["height_pct"] = b["height"] * 100.0 | |
| return b | |
| def _tokens_with_spaces(text: str, parser, lang: str): | |
| t = (text or "") | |
| if not t: | |
| return [] | |
| out = [] | |
| parts = re.findall(r"\s+|\S+", t) | |
| for part in parts: | |
| if not part: | |
| continue | |
| if part.isspace(): | |
| out.append(("space", part)) | |
| continue | |
| segs = parser.parse(part) if parser else [part] | |
| for seg in segs: | |
| if seg: | |
| out.append(("word", seg)) | |
| return out | |
| def _line_cap_px_for_item(item: dict, img_w: int, img_h: int) -> float: | |
| p1 = item.get("baseline_p1") or {} | |
| p2 = item.get("baseline_p2") or {} | |
| dx = (float(p2.get("x") or 0.0) - float(p1.get("x") or 0.0)) * float(img_w) | |
| dy = (float(p2.get("y") or 0.0) - float(p1.get("y") or 0.0)) * float(img_h) | |
| cap = float(math.hypot(dx, dy)) | |
| if cap > 1e-6: | |
| return cap | |
| b = _ensure_box_fields(item.get("box") or {}) | |
| return float(b.get("width") or 0.0) * float(img_w) | |
def _wrap_tokens_to_lines_px(tokens, items, img_w: int, img_h: int, thai_font: str, latin_font: str, font_size: int, min_lines: int):
    """Greedily wrap tokens into at most ``len(items)`` lines by pixel width.

    Each line's width limit comes from the matching item (see
    ``_line_cap_px_for_item``). When more than one line is wanted, the first
    ``min_lines`` lines break early at 90% of their limit.

    Returns:
        A list of lines, each a list of ``(kind, text, width_px)`` tuples with
        leading and trailing "space" tokens removed; ``[]`` when ``items`` is
        empty.
    """
    max_lines = len(items)
    if max_lines <= 0:
        return []
    caps = [_line_cap_px_for_item(it, img_w, img_h) for it in items]
    desired = max(1, min(int(min_lines), max_lines))
    soft_factor = 0.90 if desired > 1 else 1.0

    # Scratch canvas used only by the bounding-box fallbacks in _measure_w.
    scratch = Image.new("RGBA", (10, 10), (0, 0, 0, 0))
    scratch_draw = ImageDraw.Draw(scratch)

    def _measure_w(font, txt: str) -> float:
        try:
            return float(font.getlength(txt))
        except Exception:
            try:
                bb = scratch_draw.textbbox((0, 0), txt, font=font, anchor="ls")
                return float(bb[2] - bb[0])
            except Exception:
                w, _ = scratch_draw.textsize(txt, font=font)
                return float(w)

    def _cap_for_line(idx: int) -> float:
        return float(caps[min(idx, max_lines - 1)])

    def _strip_edge_spaces(seg):
        lo, hi = 0, len(seg)
        while lo < hi and seg[lo][0] == "space":
            lo += 1
        while hi > lo and seg[hi - 1][0] == "space":
            hi -= 1
        return seg[lo:hi]

    lines = [[]]
    cur_w = 0.0
    line_idx = 0
    last_word_hint = ""
    pending_space = ""

    for kind, raw in (tokens or []):
        if kind == "space":
            # Spaces at the start of a line are dropped; others wait for the
            # next word.
            if lines[-1]:
                pending_space += str(raw)
            continue
        if kind != "word":
            continue
        txt = str(raw)
        if not txt:
            continue
        word_font = pick_font(txt, thai_font, latin_font, int(font_size))
        word_w = _measure_w(word_font, txt)
        space_w = 0.0
        if pending_space:
            # The space is measured with the font of the preceding word.
            hint = last_word_hint or txt
            space_font = pick_font(hint, thai_font, latin_font, int(font_size))
            space_w = _measure_w(space_font, pending_space)
        cap = _cap_for_line(line_idx)
        soft_cap = cap * soft_factor if (line_idx < desired and cap > 0.0) else cap
        need_w = cur_w + space_w + word_w

        can_break = bool(lines[-1]) and line_idx < max_lines - 1
        over_hard = cap > 0.0 and need_w > cap
        over_soft = soft_cap > 0.0 and need_w > soft_cap
        if can_break and (over_hard or over_soft):
            lines.append([])
            line_idx += 1
            cur_w = 0.0
            pending_space = ""
            space_w = 0.0

        if pending_space and lines[-1]:
            lines[-1].append(("space", pending_space, space_w))
            cur_w += space_w
        pending_space = ""
        lines[-1].append(("word", txt, word_w))
        cur_w += word_w
        last_word_hint = txt

    if len(lines) > max_lines:
        # Fold any overflow into the last allowed line.
        overflow = [tok for seg in lines[max_lines - 1:] for tok in seg]
        lines = lines[: max_lines - 1] + [overflow]
    return [_strip_edge_spaces(seg) for seg in lines]
| def _ensure_min_lines_by_split(lines, min_lines: int, max_lines: int): | |
| if not lines: | |
| return [] | |
| min_lines = int(min_lines) | |
| max_lines = int(max_lines) | |
| if min_lines <= 1: | |
| return lines | |
| target = min(min_lines, max_lines) | |
| lines = [list(seg) for seg in (lines or [])] | |
| def _trim(seg): | |
| while seg and seg[0][0] == "space": | |
| seg.pop(0) | |
| while seg and seg[-1][0] == "space": | |
| seg.pop() | |
| return seg | |
| while len(lines) < target: | |
| idx = None | |
| best = 0 | |
| for i, seg in enumerate(lines): | |
| n_words = sum(1 for k, s, _ in seg if k == "word" and s != ZWSP) | |
| if n_words > best and n_words > 1: | |
| best = n_words | |
| idx = i | |
| if idx is None: | |
| break | |
| seg = lines[idx] | |
| word_pos = [i for i, (k, s, _) in enumerate(seg) | |
| if k == "word" and s != ZWSP] | |
| if len(word_pos) <= 1: | |
| break | |
| cut_word = len(word_pos) // 2 | |
| cut_pos = word_pos[cut_word] | |
| left = _trim(seg[:cut_pos]) | |
| right = _trim(seg[cut_pos:]) | |
| lines[idx] = left | |
| lines.insert(idx + 1, right) | |
| if len(lines) >= max_lines: | |
| break | |
| return lines | |
def _fit_para_size_and_lines(ptext: str, parser, items, img_w: int, img_h: int, thai_font: str, latin_font: str, base_size: int, min_lines: int, lang: str):
    """Find the largest font size at which a paragraph fits its line boxes.

    Starting at ``max(10, base_size)`` and stepping down to 10, the text is
    wrapped across ``items`` and accepted at the first size where no line's
    text height exceeds its box height by more than 1%. If no size passes, the
    size-10 layout is returned.

    The ``min_lines`` argument is not read; the wanted line count is derived
    from the word count and ``len(items)``.

    Returns:
        ``(font_size, lines)``.
    """
    tokens = _tokens_with_spaces(ptext, parser, lang)
    if not tokens or not items:
        return int(base_size), [[] for _ in range(len(items))]
    max_lines = len(items)
    word_count = sum(1 for kind, value in tokens if kind == "word" and str(value))
    desired_lines = max(1, min(max_lines, word_count))
    start_size = max(10, int(base_size))
    box_heights_px = [
        float(_ensure_box_fields(it.get("box") or {}).get("height") or 0.0) * float(img_h)
        for it in items
    ]

    def _layout(size: int):
        wrapped = _wrap_tokens_to_lines_px(
            tokens, items, img_w, img_h, thai_font, latin_font, size, min_lines=desired_lines)
        return _ensure_min_lines_by_split(
            wrapped, min_lines=desired_lines, max_lines=max_lines)

    def _fits_heights(lines, size: int) -> bool:
        for line_no, seg in enumerate(lines):
            words = [s for k, s, _ in seg if k == "word" and s != ZWSP]
            if not words:
                continue
            metrics = _line_metrics_px("".join(words), thai_font, latin_font, size)
            if metrics is None:
                continue
            _, text_h, _ = metrics
            if (
                line_no < len(box_heights_px)
                and box_heights_px[line_no] > 0.0
                and text_h > box_heights_px[line_no] * 1.01
            ):
                return False
        return True

    for size in range(start_size, 9, -1):
        lines = _layout(size)
        if len(lines) <= max_lines and _fits_heights(lines, size):
            return size, lines
    return 10, _layout(10)
| def _pad_lines(lines, max_lines: int): | |
| max_lines = int(max_lines) | |
| if max_lines <= 0: | |
| return [] | |
| lines = list(lines or []) | |
| if len(lines) > max_lines: | |
| return lines[:max_lines] | |
| if len(lines) < max_lines: | |
| lines.extend([[] for _ in range(max_lines - len(lines))]) | |
| return lines | |
| def _contains_thai(text: str) -> bool: | |
| for ch in (text or ""): | |
| if _is_thai_char(ch): | |
| return True | |
| return False | |
| def _apply_line_to_item( | |
| item: dict, | |
| line_tokens, | |
| para_index: int, | |
| item_index: int, | |
| abs_line_start_raw: int, | |
| W: int, | |
| H: int, | |
| thai_path: str, | |
| latin_path: str, | |
| forced_size_px: int | None, | |
| apply_baseline_shift: bool = True, | |
| kerning_adjust: bool = False, | |
| ): | |
| # Lay one line of tokens out along `item`'s baseline. Mutates `item` in place | |
| # (text, valid_text, box, font_size_px, spans and, when shifted, baseline_p1/p2) | |
| # and returns None. W / H are the image size in pixels; box and baseline values | |
| # on `item` are multiplied by them, i.e. treated as normalised coordinates. | |
| # forced_size_px=None means "fit the line to the item"; otherwise that size | |
| # (clamped to >= 10) is used as-is. | |
| tokens = [] | |
| # Normalise every token to a (kind, text, width) triple; entries that are not | |
| # sequences of length >= 2 are dropped, a missing/non-numeric width becomes 0.0. | |
| for t in (line_tokens or []): | |
| if not isinstance(t, (list, tuple)) or len(t) < 2: | |
| continue | |
| k = str(t[0]) | |
| s = str(t[1]) | |
| w = float(t[2]) if len(t) > 2 and isinstance( | |
| t[2], (int, float)) else 0.0 | |
| tokens.append((k, s, w)) | |
| words = [s for k, s, _ in tokens if k == "word" and s != ZWSP] | |
| item_text = "".join(s for _, s, _ in tokens if s != ZWSP).strip() | |
| item["text"] = item_text | |
| item["valid_text"] = bool(item_text) | |
| b = _ensure_box_fields(item.get("box") or {}) | |
| item["box"] = b | |
| base_left = float(b.get("left") or 0.0) | |
| base_top = float(b.get("top") or 0.0) | |
| base_w = float(b.get("width") or 0.0) | |
| base_h = float(b.get("height") or 0.0) | |
| # Nothing to draw, or a degenerate box/image: leave the item without spans. | |
| if not words or base_w <= 0.0 or base_h <= 0.0 or W <= 0 or H <= 0: | |
| item["spans"] = [] | |
| return | |
| # Baseline end points in pixels. | |
| p1 = item.get("baseline_p1") or {} | |
| p2 = item.get("baseline_p2") or {} | |
| x1 = float(p1.get("x") or 0.0) * float(W) | |
| y1 = float(p1.get("y") or 0.0) * float(H) | |
| x2 = float(p2.get("x") or 0.0) * float(W) | |
| y2 = float(p2.get("y") or 0.0) * float(H) | |
| dx = x2 - x1 | |
| dy = y2 - y1 | |
| L = float(math.hypot(dx, dy)) | |
| if L <= 1e-9: | |
| item["spans"] = [] | |
| return | |
| # (ux, uy) is the unit vector along the baseline, (nx, ny) its normal, | |
| # flipped when needed so that ny is never negative. | |
| ux = dx / L | |
| uy = dy / L | |
| nx = -uy | |
| ny = ux | |
| if ny < 0: | |
| nx, ny = -nx, -ny | |
| # The available width is the baseline length, not the box width. | |
| base_w_px = L | |
| base_h_px = base_h * float(H) | |
| # All measurements are taken once at a 96 px reference size and scaled afterwards. | |
| base_size = 96 | |
| widths_px = [] | |
| max_ascent = 0 | |
| max_descent = 0 | |
| layout_units = [] | |
| # layout_units holds only "space" and "word" tokens (ZWSP skipped), text sanitised for drawing. | |
| for k, s, _ in tokens: | |
| if s == ZWSP: | |
| continue | |
| if k == "space": | |
| layout_units.append(("space", _sanitize_draw_text(s))) | |
| elif k == "word": | |
| layout_units.append(("word", _sanitize_draw_text(s))) | |
| # Advance width of `text` in `font`; falls back to textbbox and then textsize | |
| # when getlength is unavailable or fails. | |
| def _measure_len_px(font, text: str) -> float: | |
| try: | |
| return float(font.getlength(text)) | |
| except Exception: | |
| tmp = Image.new("RGBA", (10, 10), (0, 0, 0, 0)) | |
| dtmp = ImageDraw.Draw(tmp) | |
| try: | |
| bb = dtmp.textbbox((0, 0), text, font=font, anchor="ls") | |
| return float(bb[2] - bb[0]) | |
| except Exception: | |
| w, _ = dtmp.textsize(text, font=font) | |
| return float(w) | |
| for i, (k, t) in enumerate(layout_units): | |
| if k == "space": | |
| # A space is measured in the font picked for the nearest word: | |
| # the previous one if any, else the next one, else the literal "a". | |
| hint = "" | |
| for j in range(i - 1, -1, -1): | |
| if layout_units[j][0] == "word": | |
| hint = layout_units[j][1] | |
| break | |
| if not hint: | |
| for j in range(i + 1, len(layout_units)): | |
| if layout_units[j][0] == "word": | |
| hint = layout_units[j][1] | |
| break | |
| font0 = pick_font(hint or "a", thai_path, latin_path, base_size) | |
| widths_px.append(max(0.0, _measure_len_px(font0, t))) | |
| continue | |
| font0 = pick_font(t, thai_path, latin_path, base_size) | |
| try: | |
| ascent, descent = font0.getmetrics() | |
| except Exception: | |
| ascent, descent = base_size, int(base_size * 0.25) | |
| if ascent > max_ascent: | |
| max_ascent = ascent | |
| if descent > max_descent: | |
| max_descent = descent | |
| # With kerning_adjust, a word directly followed by another word of the same | |
| # script class (both Thai or both non-Thai) is measured as | |
| # len(word + next first char) - len(next first char). | |
| # NOTE(review): presumably this captures pair kerning at the boundary — confirm. | |
| if kerning_adjust and (i + 1) < len(layout_units) and layout_units[i + 1][0] == "word": | |
| nxt = layout_units[i + 1][1] | |
| nxt1 = nxt[:1] if nxt else "" | |
| if nxt1 and (_contains_thai(t) == _contains_thai(nxt1)): | |
| tw = _measure_len_px(font0, t + nxt1) - \ | |
| _measure_len_px(font0, nxt1) | |
| else: | |
| tw = _measure_len_px(font0, t) | |
| else: | |
| tw = _measure_len_px(font0, t) | |
| widths_px.append(max(0.0, tw)) | |
| line_tw = sum(widths_px) | |
| # Line height at the reference size: from _baseline_offset_px_for_text when it | |
| # returns a value, otherwise the largest ascent + descent seen above. | |
| bo_base = _baseline_offset_px_for_text( | |
| item_text, thai_path, latin_path, base_size) | |
| if bo_base is not None: | |
| _, total_h_base = bo_base | |
| line_th = float(total_h_base) | |
| else: | |
| line_th = float(max_ascent + max_descent) | |
| if line_tw <= 1e-9 or line_th <= 1e-9: | |
| item["spans"] = [] | |
| return | |
| if forced_size_px is None: | |
| # Fit: limited by the baseline length and by 99.5% of the box height. | |
| scale_line = min((base_w_px * 1.0) / line_tw, | |
| (base_h_px * 0.995) / line_th) | |
| if scale_line <= 0.0: | |
| item["spans"] = [] | |
| return | |
| final_size = max(10, int(base_size * scale_line)) | |
| else: | |
| final_size = int(max(10, forced_size_px)) | |
| # Widths are always scaled by final_size / reference size, also after the 10 px clamp. | |
| scale_line = float(final_size) / float(base_size) | |
| item["font_size_px"] = final_size | |
| w_scaled = [w * scale_line for w in widths_px] | |
| total_scaled = sum(w_scaled) | |
| # Centre the text on the baseline when it is shorter; no margin when it overflows. | |
| margin_px = (base_w_px - total_scaled) / \ | |
| 2.0 if total_scaled < base_w_px else 0.0 | |
| bo = _baseline_offset_px_for_text( | |
| item_text, thai_path, latin_path, final_size) | |
| if apply_baseline_shift and bo is not None: | |
| # Move both baseline end points along the normal so the baseline passes | |
| # through the box centre displaced by baseline_offset_px along the normal. | |
| baseline_offset_px, _ = bo | |
| cx = (base_left + (base_w / 2.0)) * float(W) | |
| cy = (base_top + (base_h / 2.0)) * float(H) | |
| target = (cx + (baseline_offset_px * nx), | |
| cy + (baseline_offset_px * ny)) | |
| s = ((target[0] - x1) * nx) + ((target[1] - y1) * ny) | |
| x1 += nx * s | |
| y1 += ny * s | |
| x2 += nx * s | |
| y2 += ny * s | |
| item["baseline_p1"] = {"x": x1 / float(W), "y": y1 / float(H)} | |
| item["baseline_p2"] = {"x": x2 / float(W), "y": y2 / float(H)} | |
| raw_pos = 0 | |
| span_i = 0 | |
| unit_i = 0 | |
| cum_px = 0.0 | |
| spans = [] | |
| # Walk the tokens again: every non-ZWSP token advances the raw offset and the | |
| # running width; only non-space tokens produce a span. t0 / t1 are fractions of | |
| # the baseline length. | |
| for kind, s, _ in tokens: | |
| if s == ZWSP: | |
| continue | |
| start_raw = abs_line_start_raw + raw_pos | |
| raw_pos += len(s) | |
| end_raw = abs_line_start_raw + raw_pos | |
| if unit_i >= len(w_scaled): | |
| break | |
| wpx = w_scaled[unit_i] | |
| t0 = (margin_px + cum_px) / base_w_px | |
| cum_px += wpx | |
| t1 = (margin_px + cum_px) / base_w_px | |
| if kind == "space": | |
| unit_i += 1 | |
| continue | |
| span_box = _ensure_box_fields({ | |
| "left": base_left + (base_w * t0), | |
| "top": base_top, | |
| "width": base_w * (t1 - t0), | |
| "height": base_h, | |
| "rotation_deg": float(b.get("rotation_deg") or 0.0), | |
| "rotation_deg_css": float(b.get("rotation_deg_css") or 0.0), | |
| }) | |
| spans.append({ | |
| "side": "Ai", | |
| "para_index": para_index, | |
| "item_index": item_index, | |
| "span_index": span_i, | |
| "text": s, | |
| "valid_text": True, | |
| "start_raw": start_raw, | |
| "end_raw": end_raw, | |
| "t0_raw": t0, | |
| "t1_raw": t1, | |
| "box": span_box, | |
| "height_raw": item.get("height_raw"), | |
| "baseline_p1": item.get("baseline_p1"), | |
| "baseline_p2": item.get("baseline_p2"), | |
| "font_size_px": final_size, | |
| }) | |
| span_i += 1 | |
| unit_i += 1 | |
| item["spans"] = spans | |
def patch(payload: dict, img_w: int, img_h: int, thai_font: str, latin_font: str, lang: str | None = None) -> dict:
    """Pour the AI translation text into a copy of the template tree.

    ``payload["Ai"]["aiTextFull"]`` is split into one text per template
    paragraph, either by ``<<TP_Pn>>`` markers (``n`` is the paragraph index)
    or, when no marker is present, on blank lines (``"\\n\\n"``).  Each
    paragraph text is then wrapped over that paragraph's items and written
    into the items with ``_apply_line_to_item``.

    ``start_raw`` / ``end_raw`` written on paragraphs and spans are offsets
    into the returned ``aiTextFull`` (paragraph texts joined by ``"\\n\\n"``).
    A paragraph without items is left untouched, but its text still counts
    towards those offsets so later paragraphs stay aligned with the joined
    text.

    Args:
        payload: dict holding ``Ai.aiTextFull`` and the ``Ai.aiTree`` template.
        img_w, img_h: image size in pixels.
        thai_font, latin_font: font paths handed to the layout helpers.
        lang: target language; the module-level ``LANG`` is used when falsy.

    Returns:
        ``{"Ai": {"aiTextFull": <marker-free text>, "aiTree": <patched copy>}}``.

    Raises:
        ValueError: when ``Ai.aiTree`` is present but is not a dict.
    """
    ai = payload.get("Ai") or {}
    ai_text_full = str(ai.get("aiTextFull") or "")
    template_tree = ai.get("aiTree") or {}
    if not isinstance(template_tree, dict):
        raise ValueError("Ai.aiTree template must be a dict")

    lang_norm = _normalize_lang(lang or LANG)
    parser = _budoux_parser_for_lang(lang_norm)

    # Work on a copy so the caller's template is never mutated.
    out_tree = copy.deepcopy(template_tree)
    out_tree["side"] = "Ai"
    paragraphs = out_tree.get("paragraphs") or []
    n_paras = len(paragraphs)

    def _extract_paras_by_markers(txt: str, expected: int) -> tuple[list[str], str] | None:
        """Split ``txt`` on ``<<TP_Pn>>`` markers; None when there are none."""
        if not txt or expected <= 0 or "<<TP_P" not in txt:
            return None
        matches = list(re.finditer(r"<<TP_P(\d+)>>", txt))
        if not matches:
            return None
        out: list[str] = [""] * expected
        for mi, m in enumerate(matches):
            try:
                idx = int(m.group(1))
            except Exception:
                continue
            seg_end = matches[mi + 1].start() if (mi + 1) < len(matches) else len(txt)
            seg = (txt[m.end():seg_end] or "").lstrip("\r\n").strip()
            # First non-empty segment wins for a given index; out-of-range indices are ignored.
            if 0 <= idx < expected and not out[idx]:
                out[idx] = seg
        return out, "\n\n".join(out)

    marked = _extract_paras_by_markers(ai_text_full, n_paras)
    if marked is not None:
        ai_paras, ai_text_full_clean = marked
    else:
        ai_paras = ai_text_full.split("\n\n") if ai_text_full else []
        # Pad with empty texts / drop surplus so there is exactly one text per paragraph.
        ai_paras = (ai_paras + [""] * n_paras)[:n_paras]
        ai_text_full_clean = "\n\n".join(ai_paras)

    raw_cursor = 0
    for pi, (p, ptext) in enumerate(zip(paragraphs, ai_paras)):
        para_start = raw_cursor
        para_end = para_start + len(ptext)
        # Advance past this paragraph and its "\n\n" separator up front, so a
        # paragraph skipped below still occupies its slice of the joined text.
        raw_cursor = para_end + 2

        p["side"] = "Ai"
        p["para_index"] = int(p.get("para_index", pi))
        items = p.get("items") or []
        max_lines = len(items)
        if max_lines <= 0:
            continue

        # Starting font size: the paragraph's own size if it is a positive int,
        # else the smallest positive int item size, else 96.
        base_size_ref = None
        if isinstance(p.get("para_font_size_px"), int) and int(p.get("para_font_size_px")) > 0:
            base_size_ref = int(p.get("para_font_size_px"))
        else:
            ref_sizes = [
                it.get("font_size_px")
                for it in items
                if isinstance(it.get("font_size_px"), int) and it.get("font_size_px") > 0
            ]
            if ref_sizes:
                base_size_ref = min(ref_sizes)
        base_size = int(base_size_ref or 96)

        para_size, lines = _fit_para_size_and_lines(
            ptext,
            parser,
            items,
            img_w,
            img_h,
            thai_font,
            latin_font,
            base_size,
            min_lines=int(max_lines),
            lang=lang_norm,
        )
        lines = _pad_lines(lines, max_lines)

        p["para_font_size_px"] = int(para_size)
        p["text"] = ptext
        p["valid_text"] = bool(ptext)
        p["start_raw"] = para_start
        p["end_raw"] = para_end

        line_start = para_start
        for ii, it in enumerate(items):
            line_tokens = lines[ii] if ii < len(lines) else []
            it["side"] = "Ai"
            it["para_index"] = pi
            it["item_index"] = ii
            _apply_line_to_item(
                it,
                line_tokens,
                pi,
                ii,
                line_start,
                img_w,
                img_h,
                thai_font,
                latin_font,
                para_size,
                apply_baseline_shift=True,
                kerning_adjust=True,
            )
            # The next line starts after this line's visible (non-ZWSP) characters.
            line_start += sum(len(s) for _k, s, _w in line_tokens if s != ZWSP)

    return {"Ai": {"aiTextFull": ai_text_full_clean, "aiTree": out_tree}}
| def _uniformize_ai_item_span_font_size(item: dict, img_w: int, img_h: int, thai_font: str, latin_font: str): | |
| # Measures every span of `item` at the item's font size and lowers the size | |
| # (never below 10) to the smallest value any span needs to fit 99.5% of its | |
| # own box; writes the result to font_size_px on the item and its spans. | |
| # Mutates `item` in place and returns None. img_w / img_h scale the span box | |
| # width / height to pixels. | |
| spans = item.get("spans") or [] | |
| if not spans or img_w <= 0 or img_h <= 0: | |
| return | |
| # Reference size: the item's font_size_px when it converts to a non-zero int, | |
| # otherwise the first positive int font_size_px found on a span. | |
| base_size = item.get("font_size_px") | |
| try: | |
| base_size = int(base_size) if base_size is not None else None | |
| except Exception: | |
| base_size = None | |
| if not base_size: | |
| for sp in spans: | |
| fs = sp.get("font_size_px") if isinstance(sp, dict) else None | |
| if isinstance(fs, int) and fs > 0: | |
| base_size = fs | |
| break | |
| if not base_size or base_size <= 0: | |
| return | |
| # Scratch image used only to obtain an ImageDraw for text measurement. | |
| tmp = Image.new("RGBA", (10, 10), (0, 0, 0, 0)) | |
| dtmp = ImageDraw.Draw(tmp) | |
| font_cache = {} | |
| # Fonts are cached per (size, contains-Thai flag). | |
| def _font_for(text: str, size: int): | |
| key = (int(size), 1 if _contains_thai(text) else 0) | |
| f = font_cache.get(key) | |
| if f: | |
| return f | |
| f = pick_font(text, thai_font, latin_font, int(size)) | |
| font_cache[key] = f | |
| return f | |
| min_size = int(base_size) | |
| for sp in spans: | |
| if not isinstance(sp, dict): | |
| continue | |
| txt = _sanitize_draw_text(sp.get("text") or "") | |
| if txt.strip() == "": | |
| continue | |
| b = sp.get("box") or {} | |
| aw = float(b.get("width") or 0.0) * float(img_w) | |
| ah = float(b.get("height") or 0.0) * float(img_h) | |
| if aw <= 0.0 or ah <= 0.0: | |
| continue | |
| font = _font_for(txt, base_size) | |
| try: | |
| bb = dtmp.textbbox((0, 0), txt, font=font, anchor="ls") | |
| tw = float(bb[2] - bb[0]) | |
| th = float(bb[3] - bb[1]) | |
| except Exception: | |
| # Fallback measurement when textbbox is unavailable or fails. | |
| tw, th = dtmp.textsize(txt, font=font) | |
| tw = float(tw) | |
| th = float(th) | |
| if tw <= 0.0 or th <= 0.0: | |
| continue | |
| # s < 1 means the span overflows its box at base_size in width or height. | |
| s = min((aw * 0.995) / tw, (ah * 0.995) / th) | |
| if s < 1.0: | |
| req = max(10, int(base_size * s)) | |
| if req < min_size: | |
| min_size = req | |
| if min_size != base_size: | |
| item["font_size_px"] = int(min_size) | |
| for sp in spans: | |
| if isinstance(sp, dict): | |
| sp["font_size_px"] = int(min_size) | |
def _rebuild_ai_spans_after_font_resize(ai_tree: dict, img_w: int, img_h: int, thai_font: str, latin_font: str, lang: str | None = None):
    """Re-tokenise every item's line text and rebuild its spans in place.

    For each item the text from ``_item_line_text`` is tokenised again and
    laid out with ``_apply_line_to_item`` (no baseline shift) at the item's
    ``font_size_px``, falling back to the paragraph's ``para_font_size_px``.
    Afterwards ``_uniformize_ai_item_span_font_size`` is applied to the item.
    Items whose text is blank get ``spans = []``.

    Index and offset fields that are missing, ``None`` or not convertible to
    ``int`` fall back to the loop position (indices) or ``0`` (``start_raw``)
    instead of raising; a font size that cannot be read as a number is passed
    on as ``None`` so the callee fits the line itself.

    Mutates ``ai_tree``; returns None.  Does nothing for an empty tree or a
    non-positive image size.
    """
    if not ai_tree or img_w <= 0 or img_h <= 0:
        return

    def _int_or(value, default):
        """``int(value)``, or ``default`` when that is impossible (e.g. None)."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def _forced_size(value):
        """Normalise a stored font size to an int, or None when unusable."""
        if isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            return _int_or(value, None)
        if isinstance(value, str) and value.strip().isdigit():
            return int(value.strip())
        return None

    lang_norm = _normalize_lang(lang or LANG)
    parser = _budoux_parser_for_lang(lang_norm)

    for pi, p in _iter_paragraphs(ai_tree):
        for ii, it in enumerate(p.get("items") or []):
            txt = _item_line_text(it)
            if not str(txt).strip():
                it["spans"] = []
                continue
            # Widths are re-measured by the callee, so 0.0 is only a placeholder.
            line_tokens = [(k, s, 0.0) for k, s in _tokens_with_spaces(str(txt), parser, lang_norm)]
            _apply_line_to_item(
                it,
                line_tokens,
                _int_or(p.get("para_index"), pi),
                _int_or(it.get("item_index"), ii),
                # Items copied from the template may carry start_raw=None.
                _int_or(it.get("start_raw"), 0),
                img_w,
                img_h,
                thai_font,
                latin_font,
                _forced_size(it.get("font_size_px") or p.get("para_font_size_px")),
                apply_baseline_shift=False,
                kerning_adjust=True,
            )
            _uniformize_ai_item_span_font_size(
                it, img_w, img_h, thai_font, latin_font)
def ai_translate_original_text(original_text_full: str, target_lang: str):
    """Translate ``original_text_full`` with the configured AI provider.

    The provider, key, model and base URL come from ``_resolve_ai_config``.
    With ``AI_CACHE`` enabled, results are looked up and stored under a hash of
    provider, model, base URL, language, prompt signature and source text.
    For Thai, a stand-alone "นาย" token is removed, runs of spaces/tabs are
    collapsed and leading spaces/tabs are stripped per line, both for fresh
    results and for cache hits (the cache entry is rewritten if that changes it).

    Returns a dict with ``aiTextFull`` and ``meta`` (model, provider, base_url,
    latency_sec); a cache hit returns the cached dict.

    Raises:
        Exception: when no API key is configured.
    """
    provider, api_key, model, base_url = _resolve_ai_config()
    if not api_key:
        raise Exception("AI_API_KEY is required for AI translation")
    lang = _normalize_lang(target_lang)

    def _scrub_thai(text: str) -> str:
        text = re.sub(
            r"(?:(?<=^)|(?<=[\s\"'“”‘’()\[\]{}<>]))\u0e19\u0e32\u0e22(?=(?:\s|$))", "", text)
        text = re.sub(r"[ \t]{2,}", " ", text)
        return re.sub(r"^[ \t]+", "", text, flags=re.MULTILINE)

    prompt_sig = _sha1(
        json.dumps(
            {
                "sys": AI_PROMPT_SYSTEM_BASE,
                "contract": _active_ai_contract(),
                "data": _active_ai_data_template(),
                "style": AI_LANG_STYLE.get(lang) or AI_LANG_STYLE.get("default") or "",
            },
            ensure_ascii=False,
        )
    )

    cache = cache_key = None
    if AI_CACHE:
        cache = _load_ai_cache(AI_CACHE_PATH)
        key_payload = {"provider": provider, "m": model, "u": base_url,
                       "l": lang, "p": prompt_sig, "t": original_text_full}
        cache_key = _sha1(json.dumps(key_payload, ensure_ascii=False))
        if cache_key in cache:
            cached = cache[cache_key]
            previous = str(cached.get("aiTextFull") or "") if (lang == "th" and cached) else ""
            if previous:
                scrubbed = _scrub_thai(previous)
                if scrubbed != previous:
                    # Persist the cleaned text so the next hit needs no rewrite.
                    cached = dict(cached)
                    cached["aiTextFull"] = scrubbed
                    cache[cache_key] = cached
                    _save_ai_cache(AI_CACHE_PATH, cache)
            return cached

    system_text, user_parts = _build_ai_prompt_packet(lang, original_text_full)
    started = time.time()
    used_model = model
    if provider == "gemini":
        raw = _gemini_generate_json(api_key, model, system_text, user_parts)
    elif provider == "anthropic":
        raw = _anthropic_generate_json(api_key, model, system_text, user_parts)
    else:
        raw, used_model = _openai_compat_generate_json(
            api_key, base_url, model, system_text, user_parts)

    if DO_AI_JSON:
        ai_text_full = _parse_ai_textfull_only(raw)
    else:
        ai_text_full = _parse_ai_textfull_text_only(raw)
    if lang == "th" and ai_text_full:
        ai_text_full = _scrub_thai(ai_text_full)

    meta = {
        "model": used_model,
        "provider": provider,
        "base_url": base_url,
        "latency_sec": round(time.time() - started, 3),
    }
    result = {"aiTextFull": ai_text_full, "meta": meta}
    if AI_CACHE and cache is not None and cache_key is not None:
        cache[cache_key] = result
        _save_ai_cache(AI_CACHE_PATH, cache)
    return result
def to_translated(u, lang="th"):
    """Build the Lens ``translatedimage`` URL from a URL carrying ``vsrid`` and ``gsessionid``.

    The first value of each of those two query parameters is copied over;
    the source language is ``auto`` and the target language is ``lang``.
    A missing parameter raises ``KeyError``.
    """
    query = parse_qs(urlparse(u).query)
    params = {
        "vsrid": query["vsrid"][0],
        "gsessionid": query["gsessionid"][0],
        "sl": "auto",
        "tl": lang,
        "se": 1,
        "ib": "1",
    }
    return "https://lens.google.com/translatedimage?" + urlencode(params)
| def _b64pad(s: str) -> str: | |
| return s + "=" * ((4 - (len(s) % 4)) % 4) | |
def decode_imageurl_to_datauri(imageUrl: str):
    """Return an image data URI extracted from ``imageUrl``, or None.

    A value that already is a ``data:image…base64,`` URI is returned as-is.
    Otherwise the value is base64-decoded (standard alphabet first, then
    URL-safe), read as UTF-8 (undecodable bytes ignored), and the text from
    the first ``data:image`` onwards is returned, stripped.  Any decoding
    failure moves on to the next decoder.
    """
    if not imageUrl:
        return None
    already_uri = (
        isinstance(imageUrl, str)
        and imageUrl.startswith("data:image")
        and "base64," in imageUrl
    )
    if already_uri:
        return imageUrl
    for decoder in (base64.b64decode, base64.urlsafe_b64decode):
        try:
            raw = decoder(_b64pad(imageUrl))
            try:
                text = raw.decode("utf-8")
            except Exception:
                text = raw.decode("utf-8", errors="ignore")
            if "data:image" not in text or "base64," not in text:
                continue
            pos = text.find("data:image")
            return text[pos:].strip() if pos >= 0 else text.strip()
        except Exception:
            continue
    return None
def read_varint(buf, i):
    """Decode one base-128 varint from ``buf`` starting at index ``i``.

    Returns ``(value, next_index)``.  Raises ``ValueError("eof varint")`` when
    the buffer ends mid-value and ``ValueError("varint too long")`` when the
    eleventh byte still has its continuation bit set.
    """
    value = 0
    # Shifts 0, 7, …, 70: at most eleven bytes are consumed.
    for shift in range(0, 71, 7):
        if i >= len(buf):
            raise ValueError("eof varint")
        byte = buf[i]
        i += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, i
    raise ValueError("varint too long")
def parse_proto(buf, start=0, end=None):
    """Split ``buf[start:end]`` into raw protobuf fields.

    Returns a list of ``(field_number, wire_type, value)`` tuples: an int for
    wire type 0, and a slice of ``buf`` for wire types 1 (8 bytes), 2
    (length-prefixed) and 5 (4 bytes).  Slices are not bounds-checked, so a
    truncated field yields a shorter value.  Any other wire type raises
    ``ValueError``.
    """
    stop = len(buf) if end is None else end
    fixed_width = {1: 8, 5: 4}
    fields = []
    pos = start
    while pos < stop:
        tag, pos = read_varint(buf, pos)
        number, wire_type = tag >> 3, tag & 7
        if wire_type == 0:
            payload, pos = read_varint(buf, pos)
        elif wire_type == 2:
            size, pos = read_varint(buf, pos)
            payload = buf[pos: pos + size]
            pos += size
        elif wire_type in fixed_width:
            size = fixed_width[wire_type]
            payload = buf[pos: pos + size]
            pos += size
        else:
            raise ValueError(f"wiretype {wire_type}")
        fields.append((number, wire_type, payload))
    return fields
def b2f(b4):
    """Interpret four bytes as a little-endian 32-bit float."""
    (value,) = struct.unpack("<f", b4)
    return value
def b2hex(b):
    """Return the hexadecimal text of ``b`` via its ``hex()`` method."""
    hex_text = b.hex()
    return hex_text
| def _get_float_field(msg_fields, field_num): | |
| for f, w, v in msg_fields: | |
| if f == field_num and w == 5: | |
| return b2f(v) | |
| return None | |
def _get_points_from_geom(geom_bytes):
    """Read the baseline points and height from a geometry message.

    Points are the length-delimited field 1 sub-messages that carry both float
    field 1 (x) and float field 2 (y); the height is fixed32 field 3 (the last
    occurrence wins).  Returns ``(first_point, second_point, height)``, or
    ``(None, None, None)`` when fewer than two points or no height were found.
    """
    points = []
    height = None
    for number, wire_type, payload in parse_proto(geom_bytes):
        if number == 1 and wire_type == 2:
            coords = parse_proto(payload)
            x = _get_float_field(coords, 1)
            y = _get_float_field(coords, 2)
            if not (x is None or y is None):
                points.append((x, y))
        elif number == 3 and wire_type == 5:
            height = b2f(payload)
    if height is None or len(points) < 2:
        return None, None, None
    return points[0], points[1], height
def _looks_like_geom(geom_bytes):
    """Tell whether ``geom_bytes`` has the shape of a geometry message.

    That means at least two field 1 sub-messages carrying float fields 1 and 2,
    plus a fixed32 field 3.
    """
    point_count = 0
    saw_height = False
    for number, wire_type, payload in parse_proto(geom_bytes):
        if number == 1 and wire_type == 2:
            coords = parse_proto(payload)
            if _get_float_field(coords, 1) is None:
                continue
            if _get_float_field(coords, 2) is None:
                continue
            point_count += 1
        elif number == 3 and wire_type == 5:
            saw_height = True
    return saw_height and point_count >= 2
def _looks_like_span(span_bytes):
    """Tell whether ``span_bytes`` has the shape of a span message.

    That means a fixed32 field 3 or 4 together with a varint field 1 or 2.
    """
    saw_fraction = False
    saw_offset = False
    for number, wire_type, _payload in parse_proto(span_bytes):
        if wire_type == 5 and number in (3, 4):
            saw_fraction = True
        elif wire_type == 0 and number in (1, 2):
            saw_offset = True
    return saw_fraction and saw_offset
def _is_item_message(msg_bytes):
    """Tell whether ``msg_bytes`` has the shape of an item message.

    An item has a geometry-shaped sub-message in field 1 and at least one
    span-shaped sub-message in field 2.  This is a probe that callers run on
    arbitrary length-delimited payloads, so bytes that do not parse as a
    protobuf message (``ValueError`` from the parser, ``struct.error`` from a
    truncated float) give False rather than an exception.
    """
    geom_ok = False
    span_ok = False
    try:
        for number, wire_type, payload in parse_proto(msg_bytes):
            if wire_type != 2:
                continue
            if number == 1 and not geom_ok:
                geom_ok = _looks_like_geom(payload)
            elif number == 2 and _looks_like_span(payload):
                span_ok = True
    except (ValueError, struct.error):
        return False
    return geom_ok and span_ok
def _extract_items_from_paragraph(par_bytes):
    """Return the raw bytes of every item message inside a paragraph message.

    Items are first looked for among the paragraph's direct length-delimited
    fields.  If none is found there, nested length-delimited payloads are
    searched recursively (depth below 4, at most 20000 payloads visited) and
    each distinct item is returned once, in discovery order.

    Nested payloads are not necessarily messages (they may be strings or other
    bytes), so a payload that fails to parse is treated as a leaf instead of
    aborting the extraction.  A paragraph that itself does not parse still
    raises ``ValueError``.
    """
    def _probe(candidate) -> bool:
        # Tolerant wrapper: unparsable bytes are simply "not an item".
        try:
            return bool(_is_item_message(candidate))
        except (ValueError, struct.error):
            return False

    direct = [v for _, w, v in parse_proto(par_bytes) if w == 2 and _probe(v)]
    if direct:
        return direct

    items = []
    seen = set()
    nodes = 0

    def walk(buf, depth):
        nonlocal nodes
        if depth >= 4 or nodes > 20000:
            return
        try:
            fields = parse_proto(buf)
        except ValueError:
            return
        for _, w, v in fields:
            if w != 2:
                continue
            nodes += 1
            if nodes > 20000:
                return
            if not _probe(v):
                walk(v, depth + 1)
            elif v not in seen:
                seen.add(v)
                items.append(v)

    walk(par_bytes, 0)
    return items
def _extract_item_geom_spans(item_bytes):
    """Split an item message into its geometry bytes and its span byte strings.

    Field 1 (length-delimited) is the geometry; if it occurs more than once the
    last one is kept.  Every length-delimited field 2 is a span.
    Returns ``(geom_bytes_or_None, [span_bytes, …])``.
    """
    geom = None
    spans = []
    for number, wire_type, payload in parse_proto(item_bytes):
        if wire_type != 2:
            continue
        if number == 1:
            geom = payload
        elif number == 2:
            spans.append(payload)
    return geom, spans
def _extract_span(span_bytes):
    """Decode a span message.

    Returns ``(start, end, t0, t1, fields)``: varint fields 1 and 2 as ints,
    fixed32 fields 3 and 4 as floats (None for whichever is absent, the last
    occurrence wins), plus the raw parsed field list.
    """
    fields = parse_proto(span_bytes)
    start = end = t0 = t1 = None
    for number, wire_type, payload in fields:
        tag = (number, wire_type)
        if tag == (1, 0):
            start = int(payload)
        elif tag == (2, 0):
            end = int(payload)
        elif tag == (3, 5):
            t0 = b2f(payload)
        elif tag == (4, 5):
            t1 = b2f(payload)
    return start, end, t0, t1, fields
| def _normalize_angle_deg(angle_deg): | |
| while angle_deg <= -180.0: | |
| angle_deg += 360.0 | |
| while angle_deg > 180.0: | |
| angle_deg -= 360.0 | |
| if angle_deg < -90.0: | |
| angle_deg += 180.0 | |
| if angle_deg > 90.0: | |
| angle_deg -= 180.0 | |
| return angle_deg | |
| def _slice_text(full_text, start, end): | |
| if start is None or end is None: | |
| return "" | |
| if start < 0 or end < 0 or start > end or end > len(full_text): | |
| return "" | |
| return full_text[start:end] | |
| def _range_min_max(ranges): | |
| if not ranges: | |
| return None, None | |
| s = min(r[0] for r in ranges) | |
| e = max(r[1] for r in ranges) | |
| return s, e | |
| def decode_tree(paragraphs_b64, full_text, side, img_w, img_h, want_raw=True): | |
| # Decodes base64 paragraph messages into a {"side", "paragraphs"} tree of | |
| # paragraphs -> items -> spans, attaching text sliced from `full_text` and | |
| # boxes normalised by img_w / img_h. Returns (tree, raw_dump); raw_dump lists | |
| # the b64 / hex of every paragraph when want_raw is true and is empty otherwise. | |
| raw_dump = [] | |
| paragraphs = [] | |
| # Highest text offset seen so far; used as the start of a span that has no start field. | |
| cursor = 0 | |
| for para_index, b64s in enumerate(paragraphs_b64): | |
| par_bytes = base64.b64decode(b64s) | |
| if want_raw: | |
| raw_dump.append({"para_index": para_index, | |
| "b64": b64s, "bytes_hex": b2hex(par_bytes)}) | |
| item_msgs = _extract_items_from_paragraph(par_bytes) | |
| items = [] | |
| para_ranges = [] | |
| para_bounds = None | |
| for item_index, item_bytes in enumerate(item_msgs): | |
| geom_bytes, spans_bytes = _extract_item_geom_spans(item_bytes) | |
| if geom_bytes is None: | |
| continue | |
| p1, p2, height_norm = _get_points_from_geom(geom_bytes) | |
| if p1 is None or p2 is None or height_norm is None: | |
| continue | |
| # Geometry values are multiplied by the image size, i.e. treated as normalised. | |
| x1n, y1n = p1 | |
| x2n, y2n = p2 | |
| x1 = x1n * img_w | |
| y1 = y1n * img_h | |
| x2 = x2n * img_w | |
| y2 = y2n * img_h | |
| dx = x2 - x1 | |
| dy = y2 - y1 | |
| # Orient the baseline so it runs left-to-right (top-to-bottom when vertical). | |
| if dx < 0 or (abs(dx) < 1e-12 and dy < 0): | |
| x1, y1, x2, y2 = x2, y2, x1, y1 | |
| x1n, y1n, x2n, y2n = x2n, y2n, x1n, y1n | |
| dx = x2 - x1 | |
| dy = y2 - y1 | |
| L = math.hypot(dx, dy) | |
| if L <= 1e-12: | |
| continue | |
| ux = dx / L | |
| uy = dy / L | |
| angle_deg_raw = math.degrees(math.atan2(dy, dx)) | |
| angle_deg = _normalize_angle_deg(angle_deg_raw) | |
| angle_deg_css = angle_deg | |
| height_px = height_norm * img_h | |
| item_spans = [] | |
| item_ranges = [] | |
| item_bounds = None | |
| for span_index, sb in enumerate(spans_bytes): | |
| start, end, t0, t1, _ = _extract_span(sb) | |
| if start is None: | |
| start = cursor | |
| else: | |
| cursor = max(cursor, start) | |
| # A span without an end offset is dropped. | |
| if end is None: | |
| continue | |
| cursor = max(cursor, end) | |
| # A span with neither t0 nor t1 is dropped; a single missing one defaults to 0.0 / 1.0. | |
| if t0 is None and t1 is None: | |
| continue | |
| if t0 is None: | |
| t0 = 0.0 | |
| if t1 is None: | |
| t1 = 1.0 | |
| valid_text = False | |
| span_text = "" | |
| if start is not None and end is not None and 0 <= start <= end <= len(full_text): | |
| span_text = full_text[start:end] | |
| valid_text = span_text.strip() != "" | |
| if valid_text: | |
| item_ranges.append((start, end)) | |
| # Span end points: positions t0 and t1 along the baseline, in pixels. | |
| e1x = x1 + ux * (t0 * L) | |
| e1y = y1 + uy * (t0 * L) | |
| e2x = x1 + ux * (t1 * L) | |
| e2y = y1 + uy * (t1 * L) | |
| cx = (e1x + e2x) / 2.0 | |
| cy = (e1y + e2y) / 2.0 | |
| # The box is the unrotated rectangle centred on the span midpoint. | |
| width_px = abs(t1 - t0) * L | |
| left_px = cx - width_px / 2.0 | |
| top_px = cy - height_px / 2.0 | |
| left = left_px / img_w | |
| top = top_px / img_h | |
| width = width_px / img_w | |
| height = height_px / img_h | |
| span_node = { | |
| "side": side, | |
| "para_index": para_index, | |
| "item_index": item_index, | |
| "span_index": span_index, | |
| "start_raw": start, | |
| "end_raw": end, | |
| "t0_raw": t0, | |
| "t1_raw": t1, | |
| "height_raw": height_norm, | |
| "baseline_p1": {"x": x1n, "y": y1n}, | |
| "baseline_p2": {"x": x2n, "y": y2n}, | |
| "box": { | |
| "left": left, | |
| "top": top, | |
| "width": width, | |
| "height": height, | |
| "rotation_deg": angle_deg, | |
| "rotation_deg_css": angle_deg_css, | |
| "center": {"x": cx / img_w, "y": cy / img_h}, | |
| "left_pct": left * 100.0, | |
| "top_pct": top * 100.0, | |
| "width_pct": width * 100.0, | |
| "height_pct": height * 100.0, | |
| }, | |
| "text": span_text, | |
| "valid_text": valid_text, | |
| } | |
| # Grow the item's axis-aligned pixel bounds by this span's quad. | |
| quad = _token_box_quad_px(span_node, img_w, img_h, pad_px=0) | |
| if quad: | |
| xs = [p[0] for p in quad] | |
| ys = [p[1] for p in quad] | |
| b = (min(xs), min(ys), max(xs), max(ys)) | |
| item_bounds = b if item_bounds is None else (min(item_bounds[0], b[0]), min( | |
| item_bounds[1], b[1]), max(item_bounds[2], b[2]), max(item_bounds[3], b[3])) | |
| # NOTE(review): the next assignment has no effect. | |
| item_bounds = item_bounds | |
| item_spans.append(span_node) | |
| # Item text: the slice from the smallest start to the largest end of its spans with visible text. | |
| s0, s1 = _range_min_max(item_ranges) | |
| item_text = _slice_text( | |
| full_text, s0, s1).strip() if s0 is not None else "" | |
| item_valid_text = item_text.strip() != "" | |
| if s0 is not None: | |
| para_ranges.append((s0, s1)) | |
| # Item box: unrotated rectangle of baseline length L centred on the baseline midpoint. | |
| cx = (x1 + x2) / 2.0 | |
| cy = (y1 + y2) / 2.0 | |
| left_px = cx - L / 2.0 | |
| top_px = cy - height_px / 2.0 | |
| item_box = { | |
| "left": left_px / img_w, | |
| "top": top_px / img_h, | |
| "width": L / img_w, | |
| "height": height_px / img_h, | |
| "rotation_deg": angle_deg, | |
| "rotation_deg_css": angle_deg_css, | |
| "center": {"x": cx / img_w, "y": cy / img_h}, | |
| } | |
| if item_bounds is not None: | |
| para_bounds = item_bounds if para_bounds is None else (min(para_bounds[0], item_bounds[0]), min( | |
| para_bounds[1], item_bounds[1]), max(para_bounds[2], item_bounds[2]), max(para_bounds[3], item_bounds[3])) | |
| items.append( | |
| { | |
| "side": side, | |
| "para_index": para_index, | |
| "item_index": item_index, | |
| "start_raw": s0, | |
| "end_raw": s1, | |
| "text": item_text, | |
| "valid_text": item_valid_text, | |
| "height_raw": height_norm, | |
| "baseline_p1": {"x": x1n, "y": y1n}, | |
| "baseline_p2": {"x": x2n, "y": y2n}, | |
| "box": item_box, | |
| "bounds_px": item_bounds, | |
| "spans": item_spans, | |
| } | |
| ) | |
| # Paragraph text: the slice covering all of its items' ranges. | |
| p0, p1 = _range_min_max(para_ranges) | |
| para_text = _slice_text( | |
| full_text, p0, p1).strip() if p0 is not None else "" | |
| para_valid_text = para_text.strip() != "" | |
| paragraphs.append( | |
| { | |
| "side": side, | |
| "para_index": para_index, | |
| "start_raw": p0, | |
| "end_raw": p1, | |
| "text": para_text, | |
| "valid_text": para_valid_text, | |
| "bounds_px": para_bounds, | |
| "items": items, | |
| } | |
| ) | |
| tree = {"side": side, "paragraphs": paragraphs} | |
| return tree, raw_dump | |
def flatten_tree_spans(tree):
    """Return every span of ``tree`` in paragraph, item, span order."""
    return [
        span
        for para in tree.get("paragraphs") or []
        for item in para.get("items") or []
        for span in item.get("spans") or []
    ]
def flatten_tree_items_as_tokens(tree, img_w, img_h):
    """Return one token dict per item, shaped like a span covering the whole item.

    Each token has ``span_index`` -1 and ``t0_raw`` / ``t1_raw`` of 0.0 / 1.0;
    the remaining fields are copied from the item.  ``side``, ``para_index``
    and ``item_index`` are required on every item.  ``img_w`` and ``img_h``
    are not used.
    """
    def _as_token(item):
        return {
            "side": item["side"],
            "para_index": item["para_index"],
            "item_index": item["item_index"],
            "span_index": -1,
            "start_raw": item.get("start_raw"),
            "end_raw": item.get("end_raw"),
            "t0_raw": 0.0,
            "t1_raw": 1.0,
            "height_raw": item.get("height_raw"),
            "baseline_p1": item.get("baseline_p1"),
            "baseline_p2": item.get("baseline_p2"),
            "box": item.get("box"),
            "text": item.get("text") or "",
            "valid_text": item.get("valid_text", False),
        }

    return [
        _as_token(item)
        for para in tree.get("paragraphs") or []
        for item in para.get("items") or []
    ]
| def _mean_angle_deg(angles_deg): | |
| vals = [a for a in (angles_deg or []) if a is not None] | |
| if not vals: | |
| return 0.0 | |
| xs = [math.cos(math.radians(a)) for a in vals] | |
| ys = [math.sin(math.radians(a)) for a in vals] | |
| return math.degrees(math.atan2(sum(ys) / len(ys), sum(xs) / len(xs))) | |
| def _rotate_xy(x, y, cos_a, sin_a): | |
| return (x * cos_a - y * sin_a, x * sin_a + y * cos_a) | |
| def _para_obb_quad_px(para_node, W, H): | |
| items = para_node.get("items") or [] | |
| if not items: | |
| return None | |
| angles = [] | |
| pts = [] | |
| for it in items: | |
| b = (it.get("box") or {}) | |
| angles.append(b.get("rotation_deg", 0.0)) | |
| q = _token_box_quad_px(it, W, H, pad_px=0) | |
| if q: | |
| pts.extend(q) | |
| if len(pts) < 4: | |
| return None | |
| ang = _mean_angle_deg(angles) | |
| cos_a = math.cos(math.radians(ang)) | |
| sin_a = math.sin(math.radians(ang)) | |
| cos_n = cos_a | |
| sin_n = -sin_a | |
| rpts = [_rotate_xy(x, y, cos_n, sin_n) for (x, y) in pts] | |
| xs = [p[0] for p in rpts] | |
| ys = [p[1] for p in rpts] | |
| minx, maxx = min(xs), max(xs) | |
| miny, maxy = min(ys), max(ys) | |
| corners = [(minx, miny), (maxx, miny), (maxx, maxy), (minx, maxy)] | |
| return [_rotate_xy(x, y, cos_a, sin_a) for (x, y) in corners] | |
def build_level_outlines(tree, W, H):
    """Collect the outline quads to draw for ``tree``.

    Paragraph outlines are added when ``DRAW_OUTLINE_PARA`` is set and item
    outlines when ``DRAW_OUTLINE_ITEM`` is set, each as a dict with ``quad``,
    ``color`` and ``width``.  A falsy tree gives an empty list.
    """
    outlines = []
    if not tree:
        return outlines

    def _add(quad, color, width):
        if quad:
            outlines.append({"quad": quad, "color": color, "width": width})

    if DRAW_OUTLINE_PARA:
        for para in tree.get("paragraphs") or []:
            _add(_para_obb_quad_px(para, W, H), PARA_OUTLINE, PARA_OUTLINE_WIDTH)
    if DRAW_OUTLINE_ITEM:
        for item_token in flatten_tree_items_as_tokens(tree, W, H):
            _add(_token_box_quad_px(item_token, W, H, pad_px=0), ITEM_OUTLINE, ITEM_OUTLINE_WIDTH)
    return outlines
def tokens_to_html(tokens, container_class="RTMDre"):
    """Render tokens as absolutely positioned, empty ``<div>`` boxes.

    Only tokens with a truthy ``valid_text`` are emitted.  Each box carries
    the token text in ``aria-label`` (double quotes written as ``&quot;``,
    newlines as spaces), its ``wi`` in ``data-wi`` and a style with
    percentage position/size and rotation; a usable ``font_size_px`` (on the
    token or its box) adds ``font-size`` and a 1.05x ``line-height``.

    Position and size are read from the box's ``*_pct`` keys; when a key is
    absent it is derived from the matching fractional key times 100, so boxes
    that only carry ``left`` / ``top`` / ``width`` / ``height`` also work.

    Returns the HTML string, wrapped in a ``<div class="{container_class}">``.
    """
    def _pct(box, name):
        value = box.get(f"{name}_pct")
        if value is None:
            value = float(box.get(name) or 0.0) * 100.0
        return value

    parts = [f'<div class="{container_class}">']
    for t in tokens:
        if not t.get("valid_text"):
            continue
        b = t["box"]
        # Keep the attribute well-formed: quotes become entities, newlines spaces.
        aria = (t.get("text") or "").replace('"', "&quot;").replace("\n", " ")
        wi = t.get("wi", 0)
        rot = b.get("rotation_deg_css", b.get("rotation_deg", 0.0))

        fs = t.get("font_size_px") or b.get("font_size_px")
        fs_px = None
        if fs:
            try:
                fs_px = float(fs)
            except (TypeError, ValueError):
                # An unreadable size is ignored rather than aborting the render.
                fs_px = None

        style = (
            f'top: calc({_pct(b, "top")}%); '
            f'left: calc({_pct(b, "left")}%); '
            f'width: calc({_pct(b, "width")}%); '
            f'height: calc({_pct(b, "height")}%); '
            f"transform: rotate({rot}deg);"
        )
        if fs_px is not None:
            style += f" font-size: {fs_px:.4g}px;"
            style += f" line-height: {max(1, int(round(fs_px * 1.05)))}px;"
        parts.append(
            f'<div class="IwqbBf" aria-label="{aria}" data-wi="{wi}" role="button" tabindex="-1" style="{style}"></div>'
        )
    parts.append("</div>")
    return "".join(parts)
def tp_overlay_css():
    """Return the stylesheet text used by the ``tp-*`` overlay markup."""
    rules = (
        (".tp-draw-root", "position:absolute;inset:0;pointer-events:none;"),
        (".tp-draw-scope", "position:absolute;left:0;top:0;transform-origin:0 0;"),
        (".tp-para", "position:absolute;left:0;top:0;"),
        (
            ".tp-item",
            "position:absolute;left:0;top:0;display:flex;align-items:center;justify-content:center;"
            "white-space:pre;pointer-events:none;box-sizing:border-box;overflow:visible;"
            "font-family:var(--tp-font,system-ui);font-weight:500;"
            "color:var(--tp-fg,rgba(20,20,20,.98));"
            "text-shadow:0 0 2px rgba(255,255,255,.90),0 0 2px rgba(0,0,0,.60),0 1px 1px rgba(0,0,0,.35);",
        ),
        (
            ".tp-item>span",
            "display:inline-block;white-space:pre;transform-origin:center;"
            "padding:0;border-radius:3px;"
            "background:var(--tp-bg,rgba(255,255,255,.65));"
            "box-decoration-break:clone;-webkit-box-decoration-break:clone;",
        ),
        (
            ".tp-item[data-wrap='1'],.tp-item[data-wrap='1']>span",
            "white-space:pre-wrap;word-break:break-word;",
        ),
        (".tp-item[data-wrap='1']>span", "text-align:center;"),
    )
    return "".join(selector + "{" + declarations + "}" for selector, declarations in rules)
| def _tp_norm_list(v): | |
| if isinstance(v, list): | |
| return v | |
| if isinstance(v, dict): | |
| try: | |
| return [v[k] for k in sorted(v.keys(), key=lambda x: int(x) if str(x).isdigit() else str(x))] | |
| except Exception: | |
| return list(v.values()) | |
| return [] | |
| def _tp_num(x): | |
| try: | |
| n = float(x) | |
| return n if math.isfinite(n) else None | |
| except Exception: | |
| return None | |
| def _tp_escape_text(s: str) -> str: | |
| if not s: | |
| return "" | |
| s = s.replace("\r", "") | |
| s = s.replace("&", "&").replace("<", "<").replace(">", ">") | |
| return s | |
| def _tp_get_rect(obj: dict, base_w: float, base_h: float): | |
| if not isinstance(obj, dict): | |
| return None | |
| box = obj.get("box") if isinstance(obj.get("box"), dict) else {} | |
| l0 = _tp_num(box.get("left")) | |
| t0 = _tp_num(box.get("top")) | |
| w0 = _tp_num(box.get("width")) | |
| h0 = _tp_num(box.get("height")) | |
| if None not in (l0, t0, w0, h0) and w0 > 0 and h0 > 0: | |
| l = l0 * base_w | |
| t = t0 * base_h | |
| r = (l0 + w0) * base_w | |
| b = (t0 + h0) * base_h | |
| deg = _tp_num(box.get("rotation_deg_css")) | |
| if deg is None: | |
| deg = _tp_num(box.get("rotation_deg")) | |
| return {"l": l, "t": t, "r": r, "b": b, "deg": deg or 0.0} | |
| lp = _tp_num(box.get("left_pct")) | |
| tp = _tp_num(box.get("top_pct")) | |
| wp = _tp_num(box.get("width_pct")) | |
| hp = _tp_num(box.get("height_pct")) | |
| if None not in (lp, tp, wp, hp) and wp > 0 and hp > 0: | |
| l0p = lp / 100.0 | |
| t0p = tp / 100.0 | |
| w0p = wp / 100.0 | |
| h0p = hp / 100.0 | |
| l = l0p * base_w | |
| t = t0p * base_h | |
| r = (l0p + w0p) * base_w | |
| b = (t0p + h0p) * base_h | |
| deg = _tp_num(box.get("rotation_deg_css")) | |
| if deg is None: | |
| deg = _tp_num(box.get("rotation_deg")) | |
| return {"l": l, "t": t, "r": r, "b": b, "deg": deg or 0.0} | |
| bpx = obj.get("bounds_px") | |
| if isinstance(bpx, list) and len(bpx) == 4: | |
| l = _tp_num(bpx[0]) | |
| t = _tp_num(bpx[1]) | |
| r = _tp_num(bpx[2]) | |
| bb = _tp_num(bpx[3]) | |
| if None not in (l, t, r, bb) and r > l and bb > t: | |
| return {"l": l, "t": t, "r": r, "b": bb, "deg": 0.0} | |
| return None | |
| def _tp_union_rect(items: list, base_w: float, base_h: float): | |
| l = float("inf") | |
| t = float("inf") | |
| r = float("-inf") | |
| b = float("-inf") | |
| for it in items: | |
| bx = _tp_get_rect(it, base_w, base_h) | |
| if not bx: | |
| continue | |
| l = min(l, bx["l"]) | |
| t = min(t, bx["t"]) | |
| r = max(r, bx["r"]) | |
| b = max(b, bx["b"]) | |
| if not math.isfinite(l) or not math.isfinite(t) or not math.isfinite(r) or not math.isfinite(b): | |
| return None | |
| return {"l": l, "t": t, "r": r, "b": b, "deg": 0.0} | |
| def _tp_mean_item_deg(items: list, base_w: float, base_h: float) -> float: | |
| angles = [] | |
| for it in items or []: | |
| bx = _tp_get_rect(it, base_w, base_h) | |
| if not bx: | |
| continue | |
| a = _tp_num(bx.get("deg")) | |
| if a is None: | |
| continue | |
| angles.append(float(a)) | |
| if not angles: | |
| return 0.0 | |
| return float(_mean_angle_deg(angles)) | |
| def _tp_oriented_rect_from_points(pts: list, para_deg: float) -> dict | None: | |
| if len(pts) < 2: | |
| return None | |
| ang = float(para_deg or 0.0) | |
| if not math.isfinite(ang): | |
| ang = 0.0 | |
| rad_n = math.radians(-ang) | |
| cn = math.cos(rad_n) | |
| sn = math.sin(rad_n) | |
| rpts = [(x * cn - y * sn, x * sn + y * cn) for x, y in pts] | |
| xs = [p[0] for p in rpts] | |
| ys = [p[1] for p in rpts] | |
| minx, maxx = min(xs), max(xs) | |
| miny, maxy = min(ys), max(ys) | |
| w = float(maxx - minx) | |
| h = float(maxy - miny) | |
| if w <= 0.0 or h <= 0.0: | |
| return None | |
| cx0 = float((minx + maxx) / 2.0) | |
| cy0 = float((miny + maxy) / 2.0) | |
| rad_a = math.radians(ang) | |
| ca = math.cos(rad_a) | |
| sa = math.sin(rad_a) | |
| cx = (cx0 * ca) - (cy0 * sa) | |
| cy = (cx0 * sa) + (cy0 * ca) | |
| l = cx - (w / 2.0) | |
| t = cy - (h / 2.0) | |
| return {"l": float(l), "t": float(t), "r": float(l + w), "b": float(t + h), "deg": float(ang)} | |
| def _tp_rect_corners(l: float, t: float, r: float, b: float, deg: float) -> list: | |
| w = float(r - l) | |
| h = float(b - t) | |
| if w <= 0.0 or h <= 0.0: | |
| return [] | |
| cx = float((l + r) / 2.0) | |
| cy = float((t + b) / 2.0) | |
| hw = w / 2.0 | |
| hh = h / 2.0 | |
| rad = math.radians(float(deg or 0.0)) | |
| c = math.cos(rad) | |
| s = math.sin(rad) | |
| out = [] | |
| for x, y in ((-hw, -hh), (hw, -hh), (hw, hh), (-hw, hh)): | |
| rx = (x * c) - (y * s) | |
| ry = (x * s) + (y * c) | |
| out.append((cx + rx, cy + ry)) | |
| return out | |
| def _tp_para_rect_from_items(items: list, base_w: float, base_h: float, para_deg: float) -> dict | None: | |
| if not items: | |
| return None | |
| pts = [] | |
| for it in items: | |
| ibx = _tp_get_rect(it, base_w, base_h) | |
| if not ibx: | |
| continue | |
| w = float(ibx["r"] - ibx["l"]) | |
| h = float(ibx["b"] - ibx["t"]) | |
| if w <= 0.0 or h <= 0.0: | |
| continue | |
| deg = float(ibx.get("deg") or 0.0) | |
| cx = float(ibx["l"] + w / 2.0) | |
| cy = float(ibx["t"] + h / 2.0) | |
| hw = w / 2.0 | |
| hh = h / 2.0 | |
| rad = math.radians(deg) | |
| c = math.cos(rad) | |
| s = math.sin(rad) | |
| for x, y in ((-hw, -hh), (hw, -hh), (hw, hh), (-hw, hh)): | |
| rx = (x * c) - (y * s) | |
| ry = (x * s) + (y * c) | |
| pts.append((cx + rx, cy + ry)) | |
| return _tp_oriented_rect_from_points(pts, para_deg) | |
| def _tp_extract_item_text(it: dict) -> str: | |
| if not isinstance(it, dict): | |
| return "" | |
| for k in ( | |
| "text", | |
| "translated_text", | |
| "translatedText", | |
| "ai_text", | |
| "aiText", | |
| "display_text", | |
| "displayText", | |
| ): | |
| v = it.get(k) | |
| if isinstance(v, str) and v: | |
| return v | |
| spans = _tp_norm_list(it.get("spans")) | |
| if spans: | |
| return "".join(s.get("text") if isinstance(s, dict) and isinstance(s.get("text"), str) else "" for s in spans) | |
| return "" | |
# Render a paragraph/item tree as absolutely positioned "tp-*" HTML.
# Returns "" when base_w/base_h are not positive or the tree has no paragraphs;
# otherwise returns one "tp-draw-scope" div that contains a "tp-para" div per
# paragraph and a "tp-item" div (with an escaped <span>) per non-empty item.
# NOTE(review): this copy of the file has lost its indentation, so the comments
# below describe each step without asserting how the steps are nested.
| def ai_tree_to_tp_html(tree: dict, base_w: int, base_h: int) -> str: | |
| base_w = int(base_w or 0) | |
| base_h = int(base_h or 0) | |
| if base_w <= 0 or base_h <= 0: | |
| return "" | |
| paras = _tp_norm_list(tree.get("paragraphs") | |
| if isinstance(tree, dict) else None) | |
| if not paras: | |
| return "" | |
| parts = [ | |
| f'<div class="tp-draw-scope" style="width: {base_w}px; height: {base_h}px;">'] | |
| for pi, p in enumerate(paras): | |
| if not isinstance(p, dict): | |
| continue | |
| items = _tp_norm_list(p.get("items")) | |
# Items are re-ordered by item_index as soon as ANY dict item has a numeric one.
# NOTE(review): a dict item without a numeric item_index gets a None sort key,
# which cannot be compared with the float keys of the others - confirm that
# item_index is either present on every item or on none.
| if len(items) > 1 and any(isinstance(x, dict) and _tp_num(x.get("item_index")) is not None for x in items): | |
| items = sorted( | |
| items, | |
| key=lambda x: _tp_num( | |
| x.get("item_index")) if isinstance(x, dict) else 0.0, | |
| ) | |
# "or pi" also replaces an explicit para_index of 0 with the loop position.
| para_idx = int(_tp_num(p.get("para_index")) or pi) | |
# Paragraph rectangle: the paragraph's own rect, else the union of item rects.
| pbx = _tp_get_rect(p, base_w, base_h) or _tp_union_rect( | |
| items, base_w, base_h) | |
| if not pbx: | |
| continue | |
| para_deg = float(pbx.get("deg") or 0.0) | |
# A paragraph with (almost) no rotation of its own may adopt the mean item
# rotation, in which case its rectangle is rebuilt from the items at that angle.
| if abs(para_deg) <= 0.01: | |
| derived = _tp_mean_item_deg(items, base_w, base_h) | |
| if abs(derived) > 0.01: | |
| pbx2 = _tp_para_rect_from_items(items, base_w, base_h, derived) | |
| if pbx2: | |
| pbx = pbx2 | |
| para_deg = float(pbx.get("deg") or 0.0) | |
# Merge the corners of the paragraph rect and of the item-derived rect into one
# oriented rect at para_deg, so the paragraph box covers both.
| pbx_items = _tp_para_rect_from_items(items, base_w, base_h, para_deg) | |
| if pbx_items: | |
| pts = _tp_rect_corners( | |
| pbx["l"], pbx["t"], pbx["r"], pbx["b"], para_deg) | |
| pts += _tp_rect_corners(pbx_items["l"], pbx_items["t"], | |
| pbx_items["r"], pbx_items["b"], para_deg) | |
| merged = _tp_oriented_rect_from_points(pts, para_deg) | |
| if merged: | |
| pbx = merged | |
# _TP_HTML_EPS_PX (defined elsewhere in the file) pads the paragraph rect on
# every side; the same eps is added to item sizes further down.
| eps = float(_TP_HTML_EPS_PX or 0.0) | |
| if eps > 0.0: | |
| pbx = { | |
| "l": float(pbx["l"] - eps), | |
| "t": float(pbx["t"] - eps), | |
| "r": float(pbx["r"] + eps), | |
| "b": float(pbx["b"] + eps), | |
| "deg": float(pbx.get("deg") or para_deg or 0.0), | |
| } | |
| pw = max(0.0, pbx["r"] - pbx["l"]) | |
| ph = max(0.0, pbx["b"] - pbx["t"]) | |
| para_style = ( | |
| f'left: {pbx["l"]:.6f}px; ' | |
| f'top: {pbx["t"]:.6f}px; ' | |
| f'width: {pw:.6f}px; ' | |
| f'height: {ph:.6f}px;' | |
| ) | |
| if abs(para_deg) > 0.01: | |
| para_style += f' transform: rotate({para_deg:.6g}deg); transform-origin: center center;' | |
| parts.append( | |
| f'<div class="tp-para tp-para-{para_idx}" data-para-index="{para_idx}" style="{para_style}">' | |
| ) | |
| para_cx = (pbx["l"] + pbx["r"]) / 2.0 | |
| para_cy = (pbx["t"] + pbx["b"]) / 2.0 | |
# For a rotated paragraph, inv_c/inv_s hold cos/sin of -para_deg; they are used
# below to rotate item centres about the paragraph centre.
| inv_c = inv_s = None | |
| if abs(para_deg) > 0.01: | |
| rad_inv = math.radians(-para_deg) | |
| inv_c = math.cos(rad_inv) | |
| inv_s = math.sin(rad_inv) | |
# Text per item. If the paragraph text (or the first item's text) contains line
# breaks and the conditions below hold, its non-blank lines are assigned to the
# items by position instead of the items' own texts.
| raw_texts = [_tp_extract_item_text(it) for it in items] | |
| mapped = list(raw_texts) | |
| p_text = p.get("text") if isinstance(p.get("text"), str) else "" | |
| non_empty = sum( | |
| 1 for t in raw_texts if isinstance(t, str) and t.strip()) | |
| any_nl = any(isinstance(t, str) and re.search(r"\r?\n", t) | |
| for t in raw_texts) | |
| first_nl = bool(raw_texts and isinstance( | |
| raw_texts[0], str) and re.search(r"\r?\n", raw_texts[0])) | |
| lines = None | |
| if p_text and re.search(r"\r?\n", p_text) and (non_empty <= 1 or any_nl): | |
| lines = [s.rstrip() | |
| for s in re.split(r"\r?\n+", p_text) if s.strip()] | |
| elif first_nl and (non_empty <= 1 or all(not (t or "").strip() for t in raw_texts[1:])): | |
| lines = [s.rstrip() for s in re.split( | |
| r"\r?\n+", raw_texts[0]) if s.strip()] | |
| if lines: | |
| mapped = [lines[i] if i < len(lines) else ( | |
| raw_texts[i] if i < len(raw_texts) else "") for i in range(len(items))] | |
# Emit one tp-item per dict item that has non-blank text and a positive-size rect.
| for ii, it in enumerate(items): | |
| if not isinstance(it, dict): | |
| continue | |
| text = (mapped[ii] if ii < len(mapped) else "") or "" | |
| if not text.strip(): | |
| continue | |
| ibx = _tp_get_rect(it, base_w, base_h) | |
| if not ibx: | |
| continue | |
| w0 = max(0.0, ibx["r"] - ibx["l"]) | |
| h0 = max(0.0, ibx["b"] - ibx["t"]) | |
| if w0 <= 0 or h0 <= 0: | |
| continue | |
| w = float(w0 + (2.0 * eps)) if eps > 0.0 else float(w0) | |
| h = float(h0 + (2.0 * eps)) if eps > 0.0 else float(h0) | |
# "or ii" also replaces an explicit item_index of 0 with the loop position.
| item_idx = int(_tp_num(it.get("item_index")) or ii) | |
# Font size: the item's font_size_px when positive, else 85% of the box height
# (at least 10); then clamped to [6, max(6, floor(95% of the box height))].
# Line height is 1.12 x font size, capped at the rounded box height, minimum 1.
| fs_raw = _tp_num(it.get("font_size_px")) | |
| fs = int(round(fs_raw)) if fs_raw and fs_raw > 0 else max( | |
| 10, int(round(h0 * 0.85))) | |
| fs = max(6, min(fs, max(6, int(math.floor(h0 * 0.95))))) | |
| lh = max(1, min(int(round(h0)), int(round(fs * 1.12)))) | |
# Position relative to the paragraph box: for a rotated paragraph the item centre
# is first rotated by -para_deg about the paragraph centre.
| if inv_c is not None and inv_s is not None: | |
| icx = (ibx["l"] + ibx["r"]) / 2.0 | |
| icy = (ibx["t"] + ibx["b"]) / 2.0 | |
| dx = icx - para_cx | |
| dy = icy - para_cy | |
| rcx = para_cx + (dx * inv_c - dy * inv_s) | |
| rcy = para_cy + (dx * inv_s + dy * inv_c) | |
| left = (rcx - (w / 2.0)) - pbx["l"] | |
| top = (rcy - (h / 2.0)) - pbx["t"] | |
| else: | |
| left = (ibx["l"] - pbx["l"]) - eps | |
| top = (ibx["t"] - pbx["t"]) - eps | |
| style = ( | |
| f'left: {left:.6f}px; ' | |
| f'top: {top:.6f}px; ' | |
| f'width: {w:.6f}px; ' | |
| f'height: {h:.6f}px; ' | |
| f'font-size: {fs}px; ' | |
| f'line-height: {lh}px; ' | |
| 'padding-bottom: 0px;' | |
| ) | |
# Inside a rotated paragraph only the item's rotation relative to it is applied.
| deg = float(ibx.get("deg") or 0.0) | |
| if inv_c is not None: | |
| deg = deg - para_deg | |
| if abs(deg) > 0.01: | |
| style += f' transform: rotate({deg:.6g}deg); transform-origin: center center;' | |
# The "_tp_wrap" flag on the item switches on the data-wrap="1" CSS variant.
| wrap_attr = ' data-wrap="1"' if it.get("_tp_wrap") else "" | |
| parts.append( | |
| f'<div class="tp-item tp-item-{item_idx}" data-para-index="{para_idx}" data-item-index="{item_idx}"{wrap_attr} style="{style}">' | |
| f'<span>{_tp_escape_text(text)}</span></div>' | |
| ) | |
| parts.append("</div>") | |
| parts.append("</div>") | |
| return "".join(parts) | |
def overlay_css(container_class="RTMDre", token_class="IwqbBf"):
    """Return the stylesheet for the aria-label based token overlay.

    Args:
        container_class: CSS class of the overlay container.
        token_class: CSS class of each token element inside the container.

    Returns:
        CSS text with four rules: the container, a ``box-sizing`` reset for
        its descendants, the token element, and the token's ``::before``
        pseudo-element that prints the ``aria-label`` text.
    """
    c = container_class
    t = token_class

    def _rule(selector, declarations):
        # Exactly one closing brace per rule: a stray "}" between rules
        # becomes part of the next rule's selector and invalidates it.
        return selector + "{" + "".join(declarations) + "}"

    container_decls = (
        "position:absolute!important;",
        "inset:0!important;",
        "width:100%!important;",
        "height:100%!important;",
        "display:block!important;",
        "opacity:1!important;",
        "visibility:visible!important;",
        "pointer-events:none!important;",
        "overflow:visible!important;",
        "z-index:2147483647!important;",
        "transform:none!important;",
        "contain:layout style paint!important;",
        "--lens-text-color:#fff;",
        '--lens-font-family:"Noto Sans Thai","Noto Sans Thai UI","Noto Sans",system-ui,-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,Arial,sans-serif;',
        "--lens-text-shadow:0 1px 2px rgba(0,0,0,.85),0 0 1px rgba(0,0,0,.85);",
    )
    token_decls = (
        "position:absolute!important;",
        "display:flex!important;",
        "align-items:center!important;",
        "justify-content:center!important;",
        "opacity:1!important;",
        "visibility:visible!important;",
        "pointer-events:none!important;",
        "user-select:none!important;",
        "overflow:visible!important;",
        "white-space:pre!important;",
        "transform-origin:top left!important;",
        "filter:none!important;",
        "mix-blend-mode:normal!important;",
        "text-transform:none!important;",
        "letter-spacing:normal!important;",
    )
    label_decls = (
        "content:attr(aria-label)!important;",
        "display:block!important;",
        "white-space:pre!important;",
        "color:var(--lens-text-color)!important;",
        "font-family:var(--lens-font-family)!important;",
        "text-shadow:var(--lens-text-shadow)!important;",
        "font-weight:400!important;",
        "font-style:normal!important;",
        "line-height:inherit!important;",
        "text-rendering:geometricPrecision!important;",
    )
    return (
        _rule(f".{c}", container_decls)
        + _rule(f".{c} *", ("box-sizing:border-box!important;",))
        + _rule(f".{c} .{t}", token_decls)
        + _rule(f".{c} .{t}::before", label_decls)
    )
def ensure_font(path, urls):
    """Resolve a usable font file for ``path``, downloading it if necessary.

    Resolution order: the cached answer, ``path`` itself, a file with the
    same base name under the common font directories, and finally the first
    of ``urls`` that returns a plausible font payload (saved to ``path``).
    Both hits and misses are remembered in ``_FONT_RESOLVE_CACHE``.

    Args:
        path: preferred font file location; falsy means "no font wanted".
        urls: iterable of download URLs (may be ``None``/empty).

    Returns:
        The path of an existing font file, or ``None`` when none was found.
    """
    key = str(path or "")
    cached = _FONT_RESOLVE_CACHE.get(key)
    if cached is not None:
        # "" is the cached marker for "already looked, nothing found"
        return cached or None

    if not path:
        # Without a target path there is nothing to match by name and
        # nowhere to store a download.
        _FONT_RESOLVE_CACHE[key] = ""
        return None

    if os.path.isfile(path):
        _FONT_RESOLVE_CACHE[key] = path
        return path

    wanted = os.path.basename(path).lower()
    for root in ("/usr/share/fonts", "/usr/local/share/fonts", os.path.expanduser("~/.fonts")):
        if not os.path.isdir(root):
            continue
        for dirpath, _dirnames, filenames in os.walk(root):
            for fn in filenames:
                if fn.lower() == wanted:
                    found = os.path.join(dirpath, fn)
                    _FONT_RESOLVE_CACHE[key] = found
                    return found

    for url in urls or ():
        try:
            r = httpx.get(url, timeout=30)
            # Tiny bodies are treated as error pages rather than fonts.
            if r.status_code == 200 and len(r.content) > 10000:
                with open(path, "wb") as f:
                    f.write(r.content)
                if os.path.isfile(path):
                    _FONT_RESOLVE_CACHE[key] = path
                    return path
        except Exception:
            # Best effort: a failing mirror or unwritable path just moves
            # on to the next URL.
            continue

    _FONT_RESOLVE_CACHE[key] = ""
    return None
def pick_font(text, thai_path, latin_path, size):
    """Load the Thai or Latin font for ``text`` at ``size``.

    ``thai_path`` is used when ``text`` contains any character from the Thai
    block (U+0E00..U+0E7F), otherwise ``latin_path``.  Pillow's built-in
    default font is returned when the chosen file is missing or cannot be
    loaded.
    """
    has_thai = any(0x0E00 <= ord(ch) <= 0x0E7F for ch in text)
    fp = thai_path if has_thai else latin_path
    if fp and os.path.isfile(fp):
        # Request Raqm (complex shaping, needed for Thai marks).  Newer Pillow
        # exposes it as ImageFont.Layout.RAQM, older releases as
        # ImageFont.LAYOUT_RAQM.  If neither exists, pass None so Pillow picks
        # its own default instead of being forced to basic layout (0).
        layout_enum = getattr(ImageFont, "Layout", None)
        if layout_enum is not None:
            raqm = getattr(layout_enum, "RAQM", None)
        else:
            raqm = getattr(ImageFont, "LAYOUT_RAQM", None)
        try:
            return ImageFont.truetype(fp, size=size, layout_engine=raqm)
        except Exception:
            try:
                return ImageFont.truetype(fp, size=size)
            except Exception:
                pass  # fall through to the built-in default font
    return ImageFont.load_default()
def _get_font_pair(thai_path, latin_path, size):
    """Return the cached ``(thai_font, latin_font)`` pair for the given size.

    Pairs are stored in ``_FONT_PAIR_CACHE`` keyed by both paths and the
    integer size, and are created with ``pick_font`` on first use.
    """
    cache_key = (str(thai_path or ""), str(latin_path or ""), int(size))
    hit = _FONT_PAIR_CACHE.get(cache_key)
    if hit:
        return hit
    thai_font = pick_font("ก", thai_path, latin_path, size)
    latin_font = pick_font("A", thai_path, latin_path, size)
    pair = (thai_font, latin_font)
    _FONT_PAIR_CACHE[cache_key] = pair
    return pair
| def _is_thai_char(ch: str) -> bool: | |
| if not ch: | |
| return False | |
| o = ord(ch) | |
| return 0x0E00 <= o <= 0x0E7F | |
| def _split_runs_for_fallback(text: str): | |
| runs = [] | |
| cur = [] | |
| cur_is_th = None | |
| for ch in text: | |
| if ch == "\n": | |
| if cur: | |
| runs.append(("".join(cur), cur_is_th)) | |
| cur = [] | |
| runs.append(("\n", None)) | |
| cur_is_th = None | |
| continue | |
| is_th = _is_thai_char(ch) | |
| if ch.isspace() and cur_is_th is not None: | |
| is_th = cur_is_th | |
| if cur_is_th is None: | |
| cur_is_th = is_th | |
| cur = [ch] | |
| continue | |
| if is_th == cur_is_th: | |
| cur.append(ch) | |
| else: | |
| runs.append(("".join(cur), cur_is_th)) | |
| cur = [ch] | |
| cur_is_th = is_th | |
| if cur: | |
| runs.append(("".join(cur), cur_is_th)) | |
| return runs | |
def _draw_text_centered_fallback(draw_ctx, center_xy, text, thai_path, latin_path, size, fill):
    """Draw single-line ``text`` centred on ``center_xy`` with per-script fonts.

    The text is sanitised, split into Thai / non-Thai runs and measured run
    by run to find the total width and vertical extent; the runs are then
    drawn left to right on a shared baseline.  Newline runs are ignored.
    Nothing is drawn for empty text.
    """
    clean = _sanitize_draw_text(text)
    if not clean:
        return
    thai_font, latin_font = _get_font_pair(thai_path, latin_path, size)
    segments = _split_runs_for_fallback(clean)

    # Pass 1: measure.
    pen = 0.0
    top = 0.0
    bottom = 0.0
    for chunk, chunk_is_thai in segments:
        if chunk == "\n":
            continue
        font = thai_font if chunk_is_thai else latin_font
        try:
            box = draw_ctx.textbbox((pen, 0), chunk, font=font, anchor="ls")
            top = min(top, float(box[1]))
            bottom = max(bottom, float(box[3]))
            pen = float(box[2])
        except Exception:
            # no usable textbbox: fall back to textsize, then to an estimate
            try:
                est_w, est_h = draw_ctx.textsize(chunk, font=font)
            except Exception:
                est_w, est_h = (len(chunk) * size * 0.5, size)
            top = min(top, -float(est_h) * 0.8)
            bottom = max(bottom, float(est_h) * 0.2)
            pen += float(est_w)

    full_w = max(1.0, pen)
    full_h = max(1.0, bottom - top)
    center_x, center_y = center_xy
    baseline = float(center_y) - (full_h / 2.0) - top

    # Pass 2: draw.
    pen = float(center_x) - (full_w / 2.0)
    for chunk, chunk_is_thai in segments:
        if chunk == "\n":
            continue
        font = thai_font if chunk_is_thai else latin_font
        draw_ctx.text((pen, baseline), chunk, font=font, fill=fill, anchor="ls")
        try:
            pen += float(draw_ctx.textlength(chunk, font=font))
        except Exception:
            try:
                est_w, _ = draw_ctx.textsize(chunk, font=font)
            except Exception:
                est_w = len(chunk) * size * 0.5
            pen += float(est_w)
def _draw_text_baseline_fallback(draw, pos, text, thai_path, latin_path, size, fill):
    """Draw ``text`` starting at ``pos`` on a left baseline, per-script fonts.

    Returns ``(advance_width, max_ascent + max_descent)`` as floats, or
    ``(0.0, 0.0)`` when the sanitised text is empty.  Newline runs are
    ignored.
    """
    clean = _sanitize_draw_text(text)
    if not clean:
        return 0.0, 0.0
    thai_font, latin_font = _get_font_pair(thai_path, latin_path, size)
    segments = _split_runs_for_fallback(clean)
    origin_x, baseline_y = pos
    pen_x = float(origin_x)
    tallest_ascent = 0
    deepest_descent = 0

    for chunk, chunk_is_thai in segments:
        if chunk == "\n":
            continue
        font = thai_font if chunk_is_thai else latin_font
        try:
            ascent, descent = font.getmetrics()
        except Exception:
            ascent, descent = size, int(size * 0.25)
        tallest_ascent = max(tallest_ascent, ascent)
        deepest_descent = max(deepest_descent, descent)

        draw.text((pen_x, baseline_y), chunk, font=font, fill=fill, anchor="ls")

        try:
            advance = float(font.getlength(chunk))
        except Exception:
            # font without getlength: measure on a scratch canvas instead
            scratch_img = Image.new("RGBA", (10, 10), (0, 0, 0, 0))
            scratch = ImageDraw.Draw(scratch_img)
            try:
                box = scratch.textbbox((0, 0), chunk, font=font, anchor="ls")
                advance = float(box[2] - box[0])
            except Exception:
                measured_w, _ = scratch.textsize(chunk, font=font)
                advance = float(measured_w)
        pen_x += advance

    return float(pen_x - origin_x), float(tallest_ascent + deepest_descent)
def _baseline_offset_px_for_text(text: str, thai_path: str, latin_path: str, size: int):
    """Measure where the baseline sits relative to the vertical centre.

    Returns ``(baseline_offset, total_height)`` for the sanitised text laid
    out as a single line with per-script fonts, or ``None`` for empty text.
    ``baseline_offset`` is ``-(total_height / 2) - top`` where ``top`` is the
    highest measured extent above the baseline (non-positive).
    """
    clean = _sanitize_draw_text(text)
    if not clean:
        return None
    thai_font, latin_font = _get_font_pair(thai_path, latin_path, size)
    segments = _split_runs_for_fallback(clean)
    scratch_img = Image.new("RGBA", (16, 16), (0, 0, 0, 0))
    scratch = ImageDraw.Draw(scratch_img)

    pen = 0.0
    top = 0.0
    bottom = 0.0
    for chunk, chunk_is_thai in segments:
        if chunk == "\n":
            continue
        font = thai_font if chunk_is_thai else latin_font
        try:
            box = scratch.textbbox((pen, 0), chunk, font=font, anchor="ls")
            top = min(top, float(box[1]))
            bottom = max(bottom, float(box[3]))
            pen = float(box[2])
        except Exception:
            try:
                est_w, est_h = scratch.textsize(chunk, font=font)
            except Exception:
                est_w, est_h = (len(chunk) * size * 0.5, size)
            top = min(top, -float(est_h) * 0.8)
            bottom = max(bottom, float(est_h) * 0.2)
            pen += float(est_w)

    full_h = max(1.0, bottom - top)
    return -(full_h / 2.0) - top, full_h
def _line_metrics_px(text: str, thai_path: str, latin_path: str, size: int):
    """Measure a single line of ``text`` rendered with per-script fonts.

    Returns ``(width, height, baseline_to_center)`` in pixels, or ``None``
    when the sanitised text is empty.  Width and height are at least ``1.0``;
    ``baseline_to_center`` is the negated midpoint of the measured vertical
    extent around the baseline.
    """
    clean = _sanitize_draw_text(text)
    if not clean:
        return None
    thai_font, latin_font = _get_font_pair(thai_path, latin_path, size)
    segments = _split_runs_for_fallback(clean)
    scratch_img = Image.new("RGBA", (16, 16), (0, 0, 0, 0))
    scratch = ImageDraw.Draw(scratch_img)

    pen = 0.0
    top = 0.0
    bottom = 0.0
    for chunk, chunk_is_thai in segments:
        if chunk == "\n":
            continue
        font = thai_font if chunk_is_thai else latin_font
        try:
            box = scratch.textbbox((pen, 0), chunk, font=font, anchor="ls")
            top = min(top, float(box[1]))
            bottom = max(bottom, float(box[3]))
            pen = float(box[2])
        except Exception:
            try:
                est_w, est_h = scratch.textsize(chunk, font=font)
            except Exception:
                est_w, est_h = (len(chunk) * size * 0.5, size)
            top = min(top, -float(est_h) * 0.8)
            bottom = max(bottom, float(est_h) * 0.2)
            pen += float(est_w)

    line_w = max(1.0, pen)
    line_h = max(1.0, bottom - top)
    center_shift = -((top + bottom) / 2.0)
    return line_w, line_h, center_shift
| def _item_avail_w_px(item: dict, W: int, H: int) -> float: | |
| b = item.get("box") or {} | |
| w_box = float(b.get("width") or 0.0) * float(W) | |
| L = 0.0 | |
| p1 = item.get("baseline_p1") or {} | |
| p2 = item.get("baseline_p2") or {} | |
| if ("x" in p1 and "y" in p1 and "x" in p2 and "y" in p2): | |
| dx = (float(p2.get("x") or 0.0) - float(p1.get("x") or 0.0)) * float(W) | |
| dy = (float(p2.get("y") or 0.0) - float(p1.get("y") or 0.0)) * float(H) | |
| L = float(math.hypot(dx, dy)) | |
| avail = max(w_box, L) | |
| return max(1.0, float(avail)) | |
| def _item_avail_h_px(item: dict, H: int) -> float: | |
| b = item.get("box") or {} | |
| return max(1.0, (float(b.get("height") or 0.0) * float(H)) - 2.0) | |
| def _item_line_text(item: dict) -> str: | |
| t = str(item.get("text") or "") | |
| if t.strip(): | |
| return t | |
| spans = item.get("spans") or [] | |
| return "".join(str(s.get("text") or "") for s in spans) | |
def _compute_fit_size_px_for_item(item: dict, thai_path: str, latin_path: str, W: int, H: int, base_size: int = 96) -> int | None:
    """Find the largest font size (px) at which the item's text fits its box.

    The text is measured once at ``base_size`` to get a scale estimate, then
    the size is stepped down (never below 10) until the measured line fits.
    When the result is tiny (<= 12) although the box is tall and the line is
    much wider than the box, a wrapped layout is estimated instead; if that
    allows a clearly larger size the item is flagged with ``_tp_wrap``.

    Side effect: any existing ``_tp_wrap`` flag on ``item`` is cleared first.

    Returns the size, or ``None`` when the text is blank or unmeasurable.
    """
    item.pop("_tp_wrap", None)
    text = _item_line_text(item)
    if not text.strip():
        return None

    base_metrics = _line_metrics_px(text, thai_path, latin_path, base_size)
    if base_metrics is None:
        return None
    base_w_px, base_h_px, _ = base_metrics
    avail_w = _item_avail_w_px(item, W, H)
    avail_h = _item_avail_h_px(item, H)
    if base_w_px <= 1e-6 or base_h_px <= 1e-6:
        return None

    # Thai glyphs get slightly more vertical headroom than other scripts.
    height_budget = 0.90 if any(_is_thai_char(ch) for ch in text) else 0.94
    scale = min((avail_w * 0.98) / base_w_px, (avail_h * height_budget) / base_h_px)
    if scale <= 0:
        return None

    size = max(10, int(base_size * scale))
    while size > 10:
        probe = _line_metrics_px(text, thai_path, latin_path, size)
        if probe is None:
            return None
        probe_w, probe_h, _ = probe
        if probe_w <= avail_w * 0.999 and probe_h <= avail_h * 0.999:
            break
        size -= 1

    if size <= 12 and avail_h >= 24 and base_w_px > (avail_w * 1.2):

        def _wrapped_fits(candidate: int) -> bool:
            # Estimate from the base-size metrics: scale them linearly and
            # count how many box-width lines the text would need.
            if candidate <= 0:
                return False
            ratio = float(candidate) / float(base_size)
            est_w = float(base_w_px) * ratio
            est_h = float(base_h_px) * ratio
            line_count = int(math.ceil(max(1.0, est_w) / max(1.0, avail_w)))
            return (float(line_count) * est_h) <= float(avail_h)

        upper = int(min(max(16, avail_h), base_size * 3))
        lower = int(size)
        best = int(size)
        while lower <= upper:
            middle = (lower + upper) // 2
            if _wrapped_fits(middle):
                best = int(middle)
                lower = middle + 1
            else:
                upper = middle - 1
        if best >= int(size * 1.25):
            item["_tp_wrap"] = True
            size = int(best)

    return int(size)
def fit_tree_font_sizes_for_tp_html(tree: dict, thai_path: str, latin_path: str, W: int, H: int) -> dict:
    """Write per-item fitted font sizes into ``tree`` (in place) and return it.

    Every item with a computable fit gets ``font_size_px`` (copied to its
    spans); each paragraph with at least one such item gets
    ``para_font_size_px`` set to the (upper) median of its item sizes.
    """
    for para in tree.get("paragraphs") or []:
        items = para.get("items") or []
        if not items:
            continue

        fitted: dict[int, int] = {}
        for idx, item in enumerate(items):
            size = _compute_fit_size_px_for_item(item, thai_path, latin_path, W, H)
            if size is not None:
                fitted[idx] = int(size)
        if not fitted:
            continue

        ranked = sorted(fitted.values())
        para["para_font_size_px"] = int(ranked[len(ranked) // 2])

        for idx, size in fitted.items():
            item = items[idx]
            item["font_size_px"] = int(size)
            for span in item.get("spans") or []:
                span["font_size_px"] = int(size)
    return tree
| def _iter_paragraphs(tree: dict): | |
| ps = (tree or {}).get("paragraphs") or [] | |
| for i, p in enumerate(ps): | |
| yield i, p | |
| def _apply_para_font_size(tree: dict, para_sizes: dict[int, int]): | |
| if not tree: | |
| return | |
| for pi, p in _iter_paragraphs(tree): | |
| sz = para_sizes.get(pi) | |
| if not sz: | |
| continue | |
| p["para_font_size_px"] = int(sz) | |
| for it in (p.get("items") or []): | |
| it["font_size_px"] = int(sz) | |
| for sp in (it.get("spans") or []): | |
| sp["font_size_px"] = int(sz) | |
| def _compute_shared_para_sizes(trees: list[dict], thai_path: str, latin_path: str, W: int, H: int) -> dict[int, int]: | |
| sizes: dict[int, int] = {} | |
| for tree in trees: | |
| if not tree: | |
| continue | |
| for pi, p in _iter_paragraphs(tree): | |
| for it in (p.get("items") or []): | |
| fit = _compute_fit_size_px_for_item( | |
| it, thai_path, latin_path, W, H) | |
| if fit is None: | |
| continue | |
| cur = sizes.get(pi) | |
| sizes[pi] = fit if cur is None else min(cur, fit) | |
| vals = [v for v in sizes.values() if isinstance(v, int) and v > 0] | |
| if not vals: | |
| return sizes | |
| vals.sort() | |
| mid = len(vals) // 2 | |
| target = vals[mid] if (len(vals) % 2 == 1) else int( | |
| round((vals[mid - 1] + vals[mid]) / 2)) | |
| for k in list(sizes.keys()): | |
| try: | |
| sizes[k] = int(min(int(sizes[k]), int(target))) | |
| except Exception: | |
| pass | |
| return sizes | |
| def _sanitize_draw_text(s: str) -> str: | |
| t = (s or "").replace("\r\n", "\n").replace("\r", "\n") | |
| t = t.replace("\u200b", "").replace("\ufeff", "") | |
| t = "".join(ch for ch in t if (ch == "\n") or ( | |
| unicodedata.category(ch)[0] != "C")) | |
| return t | |
| def _token_box_px(t, W, H, pad_px=0): | |
| b = t.get("box") or {} | |
| left = int(round(float(b.get("left", 0.0)) * W)) - pad_px | |
| top = int(round(float(b.get("top", 0.0)) * H)) - pad_px | |
| right = int(round((float(b.get("left", 0.0)) + | |
| float(b.get("width", 0.0))) * W)) + pad_px | |
| bottom = int( | |
| round((float(b.get("top", 0.0)) + float(b.get("height", 0.0))) * H)) + pad_px | |
| left = max(0, min(W, left)) | |
| top = max(0, min(H, top)) | |
| right = max(0, min(W, right)) | |
| bottom = max(0, min(H, bottom)) | |
| if right <= left or bottom <= top: | |
| return None | |
| return left, top, right, bottom | |
| def _token_quad_px(t, W, H, pad_px=0, apply_baseline_shift=True): | |
| if not t.get("valid_text"): | |
| return None | |
| p1 = t.get("baseline_p1") or {} | |
| p2 = t.get("baseline_p2") or {} | |
| x1 = float(p1.get("x", 0.0)) * W | |
| y1 = float(p1.get("y", 0.0)) * H | |
| x2 = float(p2.get("x", 0.0)) * W | |
| y2 = float(p2.get("y", 0.0)) * H | |
| dx = x2 - x1 | |
| dy = y2 - y1 | |
| if dx < 0 or (abs(dx) < 1e-12 and dy < 0): | |
| x1, y1, x2, y2 = x2, y2, x1, y1 | |
| dx = x2 - x1 | |
| dy = y2 - y1 | |
| L = math.hypot(dx, dy) | |
| if L <= 1e-9: | |
| return None | |
| ux = dx / L | |
| uy = dy / L | |
| nx = -uy | |
| ny = ux | |
| if ny < 0: | |
| nx, ny = -nx, -ny | |
| t0 = float(t.get("t0_raw") if t.get("t0_raw") is not None else 0.0) | |
| t1 = float(t.get("t1_raw") if t.get("t1_raw") is not None else 1.0) | |
| sx = x1 + ux * (t0 * L) | |
| sy = y1 + uy * (t0 * L) | |
| ex = x1 + ux * (t1 * L) | |
| ey = y1 + uy * (t1 * L) | |
| h = max(1.0, float(t.get("height_raw") or 0.0) * H) | |
| if apply_baseline_shift and BASELINE_SHIFT: | |
| shift = h * BASELINE_SHIFT_FACTOR | |
| sx += nx * shift | |
| sy += ny * shift | |
| ex += nx * shift | |
| ey += ny * shift | |
| pad = max(0.0, float(pad_px)) | |
| sx -= ux * pad | |
| sy -= uy * pad | |
| ex += ux * pad | |
| ey += uy * pad | |
| hh = (h / 2.0) + pad | |
| ox = nx * hh | |
| oy = ny * hh | |
| return [(sx - ox, sy - oy), (ex - ox, ey - oy), (ex + ox, ey + oy), (sx + ox, sy + oy)] | |
| def _token_box_quad_px(t, W, H, pad_px=0): | |
| b = t.get("box") or {} | |
| w = float(b.get("width", 0.0)) * W | |
| h = float(b.get("height", 0.0)) * H | |
| if w <= 0.0 or h <= 0.0: | |
| return None | |
| left = float(b.get("left", 0.0)) * W | |
| top = float(b.get("top", 0.0)) * H | |
| cx = left + (w / 2.0) | |
| cy = top + (h / 2.0) | |
| hw = (w / 2.0) + float(pad_px) | |
| hh = (h / 2.0) + float(pad_px) | |
| angle_deg = float(b.get("rotation_deg", 0.0)) | |
| rad = math.radians(angle_deg) | |
| c = math.cos(rad) | |
| s = math.sin(rad) | |
| corners = [(-hw, -hh), (hw, -hh), (hw, hh), (-hw, hh)] | |
| out = [] | |
| for x, y in corners: | |
| rx = (x * c) - (y * s) | |
| ry = (x * s) + (y * c) | |
| out.append((cx + rx, cy + ry)) | |
| return out | |
| def _quad_bbox(quad, W, H): | |
| xs = [p[0] for p in quad] | |
| ys = [p[1] for p in quad] | |
| l = max(0, min(W, int(math.floor(min(xs))))) | |
| t = max(0, min(H, int(math.floor(min(ys))))) | |
| r = max(0, min(W, int(math.ceil(max(xs))))) | |
| b = max(0, min(H, int(math.ceil(max(ys))))) | |
| if r <= l or b <= t: | |
| return None | |
| return l, t, r, b | |
| def _median_rgba(pixels): | |
| if not pixels: | |
| return None | |
| rs = sorted(p[0] for p in pixels) | |
| gs = sorted(p[1] for p in pixels) | |
| bs = sorted(p[2] for p in pixels) | |
| a = 255 | |
| mid = len(rs) // 2 | |
| return (rs[mid], gs[mid], bs[mid], a) | |
| def _rel_luminance(rgb): | |
| r, g, b = rgb | |
| def lin(c): | |
| c = c / 255.0 | |
| return c / 12.92 if c <= 0.04045 else ((c + 0.055) / 1.055) ** 2.4 | |
| return 0.2126 * lin(r) + 0.7152 * lin(g) + 0.0722 * lin(b) | |
| def _contrast_ratio(l1, l2): | |
| a = max(l1, l2) + 0.05 | |
| b = min(l1, l2) + 0.05 | |
| return a / b | |
def _pick_bw_text_color(bg_rgb):
    """Choose the light or dark text colour with the better contrast on ``bg_rgb``.

    Ties go to ``TEXT_COLOR_LIGHT``.
    """
    background = _rel_luminance(bg_rgb)
    against_black = _contrast_ratio(background, 0.0)
    against_white = _contrast_ratio(background, 1.0)
    if against_white >= against_black:
        return TEXT_COLOR_LIGHT
    return TEXT_COLOR_DARK
def _sample_bg_color_from_quad(base_rgb, quad, rect, border_px=3, margin_px=6):
    """Estimate the background colour from the inner border of ``quad``.

    The quad is rasterised inside ``rect``; the band between the quad and
    its eroded copy (``border_px`` wide, limited by the rect size) is
    sampled and the median colour returned.  With ``border_px`` of 0 the
    whole quad is sampled.  An empty rect or fewer than 24 samples falls
    back to ``_sample_bg_color`` (strips just outside ``rect``).
    """
    l, t, r, b = rect
    width, height = r - l, b - t
    if width <= 0 or height <= 0:
        return _sample_bg_color(base_rgb, rect, margin_px)

    quad_mask = Image.new("L", (width, height), 0)
    painter = ImageDraw.Draw(quad_mask)
    local_quad = [(x - l, y - t) for x, y in quad]
    painter.polygon(local_quad, fill=255)

    band_px = int(max(0, border_px or 0))
    if band_px > 0:
        # keep the band thinner than half of the smaller rect side
        band_px = min(band_px, max(1, (min(width, height) - 1) // 2))
        eroded = quad_mask.filter(ImageFilter.MinFilter(size=band_px * 2 + 1))
        band = ImageChops.subtract(quad_mask, eroded)
    else:
        band = quad_mask

    region_pixels = list(base_rgb.crop((l, t, r, b)).getdata())
    band_values = list(band.getdata())
    picked = [pixel for pixel, flag in zip(region_pixels, band_values) if flag > 0]
    if len(picked) < 24:
        return _sample_bg_color(base_rgb, rect, margin_px)

    median = _median_rgba(picked)
    if median:
        return median[:3]
    return _sample_bg_color(base_rgb, rect, margin_px)
def _sample_bg_color(base_rgb, rect, margin_px):
    """Estimate the background colour from strips surrounding ``rect``.

    Four strips of thickness ``margin_px`` (at least 1) above, below, left
    and right of ``rect`` are clipped to the image and their pixels pooled;
    the per-channel median is returned as ``(r, g, b)``. When no strip has
    any area, the pixel at the clamped top-left corner of ``rect`` is
    returned instead.
    """
    W, H = base_rgb.size
    left, top, right, bottom = rect
    margin = max(1, int(margin_px))

    strips = (
        (left, top - margin, right, top),            # above
        (left, bottom, right, bottom + margin),      # below
        (left - margin, top, left, bottom),          # left
        (right, top, right + margin, bottom),        # right
    )

    pooled = []
    for x0, y0, x1, y1 in strips:
        x0 = max(0, min(W, x0))
        y0 = max(0, min(H, y0))
        x1 = max(0, min(W, x1))
        y1 = max(0, min(H, y1))
        if x1 <= x0 or y1 <= y0:
            continue
        pooled.extend(base_rgb.crop((x0, y0, x1, y1)).getdata())

    median = _median_rgba(pooled)
    if median:
        return median[:3]
    corner = (max(0, min(W - 1, left)), max(0, min(H - 1, top)))
    return base_rgb.getpixel(corner)
def _sample_bg_color_from_quad_ring(base_rgb, quad, rect, ring_px=4):
    """Estimate the background colour from a ring just outside ``quad``.

    The quad is rasterised, dilated by ``ring_px`` (at least 1) and the
    original quad is subtracted, leaving a ring of pixels around it. The
    per-channel median of those pixels is returned as ``(r, g, b)``.

    The sampling window is ``rect`` grown by ``ring_px`` on every side and
    clamped to the image. Sampling inside ``rect`` alone would clip the ring
    away, because ``rect`` is the quad's own bounding box (for an
    axis-aligned quad the ring would be essentially empty).

    Returns ``None`` when ``rect`` has no area or the ring holds too few
    samples.
    """
    W, H = base_rgb.size
    l, t, r, b = rect
    if (r - l) <= 0 or (b - t) <= 0:
        return None
    rp = int(max(1, ring_px or 1))

    # Grow the window so the dilated ring has room outside the quad.
    wl = max(0, l - rp)
    wt = max(0, t - rp)
    wr = min(W, r + rp)
    wb = min(H, b + rp)

    mask = np.zeros((wb - wt, wr - wl), dtype=np.uint8)
    pts = np.array([[(x - wl, y - wt) for x, y in quad]], dtype=np.int32)
    cv2.fillPoly(mask, pts, 255)

    kernel = cv2.getStructuringElement(
        cv2.MORPH_ELLIPSE, (rp * 2 + 1, rp * 2 + 1))
    dilated = cv2.dilate(mask, kernel, iterations=1)
    ring = cv2.bitwise_and(dilated, cv2.bitwise_not(mask))

    rgb = np.array(
        base_rgb.crop((wl, wt, wr, wb)).convert("RGB"), dtype=np.uint8)
    sel = rgb[ring > 0]
    # ``sel.size`` counts channel values, so 24 corresponds to 8 pixels.
    if sel.size < 24:
        return None
    med = np.median(sel, axis=0)
    return (int(med[0]), int(med[1]), int(med[2]))
def _pixelate(img, block_px):
    """Return a mosaic version of ``img`` with blocks of about ``block_px``.

    The image is shrunk by ``block_px`` with nearest-neighbour sampling and
    scaled back to its original size. Images that are at most one pixel wide
    or tall are returned unchanged; ``block_px`` values below 1 (or falsy)
    are treated as 1.
    """
    width, height = img.size
    if width <= 1 or height <= 1:
        return img
    block = max(1, int(block_px or 1))
    reduced = (max(1, width // block), max(1, height // block))
    shrunk = img.resize(reduced, resample=Image.NEAREST)
    return shrunk.resize((width, height), resample=Image.NEAREST)
def _mean_abs_diff(a, b):
    """Return the mean absolute per-channel difference of two images.

    Both images are compared in RGB. A sentinel of ``1e18`` is returned when
    the sizes differ or the images contain no pixels, so such pairs always
    score worst.
    """
    if a.size != b.size:
        return 1e18
    pixels_a = list(a.convert("RGB").getdata())
    pixels_b = list(b.convert("RGB").getdata())
    if not pixels_a:
        return 1e18
    total = sum(
        abs(chan_a - chan_b)
        for px_a, px_b in zip(pixels_a, pixels_b)
        for chan_a, chan_b in zip(px_a, px_b)
    )
    return total / (len(pixels_a) * 3)
def _resize_small(img, max_w=64, max_h=64):
    """Shrink ``img`` to fit within ``max_w`` x ``max_h``, keeping its aspect.

    The image is never enlarged (the scale factor is capped at 1.0) and each
    output dimension is at least one pixel. Images with a non-positive
    dimension are returned unchanged.
    """
    width, height = img.size
    if width <= 0 or height <= 0:
        return img
    factor = min(max_w / width, max_h / height, 1.0)
    target = (max(1, int(width * factor)), max(1, int(height * factor)))
    return img.resize(target, resample=Image.BILINEAR)
def _clone_candidate_score(base, rect, cand_rect, direction, border_px):
    """Score how well a donor rectangle matches the surroundings of ``rect``.

    Compares a ``border_px``-thick strip just outside ``rect`` on the side
    given by ``direction`` ("up", "down", "left", anything else is treated
    as right) with the strip on the edge of ``cand_rect`` that faces
    ``rect``. Both strips are downscaled and compared with
    ``_mean_abs_diff``; lower is better. Returns ``1e18`` when ``rect`` is
    at most one pixel wide or tall.
    """
    W, H = base.size
    l, t, r, b = rect
    cl, ct, cr, cb = cand_rect
    if (r - l) <= 1 or (b - t) <= 1:
        return 1e18
    thickness = max(1, int(border_px or 1))

    if direction == "up":
        target_box = (l, max(0, t - thickness), r, t)
        donor_box = (cl, max(0, cb - thickness), cr, cb)
    elif direction == "down":
        target_box = (l, b, r, min(H, b + thickness))
        donor_box = (cl, ct, cr, min(H, ct + thickness))
    elif direction == "left":
        target_box = (max(0, l - thickness), t, l, b)
        donor_box = (max(0, cr - thickness), ct, cr, cb)
    else:
        target_box = (r, t, min(W, r + thickness), b)
        donor_box = (cl, ct, min(W, cl + thickness), cb)

    target_strip = _resize_small(base.crop(target_box), 64, 16)
    donor_strip = _resize_small(base.crop(donor_box), 64, 16)
    return _mean_abs_diff(target_strip, donor_strip)
def _choose_clone_rect(base, rect, gap_px, border_px):
    """Pick the best same-sized neighbouring rectangle to clone from.

    Candidates sit directly above, below, left and right of ``rect``,
    separated by ``gap_px``. Candidates that do not lie fully inside the
    image are discarded; the rest are ranked by ``_clone_candidate_score``
    and the lowest-scoring one is returned (the earliest wins a tie).
    Returns ``None`` when no candidate fits.
    """
    W, H = base.size
    l, t, r, b = rect
    width, height = r - l, b - t
    gap = max(0, int(gap_px or 0))

    neighbours = (
        ("up", (l, t - gap - height, r, t - gap)),
        ("down", (l, b + gap, r, b + gap + height)),
        ("left", (l - gap - width, t, l - gap, b)),
        ("right", (r + gap, t, r + gap + width, b)),
    )

    best_rect = None
    best_score = None
    for direction, candidate in neighbours:
        cl, ct, cr, cb = candidate
        if cl < 0 or ct < 0 or cr > W or cb > H:
            continue
        score = _clone_candidate_score(
            base, rect, candidate, direction, border_px)
        if best_rect is None or score < best_score:
            best_rect, best_score = candidate, score
    return best_rect
def _erase_with_clone(base, rect, mask, gap_px, border_px, feather_px):
    """Cover ``rect`` in ``base`` with pixels cloned from a neighbouring area.

    The donor rectangle is chosen by ``_choose_clone_rect``. ``mask`` (same
    size as ``rect``) selects which pixels are replaced and is blurred by
    ``feather_px`` first when that is positive. ``base`` is modified in
    place.

    Returns ``True`` when a donor was found and pasted, ``False`` otherwise.
    """
    left, top, right, bottom = rect
    donor_rect = _choose_clone_rect(base, rect, gap_px, border_px)
    if not donor_rect:
        return False
    d_left, d_top, d_right, d_bottom = donor_rect
    donor = base.crop((d_left, d_top, d_right, d_bottom))
    target = base.crop((left, top, right, bottom))

    radius = max(0, int(feather_px or 0))
    blend_mask = (
        mask.filter(ImageFilter.GaussianBlur(radius=radius))
        if radius > 0
        else mask
    )
    base.paste(Image.composite(donor, target, blend_mask), (left, top))
    return True
def _erase_with_blend_patches(base, rect, mask, gap_px=3, feather_px=4):
    """Cover ``rect`` in ``base`` with the average of its neighbouring patches.

    Up to eight same-sized patches (four sides and four diagonals, offset
    by the rect size plus ``gap_px``) that lie fully inside the image are
    averaged per channel. The average is composited over the rect through
    ``mask``, which is blurred by ``feather_px`` when that is positive.
    ``base`` is modified in place.

    Returns ``True`` on success, ``False`` when the rect is at most two
    pixels wide or tall or no neighbouring patch fits inside the image.
    """
    l, t, r, b = rect
    W, H = base.size
    w = r - l
    h = b - t
    if w <= 2 or h <= 2:
        return False
    gap = int(max(0, gap_px or 0))
    step_x = w + gap
    step_y = h + gap
    offsets = [
        (0, -step_y), (0, step_y), (-step_x, 0), (step_x, 0),
        (-step_x, -step_y), (step_x, -step_y),
        (-step_x, step_y), (step_x, step_y),
    ]

    patches = []
    for dx, dy in offsets:
        ll = l + dx
        tt = t + dy
        rr = ll + w
        bb = tt + h
        if ll < 0 or tt < 0 or rr > W or bb > H:
            continue
        patch = base.crop((ll, tt, rr, bb)).convert("RGB")
        patches.append(np.asarray(patch, dtype=np.uint32))
    if not patches:
        return False

    # Sum in a wide integer type: ImageChops.add clips every partial sum at
    # 255, which makes "sum then divide" return a wrong, darkened average.
    mean = (np.stack(patches).sum(axis=0) // len(patches)).astype(np.uint8)
    blended = Image.fromarray(mean)

    m = mask
    fp = int(max(0, feather_px or 0))
    if fp > 0:
        m = m.filter(ImageFilter.GaussianBlur(radius=fp))
    region = base.crop((l, t, r, b)).convert("RGB")
    merged = Image.composite(blended, region, m)
    base.paste(merged, (l, t))
    return True
def _erase_with_inpaint(base, box_tokens, pad_px=2):
    """Erase all token areas from ``base`` using OpenCV inpainting.

    Every token contributes one polygon to a full-image mask. The polygon is
    the token's box quad, else its baseline quad, else its axis-aligned box;
    tokens with none of these are skipped. The mask's bounding box, grown by
    8 px, is optionally dilated by ``INPAINT_DILATE_PX`` and inpainted with
    ``INPAINT_METHOD`` ("telea"/"t" selects Telea, anything else
    Navier-Stokes) and ``INPAINT_RADIUS``.

    Returns ``base`` itself when ``box_tokens`` is empty, otherwise an RGB
    image (unmodified when the mask turned out empty).
    """
    if not box_tokens:
        return base
    rgb = base.convert("RGB")
    W, H = rgb.size

    full_mask = Image.new("L", (W, H), 0)
    painter = ImageDraw.Draw(full_mask)
    for token in box_tokens:
        poly = _token_box_quad_px(token, W, H, pad_px=pad_px)
        if not poly:
            poly = _token_quad_px(
                token, W, H, pad_px=pad_px, apply_baseline_shift=True)
        if not poly:
            box = _token_box_px(token, W, H, pad_px=pad_px)
            if not box:
                continue
            bl, bt, br, bb = box
            poly = [(bl, bt), (br, bt), (br, bb), (bl, bb)]
        painter.polygon(poly, fill=255)

    mask_arr = np.array(full_mask, dtype=np.uint8)
    rows, cols = np.where(mask_arr > 0)
    if cols.size == 0 or rows.size == 0:
        return rgb

    # Work on the masked area plus an 8 px margin of context.
    x0 = int(max(0, cols.min() - 8))
    y0 = int(max(0, rows.min() - 8))
    x1 = int(min(W, cols.max() + 1 + 8))
    y1 = int(min(H, rows.max() + 1 + 8))
    if x1 <= x0 or y1 <= y0:
        return rgb

    window_rgb = np.array(rgb.crop((x0, y0, x1, y1)), dtype=np.uint8)
    window_mask = mask_arr[y0:y1, x0:x1]

    grow = int(max(0, INPAINT_DILATE_PX or 0))
    if grow > 0:
        kernel = cv2.getStructuringElement(
            cv2.MORPH_ELLIPSE, (grow * 2 + 1, grow * 2 + 1))
        window_mask = cv2.dilate(window_mask, kernel, iterations=1)

    method_name = (INPAINT_METHOD or "telea").strip().lower()
    if method_name in ("telea", "t"):
        algorithm = cv2.INPAINT_TELEA
    else:
        algorithm = cv2.INPAINT_NS
    inpaint_radius = float(INPAINT_RADIUS or 3)

    window_bgr = cv2.cvtColor(window_rgb, cv2.COLOR_RGB2BGR)
    filled_bgr = cv2.inpaint(
        window_bgr, window_mask, inpaint_radius, algorithm)
    filled_rgb = cv2.cvtColor(filled_bgr, cv2.COLOR_BGR2RGB)

    result = rgb.copy()
    result.paste(Image.fromarray(filled_rgb), (x0, y0))
    return result
def erase_text_with_boxes(img, box_tokens, pad_px=2, sample_margin_px=6, mode=None, mosaic_block_px=None):
    """Return a copy of ``img`` with the areas of ``box_tokens`` erased.

    Args:
        img: Source image; it is not modified.
        box_tokens: Tokens whose areas are erased. Each token's area is its
            box quad, else its baseline quad, else its axis-aligned box.
        pad_px: Padding passed to the token geometry helpers.
        sample_margin_px: Margin used when sampling a solid fill colour.
        mode: Erase strategy; defaults to ``ERASE_MODE`` and then "solid".
            "inpaint"/"cv2"/"opencv" inpaint all tokens at once;
            "blend_patch"/"blend"/"avg_patch"/"patch" average neighbouring
            patches; "clone" copies a neighbouring area; "mosaic"
            pixelates; anything else fills with a sampled colour.
        mosaic_block_px: Block size for "mosaic"; defaults to
            ``ERASE_MOSAIC_BLOCK_PX`` and then 10.

    Returns:
        ``img`` itself when ``box_tokens`` is empty, otherwise a new RGB
        image.
    """
    if not box_tokens:
        return img
    mode = (mode or ERASE_MODE or "solid").strip().lower()
    mosaic_block_px = int(mosaic_block_px or ERASE_MOSAIC_BLOCK_PX or 10)
    base = img.convert("RGB").copy()
    if mode in ("inpaint", "cv2", "opencv"):
        return _erase_with_inpaint(base, box_tokens, pad_px=pad_px)

    W, H = base.size
    for t in box_tokens:
        quad = _token_box_quad_px(t, W, H, pad_px=pad_px)
        if not quad:
            quad = _token_quad_px(t, W, H, pad_px=pad_px,
                                  apply_baseline_shift=True)
        if not quad:
            rect = _token_box_px(t, W, H, pad_px=pad_px)
            if not rect:
                continue
            l, tt, r, bb = rect
            quad = [(l, tt), (r, tt), (r, bb), (l, bb)]
        rect = _quad_bbox(quad, W, H)
        if not rect:
            continue
        l, tt, r, bb = rect

        mask = Image.new("L", (r - l, bb - tt), 0)
        mdraw = ImageDraw.Draw(mask)
        qrel = [(x - l, y - tt) for x, y in quad]
        mdraw.polygon(qrel, fill=255)

        # The fallback is per token: one token that cannot be cloned or
        # blended (e.g. no neighbour fits inside the image) must not
        # downgrade every later token to a solid fill.
        token_mode = mode
        if token_mode in ("blend_patch", "blend", "avg_patch", "patch"):
            if _erase_with_blend_patches(
                    base, rect, mask, ERASE_BLEND_GAP_PX, ERASE_BLEND_FEATHER_PX):
                continue
            token_mode = "solid"
        if token_mode == "clone":
            if _erase_with_clone(
                    base, rect, mask, ERASE_CLONE_GAP_PX, ERASE_CLONE_BORDER_PX, ERASE_CLONE_FEATHER_PX):
                continue
            token_mode = "solid"

        region = base.crop((l, tt, r, bb))
        if token_mode == "mosaic":
            pixelated = _pixelate(region, mosaic_block_px)
            merged = Image.composite(pixelated, region, mask)
            base.paste(merged, (l, tt))
        else:
            color = _sample_bg_color_from_quad(
                base, quad, rect, BG_SAMPLE_BORDER_PX, sample_margin_px)
            region.paste(color, mask=mask)
            base.paste(region, (l, tt))
    return base
def _draw_overlay_measure_text(text, font):
    """Return ``(width, height)`` of ``text`` drawn with ``font`` ("ls" anchor)."""
    probe = ImageDraw.Draw(Image.new("RGBA", (10, 10), (0, 0, 0, 0)))
    try:
        box = probe.textbbox((0, 0), text, font=font, anchor="ls")
        return float(box[2] - box[0]), float(box[3] - box[1])
    except Exception:
        # Fallback kept from the original code for Pillow builds where
        # textbbox/anchor is unavailable.
        width, height = probe.textsize(text, font=font)
        return float(width), float(height)


def draw_overlay(img, tokens, out_path, thai_path, latin_path, level_outlines=None, font_scale: float = 1.0, fit_to_box: bool = True):
    """Render ``tokens`` as text on top of ``img`` and save it to ``out_path``.

    Args:
        img: Background image (typically with the old text already erased).
        tokens: Span tokens. Each may carry ``text``, a normalised ``box``
            (``left``/``top``/``width``/``height``/``rotation_deg``),
            normalised ``baseline_p1``/``baseline_p2`` points and an
            optional ``font_size_px``.
        out_path: Where the composited RGB image is written.
        thai_path, latin_path: Font paths forwarded to the font helpers.
        level_outlines: Optional dicts with ``quad``, ``color`` and
            ``width`` that are stroked before any text is drawn.
        font_scale: Multiplier for ``font_size_px``; for auto-sized text it
            is applied only when ``fit_to_box`` is false.
        fit_to_box: Shrink explicitly sized text so it fits the token box.

    Tokens with blank text or a degenerate (zero-length) baseline are
    skipped. Tokens with a baseline are drawn along it, centred on the box;
    tokens without one are drawn centred on the box using its
    ``rotation_deg``.
    """
    base = img.convert("RGBA")
    base_rgb = img.convert("RGB")
    overlay = Image.new("RGBA", base.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(overlay)
    for ol in (level_outlines or []):
        q = ol.get("quad")
        if not q:
            continue
        col = ol.get("color", BOX_OUTLINE)
        w = int(ol.get("width", 2))
        draw.line(q + [q[0]], fill=col, width=w)

    W, H = base.size
    for t in tokens:
        b = t.get("box") or {}

        # Compute the quad and its clamped bbox once and reuse them below.
        box_quad = _token_box_quad_px(t, W, H, pad_px=0)
        quad_rect = _quad_bbox(box_quad, W, H) if box_quad else None
        if quad_rect:
            lq, tq, rq, bq = quad_rect
            box_cx = (lq + rq) / 2.0
            box_cy = (tq + bq) / 2.0
            box_w = max(1.0, float(rq - lq))
            box_h = max(1.0, float(bq - tq))
        else:
            # No quad, or a quad whose bbox lies entirely off the image
            # (``_quad_bbox`` returns None): use the plain box fields.
            left0 = float(b.get("left", 0.0)) * W
            top0 = float(b.get("top", 0.0)) * H
            box_w = max(1.0, float(b.get("width", 0.0)) * W)
            box_h = max(1.0, float(b.get("height", 0.0)) * H)
            box_cx = left0 + (box_w / 2.0)
            box_cy = top0 + (box_h / 2.0)

        if DRAW_OUTLINE_SPAN and DRAW_BOX_OUTLINE:
            if box_quad:
                draw.line(box_quad + [box_quad[0]], fill=SPAN_OUTLINE,
                          width=SPAN_OUTLINE_WIDTH)
            elif all(k in b for k in ("left", "top", "width", "height")):
                left = b["left"] * W
                top = b["top"] * H
                width = b["width"] * W
                height = b["height"] * H
                draw.rectangle([left, top, left + width, top + height],
                               outline=SPAN_OUTLINE, width=SPAN_OUTLINE_WIDTH)

        text = _sanitize_draw_text(t.get("text") or "")
        if text.strip() == "":
            continue

        p1 = t.get("baseline_p1") or {}
        p2 = t.get("baseline_p2") or {}
        has_baseline = ("x" in p1 and "y" in p1 and "x" in p2 and "y" in p2)
        if has_baseline:
            x1 = float(p1.get("x") or 0.0) * float(W)
            y1 = float(p1.get("y") or 0.0) * float(H)
            x2 = float(p2.get("x") or 0.0) * float(W)
            y2 = float(p2.get("y") or 0.0) * float(H)
            dx = x2 - x1
            dy = y2 - y1
            if math.hypot(dx, dy) <= 1e-9:
                # Degenerate baseline: there is no direction to draw along.
                continue

        avail_w = box_w
        avail_h = box_h
        angle_deg = float(b.get("rotation_deg") or 0.0)

        forced_size = t.get("font_size_px")
        if forced_size is not None:
            final_size = int(
                max(10, round(float(forced_size) * float(font_scale))))
            font = pick_font(text, thai_path, latin_path, final_size)
            if fit_to_box:
                twc, thc = _draw_overlay_measure_text(text, font)
                if twc > 0 and thc > 0 and (twc > avail_w or thc > avail_h):
                    s = min(avail_w / twc, avail_h / thc)
                    if s < 1.0:
                        final_size = max(10, int(final_size * s))
                        font = pick_font(
                            text, thai_path, latin_path, final_size)
        else:
            # No explicit size: measure at a reference size and scale so the
            # text fits the available box.
            base_size = 96
            font0 = pick_font(text, thai_path, latin_path, base_size)
            tw, th = _draw_overlay_measure_text(text, font0)
            if tw <= 0 or th <= 0:
                continue
            scale = min(avail_w / tw, avail_h / th)
            final_size = max(10, int(base_size * scale))
            if not fit_to_box:
                final_size = max(10, int(final_size * float(font_scale)))
            font = pick_font(text, thai_path, latin_path, final_size)

        # Scratch canvas large enough that the text can rotate about its
        # centre, capped at four times the larger image side.
        tw2, th2 = _draw_overlay_measure_text(text, font)
        side = int(max(tw2, th2, avail_h, avail_w) * 2.2 + 40)
        side = max(128, min(side, int(max(W, H) * 4)))
        canvas = Image.new("RGBA", (side, side), (0, 0, 0, 0))
        dc = ImageDraw.Draw(canvas)

        fill = TEXT_COLOR
        if AUTO_TEXT_COLOR:
            if box_quad:
                if quad_rect:
                    bg = _sample_bg_color_from_quad_ring(
                        base_rgb, box_quad, quad_rect,
                        ring_px=max(2, BG_SAMPLE_BORDER_PX))
                    if bg is None:
                        bg = _sample_bg_color_from_quad(
                            base_rgb, box_quad, quad_rect,
                            BG_SAMPLE_BORDER_PX, ERASE_SAMPLE_MARGIN_PX)
                    fill = _pick_bw_text_color(bg)
            else:
                rr = _token_box_px(t, W, H, pad_px=0)
                if rr:
                    bg = _sample_bg_color(base_rgb, rr, ERASE_SAMPLE_MARGIN_PX)
                    fill = _pick_bw_text_color(bg)

        origin = (side // 2, side // 2)
        if has_baseline:
            Lb = float(math.hypot(dx, dy))
            if Lb <= 1e-6:
                Lb = 1.0
            ux = dx / Lb
            uy = dy / Lb
            nx = -uy
            ny = ux
            cx = (float(b.get("left") or 0.0) +
                  float(b.get("width") or 0.0) / 2.0) * float(W)
            cy = (float(b.get("top") or 0.0) +
                  float(b.get("height") or 0.0) / 2.0) * float(H)
            tt = _sanitize_draw_text(text)
            if not tt:
                continue
            font_m = pick_font(tt, thai_path, latin_path, final_size)
            try:
                tw = float(font_m.getlength(tt))
            except Exception:
                tw, _ = _draw_overlay_measure_text(tt, font_m)
            f_th, f_lat = _get_font_pair(thai_path, latin_path, final_size)
            try:
                a_th, d_th = f_th.getmetrics()
            except Exception:
                a_th, d_th = final_size, int(final_size * 0.25)
            try:
                a_lat, d_lat = f_lat.getmetrics()
            except Exception:
                a_lat, d_lat = final_size, int(final_size * 0.25)
            ascent = float(max(a_th, a_lat))
            descent = float(max(d_th, d_lat))
            # Baseline start point that centres the text on the box centre,
            # both along the baseline and across it.
            center_y_rel = (-ascent + descent) / 2.0
            bx = cx - ux * (tw / 2.0) - nx * center_y_rel
            by = cy - uy * (tw / 2.0) - ny * center_y_rel
            angle_deg = float(math.degrees(math.atan2(dy, dx)))
            _draw_text_baseline_fallback(
                dc, origin, text, thai_path, latin_path, final_size, fill)
            anchor_x, anchor_y = bx, by
        else:
            _draw_text_centered_fallback(
                dc, origin, text, thai_path, latin_path, final_size, fill)
            anchor_x, anchor_y = box_cx, box_cy

        rotated = canvas.rotate(-angle_deg, resample=Image.BICUBIC,
                                expand=False, center=origin)
        paste_x = int(round(anchor_x - origin[0]))
        paste_y = int(round(anchor_y - origin[1]))
        overlay.alpha_composite(rotated, dest=(paste_x, paste_y))

    out = Image.alpha_composite(base, overlay).convert("RGB")
    out.save(out_path)
def get_lens_data_from_image(image_path, firebase_url, lang):
    """Upload an image to Google Lens and return the translated-result JSON.

    Args:
        image_path: Path of the image file to upload.
        firebase_url: URL from which the Lens cookies are fetched (see
            ``_get_firebase_cookie``).
        lang: Target language passed to ``to_translated``.

    Returns:
        The decoded JSON payload of the translated-result request.

    Raises:
        RuntimeError: The upload did not answer with a 302/303 redirect, or
            the redirect carried no ``Location`` header.
    """
    ck = _get_firebase_cookie(firebase_url)
    with open(image_path, "rb") as f:
        img_bytes = f.read()
    hdr = {"User-Agent": "Mozilla/5.0", "Referer": "https://lens.google.com/"}

    # Redirects are not followed: the redirect target itself is the result.
    with httpx.Client(cookies=ck, headers=hdr, follow_redirects=False, timeout=60) as c:
        r = c.post(
            "https://lens.google.com/v3/upload",
            files={"encoded_image": ("file.jpg", img_bytes, "image/jpeg")},
        )
        if r.status_code not in (302, 303):
            raise RuntimeError(f"Upload failed: {r.status_code}\n{r.text}")
        redirect = r.headers.get("location")
    if not redirect:
        raise RuntimeError("Upload failed: redirect without Location header")

    u = to_translated(redirect, lang=lang)
    with httpx.Client(cookies=ck, headers=hdr, timeout=60) as c:
        body = c.get(u).text

    # Strip the anti-XSSI prefix by its real length; json.loads already
    # skips the whitespace (usually a newline) that follows it.
    xssi_prefix = ")]}'"
    if body.startswith(xssi_prefix):
        body = body[len(xssi_prefix):]
    return json.loads(body)
def _get_firebase_cookie(firebase_url: str):
    """Fetch the cookie JSON from ``firebase_url``, with a time-based cache.

    A cached value is reused while it is truthy, was fetched from the same
    URL and is younger than ``FIREBASE_COOKIE_TTL_SEC`` seconds. Otherwise
    the URL is fetched and the module-level cache is updated.

    Raises:
        httpx.HTTPStatusError: The server answered with an error status.
            Without this check an error payload would be cached and sent as
            cookies for the whole TTL.
    """
    u = (firebase_url or '').strip()
    now = time.time()
    cache = _FIREBASE_COOKIE_CACHE
    age = now - float(cache.get('ts') or 0)
    if cache.get('data') and cache.get('url') == u and age < float(FIREBASE_COOKIE_TTL_SEC):
        return cache.get('data')
    r = httpx.get(u, timeout=30)
    r.raise_for_status()
    ck = r.json()
    cache['ts'] = now
    cache['url'] = u
    cache['data'] = ck
    return ck
def warmup(lang: str = "th") -> dict:
    """Pre-load the cookie cache and the fonts needed for ``lang``.

    The cookie fetch is best-effort: a failure is only reported through
    ``cookie_ok`` in the result. The Latin font slot is replaced by the
    Japanese or Chinese (simplified/traditional) font for those languages.
    When ``FONT_DOWNLOD`` is set, the fonts are passed through
    ``ensure_font`` first. Font pairs are then loaded at sizes 22 and 28.

    Returns a dict with ``ok``, ``lang``, ``thai_font``, ``latin_font`` and
    ``cookie_ok``.
    """
    code = _normalize_lang(lang)

    try:
        _get_firebase_cookie(FIREBASE_URL)
    except Exception:
        cookie_ok = False
    else:
        cookie_ok = True

    simplified = ("zh", "zh-hans", "zh_cn", "zh-cn", "zh_hans")
    traditional = ("zh-hant", "zh_tw", "zh-tw", "zh_hant")
    if code == "ja":
        latin_font, latin_urls = FONT_JA_PATH, FONT_JA_URLS
    elif code in simplified:
        latin_font, latin_urls = FONT_ZH_SC_PATH, FONT_ZH_SC_URLS
    elif code in traditional:
        latin_font, latin_urls = FONT_ZH_TC_PATH, FONT_ZH_TC_URLS
    else:
        latin_font, latin_urls = FONT_LATIN_PATH, FONT_LATIN_URLS

    thai_font = FONT_THAI_PATH
    if FONT_DOWNLOD:
        thai_font = ensure_font(thai_font, FONT_THAI_URLS)
        latin_font = ensure_font(latin_font, latin_urls)

    for size in (22, 28):
        _get_font_pair(thai_font or "", latin_font or "", size)

    return {
        "ok": True,
        "lang": code,
        "thai_font": thai_font or "",
        "latin_font": latin_font or "",
        "cookie_ok": cookie_ok,
    }
def main():
    """Run the full pipeline for ``IMAGE_PATH`` as configured by module flags.

    Steps: fetch the Lens result, decode the original and translated
    paragraph trees, optionally run the AI translation and lay it out on the
    translated tree, optionally draw overlay images, and finally write the
    collected result to ``OUT_JSON`` when ``WRITE_OUT_JSON`` is set.
    """
    data = get_lens_data_from_image(IMAGE_PATH, FIREBASE_URL, LANG)
    # Close the file handle as soon as the pixels have been copied.
    with Image.open(IMAGE_PATH) as src_img:
        img = src_img.convert("RGB")
    W, H = img.size

    thai_font = FONT_THAI_PATH
    latin_font = FONT_LATIN_PATH
    lang = _normalize_lang(LANG)
    if lang == "ja":
        latin_font = FONT_JA_PATH
    elif lang in ("zh", "zh-hans", "zh_cn", "zh-cn", "zh_hans"):
        latin_font = FONT_ZH_SC_PATH
    elif lang in ("zh-hant", "zh_tw", "zh-tw", "zh_hant"):
        latin_font = FONT_ZH_TC_PATH
    if FONT_DOWNLOD:
        thai_font = ensure_font(thai_font, FONT_THAI_URLS)
        if lang == "ja":
            latin_font = ensure_font(latin_font, FONT_JA_URLS)
        elif lang in ("zh", "zh-hans", "zh_cn", "zh-cn", "zh_hans"):
            latin_font = ensure_font(latin_font, FONT_ZH_SC_URLS)
        elif lang in ("zh-hant", "zh_tw", "zh-tw", "zh_hant"):
            latin_font = ensure_font(latin_font, FONT_ZH_TC_URLS)
        else:
            latin_font = ensure_font(latin_font, FONT_LATIN_URLS)

    image_url = data.get("imageUrl") if isinstance(data, dict) else None
    image_datauri = ""
    if DECODE_IMAGEURL_TO_DATAURI and image_url:
        image_datauri = decode_imageurl_to_datauri(image_url)

    out = {
        "imageUrl": image_url,
        "imageDataUri": image_datauri,
        "originalContentLanguage": data.get("originalContentLanguage"),
        "originalTextFull": data.get("originalTextFull"),
        "translatedTextFull": data.get("translatedTextFull"),
        "AiTextFull": "",
        "originalParagraphs": data.get("originalParagraphs") or [],
        "translatedParagraphs": data.get("translatedParagraphs") or [],
        "original": {},
        "translated": {},
        "Ai": {},
    }

    original_span_tokens = None
    original_tree = None
    translated_tree = None

    def _decode_original_tree():
        """Decode the original-language paragraph tree from ``data``."""
        tree, _ = decode_tree(
            data.get("originalParagraphs") or [],
            data.get("originalTextFull") or "",
            "original",
            W,
            H,
            want_raw=False,
        )
        return tree

    def _base_img_for_overlay() -> Image.Image:
        """Return the overlay background, with the old text erased if enabled."""
        if not (ERASE_OLD_TEXT_WITH_ORIGINAL_BOXES and original_span_tokens):
            return img
        return erase_text_with_boxes(
            img,
            original_span_tokens,
            pad_px=ERASE_PADDING_PX,
            sample_margin_px=ERASE_SAMPLE_MARGIN_PX,
        )

    if DO_ORIGINAL:
        original_tree = _decode_original_tree()
        original_span_tokens = flatten_tree_spans(original_tree)
        out["original"] = {"originalTree": original_tree}
        if DO_ORIGINAL_HTML:
            out["original"]["originalhtml"] = tokens_to_html(
                original_span_tokens)
        if DRAW_OVERLAY_ORIGINAL:
            draw_overlay(
                _base_img_for_overlay(),
                original_span_tokens,
                OVERLAY_ORIGINAL_PATH,
                thai_font or "",
                latin_font or "",
                level_outlines=build_level_outlines(original_tree, W, H),
            )

    # The AI step needs the original tree even when DO_ORIGINAL is off.
    if DO_AI and original_tree is None:
        original_tree = _decode_original_tree()

    if DO_TRANSLATED:
        tree, _ = decode_tree(
            data.get("translatedParagraphs") or [],
            data.get("translatedTextFull") or "",
            "translated",
            W,
            H,
            want_raw=False,
        )
        translated_tree = tree
        out["translated"] = {"translatedTree": tree}
        translated_span_tokens = flatten_tree_spans(tree)
        if DO_TRANSLATED_HTML:
            out["translated"]["translatedhtml"] = tokens_to_html(
                translated_span_tokens)
        if DRAW_OVERLAY_TRANSLATED:
            draw_overlay(
                _base_img_for_overlay(),
                translated_span_tokens,
                OVERLAY_TRANSLATED_PATH,
                thai_font or "",
                latin_font or "",
                level_outlines=build_level_outlines(tree, W, H),
                font_scale=TRANSLATED_OVERLAY_FONT_SCALE,
                fit_to_box=TRANSLATED_OVERLAY_FIT_TO_BOX,
            )

    if DO_AI:
        src_text = data.get("originalTextFull") or ""
        ai = ai_translate_original_text(
            src_text,
            LANG,
        )
        ai_text_full = str(ai.get("aiTextFull") or "")
        # The translated tree is the layout template for the AI text.
        patched = patch(
            {"Ai": {"aiTextFull": ai_text_full, "aiTree": translated_tree}},
            W, H, thai_font, latin_font)
        ai_tree = (patched.get("Ai") or {}).get("aiTree") or {}
        ai["aiTree"] = ai_tree

        shared_para_sizes = _compute_shared_para_sizes(
            [original_tree or {}, translated_tree or {}, ai_tree or {}],
            thai_font or "",
            latin_font or "",
            W,
            H,
        )
        _apply_para_font_size(original_tree or {}, shared_para_sizes)
        _apply_para_font_size(translated_tree or {}, shared_para_sizes)
        _apply_para_font_size(ai_tree or {}, shared_para_sizes)
        _rebuild_ai_spans_after_font_resize(
            ai_tree or {}, W, H, thai_font or "", latin_font or "")

        out["AiTextFull"] = ai_text_full
        out["Ai"] = {
            "aiTextFull": ai_text_full,
            "aiTree": ai_tree,
        }
        if DO_AI_HTML:
            if AI_OVERLAY_FIT_TO_BOX:
                fit_tree_font_sizes_for_tp_html(
                    ai_tree or {}, thai_font or "", latin_font or "", W, H)
            out["Ai"]["aihtml"] = ai_tree_to_tp_html(ai_tree, W, H)
            out["Ai"]["aihtmlCss"] = tp_overlay_css()
            out["Ai"]["aihtmlMeta"] = {
                "baseW": int(W),
                "baseH": int(H),
                "format": "tp",
            }
        if DO_AI_OVERLAY and translated_tree is not None:
            draw_overlay(
                _base_img_for_overlay(),
                flatten_tree_spans(ai_tree),
                AI_PATH_OVERLAY,
                thai_font or "",
                latin_font or "",
                level_outlines=build_level_outlines(ai_tree, W, H),
                font_scale=AI_OVERLAY_FONT_SCALE,
                fit_to_box=AI_OVERLAY_FIT_TO_BOX,
            )

    if HTML_INCLUDE_CSS and (DO_ORIGINAL_HTML or DO_TRANSLATED_HTML or DO_AI_HTML):
        out["htmlCss"] = overlay_css()
    # The metadata is always emitted; only the CSS is optional.
    out["htmlMeta"] = {
        "containerClass": "RTMDre",
        "tokenClass": "IwqbBf",
        "sourceWidth": int(W),
        "sourceHeight": int(H),
    }

    if WRITE_OUT_JSON:
        with open(OUT_JSON, "w", encoding="utf-8") as f:
            json.dump(out, f, ensure_ascii=False, indent=2)
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()