"""
Standalone Streamlit application for scoring feedback.

Modular backend/frontend architecture with persistence: the dataset, the
scores, the comments and the current position are written through
``SessionManager`` so that an annotation session can be resumed later.
"""
from pathlib import Path

import streamlit as st

from backend.data_loader import (
    load_dataset_from_jsonl,
    load_dataset_from_hf,
    filter_items_with_positive
)
from backend.export import prepare_export, export_to_jsonl
from backend.statistics import (
    compute_progress,
    compute_score_distribution,
    compute_average_score,
    compute_most_common_score,
    find_unscored_indices
)
from backend.persistence import SessionManager
from frontend.styles import apply_custom_css
from frontend.components import (
    render_navigation,
    render_code_block,
    render_feedback_block,
    render_score_slider,
    render_comment_field,
    render_progress_metrics,
    render_statistics,
    render_export_section,
    render_quick_actions
)
from frontend.help_page import render_help_page

# Page configuration (must run before any other Streamlit command).
st.set_page_config(
    page_title="Feedback Scoring Tool",
    page_icon="⭐",
    layout="wide",
    initial_sidebar_state="expanded"
)

apply_custom_css()

DATA_DIR = Path("/app/data")
# parents=True so a missing /app does not crash the app at startup.
DATA_DIR.mkdir(parents=True, exist_ok=True)

# Session manager handling on-disk persistence under DATA_DIR.
session_manager = SessionManager(DATA_DIR)


def _persist_session():
    """Save the current scores, comments and position via the session manager."""
    session_manager.save_session(
        st.session_state.feedback_scores,
        st.session_state.feedback_comments,
        st.session_state.scoring_index
    )


def _activate_dataset(dataset, metadata):
    """Persist a freshly loaded dataset and reset the annotation state for it.

    Args:
        dataset: The loaded dataset, as returned by one of the loaders.
        metadata: Dict describing where the dataset came from; passed
            unchanged to ``session_manager.save_dataset``.
    """
    session_manager.save_dataset(dataset, metadata)
    st.session_state.dataset = dataset
    st.session_state.scoring_index = 0
    st.session_state.feedback_scores = {}
    st.session_state.feedback_comments = {}
    st.session_state.show_help = False
    # Save the initial empty session so it can be resumed right away.
    _persist_session()


