"""
Simplified transcription core for HuggingFace Spaces deployment.
Version with chunking support for large files (>30MB).
Now supports multiple AI providers via provider abstraction.
"""
import os
from datetime import date, timedelta
import yaml
import uuid
from typing import List, Dict, Tuple
import ffmpeg
import gc
import psutil
import zipfile
import time
from ai_providers import TranscriptionProvider
# Directory containing this module; the output location is derived from it
# rather than from the process's current working directory.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Absolute path of the "outputs" folder that sits next to this file.
OUTPUT_DIR = os.path.join(CURRENT_DIR, "outputs")
def format_timestamp(seconds: float) -> str:
    """Convert seconds to ffmpeg time format (HH:MM:SS.xxx).

    The value is rounded to whole milliseconds *before* it is split into
    fields, so an input such as 59.9996 becomes ``00:01:00.000`` instead of
    the invalid ``00:00:60.000`` that per-field rounding would produce.

    Args:
        seconds: Offset or duration in seconds. Negative values are rendered
            with a leading ``-`` followed by the absolute value.

    Returns:
        The timestamp as ``[-]HH:MM:SS.mmm``. The hours field is at least two
        digits wide and grows for values of 100 hours or more.
    """
    total_ms = round(float(seconds) * 1000)
    sign = "-" if total_ms < 0 else ""
    hours, remainder_ms = divmod(abs(total_ms), 3_600_000)
    minutes, remainder_ms = divmod(remainder_ms, 60_000)
    secs, millis = divmod(remainder_ms, 1000)
    return f"{sign}{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"
def check_memory_usage() -> bool:
    """Tell whether the current process's memory percentage is acceptable.

    The percentage comes from ``psutil.Process().memory_percent()``. When it
    is above 80 a warning line is printed.

    Returns:
        ``False`` if the reported percentage exceeds 80, ``True`` otherwise.
    """
    usage_pct = psutil.Process().memory_percent()
    within_limit = not usage_pct > 80
    if not within_limit:
        print(f"Warning: High memory usage ({usage_pct:.1f}%)")
    return within_limit
def clean_partial_chunks(base_file_path: str) -> None:
    """Delete leftover ``<base>_part<N>.mp3`` chunk files next to an audio file.

    Only files whose name is exactly the audio file's stem, then ``_part``,
    then a decimal index, then ``.mp3`` are removed, so unrelated files that
    merely share the prefix (for example ``talk_party.mp3``) are left alone.

    This is best-effort cleanup: problems are printed as warnings and never
    raised to the caller.

    Args:
        base_file_path: Path of the source audio file. Chunks are looked for
            in the same directory; a bare filename means the current
            directory.
    """
    try:
        stem = os.path.splitext(os.path.basename(base_file_path))[0]
        # dirname() is "" for a bare filename, and os.listdir("") fails, so
        # fall back to the current directory explicitly.
        chunk_folder = os.path.dirname(base_file_path) or "."
        prefix = f"{stem}_part"
        suffix = ".mp3"
        for entry in os.listdir(chunk_folder):
            if not (entry.startswith(prefix) and entry.endswith(suffix)):
                continue
            # What sits between the prefix and the extension must be the
            # numeric chunk index and nothing else.
            index_text = entry[len(prefix):-len(suffix)]
            if not (index_text.isascii() and index_text.isdigit()):
                continue
            try:
                os.remove(os.path.join(chunk_folder, entry))
            except OSError as e:
                print(f"Warning: Could not remove {entry}: {e}")
    except Exception as e:
        # Deliberately broad: cleanup must never abort the caller.
        print(f"Warning: Error during cleanup: {e}")
