File size: 7,423 Bytes
7ee2bc7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fa9aec9
 
 
7ee2bc7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fa9aec9
 
7ee2bc7
fa9aec9
7ee2bc7
fa9aec9
7ee2bc7
 
 
 
 
 
 
 
 
 
 
 
 
fa9aec9
7ee2bc7
 
 
fa9aec9
7ee2bc7
 
 
 
 
 
 
 
 
fa9aec9
 
7ee2bc7
 
 
 
 
 
 
fa9aec9
7ee2bc7
 
 
 
 
 
 
fa9aec9
7ee2bc7
fa9aec9
7ee2bc7
 
 
 
 
 
 
 
fa9aec9
 
7ee2bc7
 
fa9aec9
7ee2bc7
 
fa9aec9
7ee2bc7
 
fa9aec9
7ee2bc7
 
 
 
 
 
 
 
 
 
 
 
fa9aec9
 
 
 
7ee2bc7
 
 
fa9aec9
 
7ee2bc7
 
 
 
 
 
 
fa9aec9
 
7ee2bc7
 
 
 
 
fa9aec9
7ee2bc7
 
 
 
 
 
fa9aec9
7ee2bc7
fa9aec9
 
 
7ee2bc7
 
 
 
 
fa9aec9
 
 
 
7ee2bc7
 
 
fa9aec9
7ee2bc7
 
