Spaces:
Sleeping
Sleeping
| # app.py | |
| import os | |
| import re | |
| import time | |
| from typing import Dict, List, Optional, Tuple | |
| import gradio as gr | |
| import pandas as pd | |
| import requests | |
| from bs4 import BeautifulSoup | |
# =============================
# Config
# =============================
# Database code appended to every scraped URL path (overridable in the UI).
DEFAULT_DB_CODE = "XQ62ks"
# Root of the IEA Task 33 demo-plants site; all scraped URLs are built from it.
BASE = "https://demoplants21.best-research.eu"
# Browser-like request headers sent with every HTTP fetch.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0 Safari/537.36"
    ),
    "Accept-Language": "en-US,en;q=0.9",
}
| # ============================= | |
| # Utilities | |
| # ============================= | |
| def _clean(s: str) -> str: | |
| s = (s or "").replace("\xa0", " ") | |
| s = re.sub(r"\s+", " ", s).strip() | |
| return s | |
def fetch_url_text(url: str, timeout: int = 30, retries: int = 3, backoff: float = 1.5) -> str:
    """Fetch *url* and return the body decoded as UTF-8 (undecodable bytes dropped).

    Retries up to *retries* times with a linearly growing delay of
    ``backoff * attempt`` seconds between attempts. On total failure the
    last exception is re-raised; a RuntimeError is raised if *retries* < 1
    (the original code would have raised ``None`` in that case).
    """
    last_err: Optional[Exception] = None
    for attempt in range(1, retries + 1):
        try:
            resp = requests.get(url, headers=HEADERS, timeout=timeout)
            resp.raise_for_status()
            # Decode manually so a mis-declared charset never raises.
            return resp.content.decode("utf-8", errors="ignore")
        except Exception as e:  # retry on any network/HTTP error
            last_err = e
            if attempt < retries:
                # Bug fix: do not sleep after the final failed attempt.
                time.sleep(backoff * attempt)
    if last_err is None:
        raise RuntimeError(f"fetch_url_text called with retries={retries!r} (must be >= 1)")
    raise last_err
def extract_info_urls_from_displaymap_html(html_text: str, db_code: str) -> List[str]:
    """Pull the project-info URLs for *db_code* out of a displaymap page, sorted by project id."""
    pattern = rf"/projects/info/(\d+)/{re.escape(db_code)}"
    unique_ids = {int(match) for match in re.findall(pattern, html_text)}
    return [f"{BASE}/projects/info/{pid}/{db_code}" for pid in sorted(unique_ids)]
