| import asyncio, base64, copy, hashlib, io, json, os, re, tempfile, time, uuid, httpx, logging |
|
|
| from backend import lens_core as core |
| from http import HTTPStatus |
| from collections import OrderedDict |
| from threading import Lock, Semaphore |
| from dataclasses import dataclass |
| from typing import Any, Dict, List, Optional |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request |
| from fastapi.middleware.cors import CORSMiddleware |
|
|
# ---------------------------------------------------------------------------
# Runtime configuration — every knob is overridable via environment variables.
# ---------------------------------------------------------------------------
SERVER_MAX_WORKERS = int(os.environ.get('SERVER_MAX_WORKERS', '15'))  # async worker tasks
JOB_TTL_SEC = int(os.environ.get('JOB_TTL_SEC', '3600'))  # job record lifetime, seconds
# Outbound HTTP timeout; defaults to lens_core's AI timeout when present.
HTTP_TIMEOUT_SEC = float(os.environ.get(
    'HTTP_TIMEOUT_SEC', str(getattr(core, 'AI_TIMEOUT_SEC', 120))))
SUPPORTED_MODES = {"lens_images", "lens_text"}
BUILD_ID = os.environ.get('TP_BUILD_ID', 'v9-backendfix-20260129')
# Debug-print toggle (accepts 1/true/yes/on, case-insensitive).
TP_DEBUG = str(os.environ.get('TP_DEBUG', '')).strip(
).lower() in ('1', 'true', 'yes', 'on')

# Paragraph markers injected into AI prompts, e.g. "<<TP_P0>>".
TP_PARA_MARKER_PREFIX = '<<TP_P'
TP_PARA_MARKER_SUFFIX = '>>'

# LRU cache sizes and default warmup language.
TP_RESULT_CACHE_MAX = int(os.environ.get('TP_RESULT_CACHE_MAX', '24'))
TP_AI_RESULT_CACHE_MAX = int(os.environ.get('TP_AI_RESULT_CACHE_MAX', '16'))
TP_WARMUP_LANG = (os.environ.get('TP_WARMUP_LANG', 'th') or 'th').strip()

# Shared mutable state: LRU result caches (OrderedDict + Lock), the job
# table, and the queue consumed by the async worker tasks.
_result_cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()
_ai_result_cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()
_jobs: Dict[str, Dict[str, Any]] = {}
_job_queue: asyncio.Queue = asyncio.Queue()
_result_cache_lock = Lock()
_ai_cache_lock = Lock()

# Hugging Face rate-limit controls: bounded concurrency, minimum spacing
# between calls, and retry/backoff tuning (see _hf_throttle_before_call and
# _openai_compat_generate_with_hf_backoff).
HF_AI_MAX_CONCURRENCY = max(
    1, int(os.environ.get('HF_AI_MAX_CONCURRENCY', '1')))
HF_AI_MIN_INTERVAL_SEC = max(0.0, float(
    os.environ.get('HF_AI_MIN_INTERVAL_SEC', '5')))
HF_AI_MAX_RETRIES = max(1, int(os.environ.get('HF_AI_MAX_RETRIES', '6')))
HF_AI_RETRY_BASE_SEC = max(0.2, float(
    os.environ.get('HF_AI_RETRY_BASE_SEC', '2')))
_hf_ai_sem = Semaphore(HF_AI_MAX_CONCURRENCY)
_hf_ai_lock = Lock()
_hf_ai_last_ts = 0.0  # wall-clock time of the last HF call, stamped under _hf_ai_lock
_tp_marker_re = re.compile(r'<<TP_P\d+>>')

# Access-log mode; 'custom'/'tp'/'plain' silences uvicorn's own access
# logger in favor of the custom middleware defined below.
TP_ACCESS_LOG_MODE = (os.environ.get('TP_ACCESS_LOG_MODE', 'custom') or 'custom').strip().lower()
if TP_ACCESS_LOG_MODE in ('custom', 'tp', 'plain'):
    try:
        _uv = logging.getLogger('uvicorn.access')
        _uv.disabled = True
        _uv.propagate = False
        _uv.setLevel(logging.CRITICAL)
    except Exception:
        pass
|
|
def _dbg(tag: str, data=None) -> None:
    """Print a debug line when TP_DEBUG is enabled; never raises."""
    if not TP_DEBUG:
        return
    try:
        if data is None:
            print(f'[TextPhantom][dbg] {tag}')
        else:
            payload = json.dumps(data, ensure_ascii=False)
            # Trim very large payloads so logs stay readable.
            if len(payload) > 2000:
                payload = payload[:2000] + '…'
            print(f'[TextPhantom][dbg] {tag} {payload}')
    except Exception:
        try:
            print(f'[TextPhantom][dbg] {tag} {data}')
        except Exception:
            pass
|
|
| def _tree_stats(tree) -> dict: |
| if not isinstance(tree, dict): |
| return {'paras': 0, 'items': 0, 'spans': 0} |
| paras = tree.get('paragraphs') or [] |
| if not isinstance(paras, list): |
| return {'paras': 0, 'items': 0, 'spans': 0} |
| items = 0 |
| spans = 0 |
| for p in paras: |
| if not isinstance(p, dict): |
| continue |
| its = p.get('items') or [] |
| if not isinstance(its, list): |
| continue |
| items += len(its) |
| for it in its: |
| if not isinstance(it, dict): |
| continue |
| sp = it.get('spans') or [] |
| if isinstance(sp, list): |
| spans += len(sp) |
| return {'paras': len(paras), 'items': items, 'spans': spans} |
|
|
| def _tree_to_paragraph_texts(tree: Any) -> List[str]: |
| if not isinstance(tree, dict): |
| return [] |
| paras = tree.get('paragraphs') or [] |
| if not isinstance(paras, list) or not paras: |
| return [] |
| out: List[str] = [] |
| for p in paras: |
| if not isinstance(p, dict): |
| out.append('') |
| continue |
| t = str(p.get('text') or '').strip() |
| if not t: |
| items = p.get('items') or [] |
| if isinstance(items, list) and items: |
| t = ' '.join(str(it.get('text') or '').strip() for it in items if isinstance( |
| it, dict) and str(it.get('text') or '').strip()) |
| out.append(t) |
| return out |
|
|
def _apply_para_markers(paras: List[str]) -> str:
    """Join paragraphs into one string, each preceded by its <<TP_Pn>> marker."""
    if not paras:
        return ''
    chunks = [
        f"{TP_PARA_MARKER_PREFIX}{idx}{TP_PARA_MARKER_SUFFIX}\n{(text or '').strip()}"
        for idx, text in enumerate(paras)
    ]
    return '\n\n'.join(chunks)
