### Script for Running the KliWinBa-Assessment-Tool in Projekt KliWinBa ###
# Chair for Energy Economics, University of Duisburg-Essen
# Authors: Tobias Ader and Daniel Brunsch
#
# Streamlit entry script. Flow, top to bottom:
#   1. page setup, language selection and optional authentication
#   2. scenario selection form (must be confirmed before inputs are shown)
#   3. manual mode: single-building inputs -> heating system table -> annuities
#   4. batch mode: CSV upload or polygon selection -> batch calculation -> charts
#
# NOTE(review): the indentation of this file was reconstructed from a
# whitespace-collapsed source. Places where the original nesting could not be
# determined unambiguously are marked with NOTE(review) below.

import streamlit as st
import pandas as pd

# --- Internal modules ---
from logic.find_heatingsystem import (
    load_or_calculate_heating_systems,
    generate_preview_df,
    prepare_heating_system_df,
    determine_building_state,
)
from logic.calculations import (
    calculate_annuities,
    calculate_annuity_block,
    finalize_batch_results,
)
from utils.session import (
    check_auth,
    init_session_state,
    handle_language_switch,
    ensure_scenario_state,
)
from utils.misc import (
    get_all_heating_systems,
    get_selected_scenario,
    COLUMN_MAP_INPUT,
    map_input_columns,
    apply_technology_name_language,
)
from utils.style import apply_custom_style, render_app_header
from data_utils.loader import load_scenario_table, load_data
from resources.texts import get_ui_texts, ui_texts
from resources.colors import PRIMARY, SECONDARY, BG_LIGHT, ALT1, ALT2, ALT3
from ui.polygon_ui import render_polygon_selector
from ui.inputs_ui import (
    render_heating_load_input,
    render_basic_inputs,
    render_advanced_settings,
    show_specific_heat_demand_message,
    render_heating_system_editor,
    apply_user_values,
    render_batch_template_download,
    render_batch_heating_system_editor,
    apply_batch_user_values,
    render_basic_constraints,
)
from ui.charts import (
    create_cost_composition_chart,
    render_result_table,
    render_download_button,
    render_summary_bar_chart,
    render_mean_costs_chart,
    render_object_detail_chart,
)
from ui.batch_errorlog import render_batch_errorlog

# --- Streamlit page setup ---
st.set_page_config(
    page_title="KliWinBa",
    layout="centered"
)

# --- Auth config ---
ENABLE_AUTH = True

# --- Session init ---
if "LANG" not in st.session_state:
    st.session_state.LANG = "de"

if "authenticated" not in st.session_state:
    # Evaluates to True when authentication is disabled.
    st.session_state.authenticated = not ENABLE_AUTH

# --- Language selection ---
language = st.selectbox(
    "Sprache auswählen / Select language",
    ["Deutsch", "English"],
    index=0 if st.session_state.LANG == "de" else 1,
)
new_LANG = "de" if language == "Deutsch" else "en"
if new_LANG != st.session_state.LANG:
    # Persist the new language and rerun so every text below is rendered with it.
    st.session_state.LANG = new_LANG
    st.rerun()

# --- Load UI texts (must happen after the language choice) ---
ui = get_ui_texts(ui_texts, st.session_state.LANG)

# --- Password check ---
if ENABLE_AUTH and not st.session_state.authenticated:
    check_auth(ui)  # ui is passed for language-dependent texts

# --- Design + header ---
apply_custom_style()
render_app_header(ui)

# --- Initialize session state and static data ---
init_session_state()
heating_system_list = get_all_heating_systems()  # heating systems loaded from CSV input

# --- App control variables ---
submitted = False

# --- State init ---
ensure_scenario_state()

# --- Load scenario definitions from file ---
scenario_df = load_scenario_table()
scenario_list = scenario_df[f"Szenario_{st.session_state.LANG.upper()}"].tolist()

# --- Mode selection: manual vs. batch ---
modus = st.radio(
    ui["mode"]["label"],
    ui["mode"]["options"],
    horizontal=True,
    help=ui["mode"]["help"]
)

# --- Scenario selection form (with submit button) ---
with st.form("szenario_form"):
    st.markdown("### " + ui["scenario"]["label"])
    with st.expander(ui["scenario"]["notes"]):
        st.info(ui["scenario"]["info"])

    # Mapping: display name (language-dependent) -> Szenario_ID
    scenario_col = f"Szenario_{st.session_state.LANG.upper()}"
    options = {row[scenario_col]: row["Szenario_ID"] for _, row in scenario_df.iterrows()}

    # Currently selected scenario ID (None until a scenario has been stored)
    current_id = st.session_state.get("scenario_id")

    # Determine the preselected radio entry
    if current_id and current_id in options.values():
        inv_options = {v: k for k, v in options.items()}
        default_index = list(options.keys()).index(inv_options[current_id])
    else:
        default_index = 0

    # The radio shows the names; the ID is looked up from the mapping
    selected_name = st.radio(
        ui["scenario"]["label"],
        list(options.keys()),
        index=default_index,
        key="radio_scenario"
    )
    selected_id = options[selected_name]

    confirm_scenario = st.form_submit_button(ui["scenario"]["button"])

