H022329's picture
Upload folder using huggingface_hub
9cf08e9 verified
Raw
History Blame Contribute Delete
7.13 kB
import os
import numpy as np
from PIL import Image
from tqdm import tqdm
from dotenv import load_dotenv
from moviepy.video.io.VideoFileClip import VideoFileClip
load_dotenv()
from .._llm import Qwen3VLModel
from .._utils import _pil_to_base64
import logging
# Set the httpx logger level to WARNING so that INFO-level messages are suppressed
logging.getLogger("httpx").setLevel(logging.WARNING)
def encode_video(video, frame_times):
    """Sample frames from an opened clip and return them as PIL images.

    Args:
        video: Object exposing ``get_frame(t)``; the callers in this file
            pass a ``VideoFileClip``. Each returned frame must be an
            array-like that ``Image.fromarray`` accepts after a uint8 cast.
        frame_times: Iterable of timestamps, handed one at a time to
            ``video.get_frame`` (presumably seconds -- confirm against the
            moviepy version in use).

    Returns:
        A list with one PIL image per timestamp, in input order. An empty
        ``frame_times`` yields an empty list.
    """
    # Convert frame by frame rather than stacking everything into one big
    # array first: np.stack raises ValueError on an empty list and would only
    # add an extra full copy of every frame before splitting them up again.
    return [
        Image.fromarray(np.asarray(video.get_frame(t)).astype("uint8"))
        for t in frame_times
    ]
# Module-level Qwen3VLModel instance, created once when this module is
# imported and shared by every captioning function defined below.
model = Qwen3VLModel()
def segment_caption(video_name, video_path, segment_index2name, transcripts, segment_times_info, caption_result, error_queue):
    """Caption every segment of one video and store the results by index.

    For each segment index the frames listed in
    ``segment_times_info[index]["frame_times"]`` are extracted, base64
    encoded, and sent to the module-level ``model`` together with the
    segment transcript.

    Args:
        video_name: Name used only in the progress-bar description.
        video_path: Path handed to ``VideoFileClip``.
        segment_index2name: Iterable of segment indices to caption.
        transcripts: Mapping from segment index to transcript text.
        segment_times_info: Mapping from segment index to a dict holding a
            ``"frame_times"`` entry.
        caption_result: Mapping that is filled in place with
            ``index -> caption``.
        error_queue: Object with a ``put`` method; receives a description of
            any failure.

    Raises:
        RuntimeError: If anything fails. The message is also put on
            ``error_queue`` and the original exception is chained as the
            cause so the traceback is not lost.
    """
    try:
        with VideoFileClip(video_path) as video:
            for index in tqdm(segment_index2name, desc=f"Captioning Video {video_name}"):
                frame_times = segment_times_info[index]["frame_times"]
                video_frames = encode_video(video, frame_times)
                segment_transcript = transcripts[index]
                query = f"The transcript of the current video:\n{segment_transcript}.\nNow provide a description (caption) of the video in Chinese."
                # One image entry per sampled frame, followed by the text prompt.
                content = [
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{_pil_to_base64(frame)}"}}
                    for frame in video_frames
                ]
                content.append({"type": "text", "text": query})
                messages_input = [{"role": "user", "content": content}]
                caption_result[index] = model.generate_result(messages_input, max_new_tokens=256)
    except Exception as e:
        # Report through the queue, then re-raise with the same message and
        # the original exception attached instead of an empty RuntimeError.
        error_message = f"Error in segment_caption:\n {str(e)}"
        error_queue.put(error_message)
        raise RuntimeError(error_message) from e
def merge_segment_information(segment_index2name, segment_times_info, transcripts, captions):
    """Combine per-segment captions, transcripts and timing into one mapping.

    Args:
        segment_index2name: Mapping from segment index to segment name. The
            last two dash-separated fields of the name are kept as the
            segment's ``"time"`` string.
        segment_times_info: Mapping from segment index to a dict whose
            ``"frame_times"`` value provides ``.tolist()``.
        transcripts: Mapping from segment index to transcript text.
        captions: Mapping from segment index to caption text; indices may be
            missing.

    Returns:
        A dict keyed by segment index. Each value holds ``"content"``
        (caption plus transcript text), ``"time"``, ``"transcript"`` and
        ``"frame_times"`` (as a plain list).
    """
    merged = {}
    for index, segment_name in segment_index2name.items():
        time_range = "-".join(segment_name.rsplit("-", 2)[-2:])

        # A segment without a generated caption gets an explicit failure
        # placeholder in place of the caption text.
        if index in captions:
            caption_text = captions[index]
        else:
            caption_text = f"[Caption generation failed for segment {index}]"

        transcript = transcripts[index]
        merged[index] = {
            "content": f"Caption:\n{caption_text}\nTranscript:\n{transcript}\n\n",
            "time": time_range,
            "transcript": transcript,
            "frame_times": segment_times_info[index]["frame_times"].tolist(),
        }
    return merged