|
|
| def _clamp_runaway_repeats(s: str, max_repeat: int = 12) -> str: |
| if not s: |
| return '' |
| pat = re.compile(r"(.)\1{" + str(max_repeat) + r",}") |
| return pat.sub(lambda m: m.group(1) * max_repeat, s) |
|
|
| def _extract_marker_indices(s: str) -> set[int]: |
| if not s: |
| return set() |
| out: set[int] = set() |
| for m in re.finditer(r"<<TP_P(\d+)>>", s): |
| try: |
| out.add(int(m.group(1))) |
| except Exception: |
| continue |
| return out |
|
|
def _needs_ai_retry(ai_text_full: str, expected_paras: int) -> bool:
    """Decide whether the AI translation call should be retried.

    A retry is needed when fewer distinct <<TP_Pn>> markers survived in the
    AI output than the number of paragraphs that were sent.

    Fix: the original ended with an ``if ...: return True`` immediately
    followed by an unconditional ``return True``, making the conditional
    dead code; it has been removed (behavior unchanged).
    """
    if expected_paras <= 0:
        return False
    # Enough distinct markers came back -> no retry required.
    if len(_extract_marker_indices(ai_text_full)) >= expected_paras:
        return False
    return True
|
|
| def _now() -> float: |
| return time.time() |
|
|
| def _lru_get(cache: OrderedDict, lock: Lock, key: str) -> Optional[Dict[str, Any]]: |
| if not key: |
| return None |
| with lock: |
| v = cache.get(key) |
| if v is None: |
| return None |
| cache.move_to_end(key) |
| return copy.deepcopy(v) |
|
|
| def _lru_set(cache: OrderedDict, lock: Lock, key: str, value: Dict[str, Any], max_items: int) -> None: |
| if not key or not isinstance(value, dict) or max_items <= 0: |
| return |
| with lock: |
| cache[key] = copy.deepcopy(value) |
| cache.move_to_end(key) |
| while len(cache) > max_items: |
| cache.popitem(last=False) |
|
|
| def _sha256_hex(blob: bytes) -> str: |
| return hashlib.sha256(blob).hexdigest() if blob else '' |
|
|
| def _ai_prompt_sig(s: str) -> str: |
| t = (s or '').strip() |
| if not t: |
| return '' |
| return hashlib.sha256(t.encode('utf-8')).hexdigest()[:12] |
|
|
def _build_cache_key(img_hash: str, lang: str, mode: str, source: str, ai_cfg: Optional["AiConfig"]) -> str:
    """Compose the LRU cache key for one image/lang/mode/source combination."""
    src = (source or '').strip()
    parts = [img_hash, _normalize_lang(lang), (mode or '').strip(), src]
    # AI results additionally depend on provider/model/endpoint/prompt.
    if ai_cfg and src.lower() == 'ai':
        parts.append((ai_cfg.provider or '').strip())
        parts.append((ai_cfg.model or '').strip())
        parts.append((ai_cfg.base_url or '').strip())
        parts.append(_ai_prompt_sig(ai_cfg.prompt_editable))
    return '|'.join([p for p in parts if p is not None])
|
|
| def _b64_to_bytes(b64: str) -> bytes: |
| pad = '=' * ((4 - (len(b64) % 4)) % 4) |
| return base64.b64decode(b64 + pad) |
|
|
def _datauri_to_bytes(data_uri: str) -> tuple[bytes, str]:
    """Split a data: URI into (raw bytes, mime type).

    Returns (b'', '') for anything that is not a data: URI. The payload is
    assumed to be base64-encoded (the common ';base64' form); non-base64
    data URIs are not supported.

    Fix: the original only extracted the mime type when the header contained
    ';' (as in ';base64'), so a URI like 'data:image/png,...' silently fell
    back to 'application/octet-stream'. The mime is now taken from the
    header in both cases.
    """
    s = (data_uri or '').strip()
    if not s.startswith('data:'):
        return b'', ''
    head, _, payload = s.partition(',')
    if ';' in head:
        mime = head[5:head.index(';')]
    else:
        mime = head[5:]  # e.g. 'data:image/png' with no parameters
    return _b64_to_bytes(payload), mime or 'application/octet-stream'
|
|
| def _bytes_to_datauri(blob: bytes, mime: str) -> str: |
| b64 = base64.b64encode(blob).decode('ascii') |
| return f"data:{mime};base64,{b64}" |
|
|
def _download_bytes(url: str, referer: str = '') -> tuple[bytes, str]:
    """Fetch a URL and return (body bytes, mime type from Content-Type).

    Returns (b'', '') for a blank URL. Follows redirects, applies
    HTTP_TIMEOUT_SEC, and sends a fixed User-Agent plus an optional Referer.
    Raises httpx.HTTPStatusError on non-2xx responses.
    """
    u = (url or '').strip()
    if not u:
        return b'', ''
    headers = {
        'user-agent': 'Mozilla/5.0 (TextPhantomOCR; +https://huggingface.co/spaces)',
    }
    ref = (referer or '').strip()
    if ref:
        headers['referer'] = ref

    with httpx.Client(timeout=HTTP_TIMEOUT_SEC, follow_redirects=True, headers=headers) as client:
        r = client.get(u)
        r.raise_for_status()
        # Strip parameters from e.g. "text/html; charset=utf-8".
        ct = (r.headers.get('content-type') or '').split(';')[0].strip()
        return r.content, ct
|
|
def _detect_provider_from_key(api_key: str) -> str:
    """Guess the AI provider from the key format, canonicalized by core."""
    detected = core._detect_ai_provider_from_key(api_key)
    return core._canonical_provider(detected)
|
|
def _resolve_provider_defaults(provider: str) -> dict:
    """Look up core.AI_PROVIDER_DEFAULTS for a provider ({} when unknown)."""
    presets = getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}
    return presets.get(provider, {})
|
|
def _resolve_model(provider: str, model: str) -> str:
    """Delegate model-name resolution ('auto' handling etc.) to lens_core."""
    resolved = core._resolve_model(provider, model)
    return resolved
|
|
def _has_meaningful_text(s: str) -> bool:
    """True when s contains anything besides whitespace and <<TP_Pn>> markers."""
    without_markers = _tp_marker_re.sub('', str(s or ''))
    return bool(without_markers.strip())
|
|
| def _is_hf_provider(provider: str, base_url: str) -> bool: |
| p = (provider or '').strip().lower() |
| b = (base_url or '').strip().lower() |
| return p == 'huggingface' or 'router.huggingface.co' in b |
|
|
| def _is_hf_rate_limited_error(msg: str) -> bool: |
| t = (msg or '').lower() |
| if 'rate limit' in t or 'ratelimit' in t or 'too many requests' in t: |
| return True |
| if 'http 429' in t or ' 429' in t: |
| return True |
| if 'http 503' in t or ' 503' in t or 'overloaded' in t or 'temporarily' in t: |
| return True |
| return False |
|
|
def _hf_throttle_before_call() -> None:
    """Enforce a minimum wall-clock gap between Hugging Face AI calls.

    Sleeps (while holding _hf_ai_lock) until HF_AI_MIN_INTERVAL_SEC has
    elapsed since the previous call, then stamps _hf_ai_last_ts. Holding the
    lock across the sleep is what serializes concurrent callers.
    """
    if HF_AI_MIN_INTERVAL_SEC <= 0:
        return
    global _hf_ai_last_ts
    with _hf_ai_lock:
        now = _now()
        dt = now - float(_hf_ai_last_ts or 0.0)
        wait = HF_AI_MIN_INTERVAL_SEC - dt
        if wait > 0:
            time.sleep(wait)
        _hf_ai_last_ts = _now()