| # ============================= | |
| # Parser (table-based) for info pages | |
| # ============================= | |
def parse_info_page(html_text: str) -> Dict[str, Optional[str]]:
    """Parse a single /projects/info/<id>/<db_code> page into a flat record.

    Returns a dict of project attributes (whitespace-cleaned strings or None).
    Rows whose key is not recognized are collected into 'Extra' as
    "[section] key: value" lines, grouped by the <legend> heading they
    appeared under.
    """
    soup = BeautifulSoup(html_text, "lxml")
    # Title sits in the modal header, prefixed with the literal word "Project".
    title = None
    h4 = soup.select_one("h4.modal-title")
    if h4:
        t = _clean(h4.get_text(" ", strip=True))
        title = re.sub(r"^Project\s+", "", t).strip()
    # "Last Update: YYYY-MM-DD HH:MM:SS" is searched in the raw HTML, not the DOM.
    last_update = None
    m = re.search(
        r"Last Update:\s*([0-9]{4}-[0-9]{2}-[0-9]{2}\s+[0-9]{2}:[0-9]{2}:[0-9]{2})",
        html_text,
    )
    if m:
        last_update = m.group(1)
    # All known output fields, pre-filled with None so the CSV/Excel columns
    # are stable even when a page omits a field.
    # NOTE(review): 'AdditionalInformation' is never assigned below — such rows
    # end up in 'Extra'; confirm whether that is intended.
    data: Dict[str, Optional[str]] = {
        "ProjectTitle": title,
        "ProjectOwner": None,
        "ProjectName": None,
        "Status": None,
        "Startup": None,
        "Country": None,
        "City": None,
        "ZIP": None,
        "Type": None,
        "Technology": None,
        "TechnologyAdditionalInfo": None,
        "RawMaterial": None,
        "Inputs": None,
        "Outputs": None,
        "TechnologyBrief": None,
        "AdditionalInformation": None,
        "ContactInformation": None,
        "LastUpdate": last_update,
        "Extra": None,
    }
    inputs: List[str] = []
    outputs: List[str] = []
    # Unrecognized key/value rows, keyed by the section heading they were found in.
    extras: Dict[str, List[Tuple[str, Optional[str]]]] = {}
    current_section = "Main"
    # Walk legends and data tables in document order so each table is
    # attributed to the most recent <legend> heading.
    for el in soup.select("legend, table.viewproject_table"):
        if el.name == "legend":
            current_section = _clean(el.get_text(" ", strip=True)) or "Section"
            continue
        for tr in el.select("tr"):
            tds = tr.find_all("td")
            if len(tds) >= 2:
                # Two-or-more-column rows are key/value pairs (extra cells ignored).
                k = _clean(tds[0].get_text(" ", strip=True))
                v = _clean(tds[1].get_text(" ", strip=True))
                v = v if v != "" else None
                # Keys are matched case-insensitively against known labels.
                k_norm = (k or "").lower()
                if k_norm.startswith("project owner"):
                    data["ProjectOwner"] = v
                elif k_norm == "project name":
                    data["ProjectName"] = v
                elif k_norm == "status":
                    data["Status"] = v
                elif k_norm == "startup":
                    data["Startup"] = v
                elif k_norm == "country":
                    data["Country"] = v
                elif k_norm == "city":
                    data["City"] = v
                elif k_norm == "zip":
                    data["ZIP"] = v
                elif k_norm == "type":
                    data["Type"] = v
                elif k_norm == "technology":
                    data["Technology"] = v
                elif k_norm == "technology additional information":
                    data["TechnologyAdditionalInfo"] = v
                elif k_norm == "raw material":
                    data["RawMaterial"] = v
                elif re.match(r"^input\s*\d+$", k_norm):
                    # Numbered "Input N" rows are accumulated in page order.
                    if v:
                        inputs.append(v)
                elif re.match(r"^output\s*\d+$", k_norm):
                    if v:
                        outputs.append(v)
                elif k_norm == "technology brief":
                    data["TechnologyBrief"] = v
                else:
                    extras.setdefault(current_section, [])
                    extras[current_section].append((k, v))
            elif len(tds) == 1:
                # Single-cell rows carry free text; only the contact section is mapped.
                v = _clean(tds[0].get_text(" ", strip=True))
                if not v:
                    continue
                if current_section.lower() == "contact information":
                    data["ContactInformation"] = v
                else:
                    extras.setdefault(current_section, [])
                    extras[current_section].append(("(single_cell)", v))
    # Join numbered inputs/outputs into single pipe-separated fields.
    data["Inputs"] = " | ".join(inputs) if inputs else None
    data["Outputs"] = " | ".join(outputs) if outputs else None
    # Normalize whitespace-only values in key summary fields to None.
    for k in ("Technology", "RawMaterial", "Status", "Type", "Country"):
        if data.get(k) is not None and _clean(data[k] or "") == "":
            data[k] = None
    if extras:
        lines = []
        for sec, kvs in extras.items():
            for k, v in kvs:
                lines.append(f"[{sec}] {k}: {v}")
        data["Extra"] = "\n".join(lines)
    return data
| # ============================= | |
| # Derived fields | |
| # ============================= | |
def derive_trl(type_text: Optional[str]) -> str:
    """Extract a TRL label (e.g. 'TRL6' or 'TRL5-7') from a Type string, else 'Unknown'."""
    match = re.search(
        r"TRL\s*([0-9]+(?:\s*-\s*[0-9]+)?)",
        (type_text or "").strip(),
        flags=re.I,
    )
    if match is None:
        return "Unknown"
    return "TRL" + match.group(1).replace(" ", "")
