"""
ISL Sign Language Translation - TechMatrix Solvers Initiative
Main Streamlit Application

Developed by: TechMatrix Solvers Team
- Abhay Gupta (Team Lead)
- Kripanshu Gupta (Backend Developer)
- Dipanshu Patel (UI/UX Designer)
- Bhumika Patel (Deployment & Female Presenter)

Institution: Shri Ram Group of Institutions
"""

import streamlit as st

# Page configuration is issued before any other Streamlit call in this script.
st.set_page_config(
    page_title="ISL Translation - TechMatrix Solvers",
    page_icon="🤟",
    layout="wide",
    initial_sidebar_state="expanded",
)

# NOTE(review): the leading glyph of this banner was destroyed by a bad
# re-encoding; the rocket is a best guess - confirm against the original.
st.write("🚀 TechMatrix Solvers ISL Translator Loading...")

# Index -> label table used whenever the project's own expression_mapping
# module cannot be imported. The orientation matches get_sign_label(), which
# looks a predicted class index up as a dictionary key.
_FALLBACK_EXPRESSION_MAPPING = {
    0: 'hello', 1: 'thank_you', 2: 'please', 3: 'sorry', 4: 'help',
    5: 'good', 6: 'bad', 7: 'yes', 8: 'no', 9: 'water',
    10: 'food', 11: 'home', 12: 'work', 13: 'school', 14: 'family',
}

# Every optional handle exists (as None) before the guarded import section
# runs, so the "is None" checks further down never hit a NameError when the
# section aborts early.
cv2 = None
keras = None
ffmpeg = None
hf_hub_download = None
pose_models = None
expression_mapping = None
isl_processor = None
utils = None

try:
    import os

    # Backend / cache locations are exported before keras and
    # huggingface_hub are imported below.
    os.environ["KERAS_BACKEND"] = "tensorflow"
    os.environ["HF_HOME"] = "/tmp/huggingface"
    os.environ["TRANSFORMERS_CACHE"] = "/tmp/transformers"

    import numpy as np
    import pandas as pd
    import tempfile
    import time
    from PIL import Image
    import subprocess
    from typing import NamedTuple
    import json
    import shutil
    import platform
    import uuid

    # Each optional third-party package degrades to None with a warning
    # instead of aborting the whole application.
    try:
        import cv2
    except Exception as cv_error:
        st.warning(f"OpenCV import issue: {cv_error}")
        cv2 = None

    try:
        import keras
        from keras.models import Sequential
        from keras.layers import LSTM, Dense, Bidirectional, Dropout, Input, BatchNormalization
    except Exception as keras_error:
        st.warning(f"Keras import issue: {keras_error}")
        keras = None

    try:
        import ffmpeg
    except Exception as ffmpeg_error:
        st.warning(f"FFmpeg import issue: {ffmpeg_error}")
        ffmpeg = None

    try:
        from huggingface_hub import hf_hub_download
    except Exception as hf_error:
        st.warning(f"HuggingFace Hub import issue: {hf_error}")
        hf_hub_download = None

    # Project-local modules.
    try:
        from pose_models import create_bodypose_model, create_handpose_model
        pose_models = True  # used purely as an availability flag
        st.success("✅ Pose models imported successfully")
    except Exception as pose_error:
        st.warning(f"Pose models import issue: {pose_error}")
        pose_models = None

    try:
        from expression_mapping import expression_mapping
        st.success("✅ Expression mapping imported successfully")
    except Exception as expr_error:
        st.warning(f"Expression mapping import issue: {expr_error}")
        expression_mapping = dict(_FALLBACK_EXPRESSION_MAPPING)

    try:
        from isl_processor import ISLTranslationModel
        isl_processor = True  # used purely as an availability flag
        st.success("✅ ISL processor imported successfully")
    except Exception as isl_error:
        st.warning(f"ISL processor import issue: {isl_error}")
        isl_processor = None

    try:
        import pose_utils as utils
        st.success("✅ Pose utils imported successfully")
    except Exception as utils_error:
        st.warning(f"Pose utils import issue: {utils_error}")
        utils = None

    st.success("✅ Core dependencies loaded successfully!")
except ImportError as e:
    st.error(f"❌ Critical import error: {e}")
    st.error("Running in fallback mode with limited functionality.")

# Second chance for pose_utils, e.g. when the section above aborted before
# reaching it.
if utils is None:
    try:
        import pose_utils as utils
        st.info("ℹ️ Pose utils loaded on secondary attempt")
    except ImportError as utils_error:
        st.error(f"❌ Failed to import pose_utils: {utils_error}")
        utils = None

if expression_mapping is None:
    st.warning("⚠️ Using fallback expression mapping")
    expression_mapping = dict(_FALLBACK_EXPRESSION_MAPPING)