|
|
def _openai_compat_generate_with_hf_backoff(api_key: str, base_url: str, model: str, system_text: str, user_parts: List[str]):
    """Call core._openai_compat_generate_json with retry/backoff for HF limits.

    Errors that do not look rate-limit-related (see _is_hf_rate_limited_error)
    are re-raised immediately; rate-limit-looking ones trigger exponential
    backoff for up to HF_AI_MAX_RETRIES attempts. Concurrency is bounded by
    _hf_ai_sem and calls are paced by _hf_throttle_before_call.
    """
    last_err: Optional[Exception] = None
    for attempt in range(int(HF_AI_MAX_RETRIES)):
        try:
            with _hf_ai_sem:
                _hf_throttle_before_call()
                return core._openai_compat_generate_json(api_key, base_url, model, system_text, user_parts)
        except Exception as e:
            last_err = e
            if not _is_hf_rate_limited_error(str(e)):
                raise
            # Backoff: base * 2^attempt (exponent capped at 4), never below
            # the pacing interval, never above 15 seconds.
            delay = min(15.0, max(float(HF_AI_MIN_INTERVAL_SEC), float(
                HF_AI_RETRY_BASE_SEC) * (2 ** min(attempt, 4))))
            _dbg('ai.hf.backoff', {
                'attempt': attempt + 1, 'delay_sec': round(delay, 2), 'err': str(e)[:240]})
            time.sleep(delay)
            continue
    # All attempts were rate-limited: surface the last error.
    if last_err is not None:
        raise last_err
    raise Exception('hf_backoff_failed')
|
|
def _normalize_lang(lang: str) -> str:
    """Delegate language-code normalization to lens_core."""
    normalized = core._normalize_lang(lang)
    return normalized
|
|
@dataclass
class AiConfig:
    """Per-request AI translation settings supplied by the client."""
    api_key: str  # provider API key (required by ai_translate_text)
    model: str = 'auto'  # 'auto' -> resolved via core._resolve_model
    provider: str = 'auto'  # 'auto' -> detected from the key format
    base_url: str = 'auto'  # 'auto' -> provider preset base URL
    prompt_editable: str = ''  # user style prompt ('' -> per-language default)
|
|
| def _collapse_ws(text: str) -> str: |
| return re.sub(r"\s+", " ", str(text or "")).strip() |
|
|
def _sanitize_marked_text(marked_text: str) -> str:
    """Normalize AI output so each <<TP_Pn>> marker sits alone on its own
    line, followed by that paragraph's text with collapsed whitespace.

    Malformed marker fragments are dropped; when no valid marker survives,
    the whole text is returned whitespace-collapsed instead.
    """
    t = str(marked_text or "")
    if not t:
        return ""
    t = t.replace("\r\n", "\n").replace("\r", "\n")
    # Drop malformed marker fragments such as "<<TP_Pabc" or "<<TP_P1>".
    t = re.sub(r"<<TP_P(?!\d+>>)[^\s>]*>?", "", t)
    # Break "marker + text" lines so the marker gets its own line.
    t = re.sub(r"(?m)^\s*(<<TP_P\d+>>)\s*(\S)", r"\1\n\2", t)

    # Line pass: keep lone markers, split marker-prefixed lines, and strip
    # markers that are embedded mid-line.
    lines = t.split("\n")
    out0: List[str] = []
    for line in lines:
        if "<<TP_P" not in line:
            out0.append(line)
            continue
        m = re.match(r"^\s*(<<TP_P\d+>>)\s*$", line)
        if m:
            out0.append(m.group(1))
            continue
        m2 = re.match(r"^\s*(<<TP_P\d+>>)\s*(.*)$", line)
        if m2:
            out0.append(m2.group(1))
            rest = (m2.group(2) or "").strip()
            if rest:
                out0.append(rest)
            continue
        out0.append(re.sub(r"<<TP_P\d+>>", "", line))
    t = "\n".join(out0)

    # Rebuild canonical "marker / text / blank" triplets in index order;
    # only the FIRST occurrence of each marker contributes its segment.
    indices = sorted(_extract_marker_indices(t))
    if not indices:
        return _collapse_ws(t)
    out_lines: List[str] = []
    for idx in indices:
        marker = f"<<TP_P{idx}>>"
        m = re.search(
            rf"{re.escape(marker)}\s*([\s\S]*?)(?=<<TP_P\d+>>|\Z)", t)
        seg = m.group(1) if m else ""
        seg = _collapse_ws(seg)
        out_lines.append(marker)
        out_lines.append(seg)
        out_lines.append("")
    return "\n".join(out_lines).strip("\n")
|
|
|
|
def _has_complete_marker_sequence(ai_text_full: str, expected_paras: int) -> bool:
    """True when markers <<TP_P0>>..<<TP_P{n-1}>> are all present, in order."""
    if expected_paras <= 0:
        return True
    text = str(ai_text_full or "")
    wanted = list(range(int(expected_paras)))
    present = sorted(_extract_marker_indices(text))
    if len(present) < len(wanted):
        return False
    if present[:len(wanted)] != wanted:
        return False
    # Verify first occurrences appear in strictly increasing positions.
    prev_pos = -1
    for i in wanted:
        pos = text.find(f"<<TP_P{i}>>")
        if pos < 0 or pos <= prev_pos:
            return False
        prev_pos = pos
    return True
|
|
def _build_ai_prompt_packet_custom(target_lang: str, original_text_full: str, prompt_editable: str, is_retry: bool = False) -> tuple[str, List[str]]:
    """Assemble the (system_text, user_parts) pair for the AI translation call."""
    lang = _normalize_lang(target_lang)

    base = (getattr(core, "AI_PROMPT_SYSTEM_BASE", "") or "").strip()

    # A user-supplied style wins; otherwise fall back to the per-language
    # (or default) style from core.AI_LANG_STYLE.
    style = (prompt_editable or "").strip()
    if not style:
        styles = getattr(core, "AI_LANG_STYLE", {}) or {}
        style = (styles.get(lang) or styles.get("default") or "").strip()

    # Output contract: plain text only, markers preserved verbatim.
    contract: List[str] = [
        "Output ONLY the translated text (no JSON, no markdown, no extra commentary).",
        "Markers: Keep every paragraph marker like <<TP_P0>> unchanged and in order. Do not remove, rename, or add markers.",
        "For each marker, output the marker followed by that paragraph's translated text.",
    ]
    if is_retry:
        contract.append(
            "Retry: You MUST output ALL markers from the first to the last marker in the input."
        )

    sections = [base, style, "\n".join(contract)]
    system_text = "\n\n".join([section for section in sections if section])

    user_parts: List[str] = ["Input:\n" + str(original_text_full or "")]
    return system_text, user_parts
