| | import streamlit as st |
| | import re |
| | import lark_oapi as lark |
| | from lark_oapi.api.bitable.v1 import * |
| | import time |
| | import openpyxl |
| | import os |
| | from dotenv import load_dotenv |
| | import logging |
| | from tenacity import retry, stop_after_attempt, wait_exponential |
| |
|
| | |
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load settings from a .env file (if present) into the environment;
# main() later reads LARK_OAPI_APPID / LARK_OAPI_SECRET via os.getenv.
load_dotenv()
| |
|
def parse_bitable_url(url):
    """
    Extract the app_token and table_id from a Bitable URL.

    Expected format: .../base/<app_token>?table=<table_id>...

    Args:
        url: The document URL as pasted by the user.

    Returns:
        A tuple ``(app_token, table_id)``. Both elements are ``None`` when
        ``url`` is not a string or when either part cannot be found.
    """
    # Anything that is not a string (e.g. None from an empty widget) cannot
    # be parsed; report failure the same way as a malformed URL.
    if not isinstance(url, str):
        return None, None

    # The token is the path segment right after "/base/"; stop at the next
    # path separator, query string or fragment.
    app_token_match = re.search(r'/base/([^/?#]+)', url)
    if not app_token_match:
        return None, None

    # Require "table" to be a real query parameter (preceded by ? or &) so
    # that names such as "mytable=" do not match, and stop at "#" so a
    # trailing fragment is not glued onto the table id.
    table_id_match = re.search(r'[?&]table=([^&#]+)', url)
    if not table_id_match:
        return None, None

    return app_token_match.group(1), table_id_match.group(1)
| |
|
def get_bitable_client(app_id, app_secret):
    """Build a Lark client configured with the given app id and app secret."""
    builder = lark.Client.builder()
    builder = builder.app_id(app_id)
    builder = builder.app_secret(app_secret)
    return builder.build()
| |
|
def get_table_fields(client, app_token, table_id):
    """
    Fetch all field definitions of a Bitable table, page by page.

    Args:
        client: Lark client exposing ``bitable.v1.app_table_field.list``.
        app_token: Token of the Bitable app that owns the table.
        table_id: Id of the table whose fields are listed.

    Returns:
        A list with the ``items`` of every page that was fetched, in order.

    Raises:
        Exception: If a page request reports failure (the message carries
            the API ``msg`` and ``log_id``). Exceptions raised by the SDK
            call itself are logged and re-raised unchanged.
    """
    page_size = 100

    logger.info(f"Fetching fields for app_token={app_token}, table_id={table_id}")
    fields = []
    page_token = None
    has_more = True

    while has_more:
        req = ListAppTableFieldRequest.builder() \
            .app_token(app_token) \
            .table_id(table_id) \
            .page_size(page_size)

        # The first page is requested without a token.
        if page_token:
            req.page_token(page_token)

        req = req.build()

        try:
            resp = client.bitable.v1.app_table_field.list(req)
        except Exception as e:
            logger.error(f"API call failed: {e}", exc_info=True)
            raise

        if not resp.success():
            logger.error(f"Failed to list fields: code={resp.code}, msg={resp.msg}, log_id={resp.get_log_id()}")
            raise Exception(f"Failed to list fields: {resp.msg}, log_id: {resp.get_log_id()}")

        data = resp.data
        if not data:
            # A successful response without a payload has neither items nor
            # a next-page token; stop here instead of dereferencing it.
            break

        items = data.items or []
        if items:
            fields.extend(items)
            logger.info(f"Fetched {len(items)} fields. Current total: {len(fields)}")
            logger.info(f"Page token: {page_token} -> {data.page_token}, Has more: {data.has_more}")

        # A short page is treated as the last one regardless of has_more.
        # NOTE(review): this looks like a deliberate guard against a
        # has_more flag that stays true - confirm before relying solely
        # on the API flag.
        if len(items) < page_size:
            has_more = False
        else:
            has_more = data.has_more

        new_page_token = data.page_token

        # If the server hands back the token we just used, another request
        # would return the same page forever.
        if has_more and new_page_token == page_token:
            logger.warning("Infinite loop detected: page_token did not update. Breaking.")
            has_more = False

        page_token = new_page_token

    logger.info(f"Total fields fetched: {len(fields)}")
    return fields
