|
|
import streamlit as st |
|
|
import os |
|
|
from pathlib import Path |
|
|
import tempfile |
|
|
import time |
|
|
import subprocess |
|
|
import shutil |
|
|
import uuid |
|
|
|
|
|
from object_detection import detectObjects, detectVideo |
|
|
from object_detection_count import detectObjectsAndCount |
|
|
from pose_analysis import process_gif |
|
|
from traffic_sign_detection import detectTrafficObjects |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MAX_FILE_SIZE_MB = 250 |
|
|
TABS = ["Object Detection", "Pose Analysis", "Object Counting", "Traffic Sign Detection"] |
|
|
|
|
|
ASSETS_DIR = Path("assets") |
|
|
|
|
|
TASK_TO_ASSET_SUBDIR = { |
|
|
"Object Detection": "object_detection", |
|
|
"Pose Analysis": "pose_analysis", |
|
|
"Object Counting": "object_counting", |
|
|
"Traffic Sign Detection": "traffic_sign_detection", |
|
|
} |
|
|
|
|
|
IMAGE_EXTS = {".jpg", ".jpeg", ".png"} |
|
|
VIDEO_EXTS = {".mp4", ".mov", ".avi", ".gif"} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def check_file_size(file): |
|
|
file.seek(0, os.SEEK_END) |
|
|
file_size = file.tell() / (1024 * 1024) |
|
|
file.seek(0) |
|
|
return file_size |
|
|
|
|
|
|
|
|
def save_uploaded_file_to_temp(uploaded_file) -> str: |
|
|
|
|
|
safe_name = uploaded_file.name.replace("/", "_").replace("\\", "_") |
|
|
save_path = "./" |
|
|
|
|
|
with open(safe_name, "wb") as f: |
|
|
f.write(uploaded_file.getbuffer()) |
|
|
|
|
|
print("Saved uploaded file to:", safe_name) |
|
|
return str(safe_name) |
|
|
|
|
|
|
|
|
def list_demo_files(task_name: str, limit: int = 6): |
|
|
subdir = TASK_TO_ASSET_SUBDIR.get(task_name) |
|
|
if not subdir: |
|
|
return [] |
|
|
folder = ASSETS_DIR / subdir |
|
|
if not folder.exists(): |
|
|
return [] |
|
|
files = [ |
|
|
p |
|
|
for p in folder.iterdir() |
|
|
if p.is_file() and p.suffix.lower() in (IMAGE_EXTS | VIDEO_EXTS) |
|
|
] |
|
|
files.sort(key=lambda p: p.name.lower()) |
|
|
return files[:limit] |
|
|
|
|
|
|
|
|
def st_video_file(path: str, fmt: str = "video/mp4"): |
|
|
""" |
|
|
More reliable than passing a path to st.video on Spaces. |
|
|
""" |
|
|
with open(path, "rb") as f: |
|
|
st.video(f.read(), format=fmt) |
|
|
|
|
|
|
|
|
def is_h264(path: str) -> bool: |
|
|
""" |
|
|
Best-effort: checks if the first video stream codec is h264 using ffprobe. |
|
|
If ffprobe isn't available, return False so we can attempt conversion when needed. |
|
|
""" |
|
|
if shutil.which("ffprobe") is None: |
|
|
return False |
|
|
try: |
|
|
out = subprocess.check_output( |
|
|
[ |
|
|
"ffprobe", |
|
|
"-v", |
|
|
"error", |
|
|
"-select_streams", |
|
|
"v:0", |
|
|
"-show_entries", |
|
|
"stream=codec_name", |
|
|
"-of", |
|
|
"default=nk=1:nw=1", |
|
|
path, |
|
|
], |
|
|
text=True, |
|
|
).strip().lower() |
|
|
return out == "h264" |
|
|
except Exception: |
|
|
return False |
|
|
|
|
|
|
|
|
def to_h264_mp4(input_path: str) -> str: |
|
|
""" |
|
|
Converts any input video to H.264 MP4 for browser/Streamlit playback. |
|
|
- If ffmpeg is missing or conversion fails, returns the original input_path. |
|
|
- Uses a unique temp output so we don't overwrite assets or collide across reruns. |
|
|
""" |
|
|
if shutil.which("ffmpeg") is None: |
|
|
st.warning("ffmpeg not found in this environment. Video may show black screen if not H.264.") |
|
|
return input_path |
|
|
|
|
|
out_path = str(Path(tempfile.gettempdir()) / f"{uuid.uuid4().hex}.mp4") |
|
|
cmd = [ |
|
|
"ffmpeg", |
|
|
"-y", |
|
|
"-i", |
|
|
input_path, |
|
|
"-c:v", |
|
|
"libx264", |
|
|
"-pix_fmt", |
|
|
"yuv420p", |
|
|
"-movflags", |
|
|
"+faststart", |
|
|
"-c:a", |
|
|
"aac", |
|
|
"-b:a", |
|
|
"128k", |
|
|
out_path, |
|
|
] |
|
|
try: |
|
|
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) |
|
|
|
|
|
if not Path(out_path).exists() or Path(out_path).stat().st_size < 1024: |
|
|
st.warning("Video conversion produced an empty file; showing original.") |
|
|
return input_path |
|
|
return out_path |
|
|
except Exception: |
|
|
st.warning("Video conversion failed; showing original.") |
|
|
return input_path |
|
|
|
|
|
|
|
|
def render_media(path: Path): |
|
|
ext = path.suffix.lower() |
|
|
if ext in IMAGE_EXTS: |
|
|
st.image(str(path), caption=path.name, use_container_width=True) |
|
|
elif ext in VIDEO_EXTS: |
|
|
|
|
|
|
|
|
playable = str(path) |
|
|
if not is_h264(playable): |
|
|
playable = to_h264_mp4(playable) |
|
|
st_video_file(playable, fmt="video/mp4") |
|
|
st.caption(path.name) |
|
|
|
|
|
|
|
|
def render_showcase(tasks, per_task_limit=6): |
|
|
st.subheader("Example outputs (what to expect)") |
|
|
st.write( |
|
|
"These are pre-generated results (detected/segmented/pose analyzed/traffic signs) " |
|
|
"so you can see the expected output before uploading." |
|
|
) |
|
|
|
|
|
for task in tasks: |
|
|
st.markdown(f"### {task}") |
|
|
demo_files = list_demo_files(task, limit=per_task_limit) |
|
|
|
|
|
if not demo_files: |
|
|
st.info( |
|
|
f"No demo files found for **{task}**. Add images/videos under: " |
|
|
f"{ASSETS_DIR / TASK_TO_ASSET_SUBDIR[task]}" |
|
|
) |
|
|
continue |
|
|
|
|
|
cols = st.columns(3) |
|
|
for idx, p in enumerate(demo_files): |
|
|
with cols[idx % 3]: |
|
|
render_media(p) |
|
|
|
|
|
st.divider() |
|
|
|
|
|
|
|
|
def process_file(file_path, tab_name, confidence_score, progress_placeholder, class_type): |
|
|
progress_placeholder.info(f"Processing... Please wait. (Confidence Score: {confidence_score})") |
|
|
time.sleep(1) |
|
|
|
|
|
if tab_name == "Object Detection": |
|
|
if file_path.lower().endswith((".jpg", ".png", ".jpeg")): |
|
|
progress_placeholder.empty() |
|
|
img = detectObjects(file_path, confidence_score) |
|
|
return img, "image" |
|
|
|
|
|
elif file_path.lower().endswith((".mp4", ".avi", ".mov", ".gif")): |
|
|
progress_placeholder.empty() |
|
|
out_video_path = detectVideo(file_path, confidence_score) |
|
|
return out_video_path, "video" |
|
|
|
|
|
progress_placeholder.empty() |
|
|
st.error("Unsupported file format! Please upload an image or video.") |
|
|
return None, None |
|
|
|
|
|
elif tab_name == "Object Counting": |
|
|
if file_path.lower().endswith((".jpg", ".png", ".jpeg")): |
|
|
progress_placeholder.empty() |
|
|
img, count = detectObjectsAndCount(file_path, confidence_score, class_type) |
|
|
st.info(f"Count for class '{class_type}': {count}") |
|
|
return img, "image" |
|
|
|
|
|
elif file_path.lower().endswith((".mp4", ".avi", ".mov", ".gif")): |
|
|
progress_placeholder.empty() |
|
|
out_video_path = detectVideo(file_path, confidence_score) |
|
|
return out_video_path, "video" |
|
|
|
|
|
progress_placeholder.empty() |
|
|
st.error("Unsupported file format! Please upload an image or video.") |
|
|
return None, None |
|
|
|
|
|
elif tab_name == "Pose Analysis": |
|
|
progress_placeholder.empty() |
|
|
out_video_path = process_gif(file_path, confidence_score) |
|
|
return out_video_path, "video" |
|
|
|
|
|
elif tab_name == "Traffic Sign Detection": |
|
|
if file_path.lower().endswith((".jpg", ".png", ".jpeg")): |
|
|
progress_placeholder.empty() |
|
|
img = detectTrafficObjects(file_path, confidence_score) |
|
|
return img, "image" |
|
|
|
|
|
progress_placeholder.empty() |
|
|
st.error("Unsupported file format! Please upload an image.") |
|
|
return None, None |
|
|
|
|
|
st.error("Unknown tab selection.") |
|
|
return None, None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.set_page_config(page_title="AI Video/Image Analysis Platform", layout="wide") |
|
|
|
|
|
st.title("AI Video/Image Analysis Platform") |
|
|
st.write("Upload an image or video and choose a tab for analysis.") |
|
|
|
|
|
|
|
|
tabs = st.tabs(TABS) |
|
|
|
|
|
for i, tab_name in enumerate(TABS): |
|
|
with tabs[i]: |
|
|
st.header(tab_name) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if tab_name in ["Object Detection", "Traffic Sign Detection"]: |
|
|
allowed_types = ["jpg", "jpeg", "png", "gif", "mp4", "avi", "mov"] |
|
|
elif tab_name == "Pose Analysis": |
|
|
allowed_types = ["gif", "mp4", "avi", "mov"] |
|
|
elif tab_name == "Object Counting": |
|
|
allowed_types = ["jpg", "jpeg", "png"] |
|
|
else: |
|
|
allowed_types = ["jpg", "jpeg", "png", "gif", "mp4", "avi", "mov"] |
|
|
|
|
|
uploaded_file = st.file_uploader( |
|
|
"Upload an Image/Video", |
|
|
type=allowed_types, |
|
|
key=f"uploader_{tab_name}", |
|
|
) |
|
|
|
|
|
if uploaded_file: |
|
|
file_size = check_file_size(uploaded_file) |
|
|
if file_size > MAX_FILE_SIZE_MB: |
|
|
st.error(f"File size exceeds {MAX_FILE_SIZE_MB} MB. Please upload a smaller file.") |
|
|
continue |
|
|
|
|
|
st.success(f"Uploaded file: {uploaded_file.name} ({file_size:.2f} MB)") |
|
|
|
|
|
|
|
|
file_path = save_uploaded_file_to_temp(uploaded_file) |
|
|
|
|
|
confidence_score = st.number_input( |
|
|
"Adjust Confidence Score", |
|
|
min_value=0.0, |
|
|
max_value=1.0, |
|
|
value=0.5, |
|
|
step=0.01, |
|
|
help="Set the confidence score threshold for the analysis (default: 0.5).", |
|
|
key=f"confidence_{tab_name}", |
|
|
) |
|
|
|
|
|
class_type = None |
|
|
if tab_name == "Object Counting": |
|
|
class_type = st.text_input( |
|
|
"Enter Class Type", |
|
|
value="car", |
|
|
help="Specify the class type to count (e.g., 'car', 'person').", |
|
|
key=f"class_type_{tab_name}", |
|
|
) |
|
|
|
|
|
safe_name = uploaded_file.name.replace("/", "_").replace("\\", "_") |
|
|
print("Process file called with", safe_name) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ext = os.path.splitext(safe_name)[1].lower() |
|
|
if tab_name == "Pose Analysis" and ext in (".jpg", ".jpeg", ".png"): |
|
|
st.error("Pose Analysis supports video only. Please upload mp4/mov/avi/gif.") |
|
|
continue |
|
|
if tab_name == "Object Counting" and ext in (".mp4", ".mov", ".avi", ".gif"): |
|
|
st.error("Object Counting supports images only. Please upload jpg/jpeg/png.") |
|
|
continue |
|
|
|
|
|
if st.button(f"Process {tab_name}", key=f"process_{tab_name}"): |
|
|
progress_placeholder = st.empty() |
|
|
with st.spinner("Processing... Please wait."): |
|
|
result, result_type = process_file( |
|
|
safe_name, |
|
|
tab_name, |
|
|
confidence_score, |
|
|
progress_placeholder, |
|
|
class_type, |
|
|
) |
|
|
|
|
|
if result_type == "video" and result: |
|
|
st.success(f"{tab_name} completed successfully!") |
|
|
|
|
|
playable = result |
|
|
|
|
|
if not is_h264(playable): |
|
|
playable = to_h264_mp4(playable) |
|
|
|
|
|
|
|
|
st_video_file(playable, fmt="video/mp4") |
|
|
|
|
|
if result_type == "image" and result is not None: |
|
|
st.success(f"{tab_name} completed successfully!") |
|
|
st.image(result, caption=f"{tab_name} Result", use_container_width=True) |
|
|
|
|
|
|
|
|
render_showcase(TABS, per_task_limit=6) |
|
|
|