def classify_output(outputs_text: Optional[str]) -> str:
    """Map a free-text Outputs field to a coarse product class.

    Checks are ordered by specificity and the first match wins — e.g.
    "methanol" must be tested before "ethanol" (which it contains as a
    substring). Bug fix: the short tokens 'saf' and 'ft' are now matched
    as whole words, so texts like "safety" or "shift"/"aircraft" no longer
    misclassify as SAF/Jet or FT liquids.
    """
    x = (outputs_text or "").lower()
    if "sng" in x:
        return "SNG"
    if "hydrogen" in x or re.search(r"\bh2\b", x):
        return "H2"
    if "methanol" in x:
        return "Methanol"
    if "ethanol" in x:
        return "Ethanol"
    if re.search(r"\bsaf\b", x) or "jet" in x:
        return "SAF/Jet"
    if re.search(r"\bft\b", x) or "fischer" in x:
        return "FT liquids"
    if "power" in x or "electric" in x:
        return "Power"
    if "heat" in x or "district" in x:
        return "Heat"
    return "Other/Unclear"
| # ============================= | |
| # Summary helpers (fixed col names) | |
| # ============================= | |
def vc(series: pd.Series, key: str, topn: Optional[int] = None) -> pd.DataFrame:
    """Value counts of *series* as a two-column frame [key, 'Count'].

    NaN and empty/whitespace-only strings are bucketed as 'Unknown'; when
    *topn* is given, only the *topn* most frequent values are kept.
    """
    if series is None:
        return pd.DataFrame({key: [], "Count": []})
    cleaned = series.fillna("Unknown").astype(str).str.strip().replace({"": "Unknown"})
    if cleaned.empty:
        return pd.DataFrame({key: [], "Count": []})
    counts = cleaned.value_counts()
    if topn:
        counts = counts.head(topn)
    frame = counts.rename_axis(key).reset_index(name="Count")
    # Force an int dtype so downstream plotting never sees floats/NaN.
    frame["Count"] = pd.to_numeric(frame["Count"], errors="coerce").fillna(0).astype(int)
    return frame
def build_summaries(df: pd.DataFrame):
    """Build the five overview count tables: Status, Country (top 15),
    Type (top 15), RawMaterial (top 30), OutputClass."""
    fallback = pd.Series(dtype=str)
    return (
        vc(df.get("Status", fallback), "Status"),
        vc(df.get("Country", fallback), "Country", topn=15),
        vc(df.get("Type", fallback), "Type", topn=15),
        vc(df.get("RawMaterial", fallback), "RawMaterial", topn=30),
        vc(df.get("OutputClass", fallback), "OutputClass"),
    )
def write_outputs(df: pd.DataFrame, out_xlsx: str, out_csv: str) -> None:
    """Persist the full result set as a CSV and a multi-sheet Excel workbook."""
    # BOM-prefixed UTF-8 so Excel opens the CSV with the right encoding.
    df.to_csv(out_csv, index=False, encoding="utf-8-sig")
    status_df, country_df, type_df, raw_df, out_df = build_summaries(df)
    sheets = [
        ("cases_fullinfo", df),
        ("summary_status", status_df),
        ("summary_country_top15", country_df),
        ("summary_type_top15", type_df),
        ("summary_raw_material_top30", raw_df),
        ("summary_output_class", out_df),
    ]
    with pd.ExcelWriter(out_xlsx, engine="openpyxl") as writer:
        for sheet_name, table in sheets:
            table.to_excel(writer, sheet_name=sheet_name, index=False)
def write_chart_tables(out_xlsx: str, tables: Dict[str, pd.DataFrame]) -> None:
    """Write all chart/analysis tables to a single Excel workbook for download."""
    with pd.ExcelWriter(out_xlsx, engine="openpyxl") as writer:
        for sheet, table in tables.items():
            if table is None:
                continue
            try:
                snapshot = table.copy()
            except Exception:
                # Best-effort: skip anything that cannot be copied as a DataFrame.
                continue
            # Excel caps sheet names at 31 characters.
            snapshot.to_excel(writer, sheet_name=str(sheet)[:31], index=False)
