Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import requests | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| import numpy as np | |
| import io | |
| # 设置页面配置 | |
| st.set_page_config( | |
| page_title="台湾上市公司碳排放数据分析", | |
| page_icon="🌱", | |
| layout="wide", | |
| initial_sidebar_state="expanded" | |
| ) | |
| # 缓存数据下载函数 | |
| def download_and_process_data(): | |
| """下载并处理碳排放数据""" | |
| try: | |
| url = "https://mopsfin.twse.com.tw/opendata/t187ap46_O_1.csv" | |
| response = requests.get(url, timeout=30) | |
| response.raise_for_status() | |
| # 使用StringIO读取CSV数据 | |
| df = pd.read_csv(io.StringIO(response.content.decode('utf-8-sig'))) | |
| # 移除空值 | |
| df = df.dropna() | |
| # 检查关键栏位 | |
| scope1_col = "範疇一排放量(公噸CO2e)" | |
| scope2_col = "範疇二排放量(公噸CO2e)" | |
| company_col = "公司代號" | |
| # 寻找公司代号相关栏位 | |
| company_cols = [col for col in df.columns if "公司" in col or "代號" in col or "股票" in col] | |
| emission_cols = [col for col in df.columns if "排放" in col] | |
| # 自动找到正确的栏位名称 | |
| if company_col not in df.columns and company_cols: | |
| company_col = company_cols[0] | |
| if scope1_col not in df.columns: | |
| scope1_candidates = [col for col in emission_cols if "範疇一" in col or "Scope1" in col] | |
| if scope1_candidates: | |
| scope1_col = scope1_candidates[0] | |
| if scope2_col not in df.columns: | |
| scope2_candidates = [col for col in emission_cols if "範疇二" in col or "Scope2" in col] | |
| if scope2_candidates: | |
| scope2_col = scope2_candidates[0] | |
| # 确保数值栏位为数字格式 | |
| if scope1_col in df.columns: | |
| df[scope1_col] = pd.to_numeric(df[scope1_col], errors='coerce') | |
| if scope2_col in df.columns: | |
| df[scope2_col] = pd.to_numeric(df[scope2_col], errors='coerce') | |
| # 移除转换后的空值 | |
| required_cols = [col for col in [scope1_col, scope2_col, company_col] if col in df.columns] | |
| df = df.dropna(subset=required_cols) | |
| return df, company_col, scope1_col, scope2_col, True | |
| except Exception as e: | |
| st.error(f"数据下载失败: {str(e)}") | |
| return None, None, None, None, False | |
| # 创建旭日图 | |
| def create_sunburst_chart(df, company_col, scope1_col, scope2_col, num_companies): | |
| """创建旭日图""" | |
| if not all(col in df.columns for col in [company_col, scope1_col, scope2_col]): | |
| return None | |
| sunburst_data = [] | |
| df_top = df.nlargest(num_companies, scope1_col) | |
| for _, row in df_top.iterrows(): | |
| company = str(row[company_col]) | |
| scope1 = row[scope1_col] | |
| scope2 = row[scope2_col] | |
| sunburst_data.extend([ | |
| dict(ids=f"公司-{company}", labels=f"公司 {company}", parents="", values=scope1 + scope2), | |
| dict(ids=f"範疇一-{company}", labels=f"範疇一: {scope1:.0f}", parents=f"公司-{company}", values=scope1), | |
| dict(ids=f"範疇二-{company}", labels=f"範疇二: {scope2:.0f}", parents=f"公司-{company}", values=scope2) | |
| ]) | |
| fig = go.Figure(go.Sunburst( | |
| ids=[d['ids'] for d in sunburst_data], | |
| labels=[d['labels'] for d in sunburst_data], | |
| parents=[d['parents'] for d in sunburst_data], | |
| values=[d['values'] for d in sunburst_data], | |
| branchvalues="total", | |
| hovertemplate='<b>%{label}</b><br>排放量: %{value:.0f} 公噸CO2e<extra></extra>', | |
| maxdepth=3 | |
| )) | |
| fig.update_layout( | |
| title=f"碳排放量旭日图 (前{num_companies}家公司)", | |
| font_size=12, | |
| height=700 | |
| ) | |
| return fig | |
| # 创建双层圆饼图 | |
| def create_nested_pie_chart(df, company_col, scope1_col, scope2_col, num_companies): | |
| """创建双层圆饼图""" | |
| if not all(col in df.columns for col in [company_col, scope1_col, scope2_col]): | |
| return None | |
| df_top = df.nlargest(num_companies, scope1_col) | |
| fig = make_subplots( | |
| rows=1, cols=2, | |
| specs=[[{"type": "pie"}, {"type": "pie"}]], | |
| subplot_titles=("範疇一排放量", "範疇二排放量") | |
| ) | |
| # 範疇一圆饼图 | |
| fig.add_trace(go.Pie( | |
| labels=df_top[company_col], | |
| values=df_top[scope1_col], | |
| name="範疇一", | |
| hovertemplate='<b>%{label}</b><br>範疇一排放量: %{value:.0f} 公噸CO2e<br>占比: %{percent}<extra></extra>', | |
| textinfo='label+percent', | |
| textposition='auto' | |
| ), row=1, col=1) | |
| # 範疇二圆饼图 | |
| fig.add_trace(go.Pie( | |
| labels=df_top[company_col], | |
| values=df_top[scope2_col], | |
| name="範疇二", | |
| hovertemplate='<b>%{label}</b><br>範疇二排放量: %{value:.0f} 公噸CO2e<br>占比: %{percent}<extra></extra>', | |
| textinfo='label+percent', | |
| textposition='auto' | |
| ), row=1, col=2) | |
| fig.update_layout( | |
| title_text=f"碳排放量圆饼图比较 (前{num_companies}家公司)", | |
| showlegend=True, | |
| height=600 | |
| ) | |
| return fig | |
| # 创建散点图 | |
| def create_scatter_plot(df, company_col, scope1_col, scope2_col): | |
| """创建散点图""" | |
| if not all(col in df.columns for col in [company_col, scope1_col, scope2_col]): | |
| return None | |
| fig = px.scatter( | |
| df, | |
| x=scope1_col, | |
| y=scope2_col, | |
| hover_data=[company_col], | |
| title="範疇一 vs 範疇二排放量散点图", | |
| labels={ | |
| scope1_col: "範疇一排放量 (公噸CO2e)", | |
| scope2_col: "範疇二排放量 (公噸CO2e)" | |
| }, | |
| hover_name=company_col | |
| ) | |
| fig.update_layout(height=600) | |
| return fig | |
| # 创建综合旭日图 | |
| def create_comprehensive_sunburst(df, company_col, scope1_col, scope2_col): | |
| """创建综合旭日图""" | |
| if not all(col in df.columns for col in [company_col, scope1_col, scope2_col]): | |
| return None | |
| df_copy = df.copy() | |
| df_copy['total_emission'] = df_copy[scope1_col] + df_copy[scope2_col] | |
| df_copy['emission_level'] = pd.cut(df_copy['total_emission'], | |
| bins=[0, 1000, 5000, 20000, float('inf')], | |
| labels=['低排放(<1K)', '中排放(1K-5K)', '高排放(5K-20K)', '超高排放(>20K)']) | |
| sunburst_data = [] | |
| for level in df_copy['emission_level'].unique(): | |
| if pd.isna(level): | |
| continue | |
| level_companies = df_copy[df_copy['emission_level'] == level].nlargest(8, 'total_emission') | |
| for _, row in level_companies.iterrows(): | |
| company = str(row[company_col]) | |
| scope1 = row[scope1_col] | |
| scope2 = row[scope2_col] | |
| total = scope1 + scope2 | |
| sunburst_data.extend([ | |
| dict(ids=str(level), labels=str(level), parents="", values=total), | |
| dict(ids=f"{level}-{company}", labels=f"{company}", parents=str(level), values=total), | |
| dict(ids=f"{level}-{company}-範疇一", labels=f"範疇一({scope1:.0f})", | |
| parents=f"{level}-{company}", values=scope1), | |
| dict(ids=f"{level}-{company}-範疇二", labels=f"範疇二({scope2:.0f})", | |
| parents=f"{level}-{company}", values=scope2) | |
| ]) | |
| fig = go.Figure(go.Sunburst( | |
| ids=[d['ids'] for d in sunburst_data], | |
| labels=[d['labels'] for d in sunburst_data], | |
| parents=[d['parents'] for d in sunburst_data], | |
| values=[d['values'] for d in sunburst_data], | |
| branchvalues="total", | |
| hovertemplate='<b>%{label}</b><br>排放量: %{value:.0f} 公噸CO2e<extra></extra>', | |
| maxdepth=4 | |
| )) | |
| fig.update_layout( | |
| title="分级碳排放量旭日图", | |
| font_size=10, | |
| height=700 | |
| ) | |
| return fig | |
| # 主应用 | |
| def main(): | |
| st.title("🌱 台湾上市公司碳排放数据分析") | |
| st.markdown("---") | |
| # 侧边栏设置 | |
| st.sidebar.header("📊 图表设置") | |
| # 下载数据 | |
| with st.spinner("正在下载和处理数据..."): | |
| df, company_col, scope1_col, scope2_col, success = download_and_process_data() | |
| if not success or df is None: | |
| st.error("无法获取数据,请稍后再试。") | |
| return | |
| # 显示数据概览 | |
| st.success(f"✅ 成功加载 {len(df)} 家公司的碳排放数据") | |
| # 数据统计信息 | |
| col1, col2, col3, col4 = st.columns(4) | |
| with col1: | |
| st.metric("总公司数", len(df)) | |
| with col2: | |
| if scope1_col in df.columns: | |
| st.metric("範疇一平均排放量", f"{df[scope1_col].mean():.0f} 公噸") | |
| with col3: | |
| if scope2_col in df.columns: | |
| st.metric("範疇二平均排放量", f"{df[scope2_col].mean():.0f} 公噸") | |
| with col4: | |
| if all(col in df.columns for col in [scope1_col, scope2_col]): | |
| total_emission = df[scope1_col].sum() + df[scope2_col].sum() | |
| st.metric("总排放量", f"{total_emission:.0f} 公噸") | |
| st.markdown("---") | |
| # 侧边栏控制选项 | |
| chart_type = st.sidebar.selectbox( | |
| "选择图表类型", | |
| ["旭日图", "双层圆饼图", "散点图", "综合旭日图", "全部图表"] | |
| ) | |
| num_companies = st.sidebar.slider( | |
| "显示公司数量 (适用于旭日图和圆饼图)", | |
| min_value=5, | |
| max_value=min(30, len(df)), | |
| value=15, | |
| step=1 | |
| ) | |
| # 显示原始数据选项 | |
| if st.sidebar.checkbox("显示原始数据"): | |
| st.subheader("📋 原始数据预览") | |
| st.dataframe(df.head(20), use_container_width=True) | |
| st.markdown("---") | |
| # 根据选择显示图表 | |
| if chart_type == "旭日图" or chart_type == "全部图表": | |
| st.subheader("🌞 碳排放量旭日图") | |
| fig1 = create_sunburst_chart(df, company_col, scope1_col, scope2_col, num_companies) | |
| if fig1: | |
| st.plotly_chart(fig1, use_container_width=True) | |
| else: | |
| st.error("无法创建旭日图,缺少必要数据字段") | |
| if chart_type == "双层圆饼图" or chart_type == "全部图表": | |
| st.subheader("🥧 双层圆饼图") | |
| fig2 = create_nested_pie_chart(df, company_col, scope1_col, scope2_col, num_companies) | |
| if fig2: | |
| st.plotly_chart(fig2, use_container_width=True) | |
| else: | |
| st.error("无法创建圆饼图,缺少必要数据字段") | |
| if chart_type == "散点图" or chart_type == "全部图表": | |
| st.subheader("📈 範疇一 vs 範疇二散点图") | |
| fig3 = create_scatter_plot(df, company_col, scope1_col, scope2_col) | |
| if fig3: | |
| st.plotly_chart(fig3, use_container_width=True) | |
| else: | |
| st.error("无法创建散点图,缺少必要数据字段") | |
| if chart_type == "综合旭日图" or chart_type == "全部图表": | |
| st.subheader("🎯 分级碳排放量旭日图") | |
| fig4 = create_comprehensive_sunburst(df, company_col, scope1_col, scope2_col) | |
| if fig4: | |
| st.plotly_chart(fig4, use_container_width=True) | |
| else: | |
| st.error("无法创建综合旭日图,缺少必要数据字段") | |
| # 详细统计信息 | |
| if st.sidebar.checkbox("显示详细统计"): | |
| st.subheader("📊 详细统计信息") | |
| if all(col in df.columns for col in [scope1_col, scope2_col]): | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.write("**範疇一排放量统计:**") | |
| st.write(f"• 平均值: {df[scope1_col].mean():.2f} 公噸CO2e") | |
| st.write(f"• 中位数: {df[scope1_col].median():.2f} 公噸CO2e") | |
| st.write(f"• 最大值: {df[scope1_col].max():.2f} 公噸CO2e") | |
| st.write(f"• 最小值: {df[scope1_col].min():.2f} 公噸CO2e") | |
| with col2: | |
| st.write("**範疇二排放量统计:**") | |
| st.write(f"• 平均值: {df[scope2_col].mean():.2f} 公噸CO2e") | |
| st.write(f"• 中位数: {df[scope2_col].median():.2f} 公噸CO2e") | |
| st.write(f"• 最大值: {df[scope2_col].max():.2f} 公噸CO2e") | |
| st.write(f"• 最小值: {df[scope2_col].min():.2f} 公噸CO2e") | |
| # 页脚 | |
| st.markdown("---") | |
| st.markdown( | |
| """ | |
| <div style='text-align: center; color: gray;'> | |
| <p>数据来源: <a href='https://mopsfin.twse.com.tw/' target='_blank'>台湾证券交易所公开资讯观测站</a></p> | |
| <p>© 2024 碳排放数据分析应用</p> | |
| </div> | |
| """, | |
| unsafe_allow_html=True | |
| ) | |
| if __name__ == "__main__": | |
| main() |