"""
Application Streamlit standalone pour le scoring de feedbacks
Architecture backend/frontend modulaire avec persistance
"""
import streamlit as st
from pathlib import Path
from backend.data_loader import (
load_dataset_from_jsonl,
load_dataset_from_hf,
filter_items_with_positive
)
from backend.export import prepare_export, export_to_jsonl
from backend.statistics import (
compute_progress,
compute_score_distribution,
compute_average_score,
compute_most_common_score,
find_unscored_indices
)
from backend.persistence import SessionManager
from frontend.styles import apply_custom_css
from frontend.components import (
render_navigation,
render_code_block,
render_feedback_block,
render_score_slider,
render_comment_field,
render_progress_metrics,
render_statistics,
render_export_section,
render_quick_actions
)
from frontend.help_page import render_help_page
# Page configuration: tab title and icon, wide layout, sidebar expanded on load.
page_settings = {
    "page_title": "Feedback Scoring Tool",
    "page_icon": "⭐",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**page_settings)

# Apply the project's custom CSS (imported from frontend.styles).
apply_custom_css()
# Directory handed to SessionManager as its storage root.
DATA_DIR = Path("/app/data")
# parents=True also creates missing parent directories, so start-up does not
# raise FileNotFoundError when "/app" does not exist yet; exist_ok=True keeps
# the call a no-op when the directory is already there.
DATA_DIR.mkdir(parents=True, exist_ok=True)

# Initialize the session manager used for every save/load call below.
session_manager = SessionManager(DATA_DIR)
# Seed st.session_state with a default for every key the script reads.
# Keys that already exist (set during an earlier run) are left untouched.
_state_defaults = {
    'dataset': None,
    'feedback_scores': {},
    'scoring_index': 0,
    'feedback_comments': {},
    'items_with_positive': [],
    'show_help': False,
    'session_loaded': False,
}
for _state_key, _state_default in _state_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# Until a saved session has been detected once, ask the session manager for
# one and remember the detection in `session_loaded`.
if not st.session_state.session_loaded:
    session_info = session_manager.get_session_info()
    if session_info:
        st.session_state.session_loaded = True
# Sidebar - header and help toggle
with st.sidebar:
    # NOTE(review): the original literal was a single-quoted string split over
    # two lines, which does not parse. It is kept here as the newline it
    # contained; the header markup that presumably belonged in it is not
    # visible in this file - TODO restore it.
    st.markdown('\n', unsafe_allow_html=True)

    # One button toggles between the help page and the annotation view, so
    # its label and style depend on which view is currently shown.
    showing_help = st.session_state.show_help
    button_text = "Retour aux annotations" if showing_help else "Aide & Documentation"
    button_type = "secondary" if showing_help else "primary"
    if st.button(button_text, use_container_width=True, type=button_type):
        st.session_state.show_help = not showing_help
        st.rerun()
# Sidebar - saved session: offer to resume it or to discard it.
with st.sidebar:
    session_info = session_manager.get_session_info()

    # Only shown while no dataset is loaded in the current browser session.
    if session_info and st.session_state.dataset is None:
        # NOTE(review): empty markup as found in the source; a section header
        # may have been intended here - confirm.
        st.markdown('', unsafe_allow_html=True)

        session_summary = (
            "\n"
            "**Session trouvée**\n"
            f"- Dataset: {session_info['dataset_size']} exemples\n"
            f"- Scorés: {session_info['total_scored']}\n"
            f"- Position: {session_info['scoring_index'] + 1}\n"
            f"- Dernière sauvegarde: {session_info['last_saved'][:19]}\n"
        )
        st.info(session_summary)

        col1, col2 = st.columns(2)

        with col1:
            if st.button("Reprendre", use_container_width=True, type="primary"):
                with st.spinner("Chargement de la session..."):
                    # load_dataset() returns a pair; only its first element
                    # (the dataset) is used here.
                    dataset, _ = session_manager.load_dataset()
                    session_data = session_manager.load_session()
                    st.session_state.dataset = dataset
                    st.session_state.feedback_scores = session_data['feedback_scores']
                    st.session_state.feedback_comments = session_data['feedback_comments']
                    st.session_state.scoring_index = session_data['scoring_index']
                    st.session_state.show_help = False
                st.success("Session reprise")
                st.rerun()

        with col2:
            if st.button("Nouvelle", use_container_width=True):
                # Two-click confirmation. The flag's VALUE is tested, not the
                # mere presence of the key: the key stays in session_state
                # after the first deletion, so a presence test would let every
                # later click delete without confirmation.
                if st.session_state.get('confirm_clear', False):
                    session_manager.clear_session()
                    st.session_state.confirm_clear = False
                    st.success("Session effacée")
                    st.rerun()
                else:
                    st.session_state.confirm_clear = True
                    st.warning("Cliquer encore pour confirmer")
            else:
                # Any run where the button was not clicked disarms the
                # confirmation, matching the warning that disappears on rerun.
                st.session_state.confirm_clear = False
