import streamlit as st import os from pathlib import Path import tempfile import time import subprocess import shutil import uuid from object_detection import detectObjects, detectVideo from object_detection_count import detectObjectsAndCount from pose_analysis import process_gif from traffic_sign_detection import detectTrafficObjects # ----------------------------- # Constants # ----------------------------- MAX_FILE_SIZE_MB = 250 TABS = ["Object Detection", "Pose Analysis", "Object Counting", "Traffic Sign Detection"] ASSETS_DIR = Path("assets") TASK_TO_ASSET_SUBDIR = { "Object Detection": "object_detection", "Pose Analysis": "pose_analysis", "Object Counting": "object_counting", "Traffic Sign Detection": "traffic_sign_detection", } IMAGE_EXTS = {".jpg", ".jpeg", ".png"} VIDEO_EXTS = {".mp4", ".mov", ".avi", ".gif"} # ----------------------------- # Helpers # ----------------------------- def check_file_size(file): file.seek(0, os.SEEK_END) file_size = file.tell() / (1024 * 1024) file.seek(0) return file_size def save_uploaded_file_to_temp(uploaded_file) -> str: # NOTE: left exactly as you had it (do not change) safe_name = uploaded_file.name.replace("/", "_").replace("\\", "_") save_path = "./" with open(safe_name, "wb") as f: f.write(uploaded_file.getbuffer()) print("Saved uploaded file to:", safe_name) return str(safe_name) def list_demo_files(task_name: str, limit: int = 6): subdir = TASK_TO_ASSET_SUBDIR.get(task_name) if not subdir: return [] folder = ASSETS_DIR / subdir if not folder.exists(): return [] files = [ p for p in folder.iterdir() if p.is_file() and p.suffix.lower() in (IMAGE_EXTS | VIDEO_EXTS) ] files.sort(key=lambda p: p.name.lower()) return files[:limit] def st_video_file(path: str, fmt: str = "video/mp4"): """ More reliable than passing a path to st.video on Spaces. """ with open(path, "rb") as f: st.video(f.read(), format=fmt) def is_h264(path: str) -> bool: """ Best-effort: checks if the first video stream codec is h264 using ffprobe. If ffprobe isn't available, return False so we can attempt conversion when needed. """ if shutil.which("ffprobe") is None: return False try: out = subprocess.check_output( [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "default=nk=1:nw=1", path, ], text=True, ).strip().lower() return out == "h264" except Exception: return False def to_h264_mp4(input_path: str) -> str: """ Converts any input video to H.264 MP4 for browser/Streamlit playback. - If ffmpeg is missing or conversion fails, returns the original input_path. - Uses a unique temp output so we don't overwrite assets or collide across reruns. """ if shutil.which("ffmpeg") is None: st.warning("ffmpeg not found in this environment. Video may show black screen if not H.264.") return input_path out_path = str(Path(tempfile.gettempdir()) / f"{uuid.uuid4().hex}.mp4") cmd = [ "ffmpeg", "-y", "-i", input_path, "-c:v", "libx264", "-pix_fmt", "yuv420p", "-movflags", "+faststart", "-c:a", "aac", "-b:a", "128k", out_path, ] try: subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) # Sanity check: ensure output exists and is not tiny/empty if not Path(out_path).exists() or Path(out_path).stat().st_size < 1024: st.warning("Video conversion produced an empty file; showing original.") return input_path return out_path except Exception: st.warning("Video conversion failed; showing original.") return input_path def render_media(path: Path): ext = path.suffix.lower() if ext in IMAGE_EXTS: st.image(str(path), caption=path.name, use_container_width=True) elif ext in VIDEO_EXTS: # IMPORTANT: do NOT try to "convert-and-overwrite" showcase assets. # Only convert to a temp H.264 file if needed, then pass BYTES to Streamlit. playable = str(path) if not is_h264(playable): playable = to_h264_mp4(playable) st_video_file(playable, fmt="video/mp4") st.caption(path.name) def render_showcase(tasks, per_task_limit=6): st.subheader("Example outputs (what to expect)") st.write( "These are pre-generated results (detected/segmented/pose analyzed/traffic signs) " "so you can see the expected output before uploading." ) for task in tasks: st.markdown(f"### {task}") demo_files = list_demo_files(task, limit=per_task_limit) if not demo_files: st.info( f"No demo files found for **{task}**. Add images/videos under: " f"{ASSETS_DIR / TASK_TO_ASSET_SUBDIR[task]}" ) continue cols = st.columns(3) for idx, p in enumerate(demo_files): with cols[idx % 3]: render_media(p) st.divider() def process_file(file_path, tab_name, confidence_score, progress_placeholder, class_type): progress_placeholder.info(f"Processing... Please wait. (Confidence Score: {confidence_score})") time.sleep(1) if tab_name == "Object Detection": if file_path.lower().endswith((".jpg", ".png", ".jpeg")): progress_placeholder.empty() img = detectObjects(file_path, confidence_score) return img, "image" elif file_path.lower().endswith((".mp4", ".avi", ".mov", ".gif")): progress_placeholder.empty() out_video_path = detectVideo(file_path, confidence_score) return out_video_path, "video" progress_placeholder.empty() st.error("Unsupported file format! Please upload an image or video.") return None, None elif tab_name == "Object Counting": if file_path.lower().endswith((".jpg", ".png", ".jpeg")): progress_placeholder.empty() img, count = detectObjectsAndCount(file_path, confidence_score, class_type) st.info(f"Count for class '{class_type}': {count}") return img, "image" elif file_path.lower().endswith((".mp4", ".avi", ".mov", ".gif")): progress_placeholder.empty() out_video_path = detectVideo(file_path, confidence_score) return out_video_path, "video" progress_placeholder.empty() st.error("Unsupported file format! Please upload an image or video.") return None, None elif tab_name == "Pose Analysis": progress_placeholder.empty() out_video_path = process_gif(file_path, confidence_score) return out_video_path, "video" elif tab_name == "Traffic Sign Detection": if file_path.lower().endswith((".jpg", ".png", ".jpeg")): progress_placeholder.empty() img = detectTrafficObjects(file_path, confidence_score) return img, "image" progress_placeholder.empty() st.error("Unsupported file format! Please upload an image.") return None, None st.error("Unknown tab selection.") return None, None # ----------------------------- # Streamlit Layout # ----------------------------- st.set_page_config(page_title="AI Video/Image Analysis Platform", layout="wide") st.title("AI Video/Image Analysis Platform") st.write("Upload an image or video and choose a tab for analysis.") # Tabs for different functionalities tabs = st.tabs(TABS) for i, tab_name in enumerate(TABS): with tabs[i]: st.header(tab_name) # ----------------------------- # MINIMAL CHANGE: per-tab uploader constraints # ----------------------------- if tab_name in ["Object Detection", "Traffic Sign Detection"]: allowed_types = ["jpg", "jpeg", "png", "gif", "mp4", "avi", "mov"] # image + video elif tab_name == "Pose Analysis": allowed_types = ["gif", "mp4", "avi", "mov"] # video only elif tab_name == "Object Counting": allowed_types = ["jpg", "jpeg", "png"] # image only else: allowed_types = ["jpg", "jpeg", "png", "gif", "mp4", "avi", "mov"] uploaded_file = st.file_uploader( "Upload an Image/Video", type=allowed_types, key=f"uploader_{tab_name}", ) if uploaded_file: file_size = check_file_size(uploaded_file) if file_size > MAX_FILE_SIZE_MB: st.error(f"File size exceeds {MAX_FILE_SIZE_MB} MB. Please upload a smaller file.") continue st.success(f"Uploaded file: {uploaded_file.name} ({file_size:.2f} MB)") # Save to temp and use the full path for downstream processing file_path = save_uploaded_file_to_temp(uploaded_file) confidence_score = st.number_input( "Adjust Confidence Score", min_value=0.0, max_value=1.0, value=0.5, step=0.01, help="Set the confidence score threshold for the analysis (default: 0.5).", key=f"confidence_{tab_name}", ) class_type = None if tab_name == "Object Counting": class_type = st.text_input( "Enter Class Type", value="car", help="Specify the class type to count (e.g., 'car', 'person').", key=f"class_type_{tab_name}", ) safe_name = uploaded_file.name.replace("/", "_").replace("\\", "_") print("Process file called with", safe_name) # ----------------------------- # MINIMAL CHANGE: hard guard (enforce per-tab constraints) # ----------------------------- ext = os.path.splitext(safe_name)[1].lower() if tab_name == "Pose Analysis" and ext in (".jpg", ".jpeg", ".png"): st.error("Pose Analysis supports video only. Please upload mp4/mov/avi/gif.") continue if tab_name == "Object Counting" and ext in (".mp4", ".mov", ".avi", ".gif"): st.error("Object Counting supports images only. Please upload jpg/jpeg/png.") continue if st.button(f"Process {tab_name}", key=f"process_{tab_name}"): progress_placeholder = st.empty() with st.spinner("Processing... Please wait."): result, result_type = process_file( safe_name, tab_name, confidence_score, progress_placeholder, class_type, ) if result_type == "video" and result: st.success(f"{tab_name} completed successfully!") playable = result # Convert ONLY if not already H.264 (or if ffprobe missing, this will attempt conversion) if not is_h264(playable): playable = to_h264_mp4(playable) # Use bytes for reliability on Spaces st_video_file(playable, fmt="video/mp4") if result_type == "image" and result is not None: st.success(f"{tab_name} completed successfully!") st.image(result, caption=f"{tab_name} Result", use_container_width=True) # Showcase section (NO expander) render_showcase(TABS, per_task_limit=6)