import streamlit as st
import re
import lark_oapi as lark
from lark_oapi.api.bitable.v1 import *
import time
import openpyxl
import os
from dotenv import load_dotenv
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
# Configure logging: INFO level for the whole process so API pagination
# and sync progress messages below are visible.
logging.basicConfig(level=logging.INFO)
# Module-level logger, per stdlib convention.
logger = logging.getLogger(__name__)
# Load environment variables from a local .env file at import time
# (supplies LARK_OAPI_APPID / LARK_OAPI_SECRET defaults used in main()).
load_dotenv()
def parse_bitable_url(url):
    """
    Extract (app_token, table_id) from a Bitable URL.

    Expected format: .../base/<app_token>?table=<table_id>...

    Returns:
        tuple: (app_token, table_id), or (None, None) when either part
        cannot be located or the input is not a parseable string.
    """
    try:
        token_match = re.search(r'/base/([^/?]+)', url)
        table_match = re.search(r'table=([^&]+)', url)
        if token_match and table_match:
            return token_match.group(1), table_match.group(1)
    except Exception:
        # Non-string input (e.g. None) raises in re.search; treat as unparseable.
        pass
    return None, None
def get_bitable_client(app_id, app_secret):
    """Build and return a Lark OpenAPI client for the given app credentials."""
    builder = lark.Client.builder()
    builder = builder.app_id(app_id).app_secret(app_secret)
    return builder.build()
def get_table_fields(client, app_token, table_id):
    """
    Fetch all field definitions of a Bitable table, following pagination.

    Args:
        client: Lark OpenAPI client (from get_bitable_client).
        app_token: Bitable app token parsed from the document URL.
        table_id: Target table id parsed from the document URL.

    Returns:
        list: all field items returned by the list-fields API.

    Raises:
        Exception: when the API reports a non-success response; transport
        errors from the SDK are logged and re-raised as-is.
    """
    logger.info(f"Fetching fields for app_token={app_token}, table_id={table_id}")
    fields = []
    page_token = None
    has_more = True
    while has_more:
        builder = ListAppTableFieldRequest.builder() \
            .app_token(app_token) \
            .table_id(table_id) \
            .page_size(100)
        if page_token:
            builder.page_token(page_token)
        req = builder.build()
        try:
            resp = client.bitable.v1.app_table_field.list(req)
        except Exception as e:
            logger.error(f"API call failed: {e}", exc_info=True)
            raise
        if not resp.success():
            logger.error(f"Failed to list fields: code={resp.code}, msg={resp.msg}, log_id={resp.get_log_id()}")
            raise Exception(f"Failed to list fields: {resp.msg}, log_id: {resp.get_log_id()}")
        data = resp.data
        # BUGFIX: a successful response may carry no data payload at all;
        # previously resp.data.page_token was read unguarded and crashed.
        if data is None:
            logger.warning("Response had no data payload; stopping pagination.")
            break
        items = list(data.items) if data.items else []
        if items:
            fields.extend(items)
            logger.info(f"Fetched {len(items)} fields. Current total: {len(fields)}")
            logger.info(f"Page token: {page_token} -> {data.page_token}, Has more: {data.has_more}")
        # A short page (fewer items than page_size) is treated as the last
        # page even if the API claims has_more=True, which some endpoints do.
        if len(items) < 100:
            has_more = False
        else:
            has_more = data.has_more
        # Safeguard against an API that repeats the same page token forever.
        new_page_token = data.page_token
        if has_more and new_page_token == page_token:
            logger.warning("Infinite loop detected: page_token did not update. Breaking.")
            has_more = False
        page_token = new_page_token
    logger.info(f"Total fields fetched: {len(fields)}")
    return fields
def validate_primary_keys(excel_columns, target_fields):
    """
    Check that the target table's primary-key column is present among the
    Excel header columns.

    Excel has no strict schema, so only the *existence* of the primary-key
    column name is validated — its type cannot be checked reliably.

    Returns:
        tuple: (ok: bool, message: str) describing the validation outcome.
    """
    primary = None
    for field in target_fields:
        if field.is_primary:
            primary = field
            break
    if primary is None:
        return False, "Target table has no primary key."
    pk_name = primary.field_name
    if pk_name in excel_columns:
        return True, f"Primary key '{pk_name}' found in Excel."
    return False, f"Target Primary Key '{pk_name}' not found in Excel columns."
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def _send_batch(client, app_token, table_id, records):
    """
    Create one batch of records in the target table.

    Retries up to three times with exponential backoff (via tenacity)
    because the exception below re-triggers the decorator on API failure.

    Raises:
        Exception: when the API response is not successful.
    """
    body = BatchCreateAppTableRecordRequestBody.builder().records(records).build()
    request = (
        BatchCreateAppTableRecordRequest.builder()
        .app_token(app_token)
        .table_id(table_id)
        .request_body(body)
        .build()
    )
    resp = client.bitable.v1.app_table_record.batch_create(request)
    if resp.success():
        return
    logger.error(f"Batch create failed: code={resp.code}, msg={resp.msg}, log_id={resp.get_log_id()}")
    if resp.error:
        logger.error(f"Error details: {resp.error}")
    raise Exception(f"Batch create failed: {resp.msg}, log_id: {resp.get_log_id()}")
