import asyncio, base64, copy, hashlib, io, json, os, re, tempfile, time, uuid, httpx, logging from backend import lens_core as core from http import HTTPStatus from collections import OrderedDict from threading import Lock, Semaphore from dataclasses import dataclass from typing import Any, Dict, List, Optional from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request from fastapi.middleware.cors import CORSMiddleware SERVER_MAX_WORKERS = int(os.environ.get('SERVER_MAX_WORKERS', '15')) JOB_TTL_SEC = int(os.environ.get('JOB_TTL_SEC', '3600')) HTTP_TIMEOUT_SEC = float(os.environ.get( 'HTTP_TIMEOUT_SEC', str(getattr(core, 'AI_TIMEOUT_SEC', 120)))) SUPPORTED_MODES = {"lens_images", "lens_text"} BUILD_ID = os.environ.get('TP_BUILD_ID', 'v9-backendfix-20260129') TP_DEBUG = str(os.environ.get('TP_DEBUG', '')).strip( ).lower() in ('1', 'true', 'yes', 'on') TP_PARA_MARKER_PREFIX = '<>') TP_ACCESS_LOG_MODE = (os.environ.get('TP_ACCESS_LOG_MODE', 'custom') or 'custom').strip().lower() if TP_ACCESS_LOG_MODE in ('custom', 'tp', 'plain'): try: _uv = logging.getLogger('uvicorn.access') _uv.disabled = True _uv.propagate = False _uv.setLevel(logging.CRITICAL) except Exception: pass def _dbg(tag: str, data=None) -> None: if not TP_DEBUG: return try: if data is None: print(f'[TextPhantom][dbg] {tag}') else: s = json.dumps(data, ensure_ascii=False) if len(s) > 2000: s = s[:2000] + '…' print(f'[TextPhantom][dbg] {tag} {s}') except Exception: try: print(f'[TextPhantom][dbg] {tag} {data}') except Exception: pass def _tree_stats(tree) -> dict: if not isinstance(tree, dict): return {'paras': 0, 'items': 0, 'spans': 0} paras = tree.get('paragraphs') or [] if not isinstance(paras, list): return {'paras': 0, 'items': 0, 'spans': 0} items = 0 spans = 0 for p in paras: if not isinstance(p, dict): continue its = p.get('items') or [] if not isinstance(its, list): continue items += len(its) for it in its: if not isinstance(it, dict): continue sp = it.get('spans') or [] if isinstance(sp, list): spans += len(sp) return {'paras': len(paras), 'items': items, 'spans': spans} def _tree_to_paragraph_texts(tree: Any) -> List[str]: if not isinstance(tree, dict): return [] paras = tree.get('paragraphs') or [] if not isinstance(paras, list) or not paras: return [] out: List[str] = [] for p in paras: if not isinstance(p, dict): out.append('') continue t = str(p.get('text') or '').strip() if not t: items = p.get('items') or [] if isinstance(items, list) and items: t = ' '.join(str(it.get('text') or '').strip() for it in items if isinstance( it, dict) and str(it.get('text') or '').strip()) out.append(t) return out def _apply_para_markers(paras: List[str]) -> str: if not paras: return '' parts: List[str] = [] for i, t in enumerate(paras): parts.append( f"{TP_PARA_MARKER_PREFIX}{i}{TP_PARA_MARKER_SUFFIX}\n{(t or '').strip()}") return '\n\n'.join(parts) def _clamp_runaway_repeats(s: str, max_repeat: int = 12) -> str: if not s: return '' pat = re.compile(r"(.)\1{" + str(max_repeat) + r",}") return pat.sub(lambda m: m.group(1) * max_repeat, s) def _extract_marker_indices(s: str) -> set[int]: if not s: return set() out: set[int] = set() for m in re.finditer(r"<>", s): try: out.add(int(m.group(1))) except Exception: continue return out def _needs_ai_retry(ai_text_full: str, expected_paras: int) -> bool: if expected_paras <= 0: return False idx = _extract_marker_indices(ai_text_full) if len(idx) >= expected_paras: return False if (TP_PARA_MARKER_PREFIX in (ai_text_full or '')) and (TP_PARA_MARKER_SUFFIX not in (ai_text_full or '')): return True return True def _now() -> float: return time.time() def _lru_get(cache: OrderedDict, lock: Lock, key: str) -> Optional[Dict[str, Any]]: if not key: return None with lock: v = cache.get(key) if v is None: return None cache.move_to_end(key) return copy.deepcopy(v) def _lru_set(cache: OrderedDict, lock: Lock, key: str, value: Dict[str, Any], max_items: int) -> None: if not key or not isinstance(value, dict) or max_items <= 0: return with lock: cache[key] = copy.deepcopy(value) cache.move_to_end(key) while len(cache) > max_items: cache.popitem(last=False) def _sha256_hex(blob: bytes) -> str: return hashlib.sha256(blob).hexdigest() if blob else '' def _ai_prompt_sig(s: str) -> str: t = (s or '').strip() if not t: return '' return hashlib.sha256(t.encode('utf-8')).hexdigest()[:12] def _build_cache_key(img_hash: str, lang: str, mode: str, source: str, ai_cfg: Optional["AiConfig"]) -> str: parts = [img_hash, _normalize_lang( lang), (mode or '').strip(), (source or '').strip()] if ai_cfg and (source or '').strip().lower() == 'ai': parts.extend([ (ai_cfg.provider or '').strip(), (ai_cfg.model or '').strip(), (ai_cfg.base_url or '').strip(), _ai_prompt_sig(ai_cfg.prompt_editable), ]) return '|'.join([p for p in parts if p is not None]) def _b64_to_bytes(b64: str) -> bytes: pad = '=' * ((4 - (len(b64) % 4)) % 4) return base64.b64decode(b64 + pad) def _datauri_to_bytes(data_uri: str) -> tuple[bytes, str]: s = (data_uri or '').strip() if not s.startswith('data:'): return b'', '' head, _, b64 = s.partition(',') mime = '' if ';' in head: mime = head[5:head.index(';')] return _b64_to_bytes(b64), mime or 'application/octet-stream' def _bytes_to_datauri(blob: bytes, mime: str) -> str: b64 = base64.b64encode(blob).decode('ascii') return f"data:{mime};base64,{b64}" def _download_bytes(url: str, referer: str = '') -> tuple[bytes, str]: u = (url or '').strip() if not u: return b'', '' headers = { 'user-agent': 'Mozilla/5.0 (TextPhantomOCR; +https://huggingface.co/spaces)', } ref = (referer or '').strip() if ref: headers['referer'] = ref with httpx.Client(timeout=HTTP_TIMEOUT_SEC, follow_redirects=True, headers=headers) as client: r = client.get(u) r.raise_for_status() ct = (r.headers.get('content-type') or '').split(';')[0].strip() return r.content, ct def _detect_provider_from_key(api_key: str) -> str: return core._canonical_provider(core._detect_ai_provider_from_key(api_key)) def _resolve_provider_defaults(provider: str) -> dict: return (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).get(provider, {}) def _resolve_model(provider: str, model: str) -> str: return core._resolve_model(provider, model) def _has_meaningful_text(s: str) -> bool: t = _tp_marker_re.sub('', str(s or '')) return bool(t.strip()) def _is_hf_provider(provider: str, base_url: str) -> bool: p = (provider or '').strip().lower() b = (base_url or '').strip().lower() return p == 'huggingface' or 'router.huggingface.co' in b def _is_hf_rate_limited_error(msg: str) -> bool: t = (msg or '').lower() if 'rate limit' in t or 'ratelimit' in t or 'too many requests' in t: return True if 'http 429' in t or ' 429' in t: return True if 'http 503' in t or ' 503' in t or 'overloaded' in t or 'temporarily' in t: return True return False def _hf_throttle_before_call() -> None: if HF_AI_MIN_INTERVAL_SEC <= 0: return global _hf_ai_last_ts with _hf_ai_lock: now = _now() dt = now - float(_hf_ai_last_ts or 0.0) wait = HF_AI_MIN_INTERVAL_SEC - dt if wait > 0: time.sleep(wait) _hf_ai_last_ts = _now() def _openai_compat_generate_with_hf_backoff(api_key: str, base_url: str, model: str, system_text: str, user_parts: List[str]): last_err: Optional[Exception] = None for attempt in range(int(HF_AI_MAX_RETRIES)): try: with _hf_ai_sem: _hf_throttle_before_call() return core._openai_compat_generate_json(api_key, base_url, model, system_text, user_parts) except Exception as e: last_err = e if not _is_hf_rate_limited_error(str(e)): raise delay = min(15.0, max(float(HF_AI_MIN_INTERVAL_SEC), float( HF_AI_RETRY_BASE_SEC) * (2 ** min(attempt, 4)))) _dbg('ai.hf.backoff', { 'attempt': attempt + 1, 'delay_sec': round(delay, 2), 'err': str(e)[:240]}) time.sleep(delay) continue if last_err is not None: raise last_err raise Exception('hf_backoff_failed') def _normalize_lang(lang: str) -> str: return core._normalize_lang(lang) @dataclass class AiConfig: api_key: str model: str = 'auto' provider: str = 'auto' base_url: str = 'auto' prompt_editable: str = '' def _collapse_ws(text: str) -> str: return re.sub(r"\s+", " ", str(text or "")).strip() def _sanitize_marked_text(marked_text: str) -> str: t = str(marked_text or "") if not t: return "" t = t.replace("\r\n", "\n").replace("\r", "\n") t = re.sub(r"<>)[^\s>]*>?", "", t) t = re.sub(r"(?m)^\s*(<>)\s*(\S)", r"\1\n\2", t) lines = t.split("\n") out0: List[str] = [] for line in lines: if "<>)\s*$", line) if m: out0.append(m.group(1)) continue m2 = re.match(r"^\s*(<>)\s*(.*)$", line) if m2: out0.append(m2.group(1)) rest = (m2.group(2) or "").strip() if rest: out0.append(rest) continue out0.append(re.sub(r"<>", "", line)) t = "\n".join(out0) indices = sorted(_extract_marker_indices(t)) if not indices: return _collapse_ws(t) out_lines: List[str] = [] for idx in indices: marker = f"<>" m = re.search( rf"{re.escape(marker)}\s*([\s\S]*?)(?=<>|\Z)", t) seg = m.group(1) if m else "" seg = _collapse_ws(seg) out_lines.append(marker) out_lines.append(seg) out_lines.append("") return "\n".join(out_lines).strip("\n") def _has_complete_marker_sequence(ai_text_full: str, expected_paras: int) -> bool: if expected_paras <= 0: return True t = str(ai_text_full or "") need = list(range(int(expected_paras))) idx = sorted(_extract_marker_indices(t)) if len(idx) < len(need): return False if idx[:len(need)] != need: return False last = -1 for i in need: m = f"<>" p = t.find(m) if p < 0 or p <= last: return False last = p return True def _build_ai_prompt_packet_custom(target_lang: str, original_text_full: str, prompt_editable: str, is_retry: bool = False) -> tuple[str, List[str]]: lang = _normalize_lang(target_lang) base = (getattr(core, "AI_PROMPT_SYSTEM_BASE", "") or "").strip() style = (prompt_editable or "").strip() if not style: style = ( (getattr(core, "AI_LANG_STYLE", {}) or {}).get(lang) or (getattr(core, "AI_LANG_STYLE", {}) or {}).get("default") or "" ).strip() contract_parts: List[str] = [ "Output ONLY the translated text (no JSON, no markdown, no extra commentary).", "Markers: Keep every paragraph marker like <> unchanged and in order. Do not remove, rename, or add markers.", "For each marker, output the marker followed by that paragraph's translated text.", ] if is_retry: contract_parts.append( "Retry: You MUST output ALL markers from the first to the last marker in the input." ) system_text = "\n\n".join( [p for p in [base, style, "\n".join(contract_parts)] if p] ) user_parts: List[str] = ["Input:\n" + str(original_text_full or "")] return system_text, user_parts def ai_translate_text(original_text_full: str, target_lang: str, ai: AiConfig, is_retry: bool = False) -> dict: if not _has_meaningful_text(original_text_full): return { 'aiTextFull': '', 'meta': { 'skipped': True, 'skipped_reason': 'no_text', }, } api_key = (ai.api_key or '').strip() if not api_key: raise Exception('AI api_key is required') provider = core._canonical_provider((ai.provider or 'auto')) if provider in ('', 'auto'): provider = _detect_provider_from_key(api_key) preset = _resolve_provider_defaults(provider) or {} model = _resolve_model(provider, (ai.model or 'auto')) base_url = (ai.base_url or 'auto').strip() if base_url in ('', 'auto'): base_url = (preset.get('base_url') or '').strip() if provider not in ('gemini', 'anthropic'): if not base_url: base_url = (_resolve_provider_defaults('openai') or {}).get( 'base_url') or 'https://api.openai.com/v1' system_text, user_parts = _build_ai_prompt_packet_custom( target_lang, original_text_full, ai.prompt_editable, is_retry=is_retry ) started = _now() used_model = model if provider == 'gemini': raw = core._gemini_generate_json( api_key, model, system_text, user_parts) elif provider == 'anthropic': raw = core._anthropic_generate_json( api_key, model, system_text, user_parts) else: if _is_hf_provider(provider, base_url): raw, used_model = _openai_compat_generate_with_hf_backoff( api_key, base_url, model, system_text, user_parts) else: raw, used_model = core._openai_compat_generate_json( api_key, base_url, model, system_text, user_parts) ai_text_full = core._parse_ai_textfull_only( raw) if core.DO_AI_JSON else core._parse_ai_textfull_text_only(raw) ai_text_full = _sanitize_marked_text(ai_text_full) return { 'aiTextFull': ai_text_full, 'meta': { 'model': used_model, 'provider': provider, 'base_url': base_url, 'latency_sec': round(_now() - started, 3), }, } def process_image_path(image_path: str, lang: str, mode: str, ai_cfg: Optional[AiConfig]) -> dict: mode_id = (mode or '').strip() if mode_id not in SUPPORTED_MODES: mode_id = 'lens_images' target_lang = _normalize_lang(lang) data = core.get_lens_data_from_image( image_path, getattr(core, 'FIREBASE_URL', ''), target_lang) img = core.Image.open(image_path).convert('RGB') W, H = img.size thai_font = getattr(core, 'FONT_THAI_PATH', 'NotoSansThai-Regular.ttf') latin_font = getattr(core, 'FONT_LATIN_PATH', 'NotoSans-Regular.ttf') if target_lang == 'ja': latin_font = getattr(core, 'FONT_JA_PATH', latin_font) elif target_lang in ('zh', 'zh-hans', 'zh_cn', 'zh-cn', 'zh_hans'): latin_font = getattr(core, 'FONT_ZH_SC_PATH', latin_font) elif target_lang in ('zh-hant', 'zh_tw', 'zh-tw', 'zh_hant'): latin_font = getattr(core, 'FONT_ZH_TC_PATH', latin_font) if getattr(core, 'FONT_DOWNLOD', True): thai_font = core.ensure_font( thai_font, getattr(core, 'FONT_THAI_URLS', [])) if target_lang == 'ja': latin_font = core.ensure_font( latin_font, getattr(core, 'FONT_JA_URLS', [])) elif target_lang in ('zh', 'zh-hans', 'zh_cn', 'zh-cn', 'zh_hans'): latin_font = core.ensure_font( latin_font, getattr(core, 'FONT_ZH_SC_URLS', [])) elif target_lang in ('zh-hant', 'zh_tw', 'zh-tw', 'zh_hant'): latin_font = core.ensure_font( latin_font, getattr(core, 'FONT_ZH_TC_URLS', [])) else: latin_font = core.ensure_font( latin_font, getattr(core, 'FONT_LATIN_URLS', [])) image_url = data.get('imageUrl') if isinstance(data, dict) else None out: Dict[str, Any] = { 'mode': mode_id, 'imageUrl': image_url, 'imageDataUri': '', 'originalContentLanguage': data.get('originalContentLanguage') if isinstance(data, dict) else None, 'originalTextFull': data.get('originalTextFull') if isinstance(data, dict) else None, 'translatedTextFull': data.get('translatedTextFull') if isinstance(data, dict) else None, 'AiTextFull': '', 'originalParagraphs': (data.get('originalParagraphs') or []) if isinstance(data, dict) else [], 'translatedParagraphs': (data.get('translatedParagraphs') or []) if isinstance(data, dict) else [], 'original': {}, 'translated': {}, 'Ai': {}, } if mode_id == 'lens_images': if image_url: decoded = core.decode_imageurl_to_datauri(str(image_url)) if decoded: out['imageDataUri'] = decoded elif isinstance(image_url, str) and image_url.startswith(('http://', 'https://')): blob, mime2 = _download_bytes(image_url) out['imageDataUri'] = _bytes_to_datauri( blob, mime2 or 'image/jpeg') if not out.get('imageDataUri'): with open(image_path, 'rb') as f: blob = f.read() out['imageDataUri'] = _bytes_to_datauri(blob, 'image/jpeg') return out original_span_tokens = None original_tree = None translated_tree = None def _base_img_for_overlay() -> core.Image.Image: if not (getattr(core, 'ERASE_OLD_TEXT_WITH_ORIGINAL_BOXES', True) and original_span_tokens): return img return core.erase_text_with_boxes( img, original_span_tokens, pad_px=getattr(core, 'ERASE_PADDING_PX', 2), sample_margin_px=getattr(core, 'ERASE_SAMPLE_MARGIN_PX', 6), ) if getattr(core, 'DO_ORIGINAL', True): tree, _ = core.decode_tree( out.get('originalParagraphs') or [], out.get('originalTextFull') or '', 'original', W, H, want_raw=False, ) original_tree = tree original_span_tokens = core.flatten_tree_spans(tree) _dbg('tree.original', _tree_stats(original_tree)) out['original'] = { 'originalTree': tree, 'originalTextFull': out.get('originalTextFull') or '', } if getattr(core, 'DO_TRANSLATED', True): tree, _ = core.decode_tree( out.get('translatedParagraphs') or [], out.get('translatedTextFull') or '', 'translated', W, H, want_raw=False, ) translated_tree = tree translated_span_tokens = core.flatten_tree_spans(tree) _dbg('tree.translated', _tree_stats(translated_tree)) out['translated'] = { 'translatedTree': tree, 'translatedTextFull': out.get('translatedTextFull') or '', } def _tree_score(tree: Any) -> int: if not isinstance(tree, dict): return -1 paragraphs = tree.get('paragraphs') or [] if not isinstance(paragraphs, list) or not paragraphs: return -1 para_count = len(paragraphs) item_count = 0 span_count = 0 for p in paragraphs: if not isinstance(p, dict): continue items = p.get('items') or [] if not isinstance(items, list): continue item_count += len(items) for it in items: if not isinstance(it, dict): continue spans = it.get('spans') or [] if isinstance(spans, list): span_count += len(spans) return item_count * 10000 + para_count * 100 + span_count def _pick_ai_template_tree() -> Optional[Dict[str, Any]]: tr_score = _tree_score(translated_tree) og_score = _tree_score(original_tree) if tr_score < 0 and og_score < 0: return None if og_score > tr_score: return original_tree return translated_tree or original_tree ai_tree = None if ai_cfg and (ai_cfg.api_key or '').strip() and getattr(core, 'DO_AI', True): src_paras = _tree_to_paragraph_texts(original_tree or {}) src_text = _apply_para_markers(src_paras) if src_paras else str( out.get('originalTextFull') or '') if not _has_meaningful_text(src_text): out['AiTextFull'] = '' out['Ai'] = { 'meta': { 'skipped': True, 'skipped_reason': 'no_text', } } else: ai = ai_translate_text(src_text, target_lang, ai_cfg) if src_paras and _needs_ai_retry(str(ai.get('aiTextFull') or ''), len(src_paras)): _dbg('ai.retry', { 'expected_paras': len(src_paras), 'found_markers': len(_extract_marker_indices(str(ai.get('aiTextFull') or ''))), }) retry_paras = [_clamp_runaway_repeats(p) for p in src_paras] retry_text = _apply_para_markers(retry_paras) or src_text ai = ai_translate_text( retry_text, target_lang, ai_cfg, is_retry=True) ai_text_full = str(ai.get('aiTextFull') or '') meta0 = ai.get('meta') or {} if src_paras: expected = len(src_paras) if not _has_complete_marker_sequence(ai_text_full, expected): fallback_paras = _tree_to_paragraph_texts(translated_tree or {}) if len(fallback_paras) < expected: fallback_paras = (fallback_paras + src_paras)[:expected] else: fallback_paras = fallback_paras[:expected] found = sorted(_extract_marker_indices(ai_text_full)) seg_map: Dict[int, str] = {} for idx in found: if idx < 0 or idx >= expected: continue marker = f"<>" m = re.search(rf"{re.escape(marker)}\s*([\s\S]*?)(?=<>|\Z)", ai_text_full) seg = _collapse_ws(m.group(1) if m else '') if seg and idx not in seg_map: seg_map[idx] = seg missing = 0 out_lines: List[str] = [] for i in range(expected): seg = seg_map.get(i) or _collapse_ws(fallback_paras[i] if i < len(fallback_paras) else '') if not seg_map.get(i): missing += 1 out_lines.append(f"<>") out_lines.append(seg) out_lines.append('') ai_text_full = "\n".join(out_lines).strip("\n") _dbg('ai.marker.repaired', { 'expected_paras': expected, 'found_markers': len(seg_map), 'missing': missing, }) meta0 = { **meta0, 'marker_repaired': True, 'marker_expected': expected, 'marker_found': len(seg_map), 'marker_missing': missing, } template_tree = _pick_ai_template_tree() _dbg('ai.template.pick', { 'score_original': _tree_score(original_tree), 'score_translated': _tree_score(translated_tree), 'picked': 'original' if template_tree is original_tree else ('translated' if template_tree is translated_tree else 'none'), }) if not isinstance(template_tree, dict): template_tree = original_tree if isinstance(original_tree, dict) else ( translated_tree if isinstance(translated_tree, dict) else {}) patched = core.patch( {'Ai': {'aiTextFull': str( ai_text_full or ''), 'aiTree': template_tree}}, W, H, thai_font or '', latin_font or '', lang=target_lang, ) ai_tree = (patched.get('Ai') or {}).get('aiTree') or {} _dbg('ai.patched', { 'ai_text_len': len(ai_text_full), 'stats_ai': _tree_stats(ai_tree), 'stats_original': _tree_stats(original_tree or {}), 'stats_translated': _tree_stats(translated_tree or {}), 'mode': mode_id, 'lang': target_lang, }) shared_para_sizes = core._compute_shared_para_sizes( [original_tree or {}, translated_tree or {}, ai_tree or {}], thai_font or '', latin_font or '', W, H, ) core._apply_para_font_size(original_tree or {}, shared_para_sizes) core._apply_para_font_size( translated_tree or {}, shared_para_sizes) core._apply_para_font_size(ai_tree or {}, shared_para_sizes) core._rebuild_ai_spans_after_font_resize( ai_tree or {}, W, H, thai_font or '', latin_font or '', lang=target_lang) out['AiTextFull'] = ai_text_full out['Ai'] = { 'aiTextFull': ai_text_full, 'aiTree': ai_tree, 'meta': meta0, } if getattr(core, 'DO_AI_HTML', True): core.fit_tree_font_sizes_for_tp_html( ai_tree, thai_font or '', latin_font or '', W, H) out['Ai']['aihtml'] = core.ai_tree_to_tp_html(ai_tree, W, H) out['Ai']['aihtmlMeta'] = { 'baseW': int(W), 'baseH': int(H), 'format': 'tp', } if getattr(core, 'DO_ORIGINAL', True) and getattr(core, 'DO_ORIGINAL_HTML', True) and isinstance(original_tree, dict): core.fit_tree_font_sizes_for_tp_html( original_tree, thai_font or '', latin_font or '', W, H) if isinstance(out.get('original'), dict): out['original']['originalhtml'] = core.ai_tree_to_tp_html( original_tree or {}, W, H) if getattr(core, 'DO_TRANSLATED', True) and getattr(core, 'DO_TRANSLATED_HTML', True) and isinstance(translated_tree, dict): core.fit_tree_font_sizes_for_tp_html( translated_tree, thai_font or '', latin_font or '', W, H) if isinstance(out.get('translated'), dict): out['translated']['translatedhtml'] = core.ai_tree_to_tp_html( translated_tree or {}, W, H) if getattr(core, 'HTML_INCLUDE_CSS', True) and (getattr(core, 'DO_ORIGINAL_HTML', True) or getattr(core, 'DO_TRANSLATED_HTML', True) or getattr(core, 'DO_AI_HTML', True)): out['htmlCss'] = core.tp_overlay_css() out['htmlMeta'] = { 'baseW': int(W), 'baseH': int(H), 'format': 'tp', } base_img = _base_img_for_overlay() buf = io.BytesIO() base_img.save(buf, format='PNG') out['imageDataUri'] = _bytes_to_datauri(buf.getvalue(), 'image/png') return out app = FastAPI(title='TextPhantom OCR API', version='1.0') app.add_middleware( CORSMiddleware, allow_origins=['*'], allow_credentials=True, allow_methods=['*'], allow_headers=['*'], ) @app.middleware("http") async def _tp_access_log(request: Request, call_next): resp = await call_next(request) if TP_ACCESS_LOG_MODE in ('uvicorn', 'off', 'none'): return resp try: path = request.url.path if request.method == 'GET' and path.startswith("/translate/"): client = request.client host = client.host if client else "-" port = client.port if client else 0 ver = request.scope.get("http_version") or "1.1" phrase = HTTPStatus(resp.status_code).phrase print(f'{host}:{port} - "{request.method} {path} HTTP/{ver}" {resp.status_code} {phrase}', flush=True) except Exception: pass return resp async def _cleanup_jobs_loop(): while True: await asyncio.sleep(60) cutoff = _now() - JOB_TTL_SEC dead = [jid for jid, j in _jobs.items() if float( j.get('ts', 0)) < cutoff] for jid in dead: _jobs.pop(jid, None) async def _worker_loop(worker_id: int): while True: jid, payload = await _job_queue.get() try: _jobs[jid] = {'status': 'running', 'ts': _now()} result = await asyncio.to_thread(_process_payload, payload) _jobs[jid] = {'status': 'done', 'result': result, 'ts': _now()} except Exception as e: _jobs[jid] = {'status': 'error', 'result': str(e), 'ts': _now()} finally: _job_queue.task_done() def _process_payload(payload: dict) -> dict: t_all = time.perf_counter() mode = (payload.get('mode') or 'lens_images') lang = (payload.get('lang') or 'en') context = payload.get('context') if isinstance( payload.get('context'), dict) else {} page_url = str((context or {}).get('page_url') or '').strip() src = (payload.get('src') or '').strip() img_bytes = b'' mime = '' if payload.get('imageDataUri'): img_bytes, mime = _datauri_to_bytes(payload.get('imageDataUri')) elif src.startswith('data:'): img_bytes, mime = _datauri_to_bytes(src) else: img_bytes, mime = _download_bytes(src, page_url) t_img = time.perf_counter() if not img_bytes: raise Exception('No image data') ai_cfg = None ai = payload.get('ai') or None source = str(payload.get('source') or '').strip().lower() or 'translated' if mode == 'lens_text' and source == 'ai' and isinstance(ai, dict): api_key = str(ai.get('api_key') or '').strip() or ( os.getenv('AI_API_KEY') or '').strip() ai_cfg = AiConfig( api_key=api_key, model=str(ai.get('model') or 'auto').strip() or 'auto', provider=str(ai.get('provider') or 'auto').strip() or 'auto', base_url=str(ai.get('base_url') or 'auto').strip() or 'auto', prompt_editable=str(ai.get('prompt') or '').strip(), ) core.DO_AI_JSON = False img_hash = _sha256_hex(img_bytes) cache_key = '' if mode == 'lens_text' and img_hash: cache_source = 'ai' if source == 'ai' else 'text' cache_key = _build_cache_key( img_hash, lang, mode, cache_source, ai_cfg) cached = None if source == 'ai': cached = _lru_get(_ai_result_cache, _ai_cache_lock, cache_key) else: cached = _lru_get(_result_cache, _result_cache_lock, cache_key) if cached: cached['perf'] = { 'cache': 'hit', 'total_ms': round((time.perf_counter() - t_all) * 1000, 1), 'img_ms': round((t_img - t_all) * 1000, 1), } return cached suffix = '.png' if (mime or '').endswith('png') else '.jpg' with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f: f.write(img_bytes) tmp_path = f.name t_tmp = time.perf_counter() try: out = process_image_path(tmp_path, lang, mode, ai_cfg) out['perf'] = { 'cache': 'miss' if cache_key else 'off', 'total_ms': round((time.perf_counter() - t_all) * 1000, 1), 'img_ms': round((t_img - t_all) * 1000, 1), 'tmp_ms': round((t_tmp - t_img) * 1000, 1), } if cache_key and isinstance(out, dict): if source == 'ai': _lru_set(_ai_result_cache, _ai_cache_lock, cache_key, out, TP_AI_RESULT_CACHE_MAX) else: _lru_set(_result_cache, _result_cache_lock, cache_key, out, TP_RESULT_CACHE_MAX) return out finally: try: os.unlink(tmp_path) except Exception: pass @app.on_event('startup') async def _startup(): print( f'[TextPhantom][api] starting build={BUILD_ID} workers={SERVER_MAX_WORKERS}') for i in range(max(1, SERVER_MAX_WORKERS)): asyncio.create_task(_worker_loop(i)) asyncio.create_task(_cleanup_jobs_loop()) @app.get('/health') async def health(): return {'ok': True, 'build': BUILD_ID} @app.get('/version') async def version(): return {'ok': True, 'build': BUILD_ID, 'core': 'lens_core'} @app.get('/warmup') async def warmup(lang: str = TP_WARMUP_LANG): t0 = time.perf_counter() r = core.warmup(lang) return {'ok': True, 'build': BUILD_ID, 'dt_ms': round((time.perf_counter() - t0) * 1000, 1), 'result': r} @app.get('/meta') async def meta(): langs = getattr(core, 'UI_LANGUAGES', None) or [] sources = [ {'id': 'original', 'name': 'Original'}, {'id': 'translated', 'name': 'Translated'}, {'id': 'ai', 'name': 'Ai'}, ] env_key = (os.getenv('AI_API_KEY') or '').strip() return {'ok': True, 'languages': langs, 'sources': sources, 'has_env_ai_key': bool(env_key)} @app.post('/translate') async def translate(payload: Dict[str, Any]): jid = str(uuid.uuid4()) _dbg('rest.enqueue', { 'id': jid, 'mode': str(payload.get('mode') or ''), 'lang': str(payload.get('lang') or ''), 'source': str(payload.get('source') or ''), 'has_datauri': bool(payload.get('imageDataUri')), 'has_src': bool(payload.get('src')), }) _jobs[jid] = {'status': 'queued', 'ts': _now()} await _job_queue.put((jid, payload)) return {'id': jid} @app.get('/translate/{job_id}') async def translate_status(job_id: str): j = _jobs.get(job_id) if not j: return {'status': 'error', 'result': 'job_not_found'} return j @app.post('/ai/resolve') async def ai_resolve(payload: Dict[str, Any]): api_key = str(payload.get('api_key') or '').strip() or ( os.getenv('AI_API_KEY') or '').strip() lang = _normalize_lang(str(payload.get('lang') or 'en')) style_default = ((getattr(core, 'AI_LANG_STYLE', {}) or {}).get(lang) or (getattr(core, 'AI_LANG_STYLE', {}) or {}).get('default') or '').strip() if not api_key: return { 'ok': False, 'error': 'missing_api_key', 'provider': '', 'default_model': '', 'models': [], 'lang': lang, 'prompt_editable_default': style_default, } provider = core._canonical_provider(str(payload.get('provider') or 'auto')) if provider in ('', 'auto'): provider = _detect_provider_from_key(api_key) preset = _resolve_provider_defaults(provider) or {} requested_model = str(payload.get('model') or 'auto').strip() or 'auto' resolved_model = _resolve_model(provider, requested_model) models: List[str] = [] base_url = (str(payload.get('base_url') or 'auto')).strip() if base_url in ('', 'auto'): base_url = (preset.get('base_url') or '').strip() if provider == 'huggingface': if base_url: models = core._hf_router_available_models(api_key, base_url) if requested_model.lower() in ('', 'auto'): fallback = core._pick_hf_fallback_model(models) if fallback: resolved_model = fallback elif provider == 'gemini': models = getattr(core, '_gemini_available_models', lambda _k: [])(api_key) if not models: models = ['gemini-2.5-flash', 'gemini-2.5-flash-lite', 'gemini-2.5-pro', 'gemini-2.0-flash', 'gemini-3-flash-preview', 'gemini-3-pro-preview'] elif provider == 'anthropic': models = getattr(core, '_anthropic_available_models', lambda _k, _b=None: [])(api_key, base_url) else: if not base_url: base_url = (core.AI_PROVIDER_DEFAULTS.get('openai') or {}).get( 'base_url') or 'https://api.openai.com/v1' models = getattr(core, '_openai_compat_available_models', lambda _k, _b: [])(api_key, base_url) if provider == 'huggingface' and not models: models = [ 'google/gemma-3-27b-it:featherless-a', 'google/gemma-3-27b-it', 'google/gemma-2-2b-it', 'google/gemma-2-9b-it', ] if provider != 'huggingface' and not models: fallback_models: List[str] = [] preset_model = str(preset.get('model') or '').strip() if preset_model: fallback_models.append(preset_model) provider_defaults = (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).get( provider, {}) or {} provider_model = str(provider_defaults.get('model') or '').strip() if provider_model: fallback_models.append(provider_model) if provider == 'gemini': fallback_models.extend([ 'gemini-2.5-flash', 'gemini-2.5-flash-lite', 'gemini-2.5-pro', 'gemini-2.0-flash', 'gemini-3-flash-preview', 'gemini-3-pro-preview', ]) models = sorted(set([m for m in fallback_models if m]), key=str.lower) if not models: all_models: List[str] = [] for _, v in (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).items(): m2 = str((v or {}).get('model') or '').strip() if m2: all_models.append(m2) models = sorted(set(all_models), key=str.lower) if models: models = sorted( {m.strip() for m in models if isinstance(m, str) and m.strip()}, key=str.lower, ) if models and resolved_model not in models: resolved_model = models[0] prompt_default = style_default return { 'ok': True, 'provider': provider, 'base_url': base_url, 'default_model': (preset.get('model') or ''), 'model': resolved_model, 'models': models, 'prompt_editable_default': prompt_default, } @app.get('/ai/prompt/default') async def ai_prompt_default(lang: str = 'en'): l = _normalize_lang(lang) base = (getattr(core, 'AI_PROMPT_SYSTEM_BASE', '') or '').strip() style = (getattr(core, 'AI_LANG_STYLE', {}) or {}).get(l) or ( getattr(core, 'AI_LANG_STYLE', {}) or {}).get('default') or '' style = (style or '').strip() contract = "\n".join([ 'Return ONLY valid JSON (no markdown, no extra text).', 'Output JSON MUST have exactly one key: "aiTextFull".', 'Schema example: {"aiTextFull":"..."}', 'Markers: Keep every paragraph marker like <> unchanged and in order. Do not remove or add markers.', "aiTextFull must include all markers, each followed by that paragraph's translated text.", ]) system_text = "\n\n".join([p for p in [base, style, contract] if p]) return { 'ok': True, 'lang': l, 'prompt_editable_default': style, 'lang_style': style, 'system_base': base, 'contract': contract, 'system_text': system_text, } @app.websocket('/ws') async def ws_endpoint(ws: WebSocket): await ws.accept() await ws.send_text(json.dumps({'type': 'ack'})) try: while True: msg = await ws.receive_text() data = json.loads(msg) if data.get('type') != 'job': continue jid = str(data.get('id') or '') payload = data.get('payload') or {} _dbg('ws.job', { 'id': jid, 'mode': str(payload.get('mode') or ''), 'lang': str(payload.get('lang') or ''), 'source': str(payload.get('source') or ''), 'has_datauri': bool(payload.get('imageDataUri')), 'has_src': bool(payload.get('src')), }) try: result = await asyncio.to_thread(_process_payload, payload) try: await ws.send_text(json.dumps({'type': 'result', 'id': jid, 'result': result})) except WebSocketDisconnect: return except Exception as e: try: await ws.send_text(json.dumps({'type': 'error', 'id': jid, 'error': str(e)})) except (WebSocketDisconnect, RuntimeError): return except WebSocketDisconnect: return def main(): image_path = getattr(core, 'IMAGE_PATH', '') lang = getattr(core, 'LANG', 'en') mode = os.environ.get('MODE', 'lens_text') ai_key = os.environ.get('AI_API_KEY', getattr(core, 'AI_API_KEY', '')) ai_model = os.environ.get('AI_MODEL', getattr(core, 'AI_MODEL', 'auto')) ai_prompt = os.environ.get('AI_PROMPT', '') ai_cfg = AiConfig(api_key=ai_key, model=ai_model, prompt_editable=ai_prompt) if ai_key and mode == 'lens_text' else None out = process_image_path(image_path, lang, mode, ai_cfg) print(json.dumps(out, ensure_ascii=False, indent=2)) if __name__ == '__main__': main()