# NOTE: import order is kept exactly as in the original file. The wildcard
# import below can rebind arbitrary names, so the imports that follow it must
# stay after it.
import streamlit as st
import re
import lark_oapi as lark
from lark_oapi.api.bitable.v1 import *
import time
import openpyxl
import os
from dotenv import load_dotenv
import logging
from tenacity import retry, stop_after_attempt, wait_exponential

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()


def parse_bitable_url(url):
    """Extract ``(app_token, table_id)`` from a Bitable URL.

    Expected format: ``.../base/<app_token>?table=<table_id>[&...]``.

    Args:
        url: The document URL as a string.

    Returns:
        A ``(app_token, table_id)`` tuple, or ``(None, None)`` when either
        part cannot be found or ``url`` is not a string.
    """
    try:
        # '#' is excluded so that a trailing URL fragment never ends up
        # inside the token or the table id.
        app_token_match = re.search(r'/base/([^/?#]+)', url)
        table_id_match = re.search(r'table=([^&#]+)', url)
    except TypeError:
        # re.search raises TypeError for non-string input (e.g. None).
        return None, None

    if not app_token_match or not table_id_match:
        return None, None
    return app_token_match.group(1), table_id_match.group(1)


def get_bitable_client(app_id, app_secret):
    """Build a Lark client from the given app credentials."""
    return lark.Client.builder().app_id(app_id).app_secret(app_secret).build()


def get_table_fields(client, app_token, table_id):
    """Fetch every field definition of a table, following pagination.

    Args:
        client: Client returned by ``get_bitable_client``.
        app_token: Token of the Bitable app.
        table_id: ID of the table inside that app.

    Returns:
        A list with the field items of all pages, in the order returned.

    Raises:
        Exception: If the API call itself raises (re-raised after logging) or
            the response reports failure.
    """
    logger.info(f"Fetching fields for app_token={app_token}, table_id={table_id}")
    page_size = 100
    fields = []
    page_token = None
    has_more = True

    while has_more:
        req = ListAppTableFieldRequest.builder() \
            .app_token(app_token) \
            .table_id(table_id) \
            .page_size(page_size)
        if page_token:
            req.page_token(page_token)
        req = req.build()

        try:
            resp = client.bitable.v1.app_table_field.list(req)
        except Exception as e:
            logger.error(f"API call failed: {e}", exc_info=True)
            raise

        if not resp.success():
            logger.error(f"Failed to list fields: code={resp.code}, msg={resp.msg}, log_id={resp.get_log_id()}")
            raise Exception(f"Failed to list fields: {resp.msg}, log_id: {resp.get_log_id()}")

        data = resp.data
        if data is None:
            # A successful response without a payload carries neither items
            # nor a page token, so there is nothing left to page through.
            logger.warning("Response contained no data. Stopping pagination.")
            break

        items = data.items or []
        if items:
            fields.extend(items)
            logger.info(f"Fetched {len(items)} fields. Current total: {len(fields)}")

        new_page_token = data.page_token
        logger.info(f"Page token: {page_token} -> {new_page_token}, Has more: {data.has_more}")

        # Determine if we should continue based on result count.
        # Some APIs return has_more=True even if they return fewer items than
        # page_size on the last page, so a short (or empty) page is treated as
        # the last one regardless of has_more.
        if len(items) < page_size:
            has_more = False
        else:
            has_more = data.has_more

        # Infinite loop safeguard: a page token that does not advance would
        # make us request the same page forever.
        if has_more and new_page_token == page_token:
            logger.warning("Infinite loop detected: page_token did not update. Breaking.")
            has_more = False

        page_token = new_page_token

    logger.info(f"Total fields fetched: {len(fields)}")
    return fields


def validate_primary_keys(excel_columns, target_fields):
    """Check that the target table's primary-key column exists in the Excel header.

    Only the column name is checked. Excel cell types are loose, so the
    field type is not validated.

    Args:
        excel_columns: Column names read from the Excel header row.
        target_fields: Field definitions of the target table; each item is
            read through its ``is_primary`` and ``field_name`` attributes.

    Returns:
        A ``(is_valid, message)`` tuple.
    """
    target_pk = next((f for f in target_fields if f.is_primary), None)

    if not target_pk:
        return False, "Target table has no primary key."

    if target_pk.field_name not in excel_columns:
        return False, f"Target Primary Key '{target_pk.field_name}' not found in Excel columns."

    return True, f"Primary key '{target_pk.field_name}' found in Excel."
