# transcription.py - Szybka poprawka importów import os import time import streamlit as st from typing import List, Dict, Optional, Union, Tuple # Dodano Tuple from pathlib import Path try: from openai import OpenAI OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False st.error("❌ OpenAI library nie jest dostępna. Zainstaluj: pip install openai") from config import MODEL_SETTINGS, USER_MESSAGES class AudioTranscriber: """Klasa do transkrypcji audio używając OpenAI Whisper API""" def __init__(self, api_key: str): if not OPENAI_AVAILABLE: raise Exception("OpenAI library nie jest dostępna") self.client = OpenAI(api_key=api_key) self.api_key = api_key self.transcription_stats = { 'total_files': 0, 'successful': 0, 'failed': 0, 'total_duration': 0, 'total_cost_estimate': 0 } def transcribe_files(self, file_paths: Union[str, List[str]], language: str = "pl") -> str: """ Transkrypcja listy plików audio lub pojedynczego pliku Returns: Połączona transkrypcja wszystkich plików """ # Obsługa pojedynczego pliku if isinstance(file_paths, str): file_paths = [file_paths] transcriptions = [] for i, file_path in enumerate(file_paths): if not os.path.exists(file_path): st.error(f"❌ Plik nie istnieje: {file_path}") continue try: # Pokaż postęp if len(file_paths) > 1: st.info(f"🎙️ Transkrybuję część {i+1}/{len(file_paths)}") # Transkrypcja pojedynczego pliku z retry transcription = self.transcribe_with_retries(file_path, language) if transcription: transcriptions.append(transcription) self.transcription_stats['successful'] += 1 st.success(f"✅ Część {i+1} zakończona") else: self.transcription_stats['failed'] += 1 st.error(f"❌ Błąd części {i+1}") except Exception as e: st.error(f"❌ Błąd transkrypcji części {i+1}: {str(e)}") self.transcription_stats['failed'] += 1 # Połącz wszystkie transkrypcje if transcriptions: # Jeśli było więcej niż jeden plik, dodaj separatory if len(transcriptions) > 1: final_transcription = transcriptions[0] for i, text in enumerate(transcriptions[1:], 1): final_transcription += f"\n\n=== CZĘŚĆ {i+1} ===\n\n{text}" else: final_transcription = transcriptions[0] return final_transcription else: raise Exception("Wszystkie transkrypcje zakończone błędem") def transcribe_with_retries(self, file_path: str, language: str = "pl", max_retries: int = 3) -> Optional[str]: """Transkrypcja z ponawianiem przy błędach""" for attempt in range(max_retries): try: # Sprawdź rozmiar pliku przed każdą próbą file_size_mb = os.path.getsize(file_path) / (1024 * 1024) if file_size_mb > 25: raise Exception(f"Plik za duży dla Whisper API: {file_size_mb:.1f}MB > 25MB") result = self._transcribe_single_file(file_path, language) if result: return result except Exception as e: error_msg = str(e).lower() st.warning(f"⚠️ Próba {attempt + 1}/{max_retries} nieudana: {str(e)}") if attempt < max_retries - 1: # Exponential backoff z różnymi strategiami if "rate limit" in error_msg: wait_time = 60 + (attempt * 30) # Rate limit = długa przerwa st.info(f"⏳ Rate limit - czekam {wait_time}s...") elif "timeout" in error_msg: wait_time = 30 + (attempt * 15) # Timeout = średnia przerwa st.info(f"⏳ Timeout - czekam {wait_time}s...") else: wait_time = 15 + (attempt * 10) # Inne błędy = krótka przerwa st.info(f"⏳ Błąd - czekam {wait_time}s...") time.sleep(wait_time) else: st.error(f"❌ Wszystkie {max_retries} prób nieudane dla {file_path}") return None def _transcribe_single_file(self, file_path: str, language: str = "pl") -> Optional[str]: """Transkrypcja pojedynczego pliku""" try: self.transcription_stats['total_files'] += 1 # Sprawdź rozmiar pliku file_size = os.path.getsize(file_path) file_size_mb = file_size / (1024 * 1024) # OpenAI Whisper ma limit 25MB if file_size_mb > 25: raise Exception(f"Plik za duży dla Whisper API: {file_size_mb:.1f}MB > 25MB") # Sprawdź czy plik nie jest pusty if file_size == 0: raise Exception("Plik jest pusty") st.info(f"📤 Wysyłam do Whisper ({file_size_mb:.1f}MB)...") # Otwórz plik i wyślij do API with open(file_path, 'rb') as audio_file: # Ustaw parametry transkrypcji params = { 'model': MODEL_SETTINGS['whisper']['model'], 'file': audio_file, 'temperature': MODEL_SETTINGS['whisper']['temperature'] } # Dodaj język tylko jeśli nie jest auto if language != 'auto': params['language'] = language # Wywołaj API transcript = self.client.audio.transcriptions.create(**params) # Sprawdź czy otrzymaliśmy wynik if not transcript or not hasattr(transcript, 'text') or len(transcript.text.strip()) == 0: raise Exception("Pusty wynik transkrypcji") # Estymacja kosztu (Whisper API: $0.006 per minute) estimated_duration = file_size_mb * 60 # Rough estimate: 1MB ≈ 1 minute estimated_cost = (estimated_duration / 60) * 0.006 self.transcription_stats['total_duration'] += estimated_duration self.transcription_stats['total_cost_estimate'] += estimated_cost st.success(f"✅ Transkrypcja otrzymana ({len(transcript.text.split())} słów)") # Oczyść i zwróć transkrypcję return self.clean_transcription(transcript.text) except Exception as e: st.error(f"❌ Błąd Whisper API: {str(e)}") raise e def clean_transcription(self, transcription: str) -> str: """Oczyszczenie i formatowanie transkrypcji""" try: # Usuń nadmiarowe spacje i znaki cleaned = transcription.strip() # Usuń nadmiarowe spacje cleaned = ' '.join(cleaned.split()) # Podziel na akapity w rozsądnych miejscach sentences = cleaned.split('. ') paragraphs = [] current_paragraph = [] for sentence in sentences: current_paragraph.append(sentence) # Nowy akapit co 3-4 zdania if len(current_paragraph) >= 4: paragraphs.append('. '.join(current_paragraph) + '.') current_paragraph = [] # Dodaj ostatni akapit if current_paragraph: paragraphs.append('. '.join(current_paragraph)) # Połącz akapity formatted = '\n\n'.join(paragraphs) return formatted except Exception as e: st.warning(f"⚠️ Błąd formatowania transkrypcji: {e}") return transcription def detect_interview_type(self, transcription: str) -> str: """ Automatyczne rozpoznanie typu wywiadu na podstawie treści Returns: 'fgi', 'idi', lub 'unknown' """ text_lower = transcription.lower() # Wskaźniki FGI (Focus Group) fgi_indicators = [ 'moderator', 'grupa', 'wszyscy', 'kto jeszcze', 'a państwo', 'czy zgadzacie się', 'co myślicie', 'focus group', 'uczestnicy', 'grupa fokusowa', 'dyskusja grupowa', 'co sądzicie', 'może ktoś inny', 'a jak pan/pani' ] # Wskaźniki IDI (Individual) idi_indicators = [ 'wywiad indywidualny', 'jeden na jeden', 'prywatnie', 'osobiście', 'indywidualne', 'w cztery oczy', 'tylko między nami', 'powiedz mi', 'jak się czujesz' ] fgi_score = sum(1 for indicator in fgi_indicators if indicator in text_lower) idi_score = sum(1 for indicator in idi_indicators if indicator in text_lower) # Sprawdź także liczbę różnych głosów/osób # (FGI zwykle ma więcej przerywników, overlapping speech) interruption_patterns = ['...', '[niewyraźnie]', '[nakładanie się głosów]', '(śmiech)', '--'] interruption_count = sum(text_lower.count(pattern) for pattern in interruption_patterns) # Sprawdź długość - FGI są zwykle dłuższe word_count = len(transcription.split()) # Logika decyzyjna if fgi_score > idi_score * 1.5 and word_count > 1000: return 'fgi' elif idi_score > fgi_score * 1.5: return 'idi' elif interruption_count > 10 and word_count > 1500: return 'fgi' elif word_count < 800: return 'idi' else: return 'unknown' def validate_api_key(self) -> bool: """Sprawdź czy klucz API działa""" try: # Spróbuj pobrać listę modeli models = self.client.models.list() # Sprawdź czy whisper-1 jest dostępny model_names = [model.id for model in models.data] if 'whisper-1' not in model_names: st.warning("⚠️ Model whisper-1 nie jest dostępny") return False return True except Exception as e: st.error(f"❌ Nieprawidłowy klucz API: {str(e)}") return False def get_transcription_stats(self) -> Dict: """Zwróć statystyki transkrypcji""" stats = self.transcription_stats.copy() # Dodaj dodatkowe metryki if stats['total_files'] > 0: stats['success_rate'] = (stats['successful'] / stats['total_files']) * 100 else: stats['success_rate'] = 0 return stats def estimate_transcription_time(self, file_paths: List[str]) -> Dict: """Estymuj czas i koszt transkrypcji""" valid_files = [path for path in file_paths if os.path.exists(path)] if not valid_files: return { 'error': 'Brak prawidłowych plików', 'files_count': 0 } total_size = sum(os.path.getsize(path) for path in valid_files) total_size_mb = total_size / (1024 * 1024) # Estymacje estimated_duration_minutes = total_size_mb # 1MB ≈ 1 minute estimated_api_time = estimated_duration_minutes * 0.1 # Whisper jest ~10x szybszy estimated_cost = estimated_duration_minutes * 0.006 # $0.006 per minute # Sprawdź limity files_too_large = [] for path in valid_files: file_size_mb = os.path.getsize(path) / (1024 * 1024) if file_size_mb > 25: files_too_large.append((path, file_size_mb)) return { 'total_size_mb': total_size_mb, 'estimated_audio_duration': estimated_duration_minutes, 'estimated_processing_time': estimated_api_time, 'estimated_cost_usd': estimated_cost, 'files_count': len(valid_files), 'files_too_large': files_too_large } # Funkcje pomocnicze def validate_audio_file(file_path: str) -> Tuple[bool, str]: """Sprawdź czy plik audio jest prawidłowy""" if not os.path.exists(file_path): return False, "Plik nie istnieje" # Sprawdź rozmiar file_size = os.path.getsize(file_path) file_size_mb = file_size / (1024 * 1024) if file_size == 0: return False, "Plik jest pusty" if file_size_mb > 25: return False, f"Plik za duży: {file_size_mb:.1f}MB > 25MB" # Sprawdź rozszerzenie valid_extensions = ['.mp3', '.wav', '.mp4', '.m4a', '.aac'] file_ext = Path(file_path).suffix.lower() if file_ext not in valid_extensions: return False, f"Nieobsługiwane rozszerzenie: {file_ext}" return True, "OK" def get_file_duration_estimate(file_path: str) -> float: """Estymuj długość pliku audio w minutach""" try: file_size_mb = os.path.getsize(file_path) / (1024 * 1024) # Przybliżenie: 1MB ≈ 1 minuta dla typowego MP3 return file_size_mb except: return 0.0 # Test modułu if __name__ == "__main__": print("🧪 Test AudioTranscriber") print("✅ Import OK - wszystkie typy dostępne")