# Data_leach / app.py — Hugging Face Space page header (scrape artifact,
# commented out so this file parses as Python).
# Author: Kung-Hsun · "Update app.py" · commit 78b5d36 (verified)
# app.py
import os
import re
import time
from typing import Dict, List, Optional, Tuple
import gradio as gr
import pandas as pd
import requests
from bs4 import BeautifulSoup
# =============================
# Config
# =============================
# Database code appended to every Task 33 URL path (default for the UI textbox).
DEFAULT_DB_CODE = "XQ62ks"
# Root URL of the IEA Task 33 demo-plants database.
BASE = "https://demoplants21.best-research.eu"
# Browser-like request headers so the server does not reject us as a bot.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0 Safari/537.36"
    ),
    "Accept-Language": "en-US,en;q=0.9",
}
# =============================
# Utilities
# =============================
def _clean(s: str) -> str:
s = (s or "").replace("\xa0", " ")
s = re.sub(r"\s+", " ", s).strip()
return s
def fetch_url_text(url: str, timeout: int = 30, retries: int = 3, backoff: float = 1.5) -> str:
    """Fetch *url* and return the body decoded as UTF-8 (undecodable bytes ignored).

    Retries up to *retries* times with linear backoff (``backoff * attempt``
    seconds). The sleep is skipped after the final failed attempt so the
    caller gets the error immediately.

    Raises:
        RuntimeError: if ``retries < 1`` (no attempt could be made; the old
            code would ``raise None`` -> TypeError in this case).
        Exception: the last error seen once all attempts are exhausted
            (typically a ``requests.RequestException``).
    """
    last_err: Optional[Exception] = None
    for attempt in range(1, retries + 1):
        try:
            r = requests.get(url, headers=HEADERS, timeout=timeout)
            r.raise_for_status()
            return r.content.decode("utf-8", errors="ignore")
        except Exception as e:  # network / HTTP error: remember it and retry
            last_err = e
            if attempt < retries:  # no point sleeping after the last attempt
                time.sleep(backoff * attempt)
    if last_err is None:
        # retries < 1: the loop body never ran, so there is no error to re-raise.
        raise RuntimeError(f"fetch_url_text called with retries={retries}; no attempt made")
    raise last_err
def extract_info_urls_from_displaymap_html(html_text: str, db_code: str) -> List[str]:
    """Return sorted, de-duplicated project info URLs found in the displaymap HTML."""
    pattern = rf"/projects/info/(\d+)/{re.escape(db_code)}"
    unique_ids = {int(pid) for pid in re.findall(pattern, html_text)}
    return [f"{BASE}/projects/info/{pid}/{db_code}" for pid in sorted(unique_ids)]
