# src/main_flow.py
from src.preprocessing.gdrive_manager import GoogleDriveManager
from src.preprocessing.download_manager import GoogleDriveDownloader
from src.preprocessing.video_processor import VideoProcessor
from src.preprocessing.transcript_generator import TranscriptGenerator
from src.preprocessing.file_processor import FileProcessor
from src.report_generation.openai_client import OpenAIClient
from src.report_generation.report_generator import ReportGenerator
import json
import os
import glob
import asyncio
import logging
import tempfile
import shutil
import googleapiclient.errors
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class MainFlow:
    """Drive the video -> audio -> transcript -> quality-report pipeline.

    Local working directories come from the ``"PATHS"`` section of the JSON
    config; the Google Drive folder IDs are hard-coded in ``__init__``.
    """

    def __init__(self, config_path: str):
        """Load the JSON config and create the local working directories.

        Args:
            config_path: Path to a JSON file containing a ``"PATHS"`` mapping
                whose values are local directory paths.

        Raises:
            OSError: If the config file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
            KeyError: If the config has no ``"PATHS"`` entry.
        """
        logger.info("Initializing MainFlow with config: %s", config_path)
        # Read as UTF-8 explicitly instead of relying on the platform default.
        with open(config_path, encoding="utf-8") as f:
            self.config = json.load(f)
        self.paths = self.config["PATHS"]
        self.drive_folders = {
            "VIDEOS": "1angHYyiE_sPTKpRsrHyPAJkYF7b78iPf",
            "AUDIOS": "1bLbSXaO3AS-EuBg_o7sGc8AonkHt7SdG",
            "TRANSCRIPTS": "1TbqLgYXOguxYivMiy17s1UVqUKxpGtt3",
            "REPORTS": "1JgwDdQVc1YsyKNhag5l1PdD607vMjy1H",
            "MENTOR_MATERIALS": "1OVhmzLD5NHmrHknSSWwAYC_NVlD39-sh",
        }
        self._create_directories()

    def _create_directories(self):
        """Create every directory listed in ``self.paths`` (no-op if present)."""
        for path in self.paths.values():
            os.makedirs(path, exist_ok=True)

    @staticmethod
    def _delete_drive_file(download_manager, file_id, description, kind):
        """Delete a Drive file, logging (not raising) on Google API errors.

        Args:
            download_manager: Object exposing ``delete_drive_file(file_id)``.
            file_id: Drive ID of the file to delete.
            description: Text used in the "insufficient permissions" warning.
            kind: Short noun ("video"/"audio") used in the generic error log.
        """
        try:
            download_manager.delete_drive_file(file_id)
        except googleapiclient.errors.HttpError as e:
            if e.resp.status == 403:
                logger.warning(
                    "Skipping delete for %s due to insufficient permissions.",
                    description,
                )
            else:
                logger.error("Error deleting %s from Drive: %s", kind, e)

    @staticmethod
    def _remove_local_file(path, kind):
        """Remove a local file; a failure is logged as a warning, not raised."""
        try:
            os.remove(path)
        except OSError as e:
            logger.warning("Could not delete local %s file: %s", kind, e)

    @staticmethod
    def _material_kind(file_type, file_name):
        """Return ``"slides"``/``"notebook"`` for a supported upload, else None.

        A file is supported only when its declared ``file_type`` and its
        extension agree (``.pptx``/``.ppt`` for slides, ``.ipynb`` for
        notebooks); the extension check is case-insensitive.
        """
        lowered = file_name.lower()
        if file_type == "slides" and lowered.endswith((".pptx", ".ppt")):
            return "slides"
        if file_type == "notebook" and lowered.endswith(".ipynb"):
            return "notebook"
        return None

    def _load_mentor_materials_from_disk(self):
        """Collect previously extracted mentor-material text files from disk.

        Scans ``*.txt`` in the MENTOR_MATERIALS directory. A file counts as
        slides when its *file name* contains "slide" and as notebook when it
        contains "notebook" (slides win when both match). Only the base name
        is inspected, so the directory's own name cannot affect the result.

        Returns:
            ``{"slides": str, "notebook": str}``; each matching file's text
            is followed by a newline.
        """
        collected = {"slides": [], "notebook": []}
        pattern = os.path.join(self.paths["MENTOR_MATERIALS"], "*.txt")
        # Sorted so the concatenation order is deterministic across runs.
        for file_path in sorted(glob.glob(pattern)):
            file_name = os.path.basename(file_path).lower()
            if "slide" in file_name:
                kind = "slides"
            elif "notebook" in file_name:
                kind = "notebook"
            else:
                continue
            with open(file_path, "r", encoding="utf-8") as f:
                collected[kind].append(f.read() + "\n")
        return {kind: "".join(parts) for kind, parts in collected.items()}

    async def process_drive_url(self, folder_url: str):
        """Turn every not-yet-transcribed video of a Drive folder into a transcript.

        For each video without a same-named ``.txt`` in the Drive TRANSCRIPTS
        folder: download it, convert it to WAV, upload the audio, delete the
        video (Drive and local), transcribe, upload the transcript and delete
        the audio (Drive and local). A failure on one video is logged and the
        loop moves on to the next one.

        NOTE: the body contains no ``await``; awaiting this coroutine runs the
        whole pipeline synchronously.

        Args:
            folder_url: URL of the Google Drive folder holding the videos.
        """
        logger.info("Processing Google Drive folder: %s", folder_url)
        VideoProcessor.clean_directory(self.paths["VIDEOS"])
        VideoProcessor.clean_directory(self.paths["AUDIOS"])

        download_manager = GoogleDriveDownloader(self.paths["VIDEOS"], self.drive_folders)
        gdrive = download_manager.gdrive

        # Get all video files in Drive
        video_files = download_manager.list_all_videos(folder_url)
        if not video_files:
            logger.warning("No videos to process in folder: %s", folder_url)
            return

        # Base names (no extension) of transcripts already present in Drive
        transcript_drive_files = gdrive.list_txt_files(self.drive_folders["TRANSCRIPTS"])
        transcript_names = {os.path.splitext(f["name"])[0] for f in transcript_drive_files}

        for video in video_files:
            video_name = video["name"]
            base_name = os.path.splitext(video_name)[0]
            if base_name in transcript_names:
                logger.info(
                    "Transcript for %s already exists in Drive. Skipping video.",
                    base_name,
                )
                continue

            video_id = video["id"]
            video_path = os.path.join(self.paths["VIDEOS"], video_name)
            audio_path = os.path.join(self.paths["AUDIOS"], f"{base_name}.wav")
            transcript_path = os.path.join(self.paths["TRANSCRIPTS"], f"{base_name}.txt")

            video_processor = VideoProcessor()
            # NOTE(review): a generator is built per video, as before; if
            # TranscriptGenerator is reusable this could be created once.
            transcript_generator = TranscriptGenerator(model_size="base.en", compute_type="int8")

            try:
                # The download is inside the try so that one failing download
                # does not abort the remaining videos.
                gdrive.download_file(video_id, video_path)
                logger.info("Downloaded: %s", video_name)

                # Convert video to audio
                if not video_processor.convert_video_to_wav(video_path, audio_path):
                    logger.error("Failed to convert video: %s", video_name)
                    continue
                logger.info("Converted video to audio: %s", audio_path)

                # Upload audio, then drop the video from Drive and local disk
                download_manager.upload_to_drive(audio_path, "AUDIOS", "audio/wav")
                self._delete_drive_file(download_manager, video_id, video_name, "video")
                self._remove_local_file(video_path, "video")

                # Generate transcript
                if not transcript_generator.transcribe_audio(audio_path, transcript_path):
                    logger.error("Failed to generate transcript for %s", video_name)
                    continue
                logger.info("Generated transcript: %s", transcript_path)

                # Upload transcript, then drop the audio from Drive and local disk
                download_manager.upload_to_drive(transcript_path, "TRANSCRIPTS", "text/plain")
                audio_drive_id = gdrive.find_file_by_name(
                    self.drive_folders["AUDIOS"], f"{base_name}.wav"
                )
                if audio_drive_id:
                    self._delete_drive_file(
                        download_manager, audio_drive_id, f"audio {base_name}.wav", "audio"
                    )
                self._remove_local_file(audio_path, "audio")
            except googleapiclient.errors.HttpError as e:
                if e.resp.status == 403:
                    logger.warning(
                        "Skipping file %s due to insufficient permissions.", video_name
                    )
                else:
                    logger.error("Google API error: %s", e)
            except Exception as e:
                # Top-level per-video boundary: keep the traceback in the log.
                logger.exception("Unexpected error processing %s: %s", video_name, e)

    def process_mentor_materials(self, files: dict):
        """Process mentor materials (PPTX/IPYNB) and combine all of each type.

        Each supported upload is written to the MENTOR_MATERIALS directory,
        converted to text by ``FileProcessor``, saved as ``<base name>.txt``
        and the raw upload is then removed. Unsupported uploads are logged and
        skipped without being written to disk.

        Args:
            files: Mapping of file type (``"slides"``/``"notebook"``) to a
                list of uploaded file objects exposing ``.name`` and
                ``.getbuffer()``. Falsy entries are ignored.

        Returns:
            ``{"slides": str, "notebook": str}`` with the extracted texts of
            each type joined by newlines.
        """
        logger.info("Processing mentor materials...")
        materials_dir = self.paths["MENTOR_MATERIALS"]
        VideoProcessor.clean_directory(materials_dir)

        slides_contents = []
        notebook_contents = []
        for file_type, file_list in files.items():
            for file_data in file_list or ():
                if not file_data:
                    continue

                # Use only the final path component of the supplied name so
                # that the file cannot be written outside materials_dir.
                file_name = os.path.basename(file_data.name)
                kind = self._material_kind(file_type, file_name)
                if kind is None:
                    logger.error("Unsupported file type: %s", file_data.name)
                    continue

                file_path = os.path.join(materials_dir, file_name)
                with open(file_path, "wb") as f:
                    f.write(file_data.getbuffer())

                if kind == "slides":
                    content = FileProcessor.process_slide_file(file_path)
                    slides_contents.append(content)
                else:
                    content = FileProcessor.process_notebook_file(file_path)
                    notebook_contents.append(content)

                base_name = os.path.splitext(file_name)[0]
                output_path = os.path.join(materials_dir, f"{base_name}.txt")
                with open(output_path, "w", encoding="utf-8") as f:
                    f.write(content)
                os.remove(file_path)

        return {
            "slides": "\n".join(slides_contents),
            "notebook": "\n".join(notebook_contents),
        }

    def generate_quality_reports(self, mentor_materials=None):
        """Generate quality reports for every local transcript and upload them.

        Steps: remove Drive duplicates, download any Drive transcript that is
        missing locally, then for each local ``*.txt`` transcript write one
        report per non-empty material type and upload it to the Drive REPORTS
        folder. The checklist and OpenAI config are read from
        ``config/checklist.txt`` and ``config/config.json`` relative to the
        current working directory.

        Args:
            mentor_materials: Optional ``{"slides": str, "notebook": str}``.
                When ``None``, previously extracted ``*.txt`` files in the
                MENTOR_MATERIALS directory are used as a fallback. Missing or
                empty entries simply produce no report for that type.
        """
        logger.info("Generating quality reports...")
        # Remove duplicates first
        self.remove_drive_duplicates()

        # Mirror Drive transcripts locally (only files not already present)
        gdrive = GoogleDriveManager()
        transcript_drive_files = gdrive.list_txt_files(self.drive_folders["TRANSCRIPTS"])
        for file in transcript_drive_files:
            local_path = os.path.join(self.paths["TRANSCRIPTS"], file["name"])
            if not os.path.exists(local_path):
                gdrive.download_file(file["id"], local_path)

        # Load checklist
        with open("config/checklist.txt", "r", encoding="utf-8") as f:
            checklist = f.read()

        # Initialize OpenAI client
        openai_client = OpenAIClient("config/config.json")
        client = openai_client.get_client()
        deployment = openai_client.get_deployment()
        report_generator = ReportGenerator(client, deployment, checklist)

        # Prepare mentor materials (if not passed, load from disk as fallback)
        if mentor_materials is None:
            mentor_materials = self._load_mentor_materials_from_disk()

        # For each transcript, generate a report per available material type
        transcript_pattern = os.path.join(self.paths["TRANSCRIPTS"], "*.txt")
        for transcript_file in sorted(glob.glob(transcript_pattern)):
            with open(transcript_file, "r", encoding="utf-8") as f:
                transcript_content = f.read()
            base_name = os.path.splitext(os.path.basename(transcript_file))[0]

            for material_type in ("slides", "notebook"):
                material = mentor_materials.get(material_type)
                if not material:
                    continue
                report = report_generator.quality_check(
                    transcript_content,
                    material_type,
                    material,
                )
                report_file = os.path.join(
                    self.paths["REPORTS"], f"report_{base_name}_{material_type}.txt"
                )
                with open(report_file, "w", encoding="utf-8") as f:
                    f.write(report)
                # Upload only after the file has been closed (and flushed).
                gdrive.upload_file(report_file, self.drive_folders["REPORTS"], "text/plain")

    def remove_drive_duplicates(self):
        """Remove same-named duplicates from the REPORTS, TRANSCRIPTS and
        MENTOR_MATERIALS Drive folders."""
        gdrive = GoogleDriveManager()
        for key in ["REPORTS", "TRANSCRIPTS", "MENTOR_MATERIALS"]:
            gdrive.remove_duplicates_by_name(self.drive_folders[key])