def get_sign_label(index):
    """Return the sign label for a predicted class index.

    The module-level ``expression_mapping`` may be oriented either way:

    * index -> label: the index is looked up as a key;
    * label -> index: the first string key whose value equals the index
      is returned.

    Args:
        index: Class index; anything ``int()`` accepts (e.g. a NumPy integer).

    Returns:
        The mapped label, ``'unknown_sign_<index>'`` when the mapping has no
        entry for the index, or ``'sign_<index>'`` when ``expression_mapping``
        is not a dictionary.
    """
    if not isinstance(expression_mapping, dict):
        return f'sign_{index}'

    class_id = int(index)
    if class_id in expression_mapping:
        return expression_mapping[class_id]

    # Reverse lookup for label -> index tables.
    for label, encoded in expression_mapping.items():
        if isinstance(label, str) and encoded == class_id:
            return label

    return f'unknown_sign_{index}'

class VideoProbeResult(NamedTuple):
    """Result of one ffprobe invocation, as filled in by probe_video_info()."""

    # Exit status of the ffprobe process.
    return_code: int
    # Captured standard output (ffprobe is asked to print JSON).
    json: str
    # Captured standard error.
    error: str

def probe_video_info(file_path) -> VideoProbeResult:
    """Run ffprobe on a video file and capture its JSON report.

    Args:
        file_path: Path to the video file.

    Returns:
        VideoProbeResult holding the process exit status, its standard output
        (the JSON text with format and stream information) and its standard
        error. If the ffprobe executable cannot be started, no exception is
        raised; the result carries return code 127, an empty ``json`` field
        and the reason in ``error``.
    """
    command = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        file_path,
    ]
    try:
        completed = subprocess.run(command, capture_output=True, text=True)
    except OSError as launch_error:
        # 127 mirrors the shell convention for "command not found"; it lets
        # callers treat a missing binary like any other non-zero exit.
        return VideoProbeResult(
            return_code=127,
            json="",
            error=f"Unable to run ffprobe: {launch_error}",
        )

    return VideoProbeResult(
        return_code=completed.returncode,
        json=completed.stdout,
        error=completed.stderr,
    )

def _hand_feature_columns(hand_index):
    """Column names of one hand: 21 x, then 21 y, then 21 peaktxt entries."""
    prefix = f'hand{hand_index}peaks'
    return (
        [f'{prefix}_x_{i}' for i in range(21)]
        + [f'{prefix}_y_{i}' for i in range(21)]
        + [f'{prefix}_peaktxt{i}' for i in range(21)]
    )


# 15 body key points: all x columns first, then all y columns.
body_features = [f'bodypeaks_{axis}_{i}' for axis in ('x', 'y') for i in range(15)]
hand0_features = _hand_feature_columns(0)
hand1_features = _hand_feature_columns(1)

# 30 body + 63 + 63 hand columns = 156 features per frame.
feature_columns_processed = body_features + hand0_features + hand1_features
label_columns = ['Expression_encoded']

# st.cache_data (not cache_resource): this is a pure data transformation, and
# cache_data hands every caller its own copy of the cached result.
@st.cache_data
def create_time_series_sequences(isl_data, feature_columns, label_columns, window_size=20):
    """Cut per-video frame tables into fixed-length sliding windows.

    Rows are grouped by ('Type', 'Expression_encoded', 'FileName'). Within each
    group one window is started at every row; windows that run past the end of
    the group are padded at the front with all-zero rows so that every window
    has exactly ``window_size`` rows.

    Args:
        isl_data: DataFrame containing the grouping columns and the features.
        feature_columns: Names of the feature columns to extract.
        label_columns: Unused; kept so existing callers keep working.
        window_size: Number of rows per window.

    Returns:
        tuple: (X_sequences, y_sequences) where X_sequences is a list of
        (window_size, len(feature_columns)) arrays and y_sequences holds the
        group's 'Expression_encoded' value for each window. Both lists are
        empty when ``isl_data`` is empty.
    """
    if isl_data.empty:
        return [], []

    feature_columns = list(feature_columns)
    # Padding row sized from the requested features rather than a fixed 156.
    blank_frame = np.zeros((1, len(feature_columns)))

    X_sequences = []
    y_sequences = []
    grouped = isl_data.groupby(['Type', 'Expression_encoded', 'FileName'])
    for (_, expression, _), file_df in grouped:
        features = file_df[feature_columns].values
        for start in range(features.shape[0]):
            window = features[start:start + window_size]
            missing = window_size - window.shape[0]
            if missing > 0:
                window = np.concatenate(
                    (np.repeat(blank_frame, missing, axis=0), window),
                    axis=0,
                )
            X_sequences.append(window)
            y_sequences.append(expression)

    return X_sequences, y_sequences

translation_model = None


