Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import subprocess | |
| from typing import Optional, Tuple, List | |
| import pytube | |
| from src.video_processing import extract_audio_from_video | |
| from src.quiz_processing import analyze_document | |
| import docx | |
| import PyPDF2 | |
| import re | |
def parse_quiz_content(quiz_text):
    """Parse plain-text quiz output into a list of question/answer records.

    A line opens a new question when it starts with a number followed by a
    dot ("1. ...") or with a "Q<number>" tag, optionally bracketed and/or
    followed by a dot ("Q1 ...", "[Q2] ...", "Q3. ..."), and then whitespace.
    A line starting with "answer:", "a:" or "ans:" (any letter case) sets the
    answer of the question currently being collected; a later answer line
    replaces an earlier one.  Blank lines and every other line are ignored.

    Args:
        quiz_text: The quiz as a single newline-separated string.

    Returns:
        A dict ``{"questions": [...]}`` whose entries are dicts with the
        keys ``"question"`` and ``"answer"`` (``""`` when no answer line
        followed the question), in order of appearance.
    """
    question_re = re.compile(r'^(?:\d+\.|\[?Q\d+\]?\.?)\s+(.*)', re.IGNORECASE)
    answer_prefix_re = re.compile(r'^(?:answer:|a:|ans:)\s*', re.IGNORECASE)
    answer_markers = ("answer:", "a:", "ans:")

    parsed = []
    pending = None  # question record still waiting for its answer line

    for raw_line in quiz_text.split('\n'):
        text = raw_line.strip()
        if text == "":
            continue

        header = question_re.match(text)
        if header is not None:
            # A new question header closes the previous record.
            if pending:
                parsed.append(pending)
            pending = {"question": header.group(1), "answer": ""}
            continue

        # Answer lines only count once a question has been opened.
        if pending and text.lower().startswith(answer_markers):
            pending["answer"] = answer_prefix_re.sub('', text).strip()

    # Flush the last question, which no following header closed.
    if pending:
        parsed.append(pending)

    return {"questions": parsed}
def transcribe_audio(audio_path, elevenlabs_api_key, model_id):
    """Upload an audio file for transcription and save the text to a temp file.

    The file at ``audio_path`` is posted as multipart form data together with
    ``model_id``; the API key is sent in the ``xi-api-key`` header.  On an
    HTTP 200 response the ``transcription`` field of the JSON body (``""``
    when absent) is written to a newly created ``.txt`` temporary file that
    is left on disk for the caller.

    Args:
        audio_path: Path of the audio file to upload.
        elevenlabs_api_key: API key sent in the ``xi-api-key`` header.
        model_id: Model identifier forwarded in the form data.

    Returns:
        A 3-tuple ``(transcription, transcript_path, status_message)``.
        On a non-200 response or on any exception raised while reading,
        uploading or saving, the first two items are ``None`` and the
        message carries the response text or the exception text.
    """
    import requests

    try:
        # NOTE(review): endpoint path, the 'audio' form field and the
        # 'transcription' response key are kept exactly as found; confirm
        # them against the current ElevenLabs speech-to-text API reference.
        # NOTE(review): no request timeout is set, so a stalled connection
        # blocks the caller indefinitely.
        with open(audio_path, 'rb') as audio_file:
            response = requests.post(
                'https://api.elevenlabs.io/v1/transcribe',
                headers={'xi-api-key': elevenlabs_api_key},
                files={'audio': audio_file},
                data={'model_id': model_id},
            )

        if response.status_code != 200:
            return None, None, f"Transcription failed: {response.text}"

        transcription = response.json().get('transcription', '')

        # NamedTemporaryFile creates the file atomically; the deprecated
        # tempfile.mktemp only returned a name, leaving a window in which
        # another process could create that path first.  delete=False keeps
        # the file so the caller can read it afterwards.
        with tempfile.NamedTemporaryFile(
            'w', suffix='.txt', encoding='utf-8', delete=False
        ) as transcript_file:
            transcript_file.write(transcription)
            transcript_path = transcript_file.name

        return transcription, transcript_path, "Transcription completed successfully"
    except Exception as e:
        return None, None, f"Transcription error: {str(e)}"
