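"""Streamlit demo for video anomaly detection.

An uploaded video is split into frames, HOG features are extracted from each
frame, standardized and reduced with PCA, and a One-Class SVM flags frames that
look anomalous. A vision-language model (GPT-4o) then describes what the person
in each flagged frame is doing.
"""
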
import base64
import glob
import os
import pickle
import random
import shutil
import time
import zipfile

import cv2
import joblib
import matplotlib.pyplot as plt
import numpy as np
import skimage
import streamlit as st
from openai import OpenAI
from PIL import Image
from skimage import io
from skimage.color import rgb2gray
from skimage.feature import hog
from sklearn import svm
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM
from streamlit_image_select import image_select
from tqdm import tqdm

def cut_video(video_path):
    """Split a video into JPEG frames and record each frame's timestamp (mm:ss:ms)."""
    # video_path = '/Users/ducky/Downloads/thief_1.mp4'
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frames_dir = "./data/video_frame"
    if os.path.exists(frames_dir):
        shutil.rmtree(frames_dir)
    os.makedirs(frames_dir, exist_ok=True)
    frame_count = 0
    frame_times = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # Timestamp of this frame, derived from the frame index and FPS.
        timestamp_ms = (frame_count / fps) * 1000
        minutes = int(timestamp_ms // 60000)
        seconds = int((timestamp_ms % 60000) // 1000)
        milliseconds = int(timestamp_ms % 1000)
        time_formatted = f"{minutes:02}:{seconds:02}:{milliseconds:03}"
        frame_times.append(time_formatted)
        frame_file_path = os.path.join(frames_dir, f'frame_{frame_count:04d}.jpg')
        cv2.imwrite(frame_file_path, frame)
        frame_count += 1
    cap.release()
    return frames_dir, frame_times

def extract_hog_features(image_path):
    """
    Extract HOG features from an image file.
    :param image_path: path to the image file
    :return: NumPy array of HOG features
    """
    # Load the image and drop any alpha channel.
    img = io.imread(image_path)
    img = img[:, :, :3]
    # Convert the image to grayscale.
    gray_img = rgb2gray(img)
    # Extract HOG features.
    features, _ = hog(gray_img, visualize=True, block_norm='L2-Hys')
    return features
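
# Example call (hypothetical frame path, following cut_video's naming scheme):
#   feats = extract_hog_features("./data/video_frame/frame_0000.jpg")
#   # feats is a 1-D NumPy array of HOG descriptors for the grayscale frame
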
def prepare_features(image_paths):
    """
    Extract HOG features from multiple images and build a feature matrix.
    :param image_paths: list of image file paths
    :return: NumPy array of features
    """
    progress_bar = st.progress(0)
    status_text = st.empty()
    features = []
    for i, path in enumerate(tqdm(image_paths)):
        features.append(extract_hog_features(path))
        progress = int((i + 1) / len(image_paths) * 100)
        progress_bar.progress(progress)
        status_text.text(f"Processing image {i+1}/{len(image_paths)}: {path}")
    progress_bar.empty()
    status_text.text("Processing complete!")
    status_text.empty()
    return np.array(features)

def run_pca(features):
    pca = PCA(n_components=4)
    progress_bar = st.progress(0)
    status_text = st.empty()

    # The progress bar is simulated for UI feedback; the actual PCA fit runs after the loop.
    def simulate_pca_progress(progress_bar, status_text, total_steps=100):
        for step in range(total_steps):
            progress_bar.progress(int((step + 1) / total_steps * 100))
            status_text.text(f"PCA Transformation Progress: {int((step + 1) / total_steps * 100)}%")
            time.sleep(0.1)

    simulate_pca_progress(progress_bar, status_text)
    transformed_data = pca.fit_transform(features)
    status_text.text("PCA Transformation Complete!")
    progress_bar.empty()
    status_text.empty()
    return pca, transformed_data

def run_standard_scale(features):
    scaler = StandardScaler()
    progress_bar = st.progress(0)
    status_text = st.empty()

    # Simulated progress for UI feedback; the scaling itself runs after the loop.
    def simulate_ss_progress(progress_bar, status_text, total_steps=100):
        for step in range(total_steps):
            progress_bar.progress(int((step + 1) / total_steps * 100))
            status_text.text(f"StandardScaler Transformation Progress: {int((step + 1) / total_steps * 100)}%")
            time.sleep(0.1)

    simulate_ss_progress(progress_bar, status_text)
    transformed_data = scaler.fit_transform(features)
    status_text.text("StandardScaler Transformation Complete!")
    progress_bar.empty()
    status_text.empty()
    return scaler, transformed_data

def run_OneClassSVM(z_train):
    progress_bar = st.progress(0)
    status_text = st.empty()
    # nu bounds the fraction of training frames treated as outliers; gamma sets the RBF kernel width.
    clf = svm.OneClassSVM(nu=0.2, kernel="rbf", gamma=0.001)

    # Progress is simulated; the model is fitted once after the loop finishes.
    def simulate_fitting_progress(clf, z_train, total_steps=100):
        for step in range(total_steps):
            time.sleep(0.05)
            progress_bar.progress(step + 1)
            status_text.text(f"Fitting model... {step + 1}% complete")
        clf.fit(z_train)
        progress_bar.empty()
        status_text.empty()

    simulate_fitting_progress(clf, z_train)
    return clf

def predict_with_progress(clf, features_array):
    progress_bar = st.progress(0)
    status_text = st.empty()
    predictions = np.zeros(features_array.shape[0])
    for i in range(features_array.shape[0]):
        # OneClassSVM.predict returns +1 for inliers and -1 for outliers.
        predictions[i] = clf.predict(features_array[i].reshape(1, -1))[0]
        # predictions[i] = clf.decision_function(features_array[i].reshape(1, -1))
        progress = int((i + 1) / features_array.shape[0] * 100)
        progress_bar.progress(progress)
        status_text.text(f"Predicting... {progress}% complete")
    progress_bar.empty()
    status_text.empty()
    return predictions

def prepare_all_displayed_anomalies(frames_dir, predictions):
    anomaly_indices = [index for index, value in enumerate(predictions) if value == -1]
    if not anomaly_indices:
        return []
    anomaly_indices.sort()
    frames = os.listdir(frames_dir)
    frames.sort()
    anomaly_folder = "./data/anomaly"
    os.makedirs(anomaly_folder, exist_ok=True)
    anomaly_paths = []
    frame_number = 0
    anomaly_count = 0
    for frame in frames:
        frame_path = os.path.join(frames_dir, frame)
        if frame_number == anomaly_indices[anomaly_count]:
            anomaly_frame_path = os.path.join(anomaly_folder, f'frame_{frame_number:04d}.jpg')
            shutil.copy(frame_path, anomaly_frame_path)
            anomaly_paths.append(anomaly_frame_path)
            anomaly_count += 1
            if anomaly_count >= len(anomaly_indices):
                break
        frame_number += 1
    return anomaly_paths

def prepare_3_displayed_anomalies(frames_dir, predictions, frame_times):
    anomaly_frames = [index for index, value in enumerate(predictions) if value == -1]
    if not anomaly_frames:
        return []
    # Pick up to three anomalous frames at random for display.
    indices = random.sample(range(len(anomaly_frames)), min(3, len(anomaly_frames)))
    anomaly_frames = [anomaly_frames[i] for i in indices]
    anomaly_frames.sort()
    frames = os.listdir(frames_dir)
    frames.sort()
    anomaly_folder = "./data/anomaly"
    os.makedirs(anomaly_folder, exist_ok=True)
    anomaly_paths = []
    frame_number = 0
    anomaly_count = 0
    for frame in frames:
        frame_path = os.path.join(frames_dir, frame)
        if frame_number == anomaly_frames[anomaly_count]:
            anomaly_frame_path = os.path.join(anomaly_folder, f'frame_{frame_number:04d}.jpg')
            shutil.copy(frame_path, anomaly_frame_path)
            anomaly_paths.append([anomaly_frame_path, frame_times[frame_number]])
            anomaly_count += 1
            if anomaly_count >= len(anomaly_frames):
                break
        frame_number += 1
    return anomaly_paths

def OneClassSvm_anomaly_detection(image_paths):
    # Training pipeline: HOG features -> standard scaling -> PCA -> One-Class SVM.
    features = prepare_features(image_paths)
    _, features_scaled = run_standard_scale(features)
    _, z_train = run_pca(features_scaled)
    clf = run_OneClassSVM(z_train)
    return clf, z_train

def encode_image(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")

def get_response(model, client, image_path):
    base64_image = encode_image(image_path)
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are a helpful assistant that responds in Markdown. Help me with the task!"},
            {"role": "user", "content": [
                # Prompt (Japanese): "Describe the activity the person in this image is performing."
                {"type": "text", "text": "この画像の中の人物が行っている活動を説明してください"},
                {"type": "image_url", "image_url": {
                    "url": f"data:image/jpeg;base64,{base64_image}"}  # frames are saved as JPEG
                }
            ]}
        ],
        temperature=0.0,
    )
    return response.choices[0].message.content

def VLM_anomaly_detection(anomaly_paths):
    model = "gpt-4o"
    API_KEY = os.getenv("MY_API_KEY")
    client = OpenAI(api_key=API_KEY)
    progress_bar = st.progress(0)
    status_text = st.empty()
    st.session_state.responses = []
    # anomaly_paths holds [path, timestamp] pairs; keep only the paths here.
    anomaly_paths = [path[0] for path in anomaly_paths]
    for i, anomaly_path in enumerate(tqdm(anomaly_paths)):
        progress = int((i + 1) / len(anomaly_paths) * 100)
        progress_bar.progress(progress)
        status_text.text(f"Running VLM {i+1}/{len(anomaly_paths)}")
        response = get_response(model, client, anomaly_path)
        st.session_state.responses.append(response)
    progress_bar.empty()
    status_text.text("Processing complete!")
    status_text.empty()

def main():
    if 'responses' not in st.session_state:
        st.session_state.responses = []
    if 'display_anomalies' not in st.session_state:
        st.session_state.display_anomalies = []
    with st.sidebar:
        st.image("logo.png")
        uploaded_video = st.file_uploader("Upload video", type=["mp4", "mov", "avi"])
        os.makedirs("./data", exist_ok=True)
        if uploaded_video is not None:
            video_file_path = "./data/uploaded_video.mp4"
            with open(video_file_path, "wb") as f:
                f.write(uploaded_video.read())
            st.video(uploaded_video, start_time=0)
    for _ in range(3):
        st.write(" ")
    # "Upload a video from the sidebar and click the inference button."
    st.write("サイドバーより動画としてアップロードし推論ボタンをクリック")
    if st.button("推論開始"):  # "Start inference"
        with st.spinner("データを学習中、少々お待ちください..."):  # "Training on the data, please wait..."
            video_file_path = "./data/uploaded_video.mp4"
            frames_dir, frame_times = cut_video(video_file_path)
            # Sort the frame files so their order matches the frame indices used for predictions.
            image_paths = [os.path.join(frames_dir, image_path) for image_path in sorted(os.listdir(frames_dir))]
            clf, z_train = OneClassSvm_anomaly_detection(image_paths)
        with st.spinner("学習が完了しました。異常検知を行っています..."):  # "Training finished. Running anomaly detection..."
            predictions = predict_with_progress(clf, z_train)
            st.session_state.display_anomalies = prepare_3_displayed_anomalies(frames_dir, predictions, frame_times)
            VLM_anomaly_detection(st.session_state.display_anomalies)
    if st.session_state.display_anomalies:
        anomaly_paths = [path[0] for path in st.session_state.display_anomalies]
        anomaly_time = [str(path[1]) for path in st.session_state.display_anomalies]
        selected = image_select(
            label="「異常」である可能性があるフレーム",  # "Frames that may be anomalous"
            images=anomaly_paths,
            captions=anomaly_time,
            key="image_select",
        )
        selected_img = str(selected)[:100]
        idx = anomaly_paths.index(selected_img)
        st.info(st.session_state.responses[idx])


if __name__ == "__main__":
    main()
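
# Usage sketch (the filename app.py is an assumption; it is not given in the source):
#   export MY_API_KEY=<your OpenAI API key>
#   streamlit run app.py
# The sidebar expects a logo.png in the working directory, and frames/anomaly
# crops are written under ./data/.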