def _build_translation_architecture(num_classes, window_size=20, num_features=156):
    """Assemble the bidirectional-LSTM classifier layer by layer.

    Used when the saved model file cannot be deserialised as a whole and only
    its weights are loaded; the layer order is kept exactly as before so the
    weights line up.
    """
    model = Sequential()
    model.add(Input(shape=(window_size, num_features)))
    model.add(keras.layers.Masking(mask_value=0.))
    model.add(BatchNormalization())
    model.add(Bidirectional(LSTM(32, recurrent_dropout=0.2, return_sequences=True)))
    model.add(Dropout(0.2))
    model.add(Bidirectional(LSTM(32, recurrent_dropout=0.2)))

    model.add(keras.layers.Activation('elu'))
    model.add(Dense(32, use_bias=False, kernel_initializer='he_normal'))
    model.add(BatchNormalization())
    model.add(Dropout(0.2))

    model.add(keras.layers.Activation('elu'))
    model.add(Dense(32, kernel_initializer='he_normal', use_bias=False))
    model.add(BatchNormalization())
    model.add(keras.layers.Activation('elu'))
    model.add(Dropout(0.2))

    model.add(Dense(num_classes, activation='softmax'))
    return model


@st.cache_resource
def load_translation_model():
    """Download and load the LSTM translation model.

    The model file is fetched from the HuggingFace Hub and loaded as a
    complete Keras model. If that fails, the architecture is rebuilt in code
    and only the weights are read from the same file.

    Returns:
        The Keras model, or None when Keras / huggingface_hub are unavailable
        or loading fails (the reason is shown in the Streamlit UI).
    """
    # NOTE(review): a None result is cached by st.cache_resource as well, so a
    # transient download failure presumably sticks until the cache is cleared.
    try:
        if keras is None or hf_hub_download is None:
            st.warning("Keras or HuggingFace Hub not available. Model loading skipped.")
            return None

        model_file = hf_hub_download(
            repo_id="sunilsarolkar/isl-translation-model",
            filename="isl_model_final.keras",
        )

        try:
            model = keras.models.load_model(model_file)
            st.success("✅ Model loaded successfully from saved file")
            return model
        except Exception as load_error:
            st.warning(f"Failed to load complete model: {load_error}")
            st.info("Attempting to build model architecture and load weights...")

        # 167 is the class count assumed when no mapping is loaded -
        # presumably the size of the trained label set; TODO confirm.
        num_classes = len(expression_mapping) if expression_mapping else 167
        model = _build_translation_architecture(num_classes)
        model.load_weights(model_file)
        st.success("✅ Model architecture built and weights loaded successfully")
        return model

    except Exception as e:
        st.error(f"Failed to load translation model: {e}")
        return None

@st.cache_data
def load_test_data():
    """Download the test dataset CSVs from the HuggingFace Hub and parse them.

    Returns:
        tuple: (testing_df, test_files_df) read from ``testing_cleaned.csv``
        and ``test_files.csv`` respectively.

    Raises:
        RuntimeError: If huggingface_hub could not be imported.
    """
    if hf_hub_download is None:
        raise RuntimeError(
            "huggingface_hub is not available; the test data cannot be downloaded."
        )

    def fetch(csv_name):
        return hf_hub_download(
            repo_id="sunilsarolkar/isl-test-data",
            filename=csv_name,
            repo_type="dataset",
        )

    testing_cleaned_path = fetch("testing_cleaned.csv")
    test_files_path = fetch("test_files.csv")

    return pd.read_csv(testing_cleaned_path), pd.read_csv(test_files_path)

