QualiLab / file_handler.py
Marek4321's picture
Update file_handler.py
038b4c5 verified
# file_handler.py - Poprawiony handler plików dla Whisper API
import os
import tempfile
import math
from io import BytesIO
from typing import List, Dict, Tuple, Union
import streamlit as st
try:
from pydub import AudioSegment
PYDUB_AVAILABLE = True
except ImportError:
PYDUB_AVAILABLE = False
st.warning("⚠️ Pydub nie jest dostępny. Funkcje kompresji ograniczone.")
try:
import librosa
import soundfile as sf
LIBROSA_AVAILABLE = True
except ImportError:
LIBROSA_AVAILABLE = False
from config import FILE_PROCESSING, USER_MESSAGES
class FileHandler:
"""Klasa do obsługi plików audio/video - zoptymalizowana dla Whisper API (max 25MB)"""
def __init__(self):
self.temp_files = []
self.processing_stats = {}
# Whisper API limits
self.WHISPER_MAX_SIZE_MB = 25
self.SAFE_CHUNK_SIZE_MB = 20 # Bezpieczny rozmiar chunka
def process_file(self, uploaded_file, max_chunk_size_mb: int = 20, auto_compress: bool = True) -> List[str]:
"""
Główna funkcja przetwarzania pliku dla Whisper API
Returns: Lista ścieżek do plików gotowych do transkrypcji (każdy <25MB)
"""
try:
file_size_mb = uploaded_file.size / (1024 * 1024)
st.info(f"🔄 Przetwarzam {uploaded_file.name} ({file_size_mb:.1f}MB)")
# Sprawdź czy plik mieści się w limicie Whisper
if file_size_mb <= self.WHISPER_MAX_SIZE_MB:
# Plik OK - zapisz bezpośrednio
temp_path = self._save_temp_file(uploaded_file)
if temp_path:
st.success(f"✅ Plik gotowy do transkrypcji ({file_size_mb:.1f}MB)")
return [temp_path]
else:
return []
# Plik za duży - wymaga przetwarzania
if file_size_mb > 100:
st.error(f"❌ Plik zbyt duży ({file_size_mb:.1f}MB). Maksymalnie 100MB.")
return []
# Strategia 1: Kompresja
if auto_compress and file_size_mb > self.WHISPER_MAX_SIZE_MB:
compressed_file = self._compress_audio_for_whisper(uploaded_file)
if compressed_file:
compressed_size_mb = len(compressed_file.getvalue()) / (1024 * 1024)
if compressed_size_mb <= self.WHISPER_MAX_SIZE_MB:
temp_path = self._save_bytesio_to_temp(compressed_file, uploaded_file.name)
if temp_path:
st.success(f"✅ Skompresowano: {file_size_mb:.1f}MB → {compressed_size_mb:.1f}MB")
return [temp_path]
# Strategia 2: Podział na części
return self._split_audio_for_whisper(uploaded_file, max_chunk_size_mb)
except Exception as e:
st.error(f"❌ Błąd przetwarzania {uploaded_file.name}: {str(e)}")
return []
def _compress_audio_for_whisper(self, uploaded_file) -> Union[BytesIO, None]:
"""Agresywna kompresja audio dla Whisper API"""
if not PYDUB_AVAILABLE:
st.warning("Pydub niedostępny - pomijam kompresję")
return None
try:
st.info("🗜️ Kompresuję audio...")
# Załaduj audio
audio_data = uploaded_file.read()
uploaded_file.seek(0) # Reset dla dalszego użycia
audio = AudioSegment.from_file(BytesIO(audio_data))
# Agresywna kompresja dla Whisper (jakość mowy)
compressed = audio.set_channels(1) # Mono
compressed = compressed.set_frame_rate(16000) # 16kHz (wystarczy dla mowy)
# Jeszcze więcej kompresji jeśli potrzeba
original_size_mb = uploaded_file.size / (1024 * 1024)
if original_size_mb > 50:
# Bardzo duży plik - maksymalna kompresja
bitrate = "32k"
elif original_size_mb > 35:
# Duży plik - silna kompresja
bitrate = "48k"
else:
# Średni plik - umiarkowana kompresja
bitrate = "64k"
# Export do BytesIO
output = BytesIO()
compressed.export(
output,
format="mp3",
bitrate=bitrate,
parameters=["-ac", "1", "-ar", "16000"]
)
output.seek(0)
# Sprawdź rozmiar wyniku
compressed_size_mb = len(output.getvalue()) / (1024 * 1024)
if compressed_size_mb <= self.WHISPER_MAX_SIZE_MB:
return output
else:
st.warning(f"⚠️ Kompresja niewystarczająca ({compressed_size_mb:.1f}MB). Przechodzę do dzielenia.")
return None
except Exception as e:
st.warning(f"Kompresja nieudana: {str(e)}")
return None
def _split_audio_for_whisper(self, uploaded_file, max_chunk_size_mb: int) -> List[str]:
"""Dzieli plik audio na części <25MB dla Whisper"""
try:
if not PYDUB_AVAILABLE:
st.error("❌ Pydub wymagany do dzielenia plików. Zainstaluj: pip install pydub")
return []
st.info("✂️ Dzielę plik na części...")
# Załaduj audio
audio_data = uploaded_file.read()
audio = AudioSegment.from_file(BytesIO(audio_data))
# Oblicz parametry dzielenia
total_duration_ms = len(audio)
file_size_mb = uploaded_file.size / (1024 * 1024)
# Bezpieczny rozmiar chunka (mniejszy niż limit Whisper)
safe_chunk_size_mb = min(max_chunk_size_mb, self.SAFE_CHUNK_SIZE_MB)
# Estymacja liczby części
estimated_parts = math.ceil(file_size_mb / safe_chunk_size_mb)
chunk_duration_ms = total_duration_ms // estimated_parts
# Overlap między częściami (10 sekund)
overlap_ms = 10 * 1000
st.info(f"📂 Dzielę na {estimated_parts} części (~{chunk_duration_ms//60000:.1f} min każda)")
parts = []
base_name = os.path.splitext(uploaded_file.name)[0]
for i in range(estimated_parts):
start_ms = max(0, i * chunk_duration_ms - (overlap_ms if i > 0 else 0))
end_ms = min(total_duration_ms, (i + 1) * chunk_duration_ms + overlap_ms)
# Wytnij część
chunk = audio[start_ms:end_ms]
# Lekka kompresja części
chunk = chunk.set_channels(1) # Mono
chunk = chunk.set_frame_rate(22050) # Dobra jakość ale kompaktowa
# Zapisz do pliku tymczasowego
temp_fd, temp_path = tempfile.mkstemp(
suffix=f"_part{i+1:02d}.mp3",
prefix=f"{base_name}_"
)
os.close(temp_fd)
chunk.export(temp_path, format="mp3", bitrate="96k")
# Sprawdź rozmiar części
part_size_mb = os.path.getsize(temp_path) / (1024 * 1024)
if part_size_mb > self.WHISPER_MAX_SIZE_MB:
st.error(f"❌ Część {i+1} nadal za duża ({part_size_mb:.1f}MB)")
os.remove(temp_path)
continue
parts.append(temp_path)
self.temp_files.append(temp_path)
st.success(f"✅ Część {i+1}/{estimated_parts}: {part_size_mb:.1f}MB, {(end_ms-start_ms)//60000:.1f} min")
if not parts:
st.error("❌ Nie udało się utworzyć żadnej prawidłowej części")
return parts
except Exception as e:
st.error(f"❌ Błąd dzielenia pliku: {str(e)}")
return []
def _save_temp_file(self, uploaded_file) -> str:
"""Zapisuje uploaded file do pliku tymczasowego"""
try:
suffix = f".{uploaded_file.name.split('.')[-1]}"
temp_fd, temp_path = tempfile.mkstemp(suffix=suffix)
# Zapisz dane
with os.fdopen(temp_fd, 'wb') as tmp_file:
content = uploaded_file.read()
tmp_file.write(content)
# Reset pozycji dla dalszego użycia
uploaded_file.seek(0)
self.temp_files.append(temp_path)
return temp_path
except Exception as e:
st.error(f"❌ Błąd zapisu tymczasowego: {str(e)}")
return ""
def _save_bytesio_to_temp(self, bytes_io: BytesIO, original_name: str) -> str:
"""Zapisz BytesIO do pliku tymczasowego"""
try:
suffix = f"_compressed.mp3"
base_name = os.path.splitext(original_name)[0]
temp_fd, temp_path = tempfile.mkstemp(
suffix=suffix,
prefix=f"{base_name}_"
)
with os.fdopen(temp_fd, 'wb') as tmp_file:
tmp_file.write(bytes_io.getvalue())
self.temp_files.append(temp_path)
return temp_path
except Exception as e:
st.error(f"❌ Błąd zapisu skompresowanego: {str(e)}")
return ""
def validate_file_for_whisper(self, uploaded_file) -> Tuple[bool, str]:
"""Walidacja pliku dla Whisper API"""
try:
# Sprawdź rozmiar
file_size_mb = uploaded_file.size / (1024 * 1024)
if file_size_mb == 0:
return False, "Plik jest pusty"
if file_size_mb > 100: # Rozumny limit dla przetwarzania
return False, f"Plik za duży: {file_size_mb:.1f}MB > 100MB"
# Sprawdź rozszerzenie
file_ext = uploaded_file.name.split('.')[-1].lower()
supported_formats = ['mp3', 'wav', 'mp4', 'm4a', 'aac', 'mov', 'avi']
if file_ext not in supported_formats:
return False, f"Nieobsługiwany format: .{file_ext}"
# Ostrzeżenie dla dużych plików
if file_size_mb > self.WHISPER_MAX_SIZE_MB:
return True, f"Plik wymaga przetwarzania ({file_size_mb:.1f}MB > {self.WHISPER_MAX_SIZE_MB}MB)"
return True, "OK"
except Exception as e:
return False, f"Błąd walidacji: {str(e)}"
def get_audio_duration(self, file_path: str) -> float:
"""Pobierz długość pliku audio w sekundach"""
try:
if LIBROSA_AVAILABLE:
duration = librosa.get_duration(filename=file_path)
return duration
elif PYDUB_AVAILABLE:
audio = AudioSegment.from_file(file_path)
return len(audio) / 1000.0
else:
# Fallback - estymacja na podstawie rozmiaru
file_size = os.path.getsize(file_path)
return file_size / (1024 * 1024) * 60
except:
file_size = os.path.getsize(file_path)
return file_size / (1024 * 1024) * 60
def estimate_processing_time(self, uploaded_files: List) -> Dict:
"""Estymuj czas przetwarzania"""
total_size_mb = sum(f.size for f in uploaded_files) / (1024 * 1024)
total_duration_est = total_size_mb * 60
# Czas przetwarzania (kompresja/dzielenie)
processing_time = 0
for f in uploaded_files:
file_size_mb = f.size / (1024 * 1024)
if file_size_mb > self.WHISPER_MAX_SIZE_MB:
processing_time += file_size_mb * 2 # ~2s per MB for processing
# Estymacja czasu transkrypcji (Whisper ~1:10 ratio)
transcription_time = total_duration_est * 0.1
# Estymacja czasu generowania raportu
report_time = len(uploaded_files) * 30
return {
'total_size_mb': total_size_mb,
'estimated_audio_duration': total_duration_est,
'estimated_processing_time': processing_time,
'estimated_transcription_time': transcription_time,
'estimated_report_time': report_time,
'total_estimated_time': processing_time + transcription_time + report_time,
'files_needing_processing': sum(1 for f in uploaded_files
if f.size / (1024 * 1024) > self.WHISPER_MAX_SIZE_MB)
}
def get_file_info(self, uploaded_file) -> Dict:
"""Pobierz szczegółowe informacje o pliku"""
file_size_mb = uploaded_file.size / (1024 * 1024)
file_ext = uploaded_file.name.split('.')[-1].lower()
return {
'name': uploaded_file.name,
'size_mb': file_size_mb,
'format': file_ext,
'whisper_ready': file_size_mb <= self.WHISPER_MAX_SIZE_MB,
'needs_compression': file_size_mb > self.WHISPER_MAX_SIZE_MB and file_size_mb <= 50,
'needs_splitting': file_size_mb > 50,
'too_large': file_size_mb > 100,
'estimated_duration': file_size_mb * 60,
'estimated_processing_time': max(0, file_size_mb - self.WHISPER_MAX_SIZE_MB) * 2
}
def cleanup_temp_files(self):
"""Wyczyść pliki tymczasowe"""
cleaned = 0
errors = 0
for temp_file in self.temp_files:
try:
if os.path.exists(temp_file):
os.remove(temp_file)
cleaned += 1
except Exception as e:
errors += 1
st.warning(f"Nie można usunąć {temp_file}: {e}")
self.temp_files = []
if cleaned > 0:
st.success(f"🧹 Wyczyszczono {cleaned} plików tymczasowych")
if errors > 0:
st.warning(f"⚠️ {errors} plików nie udało się usunąć")
def get_processing_stats(self) -> Dict:
"""Zwróć statystyki przetwarzania"""
return {
'temp_files_count': len(self.temp_files),
'whisper_max_size_mb': self.WHISPER_MAX_SIZE_MB,
'safe_chunk_size_mb': self.SAFE_CHUNK_SIZE_MB,
'processing_stats': self.processing_stats,
'libraries_available': {
'pydub': PYDUB_AVAILABLE,
'librosa': LIBROSA_AVAILABLE
}
}
def analyze_upload_batch(self, uploaded_files: List) -> Dict:
"""Analizuj całą paczkę plików"""
analysis = {
'total_files': len(uploaded_files),
'total_size_mb': 0,
'whisper_ready': 0,
'need_compression': 0,
'need_splitting': 0,
'too_large': 0,
'estimated_parts': 0,
'file_details': []
}
for file in uploaded_files:
info = self.get_file_info(file)
analysis['file_details'].append(info)
analysis['total_size_mb'] += info['size_mb']
if info['whisper_ready']:
analysis['whisper_ready'] += 1
elif info['needs_compression']:
analysis['need_compression'] += 1
elif info['needs_splitting']:
analysis['need_splitting'] += 1
# Estymacja liczby części
parts = math.ceil(info['size_mb'] / self.SAFE_CHUNK_SIZE_MB)
analysis['estimated_parts'] += parts
elif info['too_large']:
analysis['too_large'] += 1
return analysis
def create_processing_plan(self, uploaded_files: List) -> str:
"""Stwórz plan przetwarzania dla użytkownika"""
analysis = self.analyze_upload_batch(uploaded_files)
plan = f"""
📋 **PLAN PRZETWARZANIA**
📊 **Podsumowanie:**
- Plików: {analysis['total_files']} ({analysis['total_size_mb']:.1f}MB)
- Gotowych do transkrypcji: {analysis['whisper_ready']}
- Wymagających kompresji: {analysis['need_compression']}
- Wymagających dzielenia: {analysis['need_splitting']}
- Za dużych: {analysis['too_large']}
"""
if analysis['estimated_parts'] > 0:
plan += f"- Szacowana liczba części: {analysis['estimated_parts']}\n"
if analysis['too_large'] > 0:
plan += f"\n❌ **PLIKI ZA DUŻE (>100MB):**\n"
for info in analysis['file_details']:
if info['too_large']:
plan += f"- {info['name']}: {info['size_mb']:.1f}MB\n"
if analysis['need_splitting'] > 0:
plan += f"\n✂️ **PLIKI DO PODZIELENIA:**\n"
for info in analysis['file_details']:
if info['needs_splitting']:
parts = math.ceil(info['size_mb'] / self.SAFE_CHUNK_SIZE_MB)
plan += f"- {info['name']}: {info['size_mb']:.1f}MB → ~{parts} części\n"
if analysis['need_compression'] > 0:
plan += f"\n🗜️ **PLIKI DO KOMPRESJI:**\n"
for info in analysis['file_details']:
if info['needs_compression']:
plan += f"- {info['name']}: {info['size_mb']:.1f}MB\n"
# Estymacja czasów
times = self.estimate_processing_time(uploaded_files)
plan += f"""
⏱️ **ESTYMACJA CZASÓW:**
- Przetwarzanie plików: ~{times['estimated_processing_time']:.1f}s
- Transkrypcja: ~{times['estimated_transcription_time']:.1f}s
- Generowanie raportu: ~{times['estimated_report_time']:.1f}s
- **ŁĄCZNIE: ~{times['total_estimated_time']:.1f}s ({times['total_estimated_time']/60:.1f} min)**
"""
return plan
# Funkcje pomocnicze
def check_file_size_for_whisper(file_path: str) -> Tuple[bool, float]:
"""Sprawdź czy plik mieści się w limicie Whisper"""
try:
size_mb = os.path.getsize(file_path) / (1024 * 1024)
return size_mb <= 25, size_mb
except:
return False, 0
def estimate_compression_ratio(file_ext: str) -> float:
"""Estymuj współczynnik kompresji dla różnych formatów"""
ratios = {
'wav': 0.1, # WAV kompresuje się bardzo dobrze
'aac': 0.7, # AAC już skompresowany
'mp3': 0.8, # MP3 już skompresowany
'm4a': 0.7, # M4A już skompresowany
'mp4': 0.5, # Video można mocno skompresować audio
'mov': 0.5,
'avi': 0.4
}
return ratios.get(file_ext.lower(), 0.6)
# Test modułu
if __name__ == "__main__":
print("🧪 Test FileHandler")
handler = FileHandler()
stats = handler.get_processing_stats()
print(f"📊 Biblioteki: {stats['libraries_available']}")
print(f"🎯 Limit Whisper: {stats['whisper_max_size_mb']}MB")
print(f"🔒 Bezpieczny chunk: {stats['safe_chunk_size_mb']}MB")
print("✅ FileHandler gotowy do użycia")