Spaces:
Build error
Build error
| """ | |
| Simplified transcription core for HuggingFace Spaces deployment. | |
| Version with chunking support for large files (>30MB). | |
| Now supports multiple AI providers via provider abstraction. | |
| """ | |
| import os | |
| from datetime import date, timedelta | |
| import yaml | |
| import uuid | |
| from typing import List, Dict, Tuple | |
| import ffmpeg | |
| import gc | |
| import psutil | |
| import zipfile | |
| import time | |
| from ai_providers import TranscriptionProvider | |
# Generated transcripts live in an "outputs" folder beside this module, so the
# location does not depend on the process's current working directory.
CURRENT_DIR = os.path.split(os.path.abspath(__file__))[0]
OUTPUT_DIR = os.path.join(CURRENT_DIR, "outputs")
def format_timestamp(seconds: float) -> str:
    """Convert seconds to ffmpeg time format (HH:MM:SS.xxx).

    The value is rounded to whole milliseconds *before* it is split into
    fields, so an input such as 59.9996 carries into the minutes field
    ("00:01:00.000") instead of rendering an invalid "00:00:60.000".

    Args:
        seconds: Offset or duration in seconds. Negative values are rendered
            with a leading "-".

    Returns:
        The timestamp formatted as "[-]HH:MM:SS.mmm". The hours field grows
        beyond two digits when the value requires it.

    Raises:
        TypeError, ValueError, OverflowError: If ``seconds`` cannot be
            converted to a finite number.
    """
    total_ms = round(float(seconds) * 1000)
    sign = "-" if total_ms < 0 else ""
    # Work on the magnitude: divmod on negative numbers floors towards -inf,
    # which would scramble the individual fields.
    total_secs, millis = divmod(abs(total_ms), 1000)
    hours, remainder = divmod(total_secs, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{sign}{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"
def check_memory_usage() -> bool:
    """Tell whether the memory percentage psutil reports is within the limit.

    Queries ``psutil.Process().memory_percent()`` and prints a warning when
    the reported value is above 80.

    Returns:
        False when the reported percentage exceeds 80, True otherwise.
    """
    usage = psutil.Process().memory_percent()
    within_limit = not (usage > 80)
    if not within_limit:
        print(f"Warning: High memory usage ({usage:.1f}%)")
    return within_limit
def clean_partial_chunks(base_file_path: str) -> None:
    """Remove leftover chunk files that belong to ``base_file_path``.

    A file is treated as a leftover chunk when it sits in the same directory
    as the audio file and is named ``<base>_part<digits>.mp3`` (the naming
    scheme used when chunks are written). Requiring digits keeps unrelated
    files such as ``<base>_partner.mp3`` from being deleted.

    Cleanup is best-effort: every failure is reported with a printed warning
    and never raised to the caller.

    Args:
        base_file_path: Path of the source audio file whose chunks should be
            removed. A bare filename is resolved against the current
            working directory.
    """
    prefix_tail = "_part"
    suffix = ".mp3"
    try:
        base_name = os.path.splitext(os.path.basename(base_file_path))[0]
        # dirname() returns "" for a bare filename and os.listdir("") raises,
        # which used to skip the cleanup entirely; fall back to the cwd.
        chunk_folder = os.path.dirname(base_file_path) or "."
        prefix = f"{base_name}{prefix_tail}"
        for file_name in os.listdir(chunk_folder):
            if not (file_name.startswith(prefix) and file_name.endswith(suffix)):
                continue
            # Only the numeric chunk index may sit between prefix and suffix.
            if not file_name[len(prefix):-len(suffix)].isdigit():
                continue
            try:
                os.remove(os.path.join(chunk_folder, file_name))
            except OSError as e:
                print(f"Warning: Could not remove {file_name}: {e}")
    except Exception as e:
        # Deliberately broad: stale-chunk cleanup must never abort the caller.
        print(f"Warning: Error during cleanup: {e}")
def chunk_audio_file(audio_file_path: str, chunk_duration_minutes: int = 25, overlap_seconds: int = 5, max_memory_waits: int = 12) -> List[str]:
    """Split an audio file into overlapping MP3 chunks using ffmpeg.

    Chunks are written next to the source file as ``<base>_part<N>.mp3``
    (re-encoded with ``libmp3lame``). Each chunk after the first starts
    ``overlap_seconds`` before the previous chunk ended.

    Args:
        audio_file_path: Path of the audio file to split.
        chunk_duration_minutes: Maximum length of each chunk, in minutes.
        overlap_seconds: Seconds shared between consecutive chunks. Must be
            non-negative and shorter than one chunk, otherwise the start
            position could never advance.
        max_memory_waits: How many 5-second pauses to take per chunk while
            memory usage is reported as high before continuing anyway.

    Returns:
        Paths of the chunk files that were created, in order. The list is
        empty (or partial) when chunking failed; failures are printed, not
        raised.
    """
    chunked_files: List[str] = []
    chunk_length = chunk_duration_minutes * 60
    # An overlap >= the chunk length would leave start_time stuck (or moving
    # backwards) and the loop below would never terminate.
    if not 0 <= overlap_seconds < chunk_length:
        print(
            "Error during audio chunking: overlap_seconds must be non-negative "
            "and shorter than the chunk duration"
        )
        return chunked_files
    try:
        clean_partial_chunks(audio_file_path)
        duration = get_audio_duration(audio_file_path)
        base_name = os.path.splitext(os.path.basename(audio_file_path))[0]
        output_folder = os.path.dirname(audio_file_path)
        start_time = 0
        chunk_index = 1
        memory_waits = 0
        while start_time < duration:
            if not check_memory_usage():
                if memory_waits < max_memory_waits:
                    memory_waits += 1
                    time.sleep(5)
                    continue
                # Waiting did not help; proceed rather than spin forever.
                print("Warning: Memory usage still high; continuing with chunking anyway")
            memory_waits = 0
            end_time = min(start_time + chunk_length, duration)
            # NOTE(review): a trailing remainder shorter than 30 s is dropped
            # rather than emitted as its own tiny chunk — confirm that losing
            # this tail is intended.
            if end_time - start_time < 30 and chunk_index > 1:
                break
            chunk_file_name = f"{base_name}_part{chunk_index}.mp3"
            chunk_file_path = os.path.join(output_folder, chunk_file_name)
            try:
                stream = ffmpeg.input(audio_file_path, ss=start_time, t=end_time - start_time)
                stream = ffmpeg.output(stream, chunk_file_path, acodec='libmp3lame', loglevel='error')
                ffmpeg.run(stream, overwrite_output=True)
                if os.path.exists(chunk_file_path):
                    chunked_files.append(chunk_file_path)
                    chunk_index += 1
            except ffmpeg.Error as e:
                # Report the failure instead of silently returning a partial list.
                stderr = getattr(e, "stderr", None)
                detail = stderr.decode("utf-8", errors="replace") if stderr else e
                print(f"Error during audio chunking: ffmpeg failed on {chunk_file_name}: {detail}")
                break
            if end_time == duration:
                break
            start_time = end_time - overlap_seconds
            gc.collect()
    except Exception as e:
        print(f"Error during audio chunking: {e}")
    return chunked_files
def get_audio_duration(file_path: str) -> float:
    """Return the duration ffmpeg reports for ``file_path``.

    Probes the file with ``ffmpeg.probe`` and converts the ``duration`` entry
    of the probe result's ``format`` section to a float. Any exception raised
    by the probe or by a missing key propagates to the caller.
    """
    format_info = ffmpeg.probe(file_path)['format']
    return float(format_info['duration'])
def generate_transcription(audio_file_path: str, provider: TranscriptionProvider) -> str:
    """Transcribe one audio file by delegating to ``provider.transcribe``."""
    transcript = provider.transcribe(audio_file_path)
    return transcript
def generate_summary(transcription_text: str, provider: TranscriptionProvider) -> str:
    """Summarize a transcript by delegating to ``provider.generate_summary``."""
    summary = provider.generate_summary(transcription_text)
    return summary
def generate_key_ideas(transcription_text: str, provider: TranscriptionProvider) -> List[Dict[str, str]]:
    """Extract key ideas by delegating to ``provider.generate_key_ideas``."""
    key_ideas = provider.generate_key_ideas(transcription_text)
    return key_ideas
def create_transcript_markdown(audio_filename: str, transcription: str, summary: str, key_ideas: List[Dict[str, str]]) -> str:
    """Build the Markdown note for one transcribed audio file.

    The note consists of a YAML front-matter block (title, audio file name,
    processing date, summary, key ideas and a freshly generated note id),
    a "Key Ideas" bullet list and the full transcription.

    Args:
        audio_filename: File name of the audio; its stem becomes the title.
        transcription: Full transcript text.
        summary: Summary text stored in the front matter.
        key_ideas: Items that each provide an ``'idea'`` and a
            ``'description'`` entry.

    Returns:
        The complete Markdown document as a single string.
    """
    title, _extension = os.path.splitext(audio_filename)
    front_matter = yaml.dump(
        {
            'title': title,
            'audio_file': audio_filename,
            'date_processed': str(date.today()),
            'summary': summary,
            'key_ideas': key_ideas,
            'note_id': str(uuid.uuid4()),
        },
        sort_keys=False,
        indent=2,
        allow_unicode=True,
    )
    header = "---\n" + front_matter + "---\n\n" + "## Key Ideas\n\n"
    bullets = "".join(
        f"- **{entry['idea']}:** {entry['description']}\n" for entry in key_ideas
    )
    return header + bullets + "\n## Full Transcription\n\n" + transcription
def _remove_file_quietly(path: str) -> None:
    """Delete ``path`` if possible, printing a warning instead of raising."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass  # Already gone; nothing left to clean up.
    except OSError as e:
        print(f"Warning: Could not remove {path}: {e}")


def process_audio_file(audio_file_path: str, gemini_provider: TranscriptionProvider, openrouter_provider: TranscriptionProvider = None, progress_callback=None) -> Tuple[str, str]:
    """Transcribe an audio file and write Markdown notes into ``OUTPUT_DIR``.

    Files larger than 30 MB are first split into chunks; each chunk (or the
    whole file) is transcribed, summarized and written as its own Markdown
    note. When more than one note is produced they are bundled into a zip
    archive and the individual notes are removed.

    Args:
        audio_file_path: Path of the audio file to process.
        gemini_provider: Provider used for transcription, and for summary and
            key ideas when ``openrouter_provider`` is not given.
        openrouter_provider: Optional provider used for summary and key ideas.
        progress_callback: Optional callable invoked as
            ``progress_callback(message, fraction)``.

    Returns:
        ``(path, flag)`` where ``flag`` is the string ``"True"`` when ``path``
        is a zip archive of several notes and ``"False"`` when it is a single
        Markdown file.

    Raises:
        RuntimeError: If the file needed chunking but no chunk could be
            produced (previously an empty zip was returned as if successful).
    """
    # Ensure the absolute output directory exists
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    audio_filename = os.path.basename(audio_file_path)
    base_name = os.path.splitext(audio_filename)[0]
    file_size_mb = os.path.getsize(audio_file_path) / (1024 * 1024)

    is_chunked = file_size_mb > 30
    if is_chunked:
        if progress_callback:
            progress_callback("📦 Chunking file...", 0.1)
        files_to_transcribe = chunk_audio_file(audio_file_path)
        if not files_to_transcribe:
            raise RuntimeError(f"Chunking produced no audio segments for {audio_filename}")
    else:
        files_to_transcribe = [audio_file_path]

    # Summary and key ideas use the text provider when one was supplied.
    text_provider = openrouter_provider or gemini_provider
    total = len(files_to_transcribe)
    markdown_files = []
    try:
        for idx, file_path in enumerate(files_to_transcribe, 1):
            if progress_callback:
                progress_callback(f"🎙️ Transcribing {idx}/{total}...", 0.2 + (0.6 * idx / total))
            transcription = generate_transcription(file_path, gemini_provider)
            summary = generate_summary(transcription, text_provider)
            key_ideas = generate_key_ideas(transcription, text_provider)
            markdown_content = create_transcript_markdown(os.path.basename(file_path), transcription, summary, key_ideas)
            # Use the global absolute OUTPUT_DIR
            output_filename = os.path.splitext(os.path.basename(file_path))[0] + ".md"
            markdown_path = os.path.join(OUTPUT_DIR, output_filename)
            with open(markdown_path, 'w', encoding='utf-8') as f:
                f.write(markdown_content)
            markdown_files.append(markdown_path)
    finally:
        # Only files created by chunking are temporary. Matching on "_part" in
        # the path could delete the caller's original upload (e.g.
        # "my_party.mp3"), so rely on the chunking flag instead. Running this
        # in ``finally`` also cleans up when a provider call raises.
        if is_chunked:
            for chunk_path in files_to_transcribe:
                _remove_file_quietly(chunk_path)

    if len(markdown_files) == 1:
        return markdown_files[0], "False"

    zip_path = os.path.join(OUTPUT_DIR, f"{base_name}_transcripts.zip")
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for md_file in markdown_files:
            zipf.write(md_file, os.path.basename(md_file))
            # The note now lives inside the archive; drop the loose copy.
            _remove_file_quietly(md_file)
    return zip_path, "True"