| # ============================= | |
| # SNG-focused summaries | |
| # ============================= | |
def build_sng_summaries(df: pd.DataFrame):
    """Build SNG-focused count tables plus a preview of the first 200 SNG rows.

    Returns (meta, by_trl, by_raw_material_top30, by_country_top30, by_status,
    by_trl_for_operational_or_planned, preview_top200).
    """
    # Restrict to rows classified as SNG; no OutputClass column -> empty frame.
    if "OutputClass" in df.columns:
        sng = df[df["OutputClass"].fillna("").astype(str) == "SNG"].copy()
    else:
        sng = df.iloc[0:0].copy()
    fallback = pd.Series(dtype=str)
    trl_counts = vc(sng.get("TRL", fallback), "TRL")
    raw_counts = vc(sng.get("RawMaterial", fallback), "RawMaterial", topn=30)
    country_counts = vc(sng.get("Country", fallback), "Country", topn=30)
    status_counts = vc(sng.get("Status", fallback), "Status")
    # TRL distribution restricted to Status in {operational, planned}
    # (case-insensitive, trimmed).
    subset = sng.copy()
    if "Status" in subset.columns:
        normalized = subset["Status"].fillna("").astype(str).str.strip().str.lower()
        subset = subset[normalized.isin(["operational", "planned"])]
    trl_op_pl_counts = vc(subset.get("TRL", fallback), "TRL")
    meta = pd.DataFrame({
        "Metric": ["SNG cases", "Total cases"],
        "Value": [len(sng), len(df)],
    })
    return (
        meta,
        trl_counts,
        raw_counts,
        country_counts,
        status_counts,
        trl_op_pl_counts,
        sng.head(200),
    )
