# Fingertip-Only OCR — EasyOCR-only + SIM-history warp
# (OP/INFO mode schedule rework & HUD improvements)
# --------------------------------------------------------------
import cv2, time, numpy as np, threading, queue, os, math, re, traceback
from PIL import Image, ImageDraw, ImageFont
from tts_reader import TTSReader
import seeing

try:
    from rapidfuzz import process, fuzz
    from jamo import h2j, j2hcj
    _SPELLFIX_OK = True
except Exception as _e:
    print(f"[SPELLFIX] disabled: {_e}")
    _SPELLFIX_OK = False

# ========= User / Display =========
CAMERA_ID = 0
CAPTURE_TARGET_W = 1920
CAPTURE_TARGET_H = 1080
WORK_WIDTH_TARGET = 1280
DISPLAY_MAX_W = 1280
WINDOW_NAME = 'Assistive Fingertip OCR (fast)'

# ========= OCR / Scheduling =========
OCR_ENABLED = True
BASE_OCR_PERIOD = 1.5
EXTRA_OCR_PERIOD = 0.6
STALE_AGE_SEC = 7.0
LOW_CONF_TH = 0.55

# ========= ROI (work-space) =========
# <Experiment 1> Problem: the lettering on the Seoyeon washing machine is too
# small for the detector.
# [CASE 1] ROI_W, ROI_H = 420, 420 -> better detection (still not enough)
# [CASE 2] crop the washer photo so the lettering appears larger (not tried)
# [CASE 3] feed the original instead of capping at MAX_OCR_LONG = 420 (too slow)
# [CASE 4] print the panel at actual size
ROI_W, ROI_H = 420, 420
MIN_ROI_W, MIN_ROI_H = 200, 120
BLUR_VAR_THRESH = 80.0

# Keep the ROI alive briefly (refresh TTLs inside it even if the finger drops out for a moment)
ROI_KEEPALIVE_GRACE_SEC = 1.2
last_roi_active_until = 0.0
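# Note on CASE 1/3 above: prep_fixed() later shrinks each OCR crop so its long
# side is at most MAX_OCR_LONG (420). Keeping the ROI at 420x420 therefore lets
# the crop pass through at native resolution (scale = 1.0), while a larger ROI
# would be downscaled and small print would shrink further — which is why
# CASE 1 helps and CASE 3 trades speed for the same effect.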
# ========= No masking =========
# USE_MASKED_FULL_ROI = True   removed (8.20)
# EXCLUDE_PAD = 8
# MASK_FILL_VAL = (127,127,127)

# ========= Donut OCR (unused) =========
# DONUT_PAD = 3   removed (8.20)
# SUBROI_MIN_AREA = 1200
# MAX_SUBROIS = 1

# ========= TTL / Pruning =========
BASE_TTL = 3.0                      # extension window retuned (8.20)
PIN_GRACE_SEC = 1.2
MAX_OVERLAYS = 300
# ONSCREEN_KEEPALIVE = 0.8   removed (8.20)
HARD_MAX_LIFETIME = 9.0
no_repeat_until_ts = 2.0            # window during which the same sentence is not re-spoken (8.21)
IGNORE_HARD_CAP_WHILE_FINGER_IN_ROI = True
PRUNE_TIMEOUT_SEC = 0.5             # prune period made configurable (8.20)

# ========= Merge criteria =========
MERGE_IOU_TH = 0.50
MERGE_CENTER_DIST = 28.0

# ========= TTS =========
TTS_ENABLE = True
TTS_CONF = 0.0                      # TODO: needs a real speaking threshold; right now everything is spoken (8.20)
TTS_REPEAT_SEC = 1.0
# TTS_QUEUE_MAX = 1   removed (8.20)
TTS_TARGET_STICKY_SEC = 0.6
# TTS_DEBUG = False   removed (8.20)
# TTS_STRICT_LATEST = True   removed (8.20)
STRICT_DICT_ONLY = True
TTS_CONF_FALLBACK = 0.35
SHOW_TTS_HINT = True
tts_current_display = ""
tts_current_note = ""
tts_last_spoken_text = ""           # <<< CHANGED: keep the last spoken phrase on the HUD

# Added: one immediate summary right after the mode-switch announcement, plus a preemption lock
INFO_FORCE_IMMEDIATE = False
tts_force_lock = threading.Lock()

# ========= Speed knobs =========
# Fix 1: MAX_OCR_LONG 416 -> 420
MAX_OCR_LONG = 420
ENHANCE_MODE = "off"
MOTION_GATE_PX = 2.0
MAX_TEXT_DRAW = 30

# ========= Global tracking (SIM) =========
FLOW_DS = 0.45
FLOW_MAX_CORNERS = 240; FLOW_QUALITY = 0.01; FLOW_MIN_DISTANCE = 7
FLOW_WINSIZE = (21, 21); FLOW_LEVELS = 3
RESEED_INTERVAL_FRAMES = 8
MAX_TRANS_PX = 90
MAX_SCALE_STEP = 0.18
MAX_ROT_STEP_DEG = 10.0
EMA_ALPHA_SIM = 0.28
USE_ORB_FALLBACK = True
ORB_NFEATURES = 600; ORB_MIN_GOOD = 45

# ========= Finger =========
EMA_ALPHA_FINGER = 0.35
FINGER_STALE_MS = 800
finger_last_seen = 0.0
# had_finger = False   removed (8.20)
last_finger_xy = None

# ========= YOLO =========
YOLO_DEBUG = True
YOLO_DRAW_ALL = True
YOLO_IMG_SIZE = 640
YOLO_CONF_TH = 0.25
YOLO_IOU_TH = 0.50
YOLO_CLASS_NAME = None
YOLO_CLASS_ID = 0
# fingerip_o.pt can also be used
YOLO_WEIGHTS = r'weights/fingertip.pt'
YOLO_SHOW_INPUT = False
YOLO_INPUT_WIN = 'YOLO_INPUT'
DO_PIC = True                       # dump INFO-mode input/output images (8.21)

# # ========= speed change parameters =========
# # resolution / scale
# WORK_WIDTH_TARGET = 960
# YOLO_IMG_SIZE = 448
# MAX_OCR_LONG = 360
# FLOW_DS = 0.35
# # ROI size
# ROI_W, ROI_H = 270, 270
# # frequency / period
# BASE_OCR_PERIOD = 2.0
# EXTRA_OCR_PERIOD = 0.9
# RESEED_INTERVAL_FRAMES = 12
# PRUNE_TIMEOUT_SEC = 1.0
# # global SIM
# FLOW_MAX_CORNERS = 150
# FLOW_WINSIZE = (17,17)
# FLOW_LEVELS = 2
# # lower estimateAffinePartial2D maxIters to ~800
# # KLT
# KLT_LEVELS = 2
# KLT_WIN = (25,25)
# KLT_TERM = (cv2.TERM_CRITERIA_EPS|cv2.TERM_CRITERIA_COUNT, 12, 0.03)
# KLT_N_SAMPLES = 6
# KLT_USE_CLAHE = False
# # YOLO
# YOLO_CONF_TH = 0.3  # less noise
# # yolo_model.predict(..., half=True)  # (on GPU)
# # OCR
# # shrink rotation_info to [0]
# # canvas_size=1280, mag_ratio=1.1
# MAX_TEXT_DRAW = 30
# MAX_OVERLAYS = 150

# ========= KLT fallback =========
# Fix 2: False -> True
USE_KLT_FALLBACK = True
KLT_WIN = (31, 31); KLT_LEVELS = 4
KLT_TERM = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 20, 0.03)
KLT_FB_MAX = 4.0; KLT_ERR_MAX = 100.0; KLT_STEP_MAX = 30.0
KLT_OUT_MARGIN = 4; KLT_N_SAMPLES = 12; KLT_RING_R = 10
# KLT_RESEED_EVERY = 6   removed (8.20)
KLT_MIN_GOOD = 5; KLT_LOSS_GRACE = 3
KLT_USE_CLAHE = True
klt_pts_prev = None; klt_lost_frames = 0
# frames_since_reseed = 0   removed (8.20)

# ========= OCR time meter =========
OCR_EMA = None; OCR_EMA_ALPHA = 0.25

# ==== Dict-based merge parameters ====
# Fix 3: 80 -> 60
DICT_MERGE_SCORE = 70               # dictionary mapping retuned (8.21)
DICT_TIE_DELTA = 3
DICT_ONLY = True                    # show dictionary words only (8.21)

# ========= GUIDE MODE =========
GUIDE_MODE = False
GUIDE_TARGET = None
GUIDE_TOL_PX = 40
GUIDE_REPEAT_SEC = 1.0
GUIDE_LAST_TS = 0.0
GUIDE_LAST_SENT = ""
GUIDE_TARGET_ITEM = None
GUIDE_REQUIRE_FINGER = True

# ========= INFO/OP modes =========
MODE_OP = 1     # operate mode
MODE_INFO = 2   # describe ("seeing") mode
MODE_GUIDE = 3  # guide mode
mode_lock = threading.Lock()
mode_state = MODE_OP

# describe period (seconds)
INFO_PERIOD_SEC = 5.0   # <<< CHANGED: 8s -> 5s

# scheduling for immediate / periodic runs
_next_info_due = 0.0

# latest frame shared with the describe thread
_latest_frame_for_info = None
_latest_frame_lock = threading.Lock()

# describe-thread control
_info_stop = threading.Event()
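# INFO-mode scheduling contract (as implemented by _enter_info_mode and
# _info_worker below): entering INFO preempts any current speech, announces the
# switch, then runs one summary immediately (INFO_FORCE_IMMEDIATE skips the
# usual post-speech delay); afterwards a summary runs every INFO_PERIOD_SEC,
# and if the speaker is busy when one comes due, it is pushed back to
# "speech end + 2 s".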
def _is_speaker_busy() -> bool:
    try:
        import pygame
        return pygame.mixer.music.get_busy()
    except Exception:
        return False

def _say_once(text: str):
    """Play a single sentence safely (async TTS) and keep it on the HUD.
    - wait briefly for playback to start (tries for up to 2 s)
    - poll until playback ends (up to 30 s), then clear only the target so the
      sentence is not repeated
    - the HUD keeps showing the last utterance via tts_last_spoken_text
    """
    global no_repeat_until_ts
    t_start = time.time()  # restored: t_start was missing before
    set_tts_target(text)
    no_repeat_until_ts = time.time() + 60.0  # block re-enqueueing the same sentence (safety margin)

    # detect playback start (up to 2 s)
    while not _info_stop.is_set():
        if _is_speaker_busy():
            break
        if '_last_spoken_enqueue_ts' in globals() and _last_spoken_enqueue_ts >= t_start:
            time.sleep(0.1)
            break
        if (time.time() - t_start) > 2.0:
            break
        time.sleep(0.02)

    # wait for playback to end (up to 30 s)
    t0 = time.time()
    while _is_speaker_busy() and not _info_stop.is_set():
        if (time.time() - t0) > 30.0:
            break
        time.sleep(0.05)

    # clear only the target to prevent repeats (display persists via tts_last_spoken_text)
    set_tts_target(None)
    no_repeat_until_ts = 0.0

def announce_force_async(text: str, after=None):
    """Mode switches only: cut off whatever is playing and speak `text` first."""
    def _runner():
        with tts_force_lock:
            if TTS_ENABLE and tts is not None:
                try:
                    tts.clear_queue()
                except Exception:
                    pass
                for m in ("stop", "cancel", "flush"):
                    if hasattr(tts, m):
                        try:
                            getattr(tts, m)()
                        except Exception:
                            pass
            _say_once(text)  # keeps the last line on the HUD and suppresses repeats as usual
            if callable(after):
                try:
                    after()
                except Exception:
                    pass
    threading.Thread(target=_runner, daemon=True).start()

def _enter_op_mode():
    # 1) cut all current TTS, 2) finish saying "switching to operate mode", 3) apply the mode
    def _after():
        global mode_state
        with mode_lock:
            mode_state = MODE_OP
        globals().update(GUIDE_MODE=False)
    announce_force_async("조작 모드로 전환합니다.", after=_after)

def _enter_info_mode():
    # 1) preempting announcement -> 2) once it finishes, set the INFO flag and allow an immediate first summary
    def _after():
        global mode_state, _next_info_due, INFO_FORCE_IMMEDIATE
        with mode_lock:
            mode_state = MODE_INFO
        _next_info_due = 0.0          # run once right after entering
        INFO_FORCE_IMMEDIATE = True   # summarize on the very next loop
        globals().update(GUIDE_MODE=False)
    announce_force_async("보기 모드로 전환합니다. 지금부터 상황을 설명합니다.", after=_after)

def _enter_guide_mode():
    # 1) preempting announcement -> 2) apply GUIDE mode after it finishes
    def _after():
        global mode_state, GUIDE_MODE
        with mode_lock:
            mode_state = MODE_GUIDE
        GUIDE_MODE = True
    announce_force_async("안내 모드로 전환합니다. 목표를 지정해 주세요.", after=_after)

# def _finger_present_now() -> bool:   # finger-presence check removed (8.21)
#     try:
#         if last_finger_xy is None:
#             return False
#         return (time.time() - finger_last_seen) * 1000.0 <= FINGER_STALE_MS
#     except NameError:
#         return False

# def _wait_till_no_finger(max_wait_sec: float = 8.0):
#     t0 = time.time()
#     while _finger_present_now() and not _info_stop.is_set():
#         if time.time() - t0 > max_wait_sec:
#             break
#         time.sleep(0.05)
def _info_worker():
    """INFO mode: one summary on entry, then every 5 s.
    If speech is in progress, run at "speech end + 2 s" — except for the one
    summary right after the mode-entry announcement, which runs with no delay.
    """
    global _next_info_due, INFO_FORCE_IMMEDIATE
    while not _info_stop.is_set():
        time.sleep(0.05)
        with mode_lock:
            info_on = (mode_state == MODE_INFO)
        if not info_on:
            _next_info_due = 0.0
            continue
        now = time.time()
        if now < _next_info_due:
            continue

        # 1) if speaking, wait until playback ends
        was_busy = False
        while _is_speaker_busy() and not _info_stop.is_set():
            was_busy = True
            time.sleep(0.05)

        # 1-1) normal case: reschedule for 2 s after speech ends —
        # unless what just finished was the mode-entry announcement
        if was_busy:
            if INFO_FORCE_IMMEDIATE:
                INFO_FORCE_IMMEDIATE = False   # entry announcement just ended -> run once immediately
            else:
                _next_info_due = time.time() + 2.0
                continue

        # 2) (if a finger is visible, ask once to move it and wait until it is gone — currently disabled)
        # 3) summarize the latest frame
        with _latest_frame_lock:
            frame = None if _latest_frame_for_info is None else _latest_frame_for_info.copy()
        if frame is not None:
            try:
                summary = seeing.summarize_scene(frame, easy_reader, do_pic=DO_PIC,
                                                 debug_dir=r"logs/ocr_bbox",
                                                 debug_font=r"C:\Windows\Fonts\malgun.ttf")
            except Exception as e:
                print("[INFO] summarize failed:", e)
                summary = None
            if summary:
                _say_once(summary)

        # 4) schedule the next run (now + 5 s)
        _next_info_due = time.time() + INFO_PERIOD_SEC

# ===== STT =====
USE_STT = True
try:
    import speech_recognition as sr
    _STT_OK = True
except Exception as _e:
    print(f"[STT] disabled: {_e}")
    _STT_OK = False

# ========= GPU / OCR / YOLO load =========
def torch_cuda_ok():
    try:
        import torch
        ok = bool(torch.cuda.is_available())
        print(f"[GPU] torch CUDA available: {ok}")
        return ok
    except Exception as e:
        print(f"[GPU] torch check failed: {e}")
        return False

gpu_ok = torch_cuda_ok()
OCR_ENGINE = None; easy_reader = None
import easyocr
try:
    easy_reader = easyocr.Reader(['ko'], gpu=gpu_ok,
                                 model_storage_directory='models',
                                 user_network_directory='user_network',
                                 recog_network='best_accuracy1',
                                 download_enabled=False)
    OCR_ENGINE = 'easyocr_gpu' if gpu_ok else 'easyocr_cpu'
    print(f"[OCR] EasyOCR (GPU={gpu_ok})")
except Exception as e:
    traceback.print_exc()
    raise SystemExit("No OCR engine available")

# === start the describe (scene summary) thread ===
def _start_info_thread_once():
    if not hasattr(_start_info_thread_once, "_started"):
        threading.Thread(target=_info_worker, daemon=True).start()
        _start_info_thread_once._started = True
_start_info_thread_once()

# ========= YOLO =========
try:
    from ultralytics import YOLO
    yolo_device = 0 if gpu_ok else 'cpu'
    yolo_model = YOLO(YOLO_WEIGHTS)
    print(f"[YOLO] Loaded: {YOLO_WEIGHTS} (device={yolo_device})")
    class_names = yolo_model.names
    if YOLO_CLASS_NAME:
        inv = {str(v).lower(): int(k) for k, v in class_names.items()}
        if YOLO_CLASS_NAME.lower() in inv:
            YOLO_CLASS_ID = inv[YOLO_CLASS_NAME.lower()]
except Exception as e:
    traceback.print_exc()
    raise SystemExit("[YOLO] model load failed. Check the YOLO_WEIGHTS path/file.")
def _pick_best_tip(cands, last_xy):
    if not cands:
        return None
    if last_xy is None:
        return max(cands, key=lambda t: t[2])
    lx, ly = last_xy
    def score(t):
        cx, cy, conf, _ = t
        d2 = (cx - lx)**2 + (cy - ly)**2
        return conf - 0.0005 * d2
    return max(cands, key=score)

def _yolo_letterbox_bgr(img, new_size=YOLO_IMG_SIZE, pad_val=114):
    # defined before the worker thread starts so the thread never sees an undefined name
    h, w = img.shape[:2]
    r = min(new_size / float(h), new_size / float(w))
    new_w, new_h = int(round(w * r)), int(round(h * r))
    resized = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
    dw = (new_size - new_w) / 2.0; dh = (new_size - new_h) / 2.0
    left, right = int(math.floor(dw)), int(math.ceil(dw))
    top, bottom = int(math.floor(dh)), int(math.ceil(dh))
    out = cv2.copyMakeBorder(resized, top, bottom, left, right,
                             borderType=cv2.BORDER_CONSTANT, value=(pad_val, pad_val, pad_val))
    return out, r, (left, top)

# --- async YOLO worker ---
yolo_in_q = queue.Queue(maxsize=1); yolo_out_q = queue.Queue(maxsize=1); yolo_stop = threading.Event()

def _yolo_worker():
    while not yolo_stop.is_set():
        try:
            frame = yolo_in_q.get(timeout=0.2)
        except queue.Empty:
            continue
        yolo_in_vis, _r, _off = _yolo_letterbox_bgr(frame, YOLO_IMG_SIZE)
        res = yolo_model.predict(source=frame, imgsz=YOLO_IMG_SIZE, conf=YOLO_CONF_TH,
                                 iou=YOLO_IOU_TH, device=yolo_device, verbose=False)
        det = None; raw_boxes = []
        if res and res[0].boxes is not None and len(res[0].boxes) > 0:
            cands = []
            for b in res[0].boxes:
                x1, y1, x2, y2 = b.xyxy[0].tolist()
                conf = float(b.conf[0]) if b.conf is not None else 0.0
                cls_id = int(b.cls[0]) if b.cls is not None else 0
                raw_boxes.append((x1, y1, x2, y2, conf, cls_id))
                if YOLO_CLASS_ID is not None and cls_id != YOLO_CLASS_ID:
                    continue
                cx, cy = (x1 + x2) / 2.0, (y1 + y2) / 2.0
                cands.append((cx, cy, conf, (x1, y1, x2 - x1, y2 - y1)))
            best = _pick_best_tip(cands, last_finger_xy)
            if best is not None:
                cx, cy, conf, (x, y, w, h) = best
                det = {'xy': (int(round(cx)), int(round(cy))),
                       'box': (int(x), int(y), int(w), int(h)),
                       'conf': conf, 'ts': time.time(),
                       'raw_boxes': raw_boxes, 'yolo_in': yolo_in_vis}
        if det is None:
            # no usable detection this frame; still ship the letterboxed input for debugging
            det = {'xy': None, 'raw_boxes': raw_boxes, 'yolo_in': yolo_in_vis}
        try:
            while True:
                yolo_out_q.get_nowait()
        except queue.Empty:
            pass
        try:
            yolo_out_q.put_nowait(det)
        except queue.Full:
            pass

threading.Thread(target=_yolo_worker, daemon=True).start()

# ========= TTS =========
# tts_q = queue.Queue(maxsize=TTS_QUEUE_MAX)   removed (8.20)
# tts_is_speaking = threading.Event()          removed (8.20)
tts_stop = threading.Event()
tts_target_lock = threading.Lock()
tts_target_text = None
_last_spoken_enqueue_ts = 0.0
# tts_last_done_ts = 0.0   removed (8.20)
tts_last_seen_target_ts = 0.0

SPELLFIX_ENABLE = True
JAMO_THRESHOLD = 85; JAMO_THRESHOLD_LOWCONF = 80

# Fix 4: dictionary expanded (to cover the Seoyeon washing machine)
# "동작", "일시정지",
DICT_WORDS = ["통살균", "원격제어", "예약", "내마음", "세탁", "헹굼", "탈수", "물온도",
              "빨래추가", "알림음", "구김방지", "터보샷", "강력물살", "온수세탁", "냉수세탁",
              "물높이", "코스", "동작/일시정지", "전원"]
CANON = {"표준세탁": "세탁", "손세탁": "세탁"}

def _to_jamo(s: str) -> str:
    try:
        return j2hcj(h2j(s))
    except Exception:
        return s or ""

if _SPELLFIX_OK:
    _DICT_JAMO = [_to_jamo(w) for w in DICT_WORDS]
else:
    _DICT_JAMO = []

_TOKENIZER = re.compile(r"[가-힣A-Za-z0-9]+|[^\s가-힣A-Za-z0-9]")

def correct_token(tok: str, threshold: int):
    if not (_SPELLFIX_OK and SPELLFIX_ENABLE and _DICT_JAMO and tok):
        return tok, 0.0
    q = _to_jamo(tok)
    res = process.extractOne(q, _DICT_JAMO, scorer=fuzz.ratio, score_cutoff=threshold)
    if not res:
        return tok, 0.0
    matched, score, idx = res
    best = DICT_WORDS[idx]; best = CANON.get(best, best)
    return best, float(score)
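# Worked example (hedged) of why matching runs on jamo sequences rather than
# whole syllables: a one-letter OCR slip changes an entire syllable block, e.g.
# "헹굼" misread as "행굼". fuzz.ratio on the raw strings is 50 (1 of 2
# syllables match), but on the decomposed jamo "ㅎㅔㅇㄱㅜㅁ" vs "ㅎㅐㅇㄱㅜㅁ"
# it is ~83 (5 of 6 jamo match), so a cutoff near JAMO_THRESHOLD still
# recovers the dictionary word "헹굼".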
def correct_text(text: str, threshold: int):
    if not (_SPELLFIX_OK and SPELLFIX_ENABLE and _DICT_JAMO and text):
        return text, False
    out = []; changed = False
    for tok in _TOKENIZER.findall(text):
        if re.match(r"^[가-힣A-Za-z0-9]+$", tok):
            fixed, sc = correct_token(tok, threshold=threshold)
            if fixed != tok:
                changed = True
            out.append(fixed)
        else:
            out.append(tok)
    return "".join(out), changed

DICT_SPEAK_ENABLE = True
DICT_THRESHOLD = 80; DICT_THRESHOLD_LOWCONF = 80

def _build_dict_index(words, canon_map):
    keys = []; vals = []
    for w in words:
        keys.append(w); vals.append(canon_map.get(w, w))
    for alias, canon in canon_map.items():
        keys.append(alias); vals.append(canon)
    keys_j = [_to_jamo(re.sub(r"\s+", "", k)) for k in keys]
    return keys, keys_j, vals

_DICT_KEYS, _DICT_KEYS_J, _DICT_VALS = _build_dict_index(DICT_WORDS, CANON)
# _DICT_KEYS_PLAIN = [re.sub(r"\s+","",k).casefold() for k in _DICT_KEYS]   removed (8.20)

def _normalize_plain(s: str) -> str:
    return re.sub(r"\s+", "", (s or "")).casefold()

def map_to_dict_canon(text: str, threshold: int):  # in use
    if not DICT_SPEAK_ENABLE or not text:
        return None, 0.0
    if _SPELLFIX_OK:
        queries = []
        s = re.sub(r"\s+", "", text)
        if s:
            queries.append(_to_jamo(s))
        for tok in _TOKENIZER.findall(text):
            if re.match(r"^[가-힣A-Za-z0-9]+$", tok):
                queries.append(_to_jamo(tok))
        best_idx, best_sc = -1, 0.0
        for q in queries:
            res = process.extractOne(q, _DICT_KEYS_J, scorer=fuzz.ratio, score_cutoff=threshold)
            if res:
                _, sc, idx = res
                if sc > best_sc:
                    best_sc = float(sc); best_idx = int(idx)
        if best_idx >= 0:
            return _DICT_VALS[best_idx], best_sc
    # Fix 6: decide by dictionary match score only. The substring fallback below
    # let partial hits swallow options (e.g. "강" matching "강력세탁"), so it is disabled.
    # q_full = _normalize_plain(text)
    # q_tokens = [_normalize_plain(tok) for tok in _TOKENIZER.findall(text)
    #             if re.match(r"^[가-힣A-Za-z0-9]+$", tok)]
    # for q in [q_full] + q_tokens:
    #     if not q: continue
    #     for i, k in enumerate(_DICT_KEYS_PLAIN):
    #         if q == k: return _DICT_VALS[i], 100.0
    # for q in [q_full] + q_tokens:
    #     if not q: continue
    #     for i, k in enumerate(_DICT_KEYS_PLAIN):
    #         if (k and k in q) or (q and q in k): return _DICT_VALS[i], 90.0
    return None, 0.0

def enrich_with_dict(text: str, conf: float):
    canon, sc = map_to_dict_canon(text, threshold=DICT_MERGE_SCORE)
    display = canon if canon else text
    return display, canon, float(sc or 0.0), float(conf or 0.0)

def _has_korean(s: str) -> bool:
    return any('가' <= ch <= '힣' for ch in (s or ""))
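# Usage sketch (hedged): both the whole string and each token are tried, and
# only the rapidfuzz score decides the winner.
#   map_to_dict_canon("원격 제어", DICT_MERGE_SCORE)  # -> ("원격제어", 100.0): whitespace stripped, exact jamo hit
#   map_to_dict_canon("표준세탁", DICT_MERGE_SCORE)   # -> ("세탁", 100.0): alias mapped through CANON
#   map_to_dict_canon("강", DICT_MERGE_SCORE)         # -> (None, 0.0): too short to clear the cutoff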
# (add near the tts / guide thread)
# def announce_async(text: str):   removed (8.20)
#     threading.Thread(target=_say_once, args=(text,), daemon=True).start()

# TTS init
try:
    import tempfile, os
    try:
        tts = TTSReader(cooldown_sec=TTS_REPEAT_SEC, speaking_rate=1.05, pitch=0.0, min_len=2,
                        credentials_path=r"yugpae-4f8335e15ba0.json",
                        cache_dir=None, persist_cache=False)
    except TypeError:
        tts = TTSReader(cooldown_sec=TTS_REPEAT_SEC, speaking_rate=1.05, pitch=0.0, min_len=2,
                        credentials_path=r"yugpae-4f8335e15ba0.json")
    for attr in ("set_cache", "disable_cache"):
        if hasattr(tts, attr):
            try:
                getattr(tts, attr)(persist=False, dir=None)
            except Exception:
                pass
    try:
        if not (hasattr(tts, "cache_dir") and getattr(tts, "cache_dir") is None):
            tmp_cache = os.path.join(tempfile.gettempdir(), "tts_runtime_cache")
            os.makedirs(tmp_cache, exist_ok=True)
            if hasattr(tts, "cache_dir"):
                tts.cache_dir = tmp_cache
    except Exception:
        pass
except Exception as e:
    print(f"[TTS] init failed: {e}")
    TTS_ENABLE = False
    tts = None

try:
    import pygame
    if not pygame.mixer.get_init():
        pygame.mixer.init()
    pygame.mixer.music.set_volume(1.0)
except Exception:
    pass

def tts_scheduler():
    global _last_spoken_enqueue_ts, tts_last_spoken_text, no_repeat_until_ts
    # tts_last_done_ts removed (8.20)
    last_sent_text = None
    while not tts_stop.is_set():
        time.sleep(0.05)
        if not TTS_ENABLE or tts is None:
            continue
        with tts_target_lock:
            tgt = (tts_target_text or "").strip()
        # no target: do nothing and move on (never stop/interrupt here)
        if not tgt:
            last_sent_text = None
            continue
        now = time.time()
        # never preempt or cut off ongoing speech from the normal path
        if _is_speaker_busy():
            continue
        # do not repeat the same sentence too often
        if tgt == last_sent_text and now < no_repeat_until_ts:
            continue
        # play (clear_queue/stop/flush are strictly reserved for mode switches)
        try:
            tts.say(tgt)
            tts_last_spoken_text = tgt
            _last_spoken_enqueue_ts = now
            # tts_last_done_ts = now   removed (8.20)
            last_sent_text = tgt
        except Exception as e:
            print(f"[TTS] error: {e}")

if TTS_ENABLE:
    threading.Thread(target=tts_scheduler, daemon=True).start()

# def set_tts_target(text_or_none, note: str=""):
#     global tts_target_text, tts_current_display, tts_current_note
#     # the normal path only updates the target; it never stops/preempts playback here
#     with tts_target_lock:
#         tts_target_text = text_or_none
#         tts_current_display = (text_or_none or "").strip()
#         tts_current_note = note or ""

last_text = ""  # last forced text, so the same forced utterance is not re-preempted (8.21)

def set_tts_target(text_or_none, note: str = "",
                   *, force: bool = False):  # `force` added for preempting speech (8.21)
    """Update the TTS target.
    - force=True: stop current playback (clear the queue, then stop/cancel/flush)
      before applying the new target.
    """
    global tts_target_text, tts_current_display, tts_current_note
    global no_repeat_until_ts, _last_spoken_enqueue_ts
    global last_text

    # 1) update the target
    with tts_target_lock:
        tts_target_text = text_or_none
        tts_current_display = (text_or_none or "").strip()
        tts_current_note = note or ""

    # 2) preemption option
    if force and ("tts" in globals()) and (tts is not None) and last_text != text_or_none:
        last_text = text_or_none
        try:
            if hasattr(tts, "clear_queue"):
                tts.clear_queue()
            for m in ("stop", "cancel", "flush"):
                if hasattr(tts, m):
                    try:
                        getattr(tts, m)()
                    except Exception:
                        pass
        except Exception:
            pass
        _last_spoken_enqueue_ts = 0.0  # resync with the scheduler
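# Usage sketch (hedged): the two speech paths in this file.
#   set_tts_target("세탁", note="near")              # normal: tts_scheduler picks it up, never preempts
#   set_tts_target("안내를 시작합니다.", force=True)   # forced: stops current playback first
# Mode switches go one step further via announce_force_async(), which also
# serializes preemptions with tts_force_lock and waits for playback in _say_once().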
# ========= Camera =========
cap = cv2.VideoCapture(CAMERA_ID, cv2.CAP_DSHOW) \
    if cv2.getBuildInformation().find('Windows') != -1 else cv2.VideoCapture(CAMERA_ID)
if not cap.isOpened():
    raise SystemExit("Failed to open camera")
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAPTURE_TARGET_W)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAPTURE_TARGET_H)
cap.set(cv2.CAP_PROP_FPS, 30)
try:
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
except Exception:
    pass
time.sleep(0.15)
Wc = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); Hc = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"[Camera] requested ~{CAPTURE_TARGET_W}x{CAPTURE_TARGET_H}, actual {Wc}x{Hc}")
WORK_SCALE = min(1.0, WORK_WIDTH_TARGET / float(Wc))
print(f"[Work] WORK_SCALE={WORK_SCALE:.3f} (work width ~{int(Wc*WORK_SCALE)})")

# ========= State =========
cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
frame_idx = 0
prev_gray_s = None; prev_pts = None
overlays = []; last_prune = time.time()

# OCR schedule
last_ocr_time = 0.0
last_roi = None

# ORB
orb = None; bf = None
if USE_ORB_FALLBACK:
    orb = cv2.ORB_create(nfeatures=ORB_NFEATURES)
    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)

# ========= Utils =========
_next_oid = 1
def alloc_oid():
    global _next_oid
    oid = _next_oid; _next_oid += 1
    return oid

def clamp_rect(x, y, w, h, W, H):
    x = max(0, min(x, W - 1)); y = max(0, min(y, H - 1))
    w = max(1, min(w, W - x)); h = max(1, min(h, H - y))
    return x, y, w, h

def poly_center(poly):
    return np.mean(poly, axis=0)

def bbox_of_poly(poly):
    x1 = float(np.min(poly[:, 0])); y1 = float(np.min(poly[:, 1]))
    x2 = float(np.max(poly[:, 0])); y2 = float(np.max(poly[:, 1]))
    return (x1, y1, x2 - x1, y2 - y1)

def variance_of_laplacian(g):
    return cv2.Laplacian(g, cv2.CV_64F).var()

def rect_contains(outer, inner, tol=2.0):
    ox, oy, ow, oh = outer
    ix, iy, iw, ih = inner
    return (ix >= ox - tol) and (iy >= oy - tol) and \
           (ix + iw <= ox + ow + tol) and (iy + ih <= oy + oh + tol)

def _canon_equal(a: str, b: str) -> bool:
    a = (a or "").strip(); b = (b or "").strip()
    if not a or not b:
        return False
    try:
        return _normalize_plain(a) == _normalize_plain(b)
    except Exception:
        import re
        aa = re.sub(r"\s+", "", a).casefold()
        bb = re.sub(r"\s+", "", b).casefold()
        return aa == bb

def iou(a, b):
    ax, ay, aw, ah = a; bx, by, bw, bh = b
    ax2, ay2 = ax + aw, ay + ah; bx2, by2 = bx + bw, by + bh
    ix1, iy1 = max(ax, bx), max(ay, by)
    ix2, iy2 = min(ax2, bx2), min(ay2, by2)
    iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1)
    inter = iw * ih; union = aw * ah + bw * bh - inter + 1e-9
    return inter / union

# def expand_rect(x, y, w, h, pad, W, H):   removed (8.20)
#     x2 = x - pad; y2 = y - pad; w2 = w + 2*pad; h2 = h + 2*pad
#     return clamp_rect(x2, y2, w2, h2, W, H)

def is_visible_in_view(poly, W, H, min_overlap=0.7):
    x, y, w, h = bbox_of_poly(poly)
    x1, y1, x2, y2 = x, y, x + w, y + h
    vx1, vy1, vx2, vy2 = 0, 0, W, H
    ix1, iy1 = max(x1, vx1), max(y1, vy1)
    ix2, iy2 = min(x2, vx2), min(y2, vy2)
    iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1)
    inter = iw * ih; area = max(1.0, w * h)
    return (inter / area) >= min_overlap

def draw_overlays(frame, items, now_ts):
    H, W = frame.shape[:2]
    to_draw = []
    for it in items:
        if is_visible_in_view(it['poly'], W, H, min_overlap=0.7):
            it['last_seen'] = now_ts
            to_draw.append(it)
    to_draw = to_draw[:MAX_TEXT_DRAW]
    for it in to_draw:
        cv2.polylines(frame, [it['poly'].astype(int)], True, (255, 165, 0), 2, cv2.LINE_AA)
    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil = Image.fromarray(img_rgb); draw = ImageDraw.Draw(pil)
    font_path = None
    for p in [r"C:\Windows\Fonts\malgun.ttf",
              r"C:\Windows\Fonts\NanumGothic.ttf",
              r"C:\Windows\Fonts\NotoSansCJKkr-Regular.otf",
              "/usr/share/fonts/truetype/noto/NotoSansCJKkr-Regular.ttc"]:
        if os.path.isfile(p):
            font_path = p; break
    font = ImageFont.truetype(font_path, 22) if font_path else ImageFont.load_default()
    for it in to_draw:
        poly = it['poly'].astype(int)
        x = int(np.min(poly[:, 0])); y = int(np.min(poly[:, 1])) - 6
        draw.text((x, max(0, y)), f"{it['text']} ({it['conf']:.2f})", font=font,
                  fill=(255, 255, 255), stroke_width=2, stroke_fill=(0, 0, 0))
    frame[:] = cv2.cvtColor(np.array(pil), cv2.COLOR_RGB2BGR)

def prune_overlays(items, now, active_roi=None):
    def center_in_roi(c, roi):
        if roi is None:
            return False
        rx, ry, rw, rh = roi
        return (rx <= c[0] <= rx + rw) and (ry <= c[1] <= ry + rh)
    kept = []
    for it in items:
        pinned = (now <= it.get('pin_until', 0.0))
        if pinned:
            kept.append(it); continue
        birth = it.get('time', now)
        alive_by_ttl = (now <= it.get('expiry', 0.0))
        if IGNORE_HARD_CAP_WHILE_FINGER_IN_ROI and active_roi is not None:
            c = poly_center(it['poly'])
            if center_in_roi(c, active_roi):
                if alive_by_ttl:
                    kept.append(it)
                continue
        under_hard_cap = ((now - birth) <= HARD_MAX_LIFETIME)
        if alive_by_ttl and under_hard_cap:
            kept.append(it)
    if len(kept) > MAX_OVERLAYS:
        kept = sorted(kept,
                      key=lambda d: max(d.get('expiry', 0.0), d.get('pin_until', 0.0)),
                      reverse=True)[:MAX_OVERLAYS]
    return kept

# ========= SIM helpers =========
def closest_rotation(A):
    U, _, Vt = np.linalg.svd(A); R = U @ Vt
    if np.linalg.det(R) < 0:
        Vt[-1, :] *= -1; R = U @ Vt
    return R

def project_to_similarity(M):  # in use
    A = M[:, :2]; R = closest_rotation(A)
    s = float(np.trace(A.T @ R) / 2.0); t = M[:, 2].reshape(2)
    return s, R, t

def angle_from_R(R):  # in use
    return math.atan2(R[1, 0], R[0, 0])

def build_similarity(s, theta):
    c, n = math.cos(theta), math.sin(theta)
    A = np.array([[c, -n], [n, c]], dtype=np.float32) * float(s)
    return A

SIM_HIST_MAX = 240
sim_steps = []
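# Hedged sanity sketch (defined only, never called): for a noise-free
# similarity M = [s*R(theta) | t], project_to_similarity() recovers the
# parameters exactly, because closest_rotation() is the orthogonal Procrustes
# solution and trace(A.T @ R) / 2 = trace(s * R.T @ R) / 2 = s.
def _check_project_to_similarity(s=1.5, theta_deg=30.0, t=(4.0, -2.0)):
    th = math.radians(theta_deg)
    M = np.hstack([build_similarity(s, th), np.array(t, np.float32).reshape(2, 1)])
    s_hat, R_hat, t_hat = project_to_similarity(M)
    return (abs(s_hat - s) < 1e-4 and
            abs(angle_from_R(R_hat) - th) < 1e-4 and
            np.allclose(t_hat, t, atol=1e-4))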
def _rect_aabb_after_M(rect, M3, W, H):
    x, y, w, h = rect
    corners = np.array([[x, y], [x + w, y], [x + w, y + h], [x, y + h]], dtype=np.float32)
    tc = (corners @ M3[:2, :2].T) + M3[:2, 2]
    minx, miny = float(np.min(tc[:, 0])), float(np.min(tc[:, 1]))
    maxx, maxy = float(np.max(tc[:, 0])), float(np.max(tc[:, 1]))
    rx = int(max(0, minx)); ry = int(max(0, miny))
    rh = int(max(1, min(H - 1, maxy) - ry)); rw = int(max(1, min(W - 1, maxx) - rx))  # minor fix (8.21)
    return (rx, ry, rw, rh)

def estimate_similarity_small(prev_gray_s, gray_s, prev_pts):  # in use
    if prev_pts is None or len(prev_pts) < 140:
        prev_pts = cv2.goodFeaturesToTrack(prev_gray_s, maxCorners=FLOW_MAX_CORNERS,
                                           qualityLevel=FLOW_QUALITY,
                                           minDistance=FLOW_MIN_DISTANCE, blockSize=7)
    if prev_pts is None:
        return None, None
    next_pts, st, err = cv2.calcOpticalFlowPyrLK(prev_gray_s, gray_s, prev_pts, None,
                                                 winSize=FLOW_WINSIZE, maxLevel=FLOW_LEVELS,
                                                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 12, 0.03))
    if next_pts is None:
        return None, None
    P = prev_pts[st == 1].reshape(-1, 1, 2); Q = next_pts[st == 1].reshape(-1, 1, 2)
    if len(P) < 60:
        return None, None
    M, _ = cv2.estimateAffinePartial2D(P, Q, method=cv2.RANSAC,
                                       ransacReprojThreshold=3.0, maxIters=1500, confidence=0.99)
    if M is None:
        return None, None
    return M, next_pts

def transform_overlays_similarity(items, s, theta, t_s):  # in use
    tx = float(t_s[0]) / FLOW_DS; ty = float(t_s[1]) / FLOW_DS
    step_mag = math.hypot(tx, ty)
    if step_mag > MAX_TRANS_PX:
        scale = MAX_TRANS_PX / (step_mag + 1e-6)
        tx *= scale; ty *= scale
    A = build_similarity(s, theta).astype(np.float32)
    for it in items:
        pts = it['poly'].astype(np.float32)
        it['poly'] = (pts @ A.T) + np.array([tx, ty], dtype=np.float32)

def orb_similarity(prev_g, cur_g):
    kp1, des1 = orb.detectAndCompute(prev_g, None)
    kp2, des2 = orb.detectAndCompute(cur_g, None)
    if des1 is None or des2 is None or len(kp1) < 8 or len(kp2) < 8:
        return None
    matches = bf.knnMatch(des1, des2, k=2)
    good = []
    for mn in matches:
        if len(mn) == 2:
            m, n = mn
            if m.distance < 0.75 * n.distance:
                good.append(m)
    if len(good) < ORB_MIN_GOOD:
        return None
    src = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
    dst = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
    M, _ = cv2.estimateAffinePartial2D(src, dst, method=cv2.RANSAC,
                                       ransacReprojThreshold=3.0, maxIters=1500, confidence=0.99)
    return M

# ========= KLT =========
def _build_gray_for_klt(gray):  # in use
    g = gray
    if KLT_USE_CLAHE:
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        g = clahe.apply(g)
    gx = cv2.Sobel(g, cv2.CV_32F, 1, 0, ksize=3)
    gy = cv2.Sobel(g, cv2.CV_32F, 0, 1, ksize=3)
    mag = cv2.magnitude(gx, gy)
    if mag.max() > 0:
        mag = (mag / mag.max()) * 255.0
    return mag.astype(np.uint8)

def _klt_seed_ring(center, n=KLT_N_SAMPLES, r=KLT_RING_R):  # in use
    cx, cy = float(center[0]), float(center[1])
    pts = [(cx, cy)]
    for k in range(n):
        a = 2.0 * math.pi * k / float(n)
        pts.append((cx + r * math.cos(a), cy + r * math.sin(a)))
    return np.array(pts, dtype=np.float32).reshape(-1, 1, 2)

def _in_bounds(pt, W, H, margin=0):
    x, y = float(pt[0]), float(pt[1])
    return (-margin <= x <= (W - 1 + margin)) and (-margin <= y <= (H - 1 + margin))
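# klt_track_multi below applies a forward-backward gate: each point is tracked
# prev -> cur (p1) and then re-tracked cur -> prev (p0r). A point survives only
# if the round trip lands within KLT_FB_MAX px of where it started, its LK
# error stays under KLT_ERR_MAX, its per-frame step is under KLT_STEP_MAX, and
# it remains inside the frame (KLT_OUT_MARGIN). This rejects points that have
# drifted off the fingertip onto background texture.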
def klt_track_multi(prev_gray, cur_gray, prev_pts, W, H):  # in use
    if prev_gray is None or cur_gray is None or prev_pts is None or len(prev_pts) == 0:
        return None, None
    p1, st, err = cv2.calcOpticalFlowPyrLK(prev_gray, cur_gray, prev_pts, None,
                                           winSize=KLT_WIN, maxLevel=KLT_LEVELS, criteria=KLT_TERM)
    if p1 is None:
        return None, None
    p0r, st2, err2 = cv2.calcOpticalFlowPyrLK(cur_gray, prev_gray, p1, None,
                                              winSize=KLT_WIN, maxLevel=KLT_LEVELS, criteria=KLT_TERM)
    good = []
    for i in range(len(prev_pts)):
        if st[i] == 1 and st2[i] == 1:
            fb = float(np.linalg.norm(prev_pts[i, 0] - p0r[i, 0]))
            e = float(err[i][0]) if err is not None else 0.0
            step = float(np.linalg.norm(p1[i, 0] - prev_pts[i, 0]))
            if fb <= KLT_FB_MAX and e <= KLT_ERR_MAX and step <= KLT_STEP_MAX \
                    and _in_bounds(p1[i, 0], W, H, KLT_OUT_MARGIN):  # KLT_OUT_MARGIN simplified (8.20)
                good.append(p1[i, 0])
    if len(good) < KLT_MIN_GOOD:
        return None, None
    pts = np.array(good, dtype=np.float32).reshape(-1, 1, 2)
    # fingertip estimate: median of the surviving points (robust to stragglers)
    cx = float(np.median(pts[:, 0, 0])); cy = float(np.median(pts[:, 0, 1]))
    return (int(round(cx)), int(round(cy))), pts

def rect_from_poly(poly):
    # axis-aligned bbox of an overlay polygon (same convention as bbox_of_poly)
    return bbox_of_poly(poly)

def fingertip_overlaps_box(xy, box, pad=0.0):
    # does the fingertip point fall inside the (optionally padded) box?
    fx, fy = float(xy[0]), float(xy[1])
    bx, by, bw, bh = box
    return (bx - pad <= fx <= bx + bw + pad) and (by - pad <= fy <= by + bh + pad)

def merge_update_overlays(items, new_items, roi_rect, now_ts,
                          iou_th=MERGE_IOU_TH, center_dist_th=MERGE_CENTER_DIST):
    """Merge fresh OCR results into the overlay list: each new detection either
    replaces a matching existing overlay or is appended as a new one."""
    used_old = set()
    for ni in new_items:
        raw_txt = ni['text']
        disp_new, canon_new, csc_new, conf_new = enrich_with_dict(raw_txt, float(ni.get('conf', 0.0)))
        poly_new = ni['poly']
        box_new = bbox_of_poly(poly_new)
        cnx, cny = box_new[0] + box_new[2] * 0.5, box_new[1] + box_new[3] * 0.5

        best_iou, best_d, best_idx = -1.0, 1e18, -1
        for idx, it in enumerate(items):
            if idx in used_old:
                continue
            box_old = bbox_of_poly(it['poly'])
            i = iou(box_old, box_new)
            cox, coy = box_old[0] + box_old[2] * 0.5, box_old[1] + box_old[3] * 0.5
            d = math.hypot(cox - cnx, coy - cny)
            # (1) location match: enough overlap or close centers
            loc_match = (i >= iou_th) or (d <= center_dist_th)
            # (2) same text + containment also counts as a match
            #     (e.g. a small box sitting inside a bigger one)
            text_same = _canon_equal(it.get('canon_text') or it.get('text'),
                                     canon_new or disp_new)
            contained = rect_contains(box_old, box_new) or rect_contains(box_new, box_old)
            text_same_contained = text_same and contained
            if not (loc_match or text_same_contained):
                continue
            # pick the best candidate (IoU first, then distance)
            if (i > best_iou) or (abs(i - best_iou) < 1e-6 and d < best_d):
                best_iou, best_d, best_idx = i, d, idx

        if best_idx >= 0:
            it = items[best_idx]
            # priority: dictionary match score first, then conf
            csc_old = float(it.get('canon_score', 0.0))
            conf_old = float(it.get('conf', 0.0))
            replace = False
            if csc_new >= DICT_MERGE_SCORE and csc_old < DICT_MERGE_SCORE:
                replace = True
            elif csc_new >= DICT_MERGE_SCORE and csc_old >= DICT_MERGE_SCORE:
                if csc_new > csc_old + DICT_TIE_DELTA:
                    replace = True
                elif abs(csc_new - csc_old) <= DICT_TIE_DELTA and conf_new > conf_old:
                    replace = True
            else:
                if conf_new > conf_old and csc_new > csc_old:  # replace only when strictly more confident (8.20)
                    replace = True
                    # consider switching to a relative comparison here
            if replace:
                it['poly'] = poly_new
                it['ocr_text'] = raw_txt
                it['text'] = disp_new
                it['canon_text'] = canon_new
                it['canon_score'] = csc_new
                it['conf'] = conf_new
                it['expiry'] = now_ts + BASE_TTL  # duplicated TTL-extension code removed (8.20)
            used_old.add(best_idx)
        else:
            if DICT_ONLY and (disp_new is None or disp_new not in DICT_WORDS):
                continue  # show dictionary words only (8.21)
            items.append({
                'poly': poly_new,
                'ocr_text': raw_txt,
                'text': disp_new,
                'canon_text': canon_new,
                'canon_score': csc_new,
                'conf': conf_new,
                'time': now_ts,
                'last_seen': now_ts,
                'expiry': now_ts + BASE_TTL,
                'pin_until': 0.0,
                'id': alloc_oid()
            })
    # keepalive extension for items already inside the ROI:
    # handled by the main loop, so the extension here was removed (8.20);
    # it also duplicated what prune_overlays does (8.20).
    # If boxes pile up, a prune pass here may become necessary.
    return items
def dedupe_same_text_overlays(items, iou_th=0.55, center_dist_th=26.0):
    """Remove duplicate boxes carrying the same/similar text (dictionary-normalized).
    - boxes judged to hold the same text are collapsed to one when they overlap
      heavily, sit close together, or one contains the other
    - priority: (1) higher dictionary match score -> (2) on a tie, higher conf
    """
    def _canon_key(it):
        t = (it.get('canon_text') or it.get('text') or '').strip()
        try:
            return _normalize_plain(t)
        except Exception:
            import re as _re
            return _re.sub(r"\s+", "", t).casefold()

    def _rect(it):
        return bbox_of_poly(it['poly'])

    def _score(it):
        csc = float(it.get('canon_score', 0.0))
        conf = float(it.get('conf', 0.0))
        return ((1 if csc >= DICT_MERGE_SCORE else 0), csc, conf)

    groups = {}
    for it in items:
        key = _canon_key(it)
        if not key:  # skip empty strings
            continue
        groups.setdefault(key, []).append(it)

    keep = set()
    drop = set()
    for key, arr in groups.items():
        arr_sorted = sorted(arr, key=_score, reverse=True)
        for i, a in enumerate(arr_sorted):
            if id(a) in drop or id(a) in keep:
                continue
            keep.add(id(a))
            ax, ay, aw, ah = _rect(a)
            acx, acy = ax + aw * 0.5, ay + ah * 0.5
            for b in arr_sorted[i + 1:]:
                if id(b) in drop or id(b) in keep:
                    continue
                bx, by, bw, bh = _rect(b)
                bcx, bcy = bx + bw * 0.5, by + bh * 0.5
                ov = iou((ax, ay, aw, ah), (bx, by, bw, bh))
                d = ((acx - bcx)**2 + (acy - bcy)**2)**0.5
                contained = rect_contains((ax, ay, aw, ah), (bx, by, bw, bh)) or \
                            rect_contains((bx, by, bw, bh), (ax, ay, aw, ah))
                if contained or (ov >= iou_th) or (d <= center_dist_th):
                    drop.add(id(b))
    if not drop:
        return items
    return [it for it in items if id(it) not in drop]

# ===== GUIDE MODE utils =====
def _overlay_center(it):
    P = it['poly']
    x1, y1 = float(np.min(P[:, 0])), float(np.min(P[:, 1]))
    x2, y2 = float(np.max(P[:, 0])), float(np.max(P[:, 1]))
    return (0.5 * (x1 + x2), 0.5 * (y1 + y2))

def _choose_target_overlay(target_canon: str, overlays, finger_xy=None):
    cands = []
    t = (target_canon or "").strip()
    if not t:
        return None
    for it in overlays:
        ct = (it.get('canon_text') or "").strip()
        tx = (it.get('text') or "").strip()
        ok = (ct == t) or (tx == t) or (t in tx)
        if ok:
            cx, cy = _overlay_center(it)
            d = 0.0
            if finger_xy is not None:
                d = math.hypot(cx - (finger_xy[0]), cy - (finger_xy[1]))
            canon_bonus = 1.0 if (ct == t) else 0.0
            cands.append((canon_bonus, float(it.get('conf', 0.0)), -d, it))
    if not cands:
        return None
    # sort on the score tuple only; comparing the trailing dicts would raise TypeError
    cands.sort(key=lambda c: c[:3], reverse=True)
    return cands[0][3]

def _dir_sentence(dx, dy):
    def q(px):
        a = abs(int(round(px)))
        if a < 30:
            lvl = "조금"
        elif a < 90:
            lvl = "약간"
        elif a < 180:
            lvl = "보통"
        else:
            lvl = "많이"
        return lvl, a
    msg = []
    if dx > 0:
        lvl, a = q(dx); msg.append(f"오른쪽으로 {a}픽셀({lvl})")
    elif dx < 0:
        lvl, a = q(dx); msg.append(f"왼쪽으로 {a}픽셀({lvl})")
    if dy > 0:
        lvl, a = q(dy); msg.append(f"아래로 {a}픽셀({lvl})")
    elif dy < 0:
        lvl, a = q(dy); msg.append(f"위로 {a}픽셀({lvl})")
    return " , ".join(msg) if msg else "그대로 유지"

def set_guide_target_from_text(text: str):
    global GUIDE_TARGET, GUIDE_TARGET_ITEM
    if not text:
        GUIDE_TARGET = None
        GUIDE_TARGET_ITEM = None
        set_tts_target("목표가 비었습니다.", note="guide")
        return False
    canon, sc = map_to_dict_canon(text, threshold=DICT_THRESHOLD)
    if not canon:
        set_tts_target(f"'{text}'는 사전에 없습니다.", note="guide no-dict")
        return False
    GUIDE_TARGET = canon
    GUIDE_TARGET_ITEM = None
    set_tts_target(f"목표 '{canon}' 안내를 시작합니다.", note="guide")
    return True
set_tts_target(f"화면에서 '{GUIDE_TARGET}'을 찾지 못했습니다.", note="guide") GUIDE_LAST_TS = now_ts GUIDE_LAST_SENT = "not_found" return cx, cy = _overlay_center(tgt) if finger_xy is None: if now_ts - GUIDE_LAST_TS >= GUIDE_REPEAT_SEC: set_tts_target(f"목표 '{GUIDE_TARGET}'이 화면에 있습니다. 손가락을 이동해 주세요.", note="guide") GUIDE_LAST_TS = now_ts GUIDE_LAST_SENT = "where_only" return dx = int(round(cx - finger_xy[0])) dy = int(round(cy - finger_xy[1])) dist = math.hypot(dx, dy) if dist <= GUIDE_TOL_PX: if GUIDE_LAST_SENT != "arrived": set_tts_target(f"도착. '{GUIDE_TARGET}' 입니다.", note="guide ok") GUIDE_LAST_SENT = "arrived" GUIDE_LAST_TS = now_ts return if (now_ts - GUIDE_LAST_TS) >= GUIDE_REPEAT_SEC: msg = _dir_sentence(dx, dy) set_tts_target(f"{msg}", note=f"guide d={int(dist)}") GUIDE_LAST_SENT = msg GUIDE_LAST_TS = now_ts def highlight_guide_target(frame_bgr, item): if item is None: return poly = item['poly'].astype(int) cv2.polylines(frame_bgr, [poly], True, (0,0,255), 3, cv2.LINE_AA) cx, cy = map(int, _overlay_center(item)) cv2.circle(frame_bgr, (cx,cy), 6, (0,0,255), -1) def stt_listen_once(timeout=4, phrase_time_limit=4): if not (USE_STT and _STT_OK): set_tts_target("음성 인식이 비활성화되어 있습니다.", note="stt off") return None try: r = sr.Recognizer() with sr.Microphone() as source: set_tts_target("목표 단어를 말씀해 주세요.", note="stt") if hasattr(r, "adjust_for_ambient_noise"): r.adjust_for_ambient_noise(source, duration=0.5) audio = r.listen(source, timeout=timeout, phrase_time_limit=phrase_time_limit) try: text = r.recognize_google(audio, language="ko-KR") except Exception: text = r.recognize_google(audio, language="ko-KR") return text except Exception as e: print(f"[STT] error: {e}") set_tts_target("음성 인식에 실패했습니다.", note="stt err") return None # ===== OCR worker ===== task_q=queue.Queue(maxsize=1) result_q=queue.Queue(maxsize=2) def enhance_for_ocr(bgr): if ENHANCE_MODE=="off": return bgr if ENHANCE_MODE=="fast": blur=cv2.GaussianBlur(bgr,(0,0),0.8) return cv2.addWeighted(bgr, 1.6, blur, -0.6, 0) img=bgr.copy() img=cv2.bilateralFilter(img, d=0, sigmaColor=45, sigmaSpace=12) lab=cv2.cvtColor(img, cv2.COLOR_BGR2LAB); L,A,B=cv2.split(lab) clahe=cv2.createCLAHE(clipLimit=1.6, tileGridSize=(8,8)); L=clahe.apply(L) img=cv2.cvtColor(cv2.merge([L,A,B]), cv2.COLOR_LAB2BGR) blur=cv2.GaussianBlur(img,(0,0),0.9) return cv2.addWeighted(img, 1.8, blur, -0.8, 0) def prep_fixed(roi_bgr): h,w=roi_bgr.shape[:2] long_side=max(h,w); scale=min(1.0, float(MAX_OCR_LONG)/float(long_side)) proc=cv2.resize(roi_bgr, (int(w*scale), int(h*scale)), interpolation=cv2.INTER_AREA) if scale<1.0 else roi_bgr proc=enhance_for_ocr(proc) sx_pre=proc.shape[1]/float(w); sy_pre=proc.shape[0]/float(h) return proc, sx_pre, sy_pre def run_ocr_rect(frame_work, rect_work, mask_boxes=None): x,y,w,h=rect_work src=frame_work[y:y+h, x:x+w].copy() base_off=(x,y) proc,sx_pre,sy_pre=prep_fixed(src) items=[] r1=easy_reader.readtext(proc, detail=1, decoder='greedy', rotation_info=[0,180], contrast_ths=0.05, adjust_contrast=0.7, text_threshold=0.6, low_text=0.3, link_threshold=0.4, canvas_size=1920, mag_ratio=1.3, paragraph=False, min_size=2) for (bbox_points,text,prob) in r1: poly=np.array(bbox_points,dtype=np.float32) poly[:,0]=poly[:,0]/sx_pre + base_off[0] poly[:,1]=poly[:,1]/sy_pre + base_off[1] bx,by,bw,bh=bbox_of_poly(poly) if bw*bh>=120: items.append({'poly':poly,'text':text,'conf':float(prob)}) return items def ocr_worker(): while True: item=task_q.get() if item is None: break t0=time.time() out=[] for rect_work in item['rects']: 
def ocr_worker():
    while True:
        item = task_q.get()
        if item is None:
            break
        t0 = time.time()
        out = []
        for rect_work in item['rects']:
            out.extend(run_ocr_rect(item['frame_work'], rect_work, mask_boxes=None))
        dt_ms = (time.time() - t0) * 1000.0
        result_q.put({
            'roi': item['roi'],
            'new_items': out,
            'dt_ms': dt_ms,
            'frame_idx': item['frame_idx'],
        })

threading.Thread(target=ocr_worker, daemon=True).start()

def drain_queue(q):
    try:
        while True:
            q.get_nowait()
    except queue.Empty:
        pass

# === fallback summarizer ===   removed (8.20)
# def _fallback_summarize(frame_bgr):
#     try:
#         r = easy_reader.readtext(frame_bgr, detail=1)
#         tokens = [re.sub(r"[^가-힣0-9A-Za-z]", "", t).strip() for (_b, t, _c) in r]
#         tokens = [t for t in tokens if t]
#         if not tokens:
#             return "눈에 띄는 텍스트가 없습니다."
#         top = ", ".join(tokens[:5])
#         return f"화면에서 텍스트가 보입니다: {top}"
#     except Exception:
#         return "장면을 요약할 수 없습니다."

# ===== Main loop =====
print("Live view started. 'q' quit / 'o' OCR on/off / 't' HUD / 's' TTS / 'y' YOLO input / 'p' save YOLO PNG")
print("Mode switch: '1' operate mode / '2' describe mode (scene summary)")  # / '3' guide mode (move toward target)  <<< CHANGED
print("ROI sizing: '[' width-, ']' width+, ';' height-, \"'\" height+ / 'r' restore defaults")
# print("GUIDE: '3' guide mode / 'v' set target by voice / 'f' type target / 'c' cancel target")

s_ema = 1.0; theta_ema = 0.0; tx_ema = 0.0; ty_ema = 0.0
prev_gray_full = None; prev_gray_klt = None
prev_gray_s = None; prev_pts = None

# <Experiment 2> wrong TTS guidance when no finger is present
# [CASE 1] KLT OFF & FINGER_STALE_MS 800 -> 2000 (finger may lag behind)
# [CASE 2] KLT ON & (if KLT alone has tracked for a full second, finger_is_fresh = False)
# [CASE 3] require n consecutive YOLO hits before enabling KLT
# Fix 7: record when KLT-only tracking started (CASE 2 above)
klt_only_start_ts = 0.0

finger_src = "NONE"; yolo_last_conf = None; klt_draw_pts = None
yolo_box_count = None; yolo_last_in = None

while True:
    ret, frame_cap = cap.read()
    if not ret:
        break
    frame_work = frame_cap if WORK_SCALE == 1.0 else \
        cv2.resize(frame_cap, None, fx=WORK_SCALE, fy=WORK_SCALE, interpolation=cv2.INTER_AREA)
    H, W = frame_work.shape[:2]
    frame_for_ocr = frame_work.copy()
    frame_disp = frame_work.copy()

    # share the latest frame with the describe thread
    with _latest_frame_lock:
        _latest_frame_for_info = frame_work.copy()

    gray = cv2.cvtColor(frame_work, cv2.COLOR_BGR2GRAY)
    gray_klt = _build_gray_for_klt(gray) if USE_KLT_FALLBACK else gray

    # ---- Global SIM ----
    gray_s = cv2.resize(gray, None, fx=FLOW_DS, fy=FLOW_DS, interpolation=cv2.INTER_AREA)
    M_s = None; did_motion = False
    if prev_gray_s is not None:
        reseed = ((frame_idx % RESEED_INTERVAL_FRAMES) == 0)
        if reseed:
            prev_pts = None
        M_s, next_pts = estimate_similarity_small(prev_gray_s, gray_s, prev_pts)
        prev_pts = next_pts
    prev_gray_s = gray_s

    if M_s is not None:
        s_step, R_step, t_step_s = project_to_similarity(M_s)
        s_step = max(1.0 - MAX_SCALE_STEP, min(1.0 + MAX_SCALE_STEP, s_step))
        theta_step = angle_from_R(R_step)
        theta_step = max(-math.radians(MAX_ROT_STEP_DEG), min(math.radians(MAX_ROT_STEP_DEG), theta_step))
        s_ema = (1 - EMA_ALPHA_SIM) * s_ema + EMA_ALPHA_SIM * s_step
        theta_ema = (theta_ema + ((theta_step - theta_ema + math.pi) % (2 * math.pi) - math.pi) * EMA_ALPHA_SIM)
        tx_ema = (1 - EMA_ALPHA_SIM) * tx_ema + EMA_ALPHA_SIM * float(t_step_s[0])
        ty_ema = (1 - EMA_ALPHA_SIM) * ty_ema + EMA_ALPHA_SIM * float(t_step_s[1])
        transform_overlays_similarity(overlays, s_ema, theta_ema, (tx_ema, ty_ema))
        did_motion = True
    else:
        if USE_ORB_FALLBACK and (prev_gray_full is not None):
            M2 = orb_similarity(prev_gray_full, gray)
            if M2 is not None:
                s2, R2, t2s = project_to_similarity(M2)
                s2 = max(1.0 - MAX_SCALE_STEP, min(1.0 + MAX_SCALE_STEP, s2))
                theta2 = angle_from_R(R2)
                theta2 = max(-math.radians(MAX_ROT_STEP_DEG), min(math.radians(MAX_ROT_STEP_DEG), theta2))
                s_ema = (1 - EMA_ALPHA_SIM) * s_ema + EMA_ALPHA_SIM * s2
                theta_ema = (theta_ema + ((theta2 - theta_ema + math.pi) % (2 * math.pi) - math.pi) * EMA_ALPHA_SIM)
                tx_ema = (1 - EMA_ALPHA_SIM) * tx_ema + EMA_ALPHA_SIM * float(t2s[0]) * FLOW_DS
                ty_ema = (1 - EMA_ALPHA_SIM) * ty_ema + EMA_ALPHA_SIM * float(t2s[1]) * FLOW_DS
                transform_overlays_similarity(overlays, s_ema, theta_ema, (tx_ema, ty_ema))
                did_motion = True
    if did_motion:
        M2_step = np.array([[math.cos(theta_ema) * s_ema, -math.sin(theta_ema) * s_ema, float(tx_ema) / FLOW_DS],
                            [math.sin(theta_ema) * s_ema,  math.cos(theta_ema) * s_ema, float(ty_ema) / FLOW_DS]],
                           dtype=np.float32)
    else:
        M2_step = np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32)
    if frame_idx > 0:
        sim_steps.append((frame_idx - 1, frame_idx, np.vstack([M2_step, [0, 0, 1]]).astype(np.float32)))
        if len(sim_steps) > SIM_HIST_MAX:
            sim_steps.pop(0)

    # ---- YOLO finger (ASYNC) ----
    now = time.time()
    with mode_lock:
        in_op_or_guide = (mode_state == MODE_OP) or (mode_state == MODE_GUIDE)
    # keep YOLO from running asynchronously while in INFO mode (8.21)
    if in_op_or_guide:
        if yolo_in_q.empty():
            try:
                yolo_in_q.put_nowait(frame_work.copy())
            except queue.Full:
                pass
        try:
            det = yolo_out_q.get_nowait()
        except queue.Empty:
            det = None
    else:
        det = None

    finger_is_fresh = False
    finger_src = "NONE"
    yolo_last_conf = None
    klt_draw_pts = None
    yolo_box_count = None

    if isinstance(det, dict):
        xy = det.get('xy')
        if xy is not None:
            fx, fy = int(xy[0]), int(xy[1])
            if last_finger_xy is None:
                filt = np.array([fx, fy], dtype=np.float32)
            else:
                filt = (1 - EMA_ALPHA_FINGER) * np.array(last_finger_xy, dtype=np.float32) \
                     + EMA_ALPHA_FINGER * np.array([fx, fy], dtype=np.float32)
            last_finger_xy = (int(filt[0]), int(filt[1]))
            finger_last_seen = now; finger_is_fresh = True; finger_src = "YOLO"
            yolo_last_conf = float(det.get('conf', 0.0))
            if USE_KLT_FALLBACK:
                klt_pts_prev = _klt_seed_ring(last_finger_xy)
                klt_lost_frames = 0
                # frames_since_reseed removed (8.20)
    if isinstance(det, dict) and det.get('yolo_in') is not None:
        yolo_last_in = det['yolo_in']
        if YOLO_SHOW_INPUT:
            cv2.imshow(YOLO_INPUT_WIN, yolo_last_in)
        if YOLO_DEBUG:
            rb = det.get('raw_boxes', [])
            yolo_box_count = len(rb)
            if YOLO_DRAW_ALL:
                for (x1, y1, x2, y2, conf, cls_id) in rb:
                    cv2.rectangle(frame_disp, (int(x1), int(y1)), (int(x2), int(y2)), (0, 200, 255), 1)
                    cv2.putText(frame_disp, f"{conf:.2f}/{cls_id}", (int(x1), max(0, int(y1) - 3)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 200, 255), 1, cv2.LINE_AA)
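    # Fingertip source selection, in order of trust:
    #   1) fresh YOLO hit -> EMA-smooth the point and reseed a KLT ring around it
    #   2) no YOLO this frame -> KLT forward-backward tracking carries the point
    #   3) KLT alone for more than KLT_TIMEOUT_SEC -> drop the finger entirely,
    #      so a stale track cannot keep triggering TTS (Experiment 2, CASE 2)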
    # Fix 5: KLT execution
    if USE_KLT_FALLBACK and not finger_is_fresh and (prev_gray_klt is not None) and (klt_pts_prev is not None):
        klt_xy, klt_pts_next = klt_track_multi(prev_gray_klt, gray_klt, klt_pts_prev, W, H)
        if klt_xy is not None:
            # KLT tracking succeeded
            last_finger_xy = klt_xy
            klt_pts_prev = klt_pts_next
            klt_draw_pts = klt_pts_next   # for display
            finger_last_seen = now
            finger_is_fresh = True        # a KLT hit still counts as 'fresh' so OCR etc. keep running
            finger_src = "KLT"
            klt_lost_frames = 0
        else:
            # KLT tracking failed
            klt_lost_frames += 1
            if klt_lost_frames > KLT_LOSS_GRACE:
                klt_pts_prev = None       # points are too stale; reset

    # Fix 7: time limit on KLT-only tracking
    KLT_TIMEOUT_SEC = 1.0
    if finger_src == "YOLO":
        klt_only_start_ts = 0.0               # YOLO has it; reset the timer
    elif finger_src == "KLT":
        if klt_only_start_ts == 0.0:
            klt_only_start_ts = now           # KLT-only tracking begins; start the timer
        elif (now - klt_only_start_ts) > KLT_TIMEOUT_SEC:
            # KLT has been on its own for over a second: invalidate the point
            last_finger_xy = None             # drop the finger coordinate
            finger_is_fresh = False           # prevent TTS
            klt_pts_prev = None               # prevent further KLT execution
            klt_only_start_ts = 0.0           # reset the timer
            finger_src = "NONE"
    else:  # "NONE"
        klt_only_start_ts = 0.0               # nothing tracked; reset the timer

    # ---- mode branch ----
    with mode_lock:
        mode_now = mode_state

    # Removal 1: duplicate key handling removed

    # ---- ROI & OCR + proximity reading (OP mode only) ----
    roi = None; protected_boxes = []; protected_ids = []
    if mode_now == MODE_OP and finger_is_fresh and (last_finger_xy is not None):
        fx, fy = last_finger_xy
        roi = clamp_rect(int(fx - ROI_W // 2), int(fy - ROI_H // 2), ROI_W, ROI_H, W, H)
        last_roi = roi
        last_roi_active_until = now + ROI_KEEPALIVE_GRACE_SEC
        rx, ry, rw, rh = roi
        for it in overlays:
            c = poly_center(it['poly'])
            if (rx <= c[0] <= rx + rw) and (ry <= c[1] <= ry + rh):
                it['expiry'] = max(it.get('expiry', now), now + BASE_TTL)  # unified TTL-extension scheme (8.20)
        for it in overlays:
            bx, by, bw, bh = rect_from_poly(it['poly'])
            if fingertip_overlaps_box((fx, fy), (bx, by, bw, bh)):
                protected_boxes.append((bx, by, bw, bh))
                protected_ids.append(it.get('id'))
                it['expiry'] = max(it.get('expiry', now), now + BASE_TTL)
                it['pin_until'] = now + PIN_GRACE_SEC

        # proximity reading (TTS) — OP mode only
        overlap_items = []
        for it in overlays:
            bx, by, bw, bh = rect_from_poly(it['poly'])
            if fingertip_overlaps_box((fx, fy), (bx, by, bw, bh)):
                overlap_items.append(it)
        near = None; bestd = 1e9
        for it in overlap_items:
            c = poly_center(it['poly']); d = np.hypot(c[0] - fx, c[1] - fy)
            if d < bestd:
                bestd = d; near = it
        if near is not None:
            txt = (near.get('text') or "").strip()
            conf = float(near.get('conf', 0.0))
            ok = (conf >= TTS_CONF) or (_has_korean(txt) and (conf >= TTS_CONF_FALLBACK))
            note = ""
            low_conf = (_has_korean(txt) and conf < TTS_CONF)
            if low_conf:
                note = "low-conf"
            if STRICT_DICT_ONLY and not near.get('canon_text'):
                ok = False  # with STRICT_DICT_ONLY, speak dictionary-mapped text only
            if ok and txt:
                set_tts_target(txt, note=note)
                tts_last_seen_target_ts = now
        elif (now - tts_last_seen_target_ts) > TTS_TARGET_STICKY_SEC:
            set_tts_target(None, note="")

        # ---- OCR scheduling (OP mode only) ----
        if OCR_ENABLED:
            want_period = BASE_OCR_PERIOD
            roi_labels = [it for it in overlays
                          if (roi[0] <= poly_center(it['poly'])[0] <= roi[0] + roi[2] and
                              roi[1] <= poly_center(it['poly'])[1] <= roi[1] + roi[3])]
            roi_moved_fast = (last_roi is None) or (iou(last_roi, roi) < 0.6)
            roi_empty = (len(roi_labels) == 0)
            roi_stale = (len(roi_labels) > 0 and
                         all((now - it.get('time', now) > STALE_AGE_SEC) or
                             (it.get('conf', 0) < LOW_CONF_TH) for it in roi_labels))
            if roi_moved_fast or roi_empty or roi_stale:
                want_period = EXTRA_OCR_PERIOD
            if (now - last_ocr_time) >= want_period and task_q.qsize() == 0:
                gx, gy, gw, gh = roi
                g_roi = gray[gy:gy + gh, gx:gx + gw]
                blur_ok = (variance_of_laplacian(g_roi) >= BLUR_VAR_THRESH) or roi_empty
                avg_step = math.hypot(tx_ema, ty_ema) / max(1e-6, FLOW_DS)
                if blur_ok and avg_step > MOTION_GATE_PX:
                    blur_ok = False
                if blur_ok:
                    rects_to_run = [roi]
                    try:
                        task_q.put_nowait({
                            'frame_work': frame_for_ocr.copy(),
                            'rects': rects_to_run,
                            'roi': roi,
                            'frame_idx': frame_idx,
                        })
                        last_ocr_time = now; last_roi = roi
                    except queue.Full:
                        pass
    elif mode_now == MODE_OP and (last_roi is not None) and (now <= last_roi_active_until):
        # even if YOLO drops out briefly, keep refreshing TTLs inside the recent ROI
        rx, ry, rw, rh = last_roi
        for it in overlays:
            c = poly_center(it['poly'])
            if (rx <= c[0] <= rx + rw) and (ry <= c[1] <= ry + rh):
                it['expiry'] = max(it.get('expiry', now), now + BASE_TTL)
    else:
        # in INFO mode both proximity reading and ROI OCR are disabled
        if mode_now == MODE_OP:
            pass

    # keep proximity reading from lingering and repeating after the finger is gone (8.21)
    if mode_now == MODE_OP and not finger_is_fresh:
        if (time.time() - tts_last_seen_target_ts) > TTS_TARGET_STICKY_SEC:
            set_tts_target(None, note="")
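    # SIM-history warp: OCR results come back late, from the frame they were
    # captured on (res['frame_idx']). Every frame appends its 3x3 similarity
    # step to sim_steps, so composing the steps from capture to now
    # (T = M_now @ ... @ M_capture+1) maps those stale polygons, and the ROI
    # they came from, into current-frame coordinates before merging.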
    # ---- OCR result merge (OP mode only) ----
    if mode_now == MODE_OP:
        try:
            while True:
                res = result_q.get_nowait()
                if 'dt_ms' in res:
                    if OCR_EMA is None:
                        OCR_EMA = res['dt_ms']
                    else:
                        OCR_EMA = (1 - OCR_EMA_ALPHA) * OCR_EMA + OCR_EMA_ALPHA * res['dt_ms']
                if res.get('new_items'):
                    def _T_from_to(a, b):
                        if b <= a:
                            return np.eye(3, dtype=np.float32)
                        T = np.eye(3, dtype=np.float32)
                        for (src, dst, M3) in sim_steps:
                            if a < dst <= b:
                                T = M3 @ T
                        return T
                    T_cap2now = _T_from_to(res.get('frame_idx', frame_idx), frame_idx)
                    def _apply(poly, M3):
                        P = poly.astype(np.float32)
                        return (P @ M3[:2, :2].T) + M3[:2, 2]
                    roi_now = _rect_aabb_after_M(res['roi'], T_cap2now, W, H)
                    new_items = []
                    for ni in res['new_items']:
                        ni['poly'] = _apply(ni['poly'], T_cap2now)
                        bx, by, bw, bh = bbox_of_poly(ni['poly'])
                        cx, cy = bx + bw / 2, by + bh / 2
                        gx, gy, gw, gh = roi_now
                        if gx <= cx <= gx + gw and gy <= cy <= gy + gh:
                            new_items.append(ni)
                    overlays = merge_update_overlays(overlays, new_items, roi_now,
                                                     now_ts=time.time(),
                                                     iou_th=MERGE_IOU_TH,
                                                     center_dist_th=MERGE_CENTER_DIST)
        except queue.Empty:
            pass

    # ---- GUIDE MODE tick (OP mode only) ----
    if mode_now == MODE_OP:
        guide_tick(now, last_finger_xy if finger_is_fresh else None, overlays)

    # ---- Prune & render ----
    now2 = time.time()
    if (now2 - last_prune) >= PRUNE_TIMEOUT_SEC:   # prune period made configurable (8.20)
        overlays = dedupe_same_text_overlays(overlays)
        active_roi = None
        if mode_now == MODE_OP:
            if finger_is_fresh and roi is not None:
                active_roi = roi
            elif (last_roi is not None) and (now2 <= last_roi_active_until):
                active_roi = last_roi
        overlays = prune_overlays(overlays, now2, active_roi=active_roi)
        last_prune = now2

    if roi is not None and (mode_now == MODE_OP) and finger_is_fresh and last_finger_xy is not None:
        cv2.rectangle(frame_disp, (roi[0], roi[1]), (roi[0] + roi[2], roi[1] + roi[3]), (120, 120, 255), 1)
    if last_finger_xy is not None:
        color = (0, 255, 0) if finger_src == "YOLO" else ((255, 0, 255) if finger_src == "KLT" else (160, 160, 160))
        cv2.circle(frame_disp, last_finger_xy, 9, color, -1)
        if finger_src == "KLT" and YOLO_DEBUG and klt_draw_pts is not None:
            for p in klt_draw_pts[:60]:
                cv2.circle(frame_disp, (int(p[0, 0]), int(p[0, 1])), 2, (180, 0, 180), -1)

    # highlight the guide target (OP mode only)
    if (mode_now == MODE_OP) and GUIDE_MODE and GUIDE_TARGET_ITEM is not None:
        highlight_guide_target(frame_disp, GUIDE_TARGET_ITEM)

    # render overlays (OP mode only)
    if mode_now == MODE_OP:
        draw_overlays(frame_disp, overlays, now2)

    if SHOW_TTS_HINT:
        mode_txt = "MODE: OP" if mode_now == MODE_OP else f"MODE: INFO({int(INFO_PERIOD_SEC)}s)"
        l1 = f"{mode_txt} | OCR: {'ON' if (OCR_ENABLED and mode_now == MODE_OP) else 'OFF'} | {OCR_ENGINE}"
        if OCR_EMA is not None and mode_now == MODE_OP:
            l1 += f" ~{int(OCR_EMA)} ms"
        l1 += f" TTS: {'ON' if TTS_ENABLE else 'OFF'}"
        src_txt = f"SRC: {finger_src}"
        if finger_src == "YOLO" and yolo_last_conf is not None:
            src_txt += f" conf={yolo_last_conf:.2f}"
        if finger_src == "KLT" and klt_draw_pts is not None:
            src_txt += f" klt_pts={len(klt_draw_pts)}"
        age_ms = int((now - finger_last_seen) * 1000.0) if last_finger_xy is not None else -1
        if age_ms >= 0:
            src_txt += f" age={age_ms} ms"
        if yolo_box_count is not None:
            src_txt += f" boxes={yolo_box_count}"
        roi_txt = f"ROI: {ROI_W}x{ROI_H} ([ ] width ; ' height)"
        # <<< CHANGED: even with no current target, keep the last utterance on the HUD
        say_txt = (tts_current_display.strip() or tts_last_spoken_text.strip())
        say_line = f"SAY: {say_txt}" if say_txt else "SAY: (none)"
        if tts_current_note:
            say_line += f" [{tts_current_note}]"
        guide_txt = f"GUIDE: {'ON' if (mode_now == MODE_OP and GUIDE_MODE) else 'OFF'}"
        if (mode_now == MODE_OP) and GUIDE_TARGET:
            guide_txt += f" target='{GUIDE_TARGET}'"
        if (mode_now == MODE_OP) and GUIDE_TARGET_ITEM is not None:
            cx, cy = map(int, _overlay_center(GUIDE_TARGET_ITEM))
            guide_txt += f" tgt@({cx},{cy})"
        lines = [guide_txt, l1, src_txt, roi_txt, say_line]

        img_rgb = cv2.cvtColor(frame_disp, cv2.COLOR_BGR2RGB)
        pil = Image.fromarray(img_rgb); draw = ImageDraw.Draw(pil)
        font_path = None
        for p in [r"C:\Windows\Fonts\malgun.ttf",
                  r"C:\Windows\Fonts\NanumGothic.ttf",
                  r"C:\Windows\Fonts\NotoSansCJKkr-Regular.otf",
                  "/usr/share/fonts/truetype/noto/NotoSansCJKkr-Regular.ttc"]:
            if os.path.isfile(p):
                font_path = p; break
        font = ImageFont.truetype(font_path, 22) if font_path else ImageFont.load_default()
        pad_x, pad_y, gap = 10, 8, 4
        widths = [draw.textlength(s, font=font) for s in lines]
        tw = int(max(widths)) if widths else 0; lh = 24
        th = lh * len(lines) + (len(lines) - 1) * gap
        x0, y0 = 8, 6
        bg = Image.new("RGBA", (tw + pad_x * 2, th + pad_y * 2), (0, 0, 0, 180))
        pil.paste(bg, (x0, y0), bg)
        y = y0 + pad_y
        for s in lines:
            draw.text((x0 + pad_x, y), s, font=font, fill=(255, 255, 255),
                      stroke_width=2, stroke_fill=(0, 0, 0))
            y += lh + gap
        frame_disp[:] = cv2.cvtColor(np.array(pil), cv2.COLOR_RGB2BGR)
    disp_scale = min(1.0, DISPLAY_MAX_W / float(W))
    vis = frame_disp if disp_scale == 1.0 else \
        cv2.resize(frame_disp, None, fx=disp_scale, fy=disp_scale, interpolation=cv2.INTER_AREA)
    cv2.imshow(WINDOW_NAME, vis)

    key = cv2.waitKey(1) & 0xFF
    if key == ord('q'):
        break
    elif key == ord('o'):
        OCR_ENABLED = not OCR_ENABLED
        drain_queue(task_q)
        last_ocr_time = 0.0 if OCR_ENABLED else time.time()
        print(f"[OCR] {'ENABLED' if OCR_ENABLED else 'DISABLED'}")
    elif key == ord('t'):
        SHOW_TTS_HINT = not SHOW_TTS_HINT
        print(f"[HUD] {'ON' if SHOW_TTS_HINT else 'OFF'}")
    elif key == ord('s'):
        TTS_ENABLE = not TTS_ENABLE
        set_tts_target(None, note="")
        print(f"[TTS] {'ENABLED' if TTS_ENABLE else 'DISABLED'}")
    elif key == ord('y'):
        YOLO_SHOW_INPUT = not YOLO_SHOW_INPUT
        if not YOLO_SHOW_INPUT:
            try:
                cv2.destroyWindow(YOLO_INPUT_WIN)
            except Exception:
                pass
        print(f"[YOLO] INPUT PREVIEW {'ON' if YOLO_SHOW_INPUT else 'OFF'}")
    elif key == ord('p'):
        if yolo_last_in is not None:
            os.makedirs('yolo_inputs', exist_ok=True)
            fname = time.strftime("yolo_inputs/%Y%m%d_%H%M%S.png")
            cv2.imwrite(fname, yolo_last_in)
            print(f"[YOLO] saved input preview -> {fname}")
        else:
            print("[YOLO] no input to save yet")
    # --- ROI sizing ---
    elif key == ord('['):    # width -
        ROI_W = max(MIN_ROI_W, ROI_W - 40)
    elif key == ord(']'):    # width +
        ROI_W = min(W, ROI_W + 40)
    elif key == ord(';'):    # height -
        ROI_H = max(MIN_ROI_H, ROI_H - 30)
    elif key == ord("'"):    # height +
        ROI_H = min(H, ROI_H + 30)
    elif key == ord('r'):    # reset ROI
        ROI_W, ROI_H = 420, 420
        print("[ROI] reset to 420x420")
    # ---- mode switch keys ----
    elif key == ord('1'):
        _enter_op_mode()
        print("[MODE] OP")
    elif key == ord('2'):
        _enter_info_mode()
        print("[MODE] INFO")
    elif key == ord('3'):
        _enter_guide_mode()
        print("[MODE] GUIDE")
    # ---- GUIDE mode keys ----
    elif key == ord('c'):
        GUIDE_TARGET = None
        GUIDE_TARGET_ITEM = None
        set_tts_target("목표를 취소했습니다.", note="guide cancel")
        print("[GUIDE] target cleared")
    elif key == ord('v'):
        if mode_state == MODE_GUIDE:
            text = stt_listen_once(timeout=4, phrase_time_limit=4) if (USE_STT and _STT_OK) else None
            if text:
                print(f"[STT] heard: {text}")
                set_guide_target_from_text(text)
            else:
                print("[STT] no text")
        else:
            set_tts_target("먼저 3번을 눌러 안내 모드를 켜 주세요.", note="guide")
    elif key == ord('f'):
        if mode_state == MODE_GUIDE:
            try:
                print("\n[GUIDE] examples: '세탁', '건조맞춤' ...")
                user_in = input("[GUIDE] enter target word: ").strip()
                if user_in:
                    set_guide_target_from_text(user_in)
            except Exception:
                pass
        else:
            set_tts_target("먼저 3번을 눌러 안내 모드를 켜 주세요.", note="guide")

    frame_idx += 1
    prev_gray_full = gray.copy()
    prev_gray_klt = gray_klt.copy()

# cleanup
task_q.put(None)
_info_stop.set()
if TTS_ENABLE and 'tts' in globals() and tts:
    tts_stop.set()
    # tts_q.put(None)   removed (8.20)
    try:
        tts.close()
    except Exception:
        pass
yolo_stop.set()
cap.release()
cv2.destroyAllWindows()