anomaly_dector / app.py
foreversheikh's picture
Update app.py
8a74454 verified
import os
import cv2
import torch
import numpy as np
import time
from datetime import datetime
import threading
import base64
from werkzeug.utils import secure_filename
from flask import Flask, render_template, Response, request, jsonify
from flask_socketio import SocketIO
# CHANGED: Imports are now flat
from load_model import load_models
from utils import build_transforms
from TorchUtils import get_torch_device
from yolo_detection import analyze_video_with_yolo
# ---- App Setup ----
# CHANGED: Told Flask to look for templates/static files in the current folder ('.')
app = Flask(__name__, template_folder='.', static_folder='.')
app.config['SECRET_KEY'] = 'your_secret_key!'
# --- File Paths (CHANGED to relative paths) ---
UPLOAD_FOLDER = 'uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
SAVE_DIR = "outputs/anomaly_frames"
os.makedirs(SAVE_DIR, exist_ok=True)
# CHANGED: All paths are now relative to this app.py file
FEATURE_EXTRACTOR_PATH = "models//c3d.pickle"
AD_MODEL_PATH = "models//epoch_8000.pt"
YOLO_MODEL_PATH = "models//yolo_my_model.pt"
# CHANGED: All video paths are now relative
VIDEO_PATHS = {
"Abuse": "demo_videos//Abuse.mp4",
"Arrest": "demo_videos//Arrest.mp4",
"Arson": "demo_videos//Arson.mp4",
"Assault": "demo_videos//Assault.mp4",
"Burglary": "demo_videos//Burglary.mp4",
"Explosion": "demo_videos//Explosion.mp4",
"Fighting": "demo_videos//Fighting.mp4",
"RoadAccidents": "demo_videos//RoadAccidents.mp4",
"Robbery": "demo_videos//Robbery.mp4",
"Shooting": "demo_videos//Shooting.mp4",
"Shoplifting": "demo_videos//Shoplifting.mp4",
"Stealing": "demo_videos//Stealing.mp4",
"Vandalism": "demo_videos//Vandalism.mp4",
"Normal": "demo_videos//Normal.mp4"
}
# ---- Global Config & Model Loading ----
print("[INFO] Loading models...")
DEVICE = get_torch_device()
anomaly_detector, feature_extractor = load_models(
FEATURE_EXTRACTOR_PATH, AD_MODEL_PATH, features_method="c3d", device=DEVICE
)
feature_extractor.eval()
anomaly_detector.eval()
TRANSFORMS = build_transforms(mode="c3d")
ANOMALY_THRESHOLD = 0.5
print("[INFO] Models loaded successfully.")
# --- Threading control ---
thread = None
thread_lock = threading.Lock()
stop_event = threading.Event()
# (The `smooth_score` and `video_processing_task` functions remain unchanged)
def smooth_score(scores, new_score, window=5):
scores.append(new_score)
if len(scores) > window:
scores.pop(0)
return float(np.mean(scores))
def video_processing_task(video_path):
global thread
try:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
socketio.emit('processing_error', {'error': f'Could not open video file.'})
return
frame_buffer = []
last_save_time = 0
recent_scores = []
FRAME_SKIP = 4
frame_count = 0
while cap.isOpened() and not stop_event.is_set():
socketio.sleep(0.001)
ret, frame = cap.read()
if not ret: break
frame_count += 1
if frame_count % (FRAME_SKIP + 1) != 0: continue
frame_buffer.append(frame.copy())
if len(frame_buffer) == 16:
frames_resized = [cv2.resize(f, (112, 112)) for f in frame_buffer]
clip_np = np.array(frames_resized, dtype=np.uint8)
clip_torch = torch.from_numpy(clip_np)
clip_torch = TRANSFORMS(clip_torch)
clip_torch = clip_torch.unsqueeze(0).to(DEVICE)
with torch.no_grad():
features = feature_extractor(clip_torch).detach()
score_tensor = anomaly_detector(features).detach()
score = float(score_tensor.view(-1)[0].item())
score = smooth_score(recent_scores, score)
score = float(np.clip(score, 0, 1))
socketio.emit('update_graph', {'score': score})
if score > ANOMALY_THRESHOLD and (time.time() - last_save_time) >= 30:
last_save_time = time.time()
socketio.emit('update_status', {'status': 'Anomaly detected! Saving clip...'})
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
clip_dir = os.path.join(SAVE_DIR, f"anomaly_{timestamp}")
os.makedirs(clip_dir, exist_ok=True)
first_frame_path = os.path.join(clip_dir, "anomaly_frame.jpg")
cv2.imwrite(first_frame_path, frame_buffer[0])
try:
yolo_result = analyze_video_with_yolo(first_frame_path, model_path=YOLO_MODEL_PATH, return_class=True)
socketio.emit('update_yolo_text', {'text': f"YOLO Class: {yolo_result}"})
_, buffer = cv2.imencode('.jpg', frame_buffer[0])
b64_str = base64.b64encode(buffer).decode('utf-8')
socketio.emit('update_yolo_image', {'image_data': b64_str})
except Exception as e:
socketio.emit('update_yolo_text', {'text': f'YOLO Error: {e}'})
frame_buffer.clear()
cap.release()
if not stop_event.is_set():
socketio.emit('processing_finished', {'message': 'Video finished.'})
finally:
with thread_lock:
thread = None
stop_event.clear()
@app.route('/')
def index():
# This will now look for 'index.html' in the same folder
return render_template('index.html', anomaly_names=VIDEO_PATHS.keys())
@app.route('/upload', methods=['POST'])
def upload_file():
if 'video' not in request.files:
return jsonify({'error': 'No video file found'}), 400
file = request.files['video']
if file.filename == '':
return jsonify({'error': 'No video file selected'}), 400
if file:
filename = secure_filename(file.filename)
unique_filename = f"{datetime.now().strftime('%Y%m%d%HM%S')}_{filename}"
save_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
file.save(save_path)
return jsonify({'success': True, 'filename': unique_filename})
return jsonify({'error': 'File upload failed'}), 500
@app.route('/video_stream/<source>/<filename>')
def video_stream(source, filename):
if source == 'demo':
path = VIDEO_PATHS.get(filename)
elif source == 'upload':
# Need to URL-decode the filename
safe_filename = secure_filename(filename)
path = os.path.join(app.config['UPLOAD_FOLDER'], safe_filename)
else:
return "Invalid source", 404
if not path or not os.path.exists(path):
return "Video not found", 404
def generate():
with open(path, "rb") as f:
while chunk := f.read(1024 * 1024):
yield chunk
return Response(generate(), mimetype="video/mp4")
@socketio.on('start_processing')
def handle_start_processing(data):
global thread
with thread_lock:
if thread is None:
stop_event.clear()
source = data.get('source')
filename = data.get('filename')
video_path = None
if source == 'demo':
video_path = VIDEO_PATHS.get(filename)
elif source == 'upload':
safe_filename = secure_filename(filename)
video_path = os.path.join(app.config['UPLOAD_FOLDER'], safe_filename)
if video_path and os.path.exists(video_path):
print(f"[INFO] Starting processing for {filename} from {source}")
thread = socketio.start_background_task(target=video_processing_task, video_path=video_path)
else:
socketio.emit('processing_error', {'error': f'Video file not found!'})
@socketio.on('reset_system')
def handle_reset():
global thread
with thread_lock:
if thread is not None:
stop_event.set()
socketio.emit('system_reset_confirm')
if __name__ == '__main__':
print("[INFO] Starting Flask server...")
# Using 0.0.0.0 is still correct for services like Hugging Face
socketio.run(app, debug=True)