# -*- coding: utf-8 -*-
"""video.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1d-uwY0B5q7gOItN6fyA-1RoBcDwkOxb6
"""

import base64
import os
import zipfile
from typing import Optional

from faster_whisper import WhisperModel

# Load the Whisper model once at import time (recommended) so that every
# analyze_video() call reuses the same instance.
whisper_model = WhisperModel("tiny", device="cpu", compute_type="int8")

# === Configuration ===
_YOLO_MODEL_DIR = "trained_dataset"
_YOLO_MODEL_ARCHIVE = "trained_dataset.zip"
_YOLO_MODEL_PATH = "trained_dataset/runs/detect/train/weights/best.pt"
_VISION_API_URL = "https://vision.googleapis.com/v1/images:annotate"
_VISION_SAMPLE_INTERVAL = 10  # send every 10th frame to Google Vision
_VISION_TIMEOUT_SECONDS = 30  # without a timeout a stalled request hangs forever
_LIKELIHOOD_MAPPING = {
    "UNKNOWN": 0,
    "VERY_UNLIKELY": 1,
    "UNLIKELY": 2,
    "POSSIBLE": 3,
    "LIKELY": 4,
    "VERY_LIKELY": 5,
}
_FLAG_THRESHOLD = _LIKELIHOOD_MAPPING["LIKELY"]
_SAFE_SEARCH_CATEGORIES = ("adult", "spoof", "medical", "violence", "racy")


def _ensure_yolo_weights():
    """Extract the bundled YOLO archive if its directory does not exist yet."""
    if os.path.exists(_YOLO_MODEL_DIR):
        return
    print("📦 正在解压 YOLO 模型文件...")
    with zipfile.ZipFile(_YOLO_MODEL_ARCHIVE, "r") as zip_ref:
        zip_ref.extractall(".")
    print("✅ 解压完成")


def _risk_level(count, total):
    """Map a flagged-frame count to a risk label.

    High risk when ``count`` exceeds 5% of ``total``, medium risk when it
    exceeds 5 frames, low risk otherwise.
    """
    # NOTE(review): ``total`` counts every decoded frame, but only every
    # 10th frame is sent to Vision, so ``count`` can reach at most ~10% of
    # ``total``. Confirm whether the 5% threshold is meant to be relative to
    # all frames or to the analysed frames only.
    if count > total * 0.05:
        return "⛔️ 高风险"
    if count > 5:
        return "⚠️ 中等风险"
    return "✅ 低风险"


def _annotate_frame(frame, api_key):
    """Request SafeSearch and text detection for one frame.

    Returns the first entry of the Vision ``responses`` list, or ``None``
    when the frame cannot be JPEG-encoded, the request fails, or the reply
    carries no responses.
    """
    # Third-party imports stay function-scoped, as in the original file, so
    # importing this module does not require them.
    import cv2
    import requests

    encoded_ok, buffer = cv2.imencode(".jpg", frame)
    if not encoded_ok:
        return None

    payload = {
        "requests": [{
            "image": {"content": base64.b64encode(buffer).decode()},
            "features": [
                {"type": "SAFE_SEARCH_DETECTION"},
                {"type": "TEXT_DETECTION"},
            ],
        }]
    }
    try:
        response = requests.post(
            f"{_VISION_API_URL}?key={api_key}",
            json=payload,
            timeout=_VISION_TIMEOUT_SECONDS,
        )
        result = response.json()
    except (requests.RequestException, ValueError) as err:
        # Print only the exception type: requests' messages embed the
        # request URL, which contains the API key.
        print(f"⚠️ Google Vision 请求失败,已跳过该帧({type(err).__name__})")
        return None

    responses = result.get("responses") if isinstance(result, dict) else None
    return responses[0] if responses else None