# =============================
# Parser (table-based) for info pages
# =============================
def parse_info_page(html_text: str) -> Dict[str, Optional[str]]:
    """Parse one /projects/info/{id}/{code} page into a flat field dict.

    Walks every <legend> (section header) and <table class="viewproject_table">
    in document order, mapping known two-cell rows onto fixed keys. Numbered
    "Input N"/"Output N" rows are collected into pipe-joined strings; anything
    unrecognized is preserved under "Extra" as "[Section] key: value" lines.
    Values are whitespace-normalized; empty values become None.
    """
    soup = BeautifulSoup(html_text, "lxml")
    # Title lives in the modal header; strip the leading "Project " prefix.
    title = None
    h4 = soup.select_one("h4.modal-title")
    if h4:
        t = _clean(h4.get_text(" ", strip=True))
        title = re.sub(r"^Project\s+", "", t).strip()
    # "Last Update: YYYY-MM-DD HH:MM:SS" is matched against the raw HTML,
    # not the parsed tree, since its placement varies.
    last_update = None
    m = re.search(
        r"Last Update:\s*([0-9]{4}-[0-9]{2}-[0-9]{2}\s+[0-9]{2}:[0-9]{2}:[0-9]{2})",
        html_text,
    )
    if m:
        last_update = m.group(1)
    # Fixed output schema — every page yields the same keys (None when absent).
    data: Dict[str, Optional[str]] = {
        "ProjectTitle": title,
        "ProjectOwner": None,
        "ProjectName": None,
        "Status": None,
        "Startup": None,
        "Country": None,
        "City": None,
        "ZIP": None,
        "Type": None,
        "Technology": None,
        "TechnologyAdditionalInfo": None,
        "RawMaterial": None,
        "Inputs": None,
        "Outputs": None,
        "TechnologyBrief": None,
        "AdditionalInformation": None,
        "ContactInformation": None,
        "LastUpdate": last_update,
        "Extra": None,
    }
    inputs: List[str] = []
    outputs: List[str] = []
    # Unrecognized rows, grouped by the section (<legend>) they appeared under.
    extras: Dict[str, List[Tuple[str, Optional[str]]]] = {}
    current_section = "Main"
    # Legends and tables are interleaved in the document; a legend updates the
    # section label that applies to all following tables until the next legend.
    for el in soup.select("legend, table.viewproject_table"):
        if el.name == "legend":
            current_section = _clean(el.get_text(" ", strip=True)) or "Section"
            continue
        for tr in el.select("tr"):
            tds = tr.find_all("td")
            if len(tds) >= 2:
                # Standard key/value row: first cell is the label, second the value.
                k = _clean(tds[0].get_text(" ", strip=True))
                v = _clean(tds[1].get_text(" ", strip=True))
                v = v if v != "" else None
                k_norm = (k or "").lower()
                # Order matters: specific labels first, generic fallback last.
                if k_norm.startswith("project owner"):
                    data["ProjectOwner"] = v
                elif k_norm == "project name":
                    data["ProjectName"] = v
                elif k_norm == "status":
                    data["Status"] = v
                elif k_norm == "startup":
                    data["Startup"] = v
                elif k_norm == "country":
                    data["Country"] = v
                elif k_norm == "city":
                    data["City"] = v
                elif k_norm == "zip":
                    data["ZIP"] = v
                elif k_norm == "type":
                    data["Type"] = v
                elif k_norm == "technology":
                    data["Technology"] = v
                elif k_norm == "technology additional information":
                    data["TechnologyAdditionalInfo"] = v
                elif k_norm == "raw material":
                    data["RawMaterial"] = v
                elif re.match(r"^input\s*\d+$", k_norm):
                    # "Input 1", "Input 2", ... accumulate into a single field.
                    if v:
                        inputs.append(v)
                elif re.match(r"^output\s*\d+$", k_norm):
                    if v:
                        outputs.append(v)
                elif k_norm == "technology brief":
                    data["TechnologyBrief"] = v
                else:
                    # Unknown label: keep it (with its section) rather than drop it.
                    extras.setdefault(current_section, [])
                    extras[current_section].append((k, v))
            elif len(tds) == 1:
                # Single-cell rows: free text, e.g. the contact block.
                v = _clean(tds[0].get_text(" ", strip=True))
                if not v:
                    continue
                if current_section.lower() == "contact information":
                    data["ContactInformation"] = v
                else:
                    extras.setdefault(current_section, [])
                    extras[current_section].append(("(single_cell)", v))
    data["Inputs"] = " | ".join(inputs) if inputs else None
    data["Outputs"] = " | ".join(outputs) if outputs else None
    # Defensive: normalize any empty-string leftovers in key fields to None.
    for k in ("Technology", "RawMaterial", "Status", "Type", "Country"):
        if data.get(k) is not None and _clean(data[k] or "") == "":
            data[k] = None
    if extras:
        lines = []
        for sec, kvs in extras.items():
            for k, v in kvs:
                lines.append(f"[{sec}] {k}: {v}")
        data["Extra"] = "\n".join(lines)
    return data
# =============================
# Derived fields
# =============================
def derive_trl(type_text: Optional[str]) -> str:
    """Extract a TRL label ("TRL6", "TRL6-7") from a free-text Type field.

    Matching is case-insensitive and tolerates spaces around the digits and
    dash; returns "Unknown" when no TRL marker is present.
    """
    text = "" if type_text is None else type_text.strip()
    match = re.search(r"TRL\s*([0-9]+(?:\s*-\s*[0-9]+)?)", text, flags=re.I)
    if match is None:
        return "Unknown"
    return "TRL" + match.group(1).replace(" ", "")
