import atexit
import hashlib
import json
import logging
import os
import queue as _queue
import re
import shutil
import subprocess
import tempfile
import threading
import time
import warnings

# ---------- Base environment setup ----------
# Redirect Gradio's temp/upload directory (and the stdlib temp dir) into a
# folder next to this script. This happens before `import gradio` so that any
# import-time read of GRADIO_TEMP_DIR already sees the override.
_script_dir = os.path.dirname(os.path.abspath(__file__))
_temp_dir = os.path.join(_script_dir, ".gradio_temp")
os.makedirs(_temp_dir, exist_ok=True)
os.environ["GRADIO_TEMP_DIR"] = _temp_dir
# `tempfile.tempdir` is the documented override; the gettempdir() patch is
# kept for callers that query tempfile.gettempdir() directly.
tempfile.tempdir = _temp_dir
tempfile.gettempdir = lambda: _temp_dir

import gradio as gr  # noqa: E402
from dotenv import load_dotenv  # noqa: E402
from openai import OpenAI  # noqa: E402

# Load .env before importing VideoAgent, which may read configuration when
# it is imported.
load_dotenv()
warnings.filterwarnings("ignore")
logging.getLogger("httpx").setLevel(logging.WARNING)

from VideoAgent import QueryParam, VideoRAG  # noqa: E402
from VideoAgent.prompt import PROMPTS  # noqa: E402
from VideoAgent.query import _result_query_stream  # noqa: E402
from VideoAgent._utils import clean_output  # noqa: E402

# ---------- Parallel-processing patch ----------
# The stock "Spliting Video" / "Saving Video Segments" / speech-recognition
# steps handle one segment at a time and leave most CPU cores idle. The
# functions below are drop-in replacements that fan the per-segment work out
# to a process pool (moviepy/ffmpeg work) or a thread pool (ASR calls); they
# are monkey-patched into the VideoAgent pipeline modules further down.
#
# NOTE(review): on spawn-based platforms (Windows, macOS) every
# ProcessPoolExecutor worker re-executes this script's top level, including
# the imports above and any atexit hooks the module registers - confirm that
# is acceptable for this app.
from concurrent.futures import ProcessPoolExecutor, as_completed  # noqa: E402

import VideoAgent.vidrag_pipeline as _pipeline_mod  # noqa: E402
import VideoAgent._videoutil.split as _split_mod  # noqa: E402

_patch_log = logging.getLogger(__name__)


def _resolve_worker_count() -> int:
    """Return the pool size from VIDEO_SPLIT_WORKERS (default: CPU count).

    An unset/empty value uses the default, an unparseable value logs a warning
    and uses the default, and the result is clamped to at least 1 because
    executors reject ``max_workers <= 0``.
    """
    default = os.cpu_count() or 4
    raw = os.getenv("VIDEO_SPLIT_WORKERS", "").strip()
    if not raw:
        return default
    try:
        return max(1, int(raw))
    except ValueError:
        _patch_log.warning(
            "Ignoring invalid VIDEO_SPLIT_WORKERS=%r; using %d", raw, default
        )
        return default


_VIDEO_WORKERS = _resolve_worker_count()


def _pool_size(n_tasks: int) -> int:
    """Number of pool workers to use for *n_tasks* jobs (never less than 1)."""
    return max(1, min(_VIDEO_WORKERS, n_tasks))


def _audio_codec_for(output_path: str) -> str | None:
    """Choose the ffmpeg audio codec for *output_path*.

    16-bit PCM is only valid inside a WAV container. For any other extension
    return None so moviepy derives the codec from the file extension; forcing
    pcm_s16le into e.g. an ``.mp3`` file makes ffmpeg refuse the output.
    """
    if os.path.splitext(output_path)[1].lower() == ".wav":
        return "pcm_s16le"
    return None