fa9aec9
 
 
5fdd6f4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""
Simplified transcription core for HuggingFace Spaces deployment.
Version with chunking support for large files (>30MB).
Now supports multiple AI providers via provider abstraction.
"""

import os
from datetime import date, timedelta
import yaml
import uuid
from typing import List, Dict, Tuple
import ffmpeg
import gc
import psutil
import zipfile
import time
from ai_providers import TranscriptionProvider

# Absolute path of the directory containing this module. Output paths are
# anchored here rather than to the process's current working directory.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Directory that receives the generated Markdown transcripts and ZIP archives.
OUTPUT_DIR = os.path.join(CURRENT_DIR, "outputs")

def format_timestamp(seconds: float) -> str:
    """Convert seconds to ffmpeg time format (HH:MM:SS.xxx).

    The value is rounded to whole milliseconds *before* being split into
    fields, so the seconds field can never render as ``60.000`` (for
    example 59.9996 becomes ``00:01:00.000`` rather than ``00:00:60.000``).

    Args:
        seconds: Offset in seconds; expected to be non-negative.

    Returns:
        The offset formatted as ``HH:MM:SS.mmm``. Hours are zero-padded to
        at least two digits and grow beyond two digits when needed.
    """
    total_ms = int(round(float(seconds) * 1000))
    hours, remainder_ms = divmod(total_ms, 3_600_000)
    minutes, remainder_ms = divmod(remainder_ms, 60_000)
    secs, millis = divmod(remainder_ms, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"

def check_memory_usage() -> bool:
    """Tell whether the current process's memory share is acceptable.

    Reads ``memory_percent()`` for the current process via psutil. When the
    value is above 80 a warning is printed and ``False`` is returned;
    otherwise ``True`` is returned.
    """
    usage_pct = psutil.Process().memory_percent()
    too_high = usage_pct > 80
    if too_high:
        print(f"Warning: High memory usage ({usage_pct:.1f}%)")
    return not too_high

def clean_partial_chunks(base_file_path: str) -> None:
    """Clean up any existing partial chunks before starting.

    Removes files named ``<stem>_part<digits>.mp3`` that sit next to
    *base_file_path*, where ``<stem>`` is the base file's name without its
    extension. Files that merely share the prefix (for example
    ``<stem>_partial.mp3``) are left alone.

    This is best-effort: failures are reported with a printed warning and
    never raised to the caller.

    Args:
        base_file_path: Path of the audio file whose chunk files should be
            removed. A bare filename refers to the current directory.
    """
    chunk_suffix = ".mp3"
    try:
        stem = os.path.splitext(os.path.basename(base_file_path))[0]
        chunk_prefix = f"{stem}_part"
        # dirname() is "" for a bare filename and os.listdir("") raises,
        # so fall back to the current directory explicitly.
        chunk_folder = os.path.dirname(base_file_path) or "."
        entries = os.listdir(chunk_folder)
    except (OSError, TypeError, ValueError) as e:
        print(f"Warning: Error during cleanup: {e}")
        return

    for entry in entries:
        if not (entry.startswith(chunk_prefix) and entry.endswith(chunk_suffix)):
            continue
        # Only a purely numeric index marks a chunk file; anything else is
        # an unrelated file that happens to share the prefix.
        index_text = entry[len(chunk_prefix):-len(chunk_suffix)]
        if not (index_text.isascii() and index_text.isdigit()):
            continue
        try:
            os.remove(os.path.join(chunk_folder, entry))
        except OSError as e:
            print(f"Warning: Could not remove {entry}: {e}")

def chunk_audio_file(audio_file_path: str, chunk_duration_minutes: int = 25, overlap_seconds: int = 5) -> List[str]:
    """Chunks an audio file into smaller parts using ffmpeg streaming.

    Chunks are written next to the source file as ``<stem>_part<N>.mp3``
    (encoded with libmp3lame). Each chunk after the first starts
    ``overlap_seconds`` before the previous chunk ended.

    Args:
        audio_file_path: Path of the audio file to split.
        chunk_duration_minutes: Target length of each chunk, in minutes.
        overlap_seconds: Seconds of audio shared between consecutive chunks.
            Must be smaller than the chunk length so every iteration makes
            forward progress.

    Returns:
        Paths of the chunk files that were created, in order. The list may
        be partial or empty: errors while probing or encoding are printed
        and end the chunking early instead of being raised.

    Raises:
        ValueError: If the chunk length is not positive or the overlap is
            not smaller than the chunk length (either would prevent the
            loop from terminating).
    """
    chunk_length = chunk_duration_minutes * 60
    if chunk_length <= 0:
        raise ValueError("chunk_duration_minutes must be positive")
    if overlap_seconds >= chunk_length:
        raise ValueError("overlap_seconds must be smaller than the chunk length")

    # Upper bound on consecutive 5-second waits for memory to drop (~1 minute),
    # so persistently high memory usage cannot stall the loop forever.
    max_memory_waits = 12

    chunked_files = []
    try:
        clean_partial_chunks(audio_file_path)
        duration = get_audio_duration(audio_file_path)

        start_time = 0
        chunk_index = 1
        memory_waits = 0

        base_name = os.path.splitext(os.path.basename(audio_file_path))[0]
        output_folder = os.path.dirname(audio_file_path)

        while start_time < duration:
            if not check_memory_usage():
                if memory_waits < max_memory_waits:
                    memory_waits += 1
                    gc.collect()
                    time.sleep(5)
                    continue
                print("Warning: Memory usage still high; continuing with chunking anyway")
            memory_waits = 0

            end_time = min(start_time + chunk_length, duration)
            # NOTE(review): a trailing segment shorter than 30 seconds is
            # dropped rather than emitted as its own chunk, so a few seconds
            # at the very end of the file may not be covered by any chunk.
            if end_time - start_time < 30 and chunk_index > 1:
                break

            chunk_file_name = f"{base_name}_part{chunk_index}.mp3"
            chunk_file_path = os.path.join(output_folder, chunk_file_name)

            try:
                stream = ffmpeg.input(audio_file_path, ss=start_time, t=end_time - start_time)
                stream = ffmpeg.output(stream, chunk_file_path, acodec='libmp3lame', loglevel='error')
                ffmpeg.run(stream, overwrite_output=True)
            except ffmpeg.Error as e:
                # Stop at the first failed chunk, but say why instead of
                # silently returning a partial list.
                print(f"Error: ffmpeg failed while writing {chunk_file_name}: {e}")
                break

            if os.path.exists(chunk_file_path):
                chunked_files.append(chunk_file_path)
                chunk_index += 1

            if end_time == duration:
                break
            start_time = end_time - overlap_seconds
            gc.collect()

    except Exception as e:
        print(f"Error during audio chunking: {e}")
    return chunked_files

def get_audio_duration(file_path: str) -> float:
    """Return the duration reported by ``ffmpeg.probe`` for *file_path*.

    The value is read from the probe result's ``format`` -> ``duration``
    entry and converted to ``float``. Errors from probing or from a missing
    entry propagate to the caller.
    """
    format_info = ffmpeg.probe(file_path)['format']
    raw_duration = format_info['duration']
    return float(raw_duration)

def generate_transcription(audio_file_path: str, provider: TranscriptionProvider) -> str:
    """Transcribe *audio_file_path* by delegating to ``provider.transcribe``."""
    transcript = provider.transcribe(audio_file_path)
    return transcript

def generate_summary(transcription_text: str, provider: TranscriptionProvider) -> str:
    """Summarize *transcription_text* by delegating to ``provider.generate_summary``."""
    summary = provider.generate_summary(transcription_text)
    return summary

def generate_key_ideas(transcription_text: str, provider: TranscriptionProvider) -> List[Dict[str, str]]:
    """Extract key ideas from *transcription_text* by delegating to ``provider.generate_key_ideas``."""
    ideas = provider.generate_key_ideas(transcription_text)
    return ideas

def create_transcript_markdown(audio_filename: str, transcription: str, summary: str, key_ideas: List[Dict[str, str]]) -> str:
    """Build the Markdown document for one transcribed audio file.

    The document consists of a YAML front-matter block (title, audio file
    name, processing date, summary, key ideas and a freshly generated UUID
    as ``note_id``), a "Key Ideas" bullet list and the full transcription.

    Args:
        audio_filename: Name of the audio file; its stem becomes the title.
        transcription: Full transcription text, appended verbatim.
        summary: Summary text stored in the front matter.
        key_ideas: Entries with ``idea`` and ``description`` keys. ``None``
            is treated as an empty list; a missing key renders as an empty
            string; entries that are not dicts are omitted from the bullet
            list.

    Returns:
        The complete Markdown text.
    """
    # Providers may hand back None or incomplete entries; normalize once so
    # a malformed key-idea list cannot abort the whole document.
    ideas = list(key_ideas or [])

    base_name = os.path.splitext(audio_filename)[0]
    yaml_metadata = {
        'title': base_name,
        'audio_file': audio_filename,
        'date_processed': str(date.today()),
        'summary': summary,
        'key_ideas': ideas,
        'note_id': str(uuid.uuid4())
    }
    yaml_frontmatter = "---\n" + yaml.dump(yaml_metadata, sort_keys=False, indent=2, allow_unicode=True) + "---\n\n"

    bullet_lines = [
        f"- **{idea_item.get('idea', '')}:** {idea_item.get('description', '')}\n"
        for idea_item in ideas
        if isinstance(idea_item, dict)
    ]

    return "".join([
        yaml_frontmatter,
        "## Key Ideas\n\n",
        *bullet_lines,
        "\n## Full Transcription\n\n",
        transcription,
    ])

def _remove_file_quietly(path: str) -> None:
    """Delete *path*; an already-missing file is fine, other failures only warn."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        print(f"Warning: Could not remove {path}: {e}")