def classify_output(outputs_text: Optional[str]) -> str:
    """Map a free-text 'Outputs' field to a coarse product class.

    Checks run in order of specificity; the first hit wins (e.g. "methanol"
    is tested before "ethanol", which it contains as a substring). Matching
    is case-insensitive substring search, except short tokens ("h2", "ft")
    which use word boundaries so words like "shift" or "draft" do not
    misclassify as FT liquids — the old plain ``"ft" in x`` test did.
    Returns "Other/Unclear" when nothing matches (including None input).
    """
    x = (outputs_text or "").lower()
    if "sng" in x:
        return "SNG"
    if "hydrogen" in x or re.search(r"\bh2\b", x):
        return "H2"
    if "methanol" in x:
        return "Methanol"
    if "ethanol" in x:
        return "Ethanol"
    if "saf" in x or "jet" in x:
        return "SAF/Jet"
    # \bft\b, not "ft" in x: the bare substring matched inside "shift", etc.
    if re.search(r"\bft\b", x) or "fischer" in x:
        return "FT liquids"
    if "power" in x or "electric" in x:
        return "Power"
    if "heat" in x or "district" in x:
        return "Heat"
    return "Other/Unclear"
# =============================
# Summary helpers (fixed col names)
# =============================
def vc(series: pd.Series, key: str, topn: Optional[int] = None) -> pd.DataFrame:
    """Value counts of *series* as a two-column DataFrame (*key*, "Count").

    NaN values and empty/whitespace-only strings are bucketed as "Unknown".
    When *topn* is given, only the *topn* most frequent values are kept.
    A None or empty series yields an empty frame with the same columns.
    """
    empty = pd.DataFrame({key: [], "Count": []})
    if series is None:
        return empty
    cleaned = series.fillna("Unknown").astype(str).str.strip().replace({"": "Unknown"})
    if len(cleaned) == 0:
        return empty
    counts = cleaned.value_counts()
    if topn:
        counts = counts.head(topn)
    result = counts.rename_axis(key).reset_index(name="Count")
    # Force a plain int column regardless of pandas version quirks.
    result["Count"] = pd.to_numeric(result["Count"], errors="coerce").fillna(0).astype(int)
    return result
def build_summaries(df: pd.DataFrame):
    """Build the five overview count tables.

    Returns (status, country_top15, type_top15, raw_material_top30,
    output_class) DataFrames; a missing column yields an empty table.
    """
    def col(name: str) -> pd.Series:
        # Missing columns fall back to an empty string series -> empty table.
        return df.get(name, pd.Series(dtype=str))

    return (
        vc(col("Status"), "Status"),
        vc(col("Country"), "Country", topn=15),
        vc(col("Type"), "Type", topn=15),
        vc(col("RawMaterial"), "RawMaterial", topn=30),
        vc(col("OutputClass"), "OutputClass"),
    )
def write_outputs(df: pd.DataFrame, out_xlsx: str, out_csv: str) -> None:
    """Persist the full dataset as CSV (UTF-8 BOM for Excel) and as a
    multi-sheet Excel workbook: full data plus the five summary tables."""
    df.to_csv(out_csv, index=False, encoding="utf-8-sig")
    summaries = build_summaries(df)
    summary_sheets = (
        "summary_status",
        "summary_country_top15",
        "summary_type_top15",
        "summary_raw_material_top30",
        "summary_output_class",
    )
    with pd.ExcelWriter(out_xlsx, engine="openpyxl") as writer:
        df.to_excel(writer, sheet_name="cases_fullinfo", index=False)
        for sheet_name, table in zip(summary_sheets, summaries):
            table.to_excel(writer, sheet_name=sheet_name, index=False)
def write_chart_tables(out_xlsx: str, tables: Dict[str, pd.DataFrame]) -> None:
    """Write all chart/analysis tables to a single Excel workbook for download."""
    with pd.ExcelWriter(out_xlsx, engine="openpyxl") as writer:
        for sheet, table in tables.items():
            if table is None:
                continue
            try:
                frame = table.copy()
            except Exception:
                # Best-effort: skip anything that isn't copyable like a DataFrame.
                continue
            # Excel caps sheet names at 31 characters.
            frame.to_excel(writer, sheet_name=str(sheet)[:31], index=False)
