# Hugging Face Space app.py: "Video Question Answering" with LLaVA-NeXT-Video.
import av
import gradio as gr
import numpy as np
import spaces
import torch
from transformers import (
    BitsAndBytesConfig,
    LlavaNextVideoForConditionalGeneration,
    LlavaNextVideoProcessor,
)
# Load the 7B checkpoint in 4-bit so it fits on a single GPU.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)

model_name = "llava-hf/LLaVA-NeXT-Video-7B-DPO-hf"
processor = LlavaNextVideoProcessor.from_pretrained(model_name)
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    model_name,
    quantization_config=quantization_config,
    device_map="auto",
)
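# If bitsandbytes is unavailable on the target hardware, a plain half-precision
# load is a plausible drop-in alternative. This is a sketch, not part of the
# original app:
#
#   model = LlavaNextVideoForConditionalGeneration.from_pretrained(
#       model_name, torch_dtype=torch.float16, device_map="auto"
#   )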
def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])
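# Quick sanity check for read_video_pyav (illustrative only; "sample.mp4" is a
# hypothetical local file, not shipped with this Space):
#
#   with av.open("sample.mp4") as c:
#       n = c.streams.video[0].frames
#       clip = read_video_pyav(c, np.arange(0, n, n / 8).astype(int))
#       print(clip.shape)  # -> (8, height, width, 3)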
# The `spaces` import is only useful if this decorator is applied; on ZeroGPU
# hardware (assumed here), undecorated GPU code fails at runtime.
@spaces.GPU
def process_video(video_file, question):
    # Open the video and sample 8 uniformly spaced frames.
    with av.open(video_file) as container:
        total_frames = container.streams.video[0].frames
        indices = np.arange(0, total_frames, total_frames / 8).astype(int)
        video_clip = read_video_pyav(container, indices)
    # Prepare conversation: one user turn containing text plus a video slot.
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": f"{question}"},
                {"type": "video"},
            ],
        },
    ]
    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
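    # For this checkpoint the chat template renders to roughly
    # "USER: <video>\n{question} ASSISTANT:", which is why the answer is
    # extracted by splitting on "ASSISTANT: " below.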
    # Prepare inputs for the model (renamed from `input` to avoid shadowing the built-in).
    inputs = processor([prompt], videos=[video_clip], padding=True, return_tensors="pt").to(model.device)

    # Generate output
    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}
    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)[0]

    # Return only the assistant's answer, dropping the echoed prompt.
    return generated_text.split("ASSISTANT: ", 1)[-1].strip()
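# process_video can also be exercised without the UI (hypothetical file path):
#
#   print(process_video("clip.mp4", "What is happening in this video?"))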
# Define Gradio interface
def gradio_interface(video, question):
    return process_video(video, question)

iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Video(label="Upload Video"),
        gr.Textbox(label="Enter Question"),
    ],
    outputs=gr.Textbox(label="Generated Answer"),
    title="Video Question Answering",
    description="Upload a video and enter a question to get a generated text response.",
)

if __name__ == "__main__":
    iface.launch(debug=True)
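# On Hugging Face Spaces this file is conventionally saved as app.py and
# launched automatically; locally, `python app.py` starts the server and
# prints a URL to open in a browser.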