|
|
|
|
|
|
|
|
|
|
|
|
|
|
import cv2, time, numpy as np, threading, queue, os, math, re, traceback |
|
|
from PIL import Image, ImageDraw, ImageFont |
|
|
from tts_reader import TTSReader |
|
|
import seeing |
|
|
|
|
|
try: |
|
|
from rapidfuzz import process, fuzz |
|
|
from jamo import h2j, j2hcj |
|
|
_SPELLFIX_OK = True |
|
|
except Exception as _e: |
|
|
print(f"[SPELLFIX] disabled: { _e }") |
|
|
_SPELLFIX_OK = False |
|
|
|
|
|
|
|
|
CAMERA_ID = 0 |
|
|
CAPTURE_TARGET_W = 1920 |
|
|
CAPTURE_TARGET_H = 1080 |
|
|
WORK_WIDTH_TARGET = 1280 |
|
|
DISPLAY_MAX_W = 1280 |
|
|
WINDOW_NAME = 'Assistive Fingertip OCR (fast)' |
|
|
|
|
|
|
|
|
OCR_ENABLED = True |
|
|
BASE_OCR_PERIOD = 1.5 |
|
|
EXTRA_OCR_PERIOD = 0.6 |
|
|
STALE_AGE_SEC = 7.0 |
|
|
LOW_CONF_TH = 0.55 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ROI_W, ROI_H = 420,420 |
|
|
MIN_ROI_W, MIN_ROI_H = 200, 120 |
|
|
BLUR_VAR_THRESH = 80.0 |
|
|
|
|
|
|
|
|
ROI_KEEPALIVE_GRACE_SEC = 1.2 |
|
|
last_roi_active_until = 0.0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
BASE_TTL = 3.0 |
|
|
PIN_GRACE_SEC = 1.2 |
|
|
MAX_OVERLAYS = 300 |
|
|
|
|
|
HARD_MAX_LIFETIME = 9.0 |
|
|
no_repeat_until_ts = 2.0 |
|
|
|
|
|
IGNORE_HARD_CAP_WHILE_FINGER_IN_ROI = True |
|
|
PRUNE_TIMEOUT_SEC =0.5 |
|
|
|
|
|
|
|
|
MERGE_IOU_TH = 0.50 |
|
|
MERGE_CENTER_DIST = 28.0 |
|
|
|
|
|
|
|
|
TTS_ENABLE = True |
|
|
TTS_CONF = 0.0 |
|
|
TTS_REPEAT_SEC = 1.0 |
|
|
|
|
|
TTS_TARGET_STICKY_SEC = 0.6 |
|
|
|
|
|
|
|
|
|
|
|
STRICT_DICT_ONLY = True |
|
|
TTS_CONF_FALLBACK = 0.35 |
|
|
|
|
|
SHOW_TTS_HINT = True |
|
|
tts_current_display = "" |
|
|
tts_current_note = "" |
|
|
tts_last_spoken_text = "" |
|
|
|
|
|
|
|
|
INFO_FORCE_IMMEDIATE = False |
|
|
tts_force_lock = threading.Lock() |
|
|
|
|
|
|
|
|
|
|
|
MAX_OCR_LONG = 420 |
|
|
ENHANCE_MODE = "off" |
|
|
MOTION_GATE_PX = 2.0 |
|
|
MAX_TEXT_DRAW = 30 |
|
|
|
|
|
|
|
|
FLOW_DS = 0.45 |
|
|
FLOW_MAX_CORNERS=240; FLOW_QUALITY=0.01; FLOW_MIN_DISTANCE=7 |
|
|
FLOW_WINSIZE=(21,21); FLOW_LEVELS=3 |
|
|
RESEED_INTERVAL_FRAMES=8 |
|
|
|
|
|
MAX_TRANS_PX = 90 |
|
|
MAX_SCALE_STEP = 0.18 |
|
|
MAX_ROT_STEP_DEG = 10.0 |
|
|
EMA_ALPHA_SIM = 0.28 |
|
|
|
|
|
USE_ORB_FALLBACK = True |
|
|
ORB_NFEATURES=600; ORB_MIN_GOOD=45 |
|
|
|
|
|
|
|
|
EMA_ALPHA_FINGER=0.35 |
|
|
FINGER_STALE_MS = 800 |
|
|
finger_last_seen = 0.0 |
|
|
|
|
|
last_finger_xy = None |
|
|
|
|
|
|
|
|
YOLO_DEBUG = True |
|
|
YOLO_DRAW_ALL = True |
|
|
YOLO_IMG_SIZE = 640 |
|
|
YOLO_CONF_TH = 0.25 |
|
|
YOLO_IOU_TH = 0.50 |
|
|
YOLO_CLASS_NAME = None |
|
|
YOLO_CLASS_ID = 0 |
|
|
|
|
|
YOLO_WEIGHTS = r'weights/fingertip.pt' |
|
|
|
|
|
YOLO_SHOW_INPUT = False |
|
|
YOLO_INPUT_WIN = 'YOLO_INPUT' |
|
|
|
|
|
DO_PIC=True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
USE_KLT_FALLBACK = True |
|
|
KLT_WIN=(31,31); KLT_LEVELS=4 |
|
|
KLT_TERM=(cv2.TERM_CRITERIA_EPS|cv2.TERM_CRITERIA_COUNT, 20, 0.03) |
|
|
KLT_FB_MAX=4.0; KLT_ERR_MAX=100.0; KLT_STEP_MAX=30.0 |
|
|
KLT_OUT_MARGIN=4; KLT_N_SAMPLES=12; KLT_RING_R=10 |
|
|
|
|
|
KLT_MIN_GOOD=5; KLT_LOSS_GRACE=3 |
|
|
KLT_USE_CLAHE=True |
|
|
|
|
|
klt_pts_prev=None; klt_lost_frames=0; |
|
|
|
|
|
|
|
|
OCR_EMA=None; OCR_EMA_ALPHA=0.25 |
|
|
|
|
|
|
|
|
|
|
|
DICT_MERGE_SCORE = 70 |
|
|
DICT_TIE_DELTA = 3 |
|
|
DICT_ONLY =True |
|
|
|
|
|
|
|
|
GUIDE_MODE = False |
|
|
GUIDE_TARGET = None |
|
|
GUIDE_TOL_PX = 40 |
|
|
GUIDE_REPEAT_SEC = 1.0 |
|
|
GUIDE_LAST_TS = 0.0 |
|
|
GUIDE_LAST_SENT = "" |
|
|
GUIDE_TARGET_ITEM = None |
|
|
GUIDE_REQUIRE_FINGER = True |
|
|
|
|
|
|
|
|
MODE_OP = 1 |
|
|
MODE_INFO = 2 |
|
|
MODE_GUIDE = 3 |
|
|
|
|
|
mode_lock = threading.Lock() |
|
|
mode_state = MODE_OP |
|
|
|
|
|
|
|
|
INFO_PERIOD_SEC = 5.0 |
|
|
|
|
|
|
|
|
_next_info_due = 0.0 |
|
|
|
|
|
|
|
|
_latest_frame_for_info = None |
|
|
_latest_frame_lock = threading.Lock() |
|
|
|
|
|
|
|
|
_info_stop = threading.Event() |
|
|
|
|
|
def _is_speaker_busy() -> bool:
    """Report whether the pygame music channel is currently playing.

    Any failure (pygame not importable, mixer call raising, ...) is treated
    as "not busy", so callers can poll this freely.
    """
    busy = False
    try:
        import pygame
        busy = pygame.mixer.music.get_busy()
    except Exception:
        busy = False
    return busy
|
|
|
|
|
def _say_once(text: str):
    """Speak exactly one sentence through the asynchronous TTS path and wait.

    Steps:
    - publish ``text`` as the TTS target and push ``no_repeat_until_ts``
      60 seconds into the future;
    - wait (about 2 s at most) until playback starts, or until
      ``_last_spoken_enqueue_ts`` shows an enqueue made after this call began;
    - poll until playback ends (30 s at most);
    - clear the target and reset ``no_repeat_until_ts`` to 0.

    Both waits stop early when ``_info_stop`` is set.
    """
    global no_repeat_until_ts

    started_at = time.time()
    set_tts_target(text)
    no_repeat_until_ts = time.time() + 60.0

    def _enqueued_since_start():
        # The timestamp global may not exist yet while the module is loading.
        return ('_last_spoken_enqueue_ts' in globals()
                and _last_spoken_enqueue_ts >= started_at)

    # Phase 1: give the scheduler a moment to actually start playback.
    while not _info_stop.is_set():
        if _is_speaker_busy():
            break
        if _enqueued_since_start():
            time.sleep(0.1)
            break
        if (time.time() - started_at) > 2.0:
            break
        time.sleep(0.02)

    # Phase 2: block until the speaker goes quiet (bounded).
    wait_from = time.time()
    while _is_speaker_busy() and not _info_stop.is_set():
        if (time.time() - wait_from) > 30.0:
            break
        time.sleep(0.05)

    # Clearing the target keeps the scheduler from repeating the sentence.
    set_tts_target(None)
    no_repeat_until_ts = 0.0
|
|
|
|
|
def announce_force_async(text: str, after=None):
    """Mode-switch announcement: interrupt current speech and say ``text``.

    The work runs on a daemon thread while holding ``tts_force_lock``.
    ``after``, when callable, is invoked once ``_say_once`` has returned;
    exceptions raised by it are ignored.
    """
    def _interrupt_playback():
        # Best effort: the TTS backend may not implement every method.
        try:
            tts.clear_queue()
        except Exception:
            pass
        for method_name in ("stop", "cancel", "flush"):
            if not hasattr(tts, method_name):
                continue
            try:
                getattr(tts, method_name)()
            except Exception:
                pass

    def _runner():
        with tts_force_lock:
            if TTS_ENABLE and tts is not None:
                _interrupt_playback()
            _say_once(text)
            if callable(after):
                try:
                    after()
                except Exception:
                    pass

    threading.Thread(target=_runner, daemon=True).start()
|
|
|
|
|
def _enter_op_mode():
    """Announce the switch to operation mode, then apply it.

    The state change is done in the ``after`` callback handed to
    ``announce_force_async``, i.e. after the announcement was processed.
    """
    def _apply():
        global mode_state, GUIDE_MODE
        with mode_lock:
            mode_state = MODE_OP
            GUIDE_MODE = False

    announce_force_async("조작 모드로 전환합니다.", after=_apply)
|
|
|
|
|
def _enter_info_mode():
    """Announce the switch to viewing (info) mode, then apply it.

    Once the announcement was processed the callback selects ``MODE_INFO``,
    resets the summary schedule, raises ``INFO_FORCE_IMMEDIATE`` and turns
    guide mode off.
    """
    def _apply():
        global mode_state, _next_info_due, INFO_FORCE_IMMEDIATE, GUIDE_MODE
        with mode_lock:
            mode_state = MODE_INFO
            _next_info_due = 0.0
            INFO_FORCE_IMMEDIATE = True
            GUIDE_MODE = False

    announce_force_async("보기 모드로 전환합니다. 지금부터 상황을 설명합니다.", after=_apply)
|
|
|
|
|
|
|
|
def _enter_guide_mode():
    """Announce the switch to guide mode, then apply it.

    The callback selects ``MODE_GUIDE`` and sets ``GUIDE_MODE`` to True after
    the announcement was processed.
    """
    def _apply():
        global mode_state, GUIDE_MODE
        with mode_lock:
            mode_state = MODE_GUIDE
            GUIDE_MODE = True

    announce_force_async("안내 모드로 전환합니다. 목표를 지정해 주세요.", after=_apply)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _info_worker():
    """Background loop that periodically summarizes the scene in info mode.

    While ``mode_state`` is ``MODE_INFO`` a summary is produced as soon as it
    is due and then every ``INFO_PERIOD_SEC`` seconds. If the speaker was busy
    when a summary became due, the summary is postponed to 2 s after speech
    ends, except for the single pass that follows a mode-entry announcement
    (``INFO_FORCE_IMMEDIATE``), which runs without that delay.

    The loop ends when ``_info_stop`` is set.
    """
    global _next_info_due, INFO_FORCE_IMMEDIATE

    while not _info_stop.is_set():
        time.sleep(0.05)

        with mode_lock:
            info_on = (mode_state == MODE_INFO)
        if not info_on:
            # Outside info mode keep the schedule reset so re-entry is instant.
            _next_info_due = 0.0
            continue

        if time.time() < _next_info_due:
            continue

        # Never talk over an utterance that is still playing.
        was_busy = False
        while _is_speaker_busy() and not _info_stop.is_set():
            was_busy = True
            time.sleep(0.05)

        # Consume the one-shot flag on the first pass whether or not the
        # speaker was busy; otherwise it stays set and would suppress the
        # 2 s gap on some later, unrelated cycle.
        force_now = INFO_FORCE_IMMEDIATE
        INFO_FORCE_IMMEDIATE = False
        if was_busy and not force_now:
            _next_info_due = time.time() + 2.0
            continue

        with _latest_frame_lock:
            frame = None if _latest_frame_for_info is None else _latest_frame_for_info.copy()

        if frame is not None:
            try:
                summary = seeing.summarize_scene(frame, easy_reader,do_pic=DO_PIC, debug_dir=r"logs/ocr_bbox",debug_font=r"C:\Windows\Fonts\malgun.ttf")
            except Exception as e:
                print("[INFO] summarize failed:", e)
                summary = None

            if summary:
                _say_once(summary)

        _next_info_due = time.time() + INFO_PERIOD_SEC