def process_audio_file(audio_file_path: str, gemini_provider: TranscriptionProvider, openrouter_provider: TranscriptionProvider = None, progress_callback=None) -> Tuple[str, str]:
    """Transcribe an audio file and write the result(s) into ``OUTPUT_DIR``.

    Files larger than 30 MB are split with ``chunk_audio_file`` and each
    chunk is processed separately. For every file or chunk the audio is
    transcribed with *gemini_provider*, then a summary and key ideas are
    generated with *openrouter_provider* when given (otherwise with
    *gemini_provider*), and a Markdown file is written.

    Args:
        audio_file_path: Path of the audio file to process.
        gemini_provider: Provider used for transcription.
        openrouter_provider: Optional provider for summary and key ideas.
        progress_callback: Optional callable invoked as
            ``progress_callback(message, fraction)``.

    Returns:
        ``(path, flag)``. With a single Markdown file, *path* is that file
        and *flag* is the string ``"False"``; otherwise *path* is a ZIP
        archive containing all Markdown files and *flag* is ``"True"``.

    Raises:
        RuntimeError: If a file that required chunking produced no chunks.
    """
    # Ensure the absolute output directory exists
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    audio_filename = os.path.basename(audio_file_path)
    base_name = os.path.splitext(audio_filename)[0]
    file_size_mb = os.path.getsize(audio_file_path) / (1024 * 1024)

    # Remember whether the inputs are temporary chunks. Only those may be
    # deleted; guessing from "_part" in the path could remove the caller's
    # original file.
    was_chunked = file_size_mb > 30
    if was_chunked:
        if progress_callback:
            progress_callback("📦 Chunking file...", 0.1)
        files_to_transcribe = chunk_audio_file(audio_file_path)
        if not files_to_transcribe:
            # Without this check an empty ZIP would be returned as success.
            raise RuntimeError(f"Chunking produced no audio parts for {audio_filename}")
    else:
        files_to_transcribe = [audio_file_path]

    text_provider = openrouter_provider if openrouter_provider else gemini_provider
    total_files = len(files_to_transcribe)

    markdown_files = []
    try:
        for idx, file_path in enumerate(files_to_transcribe, 1):
            if progress_callback:
                progress_callback(f"🎙️ Transcribing {idx}/{total_files}...", 0.2 + (0.6 * idx/total_files))

            transcription = generate_transcription(file_path, gemini_provider)
            summary = generate_summary(transcription, text_provider)
            key_ideas = generate_key_ideas(transcription, text_provider)

            part_filename = os.path.basename(file_path)
            markdown_content = create_transcript_markdown(part_filename, transcription, summary, key_ideas)

            # Use the global absolute OUTPUT_DIR
            output_filename = os.path.splitext(part_filename)[0] + ".md"
            markdown_path = os.path.join(OUTPUT_DIR, output_filename)

            with open(markdown_path, 'w', encoding='utf-8') as f:
                f.write(markdown_content)
            markdown_files.append(markdown_path)

            if was_chunked:
                # Free disk space as soon as a chunk has been processed.
                _remove_file_quietly(file_path)
    finally:
        if was_chunked:
            # If a provider call failed midway, the remaining chunks would
            # otherwise be left behind next to the source file.
            for chunk_path in files_to_transcribe:
                _remove_file_quietly(chunk_path)

    if len(markdown_files) == 1:
        return markdown_files[0], "False"

    zip_path = os.path.join(OUTPUT_DIR, f"{base_name}_transcripts.zip")
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for md_file in markdown_files:
            zipf.write(md_file, os.path.basename(md_file))
            _remove_file_quietly(md_file)
    return zip_path, "True"