# manikandan18ramalingam's picture
# Update app.py
# a216c73 verified
import streamlit as st
import os
from pathlib import Path
import tempfile
import time
import subprocess
import shutil
import uuid
from object_detection import detectObjects, detectVideo
from object_detection_count import detectObjectsAndCount
from pose_analysis import process_gif
from traffic_sign_detection import detectTrafficObjects
# -----------------------------
# Constants
# -----------------------------
# Largest upload accepted, in megabytes (compared against check_file_size()).
MAX_FILE_SIZE_MB = 250

# Root folder that holds the pre-generated showcase media.
ASSETS_DIR = Path("assets")

# Tab label -> sub-folder of ASSETS_DIR with that tab's showcase files.
# The insertion order here is also the on-screen tab order (see TABS).
TASK_TO_ASSET_SUBDIR = {
    "Object Detection": "object_detection",
    "Pose Analysis": "pose_analysis",
    "Object Counting": "object_counting",
    "Traffic Sign Detection": "traffic_sign_detection",
}

# Tab labels in display order, taken from the mapping so the two cannot drift.
TABS = list(TASK_TO_ASSET_SUBDIR)

# Lower-case file suffixes (with the leading dot) treated as still images.
IMAGE_EXTS = {
    ".jpg",
    ".jpeg",
    ".png",
}

# Lower-case file suffixes treated as video; animated GIFs are handled as video.
VIDEO_EXTS = {
    ".mp4",
    ".mov",
    ".avi",
    ".gif",
}
# -----------------------------
# Helpers
# -----------------------------
def check_file_size(file):
    """Return the size of a seekable stream in megabytes (bytes / 1024**2).

    The stream is left positioned at offset 0 afterwards, regardless of
    where it was when the function was called.
    """
    bytes_per_mb = 1024 * 1024
    # Jump to the end so that tell() reports the total length in bytes.
    file.seek(0, os.SEEK_END)
    total_bytes = file.tell()
    # Rewind so later readers start from the beginning of the stream.
    file.seek(0)
    return total_bytes / bytes_per_mb
def save_uploaded_file_to_temp(uploaded_file) -> str:
    """Write an uploaded file to disk and return the name it was saved under.

    ``uploaded_file`` must expose ``name`` and ``getbuffer()``. Forward and
    back slashes in the name are replaced with underscores, so the file is
    always created directly in the current working directory.

    NOTE: despite the function name, no temporary directory is used; the
    returned value is a bare file name relative to the working directory.
    """
    flat_name = uploaded_file.name.replace("/", "_").replace("\\", "_")
    with open(flat_name, "wb") as sink:
        sink.write(uploaded_file.getbuffer())
    print("Saved uploaded file to:", flat_name)
    return str(flat_name)
def list_demo_files(task_name: str, limit: int = 6):
    """Return up to ``limit`` showcase files for a tab, sorted by name.

    Looks in ``ASSETS_DIR / TASK_TO_ASSET_SUBDIR[task_name]`` and keeps only
    regular files whose lower-cased suffix is a known image or video
    extension. Sorting is case-insensitive on the file name. An unknown task
    or a missing folder yields an empty list.
    """
    asset_subdir = TASK_TO_ASSET_SUBDIR.get(task_name)
    if not asset_subdir:
        return []

    demo_dir = ASSETS_DIR / asset_subdir
    if not demo_dir.exists():
        return []

    supported_suffixes = IMAGE_EXTS | VIDEO_EXTS
    matches = []
    for entry in demo_dir.iterdir():
        if entry.is_file() and entry.suffix.lower() in supported_suffixes:
            matches.append(entry)

    ordered = sorted(matches, key=lambda entry: entry.name.lower())
    return ordered[:limit]
def st_video_file(path: str, fmt: str = "video/mp4"):
    """Show a video in Streamlit by passing its raw bytes instead of a path.

    The file at ``path`` is read fully into memory and handed to ``st.video``
    with ``fmt`` as the format. Per the original author's note, this is more
    reliable on Spaces than giving ``st.video`` a file path.
    """
    video_bytes = Path(path).read_bytes()
    st.video(video_bytes, format=fmt)
