# Hugging Face Spaces app — video anomaly detection (Gradio).
# NOTE: the Space status page reported "Runtime error" when this source was captured.
| import gradio as gr | |
| import tensorflow as tf | |
| import tensorflow_hub as hub | |
| import os | |
| from preprocessing import preprocess | |
| import numpy as np | |
| import shutil | |
| import cv2 | |
| import constants as const | |
| from get_drive_model import ensure_model_download | |
| import atexit # Import atexit module | |
| from simple_salesforce import Salesforce | |
| from datetime import datetime | |
# Salesforce login.
# SECURITY: credentials (username, password, security token) were previously
# hard-coded in this file. They now come from environment variables; the old
# leaked credentials must be rotated immediately.
sf = Salesforce(
    username=os.environ['SF_USERNAME'],
    password=os.environ['SF_PASSWORD'],
    security_token=os.environ['SF_SECURITY_TOKEN'],
    # 'test' for sandbox, 'login' for production.
    domain=os.environ.get('SF_DOMAIN', 'login'),
)

# API name of the custom Salesforce object that stores prediction results.
SALESFORCE_OBJECT = 'Anomaly_Result__c'
def log_to_salesforce(video_path, prediction_text):
    """Create a Salesforce record for one prediction (best effort).

    Args:
        video_path: Path of the processed video; only the basename is stored.
        prediction_text: Human-readable prediction summary shown to the user.

    Any failure (network, auth, missing object) is printed and swallowed so a
    Salesforce outage never breaks the prediction flow.
    """
    try:
        # getattr() is the idiomatic way to resolve the dynamic object name;
        # the original called sf.__getattr__ directly.
        result = getattr(sf, SALESFORCE_OBJECT).create({
            'Video_Name__c': os.path.basename(video_path),
            'Prediction_Result__c': prediction_text,
            'Timestamp__c': datetime.utcnow().isoformat(),
        })
        print("Salesforce Record Created:", result)
    except Exception as e:
        # Best-effort logging only: report and continue.
        print("Salesforce Logging Failed:", e)
# Filesystem locations used by the app.
UPLOAD_FOLDER = 'uploads'
FRAMES_FOLDER = 'static/frames'  # was defined twice in the original; deduplicated

# Create the frames directory up front so cv2.imwrite never hits a missing parent.
os.makedirs(FRAMES_FOLDER, exist_ok=True)

# Ensure the model weight files are downloaded (from Drive) before loading.
ensure_model_download(const.ANOMALY_DETECTION_MODEL_FILE_ID, 'anomaly_detection_model.h5')
ensure_model_download(const.ANOMALY_CLASSIFICATION_MODEL_FILE_ID, 'anomaly_classification_model.h5')

# Load both models once at startup; hub.KerasLayer is required to deserialize
# the TF-Hub layers embedded in the saved .h5 files.
first_model = tf.keras.models.load_model('anomaly_detection_model.h5', custom_objects={'KerasLayer': hub.KerasLayer})
second_model = tf.keras.models.load_model('anomaly_classification_model.h5', custom_objects={'KerasLayer': hub.KerasLayer})
def process_video(filepath):
    """Run the two-stage anomaly pipeline on one uploaded video.

    Stage 1 (first_model) scores the clip for anomaly; stage 2 (second_model)
    only runs for anomalous clips and distinguishes explosion vs. violent
    activity.

    NOTE(review): a raw score < 0.5 is treated as the *positive* (anomaly /
    explosion) class for both models, i.e. index 0 appears to be the positive
    class — confirm against the training code.

    Args:
        filepath: Path to the uploaded video file.

    Returns:
        (prediction_label, frame_paths): the summary text and the list of
        saved preview-frame paths for the gallery.
    """
    print(f"Processing video: {filepath}")
    frames_for_prediction, frames_for_display = preprocess(filepath)
    print(f"Shape of frames for prediction: {frames_for_prediction.shape}")
    print(f"Shape of frames for display: {frames_for_display.shape}")
    # summary() prints directly and returns None; the original wrapped it in
    # print(), which emitted a stray "None" line.
    first_model.summary()
    anomaly_prediction = first_model.predict(frames_for_prediction)[0][0]
    print(f"Anomaly Prediction: {anomaly_prediction}")
    if anomaly_prediction < 0.5:
        # Anomalous clip: run the second model to classify the anomaly type.
        classification_prediction = second_model.predict(frames_for_prediction)[0][0]
        if classification_prediction < 0.5:
            prediction_label = f'The video is an Anomaly type.\nanomaly prediction with {(1 - anomaly_prediction) * 100:.2f}% confidence\nExplosion Detected with {(1 - classification_prediction) * 100:.2f}% confidence'
        else:
            prediction_label = f'The video is an Anomaly type.\nanomaly prediction with {(1 - anomaly_prediction) * 100:.2f}% confidence\nViolent Activity Detected with {classification_prediction * 100:.2f}% confidence'
    else:
        prediction_label = f'No Anomalous Activity with {anomaly_prediction * 100:.2f}% confidence.'
    frame_paths = save_frames_to_filesystem(frames_for_display)
    # Best-effort CRM logging; log_to_salesforce swallows its own errors.
    log_to_salesforce(filepath, prediction_label)
    return prediction_label, frame_paths