|
|
|
|
|
|
|
|
USE_STT = True |
|
|
try: |
|
|
import speech_recognition as sr |
|
|
_STT_OK = True |
|
|
except Exception as _e: |
|
|
print(f"[STT] disabled: {_e}") |
|
|
_STT_OK = False |
|
|
|
|
|
|
|
|
def torch_cuda_ok():
    """Return True when PyTorch reports that CUDA is available.

    The outcome is printed. Any failure while importing or querying torch is
    printed as well and reported as False.
    """
    try:
        import torch
        available = bool(torch.cuda.is_available())
        print(f"[GPU] torch CUDA available: {available}")
        return available
    except Exception as exc:
        print(f"[GPU] torch check failed: {exc}")
        return False
|
|
|
|
|
gpu_ok = torch_cuda_ok() |
|
|
|
|
|
OCR_ENGINE=None; easy_reader=None |
|
|
import easyocr |
|
|
try: |
|
|
easy_reader = easyocr.Reader(['ko'], gpu=gpu_ok, |
|
|
model_storage_directory='models', |
|
|
user_network_directory='user_network', |
|
|
recog_network='best_accuracy1', |
|
|
download_enabled=False) |
|
|
OCR_ENGINE = 'easyocr_gpu' if gpu_ok else 'easyocr_cpu' |
|
|
print(f"[OCR] EasyOCR (GPU={gpu_ok})") |
|
|
except Exception as e: |
|
|
traceback.print_exc() |
|
|
raise SystemExit("No OCR engine available") |
|
|
|
|
|
|
|
|
def _start_info_thread_once():
    """Start the ``_info_worker`` daemon thread on the first call only.

    A ``_started`` attribute stored on this function marks that the thread
    has already been launched.
    """
    me = _start_info_thread_once
    if hasattr(me, "_started"):
        return
    threading.Thread(target=_info_worker, daemon=True).start()
    me._started = True
|
|
_start_info_thread_once() |
|
|
|
|
|
|
|
|
try: |
|
|
from ultralytics import YOLO |
|
|
yolo_device = 0 if gpu_ok else 'cpu' |
|
|
yolo_model = YOLO(YOLO_WEIGHTS) |
|
|
print(f"[YOLO] Loaded: {YOLO_WEIGHTS} (device={yolo_device})") |
|
|
class_names = yolo_model.names |
|
|
if YOLO_CLASS_NAME: |
|
|
inv = {str(v).lower(): int(k) for k, v in class_names.items()} |
|
|
if YOLO_CLASS_NAME.lower() in inv: |
|
|
YOLO_CLASS_ID = inv[YOLO_CLASS_NAME.lower()] |
|
|
except Exception as e: |
|
|
traceback.print_exc() |
|
|
raise SystemExit("[YOLO] 모델 로드 실패. YOLO_WEIGHTS 경로/파일 확인") |
|
|
|
|
|
def _pick_best_tip(cands, last_xy):
    """Choose the most plausible fingertip candidate.

    Args:
        cands: sequence of 4-tuples ``(cx, cy, conf, box)``.
        last_xy: previous fingertip position ``(x, y)`` or None.

    Returns:
        None for an empty candidate list. Without a previous position the
        candidate with the highest confidence wins; otherwise confidence is
        penalised by ``0.0005 * squared distance`` to the previous position.
    """
    if not cands:
        return None
    if last_xy is None:
        return max(cands, key=lambda cand: cand[2])

    prev_x, prev_y = last_xy

    def _proximity_score(cand):
        cx, cy, conf, _ = cand
        dist_sq = (cx - prev_x) ** 2 + (cy - prev_y) ** 2
        return conf - 0.0005 * dist_sq

    return max(cands, key=_proximity_score)
|
|
|
|
|
|
|
|
yolo_in_q=queue.Queue(maxsize=1); yolo_out_q=queue.Queue(maxsize=1); yolo_stop=threading.Event() |
|
|
def _yolo_detect_fingertip(frame):
    """Run YOLO on one frame and build the detection record for the main loop.

    Returns:
        - a dict with ``'xy': None`` when the model produced no boxes;
        - None when boxes exist but none passes the ``YOLO_CLASS_ID`` filter;
        - otherwise a dict with the chosen tip centre, its box (x, y, w, h),
          confidence, timestamp, all raw boxes and the letterboxed input.
    """
    yolo_in_vis, _r, _off = _yolo_letterbox_bgr(frame, YOLO_IMG_SIZE)
    res = yolo_model.predict(source=frame, imgsz=YOLO_IMG_SIZE,
                             conf=YOLO_CONF_TH, iou=YOLO_IOU_TH,
                             device=yolo_device, verbose=False)

    if not (res and res[0].boxes is not None and len(res[0].boxes) > 0):
        return {'xy':None, 'raw_boxes':[], 'yolo_in':yolo_in_vis}

    raw_boxes = []
    cands = []
    for b in res[0].boxes:
        x1, y1, x2, y2 = b.xyxy[0].tolist()
        conf = float(b.conf[0]) if b.conf is not None else 0.0
        cls_id = int(b.cls[0]) if b.cls is not None else 0
        raw_boxes.append((x1, y1, x2, y2, conf, cls_id))
        if YOLO_CLASS_ID is not None and cls_id != YOLO_CLASS_ID:
            continue
        cx, cy = (x1 + x2) / 2.0, (y1 + y2) / 2.0
        cands.append((cx, cy, conf, (x1, y1, x2 - x1, y2 - y1)))

    best = _pick_best_tip(cands, last_finger_xy)
    if best is None:
        return None

    cx, cy, conf, (x, y, w, h) = best
    return {'xy':(int(round(cx)), int(round(cy))),
            'box':(int(x), int(y), int(w), int(h)),
            'conf':conf, 'ts':time.time(),
            'raw_boxes':raw_boxes, 'yolo_in':yolo_in_vis}


def _yolo_worker():
    """Daemon loop: take frames from ``yolo_in_q``, publish results to ``yolo_out_q``.

    Only the newest result is kept: the output queue is drained before the
    new record is put. A failure while processing one frame is logged and the
    loop continues, so a single bad frame (or a transient model error) does
    not silently end fingertip detection for the rest of the session.
    """
    while not yolo_stop.is_set():
        try:
            frame = yolo_in_q.get(timeout=0.2)
        except queue.Empty:
            continue

        try:
            det = _yolo_detect_fingertip(frame)
        except Exception:
            traceback.print_exc()
            continue

        # Drop any stale result so the consumer always sees the latest one.
        try:
            while True:
                yolo_out_q.get_nowait()
        except queue.Empty:
            pass
        try:
            yolo_out_q.put_nowait(det)
        except queue.Full:
            pass
|
|
threading.Thread(target=_yolo_worker, daemon=True).start() |
|
|
|
|
|
def _yolo_letterbox_bgr(img, new_size=YOLO_IMG_SIZE, pad_val=114):
    """Resize ``img`` to fit a ``new_size`` square and pad the remainder.

    The aspect ratio is kept; the padding is split between both sides and
    filled with ``pad_val`` on every channel.

    Returns:
        ``(padded_image, scale_ratio, (left_pad, top_pad))``.
    """
    src_h, src_w = img.shape[:2]
    ratio = min(new_size / float(src_h), new_size / float(src_w))
    scaled_w = int(round(src_w * ratio))
    scaled_h = int(round(src_h * ratio))
    resized = cv2.resize(img, (scaled_w, scaled_h), interpolation=cv2.INTER_LINEAR)

    pad_x = (new_size - scaled_w) / 2.0
    pad_y = (new_size - scaled_h) / 2.0
    # floor/ceil so that an odd remainder still adds up to new_size
    left = int(math.floor(pad_x))
    right = int(math.ceil(pad_x))
    top = int(math.floor(pad_y))
    bottom = int(math.ceil(pad_y))

    padded = cv2.copyMakeBorder(resized, top, bottom, left, right,
                                borderType=cv2.BORDER_CONSTANT,
                                value=(pad_val, pad_val, pad_val))
    return padded, ratio, (left, top)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tts_stop=threading.Event() |
|
|
tts_target_lock=threading.Lock() |
|
|
tts_target_text=None |
|
|
_last_spoken_enqueue_ts=0.0 |
|
|
|
|
|
tts_last_seen_target_ts=0.0 |
|
|
|
|
|
SPELLFIX_ENABLE=True |
|
|
JAMO_THRESHOLD=85; JAMO_THRESHOLD_LOWCONF=80 |
|
|
|
|
|
|
|
|
DICT_WORDS=["통살균","원격제어","예약","내마음","세탁","헹굼","탈수","물온도","빨래추가","알림음","구김방지","터보샷", "강력물살","온수세탁","냉수세탁","물높이","코스","동작/일시정지","전원"] |
|
|
CANON={"표준세탁":"세탁","손세탁":"세탁"} |
|
|
|
|
|
def _to_jamo(s: str) -> str:
    """Decompose the Hangul in ``s`` via ``h2j`` followed by ``j2hcj``.

    If the conversion raises for any reason, the input is returned unchanged
    (or "" when the input is falsy).
    """
    try:
        decomposed = h2j(s)
        return j2hcj(decomposed)
    except Exception:
        return s if s else ""
|
|
|
|
|
if _SPELLFIX_OK: |
|
|
_DICT_JAMO=[_to_jamo(w) for w in DICT_WORDS] |
|
|
else: |
|
|
_DICT_JAMO=[] |
|
|
|
|
|
_TOKENIZER=re.compile(r"[가-힣A-Za-z0-9]+|[^\s가-힣A-Za-z0-9]") |
|
|
|
|
|
def correct_token(tok: str, threshold: int):
    """Snap one token to the closest dictionary word using a jamo fuzzy match.

    Returns:
        ``(word, score)``. When spell-fixing is unavailable or disabled, the
        token is empty, or nothing reaches ``threshold``, the token is
        returned unchanged with a score of 0.0. Matched words are passed
        through ``CANON``.
    """
    enabled = _SPELLFIX_OK and SPELLFIX_ENABLE and _DICT_JAMO and tok
    if not enabled:
        return tok, 0.0

    hit = process.extractOne(_to_jamo(tok), _DICT_JAMO,
                             scorer=fuzz.ratio, score_cutoff=threshold)
    if not hit:
        return tok, 0.0

    _, score, idx = hit
    word = DICT_WORDS[idx]
    return CANON.get(word, word), float(score)
|
|
|
|
|
def correct_text(text: str, threshold: int):
    """Spell-fix every word-like token of ``text`` against the dictionary.

    Tokens come from ``_TOKENIZER``; only tokens made solely of Hangul
    syllables, ASCII letters or digits are corrected, everything else is
    copied through. Tokens are re-joined without separators.

    Returns:
        ``(corrected_text, changed)``; the input and False when spell-fixing
        is unavailable/disabled or ``text`` is empty.
    """
    if not (_SPELLFIX_OK and SPELLFIX_ENABLE and _DICT_JAMO and text):
        return text, False

    pieces = []
    changed = False
    for tok in _TOKENIZER.findall(text):
        if not re.match(r"^[가-힣A-Za-z0-9]+$", tok):
            pieces.append(tok)
            continue
        fixed, _score = correct_token(tok, threshold=threshold)
        if fixed != tok:
            changed = True
        pieces.append(fixed)
    return "".join(pieces), changed
|
|
|
|
|
DICT_SPEAK_ENABLE=True |
|
|
DICT_THRESHOLD=80; DICT_THRESHOLD_LOWCONF=80 |
|
|
def _build_dict_index(words, canon_map):
    """Build parallel lookup lists for dictionary matching.

    Args:
        words: dictionary words.
        canon_map: mapping alias -> canonical word.

    Returns:
        ``(keys, keys_j, vals)``: every word followed by every alias, the
        jamo form of each key with whitespace removed, and the canonical
        value belonging to each key.
    """
    keys = list(words) + list(canon_map)
    vals = [canon_map.get(w, w) for w in words] + list(canon_map.values())
    keys_j = [_to_jamo(re.sub(r"\s+", "", key)) for key in keys]
    return keys, keys_j, vals
|
|
_DICT_KEYS, _DICT_KEYS_J, _DICT_VALS=_build_dict_index(DICT_WORDS, CANON) |
|
|
|
|
|
def _normalize_plain(s: str) -> str:
    """Return ``s`` with all whitespace removed and casefolded ("" for falsy input)."""
    text = s or ""
    return re.sub(r"\s+", "", text).casefold()