# =============================
# SNG-focused summaries
# =============================
def build_sng_summaries(df: pd.DataFrame):
    """Build SNG-focused summary tables.

    Returns (meta, by_trl, by_raw_material_top30, by_country_top30,
    by_status, by_trl_operational_planned, preview_top200) computed on the
    subset of rows whose OutputClass is exactly "SNG". If the OutputClass
    column is missing, the subset (and hence every table) is empty.
    """
    subset = df.copy()
    if "OutputClass" in subset.columns:
        subset = subset[subset["OutputClass"].fillna("").astype(str) == "SNG"]
    else:
        subset = subset.iloc[0:0]  # no OutputClass column -> empty subset

    def col(frame: pd.DataFrame, name: str) -> pd.Series:
        return frame.get(name, pd.Series(dtype=str))

    trl_tbl = vc(col(subset, "TRL"), "TRL")
    raw_tbl = vc(col(subset, "RawMaterial"), "RawMaterial", topn=30)
    country_tbl = vc(col(subset, "Country"), "Country", topn=30)
    status_tbl = vc(col(subset, "Status"), "Status")

    # SNG cases further restricted to Status in {operational, planned}:
    # TRL distribution of that sub-subset.
    op_pl = subset.copy()
    if "Status" in op_pl.columns:
        status_norm = op_pl["Status"].fillna("").astype(str).str.strip().str.lower()
        op_pl = op_pl[status_norm.isin(["operational", "planned"])]
    trl_op_pl_tbl = vc(col(op_pl, "TRL"), "TRL")

    meta = pd.DataFrame({
        "Metric": ["SNG cases", "Total cases"],
        "Value": [len(subset), len(df)],
    })
    return meta, trl_tbl, raw_tbl, country_tbl, status_tbl, trl_op_pl_tbl, subset.head(200)