def save_frames_to_filesystem(frames):
    """Write each display frame to FRAMES_FOLDER as frame_<i>.png.

    Args:
        frames: Iterable of frame arrays (converted to uint8 before writing).

    Returns:
        List of written file paths, in frame order.

    NOTE(review): cv2.imwrite expects BGR channel order — confirm that
    preprocess() yields BGR frames, otherwise the gallery colors will swap.
    """
    saved_paths = []
    for index, raw_frame in enumerate(frames):
        destination = os.path.join(FRAMES_FOLDER, f'frame_{index}.png')
        cv2.imwrite(destination, raw_frame.astype(np.uint8))
        saved_paths.append(destination)
    return saved_paths
def _clear_directory(folder):
    """Delete every file, symlink, and subdirectory inside *folder* (best effort)."""
    for filename in os.listdir(folder):
        file_path = os.path.join(folder, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            # Best effort: report and keep going so one locked file doesn't
            # abort the rest of the cleanup.
            print(f'Failed to delete {file_path}. Reason: {e}')


def cleanup_uploads_folder():
    """atexit hook: empty the uploads and frames folders.

    The folders themselves are kept; only their contents are removed. The
    original duplicated the deletion loop verbatim for each folder; both now
    share _clear_directory.
    """
    for folder in (UPLOAD_FOLDER, FRAMES_FOLDER):
        if os.path.exists(folder):
            _clear_directory(folder)
    print("Uploads and frames folders cleaned")
# Register the cleanup function so uploaded videos and extracted frames are
# removed when the interpreter exits.
atexit.register(cleanup_uploads_folder)
# Main Gradio interface: upload a video, show the prediction text and the
# extracted preview frames. The CSS below only themes the built-in widgets
# (ids match elem_id values passed to the components).
iface = gr.Interface(
    fn=process_video,
    inputs=gr.File(type="filepath"),
    outputs=[
        gr.Textbox(label="Prediction", elem_id="prediction-box"),
        gr.Gallery(label="Video Frames", elem_id="frame-gallery", columns=5, rows=10)
    ],
    title="Anomaly Detection in Videos",
    description="Upload a video file and detect anomalies, violent activity, or explosions.",
    theme="default",
    css="""
    body {
        background-color: #f0f8ff;
    }
    .interface {
        border-radius: 20px;
        background: #ffffff;
        padding: 20px;
        box-shadow: 0 4px 8px rgba(0, 0, 0, 0.2);
    }
    .interface-title {
        color: #333399;
        font-size: 32px;
    }
    .interface-description {
        color: #6666cc;
        font-size: 18px;
    }
    #prediction-box {
        background: #e6e6ff;
        border-radius: 10px;
    }
    #frame-gallery {
        background: #e6e6ff;
        border-radius: 10px;
    }
    .gallery-item img {
        border: 2px solid #6666cc;
        border-radius: 10px;
    }
    .btn-primary {
        background-color: #6666cc;
        border-color: #6666cc;
    }
    .btn-primary:hover {
        background-color: #333399;
        border-color: #333399;
    }
    """
)
def _make_detection_interface(title):
    """Build a secondary tab that reuses the same process_video pipeline.

    Every tab runs the identical two-stage pipeline; only the tab title
    differs. The original constructed three byte-identical gr.Interface
    blocks; this factory removes the duplication.
    """
    return gr.Interface(
        fn=process_video,
        inputs=gr.File(type="filepath"),
        outputs=[
            gr.Textbox(label="Prediction"),
            gr.Gallery(label="Video Frames", columns=5, rows=10),
        ],
        title=title,
    )


# Additional interfaces for individual triggers.
violent_iface = _make_detection_interface("Violent Detection")
explosion_iface = _make_detection_interface("Explosion Detection")
normal_iface = _make_detection_interface("Normal Detection")
# Combine all interfaces into a single tabbed application. Note the last tab
# (normal_iface, titled "Normal Detection") is labeled "Suspicious Activities"
# here — presumably intentional, but worth confirming.
combined_iface = gr.TabbedInterface([iface, violent_iface, explosion_iface, normal_iface],
                                    ["Upload Video", "Violent Detection", "Explosion Detection", "Suspicious Activities"])

# Launch the Gradio server only when run as a script (not on import).
if __name__ == "__main__":
    combined_iface.launch()