import streamlit as st import os from datetime import datetime import time import traceback st.set_page_config( page_title="QualiInsightLab", page_icon="🎙️", layout="wide", initial_sidebar_state="expanded" ) # Import modułów from transcription import AudioTranscriber from report_generator import ReportGenerator from file_handler import FileHandler from config import NVIDIA_THEME, DEFAULT_SETTINGS # Custom CSS - kolorystyka NVIDIA st.markdown(f""" """, unsafe_allow_html=True) class FGIIDIAnalyzer: def __init__(self): self.transcriber = None self.report_generator = None self.file_handler = FileHandler() self.initialize_session_state() def initialize_session_state(self): """Inicjalizacja session state""" if 'transcriptions' not in st.session_state: st.session_state.transcriptions = {} if 'uploaded_files' not in st.session_state: st.session_state.uploaded_files = [] if 'processing_status' not in st.session_state: st.session_state.processing_status = 'ready' if 'final_report' not in st.session_state: st.session_state.final_report = None if 'research_brief' not in st.session_state: st.session_state.research_brief = "" if 'logs' not in st.session_state: st.session_state.logs = [] def log_message(self, message, level="INFO"): """Dodaj wiadomość do logów""" timestamp = datetime.now().strftime("%H:%M:%S") log_entry = f"[{timestamp}] {level}: {message}" st.session_state.logs.append(log_entry) # Ograniczenie liczby logów do 100 if len(st.session_state.logs) > 100: st.session_state.logs = st.session_state.logs[-100:] # Wyświetl też w konsoli dla debug print(log_entry) def render_sidebar(self): """Renderuj sidebar z konfiguracją""" st.sidebar.title("🎙️ QualiInsightLab") st.sidebar.markdown("---") # API Keys st.sidebar.subheader("🔑 Konfiguracja API") openai_key = st.sidebar.text_input( "OpenAI API Key:", type="password", help="Klucz do Whisper (transkrypcja) i GPT-4o-mini (raporty)" ) if openai_key: self.transcriber = AudioTranscriber(openai_key) self.report_generator = ReportGenerator(openai_key) st.sidebar.success("✅ API połączone") else: st.sidebar.warning("⚠️ Wprowadź klucz API") st.sidebar.markdown("---") # Ustawienia transkrypcji st.sidebar.subheader("⚙️ Ustawienia") max_file_size = st.sidebar.selectbox( "Maksymalny rozmiar części:", [10, 15, 20, 25], index=1, help="MB - większe pliki będą dzielone na części (max 25MB dla Whisper)" ) auto_compress = st.sidebar.checkbox( "Auto-kompresja dużych plików", value=True, help="Automatyczna kompresja plików >25MB" ) language = st.sidebar.selectbox( "Język transkrypcji:", ["pl", "en", "auto"], index=0, help="Język audio dla Whisper" ) st.sidebar.markdown("---") # Status systemu st.sidebar.subheader("📊 Status") if st.session_state.uploaded_files: st.sidebar.info(f"📁 Plików: {len(st.session_state.uploaded_files)}") if st.session_state.transcriptions: st.sidebar.info(f"✅ Transkrypcji: {len(st.session_state.transcriptions)}") if st.session_state.final_report: st.sidebar.success("📄 Raport gotowy") # Reset session if st.sidebar.button("🔄 Reset sesji", type="secondary"): for key in list(st.session_state.keys()): del st.session_state[key] st.rerun() return { 'openai_key': openai_key, 'max_file_size': max_file_size, 'auto_compress': auto_compress, 'language': language } def render_file_upload(self, settings): """Renderuj sekcję upload plików""" st.header("📁 Upload plików audio/video") # Research brief st.subheader("📋 Brief badawczy (opcjonalny)") research_brief = st.text_area( "Opisz cele badania, grupę docelową, kluczowe pytania badawcze:", value=st.session_state.research_brief, height=100, help="Ten opis pomoże AI wygenerować lepszy raport" ) st.session_state.research_brief = research_brief # File uploader st.subheader("🎙️ Pliki do transkrypcji") uploaded_files = st.file_uploader( "Wybierz pliki audio/video:", type=['mp3', 'wav', 'mp4', 'm4a', 'aac'], accept_multiple_files=True, help="Obsługiwane formaty: MP3, WAV, MP4, M4A, AAC (max 25MB każdy dla Whisper)" ) if uploaded_files: # Walidacja plików valid_files = [] total_size = 0 for file in uploaded_files: file_size_mb = file.size / (1024 * 1024) total_size += file_size_mb # Sprawdź rozmiar pojedynczego pliku if file_size_mb > 100: # 100MB limit dla pojedynczego pliku st.error(f"❌ {file.name}: Plik za duży ({file_size_mb:.1f}MB). Maksymalnie 100MB.") continue valid_files.append({ 'file': file, 'name': file.name, 'size_mb': file_size_mb, 'needs_splitting': file_size_mb > settings['max_file_size'], 'needs_compression': file_size_mb > 25 # Whisper limit }) # Wyświetl informacje o plikach if valid_files: st.success(f"✅ Załadowano {len(valid_files)} plików ({total_size:.1f}MB)") # Tabela z informacjami o plikach for i, file_info in enumerate(valid_files): col1, col2, col3 = st.columns([3, 1, 1]) with col1: st.write(f"📄 {file_info['name']}") with col2: st.write(f"{file_info['size_mb']:.1f}MB") with col3: if file_info['needs_compression']: st.warning("Kompresja") elif file_info['needs_splitting']: st.warning("Podział") else: st.success("OK") st.session_state.uploaded_files = valid_files return True return False def render_processing_section(self, settings): """Renderuj sekcję przetwarzania""" if not st.session_state.uploaded_files: st.info("👆 Najpierw załaduj pliki audio/video") return if not settings['openai_key']: st.warning("⚠️ Wprowadź klucz OpenAI API w sidebarze") return st.header("🚀 Przetwarzanie") # Przycisk start if st.session_state.processing_status == 'ready': if st.button("🎯 Rozpocznij transkrypcję i analizę", type="primary"): st.session_state.processing_status = 'running' # Przetwarzaj synchronicznie - PROSTSZE I DZIAŁA self.process_files(settings) elif st.session_state.processing_status == 'running': st.info("⏳ Przetwarzanie w toku...") # Podczas przetwarzania nie pokazuj przycisku stop - może powodować problemy elif st.session_state.processing_status == 'completed': st.success("🎉 Przetwarzanie zakończone pomyślnie!") if st.button("🔄 Nowe przetwarzanie", type="primary"): st.session_state.processing_status = 'ready' st.session_state.transcriptions = {} st.session_state.final_report = None st.rerun() elif st.session_state.processing_status == 'error': st.error("❌ Przetwarzanie zakończone błędem") if st.button("🔄 Spróbuj ponownie", type="primary"): st.session_state.processing_status = 'ready' st.rerun() def process_files(self, settings): """Główna logika przetwarzania plików - SYNCHRONICZNA""" try: self.log_message("🚀 Rozpoczynam przetwarzanie plików") # Container dla live updates status_container = st.empty() progress_container = st.empty() # 1. Transkrypcja wszystkich plików for i, file_info in enumerate(st.session_state.uploaded_files): status_container.info(f"🎙️ Transkrybuję: {file_info['name']} ({i+1}/{len(st.session_state.uploaded_files)})") self.log_message(f"📝 Rozpoczynam transkrypcję: {file_info['name']}") try: # Przetwórz plik (podział jeśli potrzeba) processed_files = self.file_handler.process_file( file_info['file'], settings['max_file_size'], settings['auto_compress'] ) if not processed_files: raise Exception("Nie udało się przetworzyć pliku") # Transkrypcja transcription = self.transcriber.transcribe_files( processed_files, language=settings['language'] ) st.session_state.transcriptions[file_info['name']] = transcription self.log_message(f"✅ Zakończono transkrypcję: {file_info['name']}") # Update progress progress = (i + 1) / len(st.session_state.uploaded_files) progress_container.progress(progress) except Exception as e: self.log_message(f"❌ Błąd transkrypcji {file_info['name']}: {str(e)}", "ERROR") st.error(f"Błąd przy {file_info['name']}: {str(e)}") continue # 2. Generowanie raportu if st.session_state.transcriptions: status_container.info("📄 Generuję raport badawczy...") self.log_message("📄 Rozpoczynam generowanie raportu") try: final_report = self.report_generator.generate_comprehensive_report( st.session_state.transcriptions, st.session_state.research_brief ) st.session_state.final_report = final_report st.session_state.processing_status = 'completed' status_container.success("✅ Przetwarzanie zakończone!") self.log_message("🎉 Raport wygenerowany pomyślnie!") # Automatyczny refresh żeby pokazać wyniki st.rerun() except Exception as e: self.log_message(f"❌ Błąd generowania raportu: {str(e)}", "ERROR") st.error(f"Błąd generowania raportu: {str(e)}") st.session_state.processing_status = 'error' else: st.session_state.processing_status = 'error' self.log_message("❌ Brak transkrypcji do wygenerowania raportu", "ERROR") except Exception as e: self.log_message(f"💥 Błąd krytyczny: {str(e)}", "ERROR") st.error(f"Błąd krytyczny: {str(e)}") st.session_state.processing_status = 'error' def render_results(self): """Renderuj wyniki""" if not st.session_state.transcriptions and not st.session_state.final_report: return st.header("📊 Wyniki") # Tabs dla różnych widoków tab1, tab2, tab3 = st.tabs(["📄 Raport", "🎙️ Transkrypcje", "📋 Logi"]) with tab1: if st.session_state.final_report: st.subheader("📄 Raport z badania") # Download button if st.download_button( label="📥 Pobierz raport (TXT)", data=st.session_state.final_report, file_name=f"raport_badawczy_{datetime.now().strftime('%Y%m%d_%H%M')}.txt", mime="text/plain" ): st.success("✅ Raport pobierany!") # Display report st.markdown("---") st.markdown(st.session_state.final_report) else: st.info("Raport będzie dostępny po zakończeniu przetwarzania") with tab2: if st.session_state.transcriptions: st.subheader("🎙️ Transkrypcje") for filename, transcription in st.session_state.transcriptions.items(): with st.expander(f"📄 {filename}"): st.write(transcription) # Download individual transcription st.download_button( label=f"📥 Pobierz {filename}", data=transcription, file_name=f"transkrypcja_{filename}_{datetime.now().strftime('%Y%m%d_%H%M')}.txt", mime="text/plain", key=f"download_{filename}" ) else: st.info("Transkrypcje będą dostępne po przetworzeniu plików") with tab3: st.subheader("📋 Logi procesu") if st.session_state.logs: # Clear logs button if st.button("🧹 Wyczyść logi"): st.session_state.logs = [] st.rerun() # Display logs - pokazuj najnowsze na górze logs_text = "\n".join(reversed(st.session_state.logs)) st.text_area( "Logi systemu (najnowsze na górze):", value=logs_text, height=400, disabled=True ) else: st.info("Logi będą wyświetlane podczas przetwarzania") def run(self): """Główna funkcja aplikacji""" # Sidebar settings = self.render_sidebar() # Main content st.title("🎙️ QualiInsightLab") st.markdown("*Automatyczna transkrypcja i analiza wywiadów fokusowych oraz indywidualnych*") st.markdown("---") # File upload section files_uploaded = self.render_file_upload(settings) st.markdown("---") # Processing section self.render_processing_section(settings) st.markdown("---") # Results section self.render_results() # Główna aplikacja if __name__ == "__main__": try: app = FGIIDIAnalyzer() app.run() except Exception as e: st.error(f"💥 Błąd aplikacji: {str(e)}") st.code(traceback.format_exc()) # Log error for debugging with open('error_log.txt', 'w', encoding='utf-8') as f: f.write(f"Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") st.info("Szczegóły błędu zapisane w error_log.txt")