import base64 import random import shutil import time from openai import OpenAI import glob import numpy as np import matplotlib.pyplot as plt from sklearn import svm import zipfile from PIL import Image from sklearn.decomposition import PCA from PIL import Image import numpy as np from sklearn.preprocessing import StandardScaler from sklearn.svm import OneClassSVM import numpy as np import skimage from skimage.feature import hog from skimage.color import rgb2gray from skimage import io from sklearn.decomposition import PCA from sklearn.svm import OneClassSVM from sklearn.preprocessing import StandardScaler import os from tqdm import tqdm import pickle import joblib import cv2 import streamlit as st from streamlit_image_select import image_select def cut_video(video_path): # video_path = '/Users/ducky/Downloads/thief_1.mp4' cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) frames_dir = "./data/video_frame" if os.path.exists(frames_dir): shutil.rmtree(frames_dir) os.makedirs(frames_dir, exist_ok=True) frame_count = 0 frame_times = [] while cap.isOpened(): ret, frame = cap.read() if not ret: break timestamp_ms = (frame_count / fps) * 1000 minutes = int(timestamp_ms // 60000) seconds = int((timestamp_ms % 60000) // 1000) milliseconds = int(timestamp_ms % 1000) time_formatted = f"{minutes:02}:{seconds:02}:{milliseconds:03}" frame_times.append(time_formatted) frame_file_path = os.path.join(frames_dir, f'frame_{frame_count:04d}.jpg') cv2.imwrite(frame_file_path, frame) frame_count += 1 cap.release() return frames_dir, frame_times def extract_hog_features(image_path): """ 画像ファイルからHOG特徴量を抽出します。 :param image_path: 画像ファイルのパス :return: HOG特徴量のNumPy配列 """ # 画像を読み込む img = io.imread(image_path) img = img[:,:,:3] # 画像をグレースケールに変換 gray_img = rgb2gray(img) # HOG特徴量を抽出 features, _ = hog(gray_img, visualize=True, block_norm='L2-Hys') return features def prepare_features(image_paths): """ 複数の画像からHOG特徴量を抽出し、特徴量の行列を作成します。 :param image_paths: 画像ファイルのパスのリスト :return: 特徴量のNumPy配列 """ progress_bar = st.progress(0) status_text = st.empty() features = [] for i, path in enumerate(tqdm(image_paths)): features.append(extract_hog_features(path)) progress = int((i + 1) / len(image_paths) * 100) progress_bar.progress(progress) status_text.text(f"Processing image {i+1}/{len(image_paths)}: {path}") progress_bar.empty() status_text.text("Processing complete!") status_text.empty() return np.array(features) def run_pca(features): pca = PCA(n_components=4) progress_bar = st.progress(0) status_text = st.empty() def simulate_pca_progress(progress_bar, status_text, total_steps=100): for step in range(total_steps): progress_bar.progress(int((step + 1) / total_steps * 100)) status_text.text(f"PCA Transformation Progress: {int((step + 1) / total_steps * 100)}%") time.sleep(0.1) simulate_pca_progress(progress_bar, status_text) transformed_data = pca.fit_transform(features) status_text.text("PCA Transformation Complete!") progress_bar.empty() status_text.empty() return pca, transformed_data def run_standard_scale(features): scaler = StandardScaler() progress_bar = st.progress(0) status_text = st.empty() def simulate_ss_progress(progress_bar, status_text, total_steps=100): for step in range(total_steps): progress_bar.progress(int((step + 1) / total_steps * 100)) status_text.text(f"StandardScaler Transformation Progress: {int((step + 1) / total_steps * 100)}%") time.sleep(0.1) simulate_ss_progress(progress_bar, status_text) transformed_data = scaler.fit_transform(features) status_text.text("StandardScaler Transformation Complete!") progress_bar.empty() status_text.empty() return scaler, transformed_data def run_OneClassSVM(z_train): progress_bar = st.progress(0) status_text = st.empty() clf = svm.OneClassSVM(nu=0.2, kernel="rbf", gamma=0.001) def simulate_fitting_progress(clf, z_train, total_steps=100): for step in range(total_steps): time.sleep(0.05) progress_bar.progress(step + 1) status_text.text(f"Fitting model... {step + 1}% complete") clf.fit(z_train) progress_bar.empty() status_text.empty() simulate_fitting_progress(clf, z_train) return clf def predict_with_progress(clf, features_array): progress_bar = st.progress(0) status_text = st.empty() predictions = np.zeros(features_array.shape[0]) for i in range(features_array.shape[0]): predictions[i] = clf.predict(features_array[i].reshape(1, -1)) # predictions[i] = clf.decision_function(features_array[i].reshape(1, -1)) progress = int((i + 1) / features_array.shape[0] * 100) progress_bar.progress(progress) status_text.text(f"Predicting... {progress}% complete") progress_bar.empty() status_text.empty() return predictions def prepare_all_displayed_anomalies(frames_dir, predictions): anomaly_indices = [index for index, value in enumerate(predictions) if value == -1] anomaly_indices.sort() frames = os.listdir(frames_dir) frames.sort() anomaly_folder = "./data/anomaly" os.makedirs(anomaly_folder, exist_ok=True) anomaly_paths = [] frame_number = 0 anomaly_count = 0 for frame in frames: frame_path = os.path.join(frames_dir, frame) if frame_number == anomaly_indices[anomaly_count]: anomaly_frame_path = os.path.join(anomaly_folder, f'frame_{frame_number:04d}.jpg') shutil.copy(frame_path, anomaly_frame_path) anomaly_paths.append(anomaly_frame_path) anomaly_count += 1 if anomaly_count >= len(anomaly_indices): break frame_number += 1 return anomaly_paths def prepare_3_displayed_anomalies(frames_dir, predictions, frame_times): anomaly_frames = [index for index, value in enumerate(predictions) if value == -1] indices = random.sample(range(len(anomaly_frames)), 3) anomaly_frames = [anomaly_frames[i] for i in indices] anomaly_frames.sort() frames = os.listdir(frames_dir) frames.sort() anomaly_folder = "./data/anomaly" os.makedirs(anomaly_folder, exist_ok=True) anomaly_paths = [] frame_number = 0 anomaly_count = 0 for frame in frames: frame_path = os.path.join(frames_dir, frame) if frame_number == anomaly_frames[anomaly_count]: anomaly_frame_path = os.path.join(anomaly_folder, f'frame_{frame_number:04d}.jpg') shutil.copy(frame_path, anomaly_frame_path) anomaly_paths.append([anomaly_frame_path, frame_times[frame_number]]) anomaly_count += 1 if anomaly_count >= len(anomaly_frames): break frame_number += 1 return anomaly_paths def OneClassSvm_anomaly_detection(image_paths): features = prepare_features(image_paths) _, features_scaled = run_standard_scale(features) _, z_train = run_pca(features_scaled) clf = run_OneClassSVM(z_train) return clf, z_train def encode_image(image_path): with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode("utf-8") def get_response(model, client, image_path): base64_image = encode_image(image_path) response = client.chat.completions.create( model=model, messages=[ {"role": "system", "content": "You are a helpful assistant that responds in Markdown. Help me with task!"}, {"role": "user", "content": [ {"type": "text", "text": "この画像の中の人物が行っている活動を説明してください"}, {"type": "image_url", "image_url": { "url": f"data:image/png;base64,{base64_image}"} } ]} ], temperature=0.0, ) return response.choices[0].message.content def VLM_anomaly_detection(anomaly_paths): model = "gpt-4o" API_KEY = os.getenv("MY_API_KEY") client = OpenAI(api_key=API_KEY) progress_bar = st.progress(0) status_text = st.empty() st.session_state.responses = [] anomaly_paths = [path[0] for path in anomaly_paths] for i, anomaly_path in enumerate(tqdm(anomaly_paths)): progress = int((i + 1) / len(anomaly_paths) * 100) progress_bar.progress(progress) status_text.text(f"Running VLM {i+1}/{len(anomaly_paths)}") response = get_response(model, client, anomaly_path) st.session_state.responses.append(response) progress_bar.empty() status_text.text("Processing complete!") status_text.empty() def main(): if 'responses' not in st.session_state: st.session_state.responses = [] if 'display_anomalies' not in st.session_state: st.session_state.display_anomalies = [] with st.sidebar: st.image("logo.png") uploaded_video = st.file_uploader("Upload video", type=["mp4", "mov", "avi"]) os.makedirs("./data", exist_ok=True) if uploaded_video is not None: video_file_path = "./data/uploaded_video.mp4" with open(video_file_path, "wb") as f: f.write(uploaded_video.read()) st.video(uploaded_video, start_time=0) for _ in range(3): st.write(" ") st.write("サイドバーより動画としてアップロードし推論ボタンをクリック") if st.button("推論開始"): with st.spinner("データを学習中、少々お待ちください..."): video_file_path = "./data/uploaded_video.mp4" frames_dir, frame_times = cut_video(video_file_path) image_paths = [os.path.join(frames_dir, image_path) for image_path in os.listdir(frames_dir)] clf, z_train = OneClassSvm_anomaly_detection(image_paths) with st.spinner("学習が完了しました。異常検知を行っています..."): predictions = predict_with_progress(clf, z_train) st.session_state.display_anomalies = [] st.session_state.display_anomalies = prepare_3_displayed_anomalies(frames_dir, predictions, frame_times) VLM_anomaly_detection(st.session_state.display_anomalies) if st.session_state.display_anomalies: anomaly_paths = [path[0] for path in st.session_state.display_anomalies] anomaly_time = [str(path[1]) for path in st.session_state.display_anomalies] selected = image_select( label = "「異常」である可能性があるフレーム", images = anomaly_paths, captions = anomaly_time, key = "image_select" ) selected_img = str(selected)[:100] idx = anomaly_paths.index(selected_img) st.info(st.session_state.responses[idx]) if __name__ == "__main__": main()