|
|
def ai_translate_text(original_text_full: str, target_lang: str, ai: AiConfig, is_retry: bool = False) -> dict:
    """Translate marker-tagged text via the configured AI provider.

    Returns {'aiTextFull': <sanitized text>, 'meta': {...}}. Skips the
    provider call entirely (meta.skipped) when the input has no meaningful
    text. Raises when no API key is available or the provider call fails.
    """
    if not _has_meaningful_text(original_text_full):
        return {
            'aiTextFull': '',
            'meta': {
                'skipped': True,
                'skipped_reason': 'no_text',
            },
        }

    api_key = (ai.api_key or '').strip()
    if not api_key:
        raise Exception('AI api_key is required')

    # Resolve provider: explicit value wins, otherwise sniff from key format.
    provider = core._canonical_provider((ai.provider or 'auto'))
    if provider in ('', 'auto'):
        provider = _detect_provider_from_key(api_key)

    preset = _resolve_provider_defaults(provider) or {}

    model = _resolve_model(provider, (ai.model or 'auto'))

    base_url = (ai.base_url or 'auto').strip()
    if base_url in ('', 'auto'):
        base_url = (preset.get('base_url') or '').strip()

    # OpenAI-compatible providers need a base URL; default to api.openai.com.
    if provider not in ('gemini', 'anthropic'):
        if not base_url:
            base_url = (_resolve_provider_defaults('openai') or {}).get(
                'base_url') or 'https://api.openai.com/v1'

    system_text, user_parts = _build_ai_prompt_packet_custom(
        target_lang, original_text_full, ai.prompt_editable, is_retry=is_retry
    )

    # Dispatch to the provider-specific generator; Hugging Face routes get
    # rate-limit backoff, other OpenAI-compatible endpoints are called direct.
    started = _now()
    used_model = model
    if provider == 'gemini':
        raw = core._gemini_generate_json(
            api_key, model, system_text, user_parts)
    elif provider == 'anthropic':
        raw = core._anthropic_generate_json(
            api_key, model, system_text, user_parts)
    else:
        if _is_hf_provider(provider, base_url):
            raw, used_model = _openai_compat_generate_with_hf_backoff(
                api_key, base_url, model, system_text, user_parts)
        else:
            raw, used_model = core._openai_compat_generate_json(
                api_key, base_url, model, system_text, user_parts)

    # Parse according to the core's response-format toggle (JSON vs text).
    ai_text_full = core._parse_ai_textfull_only(
        raw) if core.DO_AI_JSON else core._parse_ai_textfull_text_only(raw)

    # Normalize marker layout: one marker line + one text block per paragraph.
    ai_text_full = _sanitize_marked_text(ai_text_full)

    return {
        'aiTextFull': ai_text_full,
        'meta': {
            'model': used_model,
            'provider': provider,
            'base_url': base_url,
            'latency_sec': round(_now() - started, 3),
        },
    }