def _convert_cell(val, target_type):
    """
    Convert one non-None Excel cell value into the payload shape the
    Bitable API expects for the given target field type.

    Returns the converted value, or None when the field should be skipped
    entirely (e.g. User fields, which need structured IDs that plain Excel
    text cannot provide).
    """
    if target_type == 5:  # Date: API expects a Unix timestamp in milliseconds
        if hasattr(val, 'timestamp'):  # datetime object from openpyxl
            return int(val.timestamp() * 1000)
        return val
    if target_type == 4:  # MultiSelect: Excel commonly stores comma-separated text
        if isinstance(val, str):
            return [v.strip() for v in val.split(',')]
        return val
    if target_type == 11:  # User: complex object list; skip — plain text
        # from Excel cannot be written to a User field reliably.
        return None
    # All other types are passed through as-is and left to the SDK/API.
    return val
def sync_data(client, excel_rows, excel_columns, special_col_name, manual_source_value, excel_file_name, target_app_token, target_table_id, target_fields):
    """
    Copy Excel rows into the target Bitable table in batches of 100.

    Columns are matched by name against the target table's fields; columns
    absent from the target are ignored. If special_col_name is set and
    exists in the target, that column is filled on every record with
    manual_source_value (falling back to a note derived from the filename).

    Progress is reported through Streamlit progress/status widgets.

    Args:
        client: Lark OpenAPI client.
        excel_rows: iterable of row tuples (values_only rows from openpyxl).
        excel_columns: header names aligned with each row tuple.
        special_col_name: target column to stamp with the source info, or None.
        manual_source_value: user-provided source text; may be empty.
        excel_file_name: uploaded file name, used for the fallback source text.
        target_app_token: Bitable app token.
        target_table_id: Bitable table id.
        target_fields: field definitions of the target table.
    """
    target_field_names = {f.field_name: f for f in target_fields}
    total_records = len(excel_rows)
    if total_records == 0:
        st.warning("No records to sync.")
        return
    progress_bar = st.progress(0)
    status_text = st.empty()
    processed_count = 0
    batch_records = []
    for row in excel_rows:
        new_fields = {}
        # row is a tuple/list matching excel_columns order
        row_dict = dict(zip(excel_columns, row))
        # 1. Copy fields whose names exist in the target table.
        for col_name, val in row_dict.items():
            if col_name in target_field_names and val is not None:
                converted = _convert_cell(val, target_field_names[col_name].type)
                if converted is not None:
                    new_fields[col_name] = converted
        # 2. Handle the special "source record" column.
        if special_col_name and special_col_name in target_field_names:
            target_col_def = target_field_names[special_col_name]
            # Prefer the user-supplied value; otherwise fall back to the filename.
            final_val = manual_source_value if manual_source_value else f"Imported from {excel_file_name}"
            if target_col_def.type == 15:  # Hyperlink
                # Only use the value as a link when it looks like a URL;
                # otherwise keep the link empty and store the text alone.
                link_url = str(final_val) if final_val else ""
                if not link_url.startswith("http"):
                    link_url = ""  # Can't link if no URL
                new_fields[special_col_name] = {
                    "link": link_url,
                    "text": str(final_val)
                }
            else:
                new_fields[special_col_name] = final_val
        batch_records.append(AppTableRecord.builder().fields(new_fields).build())
        # Flush a full batch (100 is the batch_create limit per request).
        if len(batch_records) >= 100:
            _send_batch(client, target_app_token, target_table_id, batch_records)
            processed_count += len(batch_records)
            progress_bar.progress(processed_count / total_records)
            status_text.text(f"Synced {processed_count}/{total_records} records")
            batch_records = []
    # Flush the final partial batch, if any.
    if batch_records:
        _send_batch(client, target_app_token, target_table_id, batch_records)
        processed_count += len(batch_records)
    # Always finish at 100% (previously skipped when the row count was an
    # exact multiple of 100 and the trailing batch was empty).
    progress_bar.progress(1.0)
    status_text.text(f"Synced {processed_count}/{total_records} records")
