Spaces:
Runtime error
Runtime error
| from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor | |
| import torch | |
| import numpy as np | |
| import av | |
| import spaces | |
| import gradio as gr | |
| import os | |
| quantization_config = BitsAndBytesConfig( | |
| load_in_4bit=True, | |
| bnb_4bit_compute_dtype=torch.float16 | |
| ) | |
| model_name = 'llava-hf/LLaVA-NeXT-Video-7B-DPO-hf' | |
| processor = LlavaNextVideoProcessor.from_pretrained(model_name) | |
| model = LlavaNextVideoForConditionalGeneration.from_pretrained( | |
| model_name, | |
| quantization_config=quantization_config, | |
| device_map='auto' | |
| ) | |
| def read_video_pyav(container, indices): | |
| ''' | |
| Decode the video with PyAV decoder. | |
| Args: | |
| container (av.container.input.InputContainer): PyAV container. | |
| indices (List[int]): List of frame indices to decode. | |
| Returns: | |
| np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3). | |
| ''' | |
| frames = [] | |
| container.seek(0) | |
| start_index = indices[0] | |
| end_index = indices[-1] | |
| for i, frame in enumerate(container.decode(video=0)): | |
| if i > end_index: | |
| break | |
| if i >= start_index and i in indices: | |
| frames.append(frame) | |
| return np.stack([x.to_ndarray(format="rgb24") for x in frames]) | |
| def process_video(video_file, question_parts): | |
| # Open video and sample frames | |
| with av.open(video_file.name) as container: # Access file name from Gradio input | |
| total_frames = container.streams.video[0].frames | |
| indices = np.arange(0, total_frames, total_frames / 8).astype(int) | |
| video_clip = read_video_pyav(container, indices) | |
| # Combine question parts into a single question | |
| question = " ".join(question_parts) | |
| # Prepare conversation | |
| conversation = [ | |
| { | |
| "role": "user", | |
| "content": [ | |
| {"type": "text", "text": f"{question}"}, | |
| {"type": "video"}, | |
| ], | |
| }, | |
| ] | |
| prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) | |
| # Prepare inputs for the model | |
| input = processor([prompt], videos=[video_clip], padding=True, return_tensors="pt").to(model.device) | |
| # Generate output | |
| generate_kwargs = {"max_new_tokens": 500, "do_sample": False, "top_p": 0.9} | |
| output = model.generate(**input, **generate_kwargs) | |
| generated_text = processor.batch_decode(output, skip_special_tokens=True)[0] | |
| return generated_text.split("ASSISTANT: ", 1)[-1].strip() | |
| def process_videos(video_files, question): | |
| """Processes multiple videos and answers a single question for each.""" | |
| answers = [] | |
| for video_file in video_files: | |
| video_name = os.path.basename(video_file.name) | |
| answer = process_video(video_file, question) | |
| answers.append(f"**Video: {video_name}**\n{answer}\n") | |
| return "\n---\n".join(answers) | |
| # Define Gradio interface for multiple videos | |
| def gradio_interface(videos, indoors_outdoors, standing_sitting, hands_free, interacting_screen): | |
| question = "Is the subject in the video" | |
| if indoors_outdoors: | |
| question += "present indoors or outdoors? " | |
| if standing_sitting: | |
| question += "standing or sitting? " | |
| if hands_free: | |
| question += "hands free or not? " | |
| if interacting_screen: | |
| question += "interacting with any screen in the background?" | |
| answers = process_videos(videos, question) | |
| return answers | |
| iface = gr.Interface( | |
| fn=gradio_interface, | |
| inputs=[ | |
| gr.File(label="Upload Videos", file_count="multiple"), | |
| gr.Checkbox(label="Indoors or Outdoors", value=False), | |
| gr.Checkbox(label="Standing or Sitting", value=False), | |
| gr.Checkbox(label="Hands Free or Not", value=False), | |
| gr.Checkbox(label="Interacting with Screen", value=False), | |
| ], | |
| outputs=gr.Textbox(label="Generated Answers"), | |
| title="Video Question Answering", | |
| description="Upload multiple videos and select questions to get answers." | |
| ) | |
| if __name__ == "__main__": | |
| iface.launch(debug=True) |