# reraise=True makes the last underlying exception propagate once the attempts
# are exhausted, so callers see the real API message and log_id instead of
# tenacity's RetryError wrapper.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), reraise=True)
def _send_batch(client, app_token, table_id, records):
    """Create one batch of records in the target table, retrying on failure.

    Args:
        client: Lark client used for the request.
        app_token: Token of the target Bitable app.
        table_id: ID of the target table.
        records: List of ``AppTableRecord`` objects to create.

    Raises:
        Exception: If the response still reports failure after the last attempt.
    """
    req = BatchCreateAppTableRecordRequest.builder() \
        .app_token(app_token) \
        .table_id(table_id) \
        .request_body(BatchCreateAppTableRecordRequestBody.builder().records(records).build()) \
        .build()

    resp = client.bitable.v1.app_table_record.batch_create(req)

    if not resp.success():
        logger.error(f"Batch create failed: code={resp.code}, msg={resp.msg}, log_id={resp.get_log_id()}")
        if resp.error:
            logger.error(f"Error details: {resp.error}")
        raise Exception(f"Batch create failed: {resp.msg}, log_id: {resp.get_log_id()}")


def sync_data(client, excel_rows, excel_columns, special_col_name, manual_source_value, excel_file_name, target_app_token, target_table_id, target_fields):
    """Copy Excel rows into the target table in batches of 100 records.

    Columns whose name matches a target field are copied; all other columns
    are ignored. Progress is reported through Streamlit widgets.

    Args:
        client: Lark client used for the requests.
        excel_rows: Data rows; each row is a sequence in ``excel_columns`` order.
        excel_columns: Column names, aligned with the values of each row.
        special_col_name: Target field that records where the data came from,
            or a falsy value to skip it.
        manual_source_value: Text written to that field; when empty, a text
            derived from ``excel_file_name`` is used instead.
        excel_file_name: Name of the uploaded file, used for the fallback text.
        target_app_token: Token of the target Bitable app.
        target_table_id: ID of the target table.
        target_fields: Field definitions of the target table.

    Raises:
        Exception: Propagated from ``_send_batch`` when a batch fails.
    """
    batch_size = 100
    target_field_names = {f.field_name: f for f in target_fields}

    total_records = len(excel_rows)
    if total_records == 0:
        st.warning("No records to sync.")
        return

    progress_bar = st.progress(0)
    status_text = st.empty()

    batch_records = []
    processed_count = 0

    for row in excel_rows:
        new_fields = {}
        # row is a tuple/list matching excel_columns order
        row_dict = dict(zip(excel_columns, row))

        # 1. Copy common fields
        for col_name, val in row_dict.items():
            if col_name not in target_field_names or val is None:
                continue

            # Values are sent as read from Excel (int, float, datetime,
            # string) except for the field types handled explicitly below.
            target_type = target_field_names[col_name].type

            if target_type == 5:  # Date
                # Date fields are sent as a Unix timestamp in milliseconds.
                if hasattr(val, 'timestamp'):  # datetime object
                    new_fields[col_name] = int(val.timestamp() * 1000)
                else:
                    new_fields[col_name] = val
            elif target_type == 4:  # MultiSelect
                if isinstance(val, str):
                    # Comma-separated text becomes a list of options. Empty
                    # pieces (trailing or doubled commas) are dropped so that
                    # no blank option name is sent.
                    options = [piece.strip() for piece in val.split(',')]
                    new_fields[col_name] = [opt for opt in options if opt]
                else:
                    new_fields[col_name] = val
            elif target_type == 11:  # User
                # User fields need a list of objects, which plain Excel text
                # cannot provide, so they are skipped.
                pass
            else:
                new_fields[col_name] = val

        # 2. Handle special "source record" column
        if special_col_name and special_col_name in target_field_names:
            target_col_def = target_field_names[special_col_name]

            # Use manual value provided by user if available, otherwise fall back to filename
            final_val = manual_source_value if manual_source_value else f"Imported from {excel_file_name}"

            if target_col_def.type == 15:  # Hyperlink
                # Only a value that looks like a URL is used as the link
                # target; plain text is kept as the display text only.
                link_url = str(final_val) if final_val else ""
                if not link_url.startswith("http"):
                    link_url = ""  # Can't link if no URL

                new_fields[special_col_name] = {
                    "link": link_url,
                    "text": str(final_val)
                }
            else:
                new_fields[special_col_name] = final_val

        batch_records.append(AppTableRecord.builder().fields(new_fields).build())

        if len(batch_records) >= batch_size:
            _send_batch(client, target_app_token, target_table_id, batch_records)
            processed_count += len(batch_records)
            progress_bar.progress(processed_count / total_records)
            status_text.text(f"Synced {processed_count}/{total_records} records")
            batch_records = []

    # Flush the final, partially filled batch.
    if batch_records:
        _send_batch(client, target_app_token, target_table_id, batch_records)
        processed_count += len(batch_records)
        progress_bar.progress(1.0)
        status_text.text(f"Synced {processed_count}/{total_records} records")


