QualiLab / app.py
Marek4321's picture
Update app.py
5e8290e verified
import streamlit as st
import os
from datetime import datetime
import time
import traceback
st.set_page_config(
page_title="QualiInsightLab",
page_icon="🎙️",
layout="wide",
initial_sidebar_state="expanded"
)
# Import modułów
from transcription import AudioTranscriber
from report_generator import ReportGenerator
from file_handler import FileHandler
from config import NVIDIA_THEME, DEFAULT_SETTINGS
# Custom CSS - kolorystyka NVIDIA
st.markdown(f"""
<style>
.main {{
background-color: {NVIDIA_THEME['background']};
color: {NVIDIA_THEME['text']};
}}
.stButton > button {{
background-color: {NVIDIA_THEME['primary']};
color: {NVIDIA_THEME['background']};
border: none;
border-radius: 5px;
font-weight: bold;
}}
.stButton > button:hover {{
background-color: {NVIDIA_THEME['accent']};
color: {NVIDIA_THEME['background']};
}}
.sidebar .sidebar-content {{
background-color: {NVIDIA_THEME['secondary']};
}}
.stProgress > div > div {{
background-color: {NVIDIA_THEME['primary']};
}}
.success-box {{
background-color: rgba(118, 185, 0, 0.1);
border: 1px solid {NVIDIA_THEME['primary']};
border-radius: 5px;
padding: 10px;
margin: 10px 0;
}}
.error-box {{
background-color: rgba(255, 0, 0, 0.1);
border: 1px solid #ff0000;
border-radius: 5px;
padding: 10px;
margin: 10px 0;
}}
</style>
""", unsafe_allow_html=True)
class FGIIDIAnalyzer:
def __init__(self):
self.transcriber = None
self.report_generator = None
self.file_handler = FileHandler()
self.initialize_session_state()
def initialize_session_state(self):
"""Inicjalizacja session state"""
if 'transcriptions' not in st.session_state:
st.session_state.transcriptions = {}
if 'uploaded_files' not in st.session_state:
st.session_state.uploaded_files = []
if 'processing_status' not in st.session_state:
st.session_state.processing_status = 'ready'
if 'final_report' not in st.session_state:
st.session_state.final_report = None
if 'research_brief' not in st.session_state:
st.session_state.research_brief = ""
if 'logs' not in st.session_state:
st.session_state.logs = []
def log_message(self, message, level="INFO"):
"""Dodaj wiadomość do logów"""
timestamp = datetime.now().strftime("%H:%M:%S")
log_entry = f"[{timestamp}] {level}: {message}"
st.session_state.logs.append(log_entry)
# Ograniczenie liczby logów do 100
if len(st.session_state.logs) > 100:
st.session_state.logs = st.session_state.logs[-100:]
# Wyświetl też w konsoli dla debug
print(log_entry)
def render_sidebar(self):
"""Renderuj sidebar z konfiguracją"""
st.sidebar.title("🎙️ QualiInsightLab")
st.sidebar.markdown("---")
# API Keys
st.sidebar.subheader("🔑 Konfiguracja API")
openai_key = st.sidebar.text_input(
"OpenAI API Key:",
type="password",
help="Klucz do Whisper (transkrypcja) i GPT-4o-mini (raporty)"
)
if openai_key:
self.transcriber = AudioTranscriber(openai_key)
self.report_generator = ReportGenerator(openai_key)
st.sidebar.success("✅ API połączone")
else:
st.sidebar.warning("⚠️ Wprowadź klucz API")
st.sidebar.markdown("---")
# Ustawienia transkrypcji
st.sidebar.subheader("⚙️ Ustawienia")
max_file_size = st.sidebar.selectbox(
"Maksymalny rozmiar części:",
[10, 15, 20, 25],
index=1,
help="MB - większe pliki będą dzielone na części (max 25MB dla Whisper)"
)
auto_compress = st.sidebar.checkbox(
"Auto-kompresja dużych plików",
value=True,
help="Automatyczna kompresja plików >25MB"
)
language = st.sidebar.selectbox(
"Język transkrypcji:",
["pl", "en", "auto"],
index=0,
help="Język audio dla Whisper"
)
st.sidebar.markdown("---")
# Status systemu
st.sidebar.subheader("📊 Status")
if st.session_state.uploaded_files:
st.sidebar.info(f"📁 Plików: {len(st.session_state.uploaded_files)}")
if st.session_state.transcriptions:
st.sidebar.info(f"✅ Transkrypcji: {len(st.session_state.transcriptions)}")
if st.session_state.final_report:
st.sidebar.success("📄 Raport gotowy")
# Reset session
if st.sidebar.button("🔄 Reset sesji", type="secondary"):
for key in list(st.session_state.keys()):
del st.session_state[key]
st.rerun()
return {
'openai_key': openai_key,
'max_file_size': max_file_size,
'auto_compress': auto_compress,
'language': language
}
def render_file_upload(self, settings):
"""Renderuj sekcję upload plików"""
st.header("📁 Upload plików audio/video")
# Research brief
st.subheader("📋 Brief badawczy (opcjonalny)")
research_brief = st.text_area(
"Opisz cele badania, grupę docelową, kluczowe pytania badawcze:",
value=st.session_state.research_brief,
height=100,
help="Ten opis pomoże AI wygenerować lepszy raport"
)
st.session_state.research_brief = research_brief
# File uploader
st.subheader("🎙️ Pliki do transkrypcji")
uploaded_files = st.file_uploader(
"Wybierz pliki audio/video:",
type=['mp3', 'wav', 'mp4', 'm4a', 'aac'],
accept_multiple_files=True,
help="Obsługiwane formaty: MP3, WAV, MP4, M4A, AAC (max 25MB każdy dla Whisper)"
)
if uploaded_files:
# Walidacja plików
valid_files = []
total_size = 0
for file in uploaded_files:
file_size_mb = file.size / (1024 * 1024)
total_size += file_size_mb
# Sprawdź rozmiar pojedynczego pliku
if file_size_mb > 100: # 100MB limit dla pojedynczego pliku
st.error(f"❌ {file.name}: Plik za duży ({file_size_mb:.1f}MB). Maksymalnie 100MB.")
continue
valid_files.append({
'file': file,
'name': file.name,
'size_mb': file_size_mb,
'needs_splitting': file_size_mb > settings['max_file_size'],
'needs_compression': file_size_mb > 25 # Whisper limit
})
# Wyświetl informacje o plikach
if valid_files:
st.success(f"✅ Załadowano {len(valid_files)} plików ({total_size:.1f}MB)")
# Tabela z informacjami o plikach
for i, file_info in enumerate(valid_files):
col1, col2, col3 = st.columns([3, 1, 1])
with col1:
st.write(f"📄 {file_info['name']}")
with col2:
st.write(f"{file_info['size_mb']:.1f}MB")
with col3:
if file_info['needs_compression']:
st.warning("Kompresja")
elif file_info['needs_splitting']:
st.warning("Podział")
else:
st.success("OK")
st.session_state.uploaded_files = valid_files
return True
return False
def render_processing_section(self, settings):
"""Renderuj sekcję przetwarzania"""
if not st.session_state.uploaded_files:
st.info("👆 Najpierw załaduj pliki audio/video")
return
if not settings['openai_key']:
st.warning("⚠️ Wprowadź klucz OpenAI API w sidebarze")
return
st.header("🚀 Przetwarzanie")
# Przycisk start
if st.session_state.processing_status == 'ready':
if st.button("🎯 Rozpocznij transkrypcję i analizę", type="primary"):
st.session_state.processing_status = 'running'
# Przetwarzaj synchronicznie - PROSTSZE I DZIAŁA
self.process_files(settings)
elif st.session_state.processing_status == 'running':
st.info("⏳ Przetwarzanie w toku...")
# Podczas przetwarzania nie pokazuj przycisku stop - może powodować problemy
elif st.session_state.processing_status == 'completed':
st.success("🎉 Przetwarzanie zakończone pomyślnie!")
if st.button("🔄 Nowe przetwarzanie", type="primary"):
st.session_state.processing_status = 'ready'
st.session_state.transcriptions = {}
st.session_state.final_report = None
st.rerun()
elif st.session_state.processing_status == 'error':
st.error("❌ Przetwarzanie zakończone błędem")
if st.button("🔄 Spróbuj ponownie", type="primary"):
st.session_state.processing_status = 'ready'
st.rerun()
def process_files(self, settings):
"""Główna logika przetwarzania plików - SYNCHRONICZNA"""
try:
self.log_message("🚀 Rozpoczynam przetwarzanie plików")
# Container dla live updates
status_container = st.empty()
progress_container = st.empty()
# 1. Transkrypcja wszystkich plików
for i, file_info in enumerate(st.session_state.uploaded_files):
status_container.info(f"🎙️ Transkrybuję: {file_info['name']} ({i+1}/{len(st.session_state.uploaded_files)})")
self.log_message(f"📝 Rozpoczynam transkrypcję: {file_info['name']}")
try:
# Przetwórz plik (podział jeśli potrzeba)
processed_files = self.file_handler.process_file(
file_info['file'],
settings['max_file_size'],
settings['auto_compress']
)
if not processed_files:
raise Exception("Nie udało się przetworzyć pliku")
# Transkrypcja
transcription = self.transcriber.transcribe_files(
processed_files,
language=settings['language']
)
st.session_state.transcriptions[file_info['name']] = transcription
self.log_message(f"✅ Zakończono transkrypcję: {file_info['name']}")
# Update progress
progress = (i + 1) / len(st.session_state.uploaded_files)
progress_container.progress(progress)
except Exception as e:
self.log_message(f"❌ Błąd transkrypcji {file_info['name']}: {str(e)}", "ERROR")
st.error(f"Błąd przy {file_info['name']}: {str(e)}")
continue
# 2. Generowanie raportu
if st.session_state.transcriptions:
status_container.info("📄 Generuję raport badawczy...")
self.log_message("📄 Rozpoczynam generowanie raportu")
try:
final_report = self.report_generator.generate_comprehensive_report(
st.session_state.transcriptions,
st.session_state.research_brief
)
st.session_state.final_report = final_report
st.session_state.processing_status = 'completed'
status_container.success("✅ Przetwarzanie zakończone!")
self.log_message("🎉 Raport wygenerowany pomyślnie!")
# Automatyczny refresh żeby pokazać wyniki
st.rerun()
except Exception as e:
self.log_message(f"❌ Błąd generowania raportu: {str(e)}", "ERROR")
st.error(f"Błąd generowania raportu: {str(e)}")
st.session_state.processing_status = 'error'
else:
st.session_state.processing_status = 'error'
self.log_message("❌ Brak transkrypcji do wygenerowania raportu", "ERROR")
except Exception as e:
self.log_message(f"💥 Błąd krytyczny: {str(e)}", "ERROR")
st.error(f"Błąd krytyczny: {str(e)}")
st.session_state.processing_status = 'error'
def render_results(self):
"""Renderuj wyniki"""
if not st.session_state.transcriptions and not st.session_state.final_report:
return
st.header("📊 Wyniki")
# Tabs dla różnych widoków
tab1, tab2, tab3 = st.tabs(["📄 Raport", "🎙️ Transkrypcje", "📋 Logi"])
with tab1:
if st.session_state.final_report:
st.subheader("📄 Raport z badania")
# Download button
if st.download_button(
label="📥 Pobierz raport (TXT)",
data=st.session_state.final_report,
file_name=f"raport_badawczy_{datetime.now().strftime('%Y%m%d_%H%M')}.txt",
mime="text/plain"
):
st.success("✅ Raport pobierany!")
# Display report
st.markdown("---")
st.markdown(st.session_state.final_report)
else:
st.info("Raport będzie dostępny po zakończeniu przetwarzania")
with tab2:
if st.session_state.transcriptions:
st.subheader("🎙️ Transkrypcje")
for filename, transcription in st.session_state.transcriptions.items():
with st.expander(f"📄 {filename}"):
st.write(transcription)
# Download individual transcription
st.download_button(
label=f"📥 Pobierz {filename}",
data=transcription,
file_name=f"transkrypcja_{filename}_{datetime.now().strftime('%Y%m%d_%H%M')}.txt",
mime="text/plain",
key=f"download_{filename}"
)
else:
st.info("Transkrypcje będą dostępne po przetworzeniu plików")
with tab3:
st.subheader("📋 Logi procesu")
if st.session_state.logs:
# Clear logs button
if st.button("🧹 Wyczyść logi"):
st.session_state.logs = []
st.rerun()
# Display logs - pokazuj najnowsze na górze
logs_text = "\n".join(reversed(st.session_state.logs))
st.text_area(
"Logi systemu (najnowsze na górze):",
value=logs_text,
height=400,
disabled=True
)
else:
st.info("Logi będą wyświetlane podczas przetwarzania")
def run(self):
"""Główna funkcja aplikacji"""
# Sidebar
settings = self.render_sidebar()
# Main content
st.title("🎙️ QualiInsightLab")
st.markdown("*Automatyczna transkrypcja i analiza wywiadów fokusowych oraz indywidualnych*")
st.markdown("---")
# File upload section
files_uploaded = self.render_file_upload(settings)
st.markdown("---")
# Processing section
self.render_processing_section(settings)
st.markdown("---")
# Results section
self.render_results()
# Główna aplikacja
if __name__ == "__main__":
try:
app = FGIIDIAnalyzer()
app.run()
except Exception as e:
st.error(f"💥 Błąd aplikacji: {str(e)}")
st.code(traceback.format_exc())
# Log error for debugging
with open('error_log.txt', 'w', encoding='utf-8') as f:
f.write(f"Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
st.info("Szczegóły błędu zapisane w error_log.txt")