import streamlit as st
import pandas as pd
import requests
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import io
# 设置页面配置
st.set_page_config(
page_title="台湾上市公司碳排放数据分析",
page_icon="🌱",
layout="wide",
initial_sidebar_state="expanded"
)
# 缓存数据下载函数
@st.cache_data
def download_and_process_data():
"""下载并处理碳排放数据"""
try:
url = "https://mopsfin.twse.com.tw/opendata/t187ap46_O_1.csv"
response = requests.get(url, timeout=30)
response.raise_for_status()
# 使用StringIO读取CSV数据
df = pd.read_csv(io.StringIO(response.content.decode('utf-8-sig')))
# 移除空值
df = df.dropna()
# 检查关键栏位
scope1_col = "範疇一排放量(公噸CO2e)"
scope2_col = "範疇二排放量(公噸CO2e)"
company_col = "公司代號"
# 寻找公司代号相关栏位
company_cols = [col for col in df.columns if "公司" in col or "代號" in col or "股票" in col]
emission_cols = [col for col in df.columns if "排放" in col]
# 自动找到正确的栏位名称
if company_col not in df.columns and company_cols:
company_col = company_cols[0]
if scope1_col not in df.columns:
scope1_candidates = [col for col in emission_cols if "範疇一" in col or "Scope1" in col]
if scope1_candidates:
scope1_col = scope1_candidates[0]
if scope2_col not in df.columns:
scope2_candidates = [col for col in emission_cols if "範疇二" in col or "Scope2" in col]
if scope2_candidates:
scope2_col = scope2_candidates[0]
# 确保数值栏位为数字格式
if scope1_col in df.columns:
df[scope1_col] = pd.to_numeric(df[scope1_col], errors='coerce')
if scope2_col in df.columns:
df[scope2_col] = pd.to_numeric(df[scope2_col], errors='coerce')
# 移除转换后的空值
required_cols = [col for col in [scope1_col, scope2_col, company_col] if col in df.columns]
df = df.dropna(subset=required_cols)
return df, company_col, scope1_col, scope2_col, True
except Exception as e:
st.error(f"数据下载失败: {str(e)}")
return None, None, None, None, False
# 创建旭日图
def create_sunburst_chart(df, company_col, scope1_col, scope2_col, num_companies):
"""创建旭日图"""
if not all(col in df.columns for col in [company_col, scope1_col, scope2_col]):
return None
sunburst_data = []
df_top = df.nlargest(num_companies, scope1_col)
for _, row in df_top.iterrows():
company = str(row[company_col])
scope1 = row[scope1_col]
scope2 = row[scope2_col]
sunburst_data.extend([
dict(ids=f"公司-{company}", labels=f"公司 {company}", parents="", values=scope1 + scope2),
dict(ids=f"範疇一-{company}", labels=f"範疇一: {scope1:.0f}", parents=f"公司-{company}", values=scope1),
dict(ids=f"範疇二-{company}", labels=f"範疇二: {scope2:.0f}", parents=f"公司-{company}", values=scope2)
])
fig = go.Figure(go.Sunburst(
ids=[d['ids'] for d in sunburst_data],
labels=[d['labels'] for d in sunburst_data],
parents=[d['parents'] for d in sunburst_data],
values=[d['values'] for d in sunburst_data],
branchvalues="total",
hovertemplate='%{label}
排放量: %{value:.0f} 公噸CO2e',
maxdepth=3
))
fig.update_layout(
title=f"碳排放量旭日图 (前{num_companies}家公司)",
font_size=12,
height=700
)
return fig
# 创建双层圆饼图
def create_nested_pie_chart(df, company_col, scope1_col, scope2_col, num_companies):
"""创建双层圆饼图"""
if not all(col in df.columns for col in [company_col, scope1_col, scope2_col]):
return None
df_top = df.nlargest(num_companies, scope1_col)
fig = make_subplots(
rows=1, cols=2,
specs=[[{"type": "pie"}, {"type": "pie"}]],
subplot_titles=("範疇一排放量", "範疇二排放量")
)
# 範疇一圆饼图
fig.add_trace(go.Pie(
labels=df_top[company_col],
values=df_top[scope1_col],
name="範疇一",
hovertemplate='%{label}
範疇一排放量: %{value:.0f} 公噸CO2e
占比: %{percent}',
textinfo='label+percent',
textposition='auto'
), row=1, col=1)
# 範疇二圆饼图
fig.add_trace(go.Pie(
labels=df_top[company_col],
values=df_top[scope2_col],
name="範疇二",
hovertemplate='%{label}
範疇二排放量: %{value:.0f} 公噸CO2e
占比: %{percent}',
textinfo='label+percent',
textposition='auto'
), row=1, col=2)
fig.update_layout(
title_text=f"碳排放量圆饼图比较 (前{num_companies}家公司)",
showlegend=True,
height=600
)
return fig
# 创建散点图
def create_scatter_plot(df, company_col, scope1_col, scope2_col):
"""创建散点图"""
if not all(col in df.columns for col in [company_col, scope1_col, scope2_col]):
return None
fig = px.scatter(
df,
x=scope1_col,
y=scope2_col,
hover_data=[company_col],
title="範疇一 vs 範疇二排放量散点图",
labels={
scope1_col: "範疇一排放量 (公噸CO2e)",
scope2_col: "範疇二排放量 (公噸CO2e)"
},
hover_name=company_col
)
fig.update_layout(height=600)
return fig
# 创建综合旭日图
def create_comprehensive_sunburst(df, company_col, scope1_col, scope2_col):
"""创建综合旭日图"""
if not all(col in df.columns for col in [company_col, scope1_col, scope2_col]):
return None
df_copy = df.copy()
df_copy['total_emission'] = df_copy[scope1_col] + df_copy[scope2_col]
df_copy['emission_level'] = pd.cut(df_copy['total_emission'],
bins=[0, 1000, 5000, 20000, float('inf')],
labels=['低排放(<1K)', '中排放(1K-5K)', '高排放(5K-20K)', '超高排放(>20K)'])
sunburst_data = []
for level in df_copy['emission_level'].unique():
if pd.isna(level):
continue
level_companies = df_copy[df_copy['emission_level'] == level].nlargest(8, 'total_emission')
for _, row in level_companies.iterrows():
company = str(row[company_col])
scope1 = row[scope1_col]
scope2 = row[scope2_col]
total = scope1 + scope2
sunburst_data.extend([
dict(ids=str(level), labels=str(level), parents="", values=total),
dict(ids=f"{level}-{company}", labels=f"{company}", parents=str(level), values=total),
dict(ids=f"{level}-{company}-範疇一", labels=f"範疇一({scope1:.0f})",
parents=f"{level}-{company}", values=scope1),
dict(ids=f"{level}-{company}-範疇二", labels=f"範疇二({scope2:.0f})",
parents=f"{level}-{company}", values=scope2)
])
fig = go.Figure(go.Sunburst(
ids=[d['ids'] for d in sunburst_data],
labels=[d['labels'] for d in sunburst_data],
parents=[d['parents'] for d in sunburst_data],
values=[d['values'] for d in sunburst_data],
branchvalues="total",
hovertemplate='%{label}
排放量: %{value:.0f} 公噸CO2e',
maxdepth=4
))
fig.update_layout(
title="分级碳排放量旭日图",
font_size=10,
height=700
)
return fig
# 主应用
def main():
st.title("🌱 台湾上市公司碳排放数据分析")
st.markdown("---")
# 侧边栏设置
st.sidebar.header("📊 图表设置")
# 下载数据
with st.spinner("正在下载和处理数据..."):
df, company_col, scope1_col, scope2_col, success = download_and_process_data()
if not success or df is None:
st.error("无法获取数据,请稍后再试。")
return
# 显示数据概览
st.success(f"✅ 成功加载 {len(df)} 家公司的碳排放数据")
# 数据统计信息
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("总公司数", len(df))
with col2:
if scope1_col in df.columns:
st.metric("範疇一平均排放量", f"{df[scope1_col].mean():.0f} 公噸")
with col3:
if scope2_col in df.columns:
st.metric("範疇二平均排放量", f"{df[scope2_col].mean():.0f} 公噸")
with col4:
if all(col in df.columns for col in [scope1_col, scope2_col]):
total_emission = df[scope1_col].sum() + df[scope2_col].sum()
st.metric("总排放量", f"{total_emission:.0f} 公噸")
st.markdown("---")
# 侧边栏控制选项
chart_type = st.sidebar.selectbox(
"选择图表类型",
["旭日图", "双层圆饼图", "散点图", "综合旭日图", "全部图表"]
)
num_companies = st.sidebar.slider(
"显示公司数量 (适用于旭日图和圆饼图)",
min_value=5,
max_value=min(30, len(df)),
value=15,
step=1
)
# 显示原始数据选项
if st.sidebar.checkbox("显示原始数据"):
st.subheader("📋 原始数据预览")
st.dataframe(df.head(20), use_container_width=True)
st.markdown("---")
# 根据选择显示图表
if chart_type == "旭日图" or chart_type == "全部图表":
st.subheader("🌞 碳排放量旭日图")
fig1 = create_sunburst_chart(df, company_col, scope1_col, scope2_col, num_companies)
if fig1:
st.plotly_chart(fig1, use_container_width=True)
else:
st.error("无法创建旭日图,缺少必要数据字段")
if chart_type == "双层圆饼图" or chart_type == "全部图表":
st.subheader("🥧 双层圆饼图")
fig2 = create_nested_pie_chart(df, company_col, scope1_col, scope2_col, num_companies)
if fig2:
st.plotly_chart(fig2, use_container_width=True)
else:
st.error("无法创建圆饼图,缺少必要数据字段")
if chart_type == "散点图" or chart_type == "全部图表":
st.subheader("📈 範疇一 vs 範疇二散点图")
fig3 = create_scatter_plot(df, company_col, scope1_col, scope2_col)
if fig3:
st.plotly_chart(fig3, use_container_width=True)
else:
st.error("无法创建散点图,缺少必要数据字段")
if chart_type == "综合旭日图" or chart_type == "全部图表":
st.subheader("🎯 分级碳排放量旭日图")
fig4 = create_comprehensive_sunburst(df, company_col, scope1_col, scope2_col)
if fig4:
st.plotly_chart(fig4, use_container_width=True)
else:
st.error("无法创建综合旭日图,缺少必要数据字段")
# 详细统计信息
if st.sidebar.checkbox("显示详细统计"):
st.subheader("📊 详细统计信息")
if all(col in df.columns for col in [scope1_col, scope2_col]):
col1, col2 = st.columns(2)
with col1:
st.write("**範疇一排放量统计:**")
st.write(f"• 平均值: {df[scope1_col].mean():.2f} 公噸CO2e")
st.write(f"• 中位数: {df[scope1_col].median():.2f} 公噸CO2e")
st.write(f"• 最大值: {df[scope1_col].max():.2f} 公噸CO2e")
st.write(f"• 最小值: {df[scope1_col].min():.2f} 公噸CO2e")
with col2:
st.write("**範疇二排放量统计:**")
st.write(f"• 平均值: {df[scope2_col].mean():.2f} 公噸CO2e")
st.write(f"• 中位数: {df[scope2_col].median():.2f} 公噸CO2e")
st.write(f"• 最大值: {df[scope2_col].max():.2f} 公噸CO2e")
st.write(f"• 最小值: {df[scope2_col].min():.2f} 公噸CO2e")
# 页脚
st.markdown("---")
st.markdown(
"""
""",
unsafe_allow_html=True
)
if __name__ == "__main__":
main()