File size: 5,687 Bytes
4fd9791
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
cli.py
──────
Command-line interface for ClearPath Scene Description.

Usage examples:
  python cli.py --image photo.jpg
  python cli.py --image photo.jpg --speak
  python cli.py --video footage.mp4 --interval 3 --speak
  python cli.py --camera --speak          # live webcam loop (press q to quit)
"""

import argparse
import sys
import logging
import time

from PIL import Image
import cv2

from scene_captioner import SceneCaptioner
from safety_classifier import SafetyClassifier
from tts_engine import TTSEngine

# ── ANSI colours ──────────────────────────────────────────────────────────────
# Terminal escape sequences wrapped around text printed by this script.
# RESET must follow any coloured/bold span to restore the default style.
BOLD   = "\033[1m"
RESET  = "\033[0m"
RED    = "\033[91m"
GREEN  = "\033[92m"
YELLOW = "\033[93m"
CYAN   = "\033[96m"

# ── Logging ───────────────────────────────────────────────────────────────────
# Configure the root logger once at import: INFO and above, each record
# rendered as "HH:MM:SS [LEVEL] message".
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


def print_result(caption: str, result, timestamp: str = ""):
    """Print one captioning result as a framed, colourised report on stdout.

    Args:
        caption: Scene description text, shown indented under the
            "Caption" heading.
        result: Classification object. Its ``is_dangerous`` attribute picks
            the DANGEROUS or SAFE banner; when truthy, ``hazards`` and
            ``matches`` are each joined with ``", "`` and listed below it.
        timestamp: Optional label printed in cyan above the caption;
            omitted entirely when empty.
    """
    rule = "─" * 60

    # Build the verdict section first; it is the only part that depends on
    # the classifier outcome.
    if result.is_dangerous:
        verdict = [
            f"{RED}{BOLD}⚠️  CLASSIFICATION : DANGEROUS{RESET}",
            f"{RED}   Hazard categories : {', '.join(result.hazards)}{RESET}",
            f"{RED}   Matched tokens    : {', '.join(result.matches)}{RESET}",
        ]
    else:
        verdict = [f"{GREEN}{BOLD}✅  CLASSIFICATION : SAFE{RESET}"]

    stamp = [f"{CYAN}{timestamp}{RESET}"] if timestamp else []

    # Empty strings become the blank spacer lines around and inside the frame.
    report = [
        "",
        rule,
        *stamp,
        f"{BOLD}📝 Caption:{RESET}",
        f"   {caption}",
        "",
        *verdict,
        rule,
        "",
    ]
    print("\n".join(report))


def _describe_image_file(path, run):
    """Open the image at *path*, convert it to RGB and pass it to *run*.

    Exits with a readable message (instead of a traceback) when the file is
    missing or cannot be decoded as an image.
    """
    try:
        # The context manager closes the underlying file handle; convert()
        # returns an independent in-memory copy that outlives it.
        with Image.open(path) as handle:
            img = handle.convert("RGB")
    except OSError as exc:  # covers FileNotFoundError and undecodable images
        sys.exit(f"❌  Could not open image '{path}': {exc}")
    run(img)


def _process_video(path, interval, run):
    """Caption one frame of the video at *path* every *interval* seconds.

    The sampling step is derived from the container's reported FPS (falling
    back to 25 when the value is 0/unavailable). Exits with a message if the
    video cannot be opened; the capture is released even if captioning raises.
    """
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        cap.release()
        sys.exit(f"❌  Could not open video '{path}'.")

    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 25
        step = max(1, int(fps * interval))  # frames between two captions
        print(f"{CYAN}Processing video — capturing every {interval}s …{RESET}")

        idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:  # end of stream (or decode failure)
                break
            if idx % step == 0:
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                pil = Image.fromarray(rgb)
                ts = f"Frame {idx}  /  {round(idx / fps, 1)}s"
                run(pil, ts)
            idx += 1
    finally:
        cap.release()
    print(f"{GREEN}Video processing complete.{RESET}")


def _process_camera(interval, run):
    """Show a live webcam preview and caption a frame every *interval* seconds.

    The preview keeps refreshing between captions so the "q" key stays
    responsive; the next caption is scheduled *interval* seconds after the
    previous one finished. Ctrl+C also stops the loop. Exits with a message
    if the webcam cannot be opened or stops delivering frames.
    """
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        sys.exit("❌  Could not open webcam.")
    print(
        f"{CYAN}Live camera mode — capturing every {interval}s. "
        f"Press q in the preview window or Ctrl+C to quit.{RESET}"
    )

    max_failed_reads = 100   # consecutive failures tolerated before giving up
    retry_delay = 0.05       # seconds to wait after a failed read
    failed_reads = 0
    next_due = time.monotonic()  # caption the very first good frame
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # Tolerate transient glitches, but never spin forever on a
                # camera that has gone away.
                failed_reads += 1
                if failed_reads >= max_failed_reads:
                    sys.exit("❌  Webcam stopped delivering frames.")
                time.sleep(retry_delay)
                continue
            failed_reads = 0

            cv2.imshow("ClearPath — press q to quit", frame)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

            if time.monotonic() < next_due:
                continue  # keep previewing until the next caption is due

            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil = Image.fromarray(rgb)
            run(pil, ts=time.strftime("%H:%M:%S"))
            next_due = time.monotonic() + interval
    except KeyboardInterrupt:
        print("\nStopped.")
    finally:
        cap.release()
        cv2.destroyAllWindows()


def main():
    """Parse command-line arguments and run the selected captioning mode.

    Exactly one of ``--image``, ``--video`` or ``--camera`` is required.
    ``--speak`` additionally reads each result aloud, ``--interval`` sets the
    seconds between captures in video/camera mode, and ``--model`` overrides
    the captioning model ID. The module docstring is shown as the help epilog.

    Exits via ``SystemExit`` on invalid arguments or when the chosen input
    (image file, video file, webcam) cannot be opened.
    """
    parser = argparse.ArgumentParser(
        description="ClearPath — Real-Time Scene Description for Visually-Impaired People",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--image",  metavar="PATH", help="Path to an image file")
    group.add_argument("--video",  metavar="PATH", help="Path to a video file")
    group.add_argument("--camera", action="store_true", help="Use webcam (live loop)")

    parser.add_argument("--speak",    action="store_true", help="Read description aloud via TTS")
    parser.add_argument("--interval", type=float, default=3.0,
                        help="Seconds between captures in video/camera mode (default: 3)")
    parser.add_argument("--model",    default=None,
                        help="Override Qwen model ID (e.g. Qwen/Qwen2-VL-7B-Instruct)")
    args = parser.parse_args()

    # Reject negative, NaN and infinite intervals up front (NaN fails every
    # comparison), before the expensive model load and before they can crash
    # time.sleep() / int() deep inside a capture loop.
    if not (0 <= args.interval < float("inf")):
        parser.error("--interval must be a finite, non-negative number of seconds")

    # ── Load modules ──────────────────────────────────────────────────────────
    logger.info("Loading captioning model …")
    captioner  = SceneCaptioner(model_id=args.model) if args.model else SceneCaptioner()
    classifier = SafetyClassifier()
    tts        = TTSEngine() if args.speak else None

    def run(image: Image.Image, ts: str = ""):
        """Caption *image*, classify the caption, print it and optionally speak it."""
        caption = captioner.describe(image)
        result  = classifier.classify(caption)
        print_result(caption, result, ts)
        if tts:
            prefix = "Danger detected. " if result.is_dangerous else "Safe. "
            tts.speak(prefix + caption)
        return caption, result

    # ── Dispatch on the (mutually exclusive) input mode ───────────────────────
    if args.image:
        _describe_image_file(args.image, run)
    elif args.video:
        _process_video(args.video, args.interval, run)
    elif args.camera:
        _process_camera(args.interval, run)


# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()