Spaces:
Sleeping
Sleeping
added dspy, to allow .mkv files, upload multiple slides and notebooks, remove base name matching in mentor materials
a4af32a
| # src/main_flow.py | |
| from src.preprocessing.gdrive_manager import GoogleDriveManager | |
| from src.preprocessing.download_manager import GoogleDriveDownloader | |
| from src.preprocessing.video_processor import VideoProcessor | |
| from src.preprocessing.transcript_generator import TranscriptGenerator | |
| from src.preprocessing.file_processor import FileProcessor | |
| from src.report_generation.openai_client import OpenAIClient | |
| from src.report_generation.report_generator import ReportGenerator | |
| import json | |
| import os | |
| import glob | |
| import asyncio | |
| import logging | |
| import tempfile | |
| import shutil | |
| import googleapiclient.errors | |
| logger = logging.getLogger(__name__) | |
| class MainFlow: | |
| def __init__(self, config_path: str): | |
| logger.info("Initializing MainFlow with config: %s", config_path) | |
| with open(config_path) as f: | |
| self.config = json.load(f) | |
| self.paths = self.config["PATHS"] | |
| self.drive_folders = { | |
| "VIDEOS": "1angHYyiE_sPTKpRsrHyPAJkYF7b78iPf", | |
| "AUDIOS": "1bLbSXaO3AS-EuBg_o7sGc8AonkHt7SdG", | |
| "TRANSCRIPTS": "1TbqLgYXOguxYivMiy17s1UVqUKxpGtt3", | |
| "REPORTS": "1JgwDdQVc1YsyKNhag5l1PdD607vMjy1H", | |
| "MENTOR_MATERIALS": "1OVhmzLD5NHmrHknSSWwAYC_NVlD39-sh" | |
| } | |
| self._create_directories() | |
| def _create_directories(self): | |
| for path in self.paths.values(): | |
| os.makedirs(path, exist_ok=True) | |
| async def process_drive_url(self, folder_url: str): | |
| logger.info("Processing Google Drive folder: %s", folder_url) | |
| VideoProcessor.clean_directory(self.paths["VIDEOS"]) | |
| VideoProcessor.clean_directory(self.paths["AUDIOS"]) | |
| download_manager = GoogleDriveDownloader(self.paths["VIDEOS"], self.drive_folders) | |
| gdrive = download_manager.gdrive | |
| # Get all video files in Drive | |
| video_files = download_manager.list_all_videos(folder_url) | |
| if not video_files: | |
| logger.warning("No videos to process in folder: %s", folder_url) | |
| return | |
| # Get all transcript files in Drive Transcripts folder | |
| transcript_drive_files = gdrive.list_txt_files(self.drive_folders["TRANSCRIPTS"]) | |
| transcript_names = {os.path.splitext(f['name'])[0] for f in transcript_drive_files} | |
| for video in video_files: | |
| base_name = os.path.splitext(video['name'])[0] | |
| if base_name in transcript_names: | |
| logger.info(f"Transcript for {base_name} already exists in Drive. Skipping video.") | |
| continue | |
| # Download and process this video | |
| local_video_path = os.path.join(self.paths["VIDEOS"], video['name']) | |
| gdrive.download_file(video['id'], local_video_path) | |
| logger.info(f"Downloaded: {video['name']}") | |
| video_id = video['id'] | |
| video_name = video['name'] | |
| video_path = local_video_path | |
| audio_path = os.path.join(self.paths["AUDIOS"], f"{base_name}.wav") | |
| transcript_path = os.path.join(self.paths["TRANSCRIPTS"], f"{base_name}.txt") | |
| video_processor = VideoProcessor() | |
| # transcript_generator = TranscriptGenerator( | |
| # os.getenv("AZURE_SPEECH_KEY"), | |
| # os.getenv("AZURE_SPEECH_REGION") | |
| # ) | |
| from src.preprocessing.transcript_generator import TranscriptGenerator | |
| transcript_generator = TranscriptGenerator(model_size="base.en", compute_type="int8") | |
| try: | |
| # Convert video to audio | |
| if video_processor.convert_video_to_wav(video_path, audio_path): | |
| logger.info("Converted video to audio: %s", audio_path) | |
| # Upload audio to Drive | |
| download_manager.upload_to_drive(audio_path, "AUDIOS", "audio/wav") | |
| # Delete video from Drive and local | |
| try: | |
| download_manager.delete_drive_file(video_id) | |
| except googleapiclient.errors.HttpError as e: | |
| if e.resp.status == 403: | |
| logger.warning(f"Skipping delete for {video_name} due to insufficient permissions.") | |
| else: | |
| logger.error(f"Error deleting video from Drive: {e}") | |
| try: | |
| os.remove(video_path) | |
| except Exception as e: | |
| logger.warning(f"Could not delete local video file: {e}") | |
| # Generate transcript | |
| if transcript_generator.transcribe_audio(audio_path, transcript_path): | |
| logger.info(f"Generated transcript: {transcript_path}") | |
| # Upload transcript to Drive | |
| download_manager.upload_to_drive(transcript_path, "TRANSCRIPTS", "text/plain") | |
| # Delete audio from Drive and local | |
| audio_drive_id = gdrive.find_file_by_name(self.drive_folders["AUDIOS"], f"{base_name}.wav") | |
| if audio_drive_id: | |
| try: | |
| download_manager.delete_drive_file(audio_drive_id) | |
| except googleapiclient.errors.HttpError as e: | |
| if e.resp.status == 403: | |
| logger.warning(f"Skipping delete for audio {base_name}.wav due to insufficient permissions.") | |
| else: | |
| logger.error(f"Error deleting audio from Drive: {e}") | |
| try: | |
| os.remove(audio_path) | |
| except Exception as e: | |
| logger.warning(f"Could not delete local audio file: {e}") | |
| else: | |
| logger.error(f"Failed to generate transcript for {video_name}") | |
| else: | |
| logger.error(f"Failed to convert video: {video_name}") | |
| except googleapiclient.errors.HttpError as e: | |
| if e.resp.status == 403: | |
| logger.warning(f"Skipping file {video_name} due to insufficient permissions.") | |
| else: | |
| logger.error(f"Google API error: {e}") | |
| except Exception as e: | |
| logger.error(f"Unexpected error processing {video_name}: {e}") | |
| def process_mentor_materials(self, files: dict): | |
| """Process mentor materials (PPTX/IPYNB) and combine all of each type.""" | |
| logger.info("Processing mentor materials...") | |
| VideoProcessor.clean_directory(self.paths["MENTOR_MATERIALS"]) | |
| slides_contents = [] | |
| notebook_contents = [] | |
| for file_type, file_list in files.items(): | |
| for file_data in file_list: | |
| if not file_data: | |
| continue | |
| file_path = os.path.join(self.paths["MENTOR_MATERIALS"], file_data.name) | |
| with open(file_path, "wb") as f: | |
| f.write(file_data.getbuffer()) | |
| base_name = os.path.splitext(file_data.name)[0] | |
| output_path = os.path.join(self.paths["MENTOR_MATERIALS"], f"{base_name}.txt") | |
| if file_type == "slides" and file_path.lower().endswith(('.pptx', '.ppt')): | |
| content = FileProcessor.process_slide_file(file_path) | |
| slides_contents.append(content) | |
| elif file_type == "notebook" and file_path.lower().endswith('.ipynb'): | |
| content = FileProcessor.process_notebook_file(file_path) | |
| notebook_contents.append(content) | |
| else: | |
| logger.error(f"Unsupported file type: {file_data.name}") | |
| continue | |
| with open(output_path, 'w', encoding='utf-8') as f: | |
| f.write(content) | |
| os.remove(file_path) | |
| return { | |
| "slides": "\n".join(slides_contents), | |
| "notebook": "\n".join(notebook_contents) | |
| } | |
| def generate_quality_reports(self, mentor_materials=None): | |
| """Generate quality reports""" | |
| logger.info("Generating quality reports...") | |
| # Remove duplicates first | |
| self.remove_drive_duplicates() | |
| # List transcript files from Drive | |
| gdrive = GoogleDriveManager() | |
| transcript_drive_files = gdrive.list_txt_files(self.drive_folders["TRANSCRIPTS"]) | |
| # Download transcripts from Drive to local if not present | |
| for file in transcript_drive_files: | |
| local_path = os.path.join(self.paths["TRANSCRIPTS"], file["name"]) | |
| if not os.path.exists(local_path): | |
| gdrive.download_file(file["id"], local_path) | |
| # Now generate reports from these local files (which mirror Drive) | |
| # Load checklist | |
| with open("config/checklist.txt", "r") as f: | |
| checklist = f.read() | |
| # Initialize OpenAI client | |
| openai_client = OpenAIClient("config/config.json") | |
| client = openai_client.get_client() | |
| deployment = openai_client.get_deployment() | |
| report_generator = ReportGenerator(client, deployment, checklist) | |
| # Prepare mentor materials (if not passed, load from disk as fallback) | |
| if mentor_materials is None: | |
| mentor_materials = {"slides": "", "notebook": ""} | |
| for file_path in glob.glob(os.path.join(self.paths["MENTOR_MATERIALS"], "*.txt")): | |
| if "slide" in file_path.lower(): | |
| with open(file_path, "r", encoding="utf-8") as f: | |
| mentor_materials["slides"] += f.read() + "\n" | |
| elif "notebook" in file_path.lower(): | |
| with open(file_path, "r", encoding="utf-8") as f: | |
| mentor_materials["notebook"] += f.read() + "\n" | |
| # For each transcript, generate a report using all mentor materials | |
| for transcript_file in glob.glob(os.path.join(self.paths["TRANSCRIPTS"], "*.txt")): | |
| with open(transcript_file, "r", encoding="utf-8") as f: | |
| transcript_content = f.read() | |
| base_name = os.path.splitext(os.path.basename(transcript_file))[0] | |
| # Slides report | |
| if mentor_materials["slides"]: | |
| report = report_generator.quality_check( | |
| transcript_content, | |
| "slides", | |
| mentor_materials["slides"] | |
| ) | |
| report_file = os.path.join(self.paths["REPORTS"], f"report_{base_name}_slides.txt") | |
| with open(report_file, "w", encoding="utf-8") as f: | |
| f.write(report) | |
| gdrive.upload_file(report_file, self.drive_folders["REPORTS"], "text/plain") | |
| # Notebook report | |
| if mentor_materials["notebook"]: | |
| report = report_generator.quality_check( | |
| transcript_content, | |
| "notebook", | |
| mentor_materials["notebook"] | |
| ) | |
| report_file = os.path.join(self.paths["REPORTS"], f"report_{base_name}_notebook.txt") | |
| with open(report_file, "w", encoding="utf-8") as f: | |
| f.write(report) | |
| gdrive.upload_file(report_file, self.drive_folders["REPORTS"], "text/plain") | |
| def remove_drive_duplicates(self): | |
| gdrive = GoogleDriveManager() | |
| for key in ["REPORTS", "TRANSCRIPTS", "MENTOR_MATERIALS"]: | |
| gdrive.remove_duplicates_by_name(self.drive_folders[key]) |