|
|
def map_to_dict_canon(text: str, threshold: int):
    """Map OCR text to a canonical dictionary word by jamo fuzzy matching.

    The whole text (whitespace removed) and each word-like token are tried
    as queries; the best match at or above ``threshold`` wins.

    Returns:
        ``(canonical_word, score)`` or ``(None, 0.0)`` when dictionary
        speaking is disabled, ``text`` is empty, the fuzzy-matching
        libraries are unavailable, or nothing matches.
    """
    if not DICT_SPEAK_ENABLE or not text:
        return None, 0.0
    if not _SPELLFIX_OK:
        return None, 0.0

    compact = re.sub(r"\s+", "", text)
    queries = [_to_jamo(compact)] if compact else []
    queries.extend(
        _to_jamo(tok)
        for tok in _TOKENIZER.findall(text)
        if re.match(r"^[가-힣A-Za-z0-9]+$", tok)
    )

    best_idx, best_sc = -1, 0.0
    for query in queries:
        hit = process.extractOne(query, _DICT_KEYS_J,
                                 scorer=fuzz.ratio, score_cutoff=threshold)
        if not hit:
            continue
        _, sc, idx = hit
        if sc > best_sc:
            best_sc = float(sc)
            best_idx = int(idx)

    if best_idx < 0:
        return None, 0.0
    return _DICT_VALS[best_idx], best_sc
|
|
|
|
|
def enrich_with_dict(text: str, conf: float):
    """Attach dictionary information to one OCR result.

    Returns:
        ``(display, canon, canon_score, conf)`` where ``display`` is the
        canonical word when one matched at ``DICT_MERGE_SCORE`` and the raw
        text otherwise; both scores are coerced to float (0.0 when falsy).
    """
    canon, score = map_to_dict_canon(text, threshold=DICT_MERGE_SCORE)
    display = canon or text
    return display, canon, float(score or 0.0), float(conf or 0.0)
|
|
|
|
|
def _has_korean(s: str) -> bool:
    """Return True when ``s`` contains at least one character in '가'..'힣'."""
    for ch in (s or ""):
        if '가' <= ch <= '힣':
            return True
    return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
import tempfile, os |
|
|
try: |
|
|
tts=TTSReader(cooldown_sec=TTS_REPEAT_SEC, speaking_rate=1.05, pitch=0.0, |
|
|
min_len=2, credentials_path=r"yugpae-4f8335e15ba0.json", |
|
|
cache_dir=None, persist_cache=False) |
|
|
except TypeError: |
|
|
tts=TTSReader(cooldown_sec=TTS_REPEAT_SEC, speaking_rate=1.05, pitch=0.0, |
|
|
min_len=2, credentials_path=r"yugpae-4f8335e15ba0.json") |
|
|
for attr in ("set_cache","disable_cache"): |
|
|
if hasattr(tts, attr): |
|
|
try: getattr(tts, attr)(persist=False, dir=None) |
|
|
except Exception: pass |
|
|
try: |
|
|
if not (hasattr(tts,"cache_dir") and getattr(tts,"cache_dir") is None): |
|
|
tmp_cache=os.path.join(tempfile.gettempdir(),"tts_runtime_cache") |
|
|
os.makedirs(tmp_cache, exist_ok=True) |
|
|
if hasattr(tts,"cache_dir"): tts.cache_dir=tmp_cache |
|
|
except Exception: pass |
|
|
except Exception as e: |
|
|
print(f"[TTS] init failed: {e}") |
|
|
TTS_ENABLE=False |
|
|
tts=None |
|
|
|
|
|
try: |
|
|
import pygame |
|
|
if not pygame.mixer.get_init(): pygame.mixer.init() |
|
|
pygame.mixer.music.set_volume(1.0) |
|
|
except Exception: pass |
|
|
|
|
|
def tts_scheduler():
    """Background loop that speaks the current TTS target.

    Every 50 ms the target text is read under ``tts_target_lock``. It is
    handed to ``tts.say`` unless TTS is disabled, the target is empty, the
    speaker is busy, or the same text was already sent and
    ``no_repeat_until_ts`` has not passed yet. After a successful call the
    last spoken text and the enqueue timestamp are recorded. An empty target
    resets the "already sent" memory. Runs until ``tts_stop`` is set.
    """
    global _last_spoken_enqueue_ts, tts_last_spoken_text

    last_sent_text = None
    while not tts_stop.is_set():
        time.sleep(0.05)
        if not TTS_ENABLE or tts is None:
            continue

        with tts_target_lock:
            target = (tts_target_text or "").strip()

        if not target:
            last_sent_text = None
            continue

        now = time.time()

        if _is_speaker_busy():
            continue

        repeat_blocked = (target == last_sent_text) and (now < no_repeat_until_ts)
        if repeat_blocked:
            continue

        try:
            tts.say(target)
        except Exception as e:
            print(f"[TTS] error: {e}")
        else:
            tts_last_spoken_text = target
            _last_spoken_enqueue_ts = now
            last_sent_text = target
|
|
|
|
|
|
|
|
|
|
|
if TTS_ENABLE: |
|
|
threading.Thread(target=tts_scheduler, daemon=True).start() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
last_text="" |
|
|
|
|
|
def set_tts_target(text_or_none, note: str="",
                   *, force: bool=False):
    """Update the TTS target and the text shown for it.

    Args:
        text_or_none: text to speak, or None to clear the target.
        note: optional note stored in ``tts_current_note``.
        force: when True, a TTS engine exists and the text differs from the
            previously forced one, clear the engine queue, call whichever of
            ``stop`` / ``cancel`` / ``flush`` the engine offers (errors are
            ignored) and reset ``_last_spoken_enqueue_ts`` to 0.
    """
    global tts_target_text, tts_current_display, tts_current_note
    global _last_spoken_enqueue_ts
    global last_text

    with tts_target_lock:
        tts_target_text = text_or_none
        tts_current_display = (text_or_none or "").strip()
        tts_current_note = note or ""

    if not force or "tts" not in globals() or tts is None or last_text == text_or_none:
        return

    last_text = text_or_none
    try:
        if hasattr(tts, "clear_queue"):
            tts.clear_queue()
        for method_name in ("stop", "cancel", "flush"):
            if not hasattr(tts, method_name):
                continue
            try:
                getattr(tts, method_name)()
            except Exception:
                pass
    except Exception:
        pass
    _last_spoken_enqueue_ts = 0.0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Camera setup ---------------------------------------------------------
# Use the DirectShow backend when the OpenCV build information mentions Windows.
cap = (cv2.VideoCapture(CAMERA_ID, cv2.CAP_DSHOW)
       if 'Windows' in cv2.getBuildInformation()
       else cv2.VideoCapture(CAMERA_ID))
if not cap.isOpened():
    raise SystemExit("카메라 열기 실패")

# Request MJPG at the target resolution / 30 fps; the driver may ignore any of these.
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAPTURE_TARGET_W)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAPTURE_TARGET_H)
cap.set(cv2.CAP_PROP_FPS, 30)
try:
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
except Exception:
    # Best effort only; do not swallow KeyboardInterrupt/SystemExit here.
    pass
time.sleep(0.15)

# Read back what the camera actually delivers.
Wc = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
Hc = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"[Camera] requested ~{CAPTURE_TARGET_W}x{CAPTURE_TARGET_H}, actual {Wc}x{Hc}")

# Never upscale; fall back to 1.0 when the backend reports a width of 0,
# which would otherwise raise ZeroDivisionError.
WORK_SCALE = min(1.0, WORK_WIDTH_TARGET/float(Wc)) if Wc > 0 else 1.0
print(f"[Work] WORK_SCALE={WORK_SCALE:.3f} (work width ~{int(Wc*WORK_SCALE)})")
|
|
|
|
|
|
|
|
cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL) |
|
|
frame_idx=0 |
|
|
prev_gray_s=None; prev_pts=None |
|
|
overlays=[]; last_prune=time.time() |
|
|
|
|
|
|
|
|
last_ocr_time=0.0 |
|
|
last_roi=None |
|
|
|
|
|
|
|
|
orb=None; bf=None |
|
|
if USE_ORB_FALLBACK: |
|
|
orb=cv2.ORB_create(nfeatures=ORB_NFEATURES) |
|
|
bf=cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) |
|
|
|
|
|
|
|
|
# Next overlay id to hand out; advanced by alloc_oid().
_next_oid = 1


def alloc_oid():
    """Return a fresh overlay id (1, 2, 3, ...) and advance the counter."""
    global _next_oid
    issued = _next_oid
    _next_oid = issued + 1
    return issued
|
|
|
|
|
def clamp_rect(x, y, w, h, W, H):
    """Clamp rectangle ``(x, y, w, h)`` to a ``W`` x ``H`` image.

    The origin is moved into ``[0, W-1] x [0, H-1]``; width and height are
    cut to what remains of the image but never drop below 1.
    """
    left = max(0, min(x, W - 1))
    top = max(0, min(y, H - 1))
    width = max(1, min(w, W - left))
    height = max(1, min(h, H - top))
    return left, top, width, height
|
|
|
|
|
def poly_center(poly):
    """Return the mean of the polygon's vertices (mean along axis 0)."""
    centroid = np.mean(poly, axis=0)
    return centroid
|
|
|
|
|
def bbox_of_poly(poly):
    """Return the axis-aligned bounding box ``(x, y, w, h)`` of ``poly`` as floats.

    ``poly`` is indexed as ``poly[:, 0]`` (x) and ``poly[:, 1]`` (y).
    """
    xs, ys = poly[:, 0], poly[:, 1]
    left, top = float(np.min(xs)), float(np.min(ys))
    right, bottom = float(np.max(xs)), float(np.max(ys))
    return (left, top, right - left, bottom - top)
|
|
|
|
|
def variance_of_laplacian(g):
    """Return the variance of the 64-bit Laplacian of image ``g``."""
    laplacian = cv2.Laplacian(g, cv2.CV_64F)
    return laplacian.var()
|
|
|
|
|
def rect_contains(outer, inner, tol=2.0):
    """Return True when ``inner`` lies inside ``outer`` within ``tol`` pixels.

    Both rectangles are ``(x, y, w, h)``.
    """
    ox, oy, ow, oh = outer
    ix, iy, iw, ih = inner
    fits_left = ix >= ox - tol
    fits_top = iy >= oy - tol
    fits_right = ix + iw <= ox + ow + tol
    fits_bottom = iy + ih <= oy + oh + tol
    return fits_left and fits_top and fits_right and fits_bottom
|
|
|
|
|
def _canon_equal(a: str, b: str) -> bool:
    """Compare two labels ignoring whitespace and case.

    Returns False when either side is empty or None. ``_normalize_plain`` is
    used when it works; if it raises, an equivalent local normalisation is
    applied instead.
    """
    left = (a or "").strip()
    right = (b or "").strip()
    if not left or not right:
        return False
    try:
        return _normalize_plain(left) == _normalize_plain(right)
    except Exception:
        import re
        left_norm = re.sub(r"\s+","",left).casefold()
        right_norm = re.sub(r"\s+","",right).casefold()
        return left_norm == right_norm
|
|
|
|
|
|
|
|
def iou(a, b):
    """Return the intersection-over-union of two ``(x, y, w, h)`` rectangles.

    A tiny epsilon is added to the union so that degenerate boxes do not
    divide by zero.
    """
    ax, ay, aw, ah = a
    bx, by, bw, bh = b
    overlap_w = max(0, min(ax + aw, bx + bw) - max(ax, bx))
    overlap_h = max(0, min(ay + ah, by + bh) - max(ay, by))
    inter = overlap_w * overlap_h
    union = aw * ah + bw * bh - inter + 1e-9
    return inter / union
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_visible_in_view(poly, W, H, min_overlap=0.7):
    """Return True when enough of the polygon's bounding box is inside the view.

    The view is ``[0, W] x [0, H]``. The visible share of the bounding-box
    area (area floored at 1.0) must be at least ``min_overlap``.
    """
    x, y, w, h = bbox_of_poly(poly)
    visible_w = max(0, min(x + w, W) - max(x, 0))
    visible_h = max(0, min(y + h, H) - max(y, 0))
    visible_area = visible_w * visible_h
    box_area = max(1.0, w * h)
    return (visible_area / box_area) >= min_overlap
|
|
|
|
|
def _overlay_label_font(size=22):
    """Return the label font for overlays, loading it only once per size.

    The first existing file from a fixed list of Korean-capable fonts is
    used; PIL's default font is the fallback.
    """
    cache = _overlay_label_font.__dict__.setdefault('_cache', {})
    if size not in cache:
        font_path = None
        for p in [r"C:\Windows\Fonts\malgun.ttf", r"C:\Windows\Fonts\NanumGothic.ttf",
                  r"C:\Windows\Fonts\NotoSansCJKkr-Regular.otf",
                  "/usr/share/fonts/truetype/noto/NotoSansCJKkr-Regular.ttc"]:
            if os.path.isfile(p):
                font_path = p
                break
        cache[size] = ImageFont.truetype(font_path, size) if font_path else ImageFont.load_default()
    return cache[size]