def _build_report(labels, safe_search_counts, total_frames, texts, transcript):
    """Assemble the text report returned by analyze_video()."""
    parts = [
        "\n🎙 正在转录音频文本...:\n",
        "\n🎯 YOLO检测到的图案类别:\n",
    ]
    # Sets have no stable order; sort so identical inputs give identical text.
    if labels:
        parts.append("✅ 检测到图案:" + "、".join(sorted(labels)) + "\n")
    else:
        parts.append("❌ 未检测到任何图案\n")

    parts.append("\n🔎 内容安全风险分析(每类满足可能性 ≥ LIKELY 的帧计数):\n")
    for category, count in safe_search_counts.items():
        parts.append(
            f"{category.capitalize():<10}: "
            f"{_risk_level(count, total_frames)}({count} 帧)\n"
        )

    parts.append("\n📝 视觉文字识别(OCR):\n")
    parts.append(" ".join(sorted(texts)) + "\n" if texts else "无可识别文字\n")

    parts.append("\n🔊 Whisper语音识别结果:\n")
    parts.append(transcript + "\n" if transcript.strip() else "无有效语音\n")
    return "".join(parts)


def analyze_video(file, lang: Optional[str] = None) -> str:
    """Analyse a video and return a text report.

    Every frame is run through the YOLO model; every 10th frame is also sent
    to Google Vision for SafeSearch and text detection; the file is then
    transcribed with the module-level Whisper model.

    Args:
        file: Video source handed to ``cv2.VideoCapture`` and to
            ``whisper_model.transcribe`` (presumably a file path — confirm
            against callers).
        lang: Language code for Whisper. ``None``, ``""`` and ``"auto"`` let
            Whisper detect the language.

    Returns:
        The report text: detected YOLO class names, per-category SafeSearch
        risk levels, OCR text and the speech transcript.
    """
    from dotenv import load_dotenv
    from ultralytics import YOLO
    import cv2

    _ensure_yolo_weights()

    # Load environment variables (the Vision key may live in a .env file).
    load_dotenv()
    api_key = os.getenv("GOOGLE_VISION_API_KEY")
    if not api_key:
        # A keyless request can only fail, so skip the calls rather than
        # uploading frames for nothing; the report reads the same either way.
        print("⚠️ 未设置 GOOGLE_VISION_API_KEY,已跳过 Google Vision 分析")

    # === Initialisation ===
    model = YOLO(_YOLO_MODEL_PATH)
    safe_search_results = dict.fromkeys(_SAFE_SEARCH_CATEGORIES, 0)
    detected_texts = set()
    detected_labels = set()
    total_frames = 0

    cap = cv2.VideoCapture(file)
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            total_frames += 1

            # YOLO detection on every frame.
            detections = model.predict(
                source=frame, conf=0.2, save=False, verbose=False
            )[0]
            if len(detections.boxes) > 0:
                for cls in detections.boxes.cls:
                    detected_labels.add(detections.names[int(cls.item())])

            # Google content analysis on every 10th frame only.
            if not api_key or total_frames % _VISION_SAMPLE_INTERVAL != 0:
                continue
            annotation = _annotate_frame(frame, api_key)
            if annotation is None:
                continue

            safe = annotation.get("safeSearchAnnotation", {})
            for category in safe_search_results:
                likelihood = safe.get(category, "UNKNOWN")
                if _LIKELIHOOD_MAPPING.get(likelihood, 0) >= _FLAG_THRESHOLD:
                    safe_search_results[category] += 1

            for text in annotation.get("textAnnotations", []):
                description = text.get("description")
                if description is not None:
                    detected_texts.add(description)
    finally:
        # Release the capture even if detection or the Vision call raised.
        cap.release()

    # === Whisper speech-to-text ===
    language = lang if lang and lang != "auto" else None
    segments, _info = whisper_model.transcribe(file, language=language)
    transcribed_text = "".join(seg.text for seg in segments)

    # === Report ===
    return _build_report(
        detected_labels,
        safe_search_results,
        total_frames,
        detected_texts,
        transcribed_text,
    )