def main():
    """Streamlit entry point: configure credentials, load the schema, merge the Excel file."""
    st.set_page_config(page_title="Bitbcpy - 多维表格合并工具", layout="wide")

    st.title("Bitbcpy - 多维表格合并脚本")
    st.markdown("将本地 Excel 的行抄到目标表,同名列合并,忽略目标表不存在的列。")

    if 'target_fields' not in st.session_state:
        st.session_state.target_fields = []
    if 'target_fields_loaded' not in st.session_state:
        st.session_state.target_fields_loaded = False

    # NOTE(review): the original indentation was lost, so the extent of the
    # sidebar block is an assumption (credentials, target link and the
    # "load fields" button). Confirm against the intended layout.
    with st.sidebar:
        st.header("配置")
        default_app_id = os.getenv("LARK_OAPI_APPID", "")
        default_app_secret = os.getenv("LARK_OAPI_SECRET", "")

        app_id = st.text_input("App ID", value=default_app_id)
        app_secret = st.text_input("App Secret", type="password", value=default_app_secret)

        target_url = st.text_input("目标文档链接")

        if st.button("读取目标表字段"):
            if not app_id or not app_secret:
                st.error("请填写 App ID 和 App Secret")
            elif not target_url:
                st.error("请填写目标文档链接")
            else:
                t_token, t_table = parse_bitable_url(target_url)
                if not (t_token and t_table):
                    st.error("目标链接解析失败")
                else:
                    client = get_bitable_client(app_id, app_secret)
                    try:
                        with st.spinner("Fetching schema..."):
                            fields = get_table_fields(client, t_token, t_table)
                        st.session_state.target_fields = fields
                        st.session_state.target_fields_loaded = True
                        # Remember which table the cached schema belongs to so
                        # it is not reused after the link is changed.
                        st.session_state.target_fields_source = (t_token, t_table)
                        st.success("字段加载成功")
                    except Exception as e:
                        st.error(f"Error: {str(e)}")

    source_col = None
    manual_source_val = None

    if st.session_state.target_fields_loaded:
        field_names = [f.field_name for f in st.session_state.target_fields]
        source_col = st.selectbox("哪一列记录来源", options=field_names)
        manual_source_val = st.text_input("该列统一填写的来源信息", help="例如:2023年Q1销售数据")

    uploaded_file = st.file_uploader("上传来源 Excel (xlsx)", type=['xlsx'])

    if st.button("开始合并"):
        if not app_id or not app_secret:
            st.error("请在左侧侧边栏填写 App ID 和 App Secret")
            return

        if not uploaded_file:
            st.error("请上传来源 Excel 文件")
            return

        t_token, t_table = parse_bitable_url(target_url)

        if not (t_token and t_table):
            st.error("目标链接解析失败,请检查链接格式")
            return

        client = get_bitable_client(app_id, app_secret)

        try:
            # Read Excel; the first row is assumed to be the header.
            wb = openpyxl.load_workbook(uploaded_file, data_only=True)
            sheet = wb.active

            rows = list(sheet.iter_rows(values_only=True))
            if not rows:
                st.error("Excel 文件为空")
                return

            header = rows[0]

            # Drop columns without a header, and drop the same positions from
            # every data row. Filtering only the header would shift all later
            # values under the wrong column name.
            kept_indexes = [i for i, h in enumerate(header) if h is not None]
            excel_columns = [str(header[i]) for i in kept_indexes]
            projected_rows = [tuple(row[i] for i in kept_indexes) for row in rows[1:]]

            # Entirely blank rows (e.g. formatted but empty trailing rows)
            # would otherwise be uploaded as empty records.
            data_rows = [row for row in projected_rows if any(v is not None for v in row)]

            # Use cached fields only when they were loaded for this very
            # table; otherwise fetch them again.
            t_fields = st.session_state.target_fields
            cached_source = st.session_state.get("target_fields_source")
            if not t_fields or cached_source != (t_token, t_table):
                with st.spinner("Fetching target table schema..."):
                    t_fields = get_table_fields(client, t_token, t_table)

            valid, msg = validate_primary_keys(excel_columns, t_fields)
            if not valid:
                st.error(f"主键校验失败: {msg}")
                return
            else:
                st.success("主键校验通过")

            st.info(f"Found {len(data_rows)} records in Excel.")

            sync_data(client, data_rows, excel_columns, source_col, manual_source_val, uploaded_file.name, t_token, t_table, t_fields)

            st.success("合并完成!")

        except Exception as e:
            logger.error(f"Error in main loop: {e}", exc_info=True)
            st.error(f"Error: {str(e)}")


if __name__ == "__main__":
    main()