| |
|
def validate_primary_keys(excel_columns, target_fields):
    """
    Check that the target table's primary-key field exists among the Excel columns.

    Args:
        excel_columns: Column names read from the Excel header.
        target_fields: Field objects exposing ``is_primary`` and ``field_name``.

    Returns:
        ``(ok, message)`` where ``ok`` is True only when the first field
        flagged as primary is named in ``excel_columns``.
    """
    # Locate the first field marked as primary, if any.
    primary = None
    for candidate in target_fields:
        if candidate.is_primary:
            primary = candidate
            break

    if not primary:
        return False, "Target table has no primary key."

    pk_name = primary.field_name
    if pk_name in excel_columns:
        return True, f"Primary key '{pk_name}' found in Excel."

    return False, f"Target Primary Key '{pk_name}' not found in Excel columns."
| |
|
# reraise=True makes the final failure surface as the original exception
# (with the API message and log_id) instead of tenacity's RetryError wrapper.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), reraise=True)
def _send_batch(client, app_token, table_id, records):
    """
    Create ``records`` in the target table with one batch_create request.

    The call is attempted up to three times with exponential back-off
    (waits bounded between 4 and 10 seconds) whenever it raises.

    Args:
        client: Lark client exposing ``bitable.v1.app_table_record.batch_create``.
        app_token: Token of the Bitable app that owns the table.
        table_id: Id of the table receiving the records.
        records: Record objects to create.

    Raises:
        Exception: If the response reports failure on the last attempt, or
            whatever the SDK call raised on the last attempt.
    """
    # NOTE(review): every failure is retried, and the request is presumably
    # not idempotent - a retry after a server-side success could duplicate
    # records. Confirm whether a client token should be sent.
    req = BatchCreateAppTableRecordRequest.builder() \
        .app_token(app_token) \
        .table_id(table_id) \
        .request_body(BatchCreateAppTableRecordRequestBody.builder().records(records).build()) \
        .build()

    resp = client.bitable.v1.app_table_record.batch_create(req)
    if not resp.success():
        logger.error(f"Batch create failed: code={resp.code}, msg={resp.msg}, log_id={resp.get_log_id()}")
        if resp.error:
            logger.error(f"Error details: {resp.error}")
        raise Exception(f"Batch create failed: {resp.msg}, log_id: {resp.get_log_id()}")
| |
|
# Bitable field type codes whose payload differs from the raw Excel value.
_FIELD_TYPE_MULTI_SELECT = 4
_FIELD_TYPE_DATETIME = 5
_FIELD_TYPE_USER = 11
_FIELD_TYPE_URL = 15

# Number of records sent per batch_create request.
_BATCH_SIZE = 100

# Sentinel returned by _convert_cell when a cell must not be written at all.
_OMIT = object()


def _convert_cell(val, target_type):
    """Shape one non-None Excel value for a field of ``target_type``, or return ``_OMIT``."""
    if target_type == _FIELD_TYPE_DATETIME:
        # Date fields take epoch milliseconds.
        # NOTE: datetime.timestamp() interprets a naive datetime in the
        # local timezone of the machine running this script.
        if hasattr(val, 'timestamp'):
            return int(val.timestamp() * 1000)
        return val

    if target_type == _FIELD_TYPE_MULTI_SELECT:
        if isinstance(val, str):
            # "a, b,,c," -> ["a", "b", "c"]; empty parts are not valid
            # option names, so they are dropped.
            options = [part.strip() for part in val.split(',')]
            options = [opt for opt in options if opt]
            return options if options else _OMIT
        return val

    if target_type == _FIELD_TYPE_USER:
        # User fields are never filled from Excel.
        return _OMIT

    return val


def _build_source_value(target_col_def, manual_source_value, excel_file_name):
    """Return the value written to the source column of every record."""
    final_val = manual_source_value if manual_source_value else f"Imported from {excel_file_name}"

    if target_col_def.type == _FIELD_TYPE_URL:
        # URL fields take a link/text pair; only text that looks like a URL
        # is used as the link target.
        link_url = str(final_val)
        if not link_url.startswith("http"):
            link_url = ""
        return {"link": link_url, "text": str(final_val)}

    return final_val