|
|
def process_image_path(image_path: str, lang: str, mode: str, ai_cfg: Optional[AiConfig]) -> dict:
    """Run the full OCR/translate pipeline on one image file.

    Modes:
      * 'lens_images' -> returns only the (possibly re-encoded) source image.
      * 'lens_text'   -> returns decoded original/translated layout trees, an
        optional AI translation, TP-format HTML overlays, and a background
        image with the original text optionally erased.

    ai_cfg is only consulted for lens_text and only when it carries an API key.
    """
    mode_id = (mode or '').strip()
    if mode_id not in SUPPORTED_MODES:
        mode_id = 'lens_images'

    target_lang = _normalize_lang(lang)

    data = core.get_lens_data_from_image(
        image_path, getattr(core, 'FIREBASE_URL', ''), target_lang)
    img = core.Image.open(image_path).convert('RGB')
    W, H = img.size

    # Font selection; CJK targets override the latin font with a
    # script-specific one.
    thai_font = getattr(core, 'FONT_THAI_PATH', 'NotoSansThai-Regular.ttf')
    latin_font = getattr(core, 'FONT_LATIN_PATH', 'NotoSans-Regular.ttf')

    if target_lang == 'ja':
        latin_font = getattr(core, 'FONT_JA_PATH', latin_font)
    elif target_lang in ('zh', 'zh-hans', 'zh_cn', 'zh-cn', 'zh_hans'):
        latin_font = getattr(core, 'FONT_ZH_SC_PATH', latin_font)
    elif target_lang in ('zh-hant', 'zh_tw', 'zh-tw', 'zh_hant'):
        latin_font = getattr(core, 'FONT_ZH_TC_PATH', latin_font)

    # Optionally download font files on first use.
    if getattr(core, 'FONT_DOWNLOD', True):
        thai_font = core.ensure_font(
            thai_font, getattr(core, 'FONT_THAI_URLS', []))
        if target_lang == 'ja':
            latin_font = core.ensure_font(
                latin_font, getattr(core, 'FONT_JA_URLS', []))
        elif target_lang in ('zh', 'zh-hans', 'zh_cn', 'zh-cn', 'zh_hans'):
            latin_font = core.ensure_font(
                latin_font, getattr(core, 'FONT_ZH_SC_URLS', []))
        elif target_lang in ('zh-hant', 'zh_tw', 'zh-tw', 'zh_hant'):
            latin_font = core.ensure_font(
                latin_font, getattr(core, 'FONT_ZH_TC_URLS', []))
        else:
            latin_font = core.ensure_font(
                latin_font, getattr(core, 'FONT_LATIN_URLS', []))

    image_url = data.get('imageUrl') if isinstance(data, dict) else None

    # Response skeleton; filled in progressively below.
    out: Dict[str, Any] = {
        'mode': mode_id,
        'imageUrl': image_url,
        'imageDataUri': '',
        'originalContentLanguage': data.get('originalContentLanguage') if isinstance(data, dict) else None,
        'originalTextFull': data.get('originalTextFull') if isinstance(data, dict) else None,
        'translatedTextFull': data.get('translatedTextFull') if isinstance(data, dict) else None,
        'AiTextFull': '',
        'originalParagraphs': (data.get('originalParagraphs') or []) if isinstance(data, dict) else [],
        'translatedParagraphs': (data.get('translatedParagraphs') or []) if isinstance(data, dict) else [],
        'original': {},
        'translated': {},
        'Ai': {},
    }

    # lens_images: hand back an image data URI (decoded, downloaded, or
    # re-read from disk as a last resort) and stop.
    if mode_id == 'lens_images':
        if image_url:
            decoded = core.decode_imageurl_to_datauri(str(image_url))
            if decoded:
                out['imageDataUri'] = decoded
            elif isinstance(image_url, str) and image_url.startswith(('http://', 'https://')):
                blob, mime2 = _download_bytes(image_url)
                out['imageDataUri'] = _bytes_to_datauri(
                    blob, mime2 or 'image/jpeg')

        if not out.get('imageDataUri'):
            with open(image_path, 'rb') as f:
                blob = f.read()
            out['imageDataUri'] = _bytes_to_datauri(blob, 'image/jpeg')
        return out

    original_span_tokens = None
    original_tree = None
    translated_tree = None

    def _base_img_for_overlay() -> core.Image.Image:
        # Background for overlays: optionally erase the original text boxes.
        if not (getattr(core, 'ERASE_OLD_TEXT_WITH_ORIGINAL_BOXES', True) and original_span_tokens):
            return img
        return core.erase_text_with_boxes(
            img,
            original_span_tokens,
            pad_px=getattr(core, 'ERASE_PADDING_PX', 2),
            sample_margin_px=getattr(core, 'ERASE_SAMPLE_MARGIN_PX', 6),
        )

    # Decode the original-language layout tree.
    if getattr(core, 'DO_ORIGINAL', True):
        tree, _ = core.decode_tree(
            out.get('originalParagraphs') or [],
            out.get('originalTextFull') or '',
            'original',
            W,
            H,
            want_raw=False,
        )
        original_tree = tree
        original_span_tokens = core.flatten_tree_spans(tree)
        _dbg('tree.original', _tree_stats(original_tree))
        out['original'] = {
            'originalTree': tree,
            'originalTextFull': out.get('originalTextFull') or '',
        }

    # Decode the machine-translated layout tree.
    if getattr(core, 'DO_TRANSLATED', True):
        tree, _ = core.decode_tree(
            out.get('translatedParagraphs') or [],
            out.get('translatedTextFull') or '',
            'translated',
            W,
            H,
            want_raw=False,
        )
        translated_tree = tree
        translated_span_tokens = core.flatten_tree_spans(tree)  # NOTE(review): appears unused below
        _dbg('tree.translated', _tree_stats(translated_tree))
        out['translated'] = {
            'translatedTree': tree,
            'translatedTextFull': out.get('translatedTextFull') or '',
        }

    def _tree_score(tree: Any) -> int:
        # Rank a tree by richness: items dominate, then paragraphs, then spans.
        if not isinstance(tree, dict):
            return -1
        paragraphs = tree.get('paragraphs') or []
        if not isinstance(paragraphs, list) or not paragraphs:
            return -1

        para_count = len(paragraphs)
        item_count = 0
        span_count = 0
        for p in paragraphs:
            if not isinstance(p, dict):
                continue
            items = p.get('items') or []
            if not isinstance(items, list):
                continue
            item_count += len(items)
            for it in items:
                if not isinstance(it, dict):
                    continue
                spans = it.get('spans') or []
                if isinstance(spans, list):
                    span_count += len(spans)

        return item_count * 10000 + para_count * 100 + span_count

    def _pick_ai_template_tree() -> Optional[Dict[str, Any]]:
        # Prefer the richer tree as the layout template for the AI text.
        tr_score = _tree_score(translated_tree)
        og_score = _tree_score(original_tree)

        if tr_score < 0 and og_score < 0:
            return None
        if og_score > tr_score:
            return original_tree
        return translated_tree or original_tree

    ai_tree = None
    if ai_cfg and (ai_cfg.api_key or '').strip() and getattr(core, 'DO_AI', True):
        # Source text for AI: marker-tagged paragraphs when available,
        # otherwise the flat original text.
        src_paras = _tree_to_paragraph_texts(original_tree or {})
        src_text = _apply_para_markers(src_paras) if src_paras else str(
            out.get('originalTextFull') or '')
        if not _has_meaningful_text(src_text):
            out['AiTextFull'] = ''
            out['Ai'] = {
                'meta': {
                    'skipped': True,
                    'skipped_reason': 'no_text',
                }
            }
        else:
            ai = ai_translate_text(src_text, target_lang, ai_cfg)
            # One retry with repeat-clamped input when markers were lost.
            if src_paras and _needs_ai_retry(str(ai.get('aiTextFull') or ''), len(src_paras)):
                _dbg('ai.retry', {
                    'expected_paras': len(src_paras),
                    'found_markers': len(_extract_marker_indices(str(ai.get('aiTextFull') or ''))),
                })
                retry_paras = [_clamp_runaway_repeats(p) for p in src_paras]
                retry_text = _apply_para_markers(retry_paras) or src_text
                ai = ai_translate_text(
                    retry_text, target_lang, ai_cfg, is_retry=True)

            ai_text_full = str(ai.get('aiTextFull') or '')
            meta0 = ai.get('meta') or {}
            if src_paras:
                expected = len(src_paras)
                # Repair pass: if markers are still incomplete, rebuild the
                # full sequence, filling gaps from translated (or original)
                # paragraph texts.
                if not _has_complete_marker_sequence(ai_text_full, expected):
                    fallback_paras = _tree_to_paragraph_texts(translated_tree or {})
                    if len(fallback_paras) < expected:
                        fallback_paras = (fallback_paras + src_paras)[:expected]
                    else:
                        fallback_paras = fallback_paras[:expected]

                    # Collect the segments the AI did produce, per index.
                    found = sorted(_extract_marker_indices(ai_text_full))
                    seg_map: Dict[int, str] = {}
                    for idx in found:
                        if idx < 0 or idx >= expected:
                            continue
                        marker = f"<<TP_P{idx}>>"
                        m = re.search(rf"{re.escape(marker)}\s*([\s\S]*?)(?=<<TP_P\d+>>|\Z)", ai_text_full)
                        seg = _collapse_ws(m.group(1) if m else '')
                        if seg and idx not in seg_map:
                            seg_map[idx] = seg

                    missing = 0
                    out_lines: List[str] = []
                    for i in range(expected):
                        seg = seg_map.get(i) or _collapse_ws(fallback_paras[i] if i < len(fallback_paras) else '')
                        if not seg_map.get(i):
                            missing += 1
                        out_lines.append(f"<<TP_P{i}>>")
                        out_lines.append(seg)
                        out_lines.append('')
                    ai_text_full = "\n".join(out_lines).strip("\n")
                    _dbg('ai.marker.repaired', {
                        'expected_paras': expected,
                        'found_markers': len(seg_map),
                        'missing': missing,
                    })

                    meta0 = {
                        **meta0,
                        'marker_repaired': True,
                        'marker_expected': expected,
                        'marker_found': len(seg_map),
                        'marker_missing': missing,
                    }

            # Lay the AI text out over the best available tree template.
            template_tree = _pick_ai_template_tree()
            _dbg('ai.template.pick', {
                'score_original': _tree_score(original_tree),
                'score_translated': _tree_score(translated_tree),
                'picked': 'original' if template_tree is original_tree else ('translated' if template_tree is translated_tree else 'none'),
            })
            if not isinstance(template_tree, dict):
                template_tree = original_tree if isinstance(original_tree, dict) else (
                    translated_tree if isinstance(translated_tree, dict) else {})
            patched = core.patch(
                {'Ai': {'aiTextFull': str(
                    ai_text_full or ''), 'aiTree': template_tree}},
                W,
                H,
                thai_font or '',
                latin_font or '',
                lang=target_lang,
            )
            ai_tree = (patched.get('Ai') or {}).get('aiTree') or {}
            _dbg('ai.patched', {
                'ai_text_len': len(ai_text_full),
                'stats_ai': _tree_stats(ai_tree),
                'stats_original': _tree_stats(original_tree or {}),
                'stats_translated': _tree_stats(translated_tree or {}),
                'mode': mode_id,
                'lang': target_lang,
            })

            # Harmonize font sizes across all three trees, then rebuild the
            # AI spans for the adjusted sizes.
            shared_para_sizes = core._compute_shared_para_sizes(
                [original_tree or {}, translated_tree or {}, ai_tree or {}],
                thai_font or '',
                latin_font or '',
                W,
                H,
            )
            core._apply_para_font_size(original_tree or {}, shared_para_sizes)
            core._apply_para_font_size(
                translated_tree or {}, shared_para_sizes)
            core._apply_para_font_size(ai_tree or {}, shared_para_sizes)
            core._rebuild_ai_spans_after_font_resize(
                ai_tree or {}, W, H, thai_font or '', latin_font or '', lang=target_lang)

            out['AiTextFull'] = ai_text_full
            out['Ai'] = {
                'aiTextFull': ai_text_full,
                'aiTree': ai_tree,
                'meta': meta0,
            }
            if getattr(core, 'DO_AI_HTML', True):
                core.fit_tree_font_sizes_for_tp_html(
                    ai_tree, thai_font or '', latin_font or '', W, H)
                out['Ai']['aihtml'] = core.ai_tree_to_tp_html(ai_tree, W, H)
                out['Ai']['aihtmlMeta'] = {
                    'baseW': int(W),
                    'baseH': int(H),
                    'format': 'tp',
                }

    # HTML overlays for the original / translated trees.
    if getattr(core, 'DO_ORIGINAL', True) and getattr(core, 'DO_ORIGINAL_HTML', True) and isinstance(original_tree, dict):
        core.fit_tree_font_sizes_for_tp_html(
            original_tree, thai_font or '', latin_font or '', W, H)
        if isinstance(out.get('original'), dict):
            out['original']['originalhtml'] = core.ai_tree_to_tp_html(
                original_tree or {}, W, H)

    if getattr(core, 'DO_TRANSLATED', True) and getattr(core, 'DO_TRANSLATED_HTML', True) and isinstance(translated_tree, dict):
        core.fit_tree_font_sizes_for_tp_html(
            translated_tree, thai_font or '', latin_font or '', W, H)
        if isinstance(out.get('translated'), dict):
            out['translated']['translatedhtml'] = core.ai_tree_to_tp_html(
                translated_tree or {}, W, H)

    # Shared CSS/metadata for the HTML overlays.
    if getattr(core, 'HTML_INCLUDE_CSS', True) and (getattr(core, 'DO_ORIGINAL_HTML', True) or getattr(core, 'DO_TRANSLATED_HTML', True) or getattr(core, 'DO_AI_HTML', True)):
        out['htmlCss'] = core.tp_overlay_css()
        out['htmlMeta'] = {
            'baseW': int(W),
            'baseH': int(H),
            'format': 'tp',
        }
    # Background image (text optionally erased) as a PNG data URI.
    base_img = _base_img_for_overlay()
    buf = io.BytesIO()
    base_img.save(buf, format='PNG')
    out['imageDataUri'] = _bytes_to_datauri(buf.getvalue(), 'image/png')

    return out