def main():
    """
    Streamlit entry point.

    Flow: collect App credentials and a target Bitable URL in the sidebar,
    optionally pre-load the target table schema, accept an uploaded Excel
    workbook, validate the primary-key column, then sync all data rows into
    the target table.
    """
    st.set_page_config(page_title="Bitbcpy - 多维表格合并工具", layout="wide")
    st.title("Bitbcpy - 多维表格合并脚本")
    st.markdown("将本地 Excel 的行抄到目标表,同名列合并,忽略目标表不存在的列。")
    # Cache the target table's schema across Streamlit reruns so the
    # source-column selector below can be populated without re-fetching.
    if 'target_fields' not in st.session_state:
        st.session_state.target_fields = []
    if 'target_fields_loaded' not in st.session_state:
        st.session_state.target_fields_loaded = False
    with st.sidebar:
        st.header("配置")
        # Credentials default to environment variables (loaded via dotenv).
        default_app_id = os.getenv("LARK_OAPI_APPID", "")
        default_app_secret = os.getenv("LARK_OAPI_SECRET", "")
        app_id = st.text_input("App ID", value=default_app_id)
        app_secret = st.text_input("App Secret", type="password", value=default_app_secret)
        target_url = st.text_input("目标文档链接")
        # On demand: fetch the target table's fields and cache them.
        if st.button("读取目标表字段"):
            if not app_id or not app_secret:
                st.error("请填写 App ID 和 App Secret")
            elif not target_url:
                st.error("请填写目标文档链接")
            else:
                t_token, t_table = parse_bitable_url(target_url)
                if not (t_token and t_table):
                    st.error("目标链接解析失败")
                else:
                    client = get_bitable_client(app_id, app_secret)
                    try:
                        with st.spinner("Fetching schema..."):
                            fields = get_table_fields(client, t_token, t_table)
                        st.session_state.target_fields = fields
                        st.session_state.target_fields_loaded = True
                        st.success("字段加载成功")
                    except Exception as e:
                        st.error(f"Error: {str(e)}")
    # These stay None until the schema has been loaded; sync_data handles
    # a None special column by simply not stamping the source info.
    source_col = None
    manual_source_val = None
    if st.session_state.target_fields_loaded:
        field_names = [f.field_name for f in st.session_state.target_fields]
        source_col = st.selectbox("哪一列记录来源", options=field_names)
        manual_source_val = st.text_input("该列统一填写的来源信息", help="例如:2023年Q1销售数据")
    uploaded_file = st.file_uploader("上传来源 Excel (xlsx)", type=['xlsx'])
    if st.button("开始合并"):
        # Guard clauses: credentials, file and a parseable target URL are
        # all required before any API work starts.
        if not app_id or not app_secret:
            st.error("请在左侧侧边栏填写 App ID 和 App Secret")
            return
        if not uploaded_file:
            st.error("请上传来源 Excel 文件")
            return
        t_token, t_table = parse_bitable_url(target_url)
        if not (t_token and t_table):
            st.error("目标链接解析失败,请检查链接格式")
            return
        client = get_bitable_client(app_id, app_secret)
        try:
            # Read Excel; data_only=True resolves formulas to cached values.
            wb = openpyxl.load_workbook(uploaded_file, data_only=True)
            sheet = wb.active
            # Assuming first row is header
            rows = list(sheet.iter_rows(values_only=True))
            if not rows:
                st.error("Excel 文件为空")
                return
            header = rows[0]
            data_rows = rows[1:]
            # Filter None headers
            excel_columns = [str(h) for h in header if h is not None]
            # Use cached fields if available, else fetch
            t_fields = st.session_state.target_fields
            if not t_fields:
                with st.spinner("Fetching target table schema..."):
                    t_fields = get_table_fields(client, t_token, t_table)
            # Abort the merge if the target's primary-key column is missing
            # from the Excel header.
            valid, msg = validate_primary_keys(excel_columns, t_fields)
            if not valid:
                st.error(f"主键校验失败: {msg}")
                return
            else:
                st.success("主键校验通过")
            st.info(f"Found {len(data_rows)} records in Excel.")
            sync_data(client, data_rows, excel_columns, source_col, manual_source_val, uploaded_file.name, t_token, t_table, t_fields)
            st.success("合并完成!")
        except Exception as e:
            # Top-level boundary: log with traceback, surface the message in UI.
            logger.error(f"Error in main loop: {e}", exc_info=True)
            st.error(f"Error: {str(e)}")