| # ============================= | |
| # Main scrape function | |
| # ============================= | |
def run_scrape(
    db_code: str,
    use_uploaded_html: bool,
    uploaded_html_file,
    sleep_sec: float,
    timeout_sec: int,
    max_projects: int,
    retries: int,
    progress=gr.Progress(track_tqdm=False),
):
    """Scrape every project info page for *db_code* and build all downloads/tables.

    Returns the 17-element tuple consumed positionally by the Gradio click
    handler: log text, four output file paths, the five overview count frames,
    then the seven SNG-summary frames from build_sng_summaries.

    Raises gr.Error for user-facing failures (missing upload, no URLs found).
    """
    # Normalize UI inputs; Gradio may hand over None or float-typed numbers.
    db_code = (db_code or DEFAULT_DB_CODE).strip()
    sleep_sec = float(sleep_sec or 0.0)
    timeout_sec = int(timeout_sec or 30)
    max_projects = int(max_projects or 0)
    retries = int(retries or 3)
    log_lines = []

    def log(msg: str):
        # Collected and returned as one text blob for the Log textbox.
        log_lines.append(msg)

    # 1) Obtain the displaymap HTML, either from an uploaded file or the web.
    if use_uploaded_html:
        if uploaded_html_file is None:
            raise gr.Error("你選了「使用上傳 HTML」,但未上傳檔案。")
        file_path = str(uploaded_html_file)
        if not os.path.exists(file_path):
            raise gr.Error(f"找不到上傳檔案路徑:{file_path}")
        with open(file_path, "rb") as f:
            displaymap_html = f.read().decode("utf-8", errors="ignore")
        log("Loaded displaymap HTML from uploaded file.")
    else:
        displaymap_url = f"{BASE}/projects/displaymap/{db_code}"
        log(f"Fetching displaymap: {displaymap_url}")
        displaymap_html = fetch_url_text(displaymap_url, timeout=timeout_sec, retries=retries)
        log("Fetched displaymap HTML from web.")
    # 2) Extract the per-project info URLs from the map page.
    info_urls = extract_info_urls_from_displaymap_html(displaymap_html, db_code=db_code)
    total = len(info_urls)
    if total == 0:
        raise gr.Error("未解析到任何 /projects/info/{id}/... 連結。請改用上傳 displaymap HTML。")
    log(f"Extracted {total} info URLs.")
    if max_projects > 0:
        # 0 means "no limit"; anything positive truncates the URL list.
        info_urls = info_urls[:max_projects]
        log(f"Apply max_projects={max_projects}, will scrape {len(info_urls)} URLs.")
    # 3) Scrape each info page; per-URL failures become Error rows, not aborts.
    rows = []
    for idx, url in enumerate(info_urls, start=1):
        progress((idx - 1) / len(info_urls), desc=f"Scraping {idx}/{len(info_urls)}")
        pid_match = re.search(r"/projects/info/(\d+)/", url)
        pid = int(pid_match.group(1)) if pid_match else None
        try:
            html_text = fetch_url_text(url, timeout=timeout_sec, retries=retries)
            data = parse_info_page(html_text)
            data["ProjectID"] = pid
            data["InfoURL"] = url
            rows.append(data)
        except Exception as e:
            rows.append({"ProjectID": pid, "InfoURL": url, "Error": str(e)})
        # Politeness delay between requests (negative values clamped to 0).
        time.sleep(max(0.0, sleep_sec))
        if idx % 25 == 0:
            log(f"Fetched {idx}/{len(info_urls)}")
    progress(1.0, desc="Building outputs...")
    df = pd.DataFrame(rows)
    # Derived columns; the scalar fallback fills the whole column when the
    # source column is missing (e.g. every row errored out).
    df["TRL"] = df["Type"].apply(derive_trl) if "Type" in df.columns else "Unknown"
    df["OutputClass"] = df["Outputs"].apply(classify_output) if "Outputs" in df.columns else "Other/Unclear"
    # Full result set: CSV + multi-sheet Excel.
    out_xlsx = f"IEA_Task33_cases_fullinfo_{db_code}.xlsx"
    out_csv = f"IEA_Task33_cases_fullinfo_{db_code}.csv"
    write_outputs(df, out_xlsx, out_csv)
    status_df, country_df, type_df, raw_df, out_df = build_summaries(df)
    sng_meta, sng_trl_df, sng_raw_df, sng_country_df, sng_status_df, sng_trl_op_pl_df, sng_preview = build_sng_summaries(df)
    # Chart-tables workbook: every analysis table in one download.
    out_charts_xlsx = f"IEA_Task33_chart_tables_{db_code}.xlsx"
    chart_tables = {
        "all_status": status_df,
        "all_country_top15": country_df,
        "all_type_top15": type_df,
        "all_raw_top30": raw_df,
        "all_outputclass": out_df,
        "sng_meta": sng_meta,
        "sng_by_trl": sng_trl_df,
        "sng_by_raw_top30": sng_raw_df,
        "sng_by_country_top30": sng_country_df,
        "sng_by_status": sng_status_df,
        "sng_trl_oper_planned": sng_trl_op_pl_df,
        "sng_preview_top200": sng_preview,
    }
    write_chart_tables(out_charts_xlsx, chart_tables)
    # SNG preview workbook (Top 200) for direct download.
    out_sng_preview_xlsx = f"IEA_Task33_sng_preview_{db_code}.xlsx"
    with pd.ExcelWriter(out_sng_preview_xlsx, engine="openpyxl") as writer:
        sng_preview.to_excel(writer, sheet_name="sng_preview_top200", index=False)
    log("DONE.")
    log(f"XLSX: {out_xlsx}")
    log(f"CSV : {out_csv}")
    # Order must match the `outputs=` list wired to run_btn.click below.
    return (
        "\n".join(log_lines),
        out_xlsx,
        out_csv,
        out_charts_xlsx,
        out_sng_preview_xlsx,
        status_df,
        country_df,
        type_df,
        raw_df,
        out_df,
        sng_meta,
        sng_trl_df,
        sng_raw_df,
        sng_country_df,
        sng_status_df,
        sng_trl_op_pl_df,
        sng_preview,
    )