def process_video_file(video_path, audio_format, elevenlabs_api_key, model_id, gemini_api_key, language, content_type):
    """Run the audio-extraction, transcription and analysis steps on a video.

    The audio track is extracted with ``extract_audio_from_video``, handed to
    ``transcribe_audio`` and, when a non-empty transcription comes back, the
    text is passed to ``analyze_document`` together with ``gemini_api_key``,
    ``language`` and ``content_type``.

    Returns:
        A 7-tuple ``(audio_path, status, transcript_path,
        transcription_status, formatted_output, txt_path, json_path)``.

        * Transcription empty/failed: the audio path, a failure status and
          the transcription status are set; every other slot is ``None``.
        * Any exception: the audio path is ``None`` and the same error text
          fills the status, transcription-status and formatted-output slots.
    """
    try:
        extracted_audio = extract_audio_from_video(video_path, audio_format)
        text, text_file, stt_status = transcribe_audio(
            extracted_audio, elevenlabs_api_key, model_id
        )

        if text:
            # analyze_document's result is unpacked as (output, json, txt);
            # the tuple handed back lists the txt path before the json path.
            analysis, analysis_json, analysis_txt = analyze_document(
                text, gemini_api_key, language, content_type
            )
            return (
                extracted_audio,
                "Processing completed successfully",
                text_file,
                stt_status,
                analysis,
                analysis_txt,
                analysis_json,
            )

        return (
            extracted_audio,
            "Audio extracted, but transcription failed",
            None,
            stt_status,
            None,
            None,
            None,
        )
    except Exception as exc:
        failure = f"Error processing video: {str(exc)}"
        return None, failure, None, failure, failure, None, None
def process_youtube_video(youtube_url, audio_format, elevenlabs_api_key, model_id, gemini_api_key, language, content_type):
    """Download a YouTube video, then extract, transcribe and analyze its audio.

    The highest-resolution progressive MP4 stream is downloaded to a
    temporary file, its audio is extracted with ``extract_audio_from_video``,
    transcribed with ``transcribe_audio`` and, when a non-empty transcription
    comes back, passed to ``analyze_document``.  The downloaded video is
    removed before returning; it is never part of the result.

    Returns:
        A 7-tuple ``(audio_path, status, transcript_path,
        transcription_status, formatted_output, txt_path, json_path)``.

        * Transcription empty/failed: the audio path, a failure status and
          the transcription status are set; every other slot is ``None``.
        * Any exception (including "no suitable stream"): the audio path is
          ``None`` and the same error text fills the status,
          transcription-status and formatted-output slots.
    """
    video_path = None
    try:
        yt = pytube.YouTube(youtube_url)
        stream = (
            yt.streams
            .filter(progressive=True, file_extension='mp4')
            .order_by('resolution')
            .desc()
            .first()
        )
        if not stream:
            raise Exception("No suitable video stream found")

        # mkstemp creates the file atomically (tempfile.mktemp is deprecated
        # because the returned name can be claimed by another process before
        # it is used).  Only the path is needed, so the descriptor is closed
        # and the download overwrites the empty placeholder.
        fd, video_path = tempfile.mkstemp(suffix='.mp4')
        os.close(fd)
        stream.download(filename=video_path)

        # Assumes the helper writes the audio to a separate file, so the
        # video can be deleted afterwards -- TODO confirm against
        # src.video_processing.extract_audio_from_video.
        audio_path = extract_audio_from_video(video_path, audio_format)

        transcription, transcript_path, transcription_status = transcribe_audio(
            audio_path,
            elevenlabs_api_key,
            model_id,
        )
        if not transcription:
            return audio_path, "Audio extracted, but transcription failed", None, transcription_status, None, None, None

        # analyze_document's result is unpacked as (output, json, txt); the
        # tuple handed back lists the txt path before the json path.
        formatted_output, json_path, txt_path = analyze_document(
            transcription,
            gemini_api_key,
            language,
            content_type,
        )
        return audio_path, "Processing completed successfully", transcript_path, transcription_status, formatted_output, txt_path, json_path
    except Exception as e:
        error_message = f"Error processing YouTube video: {str(e)}"
        return None, error_message, None, error_message, error_message, None, None
    finally:
        # The downloaded video was previously left behind on every call.
        # Cleanup is best-effort: a failure to delete must not mask the
        # result or the original error.
        if video_path is not None:
            try:
                os.remove(video_path)
            except OSError:
                pass
def process_audio_document(audio_path, elevenlabs_api_key, model_id, gemini_api_key, language, content_type):
    """Transcribe an audio file and generate a summary or quiz from the text.

    The file is handed to ``transcribe_audio``; when a non-empty
    transcription comes back it is passed to ``analyze_document`` together
    with ``gemini_api_key``, ``language`` and ``content_type``.

    Returns:
        A 5-tuple ``(status, transcript_path, formatted_output, txt_path,
        json_path)``.

        * Transcription empty/failed: ``"Transcription failed"`` followed by
          four ``None`` values.
        * Any exception: the error text fills both the status and the
          formatted-output slots; the path slots are ``None``.
    """
    try:
        # The transcription status message is not part of this function's
        # result, so it is received and dropped.
        text, text_file, _stt_status = transcribe_audio(
            audio_path, elevenlabs_api_key, model_id
        )

        if text:
            # analyze_document's result is unpacked as (output, json, txt);
            # the tuple handed back lists the txt path before the json path.
            analysis, analysis_json, analysis_txt = analyze_document(
                text, gemini_api_key, language, content_type
            )
            return (
                "Processing completed successfully",
                text_file,
                analysis,
                analysis_txt,
                analysis_json,
            )

        return "Transcription failed", None, None, None, None
    except Exception as exc:
        failure = f"Error processing audio: {str(exc)}"
        return failure, None, failure, None, None