def _activate_dataset(dataset, metadata):
    """Persist *dataset* and restart scoring on it with an empty session.

    Args:
        dataset: The freshly loaded dataset.
        metadata: Dict describing where the dataset came from; passed
            unchanged to ``session_manager.save_dataset``.
    """
    session_manager.save_dataset(dataset, metadata)
    st.session_state.dataset = dataset
    st.session_state.scoring_index = 0
    st.session_state.feedback_scores = {}
    st.session_state.feedback_comments = {}
    st.session_state.show_help = False
    # Save the initial empty session so the new dataset can be resumed later.
    session_manager.save_session(
        st.session_state.feedback_scores,
        st.session_state.feedback_comments,
        st.session_state.scoring_index,
    )


# Sidebar - load a dataset from an uploaded file or from the HuggingFace Hub.
with st.sidebar:
    st.markdown("---")
    # NOTE(review): empty markup as found in the source; a section header may
    # have been intended here - confirm.
    st.markdown('', unsafe_allow_html=True)

    upload_option = st.radio(
        "Source:",
        ["Fichier local (.jsonl)", "HuggingFace Hub"],
        label_visibility="collapsed",
    )

    if upload_option == "Fichier local (.jsonl)":
        uploaded_file = st.file_uploader("Fichier JSONL", type=['jsonl'], label_visibility="collapsed")
        # The load button only appears once a file has been selected.
        if uploaded_file is not None and st.button("Charger", use_container_width=True):
            with st.spinner("Chargement..."):
                try:
                    dataset = load_dataset_from_jsonl(uploaded_file)
                    _activate_dataset(dataset, {
                        'source': 'local_file',
                        'filename': uploaded_file.name,
                    })
                except Exception as e:
                    # UI boundary: report the failure in the sidebar, as the
                    # HuggingFace branch does, instead of crashing the page.
                    st.error(f"Erreur: {str(e)}")
                else:
                    st.success(f"Dataset chargé: {len(dataset)} exemples")
                    st.rerun()
    else:  # HuggingFace Hub
        hf_dataset = st.text_input("Dataset HF", placeholder="username/dataset", label_visibility="collapsed")
        hf_split = st.text_input("Split", value="train", label_visibility="collapsed")
        if st.button("Charger depuis HF", use_container_width=True):
            if hf_dataset:
                with st.spinner(f"Téléchargement de {hf_dataset}..."):
                    try:
                        dataset = load_dataset_from_hf(hf_dataset, hf_split)
                        _activate_dataset(dataset, {
                            'source': 'huggingface',
                            'dataset_name': hf_dataset,
                            'split': hf_split,
                        })
                    except Exception as e:
                        st.error(f"Erreur: {str(e)}")
                    else:
                        # The rerun is kept out of the try body so the handler
                        # above only ever sees load/save failures.
                        st.success(f"Dataset chargé et sauvegardé: {len(dataset)} exemples")
                        st.rerun()
            else:
                st.warning("Entrez un nom de dataset")
# Main area: when help is requested, render the help page and end this run.
if st.session_state.show_help:
    render_help_page()
    st.stop()

# Page title
st.title("Scoring des Feedbacks")

# Without a dataset there is nothing to score: show hints and end this run.
if not st.session_state.dataset:
    st.warning("Aucun dataset chargé.")
    for hint in (
        "Vérifiez la sidebar : vous avez peut-être une session sauvegardée à reprendre",
        "Cliquez sur Aide & Documentation pour le guide complet",
    ):
        st.info(hint)
    st.stop()
# Keep only the items that carry positive feedback. The result is indexed and
# unpacked further down as (original_index, item) pairs.
dataset = st.session_state.dataset
st.session_state.items_with_positive = items_with_positive = filter_items_with_positive(dataset)

# Nothing to score: explain the expected field and end this run.
if not items_with_positive:
    st.error("Aucun feedback positif trouvé dans le dataset.")
    st.info("Le dataset doit contenir un champ 'positive' avec du texte.")
    st.stop()
# Navigation
# Clamp the stored position into the valid range first. A restored or
# externally supplied index past the end would raise IndexError when the
# current item is fetched, and a negative one would silently wrap around.
last_position = len(items_with_positive) - 1
st.session_state.scoring_index = min(max(st.session_state.scoring_index, 0), last_position)

new_index = render_navigation(st.session_state.scoring_index, len(items_with_positive))
if new_index is not None:
    # The navigation component returned a new position: store it, persist the
    # session, and rerun so the page shows the new item.
    st.session_state.scoring_index = new_index
    session_manager.save_session(
        st.session_state.feedback_scores,
        st.session_state.feedback_comments,
        st.session_state.scoring_index,
    )
    st.rerun()
st.markdown("---")

# Current item: a pair of (index in the original dataset, item mapping).
original_idx, current_item = items_with_positive[st.session_state.scoring_index]

# Code to display: prefer the 'anchor' field, then 'code', else a placeholder.
if 'anchor' in current_item:
    code_text = current_item['anchor']
else:
    code_text = current_item.get('code', 'N/A')
language = current_item.get('language', 'python')
render_code_block(code_text, language)

st.markdown("---")

# Positive feedback attached to the item.
positive_feedback = current_item.get('positive', 'N/A')
render_feedback_block(positive_feedback)