def chunk_audio_file(audio_file_path: str, chunk_duration_minutes: int = 25, overlap_seconds: int = 5) -> List[str]:
    """Split an audio file into overlapping MP3 chunks using ffmpeg.

    Chunks are written next to the source file as ``<base>_part<N>.mp3`` and
    encoded with ``libmp3lame``. Every chunk after the first starts
    ``overlap_seconds`` before the previous chunk ended. Once at least one
    chunk has been written, a trailing piece shorter than 30 seconds is not
    emitted.

    Before each chunk the process memory is checked; while it is reported as
    too high the function collects garbage and sleeps 5 seconds, for a
    bounded number of attempts, and then proceeds anyway instead of waiting
    forever.

    Args:
        audio_file_path: Path of the audio file to split.
        chunk_duration_minutes: Nominal length of each chunk, in minutes.
        overlap_seconds: Seconds shared between consecutive chunks.

    Returns:
        Paths of the chunks that were written, in order. The list may be
        partial or empty if probing or encoding failed; the error is printed.

    Raises:
        ValueError: If the chunk length is not positive or the overlap is not
            smaller than the chunk length, since the loop could never advance.
    """
    chunk_length = chunk_duration_minutes * 60
    if chunk_length <= 0 or overlap_seconds >= chunk_length:
        raise ValueError(
            "chunk_duration_minutes must be positive and overlap_seconds "
            "must be smaller than the chunk length"
        )

    max_memory_waits = 12  # 12 waits of 5 s: give up waiting after about a minute
    chunked_files = []
    try:
        clean_partial_chunks(audio_file_path)
        duration = get_audio_duration(audio_file_path)
        base_name = os.path.splitext(os.path.basename(audio_file_path))[0]
        output_folder = os.path.dirname(audio_file_path)
        start_time = 0
        chunk_index = 1
        memory_waits = 0
        while start_time < duration:
            if not check_memory_usage() and memory_waits < max_memory_waits:
                memory_waits += 1
                gc.collect()
                time.sleep(5)
                continue
            memory_waits = 0

            end_time = min(start_time + chunk_length, duration)
            # Skip a very short tail once at least one chunk exists.
            if end_time - start_time < 30 and chunk_index > 1:
                break

            chunk_file_name = f"{base_name}_part{chunk_index}.mp3"
            chunk_file_path = os.path.join(output_folder, chunk_file_name)
            try:
                stream = ffmpeg.input(audio_file_path, ss=start_time, t=end_time - start_time)
                stream = ffmpeg.output(stream, chunk_file_path, acodec='libmp3lame', loglevel='error')
                ffmpeg.run(stream, overwrite_output=True)
                if os.path.exists(chunk_file_path):
                    chunked_files.append(chunk_file_path)
                    chunk_index += 1
            except ffmpeg.Error as e:
                # Stop here and return what was produced, but say why.
                print(f"Error: ffmpeg failed while writing {chunk_file_name}: {e}")
                break

            if end_time == duration:
                break
            start_time = end_time - overlap_seconds
            gc.collect()
    except Exception as e:
        print(f"Error during audio chunking: {e}")
    return chunked_files
def get_audio_duration(file_path: str) -> float:
    """Return the ``format.duration`` value that ``ffmpeg.probe`` reports.

    Args:
        file_path: Path of the media file to probe.

    Returns:
        The probed duration converted to ``float``.
    """
    format_info = ffmpeg.probe(file_path)['format']
    return float(format_info['duration'])
def generate_transcription(audio_file_path: str, provider: TranscriptionProvider) -> str:
    """Transcribe an audio file by delegating to ``provider.transcribe``.

    Args:
        audio_file_path: Path passed to the provider unchanged.
        provider: Object whose ``transcribe`` method does the work.

    Returns:
        Whatever ``provider.transcribe`` returns.
    """
    transcript = provider.transcribe(audio_file_path)
    return transcript
def generate_summary(transcription_text: str, provider: TranscriptionProvider) -> str:
    """Summarise a transcript by delegating to ``provider.generate_summary``.

    Args:
        transcription_text: Transcript passed to the provider unchanged.
        provider: Object whose ``generate_summary`` method does the work.

    Returns:
        Whatever ``provider.generate_summary`` returns.
    """
    summary_text = provider.generate_summary(transcription_text)
    return summary_text
def generate_key_ideas(transcription_text: str, provider: TranscriptionProvider) -> List[Dict[str, str]]:
    """Extract key ideas by delegating to ``provider.generate_key_ideas``.

    Args:
        transcription_text: Transcript passed to the provider unchanged.
        provider: Object whose ``generate_key_ideas`` method does the work.

    Returns:
        Whatever ``provider.generate_key_ideas`` returns.
    """
    ideas = provider.generate_key_ideas(transcription_text)
    return ideas