|
|
# FastAPI application with wide-open CORS so browser clients on any origin
# can call the API.
# NOTE(review): allow_credentials=True combined with allow_origins=['*'] is
# disallowed by the CORS spec (a wildcard origin cannot be echoed for
# credentialed requests) — confirm whether credentials are actually needed.
app = FastAPI(title='TextPhantom OCR API', version='1.0')
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)
|
|
@app.middleware("http")
async def _tp_access_log(request: Request, call_next):
    """Minimal access log: one line per GET /translate/{job_id} status poll."""
    resp = await call_next(request)
    if TP_ACCESS_LOG_MODE in ('uvicorn', 'off', 'none'):
        return resp
    try:
        path = request.url.path
        if request.method == 'GET' and path.startswith("/translate/"):
            peer = request.client
            host = peer.host if peer else "-"
            port = peer.port if peer else 0
            ver = request.scope.get("http_version") or "1.1"
            phrase = HTTPStatus(resp.status_code).phrase
            line = f'{host}:{port} - "{request.method} {path} HTTP/{ver}" {resp.status_code} {phrase}'
            print(line, flush=True)
    except Exception:
        # Logging must never break request handling.
        pass
    return resp
|
|
async def _cleanup_jobs_loop():
    """Background task: every 60s, drop job records older than JOB_TTL_SEC."""
    while True:
        await asyncio.sleep(60)
        cutoff = _now() - JOB_TTL_SEC
        # Collect ids first, then delete, to avoid mutating while iterating.
        dead = [jid for jid, j in _jobs.items() if float(
            j.get('ts', 0)) < cutoff]
        for jid in dead:
            _jobs.pop(jid, None)
|
|
async def _worker_loop(worker_id: int):
    """Background worker: pull (job_id, payload) pairs off the queue forever.

    The blocking pipeline runs in a thread via asyncio.to_thread so the event
    loop stays responsive. Job state transitions recorded in _jobs:
    queued -> running -> done | error (error stores the exception text).
    """
    while True:
        jid, payload = await _job_queue.get()
        try:
            _jobs[jid] = {'status': 'running', 'ts': _now()}
            result = await asyncio.to_thread(_process_payload, payload)
            _jobs[jid] = {'status': 'done', 'result': result, 'ts': _now()}
        except Exception as e:
            _jobs[jid] = {'status': 'error', 'result': str(e), 'ts': _now()}
        finally:
            _job_queue.task_done()
|
|
def _process_payload(payload: dict) -> dict:
    """Run one translation job: obtain the image, consult the caches, call
    the OCR/translation pipeline, and record timings in out['perf'].

    Raises when no image bytes can be obtained; the worker loop converts
    exceptions into an 'error' job status.
    """
    t_all = time.perf_counter()
    mode = (payload.get('mode') or 'lens_images')
    lang = (payload.get('lang') or 'en')

    context = payload.get('context') if isinstance(
        payload.get('context'), dict) else {}
    page_url = str((context or {}).get('page_url') or '').strip()

    # Image bytes come from, in priority order: an explicit data URI, a
    # data: src, or an HTTP(S) download (with the page URL as referer).
    src = (payload.get('src') or '').strip()
    img_bytes = b''
    mime = ''

    if payload.get('imageDataUri'):
        img_bytes, mime = _datauri_to_bytes(payload.get('imageDataUri'))
    elif src.startswith('data:'):
        img_bytes, mime = _datauri_to_bytes(src)
    else:
        img_bytes, mime = _download_bytes(src, page_url)

    t_img = time.perf_counter()

    if not img_bytes:
        raise Exception('No image data')

    # An AiConfig is only built for lens_text jobs requesting the 'ai' source.
    ai_cfg = None
    ai = payload.get('ai') or None
    source = str(payload.get('source') or '').strip().lower() or 'translated'
    if mode == 'lens_text' and source == 'ai' and isinstance(ai, dict):
        api_key = str(ai.get('api_key') or '').strip() or (
            os.getenv('AI_API_KEY') or '').strip()
        ai_cfg = AiConfig(
            api_key=api_key,
            model=str(ai.get('model') or 'auto').strip() or 'auto',
            provider=str(ai.get('provider') or 'auto').strip() or 'auto',
            base_url=str(ai.get('base_url') or 'auto').strip() or 'auto',
            prompt_editable=str(ai.get('prompt') or '').strip(),
        )

    # Force plain-text AI responses (module-level toggle in lens_core).
    core.DO_AI_JSON = False

    # Cache lookup keyed on image hash + language + mode (+ AI settings).
    img_hash = _sha256_hex(img_bytes)
    cache_key = ''
    if mode == 'lens_text' and img_hash:
        cache_source = 'ai' if source == 'ai' else 'text'
        cache_key = _build_cache_key(
            img_hash, lang, mode, cache_source, ai_cfg)
        cached = None
        if source == 'ai':
            cached = _lru_get(_ai_result_cache, _ai_cache_lock, cache_key)
        else:
            cached = _lru_get(_result_cache, _result_cache_lock, cache_key)
        if cached:
            cached['perf'] = {
                'cache': 'hit',
                'total_ms': round((time.perf_counter() - t_all) * 1000, 1),
                'img_ms': round((t_img - t_all) * 1000, 1),
            }
            return cached

    # The pipeline wants a file path; stage the bytes in a temp file.
    suffix = '.png' if (mime or '').endswith('png') else '.jpg'
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
        f.write(img_bytes)
        tmp_path = f.name
    t_tmp = time.perf_counter()
    try:
        out = process_image_path(tmp_path, lang, mode, ai_cfg)
        out['perf'] = {
            'cache': 'miss' if cache_key else 'off',
            'total_ms': round((time.perf_counter() - t_all) * 1000, 1),
            'img_ms': round((t_img - t_all) * 1000, 1),
            'tmp_ms': round((t_tmp - t_img) * 1000, 1),
        }
        if cache_key and isinstance(out, dict):
            if source == 'ai':
                _lru_set(_ai_result_cache, _ai_cache_lock,
                         cache_key, out, TP_AI_RESULT_CACHE_MAX)
            else:
                _lru_set(_result_cache, _result_cache_lock,
                         cache_key, out, TP_RESULT_CACHE_MAX)
        return out
    finally:
        # Best-effort temp-file cleanup.
        try:
            os.unlink(tmp_path)
        except Exception:
            pass