# =============================
# Main scrape function
# =============================
def run_scrape(
    db_code: str,
    use_uploaded_html: bool,
    uploaded_html_file,
    sleep_sec: float,
    timeout_sec: int,
    max_projects: int,
    retries: int,
    progress=gr.Progress(track_tqdm=False),
):
    """Scrape the Task 33 database and build every output file and table.

    Steps: (1) get the displaymap HTML from the web or an uploaded file,
    (2) extract per-project info URLs, (3) fetch+parse each info page with a
    polite delay, then derive TRL/OutputClass columns, write Excel/CSV
    outputs, and return (log text, 4 file paths, 12 DataFrames) in exactly
    the order the Gradio `outputs=` list expects.

    Raises gr.Error (shown in the UI) on missing upload or zero URLs found.
    """
    # Coerce/normalize UI inputs, falling back to sane defaults on None/0.
    db_code = (db_code or DEFAULT_DB_CODE).strip()
    sleep_sec = float(sleep_sec or 0.0)
    timeout_sec = int(timeout_sec or 30)
    max_projects = int(max_projects or 0)
    retries = int(retries or 3)
    log_lines = []
    def log(msg: str):
        # Accumulate log lines; joined and returned to the UI at the end.
        log_lines.append(msg)
    # 1) displaymap HTML: uploaded file takes precedence when the box is checked.
    if use_uploaded_html:
        if uploaded_html_file is None:
            raise gr.Error("你選了「使用上傳 HTML」,但未上傳檔案。")
        file_path = str(uploaded_html_file)
        if not os.path.exists(file_path):
            raise gr.Error(f"找不到上傳檔案路徑:{file_path}")
        with open(file_path, "rb") as f:
            displaymap_html = f.read().decode("utf-8", errors="ignore")
        log("Loaded displaymap HTML from uploaded file.")
    else:
        displaymap_url = f"{BASE}/projects/displaymap/{db_code}"
        log(f"Fetching displaymap: {displaymap_url}")
        displaymap_html = fetch_url_text(displaymap_url, timeout=timeout_sec, retries=retries)
        log("Fetched displaymap HTML from web.")
    # 2) extract info URLs from the displaymap page.
    info_urls = extract_info_urls_from_displaymap_html(displaymap_html, db_code=db_code)
    total = len(info_urls)
    if total == 0:
        raise gr.Error("未解析到任何 /projects/info/{id}/... 連結。請改用上傳 displaymap HTML。")
    log(f"Extracted {total} info URLs.")
    if max_projects > 0:
        # 0 means "no limit"; otherwise truncate for quick test runs.
        info_urls = info_urls[:max_projects]
        log(f"Apply max_projects={max_projects}, will scrape {len(info_urls)} URLs.")
    # 3) scrape each info page; per-URL failures become Error rows, not aborts.
    rows = []
    for idx, url in enumerate(info_urls, start=1):
        progress((idx - 1) / len(info_urls), desc=f"Scraping {idx}/{len(info_urls)}")
        pid_match = re.search(r"/projects/info/(\d+)/", url)
        pid = int(pid_match.group(1)) if pid_match else None
        try:
            html_text = fetch_url_text(url, timeout=timeout_sec, retries=retries)
            data = parse_info_page(html_text)
            data["ProjectID"] = pid
            data["InfoURL"] = url
            rows.append(data)
        except Exception as e:
            # Keep the row so the output still accounts for every URL.
            rows.append({"ProjectID": pid, "InfoURL": url, "Error": str(e)})
        # Polite delay between requests (also applied after the last one).
        time.sleep(max(0.0, sleep_sec))
        if idx % 25 == 0:
            log(f"Fetched {idx}/{len(info_urls)}")
    progress(1.0, desc="Building outputs...")
    df = pd.DataFrame(rows)
    # Derived columns; scalar fallback fills the whole column when missing.
    df["TRL"] = df["Type"].apply(derive_trl) if "Type" in df.columns else "Unknown"
    df["OutputClass"] = df["Outputs"].apply(classify_output) if "Outputs" in df.columns else "Other/Unclear"
    out_xlsx = f"IEA_Task33_cases_fullinfo_{db_code}.xlsx"
    out_csv = f"IEA_Task33_cases_fullinfo_{db_code}.csv"
    write_outputs(df, out_xlsx, out_csv)
    status_df, country_df, type_df, raw_df, out_df = build_summaries(df)
    sng_meta, sng_trl_df, sng_raw_df, sng_country_df, sng_status_df, sng_trl_op_pl_df, sng_preview = build_sng_summaries(df)
    # Chart tables workbook (download all analysis tables in one file).
    out_charts_xlsx = f"IEA_Task33_chart_tables_{db_code}.xlsx"
    chart_tables = {
        "all_status": status_df,
        "all_country_top15": country_df,
        "all_type_top15": type_df,
        "all_raw_top30": raw_df,
        "all_outputclass": out_df,
        "sng_meta": sng_meta,
        "sng_by_trl": sng_trl_df,
        "sng_by_raw_top30": sng_raw_df,
        "sng_by_country_top30": sng_country_df,
        "sng_by_status": sng_status_df,
        "sng_trl_oper_planned": sng_trl_op_pl_df,
        "sng_preview_top200": sng_preview,
    }
    write_chart_tables(out_charts_xlsx, chart_tables)
    # SNG preview workbook (Top 200 rows) for direct download.
    out_sng_preview_xlsx = f"IEA_Task33_sng_preview_{db_code}.xlsx"
    with pd.ExcelWriter(out_sng_preview_xlsx, engine="openpyxl") as writer:
        sng_preview.to_excel(writer, sheet_name="sng_preview_top200", index=False)
    log("DONE.")
    log(f"XLSX: {out_xlsx}")
    log(f"CSV : {out_csv}")
    # Order must match the run_btn.click(outputs=[...]) list in the UI block.
    return (
        "\n".join(log_lines),
        out_xlsx,
        out_csv,
        out_charts_xlsx,
        out_sng_preview_xlsx,
        status_df,
        country_df,
        type_df,
        raw_df,
        out_df,
        sng_meta,
        sng_trl_df,
        sng_raw_df,
        sng_country_df,
        sng_status_df,
        sng_trl_op_pl_df,
        sng_preview,
    )
