"""
cli.py
──────
Command-line interface for ClearPath Scene Description.
Usage examples:
python cli.py --image photo.jpg
python cli.py --image photo.jpg --speak
python cli.py --video footage.mp4 --interval 3 --speak
python cli.py --camera --speak # live webcam loop (press q to quit)
"""
import argparse
import sys
import logging
import time
from PIL import Image
import cv2
from scene_captioner import SceneCaptioner
from safety_classifier import SafetyClassifier
from tts_engine import TTSEngine
# Configure logging once at import time: INFO level, HH:MM:SS timestamps.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# ── ANSI colours ──────────────────────────────────────────────────────────────
# Terminal escape sequences used by the printing code; RESET ends a styled run.
RESET = "\033[0m"
BOLD = "\033[1m"
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
def print_result(caption: str, result, timestamp: str = ""):
    """Print one caption and its safety verdict to stdout.

    Args:
        caption: Scene description text to display.
        result: Classification outcome. Must expose ``is_dangerous``; when
            that is truthy, ``hazards`` and ``matches`` are joined with
            ``", "`` and therefore must be iterables of strings.
        timestamp: Optional label printed above the caption; skipped when
            empty.
    """
    rule = "─" * 60

    print()
    print(rule)
    if timestamp:
        print(f"{CYAN}⏱ {timestamp}{RESET}")
    print(f"{BOLD}📝 Caption:{RESET}")
    print(f" {caption}")
    print()

    if not result.is_dangerous:
        print(f"{GREEN}{BOLD}✅ CLASSIFICATION : SAFE{RESET}")
    else:
        print(f"{RED}{BOLD}⚠️ CLASSIFICATION : DANGEROUS{RESET}")
        # Each list is joined immediately before its own line is printed.
        hazard_text = ", ".join(result.hazards)
        print(f"{RED} Hazard categories : {hazard_text}{RESET}")
        token_text = ", ".join(result.matches)
        print(f"{RED} Matched tokens : {token_text}{RESET}")

    print(rule)
    print()
def main(argv=None):
    """Parse CLI arguments and run the captioning pipeline in the chosen mode.

    Exactly one of ``--image``, ``--video`` or ``--camera`` must be supplied.
    Every processed frame is captioned, classified, printed with
    ``print_result`` and, when ``--speak`` is set, read aloud.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the original behaviour.

    Raises:
        SystemExit: On invalid arguments, or when the image, video file or
            webcam cannot be opened.
    """
    parser = argparse.ArgumentParser(
        description="ClearPath — Real-Time Scene Description for Visually-Impaired People",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--image", metavar="PATH", help="Path to an image file")
    group.add_argument("--video", metavar="PATH", help="Path to a video file")
    group.add_argument("--camera", action="store_true", help="Use webcam (live loop)")
    parser.add_argument("--speak", action="store_true", help="Read description aloud via TTS")
    parser.add_argument("--interval", type=float, default=3.0,
                        help="Seconds between captures in video/camera mode (default: 3)")
    parser.add_argument("--model", default=None,
                        help="Override Qwen model ID (e.g. Qwen/Qwen2-VL-7B-Instruct)")
    args = parser.parse_args(argv)

    # Reject negative, NaN and infinite intervals before the (slow) model load;
    # they would otherwise crash later inside int() or the capture loop.
    # The chained comparison is False for NaN as well as for out-of-range values.
    if not (0 <= args.interval < float("inf")):
        parser.error("--interval must be a finite, non-negative number of seconds")

    # ── Load modules ──────────────────────────────────────────────────────────
    logger.info("Loading captioning model …")
    captioner = SceneCaptioner(model_id=args.model) if args.model else SceneCaptioner()
    classifier = SafetyClassifier()
    tts = TTSEngine() if args.speak else None

    def run(image: Image.Image, ts: str = ""):
        """Caption, classify, print and optionally speak a single frame."""
        caption = captioner.describe(image)
        result = classifier.classify(caption)
        print_result(caption, result, ts)
        if tts:
            prefix = "Danger detected. " if result.is_dangerous else "Safe. "
            tts.speak(prefix + caption)
        return caption, result

    # ── Image mode ────────────────────────────────────────────────────────────
    if args.image:
        try:
            # The context manager closes the underlying file as soon as the
            # RGB copy has been made.
            with Image.open(args.image) as source:
                img = source.convert("RGB")
        except OSError as exc:
            sys.exit(f"❌ Could not open image: {exc}")
        run(img)

    # ── Video mode ────────────────────────────────────────────────────────────
    elif args.video:
        cap = cv2.VideoCapture(args.video)
        if not cap.isOpened():
            # Without this check an unreadable file would fall straight through
            # the loop and report success.
            sys.exit(f"❌ Could not open video: {args.video}")
        try:
            # Some containers report 0 FPS; fall back to 25.
            fps = cap.get(cv2.CAP_PROP_FPS) or 25
            step = max(1, int(fps * args.interval))
            idx = 0
            print(f"{CYAN}Processing video — capturing every {args.interval}s …{RESET}")
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if idx % step == 0:
                    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    pil = Image.fromarray(rgb)
                    ts = f"Frame {idx} / {round(idx / fps, 1)}s"
                    run(pil, ts)
                idx += 1
        finally:
            # Release the capture even if captioning raises mid-video.
            cap.release()
        print(f"{GREEN}Video processing complete.{RESET}")

    # ── Camera (live) mode ────────────────────────────────────────────────────
    elif args.camera:
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            sys.exit("❌ Could not open webcam.")
        print(f"{CYAN}Live camera mode — capturing every {args.interval}s. Press Ctrl+C to quit.{RESET}")
        try:
            # First capture happens immediately; later ones are scheduled
            # `interval` seconds after the previous caption finished.
            next_capture = time.monotonic()
            while True:
                ret, frame = cap.read()
                if not ret:
                    # Brief pause so a stalled camera does not busy-spin a core.
                    time.sleep(0.05)
                    continue
                # Keep showing frames and polling the keyboard between
                # captures, so the preview stays live and "q" is responsive.
                cv2.imshow("ClearPath — press q to quit", frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
                if time.monotonic() < next_capture:
                    continue
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                pil = Image.fromarray(rgb)
                run(pil, ts=time.strftime("%H:%M:%S"))
                next_capture = time.monotonic() + args.interval
        except KeyboardInterrupt:
            print("\nStopped.")
        finally:
            cap.release()
            cv2.destroyAllWindows()
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()
|