def _extract_audio_seg(args):
    """Worker (process pool): write the audio of one segment to disk.

    ``args`` is ``(video_path, start, end, output_path)``. Audio is written at
    16 kHz. Returns True on success and False on any failure (e.g. a source
    video without an audio track, where ``.audio`` is None), so one bad
    segment never aborts the whole split.
    """
    video_path, start, end, output_path = args
    from moviepy.video.io.VideoFileClip import VideoFileClip

    try:
        with VideoFileClip(video_path) as video:
            subaudio = video.subclip(start, end).audio
            subaudio.write_audiofile(
                output_path,
                codec=_audio_codec_for(output_path),
                fps=16000,
                nbytes=2,
                verbose=False,
                logger=None,
            )
        return True
    except Exception:
        return False


def _save_video_seg(args):
    """Worker (process pool): cut one segment and re-encode it with libx264.

    ``args`` is ``(video_path, start, end, output_path)``. Any exception
    propagates to the submitting process through the future.
    """
    video_path, start, end, output_path = args
    from moviepy.video.io.VideoFileClip import VideoFileClip

    with VideoFileClip(video_path) as video:
        video.subclip(start, end).write_videofile(
            output_path,
            codec='libx264',
            # "-threads 0" lets ffmpeg choose its own thread count.
            ffmpeg_params=['-threads', '0'],
            verbose=False,
            logger=None,
        )
    return True


def _parallel_split_video(
    video_path,
    working_dir,
    segment_length,
    num_frames_per_segment,
    audio_output_format='mp3',
):
    """Parallel drop-in replacement for ``split_video``.

    Segment boundaries and frame sample times are computed up front from the
    video duration. The per-segment audio is then extracted in a process pool
    into ``<working_dir>/_cache/<video_name>/``; that directory is deleted and
    recreated first.

    Returns:
        ``(segment_index2name, segment_times_info)``, both keyed by the
        segment index as a string. Names are
        ``"<ms-timestamp>-<idx>-<start>-<end>"``; each times-info entry holds
        ``frame_times`` (numpy array of absolute sample times) and
        ``timestamp`` ``(start, end)`` in whole seconds.

    Audio extraction is best-effort: segments whose audio could not be
    written are reported in a single warning and simply have no audio file.
    """
    import numpy as _np
    from tqdm import tqdm as _tqdm
    from moviepy.video.io.VideoFileClip import VideoFileClip as _VideoFileClip

    unique_timestamp = str(int(time.time() * 1000))
    video_name = os.path.basename(video_path).split('.')[0]
    cache_dir = os.path.join(working_dir, '_cache', video_name)
    if os.path.exists(cache_dir):
        shutil.rmtree(cache_dir)
    os.makedirs(cache_dir, exist_ok=False)

    with _VideoFileClip(video_path) as video:
        total_length = int(video.duration)

    start_times = list(range(0, total_length, segment_length))
    # A trailing piece shorter than 5 s is merged into the previous segment.
    if len(start_times) > 1 and (total_length - start_times[-1]) < 5:
        start_times = start_times[:-1]

    segment_index2name, segment_times_info = {}, {}
    for idx, start in enumerate(start_times):
        # The last segment always extends to the end of the video.
        is_last = idx == len(start_times) - 1
        end = total_length if is_last else min(start + segment_length, total_length)
        frame_times = _np.linspace(0, end - start, num_frames_per_segment, endpoint=False)
        frame_times += start
        sid = str(idx)
        segment_index2name[sid] = f"{unique_timestamp}-{idx}-{start}-{end}"
        segment_times_info[sid] = {"frame_times": frame_times, "timestamp": (start, end)}

    tasks = []
    for sid, seg_name in segment_index2name.items():
        start, end = segment_times_info[sid]["timestamp"]
        out_path = os.path.join(cache_dir, f'{seg_name}.{audio_output_format}')
        tasks.append((video_path, start, end, out_path))

    failed = []
    if tasks:
        with ProcessPoolExecutor(max_workers=_pool_size(len(tasks))) as ex:
            futs = {ex.submit(_extract_audio_seg, t): t for t in tasks}
            for f in _tqdm(as_completed(futs), total=len(futs), desc=f"Spliting Video {video_name}"):
                try:
                    ok = f.result()
                except Exception:  # e.g. the worker process died
                    ok = False
                if not ok:
                    failed.append(futs[f])

    if failed:
        spans = ", ".join(f"{s}-{e}" for _, s, e, _ in sorted(failed, key=lambda t: t[1]))
        _patch_log.warning(
            "Failed to extract audio for %d segment(s) of video %s (%s); "
            "probably due to a missing audio track.",
            len(failed), video_name, spans,
        )

    return segment_index2name, segment_times_info