|
|
@app.on_event('startup')
async def _startup():
    """Spawn the worker pool and the job-TTL cleanup task at boot."""
    print(
        f'[TextPhantom][api] starting build={BUILD_ID} workers={SERVER_MAX_WORKERS}')
    for i in range(max(1, SERVER_MAX_WORKERS)):
        asyncio.create_task(_worker_loop(i))
    asyncio.create_task(_cleanup_jobs_loop())
|
|
@app.get('/health')
async def health():
    """Liveness probe."""
    status = {'ok': True, 'build': BUILD_ID}
    return status
|
|
@app.get('/version')
async def version():
    """Report build id and backing core module."""
    info = {'ok': True, 'build': BUILD_ID, 'core': 'lens_core'}
    return info
|
|
@app.get('/warmup')
async def warmup(lang: str = TP_WARMUP_LANG):
    """Pre-warm lens_core for a language; returns timing plus core's result."""
    started = time.perf_counter()
    r = core.warmup(lang)
    elapsed_ms = round((time.perf_counter() - started) * 1000, 1)
    return {'ok': True, 'build': BUILD_ID, 'dt_ms': elapsed_ms, 'result': r}
|
|
@app.get('/meta')
async def meta():
    """Expose UI languages, overlay sources, and whether AI_API_KEY is set."""
    langs = getattr(core, 'UI_LANGUAGES', None) or []
    sources = [
        {'id': 'original', 'name': 'Original'},
        {'id': 'translated', 'name': 'Translated'},
        {'id': 'ai', 'name': 'Ai'},
    ]
    has_key = bool((os.getenv('AI_API_KEY') or '').strip())
    return {'ok': True, 'languages': langs, 'sources': sources, 'has_env_ai_key': has_key}
|
|
@app.post('/translate')
async def translate(payload: Dict[str, Any]):
    """Enqueue a translation job and return its id for status polling."""
    job_id = str(uuid.uuid4())
    summary = {
        'id': job_id,
        'mode': str(payload.get('mode') or ''),
        'lang': str(payload.get('lang') or ''),
        'source': str(payload.get('source') or ''),
        'has_datauri': bool(payload.get('imageDataUri')),
        'has_src': bool(payload.get('src')),
    }
    _dbg('rest.enqueue', summary)
    _jobs[job_id] = {'status': 'queued', 'ts': _now()}
    await _job_queue.put((job_id, payload))
    return {'id': job_id}
|
|
@app.get('/translate/{job_id}')
async def translate_status(job_id: str):
    """Return the stored job record, or an error stub for an unknown id."""
    job = _jobs.get(job_id)
    if not job:
        return {'status': 'error', 'result': 'job_not_found'}
    return job
|
|
@app.post('/ai/resolve')
async def ai_resolve(payload: Dict[str, Any]):
    """Resolve the effective AI provider, base URL, model and model list.

    The payload may carry ``api_key``, ``lang``, ``provider``, ``model`` and
    ``base_url``; any of them may be missing, empty or the string ``'auto'``.
    The key falls back to the ``AI_API_KEY`` environment variable. Without a
    key the endpoint returns ``ok: False`` with an empty provider/model set
    instead of raising. Otherwise it probes the selected provider for its
    available models (with several layers of static fallbacks) and picks a
    concrete model for the requested one.
    """
    # Prefer an explicit key from the payload, else the server environment.
    api_key = str(payload.get('api_key') or '').strip() or (
        os.getenv('AI_API_KEY') or '').strip()
    lang = _normalize_lang(str(payload.get('lang') or 'en'))
    # Language-specific prompt style, falling back to the 'default' entry.
    style_default = ((getattr(core, 'AI_LANG_STYLE', {}) or {}).get(lang) or (getattr(core, 'AI_LANG_STYLE', {}) or {}).get('default') or '').strip()
    if not api_key:
        # No key: report the failure shape the client expects (same keys as
        # the success payload, minus base_url/model) rather than erroring.
        return {
            'ok': False,
            'error': 'missing_api_key',
            'provider': '',
            'default_model': '',
            'models': [],
            'lang': lang,
            'prompt_editable_default': style_default,
        }


    # Canonicalize the provider name; '' / 'auto' means sniff it from the
    # shape of the API key itself.
    provider = core._canonical_provider(str(payload.get('provider') or 'auto'))
    if provider in ('', 'auto'):
        provider = _detect_provider_from_key(api_key)


    preset = _resolve_provider_defaults(provider) or {}
    requested_model = str(payload.get('model') or 'auto').strip() or 'auto'
    resolved_model = _resolve_model(provider, requested_model)


    models: List[str] = []
    # Explicit base_url wins; '' / 'auto' falls back to the provider preset.
    base_url = (str(payload.get('base_url') or 'auto')).strip()
    if base_url in ('', 'auto'):
        base_url = (preset.get('base_url') or '').strip()


    # Provider-specific model discovery. Each branch queries the provider's
    # listing helper; missing helpers degrade to an empty list via getattr.
    if provider == 'huggingface':
        if base_url:
            models = core._hf_router_available_models(api_key, base_url)
        # When the caller asked for 'auto', let core pick a known-good HF
        # model from the discovered list.
        if requested_model.lower() in ('', 'auto'):
            fallback = core._pick_hf_fallback_model(models)
            if fallback:
                resolved_model = fallback


    elif provider == 'gemini':
        models = getattr(core, '_gemini_available_models',
                         lambda _k: [])(api_key)
        if not models:
            # Static Gemini list used when the live listing returns nothing.
            models = ['gemini-2.5-flash', 'gemini-2.5-flash-lite', 'gemini-2.5-pro',
                      'gemini-2.0-flash', 'gemini-3-flash-preview', 'gemini-3-pro-preview']


    elif provider == 'anthropic':
        models = getattr(core, '_anthropic_available_models',
                         lambda _k, _b=None: [])(api_key, base_url)


    else:
        # Everything else is treated as OpenAI-compatible; default the base
        # URL to the official OpenAI endpoint when no preset supplies one.
        if not base_url:
            base_url = (core.AI_PROVIDER_DEFAULTS.get('openai') or {}).get(
                'base_url') or 'https://api.openai.com/v1'
        models = getattr(core, '_openai_compat_available_models',
                         lambda _k, _b: [])(api_key, base_url)


    # HF discovery failed entirely: fall back to a hard-coded Gemma set.
    if provider == 'huggingface' and not models:
        models = [
            'google/gemma-3-27b-it:featherless-a',
            'google/gemma-3-27b-it',
            'google/gemma-2-2b-it',
            'google/gemma-2-9b-it',
        ]


    # Non-HF discovery failed: assemble candidates from the resolved preset,
    # the provider's entry in AI_PROVIDER_DEFAULTS, and (for Gemini) a
    # static list; then dedupe and sort case-insensitively.
    if provider != 'huggingface' and not models:
        fallback_models: List[str] = []
        preset_model = str(preset.get('model') or '').strip()
        if preset_model:
            fallback_models.append(preset_model)


        provider_defaults = (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).get(
            provider, {}) or {}
        provider_model = str(provider_defaults.get('model') or '').strip()
        if provider_model:
            fallback_models.append(provider_model)


        if provider == 'gemini':
            fallback_models.extend([
                'gemini-2.5-flash',
                'gemini-2.5-flash-lite',
                'gemini-2.5-pro',
                'gemini-2.0-flash',
                'gemini-3-flash-preview',
                'gemini-3-pro-preview',
            ])


        models = sorted(set([m for m in fallback_models if m]), key=str.lower)


    # Last resort: collect every default model across all known providers.
    if not models:
        all_models: List[str] = []
        for _, v in (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).items():
            m2 = str((v or {}).get('model') or '').strip()
            if m2:
                all_models.append(m2)
        models = sorted(set(all_models), key=str.lower)


    # Normalize whatever branch produced the list: drop non-strings and
    # blanks, strip whitespace, dedupe, sort case-insensitively.
    if models:
        models = sorted(
            {m.strip() for m in models if isinstance(m, str) and m.strip()},
            key=str.lower,
        )


    # If the resolved model is not actually offered, pin to the first entry
    # of the (sorted) list so the client always gets a selectable model.
    if models and resolved_model not in models:
        resolved_model = models[0]


    prompt_default = style_default


    return {
        'ok': True,
        'provider': provider,
        'base_url': base_url,
        'default_model': (preset.get('model') or ''),
        'model': resolved_model,
        'models': models,
        'prompt_editable_default': prompt_default,
    }
