Spaces:
Sleeping
Sleeping
# Standard library
import logging
import re

# Third-party: LangChain loaders/splitter and the YouTube transcript client.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredWordDocumentLoader,
    YoutubeLoader,
)
from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.parsers.audio import OpenAIWhisperParser
from youtube_transcript_api import (
    NoTranscriptAvailable,
    NoTranscriptFound,
    TranscriptsDisabled,
    YouTubeTranscriptApi,
)

# Module-wide logging: INFO level, logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ContentProcessor:
    """Load PDFs, Word documents, YouTube transcripts, and audio files,
    splitting each into overlapping text chunks for downstream use."""

    def __init__(self):
        # 1000-char chunks with 200-char overlap keep context across chunk
        # boundaries.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )

    def process_pdf(self, file_path):
        """Load the PDF at *file_path* and return its split Document chunks."""
        loader = PyPDFLoader(file_path)
        return loader.load_and_split(self.text_splitter)

    def process_docx(self, file_path):
        """Load the Word document at *file_path* and return its split Document chunks."""
        loader = UnstructuredWordDocumentLoader(file_path)
        return loader.load_and_split(self.text_splitter)

    def process_youtube(self, video_url):
        """Fetch the transcript for *video_url* and return split Document chunks.

        Raises:
            ValueError: if *video_url* is not a recognizable YouTube URL.
            Exception: if the video has no accessible transcript/captions.
        """
        # Lazy %-style args: the message is only formatted if the level is enabled.
        logger.info("Processing YouTube URL: %s", video_url)

        video_id = self._extract_video_id(video_url)
        if not video_id:
            logger.error("Invalid YouTube URL: %s", video_url)
            raise ValueError("This appears to be an invalid YouTube URL. Please check the URL and try again.")
        logger.info("Extracted video ID: %s", video_id)

        try:
            entries = YouTubeTranscriptApi.get_transcript(video_id)
        except (TranscriptsDisabled, NoTranscriptFound, NoTranscriptAvailable) as e:
            # Expected failure mode: the video simply has no captions.
            logger.error("Transcript unavailable for %s: %s", video_id, e)
            raise Exception(
                f"Unable to access video transcript. Error: {str(e)}\nPlease try a video with available captions."
            ) from e
        except Exception as e:
            # Unexpected failure (network, API change, ...): keep the same
            # caller-facing message but preserve the cause chain.
            logger.error("Error getting transcript: %s", e)
            raise Exception(
                f"Unable to access video transcript. Error: {str(e)}\nPlease try a video with available captions."
            ) from e

        full_transcript = " ".join(entry['text'] for entry in entries)

        # Wrap the raw transcript in a Document so the splitter can chunk it.
        from langchain.schema import Document
        doc = Document(
            page_content=full_transcript,
            metadata={"source": video_url},
        )
        return self.text_splitter.split_documents([doc])

    def _extract_video_id(self, url):
        """Return the video ID from a YouTube URL, or None if unrecognized.

        Handles watch, youtu.be, embed, and shorts URL formats.
        """
        patterns = [
            # Dots are escaped so hosts like "youtuXbe" do not match; the id
            # capture uses + so an empty id is treated as "no match".
            r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([^&\n?]+)',
            r'(?:youtube\.com/shorts/)([^&\n?]+)',
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        return None

    def process_audio(self, audio_file):
        """Transcribe *audio_file* with OpenAI Whisper and return split chunks."""
        loader = GenericLoader(
            audio_file,
            parser=OpenAIWhisperParser(),
        )
        transcript = loader.load()
        return self.text_splitter.split_documents(transcript)