def draw_overlays(frame, items, now_ts):
    """Draw overlay polygons and their labels onto ``frame`` in place.

    Every item whose polygon is at least 70% inside the frame gets its
    ``'last_seen'`` set to ``now_ts``; at most ``MAX_TEXT_DRAW`` of them are
    drawn. Labels are rendered through PIL with text ``"<text> (<conf>)"``.

    Compared with a naive per-frame implementation, the font is cached and
    the BGR->RGB->PIL->BGR round trip is skipped when nothing is visible.
    """
    H, W = frame.shape[:2]

    visible = []
    for it in items:
        if is_visible_in_view(it['poly'], W, H, min_overlap=0.7):
            it['last_seen'] = now_ts
            visible.append(it)
    visible = visible[:MAX_TEXT_DRAW]
    if not visible:
        # Nothing to draw: the frame would come back unchanged anyway.
        return

    for it in visible:
        cv2.polylines(frame, [it['poly'].astype(int)], True, (255,165,0), 2, cv2.LINE_AA)

    # OpenCV's own text drawing cannot render Hangul, hence the PIL detour.
    pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(pil)
    font = _overlay_label_font(22)

    for it in visible:
        poly = it['poly'].astype(int)
        x = int(np.min(poly[:,0]))
        y = int(np.min(poly[:,1])) - 6
        draw.text((x, max(0,y)), f"{it['text']} ({it['conf']:.2f})",
                  font=font, fill=(255,255,255), stroke_width=2, stroke_fill=(0,0,0))
    frame[:] = cv2.cvtColor(np.array(pil), cv2.COLOR_RGB2BGR)
|
|
|
|
|
def prune_overlays(items, now, active_roi=None):
    """Return the overlays that are still alive at time ``now``.

    Rules, in order:
    - an overlay whose ``'pin_until'`` has not passed is always kept;
    - if ``IGNORE_HARD_CAP_WHILE_FINGER_IN_ROI`` is set and the overlay's
      centre lies inside ``active_roi``, only its ``'expiry'`` matters;
    - otherwise it must be unexpired and no older than
      ``HARD_MAX_LIFETIME`` seconds (age measured from ``'time'``).

    If more than ``MAX_OVERLAYS`` survive, those with the latest
    expiry/pin time are kept.
    """
    def _inside(center, roi):
        if roi is None:
            return False
        rx, ry, rw, rh = roi
        return (rx <= center[0] <= rx + rw) and (ry <= center[1] <= ry + rh)

    roi_exempt = IGNORE_HARD_CAP_WHILE_FINGER_IN_ROI and active_roi is not None

    survivors = []
    for it in items:
        if now <= it.get('pin_until', 0.0):
            survivors.append(it)
            continue

        ttl_ok = (now <= it.get('expiry', 0.0))

        if roi_exempt and _inside(poly_center(it['poly']), active_roi):
            if ttl_ok:
                survivors.append(it)
            continue

        born = it.get('time', now)
        if ttl_ok and (now - born) <= HARD_MAX_LIFETIME:
            survivors.append(it)

    if len(survivors) > MAX_OVERLAYS:
        survivors.sort(
            key=lambda d: max(d.get('expiry', 0.0), d.get('pin_until', 0.0)),
            reverse=True,
        )
        del survivors[MAX_OVERLAYS:]
    return survivors
|
|
|
|
|
|
|
|
def closest_rotation(A):
    """Return the rotation built from the SVD of ``A`` (``U @ Vt``).

    When ``U @ Vt`` has a negative determinant, the last row of ``Vt`` is
    negated so that the result is a proper rotation.
    """
    U, _, Vt = np.linalg.svd(A)
    if np.linalg.det(U @ Vt) < 0:
        Vt[-1, :] *= -1
    return U @ Vt
|
|
def project_to_similarity(M):
    """Split a 2x3 affine matrix into ``(scale, rotation, translation)``.

    The rotation comes from ``closest_rotation`` applied to the 2x2 linear
    part ``A``; the scale is ``trace(A.T @ R) / 2``; the translation is the
    third column reshaped to length 2.
    """
    linear = M[:, :2]
    R = closest_rotation(linear)
    s = float(np.trace(linear.T @ R) / 2.0)
    t = M[:, 2].reshape(2)
    return s, R, t
|
|
def angle_from_R(R):
    """Return ``atan2(R[1, 0], R[0, 0])``, the angle of a 2x2 rotation in radians."""
    sin_part = R[1, 0]
    cos_part = R[0, 0]
    return math.atan2(sin_part, cos_part)
|
|
def build_similarity(s, theta):
    """Return the 2x2 float32 matrix ``s * rotation(theta)`` (theta in radians)."""
    cos_t = math.cos(theta)
    sin_t = math.sin(theta)
    rotation = np.array([[cos_t, -sin_t], [sin_t, cos_t]], dtype=np.float32)
    return rotation * float(s)
|
|
|
|
|
SIM_HIST_MAX=240 |
|
|
sim_steps=[] |
|
|
|
|
|
def _rect_aabb_after_M(rect, M3, W, H):
    """Transform ``rect`` by ``M3`` and return the clamped bounding rectangle.

    Only the upper-left 2x2 block and the translation column of ``M3`` are
    used. The origin is floored at 0, the far edges are capped at
    ``W-1`` / ``H-1`` and width/height never drop below 1. All four result
    values are ints: ``(x, y, w, h)``.
    """
    x, y, w, h = rect
    corners = np.array([[x, y], [x + w, y], [x + w, y + h], [x, y + h]],
                       dtype=np.float32)
    moved = (corners @ M3[:2, :2].T) + M3[:2, 2]
    xs, ys = moved[:, 0], moved[:, 1]
    minx, miny = float(np.min(xs)), float(np.min(ys))
    maxx, maxy = float(np.max(xs)), float(np.max(ys))

    rx = int(max(0, minx))
    ry = int(max(0, miny))
    rw = int(max(1, min(W - 1, maxx) - rx))
    rh = int(max(1, min(H - 1, maxy) - ry))
    return (rx, ry, rw, rh)
|
|
|
|
|
def estimate_similarity_small(prev_gray_s, gray_s, prev_pts):
    """Estimate the frame-to-frame partial-affine motion with LK optical flow.

    Features are (re)seeded with ``goodFeaturesToTrack`` when ``prev_pts`` is
    None or holds fewer than 140 points. At least 60 successfully tracked
    points are required before the RANSAC fit.

    Returns:
        ``(M, next_pts)`` with the 2x3 matrix and the raw LK output points,
        or ``(None, None)`` when any stage fails.
    """
    needs_seed = prev_pts is None or len(prev_pts) < 140
    if needs_seed:
        prev_pts = cv2.goodFeaturesToTrack(
            prev_gray_s, maxCorners=FLOW_MAX_CORNERS,
            qualityLevel=FLOW_QUALITY, minDistance=FLOW_MIN_DISTANCE, blockSize=7)
    if prev_pts is None:
        return None, None

    lk_criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 12, 0.03)
    next_pts, st, err = cv2.calcOpticalFlowPyrLK(
        prev_gray_s, gray_s, prev_pts, None,
        winSize=FLOW_WINSIZE, maxLevel=FLOW_LEVELS, criteria=lk_criteria)
    if next_pts is None:
        return None, None

    tracked = (st == 1)
    P = prev_pts[tracked].reshape(-1, 1, 2)
    Q = next_pts[tracked].reshape(-1, 1, 2)
    if len(P) < 60:
        return None, None

    M, _ = cv2.estimateAffinePartial2D(
        P, Q, method=cv2.RANSAC,
        ransacReprojThreshold=3.0, maxIters=1500, confidence=0.99)
    if M is None:
        return None, None
    return M, next_pts
|
|
|
|
|
def transform_overlays_similarity(items, s, theta, t_s):
    """Move every overlay polygon by a similarity transform, in place.

    ``t_s`` is divided by ``FLOW_DS`` and its magnitude is limited to
    ``MAX_TRANS_PX``. Each ``item['poly']`` is replaced by
    ``poly @ A.T + t`` with ``A = build_similarity(s, theta)`` in float32.
    """
    tx = float(t_s[0]) / FLOW_DS
    ty = float(t_s[1]) / FLOW_DS

    step_mag = math.hypot(tx, ty)
    if step_mag > MAX_TRANS_PX:
        shrink = MAX_TRANS_PX / (step_mag + 1e-6)
        tx *= shrink
        ty *= shrink

    A = build_similarity(s, theta).astype(np.float32)
    offset = np.array([tx, ty], dtype=np.float32)
    for it in items:
        pts = it['poly'].astype(np.float32)
        it['poly'] = (pts @ A.T) + offset
|
|
|
|
|
def orb_similarity(prev_g, cur_g):
    """Estimate the partial-affine motion between two frames from ORB matches.

    Uses the module-level ``orb`` detector and ``bf`` matcher, a 0.75 ratio
    test on 2-NN matches and a RANSAC fit.

    Returns:
        The 2x3 matrix from ``estimateAffinePartial2D`` (may be None), or
        None when descriptors are missing, either image has fewer than 8
        keypoints, or fewer than ``ORB_MIN_GOOD`` matches pass the ratio test.
    """
    kp1, des1 = orb.detectAndCompute(prev_g, None)
    kp2, des2 = orb.detectAndCompute(cur_g, None)
    if des1 is None or des2 is None or len(kp1) < 8 or len(kp2) < 8:
        return None

    pairs = bf.knnMatch(des1, des2, k=2)
    good = [
        pair[0]
        for pair in pairs
        if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance
    ]
    if len(good) < ORB_MIN_GOOD:
        return None

    src = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
    dst = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
    M, _ = cv2.estimateAffinePartial2D(
        src, dst, method=cv2.RANSAC,
        ransacReprojThreshold=3.0, maxIters=1500, confidence=0.99)
    return M