def is_h264(path: str) -> bool:
    """Best-effort check that the first video stream of ``path`` is H.264.

    Runs ``ffprobe`` and inspects the reported codec name.

    Returns:
        True only when ffprobe is available, exits successfully, and the
        first codec name it prints is ``h264`` (case-insensitive). Returns
        False in every other case -- including a missing ffprobe binary or an
        unreadable file -- so that callers fall back to attempting conversion.
    """
    if shutil.which("ffprobe") is None:
        return False

    probe_cmd = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "stream=codec_name",
        "-of",
        "default=nk=1:nw=1",
        path,
    ]
    try:
        raw_output = subprocess.check_output(probe_cmd, text=True)
    except (subprocess.SubprocessError, OSError, ValueError):
        # Non-zero exit, failure to launch, or undecodable output/invalid
        # path: treat all of them as "not known to be H.264".
        return False

    # ffprobe can print the same entry more than once (for instance when the
    # stream is also listed under a program section), so look at the first
    # reported name rather than requiring the whole output to equal "h264".
    codec_names = raw_output.split()
    return bool(codec_names) and codec_names[0].lower() == "h264"
def to_h264_mp4(input_path: str) -> str:
    """Convert a video to H.264/AAC MP4 for browser/Streamlit playback.

    The output is written to a uniquely named file in the system temp
    directory, so the input is never overwritten and concurrent reruns do not
    collide. The caller owns the returned temp file.

    Returns:
        The path of the converted file on success. ``input_path`` unchanged
        (after showing a Streamlit warning) when ffmpeg is missing, the
        conversion fails, or the output is missing or smaller than 1 KiB.
        Any partial output is deleted in the failure cases.
    """
    if shutil.which("ffmpeg") is None:
        st.warning("ffmpeg not found in this environment. Video may show black screen if not H.264.")
        return input_path

    out_file = Path(tempfile.gettempdir()) / f"{uuid.uuid4().hex}.mp4"
    cmd = [
        "ffmpeg",
        "-y",
        "-i",
        input_path,
        # libx264 with yuv420p needs even width and height; round odd sizes
        # (common for GIFs) down by one pixel instead of failing outright.
        "-vf",
        "scale=trunc(iw/2)*2:trunc(ih/2)*2",
        "-c:v",
        "libx264",
        "-pix_fmt",
        "yuv420p",
        "-movflags",
        "+faststart",
        "-c:a",
        "aac",
        "-b:a",
        "128k",
        str(out_file),
    ]

    failure_message = None
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
        # Sanity check: the output must exist and not be tiny/empty.
        if not out_file.exists() or out_file.stat().st_size < 1024:
            failure_message = "Video conversion produced an empty file; showing original."
    except (subprocess.SubprocessError, OSError, ValueError):
        failure_message = "Video conversion failed; showing original."

    if failure_message is None:
        return str(out_file)

    # Do not leave a partial or empty output behind in the temp directory.
    try:
        out_file.unlink(missing_ok=True)
    except OSError:
        pass  # Cleanup is best-effort; the fallback below still applies.
    st.warning(failure_message)
    return input_path
def render_media(path: Path):
    """Render one showcase asset (image or video) in the current container.

    Images are shown with their file name as caption. Videos are shown from
    bytes; when the file is not detected as H.264 it is first converted to a
    temporary H.264 MP4. Files with any other suffix are ignored.

    IMPORTANT: showcase assets are never converted in place or overwritten.
    """
    ext = path.suffix.lower()
    if ext in IMAGE_EXTS:
        st.image(str(path), caption=path.name, use_container_width=True)
        return
    if ext not in VIDEO_EXTS:
        return

    source = str(path)
    playable = source if is_h264(source) else to_h264_mp4(source)
    try:
        st_video_file(playable, fmt="video/mp4")
    finally:
        # st_video_file has already read the bytes, so a converted temp copy
        # is no longer needed. Without this, every rerun would leave another
        # copy in the temp dir. to_h264_mp4 returns its input on failure, so
        # the comparison guarantees the asset itself is never deleted.
        if playable != source:
            try:
                os.remove(playable)
            except OSError:
                pass  # Best-effort cleanup only.
    st.caption(path.name)
