import os
import base64

import cv2
import gradio as gr
from openai import OpenAI


def extract_frames(video_path: str, num_frames: int = 8, max_resolution: int = 720):
    """Sample up to num_frames frames from the video and return them as
    base64-encoded JPEG data URIs, downscaled so the longest side fits
    within max_resolution pixels."""
    frames_base64 = []
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video file: {video_path}")

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if total_frames <= 0:
        cap.release()
        raise RuntimeError(f"Video reports no readable frames: {video_path}")

    # Sample at a fixed stride, clamping to the last valid frame index.
    step = max(total_frames // num_frames, 1)
    frame_indices = [min(i * step, total_frames - 1) for i in range(num_frames)]

    for index in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, index)
        ret, frame = cap.read()
        if not ret or frame is None:
            continue  # Skip frames the decoder failed to return.

        # Downscale large frames to keep the request payload small.
        h, w, _ = frame.shape
        if max(h, w) > max_resolution:
            scale = max_resolution / float(max(h, w))
            frame = cv2.resize(frame, (int(w * scale), int(h * scale)),
                               interpolation=cv2.INTER_AREA)

        success, buffer = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
        if success:
            b64 = base64.b64encode(buffer).decode("utf-8")
            frames_base64.append(f"data:image/jpeg;base64,{b64}")

    cap.release()
    return frames_base64


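# A minimal usage sketch for extract_frames (hedged: "sample.mp4" is a
# hypothetical local file, not part of this project). Kept commented out
# so importing this module has no side effects:
#
#     uris = extract_frames("sample.mp4", num_frames=4)
#     print(len(uris), uris[0][:40] if uris else "no frames decoded")

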
def build_prompt(frames, question):
    """Assemble OpenAI-style multimodal message content: the question text
    followed by one image_url part per sampled frame."""
    content = [{"type": "text", "text": question}]
    for image_data_uri in frames:
        content.append({
            "type": "image_url",
            "image_url": {"url": image_data_uri}
        })
    return content


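# For reference, the content list build_prompt returns has this shape
# (one image_url entry per frame; the data URI is truncated here):
#
#     [
#         {"type": "text", "text": "What color was the car?"},
#         {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64,..."}},
#     ]

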
def query_qwen(prompt_content):
    """Send the multimodal prompt to Qwen2.5-VL via Nebius AI Studio's
    OpenAI-compatible endpoint."""
    api_key = os.getenv("NEBIUS_API_KEY")
    if not api_key:
        raise ValueError("NEBIUS_API_KEY not found in environment variables.")

    client = OpenAI(api_key=api_key, base_url="https://api.studio.nebius.ai/v1/")
    try:
        response = client.chat.completions.create(
            model="Qwen/Qwen2.5-VL-72B-Instruct",
            messages=[{"role": "user", "content": prompt_content}],
            temperature=0.2,
            max_tokens=512,
        )
        return response
    except Exception as e:
        return {"error": str(e)}


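# The key is read from the environment at call time, so export it before
# launching the app, e.g. `export NEBIUS_API_KEY=...` in a shell
# (the value shown is a placeholder).

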
def parse_response(response):
    """Extract the answer text, handling both the SDK response object and
    the {"error": ...} dict that query_qwen returns on failure."""
    if isinstance(response, dict) and "error" in response:
        return f"Error: {response['error']}"

    try:
        choice = response.choices[0]
        if hasattr(choice, "message"):
            return choice.message.content.strip()
        # Fall back to dict-style access for plain-JSON responses.
        return choice.get("message", {}).get("content", "No message received.")
    except Exception as e:
        return f"Failed to parse response: {str(e)}"


def answer_question(video_path: str, question: str) -> str:
    """End-to-end pipeline: sample frames, build the prompt, query the
    model, and return the parsed answer (or an error message)."""
    try:
        frames = extract_frames(video_path)
        prompt = build_prompt(frames, question)
        response = query_qwen(prompt)
        return parse_response(response)
    except Exception as e:
        return f"Something went wrong: {str(e)}"


def gradio_interface(video, question):
    # Thin wrapper so the Gradio wiring stays separate from the pipeline.
    return answer_question(video, question)


with gr.Blocks(title="🎥 Video QA with Qwen2.5-VL") as demo:
    gr.Markdown("## 🎥 Interactive Video Question Answering\nUpload a video and ask a question about it.")

    with gr.Row():
        video_input = gr.Video(label="Upload Video")
        question_input = gr.Textbox(label="Your Question", placeholder="e.g., What color was the car in the first scene?")

    answer_output = gr.Textbox(label="Model Answer", lines=3)

    submit_btn = gr.Button("Get Answer")
    submit_btn.click(fn=gradio_interface, inputs=[video_input, question_input], outputs=answer_output)


if __name__ == "__main__":
    # mcp_server=True also exposes the app as an MCP server; this requires
    # the `gradio[mcp]` extra to be installed.
    demo.launch(mcp_server=True)