def retrieved_segment_caption(retrieved_segments, video_path_db, video_segments, num_sampled_frames):
    """Generate detailed captions for a list of retrieved segments.

    Args:
        retrieved_segments: Iterable of segment ids shaped
            ``"<video_name>_<index>"``; the text after the last underscore is
            the segment index.
        video_path_db: Object whose ``_data`` maps video name to video path.
        video_segments: Object whose ``_data[video_name][index]`` is a dict
            with a ``"time"`` string (``"<start>-<end>"``) and a
            ``"transcript"`` entry.
        num_sampled_frames: Number of frames sampled evenly across the
            segment (the end time itself is excluded).

    Returns:
        Dict mapping each segment id to
        ``"Caption:\\n<caption>\\nTranscript:\\n<transcript>\\n\\n"``.
    """
    caption_result = {}
    for this_segment in tqdm(retrieved_segments, desc='Captioning Segments for Given Query'):
        video_name, _, index = this_segment.rpartition('_')
        video_path = video_path_db._data[video_name]
        segment_info = video_segments._data[video_name][index]

        # Parse the bounds with float() rather than eval(): the values come
        # from stored segment data and must never be executed as code.
        timestamp = segment_info["time"].split('-')
        start, end = float(timestamp[0]), float(timestamp[1])
        frame_times = np.linspace(start, end, num_sampled_frames, endpoint=False)

        # The context manager closes the clip even when frame extraction
        # fails, so the underlying reader is not leaked.
        with VideoFileClip(video_path) as video:
            video_frames = encode_video(video, frame_times)

        segment_transcript = segment_info["transcript"]
        query = f"The transcript of the current video:\n{segment_transcript}.\nNow provide a very detailed description (caption) of the video in Chinese."
        # One image entry per sampled frame, followed by the text prompt.
        content = [
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{_pil_to_base64(frame)}"}}
            for frame in video_frames
        ]
        content.append({"type": "text", "text": query})
        messages_input = [{"role": "user", "content": content}]
        caption_text = model.generate_result(messages_input, max_new_tokens=256)
        caption_result[this_segment] = f"Caption:\n{caption_text}\nTranscript:\n{segment_transcript}\n\n"
    return caption_result
def retrieved_segment_caption_kw(retrieved_segments, video_path_db, video_segments, refine_knowledge, num_sampled_frames):
    """Caption retrieved segments while asking the model to focus on given knowledge.

    Args:
        retrieved_segments: Iterable of segment ids shaped
            ``"<video_name>_<index>"``; the text after the last underscore is
            the segment index.
        video_path_db: Object whose ``_data`` maps video name to video path.
        video_segments: Object whose ``_data[video_name][index]`` is a dict
            with a ``"time"`` string (``"<start>-<end>"``) and a
            ``"transcript"`` entry.
        refine_knowledge: Text appended to the prompt describing what
            information the model should extract.
        num_sampled_frames: Number of frames sampled evenly across the
            segment (the end time itself is excluded).

    Returns:
        Dict mapping each segment id to
        ``"Caption:\\n<caption>\\nTranscript:\\n<transcript>\\n\\n"``.

    Raises:
        FileNotFoundError: If a segment's video file does not exist.
        RuntimeError: If processing a segment fails for any other reason;
            the original exception is chained as the cause.
    """
    caption_result = {}
    for this_segment in tqdm(retrieved_segments, desc='Captioning Segments for Given Query'):
        video_name, _, index = this_segment.rpartition('_')
        video_path = video_path_db._data[video_name]

        # Fail early with a clear message before trying to open the file.
        if not os.path.exists(video_path):
            error_msg = f"Video file not found for segment '{this_segment}': {video_path}"
            raise FileNotFoundError(error_msg)

        try:
            segment_info = video_segments._data[video_name][index]

            # Parse the bounds with float() rather than eval(): the values
            # come from stored segment data and must never be executed as code.
            timestamp = segment_info["time"].split('-')
            start, end = float(timestamp[0]), float(timestamp[1])
            frame_times = np.linspace(start, end, num_sampled_frames, endpoint=False)

            # The context manager closes the clip even when frame extraction
            # fails, so the underlying reader is not leaked on the error path.
            with VideoFileClip(video_path) as video:
                video_frames = encode_video(video, frame_times)

            segment_transcript = segment_info["transcript"]
            query = f"The transcript of the current video:\n{segment_transcript}.\nNow provide a very detailed description (caption) of the video in Chinese and extract relevant information about: {refine_knowledge}"
            # One image entry per sampled frame, followed by the text prompt.
            content = [
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{_pil_to_base64(frame)}"}}
                for frame in video_frames
            ]
            content.append({"type": "text", "text": query})
            messages_input = [{"role": "user", "content": content}]
            caption_text = model.generate_result(messages_input, max_new_tokens=256)
            caption_result[this_segment] = f"Caption:\n{caption_text}\nTranscript:\n{segment_transcript}\n\n"
        except FileNotFoundError:
            # Keep the specific exception type for missing files.
            raise
        except Exception as e:
            error_msg = f"Error processing segment '{this_segment}' (video: {video_path}): {str(e)}"
            raise RuntimeError(error_msg) from e
    return caption_result