def render_showcase(tasks, per_task_limit=6):
    """Render the "example outputs" section with a media grid per task.

    Args:
        tasks: Iterable of task/tab labels to show, in order.
        per_task_limit: Maximum number of demo files listed per task.

    For each task the demo files are laid out across three columns. A task
    with no demo files gets an informational hint instead of a grid.
    """
    st.subheader("Example outputs (what to expect)")
    st.write(
        "These are pre-generated results (detected/segmented/pose analyzed/traffic signs) "
        "so you can see the expected output before uploading."
    )
    for task in tasks:
        st.markdown(f"### {task}")
        demo_files = list_demo_files(task, limit=per_task_limit)
        if not demo_files:
            # list_demo_files also returns [] for a task with no asset
            # folder mapping, so avoid indexing the mapping directly here
            # (that would raise KeyError for such a task).
            subdir = TASK_TO_ASSET_SUBDIR.get(task)
            if subdir is None:
                st.info(f"No demo files found for **{task}**.")
            else:
                st.info(
                    f"No demo files found for **{task}**. Add images/videos under: "
                    f"{ASSETS_DIR / subdir}"
                )
            continue
        cols = st.columns(3)
        for idx, demo_path in enumerate(demo_files):
            # Fill the three columns round-robin.
            with cols[idx % 3]:
                render_media(demo_path)
        st.divider()
def process_file(file_path, tab_name, confidence_score, progress_placeholder, class_type):
    """Run the analysis selected by ``tab_name`` on a saved upload.

    Args:
        file_path: Path of the saved upload; its extension selects the
            image or video code path.
        tab_name: One of the tab labels in ``TABS``.
        confidence_score: Threshold forwarded to the detection function.
        progress_placeholder: Streamlit placeholder used for the progress
            message; it is always cleared before this function returns.
        class_type: Class name to count; only used by "Object Counting".

    Returns:
        ``(result, "image")`` with whatever the image detector returned,
        ``(output_path, "video")`` for video results, or ``(None, None)``
        after showing an error for an unsupported file type or unknown tab.
    """
    progress_placeholder.info(f"Processing... Please wait. (Confidence Score: {confidence_score})")

    lowered = file_path.lower()
    is_image = lowered.endswith(tuple(IMAGE_EXTS))
    is_video = lowered.endswith(tuple(VIDEO_EXTS))

    # The progress message stays visible while the real work runs and is
    # cleared in `finally`, so it is removed on every path (including errors
    # and the unknown-tab case) and no artificial delay is needed.
    try:
        if tab_name == "Object Detection":
            if is_image:
                return detectObjects(file_path, confidence_score), "image"
            if is_video:
                return detectVideo(file_path, confidence_score), "video"
            st.error("Unsupported file format! Please upload an image or video.")
            return None, None

        if tab_name == "Object Counting":
            if is_image:
                img, count = detectObjectsAndCount(file_path, confidence_score, class_type)
                st.info(f"Count for class '{class_type}': {count}")
                return img, "image"
            if is_video:
                # Videos go through plain detection; no count is reported.
                return detectVideo(file_path, confidence_score), "video"
            st.error("Unsupported file format! Please upload an image or video.")
            return None, None

        if tab_name == "Pose Analysis":
            # No extension check here: the file is handed to process_gif as-is.
            return process_gif(file_path, confidence_score), "video"

        if tab_name == "Traffic Sign Detection":
            if is_image:
                return detectTrafficObjects(file_path, confidence_score), "image"
            st.error("Unsupported file format! Please upload an image.")
            return None, None

        st.error("Unknown tab selection.")
        return None, None
    finally:
        progress_placeholder.empty()
# -----------------------------
# Streamlit Layout
# -----------------------------
st.set_page_config(page_title="AI Video/Image Analysis Platform", layout="wide")
st.title("AI Video/Image Analysis Platform")
st.write("Upload an image or video and choose a tab for analysis.")