| # ============================= | |
| # UI (single page) | |
| # ============================= | |
# Single-page Gradio UI: scrape configuration, run button, download artifacts,
# overview charts for all cases, and SNG-specific charts. User-facing labels
# are in Traditional Chinese and must stay as-is.
with gr.Blocks(title="IEA Task 33 – Scrape + Summary + SNG Charts") as demo:
    gr.Markdown(
        """
# IEA Task 33 Database – Scraper + 統計圖表(含 SNG 專題統計)
已移除 Table2(第二頁)功能,所有統計集中在本頁(Table1)。
新增 SNG 專題統計(OutputClass = SNG):
1) 依 TRL 分布計數(TRL 由 Type 解析)
2) 依 RawMaterial 分類計數
3) 依 Country 計數
4) 依 Status 計數
""".strip()
    )
    # --- Scrape configuration ---
    with gr.Row():
        db_code = gr.Textbox(label="DB code", value=DEFAULT_DB_CODE)
        timeout_sec = gr.Number(label="Timeout (sec)", value=30, precision=0)
        retries = gr.Number(label="Retries", value=3, precision=0)
    with gr.Row():
        use_uploaded_html = gr.Checkbox(
            label="使用上傳的 displaymap HTML(不從網路抓 displaymap)",
            value=False,
        )
        uploaded_html_file = gr.File(
            label="上傳 displaymap HTML(可選)",
            file_types=[".html", ".htm"],
            type="filepath",
        )
    with gr.Row():
        sleep_sec = gr.Number(label="每筆請求延遲 (sec)", value=0.7)
        max_projects = gr.Number(label="最多抓取筆數(0=全部)", value=0, precision=0)
    run_btn = gr.Button("開始抓取並輸出 fullinfo (Excel/CSV) + 圖表", variant="primary")
    log_box = gr.Textbox(label="Log", lines=10)
    # --- Download artifacts produced by run_scrape ---
    with gr.Row():
        out_xlsx = gr.File(label="下載 Excel (fullinfo)")
        out_csv = gr.File(label="下載 CSV (fullinfo)")
        out_charts = gr.File(label="下載圖表統計表 (Excel)")
        out_sng_preview = gr.File(label="下載 SNG cases preview (Top 200)")
    # --- Overview charts (all cases) ---
    gr.Markdown("## 全部案例:彙整圖表")
    status_plot = gr.BarPlot(label="Status 分布", x="Count", y="Status")
    country_plot = gr.BarPlot(label="Country Top 15", x="Count", y="Country")
    type_plot = gr.BarPlot(label="Type / TRL Top 15", x="Count", y="Type")
    raw_plot = gr.BarPlot(label="Raw Material Top 30", x="Count", y="RawMaterial")
    out_plot = gr.BarPlot(label="Output 類型分布", x="Count", y="OutputClass")
    # --- SNG-focused charts ---
    gr.Markdown("## OutputClass = SNG:專題統計圖表")
    sng_meta_df = gr.Dataframe(label="SNG Meta", interactive=False)
    with gr.Row():
        sng_trl_plot = gr.BarPlot(label="SNG cases by TRL (derived from Type)", x="Count", y="TRL")
        sng_status_plot = gr.BarPlot(label="SNG cases by Status", x="Count", y="Status")
    with gr.Row():
        sng_trl_op_pl_plot = gr.BarPlot(label="SNG cases by TRL (Status=operational+planned)", x="Count", y="TRL")
    with gr.Row():
        sng_raw_plot = gr.BarPlot(label="SNG cases by RawMaterial (Top 30)", x="Count", y="RawMaterial")
        sng_country_plot = gr.BarPlot(label="SNG cases by Country (Top 30)", x="Count", y="Country")
    sng_preview_df = gr.Dataframe(label="SNG cases preview (Top 200)", interactive=False)
    # The outputs list order must match run_scrape's return tuple exactly.
    run_btn.click(
        fn=run_scrape,
        inputs=[db_code, use_uploaded_html, uploaded_html_file, sleep_sec, timeout_sec, max_projects, retries],
        outputs=[
            log_box,
            out_xlsx,
            out_csv,
            out_charts,
            out_sng_preview,
            status_plot,
            country_plot,
            type_plot,
            raw_plot,
            out_plot,
            sng_meta_df,
            sng_trl_plot,
            sng_raw_plot,
            sng_country_plot,
            sng_status_plot,
            sng_trl_op_pl_plot,
            sng_preview_df,
        ],
    )
if __name__ == "__main__":
    # Bind to all interfaces on port 7860 for container/Spaces hosting.
    demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)