if confirm_scenario:
    st.session_state.scenario_confirmed = True
    st.session_state.scenario_id = selected_id

# --- Map scenario ID to parameter values ---
row = scenario_df[scenario_df["Szenario_ID"] == st.session_state.get("scenario_id")]
if not row.empty:
    scen_values = row.iloc[0].to_dict()
    scen_id = st.session_state.scenario_id
else:
    # No matching scenario row: fall back to empty parameters and ID "A".
    scen_values, scen_id = {}, "A"

if modus == ui["mode"]["options"][0]:
    # ---------------- Manual mode (first entry of the mode options) ----------------
    if st.session_state.scenario_confirmed:
        # Rendered before the form (the original guarded this call with a
        # second, identical scenario_confirmed check).
        render_heating_load_input(ui)

        with st.form("input_form", clear_on_submit=False):
            # Basic inputs
            floor_area, year_built, total_demand_input, specific_demand = render_basic_inputs(ui)

            # Advanced settings
            interest_rate, observation_period, emission_price_change_factor, emission_cost = \
                render_advanced_settings(ui, scen_values)

            submitted = st.form_submit_button(ui["input"]["submit_button"])

        specific_energy_demand = show_specific_heat_demand_message(
            total_demand_input, floor_area, specific_demand, ui
        )

        # Use the user's heating load only when the "kenne_heizlast" flag is set.
        heat_load_user = (
            st.session_state.get("heizlast_user")
            if st.session_state.get("kenne_heizlast", False)
            else None
        )

        if submitted or st.session_state.df_heizsysteme is not None:
            building_state = determine_building_state(specific_energy_demand)

            df, floor_area, year_built, specific_energy_demand, interest_rate, \
                observation_period, emission_price_change_factor, emission_cost = \
                load_or_calculate_heating_systems(
                    submitted, specific_energy_demand, year_built, floor_area, scen_id,
                    interest_rate, observation_period, emission_price_change_factor,
                    emission_cost, ui, heat_load_user, building_state)

            # Checkbox part (currently disabled)
            # constraints = render_basic_constraints()

            enable_custom, user_values = render_heating_system_editor(df, ui)
            if enable_custom:
                st.session_state.user_values = user_values

            st.markdown("---")

            if st.button(ui["input"]["calc_button"]):
                df_input = (
                    apply_user_values(df, st.session_state.user_values)
                    if user_values
                    else df.copy()
                )
                try:
                    df_calc = calculate_annuities(
                        df_input,
                        specific_energy_demand,
                        floor_area,
                        interest_rate,
                        observation_period,
                        emission_cost,
                        emission_price_change_factor,
                    )
                    render_result_table(apply_technology_name_language(df_calc, ui), ui)

                    # Show the cost composition chart
                    chart = create_cost_composition_chart(
                        apply_technology_name_language(df_calc, ui), ui, ALT1, PRIMARY, ALT2
                    )
                    st.altair_chart(chart, use_container_width=True)
                except Exception as e:
                    st.error(ui["output"]["system_selection_error"].format(error=e))
        else:
            # NOTE(review): in the flattened source this `else` could belong
            # either to the "submitted" check (as placed here) or to the
            # scenario_confirmed check - confirm the intended nesting.
            st.info(ui["input"]["please_provide_input"])