# Per-tab uploader constraints. Each tab only offers the file types its
# processing path can handle: Traffic Sign Detection is image-only because
# process_file rejects everything else for that tab.
_IMAGE_UPLOAD_TYPES = ["jpg", "jpeg", "png"]
_VIDEO_UPLOAD_TYPES = ["gif", "mp4", "avi", "mov"]
_UPLOAD_TYPES_BY_TAB = {
    "Object Detection": _IMAGE_UPLOAD_TYPES + _VIDEO_UPLOAD_TYPES,  # image + video
    "Pose Analysis": _VIDEO_UPLOAD_TYPES,  # video only
    "Object Counting": _IMAGE_UPLOAD_TYPES,  # image only
    "Traffic Sign Detection": _IMAGE_UPLOAD_TYPES,  # image only
}

# Tabs for different functionalities
tabs = st.tabs(TABS)
for i, tab_name in enumerate(TABS):
    with tabs[i]:
        st.header(tab_name)
        allowed_types = _UPLOAD_TYPES_BY_TAB.get(
            tab_name, _IMAGE_UPLOAD_TYPES + _VIDEO_UPLOAD_TYPES
        )
        uploaded_file = st.file_uploader(
            "Upload an Image/Video",
            type=allowed_types,
            key=f"uploader_{tab_name}",
        )
        if not uploaded_file:
            continue

        file_size = check_file_size(uploaded_file)
        if file_size > MAX_FILE_SIZE_MB:
            st.error(f"File size exceeds {MAX_FILE_SIZE_MB} MB. Please upload a smaller file.")
            continue
        st.success(f"Uploaded file: {uploaded_file.name} ({file_size:.2f} MB)")

        confidence_score = st.number_input(
            "Adjust Confidence Score",
            min_value=0.0,
            max_value=1.0,
            value=0.5,
            step=0.01,
            help="Set the confidence score threshold for the analysis (default: 0.5).",
            key=f"confidence_{tab_name}",
        )
        class_type = None
        if tab_name == "Object Counting":
            class_type = st.text_input(
                "Enter Class Type",
                value="car",
                help="Specify the class type to count (e.g., 'car', 'person').",
                key=f"class_type_{tab_name}",
            )

        # Hard guard: enforce the per-tab constraints even if the uploader
        # filter is bypassed.
        ext = Path(uploaded_file.name).suffix.lower()
        if tab_name == "Pose Analysis" and ext in IMAGE_EXTS:
            st.error("Pose Analysis supports video only. Please upload mp4/mov/avi/gif.")
            continue
        if tab_name == "Object Counting" and ext in VIDEO_EXTS:
            st.error("Object Counting supports images only. Please upload jpg/jpeg/png.")
            continue
        if tab_name == "Traffic Sign Detection" and ext in VIDEO_EXTS:
            st.error("Traffic Sign Detection supports images only. Please upload jpg/jpeg/png.")
            continue

        if st.button(f"Process {tab_name}", key=f"process_{tab_name}"):
            # Write the upload to disk only when processing is requested
            # (Streamlit reruns this script on every interaction), and use
            # the path the helper reports instead of rebuilding it here.
            file_path = save_uploaded_file_to_temp(uploaded_file)
            print("Process file called with", file_path)

            progress_placeholder = st.empty()
            with st.spinner("Processing... Please wait."):
                result, result_type = process_file(
                    file_path,
                    tab_name,
                    confidence_score,
                    progress_placeholder,
                    class_type,
                )
                if result_type == "video" and result:
                    st.success(f"{tab_name} completed successfully!")
                    # Convert ONLY if not already H.264 (if ffprobe is
                    # missing, is_h264 is False and conversion is attempted).
                    playable = result if is_h264(result) else to_h264_mp4(result)
                    try:
                        # Use bytes for reliability on Spaces
                        st_video_file(playable, fmt="video/mp4")
                    finally:
                        # The bytes are already loaded; drop the converted
                        # temp copy. The comparison keeps the result file
                        # itself from ever being removed.
                        if playable != result:
                            try:
                                os.remove(playable)
                            except OSError:
                                pass  # Best-effort cleanup only.
                if result_type == "image" and result is not None:
                    st.success(f"{tab_name} completed successfully!")
                    st.image(result, caption=f"{tab_name} Result", use_container_width=True)

# Showcase section (NO expander)
render_showcase(TABS, per_task_limit=6)