def _parallel_saving_video_segments(
    video_name,
    video_path,
    working_dir,
    segment_index2name,
    segment_times_info,
    error_queue,
    video_output_format='mp4',
):
    """Parallel drop-in replacement for ``saving_video_segments``.

    Encodes every segment in ``segment_index2name`` to
    ``<working_dir>/_cache/<video_name>/<segment name>.<video_output_format>``
    using a process pool.

    Raises:
        RuntimeError: if any segment fails. The error text is put on
        ``error_queue`` before raising; segments not yet started are cancelled.
    """
    from tqdm import tqdm as _tqdm2

    try:
        cache_dir = os.path.join(working_dir, '_cache', video_name)
        tasks = []
        for idx, seg_name in segment_index2name.items():
            start, end = segment_times_info[idx]["timestamp"]
            out_path = os.path.join(cache_dir, f'{seg_name}.{video_output_format}')
            tasks.append((video_path, start, end, out_path))
        if not tasks:
            return

        with ProcessPoolExecutor(max_workers=_pool_size(len(tasks))) as ex:
            futs = [ex.submit(_save_video_seg, t) for t in tasks]
            try:
                for f in _tqdm2(as_completed(futs), total=len(futs), desc=f"Saving Video Segments {video_name}"):
                    f.result()
            except BaseException:
                # Drop queued encodes; otherwise leaving the `with` block
                # would wait for every remaining segment before the error
                # reaches the caller.
                for pending in futs:
                    pending.cancel()
                raise
    except Exception as e:
        error_queue.put(f"Error in saving_video_segments:\n {str(e)}")
        raise RuntimeError(f"Error in saving_video_segments: {e}") from e


def _asr_result_to_text(result) -> str:
    """Normalise an ``OnnxASRClient.transcribe`` return value to plain text.

    Handles a tuple (first element is taken as the text), an object with a
    ``.text`` attribute, None (no text), and anything else via ``str()``.
    """
    if result is None:
        return ""
    if isinstance(result, tuple):
        return result[0] if result else ""
    if hasattr(result, 'text'):
        return result.text
    return str(result)


def _parallel_speech_to_text(video_name, working_dir, segment_index2name, audio_output_format):
    """Parallel drop-in replacement for ``speech_to_text``.

    Transcribes each segment's audio file from
    ``<working_dir>/_cache/<video_name>/`` with ``OnnxASRClient`` in a thread
    pool. Segments without an audio file get an empty transcript.

    Returns:
        dict mapping segment index -> transcript text, in the same order as
        ``segment_index2name`` (independent of completion order).
    """
    from concurrent.futures import ThreadPoolExecutor
    from tqdm import tqdm as _tqdm3

    cache_dir = os.path.join(working_dir, '_cache', video_name)

    def _transcribe_one(item):
        idx, seg_name = item
        audio_file = os.path.join(cache_dir, f"{seg_name}.{audio_output_format}")
        if not os.path.exists(audio_file):
            return idx, ""
        # Imported lazily so the ASR client is only loaded when there is
        # audio to transcribe.
        from VideoAgent._videoutil.asr import OnnxASRClient
        return idx, _asr_result_to_text(OnnxASRClient.transcribe(audio_file))

    items = list(segment_index2name.items())
    if not items:
        return {}

    by_index = {}
    with ThreadPoolExecutor(max_workers=_pool_size(len(items))) as ex:
        futs = [ex.submit(_transcribe_one, item) for item in items]
        for f in _tqdm3(as_completed(futs), total=len(futs), desc=f"Speech Recognition {video_name}"):
            idx, text = f.result()
            by_index[idx] = text

    return {idx: by_index[idx] for idx, _ in items}