elif modus == ui["mode"]["options"][1]:
    # ---------------- Batch mode (second entry of the mode options) ----------------
    if st.session_state.scenario_confirmed:
        # Choice: CSV upload or polygon selection
        input_mode = st.radio(
            ui["batch"].get("input_mode_label", "Eingabemodus"),
            options=[
                ui["batch"].get("upload_option", "CSV-Upload"),
                ui["polygon"]["checkbox"]
            ],
            horizontal=True,
            key="batch_input_mode"
        )
        use_polygon = (input_mode == ui["polygon"]["checkbox"])
        st.session_state["use_polygon"] = use_polygon

        # Start from the input stored in session state (None when absent).
        df_input = st.session_state.get("df_input")

        if use_polygon:
            # Polygon workflow
            export_df = render_polygon_selector(load_data, ui)
            if export_df is not None and not export_df.empty:
                df_input = map_input_columns(export_df, COLUMN_MAP_INPUT)
                st.session_state["df_input"] = df_input
        else:
            # CSV workflow: the template download is only shown here
            render_batch_template_download(ui)
            st.caption(ui["batch"]["upload_caption"])
            uploaded_file = st.file_uploader(
                ui["batch"]["upload_warning"],
                type=['csv']
            )
            if uploaded_file is not None:
                try:
                    uploaded_file.seek(0)
                    tmp = pd.read_csv(uploaded_file, sep=";", dtype=str)
                    tmp = map_input_columns(tmp, COLUMN_MAP_INPUT)
                    required = ["object_id", "floor_area", "year_built", "heat_demand", "specific_demand"]
                    missing = [c for c in required if c not in tmp.columns]
                    if missing:
                        st.error(ui["batch"]["missing_column"])
                        st.stop()
                    df_input = tmp
                    st.session_state["df_input"] = df_input
                except Exception as e:
                    st.error(ui["batch"]["file_read_error"].format(error=e))
                    df_input = None

        if df_input is not None:
            # Scenario settings
            interest_rate, observation_period, emission_price_change_factor, emission_cost = \
                render_advanced_settings(ui, scen_values)

            # The preview is generated from the first input row only.
            try:
                preview_df = generate_preview_df(df_input.iloc[0], scen_id)
                preview_df = apply_technology_name_language(preview_df, ui)
            except Exception as e:
                preview_df = None
                st.error(ui["errors"]["preview_error"].format(error=e))

            if preview_df is not None and not preview_df.empty:
                submitted, user_values = render_batch_heating_system_editor(preview_df, ui)
                if user_values is not None:
                    # Only take over new values on submit; otherwise keep the stored ones.
                    st.session_state.batch_user_values = (
                        user_values if submitted else st.session_state.get("batch_user_values")
                    )
                else:
                    st.session_state.batch_user_values = None

            # NOTE(review): the start button is placed outside the preview
            # check here; the flattened source does not show whether it was
            # nested inside it - confirm.
            do_calc = st.button(ui["batch"]["start_button"])

            if do_calc:
                # Created before the try block so the except handler can fall
                # back to it when no error log exists in session state yet.
                errorlog = []
                try:
                    progress_bar = st.progress(0)
                    df_out, annuitaeten_gesamt, df_mittel = finalize_batch_results(
                        df_input,
                        heating_system_list,
                        scen_id,
                        interest_rate,
                        observation_period,
                        emission_cost,
                        emission_price_change_factor,
                        ui,
                        progress_callback=progress_bar.progress,
                        error_log=errorlog
                    )
                    st.session_state["batch_error_log"] = errorlog

                    # Normalize the column name to its ASCII spelling.
                    if "Günstigste Alternative" in df_out.columns:
                        df_out = df_out.rename(columns={"Günstigste Alternative": "Guenstigste Alternative"})

                    st.session_state["df_out"] = df_out
                    st.session_state["annuitaeten_gesamt_batch"] = annuitaeten_gesamt
                    st.session_state["df_mittel"] = df_mittel

                    # Count how often each system appears in the cheapest-alternative column.
                    if "Guenstigste Alternative" in df_out.columns:
                        best_alt_counts = df_out["Guenstigste Alternative"].value_counts().reset_index()
                        best_alt_counts.columns = ["System", "count"]
                        st.session_state["df_best"] = best_alt_counts
                    else:
                        st.session_state["df_best"] = pd.DataFrame()

                    progress_bar.empty()
                    st.success(ui["batch"]["success"])
                except Exception as e:
                    msg = ui["batch"]["calculation_error"].format(error=e)
                    # The log key is only assigned after a successful
                    # calculation above, so it may be missing here; indexing
                    # it directly would raise KeyError and hide the real error.
                    if st.session_state.get("batch_error_log") is None:
                        st.session_state["batch_error_log"] = errorlog
                    st.session_state["batch_error_log"].append(msg)

            if st.session_state.get("batch_error_log"):
                render_batch_errorlog(st.session_state["batch_error_log"], ui)

            if st.session_state.get("df_out") is not None and not st.session_state["df_out"].empty:
                st.markdown("### Batch-Ausgabe und Diagramme")
                render_download_button(st.session_state["df_out"], heating_system_list, st.session_state.LANG, ui)

                if st.session_state.get("df_mittel") is not None and not st.session_state["df_mittel"].empty:
                    render_summary_bar_chart(st.session_state["df_out"], ui)
                    render_mean_costs_chart(st.session_state["df_mittel"], ui)
                else:
                    st.info(ui["batch"]["mean_error"])

                if st.session_state.get("annuitaeten_gesamt_batch"):
                    render_object_detail_chart(st.session_state["annuitaeten_gesamt_batch"], ui)
            else:
                st.info(ui["batch"]["info"])
        else:
            # Show the missing-file warning only in CSV upload mode
            if not st.session_state.get("use_polygon", False):
                st.warning(ui["batch"]["file_missing"], icon="⚠️")
    else:
        # NOTE(review): this `else` is attached to the scenario_confirmed check;
        # in the flattened source it could also belong to the use_polygon
        # check directly above - confirm the intended nesting.
        st.info(ui["batch"]["info"])