# Initialize session state. The script is re-executed on every rerun, so the
# mutable defaults below are fresh objects each time and only assigned when
# the key is missing.
_SESSION_DEFAULTS = {
    'dataset': None,
    'feedback_scores': {},
    'scoring_index': 0,
    'feedback_comments': {},
    'items_with_positive': [],
    'show_help': False,
    'session_loaded': False,
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default

# Remember that a saved session has been detected at least once.
if not st.session_state.session_loaded:
    session_info = session_manager.get_session_info()
    if session_info:
        st.session_state.session_loaded = True

# Sidebar - configuration
with st.sidebar:
    # NOTE(review): this literal (and the two section headers below) looks like
    # it lost surrounding HTML markup; the text is reproduced as found.
    st.markdown('\nMenu\n', unsafe_allow_html=True)

    # Help button: label and style depend on whether the help page is shown.
    button_text = "Retour aux annotations" if st.session_state.show_help else "Aide & Documentation"
    button_type = "secondary" if st.session_state.show_help else "primary"
    if st.button(button_text, use_container_width=True, type=button_type):
        st.session_state.show_help = not st.session_state.show_help
        st.rerun()

    # Offer to resume a saved session only while no dataset is loaded.
    session_info = session_manager.get_session_info()
    if session_info and st.session_state.dataset is None:
        st.markdown('\nSession Sauvegardée\n', unsafe_allow_html=True)
        st.info(f"""
        **Session trouvée**
        - Dataset: {session_info['dataset_size']} exemples
        - Scorés: {session_info['total_scored']}
        - Position: {session_info['scoring_index'] + 1}
        - Dernière sauvegarde: {session_info['last_saved'][:19]}
        """)

        col1, col2 = st.columns(2)
        with col1:
            if st.button("Reprendre", use_container_width=True, type="primary"):
                with st.spinner("Chargement de la session..."):
                    dataset, _ = session_manager.load_dataset()
                    session_data = session_manager.load_session()
                    st.session_state.dataset = dataset
                    st.session_state.feedback_scores = session_data['feedback_scores']
                    st.session_state.feedback_comments = session_data['feedback_comments']
                    st.session_state.scoring_index = session_data['scoring_index']
                    st.session_state.show_help = False
                st.success("Session reprise")
                st.rerun()
        with col2:
            if st.button("Nouvelle", use_container_width=True):
                # Two-click confirmation. Test the flag's value, not its mere
                # presence: the flag stays in session state (as False) after a
                # confirmed clear, and a presence check would then skip the
                # confirmation on every later click.
                if not st.session_state.get('confirm_clear'):
                    st.session_state.confirm_clear = True
                    st.warning("Cliquer encore pour confirmer")
                else:
                    session_manager.clear_session()
                    st.session_state.confirm_clear = False
                    st.success("Session effacée")
                    st.rerun()

        st.markdown("---")

    st.markdown('\nNouveau Dataset\n', unsafe_allow_html=True)

    # Load a dataset
    upload_option = st.radio(
        "Source:",
        ["Fichier local (.jsonl)", "HuggingFace Hub"],
        label_visibility="collapsed"
    )

    if upload_option == "Fichier local (.jsonl)":
        uploaded_file = st.file_uploader("Fichier JSONL", type=['jsonl'], label_visibility="collapsed")
        if uploaded_file is not None:
            if st.button("Charger", use_container_width=True):
                with st.spinner("Chargement..."):
                    dataset = load_dataset_from_jsonl(uploaded_file)
                    _activate_dataset(dataset, {
                        'source': 'local_file',
                        'filename': uploaded_file.name
                    })
                st.success(f"Dataset chargé: {len(dataset)} exemples")
                st.rerun()
    else:  # HuggingFace Hub
        hf_dataset = st.text_input("Dataset HF", placeholder="username/dataset", label_visibility="collapsed")
        hf_split = st.text_input("Split", value="train", label_visibility="collapsed")
        if st.button("Charger depuis HF", use_container_width=True):
            if hf_dataset:
                with st.spinner(f"Téléchargement de {hf_dataset}..."):
                    # Top-level UI boundary: any download/save failure is
                    # reported in the sidebar instead of crashing the page.
                    try:
                        dataset = load_dataset_from_hf(hf_dataset, hf_split)
                        # Keep a local copy of the downloaded dataset.
                        _activate_dataset(dataset, {
                            'source': 'huggingface',
                            'dataset_name': hf_dataset,
                            'split': hf_split
                        })
                        st.success(f"Dataset chargé et sauvegardé: {len(dataset)} exemples")
                    except Exception as e:
                        st.error(f"Erreur: {e}")
                    else:
                        # Rerun outside the try body so the rerun signal never
                        # passes through the error handler.
                        st.rerun()
            else:
                st.warning("Entrez un nom de dataset")

# Main content - show either the help page or the scoring interface.
if st.session_state.show_help:
    render_help_page()
    st.stop()

# Title
st.title("Scoring des Feedbacks")

# Nothing to score until a dataset is loaded.
if not st.session_state.dataset:
    st.warning("Aucun dataset chargé.")
    st.info("Vérifiez la sidebar : vous avez peut-être une session sauvegardée à reprendre")
    st.info("Cliquez sur Aide & Documentation pour le guide complet")
    st.stop()

# Keep only the items that carry positive feedback.
dataset = st.session_state.dataset
items_with_positive = filter_items_with_positive(dataset)
st.session_state.items_with_positive = items_with_positive

if not items_with_positive:
    st.error("Aucun feedback positif trouvé dans le dataset.")
    st.info("Le dataset doit contenir un champ 'positive' avec du texte.")
    st.stop()

# A restored or stale position may fall outside the filtered list; clamp it
# so the item lookup below cannot raise IndexError.
last_position = len(items_with_positive) - 1
if not 0 <= st.session_state.scoring_index <= last_position:
    st.session_state.scoring_index = min(max(st.session_state.scoring_index, 0), last_position)

# Navigation
new_index = render_navigation(st.session_state.scoring_index, len(items_with_positive))
if new_index is not None:
    st.session_state.scoring_index = new_index
    # Auto-save on navigation.
    _persist_session()
    st.rerun()

st.markdown("---")

# Current item: (index in the original dataset, item dict).
original_idx, current_item = items_with_positive[st.session_state.scoring_index]

# Display the code ('anchor' field, falling back to 'code').
code_text = current_item.get('anchor', current_item.get('code', 'N/A'))
language = current_item.get('language', 'python')
render_code_block(code_text, language)

st.markdown("---")

# Display the positive feedback.
positive_feedback = current_item.get('positive', 'N/A')
render_feedback_block(positive_feedback)

st.markdown("---")

# Scoring interface. The slider value is always recorded in session state, so
# merely displaying an item stores the default score of 3 for it; the session
# is written to disk only when the value actually changed.
score_key = f"score_{original_idx}"
current_score = st.session_state.feedback_scores.get(original_idx, 3)
score = render_score_slider(score_key, current_score)
st.session_state.feedback_scores[original_idx] = score
if score != current_score:
    _persist_session()

# Comment field, same record-always / save-on-change policy as the score.
comment_key = f"comment_{original_idx}"
current_comment = st.session_state.feedback_comments.get(original_idx, "")
comment = render_comment_field(comment_key, current_comment)
st.session_state.feedback_comments[original_idx] = comment
if comment != current_comment:
    _persist_session()

st.markdown("---")

# Progress
scored_items = len(st.session_state.feedback_scores)
total_items = len(items_with_positive)
progress_pct = compute_progress(scored_items, total_items) * 100

render_progress_metrics(
    total=total_items,
    scored=scored_items,
    remaining=total_items - scored_items,
    progress_pct=progress_pct
)

# Statistics (only once at least one score exists).
if st.session_state.feedback_scores:
    avg_score = compute_average_score(st.session_state.feedback_scores)
    most_common = compute_most_common_score(st.session_state.feedback_scores)
    score_counts = compute_score_distribution(st.session_state.feedback_scores)
    render_statistics(avg_score, most_common, score_counts)

st.markdown("---")

# Export section
export_data = prepare_export(
    items_with_positive,
    st.session_state.feedback_scores,
    st.session_state.feedback_comments
)

col1, col2 = render_export_section(export_data)

# JSONL download
with col2:
    if export_data:
        jsonl_content = export_to_jsonl(export_data)
        st.download_button(
            label="Télécharger JSONL",
            data=jsonl_content,
            file_name="feedback_scores.jsonl",
            mime="application/jsonl",
            use_container_width=True,
            type="primary"
        )
    else:
        st.button(
            "📥 Télécharger JSONL",
            disabled=True,
            use_container_width=True,
            help="Aucun score enregistré"
        )

# Export preview
if export_data:
    with st.expander("Aperçu Export (5 premiers)"):
        preview_items = export_data[:5]
        st.json(preview_items)

# Quick actions
st.markdown("---")
reset_requested, jump_to_unscored, jump_to_index = render_quick_actions(
    items_with_positive,
    st.session_state.feedback_scores,
    st.session_state.scoring_index
)

# Handle quick actions
if reset_requested:
    if st.session_state.feedback_scores:
        # Two-click confirmation; test the flag's value so that a reset done
        # earlier in the session does not disable the confirmation for later
        # resets (the flag remains in session state as False).
        if not st.session_state.get('confirm_reset'):
            st.session_state.confirm_reset = True
            st.warning("Cliquez à nouveau pour confirmer la suppression de tous les scores")
        else:
            st.session_state.feedback_scores = {}
            st.session_state.feedback_comments = {}
            st.session_state.confirm_reset = False
            # Save the cleared session.
            _persist_session()
            st.success("Scores réinitialisés")
            st.rerun()

if jump_to_unscored:
    unscored_indices = find_unscored_indices(items_with_positive, st.session_state.feedback_scores)
    if unscored_indices:
        # Translate the first unscored original index into a list position.
        first_unscored = unscored_indices[0]
        target_position = next(
            (pos for pos, (idx, _) in enumerate(items_with_positive) if idx == first_unscored),
            None
        )
        if target_position is not None:
            st.session_state.scoring_index = target_position
            # Save the position.
            _persist_session()
            st.rerun()

if jump_to_index is not None:
    st.session_state.scoring_index = jump_to_index
    # Save the position.
    _persist_session()
    st.rerun()

# Footer
st.markdown("---")
st.caption("Sauvegarde automatique activée | Vous pouvez fermer et reprendre plus tard")