# Replace the serial implementations in the pipeline module with the parallel ones.
_pipeline_mod.split_video = _parallel_split_video
_pipeline_mod.saving_video_segments = _parallel_saving_video_segments
_pipeline_mod.speech_to_text = _parallel_speech_to_text _split_mod.split_video = _parallel_split_video _split_mod.saving_video_segments = _parallel_saving_video_segments # 同样替换 asr 模块中的引用 import VideoAgent._videoutil.asr as _asr_mod _asr_mod.speech_to_text = _parallel_speech_to_text # ---------- 样式表 (CSS) ---------- custom_css = """ @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap'); :root { --radius-sm: 6px; --radius-md: 8px; --radius-lg: 10px; --radius-xl: 12px; --color-bg: #f1f5f9; --color-surface: #ffffff; --color-border: #e2e8f0; --color-border-light: #f1f5f9; --color-primary: #6366f1; --color-primary-hover: #4f46e5; --color-text: #1e293b; --color-text-muted: #64748b; --shadow-sm: 0 1px 2px rgba(0,0,0,0.04); --shadow-md: 0 1px 3px rgba(0,0,0,0.06), 0 1px 2px rgba(0,0,0,0.04); --shadow-lg: 0 4px 12px rgba(0,0,0,0.06), 0 2px 4px rgba(0,0,0,0.04); } .gradio-container { background: var(--color-bg) !important; font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif !important; color: var(--color-text) !important; max-width: 100% !important; } /* ---- 标题 ---- */ .app-title { text-align: center; padding: 6px 0 2px 0; } .app-title h1 { margin: 0; font-size: 18px; font-weight: 700; background: linear-gradient(135deg, #4338ca 0%, #6366f1 50%, #8b5cf6 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; background-clip: text; letter-spacing: -0.02em; } /* ---- Tabs 导航栏 ---- */ .tabs { gap: 0 !important; } .tabs > .tab-nav { gap: 4px !important; padding: 0 8px !important; } .tabs > .tab-nav > button { font-size: 13px !important; font-weight: 500 !important; padding: 9px 22px !important; border-radius: var(--radius-lg) var(--radius-lg) 0 0 !important; transition: all 0.2s ease !important; color: var(--color-text-muted) !important; background: transparent !important; border: none !important; } .tabs > .tab-nav > button.selected { color: var(--color-primary) !important; 
background: var(--color-surface) !important; box-shadow: 0 -1px 3px rgba(0,0,0,0.04) !important; } /* ---- 卡片 ---- */ .card-style { border-radius: var(--radius-lg) !important; border: 1px solid var(--color-border) !important; padding: 16px !important; background: var(--color-surface) !important; box-shadow: var(--shadow-sm) !important; transition: box-shadow 0.2s ease !important; margin-bottom: 10px !important; } .card-style:hover { box-shadow: var(--shadow-lg) !important; } /* ---- 分区标题 ---- */ .section-label { font-weight: 600; font-size: 12px; color: var(--color-primary); margin-bottom: 10px; display: flex; align-items: center; gap: 8px; text-transform: uppercase; letter-spacing: 0.06em; opacity: 0.85; } .section-label::before { content: ''; display: inline-block; width: 3px; height: 14px; background: linear-gradient(180deg, #6366f1, #8b5cf6); border-radius: var(--radius-sm); } /* ---- 按钮 ---- */ .gradio-container .gr-button-primary { background: linear-gradient(135deg, #4f46e5 0%, #6366f1 100%) !important; border: none !important; font-weight: 500 !important; font-size: 13px !important; border-radius: var(--radius-md) !important; padding: 6px 18px !important; transition: all 0.2s ease !important; box-shadow: 0 1px 3px rgba(79,70,229,0.25) !important; } .gradio-container .gr-button-primary:hover { transform: translateY(-1px); box-shadow: 0 4px 14px rgba(79,70,229,0.35) !important; filter: brightness(1.06); } .gradio-container .gr-button-secondary { border: 1px solid var(--color-border) !important; background: var(--color-surface) !important; color: #475569 !important; font-weight: 500 !important; font-size: 13px !important; border-radius: var(--radius-md) !important; padding: 6px 18px !important; transition: all 0.15s ease !important; } .gradio-container .gr-button-secondary:hover { background: #f8fafc !important; border-color: #94a3b8 !important; color: #334155 !important; } /* ---- 输入框全局 ---- */ .gradio-container input, .gradio-container textarea { 
border-radius: var(--radius-md) !important; border: 1px solid var(--color-border) !important; font-size: 13px !important; transition: border-color 0.15s ease, box-shadow 0.15s ease !important; background: var(--color-surface) !important; } .gradio-container input:focus, .gradio-container textarea:focus { outline: none !important; border-color: var(--color-primary) !important; box-shadow: 0 0 0 3px rgba(99,102,241,0.1) !important; } /* ---- 搜索面板 ---- */ .search-toolbar { padding: 14px 16px !important; margin-bottom: 8px; } .search-query textarea { font-size: 14px !important; line-height: 1.6 !important; min-height: 68px !important; border: 1px solid var(--color-border) !important; background: #fafbff !important; border-radius: var(--radius-lg) !important; padding: 12px 14px !important; } .search-query textarea:focus { background: var(--color-surface) !important; border-color: var(--color-primary) !important; box-shadow: 0 0 0 4px rgba(99,102,241,0.06) !important; } .search-actions { margin-top: 10px; justify-content: flex-end; gap: 10px; } .search-actions .gr-button { min-height: 44px !important; font-size: 15px !important; border-radius: var(--radius-lg) !important; min-width: 130px; } .search-panel { margin-top: 0; gap: 10px !important; } /* ---- 控制台输出框 (暗色主题) ---- */ .console-font, .console-font > div { border-radius: var(--radius-lg) !important; } .console-font textarea { border-radius: var(--radius-lg) !important; overflow-y: auto !important; } .console-font textarea { font-family: 'JetBrains Mono', 'Fira Code', ui-monospace, SFMono-Regular, Menlo, Monaco, monospace !important; font-size: 12px !important; line-height: 1.5 !important; background: #0f172a !important; color: #e2e8f0 !important; border: 1px solid #1e293b !important; padding: 14px !important; } .console-font textarea:focus { border-color: #334155 !important; box-shadow: 0 0 0 2px rgba(99,102,241,0.15) !important; outline: none !important; } /* ---- 结果展示框 ---- */ .result-box { min-height: 360px; 
max-height: 360px; overflow: auto; } .result-box textarea { min-height: 360px !important; max-height: 360px !important; } /* ---- 视频预览 ---- */ .video-box { border-radius: var(--radius-lg) !important; overflow: hidden !important; border: 1px solid var(--color-border); box-shadow: var(--shadow-sm); background: #f8fafc !important; } .video-box video { border-radius: var(--radius-lg) !important; } /* ---- 画廊 ---- */ .clip-gallery { border: 1px solid var(--color-border); border-radius: var(--radius-lg); padding: 8px; background: var(--color-surface); box-shadow: var(--shadow-sm); min-height: 360px; } .clip-gallery .grid-wrap { gap: 8px !important; } .clip-gallery img, .clip-gallery video { border-radius: var(--radius-md) !important; transition: transform 0.2s ease, box-shadow 0.2s ease !important; } .clip-gallery img:hover, .clip-gallery video:hover { transform: scale(1.03); box-shadow: 0 4px 20px rgba(0,0,0,0.1) !important; } /* ---- 文件上传区域 ---- */ .gradio-container .file-preview { border-radius: var(--radius-md) !important; border: 2px dashed #cbd5e1 !important; background: #f8fafc !important; transition: all 0.2s ease !important; padding: 8px !important; } .gradio-container .file-preview:hover { border-color: var(--color-primary) !important; background: #fafbff !important; } /* ---- 设置面板 ---- */ .settings-group { margin-bottom: 20px; } .settings-section-title { font-size: 13px !important; font-weight: 600 !important; color: #334155 !important; margin-bottom: 12px !important; padding-bottom: 8px !important; border-bottom: 2px solid var(--color-border); } .config-card { background: #f8fafc !important; border-radius: var(--radius-md) !important; padding: 14px !important; border: 1px solid var(--color-border) !important; margin-bottom: 12px; } .param-row { display: flex !important; gap: 14px !important; margin-bottom: 12px !important; } .param-col { flex: 1 !important; display: flex !important; flex-direction: column !important; } .param-label { font-size: 12px 
!important; font-weight: 500 !important; color: #475569 !important; margin-bottom: 4px !important; } .param-info { font-size: 11px !important; color: #94a3b8 !important; margin-top: 2px !important; } .apply-btn-container { text-align: center; margin-top: 20px; } /* ---- Accordion ---- */ .gradio-accordion { border-radius: var(--radius-md) !important; border: 1px solid var(--color-border) !important; margin-bottom: 6px !important; overflow: hidden !important; background: #ffffff !important; } .gradio-accordion:last-child { margin-bottom: 0 !important; } .gradio-accordion .label-wrap { padding: 9px 14px !important; font-size: 13px !important; font-weight: 500 !important; color: #475569 !important; background: transparent !important; border: none !important; border-radius: var(--radius-md) !important; } .gradio-accordion[open] > .label-wrap { border-radius: var(--radius-md) var(--radius-md) 0 0 !important; border-bottom: 1px solid var(--color-border) !important; } /* ---- Number Input ---- */ .gradio-container input[type="number"] { font-size: 13px !important; padding: 7px 10px !important; border-radius: var(--radius-md) !important; } /* ---- 滚动条 ---- */ ::-webkit-scrollbar { width: 5px; height: 5px; } ::-webkit-scrollbar-track { background: transparent; } ::-webkit-scrollbar-thumb { background: #cbd5e1; border-radius: 10px; } ::-webkit-scrollbar-thumb:hover { background: #94a3b8; } /* ---- Footer ---- */ footer { display: none !important; } /* ---- Tab 内容区 ---- */ .tabs > .tabitem { padding-top: 8px !important; } /* ---- 表单行间距 ---- */ .form { gap: 10px !important; } """ # ---------- 全局状态控制 ---------- _videorag: VideoRAG | None = None _rag_lock = threading.Lock() # 添加清理缓存函数 def cleanup_temp_dir(): """清理Gradio临时目录,但保留working_dir中的视频文件""" try: if os.path.exists(_temp_dir): # 只删除非working_dir的临时文件 for item in os.listdir(_temp_dir): item_path = os.path.join(_temp_dir, item) # 跳过处理过的视频文件,只清理临时上传文件 if os.path.isfile(item_path): os.remove(item_path) elif 
os.path.isdir(item_path): # 递归删除子目录 shutil.rmtree(item_path) print(f"已清理Gradio临时目录: {_temp_dir}") except Exception as e: print(f"清理临时目录时出错: {e}") # 注册退出时清理函数 atexit.register(cleanup_temp_dir) _RAG_ENV_MAP = { "video_segment_length": "VIDEORAG_VIDEO_SEGMENT_LENGTH", "rough_num_frames_per_segment": "VIDEORAG_ROUGH_NUM_FRAMES_PER_SEGMENT", "retrieval_topk_chunks": "VIDEORAG_RETRIEVAL_TOPK_CHUNKS", "query_better_than_threshold": "VIDEORAG_QUERY_BETTER_THAN_THRESHOLD", "chunk_token_size": "VIDEORAG_CHUNK_TOKEN_SIZE", "segment_retrieval_top_k": "VIDEORAG_SEGMENT_RETRIEVAL_TOP_K", } def _read_int_env(key: str, default: int) -> int: try: return int(os.getenv(key, str(default)).strip()) except Exception: return default def _read_float_env(key: str, default: float) -> float: try: return float(os.getenv(key, str(default)).strip()) except Exception: return default def _load_rag_runtime_settings() -> dict: return { "video_segment_length": _read_int_env(_RAG_ENV_MAP["video_segment_length"], 20), "rough_num_frames_per_segment": _read_int_env(_RAG_ENV_MAP["rough_num_frames_per_segment"], 10), "retrieval_topk_chunks": _read_int_env(_RAG_ENV_MAP["retrieval_topk_chunks"], 2), "query_better_than_threshold": _read_float_env(_RAG_ENV_MAP["query_better_than_threshold"], 0.2), "chunk_token_size": _read_int_env(_RAG_ENV_MAP["chunk_token_size"], 1000), "segment_retrieval_top_k": _read_int_env(_RAG_ENV_MAP["segment_retrieval_top_k"], 3), } _rag_runtime_settings = _load_rag_runtime_settings() def _get_rag(working_dir: str) -> VideoRAG: global _videorag with _rag_lock: need_rebuild = _videorag is None or _videorag.working_dir != working_dir if not need_rebuild and _videorag is not None: for k, v in _rag_runtime_settings.items(): if getattr(_videorag, k, None) != v: need_rebuild = True break if need_rebuild: _videorag = VideoRAG(working_dir=working_dir, **_rag_runtime_settings) return _videorag def _read_indexed_videos(working_dir: str) -> list[str]: kv_path = os.path.join(working_dir, 
"kv_store_video_path.json") if not os.path.exists(kv_path): return [] try: with open(kv_path, "r", encoding="utf-8") as f: data = json.load(f) return list(data.keys()) except Exception: return [] def _fmt_video_list(videos: list[str]) -> str: if not videos: return "📦 暂无已索引视频" return "\n".join(f"• {v}" for v in sorted(videos)) def _get_path_from_file(file_obj): if file_obj is None: return None if isinstance(file_obj, str): return file_obj if isinstance(file_obj, os.PathLike): return os.fspath(file_obj) if isinstance(file_obj, dict): for k in ("path", "name"): v = file_obj.get(k) if isinstance(v, str) and v.strip(): return v return None for attr in ("path", "name"): v = getattr(file_obj, attr, None) if isinstance(v, str) and v.strip(): return v return None def _parse_clock_text(clock_text: str) -> float: t = clock_text.strip() parts = t.split(":") if len(parts) == 2: mm, ss = parts return int(mm) * 60 + int(ss) if len(parts) == 3: hh, mm, ss = parts return int(hh) * 3600 + int(mm) * 60 + int(ss) raise ValueError(f"无效时间格式: {clock_text}") _TIME_TOKEN_RE = r"[0-9]{1,2}:[0-9]{1,2}(?::[0-9]{1,2})?" _REF_LINE_RE = re.compile( rf"^\s*(?:[-*•]\s*)?(?:\[(?P\d+)\]\s*)?(?:\d+[.)、]\s*)?" rf"(?:(?:\*\*)?(?:reference|参考)(?:\*\*)?\s*[::]\s*)?" rf"(?P