|
|
|
|
|
|
|
|
def _build_gray_for_klt(gray):
    """Return a uint8 gradient-magnitude image used as input for KLT tracking.

    CLAHE is applied first when ``KLT_USE_CLAHE`` is set. The Sobel gradient
    magnitude is rescaled so its maximum becomes 255 (left untouched when
    the maximum is 0).
    """
    src = gray
    if KLT_USE_CLAHE:
        equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        src = equalizer.apply(src)

    grad_x = cv2.Sobel(src, cv2.CV_32F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(src, cv2.CV_32F, 0, 1, ksize=3)
    mag = cv2.magnitude(grad_x, grad_y)

    peak = mag.max()
    if peak > 0:
        mag = (mag / peak) * 255.0
    return mag.astype(np.uint8)
|
|
|
|
|
def _klt_seed_ring(center, n=KLT_N_SAMPLES, r=KLT_RING_R):
    """Return KLT seed points: ``center`` plus ``n`` points on a ring of radius ``r``.

    The result is a float32 array of shape ``(n + 1, 1, 2)`` with the centre
    first and the ring points evenly spaced by angle.
    """
    cx, cy = float(center[0]), float(center[1])
    seeds = [(cx, cy)]
    for k in range(n):
        angle = 2.0 * math.pi * k / float(n)
        seeds.append((cx + r * math.cos(angle), cy + r * math.sin(angle)))
    return np.array(seeds, dtype=np.float32).reshape(-1, 1, 2)
|
|
|
|
|
def _in_bounds(pt, W, H, margin=0):
    """Return True when ``pt`` lies in ``[0, W-1] x [0, H-1]`` widened by ``margin``."""
    x = float(pt[0])
    y = float(pt[1])
    x_ok = -margin <= x <= (W - 1 + margin)
    y_ok = -margin <= y <= (H - 1 + margin)
    return x_ok and y_ok
|
|
|
|
|
def klt_track_multi(prev_gray, cur_gray, prev_pts, W, H):
    """Track a cluster of points with forward-backward checked LK flow.

    A point is accepted when both passes succeed, its forward-backward error,
    LK error and step length stay within ``KLT_FB_MAX`` / ``KLT_ERR_MAX`` /
    ``KLT_STEP_MAX``, and it stays inside the frame (with
    ``KLT_OUT_MARGIN``).

    Returns:
        ``((cx, cy), good_pts)`` where the centre is the rounded median of
        the accepted points and ``good_pts`` has shape ``(-1, 1, 2)``;
        ``(None, None)`` when inputs are missing, tracking fails, fewer than
        ``KLT_MIN_GOOD`` points survive, or the centre leaves the frame.
    """
    if prev_gray is None or cur_gray is None or prev_pts is None or len(prev_pts) == 0:
        return None, None

    p1, st, err = cv2.calcOpticalFlowPyrLK(
        prev_gray, cur_gray, prev_pts, None,
        winSize=KLT_WIN, maxLevel=KLT_LEVELS, criteria=KLT_TERM)
    if p1 is None:
        return None, None
    # Backward pass: track the new positions back to the previous frame.
    p0r, st2, err2 = cv2.calcOpticalFlowPyrLK(
        cur_gray, prev_gray, p1, None,
        winSize=KLT_WIN, maxLevel=KLT_LEVELS, criteria=KLT_TERM)

    accepted = []
    for i in range(len(prev_pts)):
        if not (st[i] == 1 and st2[i] == 1):
            continue
        origin = prev_pts[i, 0]
        moved = p1[i, 0]
        fb = float(np.linalg.norm(origin - p0r[i, 0]))
        e = float(err[i][0]) if err is not None else 0.0
        step = float(np.linalg.norm(moved - origin))
        if fb <= KLT_FB_MAX and e <= KLT_ERR_MAX and step <= KLT_STEP_MAX and _in_bounds(moved, W, H, KLT_OUT_MARGIN):
            accepted.append(moved)

    if len(accepted) < KLT_MIN_GOOD:
        return None, None

    accepted = np.array(accepted, dtype=np.float32)
    median = np.median(accepted, axis=0)
    cx = int(round(float(median[0])))
    cy = int(round(float(median[1])))
    if not _in_bounds((cx, cy), W, H, 0):
        return None, None
    return (cx, cy), accepted.reshape(-1, 1, 2)
|
|
|
|
|
|
|
|
def rect_from_poly(poly):
    """Return the polygon's bounding box ``(x, y, w, h)`` truncated to ints."""
    left, top, width, height = bbox_of_poly(poly)
    return (int(left), int(top), int(width), int(height))
|
|
|
|
|
def fingertip_overlaps_box(finger, box):
    """Return True when the point ``finger`` lies in ``box`` (edges included).

    ``box`` is ``(x, y, w, h)``; a ``finger`` of None yields False.
    """
    if finger is None:
        return False
    left, top, width, height = box
    inside_x = left <= finger[0] <= left + width
    inside_y = top <= finger[1] <= top + height
    return inside_x and inside_y
|
|
|
|
|
def clip_poly_to_rect(poly, rect):
    """Return a copy of ``poly`` with every vertex clamped into ``rect``.

    ``rect`` is ``(x, y, w, h)``; the input array is not modified.
    """
    x, y, w, h = rect
    clipped = poly.copy()
    clipped[:, 0] = np.clip(clipped[:, 0], x, x + w)
    clipped[:, 1] = np.clip(clipped[:, 1], y, y + h)
    return clipped
|
|
|
|
|
def merge_update_overlays(items, new_items, roi_rect, now_ts,
                          iou_th=MERGE_IOU_TH, center_dist_th=MERGE_CENTER_DIST):
    """Merge fresh OCR results from the ROI into the overlay list, in place.

    For each new result the best existing overlay inside the ROI is searched
    (highest IoU, ties broken by the smaller centre distance). A candidate
    qualifies when it matches by location (IoU or centre distance) or when it
    has the same canonical text and one box contains the other. Every old
    overlay is matched at most once per call.

    A matched overlay is overwritten, and its expiry renewed, only when the
    new reading is better according to dictionary score / OCR confidence. An
    unmatched result becomes a new overlay, unless ``DICT_ONLY`` is set and
    its display text is not a dictionary word.

    Returns:
        ``items`` (the same list object).
    """
    rx, ry, rw, rh = roi_rect

    def _center_in_roi(c):
        return (rx <= c[0] <= rx + rw) and (ry <= c[1] <= ry + rh)

    def _box_center(box):
        return (box[0] * 2 + box[2]) * 0.5, (box[1] * 2 + box[3]) * 0.5

    def _new_reading_wins(csc_new, conf_new, csc_old, conf_old):
        if csc_new >= DICT_MERGE_SCORE and csc_old < DICT_MERGE_SCORE:
            # Only the new reading is a dictionary hit.
            return True
        if csc_new >= DICT_MERGE_SCORE and csc_old >= DICT_MERGE_SCORE:
            if csc_new > csc_old + DICT_TIE_DELTA:
                return True
            # Near tie on dictionary score: OCR confidence decides.
            return abs(csc_new - csc_old) <= DICT_TIE_DELTA and conf_new > conf_old
        return conf_new > conf_old and csc_new > csc_old

    roi_indices = [idx for idx, it in enumerate(items)
                   if _center_in_roi(poly_center(it['poly']))]
    used_old = set()

    for ni in new_items:
        poly_new = clip_poly_to_rect(ni['poly'], roi_rect)
        box_new = bbox_of_poly(poly_new)
        raw_txt = str(ni.get('text','')).strip()
        raw_conf = float(ni.get('conf', 0.0))
        disp_new, canon_new, csc_new, conf_new = enrich_with_dict(raw_txt, raw_conf)
        cxn, cyn = _box_center(box_new)

        best_idx, best_iou, best_d = -1, -1.0, 1e9
        for idx in roi_indices:
            if idx in used_old:
                continue
            it = items[idx]
            box_old = bbox_of_poly(it['poly'])

            overlap = iou(box_new, box_old)
            cxo, cyo = _box_center(box_old)
            d = math.hypot(cxn - cxo, cyn - cyo)

            loc_match = (overlap >= iou_th) or (d <= center_dist_th)
            if not loc_match:
                text_same = _canon_equal(it.get('canon_text') or it.get('text'),
                                         canon_new or disp_new)
                contained = rect_contains(box_old, box_new) or rect_contains(box_new, box_old)
                if not (text_same and contained):
                    continue

            if (overlap > best_iou) or (abs(overlap - best_iou) < 1e-6 and d < best_d):
                best_iou, best_d, best_idx = overlap, d, idx

        if best_idx >= 0:
            it = items[best_idx]
            csc_old = float(it.get('canon_score', 0.0))
            conf_old = float(it.get('conf', 0.0))
            if _new_reading_wins(csc_new, conf_new, csc_old, conf_old):
                it.update(
                    poly=poly_new,
                    ocr_text=raw_txt,
                    text=disp_new,
                    canon_text=canon_new,
                    canon_score=csc_new,
                    conf=conf_new,
                    expiry=now_ts + BASE_TTL,
                )
            used_old.add(best_idx)
            continue

        if DICT_ONLY and (disp_new is None or disp_new not in DICT_WORDS):
            continue

        items.append({
            'poly': poly_new,
            'ocr_text': raw_txt,
            'text': disp_new,
            'canon_text': canon_new,
            'canon_score': csc_new,
            'conf': conf_new,
            'time': now_ts,
            'last_seen': now_ts,
            'expiry': now_ts + BASE_TTL,
            'pin_until': 0.0,
            'id': alloc_oid()
        })

    return items
|
|
|
|
|
|
|
|
def dedupe_same_text_overlays(items, iou_th=0.55, center_dist_th=26.0):
    """Drop duplicate overlay boxes that carry the same (normalized) text.

    Overlays are bucketed by a canonical text key. Inside one bucket a box is
    discarded when, relative to a better-ranked box, it overlaps enough
    (IoU >= ``iou_th``), its center is close enough (distance <=
    ``center_dist_th``), or either box contains the other.

    Ranking inside a bucket, best first: canon score reaching
    ``DICT_MERGE_SCORE``, then the canon score itself, then OCR confidence.

    Args:
        items: list of overlay dicts. Each needs a ``'poly'`` entry and may
            carry ``'canon_text'``, ``'text'``, ``'canon_score'``, ``'conf'``.
        iou_th: IoU at or above which two same-text boxes are duplicates.
        center_dist_th: center distance at or below which they are duplicates.

    Returns:
        ``items`` itself when nothing was dropped, otherwise a new list without
        the duplicates (original order preserved). Overlays whose text key is
        empty are never dropped.
    """
    def text_key(overlay):
        raw = (overlay.get('canon_text') or overlay.get('text') or '').strip()
        try:
            return _normalize_plain(raw)
        except Exception:
            # Fallback normalization: remove all whitespace and case-fold.
            import re as _re
            return _re.sub(r"\s+", "", raw).casefold()

    def rank(overlay):
        canon_score = float(overlay.get('canon_score', 0.0))
        confidence = float(overlay.get('conf', 0.0))
        dict_level = 1 if canon_score >= DICT_MERGE_SCORE else 0
        return (dict_level, canon_score, confidence)

    buckets = {}
    for overlay in items:
        k = text_key(overlay)
        if k:
            buckets.setdefault(k, []).append(overlay)

    kept_ids = set()
    dropped_ids = set()
    for members in buckets.values():
        ranked = sorted(members, key=rank, reverse=True)
        for pos, winner in enumerate(ranked):
            winner_id = id(winner)
            if winner_id in dropped_ids or winner_id in kept_ids:
                continue
            kept_ids.add(winner_id)

            wx, wy, ww, wh = bbox_of_poly(winner['poly'])
            w_rect = (wx, wy, ww, wh)
            w_cx, w_cy = wx + ww * 0.5, wy + wh * 0.5

            # Only lower-ranked boxes can lose against this winner.
            for rival in ranked[pos + 1:]:
                rival_id = id(rival)
                if rival_id in dropped_ids or rival_id in kept_ids:
                    continue
                rx, ry, rw, rh = bbox_of_poly(rival['poly'])
                r_rect = (rx, ry, rw, rh)
                r_cx, r_cy = rx + rw * 0.5, ry + rh * 0.5

                overlap = iou(w_rect, r_rect)
                center_gap = ((w_cx - r_cx) ** 2 + (w_cy - r_cy) ** 2) ** 0.5
                nested = rect_contains(w_rect, r_rect) or rect_contains(r_rect, w_rect)
                if nested or (overlap >= iou_th) or (center_gap <= center_dist_th):
                    dropped_ids.add(rival_id)

    if dropped_ids:
        return [overlay for overlay in items if id(overlay) not in dropped_ids]
    return items
|
|
|
|
|
|
|
|
|
|
|
def _overlay_center(it): |
|
|
P = it['poly'] |
|
|
x1, y1 = float(np.min(P[:,0])), float(np.min(P[:,1])) |
|
|
x2, y2 = float(np.max(P[:,0])), float(np.max(P[:,1])) |
|
|
return (0.5*(x1+x2), 0.5*(y1+y2)) |
|
|
|
|
|
def _choose_target_overlay(target_canon: str, overlays, finger_xy=None): |
|
|
cands = [] |
|
|
t = (target_canon or "").strip() |
|
|
if not t: return None |
|
|
for it in overlays: |
|
|
ct = (it.get('canon_text') or "").strip() |
|
|
tx = (it.get('text') or "").strip() |
|
|
ok = (ct == t) or (tx == t) or (t in tx) |
|
|
if ok: |
|
|
cx, cy = _overlay_center(it) |
|
|
d = 0.0 |
|
|
if finger_xy is not None: |
|
|
d = math.hypot(cx - (finger_xy[0]), cy - (finger_xy[1])) |
|
|
canon_bonus = 1.0 if (ct == t) else 0.0 |
|
|
cands.append((canon_bonus, float(it.get('conf',0.0)), -d, it)) |
|
|
if not cands: |
|
|
return None |
|
|
cands.sort(reverse=True) |
|
|
return cands[0][3] |
|
|
|
|
|
def _dir_sentence(dx, dy): |
|
|
def q(px): |
|
|
a = abs(int(round(px))) |
|
|
if a < 30: lvl = "조금" |
|
|
elif a < 90: lvl = "약간" |
|
|
elif a < 180: lvl = "보통" |
|
|
else: lvl = "많이" |
|
|
return lvl, a |
|
|
msg = [] |
|
|
if dx > 0: lvl, a = q(dx); msg.append(f"오른쪽으로 {a}픽셀({lvl})") |
|
|
elif dx < 0: lvl, a = q(dx); msg.append(f"왼쪽으로 {a}픽셀({lvl})") |
|
|
if dy > 0: lvl, a = q(dy); msg.append(f"아래로 {a}픽셀({lvl})") |
|
|
elif dy < 0: lvl, a = q(dy); msg.append(f"위로 {a}픽셀({lvl})") |
|
|
return " , ".join(msg) if msg else "그대로 유지" |
|
|
|
|
|
def set_guide_target_from_text(text: str):
    """Set the guide target from free text and announce the outcome via TTS.

    The text is mapped with ``map_to_dict_canon`` (threshold
    ``DICT_THRESHOLD``).

    * Empty text clears ``GUIDE_TARGET`` and ``GUIDE_TARGET_ITEM``.
    * Text with no dictionary match leaves both globals untouched.
    * A match stores the canonical word in ``GUIDE_TARGET`` and resets
      ``GUIDE_TARGET_ITEM`` to ``None``.

    Returns:
        ``True`` when a target was set, ``False`` otherwise.
    """
    global GUIDE_TARGET, GUIDE_TARGET_ITEM

    if text:
        canon, _match_score = map_to_dict_canon(text, threshold=DICT_THRESHOLD)
        if canon:
            GUIDE_TARGET = canon
            GUIDE_TARGET_ITEM = None
            set_tts_target(f"목표 '{canon}' 안내를 시작합니다.", note="guide")
            return True
        set_tts_target(f"'{text}'는 사전에 없습니다.", note="guide no-dict")
        return False

    GUIDE_TARGET = None
    GUIDE_TARGET_ITEM = None
    set_tts_target("목표가 비었습니다.", note="guide")
    return False
|
|
|
|
|
def guide_tick(now_ts, finger_xy, overlays):
    """Advance the fingertip-guidance state machine by one frame.

    Does nothing unless ``GUIDE_MODE`` is on and ``GUIDE_TARGET`` is set.
    Otherwise exactly one of the following happens, each spoken through
    ``set_tts_target`` and rate-limited through ``GUIDE_LAST_TS``:

    * finger required but missing -> ask for the finger,
    * target text not among ``overlays`` -> "not found" (fixed 2.0 s period),
    * target visible, no finger -> tell the user the target is on screen,
    * finger within ``GUIDE_TOL_PX`` of the target center -> announce arrival
      once (until another message has been sent),
    * otherwise -> speak the direction/distance hint.

    ``GUIDE_TARGET_ITEM`` is updated to the overlay currently chosen as the
    target (or ``None``).

    Args:
        now_ts: current timestamp in seconds.
        finger_xy: ``(x, y)`` fingertip position, or ``None``.
        overlays: current overlay dicts.
    """
    global GUIDE_LAST_TS, GUIDE_LAST_SENT, GUIDE_TARGET_ITEM, GUIDE_TARGET

    if not (GUIDE_MODE and GUIDE_TARGET):
        return

    def due(period):
        # True when at least `period` seconds passed since the last guide message.
        return (now_ts - GUIDE_LAST_TS) >= period

    finger_missing = finger_xy is None

    if GUIDE_REQUIRE_FINGER and finger_missing:
        if due(GUIDE_REPEAT_SEC):
            set_tts_target("손가락을 화면에 올려 주세요.", note="guide")
            GUIDE_LAST_TS = now_ts
            GUIDE_LAST_SENT = "ask_finger"
        return

    target_item = _choose_target_overlay(GUIDE_TARGET, overlays, finger_xy)
    GUIDE_TARGET_ITEM = target_item

    if target_item is None:
        if due(2.0):
            set_tts_target(f"화면에서 '{GUIDE_TARGET}'을 찾지 못했습니다.", note="guide")
            GUIDE_LAST_TS = now_ts
            GUIDE_LAST_SENT = "not_found"
        return

    target_x, target_y = _overlay_center(target_item)

    if finger_missing:
        if due(GUIDE_REPEAT_SEC):
            set_tts_target(f"목표 '{GUIDE_TARGET}'이 화면에 있습니다. 손가락을 이동해 주세요.", note="guide")
            GUIDE_LAST_TS = now_ts
            GUIDE_LAST_SENT = "where_only"
        return

    # Offset from fingertip to target center, in whole pixels.
    dx = int(round(target_x - finger_xy[0]))
    dy = int(round(target_y - finger_xy[1]))
    dist = math.hypot(dx, dy)

    if dist <= GUIDE_TOL_PX:
        if GUIDE_LAST_SENT != "arrived":
            set_tts_target(f"도착. '{GUIDE_TARGET}' 입니다.", note="guide ok")
            GUIDE_LAST_SENT = "arrived"
            GUIDE_LAST_TS = now_ts
        return

    if due(GUIDE_REPEAT_SEC):
        hint = _dir_sentence(dx, dy)
        set_tts_target(f"{hint}", note=f"guide d={int(dist)}")
        GUIDE_LAST_SENT = hint
        GUIDE_LAST_TS = now_ts
|
|
|
|
|
def highlight_guide_target(frame_bgr, item):
    """Draw the guide target on ``frame_bgr`` in place.

    Outlines ``item['poly']`` with a 3-px closed polyline and marks the
    overlay center with a filled dot, both in color (0, 0, 255). Does nothing
    when ``item`` is ``None``.
    """
    if item is None:
        return
    mark_color = (0, 0, 255)
    outline = item['poly'].astype(int)
    cv2.polylines(frame_bgr, [outline], True, mark_color, 3, cv2.LINE_AA)
    center_x, center_y = map(int, _overlay_center(item))
    cv2.circle(frame_bgr, (center_x, center_y), 6, mark_color, -1)
|
|
|
|
|
def stt_listen_once(timeout=4, phrase_time_limit=4):
    """Capture one spoken phrase from the microphone and return it as text.

    Requires both ``USE_STT`` and ``_STT_OK``; otherwise a "disabled" message
    is sent to TTS and ``None`` is returned. The user is prompted via TTS, the
    recognizer is calibrated for ambient noise (0.5 s) when it supports that,
    and the recorded audio is sent to ``recognize_google`` with language
    ``ko-KR``. A failed recognition is retried once with the same audio.

    Args:
        timeout: passed to ``Recognizer.listen`` as ``timeout``.
        phrase_time_limit: passed to ``Recognizer.listen`` as
            ``phrase_time_limit``.

    Returns:
        The recognized text, or ``None`` when STT is disabled or any step
        failed (the error is printed and a failure message is sent to TTS).
    """
    stt_available = USE_STT and _STT_OK
    if not stt_available:
        set_tts_target("음성 인식이 비활성화되어 있습니다.", note="stt off")
        return None

    try:
        recognizer = sr.Recognizer()
        with sr.Microphone() as source:
            set_tts_target("목표 단어를 말씀해 주세요.", note="stt")
            if hasattr(recognizer, "adjust_for_ambient_noise"):
                recognizer.adjust_for_ambient_noise(source, duration=0.5)
            audio = recognizer.listen(source, timeout=timeout, phrase_time_limit=phrase_time_limit)

        # Two attempts in total; a second failure propagates to the outer handler.
        for retries_left in (1, 0):
            try:
                return recognizer.recognize_google(audio, language="ko-KR")
            except Exception:
                if not retries_left:
                    raise
    except Exception as e:
        print(f"[STT] error: {e}")
        set_tts_target("음성 인식에 실패했습니다.", note="stt err")
        return None
|
|
|
|
|
|
|
|
# Requests for the OCR worker thread (ocr_worker reads from this queue).
# Capacity 1: at most one request can be waiting at any time.
task_q=queue.Queue(maxsize=1)
# OCR results produced by ocr_worker. Capacity 2.
result_q=queue.Queue(maxsize=2)
|
|
|
|
|
def enhance_for_ocr(bgr):
    """Optionally sharpen / contrast-enhance a BGR image before OCR.

    Behaviour depends on the module-level ``ENHANCE_MODE``:

    * ``"off"``  - return the input unchanged (same object).
    * ``"fast"`` - unsharp mask only (1.6 * image - 0.6 * Gaussian blur).
    * anything else - bilateral filter, CLAHE on the L channel of LAB, then a
      stronger unsharp mask (1.8 / -0.8).
    """
    mode = ENHANCE_MODE
    if mode == "off":
        return bgr

    if mode == "fast":
        softened = cv2.GaussianBlur(bgr, (0, 0), 0.8)
        return cv2.addWeighted(bgr, 1.6, softened, -0.6, 0)

    smoothed = cv2.bilateralFilter(bgr.copy(), d=0, sigmaColor=45, sigmaSpace=12)
    L, A, B = cv2.split(cv2.cvtColor(smoothed, cv2.COLOR_BGR2LAB))
    equalizer = cv2.createCLAHE(clipLimit=1.6, tileGridSize=(8, 8))
    L = equalizer.apply(L)
    contrasted = cv2.cvtColor(cv2.merge([L, A, B]), cv2.COLOR_LAB2BGR)
    softened = cv2.GaussianBlur(contrasted, (0, 0), 0.9)
    return cv2.addWeighted(contrasted, 1.8, softened, -0.8, 0)
|
|
|
|
|
def prep_fixed(roi_bgr):
    """Prepare an ROI crop for OCR and report the applied scale.

    The crop is shrunk (never enlarged) so that its longer side is at most
    ``MAX_OCR_LONG`` pixels, then passed through ``enhance_for_ocr``.

    Returns:
        ``(proc, sx_pre, sy_pre)`` where ``proc`` is the processed image and
        ``sx_pre`` / ``sy_pre`` are processed-width / original-width and
        processed-height / original-height.
    """
    src_h, src_w = roi_bgr.shape[:2]
    longest = max(src_h, src_w)
    scale = min(1.0, float(MAX_OCR_LONG) / float(longest))

    if scale < 1.0:
        target_size = (int(src_w * scale), int(src_h * scale))
        proc = cv2.resize(roi_bgr, target_size, interpolation=cv2.INTER_AREA)
    else:
        proc = roi_bgr

    proc = enhance_for_ocr(proc)
    out_h, out_w = proc.shape[0], proc.shape[1]
    sx_pre = out_w / float(src_w)
    sy_pre = out_h / float(src_h)
    return proc, sx_pre, sy_pre
|
|
|
|
|
def run_ocr_rect(frame_work, rect_work, mask_boxes=None):
    """Run EasyOCR on one rectangle of the working frame.

    Args:
        frame_work: working-resolution frame (indexed as ``[y, x]``).
        rect_work: ``(x, y, w, h)`` rectangle inside ``frame_work``.
        mask_boxes: accepted for interface compatibility; not used here.

    Returns:
        A list of ``{'poly', 'text', 'conf'}`` dicts. ``poly`` is a float32
        array of the detected quad mapped back to ``frame_work`` coordinates.
        Detections whose bounding-box area is below 120 px^2 are dropped.
    """
    left, top, width, height = rect_work
    crop = frame_work[top:top + height, left:left + width].copy()
    proc, sx_pre, sy_pre = prep_fixed(crop)

    detections = easy_reader.readtext(
        proc, detail=1, decoder='greedy',
        rotation_info=[0, 180],
        contrast_ths=0.05, adjust_contrast=0.7,
        text_threshold=0.6, low_text=0.3, link_threshold=0.4,
        canvas_size=1920, mag_ratio=1.3,
        paragraph=False, min_size=2,
    )

    items = []
    for quad, text, prob in detections:
        poly = np.array(quad, dtype=np.float32)
        # Undo the pre-OCR resize, then shift from crop to frame coordinates.
        poly[:, 0] = poly[:, 0] / sx_pre + left
        poly[:, 1] = poly[:, 1] / sy_pre + top
        _bx, _by, box_w, box_h = bbox_of_poly(poly)
        if box_w * box_h >= 120:
            items.append({'poly': poly, 'text': text, 'conf': float(prob)})
    return items
|
|
|
|
|
def ocr_worker():
    """Background loop: take OCR jobs from ``task_q``, post results to ``result_q``.

    Each job is a dict with ``'frame_work'``, ``'rects'``, ``'roi'`` and
    ``'frame_idx'``. Every rectangle in ``'rects'`` is run through
    ``run_ocr_rect``; the combined detections are posted as
    ``{'roi', 'new_items', 'dt_ms', 'frame_idx'}`` where ``dt_ms`` is the
    processing time in milliseconds. A ``None`` job ends the loop.

    A job that raises is logged and skipped, so one bad frame or an OCR
    backend error does not silently end the worker thread and disable OCR for
    the rest of the session.
    """
    while True:
        job = task_q.get()
        if job is None:
            break

        started = time.time()
        found = []
        try:
            for rect_work in job['rects']:
                found.extend(run_ocr_rect(job['frame_work'], rect_work, mask_boxes=None))
        except Exception:
            print("[OCR] worker error; job skipped")
            traceback.print_exc()
            continue

        elapsed_ms = (time.time() - started) * 1000.0
        result_q.put({
            'roi': job['roi'],
            'new_items': found,
            'dt_ms': elapsed_ms,
            'frame_idx': job['frame_idx'],
        })
|
|
# Start the OCR worker as a daemon thread as soon as this statement runs;
# being a daemon, it does not keep the process alive at exit.
threading.Thread(target=ocr_worker, daemon=True).start()
|
|
|
|
|
def drain_queue(q):
    """Discard every item currently in ``q`` without blocking.

    Items are removed with ``get_nowait`` until ``queue.Empty`` is raised.
    Returns ``None``.
    """
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Startup banner: key bindings shown to the user.
print("실시간 시작. 'q' 종료 / 'o' OCR ON/OFF / 't' HUD / 's' TTS / 'y' YOLO 입력 / 'p' YOLO PNG 저장")
print("모드 전환: '1' 조작 모드 / '2' 보기 모드(상황 설명)")
print("ROI 조절: '[' 너비-, ']' 너비+, ';' 높이-, \"'\" 높이+ / 'r' 기본값 복원")

# Smoothed (EMA) inter-frame similarity motion: scale, rotation, translation.
s_ema, theta_ema, tx_ema, ty_ema = 1.0, 0.0, 0.0, 0.0

# Previous-frame grayscale images kept for motion estimation and tracking.
prev_gray_full = None
prev_gray_klt = None
prev_gray_s = None
prev_pts = None

# Time at which the fingertip started being tracked by KLT only (0.0 = not).
klt_only_start_ts = 0.0

# Per-frame fingertip diagnostics shown on the HUD.
finger_src = "NONE"
yolo_last_conf = None
klt_draw_pts = None
yolo_box_count = None
yolo_last_in = None
|
|
|
|
|
def _resolve_hud_font(size=22):
    """Load the HUD font: the first existing file from a fixed candidate list.

    Falls back to PIL's built-in default font when none of the files exist.
    """
    candidates = (
        r"C:\Windows\Fonts\malgun.ttf",
        r"C:\Windows\Fonts\NanumGothic.ttf",
        r"C:\Windows\Fonts\NotoSansCJKkr-Regular.otf",
        "/usr/share/fonts/truetype/noto/NotoSansCJKkr-Regular.ttc",
    )
    for path in candidates:
        if os.path.isfile(path):
            return ImageFont.truetype(path, size)
    return ImageFont.load_default()


# Resolved once instead of once per frame: probing the filesystem and parsing
# a TrueType file for every displayed frame is loop-invariant work.
hud_font = _resolve_hud_font()

# Longest time the fingertip may be followed by KLT alone, without a fresh
# YOLO detection, before the position is discarded.
KLT_TIMEOUT_SEC = 1.0

# ---------------------------------------------------------------------------
# Main loop: one iteration per captured frame.
#   1. capture + grayscale          5. OCR scheduling / result merging
#   2. global motion (overlay warp) 6. guidance + pruning
#   3. fingertip (YOLO, then KLT)   7. drawing + HUD
#   4. ROI, pinning, TTS target     8. keyboard handling
# ---------------------------------------------------------------------------
while True:
    ret, frame_cap = cap.read()
    if not ret:
        break

    frame_work = frame_cap if WORK_SCALE == 1.0 else cv2.resize(
        frame_cap, None, fx=WORK_SCALE, fy=WORK_SCALE, interpolation=cv2.INTER_AREA)
    H, W = frame_work.shape[:2]
    frame_for_ocr = frame_work.copy()
    frame_disp = frame_work.copy()

    # Publish the latest frame for the info-mode consumer.
    with _latest_frame_lock:
        _latest_frame_for_info = frame_work.copy()

    gray = cv2.cvtColor(frame_work, cv2.COLOR_BGR2GRAY)
    gray_klt = _build_gray_for_klt(gray) if USE_KLT_FALLBACK else gray

    # ---- 2. Global motion between the previous and the current frame ------
    gray_s = cv2.resize(gray, None, fx=FLOW_DS, fy=FLOW_DS, interpolation=cv2.INTER_AREA)
    M_s = None
    did_motion = False
    if prev_gray_s is not None:
        reseed = ((frame_idx % RESEED_INTERVAL_FRAMES) == 0)
        if reseed:
            prev_pts = None
        M_s, next_pts = estimate_similarity_small(prev_gray_s, gray_s, prev_pts)
        prev_pts = next_pts
    prev_gray_s = gray_s

    if M_s is not None:
        s_step, R_step, t_step_s = project_to_similarity(M_s)
        # Clamp the per-frame scale and rotation, then smooth with an EMA.
        s_step = max(1.0 - MAX_SCALE_STEP, min(1.0 + MAX_SCALE_STEP, s_step))
        theta_step = angle_from_R(R_step)
        theta_step = max(-math.radians(MAX_ROT_STEP_DEG), min(math.radians(MAX_ROT_STEP_DEG), theta_step))
        s_ema = (1 - EMA_ALPHA_SIM) * s_ema + EMA_ALPHA_SIM * s_step
        # Angle EMA on the wrapped difference so it never jumps across +-pi.
        theta_ema = (theta_ema + ((theta_step - theta_ema + math.pi) % (2 * math.pi) - math.pi) * EMA_ALPHA_SIM)
        tx_ema = (1 - EMA_ALPHA_SIM) * tx_ema + EMA_ALPHA_SIM * float(t_step_s[0])
        ty_ema = (1 - EMA_ALPHA_SIM) * ty_ema + EMA_ALPHA_SIM * float(t_step_s[1])
        transform_overlays_similarity(overlays, s_ema, theta_ema, (tx_ema, ty_ema))
        did_motion = True
    else:
        if USE_ORB_FALLBACK and (prev_gray_full is not None):
            M2 = orb_similarity(prev_gray_full, gray)
            if M2 is not None:
                s2, R2, t2s = project_to_similarity(M2)
                s2 = max(1.0 - MAX_SCALE_STEP, min(1.0 + MAX_SCALE_STEP, s2))
                theta2 = angle_from_R(R2)
                theta2 = max(-math.radians(MAX_ROT_STEP_DEG), min(math.radians(MAX_ROT_STEP_DEG), theta2))
                s_ema = (1 - EMA_ALPHA_SIM) * s_ema + EMA_ALPHA_SIM * s2
                theta_ema = (theta_ema + ((theta2 - theta_ema + math.pi) % (2 * math.pi) - math.pi) * EMA_ALPHA_SIM)
                # ORB ran on the full-size image: scale its translation by
                # FLOW_DS so it matches the downsampled-flow units used above.
                tx_ema = (1 - EMA_ALPHA_SIM) * tx_ema + EMA_ALPHA_SIM * float(t2s[0]) * FLOW_DS
                ty_ema = (1 - EMA_ALPHA_SIM) * ty_ema + EMA_ALPHA_SIM * float(t2s[1]) * FLOW_DS
                transform_overlays_similarity(overlays, s_ema, theta_ema, (tx_ema, ty_ema))
                did_motion = True

    # Record this frame's step (translation divided by FLOW_DS) so OCR results
    # captured on an older frame can later be carried to the current one.
    if did_motion:
        M2_step = np.array([[math.cos(theta_ema) * s_ema, -math.sin(theta_ema) * s_ema, float(tx_ema) / FLOW_DS],
                            [math.sin(theta_ema) * s_ema, math.cos(theta_ema) * s_ema, float(ty_ema) / FLOW_DS]],
                           dtype=np.float32)
    else:
        M2_step = np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32)
    if frame_idx > 0:
        sim_steps.append((frame_idx - 1, frame_idx, np.vstack([M2_step, [0, 0, 1]]).astype(np.float32)))
        if len(sim_steps) > SIM_HIST_MAX:
            sim_steps.pop(0)

    now = time.time()

    # ---- 3. Fingertip position: YOLO first, KLT as fallback ---------------
    with mode_lock:
        in_op_or_guide = (mode_state == MODE_OP) or (mode_state == MODE_GUIDE)

    if in_op_or_guide:
        if yolo_in_q.empty():
            try:
                yolo_in_q.put_nowait(frame_work.copy())
            except queue.Full:
                pass
        try:
            det = yolo_out_q.get_nowait()
        except queue.Empty:
            det = None
    else:
        det = None

    finger_is_fresh = False
    finger_src = "NONE"
    yolo_last_conf = None
    klt_draw_pts = None
    yolo_box_count = None

    det_is_dict = isinstance(det, dict)
    if det_is_dict:
        xy = det.get('xy')
        if xy is not None:
            fx, fy = int(xy[0]), int(xy[1])
            if last_finger_xy is None:
                filt = np.array([fx, fy], dtype=np.float32)
            else:
                filt = ((1 - EMA_ALPHA_FINGER) * np.array(last_finger_xy, dtype=np.float32)
                        + EMA_ALPHA_FINGER * np.array([fx, fy], dtype=np.float32))
            last_finger_xy = (int(filt[0]), int(filt[1]))
            finger_last_seen = now
            finger_is_fresh = True
            finger_src = "YOLO"
            yolo_last_conf = float(det.get('conf', 0.0))
            if USE_KLT_FALLBACK:
                # Re-seed the KLT points around the freshly detected fingertip.
                klt_pts_prev = _klt_seed_ring(last_finger_xy)
                klt_lost_frames = 0
    if det_is_dict and det.get('yolo_in') is not None:
        yolo_last_in = det['yolo_in']
        if YOLO_SHOW_INPUT:
            cv2.imshow(YOLO_INPUT_WIN, yolo_last_in)
    # Guarded with det_is_dict: with no detection this frame `det` is None
    # and calling det.get(...) would raise AttributeError.
    if YOLO_DEBUG and det_is_dict:
        rb = det.get('raw_boxes', [])
        yolo_box_count = len(rb)
        if YOLO_DRAW_ALL:
            for (x1, y1, x2, y2, conf, cls_id) in rb:
                cv2.rectangle(frame_disp, (int(x1), int(y1)), (int(x2), int(y2)), (0, 200, 255), 1)
                cv2.putText(frame_disp, f"{conf:.2f}/{cls_id}", (int(x1), max(0, int(y1) - 3)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 200, 255), 1, cv2.LINE_AA)

    if USE_KLT_FALLBACK and not finger_is_fresh and (prev_gray_klt is not None) and (klt_pts_prev is not None):
        klt_xy, klt_pts_next = klt_track_multi(prev_gray_klt, gray_klt, klt_pts_prev, W, H)
        if klt_xy is not None:
            last_finger_xy = klt_xy
            klt_pts_prev = klt_pts_next
            klt_draw_pts = klt_pts_next

            finger_last_seen = now
            finger_is_fresh = True
            finger_src = "KLT"
            klt_lost_frames = 0
        else:
            klt_lost_frames += 1
            if klt_lost_frames > KLT_LOSS_GRACE:
                klt_pts_prev = None

    # Drop the fingertip when it has been tracked by KLT alone for too long.
    if finger_src == "YOLO":
        klt_only_start_ts = 0.0
    elif finger_src == "KLT":
        if klt_only_start_ts == 0.0:
            klt_only_start_ts = now
        elif (now - klt_only_start_ts) > KLT_TIMEOUT_SEC:
            last_finger_xy = None
            finger_is_fresh = False
            klt_pts_prev = None
            klt_only_start_ts = 0.0
            finger_src = "NONE"
    else:
        klt_only_start_ts = 0.0

    with mode_lock:
        mode_now = mode_state

    # ---- 4. ROI around the fingertip, overlay pinning, TTS target ---------
    roi = None
    protected_boxes = []
    protected_ids = []
    if mode_now == MODE_OP and finger_is_fresh and (last_finger_xy is not None):
        fx, fy = last_finger_xy
        roi = clamp_rect(int(fx - ROI_W // 2), int(fy - ROI_H // 2), ROI_W, ROI_H, W, H)
        # Keep the ROI from before this frame for the "moved fast" test below.
        # Comparing against last_roi after it is overwritten with `roi` would
        # always give IoU 1.0 and the test could never fire.
        prev_roi = last_roi
        last_roi = roi
        last_roi_active_until = now + ROI_KEEPALIVE_GRACE_SEC

        # Overlays centered inside the ROI get their lifetime extended.
        rx, ry, rw, rh = roi
        for it in overlays:
            c = poly_center(it['poly'])
            if (rx <= c[0] <= rx + rw) and (ry <= c[1] <= ry + rh):
                it['expiry'] = max(it.get('expiry', now), now + BASE_TTL)

        # Overlays under the fingertip are pinned and are TTS candidates.
        overlap_items = []
        for it in overlays:
            bx, by, bw, bh = rect_from_poly(it['poly'])
            if fingertip_overlaps_box((fx, fy), (bx, by, bw, bh)):
                protected_boxes.append((bx, by, bw, bh))
                protected_ids.append(it.get('id'))
                it['expiry'] = max(it.get('expiry', now), now + BASE_TTL)
                it['pin_until'] = now + PIN_GRACE_SEC
                overlap_items.append(it)

        # Among the overlapping overlays, speak the one nearest the fingertip.
        near = None
        bestd = 1e9
        for it in overlap_items:
            c = poly_center(it['poly'])
            d = np.hypot(c[0] - fx, c[1] - fy)
            if d < bestd:
                bestd = d
                near = it

        if near is not None:
            txt = str(near.get('text', '')).strip()
            conf = float(near.get('conf', 0.0))
            is_korean = _has_korean(txt)
            speak_ok = (conf >= TTS_CONF) or (is_korean and (conf >= TTS_CONF_FALLBACK))
            note = ""
            # Korean text is judged against the fallback threshold, everything
            # else against TTS_CONF. (Previously the second clause used
            # TTS_CONF itself as a truth value rather than comparing conf.)
            low_conf = (is_korean and conf < TTS_CONF_FALLBACK) or ((not is_korean) and conf < TTS_CONF)
            if low_conf:
                note = (note + f" | low-conf({conf:.2f})") if note else f"low-conf({conf:.2f})"
            say_txt = None
            if speak_ok and txt:
                dict_thr = DICT_THRESHOLD_LOWCONF if low_conf else DICT_THRESHOLD
                mapped, sc = map_to_dict_canon(txt, threshold=dict_thr)
                if mapped:
                    say_txt = mapped
                    note = (note + f" | dict:{sc:.0f}") if note else f"dict:{sc:.0f}"
                elif not STRICT_DICT_ONLY:
                    thr = JAMO_THRESHOLD_LOWCONF if low_conf else JAMO_THRESHOLD
                    fixed, changed = correct_text(txt, threshold=thr)
                    say_txt = fixed if changed else txt
                    if changed:
                        note = (note + " | spellfix") if note else "spellfix"
            if say_txt:
                set_tts_target(say_txt, note=note, force=True)
                tts_last_seen_target_ts = now
            else:
                if STRICT_DICT_ONLY and (speak_ok and txt):
                    note = (note + " | no-dict") if note else "no-dict"
                # Nothing to speak: clear the target but still show the raw text.
                set_tts_target(None, note=note)
                tts_current_display = txt
        else:
            if (now - tts_last_seen_target_ts) > TTS_TARGET_STICKY_SEC:
                set_tts_target(None, note="")

        # ---- 5a. Decide whether to submit an OCR request for this ROI -----
        if OCR_ENABLED:
            want_period = BASE_OCR_PERIOD
            roi_labels = []
            for it in overlays:
                c = poly_center(it['poly'])
                if (roi[0] <= c[0] <= roi[0] + roi[2]) and (roi[1] <= c[1] <= roi[1] + roi[3]):
                    roi_labels.append(it)
            roi_moved_fast = (prev_roi is None) or (iou(prev_roi, roi) < 0.6)
            roi_empty = (len(roi_labels) == 0)
            roi_stale = (len(roi_labels) > 0 and all((now - it.get('time', now) > STALE_AGE_SEC) or
                                                     (it.get('conf', 0) < LOW_CONF_TH) for it in roi_labels))
            # Re-run OCR sooner when the ROI content is probably out of date.
            if roi_moved_fast or roi_empty or roi_stale:
                want_period = min(want_period, EXTRA_OCR_PERIOD)

            if (now - last_ocr_time) >= want_period and task_q.qsize() == 0:
                gx, gy, gw, gh = roi
                g_roi = gray[gy:gy + gh, gx:gx + gw]
                # Skip blurry crops unless the ROI has no labels at all ...
                blur_ok = (variance_of_laplacian(g_roi) >= BLUR_VAR_THRESH) or roi_empty
                # ... and skip while the smoothed camera motion is too large.
                avg_step = math.hypot(tx_ema, ty_ema) / max(1e-6, FLOW_DS)
                if blur_ok and avg_step > MOTION_GATE_PX:
                    blur_ok = False
                if blur_ok:
                    rects_to_run = [roi]
                    try:
                        task_q.put_nowait({
                            'frame_work': frame_for_ocr.copy(),
                            'rects': rects_to_run,
                            'roi': roi,
                            'frame_idx': frame_idx,
                        })
                        last_ocr_time = now
                        last_roi = roi
                    except queue.Full:
                        pass

    elif mode_now == MODE_OP and (last_roi is not None) and (now <= last_roi_active_until):
        # Finger briefly lost: keep the overlays of the last ROI alive during
        # the grace period.
        rx, ry, rw, rh = last_roi
        for it in overlays:
            c = poly_center(it['poly'])
            if (rx <= c[0] <= rx + rw) and (ry <= c[1] <= ry + rh):
                it['expiry'] = max(it.get('expiry', now), now + BASE_TTL)

    if mode_now == MODE_OP and not finger_is_fresh:
        if (time.time() - tts_last_seen_target_ts) > TTS_TARGET_STICKY_SEC:
            set_tts_target(None, note="")

    # ---- 5b. Merge finished OCR results into the overlay list -------------
    if mode_now == MODE_OP:
        try:
            while True:
                res = result_q.get_nowait()
                if 'dt_ms' in res:
                    if OCR_EMA is None:
                        OCR_EMA = res['dt_ms']
                    else:
                        OCR_EMA = (1 - OCR_EMA_ALPHA) * OCR_EMA + OCR_EMA_ALPHA * res['dt_ms']
                if res.get('new_items'):
                    def _T_from_to(a, b):
                        # Compose the recorded per-frame steps from frame a to frame b.
                        if b <= a:
                            return np.eye(3, dtype=np.float32)
                        T = np.eye(3, dtype=np.float32)
                        for (src, dst, M3) in sim_steps:
                            if a < dst <= b:
                                T = M3 @ T
                        return T

                    T_cap2now = _T_from_to(res.get('frame_idx', frame_idx), frame_idx)

                    def _apply(poly, M3):
                        P = poly.astype(np.float32)
                        return (P @ M3[:2, :2].T) + M3[:2, 2]

                    roi_now = _rect_aabb_after_M(res['roi'], T_cap2now, W, H)
                    new_items = []
                    for ni in res['new_items']:
                        ni['poly'] = _apply(ni['poly'], T_cap2now)
                        bx, by, bw, bh = bbox_of_poly(ni['poly'])
                        cx, cy = bx + bw / 2, by + bh / 2
                        gx, gy, gw, gh = roi_now
                        # Keep only detections still centered in the moved ROI.
                        if gx <= cx <= gx + gw and gy <= cy <= gy + gh:
                            new_items.append(ni)
                    overlays = merge_update_overlays(overlays, new_items, roi_now, now_ts=time.time(),
                                                     iou_th=MERGE_IOU_TH, center_dist_th=MERGE_CENTER_DIST)
        except queue.Empty:
            pass

    # ---- 6. Guidance + periodic dedupe / prune ----------------------------
    if mode_now == MODE_OP:
        guide_tick(now, last_finger_xy if finger_is_fresh else None, overlays)

    now2 = time.time()
    if (now2 - last_prune) >= PRUNE_TIMEOUT_SEC:
        overlays = dedupe_same_text_overlays(overlays)
        active_roi = None
        if mode_now == MODE_OP:
            if finger_is_fresh and roi is not None:
                active_roi = roi
            elif (last_roi is not None) and (now2 <= last_roi_active_until):
                active_roi = last_roi
        overlays = prune_overlays(overlays, now2, active_roi=active_roi)
        last_prune = now2

    # ---- 7. Drawing -------------------------------------------------------
    if roi is not None and (mode_now == MODE_OP) and finger_is_fresh and last_finger_xy is not None:
        cv2.rectangle(frame_disp, (roi[0], roi[1]), (roi[0] + roi[2], roi[1] + roi[3]), (120, 120, 255), 1)

    if last_finger_xy is not None:
        color = (0, 255, 0) if finger_src == "YOLO" else ((255, 0, 255) if finger_src == "KLT" else (160, 160, 160))
        cv2.circle(frame_disp, last_finger_xy, 9, color, -1)
        if finger_src == "KLT" and YOLO_DEBUG and klt_draw_pts is not None:
            for p in klt_draw_pts[:60]:
                cv2.circle(frame_disp, (int(p[0, 0]), int(p[0, 1])), 2, (180, 0, 180), -1)

    if (mode_now == MODE_OP) and GUIDE_MODE and GUIDE_TARGET_ITEM is not None:
        highlight_guide_target(frame_disp, GUIDE_TARGET_ITEM)

    if mode_now == MODE_OP:
        draw_overlays(frame_disp, overlays, now2)

    if SHOW_TTS_HINT:
        mode_txt = "MODE: OP" if mode_now == MODE_OP else f"MODE: INFO({int(INFO_PERIOD_SEC)}s)"
        l1 = f"{mode_txt} | OCR: {'ON' if (OCR_ENABLED and mode_now==MODE_OP) else 'OFF'} | {OCR_ENGINE}"
        if OCR_EMA is not None and mode_now == MODE_OP:
            l1 += f" ~{int(OCR_EMA)} ms"
        l1 += f"   TTS: {'ON' if TTS_ENABLE else 'OFF'}"
        src_txt = f"SRC: {finger_src}"
        if finger_src == "YOLO" and yolo_last_conf is not None:
            src_txt += f" conf={yolo_last_conf:.2f}"
        if finger_src == "KLT" and klt_draw_pts is not None:
            src_txt += f" klt_pts={len(klt_draw_pts)}"
        age_ms = int((now - finger_last_seen) * 1000.0) if last_finger_xy is not None else -1
        if age_ms >= 0:
            src_txt += f" age={age_ms} ms"
        if yolo_box_count is not None:
            src_txt += f" boxes={yolo_box_count}"
        roi_txt = f"ROI: {ROI_W}x{ROI_H}  ([ ] width  ; ' height)"

        say_txt = (tts_current_display.strip() or tts_last_spoken_text.strip())
        say_line = f"SAY: {say_txt}" if say_txt else "SAY: (none)"
        if tts_current_note:
            say_line += f"  [{tts_current_note}]"

        guide_txt = f"GUIDE: {'ON' if (mode_now == MODE_OP and GUIDE_MODE) else 'OFF'}"
        if (mode_now == MODE_OP) and GUIDE_TARGET:
            guide_txt += f" target='{GUIDE_TARGET}'"
        if (mode_now == MODE_OP) and GUIDE_TARGET_ITEM is not None:
            cx, cy = map(int, _overlay_center(GUIDE_TARGET_ITEM))
            guide_txt += f" tgt@({cx},{cy})"

        lines = [guide_txt, l1, src_txt, roi_txt, say_line]

        # The HUD is rendered with PIL (using the font loaded before the
        # loop) and copied back into the OpenCV frame.
        img_rgb = cv2.cvtColor(frame_disp, cv2.COLOR_BGR2RGB)
        pil = Image.fromarray(img_rgb)
        draw = ImageDraw.Draw(pil)

        pad_x, pad_y, gap = 10, 8, 4
        widths = [draw.textlength(s, font=hud_font) for s in lines]
        tw = int(max(widths)) if widths else 0
        lh = 24
        th = lh * len(lines) + (len(lines) - 1) * gap
        x0, y0 = 8, 6
        bg = Image.new("RGBA", (tw + pad_x * 2, th + pad_y * 2), (0, 0, 0, 180))
        pil.paste(bg, (x0, y0), bg)
        y = y0 + pad_y
        for s in lines:
            draw.text((x0 + pad_x, y), s, font=hud_font, fill=(255, 255, 255), stroke_width=2, stroke_fill=(0, 0, 0))
            y += lh + gap
        frame_disp[:] = cv2.cvtColor(np.array(pil), cv2.COLOR_RGB2BGR)

    disp_scale = min(1.0, DISPLAY_MAX_W / float(W))
    vis = frame_disp if disp_scale == 1.0 else cv2.resize(
        frame_disp, None, fx=disp_scale, fy=disp_scale, interpolation=cv2.INTER_AREA)
    cv2.imshow(WINDOW_NAME, vis)

    # ---- 8. Keyboard ------------------------------------------------------
    key = cv2.waitKey(1) & 0xFF
    if key == ord('q'):
        break
    elif key == ord('o'):
        OCR_ENABLED = not OCR_ENABLED
        drain_queue(task_q)
        # Enabled: allow an OCR pass right away. Disabled: reset the timer.
        last_ocr_time = 0.0 if OCR_ENABLED else time.time()
        print(f"[OCR] {'ENABLED' if OCR_ENABLED else 'DISABLED'}")
    elif key == ord('t'):
        SHOW_TTS_HINT = not SHOW_TTS_HINT
        print(f"[HUD] {'ON' if SHOW_TTS_HINT else 'OFF'}")
    elif key == ord('s'):
        TTS_ENABLE = not TTS_ENABLE
        set_tts_target(None, note="")
        print(f"[TTS] {'ENABLED' if TTS_ENABLE else 'DISABLED'}")
    elif key == ord('y'):
        YOLO_SHOW_INPUT = not YOLO_SHOW_INPUT
        if not YOLO_SHOW_INPUT:
            try:
                cv2.destroyWindow(YOLO_INPUT_WIN)
            except cv2.error:
                # The preview window was never created; nothing to close.
                pass
        print(f"[YOLO] INPUT PREVIEW {'ON' if YOLO_SHOW_INPUT else 'OFF'}")
    elif key == ord('p'):
        if yolo_last_in is not None:
            os.makedirs('yolo_inputs', exist_ok=True)
            fname = time.strftime("yolo_inputs/%Y%m%d_%H%M%S.png")
            cv2.imwrite(fname, yolo_last_in)
            print(f"[YOLO] saved input preview -> {fname}")
        else:
            print("[YOLO] no input to save yet")

    elif key == ord('['):
        ROI_W = max(MIN_ROI_W, ROI_W - 40)
    elif key == ord(']'):
        ROI_W = min(W, ROI_W + 40)
    elif key == ord(';'):
        ROI_H = max(MIN_ROI_H, ROI_H - 30)
    elif key == ord("'"):
        ROI_H = min(H, ROI_H + 30)
    elif key == ord('r'):
        ROI_W, ROI_H = 420, 420
        print("[ROI] reset to 420x420")

    elif key == ord('1'):
        _enter_op_mode()
        print("[MODE] OP")

    elif key == ord('2'):
        _enter_info_mode()
        print("[MODE] INFO")

    elif key == ord('3'):
        _enter_guide_mode()
        print("[MODE] GUIDE")

    elif key == ord('c'):
        GUIDE_TARGET = None
        GUIDE_TARGET_ITEM = None
        set_tts_target("목표를 취소했습니다.", note="guide cancel")
        print("[GUIDE] target cleared")

    elif key == ord('v'):
        # Voice input of the guide target (blocks this loop while listening).
        if mode_state == MODE_GUIDE:
            text = stt_listen_once(timeout=4, phrase_time_limit=4) if (USE_STT and _STT_OK) else None
            if text:
                print(f"[STT] heard: {text}")
                set_guide_target_from_text(text)
            else:
                print("[STT] no text")
        else:
            set_tts_target("먼저 3번을 눌러 안내 모드를 켜 주세요.", note="guide")

    elif key == ord('f'):
        # Console input of the guide target (blocks this loop until Enter).
        if mode_state == MODE_GUIDE:
            try:
                print("\n[GUIDE] 입력 예시: '세탁', '건조맞춤' ...")
                user_in = input("[GUIDE] 목표 단어 입력: ").strip()
                if user_in:
                    set_guide_target_from_text(user_in)
            except Exception:
                pass
        else:
            set_tts_target("먼저 3번을 눌러 안내 모드를 켜 주세요.", note="guide")

    frame_idx += 1
    prev_gray_full = gray.copy()
    prev_gray_klt = gray_klt.copy()
|
|
|
|
|
|
|
|
# ---- Shutdown: stop the helper threads, then release camera and windows ----

# Ask the OCR worker to exit. The queue holds a single item, so a blocking
# put() could hang here if a request is still pending and the worker is busy
# or stuck; drop any pending request and enqueue the sentinel without blocking.
drain_queue(task_q)
try:
    task_q.put_nowait(None)
except queue.Full:
    pass  # the worker is a daemon thread and ends with the process anyway

_info_stop.set()

# Stop TTS whenever an engine exists. Checking TTS_ENABLE here would skip the
# cleanup after the user switched TTS off at runtime with the 's' key.
if 'tts' in globals() and tts:
    _tts_stop_event = globals().get('tts_stop')
    if _tts_stop_event is not None:
        _tts_stop_event.set()
    try:
        tts.close()
    except Exception as _close_err:
        print(f"[TTS] close failed: {_close_err}")

yolo_stop.set()
cap.release()
cv2.destroyAllWindows()