class VideoWriter:
    """Encode raw BGR frames into a video file through an FFmpeg subprocess.

    Frames are piped to FFmpeg's stdin as ``bgr24`` raw video. The instance
    can be used as a context manager; leaving the block closes the writer.
    """

    def __init__(self, output_file, input_fps, input_framesize, input_pix_fmt, input_vcodec):
        """Start the FFmpeg process.

        Args:
            output_file: Destination path; an existing file is overwritten.
            input_fps: Frame rate passed to FFmpeg as the input ``r`` option.
            input_framesize: (height, width) of the frames that will be written.
            input_pix_fmt: Pixel format of the encoded output.
            input_vcodec: Video codec of the encoded output.

        Raises:
            RuntimeError: If the ffmpeg-python package is unavailable.
        """
        if ffmpeg is None:
            raise RuntimeError(
                "ffmpeg-python is not available; cannot encode video output."
            )
        height, width = input_framesize[0], input_framesize[1]
        self._closed = False
        self.ff_process = (
            ffmpeg
            .input(
                'pipe:',
                format='rawvideo',
                pix_fmt="bgr24",
                s=f'{width}x{height}',
                r=input_fps,
            )
            .output(output_file, pix_fmt=input_pix_fmt, vcodec=input_vcodec)
            .overwrite_output()
            .run_async(pipe_stdin=True)
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def write_frame(self, frame):
        """Send one frame's raw bytes to the encoder."""
        self.ff_process.stdin.write(frame.tobytes())

    def close(self):
        """Finish the stream and wait for FFmpeg to exit; safe to call twice."""
        if getattr(self, '_closed', False):
            return
        self._closed = True
        self.ff_process.stdin.close()
        self.ff_process.wait()

def calculate_weighted_average(numbers, weights):
    """Return the weighted average of ``numbers``.

    Args:
        numbers: Iterable of numbers.
        weights: Iterable of weights, paired with ``numbers`` position by
            position (pairing stops at the shorter of the two).

    Returns:
        The weighted average, or 0 when the weights sum to zero.
    """
    # Materialise once so a one-shot iterable can be both summed and zipped.
    weights = list(weights)
    total_weight = sum(weights)
    if total_weight == 0:
        return 0
    return sum(value * weight for value, weight in zip(numbers, weights)) / total_weight

@st.cache_data
def resize_image(image, width=None, height=None, interpolation=None):
    """Resize an image while keeping its aspect ratio.

    Args:
        image: Input image array; its first two dimensions are (height, width).
        width: Target width. When given it takes precedence over ``height``.
        height: Target height, used only when ``width`` is None.
        interpolation: OpenCV interpolation flag; ``cv2.INTER_AREA`` when
            omitted. It is resolved at call time so that defining this
            function does not fail when OpenCV could not be imported.

    Returns:
        The resized image, or ``image`` itself when neither ``width`` nor
        ``height`` is given.
    """
    source_height, source_width = image.shape[:2]

    if width is None and height is None:
        return image

    if interpolation is None:
        interpolation = cv2.INTER_AREA

    if width is None:
        scale = height / float(source_height)
        target_size = (int(source_width * scale), height)
    else:
        scale = width / float(source_width)
        target_size = (width, int(source_height * scale))

    return cv2.resize(image, target_size, interpolation=interpolation)

# NOTE(review): glyphs in this section were recovered from a mis-encoded copy.
# The rocket in the header banner is a best guess; the others are exact.
st.title('🤟 ISL Sign Language Translation - TechMatrix Solvers Initiative')

# Global CSS: fixed sidebar width plus the two branded boxes used below.
st.markdown(
    """
    <style>
    [data-testid="stSidebar"][aria-expanded="true"] > div:first-child {
        width: 350px;
    }
    [data-testid="stSidebar"][aria-expanded="false"] > div:first-child {
        width: 350px;
        margin-left: -350px;
    }

    .team-info {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 1rem;
        border-radius: 0.5rem;
        margin: 1rem 0;
    }

    .tech-matrix-header {
        background: linear-gradient(90deg, #1e3a8a, #7c3aed);
        color: white;
        padding: 1rem;
        border-radius: 0.5rem;
        text-align: center;
        margin-bottom: 1rem;
    }
    </style>
    """,
    unsafe_allow_html=True,
)

st.markdown(
    """
    <div class="tech-matrix-header">
        <h2>🚀 TechMatrix Solvers</h2>
        <p>Innovating Accessible Technology Solutions</p>
    </div>
    """,
    unsafe_allow_html=True,
)

st.sidebar.title('🤟 ISL Translation System')
st.sidebar.subheader('Configuration')

st.sidebar.markdown(
    """
    <div class="team-info">
        <h3>👨‍💻 Development Team</h3>
        <ul>
            <li><strong>Abhay Gupta</strong> - Team Lead</li>
            <li><strong>Kripanshu Gupta</strong> - Backend Dev</li>
            <li><strong>Dipanshu Patel</strong> - UI/UX Designer</li>
            <li><strong>Bhumika Patel</strong> - Deployment</li>
        </ul>
        <p><em>Shri Ram Group of Institutions</em></p>
    </div>
    """,
    unsafe_allow_html=True,
)

# Per-sign probability history; reset again when a translation run starts.
frame_predictions = {}

app_mode = st.sidebar.selectbox(
    'Choose Application Mode',
    ['About Project', 'Test Video Translation'],
)

if app_mode == 'About Project':
    # Static project description. NOTE(review): the emoji in the "Dataset
    # Information", image caption/placeholder, "Contact Information" and
    # "Documentation" strings were unrecoverable from the mis-encoded source
    # and are best guesses; the remaining glyphs are exact.
    st.markdown(
        """
        ## 🎯 Project Overview

        Welcome to the **ISL Sign Language Translation System** developed by **TechMatrix Solvers**.
        This cutting-edge application demonstrates real-time Indian Sign Language recognition and
        translation using advanced deep learning techniques.

        ### 🏗️ Technical Architecture

        Our system combines multiple state-of-the-art technologies:

        1. **Body Pose Estimation**: 25-point skeletal tracking using OpenPose
        2. **Hand Landmark Detection**: 21-point hand keypoint identification
        3. **Temporal Modeling**: Bidirectional LSTM networks for sequence analysis
        4. **Real-time Processing**: Optimized inference pipeline for live translation
        """
    )

    st.markdown(
        """
        ### 📊 Dataset Information

        Our model is trained on the comprehensive [INCLUDE dataset](https://zenodo.org/records/4010759):
        """
    )

    dataset_stats = {
        "Metric": [
            "Categories", "Total Words", "Training Videos",
            "Avg Videos/Class", "Avg Video Length", "Resolution", "Frame Rate",
        ],
        "Value": [
            "15", "263", "4,257", "16.3", "2.57s", "1920x1080", "25fps",
        ],
    }
    st.table(pd.DataFrame(dataset_stats))

    # The illustration is optional; any failure to load or show it falls back
    # to a short notice instead of breaking the page.
    try:
        categories_image = np.array(Image.open('original_project/categories_processed.png'))
        st.image(categories_image, caption="📈 Processed Categories Distribution")
    except Exception:
        st.info("📊 Dataset visualization images will be displayed when available")

    st.markdown(
        """
        ### 🧠 Neural Network Architecture

        ```python
        # TechMatrix Solvers LSTM Translation Model
        model = Sequential([
            Input(shape=(20, 156)),  # 20-frame temporal window
            Masking(mask_value=0.),
            BatchNormalization(),
            Bidirectional(LSTM(32, recurrent_dropout=0.2, return_sequences=True)),
            Dropout(0.2),
            Bidirectional(LSTM(32, recurrent_dropout=0.2)),
            Dense(32, activation='elu'),
            BatchNormalization(),
            Dropout(0.2),
            Dense(len(expression_mapping), activation='softmax')
        ])
        ```

        **Model Statistics:**
        - Total Parameters: 82,679 (322.96 KB)
        - Trainable Parameters: 82,239 (321.25 KB)
        - Input Features: 156-dimensional vectors
        - Temporal Window: 20 frames
        """
    )

    col1, col2 = st.columns(2)

    with col1:
        st.markdown(
            """
            ### 🛠️ Technology Stack

            **Frontend & UI:**
            - Streamlit (Interactive Web App)
            - Custom CSS Styling
            - Responsive Design

            **Deep Learning:**
            - Keras/TensorFlow Backend
            - PyTorch Integration
            - LSTM Networks
            - OpenPose Models
            """
        )

    with col2:
        st.markdown(
            """
            ### 📱 Key Features

            **Real-time Processing:**
            - Live video analysis
            - Pose keypoint extraction
            - Temporal sequence modeling
            - Confidence scoring

            **User Experience:**
            - Intuitive interface
            - Visual feedback
            - Progress tracking
            - Result visualization
            """
        )

    st.markdown("### 🔧 System Information")
    col1, col2 = st.columns(2)

    with col1:
        st.write(f"**Python Version:** {platform.python_version()}")
        st.write(f"**FFmpeg:** {shutil.which('ffmpeg') or 'Not found'}")
        st.write(f"**FFprobe:** {shutil.which('ffprobe') or 'Not found'}")

    with col2:
        # getattr() also covers the "module is None" case left by a failed import.
        opencv_version = getattr(cv2, '__version__', None)
        if opencv_version:
            st.write(f"**OpenCV Version:** {opencv_version}")
        else:
            st.write("**OpenCV:** Not available")

        # PyTorch and Keras are reported independently so that a missing
        # torch no longer hides the Keras version.
        try:
            import torch
            st.write(f"**PyTorch:** {torch.__version__}")
        except Exception:
            st.write("**PyTorch:** Not available")

        keras_version = getattr(keras, '__version__', None)
        if keras_version:
            st.write(f"**Keras:** {keras_version}")
        else:
            st.write("**Keras:** Not available")

    st.markdown(
        """
        ### 📞 Contact Information

        **TechMatrix Solvers Team:**

        | Name | Role | Email | Phone |
        |------|------|-------|---------|
        | **Abhay Gupta** | Team Lead | contact2abhaygupta6187@gmail.com | 8115814535 |
        | **Kripanshu Gupta** | Backend Developer | guptakripanshu83@gmail.com | 7067058400 |
        | **Dipanshu Patel** | UI/UX Designer | dipanshupatel43@gmail.com | 9294526404 |
        | **Bhumika Patel** | Deployment & Presenter | bp7249951@gmail.com | 9302271422 |

        **Institution:** Shri Ram Group of Institutions

        ### 📚 Documentation

        For detailed technical documentation and implementation details, please refer to our
        [comprehensive documentation](https://docs.google.com/document/d/1mzr2KGHRJT5heUjFF20NQ3Gb89urpjZJ/edit?usp=sharing).

        ---

        **© 2024 TechMatrix Solvers - Innovating Accessible Technology Solutions**
        """
    )

elif app_mode == 'Test Video Translation':
|
|
|
|
|
|
st.markdown("## π₯ Test Video Translation")
|
|
|
|
|
|
|
|
|
with st.spinner("Loading test data..."):
|
|
|
try:
|
|
|
testing_df, test_files_df = load_test_data()
|
|
|
st.success("β
Test data loaded successfully!")
|
|
|
except Exception as e:
|
|
|
st.error(f"β Failed to load test data: {e}")
|
|
|
st.stop()
|
|
|
|
|
|
category = st.sidebar.selectbox(
|
|
|
'Choose Category',
|
|
|
np.sort(test_files_df['Category'].unique(), axis=-1, kind='mergesort')
|
|
|
)
|
|
|
|
|
|
|
|
|
category_mask = (test_files_df['Category'] == category)
|
|
|
test_files_category = test_files_df[category_mask]
|
|
|
|
|
|
class_name = st.sidebar.selectbox(
|
|
|
'Choose Class',
|
|
|
np.sort(test_files_category['Class'].unique(), axis=-1, kind='mergesort')
|
|
|
)
|
|
|
|
|
|
|
|
|
class_mask = (test_files_df['Class'] == class_name)
|
|
|
filename = st.sidebar.selectbox(
|
|
|
'Choose File',
|
|
|
np.sort(test_files_category[class_mask]['Filename'].unique(), axis=-1, kind='mergesort')
|
|
|
)
|
|
|
|
|
|
|
|
|
st.info(f"π Selected: {category} β {class_name} β {filename}")
|
|
|
|
|
|
if st.sidebar.button("π Start Translation", type="primary"):
|
|
|
|
|
|
data_mask = ((testing_df['FileName'] == filename) &
|
|
|
(testing_df['Type'] == category) &
|
|
|
(testing_df['Expression'] == class_name))
|
|
|
|
|
|
window_size = 20
|
|
|
current_test_data = testing_df[data_mask]
|
|
|
|
|
|
if current_test_data.empty:
|
|
|
st.error(f"β οΈ No matching data found for: {filename} | {category} | {class_name}")
|
|
|
st.stop()
|
|
|
else:
|
|
|
st.success(f"β
Loaded {current_test_data.shape[0]} frames for processing")
|
|
|
|
|
|
|
|
|
X_test_processed, y_test_processed = create_time_series_sequences(
|
|
|
current_test_data, feature_columns_processed, label_columns, window_size=window_size
|
|
|
)
|
|
|
X_test_processed = np.array(X_test_processed)
|
|
|
|
|
|
|
|
|
st.set_option('deprecation.showfileUploaderEncoding', False)
|
|
|
|
|
|
st.sidebar.markdown('---')
|
|
|
st.markdown(
|
|
|
"""
|
|
|
<style>
|
|
|
[data-testid="stSidebar"][aria-expanded="true"] > div:first-child {
|
|
|
width: 400px;
|
|
|
}
|
|
|
[data-testid="stSidebar"][aria-expanded="false"] > div:first-child {
|
|
|
width: 400px;
|
|
|
margin-left: -400px;
|
|
|
}
|
|
|
</style>
|
|
|
""",
|
|
|
unsafe_allow_html=True,
|
|
|
)
|
|
|
|
|
|
st.sidebar.markdown('---')
|
|
|
st.markdown('## π Translation Results')
|
|
|
|
|
|
|
|
|
progress_container = st.empty()
|
|
|
|
|
|
with progress_container.container():
|
|
|
progress_df = pd.DataFrame([['--', '--']],
|
|
|
columns=['Frames Processed', 'Detected Sign'])
|
|
|
progress_table = st.table(progress_df)
|
|
|
|
|
|
|
|
|
video_display = st.empty()
|
|
|
st.markdown("<hr/>", unsafe_allow_html=True)
|
|
|
frame_display = st.empty()
|
|
|
|
|
|
|
|
|
video_file_path = hf_hub_download(
|
|
|
repo_id="sunilsarolkar/isl-test-data",
|
|
|
filename=f'test/{category}/{class_name}/{filename}',
|
|
|
repo_type="dataset"
|
|
|
)
|
|
|
|
|
|
if not os.path.exists(video_file_path):
|
|
|
st.error(f"β οΈ Video file not found: {video_file_path}")
|
|
|
st.stop()
|
|
|
|
|
|
|
|
|
video_capture = cv2.VideoCapture(video_file_path)
|
|
|
|
|
|
|
|
|
probe_result = probe_video_info(video_file_path)
|
|
|
video_info = json.loads(probe_result.json)
|
|
|
video_stream = [stream for stream in video_info["streams"] if stream["codec_type"] == "video"][0]
|
|
|
|
|
|
input_fps = video_stream["avg_frame_rate"]
|
|
|
input_pix_fmt = video_stream["pix_fmt"]
|
|
|
input_vcodec = video_stream["codec_name"]
|
|
|
format_name = video_info["format"]["format_name"].split(",")[0]
|
|
|
|
|
|
|
|
|
width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
|
height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
|
fps_input = int(video_capture.get(cv2.CAP_PROP_FPS))
|
|
|
|
|
|
|
|
|
total_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
|
|
|
frame_buffer = []
|
|
|
|
|
|
|
|
|
output_file = f"/tmp/techmatrix_output_{uuid.uuid4().hex}.{format_name}"
|
|
|
video_writer = None
|
|
|
weighted_predictions = {}
|
|
|
frame_predictions = {}
|
|
|
|
|
|
frame_idx = 0
|
|
|
|
|
|
try:
|
|
|
|
|
|
for _, frame_data in current_test_data.iterrows():
|
|
|
if not video_capture.isOpened():
|
|
|
st.error(f"β Could not open video: {video_file_path}")
|
|
|
break
|
|
|
|
|
|
if video_capture.isOpened():
|
|
|
ret, frame = video_capture.read()
|
|
|
|
|
|
if len(frame_buffer) < window_size:
|
|
|
|
|
|
if utils is not None:
|
|
|
visualization_canvas = utils.render_stick_model(
|
|
|
frame,
|
|
|
eval(frame_data['bodypose_circles']),
|
|
|
eval(frame_data['bodypose_sticks']),
|
|
|
eval(frame_data['handpose_edges']),
|
|
|
eval(frame_data['handpose_peaks'])
|
|
|
)
|
|
|
else:
|
|
|
visualization_canvas = frame
|
|
|
|
|
|
|
|
|
if utils is not None:
|
|
|
canvas_with_predictions = utils.create_bar_plot_visualization(
|
|
|
visualization_canvas, {},
|
|
|
f'Building Buffer - Frame {frame_idx + 1} [No Predictions Yet]',
|
|
|
visualization_canvas
|
|
|
)
|
|
|
canvas_with_predictions = utils.create_bar_plot_visualization(
|
|
|
canvas_with_predictions, weighted_predictions,
|
|
|
f'Weighted Average - Frame {frame_idx + 1} [No Predictions Yet]',
|
|
|
visualization_canvas
|
|
|
)
|
|
|
canvas_with_predictions = utils.add_bottom_padding(
|
|
|
canvas_with_predictions, (255, 255, 255), 100
|
|
|
)
|
|
|
else:
|
|
|
canvas_with_predictions = visualization_canvas
|
|
|
|
|
|
|
|
|
if video_writer is None:
|
|
|
input_framesize = canvas_with_predictions.shape[:2]
|
|
|
video_writer = VideoWriter(output_file, input_fps, input_framesize,
|
|
|
input_pix_fmt, input_vcodec)
|
|
|
|
|
|
video_writer.write_frame(canvas_with_predictions)
|
|
|
|
|
|
|
|
|
with progress_container.container():
|
|
|
progress_df = pd.DataFrame(
|
|
|
[[f'{frame_idx + 1}/{current_test_data.shape[0]}',
|
|
|
'<Building 20-frame buffer>']],
|
|
|
columns=['Frames Processed', 'Detected Sign']
|
|
|
)
|
|
|
progress_table = st.table(progress_df)
|
|
|
|
|
|
frame_buffer.append(frame)
|
|
|
|
|
|
|
|
|
with video_display.container():
|
|
|
st.image(canvas_with_predictions, channels='BGR', use_column_width=True)
|
|
|
else:
|
|
|
|
|
|
frame_buffer[:-1] = frame_buffer[1:]
|
|
|
frame_buffer[-1] = frame
|
|
|
|
|
|
|
|
|
translation_model = load_translation_model()
|
|
|
|
|
|
|
|
|
sequence_idx = frame_idx - 20
|
|
|
if translation_model is None:
|
|
|
st.error("β Translation model failed to load. Cannot make predictions.")
|
|
|
|
|
|
current_predictions = {"model_not_available": 0.0}
|
|
|
top_3_signs = ["model_not_available"]
|
|
|
top_3_probabilities = [0.0]
|
|
|
else:
|
|
|
|
|
|
prediction_output = translation_model(
|
|
|
X_test_processed[sequence_idx].reshape(
|
|
|
1, X_test_processed[sequence_idx].shape[0],
|
|
|
X_test_processed[sequence_idx].shape[1]
|
|
|
)
|
|
|
)
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
prediction_output = prediction_output[0].cpu().detach().numpy()
|
|
|
except AttributeError:
|
|
|
|
|
|
prediction_output = prediction_output[0]
|
|
|
|
|
|
|
|
|
top_prediction_idx = np.argmax(prediction_output)
|
|
|
top_3_indices = prediction_output.argsort()[-3:][::-1]
|
|
|
top_3_signs = [get_sign_label(i) for i in top_3_indices]
|
|
|
top_3_probabilities = prediction_output[top_3_indices]
|
|
|
|
|
|
|
|
|
current_predictions = {}
|
|
|
for sign, prob in zip(top_3_signs, top_3_probabilities):
|
|
|
current_predictions[sign] = prob
|
|
|
|
|
|
|
|
|
for sign, prob in zip(top_3_signs, top_3_probabilities):
|
|
|
if sign not in frame_predictions:
|
|
|
frame_predictions[sign] = []
|
|
|
frame_predictions[sign].append(prob)
|
|
|
|
|
|
|
|
|
for sign in frame_predictions:
|
|
|
sign_predictions = frame_predictions[sign]
|
|
|
sign_weights = [len(sign_predictions) for _ in range(len(sign_predictions))]
|
|
|
weighted_predictions[sign] = calculate_weighted_average(
|
|
|
sign_predictions, sign_weights
|
|
|
)
|
|
|
|
|
|
|
|
|
sorted_predictions = dict(
|
|
|
sorted(weighted_predictions.items(), key=lambda item: item[1], reverse=True)
|
|
|
)
|
|
|
|
|
|
|
|
|
if utils is not None:
|
|
|
visualization_canvas = utils.render_stick_model(
|
|
|
frame,
|
|
|
eval(frame_data['bodypose_circles']),
|
|
|
eval(frame_data['bodypose_sticks']),
|
|
|
eval(frame_data['handpose_edges']),
|
|
|
eval(frame_data['handpose_peaks'])
|
|
|
)
|
|
|
else:
|
|
|
visualization_canvas = frame
|
|
|
|
|
|
|
|
|
if utils is not None:
|
|
|
canvas_with_predictions = utils.create_bar_plot_visualization(
|
|
|
visualization_canvas, current_predictions,
|
|
|
f'Current Window Prediction (Frames {sequence_idx + 1}-{frame_idx + 1})',
|
|
|
visualization_canvas
|
|
|
)
|
|
|
canvas_with_predictions = utils.create_bar_plot_visualization(
|
|
|
canvas_with_predictions, weighted_predictions,
|
|
|
f'Cumulative Weighted Average - Frame {frame_idx + 1}',
|
|
|
visualization_canvas
|
|
|
)
|
|
|
canvas_with_predictions = utils.add_bottom_padding(
|
|
|
canvas_with_predictions, (255, 255, 255), 100
|
|
|
)
|
|
|
else:
|
|
|
canvas_with_predictions = visualization_canvas
|
|
|
|
|
|
video_writer.write_frame(canvas_with_predictions)
|
|
|
|
|
|
|
|
|
if weighted_predictions:
|
|
|
best_sign = max(weighted_predictions, key=weighted_predictions.get)
|
|
|
best_confidence = weighted_predictions[best_sign]
|
|
|
else:
|
|
|
best_sign = "no_predictions"
|
|
|
best_confidence = 0.0
|
|
|
|
|
|
|
|
|
with progress_container.container():
|
|
|
progress_df = pd.DataFrame(
|
|
|
[[f'{frame_idx + 1}/{current_test_data.shape[0]}',
|
|
|
f'{best_sign} ({best_confidence * 100:.2f}%)']],
|
|
|
columns=['Frames Processed', 'Detected Sign']
|
|
|
)
|
|
|
progress_table = st.table(progress_df)
|
|
|
|
|
|
|
|
|
with video_display.container():
|
|
|
st.image(canvas_with_predictions, channels='BGR', use_column_width=True)
|
|
|
|
|
|
frame_idx += 1
|
|
|
|
|
|
|
|
|
st.success("β
Video processing completed!")
|
|
|
|
|
|
with video_display.container():
|
|
|
if video_writer is not None:
|
|
|
video_writer.close()
|
|
|
with open(output_file, 'rb') as video_file:
|
|
|
output_video_bytes = video_file.read()
|
|
|
st.video(output_video_bytes)
|
|
|
st.info(f"πΎ Processed video saved: {output_file}")
|
|
|
else:
|
|
|
st.warning("β οΈ No video output generated")
|
|
|
|
|
|
finally:
|
|
|
|
|
|
if 'video_capture' in locals() and video_capture is not None:
|
|
|
video_capture.release()
|
|
|
if 'video_writer' in locals() and video_writer is not None:
|
|
|
video_writer.close()
|
|
|
|
|
|
|
|
|
|
|
|
# Page footer, rendered in every mode.
# NOTE(review): the trailing glyph was unrecoverable from the mis-encoded
# source; the star is a best guess.
st.markdown(
    """
    ---
    <div style="text-align: center; color: #666;">
        <p><strong>TechMatrix Solvers</strong> | Shri Ram Group of Institutions</p>
        <p>Innovating Accessible Technology Solutions for Everyone 🌟</p>
    </div>
    """,
    unsafe_allow_html=True,
)