""" Simplified transcription core for HuggingFace Spaces deployment. Version with chunking support for large files (>30MB). Now supports multiple AI providers via provider abstraction. """ import os from datetime import date, timedelta import yaml import uuid from typing import List, Dict, Tuple import ffmpeg import gc import psutil import zipfile import time from ai_providers import TranscriptionProvider # Define absolute output directory relative to this file CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) OUTPUT_DIR = os.path.join(CURRENT_DIR, "outputs") def format_timestamp(seconds: float) -> str: """Convert seconds to ffmpeg time format (HH:MM:SS.xxx).""" td = timedelta(seconds=float(seconds)) hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = seconds % 60 return f"{hours:02d}:{minutes:02d}:{secs:06.3f}" def check_memory_usage() -> bool: """Check current memory usage and print warning if too high.""" process = psutil.Process() memory_percent = process.memory_percent() if memory_percent > 80: print(f"Warning: High memory usage ({memory_percent:.1f}%)") return False return True def clean_partial_chunks(base_file_path: str) -> None: """Clean up any existing partial chunks before starting.""" try: base_name = os.path.splitext(os.path.basename(base_file_path))[0] # Ensure we look in the same directory as the audio file for chunks chunk_folder = os.path.dirname(base_file_path) for file in os.listdir(chunk_folder): if file.startswith(f"{base_name}_part") and file.endswith(".mp3"): file_path = os.path.join(chunk_folder, file) try: os.remove(file_path) except Exception as e: print(f"Warning: Could not remove {file}: {e}") except Exception as e: print(f"Warning: Error during cleanup: {e}") def chunk_audio_file(audio_file_path: str, chunk_duration_minutes: int = 25, overlap_seconds: int = 5) -> List[str]: """Chunks an audio file into smaller parts using ffmpeg streaming.""" chunked_files = [] try: clean_partial_chunks(audio_file_path) duration = get_audio_duration(audio_file_path) chunk_length = chunk_duration_minutes * 60 start_time = 0 chunk_index = 1 base_name = os.path.splitext(os.path.basename(audio_file_path))[0] output_folder = os.path.dirname(audio_file_path) while start_time < duration: if not check_memory_usage(): time.sleep(5) continue end_time = min(start_time + chunk_length, duration) if end_time - start_time < 30 and chunk_index > 1: break chunk_file_name = f"{base_name}_part{chunk_index}.mp3" chunk_file_path = os.path.join(output_folder, chunk_file_name) try: stream = ffmpeg.input(audio_file_path, ss=start_time, t=end_time-start_time) stream = ffmpeg.output(stream, chunk_file_path, acodec='libmp3lame', loglevel='error') ffmpeg.run(stream, overwrite_output=True) if os.path.exists(chunk_file_path): chunked_files.append(chunk_file_path) chunk_index += 1 except ffmpeg.Error as e: break if end_time == duration: break start_time = end_time - overlap_seconds gc.collect() except Exception as e: print(f"Error during audio chunking: {e}") return chunked_files def get_audio_duration(file_path: str) -> float: """Get the duration of an audio file using ffmpeg.""" probe = ffmpeg.probe(file_path) return float(probe['format']['duration']) def generate_transcription(audio_file_path: str, provider: TranscriptionProvider) -> str: return provider.transcribe(audio_file_path) def generate_summary(transcription_text: str, provider: TranscriptionProvider) -> str: return provider.generate_summary(transcription_text) def generate_key_ideas(transcription_text: str, provider: TranscriptionProvider) -> List[Dict[str, str]]: return provider.generate_key_ideas(transcription_text) def create_transcript_markdown(audio_filename: str, transcription: str, summary: str, key_ideas: List[Dict[str, str]]) -> str: base_name = os.path.splitext(audio_filename)[0] yaml_metadata = { 'title': base_name, 'audio_file': audio_filename, 'date_processed': str(date.today()), 'summary': summary, 'key_ideas': key_ideas, 'note_id': str(uuid.uuid4()) } yaml_frontmatter = "---\n" + yaml.dump(yaml_metadata, sort_keys=False, indent=2, allow_unicode=True) + "---\n\n" content = yaml_frontmatter + "## Key Ideas\n\n" for idea_item in key_ideas: content += f"- **{idea_item['idea']}:** {idea_item['description']}\n" content += "\n## Full Transcription\n\n" + transcription return content def process_audio_file(audio_file_path: str, gemini_provider: TranscriptionProvider, openrouter_provider: TranscriptionProvider = None, progress_callback=None) -> Tuple[str, str]: # Ensure the absolute output directory exists os.makedirs(OUTPUT_DIR, exist_ok=True) audio_filename = os.path.basename(audio_file_path) base_name = os.path.splitext(audio_filename)[0] file_size_mb = os.path.getsize(audio_file_path) / (1024 * 1024) files_to_transcribe = [] if file_size_mb > 30: if progress_callback: progress_callback("📦 Chunking file...", 0.1) files_to_transcribe = chunk_audio_file(audio_file_path) else: files_to_transcribe.append(audio_file_path) markdown_files = [] for idx, file_path in enumerate(files_to_transcribe, 1): if progress_callback: progress_callback(f"🎙️ Transcribing {idx}/{len(files_to_transcribe)}...", 0.2 + (0.6 * idx/len(files_to_transcribe))) transcription = generate_transcription(file_path, gemini_provider) text_provider = openrouter_provider if openrouter_provider else gemini_provider summary = generate_summary(transcription, text_provider) key_ideas = generate_key_ideas(transcription, text_provider) markdown_content = create_transcript_markdown(os.path.basename(file_path), transcription, summary, key_ideas) # Use the global absolute OUTPUT_DIR output_filename = os.path.splitext(os.path.basename(file_path))[0] + ".md" markdown_path = os.path.join(OUTPUT_DIR, output_filename) with open(markdown_path, 'w', encoding='utf-8') as f: f.write(markdown_content) markdown_files.append(markdown_path) if "_part" in file_path: try: os.remove(file_path) except: pass if len(markdown_files) == 1: return markdown_files[0], "False" else: zip_path = os.path.join(OUTPUT_DIR, f"{base_name}_transcripts.zip") with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for md_file in markdown_files: zipf.write(md_file, os.path.basename(md_file)) try: os.remove(md_file) except: pass return zip_path, "True"