def create_transcript_markdown(audio_filename: str, transcription: str, summary: str, key_ideas: List[Dict[str, str]]) -> str:
    """Assemble the Markdown note for one transcribed audio file.

    The note consists of a YAML front-matter block (title, audio file name,
    today's date, summary, key ideas and a freshly generated UUID), a
    "Key Ideas" bullet list and the full transcription.

    Args:
        audio_filename: Name of the audio file; its stem becomes the title.
        transcription: Full transcript text, appended verbatim.
        summary: Summary text stored in the front matter.
        key_ideas: Items providing ``'idea'`` and ``'description'`` keys.

    Returns:
        The complete Markdown document as a single string.
    """
    title = os.path.splitext(audio_filename)[0]
    front_matter = yaml.dump(
        {
            'title': title,
            'audio_file': audio_filename,
            'date_processed': str(date.today()),
            'summary': summary,
            'key_ideas': key_ideas,
            'note_id': str(uuid.uuid4()),
        },
        sort_keys=False,
        indent=2,
        allow_unicode=True,
    )
    bullets = "".join(
        f"- **{entry['idea']}:** {entry['description']}\n" for entry in key_ideas
    )
    sections = [
        "---\n",
        front_matter,
        "---\n\n",
        "## Key Ideas\n\n",
        bullets,
        "\n## Full Transcription\n\n",
        transcription,
    ]
    return "".join(sections)
def _remove_file_quietly(path: str) -> None:
    """Delete *path* as best-effort cleanup, ignoring filesystem errors."""
    try:
        os.remove(path)
    except OSError:
        pass


def process_audio_file(audio_file_path: str, gemini_provider: TranscriptionProvider, openrouter_provider: TranscriptionProvider = None, progress_callback=None) -> Tuple[str, str]:
    """Transcribe an audio file and write Markdown notes into ``OUTPUT_DIR``.

    Files larger than 30 MB are first split with ``chunk_audio_file``; each
    part is transcribed, summarised and written as its own Markdown file.
    Temporary chunk files are always removed, including when a provider
    raises. The input audio file itself is never deleted.

    Args:
        audio_file_path: Path of the audio file to process.
        gemini_provider: Provider used for transcription, and for summary and
            key ideas when ``openrouter_provider`` is not given.
        openrouter_provider: Optional provider used for summary and key ideas.
        progress_callback: Optional callable invoked as
            ``progress_callback(message, fraction)``.

    Returns:
        ``(path, is_zip)`` where ``is_zip`` is the string ``"False"`` when
        ``path`` is a single Markdown file, or ``"True"`` when ``path`` is a
        zip archive holding one Markdown file per chunk.

    Raises:
        RuntimeError: If a large file could not be split into any chunk.
    """
    # Ensure the absolute output directory exists
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    audio_filename = os.path.basename(audio_file_path)
    base_name = os.path.splitext(audio_filename)[0]
    file_size_mb = os.path.getsize(audio_file_path) / (1024 * 1024)

    is_chunked = file_size_mb > 30
    if is_chunked:
        if progress_callback:
            progress_callback("📦 Chunking file...", 0.1)
        files_to_transcribe = chunk_audio_file(audio_file_path)
        if not files_to_transcribe:
            # Without this an empty zip would be returned as if it succeeded.
            raise RuntimeError(f"Chunking produced no audio parts for {audio_filename}")
    else:
        files_to_transcribe = [audio_file_path]

    text_provider = openrouter_provider if openrouter_provider else gemini_provider
    total = len(files_to_transcribe)
    markdown_files = []
    try:
        for idx, file_path in enumerate(files_to_transcribe, 1):
            if progress_callback:
                progress_callback(f"🎙️ Transcribing {idx}/{total}...", 0.2 + (0.6 * idx / total))
            transcription = generate_transcription(file_path, gemini_provider)
            summary = generate_summary(transcription, text_provider)
            key_ideas = generate_key_ideas(transcription, text_provider)
            part_name = os.path.basename(file_path)
            markdown_content = create_transcript_markdown(part_name, transcription, summary, key_ideas)
            # Use the global absolute OUTPUT_DIR
            output_filename = os.path.splitext(part_name)[0] + ".md"
            markdown_path = os.path.join(OUTPUT_DIR, output_filename)
            with open(markdown_path, 'w', encoding='utf-8') as f:
                f.write(markdown_content)
            markdown_files.append(markdown_path)
            # Free disk space early. Only generated chunks are removed; a
            # substring test on the path could hit the user's own file.
            if is_chunked:
                _remove_file_quietly(file_path)
    finally:
        # Sweep any chunks left behind when a provider call raised.
        if is_chunked:
            for chunk_path in files_to_transcribe:
                _remove_file_quietly(chunk_path)

    if len(markdown_files) == 1:
        return markdown_files[0], "False"

    zip_path = os.path.join(OUTPUT_DIR, f"{base_name}_transcripts.zip")
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for md_file in markdown_files:
            zipf.write(md_file, os.path.basename(md_file))
            _remove_file_quietly(md_file)
    return zip_path, "True"