|
|
|
|
|
|
|
|
import os |
|
|
import tempfile |
|
|
import math |
|
|
from io import BytesIO |
|
|
from typing import List, Dict, Tuple, Union |
|
|
import streamlit as st |
|
|
|
|
|
try: |
|
|
from pydub import AudioSegment |
|
|
PYDUB_AVAILABLE = True |
|
|
except ImportError: |
|
|
PYDUB_AVAILABLE = False |
|
|
st.warning("⚠️ Pydub nie jest dostępny. Funkcje kompresji ograniczone.") |
|
|
|
|
|
try: |
|
|
import librosa |
|
|
import soundfile as sf |
|
|
LIBROSA_AVAILABLE = True |
|
|
except ImportError: |
|
|
LIBROSA_AVAILABLE = False |
|
|
|
|
|
from config import FILE_PROCESSING, USER_MESSAGES |
|
|
|
|
|
class FileHandler: |
|
|
"""Klasa do obsługi plików audio/video - zoptymalizowana dla Whisper API (max 25MB)""" |
|
|
|
|
|
def __init__(self): |
|
|
self.temp_files = [] |
|
|
self.processing_stats = {} |
|
|
|
|
|
|
|
|
self.WHISPER_MAX_SIZE_MB = 25 |
|
|
self.SAFE_CHUNK_SIZE_MB = 20 |
|
|
|
|
|
def process_file(self, uploaded_file, max_chunk_size_mb: int = 20, auto_compress: bool = True) -> List[str]: |
|
|
""" |
|
|
Główna funkcja przetwarzania pliku dla Whisper API |
|
|
Returns: Lista ścieżek do plików gotowych do transkrypcji (każdy <25MB) |
|
|
""" |
|
|
try: |
|
|
file_size_mb = uploaded_file.size / (1024 * 1024) |
|
|
|
|
|
st.info(f"🔄 Przetwarzam {uploaded_file.name} ({file_size_mb:.1f}MB)") |
|
|
|
|
|
|
|
|
if file_size_mb <= self.WHISPER_MAX_SIZE_MB: |
|
|
|
|
|
temp_path = self._save_temp_file(uploaded_file) |
|
|
if temp_path: |
|
|
st.success(f"✅ Plik gotowy do transkrypcji ({file_size_mb:.1f}MB)") |
|
|
return [temp_path] |
|
|
else: |
|
|
return [] |
|
|
|
|
|
|
|
|
if file_size_mb > 100: |
|
|
st.error(f"❌ Plik zbyt duży ({file_size_mb:.1f}MB). Maksymalnie 100MB.") |
|
|
return [] |
|
|
|
|
|
|
|
|
if auto_compress and file_size_mb > self.WHISPER_MAX_SIZE_MB: |
|
|
compressed_file = self._compress_audio_for_whisper(uploaded_file) |
|
|
if compressed_file: |
|
|
compressed_size_mb = len(compressed_file.getvalue()) / (1024 * 1024) |
|
|
if compressed_size_mb <= self.WHISPER_MAX_SIZE_MB: |
|
|
temp_path = self._save_bytesio_to_temp(compressed_file, uploaded_file.name) |
|
|
if temp_path: |
|
|
st.success(f"✅ Skompresowano: {file_size_mb:.1f}MB → {compressed_size_mb:.1f}MB") |
|
|
return [temp_path] |
|
|
|
|
|
|
|
|
return self._split_audio_for_whisper(uploaded_file, max_chunk_size_mb) |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"❌ Błąd przetwarzania {uploaded_file.name}: {str(e)}") |
|
|
return [] |
|
|
|
|
|
def _compress_audio_for_whisper(self, uploaded_file) -> Union[BytesIO, None]: |
|
|
"""Agresywna kompresja audio dla Whisper API""" |
|
|
if not PYDUB_AVAILABLE: |
|
|
st.warning("Pydub niedostępny - pomijam kompresję") |
|
|
return None |
|
|
|
|
|
try: |
|
|
st.info("🗜️ Kompresuję audio...") |
|
|
|
|
|
|
|
|
audio_data = uploaded_file.read() |
|
|
uploaded_file.seek(0) |
|
|
|
|
|
audio = AudioSegment.from_file(BytesIO(audio_data)) |
|
|
|
|
|
|
|
|
compressed = audio.set_channels(1) |
|
|
compressed = compressed.set_frame_rate(16000) |
|
|
|
|
|
|
|
|
original_size_mb = uploaded_file.size / (1024 * 1024) |
|
|
|
|
|
if original_size_mb > 50: |
|
|
|
|
|
bitrate = "32k" |
|
|
elif original_size_mb > 35: |
|
|
|
|
|
bitrate = "48k" |
|
|
else: |
|
|
|
|
|
bitrate = "64k" |
|
|
|
|
|
|
|
|
output = BytesIO() |
|
|
compressed.export( |
|
|
output, |
|
|
format="mp3", |
|
|
bitrate=bitrate, |
|
|
parameters=["-ac", "1", "-ar", "16000"] |
|
|
) |
|
|
output.seek(0) |
|
|
|
|
|
|
|
|
compressed_size_mb = len(output.getvalue()) / (1024 * 1024) |
|
|
|
|
|
if compressed_size_mb <= self.WHISPER_MAX_SIZE_MB: |
|
|
return output |
|
|
else: |
|
|
st.warning(f"⚠️ Kompresja niewystarczająca ({compressed_size_mb:.1f}MB). Przechodzę do dzielenia.") |
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
st.warning(f"Kompresja nieudana: {str(e)}") |
|
|
return None |
|
|
|
|
|
def _split_audio_for_whisper(self, uploaded_file, max_chunk_size_mb: int) -> List[str]: |
|
|
"""Dzieli plik audio na części <25MB dla Whisper""" |
|
|
try: |
|
|
if not PYDUB_AVAILABLE: |
|
|
st.error("❌ Pydub wymagany do dzielenia plików. Zainstaluj: pip install pydub") |
|
|
return [] |
|
|
|
|
|
st.info("✂️ Dzielę plik na części...") |
|
|
|
|
|
|
|
|
audio_data = uploaded_file.read() |
|
|
audio = AudioSegment.from_file(BytesIO(audio_data)) |
|
|
|
|
|
|
|
|
total_duration_ms = len(audio) |
|
|
file_size_mb = uploaded_file.size / (1024 * 1024) |
|
|
|
|
|
|
|
|
safe_chunk_size_mb = min(max_chunk_size_mb, self.SAFE_CHUNK_SIZE_MB) |
|
|
|
|
|
|
|
|
estimated_parts = math.ceil(file_size_mb / safe_chunk_size_mb) |
|
|
chunk_duration_ms = total_duration_ms // estimated_parts |
|
|
|
|
|
|
|
|
overlap_ms = 10 * 1000 |
|
|
|
|
|
st.info(f"📂 Dzielę na {estimated_parts} części (~{chunk_duration_ms//60000:.1f} min każda)") |
|
|
|
|
|
parts = [] |
|
|
base_name = os.path.splitext(uploaded_file.name)[0] |
|
|
|
|
|
for i in range(estimated_parts): |
|
|
start_ms = max(0, i * chunk_duration_ms - (overlap_ms if i > 0 else 0)) |
|
|
end_ms = min(total_duration_ms, (i + 1) * chunk_duration_ms + overlap_ms) |
|
|
|
|
|
|
|
|
chunk = audio[start_ms:end_ms] |
|
|
|
|
|
|
|
|
chunk = chunk.set_channels(1) |
|
|
chunk = chunk.set_frame_rate(22050) |
|
|
|
|
|
|
|
|
temp_fd, temp_path = tempfile.mkstemp( |
|
|
suffix=f"_part{i+1:02d}.mp3", |
|
|
prefix=f"{base_name}_" |
|
|
) |
|
|
os.close(temp_fd) |
|
|
|
|
|
chunk.export(temp_path, format="mp3", bitrate="96k") |
|
|
|
|
|
|
|
|
part_size_mb = os.path.getsize(temp_path) / (1024 * 1024) |
|
|
|
|
|
if part_size_mb > self.WHISPER_MAX_SIZE_MB: |
|
|
st.error(f"❌ Część {i+1} nadal za duża ({part_size_mb:.1f}MB)") |
|
|
os.remove(temp_path) |
|
|
continue |
|
|
|
|
|
parts.append(temp_path) |
|
|
self.temp_files.append(temp_path) |
|
|
|
|
|
st.success(f"✅ Część {i+1}/{estimated_parts}: {part_size_mb:.1f}MB, {(end_ms-start_ms)//60000:.1f} min") |
|
|
|
|
|
if not parts: |
|
|
st.error("❌ Nie udało się utworzyć żadnej prawidłowej części") |
|
|
|
|
|
return parts |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"❌ Błąd dzielenia pliku: {str(e)}") |
|
|
return [] |
|
|
|
|
|
def _save_temp_file(self, uploaded_file) -> str: |
|
|
"""Zapisuje uploaded file do pliku tymczasowego""" |
|
|
try: |
|
|
suffix = f".{uploaded_file.name.split('.')[-1]}" |
|
|
temp_fd, temp_path = tempfile.mkstemp(suffix=suffix) |
|
|
|
|
|
|
|
|
with os.fdopen(temp_fd, 'wb') as tmp_file: |
|
|
content = uploaded_file.read() |
|
|
tmp_file.write(content) |
|
|
|
|
|
|
|
|
uploaded_file.seek(0) |
|
|
|
|
|
self.temp_files.append(temp_path) |
|
|
return temp_path |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"❌ Błąd zapisu tymczasowego: {str(e)}") |
|
|
return "" |
|
|
|
|
|
def _save_bytesio_to_temp(self, bytes_io: BytesIO, original_name: str) -> str: |
|
|
"""Zapisz BytesIO do pliku tymczasowego""" |
|
|
try: |
|
|
suffix = f"_compressed.mp3" |
|
|
base_name = os.path.splitext(original_name)[0] |
|
|
|
|
|
temp_fd, temp_path = tempfile.mkstemp( |
|
|
suffix=suffix, |
|
|
prefix=f"{base_name}_" |
|
|
) |
|
|
|
|
|
with os.fdopen(temp_fd, 'wb') as tmp_file: |
|
|
tmp_file.write(bytes_io.getvalue()) |
|
|
|
|
|
self.temp_files.append(temp_path) |
|
|
return temp_path |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"❌ Błąd zapisu skompresowanego: {str(e)}") |
|
|
return "" |
|
|
|
|
|
def validate_file_for_whisper(self, uploaded_file) -> Tuple[bool, str]: |
|
|
"""Walidacja pliku dla Whisper API""" |
|
|
try: |
|
|
|
|
|
file_size_mb = uploaded_file.size / (1024 * 1024) |
|
|
|
|
|
if file_size_mb == 0: |
|
|
return False, "Plik jest pusty" |
|
|
|
|
|
if file_size_mb > 100: |
|
|
return False, f"Plik za duży: {file_size_mb:.1f}MB > 100MB" |
|
|
|
|
|
|
|
|
file_ext = uploaded_file.name.split('.')[-1].lower() |
|
|
supported_formats = ['mp3', 'wav', 'mp4', 'm4a', 'aac', 'mov', 'avi'] |
|
|
|
|
|
if file_ext not in supported_formats: |
|
|
return False, f"Nieobsługiwany format: .{file_ext}" |
|
|
|
|
|
|
|
|
if file_size_mb > self.WHISPER_MAX_SIZE_MB: |
|
|
return True, f"Plik wymaga przetwarzania ({file_size_mb:.1f}MB > {self.WHISPER_MAX_SIZE_MB}MB)" |
|
|
|
|
|
return True, "OK" |
|
|
|
|
|
except Exception as e: |
|
|
return False, f"Błąd walidacji: {str(e)}" |
|
|
|
|
|
def get_audio_duration(self, file_path: str) -> float: |
|
|
"""Pobierz długość pliku audio w sekundach""" |
|
|
try: |
|
|
if LIBROSA_AVAILABLE: |
|
|
duration = librosa.get_duration(filename=file_path) |
|
|
return duration |
|
|
elif PYDUB_AVAILABLE: |
|
|
audio = AudioSegment.from_file(file_path) |
|
|
return len(audio) / 1000.0 |
|
|
else: |
|
|
|
|
|
file_size = os.path.getsize(file_path) |
|
|
return file_size / (1024 * 1024) * 60 |
|
|
except: |
|
|
file_size = os.path.getsize(file_path) |
|
|
return file_size / (1024 * 1024) * 60 |
|
|
|
|
|
def estimate_processing_time(self, uploaded_files: List) -> Dict: |
|
|
"""Estymuj czas przetwarzania""" |
|
|
total_size_mb = sum(f.size for f in uploaded_files) / (1024 * 1024) |
|
|
total_duration_est = total_size_mb * 60 |
|
|
|
|
|
|
|
|
processing_time = 0 |
|
|
for f in uploaded_files: |
|
|
file_size_mb = f.size / (1024 * 1024) |
|
|
if file_size_mb > self.WHISPER_MAX_SIZE_MB: |
|
|
processing_time += file_size_mb * 2 |
|
|
|
|
|
|
|
|
transcription_time = total_duration_est * 0.1 |
|
|
|
|
|
|
|
|
report_time = len(uploaded_files) * 30 |
|
|
|
|
|
return { |
|
|
'total_size_mb': total_size_mb, |
|
|
'estimated_audio_duration': total_duration_est, |
|
|
'estimated_processing_time': processing_time, |
|
|
'estimated_transcription_time': transcription_time, |
|
|
'estimated_report_time': report_time, |
|
|
'total_estimated_time': processing_time + transcription_time + report_time, |
|
|
'files_needing_processing': sum(1 for f in uploaded_files |
|
|
if f.size / (1024 * 1024) > self.WHISPER_MAX_SIZE_MB) |
|
|
} |
|
|
|
|
|
def get_file_info(self, uploaded_file) -> Dict: |
|
|
"""Pobierz szczegółowe informacje o pliku""" |
|
|
file_size_mb = uploaded_file.size / (1024 * 1024) |
|
|
file_ext = uploaded_file.name.split('.')[-1].lower() |
|
|
|
|
|
return { |
|
|
'name': uploaded_file.name, |
|
|
'size_mb': file_size_mb, |
|
|
'format': file_ext, |
|
|
'whisper_ready': file_size_mb <= self.WHISPER_MAX_SIZE_MB, |
|
|
'needs_compression': file_size_mb > self.WHISPER_MAX_SIZE_MB and file_size_mb <= 50, |
|
|
'needs_splitting': file_size_mb > 50, |
|
|
'too_large': file_size_mb > 100, |
|
|
'estimated_duration': file_size_mb * 60, |
|
|
'estimated_processing_time': max(0, file_size_mb - self.WHISPER_MAX_SIZE_MB) * 2 |
|
|
} |
|
|
|
|
|
def cleanup_temp_files(self): |
|
|
"""Wyczyść pliki tymczasowe""" |
|
|
cleaned = 0 |
|
|
errors = 0 |
|
|
|
|
|
for temp_file in self.temp_files: |
|
|
try: |
|
|
if os.path.exists(temp_file): |
|
|
os.remove(temp_file) |
|
|
cleaned += 1 |
|
|
except Exception as e: |
|
|
errors += 1 |
|
|
st.warning(f"Nie można usunąć {temp_file}: {e}") |
|
|
|
|
|
self.temp_files = [] |
|
|
|
|
|
if cleaned > 0: |
|
|
st.success(f"🧹 Wyczyszczono {cleaned} plików tymczasowych") |
|
|
|
|
|
if errors > 0: |
|
|
st.warning(f"⚠️ {errors} plików nie udało się usunąć") |
|
|
|
|
|
def get_processing_stats(self) -> Dict: |
|
|
"""Zwróć statystyki przetwarzania""" |
|
|
return { |
|
|
'temp_files_count': len(self.temp_files), |
|
|
'whisper_max_size_mb': self.WHISPER_MAX_SIZE_MB, |
|
|
'safe_chunk_size_mb': self.SAFE_CHUNK_SIZE_MB, |
|
|
'processing_stats': self.processing_stats, |
|
|
'libraries_available': { |
|
|
'pydub': PYDUB_AVAILABLE, |
|
|
'librosa': LIBROSA_AVAILABLE |
|
|
} |
|
|
} |
|
|
|
|
|
def analyze_upload_batch(self, uploaded_files: List) -> Dict: |
|
|
"""Analizuj całą paczkę plików""" |
|
|
analysis = { |
|
|
'total_files': len(uploaded_files), |
|
|
'total_size_mb': 0, |
|
|
'whisper_ready': 0, |
|
|
'need_compression': 0, |
|
|
'need_splitting': 0, |
|
|
'too_large': 0, |
|
|
'estimated_parts': 0, |
|
|
'file_details': [] |
|
|
} |
|
|
|
|
|
for file in uploaded_files: |
|
|
info = self.get_file_info(file) |
|
|
analysis['file_details'].append(info) |
|
|
analysis['total_size_mb'] += info['size_mb'] |
|
|
|
|
|
if info['whisper_ready']: |
|
|
analysis['whisper_ready'] += 1 |
|
|
elif info['needs_compression']: |
|
|
analysis['need_compression'] += 1 |
|
|
elif info['needs_splitting']: |
|
|
analysis['need_splitting'] += 1 |
|
|
|
|
|
parts = math.ceil(info['size_mb'] / self.SAFE_CHUNK_SIZE_MB) |
|
|
analysis['estimated_parts'] += parts |
|
|
elif info['too_large']: |
|
|
analysis['too_large'] += 1 |
|
|
|
|
|
return analysis |
|
|
|
|
|
def create_processing_plan(self, uploaded_files: List) -> str: |
|
|
"""Stwórz plan przetwarzania dla użytkownika""" |
|
|
analysis = self.analyze_upload_batch(uploaded_files) |
|
|
|
|
|
plan = f""" |
|
|
📋 **PLAN PRZETWARZANIA** |
|
|
|
|
|
📊 **Podsumowanie:** |
|
|
- Plików: {analysis['total_files']} ({analysis['total_size_mb']:.1f}MB) |
|
|
- Gotowych do transkrypcji: {analysis['whisper_ready']} |
|
|
- Wymagających kompresji: {analysis['need_compression']} |
|
|
- Wymagających dzielenia: {analysis['need_splitting']} |
|
|
- Za dużych: {analysis['too_large']} |
|
|
|
|
|
""" |
|
|
|
|
|
if analysis['estimated_parts'] > 0: |
|
|
plan += f"- Szacowana liczba części: {analysis['estimated_parts']}\n" |
|
|
|
|
|
if analysis['too_large'] > 0: |
|
|
plan += f"\n❌ **PLIKI ZA DUŻE (>100MB):**\n" |
|
|
for info in analysis['file_details']: |
|
|
if info['too_large']: |
|
|
plan += f"- {info['name']}: {info['size_mb']:.1f}MB\n" |
|
|
|
|
|
if analysis['need_splitting'] > 0: |
|
|
plan += f"\n✂️ **PLIKI DO PODZIELENIA:**\n" |
|
|
for info in analysis['file_details']: |
|
|
if info['needs_splitting']: |
|
|
parts = math.ceil(info['size_mb'] / self.SAFE_CHUNK_SIZE_MB) |
|
|
plan += f"- {info['name']}: {info['size_mb']:.1f}MB → ~{parts} części\n" |
|
|
|
|
|
if analysis['need_compression'] > 0: |
|
|
plan += f"\n🗜️ **PLIKI DO KOMPRESJI:**\n" |
|
|
for info in analysis['file_details']: |
|
|
if info['needs_compression']: |
|
|
plan += f"- {info['name']}: {info['size_mb']:.1f}MB\n" |
|
|
|
|
|
|
|
|
times = self.estimate_processing_time(uploaded_files) |
|
|
plan += f""" |
|
|
⏱️ **ESTYMACJA CZASÓW:** |
|
|
- Przetwarzanie plików: ~{times['estimated_processing_time']:.1f}s |
|
|
- Transkrypcja: ~{times['estimated_transcription_time']:.1f}s |
|
|
- Generowanie raportu: ~{times['estimated_report_time']:.1f}s |
|
|
- **ŁĄCZNIE: ~{times['total_estimated_time']:.1f}s ({times['total_estimated_time']/60:.1f} min)** |
|
|
""" |
|
|
|
|
|
return plan |
|
|
|
|
|
|
|
|
def check_file_size_for_whisper(file_path: str) -> Tuple[bool, float]: |
|
|
"""Sprawdź czy plik mieści się w limicie Whisper""" |
|
|
try: |
|
|
size_mb = os.path.getsize(file_path) / (1024 * 1024) |
|
|
return size_mb <= 25, size_mb |
|
|
except: |
|
|
return False, 0 |
|
|
|
|
|
def estimate_compression_ratio(file_ext: str) -> float: |
|
|
"""Estymuj współczynnik kompresji dla różnych formatów""" |
|
|
ratios = { |
|
|
'wav': 0.1, |
|
|
'aac': 0.7, |
|
|
'mp3': 0.8, |
|
|
'm4a': 0.7, |
|
|
'mp4': 0.5, |
|
|
'mov': 0.5, |
|
|
'avi': 0.4 |
|
|
} |
|
|
return ratios.get(file_ext.lower(), 0.6) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
print("🧪 Test FileHandler") |
|
|
handler = FileHandler() |
|
|
|
|
|
stats = handler.get_processing_stats() |
|
|
print(f"📊 Biblioteki: {stats['libraries_available']}") |
|
|
print(f"🎯 Limit Whisper: {stats['whisper_max_size_mb']}MB") |
|
|
print(f"🔒 Bezpieczny chunk: {stats['safe_chunk_size_mb']}MB") |
|
|
|
|
|
print("✅ FileHandler gotowy do użycia") |