def sync_data(client, excel_rows, excel_columns, special_col_name, manual_source_value, excel_file_name, target_app_token, target_table_id, target_fields):
    """
    Upload Excel rows to the target table in batches, reporting progress in the UI.

    Only columns whose name matches a target field are written; ``None``
    cells are skipped, and rows without any value are not uploaded.

    Args:
        client: Lark client used for the batch requests.
        excel_rows: Sized sequence of row tuples, aligned with ``excel_columns``.
        excel_columns: Column names, one per cell position in a row.
        special_col_name: Target field that records the data source, or None.
        manual_source_value: Text for the source field; when empty,
            ``"Imported from <excel_file_name>"`` is used.
        excel_file_name: Name of the uploaded file, used in the fallback text.
        target_app_token: Token of the Bitable app that owns the table.
        target_table_id: Id of the table receiving the records.
        target_fields: Field objects exposing ``field_name`` and ``type``.

    Raises:
        Exception: Propagated from ``_send_batch`` when a batch fails.
    """
    target_field_names = {f.field_name: f for f in target_fields}

    # Blank rows (common at the end of a sheet) would become empty records.
    rows_to_sync = [row for row in excel_rows if any(cell is not None for cell in row)]
    skipped = len(excel_rows) - len(rows_to_sync)
    if skipped:
        logger.info("Skipping %d blank Excel rows", skipped)

    total_records = len(rows_to_sync)
    if total_records == 0:
        st.warning("No records to sync.")
        return

    # The source-column value is the same for every record; compute it once.
    write_source = bool(special_col_name) and special_col_name in target_field_names
    source_value = None
    if write_source:
        source_value = _build_source_value(
            target_field_names[special_col_name], manual_source_value, excel_file_name
        )

    progress_bar = st.progress(0)
    status_text = st.empty()
    processed_count = 0
    batch_records = []

    for row in rows_to_sync:
        new_fields = {}

        row_dict = dict(zip(excel_columns, row))
        for col_name, val in row_dict.items():
            if col_name not in target_field_names or val is None:
                continue
            converted = _convert_cell(val, target_field_names[col_name].type)
            if converted is not _OMIT:
                new_fields[col_name] = converted

        if write_source:
            # Give each record its own dict so records never share state.
            if isinstance(source_value, dict):
                new_fields[special_col_name] = dict(source_value)
            else:
                new_fields[special_col_name] = source_value

        batch_records.append(AppTableRecord.builder().fields(new_fields).build())

        if len(batch_records) >= _BATCH_SIZE:
            _send_batch(client, target_app_token, target_table_id, batch_records)
            processed_count += len(batch_records)
            progress_bar.progress(processed_count / total_records)
            status_text.text(f"Synced {processed_count}/{total_records} records")
            batch_records = []

    # Flush the final, partially filled batch.
    if batch_records:
        _send_batch(client, target_app_token, target_table_id, batch_records)
        processed_count += len(batch_records)
        progress_bar.progress(1.0)
        status_text.text(f"Synced {processed_count}/{total_records} records")
| |
|
def _split_header_and_rows(rows):
    """Split sheet rows into (column names, data rows), dropping unnamed columns from both."""
    header = rows[0]
    named = [idx for idx, cell in enumerate(header) if cell is not None]
    columns = [str(header[idx]) for idx in named]

    if len(named) == len(header):
        return columns, rows[1:]

    # Remove the cells under blank header positions too; otherwise every
    # column after the blank one would be paired with the wrong name.
    return columns, [tuple(row[idx] for idx in named) for row in rows[1:]]


