import hashlib import json import logging import os import queue as _queue import re import subprocess import tempfile import threading import time import warnings import shutil import atexit import gradio as gr from dotenv import load_dotenv from openai import OpenAI # ---------- 基础环境配置 ---------- _script_dir = os.path.dirname(os.path.abspath(__file__)) _temp_dir = os.path.join(_script_dir, ".gradio_temp") os.makedirs(_temp_dir, exist_ok=True) os.environ["GRADIO_TEMP_DIR"] = _temp_dir tempfile.gettempdir = lambda: _temp_dir load_dotenv() warnings.filterwarnings("ignore") logging.getLogger("httpx").setLevel(logging.WARNING) from VideoAgent import QueryParam, VideoRAG # ---------- 样式表 (CSS) ---------- custom_css = """ .gradio-container { background: radial-gradient(1000px 320px at 50% -80px, #dbeafe 0%, #f6f8fc 45%, #f6f8fc 100%); color: #0f172a; } .app-title { text-align: center; margin: 6px 0 14px 0; } .app-title h1 { margin: 0; font-size: 25px; font-weight: 700; color: #1e293b; } .card-style { border-radius: 10px !important; border: 1px solid #dde5f1 !important; padding: 12px !important; background: #ffffff !important; box-shadow: 0 3px 12px rgba(30, 41, 59, 0.05); } .section-label { font-weight: 600; color: #1e293b; margin-bottom: 8px; display: flex; align-items: center; font-size: 14px; } .gradio-container .gr-button-primary { background: linear-gradient(135deg, #4f46e5 0%, #2563eb 100%) !important; border: none !important; } .gradio-container .gr-button-secondary { border-color: #cbd5e1 !important; } .console-font textarea { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, "Courier New", monospace !important; font-size: 12px !important; background: #0f172a !important; color: #e2e8f0 !important; } .video-box { border-radius: 10px !important; overflow: hidden !important; border: 1px solid #d7deeb; } .helper-text { font-size: 12px; color: #64748b; margin: 0; } .search-toolbar { padding: 10px 12px !important; margin-bottom: 4px; } .search-query textarea { font-size: 14px !important; line-height: 1.5 !important; min-height: 68px !important; } .search-actions { margin-top: 6px; justify-content: flex-end; gap: 8px; } .search-actions .gr-button { min-height: 40px !important; font-size: 13px !important; border-radius: 8px !important; min-width: 118px; } .search-panel { margin-top: 4px; } .result-box { border: 1px solid #d7deeb; border-radius: 10px; background: linear-gradient(180deg, #ffffff 0%, #fbfdff 100%); padding: 12px 14px; min-height: 360px; max-height: 360px; overflow: auto; line-height: 1.5; box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.5); } .result-box h1, .result-box h2, .result-box h3 { margin-top: 0.35em; margin-bottom: 0.35em; } .result-box p { margin: 0.45em 0; } .clip-gallery { border: 1px solid #d7deeb; border-radius: 10px; padding: 6px; background: #ffffff; } .clip-gallery img, .clip-gallery video { border-radius: 8px !important; } .settings-group { margin-bottom: 20px; } .settings-section-title { font-size: 16px !important; font-weight: 600 !important; color: #334155 !important; margin-bottom: 12px !important; padding-bottom: 8px !important; border-bottom: 1px solid #e2e8f0; } .config-card { background: #f8fafc !important; border-radius: 8px !important; padding: 12px !important; border: 1px solid #e2e8f0 !important; margin-bottom: 12px; } .param-row { display: flex !important; gap: 15px !important; margin-bottom: 12px !important; } .param-col { flex: 1 !important; display: flex !important; flex-direction: column !important; } .param-label { font-size: 13px !important; font-weight: 500 !important; color: #475569 !important; margin-bottom: 4px !important; } .param-info { font-size: 11px !important; color: #94a3b8 !important; margin-top: 2px !important; } .apply-btn-container { text-align: center; margin-top: 20px; } .gradio-accordion .label-wrap { padding: 8px 12px !important; } """ # ---------- 全局状态控制 ---------- _videorag: VideoRAG | None = None _rag_lock = threading.Lock() # 添加清理缓存函数 def cleanup_temp_dir(): """清理Gradio临时目录,但保留working_dir中的视频文件""" try: if os.path.exists(_temp_dir): # 只删除非working_dir的临时文件 for item in os.listdir(_temp_dir): item_path = os.path.join(_temp_dir, item) # 跳过处理过的视频文件,只清理临时上传文件 if os.path.isfile(item_path): os.remove(item_path) elif os.path.isdir(item_path): # 递归删除子目录 shutil.rmtree(item_path) print(f"已清理Gradio临时目录: {_temp_dir}") except Exception as e: print(f"清理临时目录时出错: {e}") # 注册退出时清理函数 atexit.register(cleanup_temp_dir) _RAG_ENV_MAP = { "video_segment_length": "VIDEORAG_VIDEO_SEGMENT_LENGTH", "rough_num_frames_per_segment": "VIDEORAG_ROUGH_NUM_FRAMES_PER_SEGMENT", "retrieval_topk_chunks": "VIDEORAG_RETRIEVAL_TOPK_CHUNKS", "query_better_than_threshold": "VIDEORAG_QUERY_BETTER_THAN_THRESHOLD", "chunk_token_size": "VIDEORAG_CHUNK_TOKEN_SIZE", "segment_retrieval_top_k": "VIDEORAG_SEGMENT_RETRIEVAL_TOP_K", } def _read_int_env(key: str, default: int) -> int: try: return int(os.getenv(key, str(default)).strip()) except Exception: return default def _read_float_env(key: str, default: float) -> float: try: return float(os.getenv(key, str(default)).strip()) except Exception: return default def _load_rag_runtime_settings() -> dict: return { "video_segment_length": _read_int_env(_RAG_ENV_MAP["video_segment_length"], 20), "rough_num_frames_per_segment": _read_int_env(_RAG_ENV_MAP["rough_num_frames_per_segment"], 10), "retrieval_topk_chunks": _read_int_env(_RAG_ENV_MAP["retrieval_topk_chunks"], 2), "query_better_than_threshold": _read_float_env(_RAG_ENV_MAP["query_better_than_threshold"], 0.2), "chunk_token_size": _read_int_env(_RAG_ENV_MAP["chunk_token_size"], 1000), "segment_retrieval_top_k": _read_int_env(_RAG_ENV_MAP["segment_retrieval_top_k"], 3), } _rag_runtime_settings = _load_rag_runtime_settings() def _get_rag(working_dir: str) -> VideoRAG: global _videorag with _rag_lock: need_rebuild = _videorag is None or _videorag.working_dir != working_dir if not need_rebuild and _videorag is not None: for k, v in _rag_runtime_settings.items(): if getattr(_videorag, k, None) != v: need_rebuild = True break if need_rebuild: _videorag = VideoRAG(working_dir=working_dir, **_rag_runtime_settings) return _videorag def _read_indexed_videos(working_dir: str) -> list[str]: kv_path = os.path.join(working_dir, "kv_store_video_path.json") if not os.path.exists(kv_path): return [] try: with open(kv_path, "r", encoding="utf-8") as f: data = json.load(f) return list(data.keys()) except Exception: return [] def _fmt_video_list(videos: list[str]) -> str: if not videos: return "📦 暂无已索引视频" return "\n".join(f"• {v}" for v in sorted(videos)) def _get_path_from_file(file_obj): if file_obj is None: return None if isinstance(file_obj, str): return file_obj if isinstance(file_obj, os.PathLike): return os.fspath(file_obj) if isinstance(file_obj, dict): for k in ("path", "name"): v = file_obj.get(k) if isinstance(v, str) and v.strip(): return v return None for attr in ("path", "name"): v = getattr(file_obj, attr, None) if isinstance(v, str) and v.strip(): return v return None def _parse_clock_text(clock_text: str) -> float: t = clock_text.strip() parts = t.split(":") if len(parts) == 2: mm, ss = parts return int(mm) * 60 + int(ss) if len(parts) == 3: hh, mm, ss = parts return int(hh) * 3600 + int(mm) * 60 + int(ss) raise ValueError(f"无效时间格式: {clock_text}") _TIME_TOKEN_RE = r"[0-9]{1,2}:[0-9]{1,2}(?::[0-9]{1,2})?" _REF_LINE_RE = re.compile( rf"^\s*(?:[-*•]\s*)?(?:\[(?P\d+)\]\s*)?(?:\d+[.)、]\s*)?" rf"(?:(?:\*\*)?(?:reference|参考)(?:\*\*)?\s*[::]\s*)?" rf"(?P