st.markdown("---")
# Scoring interface: the slider starts from the stored score, or 3 if none.
score_key = f"score_{original_idx}"
current_score = st.session_state.feedback_scores.get(original_idx, 3)
score = render_score_slider(score_key, current_score)

# The slider value is always recorded; the session is written to disk only
# when the value differs from what was stored.
# NOTE(review): since the value is stored unconditionally, an item gets a
# score entry (default 3) as soon as it is displayed - confirm this is intended.
st.session_state.feedback_scores[original_idx] = score
if score != current_score:
    session_manager.save_session(
        st.session_state.feedback_scores,
        st.session_state.feedback_comments,
        st.session_state.scoring_index,
    )

# Comment field: same pattern - always recorded, saved only on change.
comment_key = f"comment_{original_idx}"
current_comment = st.session_state.feedback_comments.get(original_idx, "")
comment = render_comment_field(comment_key, current_comment)
st.session_state.feedback_comments[original_idx] = comment
if comment != current_comment:
    session_manager.save_session(
        st.session_state.feedback_scores,
        st.session_state.feedback_comments,
        st.session_state.scoring_index,
    )
st.markdown("---")

# Progress: number of score entries versus number of items to score.
total_items = len(items_with_positive)
scored_items = len(st.session_state.feedback_scores)
progress_pct = compute_progress(scored_items, total_items) * 100
render_progress_metrics(
    total=total_items,
    scored=scored_items,
    remaining=total_items - scored_items,
    progress_pct=progress_pct,
)

# Statistics are shown only once at least one score exists.
recorded_scores = st.session_state.feedback_scores
if recorded_scores:
    render_statistics(
        compute_average_score(recorded_scores),
        compute_most_common_score(recorded_scores),
        compute_score_distribution(recorded_scores),
    )
st.markdown("---")

# Export section: build the export records from the items, scores and comments.
export_data = prepare_export(
    items_with_positive,
    st.session_state.feedback_scores,
    st.session_state.feedback_comments,
)
col1, col2 = render_export_section(export_data)

# JSONL download: a disabled placeholder button when there is nothing to
# export, otherwise a real download button.
with col2:
    if not export_data:
        st.button(
            "📥 Télécharger JSONL",
            disabled=True,
            use_container_width=True,
            help="Aucun score enregistré",
        )
    else:
        jsonl_content = export_to_jsonl(export_data)
        st.download_button(
            label="Télécharger JSONL",
            data=jsonl_content,
            file_name="feedback_scores.jsonl",
            mime="application/jsonl",
            use_container_width=True,
            type="primary",
        )

# Preview of the first five export records.
if export_data:
    with st.expander("Aperçu Export (5 premiers)"):
        st.json(export_data[:5])
# Quick actions
st.markdown("---")
reset_requested, jump_to_unscored, jump_to_index = render_quick_actions(
    items_with_positive,
    st.session_state.feedback_scores,
    st.session_state.scoring_index,
)

# Reset all scores, guarded by a two-click confirmation. The flag's VALUE is
# tested, not the presence of the key: the key remains in session_state after
# the first reset, so a presence test would skip confirmation from then on.
if reset_requested and st.session_state.feedback_scores:
    if st.session_state.get('confirm_reset', False):
        st.session_state.feedback_scores = {}
        st.session_state.feedback_comments = {}
        st.session_state.confirm_reset = False
        # Save the cleared session.
        session_manager.save_session(
            st.session_state.feedback_scores,
            st.session_state.feedback_comments,
            st.session_state.scoring_index,
        )
        st.success("Scores réinitialisés")
        st.rerun()
    else:
        st.session_state.confirm_reset = True
        st.warning("Cliquez à nouveau pour confirmer la suppression de tous les scores")
else:
    # No reset requested on this run (or nothing to reset): disarm the
    # confirmation so a much later click cannot wipe the scores in one step.
    st.session_state.confirm_reset = False

# Jump to the first unscored item: map its original dataset index back to a
# position in items_with_positive.
if jump_to_unscored:
    unscored_indices = find_unscored_indices(items_with_positive, st.session_state.feedback_scores)
    if unscored_indices:
        first_unscored = unscored_indices[0]
        target_pos = next(
            (pos for pos, (idx, _) in enumerate(items_with_positive) if idx == first_unscored),
            None,
        )
        if target_pos is not None:
            st.session_state.scoring_index = target_pos
            # Save the new position.
            session_manager.save_session(
                st.session_state.feedback_scores,
                st.session_state.feedback_comments,
                st.session_state.scoring_index,
            )
            st.rerun()

# Jump to an explicit position chosen in the quick-actions component.
if jump_to_index is not None:
    st.session_state.scoring_index = jump_to_index
    # Save the new position.
    session_manager.save_session(
        st.session_state.feedback_scores,
        st.session_state.feedback_comments,
        st.session_state.scoring_index,
    )
    st.rerun()
# Footer: a separator followed by the auto-save reminder.
footer_note = "Sauvegarde automatique activée | Vous pouvez fermer et reprendre plus tard"
st.markdown("---")
st.caption(footer_note)