def main():
    """
    Streamlit entry point: collect credentials, target link and an Excel
    file, then copy the Excel rows into the target Bitable table.

    The sidebar holds the credentials, the target link, a button that
    preloads the target schema, and the choice of source column. The main
    area holds the upload widget and the merge button.
    """
    st.set_page_config(page_title="Bitbcpy - 多维表格合并工具", layout="wide")
    st.title("Bitbcpy - 多维表格合并脚本")
    st.markdown("将本地 Excel 的行抄到目标表,同名列合并,忽略目标表不存在的列。")

    # Session state survives Streamlit reruns, so the schema loaded from the
    # sidebar is still available when the merge button is pressed.
    if 'target_fields' not in st.session_state:
        st.session_state.target_fields = []
    if 'target_fields_loaded' not in st.session_state:
        st.session_state.target_fields_loaded = False
    # (app_token, table_id) that the cached schema belongs to.
    if 'target_fields_key' not in st.session_state:
        st.session_state.target_fields_key = None

    with st.sidebar:
        st.header("配置")

        default_app_id = os.getenv("LARK_OAPI_APPID", "")
        default_app_secret = os.getenv("LARK_OAPI_SECRET", "")

        app_id = st.text_input("App ID", value=default_app_id)
        app_secret = st.text_input("App Secret", type="password", value=default_app_secret)

        target_url = st.text_input("目标文档链接")

        if st.button("读取目标表字段"):
            if not app_id or not app_secret:
                st.error("请填写 App ID 和 App Secret")
            elif not target_url:
                st.error("请填写目标文档链接")
            else:
                t_token, t_table = parse_bitable_url(target_url)
                if not (t_token and t_table):
                    st.error("目标链接解析失败")
                else:
                    client = get_bitable_client(app_id, app_secret)
                    try:
                        with st.spinner("Fetching schema..."):
                            fields = get_table_fields(client, t_token, t_table)
                            st.session_state.target_fields = fields
                            st.session_state.target_fields_loaded = True
                            st.session_state.target_fields_key = (t_token, t_table)
                        st.success("字段加载成功")
                    except Exception as e:
                        st.error(f"Error: {str(e)}")

        source_col = None
        manual_source_val = None

        # The source-column widgets only appear once a schema is loaded.
        if st.session_state.target_fields_loaded:
            field_names = [f.field_name for f in st.session_state.target_fields]
            source_col = st.selectbox("哪一列记录来源", options=field_names)
            manual_source_val = st.text_input("该列统一填写的来源信息", help="例如:2023年Q1销售数据")

    uploaded_file = st.file_uploader("上传来源 Excel (xlsx)", type=['xlsx'])

    if st.button("开始合并"):
        if not app_id or not app_secret:
            st.error("请在左侧侧边栏填写 App ID 和 App Secret")
            return

        if not uploaded_file:
            st.error("请上传来源 Excel 文件")
            return

        t_token, t_table = parse_bitable_url(target_url)

        if not (t_token and t_table):
            st.error("目标链接解析失败,请检查链接格式")
            return

        client = get_bitable_client(app_id, app_secret)

        try:
            # data_only=True reads cached formula results, not formulas.
            wb = openpyxl.load_workbook(uploaded_file, data_only=True)
            sheet = wb.active

            rows = list(sheet.iter_rows(values_only=True))
            if not rows:
                st.error("Excel 文件为空")
                return

            excel_columns, data_rows = _split_header_and_rows(rows)

            # Reuse the cached schema only if it was loaded for this exact
            # table; the link may have been edited since it was loaded.
            t_fields = st.session_state.target_fields
            if not t_fields or st.session_state.target_fields_key != (t_token, t_table):
                with st.spinner("Fetching target table schema..."):
                    t_fields = get_table_fields(client, t_token, t_table)

            valid, msg = validate_primary_keys(excel_columns, t_fields)
            if not valid:
                st.error(f"主键校验失败: {msg}")
                return
            else:
                st.success("主键校验通过")

            st.info(f"Found {len(data_rows)} records in Excel.")

            sync_data(client, data_rows, excel_columns, source_col, manual_source_val, uploaded_file.name, t_token, t_table, t_fields)
            st.success("合并完成!")

        except Exception as e:
            # Top-level UI boundary: log the traceback, show the message.
            logger.error(f"Error in main loop: {e}", exc_info=True)
            st.error(f"Error: {str(e)}")
| |
|
# Run the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
| |
|