|
|
@app.get('/ai/prompt/default')
async def ai_prompt_default(lang: str = 'en'):
    """Assemble the default AI system prompt pieces for *lang*.

    Returns the base prompt, the language-specific style (falling back to
    the 'default' style), the fixed JSON-output contract, and the three
    joined into the full system text.
    """
    norm_lang = _normalize_lang(lang)
    styles = getattr(core, 'AI_LANG_STYLE', {}) or {}
    base = (getattr(core, 'AI_PROMPT_SYSTEM_BASE', '') or '').strip()
    style = (styles.get(norm_lang) or styles.get('default') or '').strip()
    contract_lines = [
        'Return ONLY valid JSON (no markdown, no extra text).',
        'Output JSON MUST have exactly one key: "aiTextFull".',
        'Schema example: {"aiTextFull":"..."}',
        'Markers: Keep every paragraph marker like <<TP_P0>> unchanged and in order. Do not remove or add markers.',
        "aiTextFull must include all markers, each followed by that paragraph's translated text.",
    ]
    contract = "\n".join(contract_lines)
    # Skip empty pieces so the joined text has no stray blank sections.
    pieces = [part for part in (base, style, contract) if part]
    system_text = "\n\n".join(pieces)
    return {
        'ok': True,
        'lang': norm_lang,
        'prompt_editable_default': style,
        'lang_style': style,
        'system_base': base,
        'contract': contract,
        'system_text': system_text,
    }
|
|
@app.websocket('/ws')
async def ws_endpoint(ws: WebSocket):
    """WebSocket job channel: accept 'job' messages, reply with results.

    Protocol: the server sends {'type': 'ack'} on connect; the client sends
    {'type': 'job', 'id': ..., 'payload': {...}} frames; each job gets back
    either {'type': 'result', 'id', 'result'} or {'type': 'error', 'id',
    'error'}. The handler returns silently when the client disconnects.

    Fixes over the previous version: a malformed-JSON or non-dict frame is
    skipped instead of crashing the whole connection, and both the result
    and the error send paths treat RuntimeError (raised by Starlette when
    sending on a closed socket) the same as WebSocketDisconnect.
    """
    await ws.accept()
    await ws.send_text(json.dumps({'type': 'ack'}))
    try:
        while True:
            msg = await ws.receive_text()
            try:
                data = json.loads(msg)
            except ValueError:
                # Malformed frame: ignore it rather than tearing down the
                # socket for every other in-flight job.
                continue
            # Valid JSON may be a list/string; only dict frames are jobs.
            if not isinstance(data, dict) or data.get('type') != 'job':
                continue
            jid = str(data.get('id') or '')
            payload = data.get('payload') or {}
            _dbg('ws.job', {
                'id': jid,
                'mode': str(payload.get('mode') or ''),
                'lang': str(payload.get('lang') or ''),
                'source': str(payload.get('source') or ''),
                'has_datauri': bool(payload.get('imageDataUri')),
                'has_src': bool(payload.get('src')),
            })
            try:
                # Heavy OCR/translate work runs in a thread so the event
                # loop keeps serving other connections.
                result = await asyncio.to_thread(_process_payload, payload)
                try:
                    await ws.send_text(json.dumps({'type': 'result', 'id': jid, 'result': result}))
                except (WebSocketDisconnect, RuntimeError):
                    # Client went away mid-send; nothing left to do.
                    return
            except Exception as e:
                # Processing failed: best-effort error frame back to client.
                try:
                    await ws.send_text(json.dumps({'type': 'error', 'id': jid, 'error': str(e)}))
                except (WebSocketDisconnect, RuntimeError):
                    return
    except WebSocketDisconnect:
        return
|
|
def main():
    """CLI entry point: translate one image configured via env vars / core.

    Reads the image path and target language from the core module, the mode
    and AI settings from the environment (with core-level fallbacks), runs
    the pipeline once, and prints the result as pretty JSON.
    """
    image_path = getattr(core, 'IMAGE_PATH', '')
    lang = getattr(core, 'LANG', 'en')
    mode = os.environ.get('MODE', 'lens_text')
    api_key = os.environ.get('AI_API_KEY', getattr(core, 'AI_API_KEY', ''))
    model = os.environ.get('AI_MODEL', getattr(core, 'AI_MODEL', 'auto'))
    prompt = os.environ.get('AI_PROMPT', '')

    # The AI pass applies only in text mode and only when a key is present.
    ai_cfg = None
    if api_key and mode == 'lens_text':
        ai_cfg = AiConfig(api_key=api_key, model=model, prompt_editable=prompt)
    out = process_image_path(image_path, lang, mode, ai_cfg)
    print(json.dumps(out, ensure_ascii=False, indent=2))


if __name__ == '__main__':
    main()
|
|