import asyncio, base64, copy, hashlib, io, json, os, re, tempfile, time, uuid, httpx, logging
from backend import lens_core as core
from http import HTTPStatus
from collections import OrderedDict
from threading import Lock, Semaphore
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.middleware.cors import CORSMiddleware

SERVER_MAX_WORKERS = int(os.environ.get('SERVER_MAX_WORKERS', '15'))
JOB_TTL_SEC = int(os.environ.get('JOB_TTL_SEC', '3600'))
HTTP_TIMEOUT_SEC = float(os.environ.get('HTTP_TIMEOUT_SEC', str(getattr(core, 'AI_TIMEOUT_SEC', 120))))
SUPPORTED_MODES = {"lens_images", "lens_text"}
BUILD_ID = os.environ.get('TP_BUILD_ID', 'v9-backendfix-20260129')
TP_DEBUG = str(os.environ.get('TP_DEBUG', '')).strip().lower() in ('1', 'true', 'yes', 'on')
TP_PARA_MARKER_PREFIX = '<<TP_P'
TP_PARA_MARKER_SUFFIX = '>>'
TP_RESULT_CACHE_MAX = int(os.environ.get('TP_RESULT_CACHE_MAX', '24'))
TP_AI_RESULT_CACHE_MAX = int(os.environ.get('TP_AI_RESULT_CACHE_MAX', '16'))
TP_WARMUP_LANG = (os.environ.get('TP_WARMUP_LANG', 'th') or 'th').strip()
_result_cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()
_ai_result_cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()
_jobs: Dict[str, Dict[str, Any]] = {}
_job_queue: asyncio.Queue = asyncio.Queue()
_result_cache_lock = Lock()
_ai_cache_lock = Lock()
HF_AI_MAX_CONCURRENCY = max(1, int(os.environ.get('HF_AI_MAX_CONCURRENCY', '1')))
HF_AI_MIN_INTERVAL_SEC = max(0.0, float(os.environ.get('HF_AI_MIN_INTERVAL_SEC', '5')))
HF_AI_MAX_RETRIES = max(1, int(os.environ.get('HF_AI_MAX_RETRIES', '6')))
HF_AI_RETRY_BASE_SEC = max(0.2, float(os.environ.get('HF_AI_RETRY_BASE_SEC', '2')))
_hf_ai_sem = Semaphore(HF_AI_MAX_CONCURRENCY)
_hf_ai_lock = Lock()
_hf_ai_last_ts = 0.0
_tp_marker_re = re.compile(r'<<TP_P\d+>>')
TP_ACCESS_LOG_MODE = (os.environ.get('TP_ACCESS_LOG_MODE', 'custom') or 'custom').strip().lower()
if TP_ACCESS_LOG_MODE in ('custom', 'tp', 'plain'):
    try:
        _uv = logging.getLogger('uvicorn.access')
        _uv.disabled = True
        _uv.propagate = False
        _uv.setLevel(logging.CRITICAL)
    except Exception:
        pass

def _dbg(tag: str, data=None) -> None:
    if not TP_DEBUG:
        return
    try:
        if data is None:
            print(f'[TextPhantom][dbg] {tag}')
        else:
            s = json.dumps(data, ensure_ascii=False)
            if len(s) > 2000:
                s = s[:2000] + '…'
            print(f'[TextPhantom][dbg] {tag} {s}')
    except Exception:
        try:
            print(f'[TextPhantom][dbg] {tag} {data}')
        except Exception:
            pass

def _tree_stats(tree) -> dict:
    if not isinstance(tree, dict):
        return {'paras': 0, 'items': 0, 'spans': 0}
    paras = tree.get('paragraphs') or []
    if not isinstance(paras, list):
        return {'paras': 0, 'items': 0, 'spans': 0}
    items = 0
    spans = 0
    for p in paras:
        if not isinstance(p, dict):
            continue
        its = p.get('items') or []
        if not isinstance(its, list):
            continue
        items += len(its)
        for it in its:
            if not isinstance(it, dict):
                continue
            sp = it.get('spans') or []
            if isinstance(sp, list):
                spans += len(sp)
    return {'paras': len(paras), 'items': items, 'spans': spans}

def _tree_to_paragraph_texts(tree: Any) -> List[str]:
    if not isinstance(tree, dict):
        return []
    paras = tree.get('paragraphs') or []
    if not isinstance(paras, list) or not paras:
        return []
    out: List[str] = []
    for p in paras:
        if not isinstance(p, dict):
            out.append('')
            continue
        t = str(p.get('text') or '').strip()
        if not t:
            items = p.get('items') or []
            if isinstance(items, list) and items:
                t = ' '.join(str(it.get('text') or '').strip() for it in items
                             if isinstance(it, dict) and str(it.get('text') or '').strip())
        out.append(t)
    return out

def _apply_para_markers(paras: List[str]) -> str:
    if not paras:
        return ''
    parts: List[str] = []
    for i, t in enumerate(paras):
        parts.append(f"{TP_PARA_MARKER_PREFIX}{i}{TP_PARA_MARKER_SUFFIX}\n{(t or '').strip()}")
    return '\n\n'.join(parts)
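
# Example of the marker wire format produced here (and parsed back by
# _extract_marker_indices / _sanitize_marked_text below):
#   _apply_para_markers(['Hello', 'World'])
#   -> '<<TP_P0>>\nHello\n\n<<TP_P1>>\nWorld'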

def _clamp_runaway_repeats(s: str, max_repeat: int = 12) -> str:
    if not s:
        return ''
    pat = re.compile(r"(.)\1{" + str(max_repeat) + r",}")
    return pat.sub(lambda m: m.group(1) * max_repeat, s)
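
# Example: a runaway OCR artifact such as '!' * 40 is clamped to '!' * 12,
# which keeps the retry prompt from ballooning:
#   _clamp_runaway_repeats('!' * 40) == '!' * 12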

def _extract_marker_indices(s: str) -> set[int]:
    if not s:
        return set()
    out: set[int] = set()
    for m in re.finditer(r"<<TP_P(\d+)>>", s):
        try:
            out.add(int(m.group(1)))
        except Exception:
            continue
    return out
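
# Example: _extract_marker_indices('<<TP_P0>>Hello <<TP_P2>>World') -> {0, 2}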

def _needs_ai_retry(ai_text_full: str, expected_paras: int) -> bool:
    if expected_paras <= 0:
        return False
    # Retry whenever the reply contains fewer distinct markers than paragraphs
    # were sent; this also covers truncated replies that end in a dangling
    # '<<TP_P' prefix, since the final marker never parses.
    return len(_extract_marker_indices(ai_text_full)) < expected_paras

def _now() -> float:
    return time.time()

def _lru_get(cache: OrderedDict, lock: Lock, key: str) -> Optional[Dict[str, Any]]:
    if not key:
        return None
    with lock:
        v = cache.get(key)
        if v is None:
            return None
        cache.move_to_end(key)
        return copy.deepcopy(v)

def _lru_set(cache: OrderedDict, lock: Lock, key: str, value: Dict[str, Any], max_items: int) -> None:
    if not key or not isinstance(value, dict) or max_items <= 0:
        return
    with lock:
        cache[key] = copy.deepcopy(value)
        cache.move_to_end(key)
        while len(cache) > max_items:
            cache.popitem(last=False)
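
# Both caches are bounded LRU maps: _lru_get returns a deep copy and bumps the
# key to the most-recently-used end; _lru_set stores a deep copy and evicts
# from the least-recently-used end once max_items is exceeded. The deep copies
# matter because callers mutate results afterwards (e.g. _process_payload
# attaches a 'perf' dict to whatever it returns).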

def _sha256_hex(blob: bytes) -> str:
    return hashlib.sha256(blob).hexdigest() if blob else ''

def _ai_prompt_sig(s: str) -> str:
    t = (s or '').strip()
    if not t:
        return ''
    return hashlib.sha256(t.encode('utf-8')).hexdigest()[:12]

def _build_cache_key(img_hash: str, lang: str, mode: str, source: str, ai_cfg: Optional["AiConfig"]) -> str:
    parts = [img_hash, _normalize_lang(lang), (mode or '').strip(), (source or '').strip()]
    if ai_cfg and (source or '').strip().lower() == 'ai':
        parts.extend([
            (ai_cfg.provider or '').strip(),
            (ai_cfg.model or '').strip(),
            (ai_cfg.base_url or '').strip(),
            _ai_prompt_sig(ai_cfg.prompt_editable),
        ])
    return '|'.join([p for p in parts if p is not None])
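
# Illustrative key shape; AI requests additionally fold in provider, model,
# base URL and the 12-hex prompt signature (all values below are made up):
#   '<sha256-of-image>|th|lens_text|ai|huggingface|google/gemma-3-27b-it|<base-url>|1a2b3c4d5e6f'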

def _b64_to_bytes(b64: str) -> bytes:
    pad = '=' * ((4 - (len(b64) % 4)) % 4)
    return base64.b64decode(b64 + pad)

def _datauri_to_bytes(data_uri: str) -> tuple[bytes, str]:
    s = (data_uri or '').strip()
    if not s.startswith('data:'):
        return b'', ''
    head, _, b64 = s.partition(',')
    mime = ''
    if ';' in head:
        mime = head[5:head.index(';')]
    return _b64_to_bytes(b64), mime or 'application/octet-stream'

def _bytes_to_datauri(blob: bytes, mime: str) -> str:
    b64 = base64.b64encode(blob).decode('ascii')
    return f"data:{mime};base64,{b64}"

def _download_bytes(url: str, referer: str = '') -> tuple[bytes, str]:
    u = (url or '').strip()
    if not u:
        return b'', ''
    headers = {
        'user-agent': 'Mozilla/5.0 (TextPhantomOCR; +https://huggingface.co/spaces)',
    }
    ref = (referer or '').strip()
    if ref:
        headers['referer'] = ref
    with httpx.Client(timeout=HTTP_TIMEOUT_SEC, follow_redirects=True, headers=headers) as client:
        r = client.get(u)
        r.raise_for_status()
        ct = (r.headers.get('content-type') or '').split(';')[0].strip()
        return r.content, ct

def _detect_provider_from_key(api_key: str) -> str:
    return core._canonical_provider(core._detect_ai_provider_from_key(api_key))

def _resolve_provider_defaults(provider: str) -> dict:
    return (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).get(provider, {})

def _resolve_model(provider: str, model: str) -> str:
    return core._resolve_model(provider, model)

def _has_meaningful_text(s: str) -> bool:
    t = _tp_marker_re.sub('', str(s or ''))
    return bool(t.strip())

def _is_hf_provider(provider: str, base_url: str) -> bool:
    p = (provider or '').strip().lower()
    b = (base_url or '').strip().lower()
    return p == 'huggingface' or 'router.huggingface.co' in b

def _is_hf_rate_limited_error(msg: str) -> bool:
    t = (msg or '').lower()
    if 'rate limit' in t or 'ratelimit' in t or 'too many requests' in t:
        return True
    if 'http 429' in t or ' 429' in t:
        return True
    if 'http 503' in t or ' 503' in t or 'overloaded' in t or 'temporarily' in t:
        return True
    return False

def _hf_throttle_before_call() -> None:
    if HF_AI_MIN_INTERVAL_SEC <= 0:
        return
    global _hf_ai_last_ts
    with _hf_ai_lock:
        now = _now()
        dt = now - float(_hf_ai_last_ts or 0.0)
        wait = HF_AI_MIN_INTERVAL_SEC - dt
        if wait > 0:
            time.sleep(wait)
        _hf_ai_last_ts = _now()
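
# Because the lock is held across the sleep, concurrent callers queue up and
# leave the gate at least HF_AI_MIN_INTERVAL_SEC apart: with the default 5s
# interval, three simultaneous calls proceed at roughly t=0s, t=5s and t=10s.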

def _openai_compat_generate_with_hf_backoff(api_key: str, base_url: str, model: str, system_text: str, user_parts: List[str]):
    last_err: Optional[Exception] = None
    for attempt in range(int(HF_AI_MAX_RETRIES)):
        try:
            with _hf_ai_sem:
                _hf_throttle_before_call()
                return core._openai_compat_generate_json(api_key, base_url, model, system_text, user_parts)
        except Exception as e:
            last_err = e
            if not _is_hf_rate_limited_error(str(e)):
                raise
            delay = min(15.0, max(float(HF_AI_MIN_INTERVAL_SEC),
                                  float(HF_AI_RETRY_BASE_SEC) * (2 ** min(attempt, 4))))
            _dbg('ai.hf.backoff', {'attempt': attempt + 1, 'delay_sec': round(delay, 2), 'err': str(e)[:240]})
            time.sleep(delay)
            continue
    if last_err is not None:
        raise last_err
    raise Exception('hf_backoff_failed')
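
# With the defaults (base 2s, min interval 5s, cap 15s), the delays between
# rate-limited attempts come out to roughly 5s, 5s, 8s, 15s, 15s before the
# last error is re-raised; non-rate-limit errors propagate immediately.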

def _normalize_lang(lang: str) -> str:
    return core._normalize_lang(lang)

# AiConfig is constructed with keyword arguments below; @dataclass (imported
# above and otherwise unused) is assumed to be the intended decorator.
@dataclass
class AiConfig:
    api_key: str
    model: str = 'auto'
    provider: str = 'auto'
    base_url: str = 'auto'
    prompt_editable: str = ''

def _collapse_ws(text: str) -> str:
    return re.sub(r"\s+", " ", str(text or "")).strip()

def _sanitize_marked_text(marked_text: str) -> str:
    t = str(marked_text or "")
    if not t:
        return ""
    t = t.replace("\r\n", "\n").replace("\r", "\n")
    t = re.sub(r"<<TP_P(?!\d+>>)[^\s>]*>?", "", t)
    t = re.sub(r"(?m)^\s*(<<TP_P\d+>>)\s*(\S)", r"\1\n\2", t)
    lines = t.split("\n")
    out0: List[str] = []
    for line in lines:
        if "<<TP_P" not in line:
            out0.append(line)
            continue
        m = re.match(r"^\s*(<<TP_P\d+>>)\s*$", line)
        if m:
            out0.append(m.group(1))
            continue
        m2 = re.match(r"^\s*(<<TP_P\d+>>)\s*(.*)$", line)
        if m2:
            out0.append(m2.group(1))
            rest = (m2.group(2) or "").strip()
            if rest:
                out0.append(rest)
            continue
        out0.append(re.sub(r"<<TP_P\d+>>", "", line))
    t = "\n".join(out0)
    indices = sorted(_extract_marker_indices(t))
    if not indices:
        return _collapse_ws(t)
    out_lines: List[str] = []
    for idx in indices:
        marker = f"<<TP_P{idx}>>"
        m = re.search(rf"{re.escape(marker)}\s*([\s\S]*?)(?=<<TP_P\d+>>|\Z)", t)
        seg = m.group(1) if m else ""
        seg = _collapse_ws(seg)
        out_lines.append(marker)
        out_lines.append(seg)
        out_lines.append("")
    return "\n".join(out_lines).strip("\n")

def _has_complete_marker_sequence(ai_text_full: str, expected_paras: int) -> bool:
    if expected_paras <= 0:
        return True
    t = str(ai_text_full or "")
    need = list(range(int(expected_paras)))
    idx = sorted(_extract_marker_indices(t))
    if len(idx) < len(need):
        return False
    if idx[:len(need)] != need:
        return False
    last = -1
    for i in need:
        m = f"<<TP_P{i}>>"
        p = t.find(m)
        if p < 0 or p <= last:
            return False
        last = p
    return True
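
# Example: with expected_paras=3 the text must contain <<TP_P0>>, <<TP_P1>>
# and <<TP_P2>> at ascending positions; '<<TP_P0>>a <<TP_P2>>c' fails (marker
# missing) and '<<TP_P1>>b <<TP_P0>>a <<TP_P2>>c' fails (out of order).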

def _build_ai_prompt_packet_custom(target_lang: str, original_text_full: str, prompt_editable: str, is_retry: bool = False) -> tuple[str, List[str]]:
    lang = _normalize_lang(target_lang)
    base = (getattr(core, "AI_PROMPT_SYSTEM_BASE", "") or "").strip()
    style = (prompt_editable or "").strip()
    if not style:
        style = (
            (getattr(core, "AI_LANG_STYLE", {}) or {}).get(lang)
            or (getattr(core, "AI_LANG_STYLE", {}) or {}).get("default")
            or ""
        ).strip()
    contract_parts: List[str] = [
        "Output ONLY the translated text (no JSON, no markdown, no extra commentary).",
        "Markers: Keep every paragraph marker like <<TP_P0>> unchanged and in order. Do not remove, rename, or add markers.",
        "For each marker, output the marker followed by that paragraph's translated text.",
    ]
    if is_retry:
        contract_parts.append(
            "Retry: You MUST output ALL markers from the first to the last marker in the input."
        )
    system_text = "\n\n".join(
        [p for p in [base, style, "\n".join(contract_parts)] if p]
    )
    user_parts: List[str] = ["Input:\n" + str(original_text_full or "")]
    return system_text, user_parts
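
# The assembled packet: system text = base system prompt + language style (or
# the caller's prompt_editable override) + the marker contract; the user part
# carries only the marked input text.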

def ai_translate_text(original_text_full: str, target_lang: str, ai: AiConfig, is_retry: bool = False) -> dict:
    if not _has_meaningful_text(original_text_full):
        return {
            'aiTextFull': '',
            'meta': {
                'skipped': True,
                'skipped_reason': 'no_text',
            },
        }
    api_key = (ai.api_key or '').strip()
    if not api_key:
        raise Exception('AI api_key is required')
    provider = core._canonical_provider((ai.provider or 'auto'))
    if provider in ('', 'auto'):
        provider = _detect_provider_from_key(api_key)
    preset = _resolve_provider_defaults(provider) or {}
    model = _resolve_model(provider, (ai.model or 'auto'))
    base_url = (ai.base_url or 'auto').strip()
    if base_url in ('', 'auto'):
        base_url = (preset.get('base_url') or '').strip()
    if provider not in ('gemini', 'anthropic'):
        if not base_url:
            base_url = (_resolve_provider_defaults('openai') or {}).get('base_url') or 'https://api.openai.com/v1'
    system_text, user_parts = _build_ai_prompt_packet_custom(
        target_lang, original_text_full, ai.prompt_editable, is_retry=is_retry)
    started = _now()
    used_model = model
    if provider == 'gemini':
        raw = core._gemini_generate_json(api_key, model, system_text, user_parts)
    elif provider == 'anthropic':
        raw = core._anthropic_generate_json(api_key, model, system_text, user_parts)
    else:
        if _is_hf_provider(provider, base_url):
            raw, used_model = _openai_compat_generate_with_hf_backoff(
                api_key, base_url, model, system_text, user_parts)
        else:
            raw, used_model = core._openai_compat_generate_json(
                api_key, base_url, model, system_text, user_parts)
    ai_text_full = (core._parse_ai_textfull_only(raw)
                    if core.DO_AI_JSON else core._parse_ai_textfull_text_only(raw))
    ai_text_full = _sanitize_marked_text(ai_text_full)
    return {
        'aiTextFull': ai_text_full,
        'meta': {
            'model': used_model,
            'provider': provider,
            'base_url': base_url,
            'latency_sec': round(_now() - started, 3),
        },
    }

def process_image_path(image_path: str, lang: str, mode: str, ai_cfg: Optional[AiConfig]) -> dict:
    mode_id = (mode or '').strip()
    if mode_id not in SUPPORTED_MODES:
        mode_id = 'lens_images'
    target_lang = _normalize_lang(lang)
    data = core.get_lens_data_from_image(image_path, getattr(core, 'FIREBASE_URL', ''), target_lang)
    img = core.Image.open(image_path).convert('RGB')
    W, H = img.size
    thai_font = getattr(core, 'FONT_THAI_PATH', 'NotoSansThai-Regular.ttf')
    latin_font = getattr(core, 'FONT_LATIN_PATH', 'NotoSans-Regular.ttf')
    if target_lang == 'ja':
        latin_font = getattr(core, 'FONT_JA_PATH', latin_font)
    elif target_lang in ('zh', 'zh-hans', 'zh_cn', 'zh-cn', 'zh_hans'):
        latin_font = getattr(core, 'FONT_ZH_SC_PATH', latin_font)
    elif target_lang in ('zh-hant', 'zh_tw', 'zh-tw', 'zh_hant'):
        latin_font = getattr(core, 'FONT_ZH_TC_PATH', latin_font)
    # 'FONT_DOWNLOD' is spelled this way in the source; left untouched in case
    # lens_core really does define the attribute under that name.
    if getattr(core, 'FONT_DOWNLOD', True):
        thai_font = core.ensure_font(thai_font, getattr(core, 'FONT_THAI_URLS', []))
        if target_lang == 'ja':
            latin_font = core.ensure_font(latin_font, getattr(core, 'FONT_JA_URLS', []))
        elif target_lang in ('zh', 'zh-hans', 'zh_cn', 'zh-cn', 'zh_hans'):
            latin_font = core.ensure_font(latin_font, getattr(core, 'FONT_ZH_SC_URLS', []))
        elif target_lang in ('zh-hant', 'zh_tw', 'zh-tw', 'zh_hant'):
            latin_font = core.ensure_font(latin_font, getattr(core, 'FONT_ZH_TC_URLS', []))
        else:
            latin_font = core.ensure_font(latin_font, getattr(core, 'FONT_LATIN_URLS', []))
    image_url = data.get('imageUrl') if isinstance(data, dict) else None
    out: Dict[str, Any] = {
        'mode': mode_id,
        'imageUrl': image_url,
        'imageDataUri': '',
        'originalContentLanguage': data.get('originalContentLanguage') if isinstance(data, dict) else None,
        'originalTextFull': data.get('originalTextFull') if isinstance(data, dict) else None,
        'translatedTextFull': data.get('translatedTextFull') if isinstance(data, dict) else None,
        'AiTextFull': '',
        'originalParagraphs': (data.get('originalParagraphs') or []) if isinstance(data, dict) else [],
        'translatedParagraphs': (data.get('translatedParagraphs') or []) if isinstance(data, dict) else [],
        'original': {},
        'translated': {},
        'Ai': {},
    }
    if mode_id == 'lens_images':
        if image_url:
            decoded = core.decode_imageurl_to_datauri(str(image_url))
            if decoded:
                out['imageDataUri'] = decoded
            elif isinstance(image_url, str) and image_url.startswith(('http://', 'https://')):
                blob, mime2 = _download_bytes(image_url)
                out['imageDataUri'] = _bytes_to_datauri(blob, mime2 or 'image/jpeg')
        if not out.get('imageDataUri'):
            with open(image_path, 'rb') as f:
                blob = f.read()
            out['imageDataUri'] = _bytes_to_datauri(blob, 'image/jpeg')
        return out
    original_span_tokens = None
    original_tree = None
    translated_tree = None

    def _base_img_for_overlay() -> core.Image.Image:
        if not (getattr(core, 'ERASE_OLD_TEXT_WITH_ORIGINAL_BOXES', True) and original_span_tokens):
            return img
        return core.erase_text_with_boxes(
            img,
            original_span_tokens,
            pad_px=getattr(core, 'ERASE_PADDING_PX', 2),
            sample_margin_px=getattr(core, 'ERASE_SAMPLE_MARGIN_PX', 6),
        )

    if getattr(core, 'DO_ORIGINAL', True):
        tree, _ = core.decode_tree(
            out.get('originalParagraphs') or [],
            out.get('originalTextFull') or '',
            'original',
            W,
            H,
            want_raw=False,
        )
        original_tree = tree
        original_span_tokens = core.flatten_tree_spans(tree)
        _dbg('tree.original', _tree_stats(original_tree))
        out['original'] = {
            'originalTree': tree,
            'originalTextFull': out.get('originalTextFull') or '',
        }
    if getattr(core, 'DO_TRANSLATED', True):
        tree, _ = core.decode_tree(
            out.get('translatedParagraphs') or [],
            out.get('translatedTextFull') or '',
            'translated',
            W,
            H,
            want_raw=False,
        )
        translated_tree = tree
        translated_span_tokens = core.flatten_tree_spans(tree)
        _dbg('tree.translated', _tree_stats(translated_tree))
        out['translated'] = {
            'translatedTree': tree,
            'translatedTextFull': out.get('translatedTextFull') or '',
        }

    def _tree_score(tree: Any) -> int:
        if not isinstance(tree, dict):
            return -1
        paragraphs = tree.get('paragraphs') or []
        if not isinstance(paragraphs, list) or not paragraphs:
            return -1
        para_count = len(paragraphs)
        item_count = 0
        span_count = 0
        for p in paragraphs:
            if not isinstance(p, dict):
                continue
            items = p.get('items') or []
            if not isinstance(items, list):
                continue
            item_count += len(items)
            for it in items:
                if not isinstance(it, dict):
                    continue
                spans = it.get('spans') or []
                if isinstance(spans, list):
                    span_count += len(spans)
        return item_count * 10000 + para_count * 100 + span_count

    def _pick_ai_template_tree() -> Optional[Dict[str, Any]]:
        tr_score = _tree_score(translated_tree)
        og_score = _tree_score(original_tree)
        if tr_score < 0 and og_score < 0:
            return None
        if og_score > tr_score:
            return original_tree
        return translated_tree or original_tree

    ai_tree = None
    if ai_cfg and (ai_cfg.api_key or '').strip() and getattr(core, 'DO_AI', True):
        src_paras = _tree_to_paragraph_texts(original_tree or {})
        src_text = _apply_para_markers(src_paras) if src_paras else str(out.get('originalTextFull') or '')
        if not _has_meaningful_text(src_text):
            out['AiTextFull'] = ''
            out['Ai'] = {
                'meta': {
                    'skipped': True,
                    'skipped_reason': 'no_text',
                }
            }
        else:
            ai = ai_translate_text(src_text, target_lang, ai_cfg)
            if src_paras and _needs_ai_retry(str(ai.get('aiTextFull') or ''), len(src_paras)):
                _dbg('ai.retry', {
                    'expected_paras': len(src_paras),
                    'found_markers': len(_extract_marker_indices(str(ai.get('aiTextFull') or ''))),
                })
                retry_paras = [_clamp_runaway_repeats(p) for p in src_paras]
                retry_text = _apply_para_markers(retry_paras) or src_text
                ai = ai_translate_text(retry_text, target_lang, ai_cfg, is_retry=True)
            ai_text_full = str(ai.get('aiTextFull') or '')
            meta0 = ai.get('meta') or {}
            if src_paras:
                expected = len(src_paras)
                if not _has_complete_marker_sequence(ai_text_full, expected):
                    fallback_paras = _tree_to_paragraph_texts(translated_tree or {})
                    if len(fallback_paras) < expected:
                        fallback_paras = (fallback_paras + src_paras)[:expected]
                    else:
                        fallback_paras = fallback_paras[:expected]
                    found = sorted(_extract_marker_indices(ai_text_full))
                    seg_map: Dict[int, str] = {}
                    for idx in found:
                        if idx < 0 or idx >= expected:
                            continue
                        marker = f"<<TP_P{idx}>>"
                        m = re.search(rf"{re.escape(marker)}\s*([\s\S]*?)(?=<<TP_P\d+>>|\Z)", ai_text_full)
                        seg = _collapse_ws(m.group(1) if m else '')
                        if seg and idx not in seg_map:
                            seg_map[idx] = seg
                    missing = 0
                    out_lines: List[str] = []
                    for i in range(expected):
                        seg = seg_map.get(i) or _collapse_ws(fallback_paras[i] if i < len(fallback_paras) else '')
                        if not seg_map.get(i):
                            missing += 1
                        out_lines.append(f"<<TP_P{i}>>")
                        out_lines.append(seg)
                        out_lines.append('')
                    ai_text_full = "\n".join(out_lines).strip("\n")
                    _dbg('ai.marker.repaired', {
                        'expected_paras': expected,
                        'found_markers': len(seg_map),
                        'missing': missing,
                    })
                    meta0 = {
                        **meta0,
                        'marker_repaired': True,
                        'marker_expected': expected,
                        'marker_found': len(seg_map),
                        'marker_missing': missing,
                    }
            template_tree = _pick_ai_template_tree()
            _dbg('ai.template.pick', {
                'score_original': _tree_score(original_tree),
                'score_translated': _tree_score(translated_tree),
                'picked': 'original' if template_tree is original_tree
                          else ('translated' if template_tree is translated_tree else 'none'),
            })
            if not isinstance(template_tree, dict):
                template_tree = original_tree if isinstance(original_tree, dict) else (
                    translated_tree if isinstance(translated_tree, dict) else {})
            patched = core.patch(
                {'Ai': {'aiTextFull': str(ai_text_full or ''), 'aiTree': template_tree}},
                W,
                H,
                thai_font or '',
                latin_font or '',
                lang=target_lang,
            )
            ai_tree = (patched.get('Ai') or {}).get('aiTree') or {}
            _dbg('ai.patched', {
                'ai_text_len': len(ai_text_full),
                'stats_ai': _tree_stats(ai_tree),
                'stats_original': _tree_stats(original_tree or {}),
                'stats_translated': _tree_stats(translated_tree or {}),
                'mode': mode_id,
                'lang': target_lang,
            })
            shared_para_sizes = core._compute_shared_para_sizes(
                [original_tree or {}, translated_tree or {}, ai_tree or {}],
                thai_font or '',
                latin_font or '',
                W,
                H,
            )
            core._apply_para_font_size(original_tree or {}, shared_para_sizes)
            core._apply_para_font_size(translated_tree or {}, shared_para_sizes)
            core._apply_para_font_size(ai_tree or {}, shared_para_sizes)
            core._rebuild_ai_spans_after_font_resize(
                ai_tree or {}, W, H, thai_font or '', latin_font or '', lang=target_lang)
            out['AiTextFull'] = ai_text_full
            out['Ai'] = {
                'aiTextFull': ai_text_full,
                'aiTree': ai_tree,
                'meta': meta0,
            }
            if getattr(core, 'DO_AI_HTML', True):
                core.fit_tree_font_sizes_for_tp_html(ai_tree, thai_font or '', latin_font or '', W, H)
                out['Ai']['aihtml'] = core.ai_tree_to_tp_html(ai_tree, W, H)
                out['Ai']['aihtmlMeta'] = {
                    'baseW': int(W),
                    'baseH': int(H),
                    'format': 'tp',
                }
    if getattr(core, 'DO_ORIGINAL', True) and getattr(core, 'DO_ORIGINAL_HTML', True) and isinstance(original_tree, dict):
        core.fit_tree_font_sizes_for_tp_html(original_tree, thai_font or '', latin_font or '', W, H)
        if isinstance(out.get('original'), dict):
            out['original']['originalhtml'] = core.ai_tree_to_tp_html(original_tree or {}, W, H)
    if getattr(core, 'DO_TRANSLATED', True) and getattr(core, 'DO_TRANSLATED_HTML', True) and isinstance(translated_tree, dict):
        core.fit_tree_font_sizes_for_tp_html(translated_tree, thai_font or '', latin_font or '', W, H)
        if isinstance(out.get('translated'), dict):
            out['translated']['translatedhtml'] = core.ai_tree_to_tp_html(translated_tree or {}, W, H)
    if getattr(core, 'HTML_INCLUDE_CSS', True) and (getattr(core, 'DO_ORIGINAL_HTML', True)
                                                    or getattr(core, 'DO_TRANSLATED_HTML', True)
                                                    or getattr(core, 'DO_AI_HTML', True)):
        out['htmlCss'] = core.tp_overlay_css()
        out['htmlMeta'] = {
            'baseW': int(W),
            'baseH': int(H),
            'format': 'tp',
        }
    base_img = _base_img_for_overlay()
    buf = io.BytesIO()
    base_img.save(buf, format='PNG')
    out['imageDataUri'] = _bytes_to_datauri(buf.getvalue(), 'image/png')
    return out
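
# Shape of a lens_text result as assembled above (keys taken from this file,
# values elided; the html keys appear only when the corresponding DO_*_HTML
# flags are on):
#   {
#     'mode': 'lens_text', 'imageUrl': ..., 'imageDataUri': 'data:image/png;base64,...',
#     'originalTextFull': ..., 'translatedTextFull': ..., 'AiTextFull': ...,
#     'original': {'originalTree': ..., 'originalTextFull': ..., 'originalhtml': ...},
#     'translated': {'translatedTree': ..., 'translatedTextFull': ..., 'translatedhtml': ...},
#     'Ai': {'aiTextFull': ..., 'aiTree': ..., 'meta': ..., 'aihtml': ..., 'aihtmlMeta': ...},
#     'htmlCss': ..., 'htmlMeta': {'baseW': ..., 'baseH': ..., 'format': 'tp'},
#   }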

app = FastAPI(title='TextPhantom OCR API', version='1.0')
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)

# The handlers below are plainly FastAPI endpoints but arrived without their
# registration decorators; the decorators from here on are a reconstruction,
# with route paths inferred from the handler names and from the "/translate/"
# prefix check in this access log.
@app.middleware('http')
async def _tp_access_log(request: Request, call_next):
    resp = await call_next(request)
    if TP_ACCESS_LOG_MODE in ('uvicorn', 'off', 'none'):
        return resp
    try:
        path = request.url.path
        if request.method == 'GET' and path.startswith("/translate/"):
            client = request.client
            host = client.host if client else "-"
            port = client.port if client else 0
            ver = request.scope.get("http_version") or "1.1"
            phrase = HTTPStatus(resp.status_code).phrase
            print(f'{host}:{port} - "{request.method} {path} HTTP/{ver}" {resp.status_code} {phrase}', flush=True)
    except Exception:
        pass
    return resp

async def _cleanup_jobs_loop():
    while True:
        await asyncio.sleep(60)
        cutoff = _now() - JOB_TTL_SEC
        dead = [jid for jid, j in _jobs.items() if float(j.get('ts', 0)) < cutoff]
        for jid in dead:
            _jobs.pop(jid, None)

async def _worker_loop(worker_id: int):
    while True:
        jid, payload = await _job_queue.get()
        try:
            _jobs[jid] = {'status': 'running', 'ts': _now()}
            result = await asyncio.to_thread(_process_payload, payload)
            _jobs[jid] = {'status': 'done', 'result': result, 'ts': _now()}
        except Exception as e:
            _jobs[jid] = {'status': 'error', 'result': str(e), 'ts': _now()}
        finally:
            _job_queue.task_done()
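
# Job lifecycle: 'queued' (set when the REST endpoint enqueues) -> 'running'
# -> 'done' or 'error'; _cleanup_jobs_loop sweeps entries older than
# JOB_TTL_SEC once a minute.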

def _process_payload(payload: dict) -> dict:
    t_all = time.perf_counter()
    mode = (payload.get('mode') or 'lens_images')
    lang = (payload.get('lang') or 'en')
    context = payload.get('context') if isinstance(payload.get('context'), dict) else {}
    page_url = str((context or {}).get('page_url') or '').strip()
    src = (payload.get('src') or '').strip()
    img_bytes = b''
    mime = ''
    if payload.get('imageDataUri'):
        img_bytes, mime = _datauri_to_bytes(payload.get('imageDataUri'))
    elif src.startswith('data:'):
        img_bytes, mime = _datauri_to_bytes(src)
    else:
        img_bytes, mime = _download_bytes(src, page_url)
    t_img = time.perf_counter()
    if not img_bytes:
        raise Exception('No image data')
    ai_cfg = None
    ai = payload.get('ai') or None
    source = str(payload.get('source') or '').strip().lower() or 'translated'
    if mode == 'lens_text' and source == 'ai' and isinstance(ai, dict):
        api_key = str(ai.get('api_key') or '').strip() or (os.getenv('AI_API_KEY') or '').strip()
        ai_cfg = AiConfig(
            api_key=api_key,
            model=str(ai.get('model') or 'auto').strip() or 'auto',
            provider=str(ai.get('provider') or 'auto').strip() or 'auto',
            base_url=str(ai.get('base_url') or 'auto').strip() or 'auto',
            prompt_editable=str(ai.get('prompt') or '').strip(),
        )
    core.DO_AI_JSON = False
    img_hash = _sha256_hex(img_bytes)
    cache_key = ''
    if mode == 'lens_text' and img_hash:
        cache_source = 'ai' if source == 'ai' else 'text'
        cache_key = _build_cache_key(img_hash, lang, mode, cache_source, ai_cfg)
        cached = None
        if source == 'ai':
            cached = _lru_get(_ai_result_cache, _ai_cache_lock, cache_key)
        else:
            cached = _lru_get(_result_cache, _result_cache_lock, cache_key)
        if cached:
            cached['perf'] = {
                'cache': 'hit',
                'total_ms': round((time.perf_counter() - t_all) * 1000, 1),
                'img_ms': round((t_img - t_all) * 1000, 1),
            }
            return cached
    suffix = '.png' if (mime or '').endswith('png') else '.jpg'
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
        f.write(img_bytes)
        tmp_path = f.name
    t_tmp = time.perf_counter()
    try:
        out = process_image_path(tmp_path, lang, mode, ai_cfg)
        out['perf'] = {
            'cache': 'miss' if cache_key else 'off',
            'total_ms': round((time.perf_counter() - t_all) * 1000, 1),
            'img_ms': round((t_img - t_all) * 1000, 1),
            'tmp_ms': round((t_tmp - t_img) * 1000, 1),
        }
        if cache_key and isinstance(out, dict):
            if source == 'ai':
                _lru_set(_ai_result_cache, _ai_cache_lock, cache_key, out, TP_AI_RESULT_CACHE_MAX)
            else:
                _lru_set(_result_cache, _result_cache_lock, cache_key, out, TP_RESULT_CACHE_MAX)
        return out
    finally:
        try:
            os.unlink(tmp_path)
        except Exception:
            pass

@app.on_event('startup')
async def _startup():
    print(f'[TextPhantom][api] starting build={BUILD_ID} workers={SERVER_MAX_WORKERS}')
    for i in range(max(1, SERVER_MAX_WORKERS)):
        asyncio.create_task(_worker_loop(i))
    asyncio.create_task(_cleanup_jobs_loop())

@app.get('/health')
async def health():
    return {'ok': True, 'build': BUILD_ID}

@app.get('/version')
async def version():
    return {'ok': True, 'build': BUILD_ID, 'core': 'lens_core'}

@app.get('/warmup')
async def warmup(lang: str = TP_WARMUP_LANG):
    t0 = time.perf_counter()
    r = core.warmup(lang)
    return {'ok': True, 'build': BUILD_ID, 'dt_ms': round((time.perf_counter() - t0) * 1000, 1), 'result': r}

@app.get('/meta')
async def meta():
    langs = getattr(core, 'UI_LANGUAGES', None) or []
    sources = [
        {'id': 'original', 'name': 'Original'},
        {'id': 'translated', 'name': 'Translated'},
        {'id': 'ai', 'name': 'Ai'},
    ]
    env_key = (os.getenv('AI_API_KEY') or '').strip()
    return {'ok': True, 'languages': langs, 'sources': sources, 'has_env_ai_key': bool(env_key)}

@app.post('/translate')
async def translate(payload: Dict[str, Any]):
    jid = str(uuid.uuid4())
    _dbg('rest.enqueue', {
        'id': jid,
        'mode': str(payload.get('mode') or ''),
        'lang': str(payload.get('lang') or ''),
        'source': str(payload.get('source') or ''),
        'has_datauri': bool(payload.get('imageDataUri')),
        'has_src': bool(payload.get('src')),
    })
    _jobs[jid] = {'status': 'queued', 'ts': _now()}
    await _job_queue.put((jid, payload))
    return {'id': jid}

@app.get('/translate/{job_id}')
async def translate_status(job_id: str):
    j = _jobs.get(job_id)
    if not j:
        return {'status': 'error', 'result': 'job_not_found'}
    return j
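
# Sketch of the intended client flow against these two endpoints (the route
# paths are the reconstructed ones above; BASE is hypothetical):
#   r = httpx.post(f'{BASE}/translate', json={'mode': 'lens_text', 'lang': 'th',
#                                             'imageDataUri': 'data:image/png;base64,...'})
#   jid = r.json()['id']
#   while True:
#       j = httpx.get(f'{BASE}/translate/{jid}').json()
#       if j.get('status') in ('done', 'error'):
#           break
#       time.sleep(0.5)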

@app.post('/ai/resolve')
async def ai_resolve(payload: Dict[str, Any]):
    api_key = str(payload.get('api_key') or '').strip() or (os.getenv('AI_API_KEY') or '').strip()
    lang = _normalize_lang(str(payload.get('lang') or 'en'))
    style_default = ((getattr(core, 'AI_LANG_STYLE', {}) or {}).get(lang)
                     or (getattr(core, 'AI_LANG_STYLE', {}) or {}).get('default') or '').strip()
    if not api_key:
        return {
            'ok': False,
            'error': 'missing_api_key',
            'provider': '',
            'default_model': '',
            'models': [],
            'lang': lang,
            'prompt_editable_default': style_default,
        }
    provider = core._canonical_provider(str(payload.get('provider') or 'auto'))
    if provider in ('', 'auto'):
        provider = _detect_provider_from_key(api_key)
    preset = _resolve_provider_defaults(provider) or {}
    requested_model = str(payload.get('model') or 'auto').strip() or 'auto'
    resolved_model = _resolve_model(provider, requested_model)
    models: List[str] = []
    base_url = (str(payload.get('base_url') or 'auto')).strip()
    if base_url in ('', 'auto'):
        base_url = (preset.get('base_url') or '').strip()
    if provider == 'huggingface':
        if base_url:
            models = core._hf_router_available_models(api_key, base_url)
        if requested_model.lower() in ('', 'auto'):
            fallback = core._pick_hf_fallback_model(models)
            if fallback:
                resolved_model = fallback
    elif provider == 'gemini':
        models = getattr(core, '_gemini_available_models', lambda _k: [])(api_key)
        if not models:
            models = ['gemini-2.5-flash', 'gemini-2.5-flash-lite', 'gemini-2.5-pro',
                      'gemini-2.0-flash', 'gemini-3-flash-preview', 'gemini-3-pro-preview']
    elif provider == 'anthropic':
        models = getattr(core, '_anthropic_available_models', lambda _k, _b=None: [])(api_key, base_url)
    else:
        if not base_url:
            base_url = (core.AI_PROVIDER_DEFAULTS.get('openai') or {}).get('base_url') or 'https://api.openai.com/v1'
        models = getattr(core, '_openai_compat_available_models', lambda _k, _b: [])(api_key, base_url)
    if provider == 'huggingface' and not models:
        models = [
            'google/gemma-3-27b-it:featherless-a',
            'google/gemma-3-27b-it',
            'google/gemma-2-2b-it',
            'google/gemma-2-9b-it',
        ]
    if provider != 'huggingface' and not models:
        fallback_models: List[str] = []
        preset_model = str(preset.get('model') or '').strip()
        if preset_model:
            fallback_models.append(preset_model)
        provider_defaults = (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).get(provider, {}) or {}
        provider_model = str(provider_defaults.get('model') or '').strip()
        if provider_model:
            fallback_models.append(provider_model)
        if provider == 'gemini':
            fallback_models.extend([
                'gemini-2.5-flash',
                'gemini-2.5-flash-lite',
                'gemini-2.5-pro',
                'gemini-2.0-flash',
                'gemini-3-flash-preview',
                'gemini-3-pro-preview',
            ])
        models = sorted(set([m for m in fallback_models if m]), key=str.lower)
        if not models:
            all_models: List[str] = []
            for _, v in (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).items():
                m2 = str((v or {}).get('model') or '').strip()
                if m2:
                    all_models.append(m2)
            models = sorted(set(all_models), key=str.lower)
    if models:
        models = sorted(
            {m.strip() for m in models if isinstance(m, str) and m.strip()},
            key=str.lower,
        )
    if models and resolved_model not in models:
        resolved_model = models[0]
    prompt_default = style_default
    return {
        'ok': True,
        'provider': provider,
        'base_url': base_url,
        'default_model': (preset.get('model') or ''),
        'model': resolved_model,
        'models': models,
        'prompt_editable_default': prompt_default,
    }

@app.get('/ai/prompt_default')
async def ai_prompt_default(lang: str = 'en'):
    l = _normalize_lang(lang)
    base = (getattr(core, 'AI_PROMPT_SYSTEM_BASE', '') or '').strip()
    style = ((getattr(core, 'AI_LANG_STYLE', {}) or {}).get(l)
             or (getattr(core, 'AI_LANG_STYLE', {}) or {}).get('default') or '')
    style = (style or '').strip()
    contract = "\n".join([
        'Return ONLY valid JSON (no markdown, no extra text).',
        'Output JSON MUST have exactly one key: "aiTextFull".',
        'Schema example: {"aiTextFull":"..."}',
        'Markers: Keep every paragraph marker like <<TP_P0>> unchanged and in order. Do not remove or add markers.',
        "aiTextFull must include all markers, each followed by that paragraph's translated text.",
    ])
    system_text = "\n\n".join([p for p in [base, style, contract] if p])
    return {
        'ok': True,
        'lang': l,
        'prompt_editable_default': style,
        'lang_style': style,
        'system_base': base,
        'contract': contract,
        'system_text': system_text,
    }

@app.websocket('/ws')
async def ws_endpoint(ws: WebSocket):
    await ws.accept()
    await ws.send_text(json.dumps({'type': 'ack'}))
    try:
        while True:
            msg = await ws.receive_text()
            data = json.loads(msg)
            if data.get('type') != 'job':
                continue
            jid = str(data.get('id') or '')
            payload = data.get('payload') or {}
            _dbg('ws.job', {
                'id': jid,
                'mode': str(payload.get('mode') or ''),
                'lang': str(payload.get('lang') or ''),
                'source': str(payload.get('source') or ''),
                'has_datauri': bool(payload.get('imageDataUri')),
                'has_src': bool(payload.get('src')),
            })
            try:
                result = await asyncio.to_thread(_process_payload, payload)
                try:
                    await ws.send_text(json.dumps({'type': 'result', 'id': jid, 'result': result}))
                except WebSocketDisconnect:
                    return
            except Exception as e:
                try:
                    await ws.send_text(json.dumps({'type': 'error', 'id': jid, 'error': str(e)}))
                except (WebSocketDisconnect, RuntimeError):
                    return
    except WebSocketDisconnect:
        return
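
# WebSocket protocol sketch (the '/ws' path is part of the reconstruction):
# the server sends {'type': 'ack'} on connect, then answers each message of
# the form {'type': 'job', 'id': ..., 'payload': {...}} (same payload as
# POST /translate) with either
#   {'type': 'result', 'id': ..., 'result': {...}}  on success, or
#   {'type': 'error',  'id': ..., 'error': '...'}   on failure.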

def main():
    image_path = getattr(core, 'IMAGE_PATH', '')
    lang = getattr(core, 'LANG', 'en')
    mode = os.environ.get('MODE', 'lens_text')
    ai_key = os.environ.get('AI_API_KEY', getattr(core, 'AI_API_KEY', ''))
    ai_model = os.environ.get('AI_MODEL', getattr(core, 'AI_MODEL', 'auto'))
    ai_prompt = os.environ.get('AI_PROMPT', '')
    ai_cfg = AiConfig(api_key=ai_key, model=ai_model,
                      prompt_editable=ai_prompt) if ai_key and mode == 'lens_text' else None
    out = process_image_path(image_path, lang, mode, ai_cfg)
    print(json.dumps(out, ensure_ascii=False, indent=2))

if __name__ == '__main__':
    main()