import gradio as gr
import subprocess
import whisper
from transformers import pipeline, T5ForConditionalGeneration, T5Tokenizer
import os
import torch
import spacy

# Load models once at startup so every request reuses them
whisper_model = whisper.load_model("base")
summarizer = pipeline("summarization", model="facebook/bart-large-cnn", device=-1)  # device=-1 runs on CPU

# Load the question-generation model and tokenizer
model_name = "valhalla/t5-base-qg-hl"
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)

# Load spaCy's small English model, downloading it first if it is missing
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    from spacy.cli import download
    download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# Load QA pipeline
qa_pipeline = pipeline("question-answering", model="deepset/roberta-base-squad2")
def extract_audio(video_path, audio_output_path):
    # -y overwrites a stale output file; without it ffmpeg prompts for confirmation
    # and, with its output piped, would hang waiting for an answer
    command = ['ffmpeg', '-y', '-i', video_path, '-vn', '-acodec', 'pcm_s16le',
               '-ar', '44100', '-ac', '2', audio_output_path]
    subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return audio_output_path
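# Usage sketch for extract_audio (hypothetical file names; not executed by the app):
#   wav = extract_audio("lecture.mp4", "lecture.wav")
#   # wav == "lecture.wav", containing 44.1 kHz stereo 16-bit PCM audio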
def process_video(video_file):
    try:
        audio_path = "extracted_audio.wav"
        # Extract audio from the video using FFmpeg
        extract_audio(video_file, audio_path)
        if not os.path.exists(audio_path):
            return "Audio extraction failed.", "No summary generated."
        # Transcribe with the Whisper model loaded at startup
        result = whisper_model.transcribe(audio_path)
        transcript_text = result['text']
        # BART's input window is limited, so summarize in ~1024-character chunks
        chunks = [transcript_text[i:i + 1024] for i in range(0, len(transcript_text), 1024)]
        summaries = [summarizer(chunk, max_length=100, min_length=30, do_sample=False)[0]['summary_text']
                     for chunk in chunks]
        final_summary = ' '.join(summaries)
        return transcript_text, final_summary
    except Exception as e:
        return f"Error: {str(e)}", f"Error: {str(e)}"
# Extract the top named entities to highlight as answer spans
def select_top_entities(text, max_entities=3):
    doc = nlp(text)
    # Keep short, span-sized entities (at most 30 characters / 5 words)
    candidates = [ent.text for ent in doc.ents if 2 <= len(ent.text) <= 30 and len(ent.text.split()) <= 5]
    seen = set()
    top_entities = []
    # Deduplicate while preserving document order
    for entity in candidates:
        if entity not in seen:
            seen.add(entity)
            top_entities.append(entity)
        if len(top_entities) >= max_entities:
            break
    return top_entities
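# Illustrative example (entity output depends on the spaCy model and may vary):
#   select_top_entities("Marie Curie won the Nobel Prize in Paris.")
#   # -> e.g. ['Marie Curie', 'the Nobel Prize', 'Paris']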
# Generate one question per highlighted entity
def generate_questions(context):
    entities = select_top_entities(context, max_entities=3)
    questions = []
    for ent in entities:
        # Wrap the entity in <hl> tokens, the answer-span markup the qg-hl model expects
        highlighted = context.replace(ent, f"<hl> {ent} <hl>", 1)
        input_text = f"generate question: {highlighted}"
        input_ids = tokenizer.encode(input_text, return_tensors="pt", truncation=True)
        outputs = model.generate(
            input_ids=input_ids,
            max_length=64,
            num_beams=4,
            num_return_sequences=1,
            no_repeat_ngram_size=2,
            early_stopping=True
        )
        question = tokenizer.decode(outputs[0], skip_special_tokens=True)
        questions.append(question)
    return "\n".join(f"Q{i+1}: {q}" for i, q in enumerate(questions))
def generate_answers(context, questions):
    """
    context: str - typically the summary
    questions: list[str] or str - can be a multiline string or a list
    returns: str - formatted question/answer pairs
    """
    if isinstance(questions, str):
        questions = questions.strip().split('\n')
    answers = []
    for q in questions:
        if q.strip():
            result = qa_pipeline(question=q.strip(), context=context)
            answers.append(f"Q: {q.strip()}\nA: {result['answer']}")
    return "\n\n".join(answers)
# End-to-end pipeline: transcribe and summarize, then generate questions and answers
def process_video_(video_path):
    transcript, summary = process_video(video_path)
    questions = generate_questions(summary)
    answers = generate_answers(summary, questions)
    return transcript, summary, questions, answers
# Gradio Interface
iface = gr.Interface(
    fn=process_video_,
    inputs=gr.Video(label="Upload a video"),
    outputs=[
        gr.Textbox(label="Transcript"),
        gr.Textbox(label="Summary"),
        gr.Textbox(label="Generated Questions"),
        gr.Textbox(label="Generated Answers")
    ],
    title="Vision to Insight",
    description="Upload a video to extract a transcript, generate a summary, and get 2-3 meaningful questions based on the summary."
)

iface.launch(share=True)
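# Deployment note: if this runs as a Hugging Face Space, the build needs both the
# Python dependencies and the ffmpeg binary. A minimal sketch (names and pins are
# assumptions): requirements.txt with gradio, openai-whisper, transformers, torch,
# spacy, sentencepiece; packages.txt with ffmpeg.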