# =============================
# UI (single page)
# =============================
# Single-page Gradio UI: scrape controls, download links, overview charts,
# and the SNG-focused charts. Component order here must stay in sync with
# run_scrape's return tuple (wired via run_btn.click below).
with gr.Blocks(title="IEA Task 33 – Scrape + Summary + SNG Charts") as demo:
    gr.Markdown(
        """
# IEA Task 33 Database – Scraper + 統計圖表(含 SNG 專題統計)
已移除 Table2(第二頁)功能,所有統計集中在本頁(Table1)。
新增 SNG 專題統計(OutputClass = SNG):
1) 依 TRL 分布計數(TRL 由 Type 解析)
2) 依 RawMaterial 分類計數
3) 依 Country 計數
4) 依 Status 計數
""".strip()
    )
    # --- Scrape parameters -------------------------------------------------
    with gr.Row():
        db_code = gr.Textbox(label="DB code", value=DEFAULT_DB_CODE)
        timeout_sec = gr.Number(label="Timeout (sec)", value=30, precision=0)
        retries = gr.Number(label="Retries", value=3, precision=0)
    # Optional offline mode: parse an uploaded displaymap HTML instead of fetching.
    with gr.Row():
        use_uploaded_html = gr.Checkbox(
            label="使用上傳的 displaymap HTML(不從網路抓 displaymap)",
            value=False,
        )
        uploaded_html_file = gr.File(
            label="上傳 displaymap HTML(可選)",
            file_types=[".html", ".htm"],
            type="filepath",
        )
    with gr.Row():
        sleep_sec = gr.Number(label="每筆請求延遲 (sec)", value=0.7)
        max_projects = gr.Number(label="最多抓取筆數(0=全部)", value=0, precision=0)
    run_btn = gr.Button("開始抓取並輸出 fullinfo (Excel/CSV) + 圖表", variant="primary")
    log_box = gr.Textbox(label="Log", lines=10)
    # --- Downloads ---------------------------------------------------------
    with gr.Row():
        out_xlsx = gr.File(label="下載 Excel (fullinfo)")
        out_csv = gr.File(label="下載 CSV (fullinfo)")
        out_charts = gr.File(label="下載圖表統計表 (Excel)")
        out_sng_preview = gr.File(label="下載 SNG cases preview (Top 200)")
    # --- Overview charts (all cases) --------------------------------------
    gr.Markdown("## 全部案例:彙整圖表")
    status_plot = gr.BarPlot(label="Status 分布", x="Count", y="Status")
    country_plot = gr.BarPlot(label="Country Top 15", x="Count", y="Country")
    type_plot = gr.BarPlot(label="Type / TRL Top 15", x="Count", y="Type")
    raw_plot = gr.BarPlot(label="Raw Material Top 30", x="Count", y="RawMaterial")
    out_plot = gr.BarPlot(label="Output 類型分布", x="Count", y="OutputClass")
    # --- SNG-focused charts ------------------------------------------------
    gr.Markdown("## OutputClass = SNG:專題統計圖表")
    sng_meta_df = gr.Dataframe(label="SNG Meta", interactive=False)
    with gr.Row():
        sng_trl_plot = gr.BarPlot(label="SNG cases by TRL (derived from Type)", x="Count", y="TRL")
        sng_status_plot = gr.BarPlot(label="SNG cases by Status", x="Count", y="Status")
    with gr.Row():
        sng_trl_op_pl_plot = gr.BarPlot(label="SNG cases by TRL (Status=operational+planned)", x="Count", y="TRL")
    with gr.Row():
        sng_raw_plot = gr.BarPlot(label="SNG cases by RawMaterial (Top 30)", x="Count", y="RawMaterial")
        sng_country_plot = gr.BarPlot(label="SNG cases by Country (Top 30)", x="Count", y="Country")
    sng_preview_df = gr.Dataframe(label="SNG cases preview (Top 200)", interactive=False)
    # Wire the button; outputs list order mirrors run_scrape's return tuple.
    run_btn.click(
        fn=run_scrape,
        inputs=[db_code, use_uploaded_html, uploaded_html_file, sleep_sec, timeout_sec, max_projects, retries],
        outputs=[
            log_box,
            out_xlsx,
            out_csv,
            out_charts,
            out_sng_preview,
            status_plot,
            country_plot,
            type_plot,
            raw_plot,
            out_plot,
            sng_meta_df,
            sng_trl_plot,
            sng_raw_plot,
            sng_country_plot,
            sng_status_plot,
            sng_trl_op_pl_plot,
            sng_preview_df,
        ],
    )
# Bind to all interfaces on 7860 (standard for containerized Spaces);
# SSR disabled for compatibility.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)